In [0]:
# Load the three Bronze-layer tables (sales facts plus the product and store
# dimensions) as DataFrames. The tables are read in the order listed.
bronze_sales_df, bronze_product_df, bronze_store_df = (
    spark.read.table(bronze_table_name)
    for bronze_table_name in (
        "bronze_sales_transactions",
        "bronze_product_master",
        "bronze_store_region",
    )
)


In [0]:
# Bronze-layer inputs: sales transactions, product master, store/region.
# NOTE(review): this cell performs the same three reads as the cell before it;
# running it simply rebinds the same three names.
bronze_sales_df, bronze_product_df, bronze_store_df = map(
    spark.read.table,
    (
        "bronze_sales_transactions",
        "bronze_product_master",
        "bronze_store_region",
    ),
)


In [0]:
# Import Window specification for deduplication
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, col

# One window partition per transaction_id, ordered newest last_updated_ts first.
# Rows that tie on last_updated_ts have no further ordering key here, so which
# of them receives row number 1 is not fixed by this code.
dedup_window = (
    Window
    .partitionBy("transaction_id")
    .orderBy(col("last_updated_ts").desc())
)

# Number the rows inside each partition, keep only row number 1 (the most
# recent record per transaction), then discard the helper column.
silver_sales_dedup_df = (
    bronze_sales_df
    .withColumn("rn", row_number().over(dedup_window))
    .where(col("rn") == 1)
    .drop("rn")
)


In [0]:
from pyspark.sql.functions import coalesce, lit
# Arithmetic on a null operand yields null, so substitute a neutral value for
# missing quantity (0) and missing discount (0.0) before any amounts are derived.
quantity_or_zero = coalesce(col("quantity"), lit(0))
discount_or_zero = coalesce(col("discount"), lit(0.0))

silver_sales_clean_df = (
    silver_sales_dedup_df
    .withColumn("quantity", quantity_or_zero)
    .withColumn("discount", discount_or_zero)
)


In [0]:
from pyspark.sql.functions import round
# Recompute the transaction total from its components:
#   correct_total_amount = quantity * unit_price - discount, rounded to 2 decimals.
# `round` here is the pyspark.sql.functions version imported in this cell,
# not the Python built-in.
gross_amount = col("quantity") * col("unit_price")
net_amount = gross_amount - col("discount")

silver_sales_calibrated_df = silver_sales_clean_df.withColumn(
    "correct_total_amount", round(net_amount, 2)
)


In [0]:
# Distinct reference keys used to validate the sales rows.
# NOTE(review): the keys come from every Bronze row, whereas the Silver
# dimension tables written later keep only is_active rows — confirm that
# sales pointing at inactive stores/products should still count as valid.
valid_store_df = (
    bronze_store_df
    .select(col("store_id"))
    .distinct()
)
valid_product_df = (
    bronze_product_df
    .select(col("product_id"))
    .distinct()
)


In [0]:
# Validate foreign keys with joins rather than collecting key lists to the driver.
# Each lookup carries a constant True marker; after a left join the marker is
# null for any sales row whose key has no match in the lookup.
store_lookup_df = valid_store_df.withColumn("valid_store", lit(True))
product_lookup_df = valid_product_df.withColumn("valid_product", lit(True))

sales_with_store_flag_df = silver_sales_calibrated_df.join(
    store_lookup_df, "store_id", "left"
)

sales_with_product_flag_df = sales_with_store_flag_df.join(
    product_lookup_df, "product_id", "left"
)


In [0]:
from pyspark.sql.functions import when
# Add validation flags.
# Every flag is forced to a strict True/False. The upstream cells null-fill
# quantity and discount but not unit_price, and a comparison against a null
# column evaluates to null. A null flag makes both the "valid" filter and the
# negated "quarantine" filter skip the row, so the record would disappear
# from the pipeline instead of being quarantined. Coalescing to False sends
# such rows to quarantine.
silver_sales_validated_df = (
    sales_with_product_flag_df
    .withColumn("valid_quantity", coalesce(col("quantity") > 0, lit(False)))
    .withColumn("valid_unit_price", coalesce(col("unit_price") > 0, lit(False)))
    # valid_store / valid_product are null when the left join found no match.
    .withColumn("valid_store", coalesce(col("valid_store"), lit(False)))
    .withColumn("valid_product", coalesce(col("valid_product"), lit(False)))
)


In [0]:
# A record is valid only when every validation flag is True.
# The combined predicate is built once and coalesced to False: AND over a null
# flag can yield null, and filter() keeps only rows where the predicate is
# True, so a null result would be dropped by the predicate AND by its
# negation. With the coalesce the two DataFrames below are exact complements
# and no row is lost.
is_valid_record = coalesce(
    col("valid_quantity")
    & col("valid_unit_price")
    & col("valid_store")
    & col("valid_product"),
    lit(False),
)

# Valid business records
silver_sales_valid_df = silver_sales_validated_df.filter(is_valid_record)

# Invalid records moved to quarantine
silver_sales_quarantine_df = silver_sales_validated_df.filter(~is_valid_record)


In [0]:
# Keep the rejected rows in a Delta table for audit and debugging.
# Append mode: each execution adds the current quarantine rows to whatever
# the table already holds.
(
    silver_sales_quarantine_df.write
    .mode("append")
    .format("delta")
    .saveAsTable("silver_sales_quarantine")
)


In [0]:
from delta.tables import DeltaTable
# Publish the valid sales rows to the Silver table.
if not spark.catalog.tableExists("silver_sales_transactions"):
    # First run: the table does not exist yet, so create it from the valid rows.
    (
        silver_sales_valid_df.write
        .mode("overwrite")
        .format("delta")
        .saveAsTable("silver_sales_transactions")
    )
else:
    # Later runs: upsert keyed on transaction_id — matched rows are updated
    # with all source columns, unmatched source rows are inserted.
    silver_table = DeltaTable.forName(spark, "silver_sales_transactions")

    sales_upsert = silver_table.alias("t").merge(
        silver_sales_valid_df.alias("s"),
        "t.transaction_id = s.transaction_id",
    )
    sales_upsert.whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()


In [0]:
# Silver product master: retain only the rows whose is_active equals True.
# (`== True` builds a Spark column expression; it is not a Python truth test.)
silver_product_df = bronze_product_df.where(col("is_active") == True)

# Replace the table contents with the current set of active products.
(
    silver_product_df.write
    .mode("overwrite")
    .format("delta")
    .saveAsTable("silver_product_master")
)


In [0]:
# Silver store/region dimension: retain only the rows whose is_active equals True.
# (`== True` builds a Spark column expression; it is not a Python truth test.)
silver_store_df = bronze_store_df.where(col("is_active") == True)

# Replace the table contents with the current set of active stores.
(
    silver_store_df.write
    .mode("overwrite")
    .format("delta")
    .saveAsTable("silver_store_region")
)
