In [1]:
from pyspark.sql import SparkSession

In [2]:
# Connect to the standalone master at 192.168.2.122:7077 under this app name.
# Settings applied:
#   - dynamic allocation enabled, with shuffle tracking and the shuffle service
#   - idle executors released after 30s
#   - total cores for the application capped at 2
spark_session = (
    SparkSession.builder
    .master("spark://192.168.2.122:7077")
    .appName("kulathunga-vidumini")
    .config("spark.dynamicAllocation.enabled", True)
    .config("spark.dynamicAllocation.shuffleTracking.enabled", True)
    .config("spark.shuffle.service.enabled", True)
    .config("spark.dynamicAllocation.executorIdleTimeout", "30s")
    .config("spark.cores.max", 2)
    .getOrCreate()
)



Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/02/26 17:01:10 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# Low-level RDD API handle, taken from the session created above.
spark_context = spark_session.sparkContext

# Only show ERROR-level log lines from here on, so cell output stays readable.
spark_context.setLogLevel("ERROR")

In [4]:
# RDD view of the raw citations CSV: one element per text line, header included.
# It points at the same HDFS file the DataFrame cell below reads. textFile() is
# lazy and nothing here runs an action on `lines`, so a wrong path would not
# raise until the RDD is actually used.
# NOTE(review): `lines` is not used by any later cell shown; consider removing it.
lines = spark_context.textFile('hdfs://192.168.2.122:9000/parking-citations.csv')

In [6]:
# Load the citations CSV from HDFS. The first line supplies the column names,
# and Spark infers column types (inferSchema costs an extra pass over the file).
df = (
    spark_session.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("hdfs://192.168.2.122:9000/parking-citations.csv")
)

# Preview the first 20 rows.
df.show(n=20)

                                                                                

+-------------+-------------------+----------+--------+-----------+--------------+-----------------+----+----+----------+-----+--------------------+-----+------+--------------+---------------------+-----------+---------+---------+------------------+-----------------+----------------------+
|Ticket number|         Issue Date|Issue time|Meter Id|Marked Time|RP State Plate|Plate Expiry Date| VIN|Make|Body Style|Color|            Location|Route|Agency|Violation code|Violation Description|Fine amount| Latitude|Longitude|Agency Description|Color Description|Body Style Description|
+-------------+-------------------+----------+--------+-----------+--------------+-----------------+----+----+----------+-----+--------------------+-----+------+--------------+---------------------+-----------+---------+---------+------------------+-----------------+----------------------+
|   1103341116|2015-12-21 00:00:00|    1251.0|    null|       null|            CA|         200304.0|null|HOND|        PA|   GY|

In [7]:
# Print each column's name, type and nullability as inferred by the CSV reader.
df.printSchema()

root
 |-- Ticket number: string (nullable = true)
 |-- Issue Date: timestamp (nullable = true)
 |-- Issue time: double (nullable = true)
 |-- Meter Id: string (nullable = true)
 |-- Marked Time: double (nullable = true)
 |-- RP State Plate: string (nullable = true)
 |-- Plate Expiry Date: double (nullable = true)
 |-- VIN: string (nullable = true)
 |-- Make: string (nullable = true)
 |-- Body Style: string (nullable = true)
 |-- Color: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Route: string (nullable = true)
 |-- Agency: double (nullable = true)
 |-- Violation code: string (nullable = true)
 |-- Violation Description: string (nullable = true)
 |-- Fine amount: double (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- Agency Description: string (nullable = true)
 |-- Color Description: string (nullable = true)
 |-- Body Style Description: string (nullable = true)



In [8]:
# Total number of rows in the DataFrame. count() is an action, so it runs a
# Spark job over the whole input.
df.count()

                                                                                

13077724

In [9]:
# How many partitions the loaded data is split into (one task per partition per stage).
df.rdd.getNumPartitions()

16

In [10]:
# Columns not needed by the later cells. drop() ignores names that are already gone.
dropped_columns = ["VIN", "Latitude", "Longitude"]
df = df.drop(*dropped_columns)

In [11]:
from pyspark.sql.types import FloatType

# Replace "Fine amount" (inferred as double) with a single-precision float copy.
fine_as_float = df["Fine amount"].cast(FloatType())
df = df.withColumn("Fine amount", fine_as_float)

In [12]:
# Largest fine in the dataset, brought back to the driver as a plain Python
# value (None if the DataFrame were empty).
maximum_value = df.groupBy().max("Fine amount").first()[0]

                                                                                

In [13]:
# How many tickets were issued for exactly the maximum fine.
df.where(df["Fine amount"] == maximum_value).count()

                                                                                

626

In [14]:
# Number of rows per vehicle make (a null Make forms its own group).
frequent_items = df.groupBy("Make").count()

# The 20 makes with the highest counts, largest first.
top_20 = frequent_items.sort(frequent_items["count"].desc()).limit(20)

# show() prints up to 20 rows by default, which covers the whole result.
top_20.show()



+----+-------+
|Make|  count|
+----+-------+
|TOYT|2150768|
|HOND|1479996|
|FORD|1116235|
|NISS| 945133|
|CHEV| 892676|
| BMW| 603092|
|MERZ| 543298|
|VOLK| 432030|
|HYUN| 404917|
|DODG| 391686|
|LEXS| 368420|
| KIA| 328155|
|JEEP| 316300|
|AUDI| 255395|
|MAZD| 242344|
|OTHR| 205546|
| GMC| 184889|
|INFI| 174315|
|CHRY| 159948|
|SUBA| 154640|
+----+-------+



                                                                                

In [15]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Two-letter colour codes found in the "Color" column, mapped to readable names.
# Some names have several codes (e.g. 'BN' and 'BR' both mean Brown).
COLORS = {
    'AL': 'Aluminum',
    'AM': 'Amber',
    'BG': 'Beige',
    'BK': 'Black',
    'BL': 'Blue',
    'BN': 'Brown',
    'BR': 'Brown',
    'BZ': 'Bronze',
    'CH': 'Charcoal',
    'DK': 'Dark',
    'GD': 'Gold',
    'GO': 'Gold',
    'GN': 'Green',
    'GY': 'Gray',
    'GT': 'Granite',
    'IV': 'Ivory',
    'LT': 'Light',
    'OL': 'Olive',
    'OR': 'Orange',
    'MR': 'Maroon',
    'PK': 'Pink',
    'RD': 'Red',
    'RE': 'Red',
    'SI': 'Silver',
    'SL': 'Silver',
    'SM': 'Smoke',
    'TN': 'Tan',
    'VT': 'Violet',
    'WT': 'White',
    'WH': 'White',
    'YL': 'Yellow',
    'YE': 'Yellow',
    'UN': 'Unknown',
}


def map_udf_colors(color):
    """Translate a colour code into its long name.

    Codes not present in COLORS are returned unchanged. This includes None,
    which is how a null cell reaches a Python UDF, so unmapped codes pass
    through into the new column.
    """
    if color in COLORS:
        return COLORS[color]
    return color


# Wrap the plain Python function as a Spark UDF that yields strings.
map_colors = udf(map_udf_colors, returnType=StringType())

# Add "color_long", the translated colour for every row.
df = df.withColumn("color_long", map_colors(df["Color"]))

In [16]:
# Preview five rows to check the new "color_long" column.
df.show(n=5)

[Stage 15:>                                                         (0 + 1) / 1]

+-------------+-------------------+----------+--------+-----------+--------------+-----------------+----+----------+-----+------------------+-----+------+--------------+---------------------+-----------+------------------+-----------------+----------------------+----------+
|Ticket number|         Issue Date|Issue time|Meter Id|Marked Time|RP State Plate|Plate Expiry Date|Make|Body Style|Color|          Location|Route|Agency|Violation code|Violation Description|Fine amount|Agency Description|Color Description|Body Style Description|color_long|
+-------------+-------------------+----------+--------+-----------+--------------+-----------------+----+----------+-----+------------------+-----+------+--------------+---------------------+-----------+------------------+-----------------+----------------------+----------+
|   1103341116|2015-12-21 00:00:00|    1251.0|    null|       null|            CA|         200304.0|HOND|        PA|   GY|   13147 WELBY WAY|01521|   1.0|        4000A1|   NO 

                                                                                

In [17]:
from pyspark.sql.functions import desc

# Keep only rows whose make code is "TOYT".
toyota = df.where(df["Make"] == "TOYT")

# Rows per translated colour among those vehicles.
toyota_color_counts = toyota.groupBy("color_long").count()

# Highest count first. Ties between colours are not broken deterministically.
sorted_count = toyota_color_counts.orderBy(desc("count"))

# The colour on the top row is the most frequent one.
top_row = sorted_count.first()
top_row["color_long"]

                                                                                

'Gray'

In [18]:
# Shut down the session and its SparkContext, releasing this application's
# executors on the cluster.
spark_session.stop()