"""
This notebook performs the following tasks to load the Silver layer transactions:

1. **Create Silver Layer Table**: Creates the Silver layer orders table in the `globalretail_silver` database.
2. **Load Incremental Data**: Loads only the Bronze layer transaction records whose `ingestion_timestamp` is later than the most recent `last_updated` value already stored in the Silver layer orders table.
3. **Data Transformation and Cleanup**: Normalizes quantity and total amount, ensures consistent date formats, derives order status based on quantity and total amount, and filters out records with null transaction dates, customer IDs, or product IDs.
4. **Merge Data**: Merges the transformed data into the Silver layer orders table using the MERGE command.

"""

In [0]:
"""
Creates the 'silver_orders' table in the 'globalretail_silver' database if it does not already exist.
The table schema includes transaction and order details, and stores data using the Delta Lake format (USING DELTA).
Delta Lake (the 'DELTA' format named in the USING clause) is a storage layer that provides ACID transactions and scalable metadata handling, and unifies streaming and batch data processing on Databricks.
"""

# Switch the session's current database first so the unqualified table name
# in the DDL below is created inside globalretail_silver.
spark.sql("USE globalretail_silver")

# Keep the DDL in a named string so it can be inspected or printed before it
# runs. IF NOT EXISTS makes re-running this cell a no-op once the table exists.
create_silver_orders_ddl = """
CREATE TABLE IF NOT EXISTS silver_orders (
    transaction_id STRING,
    customer_id STRING,
    product_id STRING,
    quantity INT,
    total_amount DOUBLE,
    transaction_date DATE,
    payment_method STRING,
    store_type STRING,
    order_status STRING,
    last_updated TIMESTAMP
)
USING DELTA
"""
spark.sql(create_silver_orders_ddl)

In [0]:
"""
Retrieves the most recent 'last_updated' timestamp from the 'silver_orders' table to determine the last processed record.
If no records exist, defaults the timestamp to '1900-01-01T00:00:00.000+00:00' for initial processing.
"""
# Watermark lookup: the newest last_updated value already written to silver.
last_processed_df = spark.sql("SELECT MAX(last_updated) as last_processed FROM silver_orders")

# MAX() over the table yields a single row; its value is NULL (None in Python)
# when silver_orders is still empty.
watermark_row = last_processed_df.collect()[0]

# Fall back to a far-past timestamp on the very first run so that every
# bronze record qualifies as "new" in the incremental filter that follows.
last_processed_timestamp = (
    watermark_row['last_processed']
    if watermark_row['last_processed'] is not None
    else "1900-01-01T00:00:00.000+00:00"
)

In [0]:
"""
Creates or replaces a temporary view 'bronze_incremental_orders' containing new records from the 'globalretail_bronze.bronze_transactions' table.
Filters records where 'ingestion_timestamp' is greater than the last processed timestamp to enable incremental data processing.
"""

# Render the view definition into a string first; the watermark computed in
# the previous cell is interpolated into the WHERE clause as a quoted literal.
bronze_incremental_view_sql = f"""
CREATE OR REPLACE TEMPORARY VIEW bronze_incremental_orders AS
SELECT *
FROM globalretail_bronze.bronze_transactions WHERE ingestion_timestamp > '{last_processed_timestamp}'

"""

# (Re)register the temporary view over the not-yet-processed bronze records.
spark.sql(bronze_incremental_view_sql)

In [0]:
"""
Displays all records from the 'bronze_incremental_orders' temporary view for inspection or validation.
"""
# Query the incremental bronze view, then hand the result to the notebook renderer.
bronze_incremental_df = spark.sql("select * from bronze_incremental_orders")
display(bronze_incremental_df)

Data Transformations:
   - Quantity and total_amount normalization (setting negative values to 0)
   - Date casting to ensure consistent date format
   - Order status derivation based on quantity and total_amount

Data Quality Checks: We filter out records with null transaction dates, customer IDs, or product IDs.


In [0]:
"""
Creates or replaces the 'silver_incremental_orders' temporary view by transforming and cleansing data from 'bronze_incremental_orders'.
- Sets negative 'quantity' and 'total_amount' values to 0.
- Casts 'transaction_date' to DATE.
- Sets 'order_status' to 'Cancelled' if 'quantity' or 'total_amount' is 0, otherwise 'Completed'.
- Adds the current timestamp as 'last_updated'.
- Filters out records with NULL 'transaction_date', 'customer_id', or 'product_id'.
"""

# Build the cleansed, silver-shaped view over the incremental bronze records.
#
# Output columns:
#   - quantity / total_amount : negative values are clamped to 0.
#   - transaction_date        : cast to DATE.
#   - order_status            : 'Cancelled' when the clamped quantity or
#                               total_amount is 0, otherwise 'Completed'.
#   - last_updated            : CURRENT_TIMESTAMP() at query evaluation time.
# Rows with a NULL transaction_date, customer_id or product_id are dropped.
#
# Why the status test uses `<= 0` rather than `= 0`:
# inside the order_status CASE the names `quantity` and `total_amount` refer
# to the raw bronze columns, not to the clamped aliases defined earlier in
# the same SELECT list. With `= 0`, a row whose raw value was negative was
# stored as 0 but still labelled 'Completed'. Comparing the raw value with
# `<= 0` gives the same answer as testing the clamped value against 0.
#
# NOTE(review): a NULL quantity or total_amount passes through unchanged and
# is labelled 'Completed', because comparisons against NULL are never true.
# Confirm whether such rows should be filtered or flagged instead.
spark.sql("""
CREATE OR REPLACE TEMPORARY VIEW silver_incremental_orders AS
SELECT
    transaction_id,
    customer_id,
    product_id,
    CASE 
        WHEN quantity < 0 THEN 0 
        ELSE quantity 
    END AS quantity,
    CASE 
        WHEN total_amount < 0 THEN 0 
        ELSE total_amount 
    END AS total_amount,
    CAST(transaction_date AS DATE) AS transaction_date,
    payment_method,
    store_type,
    CASE
        WHEN quantity <= 0 OR total_amount <= 0 THEN 'Cancelled'
        ELSE 'Completed'
    END AS order_status,
    CURRENT_TIMESTAMP() AS last_updated
FROM bronze_incremental_orders
WHERE transaction_date IS NOT NULL
  AND customer_id IS NOT NULL
  AND product_id IS NOT NULL
""")

In [0]:
"""
Displays all records from the 'silver_incremental_orders' temporary view for inspection or validation.
"""
# Query the transformed silver view, then hand the result to the notebook renderer.
silver_incremental_df = spark.sql("select * from silver_incremental_orders")
display(silver_incremental_df)

In [0]:
"""
Performs an upsert (merge) operation from the 'silver_incremental_orders' temporary view into the 'silver_orders' Delta table.
- Updates existing records in 'silver_orders' that match on 'transaction_id' with values from 'silver_incremental_orders'.
- Inserts new records from 'silver_incremental_orders' that do not exist in 'silver_orders'.
"""

# Upsert keyed on transaction_id: matched rows are overwritten with the
# incoming values, unmatched rows are appended.
# NOTE(review): Delta MERGE is expected to fail when several source rows match
# the same target row. Confirm that a single incremental batch cannot contain
# duplicate transaction_id values.
merge_silver_orders_sql = """
MERGE INTO silver_orders target
USING silver_incremental_orders source
ON target.transaction_id = source.transaction_id
WHEN MATCHED THEN
    UPDATE SET *
WHEN NOT MATCHED THEN
    INSERT *
"""
spark.sql(merge_silver_orders_sql)

In [0]:
%sql
-- Returns all columns and rows from the silver_orders table for a final visual check.
-- This query applies no filter of its own: every row in the table is returned.
-- Negative 'quantity' and 'total_amount' values are set to 0 earlier, when the silver_incremental_orders view is built.
select * from silver_orders