In [None]:
from pyspark.sql import SparkSession

# Create the Spark session for this notebook, or reuse one that is already running.
session_builder = SparkSession.builder.appName("Motor Vehicle Collisions EDA")
spark = session_builder.getOrCreate()
print("Spark Session initialized.")

Spark Session initialized.


## Load Data

### Subtask:
Load the dataset from '/content/Motor_Vehicle_Collisions_-_Crashes.csv' into a PySpark DataFrame.


In [None]:
# Location of the raw collisions export that the rest of the notebook reads from.
csv_file_path = '/content/Motor_Vehicle_Collisions_-_Crashes.csv'

# The first row holds the column names; column types are inferred by Spark.
csv_reader = spark.read.options(header=True, inferSchema=True)
df = csv_reader.csv(csv_file_path)

print(f"Data loaded successfully from {csv_file_path} into a PySpark DataFrame.")

AnalysisException: [PATH_NOT_FOUND] Path does not exist: file:/content/Motor_Vehicle_Collisions_-_Crashes.csv.

## Data Inspection

Display the schema and show a sample of the data to understand its structure and content within PySpark.


In [None]:
# Column names and inferred types.
print("DataFrame Schema:")
df.printSchema()

# First five rows, with long cell values shown in full.
print("\nSample of the DataFrame:")
df.show(n=5, truncate=False)

## Summary Statistics

Generate descriptive statistics for the PySpark DataFrame to get an overview of numerical columns.


In [None]:
# Summary statistics as computed by DataFrame.describe().
print("Descriptive Statistics for DataFrame:")
summary_stats = df.describe()
summary_stats.show()

## Check for Missing Values

Identify and count missing values in each column of the PySpark DataFrame to assess data completeness.


In [None]:
from pyspark.sql import functions as F
import pandas as pd

# Count the nulls of every column in ONE aggregation. The alternative of calling
# .where(...).count() per column launches a separate Spark job (and a separate
# scan of the data) for each column.
# F.when(...) without an otherwise() yields NULL for non-matching rows, and
# F.count() ignores NULLs, so each expression counts exactly the null rows
# (and yields 0, not NULL, when there are none).
null_count_exprs = [
    F.count(F.when(F.col(column_name).isNull(), 1)).alias(column_name)
    for column_name in df.columns
]
null_counts_row = df.agg(*null_count_exprs).first()

# The aggregated row has one value per column, in the same order as df.columns.
missing_values_counts = [
    {'Column Name': column_name, 'Missing Values Count': int(null_count)}
    for column_name, null_count in zip(df.columns, null_counts_row)
]

# Small driver-side table, only used for a readable printout.
missing_values_df = pd.DataFrame(missing_values_counts)

print("Missing Values Count per Column:")
print(missing_values_df.to_string())

## Visualize Key Distributions

Convert the PySpark DataFrame to a Pandas DataFrame and create relevant visualizations (e.g., histograms for numerical data, bar plots for categorical data) to understand the distribution of important variables. Ensure all plots have appropriate legends.


In [None]:
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

# Fraction of rows pulled to the driver for plotting.
sample_ratio = 0.1

# Sample without replacement (fixed seed) and collect the result as pandas.
sampled_spark_df = df.sample(withReplacement=False, fraction=sample_ratio, seed=42)
pandas_df = sampled_spark_df.toPandas()
print("PySpark DataFrame successfully sampled and converted to Pandas DataFrame.")

In [None]:
# 2x2 overview of key distributions, drawn from the sampled pandas frame.
# Each panel is addressed through its own Axes object rather than the pyplot
# "current axes" state.
fig, axes = plt.subplots(2, 2, figsize=(18, 15))

# Plot 1: Distribution of Crashes by Borough
# Boroughs are unordered categories, so each count gets its own bar; a line
# joining them would suggest a trend between neighbouring names.
ax_borough = axes[0, 0]
borough_counts = pandas_df['BOROUGH'].value_counts().sort_index()
sns.barplot(x=borough_counts.index, y=borough_counts.values,
            color='steelblue', ax=ax_borough)
ax_borough.set_title('Distribution of Crashes by Borough')
ax_borough.set_xlabel('Borough')
ax_borough.set_ylabel('Number of Crashes')
ax_borough.tick_params(axis='x', rotation=45)

# Plot 2: Distribution of Number of Persons Injured
# Bin edges sit on the half-integers (-0.5, 0.5, ..., 9.5) so every count from
# 0 to 9 has its own bar centred on its tick. Integer edges 0..9 would fold
# 8 and 9 into one bin and draw the bars between the ticks.
ax_injured = axes[0, 1]
injury_bin_edges = [count - 0.5 for count in range(0, 11)]
sns.histplot(pandas_df['NUMBER OF PERSONS INJURED'].dropna(), bins=injury_bin_edges,
             kde=False, color='skyblue', ax=ax_injured)
ax_injured.set_title('Distribution of Number of Persons Injured')
ax_injured.set_xlabel('Number of Persons Injured')
ax_injured.set_ylabel('Frequency')
ax_injured.set_xticks(range(0, 10))

# Plot 3: Top 10 Contributing Factors (Vehicle 1)
ax_factors = axes[1, 0]
top_10_factors = pandas_df['CONTRIBUTING FACTOR VEHICLE 1'].value_counts().nlargest(10)
sns.barplot(x=top_10_factors.index, y=top_10_factors.values,
            hue=top_10_factors.index, palette="magma", dodge=False, legend=False,
            ax=ax_factors)
ax_factors.set_title('Top 10 Contributing Factors (Vehicle 1)')
ax_factors.set_xlabel('Contributing Factor')
ax_factors.set_ylabel('Number of Occurrences')
ax_factors.tick_params(axis='x', rotation=90)

# Plot 4: Top 10 Vehicle Types Involved
ax_vehicles = axes[1, 1]
top_10_vehicle_types = pandas_df['VEHICLE TYPE CODE 1'].value_counts().nlargest(10)
sns.barplot(x=top_10_vehicle_types.index, y=top_10_vehicle_types.values,
            hue=top_10_vehicle_types.index, palette="plasma", dodge=False, legend=False,
            ax=ax_vehicles)
ax_vehicles.set_title('Top 10 Vehicle Types Involved')
ax_vehicles.set_xlabel('Vehicle Type')
ax_vehicles.set_ylabel('Number of Occurrences')
ax_vehicles.tick_params(axis='x', rotation=90)

fig.tight_layout()
plt.show()
print("Visualizations generated successfully.")

## Prepare Data for Correlation

Select numerical columns relevant for correlation analysis, assemble them into a vector column, and handle any missing values in these columns to ensure valid correlation calculation using PySpark.


In [None]:
from pyspark.ml.feature import VectorAssembler
from pyspark.sql import functions as F

# Columns that take part in the correlation analysis.
numerical_cols = [
    'LATITUDE', 'LONGITUDE',
    'NUMBER OF PERSONS INJURED', 'NUMBER OF PERSONS KILLED',
    'NUMBER OF PEDESTRIANS INJURED', 'NUMBER OF PEDESTRIANS KILLED',
    'NUMBER OF CYCLIST INJURED', 'NUMBER OF CYCLIST KILLED',
    'NUMBER OF MOTORIST INJURED', 'NUMBER OF MOTORIST KILLED'
]

# Rows without coordinates are dropped; nulls in the remaining (count) columns
# are replaced by 0.
coordinate_cols = ['LATITUDE', 'LONGITUDE']
count_cols = [name for name in numerical_cols if name not in coordinate_cols]

rows_with_coordinates = df.na.drop(subset=coordinate_cols)
df_cleaned = rows_with_coordinates.na.fill(0, subset=count_cols)

print(f"DataFrame size after dropping nulls in LATITUDE/LONGITUDE and filling others: {df_cleaned.count()} rows")

# Pack the selected columns into a single vector column named "features".
assembler = VectorAssembler(outputCol="features", inputCols=numerical_cols)
df_vector = assembler.transform(df_cleaned)

print("Schema of DataFrame with vector column:")
df_vector.printSchema()

print("Sample of DataFrame with new 'features' vector column:")
preview_cols = numerical_cols + ["features"]
df_vector.select(preview_cols).show(5, truncate=False)


In [None]:
from pyspark.ml.stat import Correlation

# Correlation.corr returns a one-row DataFrame; head() fetches that row, whose
# first field is the correlation matrix of the "features" vector column.
correlation_result_df = Correlation.corr(dataset=df_vector, column="features")
correlation_matrix = correlation_result_df.head()

print("Correlation Matrix:")
print(correlation_matrix[0])

## Visualize Correlation Heatmap

Convert the PySpark correlation matrix to a Pandas DataFrame and use `seaborn.heatmap` to visualize the correlations. This heatmap will help identify which variables are strongly correlated with 'NUMBER OF PEDESTRIANS KILLED' and could be good candidates for model features. Ensure the plot has a clear title, axis labels, and color bar.


In [None]:
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd

# Unwrap the Spark matrix, convert it to a NumPy array, and label both axes
# with the column names that were assembled into the vector.
corr_matrix_dense = correlation_matrix[0]
corr_matrix_np = corr_matrix_dense.toArray()
correlation_df = pd.DataFrame(
    corr_matrix_np,
    index=numerical_cols,
    columns=numerical_cols,
)

# Annotated heatmap drawn on an explicit Axes.
fig, ax = plt.subplots(figsize=(12, 10))
sns.heatmap(correlation_df, annot=True, cmap='coolwarm', fmt='.2f', linewidths=.5, ax=ax)

ax.set_title('Correlation Matrix of Numerical Features', fontsize=16)
ax.set_xlabel('Features', fontsize=12)
ax.set_ylabel('Features', fontsize=12)

# Slanted x labels and horizontal y labels.
plt.setp(ax.get_xticklabels(), rotation=45, ha='right')
plt.setp(ax.get_yticklabels(), rotation=0)

fig.tight_layout()
plt.show()

print("Correlation heatmap generated successfully.")

# Building the Model

## Data Preparation
A new column named "SEVERITY" is created that rates the severity of each accident as 0, 1, or 2:

0 -> No injuries or Deaths

1 -> Injuries

2 -> Deaths

In [None]:
from pyspark.sql import functions as F

# SEVERITY label: 2 if anyone was killed, else 1 if anyone was injured, else 0.
# The fatality condition is tested first, so it takes precedence over injuries.
any_killed = F.col("NUMBER OF PERSONS KILLED") > 0
any_injured = F.col("NUMBER OF PERSONS INJURED") > 0
severity_label = F.when(any_killed, 2).when(any_injured, 1).otherwise(0)

df_vector = df_vector.withColumn("SEVERITY", severity_label)

# Quick look at the label next to the two columns it is derived from.
severity_preview_cols = [
    "NUMBER OF PERSONS INJURED",
    "NUMBER OF PERSONS KILLED",
    "SEVERITY",
]
df_vector.select(*severity_preview_cols).show(10)

Derive usable hour, day-of-week, and month features from the crash date and crash time columns.

In [None]:
from pyspark.sql.functions import hour, dayofweek, month, to_date, to_timestamp, col as F_col

# Text layouts used when the CSV reader left these columns as strings.
# NOTE(review): assumes the export writes dates like 09/11/2021 and times like
# 2:39 — confirm against the df.show() sample before relying on these features.
CRASH_DATE_FORMAT = "MM/dd/yyyy"
CRASH_TIME_FORMAT = "H:mm"

# hour()/dayofweek()/month() applied directly to a string column rely on Spark's
# implicit cast, which only understands ISO-like text; other layouts come back
# as NULL (or raise under ANSI mode). Parse explicitly when the column is a
# string, and use the column as-is when schema inference already typed it.
crash_column_types = dict(df.dtypes)

crash_date = F_col("CRASH DATE")
if crash_column_types["CRASH DATE"] == "string":
    crash_date = to_date(crash_date, CRASH_DATE_FORMAT)

crash_time = F_col("CRASH TIME")
if crash_column_types["CRASH TIME"] == "string":
    crash_time = to_timestamp(crash_time, CRASH_TIME_FORMAT)

df = df.withColumn("CRASH_HOUR", hour(crash_time))
df = df.withColumn("CRASH_DAYOFWEEK", dayofweek(crash_date))
df = df.withColumn("CRASH_MONTH", month(crash_date))

ConnectionRefusedError: [Errno 111] Connection refused

We replace null values with a placeholder label ("UNKNOWN" / "Unknown") for the categorical variables and with 0 for the numerical ones.

In [None]:
from pyspark.sql import functions as F

# Target label: 2 = at least one death, 1 = at least one injury, 0 = neither.
has_fatality = F.col("NUMBER OF PERSONS KILLED") > 0
has_injury = F.col("NUMBER OF PERSONS INJURED") > 0
df = df.withColumn(
    "SEVERITY",
    F.when(has_fatality, 2).when(has_injury, 1).otherwise(0),
)

# Placeholder labels for missing values in the categorical columns.
categorical_null_labels = {
    "BOROUGH": "UNKNOWN",
    "CONTRIBUTING FACTOR VEHICLE 1": "Unknown",
    "VEHICLE TYPE CODE 1": "Unknown",
    "VEHICLE TYPE CODE 2": "Unknown",
}
df = df.na.fill(categorical_null_labels)

# Remaining nulls in numeric columns become 0.
df = df.na.fill(0)

In [None]:
# String-valued columns that are turned into numeric indices for the model.
categorical_cols = ["BOROUGH", "CONTRIBUTING FACTOR VEHICLE 1"]
categorical_cols += ["VEHICLE TYPE CODE 1", "VEHICLE TYPE CODE 2"]


StringIndexer transforms categorical string fields into numeric indices by assigning each unique category a distinct integer. This enables machine-learning models—which require numerical inputs—to properly interpret categorical variables. Using handleInvalid='keep' ensures unseen or null categories are safely mapped instead of causing errors.

In [None]:
from pyspark.ml.feature import StringIndexer

# One StringIndexer per categorical column; each writes "<column>_IDX".
indexers = []
for category_col in categorical_cols:
    indexer = StringIndexer(
        inputCol=category_col,
        outputCol=f"{category_col}_IDX",
        handleInvalid="keep",
    )
    indexers.append(indexer)


In [None]:
# Numeric model inputs: crash coordinates plus the derived time features.
numerical_cols = ["LATITUDE", "LONGITUDE"]
numerical_cols += ["CRASH_HOUR", "CRASH_DAYOFWEEK", "CRASH_MONTH"]

VectorAssembler combines multiple input feature columns into a single vector column called "features", creating the unified numeric feature representation required by Spark ML models for training and prediction.

In [None]:
from pyspark.ml.feature import VectorAssembler

# Model inputs: the numeric columns followed by the "<column>_IDX" index columns.
indexed_categorical_cols = [f"{name}_IDX" for name in categorical_cols]
feature_cols = numerical_cols + indexed_categorical_cols

# Packs every model input into the single "features" vector column.
assembler = VectorAssembler(outputCol="features", inputCols=feature_cols)


### Logistic Regression Model

In [None]:
from pyspark.ml.classification import LogisticRegression

# Logistic regression on the assembled vector, predicting the SEVERITY label.
lr_settings = {
    "featuresCol": "features",
    "labelCol": "SEVERITY",
    "maxIter": 50,
    "regParam": 0.01,
}
lr = LogisticRegression(**lr_settings)


In [None]:
from pyspark.ml import Pipeline

# Stages run in order: index the categorical columns, assemble the vector, fit the model.
pipeline_stages = [*indexers, assembler, lr]
pipeline = Pipeline(stages=pipeline_stages)

# 80/20 split with a fixed seed; fit on the training part and score the held-out part.
train, test = df.randomSplit(weights=[0.8, 0.2], seed=42)
model = pipeline.fit(train)
pred = model.transform(test)

### Performance metrics:

We evaluate the model with accuracy, F1 score, weighted precision, and weighted recall.

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(
    labelCol="SEVERITY",
    predictionCol="prediction",
)

# Evaluate the same predictions once per metric by overriding metricName.
acc, f1, prec, rec = (
    evaluator.evaluate(pred, {evaluator.metricName: metric_name})
    for metric_name in ("accuracy", "f1", "weightedPrecision", "weightedRecall")
)

for metric_label, metric_value in (
    ("Accuracy:", acc),
    ("F1 Score:", f1),
    ("Weighted Precision:", prec),
    ("Weighted Recall:", rec),
):
    print(metric_label, metric_value)

In [None]:
# Long form: one row per (actual label, predicted label) pair with its count.
pair_counts = pred.groupBy("SEVERITY", "prediction").count()
pair_counts.orderBy("SEVERITY", "prediction").show()

# Wide form: one row per actual label, one column per predicted label;
# combinations that never occur are shown as 0.
confusion_df = (
    pred.groupBy("SEVERITY")
    .pivot("prediction")
    .count()
    .fillna(0)
    .orderBy("SEVERITY")
)

confusion_df.show()

## Alternate Model

XGBoost Classifier

In [None]:
from xgboost.spark import SparkXGBClassifier

# Gradient-boosted tree classifier on the assembled "features" vector,
# predicting the SEVERITY label.
# xgboost.spark uses the scikit-learn style parameter names (snake_case).
# The camelCase names belong to the JVM XGBoost4J-Spark API and are not
# recognised estimator parameters here, so the intended depth, number of
# rounds and column sampling would not be applied.
# The number of classes is not passed explicitly.
# NOTE(review): SparkXGBClassifier is expected to derive the class count (and
# objective) from the label column — confirm for the installed xgboost version.
xgb = SparkXGBClassifier(
    features_col="features",
    label_col="SEVERITY",
    max_depth=8,
    learning_rate=0.1,
    n_estimators=150,
    subsample=0.8,
    colsample_bytree=0.8
)

In [None]:
from pyspark.ml import Pipeline

# Same indexing and assembling stages as before, ending in the XGBoost estimator.
xgb_stages = [*indexers, assembler, xgb]
pipeline2 = Pipeline(stages=xgb_stages)

# Fit on the training split and score the held-out split.
model2 = pipeline2.fit(train)
pred2 = model2.transform(test)


In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(labelCol="SEVERITY", predictionCol="prediction")

# Evaluate the XGBoost predictions once per metric by overriding metricName.
accuracy, f1, prec, rec = (
    evaluator.evaluate(pred2, {evaluator.metricName: metric_name})
    for metric_name in ("accuracy", "f1", "weightedPrecision", "weightedRecall")
)

for metric_label, metric_value in (
    ("XGBoost Accuracy:", accuracy),
    ("XGBoost F1 Score:", f1),
    ("Weighted Precision:", prec),
    ("Weighted Recall:", rec),
):
    print(metric_label, metric_value)