In [48]:
from pyspark.sql import SparkSession

# New API: obtain the SparkSession (created on first use, reused afterwards).
# The builder points at the cluster master, names the application, turns on
# dynamic allocation and the shuffle service, sets the executor idle timeout
# to 30s and requests 4 cores per executor.
spark_session = (
    SparkSession.builder
    .master("spark://192.168.1.153:7077")
    .appName("B_LU")
    .config("spark.dynamicAllocation.enabled", True)
    .config("spark.shuffle.service.enabled", True)
    .config("spark.dynamicAllocation.executorIdleTimeout", "30s")
    .config("spark.executor.cores", 4)
    .getOrCreate()
)

In [49]:
# B.1: load the parking citations CSV from HDFS, treating the first line as
# the header row, keep the result cached, and preview the first rows.
df = (
    spark_session.read
    .option("header", "true")
    .csv("hdfs://192.168.1.153:9000/parking-citations.csv")
    .cache()
)
df.show()

+-------------+-------------------+----------+--------+-----------+--------------+-----------------+----+----+----------+-----+--------------------+-----+------+--------------+---------------------+-----------+---------+---------+
|Ticket number|         Issue Date|Issue time|Meter Id|Marked Time|RP State Plate|Plate Expiry Date| VIN|Make|Body Style|Color|            Location|Route|Agency|Violation code|Violation Description|Fine amount| Latitude|Longitude|
+-------------+-------------------+----------+--------+-----------+--------------+-----------------+----+----+----------+-----+--------------------+-----+------+--------------+---------------------+-----------+---------+---------+
|   1103341116|2015-12-21T00:00:00|      1251|    null|       null|            CA|           200304|null|HOND|        PA|   GY|     13147 WELBY WAY|01521|     1|        4000A1|   NO EVIDENCE OF REG|         50|    99999|    99999|
|   1103700150|2015-12-21T00:00:00|      1435|    null|       null|         

In [50]:
# B.2: print the column names and types of the loaded DataFrame.
df.printSchema()

root
 |-- Ticket number: string (nullable = true)
 |-- Issue Date: string (nullable = true)
 |-- Issue time: string (nullable = true)
 |-- Meter Id: string (nullable = true)
 |-- Marked Time: string (nullable = true)
 |-- RP State Plate: string (nullable = true)
 |-- Plate Expiry Date: string (nullable = true)
 |-- VIN: string (nullable = true)
 |-- Make: string (nullable = true)
 |-- Body Style: string (nullable = true)
 |-- Color: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Route: string (nullable = true)
 |-- Agency: string (nullable = true)
 |-- Violation code: string (nullable = true)
 |-- Violation Description: string (nullable = true)
 |-- Fine amount: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)



In [51]:
# B.3: count the rows in the DataFrame.
df.count()

9257460

In [52]:
# B.4: number of partitions of the DataFrame's underlying RDD.
df.rdd.getNumPartitions()

10

In [53]:
# B.5: remove the listed columns, cache the slimmer DataFrame and print its
# schema to confirm the columns are gone.
columns_to_drop = ['VIN', 'Latitude', 'Longitude']
df_trimmed = df.drop(*columns_to_drop)
df = df_trimmed.cache()
df.printSchema()

root
 |-- Ticket number: string (nullable = true)
 |-- Issue Date: string (nullable = true)
 |-- Issue time: string (nullable = true)
 |-- Meter Id: string (nullable = true)
 |-- Marked Time: string (nullable = true)
 |-- RP State Plate: string (nullable = true)
 |-- Plate Expiry Date: string (nullable = true)
 |-- Make: string (nullable = true)
 |-- Body Style: string (nullable = true)
 |-- Color: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Route: string (nullable = true)
 |-- Agency: string (nullable = true)
 |-- Violation code: string (nullable = true)
 |-- Violation Description: string (nullable = true)
 |-- Fine amount: string (nullable = true)



In [54]:
# B.6: find the largest fine and how many citations carry it.
from pyspark.sql.types import FloatType

# "Fine amount" is a string column in the loaded data, so cast it to a float
# before aggregating. withColumn already names the result, so no alias is needed.
fine_as_float = df["Fine amount"].cast(FloatType())
df = df.withColumn("Fine amount", fine_as_float).cache()

# The aggregation yields a single row whose only field is the maximum.
max_row = df.agg({"Fine amount": "max"}).collect()[0]
fine_max = max_row[0]
print(fine_max)

# Count the citations whose fine equals that maximum.
tickets_at_max = df.filter(df["Fine amount"] == fine_max)
print(tickets_at_max.count())

505.0
6


In [55]:
# B.7: relative frequency of the 20 most common vehicle makes.
# Frequencies are fractions of ALL rows in df, not just of the top 20.
total_rows = df.count()
make_counts = df.groupby('Make').count()
top_makes = make_counts.sort('count', ascending=False).limit(20)
df_make = (
    top_makes
    .withColumn('count', top_makes['count'] / total_rows)
    .withColumnRenamed('count', 'Frequency')
)
df_make.show()

+----+--------------------+
|Make|           Frequency|
+----+--------------------+
|TOYT| 0.16548264858827366|
|HOND| 0.11269570702979002|
|FORD| 0.08722673389893124|
|NISS|  0.0715203738390444|
|CHEV| 0.06820585776228037|
| BMW| 0.04568380527704143|
|MERZ| 0.04070554990245705|
|VOLK| 0.03413484908387398|
|HYUN|0.030816876335409495|
|DODG| 0.02933742084761911|
|LEXS|0.028438578184512814|
| KIA|0.023526431656199432|
|JEEP|0.023220732252691344|
|AUDI|   0.019413316395642|
|MAZD|0.018343152441382408|
|OTHR|0.016675848450870973|
| GMC|0.014343891304958379|
|INFI| 0.01299924601348534|
|CHRY|0.012996761530700646|
|ACUR|0.012018955523437314|
+----+--------------------+



In [56]:
# B.8
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Mapping from two-letter colour codes to full colour names.
COLORS = {
    'AL': 'Aluminum',
    'AM': 'Amber',
    'BG': 'Beige',
    'BK': 'Black',
    'BL': 'Blue',
    'BN': 'Brown',
    'BR': 'Brown',
    'BZ': 'Bronze',
    'CH': 'Charcoal',
    'DK': 'Dark',
    'GD': 'Gold',
    'GO': 'Gold',
    'GN': 'Green',
    'GY': 'Gray',
    'GT': 'Granite',
    'IV': 'Ivory',
    'LT': 'Light',
    'OL': 'Olive',
    'OR': 'Orange',
    'MR': 'Maroon',
    'PK': 'Pink',
    'RD': 'Red',
    'RE': 'Red',
    'SI': 'Silver',
    'SL': 'Silver',
    'SM': 'Smoke',
    'TN': 'Tan',
    'VT': 'Violet',
    'WT': 'White',
    'WH': 'White',
    'YL': 'Yellow',
    'YE': 'Yellow',
    'UN': 'Unknown',
}


def expand_colour(colour):
    """Return the full colour name for a code found in COLORS.

    Any value that is not a key of COLORS (including None) is returned
    unchanged.
    """
    return COLORS.get(colour, colour)

# Wrap expand_colour as a Spark UDF returning strings, then overwrite the
# 'Color' column with the expanded names and preview the cached result.
udf_expand_colour = udf(expand_colour, returnType=StringType())
expanded_color = udf_expand_colour(df['Color'])
df_new = df.withColumn('Color', expanded_color).cache()
df_new.show()

+-------------+-------------------+----------+--------+-----------+--------------+-----------------+----+----------+------+--------------------+-----+------+--------------+---------------------+-----------+
|Ticket number|         Issue Date|Issue time|Meter Id|Marked Time|RP State Plate|Plate Expiry Date|Make|Body Style| Color|            Location|Route|Agency|Violation code|Violation Description|Fine amount|
+-------------+-------------------+----------+--------+-----------+--------------+-----------------+----+----------+------+--------------------+-----+------+--------------+---------------------+-----------+
|   1103341116|2015-12-21T00:00:00|      1251|    null|       null|            CA|           200304|HOND|        PA|  Gray|     13147 WELBY WAY|01521|     1|        4000A1|   NO EVIDENCE OF REG|       50.0|
|   1103700150|2015-12-21T00:00:00|      1435|    null|       null|            CA|           201512| GMC|        VN| White|       525 S MAIN ST| 1C51|     1|        4000A1|

In [57]:
# B.9: colour distribution of citations whose Make is 'TOYT', most frequent
# colour first.
# Filter with df_new's own 'Make' column: a column object borrowed from a
# different DataFrame only resolves through shared lineage and is fragile.
# The filtered frame is built once and reused for both the total and the
# per-colour counts.
toyota = df_new.filter(df_new['Make'] == 'TOYT')
toyota_total = toyota.count()
colour_counts = toyota.groupby('Color').count().sort('count', ascending=False)
df_TOYT = (
    colour_counts
    .withColumn('count', colour_counts['count'] / toyota_total)
    .withColumnRenamed('count', 'Frequency')
)
# Rows are sorted by descending count, so the first row is the top colour.
print(df_TOYT.first()['Color'])
df_TOYT.show()

Gray
+-------+--------------------+
|  Color|           Frequency|
+-------+--------------------+
|   Gray|  0.2263926540635491|
|  White| 0.19884473960947785|
|  Black|  0.1646262375575166|
| Silver|  0.1623324275155374|
|   Blue| 0.08358698625084778|
|    Red|0.054946346125099466|
|  Green|0.037616787504022656|
|   Gold|0.019683422881571124|
| Maroon|0.012978238831710455|
|    Tan|0.011100891739868625|
|  Beige|0.007553776268008922|
|     OT|0.007053106859301452|
|  Brown|0.005526293629879324|
| Yellow|0.002227880954261532|
|     PR|0.001964817366635...|
| Orange|0.001649532719431...|
|Unknown|8.766610376716197E-4|
|     TU|7.030260145735922E-4|
|     CO|2.761188525205474E-4|
|   Pink|5.809592878091894...|
+-------+--------------------+
only showing top 20 rows

