In [1]:
from pyspark.sql import SparkSession, Window, functions as F
from pyspark.sql.functions import col, udf, regexp_replace, year, countDistinct, col, count, desc, when, row_number, to_timestamp, hour, trim
from pyspark.sql.types import DateType, StringType

# Create SparkSession
spark = SparkSession.builder.appName("Data frames").getOrCreate()

####################
#   Data Loading   #
####################

# Read the CSV file into a DataFrame
df = spark.read.options(header='True').csv('Datasets.csv')

# drop all empty row
df_filter = df.na.drop("all")

In [2]:
####################
# Examine the data #
####################

# converting all string into single date format
date_df = df_filter.withColumn('Issue Date', regexp_replace('Issue Date', '/', '-'))

# converting all string to date data type
date_df = date_df.withColumn('Issue Date', F.to_date(F.unix_timestamp('Issue Date', 'MM-dd-yyyy').cast('timestamp')))

# converting date column to year
df_with_year = date_df.withColumn("year", year(date_df["Issue Date"]))

# group year accourding to count and sort it.
total_tickets_by_year = df_with_year.groupBy("year").count().orderBy(F.asc("year"))
total_tickets_by_year.show()

+----+-----+
|year|count|
+----+-----+
|1973|    1|
|2000|   16|
|2003|    2|
|2005|    1|
|2008|    1|
|2010|    9|
|2011|    4|
|2012|   36|
|2013|49073|
|2014|   40|
|2015|   23|
|2016|    5|
|2017|    5|
|2018|    4|
|2019|    5|
|2023|    1|
|2030|    1|
|2031|    4|
|2041|    1|
|2049|    1|
+----+-----+



In [3]:
#number of unique states
unique_states_count = date_df.select("Registration State").distinct().count()

print("Number of unique states:", unique_states_count)

Number of unique states: 61


In [4]:
# Step 1: Find the state with the maximum entries
state_entries = date_df.groupBy("Registration State").agg(count("*").alias("count")).orderBy(desc("count"))

state_entries.show(5)

state_with_max_entries = state_entries.select("Registration State").first()[0]

print("State with max entries is {} \n".format(state_with_max_entries))

# Step 2: Replace '99' entries with the state having maximum entries
df_corrected = date_df.withColumn("Registration State", when(col("Registration State") == '99',state_with_max_entries).
                                  otherwise(col("Registration State")))

df_corrected.groupBy("Registration State").agg(count("*").alias("count")).orderBy(desc("count")).show(5)
# Step 3: Count the number of unique states again
unique_states_count = df_corrected.select("Registration State").distinct().count()

print("Number of unique states after correction:", unique_states_count)

+------------------+-----+
|Registration State|count|
+------------------+-----+
|                NY|35447|
|                NJ| 5467|
|                PA| 1952|
|                99| 1056|
|                CT|  871|
+------------------+-----+
only showing top 5 rows

State with max entries is NY 

+------------------+-----+
|Registration State|count|
+------------------+-----+
|                NY|36503|
|                NJ| 5467|
|                PA| 1952|
|                CT|  871|
|                FL|  574|
+------------------+-----+
only showing top 5 rows

Number of unique states after correction: 60


In [5]:
#####################
# Aggregation tasks #
#####################

# Group by violation code and count the occurrences
violation_counts = df_corrected.groupBy("Violation Code").count()

# Sort the counts in descending order
sorted_counts = violation_counts.orderBy(desc("count"))

# Display the top five violation codes
top_five = sorted_counts.limit(5)

# Show the results
print("Top 5 Vehicle Code:")
top_five.show()


Top 5 Vehicle Code:
+--------------+-----+
|Violation Code|count|
+--------------+-----+
|            46|11266|
|             5| 4367|
|            21| 4009|
|            14| 3821|
|            20| 3242|
+--------------+-----+



In [6]:
# Group by 'vehicle body type' and count the occurrences
body_type_counts = df_corrected.groupBy("Vehicle Body Type").count()

# Sort the counts in descending order
sorted_body_type_counts = body_type_counts.orderBy(col("count").desc())

# Display the top five 'vehicle body types'
top_five_body_types = sorted_body_type_counts.limit(5)

# Show the results for 'vehicle body types'
print("Top 5 Vehicle Body Types:")
top_five_body_types.show()

Top 5 Vehicle Body Types:
+-----------------+-----+
|Vehicle Body Type|count|
+-----------------+-----+
|              SDN|11534|
|              VAN|10083|
|             DELV| 8393|
|             SUBN| 8296|
|             4DSD| 2708|
+-----------------+-----+



In [7]:
df_vehicle_make_filtered = df_corrected.filter(col("Vehicle Make").isNotNull())

# Group by 'vehicle make' and count the occurrences
make_counts = df_vehicle_make_filtered.groupBy("Vehicle Make").count()

# Sort the counts in descending order
sorted_make_counts = make_counts.orderBy(col("count").desc())

# Display the top five 'vehicle makes'
top_five_makes = sorted_make_counts.limit(5)

# Show the results for 'vehicle makes'
print("Top 5 Vehicle Makes:")
top_five_makes.show()

Top 5 Vehicle Makes:
+------------+-----+
|Vehicle Make|count|
+------------+-----+
|        FORD| 6934|
|       CHEVR| 3871|
|       TOYOT| 3797|
|       HONDA| 3059|
|         GMC| 3036|
+------------+-----+



In [8]:
# Filter out erroneous entries with 'Violation Precinct' as '0' and group by 'Violation Precinct'
violation_precinct_counts = df_corrected.filter(col("Violation Precinct") != '0').groupBy("Violation Precinct").count()

# Sort the counts in descending order
sorted_violation_precinct_counts = violation_precinct_counts.orderBy(col("count").desc())

# Display the top five 'Violation Precincts'
top_five_violation_precincts = sorted_violation_precinct_counts.limit(5)

# Show the results for 'Violation Precincts'
print("Top 5 Violation Precincts:")
top_five_violation_precincts.show()

Top 5 Violation Precincts:
+------------------+-----+
|Violation Precinct|count|
+------------------+-----+
|                19| 3247|
|                18| 2019|
|                14| 1879|
|                 1| 1793|
|                17| 1551|
+------------------+-----+



In [9]:

# Filter out erroneous entries with 'Issuer Precinct' as '0' and group by 'Issuer Precinct'
issuer_precinct_counts = df_corrected.filter(col("Issuer Precinct") != '0').groupBy("Issuer Precinct").count()

# Sort the counts in descending order
sorted_issuer_precinct_counts = issuer_precinct_counts.orderBy(col("count").desc())

# Display the top six 'Issuer Precincts'
top_six_issuer_precincts = sorted_issuer_precinct_counts.limit(6)

# Show the results for 'Issuer Precincts'
print("Top 6 Issuer Precincts:")
top_six_issuer_precincts.show()

Top 6 Issuer Precincts:
+---------------+-----+
|Issuer Precinct|count|
+---------------+-----+
|            401| 5368|
|             19| 1213|
|            109| 1072|
|            162| 1047|
|             18| 1016|
|              1|  989|
+---------------+-----+



In [10]:
# Filter out rows with Issuer Precinct equal to 0
filtered_df = df_corrected.filter(df_corrected['Issuer Precinct'] != 0)

# Get the top three precincts with the most tickets issued
top_precincts = filtered_df.groupBy('Issuer Precinct').count().orderBy(desc('count')).limit(3)
print("Top 3 Precinct with highest violations:")
top_precincts.show()

# Calculate the frequency of each combination of Issuer Precinct and Violation Code
frequency_df = filtered_df.groupBy('Issuer Precinct', 'Violation Code').agg(count('*').alias('Frequency'))

# Create a window specification partitioned by Issuer Precinct and ordered by frequency in descending order
window_spec = Window.partitionBy('Issuer Precinct').orderBy(desc('Frequency'))

# Add a row number column to rank the violations within each Issuer Precinct
ranked_df = frequency_df.withColumn('row_number', row_number().over(window_spec))

# Filter the top violation for each distinct Issuer Precinct
top_violations_df = ranked_df.filter(ranked_df['row_number'] == 1)

# Sort the top three distinct Issuer Precinct values and highest violation code
result = top_violations_df.orderBy(desc('Frequency'), 'Issuer Precinct').limit(3)
print("Highest violation codes for top 3 Precinct:")
result.select("Issuer Precinct", "Violation Code", "Frequency").show()


Top 3 Precinct with highest violations:
+---------------+-----+
|Issuer Precinct|count|
+---------------+-----+
|            401| 5368|
|             19| 1213|
|            109| 1072|
+---------------+-----+

Highest violation codes for top 3 Precinct:
+---------------+--------------+---------+
|Issuer Precinct|Violation Code|Frequency|
+---------------+--------------+---------+
|            401|            46|     2146|
|             23|            46|      489|
|             19|            46|      458|
+---------------+--------------+---------+



In [11]:
# Filter out rows with Violation Time equal to 0
filtered_Violation_time_df = df_corrected.filter(col("Violation Time").isNotNull())

count_df = filtered_Violation_time_df.groupBy(filtered_Violation_time_df["Violation Time"].substr(-1, 1).alias("TimePeriod")) \
    .count() \
    .withColumn("TimePeriod", when(col("TimePeriod") == "A", "AM").otherwise("PM")) \
    .withColumnRenamed("TimePeriod", "Time of day")

count_df.show()

+-----------+-----+
|Time of day|count|
+-----------+-----+
|         AM|26881|
|         PM|22348|
+-----------+-----+



In [12]:
print("Total number of rows = {}".format(df.count()))

# drop all empty row
df_drop_na = df.na.drop("all")
print("Total number of rows after removing empty rows = {}".format(df_drop_na.count()))

Total number of rows = 68856
Total number of rows after removing empty rows = 49233


In [14]:
def convert_time(time):
    hour = int(time[:2])
    minute = int(time[2:4])
    period = time[4:]

    if period == 'P' and hour != 12:
        hour += 12
    elif period == 'A' and hour == 12:
        hour = 0

    return f'{hour:02d}:{minute:02d}'

convert_time_udf = udf(convert_time, StringType())

df_corrected = filtered_Violation_time_df.withColumn('Violation Time', convert_time_udf(filtered_Violation_time_df['Violation Time']))

df_corrected.show(5)


Py4JJavaError: An error occurred while calling o275.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 60.0 failed 1 times, most recent failure: Lost task 0.0 in stage 60.0 (TID 70) (USBLRAMANMISHR1.us.deloitte.com executor driver): org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:192)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:109)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:166)
	at org.apache.spark.sql.execution.python.BatchEvalPythonExec.evaluate(BatchEvalPythonExec.scala:82)
	at org.apache.spark.sql.execution.python.EvalPythonExec.$anonfun$doExecute$2(EvalPythonExec.scala:131)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:853)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:853)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:139)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.net.SocketTimeoutException: Accept timed out
	at java.base/sun.nio.ch.NioSocketImpl.timedAccept(NioSocketImpl.java:708)
	at java.base/sun.nio.ch.NioSocketImpl.accept(NioSocketImpl.java:752)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:675)
	at java.base/java.net.ServerSocket.platformImplAccept(ServerSocket.java:641)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:617)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:574)
	at java.base/java.net.ServerSocket.accept(ServerSocket.java:532)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:179)
	... 25 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2785)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2721)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2720)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2720)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1206)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1206)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1206)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2984)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2923)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2912)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:971)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2263)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2284)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2303)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:530)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:483)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:61)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4177)
	at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:3161)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4167)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:526)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4165)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:118)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4165)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:3161)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:3382)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:284)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:323)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:192)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:109)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:166)
	at org.apache.spark.sql.execution.python.BatchEvalPythonExec.evaluate(BatchEvalPythonExec.scala:82)
	at org.apache.spark.sql.execution.python.EvalPythonExec.$anonfun$doExecute$2(EvalPythonExec.scala:131)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:853)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:853)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:139)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	... 1 more
Caused by: java.net.SocketTimeoutException: Accept timed out
	at java.base/sun.nio.ch.NioSocketImpl.timedAccept(NioSocketImpl.java:708)
	at java.base/sun.nio.ch.NioSocketImpl.accept(NioSocketImpl.java:752)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:675)
	at java.base/java.net.ServerSocket.platformImplAccept(ServerSocket.java:641)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:617)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:574)
	at java.base/java.net.ServerSocket.accept(ServerSocket.java:532)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:179)
	... 25 more


In [None]:
#####################
#    Writing Data   #
#####################

# Specify the path where you want to save the CSV file
output_path = "output.csv"

# Write the DataFrame to a CSV file
filtered_Violation_time_df.write.csv(output_path, header=True, mode="overwrite")