# Notebook 99 — Schema Definition (ETL)

## Objectif

Ce notebook formalise le **schéma logique et conceptuel** des données du projet **Formula 1 Predictive Assistant (F1PA)** selon une approche **ETL (Extract – Transform – Load)**.

Le but est de définir :
- les **artefacts attendus** à chaque étape (Extract / Transform / Load),
- les **niveaux de granularité** (grain),
- les **clés primaires (PK)** et **clés étrangères (FK)**,
- les **relations** entre entités,
- les **conventions de nommage** et règles minimales de traçabilité.

Ce notebook **n’implémente pas** :
- de transformations opérationnelles (jointures, agrégations),
- de feature engineering final,
- d’entraînement de modèle.

Il fournit un **contrat de données** qui guidera la suite du projet (scripts Python + orchestration Airflow + chargement PostgreSQL).

## Contexte (acquis figés)

### Sources et granularités

- **OpenF1 (API REST)** : granularité **lap-level** (1 ligne = 1 tour)
  - cible : `lap_duration`
  - clé composite validée : `meeting_key`, `session_key`, `driver_number`, `lap_number`
  - indicateur métier : `is_pit_out_lap` (tours de sortie des stands)

- **Meteostat (CSV)** : granularité **hour-level** (1 ligne = 1 heure, 1 station)
  - variables météo : `temp, rhum, pres, wspd, wdir, prcp, cldc, coco` (+ `*_source`)

- **Wikipedia (scraping HTML)** : référentiel **circuits**
  - attributs : nom, pays, localité, latitude, longitude, URL source

### Périmètre temporel

Saisons **2022 à 2025**, cohérent sur l’ensemble des sources, et ère technique de "l'effet de sol" pour les voitures.

## Principes ETL (à respecter dans tout le projet)

### 1) Extract

- Collecter les données depuis les sources **sans transformation métier**.
- Conserver la structure proche de la source.
- Assurer la **traçabilité** (source, date de collecte, fichier d’origine).

### 2) Transform

- Nettoyer, conformer et rendre **joinables** les jeux de données.
- Matérialiser explicitement :
  - le référentiel circuits,
  - le pivot sessions,
  - l’enrichissement lap-level avec la météo.
- Appliquer les règles métier documentées (ex. exclusion des tours pit-out pour le dataset ML).

### 3) Load

- Charger les données transformées dans une **base PostgreSQL locale**.
- Publier un modèle relationnel simple et stable (dimensions + faits) utilisable par :
  - le ML,
  - l’API,
  - l’application Streamlit.

## Conventions de nommage

### Artefacts / datasets

- **Extract** : `extract_<source>_<objet>`
  - ex. `extract_openf1_laps`, `extract_meteostat_hourly`, `extract_wikipedia_circuits`

- **Transform** : `transform_<objet>`
  - ex. `transform_circuits`, `transform_sessions`, `transform_laps_enriched`

- **Load (PostgreSQL)** : schéma `f1pa` + tables `dim_*` / `fact_*`
  - ex. `f1pa.dim_circuit`, `f1pa.dim_session`, `f1pa.fact_lap`

In [1]:
import pandas as pd

In [2]:
schema_spec_columns = [
    "object_name",     # nom logique de l'objet (dataset/table)
    "etl_stage",       # extract / transform / load
    "grain",           # niveau de granularité
    "primary_key",     # clé primaire (conceptuelle ou matérialisée)
    "foreign_keys",    # clés étrangères
    "source_objects",  # objets sources
    "description",     # rôle de l'objet
    "key_decisions"    # décisions structurantes
]

schema_spec = pd.DataFrame(columns=schema_spec_columns)
schema_spec

Unnamed: 0,object_name,etl_stage,grain,primary_key,foreign_keys,source_objects,description,key_decisions


## Méthode de travail dans ce notebook

Nous construisons progressivement le schéma cible en 3 étapes :

1. **Extract** : formaliser les objets issus des sources (sans jointures ni filtrage métier).
2. **Transform** : définir les objets conformés et enrichis (joinables, règles métier explicites).
3. **Load** : définir les tables PostgreSQL finales (dimensions + faits), prêtes pour ML / API.

À chaque étape, on valide :
- la granularité (grain),
- les clés (PK/FK),
- les relations entre objets,
- la traçabilité.

----------

## Extract — Objets existants (données brutes)

L’étape **Extract** contient les données telles qu’obtenues depuis les sources (API, fichiers, scraping).

Principes :
- pas de jointure entre sources,
- pas d’agrégation métier,
- pas de filtrage métier (ex. pit-out conservés),
- structure proche de la donnée d’origine.

In [3]:
extract_rows = [
    {
        "object_name": "extract_openf1_laps",
        "etl_stage": "extract",
        "grain": "1 ligne = 1 tour (lap) pour un pilote dans une session",
        "primary_key": "meeting_key + session_key + driver_number + lap_number (clé logique)",
        "foreign_keys": "",
        "source_objects": "OpenF1 API (laps endpoint)",
        "description": "Données lap-level OpenF1, incluant lap_duration, secteurs, indicateurs pit/out, sans jointure externe.",
        "key_decisions": "Aucun filtrage métier en Extract (ex. pit-out conservés). Variable cible potentielle = lap_duration."
    },
    {
        "object_name": "extract_meteostat_hourly",
        "etl_stage": "extract",
        "grain": "1 ligne = 1 heure pour une station Meteostat",
        "primary_key": "station_id + (year,month,day,hour) (clé logique)",
        "foreign_keys": "",
        "source_objects": "Fichiers CSV Meteostat (hourly)",
        "description": "Données météo horaires (temp, rhum, pres, wspd, wdir, prcp, cldc, coco + *_source).",
        "key_decisions": "Aucune jointure ni agrégation en Extract. Provenances *_source conservées pour audit/qualité."
    },
    {
        "object_name": "extract_wikipedia_circuits",
        "etl_stage": "extract",
        "grain": "1 ligne = 1 circuit (référentiel géographique)",
        "primary_key": "circuit_url (clé logique)",
        "foreign_keys": "",
        "source_objects": "Scraping Wikipedia (liste + pages circuits)",
        "description": "Référentiel géographique des circuits (nom, pays, localité, lat, lon, url).",
        "key_decisions": "Pas de correction manuelle en Extract. Valeurs manquantes acceptées et documentées."
    }
]

schema_spec = pd.concat([schema_spec, pd.DataFrame(extract_rows)], ignore_index=True)
schema_spec[schema_spec["etl_stage"] == "extract"]

Unnamed: 0,object_name,etl_stage,grain,primary_key,foreign_keys,source_objects,description,key_decisions
0,extract_openf1_laps,extract,1 ligne = 1 tour (lap) pour un pilote dans une...,meeting_key + session_key + driver_number + la...,,OpenF1 API (laps endpoint),"Données lap-level OpenF1, incluant lap_duratio...",Aucun filtrage métier en Extract (ex. pit-out ...
1,extract_meteostat_hourly,extract,1 ligne = 1 heure pour une station Meteostat,"station_id + (year,month,day,hour) (clé logique)",,Fichiers CSV Meteostat (hourly),"Données météo horaires (temp, rhum, pres, wspd...",Aucune jointure ni agrégation en Extract. Prov...
2,extract_wikipedia_circuits,extract,1 ligne = 1 circuit (référentiel géographique),circuit_url (clé logique),,Scraping Wikipedia (liste + pages circuits),"Référentiel géographique des circuits (nom, pa...",Pas de correction manuelle en Extract. Valeurs...


### Remarques de cohérence (Extract)

- **OpenF1 laps** est la source “cœur ML” : future table de faits lap-level.
- **Meteostat** est horaire : l’alignement météo ↔ tours dépendra d’un pivot session (circuit + timestamps).
- **Wikipedia circuits** est un référentiel : indispensable pour relier localisation ↔ stations météo, puis relier aux sessions.

### Mapping objets ETL → artefacts physiques

- `extract_wikipedia_circuits` → `data/extract/wikipedia/circuits_wikipedia_extract.csv`
- `extract_meteostat_hourly` → `data/extract/meteostat/hourly/meteostat_Paris_2022.csv` (etc.)
- `extract_openf1_laps` → (à définir : export fichier lors d’un futur script Extract)

----------

## Transform — Objets conformés et enrichis

Objectifs de l’étape **Transform** :
- rendre les données cohérentes et joinables,
- introduire les clés de référentiel (ex. circuit_id),
- matérialiser l’alignement météo ↔ tours de façon déterministe,
- produire un dataset lap-level enrichi, base du futur chargement en base.

Principe : les règles métier (ex. exclusion pit-out pour l’entraînement) sont appliquées ici, car elles modifient l’usage de la donnée et ne relèvent pas d’un simple Extract.

In [4]:
transform_rows = [
    {
        "object_name": "transform_circuits",
        "etl_stage": "transform",
        "grain": "1 ligne = 1 circuit (référentiel)",
        "primary_key": "circuit_id",
        "foreign_keys": "",
        "source_objects": "extract_wikipedia_circuits",
        "description": "Référentiel circuits conformé et stable pour jointures (nom, pays, localité, lat, lon, url).",
        "key_decisions": "Introduction de circuit_id. Conservation de circuit_url comme attribut de traçabilité."
    },
    {
        "object_name": "transform_sessions",
        "etl_stage": "transform",
        "grain": "1 ligne = 1 session OpenF1",
        "primary_key": "meeting_key + session_key",
        "foreign_keys": "circuit_id -> transform_circuits(circuit_id)",
        "source_objects": "OpenF1 API (sessions endpoint) + mapping vers transform_circuits",
        "description": "Pivot session : relie OpenF1 à un circuit + timestamps UTC (ancrage temporel et géographique).",
        "key_decisions": "Contient a minima session_start_ts_utc (UTC) et circuit_id. Indispensable à l’alignement météo."
    },
    {
        "object_name": "transform_laps_enriched",
        "etl_stage": "transform",
        "grain": "1 ligne = 1 tour (lap) enrichi",
        "primary_key": "meeting_key + session_key + driver_number + lap_number",
        "foreign_keys": "meeting_key+session_key -> transform_sessions(meeting_key, session_key)",
        "source_objects": "extract_openf1_laps + extract_meteostat_hourly + transform_sessions + transform_circuits",
        "description": "Lap-level conformé + météo alignée + contexte minimal circuit/session.",
        "key_decisions": (
            "Alignement temporel : datetime_hour_dt_utc = floor_to_hour(lap_timestamp_utc). "
            "Règle station : une station Meteostat retenue par circuit (déterministe, documentée). "
            "Filtrage ML (ex. exclusion is_pit_out_lap=True) appliqué ici, pas en Extract."
        )
    }
]

schema_spec = pd.concat([schema_spec, pd.DataFrame(transform_rows)], ignore_index=True)
schema_spec[schema_spec["etl_stage"] == "transform"]


Unnamed: 0,object_name,etl_stage,grain,primary_key,foreign_keys,source_objects,description,key_decisions
3,transform_circuits,transform,1 ligne = 1 circuit (référentiel),circuit_id,,extract_wikipedia_circuits,Référentiel circuits conformé et stable pour j...,Introduction de circuit_id. Conservation de ci...
4,transform_sessions,transform,1 ligne = 1 session OpenF1,meeting_key + session_key,circuit_id -> transform_circuits(circuit_id),OpenF1 API (sessions endpoint) + mapping vers ...,Pivot session : relie OpenF1 à un circuit + ti...,Contient a minima session_start_ts_utc (UTC) e...
5,transform_laps_enriched,transform,1 ligne = 1 tour (lap) enrichi,meeting_key + session_key + driver_number + la...,meeting_key+session_key -> transform_sessions(...,extract_openf1_laps + extract_meteostat_hourly...,Lap-level conformé + météo alignée + contexte ...,Alignement temporel : datetime_hour_dt_utc = f...


### Règle d’alignement météo (définie ici, implémentée plus tard)

Pour chaque tour (lap), associer une ligne météo Meteostat selon :

1) **Station** : une station unique par circuit (ex. station la plus proche).
2) **Temps** : l’heure météo associée au tour est l’heure UTC arrondie à l’inférieur :
- `datetime_hour_dt_utc = floor_to_hour(lap_timestamp_utc)`

Cette approche est :
- déterministe,
- explicable,
- auditable (car `station_id` et `datetime_hour_dt_utc` sont matérialisés).

---------

## Load — Tables PostgreSQL (schéma cible)

L’étape **Load** correspond au chargement des données transformées dans PostgreSQL
(schéma `f1pa`) sous une forme relationnelle simple, stable et réutilisable.

Objectifs :
- exposer des tables exploitables par le ML et les services (API/Streamlit),
- garantir PK/FK et l’intégrité minimale,
- conserver des colonnes d’audit utiles (station_id, datetime_hour_dt_utc, etc.).

In [5]:
load_rows = [
    {
        "object_name": "f1pa.dim_circuit",
        "etl_stage": "load",
        "grain": "1 ligne = 1 circuit",
        "primary_key": "circuit_id",
        "foreign_keys": "",
        "source_objects": "transform_circuits",
        "description": "Dimension circuit (référentiel géographique).",
        "key_decisions": "circuit_url conservé (traçabilité). lat/lon conservés (ancrage spatial)."
    },
    {
        "object_name": "f1pa.dim_session",
        "etl_stage": "load",
        "grain": "1 ligne = 1 session",
        "primary_key": "meeting_key + session_key",
        "foreign_keys": "circuit_id -> f1pa.dim_circuit(circuit_id)",
        "source_objects": "transform_sessions",
        "description": "Dimension session (pivot temporel + lien circuit).",
        "key_decisions": "Timestamps UTC requis pour alignement et audit."
    },
    {
        "object_name": "f1pa.fact_lap",
        "etl_stage": "load",
        "grain": "1 ligne = 1 tour (lap) enrichi",
        "primary_key": "meeting_key + session_key + driver_number + lap_number",
        "foreign_keys": "meeting_key+session_key -> f1pa.dim_session(meeting_key, session_key)",
        "source_objects": "transform_laps_enriched",
        "description": "Table de faits lap-level : cible lap_duration + features OpenF1 + météo alignée.",
        "key_decisions": "Conserve station_id et datetime_hour_dt_utc pour audit. Règles qualité appliquées en Transform."
    }
]

schema_spec = pd.concat([schema_spec, pd.DataFrame(load_rows)], ignore_index=True)
schema_spec[schema_spec["etl_stage"] == "load"]


Unnamed: 0,object_name,etl_stage,grain,primary_key,foreign_keys,source_objects,description,key_decisions
6,f1pa.dim_circuit,load,1 ligne = 1 circuit,circuit_id,,transform_circuits,Dimension circuit (référentiel géographique).,circuit_url conservé (traçabilité). lat/lon co...
7,f1pa.dim_session,load,1 ligne = 1 session,meeting_key + session_key,circuit_id -> f1pa.dim_circuit(circuit_id),transform_sessions,Dimension session (pivot temporel + lien circu...,Timestamps UTC requis pour alignement et audit.
8,f1pa.fact_lap,load,1 ligne = 1 tour (lap) enrichi,meeting_key + session_key + driver_number + la...,meeting_key+session_key -> f1pa.dim_session(me...,transform_laps_enriched,Table de faits lap-level : cible lap_duration ...,Conserve station_id et datetime_hour_dt_utc po...


### Contenu attendu de `f1pa.fact_lap`

Colonnes minimales attendues :

- **Clé du tour (PK)** :
  - `meeting_key`, `session_key`, `driver_number`, `lap_number`

- **Cible** :
  - `lap_duration`

- **Météo (features Meteostat alignées)** :
  - `temp`, `rhum`, `pres`, `wspd`, `wdir`, `prcp`, `cldc`, `coco`

- **Contexte (audit et explicabilité)** :
  - `circuit_id`
  - `station_id`
  - `datetime_hour_dt_utc`

- **Features OpenF1** :
  - liste finalisée lors de la modélisation (doit rester compatible “model-ready”)

### Règles minimales de qualité (fact_lap)

La table `f1pa.fact_lap` doit respecter :

- **Unicité** de la clé lap (PK composite).
- **Absence de fuite de cible** : aucune feature construite à partir d’informations postérieures au tour.
- **Gestion explicite des valeurs manquantes** :
  - filtrage des laps non exploitables, ou
  - imputation documentée (au moment du ML).
- **Typage** : target numérique, features numériques/catégorielles clairement définies.

---------

## Récapitulatif global — Objets par étape ETL

Cette section récapitule l’ensemble des objets du projet selon l’approche ETL.  
Elle constitue la vue “contrat” qui guidera la suite (Airflow + scripts + PostgreSQL).

In [6]:
stage_order = {"extract": 0, "transform": 1, "load": 2}

schema_spec_view = schema_spec.copy()
schema_spec_view["stage_order"] = schema_spec_view["etl_stage"].map(stage_order)

schema_spec_view = schema_spec_view.sort_values(
    by=["stage_order", "object_name"],
    ascending=[True, True]
).drop(columns=["stage_order"])

schema_spec_view[["object_name", "etl_stage", "grain", "primary_key", "foreign_keys", "source_objects"]]

Unnamed: 0,object_name,etl_stage,grain,primary_key,foreign_keys,source_objects
1,extract_meteostat_hourly,extract,1 ligne = 1 heure pour une station Meteostat,"station_id + (year,month,day,hour) (clé logique)",,Fichiers CSV Meteostat (hourly)
0,extract_openf1_laps,extract,1 ligne = 1 tour (lap) pour un pilote dans une...,meeting_key + session_key + driver_number + la...,,OpenF1 API (laps endpoint)
2,extract_wikipedia_circuits,extract,1 ligne = 1 circuit (référentiel géographique),circuit_url (clé logique),,Scraping Wikipedia (liste + pages circuits)
3,transform_circuits,transform,1 ligne = 1 circuit (référentiel),circuit_id,,extract_wikipedia_circuits
5,transform_laps_enriched,transform,1 ligne = 1 tour (lap) enrichi,meeting_key + session_key + driver_number + la...,meeting_key+session_key -> transform_sessions(...,extract_openf1_laps + extract_meteostat_hourly...
4,transform_sessions,transform,1 ligne = 1 session OpenF1,meeting_key + session_key,circuit_id -> transform_circuits(circuit_id),OpenF1 API (sessions endpoint) + mapping vers ...
6,f1pa.dim_circuit,load,1 ligne = 1 circuit,circuit_id,,transform_circuits
7,f1pa.dim_session,load,1 ligne = 1 session,meeting_key + session_key,circuit_id -> f1pa.dim_circuit(circuit_id),transform_sessions
8,f1pa.fact_lap,load,1 ligne = 1 tour (lap) enrichi,meeting_key + session_key + driver_number + la...,meeting_key+session_key -> f1pa.dim_session(me...,transform_laps_enriched


## Dépendances (flux ETL) — Vue simple

Le pipeline est de type **ETL** :

### Extract
- `extract_openf1_laps` (OpenF1 API — laps)
- `extract_meteostat_hourly` (CSV Meteostat — hourly)
- `extract_wikipedia_circuits` (scraping Wikipedia)

### Transform
- `transform_circuits` ← `extract_wikipedia_circuits`
- `transform_sessions` ← OpenF1 (sessions endpoint) + mapping vers `transform_circuits`
- `transform_laps_enriched` ← `extract_openf1_laps` + `transform_sessions` + `transform_circuits` + `extract_meteostat_hourly`

### Load (PostgreSQL)
- `f1pa.dim_circuit` ← `transform_circuits`
- `f1pa.dim_session` ← `transform_sessions`
- `f1pa.fact_lap` ← `transform_laps_enriched`

## Relations clés (PK/FK)

- `f1pa.dim_circuit`
  - PK : `circuit_id`

- `f1pa.dim_session`
  - PK : `meeting_key + session_key`
  - FK : `circuit_id` → `f1pa.dim_circuit(circuit_id)`

- `f1pa.fact_lap`
  - PK : `meeting_key + session_key + driver_number + lap_number`
  - FK : `meeting_key + session_key` → `f1pa.dim_session(meeting_key, session_key)`

Cette structure garantit :
- une granularité lap-level stable,
- un pivot session indispensable à l’intégration,
- un modèle relationnel final simple (dim/fact) compatible ML + API.