In [1]:
# Configuration initiale pour JupyterLab Docker (n√©cessaire pour que pyspark soit importable)
# Cette cellule doit √™tre ex√©cut√©e AVANT la cellule suivante
import os
import sys

# D√©tecter l'environnement Docker
if os.path.exists('/usr/local/spark') or os.path.exists('/opt/spark'):
    spark_home = os.environ.get('SPARK_HOME', '/usr/local/spark')
    spark_python = os.path.join(spark_home, 'python')
    
    # Nettoyer TOUS les modules pyspark d√©j√† charg√©s
    modules_to_remove = [k for k in list(sys.modules.keys()) if k.startswith('pyspark')]
    for mod in modules_to_remove:
        del sys.modules[mod]
    
    # Retirer le chemin syst√®me Spark de sys.path pour √©viter les conflits
    if spark_python in sys.path:
        sys.path.remove(spark_python)
    
    # Retirer SPARK_HOME de l'environnement pour que PySpark pip utilise son Spark embarqu√©
    if 'SPARK_HOME' in os.environ:
        del os.environ['SPARK_HOME']
    
    # V√©rifier si pyspark.sql.SparkSession est importable
    try:
        # Tester l'import direct (avec PySpark pip si install√©)
        from pyspark.sql import SparkSession
        print("‚úì PySpark pip d√©j√† disponible - pyspark est importable")
    except (ImportError, AttributeError):
        # Installer PySpark via pip
        print("‚ö† Installation de PySpark via pip...")
        import subprocess
        # Installer PySpark compatible avec Spark 4.0.1
        subprocess.check_call([sys.executable, "-m", "pip", "install", "pyspark==3.5.0", "-q"])
        # Nettoyer √† nouveau les modules
        modules_to_remove = [k for k in list(sys.modules.keys()) if k.startswith('pyspark')]
        for mod in modules_to_remove:
            del sys.modules[mod]
        # Tester l'import apr√®s installation
        from pyspark.sql import SparkSession
        print("‚úì PySpark install√© via pip - pyspark est maintenant importable")
    
    # S'assurer que le chemin pip est prioritaire
    import site
    site_packages = [p for p in sys.path if 'site-packages' in p]
    if site_packages:
        # D√©placer site-packages en d√©but de sys.path
        for sp in site_packages:
            if sp in sys.path:
                sys.path.remove(sp)
                sys.path.insert(0, sp)
    
    # IMPORTANT: S'assurer que PySpark pip utilise son propre Spark embarqu√©
    # Ne pas d√©finir SPARK_HOME pour que PySpark pip utilise son Spark int√©gr√©
    if 'SPARK_HOME' in os.environ:
        del os.environ['SPARK_HOME']
    
    print("‚úì Configuration termin√©e - PySpark pip utilisera son Spark embarqu√©")
    print("‚úì Vous pouvez maintenant importer pyspark dans les cellules suivantes")
else:
    print("Environnement local d√©tect√© - utilisation de PySpark pip si disponible")

‚úì PySpark pip d√©j√† disponible - pyspark est importable
‚úì Configuration termin√©e - PySpark pip utilisera son Spark embarqu√©
‚úì Vous pouvez maintenant importer pyspark dans les cellules suivantes


## 1) Contexte d'ex√©cution
Ce notebook est fait pour √™tre ex√©cut√© **dans le JupyterLab de l'environnement Docker** (service `jupyter-spark`).

- PostgreSQL dans Docker : `jdbc:postgresql://postgres:5432/app`
- MinIO dans Docker : `http://minio:9000`
- Buckets attendus : `bronze`, `silver`, `gold`

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from datetime import datetime

# SparkSession avec config MinIO + drivers (au besoin)
# NOTE: Utilisation de hadoop-aws:3.3.4 au lieu de 3.4.1 pour √©viter l'erreur BulkDelete
# hadoop-aws:3.3.4 est compatible avec PySpark 3.5.0 et ne n√©cessite pas BulkDelete
spark = (SparkSession.builder
    .appName("TP Final - Phase 1 Bronze")
    .config("spark.jars.packages", "org.postgresql:postgresql:42.6.0,org.apache.hadoop:hadoop-aws:3.3.4,com.amazonaws:aws-java-sdk-bundle:1.12.262")
    .config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000")
    .config("spark.hadoop.fs.s3a.access.key", "minioadmin")
    .config("spark.hadoop.fs.s3a.secret.key", "minioadmin123")
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .getOrCreate())

# Pour permettre overwrite uniquement de la partition du jour
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

print("Spark OK")
print("S3A endpoint =", spark.sparkContext._jsc.hadoopConfiguration().get("fs.s3a.endpoint"))

:: loading settings :: url = jar:file:/opt/conda/lib/python3.13/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/jovyan/.ivy2/cache
The jars for the packages stored in: /home/jovyan/.ivy2/jars
org.postgresql#postgresql added as a dependency
org.apache.hadoop#hadoop-aws added as a dependency
com.amazonaws#aws-java-sdk-bundle added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-c9d492c5-48c9-4d10-8051-96a25875399b;1.0
	confs: [default]
	found org.postgresql#postgresql;42.6.0 in central
	found org.checkerframework#checker-qual;3.31.0 in central
	found org.apache.hadoop#hadoop-aws;3.3.4 in central
	found com.amazonaws#aws-java-sdk-bundle;1.12.262 in central
	found org.wildfly.openssl#wildfly-openssl;1.0.7.Final in central
:: resolution report :: resolve 204ms :: artifacts dl 8ms
	:: modules in use:
	com.amazonaws#aws-java-sdk-bundle;1.12.262 from central in [default]
	org.apache.hadoop#hadoop-aws;3.3.4 from central in [default]
	org.checkerframework#checker-qual;3.31.0 from central in [default]
	org.postgresql#postgresql;42.6.0 from c

Spark OK
S3A endpoint = http://minio:9000


In [4]:
# Configuration JDBC PostgreSQL
jdbc_url = "jdbc:postgresql://postgres:5432/app"
jdbc_properties = {
    "user": "postgres",
    "password": "postgres",
    "driver": "org.postgresql.Driver",
}

# Date d‚Äôingestion (partition) au format YYYY-MM-DD
ingestion_date = datetime.now().strftime("%Y-%m-%d")
print("Ingestion date =", ingestion_date)

Ingestion date = 2026-01-16


## 2) Fonction d‚Äôingestion generique (PostgreSQL ‚Üí Bronze)
Cette fonction :
- lit la table via JDBC
- ajoute les colonnes techniques obligatoires
- ecrit en Parquet dans `s3a://bronze/<table_name>/<YYYY-MM-DD>/` (partitionnement par dossier)

In [5]:
def ingest_table_to_bronze(table_name: str, *, ingestion_date: str, base_path: str = "s3a://bronze") -> dict:
    """
    Ingerer une table PostgreSQL vers la zone Bronze sur MinIO.

    Ecrit en Parquet et partitionne par date d‚Äôingestion (YYYY-MM-DD) via la structure de dossiers.
    Ajoute les metadonnees techniques :
      - _ingestion_timestamp
      - _source_system = 'postgresql'
      - _table_name
      - _ingestion_date

    Retourne un petit dictionnaire de stats.
    """
    ingestion_ts_col = F.current_timestamp()

    df = spark.read.jdbc(url=jdbc_url, table=table_name, properties=jdbc_properties)
    row_count = df.count()

    df_out = (df
        .withColumn("_ingestion_timestamp", ingestion_ts_col)
        .withColumn("_source_system", F.lit("postgresql"))
        .withColumn("_table_name", F.lit(table_name))
        .withColumn("_ingestion_date", F.lit(ingestion_date))
    )

    target_path = f"{base_path}/{table_name}/{ingestion_date}"

    df_out.write.mode("overwrite").parquet(target_path)

    return {
        "table": table_name,
        "rows": row_count,
        "path": target_path,
        "partition": ingestion_date,
    }

## 3) Execution Phase 1 : tables obligatoires (+ bonus)
Tables obligatoires : `customers`, `orders`, `order_details`, `products`

Tables bonus (si dispo) : `employees`, `suppliers`, `categories`

In [6]:
required_tables = ["customers", "orders", "order_details", "products"]
bonus_tables = ["employees", "suppliers", "categories"]
tables_to_ingest = required_tables + bonus_tables

results = []
errors = []

print("=== PHASE 1 - INGESTION BRONZE ===")
for table in tables_to_ingest:
    try:
        stats = ingest_table_to_bronze(table, ingestion_date=ingestion_date)
        results.append(stats)
        print(f"[OK] {stats['table']} -> {stats['rows']} lignes -> {stats['path']} (_ingestion_date={stats['partition']})")
    except Exception as e:
        errors.append({"table": table, "error": str(e)})
        print(f"[ERREUR] {table} : {e}")

print("\nResume OK :")
for r in results:
    print(f"  - {r['table']}: {r['rows']} lignes")

if errors:
    print("\nResume erreurs :")
    for err in errors:
        print(f"  - {err['table']}: {err['error']}")

=== PHASE 1 - INGESTION BRONZE ===


26/01/16 15:56:38 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
                                                                                

[OK] customers -> 91 lignes -> s3a://bronze/customers/2026-01-16 (_ingestion_date=2026-01-16)
[OK] orders -> 830 lignes -> s3a://bronze/orders/2026-01-16 (_ingestion_date=2026-01-16)
[OK] order_details -> 2155 lignes -> s3a://bronze/order_details/2026-01-16 (_ingestion_date=2026-01-16)
[OK] products -> 77 lignes -> s3a://bronze/products/2026-01-16 (_ingestion_date=2026-01-16)
[OK] employees -> 9 lignes -> s3a://bronze/employees/2026-01-16 (_ingestion_date=2026-01-16)
[OK] suppliers -> 29 lignes -> s3a://bronze/suppliers/2026-01-16 (_ingestion_date=2026-01-16)
[OK] categories -> 8 lignes -> s3a://bronze/categories/2026-01-16 (_ingestion_date=2026-01-16)

Resume OK :
  - customers: 91 lignes
  - orders: 830 lignes
  - order_details: 2155 lignes
  - products: 77 lignes
  - employees: 9 lignes
  - suppliers: 29 lignes
  - categories: 8 lignes


## 4) Verifications rapides
On relit une table depuis Bronze et on verifie la presence des colonnes techniques + la partition.

In [7]:
# Exemple : verifier customers (lecture de la partition du jour)
df_bronze_customers = spark.read.parquet(f"s3a://bronze/customers/{ingestion_date}")

expected_cols = {"_ingestion_timestamp", "_source_system", "_table_name", "_ingestion_date"}
missing = expected_cols - set(df_bronze_customers.columns)
print("Colonnes manquantes :", missing)

df_bronze_customers.select(
    "_ingestion_timestamp",
    "_source_system",
    "_table_name",
    "_ingestion_date",
).show(5, truncate=False)

df_bronze_customers.groupBy("_ingestion_date").count().orderBy("_ingestion_date").show(truncate=False)

Colonnes manquantes : set()
+--------------------------+--------------+-----------+---------------+
|_ingestion_timestamp      |_source_system|_table_name|_ingestion_date|
+--------------------------+--------------+-----------+---------------+
|2026-01-16 15:56:39.417705|postgresql    |customers  |2026-01-16     |
|2026-01-16 15:56:39.417705|postgresql    |customers  |2026-01-16     |
|2026-01-16 15:56:39.417705|postgresql    |customers  |2026-01-16     |
|2026-01-16 15:56:39.417705|postgresql    |customers  |2026-01-16     |
|2026-01-16 15:56:39.417705|postgresql    |customers  |2026-01-16     |
+--------------------------+--------------+-----------+---------------+
only showing top 5 rows

+---------------+-----+
|_ingestion_date|count|
+---------------+-----+
|2026-01-16     |91   |
+---------------+-----+



# TP Final ‚Äî Phase 2 : Raffinement Silver (Bronze ‚Üí Silver)

Objectif : nettoyer, typer et structurer les donnees Bronze pour les rendre exploitables en Silver.

Sorties attendues (minimum) :
- `dim_customers` : `company_name` en InitCap, `country` en MAJUSCULES
- `dim_products` : jointure produits ‚Üî categories + `stock_status` (CRITIQUE si stock < 10, sinon NORMAL)
- `fact_orders` : jointure `orders` + `order_details` + calcul `montant_net`

Ecriture en Parquet sous `s3a://silver/<dataset>/<YYYY-MM-DD>/` (meme partition jour que la Phase 1).

> Important : cette Phase 2 lit la partition Bronze du jour `s3a://bronze/<table>/<YYYY-MM-DD>/` produite par la Phase 1.

In [8]:
from pyspark.sql import functions as F

def _first_existing_column(df, candidates: list[str]) -> str:
    for name in candidates:
        if name in df.columns:
            return name
    raise ValueError(f"Aucune colonne trouvee parmi {candidates}. Colonnes dispo: {df.columns}")

def _canonicalize_columns(df, mapping: dict[str, list[str]]):
    # mapping: canonical_name -> [candidates...]
    out = df
    for canonical, candidates in mapping.items():
        found = _first_existing_column(out, candidates)
        if found != canonical:
            out = out.withColumnRenamed(found, canonical)
    return out

def read_bronze(table_name: str, ingestion_date: str):
    path = f"s3a://bronze/{table_name}/{ingestion_date}"
    return spark.read.parquet(path)

def write_silver(df, dataset_name: str, ingestion_date: str, base_path: str = "s3a://silver") -> dict:
    target_path = f"{base_path}/{dataset_name}/{ingestion_date}"
    row_count = df.count()
    df.write.mode("overwrite").parquet(target_path)
    return {"dataset": dataset_name, "rows": row_count, "path": target_path}

In [9]:
# --- Lecture Bronze (partition du jour) ---
customers_bronze = read_bronze("customers", ingestion_date)
orders_bronze = read_bronze("orders", ingestion_date)
order_details_bronze = read_bronze("order_details", ingestion_date)
products_bronze = read_bronze("products", ingestion_date)

# Tables bonus (si presentes) : categories (pour dim_products)
categories_bronze = None
try:
    categories_bronze = read_bronze("categories", ingestion_date)
except Exception as e:
    print("[INFO] categories non disponible en Bronze:", e)

# --- 2.1 Dim_Customers ---
# R√®gles : company_name InitCap, country UPPER + dedup cl√© m√©tier
customers = _canonicalize_columns(customers_bronze, {
    "customer_id": ["customer_id", "customerid", "CustomerID"],
    "company_name": ["company_name", "companyname", "CompanyName"],
    "country": ["country", "Country"],
})
dim_customers = (customers
    .dropDuplicates(["customer_id"])
    .withColumn("company_name", F.initcap(F.col("company_name")))
    .withColumn("country", F.upper(F.col("country")))
    .select("customer_id", "company_name", "country")
 )

# --- 2.2 Dim_Products ---
# R√®gles : join produits ‚Üî categories + stock_status
products = _canonicalize_columns(products_bronze, {
    "product_id": ["product_id", "productid", "ProductID"],
    "product_name": ["product_name", "productname", "ProductName"],
    "category_id": ["category_id", "categoryid", "CategoryID"],
    "units_in_stock": ["units_in_stock", "unitsinstock", "UnitsInStock"],
})
products = (products
    .withColumn("units_in_stock", F.col("units_in_stock").cast("int"))
    .dropDuplicates(["product_id"])
 )
if categories_bronze is not None:
    categories = _canonicalize_columns(categories_bronze, {
        "category_id": ["category_id", "categoryid", "CategoryID"],
        "category_name": ["category_name", "categoryname", "CategoryName"],
    }).dropDuplicates(["category_id"])
    products = products.join(categories.select("category_id", "category_name"), on="category_id", how="left")
else:
    products = products.withColumn("category_name", F.lit(None).cast("string"))
dim_products = (products
    .withColumn("stock_status", F.when(F.col("units_in_stock") < 10, F.lit("CRITIQUE")).otherwise(F.lit("NORMAL")))
    .select("product_id", "product_name", "category_id", "category_name", "units_in_stock", "stock_status")
 )

# --- 2.3 Fact_Orders ---
# R√®gles : jointure orders + order_details + montant_net
orders = _canonicalize_columns(orders_bronze, {
    "order_id": ["order_id", "orderid", "OrderID"],
    "customer_id": ["customer_id", "customerid", "CustomerID"],
    "order_date": ["order_date", "orderdate", "OrderDate"],
})
order_details = _canonicalize_columns(order_details_bronze, {
    "order_id": ["order_id", "orderid", "OrderID"],
    "product_id": ["product_id", "productid", "ProductID"],
    "unit_price": ["unit_price", "unitprice", "UnitPrice"],
    "quantity": ["quantity", "Quantity"],
    "discount": ["discount", "Discount"],
})
orders = orders.withColumn("order_date", F.to_date(F.col("order_date")))
order_details = (order_details
    .withColumn("unit_price", F.col("unit_price").cast("double"))
    .withColumn("quantity", F.col("quantity").cast("int"))
    .withColumn("discount", F.col("discount").cast("double"))
    .dropDuplicates(["order_id", "product_id"])
 )
fact_orders = (orders.select("order_id", "customer_id", "order_date")
    .join(order_details, on="order_id", how="inner")
    .withColumn("montant_net", (F.col("unit_price") * F.col("quantity") * (F.lit(1.0) - F.col("discount"))).cast("double"))
    .select("order_id", "product_id", "customer_id", "order_date", "quantity", "unit_price", "discount", "montant_net")
 )

# --- Ecriture Silver ---
results_silver = [
    write_silver(dim_customers, "dim_customers", ingestion_date),
    write_silver(dim_products, "dim_products", ingestion_date),
    write_silver(fact_orders, "fact_orders", ingestion_date),
]
print("=== PHASE 2 - ECRITURE SILVER OK ===")
for r in results_silver:
    print(f"[OK] {r['dataset']} -> {r['rows']} lignes -> {r['path']}")

=== PHASE 2 - ECRITURE SILVER OK ===
[OK] dim_customers -> 91 lignes -> s3a://silver/dim_customers/2026-01-16
[OK] dim_products -> 77 lignes -> s3a://silver/dim_products/2026-01-16
[OK] fact_orders -> 2155 lignes -> s3a://silver/fact_orders/2026-01-16


In [10]:
# Verification rapide : relire une sortie Silver
df_check_dim_customers = spark.read.parquet(f"s3a://silver/dim_customers/{ingestion_date}")
df_check_dim_customers.show(5, truncate=False)
df_check_dim_customers.printSchema()

+-----------+----------------------------------+-------+
|customer_id|company_name                      |country|
+-----------+----------------------------------+-------+
|ALFKI      |Alfreds Futterkiste               |GERMANY|
|ANATR      |Ana Trujillo Emparedados Y Helados|MEXICO |
|ANTON      |Antonio Moreno Taquer√≠a           |MEXICO |
|AROUT      |Around The Horn                   |UK     |
|BERGS      |Berglunds Snabbk√∂p                |SWEDEN |
+-----------+----------------------------------+-------+
only showing top 5 rows

root
 |-- customer_id: string (nullable = true)
 |-- company_name: string (nullable = true)
 |-- country: string (nullable = true)



## 3.1 - Script Python Producer Kafka

Simulation de nouvelles commandes envoy√©es en temps r√©el sur le topic `telemetry_orders`.


In [22]:
# Installation de kafka-python si n√©cessaire
import subprocess
import sys

try:
    from kafka import KafkaProducer
    print("‚úì kafka-python d√©j√† install√©")
except ImportError:
    print("‚ö† Installation de kafka-python...")
    subprocess.check_call([sys.executable, "-m", "pip", "install", "kafka-python", "-q"])
    from kafka import KafkaProducer
    print("‚úì kafka-python install√© avec succ√®s")

from kafka import KafkaProducer
import json
import random
import uuid
from datetime import datetime, timezone
import time

# Configuration Kafka
KAFKA_BROKER = "broker:29092"
TOPIC = "telemetry_orders"

# Initialisation du producer
producer = KafkaProducer(
    bootstrap_servers=[KAFKA_BROKER],
    value_serializer=lambda v: json.dumps(v).encode("utf-8")
)

print(f"Producer Kafka configur√© pour le topic: {TOPIC}")
print(f"Broker: {KAFKA_BROKER}")


‚úì kafka-python d√©j√† install√©
Producer Kafka configur√© pour le topic: telemetry_orders
Broker: broker:29092


In [23]:
# Donn√©es de r√©f√©rence pour g√©n√©rer des commandes r√©alistes
CUSTOMER_IDS = ["ALFKI", "ANATR", "ANTON", "AROUT", "BERGS", "BLAUS", "BLONP", "BOLID", "BONAP", "BOTTM"]
PRODUCT_IDS = list(range(1, 78))  # IDs produits de 1 √† 77
STATUSES = ["pending", "confirmed", "processing", "shipped", "delivered"]

def generate_order_message():
    """G√©n√®re un message JSON simulant une nouvelle commande."""
    order_id = random.randint(10000, 99999)
    customer_id = random.choice(CUSTOMER_IDS)
    product_id = random.choice(PRODUCT_IDS)
    quantity = random.randint(1, 50)
    unit_price = round(random.uniform(10.0, 500.0), 2)
    discount = round(random.uniform(0.0, 0.3), 2)
    status = random.choice(STATUSES)
    timestamp = datetime.now(timezone.utc).isoformat()
    
    message = {
        "order_id": order_id,
        "customer_id": customer_id,
        "product_id": product_id,
        "quantity": quantity,
        "unit_price": unit_price,
        "discount": discount,
        "status": status,
        "order_timestamp": timestamp
    }
    return message

# Test : g√©n√©rer et envoyer quelques messages
print("G√©n√©ration de messages de test...")
for i in range(5):
    msg = generate_order_message()
    producer.send(TOPIC, msg)
    print(f"Message {i+1} envoy√©: {json.dumps(msg, indent=2)}")
    time.sleep(0.5)

producer.flush()
print(f"\n{5} messages envoy√©s sur le topic {TOPIC}")


G√©n√©ration de messages de test...
Message 1 envoy√©: {
  "order_id": 83767,
  "customer_id": "BOLID",
  "product_id": 43,
  "quantity": 37,
  "unit_price": 71.67,
  "discount": 0.13,
  "status": "pending",
  "order_timestamp": "2026-01-16T16:00:42.872661+00:00"
}
Message 2 envoy√©: {
  "order_id": 17447,
  "customer_id": "BOTTM",
  "product_id": 6,
  "quantity": 26,
  "unit_price": 322.98,
  "discount": 0.25,
  "status": "delivered",
  "order_timestamp": "2026-01-16T16:00:43.376755+00:00"
}
Message 3 envoy√©: {
  "order_id": 37081,
  "customer_id": "BOLID",
  "product_id": 65,
  "quantity": 1,
  "unit_price": 479.27,
  "discount": 0.27,
  "status": "processing",
  "order_timestamp": "2026-01-16T16:00:43.877241+00:00"
}
Message 4 envoy√©: {
  "order_id": 44216,
  "customer_id": "BONAP",
  "product_id": 41,
  "quantity": 18,
  "unit_price": 266.65,
  "discount": 0.14,
  "status": "shipped",
  "order_timestamp": "2026-01-16T16:00:44.377726+00:00"
}
Message 5 envoy√©: {
  "order_id": 665

In [24]:
# Imports n√©cessaires pour la g√©n√©ration de messages
# Cette cellule doit √™tre ex√©cut√©e avant la cellule suivante si vous ex√©cutez les cellules individuellement
import json
import random
from datetime import datetime, timezone
import time

# V√©rifier que producer et TOPIC sont d√©finis
if 'producer' not in globals():
    print("[ERREUR] Variable 'producer' non trouv√©e.")
    print("‚ö† Veuillez d'abord ex√©cuter la cellule 16 pour configurer le producer Kafka.")
    raise NameError("Variable 'producer' non trouv√©e")

if 'TOPIC' not in globals():
    print("[ERREUR] Variable 'TOPIC' non trouv√©e.")
    print("‚ö† Veuillez d'abord ex√©cuter la cellule 16 pour configurer le topic Kafka.")
    raise NameError("Variable 'TOPIC' non trouv√©e")

print("‚úì Imports et v√©rifications OK")

‚úì Imports et v√©rifications OK


## 3.2 - Job Spark Structured Streaming

Lecture des messages Kafka, parsing JSON et √©criture dans Bronze.


In [25]:
# DIAGNOSTIC ET CORRECTION : V√©rifier si le connecteur Kafka est disponible
# Si ce n'est pas le cas, cette cellule va arr√™ter et recr√©er la session

from pyspark.sql import SparkSession
import time as time_module

def check_kafka_connector():
    """V√©rifie si le connecteur Kafka est disponible."""
    try:
        if 'spark_streaming' not in globals():
            return False, "spark_streaming n'existe pas"
        
        # Tester si le format kafka est disponible
        test_reader = spark_streaming.readStream.format("kafka")
        return True, "Connecteur Kafka disponible"
    except Exception as e:
        return False, str(e)

# V√©rifier le connecteur
is_available, message = check_kafka_connector()

if not is_available:
    print(f"‚ùå Connecteur Kafka non disponible: {message}")
    print("\nüîß CORRECTION AUTOMATIQUE: Arr√™t et recr√©ation de la session...")
    
    # Arr√™ter toutes les sessions
    try:
        if 'spark_streaming' in globals():
            spark_streaming.stop()
            del spark_streaming
        active = SparkSession.getActiveSession()
        if active and "Kafka" in active.sparkContext.appName:
            active.stop()
        time_module.sleep(3)
    except:
        pass
    
    # Recr√©er la session avec un nom unique
    app_name_unique = f"TP-Final-Phase3-Kafka-{int(time_module.time())}"
    print(f"   Cr√©ation d'une nouvelle session: {app_name_unique}")
    print("   ‚è≥ T√©l√©chargement des packages (20-40 secondes)...")
    
    spark_streaming = (SparkSession.builder
        .master("local[*]")
        .appName(app_name_unique)
        .config("spark.jars.packages", 
                "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0,"
                "org.apache.kafka:kafka-clients:3.5.0,"
                "org.apache.hadoop:hadoop-aws:3.3.4,"
                "com.amazonaws:aws-java-sdk-bundle:1.12.262")
        .config("spark.sql.streaming.kafka.useDeprecatedOffsetFetching", "false")
        .config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000")
        .config("spark.hadoop.fs.s3a.access.key", "minioadmin")
        .config("spark.hadoop.fs.s3a.secret.key", "minioadmin123")
        .config("spark.hadoop.fs.s3a.path.style.access", "true")
        .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
        .getOrCreate())
    
    spark_streaming.sparkContext.setLogLevel("WARN")
    
    # Attendre le t√©l√©chargement
    time_module.sleep(10)
    
    # V√©rifier √† nouveau
    is_available, message = check_kafka_connector()
    if is_available:
        print("‚úì Connecteur Kafka maintenant disponible !")
    else:
        print(f"‚ö† Connecteur toujours non disponible: {message}")
        print("   Veuillez attendre 20-30 secondes suppl√©mentaires et r√©ex√©cuter cette cellule.")
else:
    print(f"‚úì {message}")
    print("‚úì Vous pouvez maintenant ex√©cuter la cellule suivante (lecture du stream Kafka)")

‚úì Connecteur Kafka disponible
‚úì Vous pouvez maintenant ex√©cuter la cellule suivante (lecture du stream Kafka)


In [26]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, TimestampType

# SparkSession avec support Kafka
# NOTE: Utilisation de hadoop-aws:3.3.4 au lieu de 3.4.1 pour √©viter l'erreur BulkDelete
# IMPORTANT: Arr√™ter toute session existante pour forcer le chargement des packages Kafka
try:
    existing_spark = SparkSession.getActiveSession()
    if existing_spark:
        app_name = existing_spark.sparkContext.appName
        if "Phase 3 Streaming Kafka" in app_name or "spark_streaming" in globals():
            print("[INFO] Arr√™t de la SparkSession existante pour recharger les packages Kafka...")
            existing_spark.stop()
            # Nettoyer la variable globale si elle existe
            if 'spark_streaming' in globals():
                del spark_streaming
            import time
            time.sleep(2)  # Attendre que la session soit compl√®tement arr√™t√©e
except Exception as e:
    print(f"[INFO] Aucune session existante √† arr√™ter: {e}")

# Cr√©er une nouvelle SparkSession avec les packages Kafka
# NOTE: Utilisation de .master("local[*]") pour forcer une nouvelle session
# Les packages seront t√©l√©charg√©s automatiquement au premier d√©marrage (peut prendre 10-30 secondes)
spark_streaming = (SparkSession.builder
    .master("local[*]")
    .appName("TP Final - Phase 3 Streaming Kafka")
    .config("spark.jars.packages", 
            "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0,"
            "org.apache.kafka:kafka-clients:3.5.0,"
            "org.apache.hadoop:hadoop-aws:3.3.4,"
            "com.amazonaws:aws-java-sdk-bundle:1.12.262")
    .config("spark.sql.streaming.kafka.useDeprecatedOffsetFetching", "false")
    .config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000")
    .config("spark.hadoop.fs.s3a.access.key", "minioadmin")
    .config("spark.hadoop.fs.s3a.secret.key", "minioadmin123")
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .getOrCreate())

spark_streaming.sparkContext.setLogLevel("WARN")

print("‚úì SparkSession cr√©√©e")
print("‚è≥ Les packages Kafka sont en cours de t√©l√©chargement (premi√®re ex√©cution uniquement)...")
print("   Cela peut prendre 10-30 secondes. Veuillez patienter.\n")

# Attendre un peu pour que les packages soient t√©l√©charg√©s
import time
time.sleep(5)  # Attendre 5 secondes pour le t√©l√©chargement initial

# V√©rifier que le connecteur Kafka est disponible
try:
    # Tester si le format kafka est disponible
    test_df = spark_streaming.readStream.format("kafka")
    print("‚úì Connecteur Kafka disponible et pr√™t")
except Exception as e:
    print(f"\n‚ö† ERREUR: Connecteur Kafka non disponible: {e}")
    print("‚ö† Les packages peuvent prendre plus de temps √† t√©l√©charger.")
    print("‚ö† SOLUTION: R√©ex√©cutez cette cellule apr√®s 10-20 secondes.")
    print("   Les packages seront t√©l√©charg√©s au premier d√©marrage et mis en cache.")
    raise

KAFKA_BROKER = "broker:29092"
KAFKA_TOPIC = "telemetry_orders"
BRONZE_PATH = "s3a://bronze/kafka_orders"
CHECKPOINT_LOCATION = "s3a://bronze/checkpoints/kafka_orders"

print("\nSpark Streaming configur√©")
print(f"Kafka broker: {KAFKA_BROKER}")
print(f"Topic: {KAFKA_TOPIC}")
print(f"Checkpoint: {CHECKPOINT_LOCATION}")
print(f"Spark version: {spark_streaming.version}")

[INFO] Arr√™t de la SparkSession existante pour recharger les packages Kafka...
‚úì SparkSession cr√©√©e
‚è≥ Les packages Kafka sont en cours de t√©l√©chargement (premi√®re ex√©cution uniquement)...
   Cela peut prendre 10-30 secondes. Veuillez patienter.

‚úì Connecteur Kafka disponible et pr√™t

Spark Streaming configur√©
Kafka broker: broker:29092
Topic: telemetry_orders
Checkpoint: s3a://bronze/checkpoints/kafka_orders
Spark version: 3.5.0


In [27]:
# Sch√©ma JSON des messages de commandes
order_schema = StructType([
    StructField("order_id", IntegerType(), True),
    StructField("customer_id", StringType(), True),
    StructField("product_id", IntegerType(), True),
    StructField("quantity", IntegerType(), True),
    StructField("unit_price", DoubleType(), True),
    StructField("discount", DoubleType(), True),
    StructField("status", StringType(), True),
    StructField("order_timestamp", StringType(), True),
])

# Lecture du stream Kafka
df_kafka_stream = (spark_streaming.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", KAFKA_BROKER)
    .option("subscribe", KAFKA_TOPIC)
    .option("startingOffsets", "earliest")
    .load())

print("Stream Kafka configur√©")


AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of Structured Streaming + Kafka Integration Guide.

In [18]:
# Parsing JSON et ajout des m√©tadonn√©es techniques
df_parsed = (df_kafka_stream
    .select(
        F.from_json(F.col("value").cast("string"), order_schema).alias("data"),
        F.col("timestamp").alias("kafka_timestamp"),
        F.col("partition"),
        F.col("offset")
    )
    .select(
        "data.*",
        F.current_timestamp().alias("_ingestion_timestamp"),
        F.lit("kafka").alias("_source_system"),
        F.lit("telemetry_orders").alias("_table_name"),
        F.to_date(F.current_timestamp()).alias("_ingestion_date"),
        "kafka_timestamp",
        "partition",
        "offset"
    )
    .withColumn("order_timestamp", F.to_timestamp(F.col("order_timestamp")))
)

print("Parsing JSON et ajout des m√©tadonn√©es techniques effectu√©s")


NameError: name 'df_kafka_stream' is not defined

In [19]:
# √âcriture en streaming vers Bronze avec append mode et checkpoint
query = (df_parsed.writeStream
    .outputMode("append")
    .format("parquet")
    .option("path", BRONZE_PATH)
    .option("checkpointLocation", CHECKPOINT_LOCATION)
    .trigger(processingTime="10 seconds")
    .start())

print("=== PHASE 3 - STREAMING KAFKA D√âMARR√â ===")
print(f"√âcriture dans: {BRONZE_PATH}")
print(f"Checkpoint: {CHECKPOINT_LOCATION}")
print(f"Mode: append")
print("\nLe streaming est actif. Envoyez des messages sur le topic pour les voir appara√Ætre dans Bronze.")
print("Pour arr√™ter le streaming, ex√©cutez: query.stop()")


NameError: name 'df_parsed' is not defined

In [20]:
# V√©rification : lecture des donn√©es √©crites en Bronze (apr√®s quelques secondes de streaming)
# D√©commentez cette cellule apr√®s avoir laiss√© le streaming tourner quelques secondes

# df_bronze_kafka = spark_streaming.read.parquet(BRONZE_PATH)
# print(f"Nombre de messages ing√©r√©s: {df_bronze_kafka.count()}")
# df_bronze_kafka.show(10, truncate=False)
# df_bronze_kafka.printSchema()


In [21]:
# Arr√™ter le streaming Kafka
# Ex√©cutez cette cellule quand vous avez termin√© de tester le streaming

if 'query' in globals():
    print("Arr√™t du streaming en cours...")
    query.stop()
    print("‚úì Streaming arr√™t√© avec succ√®s")
else:
    print("‚ö† Aucun streaming actif trouv√© (variable 'query' non d√©finie)")

‚ö† Aucun streaming actif trouv√© (variable 'query' non d√©finie)


## 4.1 - Transformation Silver des donn√©es streaming

Traitement s√©par√© des donn√©es Kafka vers Silver, similaire √† la Phase 2 mais adapt√© aux donn√©es streaming.


In [None]:
# Utilisation de la m√™me SparkSession que la Phase 2
# Si le streaming est toujours actif, on peut utiliser spark_streaming, sinon spark

# Lecture des donn√©es Kafka depuis Bronze (apr√®s arr√™t du streaming)
# Note: Pour un traitement en continu, on pourrait aussi faire un streaming vers Silver

# S'assurer que ingestion_date est d√©fini (au cas o√π cette cellule est ex√©cut√©e seule)
from datetime import datetime
if 'ingestion_date' not in globals():
    ingestion_date = datetime.now().strftime("%Y-%m-%d")
    print(f"[INFO] ingestion_date d√©fini automatiquement: {ingestion_date}")

# S'assurer que spark et F sont disponibles
from pyspark.sql import functions as F
if 'spark' not in globals():
    print("[ERREUR] SparkSession 'spark' non trouv√©e. Veuillez ex√©cuter la Phase 1 d'abord.")
    raise NameError("SparkSession 'spark' non trouv√©e")

BRONZE_KAFKA_PATH = "s3a://bronze/kafka_orders"

def process_kafka_orders_to_silver(ingestion_date: str):
    """Transforme les donn√©es Kafka de Bronze vers Silver."""
    try:
        # Lecture depuis Bronze
        df_kafka_bronze = spark.read.parquet(f"{BRONZE_KAFKA_PATH}/*")
        
        if df_kafka_bronze.count() == 0:
            print(f"[INFO] Aucune donn√©e Kafka trouv√©e dans {BRONZE_KAFKA_PATH}")
            return None
        
        # Transformation Silver : nettoyage et enrichissement
        df_silver = (df_kafka_bronze
            .withColumn("montant_net", 
                       (F.col("unit_price") * F.col("quantity") * (F.lit(1.0) - F.col("discount"))).cast("double"))
            .withColumn("order_date", F.to_date(F.col("order_timestamp")))
            .select(
                "order_id",
                "customer_id",
                "product_id",
                "quantity",
                "unit_price",
                "discount",
                "montant_net",
                "status",
                "order_date",
                "order_timestamp",
                "_ingestion_timestamp",
                "_source_system",
                "_table_name",
                "_ingestion_date"
            )
        )
        
        # √âcriture en Silver
        target_path = f"s3a://silver/kafka_orders/{ingestion_date}"
        row_count = df_silver.count()
        df_silver.write.mode("overwrite").parquet(target_path)
        
        return {
            "dataset": "kafka_orders",
            "rows": row_count,
            "path": target_path,
            "partition": ingestion_date
        }
    except Exception as e:
        print(f"[ERREUR] Traitement Kafka vers Silver: {e}")
        return None

# Ex√©cution (d√©commentez apr√®s avoir arr√™t√© le streaming et laiss√© quelques donn√©es s'accumuler)
# result_kafka_silver = process_kafka_orders_to_silver(ingestion_date)
# if result_kafka_silver:
#     print(f"[OK] {result_kafka_silver['dataset']} -> {result_kafka_silver['rows']} lignes -> {result_kafka_silver['path']}")


In [None]:
# Comparaison des donn√©es batch 
print("=== PHASE 4 - COMPARAISON BATCH vs STREAMING ===\n")

# S'assurer que ingestion_date est d√©fini (au cas o√π cette cellule est ex√©cut√©e seule)
from datetime import datetime
if 'ingestion_date' not in globals():
    ingestion_date = datetime.now().strftime("%Y-%m-%d")
    print(f"[INFO] ingestion_date d√©fini automatiquement: {ingestion_date}")

# S'assurer que spark et F sont disponibles
from pyspark.sql import functions as F
if 'spark' not in globals():
    print("[ERREUR] SparkSession 'spark' non trouv√©e. Veuillez ex√©cuter la Phase 1 d'abord.")
    raise NameError("SparkSession 'spark' non trouv√©e")

# Red√©finition des chemins
BRONZE_KAFKA_PATH = "s3a://bronze/kafka_orders"

# Donn√©es batch (PostgreSQL)
try:
    df_batch_orders = spark.read.parquet(f"s3a://bronze/orders/{ingestion_date}")
    batch_count = df_batch_orders.count()
    print(f"[BATCH] Commandes PostgreSQL: {batch_count} lignes")
    print(f"       Source: postgresql")
    print(f"       Path: s3a://bronze/orders/{ingestion_date}")
except Exception as e:
    print(f"[BATCH] Erreur: {e}")
    batch_count = 0

print()

# Donn√©es streaming (Kafka)
try:
    df_streaming_orders = spark.read.parquet(f"{BRONZE_KAFKA_PATH}/*")
    streaming_count = df_streaming_orders.count()
    print(f"[STREAMING] Commandes Kafka: {streaming_count} lignes")
    print(f"           Source: kafka")
    print(f"           Path: {BRONZE_KAFKA_PATH}")
    if streaming_count > 0:
        print(f"           Derni√®re commande:")
        df_streaming_orders.orderBy(F.col("_ingestion_timestamp").desc()).show(1, truncate=False)
except Exception as e:
    print(f"[STREAMING] Erreur: {e}")
    streaming_count = 0

print(f"\nTotal commandes (batch + streaming): {batch_count + streaming_count}")
print("\nLes deux sources coexistent et peuvent √™tre utilis√©es s√©par√©ment ou ensemble pour l'analyse.")


## Explication d√©taill√©e du choix : Option A vs Option B

### Analyse des deux options

#### Option A : Traitement s√©par√© (choix retenu)

**Avantages :**
1. **Tra√ßabilit√©** : Chaque source (PostgreSQL vs Kafka) conserve sa m√©tadonn√©e `_source_system`, permettant de savoir d'o√π viennent les donn√©es
2. **Flexibilit√©** : Les traitements peuvent √™tre diff√©rents selon la source 
3. **Maintenance** : En cas de probl√®me, il est plus facile d'identifier la source et de corriger ind√©pendamment
4. **Performance** : Les traitements batch et streaming peuvent √™tre optimis√©s s√©par√©ment
5. **S√©paration des pr√©occupations** : Respect du principe SOLID dans l'architecture Medallion

**Impl√©mentation :**
- Donn√©es batch (Phase 1) ‚Üí Bronze ‚Üí Silver (Phase 2) : `fact_orders`, `dim_customers`, `dim_products`
- Donn√©es streaming (Phase 3) ‚Üí Bronze ‚Üí Silver (Phase 4.1) : `kafka_orders`
- Les deux sources coexistent et peuvent √™tre utilis√©es ensemble pour des analyses (Phase 4.2)

#### Option B : Union batch + streaming (non retenue)

**Avantages :**
- Vue unifi√©e imm√©diate des donn√©es historiques et temps r√©el
- Simplification des requ√™tes d'analyse (une seule table √† interroger)

**Inconv√©nients :**
- Perte de tra√ßabilit√© de la source
- Complexit√© accrue dans la gestion des conflits (m√™me order_id dans batch et streaming)
- Moins de flexibilit√© pour des traitements diff√©renci√©s
- Plus difficile √† d√©boguer en cas de probl√®me

### Justification du choix

Nous avons choisi **l'Option A** car elle offre une meilleure architecture pour un data lake en production :

1. **Conformit√© avec l'architecture Medallion** : Chaque couche (Bronze, Silver) peut traiter diff√©remment les sources selon leurs caract√©ristiques
2. **Scalabilit√©** : Facilite l'ajout de nouvelles sources de donn√©es sans impacter les existantes
3. **Qualit√© des donn√©es** : Permet d'appliquer des r√®gles de qualit√© diff√©rentes selon la source
4. **Gouvernance** : Meilleure tra√ßabilit√© et auditabilit√© des donn√©es

**Note** : L'Option B reste possible en cr√©ant une vue unifi√©e ou en faisant une union lors des analyses finales (couche Gold), mais nous pr√©f√©rons maintenir la s√©paration jusqu'√† cette √©tape pour pr√©server la tra√ßabilit√©.
