In [23]:
from pyspark.sql import SparkSession

# Create the Spark session for this notebook (getOrCreate reuses an already-running one).
spark = SparkSession.builder.appName("Part B").getOrCreate()

### Loading the CSV file from `HDFS`

In [24]:
import os

# Location of the parking-citations CSV on HDFS. Set the PARKING_CITATIONS_PATH
# environment variable to point at a different cluster/file without editing the notebook.
PARKING_CITATIONS_PATH = os.environ.get(
    "PARKING_CITATIONS_PATH", "hdfs://192.168.2.250:9000/parking-citations.csv"
)

# header=True takes column names from the first line of the file.
# inferSchema=True lets Spark guess column types, at the cost of an extra pass over the data.
df = spark.read.csv(PARKING_CITATIONS_PATH, header=True, inferSchema=True)


                                                                                

### Note that `df` is a Spark DataFrame (`pyspark.sql.DataFrame`), not a pandas DataFrame

In [25]:
# Confirm the object type: a PySpark DataFrame, not a pandas one.
print(type(df))

<class 'pyspark.sql.dataframe.DataFrame'>


In [26]:

# Preview the first rows; long cell values are truncated for display.
df.show(n=20, truncate=True)

+-------------+-------------------+----------+--------+-----------+--------------+-----------------+----+----+----------+-----+--------------------+-----+------+--------------+---------------------+-----------+---------+---------+------------------+-----------------+----------------------+
|Ticket number|         Issue Date|Issue time|Meter Id|Marked Time|RP State Plate|Plate Expiry Date| VIN|Make|Body Style|Color|            Location|Route|Agency|Violation code|Violation Description|Fine amount| Latitude|Longitude|Agency Description|Color Description|Body Style Description|
+-------------+-------------------+----------+--------+-----------+--------------+-----------------+----+----+----------+-----+--------------------+-----+------+--------------+---------------------+-----------+---------+---------+------------------+-----------------+----------------------+
|   1103341116|2015-12-21 00:00:00|    1251.0|    NULL|       NULL|            CA|         200304.0|NULL|HOND|        PA|   GY|

### Print the schema of the `DataFrame`

In [27]:
# Show each column's name, the type Spark inferred for it (inferSchema=True) and its nullability.
df.printSchema()


root
 |-- Ticket number: string (nullable = true)
 |-- Issue Date: timestamp (nullable = true)
 |-- Issue time: double (nullable = true)
 |-- Meter Id: string (nullable = true)
 |-- Marked Time: double (nullable = true)
 |-- RP State Plate: string (nullable = true)
 |-- Plate Expiry Date: double (nullable = true)
 |-- VIN: string (nullable = true)
 |-- Make: string (nullable = true)
 |-- Body Style: string (nullable = true)
 |-- Color: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Route: string (nullable = true)
 |-- Agency: double (nullable = true)
 |-- Violation code: string (nullable = true)
 |-- Violation Description: string (nullable = true)
 |-- Fine amount: double (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- Agency Description: string (nullable = true)
 |-- Color Description: string (nullable = true)
 |-- Body Style Description: string (nullable = true)



### Count the number of rows in the CSV file

In [28]:
# count() is a Spark action, so this cell runs a job over the whole dataset.
number_of_rows = df.count()
print("Number of rows =", number_of_rows) 

                                                                                

Number of rows = 13077724


In [29]:
# Number of partitions Spark split the loaded data into.
number_of_partitions = df.rdd.getNumPartitions()
print("Number of partitions in the RDD:", number_of_partitions)

Number of partitions in the RDD: 16


### Drop the columns `VIN`, `Latitude` and `Longitude`

In [30]:
# Columns not used in the rest of the analysis.
columns_to_drop = ['VIN', 'Latitude', 'Longitude']
df = df.drop(*columns_to_drop)
# Display the remaining column names to confirm the drop.
df.columns

['Ticket number',
 'Issue Date',
 'Issue time',
 'Meter Id',
 'Marked Time',
 'RP State Plate',
 'Plate Expiry Date',
 'Make',
 'Body Style',
 'Color',
 'Location',
 'Route',
 'Agency',
 'Violation code',
 'Violation Description',
 'Fine amount',
 'Agency Description',
 'Color Description',
 'Body Style Description']

### Find the maximum fine amount and the number of times it occurs

In [32]:
# Cast 'Fine amount' to float, replacing the existing column.
# withColumn takes a literal column name: it must NOT be wrapped in backticks
# (backticks are only for SQL expression strings such as selectExpr), otherwise
# Spark appends a new column whose name contains the backticks and leaves the
# original column untouched.
df = df.withColumn('Fine amount', df['Fine amount'].cast("float"))


In [33]:
# Peek at a few fine amounts (NULLs are possible).
df.select(df['Fine amount']).show(n=5)


+-----------+
|Fine amount|
+-----------+
|       50.0|
|       50.0|
|       58.0|
|       NULL|
|       93.0|
+-----------+
only showing top 5 rows



In [34]:
# Aggregate the whole DataFrame down to a single row holding max(Fine amount)
# and pull that scalar back to the driver.
max_fine_amount = df.agg({'Fine amount': 'max'}).first()[0]


                                                                                

In [35]:
# Display the maximum fine computed in the previous cell.
max_fine_amount

1100.0

In [36]:
# Count the tickets whose fine equals the maximum found above.
is_max_fine = df["Fine amount"] == max_fine_amount
max_fine_amount_occurences = df.where(is_max_fine).count()


                                                                                

In [37]:
# Report the maximum fine together with how often it appears.
message = f'The maximum fine amount {max_fine_amount} occurs {max_fine_amount_occurences} times in the dataset'
print(message)

The maximum fine amount 1100.0 occurs 626 times in the dataset


### Find the top 20 most frequent vehicle makes, and their frequencies

In [38]:
# Tickets per vehicle make, then keep the 20 most common makes.
make_counts = df.groupBy('Make').count()
vehicle_makes = make_counts.sort('count', ascending=False).limit(20)

vehicle_makes.show()



+----+-------+
|Make|  count|
+----+-------+
|TOYT|2150768|
|HOND|1479996|
|FORD|1116235|
|NISS| 945133|
|CHEV| 892676|
| BMW| 603092|
|MERZ| 543298|
|VOLK| 432030|
|HYUN| 404917|
|DODG| 391686|
|LEXS| 368420|
| KIA| 328155|
|JEEP| 316300|
|AUDI| 255395|
|MAZD| 242344|
|OTHR| 205546|
| GMC| 184889|
|INFI| 174315|
|CHRY| 159948|
|SUBA| 154640|
+----+-------+



                                                                                

### Find the most frequent colour for Toyotas, using a `User Defined Function` (UDF) that adds a new column `color full` with the expanded colour names

In [41]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType


# Two-letter colour codes used in the 'Color' column -> readable colour names.
COLORS = {
    'AL': 'Aluminum', 'AM': 'Amber', 'BG': 'Beige', 'BK': 'Black',
    'BL': 'Blue', 'BN': 'Brown', 'BR': 'Brown', 'BZ': 'Bronze',
    'CH': 'Charcoal', 'DK': 'Dark', 'GD': 'Gold', 'GO': 'Gold',
    'GN': 'Green', 'GY': 'Gray', 'GT': 'Granite', 'IV': 'Ivory',
    'LT': 'Light', 'OL': 'Olive', 'OR': 'Orange', 'MR': 'Maroon',
    'PK': 'Pink', 'RD': 'Red', 'RE': 'Red', 'SI': 'Silver', 'SL': 'Silver',
    'SM': 'Smoke', 'TN': 'Tan', 'VT': 'Violet', 'WT': 'White', 'WH': 'White',
    'YL': 'Yellow', 'YE': 'Yellow', 'UN': 'Unknown'
}


def expand_color_abbreviations(color):
    """Return the full colour name for a two-letter colour code.

    Codes missing from COLORS (and None) are returned unchanged, so no
    information is lost for unmapped values.
    """
    return COLORS.get(color, color)


expand_color_abbreviations_udf = udf(expand_color_abbreviations, StringType())

# Reference the column by its exact schema name ('Color') so the lookup does not
# depend on spark.sql.caseSensitive being false.
df = df.withColumn('color full', expand_color_abbreviations_udf(df['Color']))

df.select('color full').show(5)

+----------+
|color full|
+----------+
|      Gray|
|     White|
|     Black|
|     White|
|     Black|
+----------+
only showing top 5 rows



In [43]:
toyota = df.filter(df['Make'] == 'TOYT')

# Most frequent colour among Toyotas. Rows without a colour are excluded so a
# NULL group can never be reported as the "most frequent colour".
toyota_color_count = (
    toyota
    .filter(toyota['color full'].isNotNull())
    .groupBy('color full')
    .count()
    .orderBy('count', ascending=False)
    .first()
)

# first() returns None when there are no matching rows; guard before indexing.
if toyota_color_count is None:
    print("No Toyotas (TOYT) with a recorded color were found.")
else:
    print("Most frequent color for Toyotas (TOYT):", toyota_color_count['color full'])



Most frequent color for Toyotas (TOYT): Gray


                                                                                