# ETL Accidents

In [8]:
from dotenv import load_dotenv
import pandas as pd
import requests
import json
import os

load_dotenv(dotenv_path="../.env")
host = os.getenv("POSTGRES_HOST")
port = os.getenv("POSTGRES_PORT")
db = os.getenv("POSTGRES_DB")
user = os.getenv("POSTGRES_USER")
password = os.getenv("POSTGRES_PASSWORD")


## 1 - Extraction parquet


In [6]:
import requests

# URL Parquet de ton dataset
dataset_id = "accidents-corporels-de-la-circulation-millesime"
parquet_url = f"https://public.opendatasoft.com/api/explore/v2.1/catalog/datasets/{dataset_id}/exports/parquet"

# T√©l√©charger le Parquet directement
response = requests.get(parquet_url, timeout=60)
if response.status_code != 200:
    raise Exception(f"Erreur t√©l√©chargement Parquet: {response.status_code}")

In [16]:
import io
# Lire le contenu Parquet en DataFrame directement depuis le flux m√©moire
parquet_data = response.content
raw_dataframe = pd.read_parquet(io.BytesIO(parquet_data))

## Extraction Pagin√©e: Non utilis√©

In [14]:
import requests
import pandas as pd
import time

def fetch_accidents_paginated(limit=500):
    """
    R√©cup√®re les donn√©es de l'API OpenDataSoft en JSON,
    en utilisant une pagination via 'start' et 'rows'.
    Chaque chunk est stock√© dans une liste de DataFrames.
    G√®re automatiquement le rate limit (429) avec pause.
    """
    url = "https://public.opendatasoft.com/api/records/1.0/search/"
    dataset_slug = "accidents-corporels-de-la-circulation-millesime@public"

    offset = 0
    all_chunks = []

    while True:
        params = {
            "dataset": dataset_slug,
            "rows": limit,       # nombre de lignes par chunk
            "start": offset,     # offset pour pagination
            "timezone": "Europe/Paris"
        }

        try:
            response = requests.get(url, params=params, timeout=30)

            # Gestion du rate limit
            if response.status_code == 429:
                print("‚è≥ Rate limit atteint, pause 10 secondes...")
                time.sleep(10)
                continue  # recommence la m√™me requ√™te

            if response.status_code != 200:
                print(f"‚ùå Erreur API: {response.status_code}")
                break

            data = response.json().get("records", [])
            if not data:
                print("üîπ Plus de donn√©es √† r√©cup√©rer.")
                break

            # Convertir le chunk JSON en DataFrame
            df_chunk = pd.json_normalize(data)
            all_chunks.append(df_chunk)

            print(f"üîπ Chunk r√©cup√©r√©: {len(df_chunk)} lignes (offset {offset})")

            # Pause courte pour √©viter le rebuke du serveur
            time.sleep(1)

            # Avancer l'offset pour le prochain chunk
            offset += len(df_chunk)

        except Exception as e:
            print(f"‚ùå Erreur lors de la r√©cup√©ration du chunk: {e}")
            break

    return all_chunks

# Utilisation
print("üîç Pagination API des accidents corporels...")
chunks_list = fetch_accidents_paginated(limit=10000)

print("Nombre de chunks r√©cup√©r√©s :", len(chunks_list))

# Optionnel : concat√©ner tous les chunks en un seul DataFrame
# df_accidents = pd.concat(chunks_list, ignore_index=True)


üîç Pagination API des accidents corporels...
üîπ Chunk r√©cup√©r√©: 10000 lignes (offset 0)
‚ùå Erreur API: 400
Nombre de chunks r√©cup√©r√©s : 1


# 2 - Chargement donn√©es brutes dans table RAW - couche Bronze

In [None]:
from sqlalchemy import create_engine

# Connexion PostgreSQL
engine = create_engine(f"postgresql+psycopg2://{user}:{password}@{host}:{port}/{db}")

# Sauvegarder localement en fichier Parquet (optionnel)
raw_dataframe.to_parquet("accidents.parquet", index=False)

# Nettoyage
if "datetime" in raw_dataframe.columns:
    raw_dataframe.drop(columns=["datetime"], inplace=True)
if "int" in raw_dataframe.columns:
    raw_dataframe.rename(columns={"int": "intsect"}, inplace=True)

# Lecture chunk par chunk pour insertion PostgreSQL
chunksize = 50000
for start in range(0, len(raw_dataframe), chunksize):
    chunk = raw_dataframe.iloc[start:start+chunksize]
    chunk.to_sql("raw_accidents", engine, schema="accidents_bronze", if_exists="append", index=False)
    print(f"Chunk {start//chunksize + 1} ins√©r√© ({len(chunk)} lignes)")

print("‚úÖ Conversion CSV ‚Üí Parquet et insertion termin√©e.")

# 3 - Transformation et insertion - couche Gold

In [9]:
# A utiliser si la table raw est d√©j√† remplie et la connexion n'est plus en m√©moire
from sqlalchemy import create_engine
engine = create_engine(f"postgresql+psycopg2://{user}:{password}@{host}:{port}/{db}")

### 3.1 - Transformation et insertion: Tables dimension

#### 3.1.1 - Table dim_conditions

S√©paration des op√©rations de chargement de fichiers et d√©finition des fonctions outils dans le but d'all√©ger les cellules de transformation / chargement.
Les fichiers JSON servent √† trouver la correspondance entre des cl√©s faisant r√©f√©rence √† une donn√©e, donc ici, les donn√©es m√©t√©orologiques.

In [10]:
########################
# Outils dim_conditions

import json
from pathlib import Path

# Charger les JSON
with Path("./resources/cond_meteo.json").open("r", encoding="utf-8") as f:
    conditions_data = json.load(f)["conditions_meteo"]
with Path("./resources/cond_lum.json").open("r", encoding="utf-8") as f:
    luminosite_data = json.load(f)["conditions_luminosite"]

# Cr√©er un dictionnaire index√© par code pour acc√®s rapide
luminosite_dict = {cond["code"]: cond for cond in luminosite_data}

# Cr√©er un dictionnaire index√© par code pour acc√®s rapide
conditions_dict = {cond["code"]: cond for cond in conditions_data}

def get_condition_value(code: str, key: str):
    condition = conditions_dict.get(code)
    if condition:
        return condition.get(key)
    return None

def normalize_libelle(libelle):
    if libelle is None:
        return ""
    return str(libelle).strip().lower().replace("\u00A0", " ")

def get_luminosite_code_from_libelle(libelle: str):
    lib_norm = normalize_libelle(libelle)
    for code, data in luminosite_dict.items():
        json_lib_norm = normalize_libelle(data.get("libelle"))
        if lib_norm == json_lib_norm:
            return int(code)
    return None

def get_condition_code_from_libelle(libelle: str):
    lib_norm = normalize_libelle(libelle)
    for code, data in conditions_dict.items():
        json_lib_norm = normalize_libelle(data.get("libelle"))
        if lib_norm == json_lib_norm:
            return int(code)
    return None

def calcul_risque(row):
    est_nuit = row["est_nuit"]
    est_intemperie = row["est_intemperie"]

    if not est_nuit and not est_intemperie:
        return 1  # Plein jour, pas d'intemp√©ries
    elif est_nuit and est_intemperie:
        return 5  # Nuit + intemp√©ries
    elif est_nuit or est_intemperie:
        return 4  # Nuit ou intemp√©ries
    else:
        return 2  # Cas interm√©diaire

In [None]:
#################
# Transform/Load

import pandas as pd
from sqlalchemy import text

# DataFrame vide
df_dim_conditions = pd.DataFrame(columns=[
    "luminosite_code",
    "luminosite_libelle",
    "est_nuit",
    "atm_code",
    "atm_libelle",
    "est_intemperie",
    "niveau_risque"
])

# Flush de la table avant insertion (WORKFLOW DE TEST)
with engine.begin() as conn:
    conn.execute(text("TRUNCATE TABLE accidents_gold.dim_conditions RESTART IDENTITY CASCADE"))

##################################
# Lecture et conversion tol√©rante

with engine.connect() as conn:
    result = conn.execute(text("""
        SELECT lum, atm
        FROM accidents_bronze.raw_accidents
        WHERE lum IS NOT NULL AND atm IS NOT NULL
    """))

    for row in result.mappings():
        lum_raw = row["lum"]
        atm_raw = row["atm"]

        # Conversion tol√©rante : texte ou nombre
        try:
            lum_code = int(lum_raw)
        except (ValueError, TypeError):
            # Chercher le code correspondant au libell√©
            lum_code = next((k for k,v in luminosite_dict.items() if v.get("libelle") == lum_raw), None)

        try:
            atm_code = int(atm_raw)
        except (ValueError, TypeError):
            atm_code = next((k for k,v in conditions_dict.items() if v.get("libelle") == atm_raw), None)

        # Sauter les valeurs non reconnues
        if lum_code is None or atm_code is None:
            continue

        # R√©cup√©rer libell√©s et bool√©ens
        est_nuit_val = bool(luminosite_dict.get(lum_code, {}).get("est_nuit", False))
        est_intemp_val = bool(conditions_dict.get(atm_code, {}).get("est_intemperie", False))
        lum_libelle_val = luminosite_dict.get(lum_code, {}).get("libelle", f"Code {lum_code}")
        atm_libelle_val = conditions_dict.get(atm_code, {}).get("libelle", f"Code {atm_code}")

        # On supprime les duplicats de cl√© composite
        df_dim_conditions.drop_duplicates(subset=['luminosite_code', 'atm_code'], keep='first', inplace=True)
        
        df_dim_conditions.loc[len(df_dim_conditions)] = [
            lum_code,
            lum_libelle_val,
            est_nuit_val,
            atm_code,
            atm_libelle_val,
            est_intemp_val,
            calcul_risque({"est_nuit": est_nuit_val, "est_intemperie": est_intemp_val})
        ]

# S'assurer que les bool√©ens sont bien bool
df_dim_conditions["est_nuit"] = df_dim_conditions["est_nuit"].fillna(False).astype(bool)
df_dim_conditions["est_intemperie"] = df_dim_conditions["est_intemperie"].fillna(False).astype(bool)

# Insertion en batch dans la table gold

# Insertion rapide dans PostgreSQL avec pandas.to_sql
# Si la table existe d√©j√† et a la contrainte UNIQUE, on peut g√©rer le conflit via 'if_exists' et 'method'
df_dim_conditions.to_sql(
    name='dim_conditions',
    con=engine,
    schema='accidents_gold',
    if_exists='append',  # ajoute les lignes
    index=False
)


#### 3.1.2 - Table dim_vehicule

In [3]:
######################
# Outils dim_vehicule

import json
from pathlib import Path

# Chemin vers ton fichier JSON
json_file = Path("./resources/cat_veh.json")

# Charger le JSON dans un dictionnaire Python
with json_file.open("r", encoding="utf-8") as f:
    vehicules_data = json.load(f)["categories"]

# Cr√©er un dictionnaire index√© par code pour acc√®s rapide
vehicules_dict = {veh["code"]: veh for veh in vehicules_data}

def get_vehicule_code_by_label(label):
    label_norm = label.strip().lower()
    for code, veh in vehicules_dict.items():
        libelle = veh.get("libelle")
        if libelle and libelle.strip().lower() == label_norm:
            return code  # retourne la cl√© (code) sous forme de cha√Æne
    return None

In [None]:
#################
# Transform/Load

import pandas as pd

# Dictionnaire de conversion des niveaux de protection en entiers
niveau_protection_map = {
    "Faible": 1,
    "Interm√©diaire": 2,
    "√âlev√©e": 3,
    "Elev√©e": 3  # pour g√©rer absence d'accent si besoin
}

# Flush de la table avant insertion
with engine.begin() as conn:
    conn.execute(text("TRUNCATE TABLE accidents_gold.dim_vehicule RESTART IDENTITY CASCADE"))

# R√©cup√©ration des donn√©es depuis la base brute
with engine.connect() as conn:
    result = conn.execute(text("""
        SELECT catv
        FROM accidents_bronze.raw_accidents
        WHERE catv IS NOT NULL
    """))
    temp_df = pd.DataFrame(result.fetchall(), columns=['catv'])

# S√©parer les valeurs multiples et exploser en lignes
temp_df['catv'] = temp_df['catv'].astype(str).apply(lambda x: x.split(',') if pd.notna(x) else [])
temp_df = temp_df.explode('catv').reset_index(drop=True)

# Convertir les libell√©s en codes en utilisant la fonction d√©di√©e
temp_df['catv_code'] = temp_df['catv'].apply(get_vehicule_code_by_label)

# Nettoyage des doublons √©ventuels avant cr√©ation du DF final
# et suppression des lignes sans code
temp_df = temp_df[temp_df['catv_code'].notna()]
temp_df.drop_duplicates(subset=['catv_code'], inplace=True)

# Construire ton DataFrame final avec codes et conversion de niveau_protection
df_dim_vehicule = pd.DataFrame({
    "categorie_code": temp_df['catv_code'],
    "categorie_libelle": temp_df['catv'],
    "type_vehicule": temp_df['catv_code'].astype(str).apply(lambda x: get_vehicule_value(x, "type")),
    "est_motorise": temp_df['catv_code'].astype(str).apply(lambda x: get_vehicule_value(x, "libelle") != "Bicyclette"),
    "niveau_protection": temp_df['catv_code'].astype(str).apply(lambda x: get_vehicule_value(x, "niveau_protection"))
})

# Appliquer la conversion des libell√©s 'niveau_protection' en entiers
df_dim_vehicule['niveau_protection'] = df_dim_vehicule['niveau_protection'].map(niveau_protection_map)

# Insertion en base via to_sql
df_dim_vehicule.to_sql(
    name='dim_vehicule',
    con=engine,
    schema='accidents_gold',
    if_exists='append',
    index=False,
    method='multi'
)


#### 3.1.3- Table dim_routes

In [None]:
#################
# Transform/Load

import pandas as pd
from sqlalchemy import text

# Import des fonctions de r√©f√©rence
from refs_routes import (
    get_trace_plan_df,
    get_categorie_route_df,
    get_profil_route_df,
    get_etat_surface_df
)

# Initialisation du DataFrame
df_dim_route = pd.DataFrame(columns=[
    "categorie_route_code",
    "categorie_route_libelle",
    "profil_route_code",
    "profil_route_libelle",
    "trace_plan_code",
    "trace_plan_libelle",
    "etat_surface_code",
    "etat_surface_libelle",
    "niveau_risque_route"
])

# Fonction calcul_risque mise √† jour
def calcul_risque(row):
    if row["surf"] == 7:
        return 5  # Risque maximum : verglac√©
    elif row["prof"] == 3 and row["plan"] == 4:
        return 4  # Risque √©lev√© : sommet + virage en S
    elif row["surf"] in (2, 3, 4, 5, 6):
        return 3  # Risque interm√©diaire : surfaces d√©grad√©es
    elif row["surf"] == 1 and row["prof"] in (2, 4):
        return 2  # Risque faible : conditions correctes mais pas parfaites
    else:
        return 1  # Normal : plat et route normale


# R√©cup√©rer des codes √† partir des libell√©s
def get_code_from_df(df, libelle, libelle_col="libelle", code_col="code"):
    libelle_str = str(libelle).strip()
    df[libelle_col] = df[libelle_col].astype(str).str.strip()
    result = df.loc[df[libelle_col] == libelle_str, code_col]
    return result.iloc[0] if not result.empty else None

# Flush de la table avant insertion
with engine.begin() as conn:
    conn.execute(text("TRUNCATE TABLE accidents_gold.dim_route RESTART IDENTITY CASCADE"))

# Connexion et r√©cup√©ration des donn√©es
with engine.connect() as conn:
    result = conn.execute(text("""
        SELECT catr, prof, plan, surf
        FROM accidents_bronze.raw_accidents
        WHERE catr IS NOT NULL
    """)).mappings()

    # DataFrames de r√©f√©rence
    df_trace_plan = get_trace_plan_df()
    df_categorie_route = get_categorie_route_df()
    df_profil_route = get_profil_route_df()
    df_etat_surface = get_etat_surface_df()

    # Construction du DataFrame
    for row in result.fetchall():
        df_dim_route.loc[len(df_dim_route)] = [
            get_code_from_df(df_categorie_route, row["catr"]),
            row["catr"],
            get_code_from_df(df_profil_route, row["prof"]),
            row["prof"],
            get_code_from_df(df_trace_plan, row["plan"]),
            row["plan"],
            get_code_from_df(df_etat_surface, row["surf"]),
            row["surf"],
            calcul_risque(row)
        ]

df_dim_route.to_sql(
    name='dim_route',
    con=engine,
    schema='accidents_gold',
    if_exists='append',  # ajoute les lignes
    index=False,
    method='multi'       # ins√®re par lots, beaucoup plus rapide
)


#### 3.1.4 - Table dim_date

In [None]:
#################
# Transform/Load

import pandas as pd
import holidays
from sqlalchemy import text
import locale
from sqlalchemy.dialects.postgresql import insert

def upsert_postgres(table, conn, keys, data_iter):
    data = [dict(zip(keys, row)) for row in data_iter]
    insert_stmt = insert(table.table).values(data)
    upsert_stmt = insert_stmt.on_conflict_do_nothing(index_elements=['date_id'])
    conn.execute(upsert_stmt)
    
# DataFrame vide
df_dim_date = pd.DataFrame(columns=[
    "date_id",
    "date_complete",
    "annee",
    "mois",
    "jour",
    "trimestre",
    "jour_semaine",
    "nom_jour",
    "nom_mois",
    "semaine_annee",
    "jour_annee",
    "est_weekend",
    "est_jour_ferie",
    "nom_jour_ferie",
    "saison"
])

def get_saison(mois):
    if mois in [12, 1, 2]:
        return "Hiver"
    elif mois in [3, 4, 5]:
        return "Printemps"
    elif mois in [6, 7, 8]:
        return "√ât√©"
    else:
        return "Automne"

# Flush de la table avant insertion
with engine.begin() as conn:
    conn.execute(text("TRUNCATE TABLE accidents_gold.dim_date RESTART IDENTITY CASCADE"))

# Connexion et r√©cup√©ration des dates depuis la table raw
with engine.connect() as conn:
    result = conn.execute(text("""
        SELECT an AS annee, mois, jour
        FROM accidents_bronze.raw_accidents
        WHERE an IS NOT NULL AND mois IS NOT NULL AND jour IS NOT NULL
    """)).mappings()

    fr_holidays = holidays.France(years=[2025])  # adapter l'ann√©e si besoin

    for row in result.fetchall():
        annee = row["annee"]
        mois = row["mois"]
        jour = row["jour"]
        date_complete = pd.Timestamp(year=int(annee), month=int(mois), day=int(jour))

        jour_semaine = date_complete.isoweekday()  # 1=Lundi, 7=Dimanche
        est_weekend = jour_semaine in [6,7]
        est_jour_ferie = date_complete in fr_holidays
        nom_jour_ferie = fr_holidays.get(date_complete) if est_jour_ferie else None

        df_dim_date.loc[len(df_dim_date)] = [
            int(date_complete.strftime("%Y%m%d")),  # date_id
            date_complete,
            annee,
            mois,
            jour,
            date_complete.quarter,
            jour_semaine,
            date_complete.day_name(),
            date_complete.month_name(),
            date_complete.isocalendar().week,
            date_complete.dayofyear,
            est_weekend,
            est_jour_ferie,
            nom_jour_ferie,
            get_saison(mois)
        ]

df_dim_date.to_sql(
    name='dim_date',
    con=engine,
    schema='accidents_gold',
    if_exists='append',  # ajoute les lignes
    index=False,
    method=upsert_postgres
)