In [1]:
from pyspark.sql import SparkSession
from operator import add

# Build (or reuse) the SparkSession, the entry point to the DataFrame API.
# Dynamic allocation is switched on with shuffle tracking, and the external
# shuffle service is explicitly disabled.
spark_session = (
    SparkSession.builder
    .master("spark://192.168.2.119:7077")
    .appName("xiaoxia_a3b")
    .config("spark.dynamicAllocation.enabled", True)
    .config("spark.dynamicAllocation.shuffleTracking.enabled", True)
    .config("spark.shuffle.service.enabled", False)
    .config("spark.dynamicAllocation.executorIdleTimeout", "30s")
    .config("spark.executor.cores", 2)
    .config("spark.driver.port", 9998)
    .config("spark.blockManager.port", 10005)
    .getOrCreate()
)

# Handle to the underlying SparkContext (the older, RDD-level API).
spark_context = spark_session.sparkContext

# Restrict Spark logging to errors so the notebook output stays readable.
spark_context.setLogLevel("ERROR")

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/03/14 07:04:20 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/03/14 07:04:25 WARN ExecutorAllocationManager: Dynamic allocation without a shuffle service is an experimental feature.


In [2]:
# B1: read the parking-citations CSV from HDFS, treating the first line as
# the header, and mark the resulting DataFrame for caching.
citations_reader = spark_session.read.option("header", "true")
data_frame = citations_reader.csv(
    'hdfs://host-192-168-2-119-de1:9000/parking-citations.csv'
).cache()

data_frame.show()

                                                                                

+-------------+--------------------+----------+--------+-----------+--------------+-----------------+----+----+----------+-----+--------------------+-----+------+--------------+---------------------+-----------+---------+---------+------------------+-----------------+----------------------+
|Ticket number|          Issue Date|Issue time|Meter Id|Marked Time|RP State Plate|Plate Expiry Date| VIN|Make|Body Style|Color|            Location|Route|Agency|Violation code|Violation Description|Fine amount| Latitude|Longitude|Agency Description|Color Description|Body Style Description|
+-------------+--------------------+----------+--------+-----------+--------------+-----------------+----+----+----------+-----+--------------------+-----+------+--------------+---------------------+-----------+---------+---------+------------------+-----------------+----------------------+
|   1103341116|2015-12-21T00:00:...|      1251|    null|       null|            CA|           200304|null|HOND|        PA|  

In [3]:
# Print the address of this application's Spark web UI.
ui_url = spark_context.uiWebUrl
print(ui_url)

http://host-192-168-2-242-de1:4040


In [4]:
# B2: print the schema (column names, types and nullability) of the loaded DataFrame.
data_frame.printSchema()

root
 |-- Ticket number: string (nullable = true)
 |-- Issue Date: string (nullable = true)
 |-- Issue time: string (nullable = true)
 |-- Meter Id: string (nullable = true)
 |-- Marked Time: string (nullable = true)
 |-- RP State Plate: string (nullable = true)
 |-- Plate Expiry Date: string (nullable = true)
 |-- VIN: string (nullable = true)
 |-- Make: string (nullable = true)
 |-- Body Style: string (nullable = true)
 |-- Color: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Route: string (nullable = true)
 |-- Agency: string (nullable = true)
 |-- Violation code: string (nullable = true)
 |-- Violation Description: string (nullable = true)
 |-- Fine amount: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- Agency Description: string (nullable = true)
 |-- Color Description: string (nullable = true)
 |-- Body Style Description: string (nullable = true)



In [5]:
# B3: count the rows of the DataFrame.
# row_num is kept because the make-frequency cell divides by it.
row_num = data_frame.count()
print(row_num)



13077724


                                                                                

In [6]:
# B4: number of partitions of the RDD underlying the DataFrame.
partition_count = data_frame.rdd.getNumPartitions()
partition_count

16

In [7]:
# B5: drop the columns VIN, Latitude and Longitude, then print the schema
# of the reduced DataFrame to confirm they are gone.

columns_to_drop = ["VIN", "Latitude", "Longitude"]
# drop() takes the column names as separate positional arguments, hence the unpacking.
data_frame_2 = data_frame.drop(*columns_to_drop)
data_frame_2.printSchema()

root
 |-- Ticket number: string (nullable = true)
 |-- Issue Date: string (nullable = true)
 |-- Issue time: string (nullable = true)
 |-- Meter Id: string (nullable = true)
 |-- Marked Time: string (nullable = true)
 |-- RP State Plate: string (nullable = true)
 |-- Plate Expiry Date: string (nullable = true)
 |-- Make: string (nullable = true)
 |-- Body Style: string (nullable = true)
 |-- Color: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Route: string (nullable = true)
 |-- Agency: string (nullable = true)
 |-- Violation code: string (nullable = true)
 |-- Violation Description: string (nullable = true)
 |-- Fine amount: string (nullable = true)
 |-- Agency Description: string (nullable = true)
 |-- Color Description: string (nullable = true)
 |-- Body Style Description: string (nullable = true)



In [8]:
# B6
import pyspark
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

In [10]:
# B6: cast "Fine amount" from string to float, count the citations per
# distinct amount, and list the amounts from highest to lowest.
fine_as_float = data_frame_2["Fine amount"].cast(FloatType())
fines_numeric_df = data_frame_2.withColumn("Fine amount", fine_as_float)

max_fine_df = (
    fines_numeric_df
    .groupBy("Fine amount")
    .agg({"Fine amount": 'count'})
    .orderBy("Fine amount", ascending=False)
)

# The first row holds the maximum fine and how many citations carry it.
max_fine_df.show(3)



+-----------+------------------+
|Fine amount|count(Fine amount)|
+-----------+------------------+
|     1100.0|               626|
|     1000.0|              1672|
|      505.0|                 6|
+-----------+------------------+
only showing top 3 rows



                                                                                

In [11]:
# B6: print the schema of the aggregated result to check the column types after the cast.
max_fine_df.printSchema()

root
 |-- Fine amount: float (nullable = true)
 |-- count(Fine amount): long (nullable = false)



In [13]:
# B7: count citations per vehicle make and sort by that count, largest first.
make_counts = data_frame_2.groupBy("Make").agg({"Make": 'count'})
make_df = make_counts.orderBy("count(Make)", ascending=False)

# show() prints the first 20 rows by default, i.e. the 20 most frequent makes.
make_df.show()



+----+-----------+
|Make|count(Make)|
+----+-----------+
|TOYT|    2150768|
|HOND|    1479996|
|FORD|    1116235|
|NISS|     945133|
|CHEV|     892676|
| BMW|     603092|
|MERZ|     543298|
|VOLK|     432030|
|HYUN|     404917|
|DODG|     391686|
|LEXS|     368420|
| KIA|     328155|
|JEEP|     316300|
|AUDI|     255395|
|MAZD|     242344|
|OTHR|     205546|
| GMC|     184889|
|INFI|     174315|
|CHRY|     159948|
|SUBA|     154640|
+----+-----------+
only showing top 20 rows



                                                                                

In [14]:
# B7: give the count column a plain name, then add each make's share of
# all rows (count divided by the total row count from B3).
renamed_make_df = make_df.withColumnRenamed('count(Make)', 'count_make')
make_df = renamed_make_df.withColumn(
    'make_frequency',
    renamed_make_df['count_make'] / row_num,
)

make_df.show(20)

+----+----------+--------------------+
|Make|count_make|      make_frequency|
+----+----------+--------------------+
|TOYT|   2150768|  0.1644604214005434|
|HOND|   1479996| 0.11316923342318587|
|FORD|   1116235|  0.0853539193823023|
|NISS|    945133| 0.07227045011807864|
|CHEV|    892676|  0.0682592781435057|
| BMW|    603092| 0.04611597553213388|
|MERZ|    543298| 0.04154377321313709|
|VOLK|    432030| 0.03303556490410717|
|HYUN|    404917|0.030962344823915845|
|DODG|    391686|0.029950624435872788|
|LEXS|    368420|0.028171568691922232|
| KIA|    328155| 0.02509266903017681|
|JEEP|    316300|0.024186165727308515|
|AUDI|    255395|0.019529009787941694|
|MAZD|    242344| 0.01853105326278487|
|OTHR|    205546| 0.01571726089340928|
| GMC|    184889|0.014137704695404185|
|INFI|    174315|0.013329154216742913|
|CHRY|    159948| 0.01223056856070674|
|SUBA|    154640|0.011824687537372711|
+----+----------+--------------------+
only showing top 20 rows



                                                                                

In [15]:
# B8
from pyspark.sql.types import StringType

# Mapping from the abbreviated colour codes to their long names.
# Several codes are synonyms (e.g. BN/BR -> Brown, WT/WH -> White).
COLORS = dict(
    AL='Aluminum', AM='Amber', BG='Beige', BK='Black',
    BL='Blue', BN='Brown', BR='Brown', BZ='Bronze',
    CH='Charcoal', DK='Dark', GD='Gold', GO='Gold',
    GN='Green', GY='Gray', GT='Granite', IV='Ivory',
    LT='Light', OL='Olive', OR='Orange', MR='Maroon',
    PK='Pink', RD='Red', RE='Red', SI='Silver', SL='Silver',
    SM='Smoke', TN='Tan', VT='Violet', WT='White', WH='White',
    YL='Yellow', YE='Yellow', UN='Unknown',
)

COLORS

{'AL': 'Aluminum',
 'AM': 'Amber',
 'BG': 'Beige',
 'BK': 'Black',
 'BL': 'Blue',
 'BN': 'Brown',
 'BR': 'Brown',
 'BZ': 'Bronze',
 'CH': 'Charcoal',
 'DK': 'Dark',
 'GD': 'Gold',
 'GO': 'Gold',
 'GN': 'Green',
 'GY': 'Gray',
 'GT': 'Granite',
 'IV': 'Ivory',
 'LT': 'Light',
 'OL': 'Olive',
 'OR': 'Orange',
 'MR': 'Maroon',
 'PK': 'Pink',
 'RD': 'Red',
 'RE': 'Red',
 'SI': 'Silver',
 'SL': 'Silver',
 'SM': 'Smoke',
 'TN': 'Tan',
 'VT': 'Violet',
 'WT': 'White',
 'WH': 'White',
 'YL': 'Yellow',
 'YE': 'Yellow',
 'UN': 'Unknown'}

In [17]:
# B8

def color_to_long(COLORS):
    """Build a Spark UDF that expands colour codes using ``COLORS``.

    The mapping is captured in a closure so the UDF takes only the column.

    Args:
        COLORS: dict mapping a colour code to its long name.

    Returns:
        A string-typed UDF. A value that is missing from ``COLORS`` (or
        mapped to None) is returned unchanged.
    """
    def color_to_long_(col):
        long_name = COLORS.get(col)
        # Fall back to the original value when there is no translation.
        return col if long_name is None else long_name

    return udf(color_to_long_, StringType())


# User-defined function. Input type is a string.
# udf_color_to_color_long = udf(color_to_color_long, StringType())


# B8: add a "color_long" column holding the expanded colour name.
color_udf = color_to_long(COLORS)
color_long_df = data_frame_2.withColumn("color_long", color_udf("color"))

color_long_df.show(10)

+-------------+--------------------+----------+--------+-----------+--------------+-----------------+----+----------+-----+------------------+-----+------+--------------+---------------------+-----------+------------------+-----------------+----------------------+----------+
|Ticket number|          Issue Date|Issue time|Meter Id|Marked Time|RP State Plate|Plate Expiry Date|Make|Body Style|Color|          Location|Route|Agency|Violation code|Violation Description|Fine amount|Agency Description|Color Description|Body Style Description|color_long|
+-------------+--------------------+----------+--------+-----------+--------------+-----------------+----+----------+-----+------------------+-----+------+--------------+---------------------+-----------+------------------+-----------------+----------------------+----------+
|   1103341116|2015-12-21T00:00:...|      1251|    null|       null|            CA|           200304|HOND|        PA|   GY|   13147 WELBY WAY|01521|     1|        4000A1|  

                                                                                

In [19]:
# B9: the most frequent colour among Toyotas (Make == "TOYT").
toyota_df = color_long_df.filter(color_long_df["Make"] == "TOYT")
toyota_color_counts = toyota_df.groupBy("color_long").agg({"color_long": 'count'})

# Sort by count, largest first, and keep only the top row.
toyota_color_counts.orderBy('count(color_long)', ascending=False).take(1)

                                                                                

[Row(color_long='Gray', count(color_long)=489697)]

In [20]:
# Stop the SparkContext so its cores are released for other applications.
spark_session.sparkContext.stop()