In [1]:
import pyspark
from pyspark.sql import SparkSession

# Start notebook
spark_session = SparkSession\
        .builder\
        .master("spark://192.168.2.119:7077") \
        .appName("victor_hwasser_applicationB")\
        .config("spark.dynamicAllocation.enabled", True)\
        .config("spark.dynamicAllocation.shuffleTracking.enabled",True)\
        .config("spark.shuffle.service.enabled", False)\
        .config("spark.dynamicAllocation.executorIdleTimeout","30s")\
        .config("spark.executor.cores",2)\
        .config("spark.driver.port",9998)\
        .config("spark.blockManager.port",10005)\
        .getOrCreate()


Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/03/03 07:19:12 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/03/03 07:19:15 WARN ExecutorAllocationManager: Dynamic allocation without a shuffle service is an experimental feature.


In [2]:
from pyspark import SparkConf, SparkContext
#conf = spark_session.conf
sc = spark_session.sparkContext 

In [3]:
"""
For this section, we’ll use the PySpark DataFrames/SQL API. Use the existing cluster, and a
notebook on your own client machine, which you must deploy yourself.
We’ll work with a large dataset in CSV format. Our dataset is the Los Angeles Parking
Citations (https://www.kaggle.com/cityofLA/los-angeles-parking-citations). I have pre-loaded
the dataset into the HDFS cluster.

"""


'\nFor this section, we’ll use the PySpark DataFrames/SQL API. Use the existing cluster, and a\nnotebook on your own client machine, which you must deploy yourself.\nWe’ll work with a large dataset in CSV format. Our dataset is the Los Angeles Parking\nCitations (https://www.kaggle.com/cityofLA/los-angeles-parking-citations). I have pre-loaded\nthe dataset into the HDFS cluster.\n\n'

In [4]:
# B.1 Load the CSV file from HDFS, and call show() to verify the data is loaded correctly.
df = spark_session.read.csv('hdfs://192.168.2.119:9000/parking-citations.csv', header=True)
df.show()


                                                                                

+-------------+--------------------+----------+--------+-----------+--------------+-----------------+----+----+----------+-----+--------------------+-----+------+--------------+---------------------+-----------+---------+---------+------------------+-----------------+----------------------+
|Ticket number|          Issue Date|Issue time|Meter Id|Marked Time|RP State Plate|Plate Expiry Date| VIN|Make|Body Style|Color|            Location|Route|Agency|Violation code|Violation Description|Fine amount| Latitude|Longitude|Agency Description|Color Description|Body Style Description|
+-------------+--------------------+----------+--------+-----------+--------------+-----------------+----+----+----------+-----+--------------------+-----+------+--------------+---------------------+-----------+---------+---------+------------------+-----------------+----------------------+
|   1103341116|2015-12-21T00:00:...|      1251|    null|       null|            CA|           200304|null|HOND|        PA|  

In [5]:
# B.2 Print the schema for the DataFrame.

print(df.schema)

# B.3 Count the number of rows in the CSV file.

df_count = df.count()
print(df_count)

# B.4 Count the number of partitions in the underlying RDD.

print(df.rdd.getNumPartitions())


StructType(List(StructField(Ticket number,StringType,true),StructField(Issue Date,StringType,true),StructField(Issue time,StringType,true),StructField(Meter Id,StringType,true),StructField(Marked Time,StringType,true),StructField(RP State Plate,StringType,true),StructField(Plate Expiry Date,StringType,true),StructField(VIN,StringType,true),StructField(Make,StringType,true),StructField(Body Style,StringType,true),StructField(Color,StringType,true),StructField(Location,StringType,true),StructField(Route,StringType,true),StructField(Agency,StringType,true),StructField(Violation code,StringType,true),StructField(Violation Description,StringType,true),StructField(Fine amount,StringType,true),StructField(Latitude,StringType,true),StructField(Longitude,StringType,true),StructField(Agency Description,StringType,true),StructField(Color Description,StringType,true),StructField(Body Style Description,StringType,true)))


[Stage 4:>                                                          (0 + 1) / 1]

13077724
16


                                                                                

In [6]:
# B.5 Drop the columns VIN, Latitude and Longitude.

columns_to_drop = ['VIN', 'Latitude', 'Longitude']
for c in columns_to_drop:
    df = df.drop(c)
    
print(df.columns)

['Ticket number', 'Issue Date', 'Issue time', 'Meter Id', 'Marked Time', 'RP State Plate', 'Plate Expiry Date', 'Make', 'Body Style', 'Color', 'Location', 'Route', 'Agency', 'Violation code', 'Violation Description', 'Fine amount', 'Agency Description', 'Color Description', 'Body Style Description']


In [7]:
# B.6 Find the maximum fine amount. How many fines have this amount? 
# You need to convert the ‘fine amount’ column to a float to do this correctly.
from pyspark.sql.functions import max, min 
from pyspark.sql.types import FloatType

df = df.withColumn("Fine amount", df["Fine amount"].cast(FloatType()))
max_fine_amount = df.agg(max("Fine amount")).collect()[0][0]
all_max = df.filter(df['Fine amount'] == 98.0)
print("Fine amount:", max_fine_amount)
print("How many fines have this amount:", all_max.count())


                                                                                

Fine amount: 1100.0




How many fines have this amount: 728


                                                                                

In [9]:
# B.7 Show the top 20 most frequent vehicle makes, and their frequencies.
# most_freq = df.agg(asc("Make")).collect()[0][0]
top_20_vec = df.rdd.map(lambda x: (x['Make'],1)).reduceByKey(lambda a, b: a+b)
top_20_vec = top_20_vec.sortBy(lambda x: x[1], ascending=False)
print(top_20_vec.take(20))

# We can see that Toyota has the highest freqency.

                                                                                

[('TOYT', 2150768), ('HOND', 1479996), ('FORD', 1116235), ('NISS', 945133), ('CHEV', 892676), ('BMW', 603092), ('MERZ', 543298), ('VOLK', 432030), ('HYUN', 404917), ('DODG', 391686), ('LEXS', 368420), ('KIA', 328155), ('JEEP', 316300), ('AUDI', 255395), ('MAZD', 242344), ('OTHR', 205546), ('GMC', 184889), ('INFI', 174315), ('CHRY', 159948), ('SUBA', 154640)]


In [31]:
# B.8 Let’s expand some abbreviations in the color column. Create a User Defined Function to 
# create a new column, ‘color long’, mapping the original colors to their corresponding values
# in the dictionary below. If there is no key matching the original color, use the original color. 

COLORS = {
'AL':'Aluminum', 'AM':'Amber', 'BG':'Beige', 'BK':'Black',
'BL':'Blue', 'BN':'Brown', 'BR':'Brown', 'BZ':'Bronze',
'CH':'Charcoal', 'DK':'Dark', 'GD':'Gold', 'GO':'Gold',
'GN':'Green', 'GY':'Gray', 'GT':'Granite', 'IV':'Ivory',
'LT':'Light', 'OL':'Olive', 'OR':'Orange', 'MR':'Maroon',
'PK':'Pink', 'RD':'Red', 'RE':'Red', 'SI':'Silver', 'SL':'Silver',
'SM':'Smoke', 'TN':'Tan', 'VT':'Violet', 'WT':'White', 'WH':'White',
'YL':'Yellow', 'YE':'Yellow', 'UN':'Unknown'
}
 
from pyspark.sql.functions import udf
from pyspark.sql.functions import col

# Create function
def color_long(val):
    if val in COLORS:
        return COLORS[val]
    else:
        return val

# Convert function to UDF
color_long_UDF = udf(lambda x: color_long(x))

# Apply the function
df_with_color = df.withColumn("color long", color_long_UDF(col("Color")))

# Check the first five values
print(df_with_color.take(5))

[Stage 22:>                                                         (0 + 1) / 1]

[Row(Ticket number='1103341116', Issue Date='2015-12-21T00:00:00.000', Issue time='1251', Meter Id=None, Marked Time=None, RP State Plate='CA', Plate Expiry Date='200304', Make='HOND', Body Style='PA', Color='GY', Location='13147 WELBY WAY', Route='01521', Agency='1', Violation code='4000A1', Violation Description='NO EVIDENCE OF REG', Fine amount=50.0, Agency Description=None, Color Description=None, Body Style Description=None, color long='Gray'), Row(Ticket number='1103700150', Issue Date='2015-12-21T00:00:00.000', Issue time='1435', Meter Id=None, Marked Time=None, RP State Plate='CA', Plate Expiry Date='201512', Make='GMC', Body Style='VN', Color='WH', Location='525 S MAIN ST', Route='1C51', Agency='1', Violation code='4000A1', Violation Description='NO EVIDENCE OF REG', Fine amount=50.0, Agency Description=None, Color Description=None, Body Style Description=None, color long='White'), Row(Ticket number='1104803000', Issue Date='2015-12-21T00:00:00.000', Issue time='2055', Meter I

                                                                                

In [37]:
# B.9 Using this new column, what’s the most frequent colour value for Toyotas (TOYT)?

top_color1 = df_with_color.filter(df['Make'] == 'TOYT')
top_color2 = top_color1.rdd.map(lambda x: (x['color long'],1)).reduceByKey(lambda a, b: a+b)
top_color3 = top_color2.sortBy(lambda x: x[1], ascending=False)
print(top_color3.take(10))

                                                                                

[('Gray', 489697), ('White', 434595), ('Black', 353812), ('Silver', 347894), ('Blue', 180091), ('Red', 119074), ('Green', 74968), ('Gold', 40646), ('Maroon', 26242), ('Tan', 23355)]
