In [3]:
!pip install findspark
!pip install pyspark
!apt-get install -qq openjdk-17-jdk-headless
from google.colab import drive

Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl.metadata (352 bytes)
Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1
Selecting previously unselected package openjdk-17-jre-headless:amd64.
(Reading database ... 126101 files and directories currently installed.)
Preparing to unpack .../openjdk-17-jre-headless_17.0.14+7-1~22.04.1_amd64.deb ...
Unpacking openjdk-17-jre-headless:amd64 (17.0.14+7-1~22.04.1) ...
Selecting previously unselected package openjdk-17-jdk-headless:amd64.
Preparing to unpack .../openjdk-17-jdk-headless_17.0.14+7-1~22.04.1_amd64.deb ...
Unpacking openjdk-17-jdk-headless:amd64 (17.0.14+7-1~22.04.1) ...
Setting up openjdk-17-jre-headless:amd64 (17.0.14+7-1~22.04.1) ...
update-alternatives: using /usr/lib/jvm/java-17-openjdk-amd64/bin/java to provide /usr/bin/java (java) in auto mode
update-alternatives: using /usr/lib/jvm/java-17-openjdk-amd64/bin/jpackage 

In [4]:
# Start from a clean mount: flush pending writes and unmount any existing
# Drive mount, then mount Google Drive again at /content/drive.
drive.flush_and_unmount()
drive.mount(mountpoint='/content/drive')

Mounted at /content/drive


In [37]:
import findspark
import os

# Make the installed Spark distribution importable before touching pyspark.
findspark.init()
from pyspark.sql import SparkSession

# Create (or reuse) the session used by every cell below.
# NOTE(review): on Colab Spark runs in local mode, where the driver JVM also
# executes the tasks, so spark.executor.memory is presumably ignored and
# spark.driver.memory is the setting that matters — confirm.
spark = (
    SparkSession.builder
    .appName("IG Calculation")
    .config("spark.executor.memory", "8g")
    .config("spark.driver.memory", "4g")
    .config("spark.sql.shuffle.partitions", "200")
    .getOrCreate()
)

In [53]:
# Load the pre-split train / test sets from Drive (train first, then test).
# NOTE(review): the train file has a "_tree" suffix but the test file does not —
# confirm both come from the same feature-selection step.
train, test = (
    spark.read.parquet(path)
    for path in (
        '/content/drive/MyDrive/exports/dataset/train_selected_tree.parquet',
        "/content/drive/MyDrive/exports/dataset/test_selected.parquet",
    )
)

In [54]:
# Row count per target class in the training split, ordered by class index.
(train
    .groupBy("Accident_Severity_ind")
    .count()
    .orderBy("Accident_Severity_ind")
    .show())

+---------------------+------+
|Accident_Severity_ind| count|
+---------------------+------+
|                  0.0|208826|
|                  1.0| 35039|
|                  2.0|  4692|
+---------------------+------+



In [55]:
# Columns removed from both splits before modelling.
# NOTE(review): the reason for excluding these is not recorded here — confirm.
columns_to_drop = [
    "Local_Authority_(District)_ind",
    "Casualty_Reference_ind",
    "Police_Force_ind",
    "Vehicle_Reference_Casualty_ind",
]

train, test = (frame.drop(*columns_to_drop) for frame in (train, test))

In [56]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.classification import LinearSVC, OneVsRest
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.sql.functions import udf, col
from pyspark.sql.types import DoubleType
import numpy as np, math
from pyspark.sql import functions as F

In [57]:
# Show the train and test schemas, one after the other.
for frame in (train, test):
    frame.printSchema()

root
 |-- Casualty_Severity_ind: double (nullable = true)
 |-- Casualty_Type_ind: double (nullable = true)
 |-- Vehicle_Manoeuvre_ind: double (nullable = true)
 |-- Number_of_Casualties_ind: double (nullable = true)
 |-- Speed_limit_ind: double (nullable = true)
 |-- Urban_or_Rural_Area_ind: double (nullable = true)
 |-- Did_Police_Officer_Attend_Scene_of_Accident_ind: double (nullable = true)
 |-- Junction_Detail_ind: double (nullable = true)
 |-- Vehicle_Leaving_Carriageway_ind: double (nullable = true)
 |-- Junction_Location_ind: double (nullable = true)
 |-- Vehicle_Type_ind: double (nullable = true)
 |-- Junction_Control_ind: double (nullable = true)
 |-- 1st_Point_of_Impact_ind: double (nullable = true)
 |-- Number_of_Vehicles_ind: double (nullable = true)
 |-- Light_Conditions_ind: double (nullable = true)
 |-- Hit_Object_off_Carriageway_ind: double (nullable = true)
 |-- Accident_Severity_ind: double (nullable = true)
 |-- classWeight: double (nullable = true)

root
 |-- Casual

In [58]:
numeric_columns = [
    "Number_of_Vehicles_ind",
    "Number_of_Casualties_ind",
    "Speed_limit_ind",
]

# Range and spread of the numeric inputs, used to decide whether they need
# scaling before the linear SVM.
# NOTE(review): the "_ind" suffix and the 0..2 range suggest these may be
# index codes rather than raw counts; if so, feeding them as numeric imposes an
# ordering on the codes — confirm how they were produced.
stats = train.select(
    [F.col(name).cast('double') for name in numeric_columns]
).summary('min', 'max', 'mean', 'stddev')
stats.show(truncate=False)

+-------+----------------------+------------------------+------------------+
|summary|Number_of_Vehicles_ind|Number_of_Casualties_ind|Speed_limit_ind   |
+-------+----------------------+------------------------+------------------+
|min    |0.0                   |0.0                     |0.0               |
|max    |2.0                   |2.0                     |2.0               |
|mean   |0.514602284385473     |0.7264852729957314      |0.546104112939889 |
|stddev |0.7209292729492647    |0.8195345470328419      |0.7159022083531436|
+-------+----------------------+------------------------+------------------+



In [59]:
# The model stages below read the target from a column named 'label',
# so rename it once in both splits.
TARGET = "Accident_Severity_ind"
train, test = [frame.withColumnRenamed(TARGET, 'label') for frame in (train, test)]

In [60]:
# Split the attributes into numeric and categorical ones: every column that is
# neither listed in numeric_columns nor a service column (target / weight) is
# treated as categorical, in the order it appears in train.columns.
service_cols = {"label", "classWeight"}
categorical_columns = [
    name
    for name in train.columns
    if name not in service_cols and name not in numeric_columns
]

In [61]:
# Re-index each categorical column with StringIndexer so its ML metadata
# (number of distinct values) matches the data; handleInvalid="keep" gives
# values unseen during fit their own index instead of raising.
idx_cols = [f"{name}_idx" for name in categorical_columns]
indexers = [
    StringIndexer(inputCol=src, outputCol=dst, handleInvalid="keep")
    for src, dst in zip(categorical_columns, idx_cols)
]

# One multi-column OneHotEncoder turns every index column into a sparse vector.
ohe_cols = [f"{name}_ohe" for name in categorical_columns]
encoder = OneHotEncoder(inputCols=idx_cols, outputCols=ohe_cols, handleInvalid="keep")

# Concatenate the numeric columns and all one-hot vectors into "features".
assembler = VectorAssembler(inputCols=numeric_columns + ohe_cols, outputCol="features")

In [62]:
# Build the linear SVM and wrap it in One-vs-Rest.
#
# LinearSVC supports binary classification only, while the target has three
# classes, so it cannot be applied to this label directly.
_svm_io_cols = dict(labelCol="label", featuresCol="features", weightCol="classWeight")

base_svm = LinearSVC(**_svm_io_cols, maxIter=100, regParam=0.1)

# OneVsRest trains one binary model per class ("0 vs rest", "1 vs rest",
# "2 vs rest") and predicts the class whose model gives the largest raw score
# (margin w·x + b).
ovr = OneVsRest(classifier=base_svm, **_svm_io_cols)

In [63]:
# Chain indexers -> one-hot encoder -> assembler -> OvR SVM,
# fit the whole chain on train and score the held-out test split.
pipe = Pipeline(stages=[*indexers, encoder, assembler, ovr])
model = pipe.fit(train)
preds = model.transform(test)

In [64]:
# (prediction, label) float pairs for the RDD-based MulticlassMetrics API.
metric_rdd = (preds
              .select("prediction", "label")
              .rdd
              .map(lambda r: (float(r[0]), float(r[1]))))

metrics = MulticlassMetrics(metric_rdd)

# confusion-matrix: rows = true label, cols = predicted label, each axis in
# ascending order of the label values present in metric_rdd
cm = metrics.confusionMatrix().toArray().astype(float)
print("Confusion matrix (rows = true, cols = pred):\n", cm.astype(int), "\n")


def _per_class_scores(conf_mat):
    """Return per-class (precisions, recalls, f1s) computed from a confusion matrix.

    ``conf_mat`` is a square array with rows = true class and cols = predicted
    class. Scores are taken positionally, so entry i refers to row/column i of
    the matrix whatever the underlying label value is. A class with no
    predictions gets precision 0.0, a class with no true samples gets recall
    0.0, and F1 is 0.0 when precision + recall == 0.

    Returns three lists of Python floats, one entry per class.
    """
    conf_mat = np.asarray(conf_mat, dtype=float)
    tp = np.diag(conf_mat)
    pred_totals = conf_mat.sum(axis=0)  # tp + fp per class
    true_totals = conf_mat.sum(axis=1)  # tp + fn per class
    prec = np.divide(tp, pred_totals, out=np.zeros_like(tp), where=pred_totals > 0)
    rec = np.divide(tp, true_totals, out=np.zeros_like(tp), where=true_totals > 0)
    denom = prec + rec
    f1 = np.divide(2 * prec * rec, denom, out=np.zeros_like(tp), where=denom > 0)
    return prec.tolist(), rec.tolist(), f1.tolist()


# precision, recall, F1-score for each class.
# Read positionally from cm: metrics.precision(label) / metrics.recall(label)
# expect actual label values, which equal the positions 0..k-1 only when every
# class 0..k-1 occurs in the data.
labels = list(range(cm.shape[0]))  # row/column positions of the classes in cm
precisions, recalls, f1s = _per_class_scores(cm)

# macro-average (unweighted mean over classes, so the minority class counts fully)
prec_macro   = float(np.mean(precisions))
recall_macro = float(np.mean(recalls))
f1_macro     = float(np.mean(f1s))

print(f"Precision (macro): {prec_macro:.4f}")
print(f"Recall (macro): {recall_macro:.4f}")
print(f"F1-score (macro): {f1_macro:.4f}")

# multi-class MCC (Gorodkin's R_K):
#   MCC = (c*n - p·t) / sqrt((n^2 - p·p) * (n^2 - t·t))
# c = correct predictions, n = total samples,
# p = predicted totals per class, t = true totals per class.
c  = np.trace(cm)
n  = cm.sum()
p  = cm.sum(axis=0)
t  = cm.sum(axis=1)

num = (c * n) - np.dot(p, t)
den = math.sqrt((n**2 - np.dot(p, p)) *
                (n**2 - np.dot(t, t)))
mcc = num / den if den else 0.0  # degenerate case: a single class predicted/present

print(f"MCC : {mcc:.4f}")

Confusion matrix (rows = true, cols = pred):
 [[135721    129   2239]
 [  5968  16036    798]
 [   620    574   1855]] 

Precision (macro): 0.7636
Recall (macro): 0.7648
F1-score (macro): 0.7488
MCC : 0.7571
