In [0]:
import re
from pyspark.sql import functions as F, types as T, Window

In [0]:
def normalize_fecha_col_batch(df, fecha_col="FECHA"):
    """Parse the date column of a batch into a Spark ``DATE``.

    Args:
        df: Input DataFrame containing ``fecha_col``.
        fecha_col: Name of the column holding dates formatted as
            ``yyyy-MM-dd``. Defaults to ``"FECHA"``.

    Returns:
        A DataFrame where ``fecha_col`` has been replaced in place by the
        result of ``to_date(fecha_col, "yyyy-MM-dd")``.
    """
    # Honour the caller-supplied column name; the previous version ignored
    # `fecha_col` and always read/wrote the literal "FECHA" column.
    return df.withColumn(fecha_col, F.to_date(F.col(fecha_col), "yyyy-MM-dd"))
from pyspark.sql import functions as F, types as T

def unpivot_via_explode_safe(df, fecha_col="FECHA"):
    """Unpivot a wide frame (one column per well) into long format.

    Every column except the date column and the bookkeeping columns
    (``_rescued_data``, ``rescued_data``, ``ingestion_date``; compared
    case-insensitively) is treated as a well. Each one becomes a row with the
    column name in ``POZO`` and its value, cast to double, in
    ``PRODUCCION_MBD``.

    Args:
        df: Wide input DataFrame.
        fecha_col: Name of the date column. Defaults to ``"FECHA"``.

    Returns:
        A DataFrame with columns ``FECHA``, ``POZO`` and ``PRODUCCION_MBD``.

    Raises:
        ValueError: If ``df`` has no value columns to unpivot.
    """
    def _quoted(name):
        # Backtick-quote the identifier so names containing dots or other
        # special characters are resolved as a single column instead of being
        # parsed as a struct-field path.
        return F.col("`" + name.replace("`", "``") + "`")

    exclude = {fecha_col.lower(), "_rescued_data", "rescued_data", "ingestion_date"}
    value_cols = [c for c in df.columns if c.lower() not in exclude]
    if not value_cols:
        # An empty array cannot be exploded into (POZO, PRODUCCION_MBD)
        # structs; fail with an explicit message instead of an analysis error.
        raise ValueError(
            f"No value columns to unpivot; available columns: {df.columns}"
        )

    # One (POZO, PRODUCCION_MBD) struct per well column, gathered in an array
    # so a single explode yields one output row per well.
    pairs = F.array(*[
        F.struct(
            F.lit(c).alias("POZO"),
            _quoted(c).cast(T.DoubleType()).alias("PRODUCCION_MBD"),
        )
        for c in value_cols
    ])

    return (
        df
        .select(_quoted(fecha_col).alias("FECHA"), F.explode(pairs).alias("s"))
        .select("FECHA", "s.POZO", "s.PRODUCCION_MBD")
    )


def add_basic_flags(df):
    """Attach the row-level quality flags derived from ``PRODUCCION_MBD``.

    ``PRODUCCION_MBD`` is cast to double, then two integer flag columns are
    added:

    * ``FLAG_NULL`` - 1 when the value is null, otherwise 0.
    * ``FLAG_ZERO`` - 1 when the value equals 0, otherwise 0 (a null value
      therefore yields 0).

    Args:
        df: DataFrame containing a ``PRODUCCION_MBD`` column.

    Returns:
        The DataFrame with the cast column and both flags.
    """
    production = F.col("PRODUCCION_MBD")

    typed = df.withColumn("PRODUCCION_MBD", production.cast(T.DoubleType()))
    with_null_flag = typed.withColumn(
        "FLAG_NULL", F.when(production.isNull(), 1).otherwise(0)
    )
    return with_null_flag.withColumn(
        "FLAG_ZERO", F.when(production == 0, 1).otherwise(0)
    )



In [0]:
# Fully qualified name of the Silver Delta table that receives the upserts.
silver_table: str = "workspace.pmx_etl.silver_pemex_prod"
# Checkpoint directory used by the Bronze -> Silver streaming query.
checkpoint_silver: str = "/Volumes/workspace/pmx_etl/checkpoints_silver"
def _flag_iqr_outliers(batch, silver_other, min_fecha, max_fecha,
                       lookback_months=12, iqr_factor=1.5):
    """Add FLAG_OUTLIER_IQR using per-POZO fences Q1 - k*IQR and Q3 + k*IQR.

    The quartiles are estimated from the batch values plus the Silver rows in
    ``silver_other`` dated between ``min_fecha`` minus ``lookback_months`` and
    ``max_fecha``.
    """
    history = silver_other.where(
        (F.col("FECHA") >= F.add_months(F.lit(min_fecha), -lookback_months))
        & (F.col("FECHA") <= F.lit(max_fecha))
    )
    sample = history.select("POZO", "PRODUCCION_MBD").unionByName(
        batch.select("POZO", "PRODUCCION_MBD")
    )
    fences = (
        sample
        .filter(F.col("PRODUCCION_MBD").isNotNull())
        .groupBy("POZO")
        .agg(
            F.expr("percentile_approx(PRODUCCION_MBD, 0.25, 1000)").alias("Q1"),
            F.expr("percentile_approx(PRODUCCION_MBD, 0.75, 1000)").alias("Q3"),
        )
        .withColumn("IQR", F.col("Q3") - F.col("Q1"))
        .withColumn("LOW_B", F.col("Q1") - F.col("IQR") * iqr_factor)
        .withColumn("HIGH_B", F.col("Q3") + F.col("IQR") * iqr_factor)
        .select("POZO", "LOW_B", "HIGH_B")
    )

    value = F.col("PRODUCCION_MBD")
    is_outlier = value.isNotNull() & (
        (value < F.col("LOW_B")) | (value > F.col("HIGH_B"))
    )
    return (
        batch
        .join(fences, on="POZO", how="left")
        .withColumn("FLAG_OUTLIER_IQR", F.when(is_outlier, 1).otherwise(0))
        .drop("LOW_B", "HIGH_B")
    )


def _flag_mom_outliers(batch, combined, max_pct_change=2.0):
    """Add FLAG_OUTLIER_MOM when the change vs. FECHA - 1 month exceeds the limit.

    The previous-month value is looked up in ``combined`` (Silver plus the
    current batch), so months that arrive together in one micro-batch are
    compared against each other as well.
    """
    previous = combined.select(
        "POZO",
        F.col("FECHA").alias("FECHA_PREV"),
        F.col("PRODUCCION_MBD").alias("PROD_PREV"),
    )
    prev_value = F.col("PROD_PREV")
    # Null when there is no previous month or it was zero (division undefined).
    pct_change = F.when(
        prev_value.isNotNull() & (prev_value != 0),
        F.abs(F.col("PRODUCCION_MBD") - prev_value) / prev_value,
    )
    return (
        batch
        .withColumn("FECHA_PREV", F.add_months(F.col("FECHA"), -1))
        .join(previous, on=["POZO", "FECHA_PREV"], how="left")
        .withColumn(
            "FLAG_OUTLIER_MOM",
            F.when(pct_change > F.lit(max_pct_change), 1).otherwise(0),
        )
        .drop("FECHA_PREV", "PROD_PREV")
    )


def _flag_inactive(batch, combined, min_zero_rows=6):
    """Add FLAG_INACTIVO to zero rows that belong to a long run of zeros.

    Runs are built per POZO over ``combined`` ordered by FECHA; a run's length
    is the number of zero rows between two non-zero rows. Rows are counted,
    calendar gaps between them are not checked.
    """
    by_well_in_time = Window.partitionBy("POZO").orderBy("FECHA")
    runs = (
        combined
        .withColumn("IS_ZERO", F.when(F.col("PRODUCCION_MBD") == 0, 1).otherwise(0))
        # Running count of non-zero rows: every zero that follows a non-zero
        # row inherits that row's counter, which identifies the run.
        .withColumn(
            "grp",
            F.sum(F.when(F.col("IS_ZERO") == 0, 1).otherwise(0)).over(by_well_in_time),
        )
    )
    streaks = (
        runs
        .withColumn("STREAK_ZERO", F.sum("IS_ZERO").over(Window.partitionBy("POZO", "grp")))
        .select("POZO", "FECHA", "STREAK_ZERO")
    )
    # The non-zero row that opens a run shares `grp` with the zeros after it,
    # so the flag must also require the row itself to be zero; otherwise a
    # producing month would be marked inactive.
    is_inactive = (F.col("PRODUCCION_MBD") == 0) & (
        F.col("STREAK_ZERO") >= F.lit(min_zero_rows)
    )
    return (
        batch
        .join(streaks, on=["POZO", "FECHA"], how="left")
        .withColumn("FLAG_INACTIVO", F.when(is_inactive, 1).otherwise(0))
        .drop("STREAK_ZERO")
    )


def process_silver(microDF, batchId: int):
    """foreachBatch handler: transform a Bronze micro-batch and upsert it into Silver.

    Steps:
        1. Parse FECHA, unpivot the well columns and add the basic flags.
        2. Create the Silver table if it does not exist yet.
        3. Flag IQR outliers, month-over-month outliers and inactive wells,
           using Silver history together with the rows of this batch.
        4. Add ANIO / MES / INGESTION_TS and MERGE into ``silver_table`` on
           (FECHA, POZO).

    Only the rows of the current batch are written; flags of rows already in
    Silver are not recomputed.

    Args:
        microDF: Micro-batch DataFrame read from the Bronze table.
        batchId: Micro-batch identifier supplied by Structured Streaming
            (unused).

    Returns:
        None.
    """
    # Use the session that owns the micro-batch: the temp view below is
    # registered through `out`, so the MERGE must run in that same session.
    session = microDF.sparkSession

    readings = unpivot_via_explode_safe(
        normalize_fecha_col_batch(microDF, "FECHA"), "FECHA"
    )
    # Collapse duplicate (FECHA, POZO) rows deterministically. Duplicate keys
    # would make the MERGE fail (several source rows matching one target row)
    # or insert duplicate keys on first load. Identical duplicates are
    # unaffected; for conflicting values the largest non-null one is kept.
    # NOTE(review): confirm this tie-break is the desired policy.
    readings = readings.groupBy("FECHA", "POZO").agg(
        F.max("PRODUCCION_MBD").alias("PRODUCCION_MBD")
    )
    batch = add_basic_flags(readings).withColumn("DATA_SOURCE", F.lit("pemex_prod"))

    session.sql(f"""
    CREATE TABLE IF NOT EXISTS {silver_table} (
      FECHA DATE, ANIO INT, MES INT,
      POZO STRING, PRODUCCION_MBD DOUBLE,
      FLAG_NULL INT, FLAG_ZERO INT,
      FLAG_OUTLIER_IQR INT, FLAG_OUTLIER_MOM INT, FLAG_INACTIVO INT,
      DATA_SOURCE STRING, INGESTION_TS TIMESTAMP
    ) USING DELTA PARTITIONED BY (ANIO, MES)
    """)

    bounds = batch.agg(
        F.min("FECHA").alias("minf"),
        F.max("FECHA").alias("maxf"),
        F.count(F.lit(1)).alias("n"),
    ).collect()[0]
    if bounds["n"] == 0:
        # Nothing to upsert; skip the history scans and the MERGE.
        return

    silver = session.table(silver_table).select("POZO", "FECHA", "PRODUCCION_MBD")
    # Silver rows whose key is NOT being rewritten by this batch. The batch is
    # authoritative for its own keys, so stale Silver values never compete
    # with (or get double-counted alongside) the incoming ones.
    silver_other = silver.join(
        batch.select("POZO", "FECHA"), on=["POZO", "FECHA"], how="left_anti"
    )
    combined = (
        silver_other
        .unionByName(batch.select("POZO", "FECHA", "PRODUCCION_MBD"))
        # Safety net in case Silver already holds duplicate keys.
        .dropDuplicates(["POZO", "FECHA"])
    )

    flagged = _flag_iqr_outliers(batch, silver_other, bounds["minf"], bounds["maxf"])
    flagged = _flag_mom_outliers(flagged, combined)
    flagged = _flag_inactive(flagged, combined)

    # Control fields and partition columns.
    out = (
        flagged
        .withColumn("ANIO", F.year("FECHA"))
        .withColumn("MES", F.month("FECHA"))
        .withColumn("INGESTION_TS", F.current_timestamp())
        .select("FECHA", "ANIO", "MES", "POZO", "PRODUCCION_MBD",
                "FLAG_NULL", "FLAG_ZERO", "FLAG_OUTLIER_IQR", "FLAG_OUTLIER_MOM",
                "FLAG_INACTIVO", "DATA_SOURCE", "INGESTION_TS")
    )

    # MERGE (upsert) into Silver.
    out.createOrReplaceTempView("silver_upserts")
    session.sql(f"""
    MERGE INTO {silver_table} AS t
    USING silver_upserts AS s
    ON t.FECHA = s.FECHA AND t.POZO = s.POZO
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
    """)
    return

# Stream from BRONZE into SILVER; each micro-batch is handled by process_silver.
# The Bronze table is referenced with its catalog, like the Silver table, so
# the read does not depend on the session's current catalog.
bronze_stream2silver = (
    spark.readStream.table("workspace.pmx_etl.bronze_pmx_prod")
    .writeStream
    .foreachBatch(process_silver)
    .option("checkpointLocation", checkpoint_silver)
    .trigger(availableNow=True)
    .start()
)
# availableNow processes the pending data and then stops. Wait for it here so
# later cells see the written rows and any batch failure is raised in this cell.
bronze_stream2silver.awaitTermination()



In [0]:
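# Manual reset (destructive): uncomment to delete the checkpoint directory and truncate the Bronze table.
#dbutils.fs.rm("/Volumes/workspace/pmx_etl/checkpoints_silver", recurse=True)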
#spark.sql("""
#          TRUNCATE TABLE workspace.pmx_etl.bronze_pmx_prod
#          """)