In [24]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, when, desc, count, mean, stddev, log
from pyspark.sql.functions import year, month, dayofmonth, hour, minute, second
from pyspark.ml.feature import StringIndexer

In [25]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("accidents") \
    .config("spark.executor.memory", "6g") \
    .config("spark.driver.memory", "6g") \
    .config("spark.driver.maxResultSize", "2g") \
    .config("spark.memory.fraction", "0.6") \
    .config("spark.memory.storageFraction", "0.3") \
    .getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

data = spark.read.csv(f"file:///home/clement/2026/S8/Projet_commun/data/single_csv", header=True, inferSchema=True)
data.show(5)



+---------+-------+--------+-------------------+-------------------+------------------+------------------+-------+-------+------------------+--------------------+-----------------+----------+-----------+-----+-------+-------+----------+------------+-------------------+--------------+-------------+-----------+------------+--------------+--------------+---------------+-----------------+-----------------+-------+-----+--------+--------+--------+-------+-------+----------+-------+-----+---------------+--------------+------------+--------------+--------------+-----------------+---------------------+
|       ID| Source|Severity|         Start_Time|           End_Time|         Start_Lat|         Start_Lng|End_Lat|End_Lng|      Distance(mi)|         Description|           Street|      City|     County|State|Zipcode|Country|  Timezone|Airport_Code|  Weather_Timestamp|Temperature(F)|Wind_Chill(F)|Humidity(%)|Pressure(in)|Visibility(mi)|Wind_Direction|Wind_Speed(mph)|Precipitation(in)|Weather_Con

                                                                                

In [26]:
def print_null_counts(df, show_count=20):
    null_df = df.select(
        [
            sum(col(c).isNull().cast("int")).alias(c)
            for c in df.columns
        ]
    )

    rows = null_df.collect()
    counts = {
        c: rows[0][i] for i, c in enumerate(df.columns)
    }

    counts_df = spark.createDataFrame([(k, v) for k, v in counts.items()], ["Column name", "Null Count"])
    counts_df.orderBy("Null Count", ascending=False).show(show_count)

In [27]:
# Calcul du nombre de valeurs manquantes pour chaque colonne
missing_counts = data.select([
    sum(when(col(c).isNull(), 1).otherwise(0)).alias(c) for c in data.columns
])

print_null_counts(data, 23)

# Conversion des résultats en dictionnaire Python
missing_dict = missing_counts.collect()[0].asDict()

# Liste des colonnes ayant plus de 1M valeurs manquantes
columns_with_missing_gt_1M = [col_name for col_name, count in missing_dict.items() if count > 1000000]

# Suppression de ces colonnes trop incomplètes
data = data.drop(*columns_with_missing_gt_1M)

data.show(5)

                                                                                

+--------------------+----------+
|         Column name|Null Count|
+--------------------+----------+
|             End_Lat|   3402762|
|             End_Lng|   3402762|
|       Wind_Chill(F)|   1999019|
|     Wind_Speed(mph)|    571233|
|   Precipitation(in)|    513662|
|      Visibility(mi)|    177098|
|      Wind_Direction|    175206|
|         Humidity(%)|    174144|
|   Weather_Condition|    173459|
|      Temperature(F)|    163853|
|        Pressure(in)|    140679|
|   Weather_Timestamp|    120228|
|      Sunrise_Sunset|     23246|
|      Civil_Twilight|     23246|
|   Nautical_Twilight|     23246|
|Astronomical_Twil...|     23246|
|        Airport_Code|     22635|
|              Street|     10869|
|            Timezone|      7808|
|             Zipcode|      1915|
|                City|       253|
|         Description|         5|
|          Start_Time|         0|
+--------------------+----------+
only showing top 23 rows





+---------+-------+--------+-------------------+-------------------+------------------+------------------+------------------+--------------------+-----------------+----------+-----------+-----+-------+-------+----------+------------+-------------------+--------------+-----------+------------+--------------+--------------+---------------+-----------------+-----------------+-------+-----+--------+--------+--------+-------+-------+----------+-------+-----+---------------+--------------+------------+--------------+--------------+-----------------+---------------------+
|       ID| Source|Severity|         Start_Time|           End_Time|         Start_Lat|         Start_Lng|      Distance(mi)|         Description|           Street|      City|     County|State|Zipcode|Country|  Timezone|Airport_Code|  Weather_Timestamp|Temperature(F)|Humidity(%)|Pressure(in)|Visibility(mi)|Wind_Direction|Wind_Speed(mph)|Precipitation(in)|Weather_Condition|Amenity| Bump|Crossing|Give_Way|Junction|No_Exit|Rail

                                                                                

In [28]:
# Suppression de certaines colonnes non pertinentes ou redondantes
data = data.drop("Description", "ID", "Source")

# Liste des colonnes catégorielles contenant des valeurs manquantes à combler par la modalité la plus fréquente
features_cat = ["Street", "City", "Zipcode", "Timezone", "Airport_Code", "Sunrise_Sunset", "Civil_Twilight", "Nautical_Twilight", "Astronomical_Twilight"]

# Remplissage des valeurs manquantes par la modalité la plus fréquente pour chaque colonne catégorielle
for feature in features_cat:
    mode_value = data.groupBy(feature).count().orderBy(desc("count")).first()[0]
    data = data.fillna({feature: mode_value})

print("Nombre de lignes avant suppression des valeurs manquantes :", data.count())

data = data.na.drop()

print("Nombre de lignes après suppresion des valeurs manquantes :", data.count())

print_null_counts(data, 30)

                                                                                

Nombre de lignes avant suppression des valeurs manquantes : 7728394


                                                                                

Nombre de lignes après suppresion des valeurs manquantes : 7081140


                                                                                

+-----------------+----------+
|      Column name|Null Count|
+-----------------+----------+
|   Sunrise_Sunset|         0|
|       Roundabout|         0|
|Precipitation(in)|         0|
|          Station|         0|
|         Severity|         0|
|             Stop|         0|
|Weather_Condition|         0|
|            State|         0|
|          Amenity|         0|
|       Start_Time|         0|
|         Junction|         0|
|         End_Time|         0|
|          Zipcode|         0|
|         Timezone|         0|
|          Country|         0|
|          No_Exit|         0|
|             Bump|         0|
|          Railway|         0|
|     Airport_Code|         0|
|   Temperature(F)|         0|
|Weather_Timestamp|         0|
|         Crossing|         0|
|  Traffic_Calming|         0|
|         Give_Way|         0|
|      Humidity(%)|         0|
|        Start_Lat|         0|
|     Pressure(in)|         0|
|   Traffic_Signal|         0|
|   Visibility(mi)|         0|
|       

In [29]:
# Conversion des colonnes booléennes en entiers (0 ou 1)
boolean_features = ["Amenity", "Bump", "Crossing", "Give_Way", "Junction", "No_Exit", "Railway", "Roundabout", "Station", "Stop", "Traffic_Calming", "Traffic_Signal", "Turning_Loop"]
for feature in boolean_features:
    data = data.withColumn(feature, col(feature).cast("int"))

# Transformation des colonnes binaires en index numériques avec StringIndexer
binary_features = ["Sunrise_Sunset", "Civil_Twilight", "Nautical_Twilight", "Astronomical_Twilight"]
for feature in binary_features:
    indexer = StringIndexer(inputCol=feature, outputCol=feature + "_index")
    data = indexer.fit(data).transform(data)
    data = data.drop(feature)

# Calcul de la durée d'un accident en minutes
data = data.withColumn("duration", (col("End_Time").cast("long") - col("Start_Time").cast("long")) / 60)

# Extraction des composantes temporelles de Start_Time puis suppression de la colonne initiale
data = data.withColumn("Start_Year", year(col("Start_Time"))) \
        .withColumn("Start_Month", month(col("Start_Time"))) \
        .withColumn("Start_Day", dayofmonth(col("Start_Time"))) \
        .withColumn("Start_Hour", hour(col("Start_Time"))) \
        .withColumn("Start_Minute", minute(col("Start_Time"))) \
        .withColumn("Start_Second", second(col("Start_Time"))) \
        .drop("Start_Time")

# Extraction des composantes temporelles de End_Time puis suppression de la colonne initiale
data = data.withColumn("End_Year", year(col("End_Time"))) \
        .withColumn("End_Month", month(col("End_Time"))) \
        .withColumn("End_Day", dayofmonth(col("End_Time"))) \
        .withColumn("End_Hour", hour(col("End_Time"))) \
        .withColumn("End_Minute", minute(col("End_Time"))) \
        .withColumn("End_Second", second(col("End_Time"))) \
        .drop("End_Time")

# Extraction des composantes temporelles de Weather_Timestamp puis suppression de la colonne initiale
data = data.withColumn("Weather_Year", year(col("Weather_Timestamp"))) \
        .withColumn("Weather_Month", month(col("Weather_Timestamp"))) \
        .withColumn("Weather_Day", dayofmonth(col("Weather_Timestamp"))) \
        .withColumn("Weather_Hour", hour(col("Weather_Timestamp"))) \
        .withColumn("Weather_Minute", minute(col("Weather_Timestamp"))) \
        .withColumn("Weather_Second", second(col("Weather_Timestamp"))) \
        .drop("Weather_Timestamp")

# Suppression de colonnes redondantes ou géographiques trop fines
data = data.drop("Street", "City", "County", "Country", "Zipcode", "Airport_Code")

# Indexation des colonnes catégorielles restantes
features = ["Timezone", "State", "Wind_Direction", "Weather_Condition"]
for feature in features:
    indexer = StringIndexer(inputCol=feature, outputCol=feature + "_index")
    data = indexer.fit(data).transform(data)
    data = data.drop(feature)

# Ajout d'une colonne booléenne pour identifier si l'accident a eu lieu le weekend
# et d'une autre pour savoir si l'accident s’est produit la nuit (avant 6h du matin)
data = data.withColumn("is_weekend", (col("Start_Day") >= 6).cast("int")) \
           .withColumn("is_night", (col("Start_Hour") < 6).cast("int"))

# Affichage des 5 premières lignes du DataFrame final
data.show(5)

                                                                                

+--------+------------------+------------------+------------------+--------------+-----------+------------+--------------+---------------+-----------------+-------+----+--------+--------+--------+-------+-------+----------+-------+----+---------------+--------------+------------+--------------------+--------------------+-----------------------+---------------------------+------------------+----------+-----------+---------+----------+------------+------------+--------+---------+-------+--------+----------+----------+------------+-------------+-----------+------------+--------------+--------------+--------------+-----------+--------------------+-----------------------+----------+--------+
|Severity|         Start_Lat|         Start_Lng|      Distance(mi)|Temperature(F)|Humidity(%)|Pressure(in)|Visibility(mi)|Wind_Speed(mph)|Precipitation(in)|Amenity|Bump|Crossing|Give_Way|Junction|No_Exit|Railway|Roundabout|Station|Stop|Traffic_Calming|Traffic_Signal|Turning_Loop|Sunrise_Sunset_index|Civ

# Modèles

In [30]:
from pyspark.ml.feature import VectorAssembler

feature_cols = ["Start_Lat", "Start_Lng", "Distance(mi)", "Temperature(F)", "Humidity(%)", "Pressure(in)",
                "Wind_Speed(mph)", "Precipitation(in)", "Amenity", "Bump", "Crossing", "Give_Way",
                "Junction", "No_Exit", "Railway", "Roundabout", "Station", "Stop", "Traffic_Calming", 
                "Traffic_Signal", "Turning_Loop","Sunrise_Sunset_index", "Civil_Twilight_index", 
                "Nautical_Twilight_index", "Astronomical_Twilight_index", "duration", "Start_Year", 
                "Start_Month", "Start_Day", "Start_Hour", "Start_Minute", "Start_Second", "End_Year", 
                "End_Month", "End_Day", "End_Hour", "End_Minute", "End_Second", "Weather_Year", "Weather_Month", 
                "Weather_Day", "Weather_Hour", "Weather_Minute", "Weather_Second", "Timezone_index", 
                "State_index", "Wind_Direction_index", "Weather_Condition_index", "is_weekend", "is_night"]

assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
data = assembler.transform(data)

In [31]:
train_df, test_df = data.randomSplit([0.8, 0.2], seed=42)

print("Train set size:", train_df.count())
print("Test set size:", test_df.count())

                                                                                

Train set size: 5664829




Test set size: 1416311


                                                                                

## Prédiction de sévérité

In [9]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Entraînement du modèle Random Forest
rf = RandomForestClassifier(labelCol="Severity", featuresCol="features", numTrees=25, maxDepth=7, maxBins=192, seed=42)
rf_model = rf.fit(train_df)
rf_pred = rf_model.transform(test_df)
rf_pred.select("features", "Severity", "prediction", "probability").show(5)

# Évaluation avec différentes métriques
# Accuracy
rf_evaluator_accuracy = MulticlassClassificationEvaluator(labelCol="Severity", predictionCol="prediction", metricName="accuracy")
rf_accuracy = rf_evaluator_accuracy.evaluate(rf_pred)
print("Test set accuracy with Random Forest:", rf_accuracy)

# Precision (macro-average)
rf_evaluator_precision = MulticlassClassificationEvaluator(labelCol="Severity", predictionCol="prediction", metricName="weightedPrecision")
rf_precision = rf_evaluator_precision.evaluate(rf_pred)
print("Test set precision with Random Forest:", rf_precision)

# Recall (macro-average)
rf_evaluator_recall = MulticlassClassificationEvaluator(labelCol="Severity", predictionCol="prediction", metricName="weightedRecall")
rf_recall = rf_evaluator_recall.evaluate(rf_pred)
print("Test set recall with Random Forest:", rf_recall)

# F1-Score (macro-average)
rf_evaluator_f1 = MulticlassClassificationEvaluator(labelCol="Severity", predictionCol="prediction", metricName="f1")
rf_f1 = rf_evaluator_f1.evaluate(rf_pred)
print("Test set F1-score with Random Forest:", rf_f1)

# Résumé des métriques
print("\n=== Résumé des métriques Random Forest ===")
print(f"Accuracy:  {rf_accuracy:.4f}")
print(f"Precision: {rf_precision:.4f}")
print(f"Recall:    {rf_recall:.4f}")
print(f"F1-Score:  {rf_f1:.4f}")

from pyspark.sql.functions import col

# Affichage de la distribution des prédictions par classe
print("\n=== Distribution des prédictions ===")
rf_pred.groupBy("Severity", "prediction").count().orderBy("Severity", "prediction").show()

# Pour obtenir des métriques par classe individuellement
print("\n=== Métriques par classe ===")
classes = rf_pred.select("Severity").distinct().collect()
for row in classes:
    class_label = row["Severity"]
    print(f"\nClasse {class_label}:")
    
    # Créer des prédictions binaires pour cette classe
    binary_pred = rf_pred.withColumn("binary_label", 
                                   (col("Severity") == class_label).cast("double")) \
                        .withColumn("binary_prediction", 
                                   (col("prediction") == class_label).cast("double"))
    
    # Calculer les métriques pour cette classe
    tp = binary_pred.filter((col("binary_label") == 1.0) & (col("binary_prediction") == 1.0)).count()
    fp = binary_pred.filter((col("binary_label") == 0.0) & (col("binary_prediction") == 1.0)).count()
    tn = binary_pred.filter((col("binary_label") == 0.0) & (col("binary_prediction") == 0.0)).count()
    fn = binary_pred.filter((col("binary_label") == 1.0) & (col("binary_prediction") == 0.0)).count()
    
    precision = tp / (tp + fp) if (tp + fp) > 0 else 0
    recall = tp / (tp + fn) if (tp + fn) > 0 else 0
    f1 = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0
    
    print(f"  Precision: {precision:.4f}")
    print(f"  Recall:    {recall:.4f}")
    print(f"  F1-Score:  {f1:.4f}")
    print(f"  Support:   {tp + fn}")

                                                                                

+--------------------+--------+----------+--------------------+
|            features|Severity|prediction|         probability|
+--------------------+--------+----------+--------------------+
|(50,[0,1,3,4,5,6,...|       1|       2.0|[0.0,0.0928746386...|
|(50,[0,1,3,4,5,6,...|       1|       2.0|[0.0,0.1693104796...|
|[26.08409,-80.192...|       1|       2.0|[0.0,0.0228321814...|
|(50,[0,1,3,4,5,6,...|       1|       2.0|[0.0,0.1255024251...|
|(50,[0,1,3,4,5,6,...|       1|       2.0|[0.0,0.1472458607...|
+--------------------+--------+----------+--------------------+
only showing top 5 rows



                                                                                

Test set accuracy with Random Forest: 0.8056366151219612


                                                                                

Test set precision with Random Forest: 0.784744830326955


                                                                                

Test set recall with Random Forest: 0.8056366151219612


                                                                                

Test set F1-score with Random Forest: 0.7201696062764191

=== Résumé des métriques Random Forest ===
Accuracy:  0.8056
Precision: 0.7847
Recall:    0.8056
F1-Score:  0.7202

=== Distribution des prédictions ===


                                                                                

+--------+----------+-------+
|Severity|prediction|  count|
+--------+----------+-------+
|       1|       2.0|  12899|
|       2|       2.0|1139473|
|       2|       3.0|    231|
|       3|       2.0| 225863|
|       3|       3.0|   1559|
|       4|       2.0|  36242|
|       4|       3.0|     44|
+--------+----------+-------+


=== Métriques par classe ===


                                                                                


Classe 1:


                                                                                

  Precision: 0.0000
  Recall:    0.0000
  F1-Score:  0.0000
  Support:   12899

Classe 3:


                                                                                

  Precision: 0.8501
  Recall:    0.0069
  F1-Score:  0.0136
  Support:   227422

Classe 4:


                                                                                

  Precision: 0.0000
  Recall:    0.0000
  F1-Score:  0.0000
  Support:   36286

Classe 2:




  Precision: 0.8056
  Recall:    0.9998
  F1-Score:  0.8922
  Support:   1139704


                                                                                

In [10]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Entraînement du modèle Random Forest
rf = RandomForestClassifier(labelCol="Severity", featuresCol="features", numTrees=25, maxDepth=10, maxBins=192, seed=42)
rf_model = rf.fit(train_df)
rf_pred = rf_model.transform(test_df)
rf_pred.select("features", "Severity", "prediction", "probability").show(5)

# Évaluation avec différentes métriques
# Accuracy
rf_evaluator_accuracy = MulticlassClassificationEvaluator(labelCol="Severity", predictionCol="prediction", metricName="accuracy")
rf_accuracy = rf_evaluator_accuracy.evaluate(rf_pred)
print("Test set accuracy with Random Forest:", rf_accuracy)

# Precision (macro-average)
rf_evaluator_precision = MulticlassClassificationEvaluator(labelCol="Severity", predictionCol="prediction", metricName="weightedPrecision")
rf_precision = rf_evaluator_precision.evaluate(rf_pred)
print("Test set precision with Random Forest:", rf_precision)

# Recall (macro-average)
rf_evaluator_recall = MulticlassClassificationEvaluator(labelCol="Severity", predictionCol="prediction", metricName="weightedRecall")
rf_recall = rf_evaluator_recall.evaluate(rf_pred)
print("Test set recall with Random Forest:", rf_recall)

# F1-Score (macro-average)
rf_evaluator_f1 = MulticlassClassificationEvaluator(labelCol="Severity", predictionCol="prediction", metricName="f1")
rf_f1 = rf_evaluator_f1.evaluate(rf_pred)
print("Test set F1-score with Random Forest:", rf_f1)

# Résumé des métriques
print("\n=== Résumé des métriques Random Forest ===")
print(f"Accuracy:  {rf_accuracy:.4f}")
print(f"Precision: {rf_precision:.4f}")
print(f"Recall:    {rf_recall:.4f}")
print(f"F1-Score:  {rf_f1:.4f}")

from pyspark.sql.functions import col

# Affichage de la distribution des prédictions par classe
print("\n=== Distribution des prédictions ===")
rf_pred.groupBy("Severity", "prediction").count().orderBy("Severity", "prediction").show()

# Pour obtenir des métriques par classe individuellement
print("\n=== Métriques par classe ===")
classes = rf_pred.select("Severity").distinct().collect()
for row in classes:
    class_label = row["Severity"]
    print(f"\nClasse {class_label}:")
    
    # Créer des prédictions binaires pour cette classe
    binary_pred = rf_pred.withColumn("binary_label", 
                                   (col("Severity") == class_label).cast("double")) \
                        .withColumn("binary_prediction", 
                                   (col("prediction") == class_label).cast("double"))
    
    # Calculer les métriques pour cette classe
    tp = binary_pred.filter((col("binary_label") == 1.0) & (col("binary_prediction") == 1.0)).count()
    fp = binary_pred.filter((col("binary_label") == 0.0) & (col("binary_prediction") == 1.0)).count()
    tn = binary_pred.filter((col("binary_label") == 0.0) & (col("binary_prediction") == 0.0)).count()
    fn = binary_pred.filter((col("binary_label") == 1.0) & (col("binary_prediction") == 0.0)).count()
    
    precision = tp / (tp + fp) if (tp + fp) > 0 else 0
    recall = tp / (tp + fn) if (tp + fn) > 0 else 0
    f1 = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0
    
    print(f"  Precision: {precision:.4f}")
    print(f"  Recall:    {recall:.4f}")
    print(f"  F1-Score:  {f1:.4f}")
    print(f"  Support:   {tp + fn}")

                                                                                

+--------------------+--------+----------+--------------------+
|            features|Severity|prediction|         probability|
+--------------------+--------+----------+--------------------+
|(50,[0,1,3,4,5,6,...|       1|       2.0|[0.0,0.2024387827...|
|(50,[0,1,3,4,5,6,...|       1|       2.0|[0.0,0.3757718477...|
|[26.08409,-80.192...|       1|       2.0|[0.0,0.0245557996...|
|(50,[0,1,3,4,5,6,...|       1|       2.0|[0.0,0.1814488025...|
|(50,[0,1,3,4,5,6,...|       1|       2.0|[0.0,0.2682613632...|
+--------------------+--------+----------+--------------------+
only showing top 5 rows



                                                                                

Test set accuracy with Random Forest: 0.8235465233271506


                                                                                

Test set precision with Random Forest: 0.8113018952197439


                                                                                

Test set recall with Random Forest: 0.8235465233271506


                                                                                

Test set F1-score with Random Forest: 0.7685228593513731

=== Résumé des métriques Random Forest ===
Accuracy:  0.8235
Precision: 0.8113
Recall:    0.8235
F1-Score:  0.7685

=== Distribution des prédictions ===


                                                                                

+--------+----------+-------+
|Severity|prediction|  count|
+--------+----------+-------+
|       1|       1.0|   1265|
|       1|       2.0|  11425|
|       1|       3.0|    209|
|       2|       1.0|    117|
|       2|       2.0|1129408|
|       2|       3.0|  10131|
|       2|       4.0|     48|
|       3|       1.0|     58|
|       3|       2.0| 191876|
|       3|       3.0|  35410|
|       3|       4.0|     78|
|       4|       1.0|      6|
|       4|       2.0|  34438|
|       4|       3.0|   1527|
|       4|       4.0|    315|
+--------+----------+-------+


=== Métriques par classe ===


                                                                                


Classe 1:


                                                                                

  Precision: 0.8748
  Recall:    0.0981
  F1-Score:  0.1764
  Support:   12899

Classe 3:


                                                                                

  Precision: 0.7490
  Recall:    0.1557
  F1-Score:  0.2578
  Support:   227422

Classe 4:


                                                                                

  Precision: 0.7143
  Recall:    0.0087
  F1-Score:  0.0172
  Support:   36286

Classe 2:




  Precision: 0.8261
  Recall:    0.9910
  F1-Score:  0.9011
  Support:   1139704


                                                                                

## Prédiction de distance

In [33]:
data = data.drop("features")
data.show(5)

+--------+------------------+------------------+------------------+--------------+-----------+------------+--------------+---------------+-----------------+-------+----+--------+--------+--------+-------+-------+----------+-------+----+---------------+--------------+------------+--------------------+--------------------+-----------------------+---------------------------+------------------+----------+-----------+---------+----------+------------+------------+--------+---------+-------+--------+----------+----------+------------+-------------+-----------+------------+--------------+--------------+--------------+-----------+--------------------+-----------------------+----------+--------+
|Severity|         Start_Lat|         Start_Lng|      Distance(mi)|Temperature(F)|Humidity(%)|Pressure(in)|Visibility(mi)|Wind_Speed(mph)|Precipitation(in)|Amenity|Bump|Crossing|Give_Way|Junction|No_Exit|Railway|Roundabout|Station|Stop|Traffic_Calming|Traffic_Signal|Turning_Loop|Sunrise_Sunset_index|Civ

In [35]:
# Conversion miles vers mètres et remplacement de la colonne originale
# 1 mile = 1609.344 mètres
MILES_TO_METERS = 1609.344

data = data.withColumn("Distance(m)", col("Distance(mi)") * MILES_TO_METERS) \
           .drop("Distance(mi)")

print("Conversion effectuée ! La colonne Distance(mi) a été remplacée par Distance(m)")
print("\nAperçu des premières lignes:")
data.select("Distance(m)").show(10)

Conversion effectuée ! La colonne Distance(mi) a été remplacée par Distance(m)

Aperçu des premières lignes:
+-----------------+
|      Distance(m)|
+-----------------+
|              0.0|
|2751.978301391602|
|              0.0|
|              0.0|
|              0.0|
|              0.0|
|              0.0|
|3347.435397216797|
|              0.0|
|              0.0|
+-----------------+
only showing top 10 rows



In [36]:
data.show(5)

+--------+------------------+------------------+--------------+-----------+------------+--------------+---------------+-----------------+-------+----+--------+--------+--------+-------+-------+----------+-------+----+---------------+--------------+------------+--------------------+--------------------+-----------------------+---------------------------+------------------+----------+-----------+---------+----------+------------+------------+--------+---------+-------+--------+----------+----------+------------+-------------+-----------+------------+--------------+--------------+--------------+-----------+--------------------+-----------------------+----------+--------+-----------------+
|Severity|         Start_Lat|         Start_Lng|Temperature(F)|Humidity(%)|Pressure(in)|Visibility(mi)|Wind_Speed(mph)|Precipitation(in)|Amenity|Bump|Crossing|Give_Way|Junction|No_Exit|Railway|Roundabout|Station|Stop|Traffic_Calming|Traffic_Signal|Turning_Loop|Sunrise_Sunset_index|Civil_Twilight_index|Na

In [37]:
sample_size = 0.001
sample_data = data.select("Distance(m)").sample(sample_size, seed=42).toPandas()

print(f"Taille de l'échantillon pour visualisation: {len(sample_data)} lignes")


meters_mean = sample_data['Distance(m)'].mean()
meters_median = sample_data['Distance(m)'].median()

print("\n=== STATISTIQUES DISTANCE (MÈTRES) ===")
print(f"  - Moyenne: {meters_mean:.0f} mètres")
print(f"  - Médiane: {meters_median:.0f} mètres")
print(f"  - Min: {sample_data['Distance(m)'].min():.0f} mètres")
print(f"  - Max: {sample_data['Distance(m)'].max():.0f} mètres")
print(f"  - Écart-type: {sample_data['Distance(m)'].std():.0f} mètres")

                                                                                

Taille de l'échantillon pour visualisation: 7097 lignes

=== STATISTIQUES DISTANCE (MÈTRES) ===
  - Moyenne: 963 mètres
  - Médiane: 64 mètres
  - Min: 0 mètres
  - Max: 111383 mètres
  - Écart-type: 3256 mètres


In [38]:
MAX_DISTANCE_METERS = 16093.44

print("Nombre de lignes avant filtrage:", data.count())
print(f"Nombre de lignes avec distance > {MAX_DISTANCE_METERS} m:", data.filter(col("Distance(m)") > MAX_DISTANCE_METERS).count())
print(f"Nombre de lignes avec distance = 0 m:", data.filter(col("Distance(m)") == 0).count())

data = data.filter((col("Distance(m)") <= MAX_DISTANCE_METERS) & (col("Distance(m)") > 0))

print("Nombre de lignes après filtrage:", data.count())

                                                                                

Nombre de lignes avant filtrage: 7081140


                                                                                

Nombre de lignes avec distance > 16093.44 m: 32010


                                                                                

Nombre de lignes avec distance = 0 m: 2961619




Nombre de lignes après filtrage: 4087511


                                                                                

In [39]:
data_log = data.withColumn(
    "Distance_Log", 
    when(col("Distance(m)") > 0, log(col("Distance(m)") + 1)).otherwise(0)
)

print("\nComparaison - Distance en mètres vs log transformée:")
data_log.select(
    mean(col("Distance(m)")).alias("mean_meters"),
    stddev(col("Distance(m)")).alias("std_meters"),
    mean(col("Distance_Log")).alias("mean_log"),
    stddev(col("Distance_Log")).alias("std_log")
).show() 


Comparaison - Distance en mètres vs log transformée:




+------------------+------------------+-----------------+------------------+
|       mean_meters|        std_meters|         mean_log|           std_log|
+------------------+------------------+-----------------+------------------+
|1369.5320511864238|2148.9483060506204|6.112663842835017|1.7028481042324313|
+------------------+------------------+-----------------+------------------+



                                                                                

In [41]:
data.show(5)
data_log.show(5)

+--------+------------------+----------+--------------+-----------+------------+--------------+---------------+-----------------+-------+----+--------+--------+--------+-------+-------+----------+-------+----+---------------+--------------+------------+--------------------+--------------------+-----------------------+---------------------------+------------------+----------+-----------+---------+----------+------------+------------+--------+---------+-------+--------+----------+----------+------------+-------------+-----------+------------+--------------+--------------+--------------+-----------+--------------------+-----------------------+----------+--------+-----------------+
|Severity|         Start_Lat| Start_Lng|Temperature(F)|Humidity(%)|Pressure(in)|Visibility(mi)|Wind_Speed(mph)|Precipitation(in)|Amenity|Bump|Crossing|Give_Way|Junction|No_Exit|Railway|Roundabout|Station|Stop|Traffic_Calming|Traffic_Signal|Turning_Loop|Sunrise_Sunset_index|Civil_Twilight_index|Nautical_Twilight_

In [42]:
data = data.withColumn(
    "Distance_Normalized", 
    when(col("Distance(m)") > 0, log(col("Distance(m)") + 1)).otherwise(0)
)

feature_cols_final = ["Start_Lat", "Start_Lng", "Distance(m)", "Temperature(F)", "Humidity(%)", "Pressure(in)",
                "Wind_Speed(mph)", "Precipitation(in)", "Amenity", "Bump", "Crossing", "Give_Way",
                "Junction", "No_Exit", "Railway", "Roundabout", "Station", "Stop", "Traffic_Calming", 
                "Traffic_Signal", "Turning_Loop","Sunrise_Sunset_index", "Civil_Twilight_index", 
                "Nautical_Twilight_index", "Astronomical_Twilight_index", "duration", "Start_Year", 
                "Start_Month", "Start_Day", "Start_Hour", "Start_Minute", "Start_Second", "End_Year", 
                "End_Month", "End_Day", "End_Hour", "End_Minute", "End_Second", "Weather_Year", "Weather_Month", 
                "Weather_Day", "Weather_Hour", "Weather_Minute", "Weather_Second", "Timezone_index", 
                "State_index", "Wind_Direction_index", "Weather_Condition_index", "is_weekend", "is_night"]

print("Features finales utilisant Distance(m) en mètres:")
print(f"Nombre de features: {len(feature_cols_final)}")
data.select("Distance(m)").show(10)

Features finales utilisant Distance(m) en mètres:
Nombre de features: 50
+------------------+
|       Distance(m)|
+------------------+
| 2751.978301391602|
| 3347.435397216797|
| 4988.966246520996|
| 579.3638630218506|
| 1448.409561630249|
| 6083.320273956299|
|3508.3700274353023|
|14242.695013916016|
| 3669.304273956299|
|12456.322191650392|
+------------------+
only showing top 10 rows



In [43]:
data.show(5)

+--------+------------------+----------+--------------+-----------+------------+--------------+---------------+-----------------+-------+----+--------+--------+--------+-------+-------+----------+-------+----+---------------+--------------+------------+--------------------+--------------------+-----------------------+---------------------------+------------------+----------+-----------+---------+----------+------------+------------+--------+---------+-------+--------+----------+----------+------------+-------------+-----------+------------+--------------+--------------+--------------+-----------+--------------------+-----------------------+----------+--------+-----------------+-------------------+
|Severity|         Start_Lat| Start_Lng|Temperature(F)|Humidity(%)|Pressure(in)|Visibility(mi)|Wind_Speed(mph)|Precipitation(in)|Amenity|Bump|Crossing|Give_Way|Junction|No_Exit|Railway|Roundabout|Station|Stop|Traffic_Calming|Traffic_Signal|Turning_Loop|Sunrise_Sunset_index|Civil_Twilight_inde

In [44]:
from pyspark.ml.evaluation import RegressionEvaluator

evaluator_rmse = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
evaluator_mae = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="mae")
evaluator_r2 = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="r2")

results = {}

feature_cols = ["Start_Lat", "Start_Lng", "Temperature(F)", "Humidity(%)", "Pressure(in)",
                "Wind_Speed(mph)", "Precipitation(in)", "Amenity", "Bump", "Crossing", "Give_Way",
                "Junction", "No_Exit", "Railway", "Roundabout", "Station", "Stop", "Traffic_Calming", 
                "Traffic_Signal", "Turning_Loop","Sunrise_Sunset_index", "Civil_Twilight_index", 
                "Nautical_Twilight_index", "Astronomical_Twilight_index", "duration", "Start_Year", 
                "Start_Month", "Start_Day", "Start_Hour", "Start_Minute", "Start_Second", "End_Year", 
                "End_Month", "End_Day", "End_Hour", "End_Minute", "End_Second", "Weather_Year", "Weather_Month", 
                "Weather_Day", "Weather_Hour", "Weather_Minute", "Weather_Second", "Timezone_index", 
                "State_index", "Wind_Direction_index", "Weather_Condition_index", "is_weekend", "is_night"]

assembler = VectorAssembler(inputCols=feature_cols, outputCol="features_distance")
data_ml = assembler.transform(data)

data_final = data_ml.select("features_distance", col("Distance(m)").alias("label"))

train_df, test_df = data_final.randomSplit([0.8, 0.2], seed=42)

print(f"Dataset préparé !")
print(f"Training set: {train_df.count()} lignes")
print(f"Test set: {test_df.count()} lignes")
print(f"Features: {len(feature_cols)}")

print("\nAperçu des données:")
train_df.show(5)

Dataset préparé !


                                                                                

Training set: 3270344 lignes


                                                                                

Test set: 817167 lignes
Features: 49

Aperçu des données:


[Stage 2228:>                                                       (0 + 1) / 1]

+--------------------+-----------------+
|   features_distance|            label|
+--------------------+-----------------+
|(49,[0,1,2,3,4,5,...|        209.21472|
|(49,[0,1,2,3,4,5,...|         16.09344|
|(49,[0,1,2,3,4,5,...|        17.702784|
|(49,[0,1,2,3,4,5,...|539.1302400000001|
|(49,[0,1,2,3,4,5,...|         16.09344|
+--------------------+-----------------+
only showing top 5 rows



                                                                                

In [45]:
assembler_log = VectorAssembler(inputCols=feature_cols, outputCol="features_distance")
data_ml_log = assembler_log.transform(data_log)

data_final_log = data_ml_log.select("features_distance", col("Distance_Log").alias("label"))

train_df_log, test_df_log = data_final_log.randomSplit([0.8, 0.2], seed=42)

print(f"  Training set: {train_df_log.count()} lignes")
print(f"  Test set: {test_df_log.count()} lignes")

                                                                                

  Training set: 3270344 lignes




  Test set: 817167 lignes


                                                                                

In [49]:
from pyspark.ml.regression import GBTRegressor
import time

gbt = GBTRegressor(
    featuresCol="features_distance",
    labelCol="label",
    maxIter=120,        
    maxDepth=12,
    stepSize=0.06,        
    maxBins=192,         
    seed=42
)
start_time = time.time()
gbt_model = gbt.fit(train_df_log)
gbt_predictions = gbt_model.transform(test_df_log)
training_time = time.time() - start_time

                                                                                

In [50]:
gbt_rmse = evaluator_rmse.evaluate(gbt_predictions)
gbt_mae = evaluator_mae.evaluate(gbt_predictions)
gbt_r2 = evaluator_r2.evaluate(gbt_predictions)

print(f"RÉSULTATS GBT:")
print(f"- RMSE: {gbt_rmse:.2f} mètres")
print(f"- MAE: {gbt_mae:.2f} mètres")
print(f"- R²: {gbt_r2:.4f}")
print(f"- Temps d'entraînement: {training_time:.2f} secondes")



RÉSULTATS GBT:
- RMSE: 1.11 mètres
- MAE: 0.82 mètres
- R²: 0.5778
- Temps d'entraînement: 8257.36 secondes


                                                                                