### **INCREMENTAL DATA INGESTION**

In [0]:
%load_ext autoreload
%autoreload 2
# Enables autoreload; learn more at https://docs.databricks.com/en/files/workspace-modules.html#autoreload-for-python-modules
# To disable autoreload; run %autoreload 0

In [0]:
import sys
# -----------------------------
# Ajouter le repo au Python Path
# -----------------------------
sys.path.append("/Workspace/Users/mandu543@gmail.com/databricks_flights/Pipelines")

from lib.config import *


In [0]:
spark.sql(f'CREATE VOLUME IF NOT EXISTS {RAW_BRONZE_ZONE}');
spark.sql(f'CREATE VOLUME IF NOT EXISTS {RAW_SILVER_ZONE}');
spark.sql(f'CREATE VOLUME IF NOT EXISTS {RAW_GOLD_ZONE}');


In [0]:
spark.sql(f'CREATE VOLUME IF NOT EXISTS {BRONZE_ZONE}.bronzeVolume')
spark.sql(f'CREATE VOLUME IF NOT EXISTS {SILVER_ZONE}.silverVolume')
spark.sql(f'CREATE VOLUME IF NOT EXISTS {GOLD_ZONE}.goldVolume')

#### Ingestion Data with Loop (Jobs)

In [0]:
# =========================================================
# PARAMETRAGE CENTRALISE
# =========================================================

# Bonnes pratiques :
# - Centraliser tous les chemins et noms de table en haut du notebook
# - Facilite la maintenance et la promotion DEV → TEST → PROD
# - Permet d'utiliser des variables dynamiques (widgets, env, etc.)
# - Evite le hardcoding partout dans le code
# =========================================================


# Bookings, Airports, Flights, Customers
dbutils.widgets.text("src_data","")
source_data = dbutils.widgets.get("src_data")

source_data_lower = source_data.lower()
raw_path = f"/Volumes/workspace/raw_flights/raw_data/{source_data}/"
bronze_path = f"/Volumes/workspace/01_bronze/bronzevolume/{source_data_lower}/data"
schema_path = f"/Volumes/workspace/01_bronze/bronzevolume/{source_data_lower}/schema_tracking"
checkpoint_path = f"/Volumes/workspace/01_bronze/bronzevolume/{source_data_lower}/stream_checkpoint"
table_name = BRONZE_ZONE + f".flights_{source_data_lower}"


In [0]:
# =========================================================
# LECTURE AUTO LOADER
# =========================================================

# Objectif :
# - Ingestion incrémentale automatique des fichiers CSV
# - Détection automatique des nouveaux fichiers
# - Gestion de l’évolution du schéma
#
# Bonnes pratiques Bronze :
# - Utiliser schemaEvolutionMode = "rescue" (ne jamais casser le pipeline)
# - Séparer schema tracking et checkpoint streaming
# - Activer inferColumnTypes pour éviter tout en string
# - includeExistingFiles = true uniquement au premier run
# =========================================================

df_raw = (
    spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("cloudFiles.schemaLocation", schema_path)
        .option("cloudFiles.schemaEvolutionMode", "rescue")
        .option("cloudFiles.inferColumnTypes", "true")
        .option("cloudFiles.includeExistingFiles", "true")
        .load(raw_path)
)
 

In [0]:
# =====================================
# ECRITURE DELTA (Bronze Layer)
# Bonnes pratiques :
# - Append only en Bronze
# - Checkpoint séparé du schema
# - trigger(once=True) pour batch incrémental orchestré
# - awaitTermination() pour contrôle propre du job
# =====================================

requete = (
    df_raw.writeStream
        .format("delta")
        .outputMode("append")
        .trigger(once=True)
        .option("checkpointLocation",
                checkpoint_path)
        .option("path",
                bronze_path)
        .start()
)
 
requete.awaitTermination()

In [0]:
table = spark.sql(f" SELECT * from delta.`{bronze_path}`")
display(table)
