In [0]:
import dlt
from pyspark.sql import functions as F

# ----------- Silver commune/network info (from bronze_com) --------------
@dlt.table(
    name="silver_com",
    comment="Cleaned commune and network info (silver)",
    table_properties={"quality": "silver"}
)
def silver_com():
    """Build the silver commune/network table from ``BRONZE.bronze_com``.

    Trims and normalises the code and label columns, parses ``debutalim``
    as a date and removes exact duplicate rows.

    Placeholder tokens ('-', '', 'null') in ``inseecommune``, ``cdreseau``
    and ``quartier`` are turned into NULL *before* any zero-padding, so a
    missing code stays NULL instead of becoming a padded pseudo-code such
    as '00000' or '0000-'.

    Returns:
        The cleaned DataFrame that DLT materialises as ``silver_com``.
    """
    placeholders = ["-", "", "null"]

    def null_if_placeholder(name):
        # Trim first so values like ' - ' are recognised as placeholders.
        # A NULL input falls through to otherwise() and stays NULL.
        trimmed = F.trim(F.col(name))
        return F.when(trimmed.isin(placeholders), None).otherwise(trimmed)

    df = dlt.read("BRONZE.bronze_com")
    # Standardize codes and text columns: trim, uppercase, handle nulls
    df = (
        df
        # INSEE code left-padded with zeros to 5 characters (lpad also cuts longer values to 5)
        .withColumn("inseecommune", F.lpad(null_if_placeholder("inseecommune"), 5, "0"))
        # Commune name uppercased and trimmed
        .withColumn("nomcommune", F.upper(F.trim(F.col("nomcommune"))))
        # Replace dashes, 'null' strings and empties with NULL
        .withColumn("quartier", null_if_placeholder("quartier"))
        # Network code left-padded with zeros to 9 characters (lpad also cuts longer values to 9)
        .withColumn("cdreseau", F.lpad(null_if_placeholder("cdreseau"), 9, "0"))
        # Network name uppercased and trimmed
        .withColumn("nomreseau", F.upper(F.trim(F.col("nomreseau"))))
        # Convert 'yyyy-MM-dd' date string to date type
        .withColumn("debutalim", F.to_date(F.col("debutalim"), "yyyy-MM-dd"))
        # Remove exact duplicate rows
        .dropDuplicates()
    )
    return df

# --------- Silver sampling events (bronze_plv) enriched with commune/network ----------
@dlt.table(
    name="silver_plv",
    comment="Cleaned sampling events, enriched with commune info (silver)",
    table_properties={"quality": "silver"}
)
def silver_plv():
    """Build the silver sampling-event table from ``BRONZE.bronze_plv``.

    Cleans the sampling columns, removes exact duplicate rows, then
    left-joins ``silver_com`` on ``cdreseau`` and drops the duplicate join
    column coming from ``silver_com``.

    Placeholder tokens ('-', '', 'null') in the zero-padded code columns
    are turned into NULL *before* padding; otherwise they would become
    values such as '00000000-' that the later ``replace`` no longer
    matches, and a missing ``cdreseau`` would be used as a join key.

    ``pourcentdebit`` keeps only the first run of digits, so a value like
    '12,5 %' yields 12 rather than 125.

    Returns:
        The cleaned and enriched DataFrame that DLT materialises as
        ``silver_plv``.
    """
    placeholders = ["-", "", "null"]

    def null_if_placeholder(name):
        # Trim first so values like ' - ' are recognised as placeholders.
        # A NULL input falls through to otherwise() and stays NULL.
        trimmed = F.trim(F.col(name))
        return F.when(trimmed.isin(placeholders), None).otherwise(trimmed)

    plv = dlt.read("BRONZE.bronze_plv")
    commune = dlt.read("silver_com")
    # Clean and standardize columns in sampling events
    plv = (
        plv
        .withColumn("cddept", F.lpad(null_if_placeholder("cddept"), 3, "0"))                       # Department code, 3 characters
        .withColumn("cdreseau", F.lpad(null_if_placeholder("cdreseau"), 9, "0"))                   # Network code, 9 characters
        .withColumn("inseecommuneprinc", F.lpad(null_if_placeholder("inseecommuneprinc"), 5, "0")) # Main commune code, 5 characters
        .withColumn("nomcommuneprinc", F.upper(F.trim(F.col("nomcommuneprinc"))))                  # Main commune name
        .withColumn("cdreseauamont", F.lpad(null_if_placeholder("cdreseauamont"), 9, "0"))         # Upstream network code, 9 characters
        .withColumn("nomreseauamont", F.upper(F.trim(F.col("nomreseauamont"))))                    # Upstream network name
        # Leading integer part of the percentage; no digits -> '' -> NULL after the cast
        .withColumn("pourcentdebit", F.regexp_extract(F.col("pourcentdebit"), r"(\d+)", 1).cast("int"))
        .withColumn("referenceprel", F.trim(F.col("referenceprel")))                               # Sampling reference code
        .withColumn("dateprel", F.to_date(F.col("dateprel"), "yyyy-MM-dd"))                        # Convert sampling date to date
        .withColumn("heureprel", F.regexp_replace(F.col("heureprel"), r"^(\d{2})h(\d{2})$", r"\1:\2"))  # Convert '08h34' to '08:34'
        .withColumn("conclusionprel", F.trim(F.col("conclusionprel")))                             # Result conclusion cleaned
        .withColumn("ugelib", F.trim(F.col("ugelib")))                                             # Operator label
        .withColumn("distrlib", F.trim(F.col("distrlib")))                                         # Distributor label
        .withColumn("moalib", F.trim(F.col("moalib")))                                             # Managing body label
        # Remaining string columns: exact '-', '' and 'null' values become NULL
        .replace(placeholders, None)
        .dropDuplicates()                                                                          # Remove exact duplicate rows
    )
    # Join with commune/network information using network code.
    # NOTE(review): if silver_com holds several rows per cdreseau, this join
    # emits one output row per match - confirm that fan-out is intended.
    plv = plv.join(
        commune,
        [plv.cdreseau == commune.cdreseau],
        how="left"
    )
    # Keep a single cdreseau column (the one from the sampling side)
    plv = plv.drop(commune.cdreseau)
    return plv

# ---------- Silver analytical results (bronze_result) joined with sampling ----------
@dlt.table(
    name="silver_result",
    comment="Cleaned analytical results, joined with sampled events (silver)",
    table_properties={"quality": "silver"}
)
def silver_result():
    """Build the silver analytical-result table from ``BRONZE.bronze_result``.

    Trims the code and label columns, converts comma-decimal strings to
    doubles, removes exact duplicate rows, then left-joins ``silver_plv``
    on (``cddept``, ``referenceprel``) and drops the duplicate join columns
    coming from ``silver_plv``.

    Placeholder tokens ('-', '', 'null') in ``cddept`` are turned into NULL
    *before* zero-padding; otherwise they would become '00-' / '000', which
    the later ``replace`` no longer matches and which would then be used as
    a join key.

    Returns:
        The cleaned and enriched DataFrame that DLT materialises as
        ``silver_result``.
    """
    placeholders = ["-", "", "null"]

    def null_if_placeholder(name):
        # Trim first so values like ' - ' are recognised as placeholders.
        # A NULL input falls through to otherwise() and stays NULL.
        trimmed = F.trim(F.col(name))
        return F.when(trimmed.isin(placeholders), None).otherwise(trimmed)

    result = dlt.read("BRONZE.bronze_result")
    prel = dlt.read("silver_plv")
    # Clean and standardize analytical result columns
    result = (
        result
        .withColumn("cddept", F.lpad(null_if_placeholder("cddept"), 3, "0"))                  # Department code, 3 characters
        .withColumn("referenceprel", F.trim(F.col("referenceprel")))                          # Sampling event reference
        .withColumn("cdparametresiseeaux", F.trim(F.col("cdparametresiseeaux")))              # Analytical parameter code (water)
        .withColumn("cdparametre", F.trim(F.col("cdparametre")))                              # Parameter code
        .withColumn("libmajparametre", F.trim(F.col("libmajparametre")))                      # Parameter major label
        .withColumn("libminparametre", F.trim(F.col("libminparametre")))                      # Parameter minor label
        .withColumn("libwebparametre", F.trim(F.col("libwebparametre")))                      # Web label
        .withColumn("qualitparam", F.upper(F.trim(F.col("qualitparam"))))                     # Quality param uppercased and trimmed
        .withColumn("insituana", F.trim(F.col("insituana")))                                  # In situ analysis label
        .withColumn("rqana", F.trim(F.col("rqana")))                                          # Analysis remarks
        .withColumn("cdunitereferencesiseeaux", F.trim(F.col("cdunitereferencesiseeaux")))    # Reference unit code
        .withColumn("cdunitereference", F.trim(F.col("cdunitereference")))                    # Reference unit code
        # Decimal comma -> decimal point, then cast to double.
        # NOTE(review): text that is not a plain number after the swap does not cast - confirm these columns are purely numeric.
        .withColumn("limitequal", F.regexp_replace(F.col("limitequal"), ",", ".").cast("double"))
        .withColumn("refqual", F.regexp_replace(F.col("refqual"), ",", ".").cast("double"))
        .withColumn("valtraduite", F.regexp_replace(F.col("valtraduite"), ",", ".").cast("double"))
        .withColumn("casparam", F.trim(F.col("casparam")))                                    # CAS parameter
        .withColumn("referenceanl", F.trim(F.col("referenceanl")))                            # Analysis reference
        # Remaining string columns: exact '-', '' and 'null' values become NULL
        .replace(placeholders, None)
        .dropDuplicates()                                                                     # Remove exact duplicate rows
    )
    # Attach the sampling-event columns to each analytical result
    result = result.join(
        prel,
        [result.cddept == prel.cddept, result.referenceprel == prel.referenceprel],
        how="left"
    )
    # Keep a single copy of each join key (the one from the result side)
    result = result.drop(prel.cddept, prel.referenceprel)
    return result