# 1. Cities

In [2]:
# Stage the bronze Cities extract as the source view for the SCD Type 2 cells below.
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

# Read parquet file in bronze
df = spark.read.load(path='abfss://final@bibik224161840.dfs.core.windows.net/bronze/Application.Cities.parquet', format='parquet')

# Add surrogate key column: row number ordered by the business key CityID.
# NOTE(review): the SCD cells below assign silver keys from MAX(CityKey) and do not read this column.
window_spec = Window.orderBy("CityID")
df = df.withColumn("CityKey", row_number().over(window_spec))

# Put the key first, like every other dimension cell in this notebook (withColumn appends it last).
df = df.select("CityKey", *[col for col in df.columns if col != "CityKey"])

# Create a temporary view from the bronze Parquet data (df) as the source
df.createOrReplaceTempView("v_bronze_Cities")

StatementMeta(, , -1, Cancelled, , Cancelled)

In [None]:
%%sql
-- Define the silver Cities dimension in the Lake Database (no-op when it already exists).
-- CityKey is the surrogate key. start_date / end_date / is_active track each SCD Type 2 version.
USE silver;

CREATE TABLE IF NOT EXISTS silver.Cities (
    CityKey                  INT,        -- surrogate key
    CityID                   INT,        -- business key
    CityName                 STRING,
    StateProvinceID          INT,
    Location                 STRING,
    LatestRecordedPopulation INT,
    last_updated             TIMESTAMP,
    start_date               TIMESTAMP,  -- version valid from
    end_date                 TIMESTAMP,  -- version valid to, NULL while current
    is_active                BOOLEAN     -- true for the current version
);

StatementMeta(, , -1, Cancelled, , Cancelled)

In [None]:
%%sql
-- SCD Type 2, step 1: archive the current version of every city whose attributes changed in bronze.
-- A copy of the active row is appended with end_date = now and is_active = false.
-- The still-active original is removed by the next cell.
-- Changes are detected with the null-safe operator <=> because a plain <> against NULL yields NULL,
-- which silently skipped any value changing to or from NULL.
-- Keep this change predicate identical to the one in the next cell.
INSERT INTO silver.Cities
SELECT
    target.CityKey,
    target.CityID,
    target.CityName,
    target.StateProvinceID,
    target.Location,
    target.LatestRecordedPopulation,
    target.last_updated,
    target.start_date,
    FROM_UTC_TIMESTAMP(current_timestamp(), 'Asia/Ho_Chi_Minh') AS end_date, -- Expiration date
    false AS is_active
FROM
    silver.Cities AS target
JOIN
    v_bronze_Cities AS source
    ON target.CityID = source.CityID
WHERE
    target.is_active = true  -- Only current versions can be expired
    AND NOT (
        target.CityName <=> source.CityName
        AND target.StateProvinceID <=> source.StateProvinceID
        AND target.Location <=> source.Location
        AND target.LatestRecordedPopulation <=> source.LatestRecordedPopulation
        AND target.last_updated <=> source.last_updated
    );

StatementMeta(, , -1, Cancelled, , Cancelled)

In [None]:
%%sql
-- SCD Type 2, step 2: drop the active rows that the previous cell archived, using the same change predicate.
-- History rows and unchanged active rows are kept.
-- NOTE(review): this overwrites silver.Cities with a query that reads silver.Cities. Spark refuses to
-- overwrite a path that is also being read from for some table formats - confirm for this table format.
INSERT OVERWRITE silver.Cities
SELECT *
FROM silver.Cities AS target
WHERE NOT EXISTS (
    SELECT 1
    FROM v_bronze_Cities AS source
    WHERE target.CityID = source.CityID
    AND target.is_active = true
    AND NOT (
        target.CityName <=> source.CityName
        AND target.StateProvinceID <=> source.StateProvinceID
        AND target.Location <=> source.Location
        AND target.LatestRecordedPopulation <=> source.LatestRecordedPopulation
        AND target.last_updated <=> source.last_updated
    )
);

StatementMeta(, , -1, Cancelled, , Cancelled)

In [None]:
%%sql
-- SCD Type 2, step 3: insert a new current version for every bronze city that has no active row in silver.
-- That covers brand-new CityIDs and the changed cities whose active row the two previous cells retired.
-- Change detection is intentionally left to those cells. Re-testing the attributes here would add a
-- second active row next to the old one whenever the previous cell had not run.
-- New surrogate keys continue after the current MAX(CityKey).
WITH max_key AS (
    SELECT COALESCE(MAX(CityKey), 0) AS max_id FROM silver.Cities
),
new_data AS (
    SELECT
        source.CityID,
        source.CityName,
        source.StateProvinceID,
        source.Location,
        source.LatestRecordedPopulation,
        source.last_updated,
        FROM_UTC_TIMESTAMP(current_timestamp(), 'Asia/Ho_Chi_Minh') AS start_date,
        CAST(NULL AS TIMESTAMP) AS end_date,  -- open-ended validity for the current version
        true AS is_active,
        ROW_NUMBER() OVER (ORDER BY source.CityID) AS rn
    FROM v_bronze_Cities AS source
    WHERE NOT EXISTS (
        SELECT 1
        FROM silver.Cities AS target
        WHERE target.CityID = source.CityID
          AND target.is_active = true
    )
)
INSERT INTO silver.Cities
SELECT
    max_key.max_id + new_data.rn AS CityKey,
    new_data.CityID,
    new_data.CityName,
    new_data.StateProvinceID,
    new_data.Location,
    new_data.LatestRecordedPopulation,
    new_data.last_updated,
    new_data.start_date,
    new_data.end_date,
    new_data.is_active
FROM new_data
CROSS JOIN max_key;


StatementMeta(, , -1, Cancelled, , Cancelled)

In [None]:
# Export the whole silver.Cities table as one partition (a single part file)
# to the silver folder of the storage account, replacing any previous export.
df_Cities = spark.sql("SELECT * FROM silver.Cities")
(
    df_Cities
    .repartition(1)
    .write
    .mode("overwrite")
    .parquet("abfss://final@bibik224161840.dfs.core.windows.net/silver/silver.Cities.parquet")
)

StatementMeta(, , -1, Cancelled, , Cancelled)

# 2. StateProvinces

In [None]:
# Stage the bronze StateProvinces extract as the source view for the SCD Type 2 cells below.
from pyspark.sql.functions import row_number
from pyspark.sql.window import Window

df = spark.read.format('parquet').load('abfss://final@bibik224161840.dfs.core.windows.net/bronze/Application.StateProvinces.parquet')

# Number the rows by business key and move that number to the front as StateProvinceKey.
window_spec = Window.orderBy("StateProvinceID")
df = df.withColumn("StateProvinceKey", row_number().over(window_spec))
df = df.select("StateProvinceKey", *(name for name in df.columns if name != "StateProvinceKey"))

# Expose the staged data to the %%sql cells
df.createOrReplaceTempView("v_bronze_StateProvinces")

StatementMeta(, , -1, Cancelled, , Cancelled)

In [None]:
%%sql
-- Define the silver StateProvinces dimension in the Lake Database (no-op when it already exists).
-- start_date / end_date / is_active track each SCD Type 2 version.
USE silver;

CREATE TABLE IF NOT EXISTS silver.StateProvinces (
    StateProvinceKey         INT,        -- surrogate key
    StateProvinceID          INT,        -- business key
    StateProvinceCode        STRING,
    StateProvinceName        STRING,
    CountryID                INT,
    SalesTerritory           STRING,
    Border                   STRING,
    LatestRecordedPopulation INT,
    last_updated             TIMESTAMP,
    start_date               TIMESTAMP,  -- version valid from
    end_date                 TIMESTAMP,  -- version valid to, NULL while current
    is_active                BOOLEAN     -- true for the current version
);


StatementMeta(, , -1, Cancelled, , Cancelled)

In [None]:
%%sql
-- SCD Type 2, step 1: archive the current version of every state/province whose attributes changed in bronze.
-- A copy of the active row is appended with end_date = now and is_active = false.
-- The still-active original is removed by the next cell.
-- Changes are detected with the null-safe operator <=> because a plain <> against NULL yields NULL,
-- which silently skipped any value changing to or from NULL.
-- Keep this change predicate identical to the one in the next cell.
INSERT INTO silver.StateProvinces
SELECT
    target.StateProvinceKey,
    target.StateProvinceID,
    target.StateProvinceCode,
    target.StateProvinceName,
    target.CountryID,
    target.SalesTerritory,
    target.Border,
    target.LatestRecordedPopulation,
    target.last_updated,
    target.start_date,
    FROM_UTC_TIMESTAMP(current_timestamp(), 'Asia/Ho_Chi_Minh') AS end_date, -- Expiration date
    false AS is_active
FROM silver.StateProvinces AS target
JOIN v_bronze_StateProvinces AS source
    ON target.StateProvinceID = source.StateProvinceID
WHERE target.is_active = true
    AND NOT (
        target.StateProvinceCode <=> source.StateProvinceCode
        AND target.StateProvinceName <=> source.StateProvinceName
        AND target.CountryID <=> source.CountryID
        AND target.SalesTerritory <=> source.SalesTerritory
        AND target.Border <=> source.Border
        AND target.LatestRecordedPopulation <=> source.LatestRecordedPopulation
        AND target.last_updated <=> source.last_updated
    );


StatementMeta(, , -1, Cancelled, , Cancelled)

In [None]:
%%sql
-- SCD Type 2, step 2: drop the active rows that the previous cell archived, using the same change predicate.
-- History rows and unchanged active rows are kept.
-- NOTE(review): this overwrites silver.StateProvinces with a query that reads silver.StateProvinces. Spark refuses
-- to overwrite a path that is also being read from for some table formats - confirm for this table format.
INSERT OVERWRITE silver.StateProvinces
SELECT *
FROM silver.StateProvinces AS target
WHERE NOT EXISTS (
    SELECT 1
    FROM v_bronze_StateProvinces AS source
    WHERE target.StateProvinceID = source.StateProvinceID
    AND target.is_active = true
    AND NOT (
        target.StateProvinceCode <=> source.StateProvinceCode
        AND target.StateProvinceName <=> source.StateProvinceName
        AND target.CountryID <=> source.CountryID
        AND target.SalesTerritory <=> source.SalesTerritory
        AND target.Border <=> source.Border
        AND target.LatestRecordedPopulation <=> source.LatestRecordedPopulation
        AND target.last_updated <=> source.last_updated
    )
);

In [None]:
%%sql
-- SCD Type 2, step 3: insert a new current version for every bronze state/province that has no active row in silver.
-- That covers brand-new StateProvinceIDs and the changed rows whose active version the two previous cells retired.
-- Change detection is intentionally left to those cells. Re-testing the attributes here would add a
-- second active row next to the old one whenever the previous cell had not run.
-- New surrogate keys continue after the current MAX(StateProvinceKey).
WITH max_key AS (
    SELECT COALESCE(MAX(StateProvinceKey), 0) AS max_id FROM silver.StateProvinces
),
new_data AS (
    SELECT
        source.StateProvinceID,
        source.StateProvinceCode,
        source.StateProvinceName,
        source.CountryID,
        source.SalesTerritory,
        source.Border,
        source.LatestRecordedPopulation,
        source.last_updated,
        FROM_UTC_TIMESTAMP(current_timestamp(), 'Asia/Ho_Chi_Minh') AS start_date,
        CAST(NULL AS TIMESTAMP) AS end_date,  -- open-ended validity for the current version
        true AS is_active,
        ROW_NUMBER() OVER (ORDER BY source.StateProvinceID) AS rn
    FROM v_bronze_StateProvinces AS source
    WHERE NOT EXISTS (
        SELECT 1
        FROM silver.StateProvinces AS target
        WHERE target.StateProvinceID = source.StateProvinceID
          AND target.is_active = true
    )
)
INSERT INTO silver.StateProvinces
SELECT
    max_key.max_id + new_data.rn AS StateProvinceKey,
    new_data.StateProvinceID,
    new_data.StateProvinceCode,
    new_data.StateProvinceName,
    new_data.CountryID,
    new_data.SalesTerritory,
    new_data.Border,
    new_data.LatestRecordedPopulation,
    new_data.last_updated,
    new_data.start_date,
    new_data.end_date,
    new_data.is_active
FROM new_data
CROSS JOIN max_key;


In [None]:
# Export the whole silver.StateProvinces table as one partition (a single part file)
# to the silver folder of the storage account, replacing any previous export.
df_StateProvinces = spark.sql("SELECT * FROM silver.StateProvinces")
(
    df_StateProvinces
    .repartition(1)
    .write
    .mode("overwrite")
    .parquet("abfss://final@bibik224161840.dfs.core.windows.net/silver/silver.StateProvinces.parquet")
)

# 3. Countries

In [8]:
# Stage the bronze Countries extract as the source view for the SCD Type 2 cells below.
from pyspark.sql.functions import row_number
from pyspark.sql.window import Window

df = spark.read.format('parquet').load('abfss://final@bibik224161840.dfs.core.windows.net/bronze/Application.Countries.parquet')

# Number the rows by business key and move that number to the front as CountryKey.
window_spec = Window.orderBy("CountryID")
df = df.withColumn("CountryKey", row_number().over(window_spec))
df = df.select("CountryKey", *(name for name in df.columns if name != "CountryKey"))

# Expose the staged data to the %%sql cells
df.createOrReplaceTempView("v_bronze_Countries")

StatementMeta(sparkpool, 60, 4, Finished, Available, Finished)

In [None]:
%%sql
-- Define the silver Countries dimension in the Lake Database (no-op when it already exists).
-- start_date / end_date / is_active track each SCD Type 2 version.
USE silver;

CREATE TABLE IF NOT EXISTS silver.Countries (
    CountryKey               INT,        -- surrogate key
    CountryID                INT,        -- business key
    CountryName              STRING,
    FormalName               STRING,
    IsoAlpha3Code            STRING,
    IsoNumericCode           INT,
    CountryType              STRING,
    LatestRecordedPopulation INT,
    Continent                STRING,
    Region                   STRING,
    Subregion                STRING,
    Border                   STRING,
    last_updated             TIMESTAMP,
    start_date               TIMESTAMP,  -- version valid from
    end_date                 TIMESTAMP,  -- version valid to, NULL while current
    is_active                BOOLEAN     -- true for the current version
);

In [None]:
%%sql
-- SCD Type 2, step 1: archive the current version of every country whose attributes changed in bronze.
-- A copy of the active row is appended with end_date = now and is_active = false.
-- The still-active original is removed by the next cell.
-- Changes are detected with the null-safe operator <=> because a plain <> against NULL yields NULL,
-- which silently skipped any value changing to or from NULL.
-- Keep this change predicate identical to the one in the next cell.
INSERT INTO silver.Countries
SELECT
    target.CountryKey,
    target.CountryID,
    target.CountryName,
    target.FormalName,
    target.IsoAlpha3Code,
    target.IsoNumericCode,
    target.CountryType,
    target.LatestRecordedPopulation,
    target.Continent,
    target.Region,
    target.Subregion,
    target.Border,
    target.last_updated,
    target.start_date,
    FROM_UTC_TIMESTAMP(current_timestamp(), 'Asia/Ho_Chi_Minh') AS end_date, -- Expiration date
    false AS is_active
FROM silver.Countries AS target
JOIN v_bronze_Countries AS source
    ON target.CountryID = source.CountryID
WHERE target.is_active = true
    AND NOT (
        target.CountryName <=> source.CountryName
        AND target.FormalName <=> source.FormalName
        AND target.IsoAlpha3Code <=> source.IsoAlpha3Code
        AND target.IsoNumericCode <=> source.IsoNumericCode
        AND target.CountryType <=> source.CountryType
        AND target.LatestRecordedPopulation <=> source.LatestRecordedPopulation
        AND target.Continent <=> source.Continent
        AND target.Region <=> source.Region
        AND target.Subregion <=> source.Subregion
        AND target.Border <=> source.Border
        AND target.last_updated <=> source.last_updated
    );


In [None]:
%%sql
-- SCD Type 2, step 2: drop the active rows that the previous cell archived, using the same change predicate.
-- History rows and unchanged active rows are kept.
-- NOTE(review): this overwrites silver.Countries with a query that reads silver.Countries. Spark refuses
-- to overwrite a path that is also being read from for some table formats - confirm for this table format.
INSERT OVERWRITE silver.Countries
SELECT *
FROM silver.Countries AS target
WHERE NOT EXISTS (
    SELECT 1
    FROM v_bronze_Countries AS source
    WHERE target.CountryID = source.CountryID
    AND target.is_active = true
    AND NOT (
        target.CountryName <=> source.CountryName
        AND target.FormalName <=> source.FormalName
        AND target.IsoAlpha3Code <=> source.IsoAlpha3Code
        AND target.IsoNumericCode <=> source.IsoNumericCode
        AND target.CountryType <=> source.CountryType
        AND target.LatestRecordedPopulation <=> source.LatestRecordedPopulation
        AND target.Continent <=> source.Continent
        AND target.Region <=> source.Region
        AND target.Subregion <=> source.Subregion
        AND target.Border <=> source.Border
        AND target.last_updated <=> source.last_updated
    )
);

In [None]:
%%sql
-- SCD Type 2, step 3: insert a new current version for every bronze country that has no active row in silver.
-- That covers brand-new CountryIDs and the changed countries whose active row the two previous cells retired.
-- Change detection is intentionally left to those cells. Re-testing the attributes here would add a
-- second active row next to the old one whenever the previous cell had not run.
-- New surrogate keys continue after the current MAX(CountryKey).
WITH max_key AS (
    SELECT COALESCE(MAX(CountryKey), 0) AS max_id FROM silver.Countries
),
new_data AS (
    SELECT
        source.CountryID,
        source.CountryName,
        source.FormalName,
        source.IsoAlpha3Code,
        source.IsoNumericCode,
        source.CountryType,
        source.LatestRecordedPopulation,
        source.Continent,
        source.Region,
        source.Subregion,
        source.Border,
        source.last_updated,
        FROM_UTC_TIMESTAMP(current_timestamp(), 'Asia/Ho_Chi_Minh') AS start_date,
        CAST(NULL AS TIMESTAMP) AS end_date,  -- open-ended validity for the current version
        true AS is_active,
        ROW_NUMBER() OVER (ORDER BY source.CountryID) AS rn
    FROM v_bronze_Countries AS source
    WHERE NOT EXISTS (
        SELECT 1
        FROM silver.Countries AS target
        WHERE target.CountryID = source.CountryID
          AND target.is_active = true
    )
)
INSERT INTO silver.Countries
SELECT
    max_key.max_id + new_data.rn AS CountryKey,
    new_data.CountryID,
    new_data.CountryName,
    new_data.FormalName,
    new_data.IsoAlpha3Code,
    new_data.IsoNumericCode,
    new_data.CountryType,
    new_data.LatestRecordedPopulation,
    new_data.Continent,
    new_data.Region,
    new_data.Subregion,
    new_data.Border,
    new_data.last_updated,
    new_data.start_date,
    new_data.end_date,
    new_data.is_active
FROM new_data
CROSS JOIN max_key;

In [None]:
# Export the whole silver.Countries table as one partition (a single part file)
# to the silver folder of the storage account, replacing any previous export.
df_Countries = spark.sql("SELECT * FROM silver.Countries")
(
    df_Countries
    .repartition(1)
    .write
    .mode("overwrite")
    .parquet("abfss://final@bibik224161840.dfs.core.windows.net/silver/silver.Countries.parquet")
)

# 4. DeliveryMethods

In [None]:
# Stage the bronze DeliveryMethods extract as the source view for the SCD Type 2 cells below.
from pyspark.sql.functions import row_number
from pyspark.sql.window import Window

df = spark.read.format('parquet').load('abfss://final@bibik224161840.dfs.core.windows.net/bronze/Application.DeliveryMethods.parquet')

# Number the rows by business key and move that number to the front as DeliveryMethodKey.
window_spec = Window.orderBy("DeliveryMethodID")
df = df.withColumn("DeliveryMethodKey", row_number().over(window_spec))
df = df.select("DeliveryMethodKey", *(name for name in df.columns if name != "DeliveryMethodKey"))

# Expose the staged data to the %%sql cells
df.createOrReplaceTempView("v_bronze_DeliveryMethods")

In [None]:
%%sql
-- Define the silver DeliveryMethods dimension in the Lake Database (no-op when it already exists).
-- DeliveryMethodKey is assigned by the SCD insert cell as MAX(key) + row number.
USE silver;

CREATE TABLE IF NOT EXISTS silver.DeliveryMethods (
    DeliveryMethodKey  INT,        -- surrogate key
    DeliveryMethodID   INT,        -- business key
    DeliveryMethodName STRING,
    last_updated       TIMESTAMP,
    start_date         TIMESTAMP,  -- version valid from
    end_date           TIMESTAMP,  -- version valid to, NULL while current
    is_active          BOOLEAN     -- true for the current version
);

In [None]:
%%sql
-- SCD Type 2, step 1: archive the current version of every delivery method whose attributes changed in bronze.
-- A copy of the active row is appended with end_date = now and is_active = false.
-- The still-active original is removed by the next cell.
-- Changes are detected with the null-safe operator <=> because a plain <> against NULL yields NULL,
-- which silently skipped any value changing to or from NULL.
-- Keep this change predicate identical to the one in the next cell.
INSERT INTO silver.DeliveryMethods
SELECT
    target.DeliveryMethodKey,
    target.DeliveryMethodID,
    target.DeliveryMethodName,
    target.last_updated,
    target.start_date,
    FROM_UTC_TIMESTAMP(current_timestamp(), 'Asia/Ho_Chi_Minh') AS end_date, -- Expiration date
    false AS is_active
FROM silver.DeliveryMethods AS target
JOIN v_bronze_DeliveryMethods AS source
    ON target.DeliveryMethodID = source.DeliveryMethodID
WHERE target.is_active = true  -- Only current versions can be expired
    AND NOT (
        target.DeliveryMethodName <=> source.DeliveryMethodName  -- Name unchanged
        AND target.last_updated <=> source.last_updated          -- Update timestamp unchanged
    );

In [None]:
%%sql
-- SCD Type 2, step 2: drop the active rows that the previous cell archived, using the same change predicate.
-- History rows and unchanged active rows are kept.
-- NOTE(review): this overwrites silver.DeliveryMethods with a query that reads silver.DeliveryMethods. Spark refuses
-- to overwrite a path that is also being read from for some table formats - confirm for this table format.
INSERT OVERWRITE silver.DeliveryMethods
SELECT *
FROM silver.DeliveryMethods AS target
WHERE NOT EXISTS (
    SELECT 1
    FROM v_bronze_DeliveryMethods AS source
    WHERE target.DeliveryMethodID = source.DeliveryMethodID
    AND target.is_active = true
    AND NOT (
        target.DeliveryMethodName <=> source.DeliveryMethodName
        AND target.last_updated <=> source.last_updated
    )
);

In [None]:
%%sql
-- SCD Type 2, step 3: insert a new current version for every bronze delivery method that has no active row in silver.
-- That covers brand-new DeliveryMethodIDs and the changed rows whose active version the two previous cells retired.
-- Change detection is intentionally left to those cells. Re-testing the attributes here would add a
-- second active row next to the old one whenever the previous cell had not run.
WITH max_key AS (
    -- Highest surrogate key already used in the Silver table (0 when empty)
    SELECT COALESCE(MAX(DeliveryMethodKey), 0) AS max_id FROM silver.DeliveryMethods
),
new_data AS (
    -- Bronze rows that currently have no active version in the Silver table
    SELECT
        source.DeliveryMethodID,
        source.DeliveryMethodName,
        source.last_updated,
        FROM_UTC_TIMESTAMP(current_timestamp(), 'Asia/Ho_Chi_Minh') AS start_date, -- Start date of validity
        CAST(NULL AS TIMESTAMP) AS end_date,  -- Open-ended validity for the current version
        true AS is_active,  -- Mark the new record as active
        ROW_NUMBER() OVER (ORDER BY source.DeliveryMethodID) AS rn -- Offset added to max_id for the new key
    FROM v_bronze_DeliveryMethods AS source
    WHERE NOT EXISTS (
        SELECT 1
        FROM silver.DeliveryMethods AS target
        WHERE target.DeliveryMethodID = source.DeliveryMethodID
          AND target.is_active = true
    )
)
-- Insert new data into the Silver table
INSERT INTO silver.DeliveryMethods
SELECT
    max_key.max_id + new_data.rn AS DeliveryMethodKey, -- New surrogate key
    new_data.DeliveryMethodID,
    new_data.DeliveryMethodName,
    new_data.last_updated,
    new_data.start_date,
    new_data.end_date,
    new_data.is_active
FROM new_data
CROSS JOIN max_key;


In [None]:
# Export the whole silver.DeliveryMethods table as one partition (a single part file)
# to the silver folder of the storage account, replacing any previous export.
df_DeliveryMethods = spark.sql("SELECT * FROM silver.DeliveryMethods")
(
    df_DeliveryMethods
    .repartition(1)
    .write
    .mode("overwrite")
    .parquet("abfss://final@bibik224161840.dfs.core.windows.net/silver/silver.DeliveryMethods.parquet")
)

# 5. PaymentMethods

In [None]:
# Stage the bronze PaymentMethods extract as the source view for the SCD Type 2 cells below.
from pyspark.sql.functions import row_number
from pyspark.sql.window import Window

df = spark.read.format('parquet').load('abfss://final@bibik224161840.dfs.core.windows.net/bronze/Application.PaymentMethods.parquet')

# Number the rows by business key and move that number to the front as PaymentMethodKey.
window_spec = Window.orderBy("PaymentMethodID")
df = df.withColumn("PaymentMethodKey", row_number().over(window_spec))
df = df.select("PaymentMethodKey", *(name for name in df.columns if name != "PaymentMethodKey"))

# Expose the staged data to the %%sql cells
df.createOrReplaceTempView("v_bronze_PaymentMethods")

In [None]:
%%sql
-- Define the silver PaymentMethods dimension in the Lake Database (no-op when it already exists).
-- PaymentMethodKey is assigned by the SCD insert cell as MAX(key) + row number.
USE silver;

CREATE TABLE IF NOT EXISTS silver.PaymentMethods (
    PaymentMethodKey  INT,        -- surrogate key
    PaymentMethodID   INT,        -- business key
    PaymentMethodName STRING,
    last_updated      TIMESTAMP,
    start_date        TIMESTAMP,  -- version valid from
    end_date          TIMESTAMP,  -- version valid to, NULL while current
    is_active         BOOLEAN     -- true for the current version
);

In [None]:
%%sql
-- SCD Type 2, step 1: archive the current version of every payment method whose attributes changed in bronze.
-- A copy of the active row is appended with end_date = now and is_active = false.
-- The still-active original is removed by the next cell.
-- Changes are detected with the null-safe operator <=> because a plain <> against NULL yields NULL,
-- which silently skipped any value changing to or from NULL.
-- Keep this change predicate identical to the one in the next cell.
INSERT INTO silver.PaymentMethods
SELECT
    target.PaymentMethodKey,
    target.PaymentMethodID,
    target.PaymentMethodName,
    target.last_updated,
    target.start_date,
    FROM_UTC_TIMESTAMP(current_timestamp(), 'Asia/Ho_Chi_Minh') AS end_date, -- Expiration date
    false AS is_active
FROM silver.PaymentMethods AS target
JOIN v_bronze_PaymentMethods AS source
    ON target.PaymentMethodID = source.PaymentMethodID
WHERE target.is_active = true  -- Only current versions can be expired
    AND NOT (
        target.PaymentMethodName <=> source.PaymentMethodName  -- Name unchanged
        AND target.last_updated <=> source.last_updated        -- Update timestamp unchanged
    );

In [None]:
%%sql
-- SCD Type 2, step 2: drop the active rows that the previous cell archived, using the same change predicate.
-- History rows and unchanged active rows are kept.
-- NOTE(review): this overwrites silver.PaymentMethods with a query that reads silver.PaymentMethods. Spark refuses
-- to overwrite a path that is also being read from for some table formats - confirm for this table format.
INSERT OVERWRITE silver.PaymentMethods
SELECT *
FROM silver.PaymentMethods AS target
WHERE NOT EXISTS (
    SELECT 1
    FROM v_bronze_PaymentMethods AS source
    WHERE target.PaymentMethodID = source.PaymentMethodID
    AND target.is_active = true
    AND NOT (
        target.PaymentMethodName <=> source.PaymentMethodName
        AND target.last_updated <=> source.last_updated
    )
);

In [None]:
%%sql
-- SCD Type 2, step 3: insert a new current version for every bronze payment method that has no active row in silver.
-- That covers brand-new PaymentMethodIDs and the changed rows whose active version the two previous cells retired.
-- Change detection is intentionally left to those cells. Re-testing the attributes here would add a
-- second active row next to the old one whenever the previous cell had not run.
WITH max_key AS (
    -- Highest surrogate key already used in the Silver table (0 when empty)
    SELECT COALESCE(MAX(PaymentMethodKey), 0) AS max_id FROM silver.PaymentMethods
),
new_data AS (
    -- Bronze rows that currently have no active version in the Silver table
    SELECT
        source.PaymentMethodID,
        source.PaymentMethodName,
        source.last_updated,
        FROM_UTC_TIMESTAMP(current_timestamp(), 'Asia/Ho_Chi_Minh') AS start_date, -- Start date of validity
        CAST(NULL AS TIMESTAMP) AS end_date,  -- Open-ended validity for the current version
        true AS is_active,  -- Mark the new record as active
        ROW_NUMBER() OVER (ORDER BY source.PaymentMethodID) AS rn -- Offset added to max_id for the new key
    FROM v_bronze_PaymentMethods AS source
    WHERE NOT EXISTS (
        SELECT 1
        FROM silver.PaymentMethods AS target
        WHERE target.PaymentMethodID = source.PaymentMethodID
          AND target.is_active = true
    )
)
-- Insert new data into the Silver table
INSERT INTO silver.PaymentMethods
SELECT
    max_key.max_id + new_data.rn AS PaymentMethodKey, -- New surrogate key
    new_data.PaymentMethodID,
    new_data.PaymentMethodName,
    new_data.last_updated,
    new_data.start_date,
    new_data.end_date,
    new_data.is_active
FROM new_data
CROSS JOIN max_key;


In [None]:
# Export the whole silver.PaymentMethods table as one partition (a single part file)
# to the silver folder of the storage account, replacing any previous export.
df_PaymentMethods = spark.sql("SELECT * FROM silver.PaymentMethods")
(
    df_PaymentMethods
    .repartition(1)
    .write
    .mode("overwrite")
    .parquet("abfss://final@bibik224161840.dfs.core.windows.net/silver/silver.PaymentMethods.parquet")
)

# 6. StockItems

In [None]:
# Stage the bronze StockItems extract as the source view for the SCD Type 2 cells below.
from pyspark.sql.functions import row_number
from pyspark.sql.window import Window

df = spark.read.format('parquet').load('abfss://final@bibik224161840.dfs.core.windows.net/bronze/Warehouse.StockItems.parquet')

# Number the rows by business key and move that number to the front as StockItemKey.
window_spec = Window.orderBy("StockItemID")
df = df.withColumn("StockItemKey", row_number().over(window_spec))
df = df.select("StockItemKey", *(name for name in df.columns if name != "StockItemKey"))

# Expose the staged data to the %%sql cells
df.createOrReplaceTempView("v_bronze_StockItems")


In [None]:
%%sql
-- Define the silver StockItems dimension in the Lake Database (no-op when it already exists).
-- start_date / end_date / is_active track each SCD Type 2 version.
USE silver;

CREATE TABLE IF NOT EXISTS silver.StockItems (
    StockItemKey           INT,        -- surrogate key
    StockItemID            INT,        -- business key
    StockItemName          STRING,
    SupplierID             INT,
    ColorID                INT,
    UnitPackageID          INT,
    OuterPackageID         INT,
    Brand                  STRING,
    Size                   STRING,
    LeadTimeDays           INT,
    QuantityPerOuter       INT,
    IsChillerStock         BOOLEAN,
    Barcode                STRING,
    TaxRate                DECIMAL(18,3),
    UnitPrice              DECIMAL(18,2),
    RecommendedRetailPrice DECIMAL(18,2),
    TypicalWeightPerUnit   DECIMAL(18,3),
    MarketingComments      STRING,
    InternalComments       STRING,
    Photo                  BINARY,
    CustomFields           STRING,
    Tags                   STRING,
    SearchDetails          STRING,
    last_updated           TIMESTAMP,
    start_date             TIMESTAMP,  -- version valid from
    end_date               TIMESTAMP,  -- version valid to, NULL while current
    is_active              BOOLEAN     -- true for the current version
);



In [None]:
%%sql
-- SCD Type 2, step 1: archive the current version of every stock item whose attributes changed in bronze.
-- A copy of the active row is appended with end_date = now and is_active = false.
-- The still-active original is removed by the next cell.
-- Changes are detected with the null-safe operator <=> because a plain <> against NULL yields NULL,
-- which silently skipped any value changing to or from NULL.
-- Keep this change predicate identical to the one in the next cell.
INSERT INTO silver.StockItems
SELECT
    target.StockItemKey,
    target.StockItemID,
    target.StockItemName,
    target.SupplierID,
    target.ColorID,
    target.UnitPackageID,
    target.OuterPackageID,
    target.Brand,
    target.Size,
    target.LeadTimeDays,
    target.QuantityPerOuter,
    target.IsChillerStock,
    target.Barcode,
    target.TaxRate,
    target.UnitPrice,
    target.RecommendedRetailPrice,
    target.TypicalWeightPerUnit,
    target.MarketingComments,
    target.InternalComments,
    target.Photo,
    target.CustomFields,
    target.Tags,
    target.SearchDetails,
    target.last_updated,
    target.start_date,
    FROM_UTC_TIMESTAMP(current_timestamp(), 'Asia/Ho_Chi_Minh') AS end_date, -- Expiration date
    false AS is_active
FROM silver.StockItems AS target
JOIN v_bronze_StockItems AS source
    ON target.StockItemID = source.StockItemID
WHERE target.is_active = true
    AND NOT (
        target.StockItemName <=> source.StockItemName
        AND target.SupplierID <=> source.SupplierID
        AND target.ColorID <=> source.ColorID
        AND target.UnitPackageID <=> source.UnitPackageID
        AND target.OuterPackageID <=> source.OuterPackageID
        AND target.Brand <=> source.Brand
        AND target.Size <=> source.Size
        AND target.LeadTimeDays <=> source.LeadTimeDays
        AND target.QuantityPerOuter <=> source.QuantityPerOuter
        AND target.IsChillerStock <=> source.IsChillerStock
        AND target.Barcode <=> source.Barcode
        AND target.TaxRate <=> source.TaxRate
        AND target.UnitPrice <=> source.UnitPrice
        AND target.RecommendedRetailPrice <=> source.RecommendedRetailPrice
        AND target.TypicalWeightPerUnit <=> source.TypicalWeightPerUnit
        AND target.MarketingComments <=> source.MarketingComments
        AND target.InternalComments <=> source.InternalComments
        AND target.Photo <=> source.Photo
        AND target.CustomFields <=> source.CustomFields
        AND target.Tags <=> source.Tags
        AND target.SearchDetails <=> source.SearchDetails
        AND target.last_updated <=> source.last_updated
    );


In [None]:
%%sql
-- SCD Type 2, step 2: drop the active rows that the previous cell archived, using the same change predicate.
-- History rows and unchanged active rows are kept.
-- NOTE(review): this overwrites silver.StockItems with a query that reads silver.StockItems. Spark refuses
-- to overwrite a path that is also being read from for some table formats - confirm for this table format.
INSERT OVERWRITE silver.StockItems
SELECT *
FROM silver.StockItems AS target
WHERE NOT EXISTS (
    SELECT 1
    FROM v_bronze_StockItems AS source
    WHERE target.StockItemID = source.StockItemID
    AND target.is_active = true
    AND NOT (
        target.StockItemName <=> source.StockItemName
        AND target.SupplierID <=> source.SupplierID
        AND target.ColorID <=> source.ColorID
        AND target.UnitPackageID <=> source.UnitPackageID
        AND target.OuterPackageID <=> source.OuterPackageID
        AND target.Brand <=> source.Brand
        AND target.Size <=> source.Size
        AND target.LeadTimeDays <=> source.LeadTimeDays
        AND target.QuantityPerOuter <=> source.QuantityPerOuter
        AND target.IsChillerStock <=> source.IsChillerStock
        AND target.Barcode <=> source.Barcode
        AND target.TaxRate <=> source.TaxRate
        AND target.UnitPrice <=> source.UnitPrice
        AND target.RecommendedRetailPrice <=> source.RecommendedRetailPrice
        AND target.TypicalWeightPerUnit <=> source.TypicalWeightPerUnit
        AND target.MarketingComments <=> source.MarketingComments
        AND target.InternalComments <=> source.InternalComments
        AND target.Photo <=> source.Photo
        AND target.CustomFields <=> source.CustomFields
        AND target.Tags <=> source.Tags
        AND target.SearchDetails <=> source.SearchDetails
        AND target.last_updated <=> source.last_updated
    )
);

In [None]:
%%sql
-- SCD Type 2, step 3: insert a new current version for every bronze stock item that has no active row in silver.
-- That covers brand-new StockItemIDs and the changed items whose active row the two previous cells retired.
-- Change detection is intentionally left to those cells. Re-testing the attributes here would add a
-- second active row next to the old one whenever the previous cell had not run.
WITH max_key AS (
    -- Highest surrogate key already used in the Silver table (0 when empty)
    SELECT COALESCE(MAX(StockItemKey), 0) AS max_id FROM silver.StockItems
),
new_data AS (
    -- Bronze rows that currently have no active version in the Silver table
    SELECT
        source.StockItemID,
        source.StockItemName,
        source.SupplierID,
        source.ColorID,
        source.UnitPackageID,
        source.OuterPackageID,
        source.Brand,
        source.Size,
        source.LeadTimeDays,
        source.QuantityPerOuter,
        source.IsChillerStock,
        source.Barcode,
        source.TaxRate,
        source.UnitPrice,
        source.RecommendedRetailPrice,
        source.TypicalWeightPerUnit,
        source.MarketingComments,
        source.InternalComments,
        source.Photo,
        source.CustomFields,
        source.Tags,
        source.SearchDetails,
        source.last_updated,
        FROM_UTC_TIMESTAMP(current_timestamp(), 'Asia/Ho_Chi_Minh') AS start_date,
        CAST(NULL AS TIMESTAMP) AS end_date,  -- Open-ended validity for the current version
        true AS is_active,
        ROW_NUMBER() OVER (ORDER BY source.StockItemID) AS rn  -- Offset added to max_id for the new key
    FROM v_bronze_StockItems AS source
    WHERE NOT EXISTS (
        SELECT 1
        FROM silver.StockItems AS target
        WHERE target.StockItemID = source.StockItemID
          AND target.is_active = true
    )
)
-- Insert new data into the Silver table
INSERT INTO silver.StockItems
SELECT
    max_key.max_id + new_data.rn AS StockItemKey, -- New surrogate key
    new_data.StockItemID,
    new_data.StockItemName,
    new_data.SupplierID,
    new_data.ColorID,
    new_data.UnitPackageID,
    new_data.OuterPackageID,
    new_data.Brand,
    new_data.Size,
    new_data.LeadTimeDays,
    new_data.QuantityPerOuter,
    new_data.IsChillerStock,
    new_data.Barcode,
    new_data.TaxRate,
    new_data.UnitPrice,
    new_data.RecommendedRetailPrice,
    new_data.TypicalWeightPerUnit,
    new_data.MarketingComments,
    new_data.InternalComments,
    new_data.Photo,
    new_data.CustomFields,
    new_data.Tags,
    new_data.SearchDetails,
    new_data.last_updated,
    new_data.start_date,
    new_data.end_date,
    new_data.is_active
FROM new_data
CROSS JOIN max_key;


In [None]:
# Export the whole silver.StockItems table as one partition (a single part file)
# to the silver folder of the storage account, replacing any previous export.
df_StockItems = spark.sql("SELECT * FROM silver.StockItems")
(
    df_StockItems
    .repartition(1)
    .write
    .mode("overwrite")
    .parquet("abfss://final@bibik224161840.dfs.core.windows.net/silver/silver.StockItems.parquet")
)

# 7. StockGroups

In [None]:
# Stage the bronze StockGroups extract as the source view for the SCD Type 2 cells below.
from pyspark.sql.functions import row_number
from pyspark.sql.window import Window

df = spark.read.format('parquet').load('abfss://final@bibik224161840.dfs.core.windows.net/bronze/Warehouse.StockGroups.parquet')

# Number the rows by business key and move that number to the front as StockGroupKey.
window_spec = Window.orderBy("StockGroupID")
df = df.withColumn("StockGroupKey", row_number().over(window_spec))
df = df.select("StockGroupKey", *(name for name in df.columns if name != "StockGroupKey"))

# Expose the staged data to the %%sql cells
df.createOrReplaceTempView("v_bronze_StockGroups")

In [None]:
%%sql
-- Make sure the Silver SCD Type 2 table for stock groups exists in the Lake Database.
-- The last four columns track row versions (see the SCD cells below).
USE silver;

CREATE TABLE IF NOT EXISTS silver.StockGroups (
    StockGroupKey   INT,        -- surrogate key, a new one per inserted version
    StockGroupID    INT,        -- business key from bronze
    StockGroupName  STRING,
    last_updated    TIMESTAMP,
    start_date      TIMESTAMP,  -- version valid from
    end_date        TIMESTAMP,  -- version valid until; NULL while the version is active
    is_active       BOOLEAN     -- true only for the current version
);


In [None]:
%%sql
-- SCD Type 2, step 1 of 3: archive a copy of every active row whose bronze version differs,
-- stamping end_date (current time via FROM_UTC_TIMESTAMP to Asia/Ho_Chi_Minh) and is_active = false.
-- Change detection uses NOT (a <=> b), Spark's null-safe comparison: a plain `a <> b` is NULL
-- when either side is NULL, so a value changing to/from NULL would never be versioned.
-- The change predicate must stay identical across steps 1-3.
INSERT INTO silver.StockGroups
SELECT
    target.StockGroupKey,
    target.StockGroupID,
    target.StockGroupName,
    target.last_updated,
    target.start_date,
    FROM_UTC_TIMESTAMP(current_timestamp(), 'Asia/Ho_Chi_Minh') AS end_date, -- Expiration date
    false AS is_active
FROM silver.StockGroups AS target
JOIN v_bronze_StockGroups AS source
    ON target.StockGroupID = source.StockGroupID
WHERE target.is_active = true
    AND (
        NOT (target.StockGroupName <=> source.StockGroupName)
        OR NOT (target.last_updated <=> source.last_updated)
    );

In [None]:
%%sql
-- SCD Type 2, step 2 of 3: rewrite the table without the still-active rows that step 1 archived.
-- Archived copies (is_active = false) do not match the NOT EXISTS filter and are kept.
-- NOTE(review): this overwrites silver.StockGroups while reading from it; Spark rejects that
-- for some table formats ("Cannot overwrite a path that is also being read from") - confirm.
INSERT OVERWRITE silver.StockGroups
SELECT *
FROM silver.StockGroups AS target
WHERE NOT EXISTS (
    SELECT 1
    FROM v_bronze_StockGroups AS source
    WHERE target.StockGroupID = source.StockGroupID
    AND target.is_active = true
    AND (
        NOT (target.StockGroupName <=> source.StockGroupName)
        OR NOT (target.last_updated <=> source.last_updated)
    )
);

In [None]:
%%sql
-- SCD Type 2, step 3 of 3: insert a new active version for every bronze row without a matching
-- active silver row (new IDs, plus IDs whose active row step 2 removed).
-- Surrogate keys continue after the current maximum key in the table.
WITH max_key AS (
    SELECT COALESCE(MAX(StockGroupKey), 0) AS max_id FROM silver.StockGroups
),
new_data AS (
    SELECT
        source.StockGroupID,
        source.StockGroupName,
        source.last_updated,
        FROM_UTC_TIMESTAMP(current_timestamp(), 'Asia/Ho_Chi_Minh') AS start_date,
        CAST(NULL AS TIMESTAMP) AS end_date,
        true AS is_active,
        ROW_NUMBER() OVER (ORDER BY source.StockGroupID) AS rn
    FROM v_bronze_StockGroups AS source
    LEFT JOIN silver.StockGroups AS target
        ON source.StockGroupID = target.StockGroupID
        AND target.is_active = true
    WHERE target.StockGroupID IS NULL
       OR NOT (target.StockGroupName <=> source.StockGroupName)
       OR NOT (target.last_updated <=> source.last_updated)
)
INSERT INTO silver.StockGroups
SELECT
    max_key.max_id + new_data.rn AS StockGroupKey,
    new_data.StockGroupID,
    new_data.StockGroupName,
    new_data.last_updated,
    new_data.start_date,
    new_data.end_date,
    new_data.is_active
FROM new_data
CROSS JOIN max_key;

In [None]:
# Export the current contents of silver.StockGroups to the Silver layer of the
# storage account as Parquet, replacing whatever was written there before.
silver_path = "abfss://final@bibik224161840.dfs.core.windows.net/silver/silver.StockGroups.parquet"
df_StockGroups = spark.table("silver.StockGroups")
(
    df_StockGroups
    .repartition(1)  # collapse to a single partition before writing
    .write
    .mode("overwrite")
    .format("parquet")
    .save(silver_path)
)

# 8. StockItemStockGroups

In [None]:
# Load the bronze Parquet snapshot of Warehouse.StockItemStockGroups
from pyspark.sql.functions import row_number
from pyspark.sql.window import Window

df = spark.read.load(
    path='abfss://final@bibik224161840.dfs.core.windows.net/bronze/Warehouse.StockItemStockGroups.parquet',
    format='parquet',
)

# Number rows 1..n in StockItemStockGroupID order as a surrogate key and make it the first column.
# NOTE(review): the %%sql cells of this section never read StockItemStockGroupKey from the view;
# new silver keys are minted from MAX(StockItemStockGroupKey) in the silver table instead.
window_spec = Window.orderBy("StockItemStockGroupID")
df = df.withColumn("StockItemStockGroupKey", row_number().over(window_spec))
df = df.select(["StockItemStockGroupKey"] + [c for c in df.columns if c != "StockItemStockGroupKey"])

# Register the prepared frame as the source view used by the %%sql cells below
df.createOrReplaceTempView("v_bronze_StockItemStockGroups")

In [None]:
%%sql
-- Create the Silver SCD Type 2 table for the stock item / stock group link in the Lake Database.
-- StockItemID and StockGroupID are integer IDs, typed INT for consistency with
-- silver.StockGroups.StockGroupID (they were STRING, which forced implicit casts in the
-- change-detection comparisons against the bronze view).
-- NOTE: CREATE TABLE IF NOT EXISTS leaves an existing table untouched; a table already created
-- with STRING columns must be recreated/altered for the new types to apply.
USE silver;
CREATE TABLE IF NOT EXISTS silver.StockItemStockGroups (
    StockItemStockGroupKey INT, -- Surrogate Key
    StockItemStockGroupID INT,
    StockItemID INT,
    StockGroupID INT,
    last_updated TIMESTAMP,
    start_date TIMESTAMP,
    end_date TIMESTAMP,
    is_active BOOLEAN
);


In [None]:
%%sql
-- SCD Type 2, step 1 of 3: archive a copy of every active row whose bronze version differs,
-- stamping end_date (current time via FROM_UTC_TIMESTAMP to Asia/Ho_Chi_Minh) and is_active = false.
-- Change detection uses NOT (a <=> b), Spark's null-safe comparison: a plain `a <> b` is NULL
-- when either side is NULL, so a value changing to/from NULL would never be versioned.
-- The change predicate must stay identical across steps 1-3.
INSERT INTO silver.StockItemStockGroups
SELECT
    target.StockItemStockGroupKey,
    target.StockItemStockGroupID,
    target.StockItemID,
    target.StockGroupID,
    target.last_updated,
    target.start_date,
    FROM_UTC_TIMESTAMP(current_timestamp(), 'Asia/Ho_Chi_Minh') AS end_date, -- Expiration date
    false AS is_active
FROM silver.StockItemStockGroups AS target
JOIN v_bronze_StockItemStockGroups AS source
    ON target.StockItemStockGroupID = source.StockItemStockGroupID
WHERE target.is_active = true
    AND (
        NOT (target.StockItemID <=> source.StockItemID)
        OR NOT (target.StockGroupID <=> source.StockGroupID)
        OR NOT (target.last_updated <=> source.last_updated)
    );

In [None]:
%%sql
-- SCD Type 2, step 2 of 3: rewrite the table without the still-active rows that step 1 archived.
-- Archived copies (is_active = false) do not match the NOT EXISTS filter and are kept.
-- NOTE(review): this overwrites the table while reading from it; Spark rejects that for some
-- table formats ("Cannot overwrite a path that is also being read from") - confirm.
INSERT OVERWRITE silver.StockItemStockGroups
SELECT *
FROM silver.StockItemStockGroups AS target
WHERE NOT EXISTS (
    SELECT 1
    FROM v_bronze_StockItemStockGroups AS source
    WHERE target.StockItemStockGroupID = source.StockItemStockGroupID
    AND target.is_active = true
    AND (
        NOT (target.StockItemID <=> source.StockItemID)
        OR NOT (target.StockGroupID <=> source.StockGroupID)
        OR NOT (target.last_updated <=> source.last_updated)
    )
);

In [None]:
%%sql
-- SCD Type 2, step 3 of 3: insert a new active version for every bronze row without a matching
-- active silver row (new IDs, plus IDs whose active row step 2 removed).
-- Surrogate keys continue after the current maximum key in the table.
WITH max_key AS (
    SELECT COALESCE(MAX(StockItemStockGroupKey), 0) AS max_id FROM silver.StockItemStockGroups
),
new_data AS (
    SELECT
        source.StockItemStockGroupID,
        source.StockItemID,
        source.StockGroupID,
        source.last_updated,
        FROM_UTC_TIMESTAMP(current_timestamp(), 'Asia/Ho_Chi_Minh') AS start_date,
        CAST(NULL AS TIMESTAMP) AS end_date,
        true AS is_active,
        ROW_NUMBER() OVER (ORDER BY source.StockItemStockGroupID) AS rn
    FROM v_bronze_StockItemStockGroups AS source
    LEFT JOIN silver.StockItemStockGroups AS target
        ON source.StockItemStockGroupID = target.StockItemStockGroupID
        AND target.is_active = true
    WHERE target.StockItemStockGroupID IS NULL
       OR NOT (target.StockItemID <=> source.StockItemID)
       OR NOT (target.StockGroupID <=> source.StockGroupID)
       OR NOT (target.last_updated <=> source.last_updated)
)
INSERT INTO silver.StockItemStockGroups
SELECT
    max_key.max_id + new_data.rn AS StockItemStockGroupKey,
    new_data.StockItemStockGroupID,
    new_data.StockItemID,
    new_data.StockGroupID,
    new_data.last_updated,
    new_data.start_date,
    new_data.end_date,
    new_data.is_active
FROM new_data
CROSS JOIN max_key;

In [None]:
# Export the current contents of silver.StockItemStockGroups to the Silver layer of the
# storage account as Parquet, replacing whatever was written there before.
silver_path = "abfss://final@bibik224161840.dfs.core.windows.net/silver/silver.StockItemStockGroups.parquet"
df_StockItemStockGroups = spark.table("silver.StockItemStockGroups")
(
    df_StockItemStockGroups
    .repartition(1)  # collapse to a single partition before writing
    .write
    .mode("overwrite")
    .format("parquet")
    .save(silver_path)
)

# 9. Suppliers

In [None]:
# Load the bronze Parquet snapshot of Purchasing.Suppliers
from pyspark.sql.functions import row_number
from pyspark.sql.window import Window

df = spark.read.load(
    path='abfss://final@bibik224161840.dfs.core.windows.net/bronze/Purchasing.Suppliers.parquet',
    format='parquet',
)

# Number rows 1..n in SupplierID order as a surrogate key and make it the first column.
# NOTE(review): the %%sql cells of this section never read SupplierKey from the view;
# new silver keys are minted from MAX(SupplierKey) in silver.Suppliers instead.
window_spec = Window.orderBy("SupplierID")
df = df.withColumn("SupplierKey", row_number().over(window_spec))
df = df.select(["SupplierKey"] + [c for c in df.columns if c != "SupplierKey"])

# Register the prepared frame as the source view used by the %%sql cells below
df.createOrReplaceTempView("v_bronze_Suppliers")

In [None]:
%%sql
-- Make sure the Silver SCD Type 2 table for suppliers exists in the Lake Database.
-- Column order matters: the SCD cells below insert positionally (note last_updated
-- precedes DeliveryLocation here).
USE silver;

CREATE TABLE IF NOT EXISTS silver.Suppliers (
    SupplierKey               INT,        -- surrogate key, a new one per inserted version
    SupplierID                INT,        -- business key from bronze
    SupplierName              STRING,
    SupplierCategoryID        INT,
    PrimaryContactPersonID    INT,
    AlternateContactPersonID  INT,
    DeliveryMethodID          INT,
    DeliveryCityID            INT,
    PostalCityID              INT,
    SupplierReference         STRING,
    BankAccountName           STRING,
    BankAccountBranch         STRING,
    BankAccountCode           STRING,
    BankAccountNumber         STRING,
    BankInternationalCode     STRING,
    PaymentDays               INT,
    InternalComments          STRING,
    PhoneNumber               STRING,
    FaxNumber                 STRING,
    WebsiteURL                STRING,
    DeliveryAddressLine1      STRING,
    DeliveryAddressLine2      STRING,
    DeliveryPostalCode        STRING,
    PostalAddressLine1        STRING,
    PostalAddressLine2        STRING,
    PostalPostalCode          STRING,
    last_updated              TIMESTAMP,
    DeliveryLocation          STRING,
    start_date                TIMESTAMP,  -- version valid from
    end_date                  TIMESTAMP,  -- version valid until; NULL while the version is active
    is_active                 BOOLEAN     -- true only for the current version
);

In [None]:
%%sql
-- SCD Type 2, step 1 of 3: archive a copy of every active row whose bronze version differs,
-- stamping end_date (current time via FROM_UTC_TIMESTAMP to Asia/Ho_Chi_Minh) and is_active = false.
-- Change detection uses NOT (a <=> b), Spark's null-safe comparison: a plain `a <> b` is NULL
-- when either side is NULL, so a value changing to/from NULL (e.g. FaxNumber,
-- DeliveryAddressLine2) would never be versioned.
-- The change predicate must stay identical across steps 1-3.
INSERT INTO silver.Suppliers
SELECT
    target.SupplierKey,
    target.SupplierID,
    target.SupplierName,
    target.SupplierCategoryID,
    target.PrimaryContactPersonID,
    target.AlternateContactPersonID,
    target.DeliveryMethodID,
    target.DeliveryCityID,
    target.PostalCityID,
    target.SupplierReference,
    target.BankAccountName,
    target.BankAccountBranch,
    target.BankAccountCode,
    target.BankAccountNumber,
    target.BankInternationalCode,
    target.PaymentDays,
    target.InternalComments,
    target.PhoneNumber,
    target.FaxNumber,
    target.WebsiteURL,
    target.DeliveryAddressLine1,
    target.DeliveryAddressLine2,
    target.DeliveryPostalCode,
    target.PostalAddressLine1,
    target.PostalAddressLine2,
    target.PostalPostalCode,
    target.last_updated,
    target.DeliveryLocation,
    target.start_date,
    FROM_UTC_TIMESTAMP(current_timestamp(), 'Asia/Ho_Chi_Minh') AS end_date, -- Expiration date
    false AS is_active
FROM silver.Suppliers AS target
JOIN v_bronze_Suppliers AS source
    ON target.SupplierID = source.SupplierID
WHERE target.is_active = true
    AND (
        NOT (target.SupplierName <=> source.SupplierName)
        OR NOT (target.SupplierCategoryID <=> source.SupplierCategoryID)
        OR NOT (target.PrimaryContactPersonID <=> source.PrimaryContactPersonID)
        OR NOT (target.AlternateContactPersonID <=> source.AlternateContactPersonID)
        OR NOT (target.DeliveryMethodID <=> source.DeliveryMethodID)
        OR NOT (target.DeliveryCityID <=> source.DeliveryCityID)
        OR NOT (target.PostalCityID <=> source.PostalCityID)
        OR NOT (target.SupplierReference <=> source.SupplierReference)
        OR NOT (target.BankAccountName <=> source.BankAccountName)
        OR NOT (target.BankAccountBranch <=> source.BankAccountBranch)
        OR NOT (target.BankAccountCode <=> source.BankAccountCode)
        OR NOT (target.BankAccountNumber <=> source.BankAccountNumber)
        OR NOT (target.BankInternationalCode <=> source.BankInternationalCode)
        OR NOT (target.PaymentDays <=> source.PaymentDays)
        OR NOT (target.InternalComments <=> source.InternalComments)
        OR NOT (target.PhoneNumber <=> source.PhoneNumber)
        OR NOT (target.FaxNumber <=> source.FaxNumber)
        OR NOT (target.WebsiteURL <=> source.WebsiteURL)
        OR NOT (target.DeliveryAddressLine1 <=> source.DeliveryAddressLine1)
        OR NOT (target.DeliveryAddressLine2 <=> source.DeliveryAddressLine2)
        OR NOT (target.DeliveryPostalCode <=> source.DeliveryPostalCode)
        OR NOT (target.PostalAddressLine1 <=> source.PostalAddressLine1)
        OR NOT (target.PostalAddressLine2 <=> source.PostalAddressLine2)
        OR NOT (target.PostalPostalCode <=> source.PostalPostalCode)
        OR NOT (target.DeliveryLocation <=> source.DeliveryLocation)
        OR NOT (target.last_updated <=> source.last_updated)
    );

In [None]:
%%sql
-- SCD Type 2, step 2 of 3: rewrite the table without the still-active rows that step 1 archived.
-- Archived copies (is_active = false) do not match the NOT EXISTS filter and are kept.
-- NOTE(review): this overwrites silver.Suppliers while reading from it; Spark rejects that
-- for some table formats ("Cannot overwrite a path that is also being read from") - confirm.
INSERT OVERWRITE silver.Suppliers
SELECT *
FROM silver.Suppliers AS target
WHERE NOT EXISTS (
    SELECT 1
    FROM v_bronze_Suppliers AS source
    WHERE target.SupplierID = source.SupplierID
    AND target.is_active = true
    AND (
        NOT (target.SupplierName <=> source.SupplierName)
        OR NOT (target.SupplierCategoryID <=> source.SupplierCategoryID)
        OR NOT (target.PrimaryContactPersonID <=> source.PrimaryContactPersonID)
        OR NOT (target.AlternateContactPersonID <=> source.AlternateContactPersonID)
        OR NOT (target.DeliveryMethodID <=> source.DeliveryMethodID)
        OR NOT (target.DeliveryCityID <=> source.DeliveryCityID)
        OR NOT (target.PostalCityID <=> source.PostalCityID)
        OR NOT (target.SupplierReference <=> source.SupplierReference)
        OR NOT (target.BankAccountName <=> source.BankAccountName)
        OR NOT (target.BankAccountBranch <=> source.BankAccountBranch)
        OR NOT (target.BankAccountCode <=> source.BankAccountCode)
        OR NOT (target.BankAccountNumber <=> source.BankAccountNumber)
        OR NOT (target.BankInternationalCode <=> source.BankInternationalCode)
        OR NOT (target.PaymentDays <=> source.PaymentDays)
        OR NOT (target.InternalComments <=> source.InternalComments)
        OR NOT (target.PhoneNumber <=> source.PhoneNumber)
        OR NOT (target.FaxNumber <=> source.FaxNumber)
        OR NOT (target.WebsiteURL <=> source.WebsiteURL)
        OR NOT (target.DeliveryAddressLine1 <=> source.DeliveryAddressLine1)
        OR NOT (target.DeliveryAddressLine2 <=> source.DeliveryAddressLine2)
        OR NOT (target.DeliveryPostalCode <=> source.DeliveryPostalCode)
        OR NOT (target.PostalAddressLine1 <=> source.PostalAddressLine1)
        OR NOT (target.PostalAddressLine2 <=> source.PostalAddressLine2)
        OR NOT (target.PostalPostalCode <=> source.PostalPostalCode)
        OR NOT (target.DeliveryLocation <=> source.DeliveryLocation)
        OR NOT (target.last_updated <=> source.last_updated)
    )
);

In [None]:
%%sql
-- SCD Type 2, step 3 of 3: insert a new active version for every bronze row without a matching
-- active silver row (new IDs, plus IDs whose active row step 2 removed).
WITH max_key AS (
    -- Largest existing SupplierKey; new surrogate keys continue after it
    SELECT COALESCE(MAX(SupplierKey), 0) AS max_id FROM silver.Suppliers
),
new_data AS (
    -- Bronze rows that need a new active version in the Silver table
    SELECT
        source.SupplierID,
        source.SupplierName,
        source.SupplierCategoryID,
        source.PrimaryContactPersonID,
        source.AlternateContactPersonID,
        source.DeliveryMethodID,
        source.DeliveryCityID,
        source.PostalCityID,
        source.SupplierReference,
        source.BankAccountName,
        source.BankAccountBranch,
        source.BankAccountCode,
        source.BankAccountNumber,
        source.BankInternationalCode,
        source.PaymentDays,
        source.InternalComments,
        source.PhoneNumber,
        source.FaxNumber,
        source.WebsiteURL,
        source.DeliveryAddressLine1,
        source.DeliveryAddressLine2,
        source.DeliveryPostalCode,
        source.PostalAddressLine1,
        source.PostalAddressLine2,
        source.PostalPostalCode,
        source.last_updated,
        source.DeliveryLocation,
        FROM_UTC_TIMESTAMP(current_timestamp(), 'Asia/Ho_Chi_Minh') AS start_date,
        CAST(NULL AS TIMESTAMP) AS end_date,
        true AS is_active,
        ROW_NUMBER() OVER (ORDER BY source.SupplierID) AS rn  -- offset added to max_id
    FROM v_bronze_Suppliers AS source
    LEFT JOIN silver.Suppliers AS target
        ON source.SupplierID = target.SupplierID
        AND target.is_active = true  -- compare only with the current version
    WHERE target.SupplierID IS NULL  -- no active version in silver
       OR NOT (target.SupplierName <=> source.SupplierName)
       OR NOT (target.SupplierCategoryID <=> source.SupplierCategoryID)
       OR NOT (target.PrimaryContactPersonID <=> source.PrimaryContactPersonID)
       OR NOT (target.AlternateContactPersonID <=> source.AlternateContactPersonID)
       OR NOT (target.DeliveryMethodID <=> source.DeliveryMethodID)
       OR NOT (target.DeliveryCityID <=> source.DeliveryCityID)
       OR NOT (target.PostalCityID <=> source.PostalCityID)
       OR NOT (target.SupplierReference <=> source.SupplierReference)
       OR NOT (target.BankAccountName <=> source.BankAccountName)
       OR NOT (target.BankAccountBranch <=> source.BankAccountBranch)
       OR NOT (target.BankAccountCode <=> source.BankAccountCode)
       OR NOT (target.BankAccountNumber <=> source.BankAccountNumber)
       OR NOT (target.BankInternationalCode <=> source.BankInternationalCode)
       OR NOT (target.PaymentDays <=> source.PaymentDays)
       OR NOT (target.InternalComments <=> source.InternalComments)
       OR NOT (target.PhoneNumber <=> source.PhoneNumber)
       OR NOT (target.FaxNumber <=> source.FaxNumber)
       OR NOT (target.WebsiteURL <=> source.WebsiteURL)
       OR NOT (target.DeliveryAddressLine1 <=> source.DeliveryAddressLine1)
       OR NOT (target.DeliveryAddressLine2 <=> source.DeliveryAddressLine2)
       OR NOT (target.DeliveryPostalCode <=> source.DeliveryPostalCode)
       OR NOT (target.PostalAddressLine1 <=> source.PostalAddressLine1)
       OR NOT (target.PostalAddressLine2 <=> source.PostalAddressLine2)
       OR NOT (target.PostalPostalCode <=> source.PostalPostalCode)
       OR NOT (target.DeliveryLocation <=> source.DeliveryLocation)
       OR NOT (target.last_updated <=> source.last_updated)
)
-- Insert new data into the Silver table
INSERT INTO silver.Suppliers
SELECT
    max_key.max_id + new_data.rn AS SupplierKey, -- New surrogate key
    new_data.SupplierID,
    new_data.SupplierName,
    new_data.SupplierCategoryID,
    new_data.PrimaryContactPersonID,
    new_data.AlternateContactPersonID,
    new_data.DeliveryMethodID,
    new_data.DeliveryCityID,
    new_data.PostalCityID,
    new_data.SupplierReference,
    new_data.BankAccountName,
    new_data.BankAccountBranch,
    new_data.BankAccountCode,
    new_data.BankAccountNumber,
    new_data.BankInternationalCode,
    new_data.PaymentDays,
    new_data.InternalComments,
    new_data.PhoneNumber,
    new_data.FaxNumber,
    new_data.WebsiteURL,
    new_data.DeliveryAddressLine1,
    new_data.DeliveryAddressLine2,
    new_data.DeliveryPostalCode,
    new_data.PostalAddressLine1,
    new_data.PostalAddressLine2,
    new_data.PostalPostalCode,
    new_data.last_updated,
    new_data.DeliveryLocation,
    new_data.start_date,
    new_data.end_date,
    new_data.is_active
FROM new_data
CROSS JOIN max_key;


In [None]:
# Export the current contents of silver.Suppliers to the Silver layer of the
# storage account as Parquet, replacing whatever was written there before.
silver_path = "abfss://final@bibik224161840.dfs.core.windows.net/silver/silver.Suppliers.parquet"
df_Suppliers = spark.table("silver.Suppliers")
(
    df_Suppliers
    .repartition(1)  # collapse to a single partition before writing
    .write
    .mode("overwrite")
    .format("parquet")
    .save(silver_path)
)

# 10. SupplierTransactions

In [None]:
# Load the bronze Parquet snapshot of Purchasing.SupplierTransactions
from pyspark.sql.functions import row_number
from pyspark.sql.window import Window

df = spark.read.load(
    path='abfss://final@bibik224161840.dfs.core.windows.net/bronze/Purchasing.SupplierTransactions.parquet',
    format='parquet',
)

# Number rows 1..n in SupplierTransactionID order as a surrogate key and make it the first column.
# NOTE(review): the %%sql cells of this section never read SupplierTransactionKey from the view;
# new silver keys are minted from MAX(SupplierTransactionKey) in the silver table instead.
window_spec = Window.orderBy("SupplierTransactionID")
df = df.withColumn("SupplierTransactionKey", row_number().over(window_spec))
df = df.select(["SupplierTransactionKey"] + [c for c in df.columns if c != "SupplierTransactionKey"])

# Register the prepared frame as the source view used by the %%sql cells below
df.createOrReplaceTempView("v_bronze_SupplierTransactions")

In [None]:
%%sql
-- Make sure the Silver SCD Type 2 table for supplier transactions exists in the Lake Database.
-- The last four columns track row versions (see the SCD cells below).
USE silver;

CREATE TABLE IF NOT EXISTS silver.SupplierTransactions (
    SupplierTransactionKey  INT,            -- surrogate key, a new one per inserted version
    SupplierTransactionID   INT,            -- business key from bronze
    SupplierID              INT,
    TransactionTypeID       INT,
    PurchaseOrderID         INT,
    PaymentMethodID         INT,
    SupplierInvoiceNumber   STRING,
    TransactionDate         DATE,
    AmountExcludingTax      DECIMAL(18,2),
    TaxAmount               DECIMAL(18,2),
    TransactionAmount       DECIMAL(18,2),
    OutstandingBalance      DECIMAL(18,2),
    FinalizationDate        DATE,
    IsFinalized             BOOLEAN,
    last_updated            TIMESTAMP,
    start_date              TIMESTAMP,      -- version valid from
    end_date                TIMESTAMP,      -- version valid until; NULL while the version is active
    is_active               BOOLEAN         -- true only for the current version
);

In [None]:
%%sql
-- SCD Type 2, step 1 of 3: archive a copy of every active row whose bronze version differs,
-- stamping end_date (current time via FROM_UTC_TIMESTAMP to Asia/Ho_Chi_Minh) and is_active = false.
-- Change detection uses NOT (a <=> b), Spark's null-safe comparison: a plain `a <> b` is NULL
-- when either side is NULL, so a value changing to/from NULL (e.g. FinalizationDate being set)
-- would never be versioned.
-- The change predicate must stay identical across steps 1-3.
INSERT INTO silver.SupplierTransactions
SELECT
    target.SupplierTransactionKey,
    target.SupplierTransactionID,
    target.SupplierID,
    target.TransactionTypeID,
    target.PurchaseOrderID,
    target.PaymentMethodID,
    target.SupplierInvoiceNumber,
    target.TransactionDate,
    target.AmountExcludingTax,
    target.TaxAmount,
    target.TransactionAmount,
    target.OutstandingBalance,
    target.FinalizationDate,
    target.IsFinalized,
    target.last_updated,
    target.start_date,
    FROM_UTC_TIMESTAMP(current_timestamp(), 'Asia/Ho_Chi_Minh') AS end_date, -- Expiration date
    false AS is_active
FROM silver.SupplierTransactions AS target
JOIN v_bronze_SupplierTransactions AS source
    ON target.SupplierTransactionID = source.SupplierTransactionID
WHERE target.is_active = true
    AND (
        NOT (target.SupplierID <=> source.SupplierID)
        OR NOT (target.TransactionTypeID <=> source.TransactionTypeID)
        OR NOT (target.PurchaseOrderID <=> source.PurchaseOrderID)
        OR NOT (target.PaymentMethodID <=> source.PaymentMethodID)
        OR NOT (target.SupplierInvoiceNumber <=> source.SupplierInvoiceNumber)
        OR NOT (target.TransactionDate <=> source.TransactionDate)
        OR NOT (target.AmountExcludingTax <=> source.AmountExcludingTax)
        OR NOT (target.TaxAmount <=> source.TaxAmount)
        OR NOT (target.TransactionAmount <=> source.TransactionAmount)
        OR NOT (target.OutstandingBalance <=> source.OutstandingBalance)
        OR NOT (target.FinalizationDate <=> source.FinalizationDate)
        OR NOT (target.IsFinalized <=> source.IsFinalized)
        OR NOT (target.last_updated <=> source.last_updated)
    );

In [None]:
%%sql
-- SCD Type 2, step 2 of 3: rewrite the table without the still-active rows that step 1 archived.
-- Archived copies (is_active = false) do not match the NOT EXISTS filter and are kept.
-- NOTE(review): this overwrites the table while reading from it; Spark rejects that for some
-- table formats ("Cannot overwrite a path that is also being read from") - confirm.
INSERT OVERWRITE silver.SupplierTransactions
SELECT *
FROM silver.SupplierTransactions AS target
WHERE NOT EXISTS (
    SELECT 1
    FROM v_bronze_SupplierTransactions AS source
    WHERE target.SupplierTransactionID = source.SupplierTransactionID
    AND target.is_active = true
    AND (
        NOT (target.SupplierID <=> source.SupplierID)
        OR NOT (target.TransactionTypeID <=> source.TransactionTypeID)
        OR NOT (target.PurchaseOrderID <=> source.PurchaseOrderID)
        OR NOT (target.PaymentMethodID <=> source.PaymentMethodID)
        OR NOT (target.SupplierInvoiceNumber <=> source.SupplierInvoiceNumber)
        OR NOT (target.TransactionDate <=> source.TransactionDate)
        OR NOT (target.AmountExcludingTax <=> source.AmountExcludingTax)
        OR NOT (target.TaxAmount <=> source.TaxAmount)
        OR NOT (target.TransactionAmount <=> source.TransactionAmount)
        OR NOT (target.OutstandingBalance <=> source.OutstandingBalance)
        OR NOT (target.FinalizationDate <=> source.FinalizationDate)
        OR NOT (target.IsFinalized <=> source.IsFinalized)
        OR NOT (target.last_updated <=> source.last_updated)
    )
);

In [None]:
%%sql
-- SCD Type 2, step 3 of 3: insert a new active version for every bronze row without a matching
-- active silver row (new IDs, plus IDs whose active row step 2 removed).
-- Surrogate keys continue after the current maximum key in the table.
WITH max_key AS (
    SELECT COALESCE(MAX(SupplierTransactionKey), 0) AS max_id FROM silver.SupplierTransactions
),
new_data AS (
    SELECT
        source.SupplierTransactionID,
        source.SupplierID,
        source.TransactionTypeID,
        source.PurchaseOrderID,
        source.PaymentMethodID,
        source.SupplierInvoiceNumber,
        source.TransactionDate,
        source.AmountExcludingTax,
        source.TaxAmount,
        source.TransactionAmount,
        source.OutstandingBalance,
        source.FinalizationDate,
        source.IsFinalized,
        source.last_updated,
        FROM_UTC_TIMESTAMP(current_timestamp(), 'Asia/Ho_Chi_Minh') AS start_date,
        CAST(NULL AS TIMESTAMP) AS end_date,
        true AS is_active,
        ROW_NUMBER() OVER (ORDER BY source.SupplierTransactionID) AS rn
    FROM v_bronze_SupplierTransactions AS source
    LEFT JOIN silver.SupplierTransactions AS target
        ON source.SupplierTransactionID = target.SupplierTransactionID
        AND target.is_active = true
    WHERE target.SupplierTransactionID IS NULL
       OR NOT (target.SupplierID <=> source.SupplierID)
       OR NOT (target.TransactionTypeID <=> source.TransactionTypeID)
       OR NOT (target.PurchaseOrderID <=> source.PurchaseOrderID)
       OR NOT (target.PaymentMethodID <=> source.PaymentMethodID)
       OR NOT (target.SupplierInvoiceNumber <=> source.SupplierInvoiceNumber)
       OR NOT (target.TransactionDate <=> source.TransactionDate)
       OR NOT (target.AmountExcludingTax <=> source.AmountExcludingTax)
       OR NOT (target.TaxAmount <=> source.TaxAmount)
       OR NOT (target.TransactionAmount <=> source.TransactionAmount)
       OR NOT (target.OutstandingBalance <=> source.OutstandingBalance)
       OR NOT (target.FinalizationDate <=> source.FinalizationDate)
       OR NOT (target.IsFinalized <=> source.IsFinalized)
       OR NOT (target.last_updated <=> source.last_updated)
)
INSERT INTO silver.SupplierTransactions
SELECT
    max_key.max_id + new_data.rn AS SupplierTransactionKey,
    new_data.SupplierTransactionID,
    new_data.SupplierID,
    new_data.TransactionTypeID,
    new_data.PurchaseOrderID,
    new_data.PaymentMethodID,
    new_data.SupplierInvoiceNumber,
    new_data.TransactionDate,
    new_data.AmountExcludingTax,
    new_data.TaxAmount,
    new_data.TransactionAmount,
    new_data.OutstandingBalance,
    new_data.FinalizationDate,
    new_data.IsFinalized,
    new_data.last_updated,
    new_data.start_date,
    new_data.end_date,
    new_data.is_active
FROM new_data
CROSS JOIN max_key;

In [None]:
# Export the current contents of silver.SupplierTransactions to the Silver layer of the
# storage account as Parquet, replacing whatever was written there before.
silver_path = "abfss://final@bibik224161840.dfs.core.windows.net/silver/silver.SupplierTransactions.parquet"
df_SupplierTransactions = spark.table("silver.SupplierTransactions")
(
    df_SupplierTransactions
    .repartition(1)  # collapse to a single partition before writing
    .write
    .mode("overwrite")
    .format("parquet")
    .save(silver_path)
)

# 11. People

In [None]:
# Load the bronze Parquet snapshot of Application.People
from pyspark.sql.functions import row_number
from pyspark.sql.window import Window

df = spark.read.load(
    path='abfss://final@bibik224161840.dfs.core.windows.net/bronze/Application.People.parquet',
    format='parquet',
)

# Number rows 1..n in PersonID order as a surrogate key and make it the first column.
# NOTE(review): the %%sql cells of this section never read PersonKey from the view;
# new silver keys are minted from MAX(PersonKey) in silver.People instead.
window_spec = Window.orderBy("PersonID")
df = df.withColumn("PersonKey", row_number().over(window_spec))
df = df.select(["PersonKey"] + [c for c in df.columns if c != "PersonKey"])

# Register the prepared frame as the source view used by the %%sql cells below
df.createOrReplaceTempView("v_bronze_People")

In [None]:
%%sql
-- Make sure the Silver SCD Type 2 table for people exists in the Lake Database.
-- NOTE(review): HashedPassword is copied into the lake as-is; confirm that is intended.
USE silver;

CREATE TABLE IF NOT EXISTS silver.People (
    PersonKey                INT,        -- surrogate key, a new one per inserted version
    PersonID                 INT,        -- business key from bronze
    FullName                 STRING,
    PreferredName            STRING,
    SearchName               STRING,
    IsPermittedToLogon       BOOLEAN,
    LogonName                STRING,
    IsExternalLogonProvider  BOOLEAN,
    HashedPassword           BINARY,
    IsSystemUser             BOOLEAN,
    IsEmployee               BOOLEAN,
    IsSalesperson            BOOLEAN,
    UserPreferences          STRING,
    PhoneNumber              STRING,
    FaxNumber                STRING,
    EmailAddress             STRING,
    Photo                    BINARY,
    CustomFields             STRING,
    OtherLanguages           STRING,
    last_updated             TIMESTAMP,
    start_date               TIMESTAMP,  -- version valid from
    end_date                 TIMESTAMP,  -- version valid until; NULL while the version is active
    is_active                BOOLEAN     -- true only for the current version
);

In [None]:
%%sql
-- SCD Type 2, step 1 of 3: archive a copy of every active row whose bronze version differs,
-- stamping end_date (current time via FROM_UTC_TIMESTAMP to Asia/Ho_Chi_Minh) and is_active = false.
-- Change detection uses NOT (a <=> b), Spark's null-safe comparison: a plain `a <> b` is NULL
-- when either side is NULL, so a value changing to/from NULL (e.g. Photo, FaxNumber, LogonName)
-- would never be versioned.
-- The change predicate must stay identical across steps 1-3.
INSERT INTO silver.People
SELECT
    target.PersonKey,
    target.PersonID,
    target.FullName,
    target.PreferredName,
    target.SearchName,
    target.IsPermittedToLogon,
    target.LogonName,
    target.IsExternalLogonProvider,
    target.HashedPassword,
    target.IsSystemUser,
    target.IsEmployee,
    target.IsSalesperson,
    target.UserPreferences,
    target.PhoneNumber,
    target.FaxNumber,
    target.EmailAddress,
    target.Photo,
    target.CustomFields,
    target.OtherLanguages,
    target.last_updated,
    target.start_date,
    FROM_UTC_TIMESTAMP(current_timestamp(), 'Asia/Ho_Chi_Minh') AS end_date, -- Expiration date
    false AS is_active
FROM silver.People AS target
JOIN v_bronze_People AS source
    ON target.PersonID = source.PersonID
WHERE target.is_active = true
    AND (
        NOT (target.FullName <=> source.FullName)
        OR NOT (target.PreferredName <=> source.PreferredName)
        OR NOT (target.SearchName <=> source.SearchName)
        OR NOT (target.IsPermittedToLogon <=> source.IsPermittedToLogon)
        OR NOT (target.LogonName <=> source.LogonName)
        OR NOT (target.IsExternalLogonProvider <=> source.IsExternalLogonProvider)
        OR NOT (target.HashedPassword <=> source.HashedPassword)
        OR NOT (target.IsSystemUser <=> source.IsSystemUser)
        OR NOT (target.IsEmployee <=> source.IsEmployee)
        OR NOT (target.IsSalesperson <=> source.IsSalesperson)
        OR NOT (target.UserPreferences <=> source.UserPreferences)
        OR NOT (target.PhoneNumber <=> source.PhoneNumber)
        OR NOT (target.FaxNumber <=> source.FaxNumber)
        OR NOT (target.EmailAddress <=> source.EmailAddress)
        OR NOT (target.Photo <=> source.Photo)
        OR NOT (target.CustomFields <=> source.CustomFields)
        OR NOT (target.OtherLanguages <=> source.OtherLanguages)
        OR NOT (target.last_updated <=> source.last_updated)
    );

In [None]:
%%sql
-- SCD Type 2, step 2 of 3: rewrite the table without the still-active rows that step 1 archived.
-- Archived copies (is_active = false) do not match the NOT EXISTS filter and are kept.
-- NOTE(review): this overwrites silver.People while reading from it; Spark rejects that for
-- some table formats ("Cannot overwrite a path that is also being read from") - confirm.
INSERT OVERWRITE silver.People
SELECT *
FROM silver.People AS target
WHERE NOT EXISTS (
    SELECT 1
    FROM v_bronze_People AS source
    WHERE target.PersonID = source.PersonID
    AND target.is_active = true
    AND (
        NOT (target.FullName <=> source.FullName)
        OR NOT (target.PreferredName <=> source.PreferredName)
        OR NOT (target.SearchName <=> source.SearchName)
        OR NOT (target.IsPermittedToLogon <=> source.IsPermittedToLogon)
        OR NOT (target.LogonName <=> source.LogonName)
        OR NOT (target.IsExternalLogonProvider <=> source.IsExternalLogonProvider)
        OR NOT (target.HashedPassword <=> source.HashedPassword)
        OR NOT (target.IsSystemUser <=> source.IsSystemUser)
        OR NOT (target.IsEmployee <=> source.IsEmployee)
        OR NOT (target.IsSalesperson <=> source.IsSalesperson)
        OR NOT (target.UserPreferences <=> source.UserPreferences)
        OR NOT (target.PhoneNumber <=> source.PhoneNumber)
        OR NOT (target.FaxNumber <=> source.FaxNumber)
        OR NOT (target.EmailAddress <=> source.EmailAddress)
        OR NOT (target.Photo <=> source.Photo)
        OR NOT (target.CustomFields <=> source.CustomFields)
        OR NOT (target.OtherLanguages <=> source.OtherLanguages)
        OR NOT (target.last_updated <=> source.last_updated)
    )
);

In [None]:
%%sql
-- SCD Type 2, step 3 of 3: insert a new active version for every bronze row without a matching
-- active silver row (new IDs, plus IDs whose active row step 2 removed).
-- Surrogate keys continue after the current maximum key in the table.
WITH max_key AS (
    SELECT COALESCE(MAX(PersonKey), 0) AS max_id FROM silver.People
),
new_data AS (
    SELECT
        source.PersonID,
        source.FullName,
        source.PreferredName,
        source.SearchName,
        source.IsPermittedToLogon,
        source.LogonName,
        source.IsExternalLogonProvider,
        source.HashedPassword,
        source.IsSystemUser,
        source.IsEmployee,
        source.IsSalesperson,
        source.UserPreferences,
        source.PhoneNumber,
        source.FaxNumber,
        source.EmailAddress,
        source.Photo,
        source.CustomFields,
        source.OtherLanguages,
        source.last_updated,
        FROM_UTC_TIMESTAMP(current_timestamp(), 'Asia/Ho_Chi_Minh') AS start_date,
        CAST(NULL AS TIMESTAMP) AS end_date,
        true AS is_active,
        ROW_NUMBER() OVER (ORDER BY source.PersonID) AS rn
    FROM v_bronze_People AS source
    LEFT JOIN silver.People AS target
        ON source.PersonID = target.PersonID
        AND target.is_active = true
    WHERE target.PersonID IS NULL
       OR NOT (target.FullName <=> source.FullName)
       OR NOT (target.PreferredName <=> source.PreferredName)
       OR NOT (target.SearchName <=> source.SearchName)
       OR NOT (target.IsPermittedToLogon <=> source.IsPermittedToLogon)
       OR NOT (target.LogonName <=> source.LogonName)
       OR NOT (target.IsExternalLogonProvider <=> source.IsExternalLogonProvider)
       OR NOT (target.HashedPassword <=> source.HashedPassword)
       OR NOT (target.IsSystemUser <=> source.IsSystemUser)
       OR NOT (target.IsEmployee <=> source.IsEmployee)
       OR NOT (target.IsSalesperson <=> source.IsSalesperson)
       OR NOT (target.UserPreferences <=> source.UserPreferences)
       OR NOT (target.PhoneNumber <=> source.PhoneNumber)
       OR NOT (target.FaxNumber <=> source.FaxNumber)
       OR NOT (target.EmailAddress <=> source.EmailAddress)
       OR NOT (target.Photo <=> source.Photo)
       OR NOT (target.CustomFields <=> source.CustomFields)
       OR NOT (target.OtherLanguages <=> source.OtherLanguages)
       OR NOT (target.last_updated <=> source.last_updated)
)
INSERT INTO silver.People
SELECT
    max_key.max_id + new_data.rn AS PersonKey,
    new_data.PersonID,
    new_data.FullName,
    new_data.PreferredName,
    new_data.SearchName,
    new_data.IsPermittedToLogon,
    new_data.LogonName,
    new_data.IsExternalLogonProvider,
    new_data.HashedPassword,
    new_data.IsSystemUser,
    new_data.IsEmployee,
    new_data.IsSalesperson,
    new_data.UserPreferences,
    new_data.PhoneNumber,
    new_data.FaxNumber,
    new_data.EmailAddress,
    new_data.Photo,
    new_data.CustomFields,
    new_data.OtherLanguages,
    new_data.last_updated,
    new_data.start_date,
    new_data.end_date,
    new_data.is_active
FROM new_data
CROSS JOIN max_key;


In [None]:
# Export the current contents of silver.People to the Silver layer of the
# storage account as Parquet, replacing whatever was written there before.
silver_path = "abfss://final@bibik224161840.dfs.core.windows.net/silver/silver.People.parquet"
df_People = spark.table("silver.People")
(
    df_People
    .repartition(1)  # collapse to a single partition before writing
    .write
    .mode("overwrite")
    .format("parquet")
    .save(silver_path)
)

# 12. PurchaseOrders

In [3]:
# Load the bronze Parquet snapshot of Purchasing.PurchaseOrders
from pyspark.sql.functions import row_number
from pyspark.sql.window import Window

df = spark.read.load(
    path='abfss://final@bibik224161840.dfs.core.windows.net/bronze/Purchasing.PurchaseOrders.parquet',
    format='parquet',
)

# Number rows 1..n in PurchaseOrderID order as a surrogate key and make it the first column.
# NOTE(review): presumably, as in the other sections, the SQL cells mint silver keys from
# MAX(PurchaseOrderKey) rather than reading this column - confirm.
window_spec = Window.orderBy("PurchaseOrderID")
df = df.withColumn("PurchaseOrderKey", row_number().over(window_spec))
df = df.select(["PurchaseOrderKey"] + [c for c in df.columns if c != "PurchaseOrderKey"])

# Register the prepared frame as the source view used by the %%sql cells below
df.createOrReplaceTempView("v_bronze_PurchaseOrders")

StatementMeta(sparkpool, 72, 2, Finished, Available, Finished)

In [4]:
%%sql
-- Lake Database DDL: silver copy of Purchasing.PurchaseOrders with SCD Type 2 columns.
-- IF NOT EXISTS makes this a no-op when the table is already there, so reruns keep data.
-- NOTE(review): there is no USING clause, so the storage format comes from the Spark
-- session defaults - confirm that is intended.
USE silver;

CREATE TABLE IF NOT EXISTS silver.PurchaseOrders (
    -- surrogate and business keys
    PurchaseOrderKey     INT,
    PurchaseOrderID      INT,
    -- attributes taken from bronze
    SupplierID           INT,
    OrderDate            TIMESTAMP,
    DeliveryMethodID     INT,
    ContactPersonID      INT,
    ExpectedDeliveryDate TIMESTAMP,
    SupplierReference    STRING,
    IsOrderFinalized     BOOLEAN,
    Comments             STRING,
    InternalComments     STRING,
    last_updated         TIMESTAMP,
    -- SCD Type 2 version tracking
    start_date           TIMESTAMP,  -- when this version was inserted
    end_date             TIMESTAMP,  -- NULL for the current version
    is_active            BOOLEAN     -- true only for the current version
);


StatementMeta(sparkpool, 72, 4, Finished, Available, Finished)

<Spark SQL result set with 0 rows and 0 fields>

<Spark SQL result set with 0 rows and 0 fields>

In [5]:
%%sql
-- SCD Type 2, step 1 of 3: append an archived copy (is_active = false, end_date = now in
-- Asia/Ho_Chi_Minh) of every active row whose attributes differ from bronze.
-- Changes are detected with NOT (a <=> b) rather than a <> b: <> yields NULL when either
-- side is NULL, which silently ignored changes to or from NULL in any compared column.
-- The change predicate must stay identical across steps 1, 2 and 3.
INSERT INTO silver.PurchaseOrders
SELECT
    target.PurchaseOrderKey,
    target.PurchaseOrderID,
    target.SupplierID,
    target.OrderDate,
    target.DeliveryMethodID,
    target.ContactPersonID,
    target.ExpectedDeliveryDate,
    target.SupplierReference,
    target.IsOrderFinalized,
    target.Comments,
    target.InternalComments,
    target.last_updated,
    target.start_date,
    FROM_UTC_TIMESTAMP(current_timestamp(), 'Asia/Ho_Chi_Minh') AS end_date, -- Expiration date
    false AS is_active
FROM silver.PurchaseOrders AS target
JOIN v_bronze_PurchaseOrders AS source
    ON target.PurchaseOrderID = source.PurchaseOrderID
WHERE target.is_active = true
    AND (
        NOT (target.SupplierID <=> source.SupplierID)
        OR NOT (target.OrderDate <=> source.OrderDate)
        OR NOT (target.DeliveryMethodID <=> source.DeliveryMethodID)
        OR NOT (target.ContactPersonID <=> source.ContactPersonID)
        OR NOT (target.ExpectedDeliveryDate <=> source.ExpectedDeliveryDate)
        OR NOT (target.SupplierReference <=> source.SupplierReference)
        OR NOT (target.IsOrderFinalized <=> source.IsOrderFinalized)
        OR NOT (target.Comments <=> source.Comments)
        OR NOT (target.InternalComments <=> source.InternalComments)
        OR NOT (target.last_updated <=> source.last_updated)
    );

StatementMeta(sparkpool, 72, 5, Finished, Available, Finished)

<Spark SQL result set with 0 rows and 0 fields>

In [6]:
%%sql
-- SCD Type 2, step 2 of 3: rewrite the table without the still-active versions that
-- step 1 just archived. Inactive rows and unchanged active rows are kept as they are.
-- Uses the same null-safe change predicate as steps 1 and 3.
-- NOTE(review): this statement reads and overwrites the same table. It finished here, but
-- whether Spark allows that depends on the table format - re-check if the DDL changes.
INSERT OVERWRITE silver.PurchaseOrders
SELECT *
FROM silver.PurchaseOrders AS target
WHERE NOT EXISTS (
    SELECT 1
    FROM v_bronze_PurchaseOrders AS source
    WHERE target.PurchaseOrderID = source.PurchaseOrderID
    AND target.is_active = true
    AND (
        NOT (target.SupplierID <=> source.SupplierID)
        OR NOT (target.OrderDate <=> source.OrderDate)
        OR NOT (target.DeliveryMethodID <=> source.DeliveryMethodID)
        OR NOT (target.ContactPersonID <=> source.ContactPersonID)
        OR NOT (target.ExpectedDeliveryDate <=> source.ExpectedDeliveryDate)
        OR NOT (target.SupplierReference <=> source.SupplierReference)
        OR NOT (target.IsOrderFinalized <=> source.IsOrderFinalized)
        OR NOT (target.Comments <=> source.Comments)
        OR NOT (target.InternalComments <=> source.InternalComments)
        OR NOT (target.last_updated <=> source.last_updated)
    )
);


StatementMeta(sparkpool, 72, 6, Finished, Available, Finished)

<Spark SQL result set with 0 rows and 0 fields>

In [7]:
%%sql
-- SCD Type 2, step 3 of 3: insert a new active version (end_date NULL) for every bronze row
-- with no active row left in silver - brand-new IDs plus the IDs archived in steps 1-2 -
-- or whose active row still differs (same null-safe predicate as steps 1 and 2).
-- Surrogate keys continue from the current maximum PurchaseOrderKey.
-- NOTE(review): start_date here and end_date in step 1 come from separate statements, so
-- consecutive versions of one order can be a few seconds apart.
WITH max_key AS (
    SELECT COALESCE(MAX(PurchaseOrderKey), 0) AS max_id FROM silver.PurchaseOrders
),
new_data AS (
    SELECT
        source.PurchaseOrderID,
        source.SupplierID,
        source.OrderDate,
        source.DeliveryMethodID,
        source.ContactPersonID,
        source.ExpectedDeliveryDate,
        source.SupplierReference,
        source.IsOrderFinalized,
        source.Comments,
        source.InternalComments,
        source.last_updated,
        FROM_UTC_TIMESTAMP(current_timestamp(), 'Asia/Ho_Chi_Minh') AS start_date,
        NULL AS end_date,
        true AS is_active,
        ROW_NUMBER() OVER (ORDER BY source.PurchaseOrderID) AS rn
    FROM v_bronze_PurchaseOrders AS source
    LEFT JOIN silver.PurchaseOrders AS target
        ON source.PurchaseOrderID = target.PurchaseOrderID
        AND target.is_active = true
    WHERE target.PurchaseOrderID IS NULL
       OR NOT (target.SupplierID <=> source.SupplierID)
       OR NOT (target.OrderDate <=> source.OrderDate)
       OR NOT (target.DeliveryMethodID <=> source.DeliveryMethodID)
       OR NOT (target.ContactPersonID <=> source.ContactPersonID)
       OR NOT (target.ExpectedDeliveryDate <=> source.ExpectedDeliveryDate)
       OR NOT (target.SupplierReference <=> source.SupplierReference)
       OR NOT (target.IsOrderFinalized <=> source.IsOrderFinalized)
       OR NOT (target.Comments <=> source.Comments)
       OR NOT (target.InternalComments <=> source.InternalComments)
       OR NOT (target.last_updated <=> source.last_updated)
)
INSERT INTO silver.PurchaseOrders
SELECT
    max_key.max_id + new_data.rn AS PurchaseOrderKey,
    new_data.PurchaseOrderID,
    new_data.SupplierID,
    new_data.OrderDate,
    new_data.DeliveryMethodID,
    new_data.ContactPersonID,
    new_data.ExpectedDeliveryDate,
    new_data.SupplierReference,
    new_data.IsOrderFinalized,
    new_data.Comments,
    new_data.InternalComments,
    new_data.last_updated,
    new_data.start_date,
    new_data.end_date,
    new_data.is_active
FROM new_data
CROSS JOIN max_key;

StatementMeta(sparkpool, 72, 7, Finished, Available, Finished)

<Spark SQL result set with 0 rows and 0 fields>

In [8]:
# Export the whole silver.PurchaseOrders table (every row, no is_active filter) to the
# Silver container as Parquet. repartition(1) funnels the data into a single partition so
# the output folder gets one part file; mode("overwrite") replaces the previous export.
df_PurchaseOrders = spark.sql("SELECT * FROM silver.PurchaseOrders")
(
    df_PurchaseOrders.repartition(1)
    .write.mode("overwrite")
    .parquet("abfss://final@bibik224161840.dfs.core.windows.net/silver/silver.PurchaseOrders.parquet")
)

StatementMeta(sparkpool, 72, 8, Finished, Available, Finished)

# 13. PurchaseOrderLines

In [9]:
# Bronze source: Purchasing.PurchaseOrderLines snapshot (Parquet), exposed to the SQL cells
# that follow as the temp view v_bronze_PurchaseOrderLines.
from pyspark.sql.functions import row_number
from pyspark.sql.window import Window

# Dense 1..N surrogate key in PurchaseOrderLineID order.
# NOTE(review): the Window has no partitionBy, so Spark evaluates it in a single partition,
# and the SCD2 SQL cells in this section assign their own PurchaseOrderLineKey
# (max key + ROW_NUMBER) instead of reading this column - consider dropping it.
window_spec = Window.orderBy("PurchaseOrderLineID")

df = (
    spark.read.load(
        path='abfss://final@bibik224161840.dfs.core.windows.net/bronze/Purchasing.PurchaseOrderLines.parquet',
        format='parquet',
    )
    .withColumn("PurchaseOrderLineKey", row_number().over(window_spec))
)

# Move the key to the front, keeping every other column in its original order.
df = df.select(["PurchaseOrderLineKey"] + [name for name in df.columns if name != "PurchaseOrderLineKey"])

df.createOrReplaceTempView("v_bronze_PurchaseOrderLines")

StatementMeta(sparkpool, 72, 9, Finished, Available, Finished)

In [10]:
%%sql
-- Lake Database DDL: silver copy of Purchasing.PurchaseOrderLines with SCD Type 2 columns.
-- IF NOT EXISTS makes this a no-op when the table is already there, so reruns keep data.
-- NOTE(review): there is no USING clause, so the storage format comes from the Spark
-- session defaults - confirm that is intended.
USE silver;

CREATE TABLE IF NOT EXISTS silver.PurchaseOrderLines (
    -- surrogate and business keys
    PurchaseOrderLineKey      INT,
    PurchaseOrderLineID       INT,
    -- attributes taken from bronze
    PurchaseOrderID           INT,
    StockItemID               INT,
    OrderedOuters             INT,
    Description               STRING,
    ReceivedOuters            INT,
    PackageTypeID             INT,
    ExpectedUnitPricePerOuter DECIMAL(18,2),
    LastReceiptDate           TIMESTAMP,
    IsOrderLineFinalized      BOOLEAN,
    last_updated              TIMESTAMP,
    -- SCD Type 2 version tracking
    start_date                TIMESTAMP,  -- when this version was inserted
    end_date                  TIMESTAMP,  -- NULL for the current version
    is_active                 BOOLEAN     -- true only for the current version
);

StatementMeta(sparkpool, 72, 11, Finished, Available, Finished)

<Spark SQL result set with 0 rows and 0 fields>

<Spark SQL result set with 0 rows and 0 fields>

In [11]:
%%sql
-- SCD Type 2, step 1 of 3: append an archived copy (is_active = false, end_date = now in
-- Asia/Ho_Chi_Minh) of every active row whose attributes differ from bronze.
-- Changes are detected with NOT (a <=> b) rather than a <> b: <> yields NULL when either
-- side is NULL, which silently ignored changes to or from NULL in any compared column.
-- The change predicate must stay identical across steps 1, 2 and 3.
INSERT INTO silver.PurchaseOrderLines
SELECT
    target.PurchaseOrderLineKey,
    target.PurchaseOrderLineID,
    target.PurchaseOrderID,
    target.StockItemID,
    target.OrderedOuters,
    target.Description,
    target.ReceivedOuters,
    target.PackageTypeID,
    target.ExpectedUnitPricePerOuter,
    target.LastReceiptDate,
    target.IsOrderLineFinalized,
    target.last_updated,
    target.start_date,
    FROM_UTC_TIMESTAMP(current_timestamp(), 'Asia/Ho_Chi_Minh') AS end_date, -- Expiration date
    false AS is_active
FROM silver.PurchaseOrderLines AS target
JOIN v_bronze_PurchaseOrderLines AS source
    ON target.PurchaseOrderLineID = source.PurchaseOrderLineID
WHERE target.is_active = true
    AND (
        NOT (target.PurchaseOrderID <=> source.PurchaseOrderID)
        OR NOT (target.StockItemID <=> source.StockItemID)
        OR NOT (target.OrderedOuters <=> source.OrderedOuters)
        OR NOT (target.Description <=> source.Description)
        OR NOT (target.ReceivedOuters <=> source.ReceivedOuters)
        OR NOT (target.PackageTypeID <=> source.PackageTypeID)
        OR NOT (target.ExpectedUnitPricePerOuter <=> source.ExpectedUnitPricePerOuter)
        OR NOT (target.LastReceiptDate <=> source.LastReceiptDate)
        OR NOT (target.IsOrderLineFinalized <=> source.IsOrderLineFinalized)
        OR NOT (target.last_updated <=> source.last_updated)
    );

StatementMeta(sparkpool, 72, 12, Finished, Available, Finished)

<Spark SQL result set with 0 rows and 0 fields>

In [12]:
%%sql
-- SCD Type 2, step 2 of 3: rewrite the table without the still-active versions that
-- step 1 just archived. Inactive rows and unchanged active rows are kept as they are.
-- Uses the same null-safe change predicate as steps 1 and 3.
-- NOTE(review): this statement reads and overwrites the same table. It finished here, but
-- whether Spark allows that depends on the table format - re-check if the DDL changes.
INSERT OVERWRITE silver.PurchaseOrderLines
SELECT *
FROM silver.PurchaseOrderLines AS target
WHERE NOT EXISTS (
    SELECT 1
    FROM v_bronze_PurchaseOrderLines AS source
    WHERE target.PurchaseOrderLineID = source.PurchaseOrderLineID
    AND target.is_active = true
    AND (
        NOT (target.PurchaseOrderID <=> source.PurchaseOrderID)
        OR NOT (target.StockItemID <=> source.StockItemID)
        OR NOT (target.OrderedOuters <=> source.OrderedOuters)
        OR NOT (target.Description <=> source.Description)
        OR NOT (target.ReceivedOuters <=> source.ReceivedOuters)
        OR NOT (target.PackageTypeID <=> source.PackageTypeID)
        OR NOT (target.ExpectedUnitPricePerOuter <=> source.ExpectedUnitPricePerOuter)
        OR NOT (target.LastReceiptDate <=> source.LastReceiptDate)
        OR NOT (target.IsOrderLineFinalized <=> source.IsOrderLineFinalized)
        OR NOT (target.last_updated <=> source.last_updated)
    )
);

StatementMeta(sparkpool, 72, 13, Finished, Available, Finished)

<Spark SQL result set with 0 rows and 0 fields>

In [13]:
%%sql
-- SCD Type 2, step 3 of 3: insert a new active version (end_date NULL) for every bronze row
-- with no active row left in silver - brand-new IDs plus the IDs archived in steps 1-2 -
-- or whose active row still differs (same null-safe predicate as steps 1 and 2).
-- Surrogate keys continue from the current maximum PurchaseOrderLineKey.
-- NOTE(review): start_date here and end_date in step 1 come from separate statements, so
-- consecutive versions of one line can be a few seconds apart.
WITH max_key AS (
    SELECT COALESCE(MAX(PurchaseOrderLineKey), 0) AS max_id FROM silver.PurchaseOrderLines
),
new_data AS (
    SELECT
        source.PurchaseOrderLineID,
        source.PurchaseOrderID,
        source.StockItemID,
        source.OrderedOuters,
        source.Description,
        source.ReceivedOuters,
        source.PackageTypeID,
        source.ExpectedUnitPricePerOuter,
        source.LastReceiptDate,
        source.IsOrderLineFinalized,
        source.last_updated,
        FROM_UTC_TIMESTAMP(current_timestamp(), 'Asia/Ho_Chi_Minh') AS start_date,
        NULL AS end_date,
        true AS is_active,
        ROW_NUMBER() OVER (ORDER BY source.PurchaseOrderLineID) AS rn
    FROM v_bronze_PurchaseOrderLines AS source
    LEFT JOIN silver.PurchaseOrderLines AS target
        ON source.PurchaseOrderLineID = target.PurchaseOrderLineID
        AND target.is_active = true
    WHERE target.PurchaseOrderLineID IS NULL
       OR NOT (target.PurchaseOrderID <=> source.PurchaseOrderID)
       OR NOT (target.StockItemID <=> source.StockItemID)
       OR NOT (target.OrderedOuters <=> source.OrderedOuters)
       OR NOT (target.Description <=> source.Description)
       OR NOT (target.ReceivedOuters <=> source.ReceivedOuters)
       OR NOT (target.PackageTypeID <=> source.PackageTypeID)
       OR NOT (target.ExpectedUnitPricePerOuter <=> source.ExpectedUnitPricePerOuter)
       OR NOT (target.LastReceiptDate <=> source.LastReceiptDate)
       OR NOT (target.IsOrderLineFinalized <=> source.IsOrderLineFinalized)
       OR NOT (target.last_updated <=> source.last_updated)
)
INSERT INTO silver.PurchaseOrderLines
SELECT
    max_key.max_id + new_data.rn AS PurchaseOrderLineKey,
    new_data.PurchaseOrderLineID,
    new_data.PurchaseOrderID,
    new_data.StockItemID,
    new_data.OrderedOuters,
    new_data.Description,
    new_data.ReceivedOuters,
    new_data.PackageTypeID,
    new_data.ExpectedUnitPricePerOuter,
    new_data.LastReceiptDate,
    new_data.IsOrderLineFinalized,
    new_data.last_updated,
    new_data.start_date,
    new_data.end_date,
    new_data.is_active
FROM new_data
CROSS JOIN max_key;

StatementMeta(sparkpool, 72, 14, Finished, Available, Finished)

<Spark SQL result set with 0 rows and 0 fields>

In [14]:
# Export the whole silver.PurchaseOrderLines table (every row, no is_active filter) to the
# Silver container as Parquet. repartition(1) funnels the data into a single partition so
# the output folder gets one part file; mode("overwrite") replaces the previous export.
df_PurchaseOrderLines = spark.sql("SELECT * FROM silver.PurchaseOrderLines")
(
    df_PurchaseOrderLines.repartition(1)
    .write.mode("overwrite")
    .parquet("abfss://final@bibik224161840.dfs.core.windows.net/silver/silver.PurchaseOrderLines.parquet")
)

StatementMeta(sparkpool, 72, 15, Finished, Available, Finished)