# **Phase 3 - Data Cleaning**

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, when, to_date, hour, dayofweek, year, month, regexp_replace, trim, udf, lag, sum as spark_sum
from pyspark.sql.window import Window
from pyspark.sql.functions import median, avg, stddev, min, max
from pyspark.sql.functions import lower, upper, trim, lag, lead, avg, stddev, rank, dense_rank
from pyspark.ml.feature import MinMaxScaler, VectorAssembler
from pyspark.ml.functions import vector_to_array
from pyspark.sql.functions import year, month, col, lag, lead, when, avg, rank
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import IntegerType, DoubleType, DateType, TimestampType,StringType
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd

**Cleaning 1: Handling Missing Values**

With this cleaning step, we handled missing values in the dataset. Replacing missing numeric values in specified columns with 0 and fills missing categorical values in specified columns with 'Unspecified'.

In [None]:
# ===============================================================================
# OPERATION 1: ANALYZE AND HANDLE MISSING VALUES
# ===============================================================================
# Purpose: We wanted to identify columns with missing values and handle them correctly.
# Impact: It improved data quality and prevented failures in further analysis.

In [None]:
#pyspark 1

spark = SparkSession.builder.appName("DataCleaning").getOrCreate()
file_path = "/path/to/csv/cleaned_dataset.csv"
df = spark.read.csv(file_path, header=True, inferSchema=True)
print(f"Original dataset dimensions: {df.count()} rows x {len(df.columns)} columns")

missing_values = df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns])

print("Missing values per column:")
missing_values.show()

total_rows = df.count()
missing_percentage = df.select([(count(when(col(c).isNull(), c)) / total_rows * 100).alias(c) for c in df.columns])

print("Columns with missing values (percentage):")
missing_percentage.show()

#replaced missing numeric values with 0
numerical_cols_to_be_replaced_by_Zero  = ['NUMBER OF PERSONS INJURED', 'NUMBER OF PERSONS KILLED']

df = df.fillna({col_name: 0 for col_name in numerical_cols_to_be_replaced_by_Zero})

#replaced missing categorical values (e.g., 'CONTRIBUTING FACTOR VEHICLE 1') with 'Unspecified'
categorical_cols_to_be_replaced_by_unspecified = ['CONTRIBUTING FACTOR VEHICLE 1', 'CONTRIBUTING FACTOR VEHICLE 2', 'CONTRIBUTING FACTOR VEHICLE 3',
                   'CONTRIBUTING FACTOR VEHICLE 4', 'CONTRIBUTING FACTOR VEHICLE 5', 'VEHICLE TYPE CODE 1', 'VEHICLE TYPE CODE 2',
                                                 'VEHICLE TYPE CODE 3', 'VEHICLE TYPE CODE 4', 'VEHICLE TYPE CODE 5']
df = df.fillna({col_name: 'Unspecified' for col_name in categorical_cols_to_be_replaced_by_unspecified})
missing_values_after = df.select([(spark_sum(col(c).isNull().cast("int"))).alias(c) for c in df.columns])
print("Missing values per column after replacement:")
missing_values_after.show()

# How it helps:
# Prior to handling: Identifying missing data informs us which columns may cause issues in modeling.
# After addressing: By Addressing missing data we make sure that our dataset is more reliable and it improves the quality of our analysis.

Original dataset dimensions: 2146665 rows x 29 columns
Missing values per column:
+----------+----------+-------+--------+--------+---------+--------+--------------+-----------------+---------------+-------------------------+------------------------+-----------------------------+----------------------------+-------------------------+------------------------+--------------------------+-------------------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+------------+-------------------+-------------------+-------------------+-------------------+-------------------+
|CRASH DATE|CRASH TIME|BOROUGH|ZIP CODE|LATITUDE|LONGITUDE|LOCATION|ON STREET NAME|CROSS STREET NAME|OFF STREET NAME|NUMBER OF PERSONS INJURED|NUMBER OF PERSONS KILLED|NUMBER OF PEDESTRIANS INJURED|NUMBER OF PEDESTRIANS KILLED|NUMBER OF CYCLIST INJURED|NUMBER OF CYCLIST KILLED|NUMBER OF MOTORIST INJURED|NUMBER OF MOTORIST

**Cleaning 2: Dropping unnecessary colums and removing Duplicates**

In this step, we dropped unnecessary columns. We dropped the Location column because we already have separate columns for Latitude and Longitude.

In [None]:
# ===============================================================================
# OPERATION 2: DIMENSION REDUCTION (DROPPING COLUMNS AND DUPLICATES)
# ===============================================================================
# Purpose: We needed to remove unnecessary columns and duplicate records to reduce data volume
# Impact: It improved processing efficiency in the distributed environment by reducing data size
# This operation is particularly important for big data processing where reducing data size
# directly impacts processing time and cluster resource utilization

In [None]:
#Pyspark 2:

#drropped the 'LOCATION' column
df = df.drop('LOCATION')

#iddentified and removed duplicate rows
duplicate_count = df.count() - df.dropDuplicates().count()
print(f"Number of duplicate rows identified: {duplicate_count}")

#removed duplicate rows
df = df.dropDuplicates()
print(f"Dataset dimensions after removing duplicates: {df.count()} rows x {len(df.columns)} columns")

#modified DataFrame
df.show(5)


Number of duplicate rows identified: 0
Dataset dimensions after removing duplicates: 2146665 rows x 28 columns
+----------+----------+---------+--------+----------+-----------+--------------------+--------------------+--------------------+-------------------------+------------------------+-----------------------------+----------------------------+-------------------------+------------------------+--------------------------+-------------------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+------------+-------------------+-------------------+-------------------+-------------------+-------------------+
|CRASH DATE|CRASH TIME|  BOROUGH|ZIP CODE|  LATITUDE|  LONGITUDE|      ON STREET NAME|   CROSS STREET NAME|     OFF STREET NAME|NUMBER OF PERSONS INJURED|NUMBER OF PERSONS KILLED|NUMBER OF PEDESTRIANS INJURED|NUMBER OF PEDESTRIANS KILLED|NUMBER OF CYCLIST INJURED|NUMBER OF CYCLIST K

**Cleaning 3: Standardizing DateTime and making data types consistent**

In this step, we standardized the values in Crash Date column.

In [None]:
# ===============================================================================
# OPERATION 3: Standardizing Date Time and Data Types
# ===============================================================================
# Purpose: We made every date in a standard format and confirmed that data types are sufficient for the given data.
# Impact: This step helped in standardizing the data, making it uniform for further analysis.

In [None]:
#converted date string to date type
df = df.withColumn("CRASH DATE", to_date(col("CRASH DATE"), "yyyy-MM-dd"))

#checked for missing dates after conversion
missing_dates = df.select(
    col("CRASH DATE").isNull().cast("int").alias("Missing Crash Dates"),
    col("CRASH TIME").isNull().cast("int").alias("Missing Crash Times")
)
print("Missing Dates Summary:")
missing_dates.show()

#converted numeric columns to DoubleType for consistent numerical operations
numeric_columns = [
    'LATITUDE', 'LONGITUDE', 'NUMBER OF PERSONS INJURED', 'NUMBER OF PERSONS KILLED',
    'NUMBER OF PEDESTRIANS INJURED', 'NUMBER OF PEDESTRIANS KILLED',
    'NUMBER OF CYCLIST INJURED', 'NUMBER OF CYCLIST KILLED',
    'NUMBER OF MOTORIST INJURED', 'NUMBER OF MOTORIST KILLED'
]
for col_name in numeric_columns:
    df = df.withColumn(col_name, col(col_name).cast(DoubleType()))

#converted categorical columns to StringType for consistent string operations
categorical_columns = [
    'CONTRIBUTING FACTOR VEHICLE 1', 'CONTRIBUTING FACTOR VEHICLE 2',
    'CONTRIBUTING FACTOR VEHICLE 3', 'CONTRIBUTING FACTOR VEHICLE 4',
    'CONTRIBUTING FACTOR VEHICLE 5', 'VEHICLE TYPE CODE 1',
    'VEHICLE TYPE CODE 2', 'VEHICLE TYPE CODE 3',
    'VEHICLE TYPE CODE 4', 'VEHICLE TYPE CODE 5'
]
for col_name in categorical_columns:
    df = df.withColumn(col_name, col(col_name).cast(StringType()))

print("Schema after type conversion:")
df.printSchema()


Missing Dates Summary:
+-------------------+-------------------+
|Missing Crash Dates|Missing Crash Times|
+-------------------+-------------------+
|                  1|                  0|
|                  1|                  0|
|                  1|                  0|
|                  1|                  0|
|                  1|                  0|
|                  1|                  0|
|                  1|                  0|
|                  1|                  0|
|                  1|                  0|
|                  1|                  0|
|                  1|                  0|
|                  1|                  0|
|                  1|                  0|
|                  1|                  0|
|                  1|                  0|
|                  1|                  0|
|                  1|                  0|
|                  1|                  0|
|                  1|                  0|
|                  1|                  0|
+----------

**Cleaning 4: Outlier Detection and Removal**

In this step, we detected outliers, those which could add bias to the data - removed them.

In [None]:
# ===============================================================================
# OPERATION 4: OUTLIER DETECTION AND REMOVAL USING IQR METHOD
# ===============================================================================
# Purpose: Identified and removed statistical outliers from numerical columns
# Impact: It improved model performance by removing extreme values that could skew analysis
# Using distributed functions for statistical calculations is much more efficient
# than collecting data to the driver and computing locally

In [None]:
def detect_outliers_iqr(df, column):
    """
    Detect outliers using the Interquartile Range (IQR) method
    Returns the lower and upper bounds for filtering
    """
    # Calculate Q1 (25th percentile) and Q3 (75th percentile) using approxQuantile
    # This is a distributed operation that approximates quantiles efficiently
    Q1, Q3 = df.approxQuantile(column, [0.25, 0.75], 0.05)
    IQR = Q3 - Q1
    lower_bound = Q1 - 1.5 * IQR
    upper_bound = Q3 + 1.5 * IQR
    return lower_bound, upper_bound

# Apply outlier detection to injury columns
injured_columns = ['NUMBER OF PERSONS INJURED']
for col_name in injured_columns:
    lower_bound, upper_bound = detect_outliers_iqr(df, col_name)
    print(f"Outlier bounds for {col_name}: Lower={lower_bound}, Upper={upper_bound}")

    # Count outliers before removal
    outlier_count = df.filter((col(col_name) < lower_bound) | (col(col_name) > upper_bound)).count()
    print(f"Number of outliers in {col_name}: {outlier_count}")

    # Remove outliers
    df = df.filter((col(col_name) >= lower_bound) & (col(col_name) <= upper_bound))

Outlier bounds for NUMBER OF PERSONS INJURED: Lower=-1.5, Upper=2.5
Number of outliers in NUMBER OF PERSONS INJURED: 39310


**Cleaning 5: Further DataType Consistency for Numeric data**

In this step, we checked whether the required columns are of the appropriate datatype.

For example, ensuring Number of Persons Injured is Numeric.

In [None]:
# ===============================================================================
# OPERATION 5: TEXT STANDARDIZATION AND NORMALIZATION
# ===============================================================================
# Purpose: Normalize text fields for consistent analysis
# Impact: Improved grouping and aggregation accuracy by handling inconsistent text values
# Especially important for distributed processing where data comes from multiple sources

In [None]:
# Standardize categorical columns (convert to lowercase and handle nulls)
for col_name in categorical_columns:
    df = df.withColumn(
        col_name,
        when(col(col_name).isNull(), 'unknown')
        .otherwise(lower(col(col_name))).cast("string")
    )

print("Sample of standardized categorical columns:")
df.select(categorical_columns[:5]).show(5)  # Show first two columns as example

Sample of standardized categorical columns:
+-----------------------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+
|CONTRIBUTING FACTOR VEHICLE 1|CONTRIBUTING FACTOR VEHICLE 2|CONTRIBUTING FACTOR VEHICLE 3|CONTRIBUTING FACTOR VEHICLE 4|CONTRIBUTING FACTOR VEHICLE 5|
+-----------------------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+
|                  unspecified|                  unspecified|                  unspecified|                  unspecified|                  unspecified|
|                  unspecified|                  unspecified|                  unspecified|                  unspecified|                  unspecified|
|                  unspecified|                  unspecified|                  unspecified|                  unspecified|                  unspecified|
|                  unspecified|             

**Cleaning 6: Lat Long Validation**

In this step, we put Lat Long values in an appropriate range
To prevent the spatial analysis errors.

In [None]:
# ===============================================================================
# OPERATION 6: GEOGRAPHIC COORDINATE VALIDATION
# ===============================================================================
# Purpose: Ensure latitude and longitude values are within valid ranges
# Impact: Prevents spatial analysis errors from invalid coordinates
# This operation demonstrates data validation in a distributed context

In [None]:
df = df.withColumn(
    'LATITUDE',
    when((col('LATITUDE') >= -90) & (col('LATITUDE') <= 90), col('LATITUDE')).otherwise(None)
)

df = df.withColumn(
    'LONGITUDE',
    when((col('LONGITUDE') >= -180) & (col('LONGITUDE') <= 180), col('LONGITUDE')).otherwise(None)
)

# Count invalid coordinates that were fixed
invalid_coords = df.filter(col('LATITUDE').isNull() | col('LONGITUDE').isNull()).count()
print(f"Number of records with invalid coordinates: {invalid_coords}")
print("Sample of validated coordinates:")
df.select("LATITUDE", "LONGITUDE").show(5)

Number of records with invalid coordinates: 234699
Sample of validated coordinates:
+----------+-----------+
|  LATITUDE|  LONGITUDE|
+----------+-----------+
|40.6830004|-73.9734777|
|40.7662862|-73.9818487|
|40.7992856|-73.9453602|
|40.7786204|-73.8414464|
|40.7728532|-73.9060611|
+----------+-----------+
only showing top 5 rows



**Cleaning 7: Scaling Features**

In this step, we scaled our features to bring them in an appropriate range using min-max scaler.

In [None]:
# ===============================================================================
# OPERATION 7: FEATURE SCALING USING MIN-MAX SCALING
# ===============================================================================
# Purpose: Aim was to normalize numerical features to a standard range (0-1)
# Impact: It definitely enhances the comparability between features with different scales
# Demonstrates ML pipeline integration with data cleaning in distributed environment

In [None]:
# we can also save this updated data into a csv using the following command -
# data.to_csv('cleaned_dataset_with_standardized_text.csv', index=False)

In [None]:
columns_to_scale = ['NUMBER OF PERSONS INJURED', 'NUMBER OF PERSONS KILLED']

#dropped the 'features' column if it already exists
if "features" in df.columns:
    df = df.drop("features")

#assembled the specified columns into a feature vector
assembler = VectorAssembler(inputCols=columns_to_scale, outputCol="features")
df = assembler.transform(df)

#intialized the MinMaxScaler to scale the 'features' column
scaler = MinMaxScaler(inputCol="features", outputCol="scaled_features")
scaler_model = scaler.fit(df)
df_scaled = scaler_model.transform(df)

#converted the vector column to an array so that we can extract elements
df_scaled = df_scaled.withColumn("scaled_array", vector_to_array("scaled_features"))

#extracted the individual scaled columns and add "_SCALED" suffix
df_scaled = df_scaled.withColumn("NUMBER OF PERSONS INJURED_SCALED", col("scaled_array")[0]) \
                     .withColumn("NUMBER OF PERSONS KILLED_SCALED", col("scaled_array")[1]) \
                     .drop("features", "scaled_features", "scaled_array")

print("Sample of scaled features:")
df_scaled.select(columns_to_scale + ["NUMBER OF PERSONS INJURED_SCALED", "NUMBER OF PERSONS KILLED_SCALED"]).show(5)

df = df_scaled

Sample of scaled features:
+-------------------------+------------------------+--------------------------------+-------------------------------+
|NUMBER OF PERSONS INJURED|NUMBER OF PERSONS KILLED|NUMBER OF PERSONS INJURED_SCALED|NUMBER OF PERSONS KILLED_SCALED|
+-------------------------+------------------------+--------------------------------+-------------------------------+
|                      0.0|                     0.0|                             0.0|                            0.0|
|                      1.0|                     0.0|                             0.5|                            0.0|
|                      0.0|                     1.0|                             0.0|                            0.2|
|                      0.0|                     0.0|                             0.0|                            0.0|
|                      1.0|                     0.0|                             0.5|                            0.0|
+-------------------------+--

**Cleaning 8: Handling critical columns by adding new feature/s**

In this step, we are creating a new category which will classify each incident based on how many people were injured in it.

In [None]:
# ===============================================================================
# OPERATION 8: FEATURE ENGINEERING - DERIVED METRICS
# ===============================================================================
# Purpose: Created new columns with derived metrics for enhanced analysis
# Impact: Enables deeper insights by calculating compound metrics from raw data
# This shows how distributed processing can create and validate new features efficiently

In [None]:
#new column creation by aggregating injury and death metrics
df = df.withColumn(
    "Total Injuries",
    col("NUMBER OF PERSONS INJURED") +
    col("NUMBER OF PEDESTRIANS INJURED") +
    col("NUMBER OF CYCLIST INJURED") +
    col("NUMBER OF MOTORIST INJURED")
)

df = df.withColumn(
    "Total Deaths",
    col("NUMBER OF PERSONS KILLED") +
    col("NUMBER OF PEDESTRIANS KILLED") +
    col("NUMBER OF CYCLIST KILLED") +
    col("NUMBER OF MOTORIST KILLED")
)

df = df.withColumn("Total Affected", col("Total Injuries") + col("Total Deaths"))


total_injuries = df.agg(F.sum("Total Injuries").alias("total_injuries")).collect()[0]["total_injuries"]
total_deaths = df.agg(F.sum("Total Deaths").alias("total_deaths")).collect()[0]["total_deaths"]

print("Total Injuries:", total_injuries)
print("Total Deaths:", total_deaths)


df = df.withColumn("Total Affected Check", col("Total Injuries") + col("Total Deaths"))
inconsistent_total_affected = df.filter(col("Total Affected") != col("Total Affected Check"))
inconsistent_count = inconsistent_total_affected.count()

print("Inconsistent Total Affected Rows:", inconsistent_count)
if inconsistent_count == 0:
    print("Data is consistent")
else:
    print("Data inconsistencies found:")
    inconsistent_total_affected.select("Total Affected", "Total Affected Check").show(5)

Total Injuries: 1072282.0
Total Deaths: 6104.0
Inconsistent Total Affected Rows: 0
Data is consistent


**Cleaning 9 & 10: Extracting features to perform Time Series Analysis**

Analyzing and investigating interesting trends that move with time (For e.g. - seasonal change in the accidents).

In [None]:
# ===============================================================================
# OPERATION 9: TEMPORAL FEATURE EXTRACTION
# ===============================================================================
# Purpose: To Extract date-based features for time-based analysis
# Impact: This step enabled seasonal trend analysis and time-based aggregations
# Demonstrates date manipulation in a distributed context

In [None]:
#new year, month, day columns
df = df.withColumn("YEAR", year(col("CRASH DATE")))
df = df.withColumn("MONTH", month(col("CRASH DATE")))
df = df.withColumn("DAY_OF_WEEK", dayofweek(col("CRASH DATE")))

#new column "Season" based on the month
df = df.withColumn("SEASON",
    when(month("CRASH DATE").isin([12, 1, 2]), "Winter")
    .when(month("CRASH DATE").isin([3, 4, 5]), "Spring")
    .when(month("CRASH DATE").isin([6, 7, 8]), "Summer")
    .otherwise("Fall")
)

print("Sample of temporal features:")
df.select("CRASH DATE", "YEAR", "MONTH", "DAY_OF_WEEK", "SEASON").show(5)

Sample of temporal features:
+----------+----+-----+-----------+------+
|CRASH DATE|YEAR|MONTH|DAY_OF_WEEK|SEASON|
+----------+----+-----+-----------+------+
|      NULL|NULL| NULL|       NULL|  Fall|
|      NULL|NULL| NULL|       NULL|  Fall|
|      NULL|NULL| NULL|       NULL|  Fall|
|      NULL|NULL| NULL|       NULL|  Fall|
|      NULL|NULL| NULL|       NULL|  Fall|
+----------+----+-----+-----------+------+
only showing top 5 rows



In [None]:
# ===============================================================================
# OPERATION 10: WINDOW FUNCTIONS FOR TIME-SERIES ANALYSIS (EXPLICITLY REQUIRED)
# ===============================================================================
# Purpose: We thought of using the window functions to analyze trends over time and within boroughs
# Impact: Enables advanced time-series analysis like moving averages and rankings
# This directly addresses the assignment requirement for windowing techniques


In [None]:
#windowing is a prominent technique used in Spark. it has a huge potential to increase data processing.

#a window specification partitioned by borough and ordered by date
borough_window = Window.partitionBy("BOROUGH").orderBy("CRASH DATE")

#a month-level window for seasonal analysis
month_window = Window.partitionBy("BOROUGH", "MONTH").orderBy("CRASH DATE")

#rolling metrics using window functions
df = df.withColumn("PREV_DAY_INJURIES", lag("NUMBER OF PERSONS INJURED", 1).over(borough_window))
df = df.withColumn("NEXT_DAY_INJURIES", lead("NUMBER OF PERSONS INJURED", 1).over(borough_window))
df = df.withColumn("INJURY_CHANGE", col("NUMBER OF PERSONS INJURED") - col("PREV_DAY_INJURIES"))

#ranking of accidents by borough
df = df.withColumn("BOROUGH_INJURY_RANK", rank().over(Window.partitionBy("BOROUGH").orderBy(col("NUMBER OF PERSONS INJURED").desc())))

#moving averages for injuries (7-day window)
seven_day_window = Window.partitionBy("BOROUGH").orderBy("CRASH DATE").rowsBetween(-6, 0)
df = df.withColumn("7DAY_AVG_INJURIES", avg("NUMBER OF PERSONS INJURED").over(seven_day_window))

df = df.withColumn("YEAR", year(col("CRASH DATE")))
df = df.withColumn("MONTH", month(col("CRASH DATE")))


monthly_agg = df.groupBy("BOROUGH", "YEAR", "MONTH").agg(
    F.sum("NUMBER OF PERSONS INJURED").alias("MONTHLY_INJURIES"),
    F.countDistinct("CRASH DATE").alias("ACCIDENT_DAYS")
)

#window for month-over-month comparison
month_comparison_window = Window.partitionBy("BOROUGH").orderBy("YEAR", "MONTH")

#month-over-month changes
monthly_agg = monthly_agg.withColumn(
    "PREV_MONTH_INJURIES",
    lag("MONTHLY_INJURIES", 1).over(month_comparison_window)
)

monthly_agg = monthly_agg.withColumn(
    "MONTHLY_CHANGE_PCT",
    when(col("PREV_MONTH_INJURIES").isNotNull() & (col("PREV_MONTH_INJURIES") > 0),
         ((col("MONTHLY_INJURIES") - col("PREV_MONTH_INJURIES")) / col("PREV_MONTH_INJURIES") * 100)
    ).otherwise(None)
)

print("Sample window function results:")
df.select("BOROUGH", "CRASH DATE", "NUMBER OF PERSONS INJURED", "PREV_DAY_INJURIES",
          "INJURY_CHANGE", "BOROUGH_INJURY_RANK", "7DAY_AVG_INJURIES").show(5)

print("Sample month-over-month analysis:")
monthly_agg.select("BOROUGH", "YEAR", "MONTH", "MONTHLY_INJURIES",
                   "PREV_MONTH_INJURIES", "MONTHLY_CHANGE_PCT").show(5)


Sample window function results:
+-------+----------+-------------------------+-----------------+-------------+-------------------+-----------------+
|BOROUGH|CRASH DATE|NUMBER OF PERSONS INJURED|PREV_DAY_INJURIES|INJURY_CHANGE|BOROUGH_INJURY_RANK|7DAY_AVG_INJURIES|
+-------+----------+-------------------------+-----------------+-------------+-------------------+-----------------+
|   NULL|      NULL|                      2.0|              1.0|          1.0|                  1|              2.0|
|   NULL|      NULL|                      2.0|              0.0|          2.0|                  1|              2.0|
|   NULL|      NULL|                      2.0|              0.0|          2.0|                  1|              2.0|
|   NULL|      NULL|                      2.0|              1.0|          1.0|                  1|              2.0|
|   NULL|      NULL|                      2.0|              0.0|          2.0|                  1|              2.0|
+-------+----------+------------

**Cleaning 11: Flagging the critical features**
Our ultimate goal is to create an automated recommender system and colaborate with the traffic control department of NYS.
For that, we are providing Green (Improvements) and Red (Require immediate attention) flasgs for different categories.

In [None]:
# ===============================================================================
# OPERATION 11: DATA CATEGORIZATION AND FLAGGING
# ===============================================================================
# Purpose: Create categorical features and flags for filtering and aggregation
# Impact: Simplifies downstream analysis with pre-computed categories and flags
# Shows practical application of conditional logic in distributed data processing

In [None]:
#critical missing data flag
df = df.withColumn(
    "CRITICAL_MISSING",
    col("LATITUDE").isNull() | col("LONGITUDE").isNull() | col("NUMBER OF PERSONS INJURED").isNull()
)

#accident severity categories
df = df.withColumn(
    "ACCIDENT_SEVERITY",
    when(col("NUMBER OF PERSONS INJURED") < 2, "Low")
     .when((col("NUMBER OF PERSONS INJURED") >= 2) & (col("NUMBER OF PERSONS INJURED") < 4), "Medium")
     .otherwise("High")
)

#different types of flags for injuries and fatalities
df = df.withColumn("HAS_INJURIES", when(col("NUMBER OF PERSONS INJURED") > 0, True).otherwise(False))
df = df.withColumn("HAS_FATALITIES", when(col("NUMBER OF PERSONS KILLED") > 0, True).otherwise(False))

print("Sample categorization and flags:")
df.select("LATITUDE", "LONGITUDE", "NUMBER OF PERSONS INJURED",
          "CRITICAL_MISSING", "ACCIDENT_SEVERITY", "HAS_INJURIES", "HAS_FATALITIES").show(5)

Sample categorization and flags:
+----------+-----------+-------------------------+----------------+-----------------+------------+--------------+
|  LATITUDE|  LONGITUDE|NUMBER OF PERSONS INJURED|CRITICAL_MISSING|ACCIDENT_SEVERITY|HAS_INJURIES|HAS_FATALITIES|
+----------+-----------+-------------------------+----------------+-----------------+------------+--------------+
|40.6830004|-73.9734777|                      0.0|           false|              Low|       false|         false|
|40.7662862|-73.9818487|                      1.0|           false|              Low|        true|         false|
|40.7992856|-73.9453602|                      0.0|           false|              Low|       false|          true|
|40.7786204|-73.8414464|                      0.0|           false|              Low|       false|         false|
|40.7728532|-73.9060611|                      1.0|           false|              Low|        true|         false|
+----------+-----------+-------------------------+-----

**Cleaning 12: Seasonal Tracking**

In this step, we are categorizing our incidents based on the season. Since our data does not have any data related to the weather conditions, this will allow us to approximate the conditions for each incident.

In [None]:
# ===============================================================================
# OPERATION 12: Season based analysis of crashes
# ===============================================================================
# Purpose: Categorize incidents based on season
# Impact: It helped us approximate the conditions for every incident (crash / injury / casualty)

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import Window
from pyspark.sql.functions import *

#initialize Spark with checkpoint configuration
spark = SparkSession.builder \
    .appName("CrashAnalysis") \
    .config("spark.executor.memory", "8g") \
    .config("spark.driver.memory", "8g") \
    .config("spark.sql.shuffle.partitions", "400") \
    .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
    .getOrCreate()

#checkpoint directory (crucial for checkpointing)
spark.sparkContext.setCheckpointDir('/tmp/checkpoints/')  # <-- ADD THIS LINE
print(df.head(5))
# Data preprocessing pipeline
clean_df = (df
    .filter(col("BOROUGH").isNotNull())
    .withColumn("CRASH_DATE", to_date(col("CRASH DATE"), "yyyy-MM-dd"))
    .drop("CRASH DATE")
    .withColumn("YEAR", year(col("CRASH_DATE")))
    .filter(col("YEAR") >= 2018)
    .repartition(400, "BOROUGH", "YEAR")
)

#range window specification
range_window = (Window.partitionBy("BOROUGH")
                .orderBy(col("CRASH_DATE").cast("long"))
                .rangeBetween(-6*86400, 0))

#enhanced DataFrame with checkpointing
enhanced_df = (clean_df
    .withColumn("DAILY_INJURIES", col("NUMBER OF PERSONS INJURED"))
    .withColumn("7DAY_AVG", avg(col("DAILY_INJURIES")).over(range_window))
).checkpoint()  # Now works with the directory set

try:
    print("Successful Analysis:")
    enhanced_df.select(
        "BOROUGH",
        "CRASH_DATE",
        "DAILY_INJURIES",
        "7DAY_AVG"
    ).limit(5).show(truncate=False)

except Exception as e:
    print(f"Fallback to disk storage due to: {str(e)}")
    enhanced_df.write.parquet("./safe_results")

spark.stop()

[Row(CRASH DATE=None, CRASH TIME='0:00', BOROUGH=None, ZIP CODE=None, LATITUDE=None, LONGITUDE=None, ON STREET NAME='BELT PARKWAY                    ', CROSS STREET NAME=None, OFF STREET NAME=None, NUMBER OF PERSONS INJURED=2.0, NUMBER OF PERSONS KILLED=0.0, NUMBER OF PEDESTRIANS INJURED=0.0, NUMBER OF PEDESTRIANS KILLED=0.0, NUMBER OF CYCLIST INJURED=0.0, NUMBER OF CYCLIST KILLED=0.0, NUMBER OF MOTORIST INJURED=2.0, NUMBER OF MOTORIST KILLED=0.0, CONTRIBUTING FACTOR VEHICLE 1='following too closely', CONTRIBUTING FACTOR VEHICLE 2='unspecified', CONTRIBUTING FACTOR VEHICLE 3='unspecified', CONTRIBUTING FACTOR VEHICLE 4='unspecified', CONTRIBUTING FACTOR VEHICLE 5='unspecified', COLLISION_ID=4247787, VEHICLE TYPE CODE 1='sedan', VEHICLE TYPE CODE 2='unspecified', VEHICLE TYPE CODE 3='unspecified', VEHICLE TYPE CODE 4='unspecified', VEHICLE TYPE CODE 5='unspecified', NUMBER OF PERSONS INJURED_SCALED=1.0, NUMBER OF PERSONS KILLED_SCALED=0.0, Total Injuries=4.0, Total Deaths=0.0, Total Aff

In [None]:
# ===============================================================================
#SUMMARY STATISTICS AND FINAL VALIDATION
# ===============================================================================
# Purpose: Calculate summary statistics and perform final validation checks
# Impact: Provides quality metrics and ensures data integrity before analytics
# Demonstrates aggregation operations in a distributed environment

In [None]:
#summary statistics
summary_stats = df.select(
    F.count("*").alias("TOTAL_RECORDS"),
    F.countDistinct("CRASH DATE").alias("DISTINCT_DAYS"),
    F.countDistinct("BOROUGH").alias("DISTINCT_BOROUGHS"),
    F.sum("Total Injuries").alias("TOTAL_INJURIES"),
    F.sum("Total Deaths").alias("TOTAL_DEATHS"),
    F.avg("NUMBER OF PERSONS INJURED").alias("AVG_INJURIES_PER_ACCIDENT"),
    F.max("NUMBER OF PERSONS INJURED").alias("MAX_INJURIES"),
    F.min("CRASH DATE").alias("EARLIEST_DATE"),
    F.max("CRASH DATE").alias("LATEST_DATE")
).collect()[0]

#summary
print("\n===== DATA CLEANING AND PROCESSING SUMMARY =====")
print(f"Total records processed: {summary_stats['TOTAL_RECORDS']}")
print(f"Date range: {summary_stats['EARLIEST_DATE']} to {summary_stats['LATEST_DATE']}")
print(f"Distinct days: {summary_stats['DISTINCT_DAYS']}")
print(f"Distinct boroughs: {summary_stats['DISTINCT_BOROUGHS']}")
print(f"Total injuries: {summary_stats['TOTAL_INJURIES']}")
print(f"Total deaths: {summary_stats['TOTAL_DEATHS']}")
print(f"Average injuries per accident: {summary_stats['AVG_INJURIES_PER_ACCIDENT']:.2f}")
print(f"Maximum injuries in one accident: {summary_stats['MAX_INJURIES']}")

#cache the final cleaned dataframe for efficient reuse
df.cache()

print("\nDistributed data cleaning and processing complete!")

# **Phase 3 - EDA**

In [None]:
#initialized a new Spark Session
spark = SparkSession.builder \
    .appName("NYC Crash Data Analysis") \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.memory", "4g") \
    .getOrCreate()

**EDA1: Vehicles Contributing to accidents**

The bar chart highlights the top 10 factors responsible for motor accidents.

**Inference:**

**Driver distraction**, **failure to yield** and **tailgating** are the major reasons for crashed.

Furthermore, large number of **unspecified** accidents indicate missing data.

In [None]:
###############################################################################
# VISUALIZATION 1: Top Contributing Factors
###############################################################################

In [None]:
#occurrences of each contributing factor
top_factors_df = df.groupBy('CONTRIBUTING FACTOR VEHICLE 1') \
                    .count() \
                    .orderBy(F.desc('count')) \
                    .limit(10)

# Convert the Spark DataFrame to Pandas for easier plotting
top_factors_pd = top_factors_df.toPandas()

# Plot the top contributing factors
plt.figure(figsize=(12, 7))
sns.barplot(x='CONTRIBUTING FACTOR VEHICLE 1', y='count', data=top_factors_pd, palette="magma")
plt.title('Top 10 Contributing Factors to Crashes', fontsize=16)
plt.xlabel('Contributing Factors', fontsize=14)
plt.ylabel('Number of Crashes', fontsize=14)
plt.xticks(rotation=45, ha='right')
plt.tight_layout()
plt.show()

**EDA2: Vehicle Types Involved In Crashes**

**Inference:**

The most common vehicles involved in accidents are Sedans, SUVs and passenger vehicles.

Taxis, tracks and buses contribute to accidents at a lower rate.

The graph indicates that personal and commercial vehicles contribute differently to crash patterns.


In [None]:
###############################################################################
# VISUALIZATION 2: Top Vehicle Types
###############################################################################

In [None]:
top_vehicle_types_df = df.groupBy('VEHICLE TYPE CODE 1') \
                          .count() \
                          .orderBy(F.desc('count')) \
                          .limit(10)

top_vehicle_types_pd = top_vehicle_types_df.toPandas()

plt.figure(figsize=(12, 7))
sns.barplot(x='VEHICLE TYPE CODE 1', y='count', data=top_vehicle_types_pd, palette="cividis")
plt.title('Top 10 Vehicle Types Involved in Crashes', fontsize=16)
plt.xlabel('Vehicle Type', fontsize=14)
plt.ylabel('Number of Crashes', fontsize=14)
plt.xticks(rotation=45, ha='right')
plt.tight_layout()
plt.show()


**EDA3: Types of Injuries aggregrated based on Cyclists and Motorist**

**Inference:**

Motorist injuries dominates pedestarian/cyclist injuries.

But pedastrians are more vulnerable as fatalities are more for pedastrians compared to motorists.

In [None]:
###############################################################################
# VISUALIZATION 3: Types of Injuries and Deaths
###############################################################################

In [None]:
#aggregating the data for Cyclist and Motorist injuries and deaths
crash_types_df = df.select(
    F.sum('NUMBER OF PEDESTRIANS INJURED').alias('Pedestrian Injuries'),
    F.sum('NUMBER OF CYCLIST INJURED').alias('Cyclist Injuries'),
    F.sum('NUMBER OF MOTORIST INJURED').alias('Motorist Injuries'),
    F.sum('NUMBER OF PEDESTRIANS KILLED').alias('Pedestrian Deaths'),
    F.sum('NUMBER OF CYCLIST KILLED').alias('Cyclist Deaths'),
    F.sum('NUMBER OF MOTORIST KILLED').alias('Motorist Deaths')
).collect()

#convert the aggregated data to a dictionary
crash_types_dict = {
    'Pedestrian Injuries': crash_types_df[0]['Pedestrian Injuries'],
    'Cyclist Injuries': crash_types_df[0]['Cyclist Injuries'],
    'Motorist Injuries': crash_types_df[0]['Motorist Injuries'],
    'Pedestrian Deaths': crash_types_df[0]['Pedestrian Deaths'],
    'Cyclist Deaths': crash_types_df[0]['Cyclist Deaths'],
    'Motorist Deaths': crash_types_df[0]['Motorist Deaths']
}

#convert to Pandas DataFrame for easier plotting
crash_types_pd = pd.DataFrame(list(crash_types_dict.items()), columns=['Crash Type', 'Count'])

#plotting the crash types and their frequencies
plt.figure(figsize=(12, 7))
sns.barplot(x='Count', y='Crash Type', data=crash_types_pd, palette="mako")
plt.title('Types of Crashes and Their Frequencies')
plt.xlabel('Count')
plt.ylabel('Type of Crash')
plt.tight_layout()
plt.show()

**EDA4: Which hour sees most crashes in a day**

**Inference:**

Most crashes and accidents occur in between 8-10am in the morning (business and peak traffic hours)

In [None]:
###############################################################################
# VISUALIZATION 4: Crashes by Hour of Day
###############################################################################

In [None]:
from pyspark.sql import functions as F

#convert CRASH TIME to a timestamp if necessary and create a new 'Hour' column
df = df.withColumn("Hour", F.hour(F.col("CRASH TIME")))

#group by the newly created 'Hour' column and count
crashes_per_hour = df.groupBy('Hour').count()

#calculate the average number of crashes per hour
total_crashes = crashes_per_hour.agg(F.sum('count').alias('total_crashes')).collect()[0]['total_crashes']
num_unique_hours = crashes_per_hour.count()
average_crashes_per_hour = total_crashes / num_unique_hours

#convert to Pandas DataFrame for plotting
crashes_per_hour_pd = crashes_per_hour.toPandas()

#plot the average number of crashes per hour
plt.figure(figsize=(12, 6))
sns.barplot(x='Hour', y='count', data=crashes_per_hour_pd)
plt.title('Average Number of Crashes per Hour of Day')
plt.xlabel('Hour of Day')
plt.ylabel('Number of Crashes')
plt.xticks(range(0, 24))
plt.tight_layout()
plt.show()

print(f"Average Number of Crashes per Hour: {average_crashes_per_hour}")


**EDA5: Which Month is infamous for huge number of accidents**

**Inference:**

More accidents are seen in the months of July and December.

In [None]:
###############################################################################
# VISUALIZATION 5: Monthly Crash Trend
###############################################################################

In [None]:
#Grouped by Year and Month to count the number of crashes per month
monthly_crashes = df.groupBy('Year', 'Month').count()

#Sorted first by Year and Month
monthly_crashes = monthly_crashes.orderBy('Year', 'Month')

#converted to Pandas DataFrame for plotting
monthly_crashes_pd = monthly_crashes.toPandas()

monthly_crashes_pd['Date'] = pd.to_datetime(monthly_crashes_pd[['Year', 'Month']].assign(DAY=1))

plt.figure(figsize=(15, 7))
plt.plot(monthly_crashes_pd['Date'], monthly_crashes_pd['count'], marker='o', linestyle='-', color='b')
plt.title('Number of Crashes per Month', fontsize=16)
plt.xlabel('Date', fontsize=14)
plt.ylabel('Number of Crashes', fontsize=14)
plt.tight_layout()
plt.show()


**EDA6: Time Series Analysis to get more clarity of time of the day**

**Inference:**

The graph shows peak crash times:

**8-10 AM:** Likely due to rush hour traffic

**4-6 PM:**  Higher due to people commuting back home

**6-11 PM:** Steady decrease in accidents due to reduced traffic


**EDA6: Also, Crashes per month for last 12 years**

**Inference:**

The graph is relatively stable from 2012 to 2019 but we see a sudden drop in accidents from around early 2020. It is likey due to COVID-19 related lockdowns.

Crashes increased again after pandemic but they are lower then pre pandemic levels.
This suggests a potential long-term shift in traffic patterns due to COVID-19.

In [None]:
###############################################################################
# VISUALIZATION 6: Time Series Decomposition
###############################################################################

In [None]:
#time series decomposition - need to convert to pandas
#daily crash counts
daily_crashes_df = df.groupBy('CRASH DATE').count().orderBy('CRASH DATE')
daily_crashes_pd = daily_crashes_df.toPandas()
daily_crashes_pd.set_index('CRASH DATE', inplace=True)

#the daily crashes time series
plt.figure(figsize=(15, 6))
plt.plot(daily_crashes_pd.index, daily_crashes_pd['count'], label='Daily crashes', color='blue')
plt.title('Daily Motor Vehicle Collisions in NYC')
plt.xlabel('Date', fontsize=14)
plt.ylabel('Number of Crashes', fontsize=14)
plt.legend()
plt.show()

#Decompose the time series using pandas for statsmodels compatibility
# Making sure the index is datetime and sorted
daily_crashes_pd.index = pd.to_datetime(daily_crashes_pd.index)
daily_crashes_pd = daily_crashes_pd.sort_index()

#Handle missing dates by creating a full date range and joining
full_date_range = pd.date_range(start=daily_crashes_pd.index.min(), end=daily_crashes_pd.index.max())
daily_crashes_filled = daily_crashes_pd.reindex(full_date_range, fill_value=0)

decomposition = seasonal_decompose(daily_crashes_filled['count'], model='additive', period=365)

# Plot the decomposed components
fig, (ax1, ax2, ax3) = plt.subplots(3, 1, figsize=(15, 12))
decomposition.trend.plot(ax=ax1)
ax1.set_title('Trend')
decomposition.seasonal.plot(ax=ax2)
ax2.set_title('Seasonality')
decomposition.resid.plot(ax=ax3)
ax3.set_title('Residuals')
plt.tight_layout()
plt.show()

**EDA7: Distribution of Data by Boroughs**

**Inference:**

Brooklyn has the most number of crashes followed by queens and manhattan.

Staten Island has the least crashes.

This distribution actually follows the population density trends in NYC.

In [None]:
###############################################################################
# VISUALIZATION 7: Distribution by Borough
###############################################################################

In [None]:
#get borough distribution
borough_count_df = df.groupBy('BOROUGH').count().orderBy(F.desc('count'))
borough_count_pd = borough_count_df.toPandas()

#plotting the distribution of crashes by borough
plt.figure(figsize=(12, 7))
sns.barplot(x=borough_count_pd['BOROUGH'], y=borough_count_pd['count'], palette="viridis")
plt.title('Distribution of Crashes by Borough', fontsize=16)
plt.xlabel('Borough', fontsize=14)
plt.ylabel('Number of Crashes', fontsize=14)
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()

#create pie chart
plt.figure(figsize=(8, 8))
plt.pie(borough_count_pd['count'], labels=borough_count_pd['BOROUGH'], autopct='%1.1f%%', colors=plt.cm.Paired.colors)
plt.title('Borough-wise Crash Distribution', fontsize=16)
plt.axis('equal')
plt.show()

**EDA8: Heatmaps**

**Inference**

There is a cluster of collisions in the vicinity of New York City, encompassing Manhattan and Brooklyn. There are also a few spots on the heatmap that show the severity of crashes close to the Manhattan Bridge, the far end of the Brooklyn-Queens Expressway, and Nostrand Avenue. There are several areas in East New York as well.

In [None]:
###############################################################################
# VISUALIZATION 8: Heatmap of Crashes
###############################################################################

In [None]:
import seaborn as sns
from statsmodels.tsa.seasonal import seasonal_decompose
import folium
from folium.plugins import HeatMap
#For the heatmap, we need lat/long data from the Spark DataFrame
df_geo_pd = df_geo.select('LATITUDE', 'LONGITUDE').toPandas()

#base map
m = folium.Map(location=[40.730610, -73.935242], zoom_start=10)  # Centered around NYC

#main heatmap
heat_data = [[row['LATITUDE'], row['LONGITUDE']] for _, row in df_geo_pd.iterrows()]
HeatMap(heat_data, radius=8, max_zoom=13).add_to(m)

#Save the map
m.save("Heatmap.html")


**EDA9: Residuals In Time Series (One more layer over normal Time Series plots) **

**Inference**

High Amount of Fluctuations are seen around the Covid-19 period which can be due to less cars and less civilians outside.

In [None]:
#EDA9: Residuals in time series
from statsmodels.tsa.seasonal import seasonal_decompose
#total number of crashes per day, group by CRASH DATE
daily_crashes = data.groupby('CRASH DATE').size()

#plot style
sns.set(style="darkgrid")

#pltting the daily crashes time series
plt.figure(figsize=(15, 6))
plt.plot(daily_crashes.index,daily_crashes.values, label='Daily crashes',color='blue')
plt.title('Daily Motor Vehicle Collisions in NYC')
plt.xlabel('Date', fontsize = 14)
plt.ylabel('Number of Crashes', fontsize = 14)
plt.legend()
plt.show()

#decompose the time series
decomposition = seasonal_decompose(daily_crashes, model='additive', period=365)

#the decomposed components
fig, (ax1, ax2, ax3) = plt.subplots(3, 1, figsize=(15, 12))
decomposition.trend.plot(ax=ax1)
ax1.set_title('Trend')
decomposition.seasonal.plot(ax=ax2)
ax2.set_title('Seasonality')
decomposition.resid.plot(ax=ax3)
ax3.set_title('Residuals')
plt.tight_layout()
plt.show()

**EDA10: Severity plot on a web page (HTML) / Extension of EDA-8**

**Inference**




In [None]:
###############################################################################
# VISUALIZATION 10: Crash Severity Visualization
###############################################################################

In [None]:
#Sample data for visualization (using pandas for folium)
sample_data_severity = df_geo.select(
    'LATITUDE', 'LONGITUDE', 'NUMBER OF PERSONS KILLED', 'NUMBER OF PERSONS INJURED'
).sample(fraction=0.01, seed=42).toPandas()


m_severity = folium.Map(location=[40.730610, -73.935242], zoom_start=10)

for index, row in sample_data_severity.iterrows():
    if row['NUMBER OF PERSONS KILLED'] > 0:
        color = "Black"  # Fatalities
        folium.features.RegularPolygonMarker(
          location=[row['LATITUDE'], row['LONGITUDE']],
          number_of_sides=3,
          radius=5,
          color=color,
          fill=True,
          fill_color=color
        ).add_to(m_severity)
    elif row['NUMBER OF PERSONS INJURED'] > 0:
        color = "Red"  # Injuries
        folium.CircleMarker(
          location=[row['LATITUDE'], row['LONGITUDE']],
          radius=5,
          color=color,
          fill=True,
          fill_color=color
        ).add_to(m_severity)
    else:
        color = "Green"  # No injuries or fatalities
        folium.features.RegularPolygonMarker(
          location=[row['LATITUDE'], row['LONGITUDE']],
          number_of_sides=4,
          radius=5,
          color=color,
          fill=True,
          fill_color=color
        ).add_to(m_severity)

#Save the map
m_severity.save("severity.html")

In [None]:
###############################################################################
# VISUALIZATION 11: Seasonal Analysis of Injuries and Deaths
###############################################################################

**EDA11: Season-wise Analysis of Total Injuries and Deaths**

**Inference:**

Fall and Summer have the highest number of affected indviduals.
This could be due to the fact that more people are out in these seasons either on vacations, cruising the city for fun or doing outdoor activies.

While spring and winter have less affected indviduals.
This could be due to the fact that NYC gets chilly around this time and people usually don't step out much.

In [None]:
#grouping by season for injuries
season_injuries_df = df.groupBy('Season').agg(F.sum('Total Injuries').alias('Total Injuries'))
season_injuries_pd = season_injuries_df.toPandas()

plt.figure(figsize=(10, 6))
sns.barplot(x='Season', y='Total Injuries', data=season_injuries_pd, palette='coolwarm')
plt.title('Total Injuries by Season', fontsize=16)
plt.xlabel('Season', fontsize=14)
plt.ylabel('Total Injuries', fontsize=14)
plt.tight_layout()
plt.show()

season_deaths_df = df.groupBy('Season').agg(F.sum('Total Deaths').alias('Total Deaths'))
season_deaths_pd = season_deaths_df.toPandas()

plt.figure(figsize=(10, 6))
sns.barplot(x='Season', y='Total Deaths', data=season_deaths_pd, palette='coolwarm')
plt.title('Total Deaths by Season', fontsize=16)
plt.xlabel('Season', fontsize=14)
plt.ylabel('Total Deaths', fontsize=14)
plt.tight_layout()
plt.show()

season_affected_df = df.groupBy('Season').agg(F.sum('Total Affected').alias('Total Affected'))
season_affected_pd = season_affected_df.toPandas()

plt.figure(figsize=(10, 6))
sns.barplot(x='Season', y='Total Affected', data=season_affected_pd, palette='coolwarm')
plt.title('Total Affected by Season', fontsize=16)
plt.xlabel('Season', fontsize=14)
plt.ylabel('Total Affected', fontsize=14)
plt.tight_layout()
plt.show()

In [None]:
#grouping the data by season and summing the total deaths
season_injuries = data.groupby('Season')['Total Deaths'].sum().reset_index()

plt.figure(figsize=(10, 6))
sns.barplot(x='Season', y='Total Deaths', data=season_injuries, palette='coolwarm')
plt.title('Total Deaths by Season', fontsize=16)
plt.xlabel('Season', fontsize=14)
plt.ylabel('Total Deaths', fontsize=14)
plt.tight_layout()
plt.show()

In [None]:
#grouping the data by season and summing the total affected
season_injuries = data.groupby('Season')['Total Affected'].sum().reset_index()

#plotting the data
plt.figure(figsize=(10, 6))
sns.barplot(x='Season', y='Total Affected', data=season_injuries, palette='coolwarm')
plt.title('Total Affected by Season', fontsize=16)
plt.xlabel('Season', fontsize=14)
plt.ylabel('Total Affected', fontsize=14)
plt.tight_layout()
plt.show()

In [None]:
#LET'S FIND ANSWERS TO:
#What is the correlation between the time of day and the severity of crashes (fatalities, injuries) across different boroughs?
#How do crash frequencies and causes vary in underserved communities compared to more affluent areas (based on zip codes)?
#USING VISUALIZATIONS

In [None]:
###############################################################################
# VISUALIZATION 12: Injuries by Hour
###############################################################################

In [None]:
#group by hour and sum injuries
hourly_injuries_df = df.groupBy('Hour').agg(F.sum('NUMBER OF PERSONS INJURED').alias('Total Injuries'))
hourly_injuries_pd = hourly_injuries_df.toPandas()

#line chart
plt.figure(figsize=(8, 5))
plt.plot(hourly_injuries_pd['Hour'], hourly_injuries_pd['Total Injuries'], marker='o', color='orange')
plt.title("Total Injuries by Hour of Day")
plt.xlabel("Hour of Day")
plt.ylabel("Number of Injuries")
plt.grid()
plt.show()

In [None]:
###############################################################################
# VISUALIZATION 13: Map based analysis
###############################################################################

In [None]:
import geopandas as gpd
from shapely.geometry import Point

zip_crash_data = data.groupby('ZIP CODE').size().reset_index(name='CRASH COUNT')

#convert CRASH DATE and CRASH TIME to strings
data['CRASH DATE'] = data['CRASH DATE'].astype(str)
data['CRASH TIME'] = data['CRASH TIME'].astype(str)

data["geometry"] = data.apply(
    lambda row: Point(row["LONGITUDE"], row["LATITUDE"]) if pd.notnull(row["LONGITUDE"]) and pd.notnull(row["LATITUDE"]) else None,
    axis=1
)

#filter out rows with missing geometries
data = data[data["geometry"].notnull()]

gdf = gpd.GeoDataFrame(data, geometry="geometry")

#set the coordinate reference system (CRS) to WGS84 (EPSG:4326)
gdf.set_crs(epsg=4326, inplace=True)

#ensure ZIP CODE is a string
gdf['ZIP CODE'] = gdf['ZIP CODE'].astype(str)

#merge with geo data
merged = gdf.merge(zip_crash_data, on='ZIP CODE')

#create map
m = folium.Map(location=[40.7128, -74.0060], zoom_start=10)
folium.Choropleth(
    geo_data=merged,
    name="choropleth",
    data=merged,
    columns=["ZIP CODE", "CRASH COUNT"],
    key_on="feature.properties.ZIP CODE",
    fill_color="YlOrRd",
    fill_opacity=0.7,
    line_opacity=0.2,
).add_to(m)

m.save("crash_map.html")
m

In [None]:
# Stop the Spark session
spark.stop()

In [None]:
# Install ngrok
!pip install pyngrok
!pip install graphviz

# **Phase 3 - ML Models**

In [None]:
###################################
# MULTIPLE SPARK ML MODELS
# Using MinMaxScaler + VectorClipper for Naive Bayes to ensure nonnegative features.
###################################
import pyspark
from graphviz import Digraph
import re
from IPython.display import display
# from pyngrok import ngrok
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, when, to_timestamp, dayofweek, hour, trim,
    row_number
)
from pyspark.sql.types import FloatType
from pyspark.sql.window import Window

#spark ML libraries
from pyspark.ml.feature import (
    StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler, MinMaxScaler
)
from pyspark.ml.classification import (
    LogisticRegression,
    DecisionTreeClassifier,
    GBTClassifier,
    RandomForestClassifier,
    MultilayerPerceptronClassifier,
    LinearSVC,
    NaiveBayes
)
from pyspark.ml.clustering import KMeans as SparkKMeans
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import (
    BinaryClassificationEvaluator,
    MulticlassClassificationEvaluator,
    ClusteringEvaluator
)

#python visualization & utilities
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
import numpy as np

##########################################
# CUSTOM TRANSFORMER: VectorClipper
# This transformer clips any negative values in a vector column to zero.
##########################################
from pyspark.ml import Transformer
from pyspark import keyword_only
from pyspark.ml.linalg import VectorUDT, Vectors
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable
from pyspark.sql.functions import udf
class VectorClipper(Transformer, DefaultParamsReadable, DefaultParamsWritable):
    @keyword_only
    def __init__(self, inputCol="scaled_features", outputCol="scaled_features"):
        super(VectorClipper, self).__init__()
        self.inputCol = inputCol
        self.outputCol = outputCol

    def _transform(self, dataset):
        def clip_vector(v):
            # Convert vector to array, clip negatives to 0, then back to vector.
            arr = v.toArray() if hasattr(v, "toArray") else v
            clipped = [max(0.0, x) for x in arr]
            return Vectors.dense(clipped)
        clip_udf = udf(clip_vector, VectorUDT())
        return dataset.withColumn(self.outputCol, clip_udf(dataset[self.inputCol]))

#These functions will visualize the DAG diagrams within the notebook
def visualize_spark_pipeline(stages, title="Spark ML Pipeline DAG"):
    dot = Digraph(format='png')
    dot.attr(rankdir='LR', fontsize='12', compound='true')

    with dot.subgraph(name='cluster_pipeline') as c:
        c.attr(label=title, style='dashed')
        for i, stage in enumerate(stages):
            stage_name = stage.__class__.__name__
            node_id = f"stage_{i}"
            c.node(node_id, stage_name, shape="box")
            if i > 0:
                c.edge(f"stage_{i-1}", node_id)
    display(dot)

def visualize_operations_dag(plan_str, title="Spark Physical Plan DAG"):
    from graphviz import Digraph
    from IPython.display import display
    import re

    dot = Digraph(format='png')
    dot.attr(rankdir='TB', fontsize='10')

    lines = plan_str.strip().split('\n')
    nodes, edges, stack = [], [], []

    for line in lines:
        indent = len(line) - len(line.lstrip())
        raw_label = line.strip()

        op_match = re.search(r'[\*\+\-:]*\(?\d*\)?\s*([A-Za-z]+)', raw_label)
        op_name = op_match.group(1).strip() if op_match else raw_label
        label = op_name

        node_id = f"node_{len(nodes)}"
        nodes.append((node_id, label))

        while stack and stack[-1][1] >= indent:
            stack.pop()
        if stack:
            edges.append((stack[-1][0], node_id))

        stack.append((node_id, indent))

    for node_id, label in nodes:
        dot.node(node_id, label, shape="box", style="filled", fillcolor="lightblue")

    for src, dst in edges:
        dot.edge(src, dst)

    dot.attr(label=title)
    display(dot)

def visualize_stagewise_rdd_dag(debug_string, title="RDD Stagewise DAG"):

    if isinstance(debug_string, bytes):
        debug_string = debug_string.decode("utf-8")

    lines = debug_string.strip().splitlines()

    wide_deps = [
        "ShuffledRDD", "ShuffleDependency",
        "reduceByKey", "groupByKey", "sortByKey",
        "repartition", "coalesce", "join", "cogroup"
    ]

    dot = Digraph(format='png')
    dot.attr(rankdir='TB', fontsize='10')

    stages = {0: []}
    edges = []
    current_stage = 0
    last_node = None
    carry_edge = None

    for raw in lines:
        line = raw.strip()

        m = re.match(r'^\(?\d+\)?\s*([A-Za-z]+)', line)
        if m:
            op = m.group(1)
            op = re.sub(r'RDD$', '', op)
            label = op[0].lower() + op[1:]
        else:
            label = line

        node_id = f"n{current_stage}_{len(stages[current_stage])}"
        stages[current_stage].append((node_id, label))

        if carry_edge:
            edges.append((carry_edge, node_id))
            carry_edge = None
        elif last_node:
            edges.append((last_node, node_id))

        last_node = node_id

        if any(dep in line for dep in wide_deps):
            carry_edge = last_node
            current_stage += 1
            stages[current_stage] = []
            last_node = None

    for sid, nodes in stages.items():
        with dot.subgraph(name=f"cluster_{sid}") as c:
            c.attr(label=f"Stage {sid}", color="red", style="dashed")
            for nid, lbl in nodes:
                c.node(nid, lbl, shape="box", style="filled", fillcolor="lightblue")

    for src, dst in edges:
        dot.edge(src, dst)

    dot.attr(label=title)
    display(dot)



#created a new spark session
spark = SparkSession.builder \
    .appName("Phase3_MultipleModels_Extended") \
    .getOrCreate()

#Uncomment this if you need to access Spark UI in colab
# ngrok.set_auth_token("2veWgSMKSZUXoq3cDHk6gj7KPy2_2Soz335NRQEtrGw9tLcwC")
# # Start ngrok tunnel
# http_tunnel = ngrok.connect(4040)
# public_url = http_tunnel.public_url
# print(f"Spark UI: {public_url}")

#data loading
from google.colab import drive
drive.mount('/content/drive')
df_spark = spark.read.csv(
    "/content/drive/MyDrive/DIC Project/cleaned_dataset.csv",
    header=True,
    inferSchema=True
)

#optional sampling if needed
df_spark = df_spark.sample(False, 0.25, seed=42).sample(False, 0.25, seed=42)

#feature engineering using RDD
df_spark = df_spark.withColumn("CRASH DATE", to_timestamp(col("CRASH DATE"), "yyyy-MM-dd"))
df_spark = df_spark.withColumn("DayOfWeek", dayofweek(col("CRASH DATE")) - 2)
df_spark = df_spark.withColumn("CRASH TIME", to_timestamp(col("CRASH TIME"), "H:mm"))
df_spark = df_spark.withColumn("HourOfDay", hour(col("CRASH TIME")))
df_spark = df_spark.withColumn("BOROUGH", trim(col("BOROUGH")))
df_spark = df_spark.withColumn("ZIP CODE", col("ZIP CODE").cast(FloatType()))

for factor_col in [
    "CONTRIBUTING FACTOR VEHICLE 1",
    "CONTRIBUTING FACTOR VEHICLE 2",
    "CONTRIBUTING FACTOR VEHICLE 3"]:
    df_spark = df_spark.fillna("Unknown", subset=[factor_col])

# Windowing: accident counts per borough
windowSpec = Window.partitionBy("BOROUGH").orderBy("CRASH DATE")
df_spark = df_spark.withColumn("Borough_RowNum", row_number().over(windowSpec))

# RDD: count total accidents per borough and rejoin
rdd_borough = (
    df_spark.rdd
    .map(lambda row: (row["BOROUGH"], 1))
    .reduceByKey(lambda a, b: a + b)
    .repartition(4)
)

rdd_borough.count()
debug_str = rdd_borough.toDebugString()
visualize_stagewise_rdd_dag(debug_str, title="Stagewise RDD DAG – Borough Accident Aggregation")


df_borough_count = rdd_borough.toDF(["BOROUGH", "Total_Accidents"])
df_spark = df_spark.join(df_borough_count, on="BOROUGH", how="left")

#create binary target: SEVERE_ACCIDENT if any injury or fatality occurred
df_spark = df_spark.withColumn("SEVERE_ACCIDENT", when(
    (col("NUMBER OF PERSONS INJURED") > 0) | (col("NUMBER OF PERSONS KILLED") > 0), 1).otherwise(0))

#rename HourOfDay to CRASH_HOUR for clarity
df_spark = df_spark.withColumnRenamed("HourOfDay", "CRASH_HOUR")

#feature and label selection
feature_cols = [
    "CONTRIBUTING FACTOR VEHICLE 1",
    "BOROUGH",
    "CRASH_HOUR",
    "VEHICLE TYPE CODE 1",
    "LATITUDE",
    "LONGITUDE",
    "Borough_RowNum",
    "Total_Accidents"
]
label_col = "SEVERE_ACCIDENT"

#drop rows with nulls in features or label
df_spark = df_spark.dropna(subset=feature_cols + [label_col])

print('######## DAG FOR PROCESSING #########')
df_spark.explain(False)

#splitting the data
train_df, test_df = df_spark.randomSplit([0.8, 0.2], seed=42)

#common Pipeline Stages
borough_indexer = StringIndexer(inputCol="BOROUGH", outputCol="BOROUGH_idx", handleInvalid="keep")
factor_indexer = StringIndexer(inputCol="CONTRIBUTING FACTOR VEHICLE 1", outputCol="FACTOR1_idx", handleInvalid="keep")
vehicle_indexer = StringIndexer(inputCol="VEHICLE TYPE CODE 1", outputCol="VEHICLE_idx", handleInvalid="keep")

encoder = OneHotEncoder(
    inputCols=["BOROUGH_idx", "FACTOR1_idx", "VEHICLE_idx"],
    outputCols=["BOROUGH_enc", "FACTOR1_enc", "VEHICLE_enc"]
)

assembler = VectorAssembler(
    inputCols=["CRASH_HOUR", "LATITUDE", "LONGITUDE", "Borough_RowNum", "Total_Accidents",
               "BOROUGH_enc", "FACTOR1_enc", "VEHICLE_enc"],
    outputCol="assembled_features"
)

#StandardScaler used for all models except Naive Bayes (which requires nonnegative features)
scaler = StandardScaler(inputCol="assembled_features", outputCol="scaled_features", withMean=True, withStd=True)

#MinMaxScaler scales features to [0, 1]
minmax_scaler = MinMaxScaler(inputCol="assembled_features", outputCol="scaled_features")

#VectorClipper will clip any residual negative values to 0
clipper = VectorClipper(inputCol="scaled_features", outputCol="scaled_features")

##########################################
# SUPERVISED MODELS USING SPARK ML
##########################################
mlp_layers = [250, 32, 16, 2]

#models dictionary.
models = {
    "Logistic Regression": LogisticRegression(featuresCol="scaled_features", labelCol=label_col, maxIter=100),
    "Decision Tree": DecisionTreeClassifier(featuresCol="scaled_features", labelCol=label_col, maxDepth=5),
    "GBT": GBTClassifier(featuresCol="scaled_features", labelCol=label_col, maxIter=50, maxDepth=5),
    "Random Forest": RandomForestClassifier(featuresCol="scaled_features", labelCol=label_col, numTrees=50),
    "Multilayer Perceptron": MultilayerPerceptronClassifier(featuresCol="scaled_features", labelCol=label_col,
                                                            maxIter=100, layers=mlp_layers),
    "Linear SVC": LinearSVC(featuresCol="scaled_features", labelCol=label_col, maxIter=100),
    "Naive Bayes": NaiveBayes(featuresCol="scaled_features", labelCol=label_col)
}

#evaluators
evaluator_auc = BinaryClassificationEvaluator(labelCol=label_col, rawPredictionCol="rawPrediction", metricName="areaUnderROC")
evaluator_acc = MulticlassClassificationEvaluator(labelCol=label_col, metricName="accuracy")
evaluator_f1 = MulticlassClassificationEvaluator(labelCol=label_col, metricName="f1")

for name, model in models.items():
    print(f"\n=== Model: {name} ===")
    pipeline_stages = [borough_indexer, factor_indexer, vehicle_indexer, encoder, assembler]
    if name == "Naive Bayes":
        pipeline_stages += [minmax_scaler, clipper, model]
    else:
        pipeline_stages += [scaler, model]

    pipeline = Pipeline(stages=pipeline_stages)

    visualize_spark_pipeline(pipeline_stages, title=f"{name} - ML Pipeline DAG")

    fitted_model = pipeline.fit(train_df)
    preds = fitted_model.transform(test_df)

    plan = preds._jdf.queryExecution().executedPlan().toString()
    visualize_operations_dag(plan, title=f"{name} - Physical Execution DAG")

    auc = evaluator_auc.evaluate(preds)
    acc = evaluator_acc.evaluate(preds)
    f1 = evaluator_f1.evaluate(preds)

    print(f"AUC: {auc:.4f}, Accuracy: {acc:.4f}, F1 Score: {f1:.4f}")

    cm = preds.groupBy(label_col, "prediction").count().toPandas()
    cm_pivot = cm.pivot(index=label_col, columns="prediction", values="count").fillna(0).values

    plt.figure(figsize=(5, 4))
    sns.heatmap(cm_pivot, annot=True, fmt=".0f", cmap="Blues")
    plt.title(f"{name} Confusion Matrix")
    plt.xlabel("Predicted")
    plt.ylabel("Actual")
    plt.show()

##########################################
# UNSUPERVISED CLUSTERING: KMEANS (Spark ML)
##########################################
# Apply full transformation to the complete dataset to obtain scaled features.
pipeline_full = Pipeline(stages=[borough_indexer, factor_indexer, vehicle_indexer, encoder, assembler, scaler])
fitted_pipeline_full = pipeline_full.fit(df_spark)
df_transformed = fitted_pipeline_full.transform(df_spark)

visualize_spark_pipeline(pipeline_full.getStages(), title="KMeans - ML Feature Engineering DAG")

kmeans_plan = df_transformed._jdf.queryExecution().executedPlan().toString()
visualize_operations_dag(kmeans_plan, title="KMeans - Physical Execution DAG")

k_range = list(range(2, 10))
inertia_list = []
silhouette_scores = []

for k in k_range:
    kmeans = SparkKMeans(featuresCol="scaled_features", k=k, seed=42)
    kmodel = kmeans.fit(df_transformed)
    inertia = kmodel.summary.trainingCost  # Sum of squared distances
    inertia_list.append(inertia)

    evaluator_cluster = ClusteringEvaluator(featuresCol="scaled_features", metricName="silhouette", distanceMeasure="squaredEuclidean")
    silhouette = evaluator_cluster.evaluate(kmodel.transform(df_transformed))
    silhouette_scores.append(silhouette)
    print(f"k={k}: Inertia={inertia:.2f}, Silhouette Score={silhouette:.3f}")

#plot the elbow graph (Inertia vs. k)
plt.figure(figsize=(6, 4))
plt.plot(k_range, inertia_list, marker='o')
plt.xlabel('Number of Clusters (k)')
plt.ylabel('Inertia')
plt.title('Elbow Method for Optimal k (Spark KMeans)')
plt.show()

optimal_k = k_range[np.argmax(silhouette_scores)]
print(f"Optimal k based on silhouette score: {optimal_k}")

kmeans_optimal = SparkKMeans(featuresCol="scaled_features", k=optimal_k, seed=42)
kmodel_opt = kmeans_optimal.fit(df_transformed)
df_clustered = kmodel_opt.transform(df_transformed)

from pyspark.ml.feature import PCA
pca = PCA(k=2, inputCol="scaled_features", outputCol="pca_features")
pca_model = pca.fit(df_clustered)
df_pca = pca_model.transform(df_clustered)

pdf_pca = df_pca.select("pca_features").toPandas()
pdf_clusters = df_pca.select("prediction").toPandas()
pca_array = np.array([row[0] for row in pdf_pca.values])
df_plot = pd.DataFrame(pca_array, columns=['PC1', 'PC2'])
df_plot['Cluster'] = pdf_clusters

plt.figure(figsize=(8, 6))
sns.scatterplot(x="PC1", y="PC2", hue="Cluster", data=df_plot, palette="deep")
plt.title("KMeans Clusters")
plt.show()

#stop spark session
spark.stop()
