In [1]:
import findspark
findspark.init()
import pyspark
import pyarrow
from pyspark.sql import SQLContext
import os
import sys
from pyspark.sql import SparkSession

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
conf = pyspark.SparkConf()\
        .setAppName('spark_pipeline')\
        .setMaster('local')\
        .set('spark.driver.memory', '8g')\
        .set('spark.executor.memory', '8g')\
        .set('spark.executor.instances', 4)\
        .set('spark.executor.cores', 4)\
        .set('spark.driver.maxResultSize', '8g')\
        .set('spark.sql.shuffle.partitions', 100)\
        .set('spark.default.parallelism', 200)\
        .set('spark.sql.broadcastTimeout', 36000)\
        .set('spark.kryoserializer.buffer.max', '1024m')\
        .set('spark.sql.execution.arrow.enabled', 'false')\
        .set('spark.dynamicAllocation.enabled', "False")\
        .set('spark.port.maxRetries',30) 
sc = pyspark.SparkContext.getOrCreate(conf)
spark = pyspark.sql.SparkSession(sc)
sqlContext = SQLContext.getOrCreate(sc)    




In [2]:
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from pyspark.ml.feature import StringIndexer,OneHotEncoder
from pyspark.sql import SQLContext, SparkSession, Row
from pyspark.mllib.stat import Statistics
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, MinMaxScaler, StandardScaler
from pyspark.sql.functions import col
from pyspark.sql.types import FloatType
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.classification import DecisionTreeClassifier, RandomForestClassifier
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.sql.functions import lit
from pyspark.sql.functions import when, count, isnull

from pyspark.ml.classification import MultilayerPerceptronClassifier

from pyspark.mllib.evaluation import MulticlassMetrics, BinaryClassificationMetrics
from pyspark.ml.classification import RandomForestClassifier

from pyspark.ml.functions import vector_to_array

import pyspark.sql.functions as F


from pyspark.mllib.regression import LabeledPoint
from pyspark.ml.functions import vector_to_array

from pyspark.ml.linalg import Vectors

#spark = SparkSession.Builder().appName('DDAM_Project_west').getOrCreate()

In [3]:
df = spark.read.options(inferSchema = True, delimiter = ',', header = True).csv('../../Datasets/West_Incidents_Cleaned.csv') 

In [4]:
df=df.drop('Wind_Chill_F','Pressure_in','_c0') #devo droppare queste colonne. 

In [5]:
#Checking missing values
from pyspark.sql.functions import when, count, isnull
missing = df.select([count(when(isnull(c), c)).alias(c) for c in df.columns])
missing.toPandas().T

Unnamed: 0,0
Severity,0
Start_Time,0
Start_Lat,0
Start_Lng,0
Distance_mi,0
County,0
State,0
Temperature_F,0
Humidity_perc,0
Visibility_mi,0


### Preparazione dei dati 

In [6]:
# droppo colonne che non uso un fase di classificazione 
to_drop=['Start_Time','City','County','State','Wind_Direction','day_of_the_week','season']
df_class=df.drop(*to_drop)
#df_class.show()

# da controllare 

In [7]:
# Converto le colonne booleane in numeriche

boolean_attr = ['Amenity', 'Bump', 'Crossing', 'Give_Way', 'Junction', 'No_Exit', 'Railway', 'Roundabout', 'Station', 'Stop', 'Traffic_Calming', 'Traffic_Signal']
for col_ in boolean_attr:
    df_class = df_class.withColumn(col_, col(col_).cast("int"))

#df_class.show()

In [8]:
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import when, count, isnull
#replace_astronomical_twilight= udf(lambda value: 0 if value == "Day" else 1, IntegerType())
#replace_working_weekend = udf(lambda value: 0 if value == "WorkingDay" else 1, IntegerType())

#df_class = df_class.withColumn("Astronomical_Twilight", replace_astronomical_twilight("Astronomical_Twilight"))
#df_class = df_class.withColumn("Working_Weekend", replace_working_weekend("Working_Weekend"))
df_class = df_class.withColumn("Astronomical_Twilight", when(df_class["Astronomical_Twilight"] == 'Day', 0).otherwise(1))
df_class = df_class.withColumn("Working_Weekend", when(df_class["Working_Weekend"] == 'WorkingDay', 0).otherwise(1))


#df_class.show()

In [9]:
# faccio indexer su weather condition 
indexer = StringIndexer(inputCol="Weather_Condition", outputCol="Weather_Condition_Indexed")
df_class = indexer.fit(df_class).transform(df_class)

In [10]:
# faccio il one hot encoding di Weather 
from pyspark.ml.feature import OneHotEncoder
onehotencoder_vector = OneHotEncoder(inputCol="Weather_Condition_Indexed", outputCol="Weather_Condition_1hot")
df_class = onehotencoder_vector.fit(df_class).transform(df_class)

In [11]:
# droppo la vecchia colonna Weather
df_class=df_class.drop('Weather_Condition')
#df_class.show()

df_class=df_class.drop('Weather_Condition_Indexed')

In [12]:
def print_metrics_and_cf(predictions):
    # Select (prediction, true label) and compute test error
    evaluator = MulticlassClassificationEvaluator(labelCol="Severity"
                                              , predictionCol="prediction"
                                              , metricName="accuracy")

    accuracy = evaluator.evaluate(predictions)
    print("Accuracy :",accuracy)
    print("Test Error = %g" % (1.0 - accuracy))
    print('----------------------------')

    # AUC score 
    scoreAndLabels = predictions.select('probability','Severity').rdd.map(lambda row: (float(row['Severity']),float(row['probability'][1]) ))
    #metrics = BinaryClassificationMetrics(scoreAndLabels)
    #auROC = metrics.areaUnderROC
    #print('Auc_:', auROC,'\n ----------------------')
    
    
    # metrics
    predictionAndLabels = predictions.select("prediction", "Severity").rdd.map(lambda x: (float(x[0]), float(x[1])))
    metrics = MulticlassMetrics(predictionAndLabels)

    list_avg=[]
    for i in range(4):
        precision = metrics.precision(label=float(i))  
        recall = metrics.recall(label=float(i))
        f1Score= metrics.fMeasure(label=float(i)) # need .0
        print("Precision for class ",i+1,": {:.2%}".format(precision))
        print("Recall for class ",i+1,": {:.2%}".format(recall))
        print("avg_F1-Score for class ",i+1,": {:.2%}".format(f1Score))
        list_avg.append(f1Score)
        print('----------------------------')
    sum_=0
    #print(list_avg)
    for elem in list_avg:
        sum_+=elem

    avg_f1=sum_/4
    print('----------------------')

    print("avg_F1-Score: {:.2%}".format(avg_f1))

    # Confusion Matrix
    #alternativa per pc 
    print('----------------------')
    cf= metrics.confusionMatrix().toArray()
    print(cf)
    print('----------------------')
''' labels = ["0", "1", "2","3"]
    _ = plt.figure(figsize=(7, 7))
    sns.heatmap(metrics.confusionMatrix().toArray(),
                cmap='viridis',
                annot=True,fmt='0',
                cbar=False, 
                xticklabels=labels, 
                yticklabels=labels)''' 

' labels = ["0", "1", "2","3"]\n    _ = plt.figure(figsize=(7, 7))\n    sns.heatmap(metrics.confusionMatrix().toArray(),\n                cmap=\'viridis\',\n                annot=True,fmt=\'0\',\n                cbar=False, \n                xticklabels=labels, \n                yticklabels=labels)'

### SEVERITY

In [13]:
# Definisco funzione le metriche 
'''def print_metrics_and_cf(predictions):
    prediction_counts = predictions.groupBy("prediction").count()
    # Stampare i risultati
    print("Counts of predictions in the test set:")
    prediction_counts.show()
    
    # Select (prediction, true label) and compute test error
    evaluator = MulticlassClassificationEvaluator(labelCol="Severity"
                                              , predictionCol="prediction"
                                              , metricName="accuracy")

    accuracy = evaluator.evaluate(predictions)
    print("Accuracy :",accuracy)
    print("Test Error = %g" % (1.0 - accuracy))
    print('----------------------------')
    
    # metrics
    predictionAndLabels = predictions.select("prediction", "Severity").rdd.map(lambda x: (float(x[0]), float(x[1])))
    metrics = MulticlassMetrics(predictionAndLabels)

    list_avg=[]
    for i in range(4):
        precision = metrics.precision(label=float(i))  
        recall = metrics.recall(label=float(i))
        f1Score= metrics.fMeasure(label=float(i)) # need .0
        print("Precision for class ",i+1,": {:.2%}".format(precision))
        print("Recall for class ",i+1,": {:.2%}".format(recall))
        print("avg_F1-Score for class ",i+1,": {:.2%}".format(f1Score))
        list_avg.append(f1Score)
        print('----------------------------')
    sum_=0
    #print(list_avg)
    for elem in list_avg:
        sum_+=elem

    avg_f1=sum_/4
    print('----------------------')

    print("avg_F1-Score: {:.2%}".format(avg_f1))

    # Confusion Matrix
    #https://www.kaggle.com/code/ashokkumarpalivela/multiclass-classification-using-pyspark

    labels = ["0", "1", "2","3"]
    _ = plt.figure(figsize=(7, 7))
    sns.heatmap(metrics.confusionMatrix().toArray(),
                cmap='viridis',
                annot=True,fmt='0',
                cbar=False, 
                xticklabels=labels, 
                yticklabels=labels)'''

'def print_metrics_and_cf(predictions):\n    prediction_counts = predictions.groupBy("prediction").count()\n    # Stampare i risultati\n    print("Counts of predictions in the test set:")\n    prediction_counts.show()\n    \n    # Select (prediction, true label) and compute test error\n    evaluator = MulticlassClassificationEvaluator(labelCol="Severity"\n                                              , predictionCol="prediction"\n                                              , metricName="accuracy")\n\n    accuracy = evaluator.evaluate(predictions)\n    print("Accuracy :",accuracy)\n    print("Test Error = %g" % (1.0 - accuracy))\n    print(\'----------------------------\')\n    \n    # metrics\n    predictionAndLabels = predictions.select("prediction", "Severity").rdd.map(lambda x: (float(x[0]), float(x[1])))\n    metrics = MulticlassMetrics(predictionAndLabels)\n\n    list_avg=[]\n    for i in range(4):\n        precision = metrics.precision(label=float(i))  \n        recall = metric

In [13]:
# colonne numeriche 
num_col = [item[0] for item in df_class.dtypes if not item[1].startswith('string')]
num_col.remove("Severity")

print(num_col)

['Start_Lat', 'Start_Lng', 'Distance_mi', 'Temperature_F', 'Humidity_perc', 'Visibility_mi', 'Wind_Speed_mph', 'Precipitation_in', 'Amenity', 'Bump', 'Crossing', 'Give_Way', 'Junction', 'No_Exit', 'Railway', 'Roundabout', 'Station', 'Stop', 'Traffic_Calming', 'Traffic_Signal', 'month', 'hour', 'Working_Weekend', 'Astronomical_Twilight', 'Weather_Condition_1hot']


In [14]:
# scalo Severity di 1 in modo da farlo partire da 0 
df_class= df_class.withColumn("Severity", col("Severity") - 1)
# la classe 2 adesso diventa la 1 

#df_class.show()

### A_Sbilanciato 

In [15]:
assembler = VectorAssembler(inputCols=num_col, outputCol="features")

output_dataset = assembler.transform(df_class)

classificationData = output_dataset.select("features", "Severity")

#classificationData.show(truncate=False)

In [16]:
(trainingData, testData) = classificationData.randomSplit([0.7, 0.3],seed=1)

In [17]:
trainingData.show()

+--------------------+--------+
|            features|Severity|
+--------------------+--------+
|(28,[0,1,2,3,4,5,...|       1|
|(28,[0,1,2,3,4,5,...|       1|
|(28,[0,1,2,3,4,5,...|       1|
|(28,[0,1,2,3,4,5,...|       1|
|(28,[0,1,2,3,4,5,...|       1|
|(28,[0,1,2,3,4,5,...|       1|
|(28,[0,1,2,3,4,5,...|       1|
|(28,[0,1,2,3,4,5,...|       1|
|(28,[0,1,2,3,4,5,...|       1|
|(28,[0,1,2,3,4,5,...|       1|
|(28,[0,1,2,3,4,5,...|       1|
|(28,[0,1,2,3,4,5,...|       1|
|(28,[0,1,2,3,4,5,...|       1|
|(28,[0,1,2,3,4,5,...|       1|
|(28,[0,1,2,3,4,5,...|       1|
|(28,[0,1,2,3,4,5,...|       1|
|(28,[0,1,2,3,4,5,...|       1|
|(28,[0,1,2,3,4,5,...|       1|
|(28,[0,1,2,3,4,5,...|       1|
|(28,[0,1,2,3,4,5,...|       1|
+--------------------+--------+
only showing top 20 rows



In [18]:
testData.show()

+--------------------+--------+
|            features|Severity|
+--------------------+--------+
|(28,[0,1,2,3,4,5,...|       1|
|(28,[0,1,2,3,4,5,...|       1|
|(28,[0,1,2,3,4,5,...|       1|
|(28,[0,1,2,3,4,5,...|       1|
|(28,[0,1,2,3,4,5,...|       1|
|(28,[0,1,2,3,4,5,...|       1|
|(28,[0,1,2,3,4,5,...|       1|
|(28,[0,1,2,3,4,5,...|       1|
|(28,[0,1,2,3,4,5,...|       1|
|(28,[0,1,2,3,4,5,...|       1|
|(28,[0,1,2,3,4,5,...|       1|
|(28,[0,1,2,3,4,5,...|       1|
|(28,[0,1,2,3,4,5,...|       1|
|(28,[0,1,2,3,4,5,...|       1|
|(28,[0,1,2,3,4,5,...|       1|
|(28,[0,1,2,3,4,5,...|       1|
|(28,[0,1,2,3,4,5,...|       1|
|(28,[0,1,2,3,4,5,...|       1|
|(28,[0,1,2,3,4,5,...|       1|
|(28,[0,1,2,3,4,5,...|       1|
+--------------------+--------+
only showing top 20 rows



In [19]:
from pyspark.sql import functions as F
g=trainingData.groupBy('Severity').agg(F.count('Severity').alias('Count'))
g=g.sort('Severity')
g_p=g.toPandas()

n_0=g_p.iloc[0,1]
n_1=g_p.iloc[1,1]
n_2=g_p.iloc[2,1]
n_3=g_p.iloc[3,1]

### A_DT default

In [21]:
from pyspark.ml.classification import DecisionTreeClassifier, RandomForestClassifier

dt = DecisionTreeClassifier(labelCol="Severity", featuresCol="features")
dt = dt.fit(trainingData)
predictions = dt.transform(testData)
print_metrics_and_cf(predictions)

Accuracy : 0.9440947084950863
Test Error = 0.0559053
----------------------------




Precision for class  1 : 61.02%
Recall for class  1 : 17.83%
avg_F1-Score for class  1 : 27.60%
----------------------------
Precision for class  2 : 95.69%
Recall for class  2 : 98.61%
avg_F1-Score for class  2 : 97.13%
----------------------------
Precision for class  3 : 55.88%
Recall for class  3 : 41.99%
avg_F1-Score for class  3 : 47.95%
----------------------------
Precision for class  4 : 65.50%
Recall for class  4 : 10.30%
avg_F1-Score for class  4 : 17.80%
----------------------------
----------------------
avg_F1-Score: 47.62%
----------------------
[[2.52000e+02 1.08000e+03 8.10000e+01 0.00000e+00]
 [1.01000e+02 1.05812e+05 1.31000e+03 7.90000e+01]
 [6.00000e+01 2.37700e+03 1.76400e+03 0.00000e+00]
 [0.00000e+00 1.30400e+03 2.00000e+00 1.50000e+02]]
----------------------


In [22]:
dt.featureImportances

SparseVector(28, {0: 0.0064, 1: 0.0642, 2: 0.7365, 3: 0.0024, 6: 0.0409, 10: 0.0081, 19: 0.0138, 20: 0.0805, 21: 0.0215, 22: 0.0256})

### A___ DT testo configurazioni diverse di parametri

### A_a)Grid Search con pesi 

In [23]:
dt = DecisionTreeClassifier(labelCol="Severity", featuresCol="features")

In [24]:
n=df.count()

##### definisco i pesi 

In [25]:
from pyspark.sql import functions as F
g=trainingData.groupBy('Severity').agg(F.count('Severity').alias('Count'))
g.sort('Count').show()

+--------+------+
|Severity| Count|
+--------+------+
|       3|  3275|
|       0|  3407|
|       2|  9871|
|       1|250512|
+--------+------+



In [26]:
trainingData_w = trainingData.withColumn("weights1", lit(1))

In [27]:
# Creiamo la colonna 'weights' basata sulla frequenza delle classi
#trainingData_w
trainingData_w = trainingData_w.withColumn("weights2", when(trainingData["Severity"] == 1, 1.0)
    .when(trainingData["Severity"] == 0, 267020 / 3407)  # Calcolo del peso per la classe 1
    .when(trainingData["Severity"] == 2, 267020 / 9871)  # Calcolo del peso per la classe 3
    .when(trainingData["Severity"] == 3, 267020 / 3275)  # Calcolo del peso per la classe 4
)

In [28]:
#DA VEDERE SE MI SERVE  !!!!!!!!!!!!!!!!!!!!!!!!!!!!
# Creiamo la colonna 'weights' basata sulla frequenza delle classi
#trainingData = trainingData.withColumn("weights3", when(trainingData["Severity"] == 4,108039/1933).otherwise(1))

In [29]:
param_grid = (ParamGridBuilder()
              #.addGrid(dt.maxDepth, [5,10,15,20,25,30])
              .addGrid(dt.maxDepth, [5,20,30])
              .addGrid(dt.maxBins, [32, 64])
              #.addGrid(dt.minInstancesPerNode, [round(0.01*n),round(0.005*n),round(0.001*n),round(0.02*n)])
              .addGrid(dt.minInstancesPerNode, [round(0.001*n),round(0.02*n)])
              .addGrid(dt.weightCol, ['weights1','weights2']) #SOLO 2 COLONNE DEI PESI 
              .addGrid(dt.impurity, ['entropy', 'gini'])
              .build())

# Crea il CrossValidator
cross_validator = CrossValidator(estimator=dt,
                                 estimatorParamMaps=param_grid,
                                 evaluator=MulticlassClassificationEvaluator(labelCol="Severity", 
                                                                             predictionCol="prediction", 
                                                                             metricName="accuracy"),
                                 numFolds=5,
                                 )

# Esegui la cross-validation e scegli il miglior set di parametri
cv_model = cross_validator.fit(trainingData_w)

# Ottieni il miglior modello dalla cross-validation
best_model = cv_model.bestModel

# Esegui predizioni sul set di test utilizzando il miglior modello
result = best_model.transform(testData)

# Valuta l'accuratezza sul set di test
evaluator = MulticlassClassificationEvaluator(labelCol="Severity", 
                                              predictionCol="prediction", 
                                              metricName="accuracy")

accuracy = evaluator.evaluate(result)
print("Accuratezza sul set di test utilizzando il miglior modello = {:.2%}".format(accuracy))

Accuratezza sul set di test utilizzando il miglior modello = 94.66%


In [30]:
best_model.featureImportances


SparseVector(28, {0: 0.0539, 1: 0.0747, 2: 0.7456, 3: 0.0147, 4: 0.007, 10: 0.0036, 19: 0.0052, 20: 0.0644, 21: 0.016, 22: 0.0148})

In [31]:
result.select('prediction').distinct().collect()


[Row(prediction=0.0),
 Row(prediction=2.0),
 Row(prediction=1.0),
 Row(prediction=3.0)]

In [32]:
print_metrics_and_cf(result)

Accuracy : 0.9465603469380618
Test Error = 0.0534397
----------------------------
Precision for class  1 : 50.82%
Recall for class  1 : 39.28%
avg_F1-Score for class  1 : 44.31%
----------------------------
Precision for class  2 : 96.71%
Recall for class  2 : 98.08%
avg_F1-Score for class  2 : 97.39%
----------------------------
Precision for class  3 : 53.24%
Recall for class  3 : 53.56%
avg_F1-Score for class  3 : 53.40%
----------------------------
Precision for class  4 : 90.46%
Recall for class  4 : 14.97%
avg_F1-Score for class  4 : 25.69%
----------------------------
----------------------
avg_F1-Score: 55.20%
----------------------
[[5.55000e+02 5.46000e+02 3.12000e+02 0.00000e+00]
 [3.81000e+02 1.05237e+05 1.66100e+03 2.30000e+01]
 [1.56000e+02 1.79500e+03 2.25000e+03 0.00000e+00]
 [0.00000e+00 1.23500e+03 3.00000e+00 2.18000e+02]]
----------------------


In [33]:
result_counts = result.groupBy("prediction").count()

# Stampare i risultati
result_counts.show()

+----------+------+
|prediction| count|
+----------+------+
|       0.0|  1092|
|       2.0|  4226|
|       1.0|108813|
|       3.0|   241|
+----------+------+



In [34]:
result_counts = result.groupBy("Severity").count()

# Stampare i risultati
result_counts.show()

+--------+------+
|Severity| count|
+--------+------+
|       1|107302|
|       3|  1456|
|       2|  4201|
|       0|  1413|
+--------+------+



### A_b)

In [35]:
dt_param1 = DecisionTreeClassifier(labelCol="Severity", featuresCol="features",
                                  maxDepth=20,
                                  maxBins=32)
                                   
dt_param1 = dt_param1.fit(trainingData)
predictions_param1 = dt_param1.transform(testData)
print_metrics_and_cf(predictions_param1)

Accuracy : 0.9422236211660197
Test Error = 0.0577764
----------------------------
Precision for class  1 : 45.75%
Recall for class  1 : 44.52%
avg_F1-Score for class  1 : 45.12%
----------------------------
Precision for class  2 : 97.02%
Recall for class  2 : 97.23%
avg_F1-Score for class  2 : 97.13%
----------------------------
Precision for class  3 : 55.28%
Recall for class  3 : 54.99%
avg_F1-Score for class  3 : 55.13%
----------------------------
Precision for class  4 : 38.45%
Recall for class  4 : 34.07%
avg_F1-Score for class  4 : 36.13%
----------------------------
----------------------
avg_F1-Score: 58.38%
----------------------
[[6.29000e+02 5.79000e+02 2.05000e+02 0.00000e+00]
 [5.34000e+02 1.04329e+05 1.65200e+03 7.87000e+02]
 [2.08000e+02 1.67600e+03 2.31000e+03 7.00000e+00]
 [4.00000e+00 9.44000e+02 1.20000e+01 4.96000e+02]]
----------------------


In [36]:
dt_param1.featureImportances

SparseVector(28, {0: 0.1168, 1: 0.1272, 2: 0.3201, 3: 0.084, 4: 0.071, 5: 0.0153, 6: 0.0606, 7: 0.0032, 8: 0.0014, 9: 0.0003, 10: 0.0078, 11: 0.0003, 12: 0.0099, 13: 0.0002, 14: 0.0018, 16: 0.0029, 17: 0.0049, 18: 0.0002, 19: 0.0145, 20: 0.0575, 21: 0.0709, 22: 0.0169, 23: 0.003, 24: 0.0039, 25: 0.0034, 26: 0.001, 27: 0.0008})

### A_c) 

In [37]:
from pyspark.sql import functions as F
g=trainingData.groupBy('Severity').agg(F.count('Severity').alias('Count'))
g.sort('Count').show()

+--------+------+
|Severity| Count|
+--------+------+
|       3|  3275|
|       0|  3407|
|       2|  9871|
|       1|250512|
+--------+------+



In [38]:
trainingData_w = trainingData_w.withColumn("weights2", when(trainingData["Severity"] == 1, 1.0)
    .when(trainingData["Severity"] == 0, 267020 / 3335)  # Calcolo del peso per la classe 1
    .when(trainingData["Severity"] == 2, 267020 / 9857)  # Calcolo del peso per la classe 3
    .when(trainingData["Severity"] == 3, 267020 / 3272)  # Calcolo del peso per la classe 4
)

In [None]:
# Real Weights
trainingData_w = trainingData_w.withColumn("weights2", when(trainingData["Severity"] == 1, 1.0)
    .when(trainingData["Severity"] == 0, 267020 / 3407)  # Calcolo del peso per la classe 1
    .when(trainingData["Severity"] == 2, 267020 / 9871)  # Calcolo del peso per la classe 3
    .when(trainingData["Severity"] == 3, 267020 / 3275)  # Calcolo del peso per la classe 4
)

In [39]:
from pyspark.ml.classification import DecisionTreeClassifier, RandomForestClassifier

dt_param1 = DecisionTreeClassifier(labelCol="Severity", featuresCol="features",
                                  maxDepth=30,
                                  maxBins=32,
                                weightCol='weights2'  
                                  )
                                   

dt_param1 = dt_param1.fit(trainingData_w)
predictions_param1 = dt_param1.transform(testData)
print_metrics_and_cf(predictions_param1)

Accuracy : 0.9262756618752842
Test Error = 0.0737243
----------------------------
Precision for class  1 : 40.99%
Recall for class  1 : 44.59%
avg_F1-Score for class  1 : 42.71%
----------------------------
Precision for class  2 : 97.02%
Recall for class  2 : 95.52%
avg_F1-Score for class  2 : 96.27%
----------------------------
Precision for class  3 : 48.88%
Recall for class  3 : 50.80%
avg_F1-Score for class  3 : 49.82%
----------------------------
Precision for class  4 : 24.06%
Recall for class  4 : 46.70%
avg_F1-Score for class  4 : 31.76%
----------------------------
----------------------
avg_F1-Score: 55.14%
----------------------
[[6.30000e+02 5.70000e+02 2.12000e+02 1.00000e+00]
 [6.60000e+02 1.02496e+05 2.00900e+03 2.13700e+03]
 [2.46000e+02 1.81300e+03 2.13400e+03 8.00000e+00]
 [1.00000e+00 7.64000e+02 1.10000e+01 6.80000e+02]]
----------------------


In [40]:
dt_param1.featureImportances

SparseVector(28, {0: 0.1272, 1: 0.1, 2: 0.3045, 3: 0.0581, 4: 0.058, 5: 0.0115, 6: 0.0427, 7: 0.0027, 8: 0.0008, 9: 0.0003, 10: 0.0058, 11: 0.0002, 12: 0.0091, 13: 0.0001, 14: 0.0014, 16: 0.0016, 17: 0.0054, 18: 0.0, 19: 0.0133, 20: 0.1276, 21: 0.0752, 22: 0.0351, 23: 0.0083, 24: 0.0049, 25: 0.0036, 26: 0.001, 27: 0.0015})

### B_ RandomForest

In [41]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics

# Train a RandomForest model.
rf = RandomForestClassifier(labelCol="Severity", featuresCol="features", numTrees=10)

# Train model.  This also runs the indexers.
model = rf.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)

print_metrics_and_cf(predictions)

Accuracy : 0.9381841709509321
Test Error = 0.0618158
----------------------------
Precision for class  1 : 0.00%
Recall for class  1 : 0.00%
avg_F1-Score for class  1 : 0.00%
----------------------------
Precision for class  2 : 93.82%
Recall for class  2 : 100.00%
avg_F1-Score for class  2 : 96.81%
----------------------------
Precision for class  3 : 0.00%
Recall for class  3 : 0.00%
avg_F1-Score for class  3 : 0.00%
----------------------------
Precision for class  4 : 0.00%
Recall for class  4 : 0.00%
avg_F1-Score for class  4 : 0.00%
----------------------------
----------------------
avg_F1-Score: 24.20%
----------------------
[[     0.   1413.      0.      0.]
 [     0. 107302.      0.      0.]
 [     0.   4201.      0.      0.]
 [     0.   1456.      0.      0.]]
----------------------


In [42]:
model.featureImportances

SparseVector(28, {0: 0.0481, 1: 0.0234, 2: 0.684, 3: 0.0288, 4: 0.0488, 5: 0.0011, 6: 0.0035, 7: 0.0011, 8: 0.0, 10: 0.0065, 12: 0.002, 14: 0.0012, 16: 0.0005, 17: 0.0004, 18: 0.0007, 19: 0.0232, 20: 0.1046, 21: 0.0202, 22: 0.0017, 23: 0.0003})

### B_i) TUNING RANDOM FOREST

In [43]:
rf = RandomForestClassifier(labelCol="Severity", featuresCol="features", numTrees=20,maxDepth=20, seed=10
                           )
# Train model.  This also runs the indexers.
model = rf.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)
print_metrics_and_cf(predictions)

Accuracy : 0.9535550659252264
Test Error = 0.0464449
----------------------------
Precision for class  1 : 59.77%
Recall for class  1 : 43.74%
avg_F1-Score for class  1 : 50.51%
----------------------------
Precision for class  2 : 96.65%
Recall for class  2 : 98.73%
avg_F1-Score for class  2 : 97.68%
----------------------------
Precision for class  3 : 65.49%
Recall for class  3 : 49.15%
avg_F1-Score for class  3 : 56.16%
----------------------------
Precision for class  4 : 76.60%
Recall for class  4 : 30.36%
avg_F1-Score for class  4 : 43.48%
----------------------------
----------------------
avg_F1-Score: 61.96%
----------------------
[[6.18000e+02 6.26000e+02 1.69000e+02 0.00000e+00]
 [3.17000e+02 1.05935e+05 9.16000e+02 1.34000e+02]
 [9.90000e+01 2.03600e+03 2.06500e+03 1.00000e+00]
 [0.00000e+00 1.01100e+03 3.00000e+00 4.42000e+02]]
----------------------


In [44]:
model.featureImportances

SparseVector(28, {0: 0.0925, 1: 0.0897, 2: 0.3808, 3: 0.0684, 4: 0.0621, 5: 0.0166, 6: 0.0519, 7: 0.0045, 8: 0.0018, 9: 0.0002, 10: 0.0076, 11: 0.0004, 12: 0.0079, 13: 0.0005, 14: 0.0024, 15: 0.0, 16: 0.0041, 17: 0.0053, 18: 0.0003, 19: 0.0148, 20: 0.0741, 21: 0.0698, 22: 0.0162, 23: 0.0073, 24: 0.0082, 25: 0.008, 26: 0.0022, 27: 0.0024})

### C_ NN

###### data preparation for NN

In [45]:
to_drop=['Start_Time','City','County','State','Wind_Direction','day_of_the_week','season']
df_class=df.drop(*to_drop)
colonne_booleane = ['Amenity', 'Bump', 'Crossing', 'Give_Way', 'Junction', 'No_Exit', 'Railway', 'Roundabout', 'Station', 'Stop', 'Traffic_Calming', 'Traffic_Signal']

# Converti le colonne booleane in numeriche
for col_name in colonne_booleane:
    df_class = df_class.withColumn(col_name, col(col_name).cast("int"))


df_class = df_class.withColumn("Astronomical_Twilight", when(df_class["Astronomical_Twilight"] == 'Day', 0).otherwise(1))
df_class = df_class.withColumn("Working_Weekend", when(df_class["Working_Weekend"] == 'WorkingDay', 0).otherwise(1))




indexer = StringIndexer(inputCol='Weather_Condition', outputCol='class_numeric')
indexer_fitted = indexer.fit(df_class)
df_indexed = indexer_fitted.transform(df_class)


encoder = OneHotEncoder(inputCols=['class_numeric'], outputCols=['class_onehot'],dropLast=False)
df_onehot = encoder.fit(df_indexed).transform(df_indexed)


df_col_onehot = df_onehot.select('*', vector_to_array('class_onehot').alias('col_onehot'))

num_categories = len(df_col_onehot.first()['col_onehot'])   # 3
cols_expanded = [(F.col('col_onehot')[i].alias(f'{indexer_fitted.labels[i]}')) for i in range(num_categories)]
df_cols_onehot = df_col_onehot.select('Severity',
 'Start_Lat',
 'Start_Lng',
 'Distance_mi',
 'Temperature_F',
 'Humidity_perc',
 'Visibility_mi',
 'Wind_Speed_mph',
 'Precipitation_in',
 'Amenity',
 'Bump',
 'Crossing',
 'Give_Way',
 'Junction',
 'No_Exit',
 'Railway',
 'Roundabout',
 'Station',
 'Stop',
 'Traffic_Calming',
 'Traffic_Signal',
 'month',
 'hour',
 'Working_Weekend',
 'Astronomical_Twilight', *cols_expanded)


num_col = [item[0] for item in df_cols_onehot.dtypes if not item[1].startswith('string')]
num_col.remove("Severity")

df_MLP= df_cols_onehot.withColumn("Severity", col("Severity") - 1)
#df_MLP.show()

In [46]:
df_MLP_binary=df_MLP


In [47]:
severity_counts = df_MLP.groupBy("Severity").count()
severity_counts.show()

+--------+------+
|Severity| count|
+--------+------+
|       1|357814|
|       3|  4731|
|       2| 14072|
|       0|  4820|
+--------+------+



In [48]:
assembler_nn = VectorAssembler(inputCols=num_col, outputCol="features")

output_dataset_nn = assembler_nn.transform(df_MLP)

classificationData_nn = output_dataset_nn.select("features", "Severity")

#classificationData_nn.show(truncate=False)
(trainingData_nn, testData_nn) = classificationData_nn.randomSplit([0.7, 0.3],seed=0)

###### Run NN

In [49]:
# MLP Layers  -> output layer must have the same number of units of the Severity classes
layers = [len(num_col),4]

# Create the Multilayer Perceptron Classifier and set its parameters
trainer = MultilayerPerceptronClassifier(
    layers=layers,
    labelCol="Severity",
    featuresCol="features",
    maxIter=100,  
    tol=1e-6,
    seed=None,
    blockSize=32,
    stepSize=0.03,  
    solver="l-bfgs",
    initialWeights=None,
    probabilityCol="probability",
    rawPredictionCol="rawPrediction"
)

# Train the model
model = trainer.fit(trainingData_nn)

# Make predictions on the test set
predictions= model.transform(testData_nn)

print_metrics_and_cf(predictions)

Accuracy : 0.9377966590617051
Test Error = 0.0622033
----------------------------
Precision for class  1 : 2.78%
Recall for class  1 : 0.07%
avg_F1-Score for class  1 : 0.13%
----------------------------
Precision for class  2 : 93.81%
Recall for class  2 : 99.97%
avg_F1-Score for class  2 : 96.79%
----------------------------
Precision for class  3 : 0.00%
Recall for class  3 : 0.00%
avg_F1-Score for class  3 : 0.00%
----------------------------
Precision for class  4 : 0.00%
Recall for class  4 : 0.00%
avg_F1-Score for class  4 : 0.00%
----------------------------
----------------------
avg_F1-Score: 24.23%
----------------------
[[1.00000e+00 1.46700e+03 0.00000e+00 0.00000e+00]
 [3.50000e+01 1.07282e+05 2.00000e+00 0.00000e+00]
 [0.00000e+00 4.16600e+03 0.00000e+00 0.00000e+00]
 [0.00000e+00 1.44600e+03 0.00000e+00 0.00000e+00]]
----------------------


 ## U_ UNDERSAMPLING
 

In [20]:
import seaborn as sns
import matplotlib.pyplot as plt

In [21]:
from pyspark.sql import functions as F
g=trainingData.groupBy('Severity').agg(F.count('Severity').alias('Count'))
g=g.sort('Severity')
g.show()

+--------+------+
|Severity| Count|
+--------+------+
|       0|  3407|
|       1|250512|
|       2|  9871|
|       3|  3275|
+--------+------+



In [22]:
g_p=g.toPandas()

In [23]:
n_0=g_p.iloc[0,1]
n_1=g_p.iloc[1,1]
n_2=g_p.iloc[2,1]
n_3=g_p.iloc[3,1]

In [24]:
und=(n_0+n_2+n_3)/3 # si applica semplicemente la media

In [25]:
train_und = trainingData.sampleBy('Severity', fractions={0:1.0 ,1: und/n_1 , 2:1.0, 3:1.0})

### U_1 DT base

In [27]:
from pyspark.ml.classification import DecisionTreeClassifier

dt = DecisionTreeClassifier(labelCol="Severity", featuresCol="features")

dt = dt.fit(train_und)

In [57]:
# Make predictions.
predictions = dt.transform(testData)

In [58]:
print_metrics_and_cf(predictions)

Accuracy : 0.8015510789354038
Test Error = 0.198449
----------------------------




Precision for class  1 : 37.42%
Recall for class  1 : 71.13%
avg_F1-Score for class  1 : 49.04%
----------------------------
Precision for class  2 : 98.90%
Recall for class  2 : 80.68%
avg_F1-Score for class  2 : 88.87%
----------------------------
Precision for class  3 : 34.63%
Recall for class  3 : 76.55%
avg_F1-Score for class  3 : 47.69%
----------------------------
Precision for class  4 : 5.93%
Recall for class  4 : 60.58%
avg_F1-Score for class  4 : 10.81%
----------------------------
----------------------
avg_F1-Score: 49.10%
----------------------
[[1.0050e+03 9.5000e+01 3.1000e+02 3.0000e+00]
 [1.0710e+03 8.6572e+04 5.7400e+03 1.3919e+04]
 [6.0900e+02 3.1600e+02 3.2160e+03 6.0000e+01]
 [1.0000e+00 5.5300e+02 2.0000e+01 8.8200e+02]]
----------------------


In [33]:
dt.featureImportances

SparseVector(28, {0: 0.084, 1: 0.1177, 2: 0.608, 3: 0.0089, 19: 0.0214, 20: 0.1163, 21: 0.0052, 22: 0.0304, 23: 0.0081})

In [32]:
feat_imp=dt.featureImportances
from pyspark.ml.linalg import SparseVector
non_zero_elements = [(index, value) for index, value in zip(feat_imp.indices, feat_imp.values) if value != 0]
sorted_non_zero_elements = sorted(non_zero_elements, key=lambda x: x[1], reverse=True)
print('Le features più importanti per la predizione sono :')
for el in sorted_non_zero_elements:
    if el[0]>=24:
        print(num_col[len(num_col)-1], 'con un\'importanza dello', el[1])
    else:
        print(num_col[el[0]], 'con un\'importanza dello', el[1])

Le features più importanti per la predizione sono :
Distance_mi con un'importanza dello 0.6079570635876781
Start_Lng con un'importanza dello 0.11770419100136396
month con un'importanza dello 0.11629488733169649
Start_Lat con un'importanza dello 0.08395371791912863
Working_Weekend con un'importanza dello 0.030425593021708705
Traffic_Signal con un'importanza dello 0.021436394764437153
Temperature_F con un'importanza dello 0.00887952455729292
Astronomical_Twilight con un'importanza dello 0.008099003932141613
hour con un'importanza dello 0.005249623884552483


### U_2 Random forest base 

In [34]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer

# Train a RandomForest model.
rf = RandomForestClassifier(labelCol="Severity", featuresCol="features", numTrees=10)


# Train model.  This also runs the indexers.
model = rf.fit(train_und)

# Make predictions.
predictions = model.transform(testData)


In [60]:
print_metrics_and_cf(predictions)

Accuracy : 0.850225579687336
Test Error = 0.149774
----------------------------
Precision for class  1 : 51.29%
Recall for class  1 : 31.00%
avg_F1-Score for class  1 : 38.64%
----------------------------
Precision for class  2 : 98.77%
Recall for class  2 : 86.05%
avg_F1-Score for class  2 : 91.97%
----------------------------
Precision for class  3 : 33.10%
Recall for class  3 : 89.67%
avg_F1-Score for class  3 : 48.36%
----------------------------
Precision for class  4 : 8.11%
Recall for class  4 : 48.21%
avg_F1-Score for class  4 : 13.89%
----------------------------
----------------------
avg_F1-Score: 48.21%
----------------------
[[4.3800e+02 9.5000e+01 8.6700e+02 1.3000e+01]
 [3.8300e+02 9.2335e+04 6.7330e+03 7.8510e+03]
 [3.2000e+01 3.1500e+02 3.7670e+03 8.7000e+01]
 [1.0000e+00 7.4100e+02 1.2000e+01 7.0200e+02]]
----------------------


In [35]:
model.featureImportances

SparseVector(28, {0: 0.0601, 1: 0.0398, 2: 0.6021, 3: 0.0328, 4: 0.0037, 5: 0.0003, 6: 0.0121, 7: 0.001, 8: 0.0005, 10: 0.0133, 12: 0.0006, 14: 0.0005, 16: 0.0015, 17: 0.0068, 18: 0.0001, 19: 0.0236, 20: 0.1697, 21: 0.0134, 22: 0.0166, 23: 0.0008, 25: 0.0004, 27: 0.0001})

In [36]:
feat_imp=model.featureImportances
from pyspark.ml.linalg import SparseVector
non_zero_elements = [(index, value) for index, value in zip(feat_imp.indices, feat_imp.values) if value != 0]
sorted_non_zero_elements = sorted(non_zero_elements, key=lambda x: x[1], reverse=True)
print('Le features più importanti per la predizione sono :')
for el in sorted_non_zero_elements:
    if el[0]>=24:
        print(num_col[len(num_col)-1], 'con un\'importanza dello', el[1])
    else:
        print(num_col[el[0]], 'con un\'importanza dello', el[1])

Le features più importanti per la predizione sono :
Distance_mi con un'importanza dello 0.6021298960645595
month con un'importanza dello 0.16965688407223428
Start_Lat con un'importanza dello 0.06006015464731429
Start_Lng con un'importanza dello 0.039790476833249024
Temperature_F con un'importanza dello 0.03279361148756961
Traffic_Signal con un'importanza dello 0.02364886010816531
Working_Weekend con un'importanza dello 0.01660877635558642
hour con un'importanza dello 0.0133927382480035
Crossing con un'importanza dello 0.01334245350087565
Wind_Speed_mph con un'importanza dello 0.0121125874923735
Stop con un'importanza dello 0.006815860127949847
Humidity_perc con un'importanza dello 0.003691973339777941
Station con un'importanza dello 0.0014562540455305187
Precipitation_in con un'importanza dello 0.0009929584067117163
Astronomical_Twilight con un'importanza dello 0.0007903088639355463
Junction con un'importanza dello 0.0006367021440046624
Amenity con un'importanza dello 0.000537617241298

#### U_2)_a) Ranodom Forest Tuning aumento max depth e numero di alberi

In [38]:
rf = RandomForestClassifier(labelCol="Severity", featuresCol="features", numTrees=20,maxDepth=20, seed=10
                           )
# Train model.  This also runs the indexers.
model = rf.fit(train_und)

# Make predictions.
predictions = model.transform(testData)
print_metrics_and_cf(predictions)

Accuracy : 0.8357989717763089
Test Error = 0.164201
----------------------------




Precision for class  1 : 40.78%
Recall for class  1 : 68.58%
avg_F1-Score for class  1 : 51.15%
----------------------------
Precision for class  2 : 99.18%
Recall for class  2 : 83.83%
avg_F1-Score for class  2 : 90.86%
----------------------------
Precision for class  3 : 31.07%
Recall for class  3 : 89.00%
avg_F1-Score for class  3 : 46.06%
----------------------------
Precision for class  4 : 10.09%
Recall for class  4 : 64.22%
avg_F1-Score for class  4 : 17.44%
----------------------------
----------------------
avg_F1-Score: 51.38%
----------------------
[[9.6900e+02 7.2000e+01 3.6200e+02 1.0000e+01]
 [1.1880e+03 8.9949e+04 7.8810e+03 8.2840e+03]
 [2.1400e+02 2.0800e+02 3.7390e+03 4.0000e+01]
 [5.0000e+00 4.6200e+02 5.4000e+01 9.3500e+02]]
----------------------


In [39]:
model.featureImportances

SparseVector(28, {0: 0.1152, 1: 0.108, 2: 0.3236, 3: 0.0591, 4: 0.0492, 5: 0.0125, 6: 0.0404, 7: 0.0033, 8: 0.0017, 9: 0.0002, 10: 0.0107, 11: 0.0003, 12: 0.0084, 13: 0.0002, 14: 0.0015, 16: 0.0032, 17: 0.0064, 18: 0.0003, 19: 0.0174, 20: 0.1213, 21: 0.0673, 22: 0.0214, 23: 0.0093, 24: 0.0077, 25: 0.0072, 26: 0.0022, 27: 0.002})

In [40]:
feat_imp=model.featureImportances
from pyspark.ml.linalg import SparseVector
non_zero_elements = [(index, value) for index, value in zip(feat_imp.indices, feat_imp.values) if value != 0]
sorted_non_zero_elements = sorted(non_zero_elements, key=lambda x: x[1], reverse=True)
print('Le features più importanti per la predizione sono :')
for el in sorted_non_zero_elements:
    if el[0]>=24:
        print(num_col[len(num_col)-1], 'con un\'importanza dello', el[1])
    else:
        print(num_col[el[0]], 'con un\'importanza dello', el[1])

Le features più importanti per la predizione sono :
Distance_mi con un'importanza dello 0.32364114144043005
month con un'importanza dello 0.12128154580107002
Start_Lat con un'importanza dello 0.1151541824871002
Start_Lng con un'importanza dello 0.10795877543782732
hour con un'importanza dello 0.06728564837311637
Temperature_F con un'importanza dello 0.05911022325052982
Humidity_perc con un'importanza dello 0.0491736564692269
Wind_Speed_mph con un'importanza dello 0.040440387788528126
Working_Weekend con un'importanza dello 0.021387199554207138
Traffic_Signal con un'importanza dello 0.017427909742005178
Visibility_mi con un'importanza dello 0.012526496479246925
Crossing con un'importanza dello 0.010704855189879325
Astronomical_Twilight con un'importanza dello 0.009250053123785384
Junction con un'importanza dello 0.008412287521364178
Weather_Condition_1hot con un'importanza dello 0.0076636170031701405
Weather_Condition_1hot con un'importanza dello 0.0072190283651356645
Stop con un'import

### U_3 NN

###### data pre for Undersampling for NN 

In [61]:
g=trainingData_nn.groupBy('Severity').agg(F.count('Severity').alias('Count'))
g=g.sort('Severity')
g_p=g.toPandas()
n_0=g_p.iloc[0,1]
n_1=g_p.iloc[1,1]
n_2=g_p.iloc[2,1]
n_3=g_p.iloc[3,1]

In [62]:
und=(n_0+n_2+n_3)/3
train_und_nn = trainingData_nn.sampleBy('Severity', fractions={0:1.0 ,1: und/n_1 , 2:1.0, 3:1.0})
result_counts=train_und_nn.groupBy("Severity").count()
result_counts.show()

+--------+-----+
|Severity|count|
+--------+-----+
|       1| 5513|
|       3| 3285|
|       2| 9906|
|       0| 3352|
+--------+-----+



In [63]:
# MLP Layers  -> output layer must have the same number of units of the Severity classes
layers = [len(num_col),4]

# Create the Multilayer Perceptron Classifier and set its parameters
trainer = MultilayerPerceptronClassifier(
    layers=layers,
    labelCol="Severity",
    featuresCol="features",
    maxIter=100,  
    tol=1e-6,
    seed=None,
    blockSize=32,
    stepSize=0.03,  
    solver="l-bfgs",
    initialWeights=None,
    probabilityCol="probability",
    rawPredictionCol="rawPrediction"
)

# Train the model
model = trainer.fit(train_und_nn)

# Make predictions on the test set
predictions= model.transform(testData_nn)

In [64]:
print_metrics_and_cf(predictions)

Accuracy : 0.41093016547347444
Test Error = 0.58907
----------------------------




Precision for class  1 : 3.65%
Recall for class  1 : 10.22%
avg_F1-Score for class  1 : 5.38%
----------------------------
Precision for class  2 : 98.16%
Recall for class  2 : 40.04%
avg_F1-Score for class  2 : 56.88%
----------------------------
Precision for class  3 : 6.34%
Recall for class  3 : 84.76%
avg_F1-Score for class  3 : 11.81%
----------------------------
Precision for class  4 : 3.33%
Recall for class  4 : 25.03%
avg_F1-Score for class  4 : 5.88%
----------------------------
----------------------
avg_F1-Score: 19.99%
----------------------
[[  150.   205.   951.   162.]
 [ 3612. 42967. 50522. 10218.]
 [  205.   311.  3531.   119.]
 [  143.   290.   651.   362.]]
----------------------


## C_ Over & Under 


In [26]:
under_1=trainingData.filter(col('Severity') == 1)
under_1 = trainingData.sample(True, 0.3, seed=42)

In [27]:
#Over Sample Classe 0
oversample_0=trainingData.filter(col('Severity') == 0)
num_campioni_da_generare = round(n_1*0.3)
num_campioni_da_generare=(num_campioni_da_generare-n_0)/n_0 #percentuale 
#print(num_campioni_da_generare)
full_0=oversample_0
#print(full.count())
while num_campioni_da_generare>1:
    oversample_0 = oversample_0.unionAll(full_0)
    #trainingData_over = trainingData_over.unionAll(df_minority_oversampled)
    num_campioni_da_generare=num_campioni_da_generare-1.0
    #num_campioni_da_generare=num_campioni_da_generare/n_0
    print(num_campioni_da_generare)
    #print('------')
    #print(trainingData_over.count())
#print(num_campioni_da_generare, 'u')
df_minority_oversampled = full_0.sample(True, num_campioni_da_generare, seed=42)

oversample_0 = oversample_0.unionAll(df_minority_oversampled)

20.058702670971527
19.058702670971527
18.058702670971527
17.058702670971527
16.058702670971527
15.058702670971527
14.058702670971527
13.058702670971527
12.058702670971527
11.058702670971527
10.058702670971527
9.058702670971527
8.058702670971527
7.0587026709715275
6.0587026709715275
5.0587026709715275
4.0587026709715275
3.0587026709715275
2.0587026709715275
1.0587026709715275
0.05870267097152748


In [28]:
# Overs sample Classe 2
oversample_2=trainingData.filter(col('Severity') == 2)
num_campioni_da_generare = round(n_1*0.3)
num_campioni_da_generare=(num_campioni_da_generare-n_2)/n_2 #percentuale 
#print(num_campioni_da_generare)
full_2=oversample_2
#print(full.count())

while num_campioni_da_generare>1:
    oversample_2 = oversample_2.unionAll(full_2)
    #trainingData_over = trainingData_over.unionAll(df_minority_oversampled)
    num_campioni_da_generare=num_campioni_da_generare-1.0
    #num_campioni_da_generare=num_campioni_da_generare/n_0
    print(num_campioni_da_generare)
    #print('------')
    #print(trainingData_over.count())
#print(num_campioni_da_generare, 'u')
df_minority_oversampled = full_2.sample(True, num_campioni_da_generare, seed=42)

oversample_2 = oversample_2.unionAll(df_minority_oversampled)

5.6136156417789485
4.6136156417789485
3.6136156417789485
2.6136156417789485
1.6136156417789485
0.6136156417789485


In [29]:
# Oversample Classe 3 
oversample_3=trainingData.filter(col('Severity') == 3)
num_campioni_da_generare = round(n_1*0.3)
num_campioni_da_generare=(num_campioni_da_generare-n_3)/n_3 #percentuale 
#print(num_campioni_da_generare)
full_3=oversample_3
#print(full.count())

while num_campioni_da_generare>1:
    oversample_3 = oversample_3.unionAll(full_3)
    #trainingData_over = trainingData_over.unionAll(df_minority_oversampled)
    num_campioni_da_generare=num_campioni_da_generare-1.0
    #num_campioni_da_generare=num_campioni_da_generare/n_0
    print(num_campioni_da_generare)
    #print('------')
    #print(trainingData_over.count())
#print(num_campioni_da_generare, 'u')
df_minority_oversampled = full_3.sample(True, num_campioni_da_generare, seed=42)

oversample_3 = oversample_3.unionAll(df_minority_oversampled)

20.947786259541985
19.947786259541985
18.947786259541985
17.947786259541985
16.947786259541985
15.947786259541985
14.947786259541985
13.947786259541985
12.947786259541985
11.947786259541985
10.947786259541985
9.947786259541985
8.947786259541985
7.947786259541985
6.947786259541985
5.947786259541985
4.947786259541985
3.947786259541985
2.947786259541985
1.947786259541985
0.947786259541985


In [30]:
df_mezzo=oversample_0.unionAll(oversample_2).unionAll(oversample_3).unionAll(under_1)

In [31]:
df_mezzo.count()

305461

In [32]:
from pyspark.sql import functions as F
g=df_mezzo.groupBy('Severity').agg(F.count('Severity').alias('Count'))
g=g.sort('Severity')
g.show()

+--------+-----+
|Severity|Count|
+--------+-----+
|       0|76207|
|       1|74954|
|       2|78069|
|       3|76231|
+--------+-----+



In [33]:
# ri assegno il train 
train_mezzo=df_mezzo

### U+O A) Dt base

In [73]:
from pyspark.ml.classification import DecisionTreeClassifier

dt = DecisionTreeClassifier(labelCol="Severity", featuresCol="features")

dt = dt.fit(train_mezzo)
predictions = dt.transform(testData)
print_metrics_and_cf(predictions)

Accuracy : 0.7703021718602455
Test Error = 0.229698
----------------------------
Precision for class  1 : 36.25%
Recall for class  1 : 71.90%
avg_F1-Score for class  1 : 48.20%
----------------------------
Precision for class  2 : 99.03%
Recall for class  2 : 77.28%
avg_F1-Score for class  2 : 86.82%
----------------------------
Precision for class  3 : 37.70%
Recall for class  3 : 75.48%
avg_F1-Score for class  3 : 50.28%
----------------------------
Precision for class  4 : 5.09%
Recall for class  4 : 67.86%
avg_F1-Score for class  4 : 9.46%
----------------------------
----------------------
avg_F1-Score: 48.69%
----------------------
[[1.0160e+03 9.1000e+01 2.9900e+02 7.0000e+00]
 [1.1570e+03 8.2926e+04 4.9340e+03 1.8285e+04]
 [6.2900e+02 2.6000e+02 3.1710e+03 1.4100e+02]
 [1.0000e+00 4.5900e+02 8.0000e+00 9.8800e+02]]
----------------------


In [74]:
dt.featureImportances

SparseVector(28, {0: 0.1816, 1: 0.0361, 2: 0.5254, 6: 0.0006, 10: 0.0009, 12: 0.001, 19: 0.0167, 20: 0.1706, 21: 0.0032, 22: 0.0537, 23: 0.0101})

### U+O_A)_a) DT Tuning

In [75]:
from pyspark.ml.classification import DecisionTreeClassifier, RandomForestClassifier

dt_param1 = DecisionTreeClassifier(labelCol="Severity", featuresCol="features",
                                  maxDepth=20,
                                  maxBins=32)
                                   

dt_param1 = dt_param1.fit(train_mezzo)
predictions_param1 = dt_param1.transform(testData)
print_metrics_and_cf(predictions_param1)

Accuracy : 0.8538278599657259
Test Error = 0.146172
----------------------------
Precision for class  1 : 27.25%
Recall for class  1 : 64.61%
avg_F1-Score for class  1 : 38.34%
----------------------------
Precision for class  2 : 98.33%
Recall for class  2 : 86.58%
avg_F1-Score for class  2 : 92.08%
----------------------------
Precision for class  3 : 38.43%
Recall for class  3 : 71.58%
avg_F1-Score for class  3 : 50.01%
----------------------------
Precision for class  4 : 9.56%
Recall for class  4 : 57.28%
avg_F1-Score for class  4 : 16.39%
----------------------------
----------------------
avg_F1-Score: 49.20%
----------------------
[[9.130e+02 2.180e+02 2.750e+02 7.000e+00]
 [2.036e+03 9.290e+04 4.519e+03 7.847e+03]
 [3.920e+02 7.660e+02 3.007e+03 3.600e+01]
 [9.000e+00 5.890e+02 2.400e+01 8.340e+02]]
----------------------


In [76]:
dt.featureImportances

SparseVector(28, {0: 0.1816, 1: 0.0361, 2: 0.5254, 6: 0.0006, 10: 0.0009, 12: 0.001, 19: 0.0167, 20: 0.1706, 21: 0.0032, 22: 0.0537, 23: 0.0101})

### U+O_A)_b) DT Tuning Pesi

In [77]:
from pyspark.sql import functions as F
g=train_mezzo.groupBy('Severity').agg(F.count('Severity').alias('Count'))
g.sort('Count').show()

+--------+-----+
|Severity|Count|
+--------+-----+
|       1|74954|
|       3|76001|
|       0|77478|
|       2|77778|
+--------+-----+



In [80]:
# DEFINIRE I PESI !!!!!!!!  <-------------------------------------------------------------------------------
trainingData_w = train_mezzo.withColumn("weights2", when(trainingData["Severity"] == 1, 1.0)
    .when(trainingData["Severity"] == 0, 306211 / 77478)  # Calcolo del peso per la classe 1
    .when(trainingData["Severity"] == 2, 306211 / 77778)  # Calcolo del peso per la classe 3
    .when(trainingData["Severity"] == 3, 306211 / 76001)  # Calcolo del peso per la classe 4
)

In [81]:
from pyspark.ml.classification import DecisionTreeClassifier, RandomForestClassifier

dt_param2 = DecisionTreeClassifier(labelCol="Severity", featuresCol="features",
                                  maxDepth=30,
                                  maxBins=32,
                                weightCol='weights2'  
                                  )
                                   

dt_param2 = dt_param2.fit(trainingData_w)
predictions_param2 = dt_param2.transform(testData)
print_metrics_and_cf(predictions_param2)

Accuracy : 0.8891599342496416
Test Error = 0.11084
----------------------------
Precision for class  1 : 32.18%
Recall for class  1 : 57.89%
avg_F1-Score for class  1 : 41.37%
----------------------------
Precision for class  2 : 97.94%
Recall for class  2 : 90.75%
avg_F1-Score for class  2 : 94.21%
----------------------------
Precision for class  3 : 39.06%
Recall for class  3 : 65.60%
avg_F1-Score for class  3 : 48.97%
----------------------------
Precision for class  4 : 13.98%
Recall for class  4 : 51.37%
avg_F1-Score for class  4 : 21.97%
----------------------------
----------------------
avg_F1-Score: 51.63%
----------------------
[[8.1800e+02 2.7400e+02 3.1200e+02 9.0000e+00]
 [1.3970e+03 9.7373e+04 3.9620e+03 4.5700e+03]
 [3.2300e+02 1.0970e+03 2.7560e+03 2.5000e+01]
 [4.0000e+00 6.7800e+02 2.6000e+01 7.4800e+02]]
----------------------


In [82]:
dt.featureImportances

SparseVector(28, {0: 0.1816, 1: 0.0361, 2: 0.5254, 6: 0.0006, 10: 0.0009, 12: 0.001, 19: 0.0167, 20: 0.1706, 21: 0.0032, 22: 0.0537, 23: 0.0101})

### U+O B_ Random Forest

In [34]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer

# Train a RandomForest model.
rf = RandomForestClassifier(labelCol="Severity", featuresCol="features", numTrees=10)


# Train model.  This also runs the indexers.
model = rf.fit(train_mezzo)

# Make predictions.

predictions = model.transform(testData)

In [84]:
print_metrics_and_cf(predictions)

Accuracy : 0.6475011366418354
Test Error = 0.352499
----------------------------
Precision for class  1 : 37.13%
Recall for class  1 : 72.68%
avg_F1-Score for class  1 : 49.15%
----------------------------
Precision for class  2 : 99.09%
Recall for class  2 : 64.04%
avg_F1-Score for class  2 : 77.80%
----------------------------
Precision for class  3 : 37.86%
Recall for class  3 : 76.12%
avg_F1-Score for class  3 : 50.57%
----------------------------
Precision for class  4 : 3.31%
Recall for class  4 : 76.85%
avg_F1-Score for class  4 : 6.35%
----------------------------
----------------------
avg_F1-Score: 45.96%
----------------------
[[1.0270e+03 5.5000e+01 2.8800e+02 4.3000e+01]
 [1.1370e+03 6.8712e+04 4.9540e+03 3.2499e+04]
 [6.0100e+02 2.4900e+02 3.1980e+03 1.5300e+02]
 [1.0000e+00 3.2800e+02 8.0000e+00 1.1190e+03]]
----------------------


In [37]:
model.featureImportances

SparseVector(28, {0: 0.1611, 1: 0.0984, 2: 0.4172, 3: 0.0424, 4: 0.0069, 5: 0.0001, 6: 0.0064, 7: 0.0001, 8: 0.0002, 9: 0.0, 10: 0.0066, 12: 0.0, 16: 0.0001, 17: 0.0045, 18: 0.0, 19: 0.0094, 20: 0.1724, 21: 0.0291, 22: 0.0406, 23: 0.0018, 24: 0.0003, 25: 0.0001, 26: 0.002})

In [36]:
feat_imp=model.featureImportances
from pyspark.ml.linalg import SparseVector
non_zero_elements = [(index, value) for index, value in zip(feat_imp.indices, feat_imp.values) if value != 0]
sorted_non_zero_elements = sorted(non_zero_elements, key=lambda x: x[1], reverse=True)
print('Le features più importanti per la predizione sono :')
for el in sorted_non_zero_elements:
    if el[0]>=24:
        print(num_col[len(num_col)-1], 'con un\'importanza dello', el[1])
    else:
        print(num_col[el[0]], 'con un\'importanza dello', el[1])

Le features più importanti per la predizione sono :
Distance_mi con un'importanza dello 0.4172273529766177
month con un'importanza dello 0.17235023803006808
Start_Lat con un'importanza dello 0.16113531962603805
Start_Lng con un'importanza dello 0.09843555704989608
Temperature_F con un'importanza dello 0.04242030342157565
Working_Weekend con un'importanza dello 0.04061534859422437
hour con un'importanza dello 0.029109616161463847
Traffic_Signal con un'importanza dello 0.009440405853263379
Humidity_perc con un'importanza dello 0.00692223141406977
Crossing con un'importanza dello 0.006624621042602222
Wind_Speed_mph con un'importanza dello 0.006417921782229011
Stop con un'importanza dello 0.004469155898827137
Weather_Condition_1hot con un'importanza dello 0.0019704588283467528
Astronomical_Twilight con un'importanza dello 0.0018463797212776491
Weather_Condition_1hot con un'importanza dello 0.0002640871170730634
Amenity con un'importanza dello 0.00020590118375243583
Visibility_mi con un'imp

### U+O C_ NN


In [85]:
under_1_nn=trainingData_nn.filter(col('Severity') == 1)
under_1_nn = trainingData_nn.sample(True, 0.3, seed=42)

In [86]:
oversample_0_nn=trainingData_nn.filter(col('Severity') == 0)
num_campioni_da_generare = round(n_1*0.3)
num_campioni_da_generare=(num_campioni_da_generare-n_0)/n_0 
full_0_nn=oversample_0_nn

while num_campioni_da_generare>1:
    oversample_0_nn = oversample_0_nn.unionAll(full_0_nn)
    num_campioni_da_generare=num_campioni_da_generare-1.0
    print(num_campioni_da_generare)
    
df_minority_oversampled_nn = full_0_nn.sample(True, num_campioni_da_generare, seed=42)

oversample_0_nn = oversample_0_nn.unionAll(df_minority_oversampled_nn)

20.418854415274463
19.418854415274463
18.418854415274463
17.418854415274463
16.418854415274463
15.418854415274463
14.418854415274463
13.418854415274463
12.418854415274463
11.418854415274463
10.418854415274463
9.418854415274463
8.418854415274463
7.418854415274463
6.418854415274463
5.418854415274463
4.418854415274463
3.4188544152744633
2.4188544152744633
1.4188544152744633
0.4188544152744633


In [87]:
oversample_2_nn=trainingData_nn.filter(col('Severity') == 2)
num_campioni_da_generare = round(n_1*0.3)
num_campioni_da_generare=(num_campioni_da_generare-n_2)/n_2
full_2_nn=oversample_2_nn

while num_campioni_da_generare>1:
    oversample_2_nn = oversample_2_nn.unionAll(full_2_nn)
    num_campioni_da_generare=num_campioni_da_generare-1.0
    print(num_campioni_da_generare)
    
df_minority_oversampled_nn = full_2_nn.sample(True, num_campioni_da_generare, seed=42)

oversample_2_nn = oversample_2_nn.unionAll(df_minority_oversampled_nn)

5.586109428629114
4.586109428629114
3.586109428629114
2.586109428629114
1.586109428629114
0.586109428629114


In [88]:
oversample_3_nn=trainingData_nn.filter(col('Severity') == 3)
num_campioni_da_generare = round(n_1*0.3)
num_campioni_da_generare=(num_campioni_da_generare-n_3)/n_3  
full_3_nn=oversample_3_nn

while num_campioni_da_generare>1:
    oversample_3_nn = oversample_3_nn.unionAll(full_3_nn)
    num_campioni_da_generare=num_campioni_da_generare-1.0
    print(num_campioni_da_generare)
    
df_minority_oversampled_nn = oversample_3_nn.sample(True, num_campioni_da_generare, seed=42)

oversample_3_nn = oversample_3_nn.unionAll(df_minority_oversampled_nn)

20.876103500761037
19.876103500761037
18.876103500761037
17.876103500761037
16.876103500761037
15.876103500761037
14.876103500761037
13.876103500761037
12.876103500761037
11.876103500761037
10.876103500761037
9.876103500761037
8.876103500761037
7.876103500761037
6.876103500761037
5.876103500761037
4.876103500761037
3.8761035007610367
2.8761035007610367
1.8761035007610367
0.8761035007610367


In [89]:
class0_counts=oversample_0_nn.count()
class2_counts=oversample_2_nn.count()
class3_counts=oversample_3_nn.count()
class1_counts=under_1_nn.count()
print(f'Class 0: {class0_counts}
Class 1: {class1_counts}
Class 2: {class2_counts}
Class 3: {class3_counts}')

SyntaxError: unterminated string literal (detected at line 5) (3786177059.py, line 5)

In [91]:
train_und_over_nn=oversample_0_nn.unionAll(oversample_2_nn).unionAll(oversample_3_nn).unionAll(under_1_nn)

#### U+O C_ a)

In [None]:
# MLP Layers  -> output layer must have the same number of units of the Severity classes
layers = [len(num_col),16,32,16,8,4]

# Create the Multilayer Perceptron Classifier and set its parameters
trainer = MultilayerPerceptronClassifier(
    layers=layers,
    blockSize=128,
    labelCol="Severity",
    featuresCol="features",
    maxIter=100
)

# Train the model
model = trainer.fit(train_und_over_nn)

# Make predictions on the test set
predictions= model.transform(testData_nn)

print_metrics_and_cf(predictions)

#### U+0 C_ b)

In [None]:
layers = [len(num_col),4]

# Create the Multilayer Perceptron Classifier and set its parameters
trainer = MultilayerPerceptronClassifier(
    layers=layers,
    labelCol="Severity",
    featuresCol="features",
    maxIter=100,  
    tol=1e-6,
    seed=None,
    blockSize=32,
    stepSize=0.03,  
    solver="l-bfgs",
    initialWeights=None,
    probabilityCol="probability",
    rawPredictionCol="rawPrediction"
)

# Train the model
model = trainer.fit(train_und_over)

# Make predictions on the test set
predictions= model.transform(testData)

print_metrics_and_cf(predictions)