In [37]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, sum, min, max, avg, mean, stddev, expr,when


In [2]:
spark = SparkSession.builder.appName("DataMindsApp").getOrCreate()


In [17]:
data = spark.read.csv(r"C:\Users\S566651\Desktop\BigData\Police Data.csv", header=True, inferSchema=True)

In [18]:
data.show(5)

+---------+-------------------+------------+-------------+--------------+----------+-----------+----------------+---------+----------------+-----------+-------------+-----------+-------------+------------------+---------+
|stop_date|          stop_time|country_name|driver_gender|driver_age_raw|driver_age|driver_race|   violation_raw|violation|search_conducted|search_type| stop_outcome|is_arrested|stop_duration|drugs_related_stop|Driver ID|
+---------+-------------------+------------+-------------+--------------+----------+-----------+----------------+---------+----------------+-----------+-------------+-----------+-------------+------------------+---------+
| 1/2/2005|2024-12-06 01:55:00|        NULL|            M|          1985|        20|      White|        Speeding| Speeding|           false|       NULL|     Citation|      false|     0-15 Min|             false|DL-177987|
|1/18/2005|2024-12-06 08:15:00|        NULL|            M|          1965|        40|      White|        Speeding

In [19]:
number_of_rows = data.count()
print("Number of records in Data: ", number_of_rows)

Number of records in Data:  10227


In [20]:
data = data.drop('country_name')
data.show()

+---------+-------------------+-------------+--------------+----------+-----------+--------------------+---------+----------------+-----------+-------------+-----------+-------------+------------------+---------+
|stop_date|          stop_time|driver_gender|driver_age_raw|driver_age|driver_race|       violation_raw|violation|search_conducted|search_type| stop_outcome|is_arrested|stop_duration|drugs_related_stop|Driver ID|
+---------+-------------------+-------------+--------------+----------+-----------+--------------------+---------+----------------+-----------+-------------+-----------+-------------+------------------+---------+
| 1/2/2005|2024-12-06 01:55:00|            M|          1985|        20|      White|            Speeding| Speeding|           false|       NULL|     Citation|      false|     0-15 Min|             false|DL-177987|
|1/18/2005|2024-12-06 08:15:00|            M|          1965|        40|      White|            Speeding| Speeding|           false|       NULL|     

In [13]:
column_names = data.columns
print("Column Names: ", column_names)

Column Names:  ['stop_date', 'stop_time', 'driver_gender', 'driver_age_raw', 'driver_age', 'driver_race', 'violation_raw', 'violation', 'search_conducted', 'search_type', 'stop_outcome', 'is_arrested', 'stop_duration', 'drugs_related_stop', 'Driver ID']


In [27]:
# Goal 1:Analyze Arrest Patterns by Driver Race

In [28]:
filtered_data = data.filter((data.driver_race.isNotNull()) & (data.is_arrested.isNotNull()))
grouped_data = filtered_data.groupBy("driver_race", "is_arrested").agg(count("*").alias("count"))
grouped_data.show()

+-----------+-----------+-----+
|driver_race|is_arrested|count|
+-----------+-----------+-----+
|      Other|      false|   28|
|      Asian|      false|  271|
|   Hispanic|       true|   27|
|      Black|      false| 1016|
|      Asian|       true|   10|
|      White|      false| 7798|
|      Black|       true|  157|
|   Hispanic|      false|  172|
|      White|       true|  332|
+-----------+-----------+-----+



In [None]:
# Goal 2:Analyze gender disparities in traffic Violations 

In [31]:
voilation_data = data.dropna(subset=["violation", "driver_gender"])
violation_gender_count = voilation_data.groupBy('violation', 'driver_gender').agg(count('*').alias('count'))
violation_gender_count.show()

+-------------------+-------------+-----+
|          violation|driver_gender|count|
+-------------------+-------------+-----+
|           Speeding|            F| 2127|
|   Moving violation|            F|  278|
|           Speeding|            M| 5220|
|              Other|            M|  150|
|Registration/plates|            F|   76|
|          Equipment|            F|  102|
|Registration/plates|            M|  213|
|              Other|            F|   41|
|   Moving violation|            M| 1085|
|          Equipment|            M|  518|
+-------------------+-------------+-----+



In [None]:
# Goal 3:Assess how gender influences the likelihood of being searched during stops

In [32]:
search_during_stop = data.groupBy('driver_gender', 'search_conducted').agg(count('search_conducted').alias('count'))
search_during_stop.show()

+-------------+----------------+-----+
|driver_gender|search_conducted|count|
+-------------+----------------+-----+
|            M|            true|  371|
|            F|           false| 2570|
|            F|            true|   54|
|            M|           false| 6815|
+-------------+----------------+-----+



In [None]:
# Goal 4:Distribution of Stop Duration and Average Duration of Traffic Stops

In [33]:
stop_duration_counts = data.groupBy("stop_duration").count()
stop_duration_counts.show()

+-------------+-----+
|stop_duration|count|
+-------------+-----+
|     0-15 Min| 8304|
|    16-30 Min| 1156|
|      30+ Min|  350|
+-------------+-----+



In [38]:
data_with_mapped_duration = data.withColumn(
    "stop_duration_mapped",
    when(col("stop_duration") == "0-15 Min", 7.5)
    .when(col("stop_duration") == "16-30 Min", 24)
    .when(col("stop_duration") == "30+ Min", 45)
    .otherwise(None)  # Handle any other values, or keep them as null
)

In [39]:
mean_of_stopDuration = data_with_mapped_duration.agg(mean("stop_duration_mapped").alias("mean_stop_duration")).collect()[0]["mean_stop_duration"]
print(f"Mean of stop duration: {mean_of_stopDuration}")

Mean of stop duration: 10.782262996941895


In [None]:
# Goal 5:Examine age distributions across different traffic violations

In [41]:
meanage_perviolation = data.groupBy('violation').agg(
    count('driver_age').alias('count'),
    mean('driver_age').alias('mean'),
    min('driver_age').alias('min'),
    max('driver_age').alias('max'),
    stddev('driver_age').alias('stddev')
)
meanage_perviolation.show()

+-------------------+-----+------------------+---+---+------------------+
|          violation|count|              mean|min|max|            stddev|
+-------------------+-----+------------------+---+---+------------------+
|          Equipment|  618| 30.64724919093851| 17| 81|11.227794604250757|
|              Other|  185|32.016216216216215| 17| 72|12.007459491135391|
|   Moving violation| 1349|  34.7123795404003| 17| 76|12.518662722760283|
|Registration/plates|  288|31.270833333333332| 17| 59|10.235021349313634|
|           Speeding| 7323| 32.83941007783695| 16| 88| 12.38438555186899|
+-------------------+-----+------------------+---+---+------------------+



In [None]:
# Goal 6:Explore Top patterns in repeat violations and their recurrence over time

In [42]:
data = data.withColumn("violation", col("violation").cast("string"))
data = data.withColumn("Driver ID", col("Driver ID").cast("string"))

In [44]:
individual = data.groupBy("Driver ID", "violation").agg(count("violation").alias("count"))
top_20_individual = individual.orderBy(col("count").desc()).limit(20)
top_20_individual.show()

+---------+---------+-----+
|Driver ID|violation|count|
+---------+---------+-----+
|DL-742617| Speeding|   30|
|DL-135712| Speeding|   29|
|DL-186006| Speeding|   28|
|DL-763752| Speeding|   27|
|DL-657309| Speeding|   27|
|DL-597816| Speeding|   27|
|DL-614068| Speeding|   27|
|DL-306693| Speeding|   27|
|DL-722007| Speeding|   26|
|DL-218428| Speeding|   26|
|DL-853879| Speeding|   26|
|DL-893672| Speeding|   26|
|DL-233530| Speeding|   26|
|DL-540432| Speeding|   25|
|DL-783524| Speeding|   25|
|DL-864956| Speeding|   25|
|DL-898831| Speeding|   25|
|DL-646980| Speeding|   25|
|DL-904028| Speeding|   25|
|DL-137844| Speeding|   25|
+---------+---------+-----+

