# Analyse des données d'accidents de la route

## Récupération des données

Tout d'abord, nous allons commencer par télécharger les données nécéssaire pour faire cette analyse

In [None]:
import os
import requests

#Liste des URLS
urls = [
    "https://static.data.gouv.fr/resources/bases-de-donnees-annuelles-des-accidents-corporels-de-la-circulation-routiere-annees-de-2005-a-2021/20231005-093927/carcteristiques-2022.csv",
    "https://static.data.gouv.fr/resources/bases-de-donnees-annuelles-des-accidents-corporels-de-la-circulation-routiere-annees-de-2005-a-2021/20231005-094147/vehicules-2022.csv",
    "https://static.data.gouv.fr/resources/bases-de-donnees-annuelles-des-accidents-corporels-de-la-circulation-routiere-annees-de-2005-a-2021/20231005-094229/usagers-2022.csv",
    "https://static.data.gouv.fr/resources/bases-de-donnees-annuelles-des-accidents-corporels-de-la-circulation-routiere-annees-de-2005-a-2021/20231005-094112/lieux-2022.csv",
    "https://static.data.gouv.fr/resources/bases-de-donnees-annuelles-des-accidents-corporels-de-la-circulation-routiere-annees-de-2005-a-2020/20221024-113743/carcteristiques-2021.csv",
    "https://static.data.gouv.fr/resources/bases-de-donnees-annuelles-des-accidents-corporels-de-la-circulation-routiere-annees-de-2005-a-2020/20221024-113925/vehicules-2021.csv",
    "https://static.data.gouv.fr/resources/bases-de-donnees-annuelles-des-accidents-corporels-de-la-circulation-routiere-annees-de-2005-a-2022/20231009-140337/usagers-2021.csv",
    "https://static.data.gouv.fr/resources/bases-de-donnees-annuelles-des-accidents-corporels-de-la-circulation-routiere-annees-de-2005-a-2020/20221024-113901/lieux-2021.csv"
]

data_directory = "data"
os.makedirs(data_directory, exist_ok=True)

Maintenant on a besoin d'une fonction qui va nous permettre de télécharger ces fichiers la à partir de leurs URLS

In [None]:
def download_file(url, directory):
    response = requests.get(url)
    if response.status_code == 200:
        filename = url.split("/")[-1]
        filepath = os.path.join(directory, filename)

        with open(filepath, "wb") as file:
            file.write(response.content)
        print(f'Téléchargement de {filename} terminé, le fichier est sauvegardé dans {filepath}')
    else:
        print(f"Impossible de télécharger le fichier {url}")

Plus qu'a parcourir la liste des URLS et télécharger les fichiers

In [None]:
for url in urls:
    download_file(url, data_directory)

## Nettoyage des données

Maintenant que nous avons téléchargé les données, nous allons les nettoyer pour les rendre plus facile à manipuler.

On commence par la création d'une session spark

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("DM1") \
    .getOrCreate()



Définition d'un schéma pour chaque fichier

In [None]:
#Définition des schémas pour les données
from pyspark.sql.types import *;

#schéma pour les caractéristiques_2022
#on en a créer deux différents car il y a une différence de nommage entre les fichiers de 2021 et 2022 pour la colonne de l'id de l'accident (Accident_Id et Num_Acc)
caracteristiques2022_schema = StructType([
    StructField("Accident_Id", LongType(), True),
    StructField("jour", IntegerType(), True),
    StructField("mois", IntegerType(), True),
    StructField("an", IntegerType(), True),
    StructField("hrmn", StringType(),True),
    StructField("lum", IntegerType(), True),
    StructField("dep", StringType(), True),
    StructField("com", StringType(), True),
    StructField("agg", IntegerType(), True),
    StructField("int", IntegerType(), True),
    StructField("atm", IntegerType(), True),
    StructField("col", IntegerType(), True),
    StructField("adr", StringType(), True),
    StructField("lat", StringType(), True),
    StructField("long", StringType(), True),
])

#schéma pour les caractéristiques
caracteristiques_schema = StructType([
    StructField("Num_Acc", LongType(), True),
    StructField("jour", IntegerType(), True),
    StructField("mois", IntegerType(), True),
    StructField("an", IntegerType(), True),
    StructField("hrmn", StringType(),True), 
    StructField("lum", IntegerType(), True),
    StructField("dep", StringType(), True),
    StructField("com", StringType(), True),
    StructField("agg", IntegerType(), True),
    StructField("int", IntegerType(), True),
    StructField("atm", IntegerType(), True),
    StructField("col", IntegerType(), True),
    StructField("adr", StringType(), True),
    StructField("lat", StringType(), True),
    StructField("long", StringType(), True),
])


#schéma pour les véhicules
vehicules_schema = StructType([
    StructField("Num_Acc", LongType(), True),
    StructField("id_vehicule", StringType(), True),
    StructField("num_veh", StringType(), True),  
    StructField("senc", IntegerType(), True),
    StructField("catv", IntegerType(), True),
    StructField("obs", IntegerType(), True),
    StructField("obsm", IntegerType(), True),
    StructField("choc", IntegerType(), True),
    StructField("manv", IntegerType(), True),
    StructField("motor", IntegerType(), True),
    StructField("occutc", IntegerType(), True),
])

#schéma pour les usagers
usagers_schema = StructType([
    StructField("Num_Acc", LongType(), True),
    StructField("id_usager", StringType(), True),
    StructField("id_vehicule", StringType(), True),
    StructField("num_veh", StringType(), True),
    StructField("place", IntegerType(), True),
    StructField("catu", IntegerType(), True),
    StructField("grav", IntegerType(), True),
    StructField("sexe", IntegerType(), True),
    StructField("An_nais", IntegerType(), True),
    StructField("trajet", IntegerType(), True),
    StructField("secu1", IntegerType(), True),
    StructField("secu2", IntegerType(), True),
    StructField("secu3", IntegerType(), True),
    StructField("locp", IntegerType(), True),
    StructField("actp", IntegerType(), True),
    StructField("etatp", IntegerType(), True),
])


#schéma pour les lieux
lieux_schema = StructType([
    StructField("Num_Acc", LongType(), True),
    StructField("catr", IntegerType(), True),
    StructField("voie", StringType(), True),
    StructField("V1", StringType(), True),
    StructField("V2", StringType(), True),
    StructField("circ", IntegerType(), True),
    StructField("nbv", IntegerType(), True),
    StructField("vosp", IntegerType(), True),
    StructField("prof", IntegerType(), True),
    StructField("pr", StringType(), True),
    StructField("pr1", StringType(), True),
    StructField("plan", IntegerType(), True),
    StructField("lartpc", FloatType(), True),
    StructField("larrout", FloatType(), True),
    StructField("surf", IntegerType(), True),
    StructField("infra", IntegerType(), True),
    StructField("situ", IntegerType(), True),
    StructField("vma", FloatType(), True),
])

Chargement des fichiers et création des DataFrames à partir des fichiers téléchargés...

In [None]:
def load_csv_to_df(spark, file_path, schema, delimiter=";"):
    df = spark.read \
        .option("header", "true") \
        .option("delimiter", delimiter) \
        .schema(schema) \
        .csv(file_path)
    return df


file_path_caracterstiques_2022 = "data/carcteristiques-2022.csv"
file_path_caracterstiques_2021 = "data/carcteristiques-2021.csv"
file_path_vehicules_2022 = "data/vehicules-2022.csv"
file_path_vehicules_2021 = "data/vehicules-2021.csv"
file_path_usagers_2022 = "data/usagers-2022.csv"
file_path_usagers_2021 = "data/usagers-2021.csv"
file_path_lieux_2022 = "data/lieux-2022.csv"
file_path_lieux_2021 = "data/lieux-2021.csv"


df_caracteristiques_2022 = load_csv_to_df(spark, file_path_caracterstiques_2022, caracteristiques2022_schema)
df_caracteristiques_2022 = df_caracteristiques_2022.withColumnRenamed("Accident_Id", "Num_Acc")
df_caracteristiques_2021 = load_csv_to_df(spark, file_path_caracterstiques_2021, caracteristiques_schema)
df_vehicules_2022 = load_csv_to_df(spark, file_path_vehicules_2022, vehicules_schema)
df_vehicules_2021 = load_csv_to_df(spark, file_path_vehicules_2021, vehicules_schema)
df_usagers_2022 = load_csv_to_df(spark, file_path_usagers_2022, usagers_schema)
df_usagers_2021 = load_csv_to_df(spark, file_path_usagers_2021, usagers_schema)
df_lieux_2022 = load_csv_to_df(spark, file_path_lieux_2022, lieux_schema)
df_lieux_2021 = load_csv_to_df(spark, file_path_lieux_2021, lieux_schema)

Ici on retrouve quelques fonctions générales qui vont nous faciliter la tâche lors du nettoyage des données

In [None]:
from pyspark.sql import functions as fn

#fonction qui permet de convertir les types de données
def convert_data_types(df, type_mappings, date_format_mappings=None, timestamp_format_mappings=None):
    for column, data_type in type_mappings.items():
        df = df.withColumn(column, fn.col(column).cast(data_type))
    
    if date_format_mappings:
        for column, fmt in date_format_mappings.items():
            df = df.withColumn(column, fn.to_date(fn.col(column), fmt))
    
    if timestamp_format_mappings:
        for column, fmt in timestamp_format_mappings.items():
            df = df.withColumn(column, fn.to_timestamp(fn.col(column), fmt))
    
    return df

#fonction qui permet de gérer les valeurs manquantes
def handle_missing_values(df, fill_values=None, drop_rows=False, drop_cols=None):
    if fill_values:
        for column, fill_value in fill_values.items():
            df = df.withColumn(column, fn.when(fn.col(column).isNull(), fill_value).otherwise(fn.col(column)))
    
    if drop_rows:
        df = df.na.drop(subset=drop_rows)
    
    if drop_cols:
        df = df.drop(*drop_cols)

    return df


In [None]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.sql.functions import functools as ft
#fonction qui permet d'indexer les colonnes catégorielles
def index_categorical_columns(df, categorical_columns):
    stages = []
    
    for categorical_col in categorical_columns:
        string_indexer = StringIndexer(inputCol=categorical_col, outputCol=categorical_col + "_indexed")
        stages.append(string_indexer)
    
    pipeline = Pipeline(stages=stages)
    
    df_transformed = pipeline.fit(df).transform(df)
    
    return df_transformed

#fonction qui affiche les lignes avec des valeurs nulles
def show_lignes_nulles(df):
    condition = ft.reduce(lambda a, b: a | b, (fn.col(c).isNull() for c in df.columns))
    lignes_avec_nulles = df.filter(condition)
    lignes_avec_nulles.show()

#fonction qui affiche les lignes en doublons
def nb_lignes_doublons(df):
    duplicates = df.groupBy(df.columns) \
        .agg(fn.count("*").alias("count")) \
        .filter("`count` > 1")


    total_duplicates = duplicates.selectExpr("sum(count) - count(*) as total_duplicates").collect()[0]["total_duplicates"]

    print(f"Nombre total de lignes en doublon : {total_duplicates}")

#fonction qui affiche pour chaque colonne, le nombre de lignes nulles
def nb_colonnes_nulles(df):
    df.select([fn.count(fn.when(fn.col(c).isNull(), c)).alias(c) for c in df.columns]).show()


In [None]:
def info_missing_values(df,colonne):
    total_count = df.count()
    missing_count = df.filter(fn.col(colonne).isNull()).count()
    non_missing_count = total_count - missing_count
    missing_percentage = (missing_count/total_count) * 100

    print(f"Total de lignes : {total_count}")
    print(f"Nombre de valeurs manquantes : {missing_count}")
    print(f"Pourcenrage de valeurs manquantes : {missing_percentage:.2f}%")


def frequence_values(df,colonne):
    value_counts = df.filter(fn.col(colonne).isNotNull()) \
    .groupBy(colonne) \
    .count() \
    .withColumnRenamed("count", "frequency") \
    .orderBy(fn.desc("frequency"))

# Ajouter une colonne avec la somme totale des fréquences sur toutes les lignes pour calculer le pourcentage
    total_frequency = value_counts.select(fn.sum("frequency").alias("total")).collect()[0]["total"]
    value_counts = value_counts.withColumn("percentage", (fn.col("frequency") / total_frequency) * 100)

# Afficher le résultat
    value_counts.show()

### La table `caracteristiques`

Commençons par traiter la table `caracteristiques_2022` et en premier lieu le bon typage des colonnes

In [None]:
df_caracteristiques_2022.show(5)

Petit rappel du schéma pour voir le type de chaque colonne

In [None]:
df_caracteristiques_2022.printSchema()

Avec le schéma affiché, on peut facilement idenitifer les colonnes qui sont mal typées. Par exemple les colonnes `lat` et `long` sont de type `StringType` alors qu'elles devraient être de type `DoubleType`.


On peut aussi ajouter une colonne `date_time` qui sera de type `TimeStamp` et qui sera la combinaison des colonnes `an`, `mois`, `jour` et `hrmn`.


Pour le reste des colonnes, on peut les laisser telles quelles car elles sont bien typées.

In [None]:
# On en déduit donc cela (pour les colonnes lat et long et l'ajout de la colonne 'date_time', nous allons faire cela dans la fonction générale pour 
#le nettoyage de 'caracterstiques' pour bien respecter le principe DRY)
timestamp_format_mappings = {"date_temp": "yyyy-MM-dd HH:mm"}

Passons au traitement des valeurs manquantes dans la table

In [None]:
nb_colonnes_nulles(df_caracteristiques_2022)

In [None]:
info_missing_values(df_caracteristiques_2022,"col")

Nous constatons ici la présence de quelques valeurs manquantes (minimes), nous allons les traiter en les remplaçant par des valeurs par défaut.

Pour l'analyse des données nous aurons juste a filter les lignes qui contiennent des valeurs manquantes.

In [None]:
fill_values = {'lum':-1,'int':-1,'atm':-1,'col':-1,'adr' : "adresse non renseigné"}

Il nous reste maintenant à identifier les colonnes catégorielles.

Et à partir des informations fourni pour la table `caracteristiques`, on peut identifier les colonnes catégorielles suivant: 
`lum`, `agg`, `int`, `atm` et `col`

In [None]:
categorical_columns_caracteristiques = ["lum","agg", "int", "atm", "col"]

Nous pouvons maintenant construire notre fonction `clean_caracteristiques` qui va nous permettre de nettoyer la table `caracteristiques` en appliquant les traitements que nous avons défini plus haut.

In [None]:
#fonction qui permet de nettoyer les données des caractéristiques des accidents
def clean_caracteristiques(df,fill_values=None,drop_rows=False,drop_cols=False,type_mapping=None,date_format_mappings=None,timestamp_format_mappings=None,categorical_columns=None):
    df = handle_missing_values(df, fill_values=fill_values, drop_rows=drop_rows, drop_cols=drop_cols)
    df = df.withColumn(
    "date_temp",
    fn.concat_ws(" ", 
              fn.concat_ws("-", fn.col("an"), fn.lpad(fn.col("mois"), 2, "0"), fn.lpad(fn.col("jour"), 2, "0")),
              fn.col("hrmn"))
    )
    df = convert_data_types(df, type_mappings=type_mapping, date_format_mappings=date_format_mappings, timestamp_format_mappings=timestamp_format_mappings)

    #ici nous avons remarqué que lors du cast direct de ces deux colonnes en double, il y a eu des erreurs de conversion(des nulls partout)
    #nous avons ensuite remarqué qu'il y avait des virgules à la place des points pour les valeurs décimales
    
    df = df \
        .withColumn("lat", fn.regexp_replace("lat", ",", ".")) \
        .withColumn("long", fn.regexp_replace("long", ",", "."))

    df = df.withColumn("lat", fn.col("lat").cast("double")) \
        .withColumn("long", fn.col("long").cast("double"))

    df = df.withColumnRenamed("date_temp", "date_time")
    return df

Effectuons maintenant le nettoyage de la table `caracteristiques`

In [None]:
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")


df_caracteristiques_2022 = clean_caracteristiques(
    df_caracteristiques_2022, 
    fill_values=fill_values, 
    drop_rows={},
    drop_cols={},
    type_mapping={}, 
    timestamp_format_mappings=timestamp_format_mappings, 
    categorical_columns=categorical_columns_caracteristiques)

Quelques vérifications pour s'assurer que le nettoyage a bien été effectué

In [None]:
df_caracteristiques_2022.show(5)
nb_colonnes_nulles(df_caracteristiques_2022)

In [None]:
df_caracteristiques_2022.printSchema()

Nous allons maintenant effectuer exactement le meme traitement pour la table `caracteristiques_2021`

Quelques vérifications initiales

In [None]:
df_caracteristiques_2021.show(5)
nb_colonnes_nulles(df_caracteristiques_2021)

In [None]:
info_missing_values(df_caracteristiques_2021,"col")

Similaire a celle de 2022, nous allons donc effectuer les memes opérations pour cette table

In [None]:
fill_values = {"atm": -1, "col": -1}
df_caracteristiques_2021 = clean_caracteristiques(
    df_caracteristiques_2021, 
    fill_values=fill_values, 
    drop_rows={}, 
    drop_cols={},
    type_mapping={}, 
    timestamp_format_mappings=timestamp_format_mappings, 
    categorical_columns=categorical_columns_caracteristiques)


Nous procédons encore une fois aux vérifications

In [None]:
df_caracteristiques_2021.show(5)
nb_colonnes_nulles(df_caracteristiques_2021)

In [None]:
df_caracteristiques_2021.printSchema()

Nous allons juste comparer les deux schémas pour voir si les deux tables ont le meme pour pouvoir les fusionner par la suite

In [None]:
print(df_caracteristiques_2022.schema == df_caracteristiques_2021.schema)

### La table `vehicules`
Commençons par la table `vehicules_2022` et en premier lieu le bon typage des colonnes

In [None]:
df_vehicules_2022.show(5)

In [None]:
df_vehicules_2022.printSchema()

Ici, nous pouvons constater que la colonne `id_vehicule` est mal typé, qui devrait être de type `integer`, pour le reste des colonnes, elles sont bien typées.

Mais pour cela, nous devons d'abord supprimer les espaces entre les chiffres, car si nous effectuons la conversion directement
nous aurons des nulles dans toutes les valeurs `id_vehicule`

Cette opération sera effectuée directement dans la fonction `clean_vehicules`

In [None]:
type_mapping = {"id_vehicule": IntegerType()}

Passons donc au traitement des valeurs manquantes dans la table

In [None]:
nb_colonnes_nulles(df_vehicules_2022)

Voyons le pourcentage de valeurs manquantes dans la table `vehicules_2022` pour voir comment les traiter

In [None]:
info_missing_values(df_vehicules_2022,"senc")

Pour les colonnes `senc`, `catv`, `obs`, `obsm`, `choc`, `manv` et `motor`, nous allons les remplacer par des valeurs par défaut car le pourcentage de valeurs manquantes est minime.

In [None]:
fill_values = {"senc": -1, "catv": -1, "obs": -1, "obsm": -1, "choc": -1, "manv": -1, "motor": -1}

Et pour `occutc` ?

In [None]:
info_missing_values(df_vehicules_2022,"occutc")

Prés de la totalité des lignes n'ont pas de valeurs pour cette colonne, nous allons donc la supprimer.

In [None]:
drop_cols = ["occutc"]

Passons à l'identification des colonnes catégorielles

À partir des informations fournies pour la table `vehicules`, on peut identifier les colonnes catégorielles suivantes:
`senc`, `catv`, `obs`, `obsm`, `choc`, `manv`, `motor`

In [None]:
categorical_columns_vehicules = ["senc", "catv", "obs", "obsm", "choc", "manv", "motor"]

Nous pouvons maintenant construire notre fonction `clean_vehicules` qui va nous permettre de nettoyer les tables `vehicules` en appliquant les traitements que nous avons défini plus haut.

In [None]:
def clean_vehicules(df, fill_values=None, drop_rows=False, drop_cols=False, type_mapping=None, date_format_mappings=None, timestamp_format_mappings=None, categorical_columns=None):
    df = df.withColumn("id_vehicule", fn.regexp_replace(fn.col("id_vehicule"), "[^\\d]", ""))
    df = handle_missing_values(df, fill_values=fill_values, drop_rows=drop_rows, drop_cols=drop_cols)
    df = convert_data_types(df, type_mappings=type_mapping, date_format_mappings=date_format_mappings, timestamp_format_mappings=timestamp_format_mappings)
    return df

In [None]:
df_vehicules_2022 = clean_vehicules(
    df_vehicules_2022, 
    fill_values=fill_values, 
    drop_rows={}, 
    drop_cols=drop_cols,
    type_mapping=type_mapping, 
    date_format_mappings={}, 
    timestamp_format_mappings={}, 
    categorical_columns=categorical_columns_vehicules)


Vérifications du nettoyage de la table `vehicules_2022`.

In [None]:
df_vehicules_2022.printSchema()

In [None]:
df_vehicules_2022.show(5)
nb_colonnes_nulles(df_vehicules_2022)

Nous allons effectuer encore une fois le meme traitement pour la table `vehicules_2021`

In [None]:
df_vehicules_2021.show(5)

In [None]:
df_vehicules_2021.printSchema()

La colonne mal typé est la meme que celle de 2022.

In [None]:
nb_colonnes_nulles(df_vehicules_2021)

Encore une fois similaire a celle de 2022.

In [None]:
info_missing_values(df_vehicules_2021,"senc")
info_missing_values(df_vehicules_2021,"occutc")

Nous allons donc garder les memes arguments et appeler la fonction `clean_vehicules`.

In [None]:
df_vehicules_2021 = clean_vehicules(
    df_vehicules_2021, 
    fill_values=fill_values, 
    drop_rows={}, 
    drop_cols=drop_cols,
    type_mapping=type_mapping, 
    date_format_mappings={}, 
    timestamp_format_mappings={}, 
    categorical_columns=categorical_columns_vehicules)

Vérifications du nettoyage de la table `vehicules_2021`.

In [None]:
df_vehicules_2021.show(5)
nb_colonnes_nulles(df_vehicules_2021)

In [None]:
df_vehicules_2021.printSchema()

In [None]:
print(df_vehicules_2022.schema == df_vehicules_2021.schema)

### La table `usagers`
On commence par regarder le bon typage des colonnes de la table `usagers_2022`

In [None]:
df_usagers_2022.show(5)

Petit rappel du schéma pour voir le type de chaque colonne

In [None]:
df_usagers_2022.printSchema()

Ici, nous pouvons constater que la colonne `id_usager` et `id_vehicule` sont mal typé, qui devrait être de type `long` pour la première et de type `integer` pour la deuxième. Pour le reste des colonnes, elles sont bien typées.

Mais pour cela, nous devons d'abord supprimer les espaces entre les chiffres, car si nous effectuons la conversion directement
nous aurons des nulles dans toutes les valeurs `id_usager` et `id_vehicule`

Cette opération sera effectuée directement dans la fonction `clean_usagers`

In [None]:
type_mapping = {
    'id_vehicule': IntegerType(),
    'id_usager': LongType(),
    }

Passons donc au traitement des valeurs manquantes dans la table

In [None]:
nb_colonnes_nulles(df_usagers_2022)

In [None]:
show_lignes_nulles(df_usagers_2022)

Voyons le pourcentage de valeurs manquantes pour la colonne `sexe` dans la table `df_usagers_2022` pour voir comment les traiter

In [None]:
info_missing_values(df_usagers_2022,"sexe")

Voyons maintenant la frequence des valeurs pour la colonne `sexe`

In [None]:
frequence_values(df_usagers_2022, "sexe")

On remarque que la valeur `1` est la plus fréquente, nous allons donc la remplacer par cette valeur. Ça sera la meme chose our les colonnes `grav`, `trajet`, `secu1` et `place` car le pourcentage de valeurs manquantes est minime et n'affectera pas l'analyse.

Pour la colonne `An_nais`, nous avons pris la convention de remplacer les valeurs manquantes par `9999` pour signifier que l'année de naissance est inconnue.

Pour le reste des colonnes ayant des valeurs manquantes, nous les avons pas remplacer par la valeur la plus fréquente mais plutot par la valeur `-1` pour signifier que la valeur est non rensignée, car le pourcentage de valeurs manquantes est assez élevé.

In [None]:
fill_values = {
    'sexe': df_usagers_2022.groupBy("sexe").count().orderBy("count", ascending=False).first()[0],  # Remplacer les valeurs nulles dans 'sexe' par 1
    'An_nais': 9999,  # Remplacer les valeurs nulles dans 'An_nais' par 9999
    'grav' : df_usagers_2022.groupBy("grav").count().orderBy("count", ascending=False).first()[0],
    'trajet' : df_usagers_2022.groupBy("trajet").count().orderBy("count", ascending=False).first()[0],
    'secu1' : df_usagers_2022.groupBy("secu1").count().orderBy("count", ascending=False).first()[0],
    'place' : df_usagers_2022.groupBy("place").count().orderBy("count", ascending=False).first()[0],
    'secu2' : -1,
    'locp' : -1,
    'actp' : -1,
}

Et pour `secu3` ainsi que `etatp` ?

In [None]:
info_missing_values(df_usagers_2022, "secu3")
info_missing_values(df_usagers_2022, "etatp")

Prés de la totalité des lignes n'ont pas de valeurs pour ces 2 colonnes, nous allons donc les supprimer.

In [None]:
columns_to_drop = ['secu3', 'etatp']

Passons à l'identification des colonnes catégorielles

À partir des informations fournies pour la table `usager-2022`, on peut identifier les colonnes catégorielles suivantes:
`catu`, `grav`, `sexe`, `trajet`, `secu1`, `secu2`, `locp` et `actp`

In [None]:
categorical_columns_usagers = ['catu', 'grav', 'sexe', 'trajet', 'secu1', 'secu2', 'locp', 'actp']

Nous pouvons maintenant construire notre fonction `clean_usagers` qui va nous permettre de nettoyer les tables `usagers` en appliquant les traitements que nous avons défini plus haut.

In [None]:
def clean_usagers(df,fill_values=None,drop_rows=False,drop_cols=None,categorical_columns=None, type_mapping=None):
    df = df.withColumn("id_vehicule", fn.regexp_replace(fn.col("id_vehicule"), "[^\\d]", ""))
    df = df.withColumn("id_usager", fn.regexp_replace(fn.col("id_usager"), "[^\\d]", ""))
    df = handle_missing_values(df, fill_values=fill_values, drop_rows=drop_rows, drop_cols=drop_cols)
    df = convert_data_types(df, type_mappings=type_mapping, date_format_mappings=None, timestamp_format_mappings=None)
    return df

In [None]:
drop_rows = False

df_usagers_2022 = clean_usagers(df_usagers_2022,
                                fill_values,
                                drop_rows, 
                                columns_to_drop, 
                                categorical_columns_usagers,
                                type_mapping)

Vérification du nettoyage de la table `usagers_2022`.

In [None]:
df_usagers_2022.printSchema()

In [None]:
df_usagers_2022.show(5)
nb_colonnes_nulles(df_usagers_2022)

Nous allons effectuer encore une fois le meme traitement pour la table `usagers_2021`

In [None]:
df_usagers_2021.show(5)

In [None]:
df_usagers_2021.printSchema()

Les colonnes mal typées sont les memes que celles de 2022.

In [None]:
nb_colonnes_nulles(df_usagers_2021)

Encore une fois similaire à celle de 2022.

In [None]:
info_missing_values(df_usagers_2021, "secu3")
info_missing_values(df_usagers_2021, "sexe")

In [None]:
frequence_values(df_usagers_2021, "sexe")

Nous allons donc garder les memes arguments et appeler la fonction `clean_usagers`.

In [None]:
drop_rows = False
df_usagers_2021 = clean_usagers(df_usagers_2021,
                                fill_values, 
                                drop_rows,
                                columns_to_drop, 
                                categorical_columns_usagers, 
                                type_mapping)

Vériifcation du nettoyage de la table `usagers_2021`.

In [None]:
df_usagers_2021.show(5)
nb_colonnes_nulles(df_usagers_2021)

In [None]:
df_usagers_2021.printSchema()

In [None]:
print(df_usagers_2022.schema == df_usagers_2021.schema)

### La table `Lieux`
On commence par regarder le bon typage des colonnes de la table `lieux_2022`

In [None]:
df_lieux_2022.show(5)

In [None]:
frequence_values(df_lieux_2022, "pr")

Ici on remarque que les colonnes `pr` et `pr1` ont des valeurs entières et des valeurs `(1)` qui font surement reference à des valeurs non renseignées. ( La description de la table indique que `-1` signifie `non renseigné`, et on l'absence de cette valeur, on peut supposer que `(1)` est utilisé pour signifier `non renseigné`)

Nous allons donc remplacer ces valeurs par `-1` pour les colonnes `pr` et `pr1`

In [None]:
columns_to_modify = ['pr', 'pr1']

Petit rappel du schéma pour voir le type de chaque colonne

In [None]:
df_lieux_2022.printSchema()

Ici, nous pouvons constater que la colonne `pr` et `pr1` sont mal typé, qui devrait être tout les 2 de type `integer`. Pour le reste des colonnes, elles sont bien typées.

In [None]:
type_mapping = {
    'pr' : IntegerType(),
    'pr1' : IntegerType(),
    }

Passons donc au traitement des valeurs manquantes dans la table

In [None]:
nb_colonnes_nulles(df_lieux_2022)

In [None]:
show_lignes_nulles(df_lieux_2022)

Voyons le pourcentage de valeurs manquantes pour la colonne `vosp` dans la table `df_lieux_2022` pour voir comment les traiter

In [None]:
info_missing_values(df_lieux_2022, "vosp")

Voyons maintenant la frequence des valeurs pour la colonne `vosp`

In [None]:
frequence_values(df_lieux_2022, "vosp")

On remarque que la valeur `0` est la plus fréquente, nous allons donc la remplacer par cette valeur. Ça sera la meme chose our les colonnes `circ`, `nbv`, `prof`, `plan`, `larrout`, `surf`, `infra` `situ` et `vma` car le pourcentage de valeurs manquantes est minime et n'affectera pas l'analyse.

In [None]:
fill_values = {
    'circ': df_lieux_2022.groupBy("circ").count().orderBy("circ", ascending=False).first()[0],
    'nbv': df_lieux_2022.groupBy("nbv").count().orderBy("nbv", ascending=False).first()[0],
    'vosp' : df_lieux_2022.groupBy("vosp").count().orderBy("vosp", ascending=False).first()[0],
    'prof' : df_lieux_2022.groupBy("prof").count().orderBy("prof", ascending=False).first()[0],
    'plan' : df_lieux_2022.groupBy("plan").count().orderBy("plan", ascending=False).first()[0],
    'larrout' : df_lieux_2022.groupBy("larrout").count().orderBy("larrout", ascending=False).first()[0],
    'surf' : df_lieux_2022.groupBy("surf").count().orderBy("surf", ascending=False).first()[0],
    'infra' : df_lieux_2022.groupBy("infra").count().orderBy("infra", ascending=False).first()[0],
    'situ' : df_lieux_2022.groupBy("situ").count().orderBy("situ", ascending=False).first()[0],
    'vma' : df_lieux_2022.groupBy("vma").count().orderBy("vma", ascending=False).first()[0]
}

In [None]:
info_missing_values(df_lieux_2022, "lartpc")
frequence_values(df_lieux_2022, "V2")

Près de la totalité des lignes de la colonne `lartpc` n'ont pas de valeurs, et pour ce qui est de la colonne `V2`, plus de `90%` des valeurs sont non renseignées. Nous allons donc supprimer ces 2 colonnes

In [None]:
columns_to_drop = ['lartpc','V2']

Passons à l'identification des colonnes catégorielles

À partir des informations fournies pour la table `lieux`, on peut identifier les colonnes catégorielles suivantes:
`catr`, `circ`, `vosp`, `prof`, `plan`, `surf`, `infra` et `situ`

In [None]:
categorical_columns_lieux = ['catr', 'circ', 'vosp', 'prof', 'plan', 'surf', 'infra', 'situ']

Nous pouvons maintenant construire notre fonction `clean_lieux` qui va nous permettre de nettoyer les tables `lieux` en appliquant les traitements que nous avons défini plus haut.

In [None]:
def clean_lieux(df,fill_values=None,drop_rows=False,drop_cols=None,categorical_columns=None, type_mapping=None, columns_to_modify=None):
    for column in columns_to_modify:
        df = df.withColumn(column, fn.when(fn.col(column) == '(1)', '-1').otherwise(fn.col(column)))
        
    df = handle_missing_values(df, fill_values=fill_values, drop_rows=drop_rows, drop_cols=drop_cols)
    df = convert_data_types(df, type_mappings=type_mapping, date_format_mappings=None, timestamp_format_mappings=None)
    return df

In [None]:
drop_rows = False

df_lieux_2022 = clean_lieux(df_lieux_2022,
                            fill_values, 
                            drop_rows, 
                            columns_to_drop, 
                            categorical_columns_lieux, 
                            type_mapping, 
                            columns_to_modify)

Vériifcation du nettoyage de la table `lieux_2022`.

In [None]:
df_lieux_2022.printSchema()

In [None]:
df_lieux_2022.show(5)
nb_colonnes_nulles(df_lieux_2022)

Nous allons effectuer encore une fois le meme traitement pour la table `lieux_2021`

In [None]:
df_lieux_2021.show(5)

In [None]:
df_lieux_2021.printSchema()

Les colonnes mal typées sont les memes que celles de 2022.

In [None]:
nb_colonnes_nulles(df_lieux_2021)

Encore une fois similaire à celle de 2022.

In [None]:
info_missing_values(df_lieux_2021, "lartpc")
info_missing_values(df_lieux_2021, "vosp")

In [None]:
frequence_values(df_lieux_2021, "V2")

Nous allons donc garder les memes arguments et appeler la fonction `clean_lieux`.

In [None]:
drop_rows = False

df_lieux_2021 = clean_lieux(df_lieux_2021, 
                            fill_values, 
                            drop_rows, 
                            columns_to_drop, 
                            categorical_columns_lieux, 
                            type_mapping, 
                            columns_to_modify)

Vériifcation du nettoyage de la table `lieux_2021`.

In [None]:
df_lieux_2021.show(5)
nb_colonnes_nulles(df_lieux_2021)

In [None]:
df_lieux_2021.printSchema()

In [None]:
print(df_lieux_2022.schema == df_lieux_2021.schema)

## Réunion des tables 2021 et 2022 et analyse des données

## Réunion des données

Maintenant que le nettoyages des données est terminé, nous allons fusionner les tables 'caracteristiques','vehciules','usagers' et 'lieux'.

In [None]:
df_caracteristiques = df_caracteristiques_2021.unionByName(df_caracteristiques_2022)

df_vehicules_2021 = df_vehicules_2021.withColumn("annee", fn.lit(2021))
df_vehicules_2022 = df_vehicules_2022.withColumn("annee",fn.lit(2022))
df_vehicules = df_vehicules_2021.unionByName(df_vehicules_2022)

df_usagers_2021 = df_usagers_2021.withColumn("annee", fn.lit(2021))
df_usagers_2022 = df_usagers_2022.withColumn("annee", fn.lit(2022))
df_usagers = df_usagers_2021.unionByName(df_usagers_2022)

df_lieux_2021 = df_lieux_2021.withColumn("annee", fn.lit(2021))
df_lieux_2022 = df_lieux_2022.withColumn("annee", fn.lit(2022))
df_lieux = df_lieux_2021.unionByName(df_lieux_2022)

In [None]:
print(f"Nombre total de lignes : {df_caracteristiques.count()}")
print(f"Nombre total de lignes : {df_vehicules.count()}")
print(f"Nombre total de lignes : {df_usagers.count()}")
print(f"Nombre total de lignes : {df_lieux.count()}")

In [None]:
print(df_caracteristiques.count() == df_caracteristiques_2022.count()+df_caracteristiques_2021.count())
print(df_vehicules.count() == df_vehicules_2022.count()+df_vehicules_2021.count())
print(df_usagers.count() == df_usagers_2022.count()+df_usagers_2021.count())
print(df_lieux.count() == df_lieux_2022.count()+df_lieux_2021.count())

Puisque les données sont maintenant fusionnés, nous allons pouvoir traiter les colonnes catégorielles.

In [None]:
df_caracteristiques = index_categorical_columns(df_caracteristiques, categorical_columns_caracteristiques)
df_vehicules = index_categorical_columns(df_vehicules, categorical_columns_vehicules)
df_usagers = index_categorical_columns(df_usagers, categorical_columns_usagers)
df_lieux = index_categorical_columns(df_lieux, categorical_columns_lieux)

In [None]:
df_caracteristiques.show(5)
df_vehicules.show(5)

In [None]:
df_usagers.show(5)
df_lieux.show(5)

## Résumés numériques

Prenons ici la colonne "lum" qui represente les conditions d’éclairage dans lesquelles l'accident s'est produit.

Filtrons notre dataframe pour se débarassé des '-1' qui sont des valeurs non renseigné

In [None]:
filtered_df_caracterstiques = df_caracteristiques.filter(fn.col("lum") != -1)

In [None]:
filtered_df_caracterstiques.select(
    fn.mean('lum').alias('Moyenne'),
    fn.stddev('lum').alias('Ecart-type'),
    fn.skewness('lum').alias('Asymétrie'),
    fn.kurtosis('lum').alias('Aplatissement'),
    fn.expr('percentile(lum, array(0.25))')[0].alias('1er Quartile'),
    fn.expr('percentile(lum, array(0.5))')[0].alias('Médiane'),
    fn.expr('percentile(lum, array(0.75))')[0].alias('3ème Quartile')
).show()

Maintenant, afin d'engendrer les objets graphiques, nous allons transformer notre dataframe spark en une dataframe pandas pour l'utilisation des fonctions de visualisation de données.

In [None]:
pandas_df_caracteristiques = df_caracteristiques.toPandas()
filtered_df_caracterstiques_lum = pandas_df_caracteristiques[pandas_df_caracteristiques["lum"] != -1]

Quelques objets graphiques pour illustrer les données de la colonne "lum"

In [None]:
import plotly.express as px

fig = px.box(filtered_df_caracterstiques_lum, y='lum', title='Distribution des conditions d\'éclairage')
fig.show()

In [None]:
import altair as alt

alt.data_transformers.disable_max_rows()

chart = alt.Chart(filtered_df_caracterstiques_lum).mark_boxplot().encode(
    y='lum:Q',
).properties(
    title='Distribution des conditions d\'éclairage avec Altair'
)

chart

Nous allons effectuer les memes étapes avec differentes colonnes "catv" pour l'étude des catégories de véhicules ici en l'occurence

In [None]:
filtered_df_vehicules = df_vehicules.filter(fn.col("catv") != -1)

In [None]:
filtered_df_vehicules.select(
    fn.mean('catv').alias('Moyenne'),
    fn.stddev('catv').alias('Ecart-type'),
    fn.skewness('catv').alias('Asymétrie'),
    fn.kurtosis('catv').alias('Aplatissement'),
    fn.expr('percentile(catv, array(0.25))')[0].alias('1er Quartile'),
    fn.expr('percentile(catv, array(0.5))')[0].alias('Médiane'),
    fn.expr('percentile(catv, array(0.75))')[0].alias('3ème Quartile')
).show()

In [None]:
pandas_df_vehicules = df_vehicules.toPandas()
filtered_df_vehicules_catv = pandas_df_vehicules[pandas_df_vehicules["catv"] != -1]
fig = px.box(filtered_df_vehicules_catv, y='catv', title='Boxplot des Catégories de Véhicules')
fig.show()

In [None]:
chart = alt.Chart(filtered_df_vehicules_catv).mark_boxplot().encode(
    y='catv:Q',
).properties(
    title='Catégories de véhicules impliqués dans les accidents'
)

chart

Meme chose ici pour la colonne "An_nais" qui represente l'année de naissance des usagers

In [None]:
filtered_df_usagers = df_usagers.filter(fn.col("An_nais") != 9999)
filtered_df_usagers.select(
    fn.mean('An_nais').alias('Moyenne'),
    fn.stddev('An_nais').alias('Ecart-type'),
    fn.skewness('An_nais').alias('Asymétrie'),
    fn.kurtosis('An_nais').alias('Aplatissement'),
    fn.expr('percentile(An_nais, array(0.25))')[0].alias('1er Quartile'),
    fn.expr('percentile(An_nais, array(0.5))')[0].alias('Médiane'),
    fn.expr('percentile(An_nais, array(0.75))')[0].alias('3ème Quartile')
).show()

In [None]:
pandas_df_usagers = df_usagers.toPandas()

Sauf qu'ici, au lieu de traiter directement la colonen 'An_nais', nous allons plutôt créer une colonne 'age' qui representra l'age des usagers

In [None]:
filtered_df_usagers_age = pandas_df_usagers[pandas_df_usagers['An_nais'] != 9999].copy()
filtered_df_usagers_age['age'] = filtered_df_usagers_age['annee'] - filtered_df_usagers_age['An_nais']

#affiche quelques lignes de cette dataframe
 
print(f"La moyenne d'age est de : {filtered_df_usagers_age['age'].mean()}")

In [None]:
chart = alt.Chart(filtered_df_usagers_age).mark_boxplot().encode(
    y='age:Q',
).properties(
    title='Distribution de l\'age des usagers impliqués dans les accidents'
)

chart

### Visualisation pour la répartition des accidents sur la semaine(jours et heurs) ainsi que les mois de l'année

Nous allons construire une dataframe qui contiendra en détails les colonnes nécéssaire à ce traitement (comme jour de la semaine)
(les jours de la semaine sont numérotés de 1 à 7 commençant par le dimanche)

In [None]:
df_caracteristiques_rep = df_caracteristiques.withColumn("jour_semaine", fn.dayofweek("date_time"))
df_caracteristiques_rep = df_caracteristiques_rep.withColumn("heure", fn.hour("date_time"))

df_caracteristiques_rep = df_caracteristiques_rep.withColumn("jour_heure", 
                                                              fn.concat(fn.col("jour_semaine"), fn.lit("-"), fn.col("heure")))
df_caracteristiques_rep.show(5)

Nous allons maintenant visualiser cela en utilisant une heatmap pour avoir plus de détails (jour et heure réuni)

In [None]:
# Conversion de Spark DataFrame en Pandas DataFrame pour la visualisation
pandas_df = df_caracteristiques_rep.toPandas()

heatmap_data = pandas_df.groupby('jour_heure').size().reset_index(name='counts')

# Extraire le jour de la semaine et l'heure dans des colonnes séparées pour le graphique
heatmap_data['jour'] = heatmap_data['jour_heure'].apply(lambda x: x.split("-")[0])
heatmap_data['heure'] = heatmap_data['jour_heure'].apply(lambda x: x.split("-")[1])

# Convertir 'heure' en chaîne pour garantir l'ordre lors de la visualisation
heatmap_data['heure'] = heatmap_data['heure'].astype(str)

# Définir l'ordre des heures de manière explicite
hours_order = [str(i) for i in range(24)]

# Création du heatmap avec Plotly
fig_heatmap = px.density_heatmap(heatmap_data, x='heure', y='jour', z='counts', color_continuous_scale='Viridis', 
                                 title="Heatmap de la répartition des accidents par jour de la semaine et heure",
                                 category_orders={'heure': hours_order}) # Ajout de category_orders
fig_heatmap.show()


Affichons les histogrames pour les jours uniquement, les heures uniquement et les mois

In [None]:

# Répartition des accidents sur la semaine
fig_semaine = px.histogram(pandas_df, x="jour_semaine", nbins=7, title="Répartition des accidents par jour de la semaine")
fig_semaine.show()

In [None]:
# Répartition des accidents sur les heures de la journée
fig_heures = px.histogram(pandas_df, x="heure", nbins=24, title="Répartition des accidents par heure de la journée")
fig_heures.show()

In [None]:
# Répartition des accidents sur les mois de l'année
fig_mois = px.histogram(pandas_df, x="mois", nbins=12, title="Répartition des accidents par mois")
fig_mois.show()

### Profils des usagers

afin de générer le graphique pour le profil des usagers, nous allons effectuer une jointure entre la table 'usagers' et la table 'caracteristiques' pour avoir les informations sur les accidents et les usagers avec lesquels ils sont liés.

In [None]:
df_joined_cara_usagers = df_caracteristiques.join(df_usagers, "Num_Acc")

Ensuite on récupere les colonnes qui nous intéressent pour cette analyse ici en l'occurence "agg" pour le type d'agglomération et "catu" pour la catégorie d'usager

In [None]:
df_profile = df_joined_cara_usagers.groupBy("agg", "catu").count()

In [None]:
pandas_df_profile = df_profile.toPandas()

# Convertir 'agg'  et catu en texte pour améliorer la lisibilité dans le graphique
pandas_df_profile['agg'] = pandas_df_profile['agg'].map({1: 'Hors agglomération', 2: 'En agglomération'})
pandas_df_profile['catu'] = pandas_df_profile['catu'].map({1: 'Conducteur', 2: 'Passager', 3: 'Piéton'})



In [None]:
fig = px.bar(pandas_df_profile, x='catu', y='count', color='agg', barmode='group',
             labels={'catu':'Catégorie d\'usager', 'count':'Nombre d\'usagers'},
             title='Profil des usagers impliqués par lieu d\'accident')
fig.update_xaxes(type='category')
fig.show()

### Accidents impliquant des cyclistes et/ou des piétons

Filtrons d'abord les cyclistes et les piétons

In [None]:
# Accidents impliquant des cyclistes
df_cyclistes = df_vehicules.filter(df_vehicules.catv == 1)

# Accidents impliquant des piétons
df_pietons = df_usagers.filter(df_usagers.catu == 3)

Ensuite nous allons récuperer les accidents impliquant des cyclistes et des piétons sans doublons

In [None]:
accidents_cyclistes = df_cyclistes.select("Num_Acc").distinct()
accidents_pietons = df_pietons.select("Num_Acc").distinct()

On fusionne les accidents puis on fait une jointure avec la dataframe 'caracteristiques_rep' pour obtenur les détails des accidents

In [None]:
accidents_impliquant_pietons_ou_cyclistes = accidents_cyclistes.union(accidents_pietons).distinct()
df_accidents_impliquant_pietons_ou_cyclistes = df_caracteristiques_rep.join(accidents_impliquant_pietons_ou_cyclistes, "Num_Acc")

In [None]:
df_accidents_impliquant_pietons_ou_cyclistes.show(5)

In [None]:
pandas_df = df_accidents_impliquant_pietons_ou_cyclistes.toPandas()

heatmap_data = pandas_df.groupby('jour_heure').size().reset_index(name='counts')

# Extraire le jour de la semaine et l'heure dans des colonnes séparées pour le graphique
heatmap_data['jour'] = heatmap_data['jour_heure'].apply(lambda x: x.split("-")[0])
heatmap_data['heure'] = heatmap_data['jour_heure'].apply(lambda x: x.split("-")[1])

heatmap_data['heure'] = heatmap_data['heure'].astype(str)

# Définir l'ordre des heures de manière explicite
hours_order = [str(i) for i in range(24)]

# Création du heatmap avec Plotly
fig_heatmap = px.density_heatmap(heatmap_data, x='heure', y='jour', z='counts', color_continuous_scale='Viridis', 
                                 title="Heatmap de la répartition des accidents par jour de la semaine et heure",
                                 category_orders={'heure': hours_order}) # Ajout de category_orders
fig_heatmap.show()

Pareil que tout à l'heure, on affiche les histogrammes pour les jours, les heures et les mois

In [None]:
df_jour = df_accidents_impliquant_pietons_ou_cyclistes.groupBy("jour_semaine").count()
pandas_df_jour = df_jour.toPandas()

fig_jour = px.bar(pandas_df_jour, x='jour_semaine', y='count', title='Répartition des accidents impliquant des cyclistes/piétons par jour de la semaine')
fig_jour.show()

In [None]:
df_heure = df_accidents_impliquant_pietons_ou_cyclistes.groupBy("heure").count()
pandas_df_heure = df_heure.toPandas()

fig_heure = px.bar(pandas_df_heure, x='heure', y='count', title='Répartition des accidents impliquant des cyclistes/piétons par heure de la journée')
fig_heure.show()

In [None]:
df_mois = df_accidents_impliquant_pietons_ou_cyclistes.groupBy("mois").count()
pandas_df_mois = df_mois.toPandas()

fig_mois = px.bar(pandas_df_mois, x='mois', y='count', title='Répartition des accidents impliquant des cyclistes/piétons par mois')
fig_mois.show()

## Usage des types composites

### Fabrication d'un dataframe avec une ligne par accident, une colonne contenant les véhicules en cause, et les lieux de l’accident.

 Nous allons créer un DataFrame Spark qui rassemble des informations à la fois sur les véhicules impliqués dans les accidents et les lieux où ces accidents se sont produits.

Aggrégation des Informations sur les Véhicules

In [None]:
df_vehicules_agg = df_vehicules.groupBy("Num_Acc").agg(
    fn.collect_list(
        fn.struct(
            "id_vehicule", "num_veh", "senc", "catv", "obs", "obsm", "choc", "manv", "motor", "annee"
        )
    ).alias("vehicules")
)

Faisons une Jointure avec les Informations sur les Lieux

In [None]:
df_jointure = df_vehicules_agg.join(df_lieux, "Num_Acc")

Et enfin construisons la dataframe composée des informations sur les véhicules et les lieux

In [None]:
df_comp = df_jointure.select(
    "Num_Acc",
    "vehicules",
    fn.struct(
        "catr", "voie", "V1", "circ", "nbv", "vosp", "prof", "pr", "pr1", "plan", "larrout", "surf", "infra", "situ", "vma"
    ).alias("details_lieu")
)

In [None]:
df_comp.show(truncate=False)

In [None]:
df_comp.printSchema()

## Sauvegarde au format parquet

In [None]:
data_directory = "data_cleaned"
os.makedirs(data_directory, exist_ok=True)

#### Pour `caracteristiques`

Partitionner par département (`dep`) optimise l'analyse et la gestion des données :

- Améliore la performance des requêtes en ciblant des sous-ensembles spécifiques.
- Facilite l'organisation et l'accès aux données par région géographique.
- Permet des analyses régionales ciblées, aidant à identifier des tendances locales.



In [None]:
df_caracteristiques.write.partitionBy("dep").parquet("data_cleaned/caracteristiques_par_dep")

#### Pour `vehicules`

Partitionner la table des véhicules par catégorie (`catv`) présente plusieurs avantages significatifs:

- Améliore la performance des requêtes et permet de cibler efficacement des catégories spécifiques de véhicules
- Facilite l'organisation des données qui seront structurées de manière intuitive selon le type de véhicule, rendant l'accès et l'analyse plus aisés.
- Favorise les études spécifiques par catégorie de véhicule, essentielles pour comprendre les risques et les comportements associés à chaque type.

In [None]:
df_vehicules.write.partitionBy("catv").parquet("data_cleaned/vehicules_par_catv")

#### Pour `usagers`

Partitionner cette table par catégorie d'usager (`catu`) améliore significativement l'analyse et la gestion des données :

- Accès rapide aux données concernant des catégories spécifiques d'usagers, comme les conducteurs, les passagers, et les piétons.
- Regroupe les données en fonction du rôle de l'usager dans l'accident, facilitant les analyses spécifiques.
- Permet des études ciblées sur le comportement et les risques associés à chaque catégorie d'usagers, essentiel pour les initiatives de prévention.

In [None]:
df_usagers.write.partitionBy("catu").parquet("data_cleaned/usagers_par_catu")

#### Pour `lieux`

Partitionner cette table par par catégorie de route (`catr`) offre plusieurs avantages pour la gestion et l'analyse des données :

- Supporte des études spécifiques aux différents environnements routiers, permettant de détecter les zones à risque et d'adapter les mesures de sécurité.
- Aide les autorités à concevoir des politiques et des interventions spécifiques en fonction des caractéristiques de chaque type de route.
- La segmentation par type de route aide à mieux organiser les données, rendant les analyses par catégorie de route plus intuitives.

In [None]:
df_lieux.write.partitionBy("catr").parquet("data_cleaned/lieux_par_catr")

In [None]:
df_lieux = spark.read.parquet("data_cleaned/lieux_par_catr")