In [0]:
%sql
-- List every database visible to the current session.
SHOW DATABASES

In [0]:
# Make globalretail_silver the current schema so the unqualified table names
# used below resolve inside it.
spark.sql("USE globalretail_silver")

# DDL for the silver orders Delta table. IF NOT EXISTS leaves an existing table
# untouched, so this cell is safe to re-run.
silver_orders_ddl = """
CREATE TABLE IF NOT EXISTS silver_orders (
    transaction_id STRING,
    customer_id STRING,
    product_id STRING,
    quantity INT,
    total_amount DOUBLE,
    transaction_date DATE,
    payment_method STRING,
    store_type STRING,
    order_status STRING,
    last_updated TIMESTAMP
)
USING DELTA
"""
spark.sql(silver_orders_ddl)

In [0]:
# Get the last processed timestamp (high-water mark) from the silver layer.
#
# The value is rendered to a string inside Spark rather than collected as a
# Python datetime: collect() converts TIMESTAMP values using the driver's local
# time zone, while the SQL literal built from this value is parsed in the Spark
# session time zone. Casting in SQL keeps both sides in the session time zone,
# so the watermark cannot shift when the two zones differ.
#
# COALESCE supplies the '1900-01-01' default when silver_orders is empty
# (MAX over no rows is NULL), so the variable is always a str.
#
# NOTE(review): the watermark is silver's last_updated (processing time) but it
# is compared against bronze's ingestion_timestamp — confirm that bronze rows
# cannot become visible with an ingestion_timestamp older than the previous
# silver run, otherwise they would be skipped.
last_processed_df = spark.sql("""
    SELECT COALESCE(CAST(MAX(last_updated) AS STRING), '1900-01-01') AS last_processed
    FROM silver_orders
""")
last_processed_timestamp = last_processed_df.collect()[0]['last_processed']

In [0]:
# Build a temporary view holding only the bronze transactions whose
# ingestion_timestamp is later than last_processed_timestamp.
bronze_incremental_view_sql = f"""
          CREATE OR REPLACE TEMPORARY VIEW bronze_incremental_orders AS
          SELECT * FROM globalretail_bronze.bronze_transactions WHERE ingestion_timestamp > '{last_processed_timestamp}'
          """
spark.sql(bronze_incremental_view_sql)


In [0]:
# Preview the rows exposed by the bronze_incremental_orders view.
bronze_incremental_preview = spark.sql("SELECT * FROM bronze_incremental_orders")
display(bronze_incremental_preview)

In [0]:
# Build the cleaned incremental view that feeds the merge into silver_orders.
#
# - Quantity and total_amount normalization: negative values are set to 0.
# - Date casting: transaction_date is cast to DATE for a consistent format.
# - Order status derivation: 'Cancelled' when quantity or total_amount is zero
#   after normalization, otherwise 'Completed'.
# - Data quality checks: records with a null transaction date, customer ID, or
#   product ID are filtered out.
# - last_updated is stamped with the time the query runs.
#
# Inside a single SELECT the names `quantity` / `total_amount` resolve to the
# raw columns of bronze_incremental_orders, not to the clamped aliases defined
# above them. The status test therefore uses `<= 0`, so that a negative raw
# value (stored as 0) is labelled 'Cancelled' just like a raw 0.

spark.sql("""
CREATE OR REPLACE TEMPORARY VIEW silver_incremental_orders AS
SELECT
    transaction_id,
    customer_id,
    product_id,
    CASE
        WHEN quantity < 0 THEN 0
        ELSE quantity
    END AS quantity,
    CASE
        WHEN total_amount < 0 THEN 0
        ELSE total_amount
    END AS total_amount,
    CAST(transaction_date AS DATE) AS transaction_date,
    payment_method,
    store_type,
    CASE
        WHEN quantity <= 0 OR total_amount <= 0 THEN 'Cancelled'
        ELSE 'Completed'
    END AS order_status,
    CURRENT_TIMESTAMP() AS last_updated
FROM bronze_incremental_orders
WHERE transaction_date IS NOT NULL AND customer_id IS NOT NULL AND product_id IS NOT NULL
""")

In [0]:
# Preview the cleaned rows exposed by the silver_incremental_orders view.
silver_incremental_preview = spark.sql("SELECT * FROM silver_incremental_orders")
display(silver_incremental_preview)

In [0]:
# Upsert the incremental rows into silver_orders, keyed on transaction_id:
# rows whose transaction_id already exists are overwritten column-for-column,
# all others are inserted.
# NOTE(review): Delta MERGE fails when several source rows match the same
# target row — confirm transaction_id is unique within each incremental batch.
merge_silver_orders_sql = """
MERGE INTO silver_orders target
USING silver_incremental_orders source
ON target.transaction_id = source.transaction_id
WHEN MATCHED THEN
    UPDATE SET *
WHEN NOT MATCHED THEN
    INSERT *
"""
spark.sql(merge_silver_orders_sql)

In [0]:
# Show the contents of silver_orders after the merge.
silver_orders_preview = spark.sql("SELECT * FROM silver_orders")
display(silver_orders_preview)