In [0]:
# Bronze -> silver clean-up for sales: drop incomplete rows, remove the
# ingestion_date audit column, de-duplicate, and fully overwrite the silver table.
df = spark.read.table("dev.bronze.sales")

# dropna() still sees ingestion_date, so NULL filtering is exactly as before.
# The audit column is removed BEFORE de-duplicating: if the same sale is ingested
# in two loads, the copies presumably differ only in ingestion_date, so
# de-duplicating first would keep both and they would become exact duplicates
# once the column is dropped.
df_final = df.dropna().drop("ingestion_date").dropDuplicates()

# Full refresh: overwrite replaces the whole silver table on every run.
df_final.write.mode("overwrite").saveAsTable("dev.silver.sales_cleaned")

In [0]:
%sql
-- Ad-hoc preview of the raw bronze products change feed.
SELECT *
FROM dev.bronze.products

In [0]:
%sql
-- (Re)create the silver products table. CREATE OR REPLACE drops any rows already
-- loaded. No history columns are defined, so despite the _scd suffix this table
-- can only hold current state (SCD Type 1), not a version history.
CREATE OR REPLACE TABLE dev.silver.products_scd (
    product_id       INT,
    product_name     STRING,
    product_category STRING,
    -- NOTE(review): DOUBLE is inexact for money; consider DECIMAL(p, s) if the bronze type allows.
    product_price    DOUBLE
)

In [0]:
%sql
-- SCD Type 1 upsert of dev.bronze.products into dev.silver.products_scd.
-- Only the latest change event per product is applied. Attributes are
-- overwritten in place, and DELETE events remove the row (no history is kept).
WITH deduplicated_source AS (
    -- Rank change events per product, newest (highest seqNum) first.
    SELECT
        s.*,
        ROW_NUMBER() OVER (PARTITION BY product_id ORDER BY seqNum DESC) AS row_num
    FROM dev.bronze.products AS s
)
MERGE INTO dev.silver.products_scd AS t
USING (
    SELECT *
    FROM deduplicated_source
    WHERE row_num = 1
) AS s
ON t.product_id = s.product_id

WHEN MATCHED AND s.operation = 'UPDATE' THEN
    UPDATE SET
        t.product_name     = s.product_name,
        t.product_category = s.product_category,
        t.product_price    = s.product_price

WHEN MATCHED AND s.operation = 'DELETE' THEN
    DELETE

-- A DELETE event for a product that is not in the target must not create it.
-- Rows with a NULL operation are still inserted, as before.
WHEN NOT MATCHED AND (s.operation IS NULL OR s.operation <> 'DELETE') THEN
    INSERT (product_id, product_name, product_category, product_price)
    VALUES (s.product_id, s.product_name, s.product_category, s.product_price)

In [0]:
%sql
-- Inspect the silver products table after the merge.
SELECT *
FROM dev.silver.products_scd

In [0]:
%sql
-- Ad-hoc preview of the raw bronze customers change feed.
SELECT *
FROM dev.bronze.customers

In [0]:
%sql
-- (Re)create the SCD Type 2 customers table. CREATE OR REPLACE drops any rows
-- already loaded.
CREATE OR REPLACE TABLE dev.silver.customers_scd (
    customer_id     INT,
    customer_name   STRING,
    customer_email  STRING,
    customer_city   STRING,
    customer_state  STRING,
    -- SCD Type 2 bookkeeping columns
    is_current      BOOLEAN,
    effective_date  DATE,
    end_date        DATE,
    version_number  INT
)

In [0]:
%sql
-- SCD Type 2 load: apply the latest change event per customer from
-- dev.bronze.customers to dev.silver.customers_scd in ONE atomic MERGE.
--
-- Why a single MERGE: a separate follow-up INSERT cannot see this statement's
-- CTEs (a WITH clause is scoped to a single statement). It is also not atomic
-- with the MERGE, and it re-inserts new versions every time it is re-run on the
-- same day. Changed customers are therefore staged twice: once keyed by
-- customer_id, which closes the current row, and once with a NULL merge key,
-- which inserts the new version.
--
-- Target columns used: customer_id, customer_name, customer_email, customer_city,
-- customer_state, is_current, effective_date, end_date, version_number.
--
-- Date convention (unchanged from the original logic): a closed version gets
-- end_date = CURRENT_DATE and its replacement starts the following day. This
-- presumably treats end_date as inclusive; confirm with downstream consumers.

WITH deduplicated_source AS (
    -- Rank change events per customer, newest (highest sequenceNum) first.
    SELECT
        s.*,
        ROW_NUMBER() OVER (PARTITION BY customer_id ORDER BY sequenceNum DESC) AS row_num
    FROM dev.bronze.customers AS s
),
latest_source AS (
    -- Only the most recent event per customer is applied.
    SELECT *
    FROM deduplicated_source
    WHERE row_num = 1
),
version_history AS (
    -- Highest version ever written per customer. Numbering then continues even for
    -- a customer re-inserted after a soft delete, who has no current row to build on.
    SELECT
        customer_id,
        MAX(version_number) AS max_version
    FROM dev.silver.customers_scd
    GROUP BY customer_id
),
changed_customers AS (
    -- UPDATE events whose attributes differ from the customer's current row.
    -- IS DISTINCT FROM is NULL-safe, so a value changing to or from NULL is a change.
    SELECT s.customer_id
    FROM latest_source AS s
    INNER JOIN dev.silver.customers_scd AS cur
        ON cur.customer_id = s.customer_id
        AND cur.is_current = TRUE
    WHERE s.operation = 'UPDATE'
        AND (
            s.customer_name IS DISTINCT FROM cur.customer_name
            OR s.customer_email IS DISTINCT FROM cur.customer_email
            OR s.customer_city IS DISTINCT FROM cur.customer_city
            OR s.customer_state IS DISTINCT FROM cur.customer_state
        )
),
staged_updates AS (
    -- Row set 1: every latest event, keyed by customer_id. It closes or soft-deletes
    -- the current row (WHEN MATCHED) or inserts a brand-new customer (WHEN NOT MATCHED).
    SELECT
        s.customer_id AS merge_key,
        s.customer_id,
        s.customer_name,
        s.customer_email,
        s.customer_city,
        s.customer_state,
        s.operation,
        (c.customer_id IS NOT NULL) AS is_changed,
        CURRENT_DATE AS new_effective_date,
        COALESCE(h.max_version, 0) + 1 AS new_version_number
    FROM latest_source AS s
    LEFT JOIN changed_customers AS c
        ON c.customer_id = s.customer_id
    LEFT JOIN version_history AS h
        ON h.customer_id = s.customer_id

    UNION ALL

    -- Row set 2: changed customers again, with a NULL merge key. NULL never matches,
    -- so these rows reach WHEN NOT MATCHED and insert the new current version.
    SELECT
        NULL AS merge_key,
        s.customer_id,
        s.customer_name,
        s.customer_email,
        s.customer_city,
        s.customer_state,
        s.operation,
        TRUE AS is_changed,
        DATE_ADD(CURRENT_DATE, 1) AS new_effective_date,
        COALESCE(h.max_version, 0) + 1 AS new_version_number
    FROM latest_source AS s
    INNER JOIN changed_customers AS c
        ON c.customer_id = s.customer_id
    LEFT JOIN version_history AS h
        ON h.customer_id = s.customer_id
)

MERGE INTO dev.silver.customers_scd AS t
USING staged_updates AS s
ON t.customer_id = s.merge_key AND t.is_current = TRUE

-- Soft delete: close the current row; nothing replaces it.
WHEN MATCHED AND s.operation = 'DELETE' THEN
    UPDATE SET
        end_date = CURRENT_DATE,
        is_current = FALSE

-- Real change: close the current row; its replacement comes from row set 2.
WHEN MATCHED AND s.operation = 'UPDATE' AND s.is_changed THEN
    UPDATE SET
        end_date = CURRENT_DATE,
        is_current = FALSE

-- New customers, and new versions of changed customers. A DELETE for a
-- customer with no current row is ignored.
WHEN NOT MATCHED AND s.operation != 'DELETE' THEN
    INSERT (
        customer_id,
        customer_name,
        customer_email,
        customer_city,
        customer_state,
        effective_date,
        end_date,
        is_current,
        version_number
    )
    VALUES (
        s.customer_id,
        s.customer_name,
        s.customer_email,
        s.customer_city,
        s.customer_state,
        s.new_effective_date,
        NULL,
        TRUE,
        s.new_version_number
    );

In [0]:
%sql
-- SCD Type 2 load: apply the latest change event per customer from
-- dev.bronze.customers to dev.silver.customers_scd in ONE atomic MERGE.
--
-- Why a single MERGE: a separate follow-up INSERT cannot see this statement's
-- CTEs (a WITH clause is scoped to a single statement). It is also not atomic
-- with the MERGE, and it re-inserts new versions every time it is re-run on the
-- same day. Changed customers are therefore staged twice: once keyed by
-- customer_id, which closes the current row, and once with a NULL merge key,
-- which inserts the new version.
--
-- Target columns used: customer_id, customer_name, customer_email, customer_city,
-- customer_state, is_current, effective_date, end_date, version_number.
--
-- Date convention (unchanged from the original logic): a closed version gets
-- end_date = CURRENT_DATE and its replacement starts the following day. This
-- presumably treats end_date as inclusive; confirm with downstream consumers.

WITH deduplicated_source AS (
    -- Rank change events per customer, newest (highest sequenceNum) first.
    SELECT
        s.*,
        ROW_NUMBER() OVER (PARTITION BY customer_id ORDER BY sequenceNum DESC) AS row_num
    FROM dev.bronze.customers AS s
),
latest_source AS (
    -- Only the most recent event per customer is applied.
    SELECT *
    FROM deduplicated_source
    WHERE row_num = 1
),
version_history AS (
    -- Highest version ever written per customer. Numbering then continues even for
    -- a customer re-inserted after a soft delete, who has no current row to build on.
    SELECT
        customer_id,
        MAX(version_number) AS max_version
    FROM dev.silver.customers_scd
    GROUP BY customer_id
),
changed_customers AS (
    -- UPDATE events whose attributes differ from the customer's current row.
    -- IS DISTINCT FROM is NULL-safe, so a value changing to or from NULL is a change.
    SELECT s.customer_id
    FROM latest_source AS s
    INNER JOIN dev.silver.customers_scd AS cur
        ON cur.customer_id = s.customer_id
        AND cur.is_current = TRUE
    WHERE s.operation = 'UPDATE'
        AND (
            s.customer_name IS DISTINCT FROM cur.customer_name
            OR s.customer_email IS DISTINCT FROM cur.customer_email
            OR s.customer_city IS DISTINCT FROM cur.customer_city
            OR s.customer_state IS DISTINCT FROM cur.customer_state
        )
),
staged_updates AS (
    -- Row set 1: every latest event, keyed by customer_id. It closes or soft-deletes
    -- the current row (WHEN MATCHED) or inserts a brand-new customer (WHEN NOT MATCHED).
    SELECT
        s.customer_id AS merge_key,
        s.customer_id,
        s.customer_name,
        s.customer_email,
        s.customer_city,
        s.customer_state,
        s.operation,
        (c.customer_id IS NOT NULL) AS is_changed,
        CURRENT_DATE AS new_effective_date,
        COALESCE(h.max_version, 0) + 1 AS new_version_number
    FROM latest_source AS s
    LEFT JOIN changed_customers AS c
        ON c.customer_id = s.customer_id
    LEFT JOIN version_history AS h
        ON h.customer_id = s.customer_id

    UNION ALL

    -- Row set 2: changed customers again, with a NULL merge key. NULL never matches,
    -- so these rows reach WHEN NOT MATCHED and insert the new current version.
    SELECT
        NULL AS merge_key,
        s.customer_id,
        s.customer_name,
        s.customer_email,
        s.customer_city,
        s.customer_state,
        s.operation,
        TRUE AS is_changed,
        DATE_ADD(CURRENT_DATE, 1) AS new_effective_date,
        COALESCE(h.max_version, 0) + 1 AS new_version_number
    FROM latest_source AS s
    INNER JOIN changed_customers AS c
        ON c.customer_id = s.customer_id
    LEFT JOIN version_history AS h
        ON h.customer_id = s.customer_id
)

MERGE INTO dev.silver.customers_scd AS t
USING staged_updates AS s
ON t.customer_id = s.merge_key AND t.is_current = TRUE

-- Soft delete: close the current row; nothing replaces it.
WHEN MATCHED AND s.operation = 'DELETE' THEN
    UPDATE SET
        end_date = CURRENT_DATE,
        is_current = FALSE

-- Real change: close the current row; its replacement comes from row set 2.
WHEN MATCHED AND s.operation = 'UPDATE' AND s.is_changed THEN
    UPDATE SET
        end_date = CURRENT_DATE,
        is_current = FALSE

-- New customers, and new versions of changed customers. A DELETE for a
-- customer with no current row is ignored.
WHEN NOT MATCHED AND s.operation != 'DELETE' THEN
    INSERT (
        customer_id,
        customer_name,
        customer_email,
        customer_city,
        customer_state,
        effective_date,
        end_date,
        is_current,
        version_number
    )
    VALUES (
        s.customer_id,
        s.customer_name,
        s.customer_email,
        s.customer_city,
        s.customer_state,
        s.new_effective_date,
        NULL,
        TRUE,
        s.new_version_number
    );