# Notebook 2 : Chargement des Données vers BigQuery (LOAD)

## Objectif

Ce notebook permet de **charger les données depuis Google Cloud Storage (GCS) vers BigQuery** pour créer la couche "silver" du pipeline ETL. 

Les données brutes stockées dans GCS (couche "bronze") sont chargées dans BigQuery.

## Prérequis

Avant d'exécuter ce notebook, assurez-vous d'avoir :

1. **Exécuté le notebook `1_[EXTRACT]_ingest_to_gcs.ipynb`** pour avoir des données dans GCS
2. **Fichier `.env` configuré** avec les variables d'environnement nécessaires
3. **Service Account** avec les permissions BigQuery (`BigQuery Data Editor`, `BigQuery Job User`)
4. **Packages Python installés** : `google-cloud-bigquery`, `google-cloud-storage`, `pandas`, etc.


## 1 - Configuration et Authentification

Cette section configure l'environnement et établit la connexion avec BigQuery et GCS.

**Étapes :**
- Import des bibliothèques nécessaires
- Chargement des variables d'environnement depuis `.env`
- Authentification avec le Service Account
- Création des clients BigQuery et GCS


In [1]:
import os
import sys
from pathlib import Path

import pandas as pd
from dotenv import load_dotenv
from google.cloud import bigquery, storage
from google.oauth2 import service_account

ROOT = Path.cwd().parent
sys.path.append(str(ROOT))

from src.bq_utils import (
    load_csv_from_gcs,
    load_parquet_from_gcs,
)

# Configuration
load_dotenv()

PROJECT_ID = os.getenv("PROJECT_ID")
SA_PATH = ROOT / os.getenv("GOOGLE_APPLICATION_CREDENTIALS")
BUCKET_NAME = os.getenv("BUCKET_NAME")
DATASET_ID = "silver"

# Authentification
creds = service_account.Credentials.from_service_account_file(SA_PATH)
bq_client = bigquery.Client(project=PROJECT_ID, credentials=creds)
storage_client = storage.Client(project=PROJECT_ID, credentials=creds)

print("[OK] - Configuration et imports terminés")


[OK] - Configuration et imports terminés


### 1.1 - Création du Dataset BigQuery

Création du dataset "silver" s'il n'existe pas déjà. Le dataset est l'équivalent d'un schéma dans une base de données relationnelle.


In [2]:
# Création du dataset s'il n'existe pas
dataset_ref = bq_client.dataset(DATASET_ID)
try:
    bq_client.get_dataset(dataset_ref)
    print(f"[OK] - Dataset {DATASET_ID} existe déjà")
except Exception:
    dataset = bigquery.Dataset(dataset_ref)
    dataset.location = "US"
    dataset = bq_client.create_dataset(dataset, exists_ok=True)
    print(f"[OK] - Dataset {DATASET_ID} créé")


[OK] - Dataset silver existe déjà


## 2 - Chargement des Tables de Dimension

Les tables de dimension contiennent les données de référence qui seront utilisées pour enrichir les tables de fait. Elles sont généralement stables dans le temps.

---

### 2.1 - Table `dim_gare` (Emplacement des Gares)

Cette table contient les informations géographiques et descriptives de toutes les gares d'Île-de-France.

**Caractéristiques :**
- **Format source** : Parquet (depuis GCS)
- **Schéma** : Défini manuellement avec clé primaire `id_gares`
- **Types de données** : Géographie (GEOGRAPHY), entiers, chaînes de caractères
- **Clé primaire** : `id_gares` (mode REQUIRED)

**Note** : Le schéma manuel permet de contrôler précisément les types de données, notamment pour les colonnes géographiques.

In [None]:
# Chargement direct depuis GCS (bronze) vers BigQuery (silver) avec schéma manuel
gcs_path_gares = "bronze/emplacement-des-gares-idf/emplacement-des-gares-idf.parquet"

# Définition du schéma manuel avec clé primaire (id_gares)
schema_gares = [
    bigquery.SchemaField("geo_point_2d", "GEOGRAPHY", mode="NULLABLE"),
    bigquery.SchemaField("geo_shape", "GEOGRAPHY", mode="NULLABLE"),
    bigquery.SchemaField("id_gares", "INTEGER", mode="REQUIRED", description="Clé primaire"),
    bigquery.SchemaField("nom_gares", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("nom_so_gar", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("nom_su_gar", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("id_ref_zdc", "INTEGER", mode="NULLABLE"),
    bigquery.SchemaField("nom_zdc", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("id_ref_zda", "INTEGER", mode="NULLABLE"),
    bigquery.SchemaField("nom_zda", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("idrefliga", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("idrefligc", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("res_com", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("indice_lig", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("mode", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("tertrain", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("terrer", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("termetro", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("tertram", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("terval", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("exploitant", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("idf", "INTEGER", mode="NULLABLE"),
    bigquery.SchemaField("principal", "INTEGER", mode="NULLABLE"),
    bigquery.SchemaField("x", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("y", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("picto", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("nom_iv", "STRING", mode="NULLABLE"),
]

table_id = load_parquet_from_gcs(
    gcs_path=gcs_path_gares,
    table_name="gares",
    bq_client=bq_client,
    project_id=PROJECT_ID,
    dataset_id=DATASET_ID,
    bucket_name=BUCKET_NAME,
    schema=schema_gares,
    primary_key="id_gares"
)

### 2.2 - Vérification de la Table `dim_gare`

Après le chargement, on vérifie que les données ont été correctement chargées en :
- Affichant le nombre de lignes
- Listant les colonnes et leurs types
- Afficant un aperçu des données (5 premières lignes)


In [5]:
# Vérification de la table chargée
table = bq_client.get_table(table_id)
print(f"[OK] - Nombre total de lignes: {table.num_rows}")
print(f"[OK] - Colonnes:")
for field in table.schema:
    print(f"  - {field.name}: {field.field_type}")

# Requête simple pour vérifier les données et convertir en DataFrame pandas
query = f"SELECT * FROM `{table_id}` LIMIT 5"
results = bq_client.query(query).result()
df = results.to_dataframe()

print(f"\n[OK] - Aperçu des données (5 premières lignes):")
display(df)


[OK] - Nombre total de lignes: 1234
[OK] - Colonnes:
  - geo_point_2d: GEOGRAPHY
  - geo_shape: GEOGRAPHY
  - id_gares: INTEGER
  - nom_gares: STRING
  - nom_so_gar: STRING
  - nom_su_gar: STRING
  - id_ref_zdc: INTEGER
  - nom_zdc: STRING
  - id_ref_zda: INTEGER
  - nom_zda: STRING
  - idrefliga: STRING
  - idrefligc: STRING
  - res_com: STRING
  - indice_lig: STRING
  - mode: STRING
  - tertrain: STRING
  - terrer: STRING
  - termetro: STRING
  - tertram: STRING
  - terval: STRING
  - exploitant: STRING
  - idf: INTEGER
  - principal: INTEGER
  - x: FLOAT
  - y: FLOAT
  - picto: STRING
  - nom_iv: STRING

[OK] - Aperçu des données (5 premières lignes):




Unnamed: 0,geo_point_2d,geo_shape,id_gares,nom_gares,nom_so_gar,nom_su_gar,id_ref_zdc,nom_zdc,id_ref_zda,nom_zda,...,termetro,tertram,terval,exploitant,idf,principal,x,y,picto,nom_iv
0,POINT(2.57064006889043 49.0100449600405),POINT(2.57064006889043 49.0100449600405),1004,Parc PX,,,73595,NC,52244,Parc Px,...,0,0,0,Transdev,1,0,668589.1929,6878989.0,,Parc PX
1,POINT(2.54483189791712 49.0089784609502),POINT(2.54483189791712 49.0089784609502),1002,Parc PR,,,74162,NC,59080,Parc Pr,...,0,0,0,Transdev,1,0,666700.4738,6878881.0,,Parc PR
2,POINT(2.56055884390188 49.0100018661511),POINT(2.56055884390188 49.0100018661511),1003,Terminal 3 - Roissypole,,,73596,Aéroport CDG 1 (Terminal 3),462398,Aéroport CDG 1 (Terminal 3) - RER,...,0,0,0,Transdev,1,0,667851.6587,6878988.0,,Terminal 3 - Roissypole
3,POINT(2.34266003733635 48.8846809737278),POINT(2.34266003733635 48.8846809737278),1017,Funiculaire Montmartre Station Basse,,,73849,Funiculaire de Montmartre - Gare basse,45559,Funiculaire de Montmartre - Gare basse,...,0,0,FUNICULAIRE MONTMART,RATP,1,0,651795.0575,6865163.0,,Funiculaire Montmartre Station Basse
4,POINT(2.34254965325849 48.8856611127015),POINT(2.34254965325849 48.8856611127015),1018,Funiculaire Montmartre Station Haute,,,73848,Funiculaire de Montmartre - Gare haute,45558,Funiculaire de Montmartre - Gare haute,...,0,0,FUNICULAIRE MONTMART,RATP,1,0,651787.8703,6865272.0,,Funiculaire Montmartre Station Haute


### 2.3 - Table `dim_ligne` (Référentiel des Lignes)

Cette table contient les informations sur toutes les lignes de transport en commun d'Île-de-France.

**Caractéristiques :**
- **Format source** : Parquet (depuis GCS)
- **Schéma** : Auto-détecté par BigQuery
- **Contenu** : Informations sur les lignes (numéros, noms, types de transport, etc.)

In [6]:
gcs_path_lignes = "bronze/referentiel-des-lignes/referentiel-des-lignes.parquet"
table_id_lignes = load_parquet_from_gcs(
    gcs_path=gcs_path_lignes,
    table_name="dim_ligne",
    bq_client=bq_client,
    project_id=PROJECT_ID,
    dataset_id=DATASET_ID,
    bucket_name=BUCKET_NAME,
    schema=None,  # Autodetect
    primary_key=None
)



[...] - Chargement de gs://bronze-sncf-etl/bronze/referentiel-des-lignes/referentiel-des-lignes.parquet vers univ-reims-sncf-forecast.silver.dim_ligne...
[OK] - 2120 lignes chargées dans univ-reims-sncf-forecast.silver.dim_ligne
[OK] - Taille: 0.56 MB


### 2.4 - Table `dim_arret` (Référentiel des Arrêts)

Cette table contient les informations sur tous les arrêts de transport en commun d'Île-de-France.

**Caractéristiques :**
- **Format source** : Parquet (depuis GCS)
- **Schéma** : Auto-détecté par BigQuery
- **Contenu** : Informations sur les arrêts (noms, coordonnées, lignes desservies, etc.)

In [7]:
#
gcs_path_arrets = "bronze/arrets/arrets.parquet"
table_id_arrets = load_parquet_from_gcs(
    gcs_path=gcs_path_arrets,
    table_name="dim_arret",
    bq_client=bq_client,
    project_id=PROJECT_ID,
    dataset_id=DATASET_ID,
    bucket_name=BUCKET_NAME,
    schema=None,  # Autodetect
    primary_key=None
)



[...] - Chargement de gs://bronze-sncf-etl/bronze/arrets/arrets.parquet vers univ-reims-sncf-forecast.silver.dim_arret...
[OK] - 38619 lignes chargées dans univ-reims-sncf-forecast.silver.dim_arret
[OK] - Taille: 6.34 MB


### 2.5 - Table `dim_transporteur` (Liste des Transporteurs)

Cette table contient les informations sur tous les transporteurs (opérateurs de transport) d'Île-de-France.

**Caractéristiques :**
- **Format source** : Parquet (depuis GCS)
- **Schéma** : Auto-détecté par BigQuery
- **Contenu** : Informations sur les transporteurs (noms, codes, types de transport, etc.)

**Note** : Cette table de dimension permet d'identifier les différents opérateurs de transport qui gèrent les lignes et arrêts.


In [None]:
gcs_path_transporteurs = "bronze/liste-transporteurs/liste-transporteurs.parquet"
table_id_transporteurs = load_parquet_from_gcs(
    gcs_path=gcs_path_transporteurs,
    table_name="dim_transporteur",
    bq_client=bq_client,
    project_id=PROJECT_ID,
    dataset_id=DATASET_ID,
    bucket_name=BUCKET_NAME,
    schema=None,  # Autodetect
    primary_key=None
)



[...] - Chargement de gs://bronze-sncf-etl/bronze/liste-transporteurs/liste-transporteurs.parquet vers univ-reims-sncf-forecast.silver.dim_transporteur...


KeyboardInterrupt: 

### 2.5 - Table `dim_vacances_scolaires` (Calendrier des Vacances Scolaires)

Cette table contient les périodes de vacances scolaires pour différentes zones et années.

**Caractéristiques :**
- **Format source** : CSV (depuis GCS)
- **Schéma** : Auto-détecté par BigQuery
- **Encodage** : UTF-8
- **Séparateur** : Point-virgule (`;`)
- **Contenu** : Dates de début/fin de vacances, zones, années, etc.

**Note** : Pour les fichiers CSV, il est important de spécifier l'encodage et le séparateur pour éviter les erreurs de parsing.


In [None]:
gcs_path_vacances = "bronze/vacances-scolaires/vacances_scolaires.csv"
table_id_vacances = load_csv_from_gcs(
    gcs_path=gcs_path_vacances,
    table_name="dim_vacances_scolaires",
    bq_client=bq_client,
    project_id=PROJECT_ID,
    dataset_id=DATASET_ID,
    bucket_name=BUCKET_NAME,
    schema=None,  # Autodetect
    skip_leading_rows=1,
    encoding="utf-8",
    sep=";"
)



[...] - Chargement de gs://bronze-sncf-etl/bronze/vacances-scolaires/vacances_scolaires.csv vers univ-reims-sncf-forecast.silver.dim_vacances_scolaires...
[OK] - 2306 lignes chargées dans univ-reims-sncf-forecast.silver.dim_vacances_scolaires
[OK] - Taille: 0.16 MB


## 3 - Chargement des Tables de Fait

Les tables de fait contiennent les mesures et événements métier. Ici, nous chargeons les données historiques de validations des titres de transport.

---

### 3.1 - Configuration pour les Fichiers de Validation

Les fichiers de validation historiques ont des formats différents selon les années :
- **Encodages variés** : UTF-8, UTF-16LE, Latin-1
- **Séparateurs variés** : Tabulation (`\t`), point-virgule (`;`)
- **Extensions variées** : `.txt`, `.csv`

Ce dictionnaire de configuration permet de spécifier les paramètres corrects pour chaque fichier.

**Note importante** : Le fichier `2023_S2_NB_FER.txt` utilise l'encodage UTF-16LE, qui n'est pas supporté directement par BigQuery. La fonction `load_csv_from_gcs` convertit automatiquement ce fichier en UTF-8 avant le chargement.


### 3.2 - Chargement des Fichiers de Validation

Cette section charge tous les fichiers de validation historiques depuis GCS vers BigQuery.

**Processus :**
1. Parcourt le dictionnaire de configuration
2. Recherche chaque fichier dans GCS
3. Charge le fichier avec les paramètres appropriés (encodage, séparateur, format de date)
4. Crée une table séparée pour chaque fichier (ex: `fact_validations_2015s1_nb_fer_csv`)

**Gestion spéciale :**
- **Fichiers UTF-16LE** : Conversion automatique en UTF-8 (nécessite `storage_client`)
- **Format de date** : `DD/MM/YYYY` (format BigQuery pour les dates françaises)
- **Schéma** : Auto-détecté (toutes les colonnes en STRING pour éviter les erreurs de parsing)

**Durée estimée** : Plusieurs minutes selon le nombre et la taille des fichiers.


In [4]:
load_rf_config = {
    "2015S1_NB_FER.csv": {
        "encoding": "utf-8",
        "sep": ";",
        "skip_rows": 1
    },
    "2015S2_NB_FER.csv": {
        "encoding": "utf-8",
        "sep": ";",
        "skip_rows": 1
    },
    "2016S1_NB_FER.txt": {
        "encoding": "utf-8",
        "sep": "\t",
        "skip_rows": 1
    },
    "2016S2_NB_FER.txt": {
        "encoding": "utf-8",
        "sep": "\t",
        "skip_rows": 1
    },
    "2017S1_NB_FER.txt": {
        "encoding": "utf-8",
        "sep": "\t",
        "skip_rows": 1
    },
    "2017_S2_NB_FER.txt": {
        "encoding": "utf-8",
        "sep": "\t",
        "skip_rows": 1
    },
    "2018_S1_NB_FER.txt": {
        "encoding": "utf-8",
        "sep": "\t",
        "skip_rows": 1
    },
    "2019_S1_NB_FER.txt": {
        "encoding": "utf-8",
        "sep": "\t",
        "skip_rows": 1
    },
    "2019_S2_NB_FER.txt": {
        "encoding": "utf-8",
        "sep": "\t",
        "skip_rows": 1
    },
    "2020_S1_NB_FER.txt": {
        "encoding": "utf-8",
        "sep": "\t",
        "skip_rows": 1
    },
    "2020_S2_NB_FER.txt": {
        "encoding": "utf-8",
        "sep": "\t",
        "skip_rows": 1
    },
    "2021_S1_NB_FER.txt": {
        "encoding": "utf-8",
        "sep": "\t",
        "skip_rows": 1
    },
    "2021_S2_NB_FER.txt": {
        "encoding": "utf-8",
        "sep": "\t",
        "skip_rows": 1
    },
    "2022_S1_NB_FER.txt": {
        "encoding": "utf-8",
        "sep": "\t",
        "skip_rows": 1
    },
    "2022_S2_NB_FER.txt": {
        "encoding": "utf-8",
        "sep": ";",
        "skip_rows": 1
    },
    "2023_S2_NB_FER.txt": {
        "encoding": "utf-16le",
        "sep": "\t",
        "skip_rows": 1
    },
    "2024_S1_NB_FER.txt": {
        "encoding": "utf-8",
        "sep": "\t",
        "skip_rows": 1
    }
}

In [5]:
# Charger tous les fichiers de validation depuis GCS vers BigQuery
bucket = storage_client.bucket(BUCKET_NAME)

# Parcourir tous les fichiers dans la configuration
for filename, config in load_rf_config.items():
    # Chercher le fichier dans GCS
    blobs = list(bucket.list_blobs(prefix="bronze/histo-validations-reseau-ferre/"))
    
    # Trouver le blob correspondant
    blob = None
    for b in blobs:
        if b.name.endswith(filename):
            blob = b
            break
    
    if blob is None:
        print(f"[SKIP] - {filename} (non trouvé dans GCS)")
        continue
    
    gcs_path = blob.name
    table_name = f"fact_validations_{filename.replace('.', '_').replace('-', '_').lower()}"
    
    sep = config["sep"]
    encoding = config["encoding"]
    
    # Convertir "\t" en tabulation réelle si nécessaire
    if sep == "\\t":
        sep = "\t"
    
    # Utiliser la fonction load_csv_from_gcs avec le schéma unifié (toutes les colonnes en STRING)
    table_id = load_csv_from_gcs(
        gcs_path=gcs_path,
        table_name=table_name,
        bq_client=bq_client,
        project_id=PROJECT_ID,
        dataset_id=DATASET_ID,
        bucket_name=BUCKET_NAME,
        schema=None,
        skip_leading_rows=config.get("skip_rows", 1),
        encoding=encoding,
        sep=sep,
        date_format="DD/MM/YYYY",  # Format BigQuery pour les dates
        storage_client=storage_client,  # Requis pour la conversion UTF-16LE
    )



[...] - Chargement de gs://bronze-sncf-etl/bronze/histo-validations-reseau-ferre/2015/data-rf-2015/2015S1_NB_FER.csv vers univ-reims-sncf-forecast.silver.fact_validations_2015s1_nb_fer_csv...
[OK] - 755989 lignes chargées dans univ-reims-sncf-forecast.silver.fact_validations_2015s1_nb_fer_csv
[OK] - Taille: 45.93 MB

[...] - Chargement de gs://bronze-sncf-etl/bronze/histo-validations-reseau-ferre/2015/data-rf-2015/2015S2_NB_FER.csv vers univ-reims-sncf-forecast.silver.fact_validations_2015s2_nb_fer_csv...
[OK] - 778747 lignes chargées dans univ-reims-sncf-forecast.silver.fact_validations_2015s2_nb_fer_csv
[OK] - Taille: 47.28 MB

[...] - Chargement de gs://bronze-sncf-etl/bronze/histo-validations-reseau-ferre/2016/data-rf-2016/2016S1_NB_FER.txt vers univ-reims-sncf-forecast.silver.fact_validations_2016s1_nb_fer_txt...
[OK] - 779712 lignes chargées dans univ-reims-sncf-forecast.silver.fact_validations_2016s1_nb_fer_txt
[OK] - Taille: 52.96 MB

[...] - Chargement de gs://bronze-sncf-etl