# API SNCF: Les retards de train

__Auteur :__ 

Steve Caron

__Présentation :__ 

Ce script permet de requêter les informations sur les arrivées des trains en gare pour la journée d'hier. 

Il retourne les réponses des requêtes dans deux répertoires:

* data/arrivees/ : répertoire contenant les informations sur les arrivées en gare

* data/perturbations/ : répertoire contenant les informations sur les perturabations observées sur les arrivées en gare


__Inputs :__ 

Un fichier data/top{n}gare.json contenant les informations permettant de faire les requêtes pour chaques gares. Les enregistrements sont classés en fonction de la fréquentation des gares.

__Params :__

* CLEF_API : nom sous lequel est enregistrer la cle API

In [1]:
from dotenv import load_dotenv
import os
import requests
import json
import csv
import datetime
from dataclasses import dataclass,asdict
import pandas as pd
from sqlalchemy import create_engine,text
from sqlalchemy.types import *
from sqlalchemy.exc import IntegrityError
import logging

In [2]:
CLEF_API = "API_KEY"
DB_PORT = 3306

In [3]:
db_log_file_name = 'db.log'
db_handler_log_level = logging.DEBUG
db_logger_log_level = logging.DEBUG

db_handler = logging.FileHandler(db_log_file_name)
db_handler.setLevel(db_handler_log_level)

db_logger = logging.getLogger('sqlalchemy')
db_logger.addHandler(db_handler)
db_logger.setLevel(db_logger_log_level)

In [4]:
def convertir_en_string(dt):
    '''Cette fonction convertit un datetime en chaîne de caractères'''
    if str is None:
        return None
    else:
        return datetime.datetime.strftime(dt,'%Y%m%dT%H%M%S')

In [5]:
def convertir_en_datetime(str):
    '''Cette fonction convertit une chaîne de caractères en datetime'''
    if str is None:
        return None
    else:
        return datetime.datetime.strptime(str,"%Y%m%dT%H%M%S")

In [6]:
def to_json(data,nom_fichier):
    '''Cette fonction permet d'enregistrer un fichier JSON'''
    with open(nom_fichier, "w") as fc:
        json.dump(data, fc)

In [7]:
def to_csv(data,nom_fichier):
    '''Cette fonction permet d'enregistrer un fichier CSV'''
    with open(nom_fichier,"w", newline='', encoding='utf-8')as fc:
        writer = csv.DictWriter(fc,fieldnames=data[0].keys())
        writer.writeheader()
        writer.writerows(data)

In [8]:
def requete_api(code_gare,code_reseau,date,nb_gare):
    '''Cette fonction effectue une requête API pour collecter la liste des arrivées pour une gare spécifique sur un réseau spécifique '''

    base_url = "https://api.sncf.com/v1/coverage/sncf"
    #Requete sans le filtre sur les trains
    requete = f"{base_url}/stop_areas/{code_gare}/networks/{code_reseau}/arrivals?from_datetime={date}&count={nb_gare}"
    reponse = requests.get(requete, auth=(api_key,""))
    reponse_json = reponse.json()
    
    return reponse_json

In [9]:
def verification_dates(reponse_API,code_gare,code_reseau,date_requete,date_max:str):
    '''Cette fonction permet de vérifier si toutes les arrivées comprises dans une réponse API sont avant la date max
    Si c'est le cas la fonction retourne la réponse API initiale et la date de la dernière arrivée de la réponse initiale
    Si ce n'est pas le cas, elle refait un appel API en faisant une requete comprennant uniquement les dates antérieurs à la date max
    La fonction retourne alors la nouvelle réponse API et la date de la dernière arrivée comprise dans la réponse initiale'''
    datetime_max = convertir_en_datetime(date_max)
    arrivees_reponse=reponse_API["arrivals"]
    #Gestion du cas ou la requete ne revoie pas d'arrivée
    if len(arrivees_reponse) == 0:
        #On retourne des liste vide et un datetime qui sera forcement supérieur à la date max
        return [],[],datetime.datetime(9999,1,1,12,12,00)
    datetime_derniere_requete = convertir_en_datetime(arrivees_reponse[-1]["stop_date_time"]["arrival_date_time"])
    if datetime_derniere_requete > datetime_max:
        for compteur,arrivee in enumerate(arrivees_reponse):
            arrivee_datetime = convertir_en_datetime(arrivee["stop_date_time"]["arrival_date_time"])
            if arrivee_datetime > datetime_max:
                reponse = requete_api(code_gare,code_reseau,date_requete,compteur)
                return reponse["arrivals"],reponse["disruptions"], datetime_derniere_requete
    else:
        return arrivees_reponse,reponse_API["disruptions"], datetime_derniere_requete

In [10]:
def requete_entre_dates(code_gare,code_reseau,date_min,date_max,liste_arrivee,liste_perturbation,compteur_requete):
    '''Cette fonction permet de faire des requêtes pour récupérer des données sur tous les enregistrements de la journée.
    Elle sépare en deux listes les informations concernant les départs et les informations concernant les arrivées'''

    date_requete = date_min

    while date_requete < date_max:
        # Requete api
        reponse_api = requete_api(code_gare,code_reseau,date_requete,10)
        compteur_requete += 1
        arrivees,perturbations,date_derniere_requete = verification_dates(reponse_api,code_gare,code_reseau,date_requete,date_max)
        # Ajoute chaque arrivées de la requete à la liste
        [liste_arrivee.append(arrivee) for arrivee in arrivees]
        # Ajoute chaque perturbations de la requete à la liste
        [liste_perturbation.append(perturbation)  for perturbation in perturbations]
        
        date_requete = convertir_en_string(date_derniere_requete + datetime.timedelta(seconds=1))
        
    return liste_arrivee,liste_perturbation,compteur_requete

In [11]:
def liste_id(nom_fichier):
    '''Cette fonction ouvre un fichier json et récupère une liste de toutes les clés d'un dictionnaire
    Il retourne le fichier json dans une variable et la liste de toutes les clés'''

    #Ouverture du fichier csv
    with open(nom_fichier,"r") as jsonfil:
        data_gare = json.load(jsonfil)
    toutes_id = data_gare["id"]
    liste_cles = []
    # J'ajoute toutes les clés du dictionnaire id dans une liste
    [liste_cles.append(cle) for cle in toutes_id.keys()]
    return data_gare,liste_cles

In [12]:
@dataclass
class Arrivee:
    arrivee_id:                         str
    gare_id:                            str | None
    date_arrive:                        datetime.date | None
    heure_arrivee_prevue:               datetime.time | None
    heure_arrivee:                      datetime.time | None
    retard:                             datetime.timedelta | None
    network:                            str | None
    ligne:                              str | None
    trip:                               str | None
    direction:                          str | None
    disruption_id:                      str | None

In [13]:
@dataclass
class Perturbation:
    perturbation_id:                        str | None
    debut:                                  datetime.datetime | None
    fin:                                    datetime.datetime | None
    effet:                                  str | None
    message_display:                        str | None

In [14]:
def extraire_donnees(data,liste_cles:list):
    '''Cette fonction permet d'extraire une donnée dans un dictionnaire'''
    try:
        for cle in liste_cles:
            #Cas ou la donnée est stockée dans un dictionnaire
            if type(data) is dict:
                data = data[cle]
            #Cas ou la données est stockée dans une liste, alors on prend le premier element de la liste
            else:
                data=data[0][cle]
        return data
    except (KeyError,IndexError):
        return None

In [15]:
def calcul_retard(json_arrivee):
    ''' Cette fonction calcul le retard entre l'horaire prévue et l'horaire d'arrivée, si le train est à l'heure ou en avance la fonction renvoie None'''
    heure_prevue = convertir_en_datetime(extraire_donnees(json_arrivee,["stop_date_time","base_arrival_date_time"]))
    heure_arrivee = convertir_en_datetime(extraire_donnees(json_arrivee,["stop_date_time","arrival_date_time"]))
    try:
        if heure_prevue < heure_arrivee:
            #Le train est en retard
            retard = heure_arrivee - heure_prevue
            return retard
        else:
            #Le train est à l'heure ou en avance
            return None
    #Cas ou un des horaire est manquant
    except TypeError:
        return None

In [16]:
def transformation_heure(datetime:datetime):
    '''Transforme un objet datetime en objet time'''
    try:
        return datetime.time()
    except AttributeError:
        return None

In [17]:
def collecte_donnees_arrivee(json_arrivee):
    '''Cette fonction instancie une arrivée et remplie les champs en extrayant les données d'un dictionnaire'''
    arrivee = Arrivee(
        arrivee_id = extraire_donnees(json_arrivee,["stop_point","stop_area","id"]).split(":")[-1]+"-" \
            + convertir_en_datetime(extraire_donnees(json_arrivee,["stop_date_time","arrival_date_time"])).strftime("%Y%m%d") + "-"\
            + extraire_donnees(json_arrivee,["display_informations","trip_short_name"]),
        gare_id = extraire_donnees(json_arrivee,["stop_point","stop_area","id"]),
        date_arrive = convertir_en_datetime(extraire_donnees(json_arrivee,["stop_date_time","arrival_date_time"])).date(),
        heure_arrivee_prevue = transformation_heure(convertir_en_datetime(extraire_donnees(json_arrivee,["stop_date_time","base_arrival_date_time"]))),
        heure_arrivee = transformation_heure(convertir_en_datetime(extraire_donnees(json_arrivee,["stop_date_time","arrival_date_time"]))),
        retard = calcul_retard(json_arrivee),
        network = extraire_donnees(json_arrivee,["display_informations","network"]),
        ligne = extraire_donnees(json_arrivee,["display_informations","label"]),
        trip = extraire_donnees(json_arrivee,["display_informations","trip_short_name"]),
        direction = extraire_donnees(json_arrivee,["display_informations","direction"]),
        disruption_id = extraire_donnees(json_arrivee,["display_informations","links","id"])
    )
    return asdict(arrivee)

In [18]:
def collecte_donnees_perturbation(json_perturbation):
    '''Cette fonction instancie une perturbation et remplie les champs en extrayant les données d'un dictionnaire'''
    perturbation = Perturbation(
        perturbation_id = extraire_donnees(json_perturbation,["id"]),
        debut = convertir_en_datetime(extraire_donnees(json_perturbation,["application_periods","begin"])),
        fin = convertir_en_datetime(extraire_donnees(json_perturbation,["application_periods","end"])),
        effet = extraire_donnees(json_perturbation,["severity","effect"]),
        message_display = extraire_donnees(json_perturbation,["messages","text"])
    )
    return asdict(perturbation)

In [19]:
def stockage_en_bdd(nom_fichier:str):
    '''Cette fonction place des données dans un dataframe et stocke le dataframe dans une base de donnée'''

    try:
        #Chargement des données
        df = pd.read_csv(nom_fichier)
    except FileNotFoundError:
        print(f"Pas de fichier {nom_fichier}")
        return None


    #Dictionnaire contenant les schéma de données
    schema_donnees = {"schema_arrivee" : {
        "arrivee_id": String(255),
        "gare_id": String(255),
        "date_arrive": Date,
        "heure_arrivee_prevue": Time,
        "heure_arrivee": Time,
        "retard": Time,
        "network": String(255),
        "ligne": String(255),
        "trip": String(255),
        "direction": String(255),
        "disruption_id": String(255)},
        "schema_perturbation":{
        "perturbation_id" : String(255),
	    "debut" : DATETIME,
	    "fin" : DATETIME,
	    "effet" : String(255),
	    "message" : String(255) }
        }

    # En fonction du nom de fichier je détermine la clé du dictionnaire schema_donnees
    if nom_fichier.startswith("data/arrivees"):
        cle_dictionnaire = "schema_arrivee"
        nom_table = "arrivees"
    else:
        cle_dictionnaire = "schema_perturbation"
        nom_table = "perturbations"

    #Connexion à la base de données
    con_string = f"mysql+pymysql://root:{db_password}@localhost:{DB_PORT}/APP_SNCF"
    engine = create_engine(con_string,echo=False)
    
    try:
        #Ajout des données à la base de donnée
        with engine.connect() as con:
            df.to_sql(nom_table,con,if_exists="append",index=False, dtype=schema_donnees[cle_dictionnaire])
    except:
        con.rollback()
        print("Fait un petit rollback")
        raise

    print(f"Enregistrement dans la BDD de {len(df.axes[0])} {nom_table}")

In [20]:
def stockage_en_bdd_perturbations(nom_fichier:str):
    '''Cette fonction permet de charger un fichier csv dans une BDD Mysql en utilisant pandad
    Elle insert les données ligne par ligne pour gerer les cas ou la donnée est déjà stocker dans la BDD'''

    try:
        #Chargement des données
        df = pd.read_csv(nom_fichier)
    except FileNotFoundError:
        print(f"Enregistrement dans la BDD de 0 perturbations")
        return None
    
    schema_perturbation = {
        "perturbation_id" : String(255),
	    "debut" : DATETIME,
	    "fin" : DATETIME,
	    "effet" : String(255),
	    "message" : String(255) }
    
    #Connexion à la base de données
    con_string = f"mysql+pymysql://root:{db_password}@localhost:{DB_PORT}/APP_SNCF"
    engine = create_engine(con_string,echo=False)

    #Ajout des données à la base de donnée

    compteur_enregistrement = 0

    with engine.connect() as con:
        for i in range(len(df)):
            try:
                df.iloc[i:i+1].to_sql("perturbations",con,if_exists="append",index=False, dtype=schema_perturbation)
                compteur_enregistrement += 1
            except IntegrityError:
                #La cle primaire est deja présente dans la BDD
                pass
            except:
                #Les autres erreurs
                con.rollback()
                print("Fait un petit rollback")
                raise
    
    print(f"Enregistrement dans la BDD de {compteur_enregistrement} perturbations")
    

In [21]:
def run(data_gare,cle,date_min,date_max):
    '''Cette fontion lance les fonctions pour traiter les données d'une gare'''
    code_gare = data_gare["id"].get(cle)
    nom_gare = data_gare["nom"].get(cle)
    print(f"Debut des requetes pour la gare: {nom_gare}")

    compteur_requete = 0
    liste_code_reseau = []
    liste_arrivees = []
    liste_perturbations = []

    #Récupération des identifiant réseaux pour la gare en cours
    [liste_code_reseau.append(reseau["id"]) for reseau in data_gare["networks"].get(cle)]

    # Je traite réseau par réseau
    for reseau in liste_code_reseau:
        liste_arrivees, liste_perturbations,compteur_requete = requete_entre_dates(code_gare,reseau,date_min,date_max,liste_arrivees,liste_perturbations,compteur_requete)

    #Sauvegarde données brutes
    nom_fichier_arrivees = f"data/arrivees/{code_gare}-{date_max}.json".replace(":","_")
    nom_fichier_perturbations = f"data/perturbations/{code_gare}-{date_max}.json".replace(":","_")
    to_json(liste_arrivees,nom_fichier_arrivees)
    to_json(liste_perturbations,nom_fichier_perturbations)
    
    #Nettoyage des données
    liste_arrivees_clean=[]
    for arrivee in liste_arrivees:
        data_clean_arrivees = collecte_donnees_arrivee(arrivee)
        liste_arrivees_clean.append(data_clean_arrivees)    
    liste_perturbations_clean=[]
    for perturbation in liste_perturbations:
        data_clean_perturbation =collecte_donnees_perturbation(perturbation)
        liste_perturbations_clean.append(data_clean_perturbation)
    
    #Sauvegarde des données propres
    nom_ficher_arrivees_clean = f"data/arrivees_propres/{code_gare}-{date_max}.csv".replace(":","_")
    nom_ficher_perturbations_clean = f"data/perturbations_propres/{code_gare}-{date_max}.csv".replace(":","_")
    #Vérifie si les listes contiennent des données
    if liste_arrivees_clean:
        to_csv(liste_arrivees_clean,nom_ficher_arrivees_clean)
    if liste_perturbations_clean:
        to_csv(liste_perturbations_clean,nom_ficher_perturbations_clean)
    #Enregistrement en BDD
    stockage_en_bdd(nom_ficher_arrivees_clean)
    stockage_en_bdd_perturbations(nom_ficher_perturbations_clean)
    return compteur_requete

In [22]:
# Récupération de la clé API
load_dotenv()
api_key = os.getenv(CLEF_API)
db_password = os.getenv("DB_PASSWORD")

In [23]:
# Creation des string date permettant de faire les requete API 
aujourdhui = datetime.date.today()
hier_debut_journee = datetime.datetime(year=aujourdhui.year, month=aujourdhui.month, day=aujourdhui.day-1, hour=0, minute=0 ,second=0)
hier_fin_journee = datetime.datetime(year=aujourdhui.year, month=aujourdhui.month, day=aujourdhui.day-1, hour=23, minute=59 ,second=59)
date_min = convertir_en_string(hier_debut_journee)
date_max = convertir_en_string(hier_fin_journee)

In [24]:
# Récupération des clés dictionnaire contenant les informations des gares
data_gare,liste_cle = liste_id("data/top200gare.json")
# Exécution du run pour chaque gare
compteur_total = 0
for cle in liste_cle[:200]:
    compteur = run(data_gare,cle,date_min,date_max)
    compteur_total += compteur
    print(f"{compteur} requêtes effectuées pour un total de {compteur_total} requêtes")

Debut des requetes pour la gare: Paris Nord
Enregistrement dans la BDD de 658 arrivees
70 requêtes effectuées pour un total de 70 requêtes
Debut des requetes pour la gare: Paris Saint-Lazare
Enregistrement dans la BDD de 411 arrivees
43 requêtes effectuées pour un total de 113 requêtes
Debut des requetes pour la gare: Paris - Gare de Lyon - Hall 1 & 2
Enregistrement dans la BDD de 700 arrivees
76 requêtes effectuées pour un total de 189 requêtes
Debut des requetes pour la gare: Paris - Montparnasse - Hall 1 & 2
Enregistrement dans la BDD de 214 arrivees
24 requêtes effectuées pour un total de 213 requêtes
Debut des requetes pour la gare: Paris Est
Enregistrement dans la BDD de 178 arrivees
23 requêtes effectuées pour un total de 236 requêtes
Debut des requetes pour la gare: Lyon Part Dieu
Enregistrement dans la BDD de 348 arrivees
la clef primaire existe deja
la clef primaire existe deja
43 requêtes effectuées pour un total de 279 requêtes
Debut des requetes pour la gare: La Défense
En