_Note préliminaire_ : Il est possible d'utiliser cudf.pandas pour accélérer les opérations tabulaires de Pandas sur GPU. La cellule ci-dessous est facultative, mais (une fois le code décommenté) elle installe et charge cuDF pour **CUDA 12** (version de CUDA utilisée par les GPU du Datalab SSP Cloud). Pour les versions adaptées à CUDA 11, se référer à la [documentation RAPIDS](https://docs.rapids.ai/install/).

In [1]:
!pip install --extra-index-url=https://pypi.nvidia.com cudf-cu12==24.12.* dask-cudf-cu12==24.12.* cuml-cu12==24.12.* cugraph-cu12==24.12.*

%load_ext cudf.pandas

Looking in indexes: https://pypi.org/simple, https://pypi.nvidia.com


La cellule ci-dessous installe les dépendances du projet.

In [2]:
!pip install -q -r ../requirements.txt

[31mERROR: Ignored the following versions that require a different python version: 1.21.2 Requires-Python >=3.7,<3.11; 1.21.3 Requires-Python >=3.7,<3.11; 1.21.4 Requires-Python >=3.7,<3.11; 1.21.5 Requires-Python >=3.7,<3.11; 1.21.6 Requires-Python >=3.7,<3.11[0m[31m
[0m[31mERROR: Could not find a version that satisfies the requirement pywin32==308 (from versions: none)[0m[31m
[0m[31mERROR: No matching distribution found for pywin32==308[0m[31m
[0m

# Pré-traitement des données

Les données issues du scraping de l'API IDF Mobilités ne constituent pas un jeu de données en tant que tel. Le rôle de ce notebook est de transformer ces données brutes, stockées sur le SSP Cloud, en un jeu de données exploitable. Les fichiers JSON contiennent des informations sur les perturbations du réseau francilien, comme leurs causes, gravités et périodes d'application (il faut noter la quantité d'information donnée par l'API — les champs disponibles dans la réponse — varie d'une requête à une autre). Ces perturbations impactent certains objets, entités spécifiques du réseau : lignes de transport, ou arrêts sur ces lignes, permettant de localiser précisément les perturbations.

On commence par importer les modules dont on aura besoin (notamment la configuration, json, pandas, tqdm pour avoir un suivi de la progression des opérations) et lister les chemins des fichiers JSON sur le S3 (résultats du scraping période).

In [3]:
import config
import json
import pandas as pd
from tqdm import tqdm

# Système de fichiers S3 configuré via config
fs = config.fs

# Récupération des chemins des fichiers .json dans le stockage S3
file_paths = [fp for fp in fs.ls(config.MINIO_ROOT) if fp.endswith(".json")]

# Initialisation des structures de données
all_results = []  # Stocke toutes les dates de la requête
all_disruptions = []  # Stocke tous les ID de perturbations
all_objects = []  # Stocke tous les ID des objets impactés

# Données finales à transformer en DataFrame
results_data = []
disruptions_data = []
objects_data = []
objects_disruptions_data = []

# Ensembles pour éviter les doublons
seen_results = set()              # last_updated
seen_disruptions = set()          # (disruption_id, begin, end)
seen_objects = set()              # object_id
seen_objects_disruptions = set()  # (object_id, disruption_id)

On va ensuite lire ces fichiers, filtrer les doublons, et structurer les données en trois tables principales : une pour les perturbations, une pour les objets impactés (lignes, stations), et une faisant la jointure entre les deux : un lien objet-perturbation, correspondant à une perturbation unique sur une période unique et pour un objet impacté unique. Ainsi, une perturbation sur deux périodes et impactant 3 objets (par exemple des travaux sur deux week-ends et affectant 3 lignes) correspondra à 6 liens objet-perturbation.

In [4]:
with tqdm(total=len(file_paths), desc="Traitement des fichiers JSON", unit="fichier") as pbar:
    for file_path in file_paths:
        try:
            # Lecture du fichier JSON
            with fs.open(file_path, "r", encoding="ascii") as f:
                data = json.loads(f.read())

                # Extraction de la date de la requête
                last_updated = data.get("lastUpdatedDate", None)

                all_results.append(last_updated)
                if last_updated not in seen_results:
                    seen_results.add(last_updated)
                    results_data.append(last_updated)

                # Traitement des perturbations
                for d in data.get("disruptions", []):
                    disruption_id = d.get("id")
                    all_disruptions.append(disruption_id)
                    for p in d.get("applicationPeriods", []):
                        key = (disruption_id, p.get("begin"), p.get("end"))
                        if key not in seen_disruptions:
                            seen_disruptions.add(key)
                            disruptions_data.append({
                                "disruption_id": disruption_id,
                                "begin": p.get("begin"),
                                "end": p.get("end"),
                                "lastUpdate": d.get("lastUpdate"),
                                "cause": d.get("cause"),
                                "severity": d.get("severity"),
                                "title": d.get("title"),
                                "message": d.get("message"),
                                "file_lastUpdatedDate": last_updated,
                            })

                # Traitement des lignes et objets impactés
                for l in data.get("lines", []):
                    line_info = {
                        "line_id": l.get("id"),
                        "line_name": l.get("name"),
                        "line_shortName": l.get("shortName"),
                        "line_mode": l.get("mode"),
                        "line_networkId": l.get("networkId"),
                        "file_lastUpdatedDate": last_updated,
                    }
                    for o in l.get("impactedObjects", []):
                        object_id = o.get("id")
                        all_objects.append(object_id)
                        if object_id not in seen_objects:
                            seen_objects.add(object_id)
                            objects_data.append({
                                **line_info,
                                "object_id": object_id,
                                "object_name": o.get("name"),
                                "object_type": o.get("type"),
                            })

                        for disruption_id in o.get("disruptionIds", []):
                            key = (object_id, disruption_id)
                            if key not in seen_objects_disruptions:
                                seen_objects_disruptions.add(key)
                                objects_disruptions_data.append({
                                    **line_info,
                                    "object_id": object_id,
                                    "object_name": o.get("name"),
                                    "object_type": o.get("type"),
                                    "disruption_id": disruption_id,
                                })
        except Exception as e:
            print(f"Une erreur est survenue avec le fichier : {file_path}")
            raise e
        finally:
            pbar.update(1)

Traitement des fichiers JSON: 100%|██████████| 2357/2357 [02:47<00:00, 14.10fichier/s]


Les données ainsi nettoyées sont converties en DataFrames pandas. Créer des DataFrames plus rudimentaires puis les raffiner s'avère être un processus très complexe du fait d'objets imbriqués et de listes de longueurs variables dans nos données brutes. On affiche les comptes d'entités (doublons compris et éliminés) traités.

In [5]:
df_disruptions = pd.DataFrame(disruptions_data)
df_objects = pd.DataFrame(objects_data)
df_objects_disruptions = pd.DataFrame(objects_disruptions_data)

# Résumé final
print("Résultats totaux (tous) :", len(all_results))
print("Total de perturbations traitées (toutes) :", len(all_disruptions))
print("Total d'objets impactés traités (tous) :", len(all_objects))

print("#####################")

print("Résultats totaux (sans doublons) :", len(results_data))
print("Total de perturbations traitées (sans doublons):", len(disruptions_data))
print("Total d'objets impactés traités (sans doublons) :", len(objects_data))
print("Total de liens objet-perturbation traités :", len(objects_disruptions_data))

Résultats totaux (tous) : 2357
Total de perturbations traitées (toutes) : 1724401
Total d'objets impactés traités (tous) : 5476570
#####################
Résultats totaux (sans doublons) : 2357
Total de perturbations traitées (sans doublons): 30177
Total d'objets impactés traités (sans doublons) : 7570
Total de liens objet-perturbation traités : 102807


On comprend ici que l'API IDF Mobilités est mise à jour plus régulièrement que notre fréquence de scraping (aucun doublon dans les résultats des appels API). Cela veut dire qu'il n'est pas impossible que nous ayons manqué des perturbations de très courte durée sur la période considérée. Nous garderons cela en tête pour l'analyse des données.

La déduplication a toutefois été très importante pour les données sur les perturbations et sur les lignes, ce qui était attendu. Avec 19757 perturbations différentes dans notre jeu de données établi sur 3 semaines. Nous avons assez de données pour faire une analyse intéressante, bien que l'idéal serait de produire un outil permettant une analyse continue et automatisée des perturbations fournies par l'API. Avant tout, enregistrons ce jeu de données fraîchement généré.

In [6]:
df_objects_disruptions = df_objects_disruptions.merge(df_disruptions, on=["disruption_id"])

In [7]:
df_disruptions.to_feather("data/disruptions.feather")
df_objects.to_feather("data/objects.feather")
df_objects_disruptions.to_feather("data/objects_disruptions.feather")

