In [0]:
# ---------------------------------------------------------
# Silver layer configuration
#
# Fully qualified table names used by the cells below:
#   SOURCE_TABLE - table the events are read from
#   SILVER_TABLE - table the cleaned, keyed events are merged into
# ---------------------------------------------------------
SILVER_TABLE = "workspace.default.silver_events"
SOURCE_TABLE = "workspace.default.bronze_events"


In [0]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from delta.tables import DeltaTable


In [0]:
# Load the source table and show a quick profile: row count, column names,
# and a ten-row preview.
df_bronze = spark.table(SOURCE_TABLE)

bronze_row_count = df_bronze.count()
bronze_column_names = df_bronze.columns
print("Bronze rows:", bronze_row_count)
print("Bronze columns:", bronze_column_names)

bronze_preview = df_bronze.limit(10)
display(bronze_preview)


In [0]:
# Bootstrap step: when the Silver table is missing, create it empty.
# Its schema is the source schema with event_time converted to a timestamp,
# plus the derived event_date and event_key columns.
silver_exists = spark.catalog.tableExists(SILVER_TABLE)

if silver_exists:
    print(f"✅ Silver table already exists: {SILVER_TABLE}")
else:
    # limit(0) keeps the schema but no rows.
    df_empty = df_bronze.limit(0)
    df_empty = df_empty.withColumn("event_time", F.to_timestamp("event_time"))
    df_empty = df_empty.withColumn("event_date", F.to_date("event_time"))
    df_empty = df_empty.withColumn("event_key", F.lit(None).cast("string"))

    silver_writer = df_empty.write.format("delta").mode("overwrite")
    silver_writer.saveAsTable(SILVER_TABLE)

    print(f"✅ Created Silver table: {SILVER_TABLE}")


In [0]:
# Incremental load: use the greatest event_time already stored in Silver as a
# watermark and keep only source rows that are strictly newer.
df_silver_existing = spark.table(SILVER_TABLE)

# A global aggregate always yields exactly one row; max_t is None when Silver
# holds no non-null event_time.
max_silver_time = df_silver_existing.agg(F.max("event_time").alias("max_t")).first()["max_t"]
print("Max event_time in Silver:", max_silver_time)

# Type the source rows the same way the Silver table is typed.
df_bronze_typed = df_bronze.withColumn("event_time", F.to_timestamp("event_time"))
df_bronze_typed = df_bronze_typed.withColumn("event_date", F.to_date("event_time"))

# NOTE(review): the comparison is strict, so source rows whose event_time is
# equal to or older than the watermark are never picked up by this filter.
df_new = (
    df_bronze_typed
    if max_silver_time is None
    else df_bronze_typed.filter(F.col("event_time") > F.lit(max_silver_time))
)

new_row_count = df_new.count()
print("New Bronze rows to process:", new_row_count)
display(df_new.orderBy("event_time").limit(20))


In [0]:
# Clean the incoming batch. Every rule is applied only when its column exists,
# so the cell tolerates source schemas that lack some of these columns.
cols = df_new.columns

# Numeric columns and the Spark SQL type each one is cast to.
numeric_casts = {"user_id": "long", "product_id": "long", "price": "double"}

df_clean = df_new

# Normalise event_type: strip surrounding whitespace and lower-case.
if "event_type" in cols:
    df_clean = df_clean.withColumn("event_type", F.lower(F.trim(F.col("event_type"))))

for col_name, target_type in numeric_casts.items():
    if col_name in cols:
        df_clean = df_clean.withColumn(col_name, F.col(col_name).cast(target_type))

# Normalise brand: NULL or blank becomes "unknown", anything else is trimmed
# and lower-cased.
if "brand" in cols:
    brand_trimmed = F.trim(F.col("brand"))
    df_clean = df_clean.withColumn(
        "brand",
        F.when(F.col("brand").isNull() | (brand_trimmed == ""), F.lit("unknown"))
         .otherwise(F.lower(brand_trimmed))
    )

# Drop rows with no event_time (invalid)
df_clean = df_clean.filter(F.col("event_time").isNotNull())

# Preview last, so it reflects every cleaning rule above (it previously ran
# before the brand normalisation and showed a stale frame).
display(df_clean.limit(10))



In [0]:
# Build a deterministic event_key from whichever identifying columns exist,
# then keep one row per key within the incoming batch.
key_candidates = ["event_time", "event_type", "user_id", "product_id", "session_id", "order_id"]
key_cols = [c for c in key_candidates if c in df_clean.columns]

if not key_cols:
    raise ValueError("No suitable columns found to build event_key. Check bronze schema.")

# concat_ws silently skips NULL inputs, so without a placeholder two rows such
# as (user_id=1, product_id=NULL, session_id=2) and
# (user_id=1, product_id=2, session_id=NULL) would both hash "1||2" and be
# treated as the same event. Substituting a sentinel keeps every key column in
# its own position.
# NOTE(review): keys are unchanged for rows with no NULL key columns; rows that
# do contain NULLs hash differently than before this fix. Confirm whether
# existing Silver rows need to be re-keyed.
NULL_KEY_TOKEN = "__NULL__"
key_parts = [
    F.coalesce(F.col(c).cast("string"), F.lit(NULL_KEY_TOKEN))
    for c in key_cols
]

df_keyed = df_clean.withColumn(
    "event_key",
    F.sha2(F.concat_ws("||", *key_parts), 256)
)

# Deduplicate within the incoming batch: row_number() == 1 keeps exactly one
# row per event_key. When event_time is part of the key, rows sharing a key
# also share event_time, so the ordering does not single out a specific
# duplicate.
w = Window.partitionBy("event_key").orderBy(F.col("event_time").desc())
df_dedup = (
    df_keyed
    .withColumn("rn", F.row_number().over(w))
    .filter(F.col("rn") == 1)
    .drop("rn")
)

print("After dedup (incoming batch):", df_dedup.count())
display(df_dedup.limit(20))


In [0]:
# Schema guard: make sure the Silver table has the derived columns that the
# MERGE writes. SILVER_TABLE is taken from the config cell; it is deliberately
# not re-assigned here, so a change to the config cannot be silently overridden
# halfway through the notebook.

# 1) Check current silver schema
df_t = spark.table(SILVER_TABLE)
print("Silver columns:", df_t.columns)

# 2) Add missing columns safely (only if not exists)
required_columns = {"event_date": "DATE", "event_key": "STRING"}
missing = [
    f"{col_name} {sql_type}"
    for col_name, sql_type in required_columns.items()
    if col_name not in df_t.columns
]

if missing:
    spark.sql(f"ALTER TABLE {SILVER_TABLE} ADD COLUMNS ({', '.join(missing)})")
    print("✅ Added missing columns:", missing)
else:
    print("✅ Silver already has event_date + event_key")


In [0]:
from delta.tables import DeltaTable

# Upsert the deduplicated batch into Silver, matching rows on event_key:
# matched rows are overwritten with the incoming values, unmatched rows are
# inserted.
silver_dt = DeltaTable.forName(spark, SILVER_TABLE)

# Probe for a single row instead of counting the whole batch: only emptiness
# matters here, and a full count() is more work than needed to learn that.
batch_is_empty = len(df_dedup.take(1)) == 0

# ✅ If there is no new data, skip merge (prevents unnecessary job failures)
if batch_is_empty:
    print("✅ No new rows to MERGE into Silver (incoming batch is empty). Skipping.")
else:
    all_cols = df_dedup.columns
    # Every target column takes its value from the same-named source column.
    update_map = {c: f"s.{c}" for c in all_cols}
    insert_map = dict(update_map)

    (
        silver_dt.alias("t")
        .merge(
            df_dedup.alias("s"),
            "t.event_key = s.event_key"
        )
        .whenMatchedUpdate(set=update_map)
        .whenNotMatchedInsert(values=insert_map)
        .execute()
    )

    print("✅ Silver MERGE completed.")


In [0]:
# List the Silver table's current column names.
silver_table_df = spark.table(SILVER_TABLE)
print(silver_table_df.columns)


In [0]:
# Latest event_time present in the source table. The table name comes from
# the config constant rather than a second hard-coded copy, so this check
# always inspects the table the pipeline actually read.
bronze = spark.table(SOURCE_TABLE)
display(bronze.selectExpr("max(to_timestamp(event_time)) as max_bronze_time"))


In [0]:
# Latest event_time present in the Silver table. The table name comes from
# the config constant rather than a second hard-coded copy, so this check
# always inspects the table the pipeline actually wrote.
silver = spark.table(SILVER_TABLE)
display(silver.selectExpr("max(event_time) as max_silver_time"))
