In [101]:
from pyspark.sql import SparkSession
from pyspark.sql.types import FloatType
from pyspark.sql.functions import col
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

In [2]:
# New API: SparkSession is the entry point for the DataFrame API.
spark_session = (
    SparkSession.builder
    .master("spark://192.168.2.119:7077")
    .appName("haodong_zhao_partB")
    .config("spark.executor.cores", 2)
    # Dynamic allocation without an external shuffle service, relying on
    # shuffle tracking instead (Spark logs this combination as experimental).
    .config("spark.dynamicAllocation.enabled", True)
    .config("spark.dynamicAllocation.shuffleTracking.enabled", True)
    .config("spark.shuffle.service.enabled", False)
    .config("spark.dynamicAllocation.executorIdleTimeout", "30s")
    # Fixed driver / block-manager ports - presumably so the cluster can reach
    # the driver through a firewall; TODO confirm.
    .config("spark.driver.port", 9998)
    .config("spark.blockManager.port", 10005)
    .getOrCreate()
)

# Old API (RDD): the SparkContext backing the session.
spark_context = spark_session.sparkContext

# Only report errors (the default shown at startup is WARN).
spark_context.setLogLevel("ERROR")

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/02/25 17:28:19 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/02/25 17:28:22 WARN ExecutorAllocationManager: Dynamic allocation without a shuffle service is an experimental feature.


---

# B.1

Load the CSV file from HDFS, and call show() to verify the data is loaded correctly.

In [5]:
# Read the citations CSV from HDFS. The first line is the header, and without
# schema inference every column is read as a string. Cache the result because
# every later question scans this same data.
df = (
    spark_session.read
    .csv('hdfs://host-192-168-2-119-de1:9000/parking-citations.csv', header=True)
    .cache()
)

In [6]:
# Preview the first rows to check the load (long cell values are truncated).
df.show(n=20, truncate=True)

                                                                                

+-------------+--------------------+----------+--------+-----------+--------------+-----------------+----+----+----------+-----+--------------------+-----+------+--------------+---------------------+-----------+---------+---------+------------------+-----------------+----------------------+
|Ticket number|          Issue Date|Issue time|Meter Id|Marked Time|RP State Plate|Plate Expiry Date| VIN|Make|Body Style|Color|            Location|Route|Agency|Violation code|Violation Description|Fine amount| Latitude|Longitude|Agency Description|Color Description|Body Style Description|
+-------------+--------------------+----------+--------+-----------+--------------+-----------------+----+----+----------+-----+--------------------+-----+------+--------------+---------------------+-----------+---------+---------+------------------+-----------------+----------------------+
|   1103341116|2015-12-21T00:00:...|      1251|    null|       null|            CA|           200304|null|HOND|        PA|  

---

# B.2

Print the schema for the DataFrame

In [7]:
# Schema as loaded: no schema inference was requested, so every column is a string.
df.printSchema()

root
 |-- Ticket number: string (nullable = true)
 |-- Issue Date: string (nullable = true)
 |-- Issue time: string (nullable = true)
 |-- Meter Id: string (nullable = true)
 |-- Marked Time: string (nullable = true)
 |-- RP State Plate: string (nullable = true)
 |-- Plate Expiry Date: string (nullable = true)
 |-- VIN: string (nullable = true)
 |-- Make: string (nullable = true)
 |-- Body Style: string (nullable = true)
 |-- Color: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Route: string (nullable = true)
 |-- Agency: string (nullable = true)
 |-- Violation code: string (nullable = true)
 |-- Violation Description: string (nullable = true)
 |-- Fine amount: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- Agency Description: string (nullable = true)
 |-- Color Description: string (nullable = true)
 |-- Body Style Description: string (nullable = true)



---

# B.3 

Count the number of rows in the CSV file.

In [8]:
# Number of data rows; the header line is excluded because header=true was set on read.
df.count()

                                                                                

13077724

---

# B.4

Count the number of partitions in the underlying RDD.

In [9]:
# Number of partitions of the RDD underlying the DataFrame loaded in B.1.
df.rdd.getNumPartitions()

16

---

# B.5 

Drop the columns VIN, Latitude and Longitude.

In [10]:
# Remove the vehicle identifier and coordinate columns; later questions do not use them.
df = df.drop(*('VIN', 'Latitude', 'Longitude'))

In [11]:
# Confirm VIN, Latitude and Longitude are no longer in the schema.
df.printSchema()

root
 |-- Ticket number: string (nullable = true)
 |-- Issue Date: string (nullable = true)
 |-- Issue time: string (nullable = true)
 |-- Meter Id: string (nullable = true)
 |-- Marked Time: string (nullable = true)
 |-- RP State Plate: string (nullable = true)
 |-- Plate Expiry Date: string (nullable = true)
 |-- Make: string (nullable = true)
 |-- Body Style: string (nullable = true)
 |-- Color: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Route: string (nullable = true)
 |-- Agency: string (nullable = true)
 |-- Violation code: string (nullable = true)
 |-- Violation Description: string (nullable = true)
 |-- Fine amount: string (nullable = true)
 |-- Agency Description: string (nullable = true)
 |-- Color Description: string (nullable = true)
 |-- Body Style Description: string (nullable = true)



---

# B.6

Find the maximum fine amount. How many fines have this amount?

You need to convert the ‘fine amount’ column to a float to do this correctly.

In [30]:
# Re-type the fine column as float so numeric comparisons and aggregations work.
# NOTE(review): with Spark's non-ANSI cast, unparseable strings become null; confirm for this data.
df = df.withColumn("Fine amount", col("Fine amount").cast(FloatType()))

In [31]:
# Confirm 'Fine amount' is now a float column.
df.printSchema()

root
 |-- Ticket number: string (nullable = true)
 |-- Issue Date: string (nullable = true)
 |-- Issue time: string (nullable = true)
 |-- Meter Id: string (nullable = true)
 |-- Marked Time: string (nullable = true)
 |-- RP State Plate: string (nullable = true)
 |-- Plate Expiry Date: string (nullable = true)
 |-- Make: string (nullable = true)
 |-- Body Style: string (nullable = true)
 |-- Color: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Route: string (nullable = true)
 |-- Agency: string (nullable = true)
 |-- Violation code: string (nullable = true)
 |-- Violation Description: string (nullable = true)
 |-- Fine amount: float (nullable = true)
 |-- Agency Description: string (nullable = true)
 |-- Color Description: string (nullable = true)
 |-- Body Style Description: string (nullable = true)



In [32]:
# Maximum fine, computed with a max() aggregation rather than by sorting the
# column and taking the first row. Like a descending sort (nulls last) followed
# by first(), max() ignores null fines.
amount = df.agg({"Fine amount": "max"}).first()[0]
amount

1100.0

In [33]:
# Number of citations whose fine equals the maximum `amount` computed above.
df.filter(col("Fine amount") == amount).count()

626

So, the maximum fine amount is 1100, and 626 fines have this amount.

---

# B.7 

Show the top 20 most frequent vehicle makes, and their frequencies.

In [77]:
# One row per distinct vehicle Make, with its number of citations in a 'count' column.
makes_col = df.select('Make')
makes_freq = makes_col.groupBy('Make').count()

In [98]:
# withColumnRenamed returns a new DataFrame, so assign it; otherwise the rename is lost.
# Spark resolves column names case-insensitively by default
# (spark.sql.caseSensitive=false), so later references to "count" still resolve.
makes_freq = makes_freq.withColumnRenamed('count', 'Count')

DataFrame[Make: string, Count: bigint]

In [99]:
# Keep the top-20 DataFrame itself: .show() only prints and returns None.
top20_makes_freq = makes_freq.sort("count", ascending=False).limit(20)
top20_makes_freq.show()

+----+-------+
|Make|  count|
+----+-------+
|TOYT|2150768|
|HOND|1479996|
|FORD|1116235|
|NISS| 945133|
|CHEV| 892676|
| BMW| 603092|
|MERZ| 543298|
|VOLK| 432030|
|HYUN| 404917|
|DODG| 391686|
|LEXS| 368420|
| KIA| 328155|
|JEEP| 316300|
|AUDI| 255395|
|MAZD| 242344|
|OTHR| 205546|
| GMC| 184889|
|INFI| 174315|
|CHRY| 159948|
|SUBA| 154640|
+----+-------+
only showing top 20 rows



---

# B.8 

Let’s expand some abbreviations in the color column. Create a User Defined Function to create a new column, ‘color long’, mapping the original colors to their corresponding values in the dictionary below. If there is no key matching the original color, use the original color.
```
COLORS = {
'AL':'Aluminum', 'AM':'Amber', 'BG':'Beige', 'BK':'Black', 'BL':'Blue', 'BN':'Brown', 'BR':'Brown', 'BZ':'Bronze', 'CH':'Charcoal', 'DK':'Dark', 'GD':'Gold', 'GO':'Gold', 'GN':'Green', 'GY':'Gray', 'GT':'Granite', 'IV':'Ivory', 'LT':'Light', 'OL':'Olive', 'OR':'Orange', 'MR':'Maroon', 'PK':'Pink', 'RD':'Red', 'RE':'Red', 'SI':'Silver', 'SL':'Silver', 'SM':'Smoke', 'TN':'Tan', 'VT':'Violet', 'WT':'White', 'WH':'White', 'YL':'Yellow', 'YE':'Yellow', 'UN':'Unknown'
}
```

In [100]:
# Colour abbreviation -> full colour name, used to build the 'Color Long' column.
# Several abbreviations share a name (BN/BR, GD/GO, RD/RE, SI/SL, WT/WH, YL/YE).
COLORS = dict(
    AL='Aluminum', AM='Amber', BG='Beige',
    BK='Black', BL='Blue', BN='Brown',
    BR='Brown', BZ='Bronze', CH='Charcoal',
    DK='Dark', GD='Gold', GO='Gold',
    GN='Green', GY='Gray', GT='Granite',
    IV='Ivory', LT='Light', OL='Olive',
    OR='Orange', MR='Maroon', PK='Pink',
    RD='Red', RE='Red', SI='Silver',
    SL='Silver', SM='Smoke', TN='Tan',
    VT='Violet', WT='White', WH='White',
    YL='Yellow', YE='Yellow', UN='Unknown',
)

In [112]:
# user-defined function
def color_convert(abbr):
    """Expand a colour abbreviation using the COLORS mapping.

    Returns the full colour name when `abbr` is a key of COLORS; otherwise
    returns `abbr` unchanged. A null Color value (None) therefore stays None.
    """
    return COLORS.get(abbr, abbr)

# Wrap as a Spark UDF that produces a string column.
udf_cc = udf(color_convert, StringType())

In [113]:
# Add the expanded colour next to the original abbreviation, then preview the pair.
new_df = df.withColumn("Color Long", udf_cc(col("Color")))
new_df[["Color", "Color Long"]].show()

+-----+----------+
|Color|Color Long|
+-----+----------+
|   GY|      Gray|
|   WH|     White|
|   BK|     Black|
|   WH|     White|
|   BK|     Black|
|   GY|      Gray|
|   BL|      Blue|
|   BK|     Black|
|   BR|     Brown|
|   SI|    Silver|
|   WH|     White|
|   GO|      Gold|
|   BK|     Black|
|   BK|     Black|
|   BK|     Black|
|   BK|     Black|
|   WH|     White|
| null|      null|
|   BK|     Black|
|   BK|     Black|
+-----+----------+
only showing top 20 rows



---

# B.9

Using this new column, what’s the most frequent colour value for Toyotas (TOYT)?

In [125]:
# Toyota citations only. Refer to the column by name so it resolves against new_df itself.
df_toyt = new_df.filter(col("Make") == "TOYT")

In [131]:
# Frequency of each expanded colour among Toyotas. Rows with no colour are
# excluded, since null is not a colour value.
df_toyt_color = (
    df_toyt
    .filter(col("Color Long").isNotNull())
    .groupBy("Color Long")
    .count()
)
# Keep the most frequent colour as a DataFrame: .show() only prints and returns None.
color_freq = df_toyt_color.sort("count", ascending=False).limit(1)
color_freq.show()



+----------+------+
|Color Long| count|
+----------+------+
|      Gray|489697|
+----------+------+
only showing top 1 row



                                                                                