In [221]:
# `IPython.display` is the public home of these helpers; importing `display`
# from `IPython.core.display` is deprecated.
from IPython.display import display, HTML

# Inject CSS that stretches the notebook's main container to 100% of the
# browser width (useful for the wide Spark tables printed below).
display(HTML("<style>.container { width:100% !important; }</style>"))

In [222]:
from pprint import pprint
from pyspark.sql import SparkSession

# DataFrame / SQL entry point, connected to the standalone master below.
# Executors are allocated on demand (dynamic allocation + shuffle service),
# idle ones are released after 30s, and each executor gets 3 cores.
# The driver and block-manager ports are pinned to fixed values
# (presumably so they can be opened in a firewall -- TODO confirm).
spark_session = (
    SparkSession.builder
    .master("spark://192.168.2.113:7077")
    .appName("JunjieChuA3PartB")
    .config("spark.dynamicAllocation.enabled", True)
    .config("spark.shuffle.service.enabled", True)
    .config("spark.dynamicAllocation.executorIdleTimeout", "30s")
    .config("spark.executor.cores", 3)
    .config("spark.driver.port", 9998)
    .config("spark.blockManager.port", 10005)
    .getOrCreate()
)

# Low-level RDD handle backing the session (the same SparkContext object).
spark_context = spark_session.sparkContext

In [223]:
#B.1 and B.2
# Load the parking-citations CSV from HDFS. The first line supplies the column
# names; without a schema every column comes back as a string.
# Cache it because every later cell starts from this frame.
data_frame = (
    spark_session.read
    .csv('hdfs://192.168.2.113:9000/parking-citations.csv', header=True)
    .cache()
)

In [224]:
# Quick look: first five rows, then the column names/types.
data_frame.show(n=5)
data_frame.printSchema()

+-------------+-------------------+----------+--------+-----------+--------------+-----------------+----+----+----------+-----+------------------+-----+------+--------------+---------------------+-----------+---------+---------+
|Ticket number|         Issue Date|Issue time|Meter Id|Marked Time|RP State Plate|Plate Expiry Date| VIN|Make|Body Style|Color|          Location|Route|Agency|Violation code|Violation Description|Fine amount| Latitude|Longitude|
+-------------+-------------------+----------+--------+-----------+--------------+-----------------+----+----+----------+-----+------------------+-----+------+--------------+---------------------+-----------+---------+---------+
|   1103341116|2015-12-21T00:00:00|      1251|    null|       null|            CA|           200304|null|HOND|        PA|   GY|   13147 WELBY WAY|01521|     1|        4000A1|   NO EVIDENCE OF REG|         50|    99999|    99999|
|   1103700150|2015-12-21T00:00:00|      1435|    null|       null|            CA|  

In [225]:
#B.3 and B.4
# Total rows in the dataset (9257460 on the recorded run).
row_count = data_frame.count()
print(row_count)
# How many partitions Spark split the CSV into (10 on the recorded run).
partition_count = data_frame.rdd.getNumPartitions()
print(partition_count)

9257460
10


In [226]:
#B.5
# Discard the VIN and the coordinate columns in one call, then peek at 3 rows.
df = data_frame.drop('VIN', 'Latitude', 'Longitude')
df.take(3)

[Row(Ticket number='1103341116', Issue Date='2015-12-21T00:00:00', Issue time='1251', Meter Id=None, Marked Time=None, RP State Plate='CA', Plate Expiry Date='200304', Make='HOND', Body Style='PA', Color='GY', Location='13147 WELBY WAY', Route='01521', Agency='1', Violation code='4000A1', Violation Description='NO EVIDENCE OF REG', Fine amount='50'),
 Row(Ticket number='1103700150', Issue Date='2015-12-21T00:00:00', Issue time='1435', Meter Id=None, Marked Time=None, RP State Plate='CA', Plate Expiry Date='201512', Make='GMC', Body Style='VN', Color='WH', Location='525 S MAIN ST', Route='1C51', Agency='1', Violation code='4000A1', Violation Description='NO EVIDENCE OF REG', Fine amount='50'),
 Row(Ticket number='1104803000', Issue Date='2015-12-21T00:00:00', Issue time='2055', Meter Id=None, Marked Time=None, RP State Plate='CA', Plate Expiry Date='201503', Make='NISS', Body Style='PA', Color='BK', Location='200 WORLD WAY', Route='2R2', Agency='2', Violation code='8939', Violation Desc

In [227]:
#B.6
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from pyspark.sql.functions import col, lit, when

# "Fine amount" is read as a string (see the schema above), so convert it to
# float. This is done with a native Spark expression rather than a Python UDF.
# The previous UDF was declared StringType() while returning floats, which
# forced a float -> string -> float round-trip through a Python worker for
# every row.
# Semantics: a null fine becomes 0.0, and a numeric string becomes its float
# value. A non-numeric string becomes null instead of aborting the job (this
# assumes ANSI SQL mode is off, the Spark 3 default -- confirm for the
# cluster's version).
fine_raw = col("Fine amount")
fine_float = (
    when(fine_raw.isNull(), lit(0.0))
    .otherwise(fine_raw.cast("float"))
    .cast("float")  # the 0.0 literal is a double; keep the column as float
)
dfnew = df.withColumn("float Fine amount", fine_float)

#check that the missing values are 0.0 now
print(dfnew.select("Fine amount","float Fine amount").take(5))
#check the types of schema
dfnew.printSchema()
#find the max value
max_value = dfnew.agg({"float Fine amount": "max"}).collect()[0][0]
print(max_value)
#count how many tickets carry that maximum fine
print(dfnew.where(dfnew['float Fine amount'] == max_value).count())

[Row(Fine amount='50', float Fine amount=50.0), Row(Fine amount='50', float Fine amount=50.0), Row(Fine amount='58', float Fine amount=58.0), Row(Fine amount=None, float Fine amount=0.0), Row(Fine amount='93', float Fine amount=93.0)]
root
 |-- Ticket number: string (nullable = true)
 |-- Issue Date: string (nullable = true)
 |-- Issue time: string (nullable = true)
 |-- Meter Id: string (nullable = true)
 |-- Marked Time: string (nullable = true)
 |-- RP State Plate: string (nullable = true)
 |-- Plate Expiry Date: string (nullable = true)
 |-- Make: string (nullable = true)
 |-- Body Style: string (nullable = true)
 |-- Color: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Route: string (nullable = true)
 |-- Agency: string (nullable = true)
 |-- Violation code: string (nullable = true)
 |-- Violation Description: string (nullable = true)
 |-- Fine amount: string (nullable = true)
 |-- float Fine amount: float (nullable = true)

505.0
6


In [228]:
#B.7
# Share of tickets per vehicle make, most frequent first.
# Projecting to 'Make' first does not change the row count, so count the frame directly.
sums = dfnew.count()
by_make = dfnew.groupBy('Make').count()
make = by_make.withColumn("frequency", by_make['count'] / sums).orderBy('count', ascending=False)
make.show(20)

+----+-------+--------------------+
|Make|  count|           frequency|
+----+-------+--------------------+
|TOYT|1531949| 0.16548264858827366|
|HOND|1043276| 0.11269570702979002|
|FORD| 807498| 0.08722673389893124|
|NISS| 662097|  0.0715203738390444|
|CHEV| 631413| 0.06820585776228037|
| BMW| 422916| 0.04568380527704143|
|MERZ| 376830| 0.04070554990245705|
|VOLK| 316002| 0.03413484908387398|
|HYUN| 285286|0.030816876335409495|
|DODG| 271590| 0.02933742084761911|
|LEXS| 263269|0.028438578184512814|
| KIA| 217795|0.023526431656199432|
|JEEP| 214965|0.023220732252691344|
|AUDI| 179718|   0.019413316395642|
|MAZD| 169811|0.018343152441382408|
|OTHR| 154376|0.016675848450870973|
| GMC| 132788|0.014343891304958379|
|INFI| 120340| 0.01299924601348534|
|CHRY| 120317|0.012996761530700646|
|ACUR| 111265|0.012018955523437314|
+----+-------+--------------------+
only showing top 20 rows



In [229]:
#B.8
from pyspark.sql.functions import col, create_map, lit
from pyspark.sql.functions import coalesce
from itertools import chain
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf

COLORS = { 'AL':'Aluminum', 'AM':'Amber', 'BG':'Beige', 'BK':'Black', 'BL':'Blue', 'BN':'Brown', 'BR':'Brown', 'BZ':'Bronze', 'CH':'Charcoal', 'DK':'Dark', 'GD':'Gold', 'GO':'Gold', 'GN':'Green', 'GY':'Gray', 'GT':'Granite', 'IV':'Ivory', 'LT':'Light', 'OL':'Olive', 'OR':'Orange', 'MR':'Maroon', 'PK':'Pink', 'RD':'Red', 'RE':'Red', 'SI':'Silver', 'SL':'Silver', 'SM':'Smoke', 'TN':'Tan', 'VT':'Violet', 'WT':'White', 'WH':'White', 'YL':'Yellow', 'YE':'Yellow', 'UN':'Unknown' }

# Turn COLORS into a Spark map literal: create_map(lit(k1), lit(v1), lit(k2), ...).
# The lookup runs inside the JVM instead of sending every row to a Python UDF.
color_map = create_map([lit(x) for x in chain(*COLORS.items())])

# Codes absent from COLORS, and null Color values, look up to null.
dfnew = dfnew.withColumn("color_long", color_map[col("Color")])

print("Show the Null in color long:")
dfnew.filter(col("color_long").isNull()).select("Ticket number","Color","color_long").show(5)

print("-------------------Now processing the Null-------------------")
# Fall back to the raw colour code when no long name was found.
dfnew = dfnew.withColumn("color_long", coalesce(col("color_long"), col("Color")))

print("Show the results:")
dfnew.select("Ticket number","Color","color_long").show(5)
print("Show the results of colors which are not in the dict:")
# Codes seen in the earlier null listing that COLORS does not cover.
for unmapped_code in ("GR", "MA", "TA", "BU"):
    dfnew.filter(col("color_long") == unmapped_code).select("Ticket number","Color","color_long").show(3)
print("Show the results of Null which is in the color_long col:")
# Only rows whose raw Color is itself null can still be null here.
dfnew.filter(col("color_long").isNull()).select("Ticket number","Color","color_long").show(3)

Show the Null in color long:
+-------------+-----+----------+
|Ticket number|Color|color_long|
+-------------+-----+----------+
|   1107539823| null|      null|
|   1108347984|   MA|      null|
|   1110265262|   TA|      null|
|   1111259715|   BU|      null|
|   1111884115|   GR|      null|
+-------------+-----+----------+
only showing top 5 rows

-------------------Now processing the Null-------------------
Show the results:
+-------------+-----+----------+
|Ticket number|Color|color_long|
+-------------+-----+----------+
|   1103341116|   GY|      Gray|
|   1103700150|   WH|     White|
|   1104803000|   BK|     Black|
|   1104820732|   WH|     White|
|   1105461453|   BK|     Black|
+-------------+-----+----------+
only showing top 5 rows

Show the results of colors which are not in the dict:
+-------------+-----+----------+
|Ticket number|Color|color_long|
+-------------+-----+----------+
|   1111884115|   GR|        GR|
|   1112099041|   GR|        GR|
|   1112716706|   GR|       

In [230]:
#B.9
# Imported here so this cell does not silently depend on B.8's imports.
from pyspark.sql.functions import lit

# Ticket counts per long colour name for Toyota, most common first.
toyotacolor = (
    dfnew.filter(dfnew['Make'] == "TOYT")
    .groupby(dfnew['color_long'])
    .count()
    .orderBy('count', ascending=False)
    .withColumn('Make', lit('TOYT'))
    .select("Make", "color_long", "count")
)
toyotacolor.show(5)

# Show every colour whose count equals the maximum (more than one row if counts tie).
mostcolor = toyotacolor.agg({"count": "max"}).collect()[0][0]
toyotacolor.where(toyotacolor['count'] == mostcolor).show()

+----+----------+------+
|Make|color_long| count|
+----+----------+------+
|TOYT|      Gray|346822|
|TOYT|     White|304620|
|TOYT|     Black|252199|
|TOYT|    Silver|248685|
|TOYT|      Blue|128051|
+----+----------+------+
only showing top 5 rows

+----+----------+------+
|Make|color_long| count|
+----+----------+------+
|TOYT|      Gray|346822|
+----+----------+------+



In [231]:
# Shut down the session's SparkContext so the cluster cores are freed for other applications.
spark_session.sparkContext.stop()