In [3]:
# In a notebook cell, prefix with ! to run shell commands
!pip install pyspark


Defaulting to user installation because normal site-packages is not writeable


[notice] A new release of pip available: 22.3.1 -> 25.0.1
[notice] To update, run: python.exe -m pip install --upgrade pip





In [4]:
# Next cell—start Spark and bring in the SQL/DataFrame API
from pyspark.sql import SparkSession
from pyspark.sql.functions import hour, month, when, col, count

# Create a SparkSession (the entry point for DataFrame & ML operations)
spark = SparkSession.builder \
    .appName("UKAccidentAnalysis") \
    .getOrCreate()

# Optional: show Spark version to confirm it’s running
print("Spark version:", spark.version)


Spark version: 3.5.5


In [5]:
# Read & Cache the Full CSV

# Adjust the path to point at your full dataset
path = "UK_Accident.csv"

# Ingest the file in parallel, inferring schema and reading header row
df = spark.read.csv(path, header=True, inferSchema=True)

# Cache it in memory for fast repeated access
df = df.cache()

# Quick count to verify you have ~1,000,000 rows
print("Total records:", df.count())


Total records: 1504150


In [7]:
from pyspark.sql.functions import to_timestamp, concat_ws, hour, month, when, col

'''
# 1) Parse Date & Time into a true timestamp using your pattern
df = df.withColumn(
    "Datetime",
    to_timestamp(
        concat_ws(" ", col("Date"), col("Time")),   # e.g. "08/06/2005 14:30"
        "dd/MM/yyyy HH:mm"                         # exact format of your data
    )
)

# 2) Extract all the temporal features once you have a valid Datetime
df = df.withColumn("Hour", hour("Datetime")) \
       .withColumn("Month", month("Datetime")) \
       .withColumn(
           "IsWeekend",
           when(col("Day_of_Week").isin(1, 7), 1).otherwise(0)
       )

# 3) (Optional) check that parse worked
df.select("Date", "Time", "Datetime", "Hour", "Month") \
  .where(col("Datetime").isNull()) \
  .show(5, truncate=False)

# 4) Re-run the aggregation to verify
df.groupBy("Month", "Accident_Severity") \
  .count() \
  .orderBy("Month", "Accident_Severity") \
  .show(48)
'''


from pyspark.sql.functions import to_date, date_format, to_timestamp, concat_ws, hour, month, when, col

# 1) Parse Date
df = df.withColumn("Date_parsed", to_date(col("Date"), "dd/MM/yyyy"))

# 2) Turn the Time timestamp into "HH:mm" strings
df = df.withColumn("Time_str", date_format(col("Time"), "HH:mm"))

# 3) Build a YYYY-MM-DD HH:mm string and parse it
df = df.withColumn(
    "Datetime",
    to_timestamp(
        concat_ws(" ",
                  date_format(col("Date_parsed"), "yyyy-MM-dd"),
                  col("Time_str")),
        "yyyy-MM-dd HH:mm"
    )
)

# 4) Extract Hour & Month
df = df.withColumn("Hour", hour("Datetime")) \
       .withColumn("Month", month("Datetime"))

# 5) Weekend flag
df = df.withColumn(
    "IsWeekend",
    when(col("Day_of_Week").isin(1, 7), 1).otherwise(0)
)

# 6) Verify
df.select("Date","Time","Date_parsed","Time_str","Datetime","Hour","Month") \
  .show(5,truncate=False)


+----------+-------------------+-----------+--------+-------------------+----+-----+
|Date      |Time               |Date_parsed|Time_str|Datetime           |Hour|Month|
+----------+-------------------+-----------+--------+-------------------+----+-----+
|04/01/2005|2025-05-12 17:42:00|2005-01-04 |17:42   |2005-01-04 17:42:00|17  |1    |
|05/01/2005|2025-05-12 17:36:00|2005-01-05 |17:36   |2005-01-05 17:36:00|17  |1    |
|06/01/2005|2025-05-12 00:15:00|2005-01-06 |00:15   |2005-01-06 00:15:00|0   |1    |
|07/01/2005|2025-05-12 10:35:00|2005-01-07 |10:35   |2005-01-07 10:35:00|10  |1    |
|10/01/2005|2025-05-12 21:13:00|2005-01-10 |21:13   |2005-01-10 21:13:00|21  |1    |
+----------+-------------------+-----------+--------+-------------------+----+-----+
only showing top 5 rows



In [10]:
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(
    inputCols=["Hour","IsWeekend","Speed_limit","Number_of_Vehicles"],
    outputCol="features",
    handleInvalid="skip"   # skip any row where an input is null
)

pipeline = Pipeline(stages=[indexer, assembler, spark_lr])
model = pipeline.fit(df)   # now null rows are just ignored


In [12]:

# ML

from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.classification import LogisticRegression as SparkLogistic
from pyspark.ml import Pipeline




# 4a. Index the target (Severity) to numeric labels
indexer = StringIndexer(inputCol="Accident_Severity", outputCol="label")

# 4b. Assemble your features into a single vector
assembler = VectorAssembler(
    inputCols=["Hour", "IsWeekend", "Speed_limit", "Number_of_Vehicles"],
    outputCol="features",
    handleInvalid="skip"
)

# 4c. Define a Spark-powered logistic regression
spark_lr = SparkLogistic(maxIter=20, regParam=0.1)

# 4d. Build & run the pipeline
pipeline = Pipeline(stages=[indexer, assembler, spark_lr])
model = pipeline.fit(df)  # trains on all 1 M rows in parallel

# 4e. Make predictions & inspect a few rows
preds = model.transform(df).select("Accident_Severity", "label", "prediction")
preds.show(10)


+-----------------+-----+----------+
|Accident_Severity|label|prediction|
+-----------------+-----+----------+
|                2|  1.0|       0.0|
|                3|  0.0|       0.0|
|                3|  0.0|       0.0|
|                3|  0.0|       0.0|
|                3|  0.0|       0.0|
|                3|  0.0|       0.0|
|                3|  0.0|       0.0|
|                3|  0.0|       0.0|
|                3|  0.0|       0.0|
|                3|  0.0|       0.0|
+-----------------+-----+----------+
only showing top 10 rows



In [13]:
# Evaluate performance

from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# preds is your DataFrame with columns: label, prediction
evaluator = MulticlassClassificationEvaluator(
    labelCol="label", 
    predictionCol="prediction"
)

for metric in ["accuracy", "f1", "weightedPrecision", "weightedRecall"]:
    evaluator.setMetricName(metric)
    score = evaluator.evaluate(preds)
    print(f"{metric.capitalize()}: {score:.4f}")


Accuracy: 0.8511
F1: 0.7827
Weightedprecision: 0.7244
Weightedrecall: 0.8511


In [14]:
# confusion matrix

preds.groupBy("prediction", "label")\
     .count()\
     .orderBy("prediction","label")\
     .show()

+----------+-----+-------+
|prediction|label|  count|
+----------+-----+-------+
|       0.0|  0.0|1280109|
|       0.0|  1.0| 204484|
|       0.0|  2.0|  19440|
+----------+-----+-------+



In [15]:
# model initially wrongly predicted based on majority of minor naccident severity

from pyspark.sql.functions import lit, when, col
from pyspark.ml.classification import LogisticRegression as SparkLogistic
from pyspark.ml import Pipeline

# 1) Compute inverse-frequency weights for each original severity
counts = df.groupBy("Accident_Severity").count().collect()
total = sum(r['count'] for r in counts)
weights = {r['Accident_Severity']: total/r['count'] for r in counts}

# 2) Look up how StringIndexer mapped severity→label
#    (run this after you fit the original indexer)
labels = indexer.fit(df).labels  
# e.g. labels = ['3','2','1'] in descending frequency

# 3) Create a column of weights based on the indexed label
df_weighted = df.withColumn(
    "classWeight",
    when(col("Accident_Severity") == int(labels[0]), lit(weights[int(labels[0])]))
    .when(col("Accident_Severity") == int(labels[1]), lit(weights[int(labels[1])]))
    .otherwise(lit(weights[int(labels[2])]))
)

# 4) Rebuild the pipeline with weightCol on the LR stage
spark_lr_weighted = SparkLogistic(
    maxIter=20,
    regParam=0.1,
    weightCol="classWeight"
)
pipeline_w = Pipeline(stages=[indexer, assembler, spark_lr_weighted])

# 5) Fit & evaluate on the weighted DataFrame
model_w = pipeline_w.fit(df_weighted)
preds_w  = model_w.transform(df_weighted)

from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction"
)

for metric in ["accuracy","f1","weightedPrecision","weightedRecall"]:
    evaluator.setMetricName(metric)
    print(f"{metric}: {evaluator.evaluate(preds_w):.4f}")

# 6) Confusion matrix for weighted model
preds_w.groupBy("prediction","label").count().orderBy("prediction","label").show()


accuracy: 0.5068
f1: 0.6048
weightedPrecision: 0.7915
weightedRecall: 0.5068
+----------+-----+------+
|prediction|label| count|
+----------+-----+------+
|       0.0|  0.0|686021|
|       0.0|  1.0| 73753|
|       0.0|  2.0|  3570|
|       1.0|  0.0|267382|
|       1.0|  1.0| 65059|
|       1.0|  2.0|  4746|
|       2.0|  0.0|326706|
|       2.0|  1.0| 65672|
|       2.0|  2.0| 11124|
+----------+-----+------+

