# NB_Transform_Sales_Data — Silver Layer

Ce notebook implémente la couche **Silver** de l’architecture Medallion dans Microsoft Fabric.

Le pipeline de données du projet est organisé en trois niveaux :

Bronze → Silver → Gold

## Bronze
Les données brutes sont ingérées depuis des fichiers CSV stockés dans le Lakehouse
(`Files/raw/orders/`). Ces données représentent la source du système e-commerce.

## Silver (ce notebook)
Ce notebook transforme les données brutes afin de produire un dataset analytique propre.
Les étapes principales sont :

- contrôles de qualité des données
- gestion des rejets (`sales_orders_rejects`)
- suppression des doublons
- standardisation du canal de vente
- normalisation des types
- calcul du revenu (`revenue`)
- enrichissement temporel (année, mois, jour, etc.)

Le résultat est stocké dans la table Delta :

sales_orders_clean

Cette table constitue la couche Silver du Lakehouse.

## Gold
La couche Gold correspond au Data Warehouse `wh_sales`, qui contient
un modèle dimensionnel (FactSales, DimProduct, DimDate, DimChannel).
Cette couche est utilisée par le Semantic Model et Power BI.

## Objectif
Séparer clairement :
- l’ingestion des données
- la transformation
- la consommation analytique

Ce pattern correspond à l’architecture Medallion utilisée en data engineering moderne.


In [1]:
# ============================================================
# BRONZE LAYER — INGESTION DES DONNÉES BRUTES
# ============================================================
# Lecture des fichiers CSV depuis le dossier raw du Lakehouse.
# Les données sont chargées telles quelles, sans transformation.
# Cette couche correspond à la source brute du pipeline.
# ============================================================

from pyspark.sql.functions import col


df_raw = (
    spark.read
    .option("header", "true")          # le fichier CSV contient des noms de colonnes
    .option("inferSchema", "true")     # Spark déduit automatiquement les types de données
    .csv("Files/raw/orders/")          # lecture des fichiers bruts depuis le Lakehouse
)

if df_raw.rdd.isEmpty():               # vérifie s’il n’y a aucune donnée à traiter
    print("Aucune donnée détectée.")
    dbutils.notebook.exit("NO_DATA")   # arrêt propre du notebook si dataset vide


StatementMeta(, 07b7031e-a3c1-4bdf-a13c-09c997e36eb2, 3, Finished, Available, Finished, False)

In [2]:
# ============================================================
# SILVER LAYER — DATA QUALITY CHECKS
# ============================================================
# Identification des lignes invalides selon des règles métier.
# Les lignes incorrectes sont conservées dans une table de rejet
# pour garantir la traçabilité.
# ============================================================

df_invalid = df_raw.filter(
    col("order_id").isNull() |         # commande sans identifiant
    col("order_datetime").isNull() |   # commande sans date
    (col("quantity") <= 0) |           # quantité invalide
    (col("unit_price") <= 0)           # prix invalide
)

df_valid = df_raw.subtract(df_invalid) # conserve uniquement les lignes valides

df_invalid.write.mode("overwrite").format("delta").saveAsTable("sales_orders_rejects")
# stockage des lignes rejetées pour audit et traçabilité


print("Rows raw     :", df_raw.count())
print("Rows valid   :", df_valid.count())
print("Rows invalid :", df_invalid.count())


StatementMeta(, 07b7031e-a3c1-4bdf-a13c-09c997e36eb2, 4, Finished, Available, Finished, False)

Rows raw     : 846
Rows valid   : 846
Rows invalid : 0


In [3]:
# ============================================================
# SILVER LAYER — NETTOYAGE DES DONNÉES
# ============================================================
# Suppression des doublons
# Standardisation du canal de vente
# Ajout d’un timestamp d’ingestion pour la traçabilité
# ============================================================

from pyspark.sql.functions import trim, upper, current_timestamp

df_clean = (
    df_valid
    .dropDuplicates(["order_id"])                     # suppression des commandes dupliquées
    .withColumn("channel", upper(trim(col("channel"))))  # normalisation du canal de vente
    .filter(col("channel").isin("WEB", "MOBILE"))     # conservation des canaux métier autorisés
    .withColumn("ingestion_ts", current_timestamp())  # ajout d’un timestamp technique d’ingestion
)


StatementMeta(, 07b7031e-a3c1-4bdf-a13c-09c997e36eb2, 5, Finished, Available, Finished, False)

In [4]:
# ============================================================
# SILVER LAYER — NORMALISATION DES TYPES
# ============================================================
# Conversion des types pour garantir la cohérence analytique.
# ============================================================

from pyspark.sql.functions import to_timestamp

df_normalized = (
    df_clean
    .withColumn("order_datetime", to_timestamp("order_datetime"))  # conversion en timestamp
    .withColumn("quantity", col("quantity").cast("int"))           # typage entier
    .withColumn("unit_price", col("unit_price").cast("double"))    # typage décimal
)



StatementMeta(, 07b7031e-a3c1-4bdf-a13c-09c997e36eb2, 6, Finished, Available, Finished, False)

In [5]:
#mini-check
df_normalized.select("order_id", "order_datetime", "channel", "quantity", "unit_price", "ingestion_ts").show(5, truncate=False)
df_normalized.printSchema()


StatementMeta(, 07b7031e-a3c1-4bdf-a13c-09c997e36eb2, 7, Finished, Available, Finished, False)

+--------+-------------------+-------+--------+----------+--------------------------+
|order_id|order_datetime     |channel|quantity|unit_price|ingestion_ts              |
+--------+-------------------+-------+--------+----------+--------------------------+
|O000001 |2026-01-01 10:01:00|WEB    |1       |130.0     |2026-02-14 14:31:27.464232|
|O000002 |2026-01-01 11:58:00|WEB    |2       |95.0      |2026-02-14 14:31:27.464232|
|O000003 |2026-01-01 19:52:00|WEB    |1       |30.0      |2026-02-14 14:31:27.464232|
|O000004 |2026-01-01 20:44:00|WEB    |3       |28.0      |2026-02-14 14:31:27.464232|
|O000005 |2026-01-01 18:32:00|MOBILE |1       |75.0      |2026-02-14 14:31:27.464232|
+--------+-------------------+-------+--------+----------+--------------------------+
only showing top 5 rows

root
 |-- order_id: string (nullable = true)
 |-- order_datetime: timestamp (nullable = true)
 |-- order_date: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_name: stri

In [6]:
# ============================================================
# SILVER LAYER — CALCUL DU CHIFFRE D’AFFAIRES
# ============================================================
# Le revenu est calculé au niveau ligne.
# Cette logique métier est centralisée dans la couche data.
# ============================================================

df_enriched = (
    df_normalized
    .withColumn("revenue", col("quantity") * col("unit_price"))   # calcul du chiffre d’affaires par ligne
)


StatementMeta(, 07b7031e-a3c1-4bdf-a13c-09c997e36eb2, 8, Finished, Available, Finished, False)

In [7]:
# ============================================================
# SILVER LAYER — ENRICHISSEMENT TEMPOREL
# ============================================================
# Extraction d’attributs temporels pour faciliter les analyses.
# ============================================================

from pyspark.sql.functions import year, month, dayofmonth, date_format

df_time = (
    df_enriched
    .withColumn("order_date", col("order_datetime").cast("date"))   # extraction de la date
    .withColumn("order_year", year("order_datetime"))               # extraction de l’année
    .withColumn("order_month", month("order_datetime"))             # extraction du mois
    .withColumn("order_day", dayofmonth("order_datetime"))          # extraction du jour
    .withColumn("order_weekday", date_format("order_datetime", "EEEE"))  # nom du jour de la semaine
)


StatementMeta(, 07b7031e-a3c1-4bdf-a13c-09c997e36eb2, 9, Finished, Available, Finished, False)

In [8]:
# ============================================================
# SILVER OUTPUT — TABLE ANALYTIQUE DU LAKEHOUSE
# ============================================================
# La table sales_orders_clean représente la couche Silver
# prête à être consommée par le Warehouse (Gold).
# ============================================================

(
    df_time
    .write
    .mode("overwrite")                    # remplace la table si elle existe déjà
    .option("overwriteSchema", "true")    # met à jour le schéma si nécessaire
    .format("delta")                      # stockage au format Delta Lake
    .saveAsTable("sales_orders_clean")    # table Silver du Lakehouse
)



StatementMeta(, 07b7031e-a3c1-4bdf-a13c-09c997e36eb2, 10, Finished, Available, Finished, False)