In [0]:
%sql
-- Make sure the silver schema exists before any silver table is written.
CREATE DATABASE IF NOT EXISTS silver;
-- Run-log table: the notebook inserts one row here after each table load.
-- The INSERT statements below are positional, so the column order matters.
-- NOTE(review): the table name is unqualified, so it is created in the session's
-- current schema rather than in `silver` — confirm that is intended.
CREATE TABLE IF NOT EXISTS silver_logs (
    run_id STRING,               -- UUID generated per load
    table_name STRING,           -- label of the target that was loaded
    status STRING,               -- only "SUCCESS" is written by this notebook
    processing_time TIMESTAMP,   -- current_timestamp() at logging time
    row_count INT,               -- row count reported by the load
    error_message STRING         -- always NULL in the inserts shown here
)
USING delta;


In [0]:
# Import only the Spark SQL functions the notebook actually uses. A star import
# would silently replace many Python builtins (max, min, sum, abs, filter, ...)
# with their Column-based counterparts. `round` is imported deliberately: the
# transformation cells call it on Column expressions.
from pyspark.sql.functions import (
    col,
    current_timestamp,
    round,
    to_timestamp,
    to_utc_timestamp,
    when,
)
import uuid

# Identifier written to silver_logs for the full sales load.
run_id = str(uuid.uuid4())
# Raw sales rows from the bronze layer; input of the silver sales transformation.
bronze_sales = spark.table("bronze.sales_raw")


In [0]:
# Derive the silver sales columns: audit timestamps, a parsed transaction
# timestamp, a per-currency exchange rate and the resulting amounts.

# Multiplier used to express an amount in USD. Only USD, INR and EUR are mapped;
# there is no otherwise() branch, so any other currency gets a NULL rate.
usd_rate = (
    when(col("currency") == "USD", 1.0)
    .when(col("currency") == "INR", 0.012)
    .when(col("currency") == "EUR", 1.1)
)

# Line amount in the original currency, rounded to two decimals.
line_amount = round(col("quantity") * col("unit_price") - col("discount"), 2)

# NOTE(review): the source zone handed to to_utc_timestamp is already "UTC", so
# this looks like it leaves the parsed timestamp unchanged — confirm the zone
# the raw transaction_timestamp values are really expressed in.
transaction_ts = to_utc_timestamp(to_timestamp("transaction_timestamp"), "UTC")

silver_sales_clean = (
    bronze_sales
    .withColumnRenamed("ingestion_timestamp", "bronze_ingestion_timestamp")
    .withColumn("silver_processing_timestamp", current_timestamp())
    .withColumn("transaction_ts_utc", transaction_ts)
    .withColumn("exchange_rate", usd_rate)
    .withColumn("total_amount_corrected", line_amount)
    .withColumn(
        "total_amount_usd",
        col("total_amount_corrected") * col("exchange_rate"),
    )
)


In [0]:
# Keep one row per transaction_id. No ordering is given, so which of several
# duplicates survives is left to Spark.
silver_sales_dedup = silver_sales_clean.drop_duplicates(["transaction_id"])

In [0]:
# Split the de-duplicated sales into rows that pass the quality checks and rows
# that fail them. A row passes when it has a positive quantity, a product, a
# store and a non-NULL exchange rate.
passes_checks = (
    (col("quantity") > 0) &
    col("product_id").isNotNull() &
    col("store_id").isNotNull() &
    col("exchange_rate").isNotNull()
)
valid_sales = silver_sales_dedup.filter(passes_checks)

# The quarantine set is the logical complement of the same predicate rather than
# a full-row subtract(): no set-difference over every column is needed, and the
# two outputs are complementary by construction. The isNull() arm matters because
# a NULL quantity makes the predicate NULL (not False); such rows must be
# quarantined, not dropped from both outputs.
invalid_sales = silver_sales_dedup.filter(passes_checks.isNull() | ~passes_checks)

In [0]:
# Full refresh: replace both silver sales tables with this run's results.
# Valid rows go to silver.sales, rejected rows to the quarantine table.
for frame, target_table in (
    (valid_sales, "silver.sales"),
    (invalid_sales, "silver.sales_quarantine"),
):
    frame.write.format("delta").mode("overwrite").saveAsTable(target_table)


In [0]:
# Record the silver.sales load in the run log.
row_count = valid_sales.count()  # runs a Spark job over the valid rows

# Values are positional and must follow the silver_logs column order.
log_insert_sql = f"""
INSERT INTO silver_logs
VALUES (
    "{run_id}",
    "silver.sales",
    "SUCCESS",
    current_timestamp(),
    {row_count},
    NULL
)
"""
spark.sql(log_insert_sql)


DataFrame[num_affected_rows: bigint, num_inserted_rows: bigint]

In [0]:
# Products load: copy the bronze table into silver unchanged, then log the run.
products = spark.table("bronze.products_raw")
run_id = str(uuid.uuid4())  # fresh id for this load's log entry

(
    products.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("silver.products")
)

# Counted after the write, in the same order the log entry was built before.
products_row_count = products.count()

spark.sql(f"""
INSERT INTO silver_logs
VALUES (
    "{run_id}",
    "silver.products",
    "SUCCESS",
    current_timestamp(),
    {products_row_count},
    NULL
)
""")


DataFrame[num_affected_rows: bigint, num_inserted_rows: bigint]

In [0]:
from pyspark.sql.functions import current_timestamp
import uuid
# Raw store rows from the bronze layer.
stores_bronze = spark.table("bronze.stores_raw")
# Fresh identifier for the stores load's entry in silver_logs.
run_id = str(uuid.uuid4())


In [0]:
%sql
-- Duplicate check: list every store_id that occurs more than once in bronze.
WITH store_id_counts AS (
    SELECT store_id, COUNT(*) AS cnt
    FROM bronze.stores_raw
    GROUP BY store_id
)
SELECT store_id, cnt
FROM store_id_counts
WHERE cnt > 1;


store_id,cnt


In [0]:
# Clean stores: drop rows without a store_id, keep one row per store_id and
# stamp each row with the silver processing time.
stores_clean = (
    stores_bronze
    .where(col("store_id").isNotNull())
    .drop_duplicates(["store_id"])
    .withColumn("silver_processing_timestamp", current_timestamp())
)

In [0]:
# Full refresh of the silver stores table.
(
    stores_clean.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("silver.stores")
)

In [0]:
# Record the silver.stores load in the run log.
row_count = stores_clean.count()  # runs a Spark job over the cleaned stores

# Values are positional and must follow the silver_logs column order.
log_insert_sql = f"""
INSERT INTO silver_logs
VALUES (
    "{run_id}",
    "silver.stores",
    "SUCCESS",
    current_timestamp(),
    {row_count},
    NULL
)
"""
spark.sql(log_insert_sql)


DataFrame[num_affected_rows: bigint, num_inserted_rows: bigint]

In [0]:
%sql
-- Sanity check after the full load: row counts of every silver table, then the run log.
-- NOTE(review): the output recorded for this cell contains only the silver_logs rows,
-- so the four COUNT(*) results do not appear to be displayed — consider one query per cell.
SELECT COUNT(*) FROM silver.sales;
SELECT COUNT(*) FROM silver.sales_quarantine;
SELECT COUNT(*) FROM silver.products;
SELECT COUNT(*) FROM silver.stores;
SELECT * FROM silver_logs;


run_id,table_name,status,processing_time,row_count,error_message
ee21b515-caf0-40f8-8391-6fc2d84f40d7,silver.products,SUCCESS,2025-12-23T17:48:00.681Z,200,
94d937d6-aac2-49a9-bce9-c967ce8ecb89,silver.stores,SUCCESS,2025-12-23T17:51:23.382Z,50,
1745cd07-5922-4249-b5d5-7690fe30d77d,silver.sales,SUCCESS,2025-12-23T17:47:40.859Z,419,


In [0]:
%sql
-- Duplicate check: list every product_id that occurs more than once in bronze.
WITH product_id_counts AS (
    SELECT product_id, COUNT(*) AS cnt
    FROM bronze.products_raw
    GROUP BY product_id
)
SELECT product_id, cnt
FROM product_id_counts
WHERE cnt > 1;


product_id,cnt


In [0]:
%sql
-- Completeness check: how many bronze product rows have no product_id.
-- COUNT over a CASE without ELSE counts only the rows where the CASE yields a value.
SELECT COUNT(CASE WHEN product_id IS NULL THEN 1 END) AS null_product_id_count
FROM bronze.products_raw;


null_product_id_count
0


In [0]:
%sql
-- Sales-specific check: valid row count, quarantined row count and the sales log entries.
-- NOTE(review): the output recorded for this cell contains only the silver_logs rows,
-- so the two counts do not appear to be displayed — consider one query per cell.
SELECT COUNT(*) AS silver_sales_count FROM silver.sales;
SELECT COUNT(*) AS quarantine_count FROM silver.sales_quarantine;
SELECT * FROM silver_logs WHERE table_name = 'silver.sales';


run_id,table_name,status,processing_time,row_count,error_message
1745cd07-5922-4249-b5d5-7690fe30d77d,silver.sales,SUCCESS,2025-12-23T17:47:40.859Z,419,
94148c16-02d4-4c3f-b001-4f7834f81565,silver.sales,SUCCESS,2025-12-23T18:00:34.677Z,324,


# Incremental logic

In [0]:
%sql
-- Watermark table for incremental loads: later cells read and update the row
-- whose table_name is 'silver.sales'.
-- NOTE(review): nothing enforces a single row per table_name — the cells that
-- write to this table have to guarantee it.
CREATE TABLE IF NOT EXISTS silver_watermark (
    table_name STRING,            -- silver table the watermark belongs to
    last_processed_ts TIMESTAMP   -- compared against bronze ingestion_timestamp
)
USING delta;


In [0]:
%sql
-- Seed the silver.sales watermark exactly once.
-- A plain INSERT adds another 1900-01-01 row on every re-run of the notebook,
-- and the reader would then pick an arbitrary row and could reprocess the whole
-- bronze table. MERGE inserts the seed only when no row for silver.sales exists
-- and leaves an already-advanced watermark untouched.
MERGE INTO silver_watermark AS t
USING (
    SELECT 'silver.sales' AS table_name,
           TIMESTAMP('1900-01-01 00:00:00') AS last_processed_ts
) AS s
ON t.table_name = s.table_name
WHEN NOT MATCHED THEN
    INSERT (table_name, last_processed_ts)
    VALUES (s.table_name, s.last_processed_ts);

num_affected_rows,num_inserted_rows
1,1


In [0]:
from datetime import datetime

from pyspark.sql import functions as F
from pyspark.sql.functions import col

# Start-of-time fallback, matching the value the watermark table is seeded with.
DEFAULT_WATERMARK = datetime(1900, 1, 1)

# Read the high-water mark for silver.sales.
# A global aggregate always returns exactly one row, so collect()[0] cannot raise
# IndexError when the table has no row for silver.sales. Taking the MAX also gives
# a well-defined answer if several rows exist for the same table_name.
last_ts = (
    spark.table("silver_watermark")
    .filter(col("table_name") == "silver.sales")
    .agg(F.max("last_processed_ts"))
    .collect()[0][0]
)
if last_ts is None:
    # No watermark row, or only NULL timestamps: process everything.
    last_ts = DEFAULT_WATERMARK
print("Last processed timestamp:", last_ts)

Last processed timestamp: 1900-01-01 00:00:00


In [0]:
# Re-read the bronze sales table as the source of the incremental run.
bronze_sales = spark.read.table("bronze.sales_raw")

In [0]:
# Only rows ingested strictly after the stored watermark are part of this increment.
ingested_after_watermark = col("ingestion_timestamp") > last_ts
incremental_bronze_sales = bronze_sales.where(ingested_after_watermark)

In [0]:
# Import only the functions this transformation needs. A star import here would
# shadow Python builtins such as max/min/sum for every cell that runs afterwards.
# `round` is imported deliberately because it is applied to a Column below.
from pyspark.sql.functions import (
    col,
    current_timestamp,
    round,
    to_timestamp,
    to_utc_timestamp,
    when,
)

# Apply the silver sales transformation to the incremental slice only.
silver_sales_clean = (
    incremental_bronze_sales
    # Keep the bronze ingestion time under an explicit name and add the silver one.
    .withColumnRenamed("ingestion_timestamp", "bronze_ingestion_timestamp")
    .withColumn("silver_processing_timestamp", current_timestamp())
    # NOTE(review): the source zone passed here is already "UTC", so this looks
    # like it leaves the parsed timestamp unchanged — confirm the real source zone.
    .withColumn(
        "transaction_ts_utc",
        to_utc_timestamp(to_timestamp("transaction_timestamp"), "UTC")
    )
    # Rate to USD; currencies other than USD/INR/EUR get NULL (no otherwise()).
    .withColumn(
        "exchange_rate",
        when(col("currency") == "USD", 1.0)
        .when(col("currency") == "INR", 0.012)
        .when(col("currency") == "EUR", 1.1)
    )
    # Line amount in the original currency, rounded to two decimals.
    .withColumn(
        "total_amount_corrected",
        round(col("quantity") * col("unit_price") - col("discount"), 2)
    )
    .withColumn(
        "total_amount_usd",
        col("total_amount_corrected") * col("exchange_rate")
    )
)


In [0]:
# Keep one row per transaction_id within the increment. No ordering is given,
# so which duplicate survives is left to Spark.
silver_sales_dedup = silver_sales_clean.dropDuplicates(["transaction_id"])

# A row passes when it has a positive quantity, a product, a store and a
# non-NULL exchange rate.
passes_checks = (
    (col("quantity") > 0) &
    col("product_id").isNotNull() &
    col("store_id").isNotNull() &
    col("exchange_rate").isNotNull()
)

valid_sales = silver_sales_dedup.filter(passes_checks)

# The quarantine set is the logical complement of the same predicate rather than
# a full-row subtract(): cheaper, and complementary to valid_sales by
# construction. The isNull() arm keeps rows whose predicate evaluates to NULL
# (e.g. NULL quantity) in the quarantine instead of losing them.
invalid_sales = silver_sales_dedup.filter(passes_checks.isNull() | ~passes_checks)


In [0]:
from delta.tables import DeltaTable

# Upsert the valid incremental rows into silver.sales, keyed on transaction_id:
# existing transactions are overwritten column-for-column, new ones are inserted.
silver_sales_tbl = DeltaTable.forName(spark, "silver.sales")

upsert = silver_sales_tbl.alias("t").merge(
    valid_sales.alias("s"),
    "t.transaction_id = s.transaction_id",
)
(
    upsert
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)


DataFrame[num_affected_rows: bigint, num_updated_rows: bigint, num_deleted_rows: bigint, num_inserted_rows: bigint]

In [0]:
# Rejected rows of this increment are added to the quarantine table (append, not replace).
(
    invalid_sales.write
    .format("delta")
    .mode("append")
    .saveAsTable("silver.sales_quarantine")
)


In [0]:
# Advance the silver.sales watermark to the newest ingestion timestamp processed.
new_max_ts = incremental_bronze_sales.agg(
    {"ingestion_timestamp": "max"}
).collect()[0][0]

if new_max_ts is None:
    # Empty increment: MAX over no rows is NULL. Writing TIMESTAMP('None') would
    # store an invalid/NULL watermark, after which `ingestion_timestamp > watermark`
    # matches nothing and the pipeline silently stops picking up new data.
    print("No new bronze rows; watermark left unchanged.")
else:
    spark.sql(f"""
UPDATE silver_watermark
SET last_processed_ts = TIMESTAMP('{new_max_ts}')
WHERE table_name = 'silver.sales'
""")


DataFrame[num_affected_rows: bigint]

In [0]:
import uuid
# Record the incremental sales load in the run log under a fresh run id.
run_id = str(uuid.uuid4())
incremental_row_count = valid_sales.count()  # rows that passed validation in this increment

# Values are positional and must follow the silver_logs column order.
log_insert_sql = f"""
INSERT INTO silver_logs
VALUES (
    "{run_id}",
    "silver.sales_incremental",
    "SUCCESS",
    current_timestamp(),
    {incremental_row_count},
    NULL
)
"""
spark.sql(log_insert_sql)


DataFrame[num_affected_rows: bigint, num_inserted_rows: bigint]