In [1]:
# Ensure the silver-layer Delta table exists. `IF NOT EXISTS` makes this
# cell safe to re-run: when the table is already present the statement
# leaves it untouched.
silver_products_ddl = """
CREATE TABLE IF NOT EXISTS silver_products (
    product_id STRING,
    name STRING,
    category STRING,
    brand STRING,
    price DOUBLE,
    stock_quantity INT,
    rating DOUBLE,
    is_active BOOLEAN,
    price_category STRING,
    stock_status STRING,
    last_updated TIMESTAMP
)
USING DELTA
"""
spark.sql(silver_products_ddl)


StatementMeta(, 7cc3cc24-08ec-46e6-a175-851e0798782a, 3, Finished, Available, Finished)

DataFrame[]

In [2]:
# Read the high-water mark of the previous silver load.
#
# The timestamp is cast to STRING inside Spark rather than collected as a
# Python `datetime`: PySpark materialises TIMESTAMP values in the driver's
# local timezone, whereas the literal interpolated into the follow-up SQL
# is parsed in the Spark session timezone. Rendering the value in Spark
# keeps both sides of that round trip in the same timezone and makes
# `last_processed_timestamp` a `str` in every branch.
last_processed_df = spark.sql(
    "SELECT CAST(MAX(last_updated) AS STRING) AS last_processed FROM silver_products"
)
# MAX() over an empty table still yields exactly one row, holding NULL.
last_processed_timestamp = last_processed_df.first()["last_processed"]

# First run (empty silver table): fall back to a sentinel old enough that
# any bronze row with a later ingestion timestamp is selected.
if last_processed_timestamp is None:
    last_processed_timestamp = "1900-01-01T00:00:00.000+00:00"


StatementMeta(, 7cc3cc24-08ec-46e6-a175-851e0798782a, 4, Finished, Available, Finished)

In [7]:
# Expose only the bronze rows ingested after the last silver load as a
# temporary view. The watermark is baked into the view definition as a
# literal, so this cell must be re-run whenever `last_processed_timestamp`
# changes.
# NOTE(review): the watermark comes from silver `last_updated` (set at
# processing time) but is compared against bronze `ingestion_timestamp`;
# rows committed to bronze with an earlier ingestion timestamp after the
# previous run read its snapshot could be skipped - confirm this is
# acceptable for the pipeline.
bronze_incremental_sql = f"""
CREATE OR REPLACE TEMPORARY VIEW bronze_incremental_products AS
SELECT *
FROM BronzeLayer.product WHERE ingestion_timestamp > '{last_processed_timestamp}'
"""
spark.sql(bronze_incremental_sql)


StatementMeta(, 7cc3cc24-08ec-46e6-a175-851e0798782a, 9, Finished, Available, Finished)

DataFrame[]

In [8]:
# Build the cleaned and enriched view that feeds the MERGE into
# silver_products. The final projection exposes exactly the columns of the
# silver table so `UPDATE SET *` / `INSERT *` keep working.
#
# Stages:
#   latest_valid - drops rows missing product_id, name or category, then
#                  ranks the remaining rows per product_id by
#                  ingestion_timestamp (newest first). Keeping a single row
#                  per key prevents the Delta MERGE from failing when one
#                  batch carries several versions of the same product.
#                  Rows sharing the same ingestion_timestamp are tie-broken
#                  arbitrarily. A NULL product_id can never satisfy the
#                  merge condition, so such rows are excluded here.
#   cleaned      - clamps price and stock_quantity to >= 0 and rating to
#                  the range [0, 5]; NULLs pass through unchanged.
#   final SELECT - derives price_category and stock_status from the
#                  *clamped* values, so a negative stock quantity (stored
#                  as 0) is labelled 'Out of Stock' rather than 'Low Stock'.
#                  A NULL price falls through to 'Budget' and a NULL
#                  stock_quantity to 'Sufficient Stock', as before.
spark.sql("""
CREATE OR REPLACE TEMPORARY VIEW silver_incremental_products AS
WITH latest_valid AS (
    SELECT
        *,
        ROW_NUMBER() OVER (
            PARTITION BY product_id
            ORDER BY ingestion_timestamp DESC
        ) AS dedup_rank
    FROM bronze_incremental_products
    WHERE product_id IS NOT NULL
      AND name IS NOT NULL
      AND category IS NOT NULL
),
cleaned AS (
    SELECT
        product_id,
        name,
        category,
        brand,
        CASE
            WHEN price < 0 THEN 0
            ELSE price
        END AS price,
        CASE
            WHEN stock_quantity < 0 THEN 0
            ELSE stock_quantity
        END AS stock_quantity,
        CASE
            WHEN rating < 0 THEN 0
            WHEN rating > 5 THEN 5
            ELSE rating
        END AS rating,
        is_active
    FROM latest_valid
    WHERE dedup_rank = 1
)
SELECT
    product_id,
    name,
    category,
    brand,
    price,
    stock_quantity,
    rating,
    is_active,
    CASE
        WHEN price > 1000 THEN 'Premium'
        WHEN price > 100 THEN 'Standard'
        ELSE 'Budget'
    END AS price_category,
    CASE
        WHEN stock_quantity = 0 THEN 'Out of Stock'
        WHEN stock_quantity < 10 THEN 'Low Stock'
        WHEN stock_quantity < 50 THEN 'Moderate Stock'
        ELSE 'Sufficient Stock'
    END AS stock_status,
    CURRENT_TIMESTAMP() AS last_updated
FROM cleaned
""")


StatementMeta(, 7cc3cc24-08ec-46e6-a175-851e0798782a, 10, Finished, Available, Finished)

DataFrame[]

In [9]:
# Upsert the incremental batch into the silver table, keyed on product_id:
# matched rows are overwritten with every source column, unmatched source
# rows are inserted. The result DataFrame reports affected/updated/
# deleted/inserted row counts.
merge_silver_sql = """
MERGE INTO silver_products target
USING silver_incremental_products source
ON target.product_id = source.product_id
WHEN MATCHED THEN
    UPDATE SET *
WHEN NOT MATCHED THEN
    INSERT *
"""
spark.sql(merge_silver_sql)


StatementMeta(, 7cc3cc24-08ec-46e6-a175-851e0798782a, 11, Finished, Available, Finished)

DataFrame[num_affected_rows: bigint, num_updated_rows: bigint, num_deleted_rows: bigint, num_inserted_rows: bigint]

In [10]:
# Sanity check: print up to ten rows of the silver table after the merge.
silver_preview_df = spark.sql("SELECT * FROM silver_products LIMIT 10")
silver_preview_df.show()

StatementMeta(, 7cc3cc24-08ec-46e6-a175-851e0798782a, 12, Finished, Available, Finished)

+----------+-----------+-----------+----------+------+--------------+------+---------+--------------+------------+--------------------+
|product_id|       name|   category|     brand| price|stock_quantity|rating|is_active|price_category|stock_status|        last_updated|
+----------+-----------+-----------+----------+------+--------------+------+---------+--------------+------------+--------------------+
|        35| Product 35|     Sports|BeautyGlow|949.84|             1|   2.0|    false|      Standard|   Low Stock|2024-09-28 12:45:...|
|       828|Product 828|       Food|TastyBites|973.33|             4|   3.3|    false|      Standard|   Low Stock|2024-09-28 12:45:...|
|        13| Product 13|   Clothing|  SportMax|220.49|             7|   2.4|    false|      Standard|   Low Stock|2024-09-28 12:45:...|
|       260|Product 260|       Toys|  SportMax| 362.9|             6|   1.1|    false|      Standard|   Low Stock|2024-09-28 12:45:...|
|       384|Product 384|     Sports| HomeSmart|6