##DEVOIR 1

* Hélène HU 22009784
* Nicolas PENELOUX 22001302


### PREAMBULE

Voici notre rendu concernant l'analyse de la base de données annuelles des accidents corporels de la circulation routière, fait par Hélène HU et Nicolas PENELOUX. Il y aura deux rendus, un avec compilation, et un sans, dans le cas ou vous auriez des problèmes à compiler le code du fichier : nous avions nous même eu quelques problèmes à ce sujet, notamment pour pyspark que nous utilisons ; nous voulons vous montrez que nous avons bien travailler le devoir.

Pour le gestionnaire de données, nous avons utilisé pyspark. Notre choix était motivé parce qu'il nous semblait plus pratique de l'utiliser que pandas, même si dans certains cas il semblerait que pandas soit plus efficace, notamment pour l'utilisation de plotly : parfois, plotly ne fonctionnait pas avec des dataframes spark, et dans ce cas nous utilisons les dataframes pandas, en transformant les dfs spark avec "**toPandas()**".

Egalement, nous avons privilégié plotly à altair, car les objets graphiques produit par plotly nous semblait plus agréable à regarder que ceux d'altair.

Enfin, nous avons laissé les lignes de commandes pour installer pyspark, ainsi qu'une autre pour installer une extension particulière de plotly (que nous utilisons à la question 5), dans le cas où vous ne l'aviez pas. En espérant que cela ne gâche pas votre compilation.


In [None]:
!pip install pyspark
!pip install plotly-calplot



### Chargement des fichiers pour les années 2021, 2022

Pour le chargement des fichiers, on commence d'abord par créé le sous répertoire data pour stocker nos fichiers csv. Ensuite, on stocke dans une liste, les urls de téléchargements des fichiers souhaités : On stocke à la même occasion les noms de chaque fichier, pour pouvoir les reconnaître plus tard.

En utilisant le module "*request*", on récupère chaque fichier à l'aide de la fonction "*get(url)*". Enfin on écris dans le sous répertoire data le contenu de la requête.

In [None]:
import os
import requests

# Créer le répertoire "data" s'il n'existe pas
if not os.path.exists("data"):
    os.makedirs("data")




# Liste des URLs des fichiers à télécharger pour les années 2021 et 2022
files = [
    ("https://www.data.gouv.fr/fr/datasets/r/62c20524-d442-46f5-bfd8-982c59763ec8", "usagers-2022"),
    ("https://www.data.gouv.fr/fr/datasets/r/c9742921-4427-41e5-81bc-f13af8bc31a0", "vehicules-2022"),
    ("https://www.data.gouv.fr/fr/datasets/r/a6ef711a-1f03-44cb-921a-0ce8ec975995","lieux-2022"),
    ("https://www.data.gouv.fr/fr/datasets/r/5fc299c0-4598-4c29-b74c-6a67b0cc27e7","caracteristiques-2022"),
    ("https://www.data.gouv.fr/fr/datasets/r/ba5a1956-7e82-41b7-a602-89d7dd484d7a","usagers-2021"),
    ("https://www.data.gouv.fr/fr/datasets/r/0bb5953a-25d8-46f8-8c25-b5c2f5ba905e","vehicules-2021"),
    ("https://www.data.gouv.fr/fr/datasets/r/8a4935aa-38cd-43af-bf10-0209d6d17434","lieux-2021"),
    ("https://www.data.gouv.fr/fr/datasets/r/85cfdc0c-23e4-4674-9bcd-79a970d7269b","caracteristiques-2021"),
]

# Début des téléchargements
for url, name in files:
    response = requests.get(url)
    # On vérifie que le téléchargement s'est bien passé
    if response.status_code == 200:
        # On écris le contenu du fichier dans le sous répertoire "data"
        with open("data/" + name + ".csv", "wb") as f:
            f.write(response.content)
            print("Téléchargement du fichier " + name + " terminé.")
    else:
      # Problème de téléchargement d'un fichier
        print(f"Erreur lors du téléchargement du fichier " + name)

Téléchargement du fichier usagers-2022 terminé.
Téléchargement du fichier vehicules-2022 terminé.
Téléchargement du fichier lieux-2022 terminé.
Téléchargement du fichier caracteristiques-2022 terminé.
Téléchargement du fichier usagers-2021 terminé.
Téléchargement du fichier vehicules-2021 terminé.
Téléchargement du fichier lieux-2021 terminé.
Téléchargement du fichier caracteristiques-2021 terminé.


### Création de dataframes correspondants au fichiers des années 2021, 2022 et nettoyage des données

Partie du code un peu plus intéressante que la précédente. Tout d'abord, on commence bien sûr par créer notre session Spark. On rajoute à cette session la config "*config("spark.sql.legacy.timeParserPolicy", "LEGACY")*" : nous avons eu des problèmes de versions de spark quand il fallait traiter les données de type *TimeStamp* (problème de Parser pour être plus précis), donc nous avons trouver le changement de config comme solution.

Donc, pour traiter les données, nous avons fait une fonction **chargement_et_nettoyage_DF** qui prend en paramètre le nom du fichier qui servira de dataframe. Pour chaque fichier du sous répertoire, on appelle cette fonction.

Dans cette fonction, on commence d'abord par créer le dataframe spark. Ensuite, pour chaque colonne du dataframe, on convertis les valeurs "non renseignées" (équivalente à -1 la plupart du temps) en valeur NULL, ce qui correspond pour nous, d'avantage à ce que devrais être une valeur non renseignée. Il faut faire attention, car le type *Timestamp* pose des problèmes : on ne peut pas simplement vérifié si la valeur de la colonne de type *TimeStamp* soit égale à -1, spark ne l'autorise pas, donc on traite toute les colonnes sauf celle qui sont des *TimeStamp*.
  
Ensuite, on regarde pour chaque fichier, si son nom contient "*usagers*", "*caracteristiques*", "*lieux*" ou "*vehicules*". Cela permet de traiter au cas par cas le dataframe en fonction de ce qu'il est réellement, et cela permet de respecter le principe de DRY (faire une fonction qui permet de traiter plusieurs fichiers différents en même temps, plutôt que recopier le code), et également la possibilité de gérer les données sur les fichiers de la BDD des accidents d'une autre année.

Si nous avons un fichier "*usagers*", nous procédons le nettoyage ainsi :

- On transforme les identifiants "usager" et "vehicule" en Integer. Ils sont à la base stocké sous format String, ce qui est illogique pour un identifiant d'une table. On utilise une fonction UDF préalablement créer "**convert_id_to_int**" qui prend en argument une chaîne de caractère et renvoie cette chaîne sous format Int.
- On indexe la colonne *num_veh*, qui est une colonne catégorielle mais en format String.
- On retire les colonnes "*secu3*" et "*etatp*", qui contiennent trop de valeur NULL.

Pour un fichier "*lieux*" :

- Les colonnes "pr" et "pr1" sont de type String alors qu'ils devraient être des Integer. De plus, les valeurs "non renseignée" sont censé être des "-1", alors qu'ici ils sont des "(1)". On remplace ça par NULL, puis on convertis le type en Integer.
- Certaines colonne String contiennent des valeurs "N/A" quand les valeurs ne sont pas renseignée, on remplace par NULL.
- On supprime les colonnes "*lartpc*" et "*larrout*", trop de NULL.

Pour un fichier "*caracteristiques*" :

- Dans un des fichiers, la colonne "*Num_Acc*" est remplacé par "*Accident_Id*", on remplace le nom.
- Les latitudes et longitudes sont en String à cause de la virgule qui sépare la partie entière de la partie décimale, on remplace la virgule par un point puis on met en type Float.
- On supprime les colonnes "an" "jour" "mois" et "hmrn", pour faire une grosse colonne de type *TimeStamp* de format "yyyy-MM-dd HH:mm"

Enfin pour un fichier "*vehicules*" :

- Comme pour "*usagers*", on s'occupe de l'identifiant "vehicule" de la même façon. On indexe également la colonne *num_veh*.
- On retire la colonne occutc, qui a trop de valeur NULL.

Finalement, après le traitement, la fonction renvoie le dataframe créé. Ensuite, on rajoute à la liste des dataframes celui qu'on vient de créer. On en profitera pour stocker dans une liste les noms de chaque dataframe, cela nous servira pour le merging des dataframes.

In [None]:
import pyspark as ps
import re
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.ml.feature import *
import plotly.express as px
from pyspark.sql import functions as F

#Fonction UDF pour convertir les id String en entier, on vire tout les caractères non numériques
def convert_id_to_int(id):
    id_without_spaces = re.sub(r'\D', '', id)
    return int(id_without_spaces)

# Fonction pour respecter le DRY
def chargement_et_nettoyage_DF(name) :
  convert_id_to_int_udf = F.udf(convert_id_to_int, IntegerType())
  df = spark.read.csv(name,header=True, sep=";", inferSchema=True)

  for column in df.columns:
    #Timestamp est incompatible avec le type Int pour -1, donc on vérifie si la colonne est pas du type Timestamp
    #Par ailleurs, si on met deux fois la même ligne pour transformer les -1 en NULL, c'est parce que étrangement
    #  avec "-1", il y a des colonnes qui ne sont pas affectés par la transformation de -1 à NULL.
      if not isinstance(df.schema[column].dataType, TimestampType):
        df = df.withColumn(column, when(col(column) == "-1", lit(None)).otherwise(col(column)))
        df = df.withColumn(column, when(col(column) == -1, lit(None)).otherwise(col(column)))
        pass

  # Les usagers
  if re.search("usagers",name):
    #On transforme id_usager et id_vehicule en colonne Int, en retirant au préalable les " " que contiennent les
    #valeurs.
    df = df.withColumn("id_usager_int", convert_id_to_int_udf("id_usager"))
    df = df.withColumn("id_vehicule_int", convert_id_to_int_udf("id_vehicule"))
    df = df.drop("id_usager")
    df = df.drop("id_vehicule")
    df = df.withColumnRenamed("id_usager_int","id_usager")
    df = df.withColumnRenamed("id_vehicule_int","id_vehicule")


    #Indexation de num_veh
    indexer = StringIndexer(inputCol='num_veh',outputCol='num_veh_idx')
    indexer = indexer.fit(df)
    df = indexer.transform(df)

    colonne = ["Num_Acc","id_usager","id_vehicule","num_veh","place","catu","grav","sexe","an_nais","trajet","secu1","secu2","secu3","locp","actp","etatp"]
    df = df.select(colonne)

    #On retire les colonnes secu3 et etatp qui sont quasi inutile
    df = df.drop("secu3")
    df = df.drop("etatp")
    df.show()

  # Les lieux
  elif re.search("lieux",name):
    #Nettoyage des données : on transforme les (1) en None et les -1 en None
    df = df.withColumn("pr", when(col("pr") == "(1)", lit(None)).otherwise(col("pr")))
    df = df.withColumn("pr1", when(col("pr1") == "(1)", lit(None)).otherwise(col("pr1")))


    for column in df.columns:
      #Il y avait des colonnes String avec des valeurs "N/A", on les remplace par NULL
        df = df.withColumn(column, when(col(column) == "N/A", lit(None)).otherwise(col(column)))

    #Changement de typage de String à Int pour pr, pr1
    df = df.withColumn("pr", df.pr.cast(IntegerType()))
    df = df.withColumn("pr1", df.pr1.cast(IntegerType()))

    #On drop les colonnesa vec trop de NULL
    df = df.drop("lartpc")
    df = df.drop("larrout")

    df.show()
  # Les caractéristiques
  elif re.search("caracteristiques",name):
    # On renome Accident_Id en Num_Acc
    if "Accident_Id" in df.columns:
      df = df.withColumnRenamed("Accident_Id","Num_Acc")

    # On met la latitude et la longitude en valeur numérique (dans notre cas, en Float)
    df = df.withColumn("lat", regexp_replace(col("lat"), ",", "."))
    df = df.withColumn("long", regexp_replace(col("long"), ",", "."))
    df = df.withColumn("lat", df.lat.cast(FloatType()))
    df = df.withColumn("long", df.long.cast(FloatType()))

    # Ici, on forme une unique colonne "datetime" de type TimeStamp, on supprime "hrmn", "an" "jour" et "mois"
    df = df.withColumn("heure", hour("hrmn"))
    df = df.withColumn("minute", minute("hrmn"))

    df = df.withColumn("heure_minute", concat(col("heure"), lit(":"), col("minute")))
    df = df.withColumn("datetime", to_timestamp(concat(col("an"), lit("-"), col("mois"), lit("-"), col("jour"), lit(" "), col("heure_minute")), "yyyy-MM-dd HH:mm"))
    df = df.drop("heure")
    df = df.drop("minute")
    df = df.drop("heure_minute")
    df = df.drop("an")
    df = df.drop("mois")
    df = df.drop("jour")
    df = df.drop("hrmn")

    #On range dans l'ordre
    colonnes = ["Num_Acc","datetime","lum","dep","com","agg","int","atm","col","adr","lat","long"]
    df = df.select(colonnes)
    df.show()

  # Les véhicules
  elif re.search("vehicules",name):
    #On transforme les id_vehicule de String en Int
    df = df.withColumn("id_vehicule_int", convert_id_to_int_udf("id_vehicule"))
    df = df.drop("id_vehicule")
    df = df.withColumnRenamed("id_vehicule_int","id_vehicule")

    #Indexation de num_veh
    indexer = StringIndexer(inputCol='num_veh',outputCol='num_veh_idx')
    indexer = indexer.fit(df)
    df = indexer.transform(df)

    #Suppression de occutc, qui est quasi vide
    df = df.drop("occutc")

    #On remet les colonnes dans l'ordre
    colonnes = ["Num_Acc","id_vehicule","num_veh","senc","catv","obs","obsm","choc","manv","motor"]
    df = df.select(colonnes)
    df.show()

  else :
    print("Fichier invalide pour le nettoyage.")
    exit(1)


  return df

spark = SparkSession.builder.appName("BDDAccident").config("spark.sql.legacy.timeParserPolicy", "LEGACY").getOrCreate()
# Problème de version de spark, on utilise ceci pour utiliser le comportement du parser de date et heure d'avant spark 3.0



dataframes = []
names = []
for url,name in files:
  df = chargement_et_nettoyage_DF("data/" + name + ".csv")
  dataframes.append(df)
  names.append(name)


+------------+---------+-----------+-------+-----+----+----+----+-------+------+-----+-----+----+----+
|     Num_Acc|id_usager|id_vehicule|num_veh|place|catu|grav|sexe|an_nais|trajet|secu1|secu2|locp|actp|
+------------+---------+-----------+-------+-----+----+----+----+-------+------+-----+-----+----+----+
|202200000001|  1099700|     813952|    A01|  1.0|   1| 3.0| 1.0|   2008|   5.0|  2.0|  8.0|NULL|NULL|
|202200000001|  1099701|     813953|    B01|  1.0|   1| 1.0| 1.0|   1948|   5.0|  1.0|  8.0|NULL|NULL|
|202200000002|  1099698|     813950|    B01|  1.0|   1| 4.0| 1.0|   1988|   9.0|  1.0|  0.0| 0.0|   0|
|202200000002|  1099699|     813951|    A01|  1.0|   1| 1.0| 1.0|   1970|   4.0|  1.0|  0.0| 0.0|   0|
|202200000003|  1099696|     813948|    A01|  1.0|   1| 1.0| 1.0|   2002|   0.0|  1.0|  0.0|NULL|NULL|
|202200000003|  1099697|     813949|    B01|  1.0|   1| 4.0| 2.0|   1987|   9.0|  1.0|  0.0|NULL|NULL|
|202200000004|  1099694|     813947|    A01|  1.0|   1| 1.0| 2.0|   2000|

### Réunion des années 2021, 2022

Pour respecter les principes de DRY, on créer une fonction **merge_dfs**, qui s'occupera de fusionner les dataframes similaire d'une année à l'autre.

La fonction prend en paramètre la liste des dataframes, et la liste de noms récupéré juste avant.

L'algorithme fonctionne comme ceci :

1. On initialise un dictionnaire vide grouped_dfs.
2. On parcourt simultanément la liste des dataframes et des noms. On récupère le nom, on sépare le nom de l'année ("usagers-2022" -> "usagers" ; "2022"). Ensuite, on regarde si le nom n'est pas déjà dans le dictionnaire grouped_dfs, si c'est cas alors on ajoute une entrée pour ce nom de groupe avec une liste vide. Sinon, il ajoute le Dataframe actuel à la liste correspondant au nom du groupe dans le dictionnaire.
3. On initialise une liste vide "merged_dfs" pour stocker les futurs réunions.
4. On parcourt "grouped_dfs". Pour chaque nom de groupe, on fusionne tous les dataframes de la liste en un seul dataframe, puis on l'ajoute à "merged_dfs"
5. On retourne la liste "merged_dfs", désormais remplis.

On utilise l'algorithme sur chacun de nos dataframes.

In [None]:
# Fonction de merge
def merge_dfs(dfs,names):
  grouped_dfs = {}
  # On parcours simultanément les dataframes et les noms
  for df, nom_df in zip(dfs,names):
    # On sépare le nom en deux "nom" "annee"
        name = nom_df.split("-")[0]
        # Si le nom du df n'est pas dans notre dictionnaire, on ajoute une entrée avec le nom actuel
        if name not in grouped_dfs:
            grouped_dfs[name] = []
            # sinon, on ajoute le df actuel à celui qui existe déjà
        grouped_dfs[name].append(df)

  merged_dfs = []
  # On parcours chaque élément de notre dictionnaire, et on fusionne tout les dataframes équivalent
  for name, dfs in grouped_dfs.items():
        merged_df = dfs[0]
        for df in dfs[1:]:
            merged_df = merged_df.union(df)
        merged_dfs.append(merged_df)

  return merged_dfs


merged_dfs = merge_dfs(dataframes,names)

for df in merged_dfs:
  df.show()


+------------+---------+-----------+-------+-----+----+----+----+-------+------+-----+-----+----+----+
|     Num_Acc|id_usager|id_vehicule|num_veh|place|catu|grav|sexe|an_nais|trajet|secu1|secu2|locp|actp|
+------------+---------+-----------+-------+-----+----+----+----+-------+------+-----+-----+----+----+
|202200000001|  1099700|     813952|    A01|  1.0|   1| 3.0| 1.0|   2008|   5.0|  2.0|  8.0|NULL|NULL|
|202200000001|  1099701|     813953|    B01|  1.0|   1| 1.0| 1.0|   1948|   5.0|  1.0|  8.0|NULL|NULL|
|202200000002|  1099698|     813950|    B01|  1.0|   1| 4.0| 1.0|   1988|   9.0|  1.0|  0.0| 0.0|   0|
|202200000002|  1099699|     813951|    A01|  1.0|   1| 1.0| 1.0|   1970|   4.0|  1.0|  0.0| 0.0|   0|
|202200000003|  1099696|     813948|    A01|  1.0|   1| 1.0| 1.0|   2002|   0.0|  1.0|  0.0|NULL|NULL|
|202200000003|  1099697|     813949|    B01|  1.0|   1| 4.0| 2.0|   1987|   9.0|  1.0|  0.0|NULL|NULL|
|202200000004|  1099694|     813947|    A01|  1.0|   1| 1.0| 2.0|   2000|

###Résumés numériques

Pour cette question, il fallait donc faire le résumé des valeurs numériques de nos dataframes. Seulement, dans la base de donnée, il y a beaucoup de valeurs numériques (*Int, double, float*) mais la plupart sont des colonne catégorielles.
De ce fait, il est impossible de calculer la moyenne, la médiane ... de valeurs catégorielles, cela n'a pas de sens.

Dans ce cas, nous avons décidé de faire les résumés des années de naissances, de la vitesse maximale autorisé, et l'heure des accidents. Comme autres résumés numériques possible, nous aurions pu prendre le nombre d'occupants dans le transport en commun ("*occutc*"), la largeur du terre plein central ("*lartpc*") et la largeur de la chaussée affectée à la circulation des véhicules("*larrout*"), seulement déjà cela nous semblait pas forcément intéressant, et surtout ces trois colonnes ont été nettoyés avant, car il y avait trop de valeur NULL.    
    
Nous calculons "manuellement" le résumé numérique. Il existe une fonction "summary" sur pyspark, mais elle ne permet pas de calculer dirrectement l'asymétrie et l'applatissement, donc on a décidé de tout calculer par nous même, toujours en utilisant le module spark. Cependant, nous avons utilisé **summary()** pour le résumé des heures, car comme à la question du nettoyage des données, il semblerait y avoir un problème au niveau de la version spark, ce qui empêche le traitement correct des valeurs de type TimeStamp. On ne pouvait pas calculer la médiane ni les quartiles, par contre **summary()** lui le fait très bien.

Enfin, pour afficher les boxplots des valeurs numériques, on utilise le module "pandas", car il semblerait que plotly ne puisse pas utilisé "pyspark", ou du moins cela semble compliqué. Dans ce cas, on utilise la fonction "**toPandas()**" qui transforme notre dataframe en dataframe pandas. On peut désormais l'utiliser dans notre boxplot plotly.

In [None]:
from pyspark.sql.functions import *
import pandas as pd
import plotly.express as px

#Nos différents dataframes
df_users = merged_dfs[0]
df_vehicules = merged_dfs[1]
df_locations = merged_dfs[2]
df_caracteristiques = merged_dfs[3]

df_users_an_naiss = df_users.select("an_nais") # les années de naissances
df_caracteristiques = df_caracteristiques.withColumn("heure", hour("datetime"))
temps_total = df_caracteristiques.select("heure") #Les heures des accidents
vma_max = df_locations.select("vma") #La vitesse max



def resume_et_boxplot(df,colonne,title):
   # Extraction de la colonne spécifiée
    df = df.withColumn(colonne, col(colonne))

    # Résumé numérique
    # Calcul des statistiques
    if colonne != "heure":
      statistics = {
          "mean": df.agg({colonne: "mean"}).collect()[0][0],
          "median": df.approxQuantile(colonne, [0.5], 0.001)[0],
          "quartiles": df.approxQuantile(colonne, [0.25, 0.75], 0.001),
          "stddev": df.agg({colonne: "stddev"}).collect()[0][0],
          "skewness": df.agg({colonne: "skewness"}).collect()[0][0],
          "kurtosis": df.agg({colonne: "kurtosis"}).collect()[0][0]
      }

      # Affichage des statistiques
      print(f"Statistiques pour la colonne '{colonne}':")
      print(f"Moyenne: {statistics['mean']}")
      print(f"Médiane: {statistics['median']}")
      print(f"1er Quartile: {statistics['quartiles'][0]}")
      print(f"3ème Quartile: {statistics['quartiles'][1]}")
      print(f"Écart-type: {statistics['stddev']}")
      print(f"Asymétrie: {statistics['skewness']}")
      print(f"Applatissement: {statistics['kurtosis']}")
      print("")
    else :

      #Cas particulier pour la lecture des heures : on ne pouvait pas calculer la médiane et les quartiles
      # pour les heures, malgré que ce soit en type Integer.
      asymetrie = df.agg({colonne: "skewness"}).collect()[0][0]
      stddev = df.agg({colonne: "stddev"}).collect()[0][0]
      kurtosis = df.agg({colonne : "kurtosis"}).collect()[0][0]
      summary = df.select(colonne).summary()
      summary.show()
      print(f"Écart-type: {stddev}")
      print(f"Asymétrie: {asymetrie}")
      print(f"Applatissement: {kurtosis}")


    # Génération du boxplot
    fig = px.box(df.toPandas(), y=colonne, title=title)
    fig.show()


resume_et_boxplot(df_users_an_naiss,"an_nais","Boxplot des années de naissance")
resume_et_boxplot(temps_total,"heure","Boxplot des heures des accidents")
resume_et_boxplot(vma_max,"vma","Boxplot de la vitesse maximale autorisée sur le lieu de l'accident")





Statistiques pour la colonne 'an_nais':
Moyenne: 1983.2704655377267
Médiane: 1987.0
1er Quartile: 1970.0
3ème Quartile: 1998.0
Écart-type: 18.868352428028466
Asymétrie: -0.5701049618826507
Applatissement: -0.3109489775773575



+-------+------------------+
|summary|             heure|
+-------+------------------+
|  count|            111820|
|   mean|13.512725809336434|
| stddev| 5.455475502811854|
|    min|                 0|
|    25%|                10|
|    50%|                14|
|    75%|                18|
|    max|                23|
+-------+------------------+

Écart-type: 5.455475502811854
Asymétrie: -0.4855098287451616
Applatissement: -0.41794912877126444


Statistiques pour la colonne 'vma':
Moyenne: 59.6172907939335
Médiane: 50.0
1er Quartile: 50.0
3ème Quartile: 80.0
Écart-type: 24.34627089932576
Asymétrie: 4.31136037429198
Applatissement: 102.12488199701582



### Répartition

  Pour les deux répartitions qui suivent, on va créer une fonction pour chaque, qui prendra en argument un dataframe, car on les réutiliseras pour les accidents impliquant des cyclistes et des piétons.

*  ### Répartition des accidents sur la semaine (jours et heures)

  Pour les accidents sur la semaine, on récupère sur *datetime* (date et heure de l'accident) les jours de la semaine via la fonction "**dayofweek**" de spark, et les heures avec "**hour**", et on compte le nombre d'accident par jour et par heure de la semaine.  
  On n'oublie pas de remplacer les valeurs numériques des jours par ceux qu'ils représentent, en faisant un "**map**" sur nos jours de la semaines.
  On affiche ça dans un graphique en barre, avec en ordonnée le nombre d'accidents et en abscisse les heures. En couleur, on met les jours de la semaine qui sont associés aux accidents par tranche horaire.


* ### Répartition des accidents sur les mois de l’année

  Pour les accidents sur les mois de l'année, on récupère sur *datetime* les mois avec la fonction **month**, puis de même, on compte le nombre d'accidents lié à chaque mois. On remplace également les numéros des mois par leurs valeurs respective.
  On affiche également dans un graphe en barre, en abscisse les mois, et en ordonnée le nombre d'accident.

* ### Bonus : Accident par jour pendant les années 2021 et 2022

  En petit bonus, en utilisant l'extension "*calplot*" de plotly, on peut afficher dans un calendrier heatmap le nombres d'accidents par jour pendant les années 2021 et 2022. Pour cela, on compte le nombre d'accident pour chaque jour, on créer notre intervalle de date pour calplot (on met de force les dates "2021-01-01" et "2022-12-31", mais il serait plus judicieux de prendre d'abord la date la plus ancienne et la plus récente en guise d'intervalle), avec en abscisse les jours de la semaine, et en ordonnée les mois.


In [None]:
from plotly_calplot import calplot

caracteristiques = merged_dfs[3]
# Map pour les jour de la semaines et les mois
jour_semaine_map = {1: 'Lundi', 2: 'Mardi', 3: 'Mercredi', 4: 'Jeudi', 5: 'Vendredi', 6: 'Samedi', 7: 'Dimanche'}
mois_map = {1:'Janvier', 2:'Fevrier', 3:'Mars', 4:'Avril', 5:'Mai', 6:'Juin', 7:"Juillet", 8:'Aout', 9:'Septembre', 10:'Octobre', 11:'Novembre', 12:'Decembre'}

# Fonction qui affiche les accidents par jour de la semaine et heures
def show_accidents_par_jour_et_heure (df_cara):
  # Requête PySpark pour compter le nombre d'accidents par jour de la semaine et par heure
  accidents_par_jour_et_heure = df_cara.groupBy(hour("datetime").alias("heure"), dayofweek("datetime").alias("jour_semaine")) \
                                              .count() \
                                              .orderBy("jour_semaine", "heure")

  # Conversion du DataFrame Spark en DataFrame Pandas
  df_pandas = accidents_par_jour_et_heure.toPandas()
  df_pandas["jour_semaine"] = df_pandas["jour_semaine"].map(jour_semaine_map)


  # Graphique en barre
  fig = px.bar(df_pandas, y="count", x="heure", color="jour_semaine",
             labels={"heure": "Heure", "count": "Nombre d'accidents", "jour_semaine": "Jour de la semaine"},
             title="Répartition des accidents sur la semaine (jours et heures)")
  fig.update_layout(barmode='stack', xaxis={'categoryorder':'total ascending'})
  fig.show()


show_accidents_par_jour_et_heure(caracteristiques)


# Fonction qui affiche les accidents par mois
def show_accidents_par_mois (df_cara):
  # Groupement des données par mois
  accidents_par_mois = df_cara.groupBy(month("datetime").alias("mois")) \
                                     .count() \
                                     .orderBy("mois")

  df_pandas = accidents_par_mois.toPandas()
  df_pandas["mois"] = df_pandas["mois"].map(mois_map)


  # Graphe en barre
  fig_histogramme = px.bar(df_pandas,
                         x="mois",
                         y="count",
                         labels={"mois": "Mois", "count": "Nombre d'accidents"},
                         title="Répartition des accidents sur les mois de l'année")
  fig_histogramme.show()

show_accidents_par_mois(caracteristiques)



accidents_par_jour = caracteristiques.groupBy(date_format("datetime", "yyyy-MM-dd").alias("jour")) \
                                      .count() \
                                      .orderBy("jour")
# Création de la heatmap
df_pandas = accidents_par_jour.toPandas()
num_rows = len(df_pandas)

# Intervalle de date de notre BDD
start_date = '2021-01-01'
end_date = '2022-12-31'
dates = pd.date_range(start=start_date, end=end_date, periods=num_rows)

# Assignation de la plage de dates à une nouvelle colonne 'ds'
df_pandas['ds'] = dates
fig = calplot(df_pandas,x="ds",y="count",title="Répartition des accidents pour chaque jour, des années 2021 et 2022")
fig.show()


### Profil des usagers

Ici, on défini d'abord quatre fonctions :
 * "**df_to_dict**" qui prend un dataframe et renvoie une liste de dictionnaires correspondant aux lignes du dataframe.
 * "**catr_name**" qui prend u dataframe et remplace les valeurs de la colonne "*catr*" par les le type de circulation, si c'est rurale ou urbaine.
 * "**catr_name_bis**" qui prend un dataframe et remplace les valeurs de la colonne "*catr*" par les noms correspondant.
 * "**sexe_name**" qui prend un dataframe et remplace les valeurs de la colonne "*sexe*" par le sexe de la personne.
 * "**catu_name**" qui prend un dataframe et remplace les valeurs de la colonne "*catu*" par la catégorie des usagers présent sur l'accident.

On commence d'abord par lié le dataframe *usagers* à celui de *lieux*.
On retire toutes les lignes où le sexe de la personne n'est pas renseignée, puis on regroupe notre dataframe sur les colonnes *catr* *catu* et *sexe* et on compte le nombres de lignes pour chaque groupe.

On utilise la fonction **sexe_name**, **catu_name** et **catr_name_bis** sur le dataframe. On effectue une copie du dataframe pour pouvoir l'utiliser après. Enfin, on affiche un graphe en barre, sur deux lignes, une ligne avec les personnes de sexe féminin et l'autre masculin, avec en ordonnée le nombre d'usagers, et en abscisse la catégorie des routes. On met en couleur les différents catégorie d'usager : conducteur, piéton ou passager d'un véhicule.

Ensuite, on fait un deuxième graphe à partir de notre copie : au lieu d'appeler **catr_name_bis**, on va appeler **catr_name** pour nous donner les types de circulation (urbaine ou rurale). On affiche des graphes circulaire, toujours sur deux lignes en fonction du sexe, en différenciant le type de circulation, et en fonction également de la catégorie des usagers.

Finalement, on fini avec un troisième graphe qui représente le nombre d'usagers, par sexe, en fonction des années de naissances. On regroupe les usagers en fonction de leur sexe et de leur année de naissance, on compte le nombre de lignes, et on l'affiche dans ce graphe en barre.



In [None]:
 # Fonction qui créer une liste de dictionnaire correspondat aux lignes d'un dataframe
def df_to_dict(df):
  return [row.asDict() for row in df.collect()]

# Fonction qui transforme les index catr en valeur
def catr_name_bis(df):
  return df.withColumn('catr',
          F.when(df.catr == 1, 'Autoroute')
          .when(df.catr == 2, 'Route nationale')
          .when(df.catr == 3, 'Route Départementale')
          .when(df.catr == 4, 'Voie Communales')
          .when(df.catr == 5, 'Hors réseau public')
          .when(df.catr == 6, 'Parc de stationnement ouvert à la circulation publique')
          .when(df.catr == 7, 'Routes de métropole urbaine')
          .when(df.catr == 9, 'Autre')
)

# Fonction qui transforme l'index des sexes en valeur
def sexe_name(df):
  return df.withColumn('sexe', F.when(df.sexe == 1, 'Male').when(df.sexe == 2, 'Female').otherwise('Unknown'))

# Fonction qui transforme l'index des catu en valeur
def catu_name(df):
  return df.withColumn('catu', F.when(df.catu == 1, 'Conducteur')
                              .when(df.catu == 2, 'Passager')
                              .when(df.catu == 3, 'Piéton'))


#On lie le dataframe users et locations
df_circumstances = df_users.join(df_locations,'Num_Acc')


# On filtre les lignes où le sexe égale à null
df_circumstances = df_circumstances.filter(df_circumstances.sexe.isNotNull())

# on veut le profil des usagers en fonction des circonstances.
# On définit le profil des usagers par leur catégorie et de leur sexe.
# On définit les circonstances en fonction de la catégorie de la route
df_circumstances = df_circumstances.groupby(df_circumstances.catr, df_circumstances.catu, df_circumstances.sexe).count()


# On remplace les valeurs indexées par ce qu'ils veulent dire pour que ça soit plus parlant
df_circumstances = catu_name(df_circumstances)
df_circumstances = sexe_name(df_circumstances)

#On effectue une copie pour la suite, pour éviter de devoir refaire la même approche
df_copie = df_circumstances

#On remplace les valeurs indexées ici aussi
df_circumstances = catr_name_bis(df_circumstances)

fig = px.bar(df_to_dict(df_circumstances), title="Profils des usagers en fonction des catégories de routes",
             x='catr',
             y='count',
             color = 'catu',
             facet_row = 'sexe',
             barmode='group',
             height=800
             )

fig.update_layout(xaxis={'categoryorder':'total descending'})

fig.show()


In [None]:
# Fonction qui transforme l'index catr en valeur "rurale" ou "urbaine"
def catr_name(df):
  return df.withColumn('catr',
          F.when(df.catr == 1, 'Circulation rurale')
          .when(df.catr == 2, 'Circulation rurale')
          .when(df.catr == 3, 'Circulation urbaine')
          .when(df.catr == 4, 'Circulation urbaine')
          .when(df.catr == 5, 'Circulation rurale')
          .when(df.catr == 6, 'Circulation urbaine')
          .when(df.catr == 7, 'Circulation urbaine')
          .when(df.catr == 9, 'Circulation rurale')
)

# Pour distinguer les circonstances en fonction de la circulation urbaine ou en campagne
df_copie = catr_name(df_copie)

# Diagramme circulaire
fig = px.pie(df_to_dict(df_copie), title="Profils des usagers en fonction de la circulation urbaine ou en campagne",
             names='catr',
             values='count',
             color = 'catr',
             facet_row='sexe',
             facet_col='catu',
             height = 600
             )
fig.show()


In [None]:
# On récupère le nombre de personne par date de naissance et par sexe
birth_counts = df_users.groupby(df_users.an_nais, df_users.sexe).count()
birth_counts = sexe_name(birth_counts)
fig = px.bar(df_to_dict(birth_counts), title="Profils des usagers en fonction de l'année de naissance et de leur sexe",
             x='an_nais',
             y='count',
             color = 'sexe')
fig.show()


### Accidents impliquant des cyclistes et/ou des piétons

On récupère d'abord le nombre de piétons par accident à l'aide du numéro de leurs catégorie, en n'oubliant pas de garder les colonnes utiles (celle qui ont au moins un piéton dans l'accident).
Pour les cyclistes, on regarde la catégorie du véhicule, et de même, on compte le nombre de cycliste par accident.

Pour les accidents impliquant des cyclistes ou des piétons, on fait l'union des deux dataframes récupéré juste avant, on compte le nombre de lignes (nombre d'accident) et on affiche le dataframe.

Pour les accidents impliquant des cyclistes et des piétons, on fait la même chose simplement on procède en faisant l'intersection plutôt que l'union.

In [None]:
df_vehicle = merged_dfs[1]

# On compte le nombre de piétons par accident
df_pieton = df_users.where(df_users.catu == 3).distinct().groupby(df_users.Num_Acc).count()
df_pieton = df_pieton.where(col('count') >= 1).select("Num_Acc")


df_bicycle = df_vehicle.where(df_vehicle.catv == 1).select("Num_Acc").distinct()


# Union des df
df_union = df_pieton.union(df_bicycle)
print("Accidents impliquant des cyclistes ou des piétons : " , df_union.count())
df_union.show()

# Intersection
df_intersect = df_pieton.intersect(df_bicycle)
print("Accidents impliquant des cyclistes et des piétons : " , df_intersect.count())
df_intersect.show()


Accidents impliquant des cyclistes ou des piétons :  28244
+------------+
|     Num_Acc|
+------------+
|202200020904|
|202200008870|
|202200027869|
|202200031944|
|202200026347|
|202200031361|
|202200012651|
|202200000613|
|202200025825|
|202200002063|
|202200030307|
|202200006389|
|202200003134|
|202200019643|
|202200020063|
|202200031009|
|202200011390|
|202200024342|
|202200034194|
|202200002998|
+------------+
only showing top 20 rows

Accidents impliquant des cyclistes et des piétons :  577
+------------+
|     Num_Acc|
+------------+
|202100041818|
|202200053011|
|202200043726|
|202100006723|
|202100011042|
|202100050483|
|202100043205|
|202100044050|
|202200039221|
|202100011849|
|202100056283|
|202200020072|
|202100000978|
|202100044303|
|202100055536|
|202100012242|
|202100055529|
|202200018861|
|202100027972|
|202100025013|
+------------+
only showing top 20 rows



### Répartition des accidents sur la semaine (jours et heures)

Pour la répartition, on procède la même manière que la première fois. On appelle la fonction **show_accidents_par_jour_et_heure**, créer plus tôt dans le fichier, mais cette fois sur les dataframes de l'union entre notre précédente union et le dataframe des caractéristiques, et l'union entre notre précédente intersection et le dataframe des caractéristiques.



In [None]:
# Union de l'union piéton / cycliste avec le dataframe des caractéristiques (sur Num_Acc)
df_union_cara = df_union.join(df_caracteristiques, 'Num_acc')
# Union de l'intersection piéton / cycliste avec le dataframe des caractéristiques (sur Num_Acc)
df_intersect_cara = df_intersect.join(df_caracteristiques, 'Num_acc')

# On affiche de la même façon que tout à l'heure
show_accidents_par_jour_et_heure(df_union_cara)
show_accidents_par_jour_et_heure(df_intersect_cara)

### Répartition des accidents sur les mois de l’année

Même principe que précédemment, toujours sur notre union d'union et caractéristique, ainsi que notre union d'intersection et caractéristiques, cette fois sur la fonction **show_accidents_par_mois**

In [None]:
show_accidents_par_mois(df_union_cara)
show_accidents_par_mois(df_intersect_cara)

### Les caractéristiques des lieux où se sont produits ces accidents.

Pour les caractéristiques des lieux des accidents, on commence d'abord par afficher une carte de densité, centré autour de la France métropolitaine. On affiche en couleur style "heatmap" lieux des accidents, avec la latitude et la longitude présente dans le dataframe *caracteristiques*". Le problème de cette approche est qu'il est très rare qu'un accident arrive à exactement la même latitude et longitude d'un autre, souvent il y a un minuscule écart entre deux accidents, cependant en dézoomant correctement la carte, on peut quand même afficher de belles zones colorés où se sont passé des accidents.

On défini trois fonctions **lum_name**, **age_name** et **int_name** qui remplacent les valeurs indexées de '*lum*', '*agg*' et '*int*' ne sont pas NULL.

On regroupe le dataframe caracteristiques en fonction des colonnes '*lum*', '*agg*' et '*int*' et on compte le nombre d'occurences de chaque combinaison. On stocke ça dans un nouveau dataframe.
On applique nos trois fonctions sur ce dataframe, et on créer un graphique en barres avec ce dataframe. Les barres sont regroupées par type d'intersection, avec une couleur différente pour chaque luminosité, et une ligne séparée si c'est en agglomération ou non.

In [None]:
df_pandas = df_caracteristiques.groupby(['lat', 'long']).count().toPandas()

# Création de la carte de densité à l'aide de la latitude et longitude
fig = px.density_mapbox(df_pandas, lat='lat', lon='long', z = 'count',
                        radius=10,
                        center=dict(lat=46.83, lon=2.5),
                        zoom=4,
                        mapbox_style='open-street-map',
                        title='Les lieux où se sont produits les accidents',
                        height = 600)
fig.show()

In [None]:

df_cara_lieu = df_caracteristiques.groupby('lum', 'agg', 'int').count()

# Fonction qui prend un dataframe et remplace les valeurs indexées de "lum"
def lum_name(df):
  return df.withColumn('lum',
          F.when(df.lum == 1, 'Plein jour')
          .when(df.lum == 2, 'Crépuscule ou aube')
          .when(df.lum == 3, 'Nuit sans éclairage public')
          .when(df.lum == 4, 'Nuit avec éclairage public non allumé')
          .when(df.lum == 5, 'Nuit avec éclairage public allumé')
  )

# Fonction qui prend un dataframe et remplace les valeurs indexées de "int"
def int_name(df):
    return df.withColumn('int',
          F.when(df.int == 1, 'Hors intersection')
          .when(df.int == 2, 'Intersection en X')
          .when(df.int == 3, 'Intersection en T')
          .when(df.int == 4, 'Intersection en Y')
          .when(df.int == 5, 'Intersection à plus de 4 branches')
          .when(df.int == 6, 'Giratoire')
          .when(df.int == 7, 'Place')
          .when(df.int == 8, 'Passage à niveau')
          .when(df.int == 9, 'Autre intersection')
    )

# Fonction qui prend un dataframe et remplace les valeurs indexées de "agg"
def agg_name(df):
  return df.withColumn('agg',
          F.when(col('agg') == 1, 'Hors agglomération')
          .when(col('agg') == 2, 'En agglomération')
  )


# On filtre toutes les valeurs non null de notre dataframe
df_cara_lieu = df_cara_lieu.filter(col('int').isNotNull())
df_cara_lieu = df_cara_lieu.filter(col('agg').isNotNull())
df_cara_lieu = df_cara_lieu.filter(col('lum').isNotNull())

# On change les index en valeurs
df_cara_lieu = lum_name(df_cara_lieu)
df_cara_lieu = agg_name(df_cara_lieu)
df_cara_lieu = int_name(df_cara_lieu)

# Graphe en barre
fig = px.bar(df_to_dict(df_cara_lieu), title="Les caractéristiques (agglomération, luminosité, intersection) des lieux où se sont produits ces accidents.",
             x='int',
             y='count',
             color = 'lum',
             facet_row = 'agg',
             barmode='group',
             height = 800)
fig.show()



### Usage des types composites

On crée une vue temporaire pour les dataframes *vehicule* et *lieux*, puis on éxécute une requête SQL pour joindre ces deux dataframes sur la colonne "*Num_Acc*". On récupère aussi l'id_vehicule et les voies des accidents.
On crée un nouveau dataframe, basé sur un schéma comprenant le numéro de l'accident, le numéro du véhicule en cause et le lieu de l'accident. Enfin, on affiche ce dataframe.

In [None]:
# Vue temporaire pour la query
df_vehicle.createOrReplaceTempView("vehicle")
df_locations.createOrReplaceTempView("locations")

#Query
query = """
    SELECT v.Num_acc, v.id_vehicule, l.voie
    FROM vehicle as v
    JOIN locations l ON l.Num_acc = v.Num_acc
"""
result = spark.sql(query)

# Schema de notre structure de type pour les types composites
schema = StructType([
    StructField("Num_Acc", LongType(), False),
    StructField("Vehicule en cause", IntegerType(), True),
    StructField("Lieu", StringType(), True)
])
# On créé notre dataframe a partir du schema précédent
df_composite = spark.createDataFrame(result.rdd, schema=schema)

df_composite.show()
df_composite.printSchema()



+------------+-----------------+--------------------+
|     Num_Acc|Vehicule en cause|                Lieu|
+------------+-----------------+--------------------+
|202200000001|           813952|TEIL(vieille rout...|
|202200000001|           813953|TEIL(vieille rout...|
|202200000002|           813950|                NULL|
|202200000002|           813951|                NULL|
|202200000003|           813948|ROND POINT DE BRE...|
|202200000003|           813949|ROND POINT DE BRE...|
|202200000004|           813947|QUATORZE JUILLET ...|
|202200000005|           813945|ROUTE DE JEAN MOU...|
|202200000005|           813946|ROUTE DE JEAN MOU...|
|202200000006|           813944|      TURCAN FRANCIS|
|202200000007|           813940|         CARNOT  RUE|
|202200000007|           813941|         CARNOT  RUE|
|202200000008|           813938|JAURES JEAN (AVENUE)|
|202200000008|           813939|JAURES JEAN (AVENUE)|
|202200000009|           813937|   LOMBARDS (AVENUE)|
|202200000010|           813

## Sauvegarde au format parquet

* On partitionne le dataframe "usagers" en fonction de la catégorie de l'usager (si c'est un piéton, passager ou conducteur). Cela nous semblait évident de partitionner de cette façon, cela permet de faire un petit partionnement, avec seulement trois possibilités, en fonction de type d'usagers auquel nous avons affaire.

* Pour le dataframe "lieux", on partitionne en fonction de la catégorie de route. Les autres caractéristiques de lieux ne nous semblait pas intéressante pour une partition de parquet, alors que la catégorie de routes nous donne une vague idée sur quoi s'est dérouler un accident

* Nous n'avons pas fait de partition pour le reste, car cela nous semblait sans interêt pour "caractéristiques" et "vehicule", il n'y avait pas de vrai valeur qui apportait un plus en terme de partition.

In [None]:

for name, df in zip(names, dataframes):
  if "usagers" in name: #Partition en fonction de la catégorie des usagers
    df.write.mode('overwrite').partitionBy("catu").parquet("data/"+name+"-cleaned.parquet")
  elif "lieux" in name: #Partition en fonction de la catégorie des lieux
    df.write.mode('overwrite').partitionBy("catr").parquet("data/"+name+"-cleaned.parquet")
  else : #Pas de partition particulière
    df.write.mode('overwrite').parquet("data/"+name+"-cleaned.parquet")


In [None]:
ls -al data/*cleaned.parquet

data/caracteristiques-2021-cleaned.parquet:
total 1908
drwxr-xr-x  2 root root    4096 Mar 24 20:45 [0m[01;34m.[0m/
drwxr-xr-x 10 root root    4096 Mar 24 20:45 [01;34m..[0m/
-rw-r--r--  1 root root 1506386 Mar 24 20:45 part-00000-439db611-9a3e-46b9-acab-a71959240b1a-c000.snappy.parquet
-rw-r--r--  1 root root   11780 Mar 24 20:45 .part-00000-439db611-9a3e-46b9-acab-a71959240b1a-c000.snappy.parquet.crc
-rw-r--r--  1 root root  413749 Mar 24 20:45 part-00001-439db611-9a3e-46b9-acab-a71959240b1a-c000.snappy.parquet
-rw-r--r--  1 root root    3244 Mar 24 20:45 .part-00001-439db611-9a3e-46b9-acab-a71959240b1a-c000.snappy.parquet.crc
-rw-r--r--  1 root root       0 Mar 24 20:45 _SUCCESS
-rw-r--r--  1 root root       8 Mar 24 20:45 ._SUCCESS.crc

data/caracteristiques-2022-cleaned.parquet:
total 1900
drwxr-xr-x  2 root root    4096 Mar 24 20:45 [01;34m.[0m/
drwxr-xr-x 10 root root    4096 Mar 24 20:45 [01;34m..[0m/
-rw-r--r--  1 root root 1514935 Mar 24 20:45 part-00000-7194b166-226

In [None]:
spark.stop()