In [0]:
# CODE DU NOTEBOOK 02_Transformation_Produit
# ----------------------------------------

from pyspark.sql.functions import col, when, to_timestamp

DBFS_PATH_BRUT = "/Volumes/workspace/ventes_produits_schema/data_mesh"
TABLE_PRODUIT = "ventes_produits_schema.commandes_enrichies"

# 1. Lecture des données brutes
df = spark.read.format("delta").load(DBFS_PATH_BRUT)

# --- Transformations (Ajout de Valeur) ---

# Ajout de la colonne 'client_type' (HIGH_VALUE si TTC >= 100)
df_transformed = df.withColumn(
    "client_type",
    when(col("montant_ttc") >= 100.00, "HIGH_VALUE")
    .otherwise("STANDARD")
)

# Conversion de la date
df_transformed = df_transformed.withColumn(
    "date_commande_ts",
    to_timestamp(col("date_commande"), "yyyy-MM-dd HH:mm:ss")
)

# Filtrer uniquement les commandes réussies (Règle de Qualité du Produit)
df_final = df_transformed.filter(col("statut") == "COMPLETED")

# 2. Écriture du Produit de Données (Table Delta)
# Le produit est stocké dans le schéma désigné par la Plateforme.
df_final.write.format("delta").mode("overwrite").saveAsTable(TABLE_PRODUIT)

print(f"Produit de Données créé : {TABLE_PRODUIT}")