# **PROJET DATA ENGINEERING**

## INITIALISATION DE L'ENVIRONNEMENT ET CONFIGURATION

In [0]:

import json
import os

# STEP 1: SCAFFOLDING (folder architecture)
# Catalog / schema / volume names that locate the project volume.
CATALOG, SCHEMA, VOLUME = "workspace", "xhadeezeydia", "capstoneipsl"

# Create the volume when it does not exist yet.
volume_fqn = ".".join([CATALOG, SCHEMA, VOLUME])
spark.sql("CREATE VOLUME IF NOT EXISTS " + volume_fqn)

VOLUME_ROOT = "/".join(["/Volumes", CATALOG, SCHEMA, VOLUME])
PROJECT_ROOT = VOLUME_ROOT + "/ecommerce_project"

# Project-relative folders, grouped by layer.
DIRECTORIES = [
    # Bronze
    "data/bronze/main",
    "data/bronze/enrich",
    # Silver
    "data/silver/main_clean",
    "data/silver/enrich_clean",
    "data/silver/joined",
    # Gold
    "data/gold/marts",
    "data/gold/aggregates",
    "data/gold/exports",
    # Source code
    "src/ingestion",
    "src/transforms",
    "src/quality",
    "src/utils",
    # Notebooks and configuration
    "notebooks",
    "configs",
    # Reports
    "reports/data_quality",
    "reports/benchmarks",
]

# Physically create every folder under the project root and log each one.
for relative_dir in DIRECTORIES:
    target_dir = PROJECT_ROOT + "/" + relative_dir
    dbutils.fs.mkdirs(target_dir)
    print(f"‚úì Created: {target_dir}")

# STEP 2: BUILD THE CONFIG FILE (single source of truth)
# Each top-level section is assembled separately, then combined in the order
# in which it is serialised.
data_root = f"{PROJECT_ROOT}/data"

project_metadata = {
    "name": "E-Commerce Architecture Medallion",
    "authors": ["Khady NDIAYE","Seydou DIALLO"],
    "filiere": "GIT3 - IPSL",
}

paths = {
    "project_root": PROJECT_ROOT,
    "bronze_main": f"{data_root}/bronze/main",
    "silver_main": f"{data_root}/silver/main_clean",
    "gold_marts": f"{data_root}/gold/marts",
    "reports_quality": f"{PROJECT_ROOT}/reports/data_quality",
}

business_rules = {
    "default_margin_rate": 0.10,
    "niche_margin_rate": 0.15,
    "target_brands": ["runail", "grattol", "irisk", "uno"],
}

config_data = {
    "project_metadata": project_metadata,
    "paths": paths,
    "business_rules": business_rules,
}

# Persist the JSON document in the project's /configs folder.
CONFIG_FILE_PATH = f"{PROJECT_ROOT}/configs/pipeline_config.json"

serialized_config = json.dumps(config_data, indent=4)
with open(CONFIG_FILE_PATH, "w") as config_file:
    config_file.write(serialized_config)

print(f"\n‚úÖ Fichier de configuration g√©n√©r√© : {CONFIG_FILE_PATH}")

# STEP 3: LOAD THE CONFIGURATION
# Read the file back so that the rest of the notebook works from the
# persisted configuration.
with open(CONFIG_FILE_PATH, "r") as config_file:
    cfg = json.loads(config_file.read())

# Notebook-wide shortcuts derived from the loaded configuration.
paths_cfg = cfg['paths']
PATH_BRONZE, PATH_SILVER = paths_cfg['bronze_main'], paths_cfg['silver_main']
MARGIN_RATE = cfg['business_rules']['default_margin_rate']

project_name = cfg['project_metadata']['name']
print(f"üöÄ Pipeline pr√™t. Pr√©nomm√© : {project_name}")

## L'Ingestion et l'Amplification (BRONZE)

In [0]:

# 1. Load (source 1): every CSV file found in the project's Bronze folder,
#    with a header row and schema inference.
csv_glob = f"{PROJECT_ROOT}/data/bronze/main/*.csv"
df_raw = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(csv_glob)
)

# 2. Amplification: the original rows plus 14 appended copies (x15 overall).
df_amplified = df_raw
for _ in range(1, 15):
    df_amplified = df_amplified.union(df_raw)

# 3. Write as Parquet under the Bronze path taken from the configuration;
#    the suffix names the output dataset.
bronze_output_path = PATH_BRONZE + "/full_data.parquet"
df_amplified.write.parquet(bronze_output_path, mode="overwrite")

# 4. Source 2 (enrichment): a small static reference table that maps a
#    category prefix to a department and a margin rate.
# The "others" row takes its rate from the configuration's default margin.
default_margin = cfg['business_rules']['default_margin_rate']

enrich_columns = ["category_code_prefix", "category_department", "margin_rate"]
enrich_data = [
    ("electronics", "High-Tech", 0.20),
    ("appliances", "Home", 0.15),
    ("computers", "IT", 0.18),
    ("others", "General", default_margin),
]

df_enrich = spark.createDataFrame(enrich_data, enrich_columns)

enrich_output_path = f"{PROJECT_ROOT}/data/bronze/enrich/static_ref.parquet"
df_enrich.write.parquet(enrich_output_path, mode="overwrite")

# 5. Size validation: add up the Parquet part files found directly inside
#    the Bronze output folder and convert the byte total to GB.
files = dbutils.fs.ls(bronze_output_path)

parquet_bytes = 0
for file_info in files:
    if file_info.name.endswith(".parquet"):
        parquet_bytes += file_info.size
size_gb = parquet_bytes / (1024**3)

print(f"‚úÖ BRONZE VALIDE : {size_gb:.2f} GB | Format: Parquet")
print(f"üìç Stock√© dans : {bronze_output_path}")

In [0]:


# Bronze location taken from the configuration loaded earlier.
bronze_main_path = f"{PATH_BRONZE}/full_data.parquet"

# Amplify to 20x the source: the original rows plus 19 appended copies.
AMPLIFICATION_FACTOR = 20
df_amplified = df_raw
for _ in range(AMPLIFICATION_FACTOR - 1):
    df_amplified = df_amplified.unionAll(df_raw)

# Write as Parquet, replacing any previous output at this path.
df_amplified.write.mode("overwrite").parquet(bronze_main_path)

# Check the volume actually written: sum of the Parquet part files, in GB.
files = dbutils.fs.ls(bronze_main_path)
size_gb = sum(f.size for f in files if f.name.endswith(".parquet")) / (1024**3)

# Count the rows from the Parquet output rather than from `df_amplified`:
# `df_amplified` is a lazy union over the CSV source, so counting it would
# re-read that source for every branch of the union just to print a number.
# Counting the written dataset also reports what really landed on disk.
bronze_row_count = spark.read.parquet(bronze_main_path).count()

print(f"üöÄ NOUVELLE TAILLE BRONZE : {size_gb:.2f} GB")
print(f"üìä Nombre total de lignes : {bronze_row_count}")

## ZONE SILVER : RAFFINEMENT, AUDIT & GESTION DES REJETS

In [0]:
from pyspark.sql.functions import col, lower, trim, count, when, countDistinct, expr

# 1. LOAD & COLUMN PRUNING
# Only the columns used downstream are kept.
needed_columns = [
    "event_time", "event_type", "product_id", "category_id",
    "category_code", "brand", "price", "user_id", "user_session"
]

# Read the Parquet dataset produced by the Bronze step rather than its parent
# folder, which still contains the raw CSV file.
path_bronze_parquet = f"{cfg['paths']['bronze_main']}/full_data.parquet"
df_bronze = spark.read.parquet(path_bronze_parquet).select(*needed_columns)

# 2. REJECT HANDLING (QUARANTINE)
# A row is valid when it has a user, a product and a strictly positive price.
# The explicit price null check is required: `price > 0` evaluates to NULL for
# a NULL price, and both filter(cond) and filter(~cond) discard NULL results,
# so such rows would otherwise be missing from BOTH outputs instead of being
# quarantined.
valid_condition = (
    col("user_id").isNotNull()
    & col("product_id").isNotNull()
    & col("price").isNotNull()
    & (col("price") > 0)
)

df_valid = df_bronze.filter(valid_condition)
df_quarantine = df_bronze.filter(~valid_condition)

# 3. STANDARDISE THE VALID ROWS
# Text columns are trimmed and lower-cased, price is cast to double, and the
# data is repartitioned on the column later used for the partitioned write.
df_silver = df_valid \
    .withColumn("event_type", lower(trim(col("event_type")))) \
    .withColumn("category_code", lower(trim(col("category_code")))) \
    .withColumn("brand", lower(trim(col("brand")))) \
    .withColumn("price", col("price").cast("double")) \
    .repartition(col("event_type"))

# 4. QUALITY INDICATORS
# Each check is the percentage of Silver rows that satisfy it.
total_rows = df_silver.count()

# An empty dataset would mean dividing by zero, so no report is built then.
if total_rows > 0:
    quality_metrics = df_silver.select(
        ((count(when(col("user_id").isNotNull(), True)) / total_rows) * 100).alias("chk_1_user_complete"),
        ((count(when(col("price") > 0, True)) / total_rows) * 100).alias("chk_2_price_pos"),
        ((count(when(col("event_type").isNotNull(), True)) / total_rows) * 100).alias("chk_3_evt_present"),
        ((count(when(col("event_time").isNotNull(), True)) / total_rows) * 100).alias("chk_4_date_valid"),
        ((count(when(col("brand").isNotNull(), True)) / total_rows) * 100).alias("chk_5_brand_filled"),
        ((count(when(col("category_code").isNotNull(), True)) / total_rows) * 100).alias("chk_6_cat_filled"),
        ((countDistinct("user_session") / total_rows) * 100).alias("chk_7_unique_sessions"),
        # Anchored at both ends: without the trailing `$`, any value that
        # merely starts with one hex character or dash would pass the check.
        ((count(when(col("user_session").rlike("^[0-9a-fA-F-]+$"), True)) / total_rows) * 100).alias("chk_8_session_format")
    )
else:
    # Keep the name defined so the save and display steps below can skip the
    # report instead of failing with a NameError.
    quality_metrics = None
    print("‚ö†Ô∏è Attention : df_silver est vide !")

# 5. MULTI-LEVEL SAVES
# A. Clean Silver data, partitioned by event type.
path_silver = cfg['paths']['silver_main']
df_silver.write.mode("overwrite").partitionBy("event_type").parquet(path_silver)

# B. Quarantine: a sibling folder of the clean Silver data.
path_quarantine = path_silver.replace("main_clean", "quarantine")
df_quarantine.write.mode("overwrite").parquet(path_quarantine)

# C. Quality report (only when it could be computed).
path_report = f"{cfg['paths']['reports_quality']}/silver_report.parquet"
if quality_metrics is not None:
    quality_metrics.write.mode("overwrite").parquet(path_report)

# RESULTS
print(f"‚úÖ PROCESSUS SILVER TERMIN√â")
print(f"üìä Lignes Valides : {total_rows} | ‚ö†Ô∏è Lignes en Quarantaine : {df_quarantine.count()}")
if quality_metrics is not None:
    quality_metrics.show()



## ZONE GOLD : ANALYTICS & BUSINESS INTELLIGENCE

In [0]:

# Spark's `sum` and `round` are imported under aliases. A notebook shares one
# namespace across cells, so importing them under their own names would
# replace Python's built-in `sum` / `round` for every cell that runs afterwards
# (for example any `sum(f.size for f in ...)` over a file listing).
from pyspark.sql.functions import broadcast, split, col, when, count
from pyspark.sql.functions import sum as spark_sum, round as spark_round

# 1. LOAD THE SOURCES
# Both locations come from the loaded configuration.
df_silver = spark.read.parquet(cfg['paths']['silver_main'])
path_static = f"{cfg['paths']['project_root']}/data/bronze/enrich/static_ref.parquet"
df_enrich = spark.read.parquet(path_static)

# 2. PREPARATION AND BROADCAST JOIN
# Extract the category prefix (e.g. "electronics.audio" -> "electronics").
df_silver_prep = df_silver.withColumn("cat_prefix", split(col("category_code"), r"\.").getItem(0))

# The small reference table is broadcast so that the Silver rows do not have
# to be shuffled for the join.
df_gold_base = df_silver_prep.join(
    broadcast(df_enrich),
    df_silver_prep.cat_prefix == df_enrich.category_code_prefix,
    "left"
)

# Margin: use the joined rate when there is one, otherwise the default rate
# from the JSON configuration.
DEFAULT_MARGIN_RATE = cfg['business_rules']['default_margin_rate']

df_gold_base = df_gold_base.withColumn(
    "estimated_margin",
    when(col("margin_rate").isNotNull(), col("price") * col("margin_rate"))
    .otherwise(col("price") * DEFAULT_MARGIN_RATE)
)

print("‚úÖ Enrichissement Gold termin√© avec Broadcast Join.")

# 3. ANALYTICAL OUTPUTS
# Purchase events feed both the brand and the department marts.
df_purchases = df_gold_base.filter(col("event_type") == "purchase")

# A. Brand performance (revenue, margin, average value per sale)
gold_brand_perf = df_purchases \
    .groupBy("brand") \
    .agg(
        spark_sum("price").alias("total_revenue"),
        spark_sum("estimated_margin").alias("total_margin"),
        count("product_id").alias("sales_count")
    ) \
    .withColumn("avg_order_value", spark_round(col("total_revenue") / col("sales_count"), 2)) \
    .withColumn("margin_percentage", spark_round((col("total_margin") / col("total_revenue")) * 100, 2)) \
    .orderBy(col("total_revenue").desc())

# B. Department stats (share of the overall purchase revenue)
total_purchase_df = df_purchases.agg(spark_sum("price").alias("global_rev"))
# `or 1` avoids a division by zero when there is no purchase revenue at all.
total_global_revenue = total_purchase_df.collect()[0]["global_rev"] or 1

# NOTE(review): dropna() also removes purchases whose category matched no
# department, so the contributions may not add up to 100% - confirm this is
# intended.
gold_dept_stats = df_purchases \
    .groupBy("category_department") \
    .agg(
        count("event_type").alias("total_sales"),
        spark_sum("price").alias("dept_revenue")
    ) \
    .withColumn("revenue_contribution_pct", spark_round((col("dept_revenue") / total_global_revenue) * 100, 2)) \
    .dropna()

# C. Conversion funnel (one column per event type, per brand)
# The rate is only computed when the brand has at least one view: after
# fillna(0) a brand without views has view = 0, and dividing by it raises a
# DIVIDE_BY_ZERO error when ANSI SQL mode is enabled. Such brands get NULL,
# which is what the division yields when ANSI mode is off.
gold_conversion_pivot = df_gold_base.groupBy("brand") \
    .pivot("event_type") \
    .agg(count("user_session")) \
    .fillna(0) \
    .withColumn("conversion_rate_pct",
                when(col("view") > 0,
                     spark_round((col("purchase") / col("view")) * 100, 2)))

# 4. FINAL SAVE IN THE GOLD ZONE
gold_path = cfg['paths']['gold_marts']

gold_brand_perf.write.mode("overwrite").parquet(f"{gold_path}/brand_performance.parquet")
gold_dept_stats.write.mode("overwrite").parquet(f"{gold_path}/department_stats.parquet")
gold_conversion_pivot.write.mode("overwrite").parquet(f"{gold_path}/conversion_funnel.parquet")

print(f"‚úÖ Tables Gold g√©n√©r√©es avec succ√®s dans : {gold_path}")
display(gold_brand_perf.limit(10))

## Performance : Benchmark "Avant vs Après"

In [0]:

import time
from pyspark.sql.functions import col, lit, round, when, concat

# 1. Raw measurement (Bronze, CSV): time a full count over the CSV files.
# NOTE(review): the CSV glob is the un-amplified source, whereas the Silver
# dataset derives from the amplified Parquet copy, so the two timings do not
# cover the same number of rows - confirm this comparison is intended.
start_csv = time.time()
csv_reader = spark.read.option("header", True)
csv_reader.csv(f"{cfg['paths']['bronze_main']}/*.csv").count()
end_csv = time.time()
duration_csv = float(end_csv - start_csv)

# 2. Optimised measurement (Silver, partitioned Parquet): same full count.
start_pq = time.time()
silver_probe = spark.read.parquet(cfg['paths']['silver_main'])
silver_probe.count()
end_pq = time.time()
duration_pq = float(end_pq - start_pq)

# 3. Disk-space measurement (compression efficiency)
def get_size(path, suffix=None):
    """Return the total size, in GB, of the files stored under ``path``.

    The folder is walked recursively, so datasets written with ``partitionBy``
    (whose data files live in sub-folders) are measured correctly. Entries
    whose name starts with "_" are skipped.

    The total is accumulated with ``+=`` rather than ``sum(...)`` on purpose:
    other cells import Spark's ``sum`` into the notebook namespace, which
    would replace the built-in and break a generator-based sum.

    Args:
        path: Folder to measure.
        suffix: Optional file-name suffix (e.g. ".csv"). When given, only
            files whose name ends with it are counted. Defaults to None,
            which counts every file.

    Returns:
        The size in GB as a float. If the listing fails, a warning is printed
        and the historical fallback value 0.1 is returned so the benchmark
        can still be built; that figure is then NOT a real measurement.
    """
    total_bytes = 0
    pending = [path]
    try:
        while pending:
            for entry in dbutils.fs.ls(pending.pop()):
                if entry.name.startswith("_"):
                    continue
                if entry.isDir():
                    pending.append(entry.path)
                elif suffix is None or entry.name.endswith(suffix):
                    total_bytes += entry.size
    except Exception as exc:
        # Deliberate best effort (e.g. restricted access), but no longer silent.
        print(f"[get_size] Taille indisponible pour {path} : {exc} (valeur par defaut 0.1 GB)")
        return 0.1
    return float(total_bytes / (1024**3))  # bytes -> GB

# The Bronze folder holds both the raw CSV files and the Parquet copy written
# by the Bronze step; only the CSV files belong in the "raw CSV" figure.
size_bronze = get_size(cfg['paths']['bronze_main'], suffix=".csv")
size_silver = get_size(cfg['paths']['silver_main'])

# 4. Build the performance report: one row per metric, raw vs optimised.
report_columns = ["Metrique", "Brut_CSV", "Optimise_Parquet"]
raw_data = [
    ("Temps de lecture (sec)", duration_csv, duration_pq),
    ("Espace disque (GB)", size_bronze, size_silver),
]
df_bench = spark.createDataFrame(raw_data, report_columns)

# 5. Gain column: a speed-up factor for the time metric, a percentage
#    reduction for every other metric. `round` here is the Spark column
#    function imported at the top of this cell.
is_time_metric = col("Metrique").contains("Temps")
speedup_label = concat(
    round(col("Brut_CSV") / col("Optimise_Parquet"), 1),
    lit("x plus rapide"),
)
reduction_label = concat(
    round((1 - (col("Optimise_Parquet") / col("Brut_CSV"))) * 100, 1),
    lit("% de reduction"),
)
df_final = df_bench.withColumn(
    "Gain",
    when(is_time_metric, speedup_label).otherwise(reduction_label),
)

print("üöÄ R√âSULTATS DU BENCHMARK DE PERFORMANCE :")
df_final.show(truncate=False)

# 6. Archive the report under the project's reports/benchmarks folder.
report_path = cfg['paths']['project_root'] + "/reports/benchmarks/performance_final.parquet"
df_final.write.parquet(report_path, mode="overwrite")

print(f"‚úÖ Rapport de performance archiv√© dans : {report_path}")

## Export & Requêtes SQL

In [0]:

from pyspark.sql import functions as F

# 1. LOAD THE GOLD TABLES FROM THE VOLUME
gold_path = cfg['paths']['gold_marts']

df_brand = spark.read.parquet(gold_path + "/brand_performance.parquet")
df_dept = spark.read.parquet(gold_path + "/department_stats.parquet")
df_funnel = spark.read.parquet(gold_path + "/conversion_funnel.parquet")

# 2. EXPORT TO THE METASTORE (internal SQL database)
# Saving the DataFrames as tables makes them queryable with SQL.
database_name = "ecommerce_analytics_db"
spark.sql("CREATE DATABASE IF NOT EXISTS " + database_name)

gold_tables = [
    ("brand_performance", df_brand),
    ("department_stats", df_dept),
    ("conversion_funnel", df_funnel),
]
for table_name, gold_df in gold_tables:
    gold_df.write.saveAsTable(f"{database_name}.{table_name}", mode="overwrite")

print(f"‚úÖ Base '{database_name}' synchronis√©e. (3 tables expos√©es)")

# 3. TECHNICAL NOTE: EXTERNAL EXPORT (POSTGRESQL/JDBC)
# Template showing how one of the tables could be exported to an external
# server over JDBC. It is a bare string literal, so it is never executed.
# The credentials are read from a secret scope: a user name or password must
# never be hard-coded in a notebook, not even in an example, because examples
# get copied as-is. Replace the <...> placeholders before use.
"""
df_brand.write \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://<host>:<port>/ecommerce_db") \
    .option("dbtable", "final_brand_performance") \
    .option("user", dbutils.secrets.get(scope="<scope>", key="<jdbc-user-key>")) \
    .option("password", dbutils.secrets.get(scope="<scope>", key="<jdbc-password-key>")) \
    .mode("overwrite") \
    .save()
"""

# 4. RUN THE SQL ANALYTICAL REPORT
print("\nüìä G√âN√âRATION DU RAPPORT DE D√âCISION M√âTIER :")

# Query A: top 5 brands by margin, restricted to brands whose revenue
# exceeds 100k.
query_top_marge = spark.sql(f"""
    SELECT 
        brand, 
        round(total_revenue, 2) as CA, 
        round(total_margin, 2) as Marge_Net,
        margin_percentage as Rentabilite_Pct
    FROM {database_name}.brand_performance
    WHERE brand IS NOT NULL AND total_revenue > 100000
    ORDER BY total_margin DESC
    LIMIT 5
""")

# Query B: top 5 brands by conversion rate, restricted to brands with more
# than 1000 views.
query_conversion = spark.sql(f"""
    SELECT 
        brand, 
        view as Vues, 
        purchase as Ventes, 
        round(conversion_rate_pct, 2) as Taux_Conversion_Pct
    FROM {database_name}.conversion_funnel
    WHERE view > 1000
    ORDER BY conversion_rate_pct DESC 
    LIMIT 5
""")

# Query C: departments ordered by their share of the revenue.
query_dept = spark.sql(f"""
    SELECT 
        category_department as Departement, 
        total_sales as Nb_Ventes, 
        revenue_contribution_pct as Contribution_CA_Pct
    FROM {database_name}.department_stats
    ORDER BY Contribution_CA_Pct DESC
""")

# --- DISPLAY ---
# Each report is printed under its own title, in the order A, B, C.
report_sections = [
    ("\n--- 1. TOP 5 MARQUES PAR PROFITABILIT√â ---", query_top_marge),
    ("\n--- 2. TOP 5 MARQUES PAR CONVERSION (Marketing) ---", query_conversion),
    ("\n--- 3. PERFORMANCE PAR D√âPARTEMENT ---", query_dept),
]
for section_title, report_df in report_sections:
    print(section_title)
    report_df.show()

# --- 5. EXPORT THE FINAL REPORT (archive) ---
# The profitability report is written as a single CSV part file with a header.
report_export_path = cfg['paths']['project_root'] + "/reports/exports_sql/final_summary"
single_part_report = query_top_marge.coalesce(1)
single_part_report.write.csv(report_export_path, mode="overwrite", header="true")

print(f"‚úÖ Rapport final archiv√© en CSV dans : {report_export_path}")