In [0]:
# Databricks notebook source
# MAGIC %md
# MAGIC # SinIntermediarios — Build Gold (Optimal Pricing)
# MAGIC
# MAGIC **Purpose**
# MAGIC - Read daily snapshot: `workspace.sinintermediarios.silver_dashboard_latest`
# MAGIC - Compute unit price metric (only for rows with `price_cop >= 1000` and `gramos_empaque_int >= 1`; otherwise null):
# MAGIC   - `unit_price_cop_per_g = price_cop / gramos_empaque_int`
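# MAGIC - For each `producto_homologado`, compute competitor stats over all sites other than SinIntermediarios:
# MAGIC   - `competitor_count`, min / avg / max / stddev of competitor `unit_price_cop_per_g`, and the brand (`marca`) of the cheapest competitor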
# MAGIC - Apply pricing rules to SinIntermediarios rows only (when a product has no competitors, every rule falls back to the product's own unit price):
# MAGIC   1) **Match the lowest competitor unit price** (min)
# MAGIC   2) **Match the average competitor unit price** (avg)
# MAGIC   3) **Match the lowest competitor unit price, but go 10% below it when the cheapest competitor's brand is Vitanas**
# MAGIC - Suggested FULL prices (COP) are computed as unit price × `gramos_empaque_int` and rounded to the nearest value **ending in 900**
# MAGIC - Write:
# MAGIC   - `GOLD_DASHBOARD_TABLE` (overwrite snapshot for dashboard)
# MAGIC   - `GOLD_HISTORY_TABLE` (MERGE/upsert by (scrape_date, url))

# COMMAND ----------

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# COMMAND ----------

# 1) CONFIG

# Input: silver snapshot read in step 3.
SILVER_DASHBOARD_TABLE = "workspace.sinintermediarios.silver_dashboard_latest"

# Outputs: dashboard snapshot (overwritten every run) and history table
# (upserted by (scrape_date, url)).
GOLD_DASHBOARD_TABLE = "workspace.sinintermediarios.gold_optimal_pricing_latest"
GOLD_HISTORY_TABLE   = "workspace.sinintermediarios.gold_optimal_pricing_history"

# Value of the `site` column that marks our own listings; every other site is
# treated as a competitor.
SIN_SITE_CANON = "sinintermediarios"
# Column used to group comparable products across sites.
GROUP_COL = "producto_homologado"

# Unit price definition: unit_price_cop_per_g = PRICE_COL / GRAMS_INT_COL
PRICE_COL = "price_cop"               # full listing price in COP (cast to long in step 3)
GRAMS_COL = "gramos_empaque"          # package grams as delivered by the silver table
GRAMS_INT_COL = "gramos_empaque_int"  # integer copy of GRAMS_COL, derived in step 3

# Quality gates, applied to BOTH competitor and SinIntermediarios rows:
# `price_cop` must be "in the thousands" (>= 1000 COP) and the package must
# weigh at least 1 g. Rows failing either gate get a null unit price and are
# excluded from competitor stats and from the gold output.
MIN_VALID_PRICE = 1000
MIN_VALID_GRAMS = 1

# COMMAND ----------

# 2) HELPERS

def table_exists(full_name: str) -> bool:
    """
    Return True if `full_name` resolves to an existing table, False if it does not.

    Only a "table not found" error is interpreted as absence. Any other failure
    (permissions, catalog/cluster problems, malformed name, ...) is re-raised:
    the caller creates the history table when this returns False, so treating
    an unrelated error as "missing" could make it (re)create a table that
    already holds data.

    Relies on the notebook-global `spark` session.
    """
    # Error markers for a missing table across Spark/Databricks versions
    # (error-class style and legacy message style).
    not_found_markers = (
        "TABLE_OR_VIEW_NOT_FOUND",
        "Table or view not found",
        "NoSuchTableException",
    )
    try:
        spark.sql(f"DESCRIBE TABLE {full_name}")
    except Exception as err:
        message = str(err)
        if any(marker in message for marker in not_found_markers):
            return False
        raise
    return True

def round_to_ending_900(x_col):
    """
    Round a numeric Column to the nearest value ending in 900 (900, 1900, 2900, ...).

    Formula: round((x - 900) / 1000) * 1000 + 900, using Spark's `round`
    (HALF_UP). The result is then floored at 900. Without the floor, raw
    values below 400 would round to -100, -1100, ..., which are not valid
    prices.

    Examples:
      12345  -> 11900   (445 above 11900, 555 below 12900)
      12910  -> 12900
      12490  -> 12900
      237600 -> 237900
      150    -> 900     (floor; the bare formula would give -100)

    Null input yields null. Returns a Column cast to long.
    """
    rounded = (
        F.round((x_col.cast("double") - F.lit(900.0)) / F.lit(1000.0)) * F.lit(1000.0)
        + F.lit(900.0)
    ).cast("long")
    # Use `when` rather than `greatest`: greatest() skips nulls and would turn
    # a null input into 900, whereas here a null comparison falls through to
    # `otherwise` and stays null.
    return F.when(rounded < F.lit(900), F.lit(900).cast("long")).otherwise(rounded)

# COMMAND ----------

# 3) LOAD SILVER SNAPSHOT
# Read the silver snapshot, fail fast on missing columns, normalise types and
# derive the per-gram unit price used by every pricing rule below.

df0 = spark.table(SILVER_DASHBOARD_TABLE)

required = ["site", "url", "scrape_date", PRICE_COL, GRAMS_COL, GROUP_COL, "marca"]
_available_cols = set(df0.columns)
missing = [name for name in required if name not in _available_cols]
if missing:
    raise ValueError(f"Missing required columns in {SILVER_DASHBOARD_TABLE}: {missing}")

# Integer grams go into a new column; the price column is replaced by its long cast.
df = df0.withColumn(GRAMS_INT_COL, F.col(GRAMS_COL).cast("int"))
df = df.withColumn(PRICE_COL, F.col(PRICE_COL).cast("long"))

# Unit price (COP per gram) only for rows passing both quality gates; null otherwise.
_price = F.col(PRICE_COL)
_grams = F.col(GRAMS_INT_COL)
_passes_gates = (_price >= F.lit(MIN_VALID_PRICE)) & (_grams >= F.lit(MIN_VALID_GRAMS))
_unit_price_expr = F.when(
    _passes_gates,
    _price.cast("double") / _grams.cast("double"),
).otherwise(F.lit(None).cast("double"))

df = df.withColumn("unit_price_cop_per_g", _unit_price_expr)

# COMMAND ----------

# 4) COMPETITOR STATS PER producto_homologado (exclude sinintermediarios)
# Competitor rows: any site other than SinIntermediarios, inside a product
# group, passing the price/grams gates and carrying a positive unit price.
_unit = F.col("unit_price_cop_per_g")

competitors = df.filter(
    (F.col("site") != F.lit(SIN_SITE_CANON))
    & F.col(GROUP_COL).isNotNull()
    & (F.col(PRICE_COL) >= F.lit(MIN_VALID_PRICE))
    & (F.col(GRAMS_INT_COL) >= F.lit(MIN_VALID_GRAMS))
    & _unit.isNotNull()
    & (_unit > 0)
)

# Distribution of competitor unit prices (COP per gram) for each product group.
comp_stats = competitors.groupBy(GROUP_COL).agg(
    F.count(F.lit(1)).alias("competitor_count"),
    F.min(_unit).alias("comp_min_unit_price_per_g"),
    F.avg(_unit).alias("comp_avg_unit_price_per_g"),
    F.max(_unit).alias("comp_max_unit_price_per_g"),
    F.stddev(_unit).alias("comp_stddev_unit_price_per_g"),
)

# Cheapest competitor per group (ties broken by lower full price): keep its brand.
wmin = Window.partitionBy(GROUP_COL).orderBy(_unit.asc(), F.col(PRICE_COL).asc())

comp_min_row = (
    competitors
    .withColumn("_rn", F.row_number().over(wmin))
    .where(F.col("_rn") == 1)
    .select(
        F.col(GROUP_COL),
        F.col("marca").alias("comp_min_brand"),
        _unit.alias("comp_min_unit_price_per_g_row"),  # debug copy of the min unit price
    )
)

# COMMAND ----------

# 5) BUILD GOLD ROWS (SinIntermediarios only) + APPLY RULES
# Own listings that pass the quality gates, enriched (left join) with the
# competitor stats and the cheapest competitor's brand for the same group.
sin = (
    df
    .filter(
        (F.col("site") == F.lit(SIN_SITE_CANON))
        & F.col(GROUP_COL).isNotNull()
        & (F.col(PRICE_COL) >= F.lit(MIN_VALID_PRICE))
        & (F.col(GRAMS_INT_COL) >= F.lit(MIN_VALID_GRAMS))
    )
    .join(comp_stats, on=[GROUP_COL], how="left")
    .join(comp_min_row, on=[GROUP_COL], how="left")
)

# Groups without competitors come out of the left join with a null count.
sin = sin.withColumn("competitor_count", F.coalesce(F.col("competitor_count"), F.lit(0)))
has_comp = F.col("competitor_count") >= F.lit(1)

# Brand comparison is case- and whitespace-insensitive.
min_brand_is_vitanas = F.lower(F.trim(F.col("comp_min_brand"))) == F.lit("vitanas")

_own_unit = F.col("unit_price_cop_per_g")
_comp_min = F.col("comp_min_unit_price_per_g")
_comp_avg = F.col("comp_avg_unit_price_per_g")
_grams_dbl = F.col(GRAMS_INT_COL).cast("double")
_current_price = F.col(PRICE_COL)


def _pct_of_current(delta_col_name):
    """Relative change vs the current full price (null when the price is not positive)."""
    return F.when(
        _current_price > 0,
        F.col(delta_col_name) / _current_price.cast("double"),
    ).otherwise(F.lit(None))


# Derived columns, applied strictly in this order. Later entries read earlier
# ones by name, and the order also fixes the output column order.
#   Rule 1: competitor min unit price, else own unit price.
#   Rule 2: competitor avg unit price, else own unit price.
#   Rule 3: like rule 1, but 10% below the min when the cheapest brand is Vitanas.
# Full suggested prices are unit price * grams, rounded to end in 900.
_derived_columns = [
    ("optimal_unit_price_rule1_min_per_g",
     F.when(has_comp, _comp_min).otherwise(_own_unit)),
    ("optimal_unit_price_rule2_avg_per_g",
     F.when(has_comp, _comp_avg).otherwise(_own_unit)),
    ("optimal_unit_price_rule3_min_vitanas_10pct_per_g",
     F.when(has_comp & min_brand_is_vitanas, _comp_min * F.lit(0.90))
      .when(has_comp, _comp_min)
      .otherwise(_own_unit)),
    ("suggested_price_rule3_vitanas10_cop_raw",
     F.col("optimal_unit_price_rule3_min_vitanas_10pct_per_g") * _grams_dbl),
    ("suggested_price_rule3_vitanas10_cop",
     round_to_ending_900(F.col("suggested_price_rule3_vitanas10_cop_raw"))),
    ("delta_price_rule3_cop",
     F.col("suggested_price_rule3_vitanas10_cop") - _current_price),
    ("pct_delta_price_rule3",
     _pct_of_current("delta_price_rule3_cop")),
    ("suggested_price_rule1_min_cop_raw",
     F.col("optimal_unit_price_rule1_min_per_g") * _grams_dbl),
    ("suggested_price_rule2_avg_cop_raw",
     F.col("optimal_unit_price_rule2_avg_per_g") * _grams_dbl),
    ("suggested_price_rule1_min_cop",
     round_to_ending_900(F.col("suggested_price_rule1_min_cop_raw"))),
    ("suggested_price_rule2_avg_cop",
     round_to_ending_900(F.col("suggested_price_rule2_avg_cop_raw"))),
    ("delta_price_rule1_cop",
     F.col("suggested_price_rule1_min_cop") - _current_price),
    ("delta_price_rule2_cop",
     F.col("suggested_price_rule2_avg_cop") - _current_price),
    ("delta_unit_rule1_per_g",
     F.col("optimal_unit_price_rule1_min_per_g") - _own_unit),
    ("delta_unit_rule2_per_g",
     F.col("optimal_unit_price_rule2_avg_per_g") - _own_unit),
    ("pct_delta_price_rule1",
     _pct_of_current("delta_price_rule1_cop")),
    ("pct_delta_price_rule2",
     _pct_of_current("delta_price_rule2_cop")),
]

for _col_name, _col_expr in _derived_columns:
    sin = sin.withColumn(_col_name, _col_expr)

# COMMAND ----------

# 6) SELECT + DEDUPE FOR HISTORY
gold_df = sin.withColumn("gold_run_ts", F.current_timestamp())

# History is keyed by (scrape_date, url), and a Delta MERGE fails when several
# source rows match the same target row, so keep exactly one row per key.
# Prefer the latest `scraped_at` when that column exists. The tie-breakers
# make the choice deterministic across reruns: current_timestamp() is
# constant within a query, so on its own it does not order anything.
order_col = F.col("scraped_at").desc() if "scraped_at" in gold_df.columns else F.current_timestamp().desc()
tie_breakers = [
    F.col(GROUP_COL).asc_nulls_last(),
    F.col(PRICE_COL).asc(),
    F.col(GRAMS_INT_COL).asc(),
]
w = Window.partitionBy("scrape_date", "url").orderBy(order_col, *tie_breakers)

gold_hist = (
    gold_df
    .withColumn("_rn", F.row_number().over(w))
    .filter(F.col("_rn") == 1)
    .drop("_rn")
)

display(
    gold_df
    .select(
        "scrape_date", "site", "url", GROUP_COL,
        PRICE_COL, GRAMS_INT_COL, "unit_price_cop_per_g",
        "competitor_count", "comp_min_unit_price_per_g", "comp_avg_unit_price_per_g",
        "optimal_unit_price_rule1_min_per_g", "optimal_unit_price_rule2_avg_per_g",
        "suggested_price_rule1_min_cop", "suggested_price_rule2_avg_cop",
        "delta_price_rule1_cop", "delta_price_rule2_cop",
        "pct_delta_price_rule1", "pct_delta_price_rule2",
    )
    .orderBy(F.col("scrape_date").desc(), F.col(GROUP_COL).asc_nulls_last())
    .limit(200)
)

# COMMAND ----------

# 7) WRITE GOLD TABLES

print("GOLD_DASHBOARD_TABLE =", GOLD_DASHBOARD_TABLE)
print("GOLD_HISTORY_TABLE   =", GOLD_HISTORY_TABLE)

# 7a) Snapshot overwrite for dashboard (schema may change between runs).
(
    gold_df
    .write.format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable(GOLD_DASHBOARD_TABLE)
)
print("Wrote gold dashboard snapshot:", GOLD_DASHBOARD_TABLE)

# 7b) History: create if missing, else schema-evolving merge
if not table_exists(GOLD_HISTORY_TABLE):
    # "errorifexists" instead of "overwrite": if the existence check was ever
    # wrong, fail loudly rather than silently replace the accumulated history.
    (
        gold_hist
        .write.format("delta")
        .mode("errorifexists")
        .partitionBy("scrape_date")
        .saveAsTable(GOLD_HISTORY_TABLE)
    )
    print("Created gold history table:", GOLD_HISTORY_TABLE)
else:
    tgt = spark.table(GOLD_HISTORY_TABLE)
    tgt_cols = tgt.columns

    # Schema evolution: add any source columns the target does not have yet.
    src_schema = {f.name: f.dataType.simpleString() for f in gold_hist.schema.fields}
    new_cols = [c for c in src_schema.keys() if c not in tgt_cols]

    if new_cols:
        add_cols_ddl = ", ".join([f"`{c}` {src_schema[c]}" for c in new_cols])
        spark.sql(f"ALTER TABLE {GOLD_HISTORY_TABLE} ADD COLUMNS ({add_cols_ddl})")
        tgt = spark.table(GOLD_HISTORY_TABLE)
        tgt_cols = tgt.columns

    # Align the source to the target column list: null-fill target-only
    # columns (typed as in the target) and project in target order.
    src_aligned = gold_hist
    tgt_schema = tgt.schema

    for c in tgt_cols:
        if c not in src_aligned.columns:
            src_aligned = src_aligned.withColumn(c, F.lit(None).cast(tgt_schema[c].dataType))

    src_aligned = src_aligned.select(*tgt_cols)
    src_aligned.createOrReplaceTempView("src_aligned")

    update_set = ",\n      ".join([f"t.`{c}` = s.`{c}`" for c in tgt_cols if c not in ["scrape_date", "url"]])
    insert_cols = ", ".join([f"`{c}`" for c in tgt_cols])
    insert_vals = ", ".join([f"s.`{c}`" for c in tgt_cols])

    # Null-safe key comparison (<=>): with plain `=` a row whose url or
    # scrape_date is null never matches and would be re-inserted every run.
    spark.sql(f"""
      MERGE INTO {GOLD_HISTORY_TABLE} AS t
      USING src_aligned AS s
      ON t.scrape_date <=> s.scrape_date AND t.url <=> s.url
      WHEN MATCHED THEN UPDATE SET
        {update_set}
      WHEN NOT MATCHED THEN INSERT ({insert_cols})
      VALUES ({insert_vals})
    """)
    print("Upserted gold history table:", GOLD_HISTORY_TABLE)

display(
    spark.table(GOLD_HISTORY_TABLE)
    .orderBy(F.col("scrape_date").desc(), F.col(GROUP_COL).asc_nulls_last())
    .limit(200)
)

# COMMAND ----------

# 8) KPI QUICK CHECKS
# One row per scrape_date: how many SinIntermediarios rows exist, how many have
# competitors, and how rules 1/2 would move prices.


def _count_where(condition):
    """Number of rows in the group for which `condition` is true."""
    return F.sum(F.when(condition, 1).otherwise(0))


_dashboard = spark.table(GOLD_DASHBOARD_TABLE)
_rule1_delta = F.col("delta_price_rule1_cop")

kpi = _dashboard.groupBy("scrape_date").agg(
    F.count("*").alias("sin_rows"),
    _count_where(F.col("competitor_count") >= 1).alias("rows_with_competitors"),
    _count_where(_rule1_delta > 0).alias("rule1_increase_cnt"),
    _count_where(_rule1_delta < 0).alias("rule1_decrease_cnt"),
    F.avg("pct_delta_price_rule1").alias("rule1_avg_pct_delta"),
    F.avg("pct_delta_price_rule2").alias("rule2_avg_pct_delta"),
)

_latest_first = kpi.orderBy(F.col("scrape_date").desc())
display(_latest_first.limit(30))
