In [0]:
# Make globalretail_silver the session's current schema so the unqualified
# table name in the DDL below resolves inside it.
spark.sql("USE globalretail_silver")

# Idempotent DDL: IF NOT EXISTS leaves an already-created table untouched.
# NOTE: the quantity column is spelled `stack_quantity` in this schema, and the
# MERGE cell later in this notebook writes to that exact column name.
silver_products_ddl = """
CREATE TABLE IF NOT EXISTS silver_products (
    product_id STRING,
    name STRING,
    category STRING,
    brand STRING,
    price DOUBLE,
    stack_quantity INT,
    rating DOUBLE,
    is_active BOOLEAN,
    price_category STRING,
    stock_status STRING,
    last_updated TIMESTAMP
) USING DELTA
"""
spark.sql(silver_products_ddl)

In [0]:
# Read the incremental-load watermark: the newest last_updated value already
# present in silver_products.
spark.sql("use globalretail_silver")

last_processed_df = spark.sql(
    "select max(last_updated) as last_processed from silver_products"
)
# An aggregate without GROUP BY yields a single row; take it.
watermark_row = last_processed_df.collect()[0]

# max() comes back as None when the table has no non-null last_updated values
# (e.g. it is still empty); fall back to a fixed early timestamp string.
last_processed_timestamp = (
    '1900-01-01 00:00:00'
    if watermark_row['last_processed'] is None
    else watermark_row['last_processed']
)

In [0]:
# Expose only the bronze rows ingested strictly after the watermark as a
# session-scoped temporary view. `last_processed_timestamp` must already be
# set by the preceding watermark cell; it is interpolated as a quoted literal.
incremental_products_view_sql = f"""
create or replace temporary view bronze_incremental_products as
select * from globalretail_bronze.bronze_products  c
where c.ingestion_timestamp > '{last_processed_timestamp}'
"""
spark.sql(incremental_products_view_sql)

In [0]:
# Build the cleaned/enriched view that feeds the silver MERGE.
#
# Reads from the temporary view bronze_incremental_products and:
#   * clamps negative price and stock_quantity to 0, and rating into [0, 5];
#   * derives price_category and stock_status;
#   * carries ingestion_timestamp forward as last_updated.
#
# Inside the SELECT list, `price` / `stock_quantity` in the later CASE
# expressions refer to the *raw* bronze columns, not to the clamped aliases
# defined earlier in the same list. The stock_status CASE therefore tests
# `<= 0` (not `= 0`) so that a negative raw quantity, which is stored as 0,
# is labelled 'Out of Stock' instead of 'Low Stock'.
#
# NOTE(review): a NULL stock_quantity matches no WHEN branch and is labelled
# 'Sufficient Stock' by the ELSE; a NULL price is likewise labelled 'Budget'.
# Confirm whether that is intended.
spark.sql("""
CREATE OR REPLACE TEMPORARY VIEW silver_incremental_products_normalized AS
SELECT
  product_id,
  name,
  category,
  brand,
  -- Price normalization
  CASE WHEN price < 0 THEN 0 ELSE price END AS price,
  -- Stock quantity normalization
  CASE WHEN stock_quantity < 0 THEN 0 ELSE stock_quantity END AS stock_quantity,
  -- Rating normalization
  CASE 
    WHEN rating < 0 THEN 0
    WHEN rating > 5 THEN 5
    ELSE rating
  END AS rating,
  is_active,
  -- Price categorization
  CASE
    WHEN price >= 1000 THEN 'Premium'
    WHEN price >= 300 THEN 'Standard'
    ELSE 'Budget'
  END AS price_category,
  -- Stock status calculation (raw negatives are clamped to 0 above, so treat them as out of stock)
  CASE
    WHEN stock_quantity <= 0 THEN 'Out of Stock'
    WHEN stock_quantity <= 10 THEN 'Low Stock'
    WHEN stock_quantity <= 50 THEN 'Moderate Stock'
    ELSE 'Sufficient Stock'
  END AS stock_status,
  ingestion_timestamp AS last_updated
FROM bronze_incremental_products
""")

In [0]:
# Upsert the normalized incremental rows into silver_products, keyed on
# product_id.
#
# The incremental view can contain several rows for the same product_id (for
# example when a product was ingested more than once since the last
# watermark). Feeding those straight into MERGE either fails, because several
# source rows match one target row, or inserts the same product more than once
# when the target has no match yet. The source is therefore reduced to the
# most recent row per product_id (highest last_updated) before merging.
#
# Caveats of the de-duplication:
#   * if two rows share the same product_id and last_updated, which one is
#     kept is arbitrary;
#   * rows with a NULL product_id fall into one partition, so only the latest
#     of them is carried forward.
#
# The target column is spelled `stack_quantity` (matching the table DDL); it
# receives the source's `stock_quantity`.
spark.sql("""
MERGE INTO silver_products AS target
USING (
  SELECT *
  FROM (
    SELECT
      *,
      ROW_NUMBER() OVER (
        PARTITION BY product_id
        ORDER BY last_updated DESC
      ) AS recency_rank
    FROM silver_incremental_products_normalized
  ) AS ranked
  WHERE recency_rank = 1
) AS source
ON target.product_id = source.product_id
WHEN MATCHED THEN
  UPDATE SET
    target.name = source.name,
    target.category = source.category,
    target.brand = source.brand,
    target.price = source.price,
    target.stack_quantity = source.stock_quantity,
    target.rating = source.rating,
    target.is_active = source.is_active,
    target.price_category = source.price_category,
    target.stock_status = source.stock_status,
    target.last_updated = source.last_updated
WHEN NOT MATCHED THEN
  INSERT (
    product_id,
    name,
    category,
    brand,
    price,
    stack_quantity,
    rating,
    is_active,
    price_category,
    stock_status,
    last_updated
  )
  VALUES (
    source.product_id,
    source.name,
    source.category,
    source.brand,
    source.price,
    source.stock_quantity,
    source.rating,
    source.is_active,
    source.price_category,
    source.stock_status,
    source.last_updated
  )
""")

In [0]:
# Sanity check: render the current contents of the silver table after the merge.
spark.sql("USE globalretail_silver")
silver_products_df = spark.sql("SELECT * FROM globalretail_silver.silver_products")
display(silver_products_df)