In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, max, desc


In [2]:
spark_session = (
    SparkSession.builder
    .master("spark://192.168.2.156:7077")
    .appName("Alexnader_partB")
    .config("spark.dynamicAllocation.enabled", True)
    .config("spark.dynamicAllocation.shuffleTracking.enabled", True)
    .config("spark.shuffle.service.enabled", False)
    .config("spark.dynamicAllocation.executorIdleTimeout", "30s")
    .config("spark.cores.max", 2)
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/03/12 11:49:34 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/03/12 11:49:37 WARN StandaloneSchedulerBackend: Dynamic allocation enabled without spark.executor.cores explicitly set, you may get more executors allocated than expected. It's recommended to set spark.executor.cores explicitly. Please check SPARK-30299 for more details.


In [3]:
#!hdfs dfs -ls hdfs://192.168.2.156:9000/data


In [4]:
citations_df = spark_session.read.csv(
    "hdfs://192.168.2.156:9000/data/los-angeles-parking-citations",
    header=True,
    inferSchema=True
)

                                                                                

In [5]:
citations_df.show(5)


+-------------+--------------------+----------+--------+-----------+--------------+-----------------+----+----+----------+-----+------------------+-----+------+--------------+---------------------+-----------+---------+---------+------------------+-----------------+----------------------+
|Ticket number|          Issue Date|Issue time|Meter Id|Marked Time|RP State Plate|Plate Expiry Date| VIN|Make|Body Style|Color|          Location|Route|Agency|Violation code|Violation Description|Fine amount| Latitude|Longitude|Agency Description|Color Description|Body Style Description|
+-------------+--------------------+----------+--------+-----------+--------------+-----------------+----+----+----------+-----+------------------+-----+------+--------------+---------------------+-----------+---------+---------+------------------+-----------------+----------------------+
|   1103341116|2015-12-21T00:00:...|      1251|    NULL|       NULL|            CA|           200304|NULL|HOND|        PA|   GY|  

In [6]:
citations_df.printSchema()

root
 |-- Ticket number: string (nullable = true)
 |-- Issue Date: string (nullable = true)
 |-- Issue time: string (nullable = true)
 |-- Meter Id: string (nullable = true)
 |-- Marked Time: string (nullable = true)
 |-- RP State Plate: string (nullable = true)
 |-- Plate Expiry Date: string (nullable = true)
 |-- VIN: string (nullable = true)
 |-- Make: string (nullable = true)
 |-- Body Style: string (nullable = true)
 |-- Color: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Route: string (nullable = true)
 |-- Agency: string (nullable = true)
 |-- Violation code: string (nullable = true)
 |-- Violation Description: string (nullable = true)
 |-- Fine amount: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- Agency Description: string (nullable = true)
 |-- Color Description: string (nullable = true)
 |-- Body Style Description: string (nullable = true)



In [7]:
row_count = citations_df.count()
print("Number of rows in the dataset:", row_count)



Number of rows in the dataset: 13079582


                                                                                

In [8]:
partitions_count = citations_df.rdd.getNumPartitions()
print("Number of partitions:", partitions_count)

Number of partitions: 16


In [9]:
citations_df = citations_df.drop("VIN", "Latitude", "Longitude")

In [10]:
from pyspark.sql.functions import col

citations_df = citations_df.withColumn("Fine amount", col("Fine amount").cast("float"))


In [11]:
from pyspark.sql.functions import max

max_fine = citations_df.select(max("Fine amount")).collect()[0][0]
print("Maximum fine amount is:", max_fine)


                                                                                

Maximum fine amount is: 1100.0


In [12]:
count_max_fine = citations_df.filter(col("Fine amount") == max_fine).count()
print(f"Number of rows with this maximum fine ({max_fine}):", count_max_fine)




Number of rows with this maximum fine (1100.0): 626


                                                                                

In [13]:
make_counts = citations_df.groupBy("Make").count().orderBy(desc("count"))
make_counts.show(20, truncate=False)


[Stage 14:>                                                         (0 + 1) / 1]

+----+-------+
|Make|count  |
+----+-------+
|TOYT|2150768|
|HOND|1479996|
|FORD|1116235|
|NISS|945133 |
|CHEV|892676 |
|BMW |603092 |
|MERZ|543298 |
|VOLK|432030 |
|HYUN|404917 |
|DODG|391686 |
|LEXS|368420 |
|KIA |328155 |
|JEEP|316300 |
|AUDI|255395 |
|MAZD|242344 |
|OTHR|205546 |
|GMC |184889 |
|INFI|174315 |
|CHRY|159948 |
|SUBA|154640 |
+----+-------+
only showing top 20 rows



                                                                                

In [14]:
COLORS = {
    'AL': 'Aluminum', 'AM': 'Amber', 'BG': 'Beige', 'BK': 'Black',
    'BL': 'Blue', 'BN': 'Brown', 'BR': 'Brown', 'BZ': 'Bronze',
    'CH': 'Charcoal', 'DK': 'Dark', 'GD': 'Gold', 'GO': 'Gold',
    'GN': 'Green', 'GY': 'Gray', 'GT': 'Granite', 'IV': 'Ivory',
    'LT': 'Light', 'OL': 'Olive', 'OR': 'Orange', 'MR': 'Maroon',
    'PK': 'Pink', 'RD': 'Red', 'RE': 'Red', 'SI': 'Silver', 'SL': 'Silver',
    'SM': 'Smoke', 'TN': 'Tan', 'VT': 'Violet', 'WT': 'White', 'WH': 'White',
    'YL': 'Yellow', 'YE': 'Yellow', 'UN': 'Unknown'
}

In [15]:
def expand_color(color_code):
    if color_code is None:
        return None
    color_code = color_code.strip().upper()
    return COLORS.get(color_code, color_code)

In [16]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

expand_color_udf = udf(expand_color, StringType())


In [17]:
citations_df = citations_df.withColumn("color_long", expand_color_udf(col("Color")))


In [18]:
toyota_df = citations_df.filter(col("Make") == "TOYT")


In [19]:
toyota_colors = toyota_df.groupBy("color_long").count().orderBy(desc("count"))
toyota_colors.show(5, truncate=False)




+----------+------+
|color_long|count |
+----------+------+
|Gray      |489697|
|White     |434595|
|Black     |353812|
|Silver    |347894|
|Blue      |180091|
+----------+------+
only showing top 5 rows



                                                                                

In [20]:
top_toyota_color = toyota_colors.collect()[0]
print("The most frequent Toyota color is:", top_toyota_color["color_long"])


                                                                                

The most frequent Toyota color is: Gray


In [22]:
spark_session.stop()
