In [None]:
from pyspark.sql import functions as F, types as T, Window
import os

# ---------- Context & paths ----------
# Select the Hive metastore catalog and make `gold` the current schema,
# creating it first if it does not exist yet.
spark.sql("USE CATALOG hive_metastore")
spark.sql("CREATE SCHEMA IF NOT EXISTS gold")
spark.sql("USE SCHEMA gold")

# Storage credentials are read from the environment (KeyError if unset) and
# registered on the Spark session so abfss:// paths on this account resolve.
storage_account_name = os.environ["STORAGE_ACCOUNT_NAME"]
storage_account_key = os.environ["STORAGE_ACCOUNT_KEY"]
spark.conf.set(
    f"fs.azure.account.key.{storage_account_name}.dfs.core.windows.net",
    storage_account_key,
)

# One external location per Gold table, all under the `gold` container.
gold_base = f"abfss://gold@{storage_account_name}.dfs.core.windows.net/"
gold_dim_comm = f"{gold_base}dim_commune/"
gold_dim_res = f"{gold_base}dim_reseau/"
gold_dim_par = f"{gold_base}dim_parametre/"
gold_dim_time = f"{gold_base}dim_temps/"
gold_f_prelev = f"{gold_base}fact_prelevement/"
gold_f_res = f"{gold_base}fact_resultat/"

# ---------- Silver sources ----------
# `plv` feeds the time, commune and network dimensions and fact_prelevement;
# `res` feeds the parameter dimension and fact_resultat.
plv, res = (
    spark.table("hive_metastore.silver.plv_silver"),
    spark.table("hive_metastore.silver.result_silver"),
)

# ========= DIMENSIONS =========

# -- DIM_TEMPS (derived from the sampling dates) --
# Built from plv.date_prelevement: one row per distinct non-null value, plus
# an integer yyyyMMdd key and the usual calendar breakdown.
# NOTE(review): assumes date_prelevement is a DATE in Silver - confirm.
dim_temps = (
    plv.select(F.col("date_prelevement").alias("date"))
    .filter(F.col("date").isNotNull())
    .distinct()
    .select(
        "date",
        F.date_format("date", "yyyyMMdd").cast("int").alias("date_key"),
        F.year("date").alias("year"),
        F.quarter("date").alias("quarter"),
        F.month("date").alias("month"),
        F.weekofyear("date").alias("week"),
        F.dayofmonth("date").alias("day"),
    )
)

# Full refresh of the external Delta table (schema is replaced as well).
(
    dim_temps.write.format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .option("path", gold_dim_time)
    .saveAsTable("hive_metastore.gold.dim_temps")
)

# -- DIM_COMMUNE --
# Grain: one row per (code_insee_commune, code_dept). That pair is the key the
# fact tables join on, so it has to be unique here; de-duplicating on the name
# as well would yield several rows per join key whenever a commune name is
# spelled differently across rows, and every matching fact row would be
# multiplied. The row kept is the one from the most recent `annee` (name as a
# tie-breaker), which also makes the stored `annee` deterministic.
_commune_latest_first = Window.partitionBy("code_insee_commune", "code_dept").orderBy(
    F.col("annee").desc_nulls_last(),
    F.col("nom_commune").asc_nulls_last(),
)

dim_commune = (plv
    .select("code_insee_commune", "nom_commune", "code_dept", "annee")
    .where(F.col("code_insee_commune").isNotNull())
    .dropDuplicates()  # shrink the input before the window; ties become identical rows
    .withColumn("_rn", F.row_number().over(_commune_latest_first))
    .where(F.col("_rn") == 1)
    .drop("_rn")
)

# Stable surrogate key (hash). NULL components are replaced by "" before
# concatenation: concat_ws silently skips NULLs, so without this two rows with
# different NULL patterns could produce the same hash input. Rows without
# NULLs hash exactly as before.
dim_commune = dim_commune.withColumn(
    "commune_sk",
    F.sha2(
        F.concat_ws(
            "||",
            *[
                F.coalesce(F.col(c).cast("string"), F.lit(""))
                for c in ("code_insee_commune", "nom_commune", "code_dept")
            ],
        ),
        256,
    ),
)

# Full refresh of the external Delta table (schema is replaced as well).
(dim_commune
 .select("commune_sk","code_insee_commune","nom_commune","code_dept","annee")
 .write.format("delta").mode("overwrite").option("overwriteSchema","true")
 .option("path", gold_dim_comm).saveAsTable("hive_metastore.gold.dim_commune"))

# -- DIM_RESEAU --
# Grain: one row per code_reseau, because the fact tables join on code_reseau
# alone. Keeping one row per (network, upstream name, upstream code) would
# give several rows per join key whenever the upstream attributes vary, and
# each matching fact row would be multiplied. The row kept is the one from the
# most recent `annee`; upstream code/name act as tie-breakers so the choice is
# deterministic. `annee` is only used for ordering and is not persisted.
_reseau_latest_first = Window.partitionBy("code_reseau").orderBy(
    F.col("annee").desc_nulls_last(),
    F.col("code_reseau_amont").asc_nulls_last(),
    F.col("nom_reseau_amont").asc_nulls_last(),
)

dim_reseau = (plv
    .select("code_reseau", "nom_reseau_amont", "code_reseau_amont", "annee")
    .where(F.col("code_reseau").isNotNull())
    .dropDuplicates()  # shrink the input before the window; ties become identical rows
    .withColumn("_rn", F.row_number().over(_reseau_latest_first))
    .where(F.col("_rn") == 1)
    .drop("_rn", "annee")
)

# Stable surrogate key (hash). NULL components are replaced by "" because
# concat_ws skips NULLs, which would otherwise let rows with different NULL
# patterns share a hash input. Rows without NULLs hash exactly as before.
dim_reseau = dim_reseau.withColumn(
    "reseau_sk",
    F.sha2(
        F.concat_ws(
            "||",
            *[
                F.coalesce(F.col(c).cast("string"), F.lit(""))
                for c in ("code_reseau", "nom_reseau_amont", "code_reseau_amont")
            ],
        ),
        256,
    ),
)

# Full refresh of the external Delta table (schema is replaced as well).
(dim_reseau
 .select("reseau_sk","code_reseau","nom_reseau_amont","code_reseau_amont")
 .write.format("delta").mode("overwrite").option("overwriteSchema","true")
 .option("path", gold_dim_res).saveAsTable("hive_metastore.gold.dim_reseau"))

# -- DIM_PARAMETRE (from result_silver) --
# Grain: one row per (code_parametre, code_unite_reference), the pair the
# result fact joins on. De-duplicating on every attribute instead would keep
# one row per distinct label / quality-limit combination: several rows would
# then share the same parametre_sk (the hash only covers code, unit and the
# upper-case label) and each matching fact row would be multiplied.
# The row kept is the one from the most recent `annee`; the remaining
# attributes are tie-breakers so the choice is deterministic. `annee` is only
# used for ordering and is not persisted.
_parametre_attrs = (
    "code_parametre_sise_eau",
    "libelle_maj_parametre",
    "libelle_min_parametre",
    "libelle_web_parametre",
    "code_unite_reference_sise_eau",
    "limite_qualite",
    "reference_qualite",
)
_parametre_latest_first = Window.partitionBy("code_parametre", "code_unite_reference").orderBy(
    F.col("annee").desc_nulls_last(),
    *[F.col(c).asc_nulls_last() for c in _parametre_attrs],
)

dim_parametre = (res
    .select("code_parametre", "code_unite_reference", *_parametre_attrs, "annee")
    .where(F.col("code_parametre").isNotNull())
    .dropDuplicates()  # shrink the input before the window; ties become identical rows
    .withColumn("_rn", F.row_number().over(_parametre_latest_first))
    .where(F.col("_rn") == 1)
    .drop("_rn", "annee")
)

# Surrogate key: SHA-256 over code, reference unit and upper-case label, with
# NULLs replaced by "" so they still occupy a position in the hashed string.
dim_parametre = dim_parametre.withColumn(
    "parametre_sk",
    F.sha2(F.concat_ws("||",
        F.coalesce(F.col("code_parametre"), F.lit("")),
        F.coalesce(F.col("code_unite_reference"), F.lit("")),
        F.coalesce(F.col("libelle_maj_parametre"), F.lit(""))
    ), 256)
)

# Full refresh of the external Delta table (schema is replaced as well).
(dim_parametre
 .select("parametre_sk","code_parametre","code_parametre_sise_eau",
         "libelle_maj_parametre","libelle_min_parametre","libelle_web_parametre",
         "code_unite_reference","code_unite_reference_sise_eau",
         "limite_qualite","reference_qualite")
 .write.format("delta").mode("overwrite").option("overwriteSchema","true")
 .option("path", gold_dim_par).saveAsTable("hive_metastore.gold.dim_parametre"))

# ========= FACTS =========

# -- FACT_PRELEVEMENT (grain: one row per sample reference) --
# Commune and network are attached through their surrogate keys, time through
# date_key. The lookups below are read back from the persisted Gold tables and
# trimmed to the join columns plus the key they provide.

# `date` is renamed so it cannot be confused with a fact column after the join.
dim_temps_keyed = (
    spark.table("hive_metastore.gold.dim_temps")
    .select(F.col("date").alias("d_date"), "date_key")
)
dim_commune_keyed = (
    spark.table("hive_metastore.gold.dim_commune")
    .select(F.col("commune_sk"), F.col("code_insee_commune"), F.col("code_dept"))
)
dim_reseau_keyed = (
    spark.table("hive_metastore.gold.dim_reseau")
    .select(F.col("reseau_sk"), F.col("code_reseau"))
)

# Build the sample fact: every plv row is kept (left joins) and enriched with
# the commune / network surrogate keys and the integer date key.
f_prelev = (
    plv.join(dim_commune_keyed, on=["code_insee_commune", "code_dept"], how="left")
    .join(dim_reseau_keyed, on=["code_reseau"], how="left")
    .join(
        dim_temps_keyed,
        on=plv["date_prelevement"] == dim_temps_keyed["d_date"],
        how="left",
    )
    .select(
        F.col("reference_prelevement").alias("prelevement_ref"),
        "annee",
        "commune_sk",
        "reseau_sk",
        "date_key",
        "date_prelevement",
        "heure_prelevement",
        "pourcentage_debit",
        "conclusion_prelevement",
        "conformite_bacteriologiq",
        "conformite_chimique",
        "conformite_reference_bacteriologiq",
        "conformite_reference_chimique",
        "uge_libelle",
        "distri_libelle",
        "moa_libelle",
        "source",
    )
    # Safety net: enforce the one-row-per-sample grain. Which duplicate
    # survives is not specified.
    .dropDuplicates(["prelevement_ref"])
)

# Full refresh of the external Delta table (schema is replaced as well).
(
    f_prelev.write.format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .option("path", gold_f_prelev)
    .saveAsTable("hive_metastore.gold.fact_prelevement")
)

# -- FACT_RESULTAT (grain: one row per parameter result per sample) --
# res is linked to plv to pick up commune / network / date, then the parameter
# and time dimensions are attached.
dim_param_keyed = spark.table("hive_metastore.gold.dim_parametre").select(
    "parametre_sk","code_parametre","code_unite_reference"
)

# Detect the column(s) of res that may carry the measured value: any column
# whose name contains "valeur", "result" or "mesure" (case-insensitive).
num_candidates = [c for c in res.columns if any(k in c.lower() for k in ["valeur", "result", "mesure"])]

# Raw first-non-null candidate, kept under its original name in case other
# cells rely on it; val_num below is no longer derived from it.
val_expr = F.coalesce(*[F.col(c) for c in num_candidates]) if num_candidates else None

# Numeric value (double), or NULL when no candidate yields a number.
# Each candidate is normalised (decimal comma -> dot) and cast on its own
# BEFORE coalescing: coalescing the raw columns first would let a non-numeric
# text in an earlier column hide a valid number in a later one, because the
# text casts to NULL only after it has already been selected.
_numeric_candidates = [
    F.regexp_replace(F.col(c).cast("string"), ",", ".").cast("double")
    for c in num_candidates
]
val_num = F.coalesce(*_numeric_candidates) if _numeric_candidates else F.lit(None).cast("double")

# Parameter lookup for the fact join, read from the persisted Gold table like
# the other dimension lookups. Join columns are prefixed with dp_ so they
# cannot clash with the same-named columns of res.
dp = dim_param_keyed.select(
    F.col("parametre_sk"),
    F.col("code_parametre").alias("dp_code_parametre"),
    F.col("code_unite_reference").alias("dp_code_unite_reference")
)

# Build the result fact. Every res row is kept (left joins only):
#   r  -> p  : sample attributes (commune, network, date) by sample reference
#   p  -> c  : commune surrogate key
#   p  -> re : network surrogate key
#   p  -> t  : integer date key
#   r  -> dp : parameter surrogate key by (parameter code, reference unit)
f_res = (res.alias("r")
    .join(plv.select(
            "reference_prelevement","code_insee_commune","code_dept","code_reseau","date_prelevement","annee"
        ).alias("p"),
        F.col("r.reference_prelevement")==F.col("p.reference_prelevement"), "left")
    .join(dim_commune_keyed.alias("c"), ["code_insee_commune","code_dept"], "left")
    .join(dim_reseau_keyed.alias("re"), ["code_reseau"], "left")
    .join(dim_temps_keyed.alias("t"), F.col("p.date_prelevement")==F.col("t.d_date"), "left")
    # The unit is compared with null-safe equality (NULL matches NULL only).
    # An "equal OR dimension unit is NULL" condition would make a result that
    # has a unit match both its own dimension row and the NULL-unit row of the
    # same parameter, duplicating the fact row. The dimension is derived from
    # the same res columns, so the exact (code, unit) pair always exists.
    .join(dp.alias("dp"),
          (F.col("r.code_parametre") == F.col("dp.dp_code_parametre")) &
          F.col("r.code_unite_reference").eqNullSafe(F.col("dp.dp_code_unite_reference")),
          "left")
    .select(
        F.col("r.reference_prelevement").alias("prelevement_ref"),
        F.col("r.annee").alias("annee"),
        F.col("c.commune_sk"),
        F.col("re.reseau_sk"),
        F.col("t.date_key"),
        F.col("dp.parametre_sk"),
        val_num.alias("valeur_numerique"),
        F.col("r.qualite_parametre"),
        F.col("r.insitu_analyse"),
        F.col("r.rg_analyse"),
        F.col("r.code_unite_reference"),
        F.col("r.limite_qualite"),
        F.col("r.reference_qualite"),
        F.col("r.cas_parametre"),
        F.col("r.reference_analyse").alias("reference_analyse"),
        F.col("r.source")
    )
)

# Full refresh of the external Delta table (schema is replaced as well).
(f_res.write
 .format("delta").mode("overwrite").option("overwriteSchema","true")
 .option("path", gold_f_res).saveAsTable("hive_metastore.gold.fact_resultat"))