In [1]:
from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.sql.window import *
from functions import *
from pyspark.sql import SparkSession
import pandas as pd
import numpy as np
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.stat import Correlation

In [2]:
spark = SparkSession.builder \
   .master("local") \
   .appName("DataPreparation") \
   .config("spark.executor.memory", "8gb") \
   .getOrCreate()

sc = spark.sparkContext

In [None]:
# preparo tutti i dataset e li metto in un dizionario per un accesso più comodo
dataset = {name:dataset_preparation(name, spark, 'data') for name in [f'PVS{i+1}' for i in range(9)]}

In [None]:
def get_structural_features(data, window_size:int=200):
    """
    Funzione che, a partire dai dati di un driving process, restituisce la rappresentazione strutturale delle time series estratte dal dataset applicando una sliding window.
    Per ogni variabile originale, le statistiche  strutturali calcolate sono la media e la deviazione standard
    """
    # piuttosto che materializzare le time series e poi calcolare le feature strutturali, calcolo direttamente le feature strutturali con la window, così risparmio moltissima memoria
    padding = data.filter(data['timestep'] == data.count()-1) # prendo l'ultimo elemento
    data = data.union(padding) # lo duplico alla fine del dataset, per poter calcolare la deviazione standard anche per l'ultima finestra (altrimenti darebbe errore)
    cols = data.columns

    # la sliding window
    window = Window.orderBy("timestep").rowsBetween(0, window_size)

    # le funzioni aggregative che rappresentato le features strutturali
    features = [F.avg(col).over(window).alias(f'avg_{col}') for col in cols[1:-1]]\
        + [F.std(col).over(window).alias(f'std_{col}') for col in cols[3:-1]]


    data_proc = data.select(
        'timestep',
        *features,
        F.mode('road_condition').over(window).alias(f'road_condition')
    )

    data_proc = data_proc.dropDuplicates(subset=['timestep']) # rimuovo il record che si era formato alla fine per il padding

    return data_proc

In [5]:
pvs1_proc = get_structural_features(dataset['PVS1'])
pvs7_proc = get_structural_features(dataset['PVS7'])

In [None]:
# unisco PVS1 e PVS7
pvs_proc = pvs1_proc.union(pvs7_proc)

In [8]:
pvs_proc.show(5)

+--------+-------------------+-------------------+--------------------+---------------------+---------------------+---------------------+----------------------------+----------------------------+----------------------------+----------------------------+----------------------------+----------------------------+----------------------+----------------------+----------------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+---------------------+---------------------+---------------------+----------------------------+----------------------------+----------------------------+--------------------+---------------------------+---------------------------+---------------------+---------------------+---------------------+----------------------------+----------------------------+----------------------------+----------------------------+----------------------------

In [9]:
pvs_proc.count()

272584

In [10]:
# vettorizzo i dati
assembler = VectorAssembler(inputCols=pvs_proc.columns[3:-1], outputCol='features')
pvs_proc_vect = assembler.transform(pvs_proc)

In [11]:
# creao una colonna label con la label numerica
indexer = StringIndexer(inputCol="road_condition", outputCol="label", handleInvalid="skip")
pvs_proc_vect = pvs_proc_vect.select('timestep', 'features', 'road_condition')
pvs_proc_vect = indexer.fit(pvs_proc_vect).transform(pvs_proc_vect)

# Feature Selection

Ora faccio il processo di feature selection, analizzando la correlazione tra le feature, per un campione dei dati che comprende PVS1 e PVS7, ho scelto questi due driving process perchè entrambi appartenenti allo scenario 1 (ho preferito evitare di mischiare insieme informazioni da più scenari); ho escluso PVS4 perchè i dati del guidatore 2 successivamente li userò per il test set della classificazione, quindi non voglio toccarli in nessuna analisi.

In [12]:
# calcolo la correlation matrix tra le features
matrix = Correlation.corr(pvs_proc_vect.select('features'), 'features')

In [None]:
# la matrice è solo 110x110, quindi posso fare collect
corr_matrix = matrix.collect()[0][0].toArray().tolist() 
corr_matrix_df = pd.DataFrame(data=corr_matrix, columns=pvs1_proc.columns[3:-1], index=pvs1_proc.columns[3:-1]) 

In [14]:

corr_matrix_df.shape

(110, 110)

## correlation matrix pre features selection (Sconsiglio di visualizzarla perchè troppo grande)

In [110]:
#corr_matrix_df.style.background_gradient(cmap='coolwarm')

## Correlation Feature Selection

In [None]:
# stampo tutte le coppie di feature con correlazione assolut >= 0.75, poi sceglierò arbitrariamente quali tenere
# scorro solo gli elementi sopra la diagonale principale tanto la matrice è simmetrica
z = 0
for i in range(corr_matrix_df.shape[0]):
    for j in range(0+z, corr_matrix_df.shape[1]):
        if abs(corr_matrix_df.iloc[i,j]) >= 0.75 and i!=j:
            print(corr_matrix_df.columns[i], corr_matrix_df.columns[j], corr_matrix_df.iloc[i,j])
    z += 1

avg_acc_x_dashboard_L avg_acc_x_above_suspension_L 0.9817201870053519
avg_acc_x_dashboard_L avg_acc_x_below_suspension_L 0.9182939800492064
avg_acc_x_dashboard_L avg_acc_x_dashboard_R 0.9590750242048502
avg_acc_x_dashboard_L avg_acc_x_above_suspension_R 0.9289181800277347
avg_acc_x_dashboard_L avg_acc_x_below_suspension_R 0.9203889498122404
avg_acc_y_dashboard_L avg_acc_y_above_suspension_L 0.9815718749149694
avg_acc_y_dashboard_L avg_acc_y_below_suspension_L 0.9326025890901657
avg_acc_y_dashboard_L avg_acc_y_dashboard_R 0.980995450486379
avg_acc_y_dashboard_L avg_acc_y_above_suspension_R 0.9753539894584256
avg_acc_y_dashboard_L avg_acc_y_below_suspension_R 0.9151238880701226
avg_acc_x_above_suspension_L avg_acc_x_below_suspension_L 0.8977312740038976
avg_acc_x_above_suspension_L avg_acc_x_dashboard_R 0.9486922075337724
avg_acc_x_above_suspension_L avg_acc_x_above_suspension_R 0.926451993927171
avg_acc_x_above_suspension_L avg_acc_x_below_suspension_R 0.8985559422832139
avg_acc_y_above

Per togliere le feature inutili, ho dovuto analizzare tutte le coppie di features molto correlate, e scegliere arbitrariamente quali tenere.


In [None]:
# risultati della selezione:
all_avg_acc_z = [col for col in corr_matrix_df.columns if col[:9] == 'avg_acc_z']
all_std_mag = [col for col in corr_matrix_df.columns if col[:7] == 'std_mag' and not col[-11:] =='dashboard_R']


to_keep = ['avg_speed', 'avg_acc_x_dashboard_L', 'avg_acc_y_dashboard_L', *all_avg_acc_z, 'avg_gyro_x_dashboard_L', 'avg_gyro_y_dashboard_L', 'avg_gyro_z_dashboard_L',
            'avg_mag_z_dashboard_L', 'avg_temp_dashboard_L', 'avg_temp_above_suspension_L',
           'std_acc_z_dashboard_L', 'std_gyro_z_dashboard_L', *all_std_mag, 'std_temp_dashboard_L',
           'std_temp_above_suspension_L', 'std_temp_below_suspension_L']

In [17]:
corr_matrix_df.index = range(110)
corr_matrix_df = corr_matrix_df[to_keep].iloc[[list(corr_matrix_df.columns).index(el) for el in to_keep]]
corr_matrix_df.index = list(corr_matrix_df.columns)

In [None]:
# solo 29 feature rimaste
corr_matrix_df.shape

(29, 29)

## correlation matrix finale

In [19]:
corr_matrix_df.style.background_gradient(cmap='coolwarm')

Unnamed: 0,avg_speed,avg_acc_x_dashboard_L,avg_acc_y_dashboard_L,avg_acc_z_dashboard_L,avg_acc_z_above_suspension_L,avg_acc_z_below_suspension_L,avg_acc_z_dashboard_R,avg_acc_z_above_suspension_R,avg_acc_z_below_suspension_R,avg_gyro_x_dashboard_L,avg_gyro_y_dashboard_L,avg_gyro_z_dashboard_L,avg_mag_z_dashboard_L,avg_temp_dashboard_L,avg_temp_above_suspension_L,std_acc_z_dashboard_L,std_gyro_z_dashboard_L,std_mag_x_dashboard_L,std_mag_y_dashboard_L,std_mag_z_dashboard_L,std_mag_x_above_suspension_L,std_mag_y_above_suspension_L,std_mag_z_above_suspension_L,std_mag_x_above_suspension_R,std_mag_y_above_suspension_R,std_mag_z_above_suspension_R,std_temp_dashboard_L,std_temp_above_suspension_L,std_temp_below_suspension_L
avg_speed,1.0,0.092652,-0.021543,0.173405,-0.01604,0.064763,0.126589,0.078763,0.043729,0.022742,0.020625,-0.010285,-0.116605,-0.183885,-0.321129,-0.206328,-0.309773,-0.174545,-0.066964,-0.07642,-0.021582,-0.161902,0.101127,0.000137,0.124541,0.202487,0.108233,-0.032584,-0.015165
avg_acc_x_dashboard_L,0.092652,1.0,-0.077535,0.439381,-0.314416,-0.055823,0.410567,0.388999,0.226781,0.384075,-0.192095,-0.637445,-0.087472,-0.006903,-0.139377,-0.078979,-0.14936,-0.090088,-0.1509,-0.03495,0.013476,0.020177,-0.026416,-0.011837,-0.025229,-0.080114,0.074193,-0.00808,-0.05573
avg_acc_y_dashboard_L,-0.021543,-0.077535,1.0,-0.272315,0.031606,-0.074735,-0.271629,0.121781,-0.10481,-0.033995,-0.026607,-0.009501,0.186419,-0.080115,0.239167,-0.046844,0.05737,0.040709,0.053638,-0.054845,-0.043308,-0.065946,-0.076847,0.005974,-0.052042,-0.159015,-0.000669,0.012098,-0.046399
avg_acc_z_dashboard_L,0.173405,0.439381,-0.272315,1.0,0.019675,0.051683,0.504029,0.338767,0.253375,0.455612,-0.063896,-0.271325,-0.098794,0.026644,-0.181479,-0.05341,-0.157146,-0.123892,-0.098871,-0.038771,0.001747,-0.003648,-0.008387,-0.010662,0.027006,0.039016,0.01899,0.006265,-0.024344
avg_acc_z_above_suspension_L,-0.01604,-0.314416,0.031606,0.019675,1.0,0.515636,0.046985,0.036895,0.029787,-0.002143,0.07146,0.217697,0.0342,-0.003067,0.09179,0.04862,0.061502,-0.029961,0.061719,-0.00856,-0.043872,-0.055728,0.011427,-0.016954,0.03456,0.028474,0.016552,-0.100302,0.00392
avg_acc_z_below_suspension_L,0.064763,-0.055823,-0.074735,0.051683,0.515636,1.0,0.148984,0.111175,0.083142,0.085218,-0.006627,0.013155,-0.003496,0.016537,0.002379,-0.048727,-0.012562,-0.015275,0.026402,-0.004766,-0.017035,-0.017859,-0.030863,0.033223,0.001576,-0.012799,0.121916,-0.071079,0.032474
avg_acc_z_dashboard_R,0.126589,0.410567,-0.271629,0.504029,0.046985,0.148984,1.0,0.382592,0.002103,0.430591,-0.082644,-0.261652,-0.088259,0.084263,-0.17209,-0.071178,-0.135132,-0.109714,-0.097517,-0.055961,-0.006486,0.027319,-0.081141,0.067877,0.021406,-0.014768,0.058557,0.003407,0.001584
avg_acc_z_above_suspension_R,0.078763,0.388999,0.121781,0.338767,0.036895,0.111175,0.382592,1.0,0.444033,0.356184,-0.086862,-0.282168,-0.038111,-0.033708,0.056299,-0.008485,-0.069135,-0.077887,-0.061874,-0.060775,0.004312,0.002065,-0.039436,0.045019,-0.003601,-0.038653,0.049435,0.008675,-0.018217
avg_acc_z_below_suspension_R,0.043729,0.226781,-0.10481,0.253375,0.029787,0.083142,0.002103,0.444033,1.0,0.209222,-0.054933,-0.151776,-0.065813,0.033789,-0.059494,-0.025643,-0.076992,-0.048377,-0.058104,-0.003614,-0.040968,-0.014026,-0.004554,0.011429,0.014818,0.022155,0.043018,-0.026188,0.021592
avg_gyro_x_dashboard_L,0.022742,0.384075,-0.033995,0.455612,-0.002143,0.085218,0.430591,0.356184,0.209222,1.0,-0.18203,-0.581557,-0.017171,0.111659,-0.049859,0.008144,-0.096768,-0.127481,-0.155647,-0.029878,0.007495,0.056461,-0.077106,0.04795,-0.027537,-0.032429,0.0786,-0.011367,-0.029392


### Commenti aggiuntivi

Ho selezionato aribitrariamente le features da rimuovere, non ho fatto uno script automatico perchè non mi è venuta in mente nessuna euristica per decidere quale tenere tra un gruppo di feature correlate. Questo motiva il fatto che questa parte del notebook abbia un codice abbastanza "sporco" e poco parametrico.

Vista la naturale ridondanza tra queste feature, mi sembra improbabile che sia stata rimossa qualche feature importante.

Sono rimaste solo 29 feature nel dataset, adesso nessuna coppia di features ha una correlazione lineare maggiore di 0.75.


## Final Data Processing

Ora che ho una lista delle feature da tenere posso selezionarle da ogni dataset PVS

In [21]:
to_keep = ['road_condition','avg_longitude','avg_latitude'] + to_keep

In [23]:
dataset = {name:get_structural_features(data).select(to_keep) for name,data in dataset.items()}

In [25]:
dataset['PVS3'].show(5)

+--------------+-------------------+-------------------+--------------------+---------------------+---------------------+---------------------+----------------------------+----------------------------+---------------------+----------------------------+----------------------------+----------------------+----------------------+----------------------+---------------------+--------------------+---------------------------+---------------------+----------------------+---------------------+---------------------+---------------------+----------------------------+----------------------------+----------------------------+----------------------------+----------------------------+----------------------------+--------------------+---------------------------+---------------------------+
|road_condition|      avg_longitude|       avg_latitude|           avg_speed|avg_acc_x_dashboard_L|avg_acc_y_dashboard_L|avg_acc_z_dashboard_L|avg_acc_z_above_suspension_L|avg_acc_z_below_suspension_L|avg_acc_z_dashb

In [26]:
# salvo il dataset in csv
for name,data in dataset.items():
    data.write.option("header", True).mode("overwrite").csv(f'processed_data/{name}')