In [1]:
from pyspark.sql import SparkSession

# Cluster capacity: 5 machines x (8 cores, 16gb each) = 40 cores.
# Executors get 4 cores each; dynamic allocation (together with the external
# shuffle service it is configured alongside) lets the executor count vary,
# with a 30s idle timeout for executors.

# New API: DataFrame / SQL entry point
spark_session = (
    SparkSession.builder
    .master("spark://192.168.1.153:7077")
    .config("spark.dynamicAllocation.enabled", True)
    .config("spark.shuffle.service.enabled", True)
    .config("spark.dynamicAllocation.executorIdleTimeout", "30s")
    .config("spark.executor.cores", 4)
    .appName("Part-B-Hamza_Imran_Saeed")
    .getOrCreate()
)

# Old API (RDD): the SparkContext behind the session
spark_context = spark_session.sparkContext

In [2]:
## B.1
# Load the raw citations CSV from HDFS. The first line is the header; no schema
# is supplied, so every column is read as a string (see printSchema below).
# The frame is marked for caching because many of the following cells query it.
data_frame = (
    spark_session.read
    .option("header", "true")
    .csv("hdfs://192.168.1.153:9000/parking-citations.csv")
    .cache()
)

In [3]:
## B.3
# Total number of citation rows. This is the first action run on the cached
# frame, so it also materialises the cache requested when the CSV was loaded.
data_frame.count()

9257460

In [4]:
## B.1
# Preview the first rows of the loaded citations as a text table.
data_frame.show()

+-------------+-------------------+----------+--------+-----------+--------------+-----------------+----+----+----------+-----+--------------------+-----+------+--------------+---------------------+-----------+---------+---------+
|Ticket number|         Issue Date|Issue time|Meter Id|Marked Time|RP State Plate|Plate Expiry Date| VIN|Make|Body Style|Color|            Location|Route|Agency|Violation code|Violation Description|Fine amount| Latitude|Longitude|
+-------------+-------------------+----------+--------+-----------+--------------+-----------------+----+----+----------+-----+--------------------+-----+------+--------------+---------------------+-----------+---------+---------+
|   1103341116|2015-12-21T00:00:00|      1251|    null|       null|            CA|           200304|null|HOND|        PA|   GY|     13147 WELBY WAY|01521|     1|        4000A1|   NO EVIDENCE OF REG|         50|    99999|    99999|
|   1103700150|2015-12-21T00:00:00|      1435|    null|       null|         

In [5]:
## B.2
# Inspect the schema: with no schema / inferSchema option on the read, every
# column comes back as a nullable string.
data_frame.printSchema()

root
 |-- Ticket number: string (nullable = true)
 |-- Issue Date: string (nullable = true)
 |-- Issue time: string (nullable = true)
 |-- Meter Id: string (nullable = true)
 |-- Marked Time: string (nullable = true)
 |-- RP State Plate: string (nullable = true)
 |-- Plate Expiry Date: string (nullable = true)
 |-- VIN: string (nullable = true)
 |-- Make: string (nullable = true)
 |-- Body Style: string (nullable = true)
 |-- Color: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Route: string (nullable = true)
 |-- Agency: string (nullable = true)
 |-- Violation code: string (nullable = true)
 |-- Violation Description: string (nullable = true)
 |-- Fine amount: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)



In [6]:
## B.4
# Number of partitions Spark split the CSV into (the recorded run reported 10).
data_frame.rdd.getNumPartitions()

10

In [7]:
## B.5
# Remove columns that none of the following data_frame cells use: the VIN and
# the raw coordinates (the B.10 plot re-reads the CSV to get the coordinates back).
columns_to_drop = ['VIN', 'Latitude', 'Longitude']
# Rebinds data_frame to the narrower frame. Per the PySpark docs, drop() is a
# no-op for names that are already absent, so re-running this cell is harmless.
data_frame = data_frame.drop(*columns_to_drop)

In [8]:
## B.5
# Confirm that VIN, Latitude and Longitude are no longer in the schema.
data_frame.printSchema()

root
 |-- Ticket number: string (nullable = true)
 |-- Issue Date: string (nullable = true)
 |-- Issue time: string (nullable = true)
 |-- Meter Id: string (nullable = true)
 |-- Marked Time: string (nullable = true)
 |-- RP State Plate: string (nullable = true)
 |-- Plate Expiry Date: string (nullable = true)
 |-- Make: string (nullable = true)
 |-- Body Style: string (nullable = true)
 |-- Color: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Route: string (nullable = true)
 |-- Agency: string (nullable = true)
 |-- Violation code: string (nullable = true)
 |-- Violation Description: string (nullable = true)
 |-- Fine amount: string (nullable = true)



In [9]:
## B.6
from pyspark.sql.types import IntegerType

# "Fine amount" is read as a string; cast it to an integer so the ordering
# below is numeric rather than lexicographic.
fine_amount_int = data_frame["Fine amount"].cast(IntegerType())
data_frame_2 = data_frame.withColumn("Fine amount", fine_amount_int)

# One row per distinct fine value with its ticket count, largest fine first.
max_fine = (
    data_frame_2
    .groupby('Fine amount')
    .count()
    .orderBy('Fine amount', ascending=False)
)
max_fine.take(30)

[Row(Fine amount=505, count=6),
 Row(Fine amount=363, count=63366),
 Row(Fine amount=353, count=15),
 Row(Fine amount=345, count=40),
 Row(Fine amount=330, count=1),
 Row(Fine amount=293, count=10401),
 Row(Fine amount=255, count=30),
 Row(Fine amount=163, count=106748),
 Row(Fine amount=155, count=1),
 Row(Fine amount=143, count=373),
 Row(Fine amount=133, count=9185),
 Row(Fine amount=128, count=338),
 Row(Fine amount=123, count=3),
 Row(Fine amount=113, count=2),
 Row(Fine amount=105, count=729),
 Row(Fine amount=103, count=7401),
 Row(Fine amount=98, count=333),
 Row(Fine amount=93, count=1097437),
 Row(Fine amount=88, count=158),
 Row(Fine amount=85, count=5),
 Row(Fine amount=80, count=25),
 Row(Fine amount=78, count=34321),
 Row(Fine amount=75, count=4),
 Row(Fine amount=73, count=3096053),
 Row(Fine amount=70, count=6),
 Row(Fine amount=68, count=1329058),
 Row(Fine amount=65, count=5),
 Row(Fine amount=63, count=1760811),
 Row(Fine amount=60, count=406),
 Row(Fine amount=58, c

In [10]:
## B.6
# max_fine is sorted by fine amount descending, so its first row holds the
# largest fine together with the number of tickets that carried it.
maxAndCount = max_fine.take(1)
top_fine, top_fine_count = maxAndCount[0]
print(f"Max Fine amount is: {top_fine} and its count is: {top_fine_count}")

Max Fine amount is: 505 and its count is: 6


In [11]:
## B.7
# Ticket counts per vehicle make, most-ticketed make first.
vehicles = (
    data_frame
    .groupby('Make')
    .count()
    .orderBy('count', ascending=False)
)
vehicles.take(20)

[Row(Make='TOYT', count=1531949),
 Row(Make='HOND', count=1043276),
 Row(Make='FORD', count=807498),
 Row(Make='NISS', count=662097),
 Row(Make='CHEV', count=631413),
 Row(Make='BMW', count=422916),
 Row(Make='MERZ', count=376830),
 Row(Make='VOLK', count=316002),
 Row(Make='HYUN', count=285286),
 Row(Make='DODG', count=271590),
 Row(Make='LEXS', count=263269),
 Row(Make='KIA', count=217795),
 Row(Make='JEEP', count=214965),
 Row(Make='AUDI', count=179718),
 Row(Make='MAZD', count=169811),
 Row(Make='OTHR', count=154376),
 Row(Make='GMC', count=132788),
 Row(Make='INFI', count=120340),
 Row(Make='CHRY', count=120317),
 Row(Make='ACUR', count=111265)]

In [12]:
## B.8
import pyspark
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Two-letter colour codes used in the "Color" column -> readable colour names.
# Defined once at module level: the previous version rebuilt this dict inside
# the function, i.e. once for every row the UDF processed.
COLOR_CODE_TO_NAME = {
    'AL': 'Aluminum', 'AM': 'Amber', 'BG': 'Beige', 'BK': 'Black',
    'BL': 'Blue', 'BN': 'Brown', 'BR': 'Brown', 'BZ': 'Bronze',
    'CH': 'Charcoal', 'DK': 'Dark', 'GD': 'Gold', 'GO': 'Gold',
    'GN': 'Green', 'GY': 'Gray', 'GT': 'Granite', 'IV': 'Ivory',
    'LT': 'Light', 'OL': 'Olive', 'OR': 'Orange', 'MR': 'Maroon',
    'PK': 'Pink', 'RD': 'Red', 'RE': 'Red', 'SI': 'Silver', 'SL': 'Silver',
    'SM': 'Smoke', 'TN': 'Tan', 'VT': 'Violet', 'WT': 'White',
    'WH': 'White', 'YL': 'Yellow', 'YE': 'Yellow', 'UN': 'Unknown',
}


def color_to_colorlong(color):
    """Expand a two-letter colour code into its long name.

    Args:
        color: Value from the "Color" column; None for null cells.

    Returns:
        The long colour name when ``color`` is a known code, otherwise
        ``color`` unchanged (unknown codes and None pass straight through).
    """
    return COLOR_CODE_TO_NAME.get(color, color)

# Wrap the Python lookup as a Spark UDF that takes the string "Color" column
# and returns a string column.
udf_color_to_colorlong = udf(color_to_colorlong, StringType())

# Add a "color long" column with the expanded colour name. Codes missing from
# the lookup are carried over unchanged, and null colours stay null.
data_frame_with_colorlong = data_frame.withColumn("color long", udf_color_to_colorlong("Color"))

# Cached because the following cells (schema check, Toyota colour counts) reuse it.
data_frame_with_colorlong.cache()

data_frame_with_colorlong.select('Color', 'color long').show()

+-----+----------+
|Color|color long|
+-----+----------+
|   GY|      Gray|
|   WH|     White|
|   BK|     Black|
|   WH|     White|
|   BK|     Black|
|   GY|      Gray|
|   BL|      Blue|
|   BK|     Black|
|   BR|     Brown|
|   SI|    Silver|
|   WH|     White|
|   GO|      Gold|
|   BK|     Black|
|   BK|     Black|
|   BK|     Black|
|   BK|     Black|
|   WH|     White|
| null|      null|
|   BK|     Black|
|   BK|     Black|
+-----+----------+
only showing top 20 rows



In [13]:
## B.8
# The schema now ends with the string column "color long" produced by the UDF.
data_frame_with_colorlong.printSchema()

root
 |-- Ticket number: string (nullable = true)
 |-- Issue Date: string (nullable = true)
 |-- Issue time: string (nullable = true)
 |-- Meter Id: string (nullable = true)
 |-- Marked Time: string (nullable = true)
 |-- RP State Plate: string (nullable = true)
 |-- Plate Expiry Date: string (nullable = true)
 |-- Make: string (nullable = true)
 |-- Body Style: string (nullable = true)
 |-- Color: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Route: string (nullable = true)
 |-- Agency: string (nullable = true)
 |-- Violation code: string (nullable = true)
 |-- Violation Description: string (nullable = true)
 |-- Fine amount: string (nullable = true)
 |-- color long: string (nullable = true)



In [14]:
## B.9
# Colour distribution of Toyota ('TOYT') tickets, most common colour first.
toyota_rows = data_frame_with_colorlong.filter(data_frame_with_colorlong['Make'] == 'TOYT')
toyotas = (
    toyota_rows
    .groupby('color long')
    .count()
    .orderBy('count', ascending=False)
)
toyotas.show()

+----------+------+
|color long| count|
+----------+------+
|      Gray|346822|
|     White|304620|
|     Black|252199|
|    Silver|248685|
|      Blue|128051|
|       Red| 84175|
|     Green| 57627|
|      Gold| 30154|
|    Maroon| 19882|
|       Tan| 17006|
|     Beige| 11572|
|        OT| 10805|
|     Brown|  8466|
|    Yellow|  3413|
|        PR|  3010|
|    Orange|  2527|
|   Unknown|  1343|
|        TU|  1077|
|        CO|   423|
|      Pink|    89|
+----------+------+
only showing top 20 rows



In [17]:
## B.10
import pandas as pd
import os

# Re-read the raw CSV with typed columns for the pandas/matplotlib plot. This
# re-read still includes Latitude/Longitude, which were dropped from data_frame.
# Lines that fail to parse are discarded (mode=DROPMALFORMED).
# NOTE(review): per the Spark docs, inferSchema makes an extra pass over the
# whole file even though only a small slice is used below; consider an explicit
# schema or the samplingRatio option.
csvDf = (
    spark_session.read
    .format("csv")
    .option("header", "true")
    .option("inferschema", "true")
    .option("mode", "DROPMALFORMED")
    .load("hdfs://192.168.1.153:9000/parking-citations.csv")
)


In [18]:
# Pull a 10,000-row slice to the driver as a pandas DataFrame for local plotting.
# NOTE(review): there is no orderBy, so exactly which rows are taken is not pinned down.
tickets = csvDf.limit(10000).toPandas()

In [19]:
# Peek at the first rows of the local pandas sample.
tickets.head()

Unnamed: 0,Ticket number,Issue Date,Issue time,Meter Id,Marked Time,RP State Plate,Plate Expiry Date,VIN,Make,Body Style,Color,Location,Route,Agency,Violation code,Violation Description,Fine amount,Latitude,Longitude
0,1103341000.0,2015-12-21,1251.0,,,CA,200304.0,,HOND,PA,GY,13147 WELBY WAY,01521,1.0,4000A1,NO EVIDENCE OF REG,50.0,99999.0,99999.0
1,1103700000.0,2015-12-21,1435.0,,,CA,201512.0,,GMC,VN,WH,525 S MAIN ST,1C51,1.0,4000A1,NO EVIDENCE OF REG,50.0,99999.0,99999.0
2,1104803000.0,2015-12-21,2055.0,,,CA,201503.0,,NISS,PA,BK,200 WORLD WAY,2R2,2.0,8939,WHITE CURB,58.0,6439997.9,1802686.4
3,1104821000.0,2015-12-26,1515.0,,,CA,,,ACUR,PA,WH,100 WORLD WAY,2F11,2.0,000,17104h,,6440041.1,1802686.2
4,1105461000.0,2015-09-15,115.0,,,CA,200316.0,,CHEV,PA,BK,GEORGIA ST/OLYMPIC,1FB70,1.0,8069A,NO STOPPING/STANDING,93.0,99999.0,99999.0


In [20]:
import matplotlib.pyplot as plt

# Latitude/Longitude hold 99999 as a "no coordinates" placeholder (visible in the
# head() preview). Left in, those points sit far away from the real values
# (~1.8e6 / ~6.4e6) and squash every genuine location into one corner of the
# axes, so they are removed before plotting.
# NOTE(review): values such as 6439997.9 / 1802686.4 are not degrees; they look
# like projected coordinates. Confirm the units and axis assignment.
MISSING_COORDINATE = 99999
located_tickets = tickets[
    (tickets["Latitude"] != MISSING_COORDINATE)
    & (tickets["Longitude"] != MISSING_COORDINATE)
]

fig, ax = plt.subplots()
located_tickets.plot(kind="scatter", x="Longitude", y="Latitude", alpha=0.4, ax=ax)
ax.set_title("Parking citation locations (sample, placeholder coordinates removed)")
plt.show()

<Figure size 640x480 with 1 Axes>

In [21]:
# Release the cluster cores for other applications. Stopping the SparkSession
# (rather than only its SparkContext) shuts down the session together with the
# underlying SparkContext.
spark_session.stop()