In [0]:
from pyspark.sql.functions import date_format

In [0]:
%sql
create or replace table datamodel.default.sales (
    OrderID INT,
    OrderDate DATE,
    CustomerID INT,
    CustomerName VARCHAR(100),
    CustomerEmail VARCHAR(100),
    ProductID INT,
    ProductName VARCHAR(100),
    ProductCategory VARCHAR(50),
    RegionID INT,
    RegionName VARCHAR(50),
    Country VARCHAR(50),
    Quantity INT,
    UnitPrice DECIMAL(10,2),
    TotalAmount DECIMAL(10,2)
);

In [0]:
%sql
INSERT INTO datamodel.default.sales (OrderID, OrderDate, CustomerID, CustomerName, CustomerEmail, ProductID, ProductName, ProductCategory, RegionID, RegionName, Country, Quantity, UnitPrice, TotalAmount) 
VALUES 
(1, '2024-02-01', 101, 'Alice Johnson', 'alice@example.com', 201, 'Laptop', 'Electronics', 301, 'North America', 'USA', 2, 800.00, 1600.00),
(2, '2024-02-02', 102, 'Bob Smith', 'bob@example.com', 202, 'Smartphone', 'Electronics', 302, 'Europe', 'Germany', 1, 500.00, 500.00),
(3, '2024-02-03', 103, 'Charlie Brown', 'charlie@example.com', 203, 'Tablet', 'Electronics', 303, 'Asia', 'India', 3, 300.00, 900.00),
(4, '2024-02-04', 101, 'Alice Johnson', 'alice@example.com', 204, 'Headphones', 'Accessories', 301, 'North America', 'USA', 1, 150.00, 150.00),
(5, '2024-02-05', 104, 'David Lee', 'david@example.com', 205, 'Gaming Console', 'Electronics', 302, 'Europe', 'France', 1, 400.00, 400.00),
(6, '2024-02-06', 102, 'Bob Smith', 'bob@example.com', 206, 'Smartwatch', 'Electronics', 303, 'Asia', 'China', 2, 200.00, 400.00),
(7, '2024-02-07', 105, 'Eve Adams', 'eve@example.com', 201, 'Laptop', 'Electronics', 301, 'North America', 'Canada', 1, 800.00, 800.00),
(8, '2024-02-08', 106, 'Frank Miller', 'frank@example.com', 207, 'Monitor', 'Accessories', 302, 'Europe', 'Italy', 2, 250.00, 500.00),
(9, '2024-02-09', 107, 'Grace White', 'grace@example.com', 208, 'Keyboard', 'Accessories', 303, 'Asia', 'Japan', 3, 100.00, 300.00),
(10, '2024-02-10', 104, 'David Lee', 'david@example.com', 209, 'Mouse', 'Accessories', 301, 'North America', 'USA', 1, 50.00, 50.00);


num_affected_rows,num_inserted_rows
10,10


In [0]:
%sql
select * from datamodel.default.sales;

OrderID,OrderDate,CustomerID,CustomerName,CustomerEmail,ProductID,ProductName,ProductCategory,RegionID,RegionName,Country,Quantity,UnitPrice,TotalAmount
1,2024-02-01,101,Alice Johnson,alice@example.com,201,Laptop,Electronics,301,North America,USA,2,800.0,1600.0
2,2024-02-02,102,Bob Smith,bob@example.com,202,Smartphone,Electronics,302,Europe,Germany,1,500.0,500.0
3,2024-02-03,103,Charlie Brown,charlie@example.com,203,Tablet,Electronics,303,Asia,India,3,300.0,900.0
4,2024-02-04,101,Alice Johnson,alice@example.com,204,Headphones,Accessories,301,North America,USA,1,150.0,150.0
5,2024-02-05,104,David Lee,david@example.com,205,Gaming Console,Electronics,302,Europe,France,1,400.0,400.0
6,2024-02-06,102,Bob Smith,bob@example.com,206,Smartwatch,Electronics,303,Asia,China,2,200.0,400.0
7,2024-02-07,105,Eve Adams,eve@example.com,201,Laptop,Electronics,301,North America,Canada,1,800.0,800.0
8,2024-02-08,106,Frank Miller,frank@example.com,207,Monitor,Accessories,302,Europe,Italy,2,250.0,500.0
9,2024-02-09,107,Grace White,grace@example.com,208,Keyboard,Accessories,303,Asia,Japan,3,100.0,300.0
10,2024-02-10,104,David Lee,david@example.com,209,Mouse,Accessories,301,North America,USA,1,50.0,50.0


In [0]:
%sql
INSERT INTO datamodel.default.sales (OrderID, OrderDate, CustomerID, CustomerName, CustomerEmail, ProductID, ProductName, ProductCategory, RegionID, RegionName, Country, Quantity, UnitPrice, TotalAmount) 
VALUES 
(11, '2024-02-11', 101, 'Alice Johnson', 'alice@example.com', 201, 'Gaming Laptop', 'Electronics', 301, 'North America', 'USA', 2, 800.00, 1600.00),
(12, '2024-02-12', 102, 'Bob Smith', 'bob.smith@example.com', 230, 'Airpods', 'Electronics', 302, 'Europe', 'Germany', 1, 500.00, 500.00),
(13, '2024-02-12', 108, 'Stephen Lee', 'stephen@example.com', 205, 'Gaming Console', 'Electronics', 302, 'Europe', 'France', 1, 400.00, 400.00)

num_affected_rows,num_inserted_rows
3,3


### Staging Layer

#### Transient

In [0]:
if spark.catalog.tableExists("datamodel.dwh.stag_sales"):
    last_load_date_obj = spark.sql("select max(OrderDate) as last_load_date from datamodel.dwh.dim_dates").collect()[0]['last_load_date']
    last_load_date = last_load_date_obj.strftime("%Y-%m-%d") if last_load_date_obj else '2100-01-01'
else:
    last_load_date = '1900-01-01'

In [0]:
last_load_date

'1900-01-01'

#### Transformation

In [0]:
spark.sql(f"""
            select
              *,
              TotalAmount / 4000 as TotalAmountUSD,
              split(cast(OrderDate as string), '-')[0] as OrderYear,
              split(cast(OrderDate as string), '-')[1] as OrderMonth,
              current_date() as ProcessedDate
            from datamodel.default.sales
            where OrderID is not null
              and Quantity is not null
              and UnitPrice is not null
              and OrderDate > '{last_load_date}'
          """).createOrReplaceTempView("stag_sales_view")

In [0]:
%sql
create schema datamodel.dwh;

In [0]:
%sql
create or replace table datamodel.dwh.stag_sales
as
select * from stag_sales_view;

num_affected_rows,num_inserted_rows


In [0]:
%sql
drop table datamodel.dwh.stag_sales;

In [0]:
%sql
select * from datamodel.dwh.stag_sales;

OrderID,OrderDate,CustomerID,CustomerName,CustomerEmail,ProductID,ProductName,ProductCategory,RegionID,RegionName,Country,Quantity,UnitPrice,TotalAmount,TotalAmountUSD,OrderYear,OrderMonth,ProcessedDate
1,2024-02-01,101,Alice Johnson,alice@example.com,201,Laptop,Electronics,301,North America,USA,2,800.0,1600.0,0.4,2024,2,2025-08-13
2,2024-02-02,102,Bob Smith,bob@example.com,202,Smartphone,Electronics,302,Europe,Germany,1,500.0,500.0,0.125,2024,2,2025-08-13
3,2024-02-03,103,Charlie Brown,charlie@example.com,203,Tablet,Electronics,303,Asia,India,3,300.0,900.0,0.225,2024,2,2025-08-13
4,2024-02-04,101,Alice Johnson,alice@example.com,204,Headphones,Accessories,301,North America,USA,1,150.0,150.0,0.0375,2024,2,2025-08-13
5,2024-02-05,104,David Lee,david@example.com,205,Gaming Console,Electronics,302,Europe,France,1,400.0,400.0,0.1,2024,2,2025-08-13
6,2024-02-06,102,Bob Smith,bob@example.com,206,Smartwatch,Electronics,303,Asia,China,2,200.0,400.0,0.1,2024,2,2025-08-13
7,2024-02-07,105,Eve Adams,eve@example.com,201,Laptop,Electronics,301,North America,Canada,1,800.0,800.0,0.2,2024,2,2025-08-13
8,2024-02-08,106,Frank Miller,frank@example.com,207,Monitor,Accessories,302,Europe,Italy,2,250.0,500.0,0.125,2024,2,2025-08-13
9,2024-02-09,107,Grace White,grace@example.com,208,Keyboard,Accessories,303,Asia,Japan,3,100.0,300.0,0.075,2024,2,2025-08-13
10,2024-02-10,104,David Lee,david@example.com,209,Mouse,Accessories,301,North America,USA,1,50.0,50.0,0.0125,2024,2,2025-08-13


### Core Layer

#### dim_customers

In [0]:
%sql
create or replace table datamodel.dwh.dim_customers
as
select 
  row_number() over (order by CustomerID) as dim_CustomerKey,
  *,
  current_date() as StartDate,
  date('2300-01-01') as EndDate,
  true as isCurrent
from (select distinct
  CustomerID,
  CustomerName,
  CustomerEmail
from datamodel.dwh.stag_sales);

num_affected_rows,num_inserted_rows


#### SCD Type - 2

In [0]:
spark.sql("""
  with base as (select distinct
      CustomerID,
      CustomerName,
      CustomerEmail
    from datamodel.dwh.stag_sales)
  , base2 as (
    select 
      base.*,
      cu.CustomerID RecordFilter,
      cu.dim_CustomerKey,
      mx.max_key
    from base 
    left join datamodel.dwh.dim_customers cu
      on base.CustomerID = cu.CustomerID
    cross join (select coalesce(max(dim_CustomerKey), 0) max_key from datamodel.dwh.dim_customers) mx)
  , all_records as (select row_number() over (order by CustomerID) + max_key as dim_CustomerKey,
      CustomerID,
      CustomerName,
      CustomerEmail
    from base2 where RecordFilter is null
    union all
    select dim_CustomerKey,
      CustomerID,
      CustomerName,
      CustomerEmail from base2 
    where RecordFilter is not null)
  select
    *
  from all_records
""").createOrReplaceTempView("dim_customers_view")

In [0]:
%sql
select * from dim_customers_view;

dim_CustomerKey,CustomerID,CustomerName,CustomerEmail
8,108,Stephen Lee,stephen@example.com
1,101,Alice Johnson,alice@example.com
2,102,Bob Smith,bob.smith@example.com


In [0]:
%sql
merge into datamodel.dwh.dim_customers dst
using dim_customers_view src
on dst.CustomerID = src.CustomerID and dst.isCurrent = true
when matched 
and (dst.CustomerName <> src.CustomerName 
      or dst.CustomerEmail <> src.CustomerEmail) then 
  update set endDate = current_date(), isCurrent = false

num_affected_rows,num_updated_rows,num_deleted_rows,num_inserted_rows
1,1,0,0


In [0]:
%sql
merge into datamodel.dwh.dim_customers dst
using dim_customers_view src
on dst.CustomerID = src.CustomerID and dst.isCurrent = true
when not matched then 
  insert (dim_CustomerKey, CustomerID, CustomerName, CustomerEmail, StartDate, EndDate, isCurrent) 
  values (src.dim_CustomerKey, src.CustomerID, src.CustomerName, src.CustomerEmail, current_date(), date('2300-01-01'), true)


num_affected_rows,num_updated_rows,num_deleted_rows,num_inserted_rows
2,0,0,2


In [0]:
%sql
select * from datamodel.dwh.dim_customers;

dim_CustomerKey,CustomerID,CustomerName,CustomerEmail,StartDate,EndDate,isCurrent
1,101,Alice Johnson,alice@example.com,2025-08-13,2300-01-01,True
3,103,Charlie Brown,charlie@example.com,2025-08-13,2300-01-01,True
4,104,David Lee,david@example.com,2025-08-13,2300-01-01,True
5,105,Eve Adams,eve@example.com,2025-08-13,2300-01-01,True
6,106,Frank Miller,frank@example.com,2025-08-13,2300-01-01,True
7,107,Grace White,grace@example.com,2025-08-13,2300-01-01,True
2,102,Bob Smith,bob@example.com,2025-08-13,2025-08-13,False
8,108,Stephen Lee,stephen@example.com,2025-08-13,2300-01-01,True
2,102,Bob Smith,bob.smith@example.com,2025-08-13,2300-01-01,True


In [0]:
%sql
describe history datamodel.dwh.dim_customers;

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
11,2025-08-13T03:32:18.000Z,78362573187593,phyominnthwin@gmail.com,MERGE,"Map(predicate -> [""((CustomerID#27898 = CustomerID#25731) AND isCurrent#27903)""], clusterBy -> [], matchedPredicates -> [], statsOnLoad -> true, notMatchedBySourcePredicates -> [], notMatchedPredicates -> [{""actionType"":""insert""}])",,,0813-021143-gaucu8mq-v2n,10.0,WriteSerializable,False,"Map(numTargetRowsCopied -> 0, numTargetRowsDeleted -> 0, numTargetFilesAdded -> 2, numTargetBytesAdded -> 3931, numTargetBytesRemoved -> 0, numTargetDeletionVectorsAdded -> 0, numTargetRowsMatchedUpdated -> 0, executionTimeMs -> 1931, materializeSourceTimeMs -> 9, numTargetRowsInserted -> 2, numTargetRowsMatchedDeleted -> 0, numTargetDeletionVectorsUpdated -> 0, scanTimeMs -> 0, numTargetRowsUpdated -> 0, numOutputRows -> 2, numTargetDeletionVectorsRemoved -> 0, numTargetRowsNotMatchedBySourceUpdated -> 0, numTargetChangeFilesAdded -> 0, numSourceRows -> 3, numTargetFilesRemoved -> 0, numTargetRowsNotMatchedBySourceDeleted -> 0, rewriteTimeMs -> 1835)",,Databricks-Runtime/17.0.x-aarch64-photon-scala2.13
10,2025-08-13T03:31:18.000Z,78362573187593,phyominnthwin@gmail.com,OPTIMIZE,"Map(predicate -> [], auto -> true, clusterBy -> [], zOrderBy -> [], batchId -> 0)",,,0813-021143-gaucu8mq-v2n,9.0,SnapshotIsolation,False,"Map(numRemovedFiles -> 2, numRemovedBytes -> 4153, p25FileSize -> 2287, numDeletionVectorsRemoved -> 1, minFileSize -> 2287, numAddedFiles -> 1, maxFileSize -> 2287, p75FileSize -> 2287, p50FileSize -> 2287, numAddedBytes -> 2287)",,Databricks-Runtime/17.0.x-aarch64-photon-scala2.13
9,2025-08-13T03:31:16.000Z,78362573187593,phyominnthwin@gmail.com,MERGE,"Map(predicate -> [""((CustomerID#26951 = CustomerID#25731) AND isCurrent#26956)""], clusterBy -> [], matchedPredicates -> [{""predicate"":""(NOT (CustomerName#26952 = CustomerName#25732) OR NOT (CustomerEmail#26953 = CustomerEmail#25733))"",""actionType"":""update""}], statsOnLoad -> false, notMatchedBySourcePredicates -> [], notMatchedPredicates -> [])",,,0813-021143-gaucu8mq-v2n,8.0,WriteSerializable,False,"Map(numTargetRowsCopied -> 0, numTargetRowsDeleted -> 0, numTargetFilesAdded -> 1, numTargetBytesAdded -> 1895, numTargetBytesRemoved -> 0, numTargetDeletionVectorsAdded -> 1, numTargetRowsMatchedUpdated -> 1, executionTimeMs -> 5971, materializeSourceTimeMs -> 616, numTargetRowsInserted -> 0, numTargetRowsMatchedDeleted -> 0, numTargetDeletionVectorsUpdated -> 0, scanTimeMs -> 1831, numTargetRowsUpdated -> 1, numOutputRows -> 1, numTargetDeletionVectorsRemoved -> 0, numTargetRowsNotMatchedBySourceUpdated -> 0, numTargetChangeFilesAdded -> 0, numSourceRows -> 3, numTargetFilesRemoved -> 0, numTargetRowsNotMatchedBySourceDeleted -> 0, rewriteTimeMs -> 3403)",,Databricks-Runtime/17.0.x-aarch64-photon-scala2.13
8,2025-08-13T03:18:52.000Z,78362573187593,phyominnthwin@gmail.com,CREATE OR REPLACE TABLE AS SELECT,"Map(partitionBy -> [], clusterBy -> [], description -> null, isManaged -> true, properties -> {""delta.enableDeletionVectors"":""true""}, statsOnLoad -> true)",,,0813-021143-gaucu8mq-v2n,7.0,WriteSerializable,False,"Map(numFiles -> 1, numRemovedFiles -> 1, numRemovedBytes -> 2248, numOutputRows -> 7, numOutputBytes -> 2258)",,Databricks-Runtime/17.0.x-aarch64-photon-scala2.13
7,2025-08-13T03:16:55.000Z,78362573187593,phyominnthwin@gmail.com,CREATE OR REPLACE TABLE AS SELECT,"Map(partitionBy -> [], clusterBy -> [], description -> null, isManaged -> true, properties -> {""delta.enableDeletionVectors"":""true""}, statsOnLoad -> true)",,,0813-021143-gaucu8mq-v2n,6.0,WriteSerializable,False,"Map(numFiles -> 1, numRemovedFiles -> 1, numRemovedBytes -> 2050, numOutputRows -> 7, numOutputBytes -> 2248)",,Databricks-Runtime/17.0.x-aarch64-photon-scala2.13
6,2025-08-13T03:16:23.000Z,78362573187593,phyominnthwin@gmail.com,CREATE OR REPLACE TABLE AS SELECT,"Map(partitionBy -> [], clusterBy -> [], description -> null, isManaged -> true, properties -> {""delta.enableDeletionVectors"":""true""}, statsOnLoad -> false)",,,0813-021143-gaucu8mq-v2n,5.0,WriteSerializable,False,"Map(numFiles -> 1, numRemovedFiles -> 1, numRemovedBytes -> 2050, numOutputRows -> 7, numOutputBytes -> 2050)",,Databricks-Runtime/17.0.x-aarch64-photon-scala2.13
5,2025-08-13T03:13:25.000Z,78362573187593,phyominnthwin@gmail.com,CREATE OR REPLACE TABLE AS SELECT,"Map(partitionBy -> [], clusterBy -> [], description -> null, isManaged -> true, properties -> {""delta.enableDeletionVectors"":""true""}, statsOnLoad -> false)",,,0813-021143-gaucu8mq-v2n,4.0,WriteSerializable,False,"Map(numFiles -> 1, numRemovedFiles -> 1, numRemovedBytes -> 1670, numOutputRows -> 7, numOutputBytes -> 2050)",,Databricks-Runtime/17.0.x-aarch64-photon-scala2.13
4,2025-08-13T02:20:30.000Z,78362573187593,phyominnthwin@gmail.com,CREATE OR REPLACE TABLE AS SELECT,"Map(partitionBy -> [], clusterBy -> [], description -> null, isManaged -> true, properties -> {""delta.enableDeletionVectors"":""true""}, statsOnLoad -> true)",,,0813-021143-gaucu8mq-v2n,3.0,WriteSerializable,False,"Map(numFiles -> 1, numRemovedFiles -> 1, numRemovedBytes -> 1673, numOutputRows -> 7, numOutputBytes -> 1670)",,Databricks-Runtime/17.0.x-aarch64-photon-scala2.13
3,2025-08-12T17:21:07.000Z,78362573187593,phyominnthwin@gmail.com,OPTIMIZE,"Map(predicate -> [], auto -> true, clusterBy -> [], zOrderBy -> [], batchId -> 0)",,,0812-152854-mjevjyvi-v2n,2.0,SnapshotIsolation,False,"Map(numRemovedFiles -> 2, numRemovedBytes -> 3078, p25FileSize -> 1673, numDeletionVectorsRemoved -> 1, minFileSize -> 1673, numAddedFiles -> 1, maxFileSize -> 1673, p75FileSize -> 1673, p50FileSize -> 1673, numAddedBytes -> 1673)",,Databricks-Runtime/17.0.x-aarch64-photon-scala2.13
2,2025-08-12T17:21:05.000Z,78362573187593,phyominnthwin@gmail.com,MERGE,"Map(predicate -> [""(CustomerID#21825 = CustomerID#21787)""], clusterBy -> [], matchedPredicates -> [{""actionType"":""update""}], statsOnLoad -> false, notMatchedBySourcePredicates -> [], notMatchedPredicates -> [{""actionType"":""insert""}])",,,0812-152854-mjevjyvi-v2n,1.0,WriteSerializable,False,"Map(numTargetRowsCopied -> 0, numTargetRowsDeleted -> 0, numTargetFilesAdded -> 1, numTargetBytesAdded -> 1408, numTargetBytesRemoved -> 0, numTargetDeletionVectorsAdded -> 1, numTargetRowsMatchedUpdated -> 2, executionTimeMs -> 5798, materializeSourceTimeMs -> 1025, numTargetRowsInserted -> 0, numTargetRowsMatchedDeleted -> 0, numTargetDeletionVectorsUpdated -> 0, scanTimeMs -> 1568, numTargetRowsUpdated -> 2, numOutputRows -> 2, numTargetDeletionVectorsRemoved -> 0, numTargetRowsNotMatchedBySourceUpdated -> 0, numTargetChangeFilesAdded -> 0, numSourceRows -> 2, numTargetFilesRemoved -> 0, numTargetRowsNotMatchedBySourceDeleted -> 0, rewriteTimeMs -> 3069)",,Databricks-Runtime/17.0.x-aarch64-photon-scala2.13


In [0]:
%sql
restore table datamodel.dwh.dim_customers to version as of 10;

table_size_after_restore,num_of_files_after_restore,num_removed_files,num_restored_files,removed_files_size,restored_files_size
2287,1,1,0,1994,0


#### dim_products

In [0]:
%sql
create or replace table datamodel.dwh.dim_products
as
select 
  row_number() over (order by ProductID) as dim_ProductKey,
  *
from (select distinct
  ProductID,
  ProductName,
  ProductCategory
from datamodel.dwh.stag_sales);

num_affected_rows,num_inserted_rows


#### SCD Type - 1

In [0]:
spark.sql("""
  with base as (select distinct
      ProductID,
      ProductName,
      ProductCategory
    from datamodel.dwh.stag_sales)
  , base2 as (
    select 
      base.*,
      cu.ProductID RecordFilter,
      cu.dim_ProductKey,
      mx.max_key
    from base 
    left join datamodel.dwh.dim_products cu
      on base.ProductID = cu.ProductID
    cross join (select coalesce(max(dim_ProductKey), 0) max_key from datamodel.dwh.dim_products) mx)
  , all_records as (select row_number() over (order by ProductID) + max_key as dim_ProductKey,
      ProductID,
      ProductName,
      ProductCategory
    from base2 where RecordFilter is null
    union all
    select dim_ProductKey,
      ProductID,
      ProductName,
      ProductCategory from base2 
    where RecordFilter is not null)
  select
    *
  from all_records
""").createOrReplaceTempView("dim_products_view")

In [0]:
%sql
select * from dim_products_view;

dim_ProductKey,ProductID,ProductName,ProductCategory
10,230,Airpods,Electronics
5,205,Gaming Console,Electronics
1,201,Gaming Laptop,Electronics


In [0]:
%sql
merge into datamodel.dwh.dim_products dst
using dim_products_view src
on dst.ProductID = src.ProductID
when matched then update set *
when not matched then insert *

num_affected_rows,num_updated_rows,num_deleted_rows,num_inserted_rows
3,2,0,1


In [0]:
%sql
select * from datamodel.dwh.dim_products;

dim_ProductKey,ProductID,ProductName,ProductCategory
2,202,Smartphone,Electronics
3,203,Tablet,Electronics
4,204,Headphones,Accessories
6,206,Smartwatch,Electronics
7,207,Monitor,Accessories
8,208,Keyboard,Accessories
9,209,Mouse,Accessories
10,230,Airpods,Electronics
5,205,Gaming Console,Electronics
1,201,Gaming Laptop,Electronics


#### dim_regions

In [0]:
%sql
create or replace table datamodel.dwh.dim_regions
as
select 
  row_number() over (order by RegionID, Country) as dim_RegionKey,
  *
from (select distinct
  RegionID,
  RegionName,
  Country
from datamodel.dwh.stag_sales);

num_affected_rows,num_inserted_rows


#### SCD Type - 1

In [0]:
spark.sql("""
  with base as (select distinct
      RegionID,
      RegionName,
      Country
    from datamodel.dwh.stag_sales)
  , base2 as (
    select 
      base.*,
      cu.Country RecordFilter,
      cu.dim_RegionKey,
      mx.max_key
    from base 
    left join datamodel.dwh.dim_regions cu
      on base.Country = cu.Country
    cross join (select coalesce(max(dim_RegionKey), 0) max_key from datamodel.dwh.dim_regions) mx)
  , all_records as (select row_number() over (order by Country) + max_key as dim_RegionKey,
      RegionID,
      RegionName,
      Country
    from base2 where RecordFilter is null
    union all
    select dim_RegionKey,
      RegionID,
      RegionName,
      Country from base2 
    where RecordFilter is not null)
  select
    *
  from all_records
""").createOrReplaceTempView("dim_regions_view")

In [0]:
%sql
select * from dim_regions_view;

dim_RegionKey,RegionID,RegionName,Country
2,301,North America,USA
3,302,Europe,France
4,302,Europe,Germany


In [0]:
%sql
merge into datamodel.dwh.dim_regions dst
using dim_regions_view src
on dst.Country = src.Country
when matched then update set *
when not matched then insert *

num_affected_rows,num_updated_rows,num_deleted_rows,num_inserted_rows
3,3,0,0


In [0]:
%sql
select * from datamodel.dwh.dim_regions;

dim_RegionKey,RegionID,RegionName,Country
1,301,North America,Canada
5,302,Europe,Italy
6,303,Asia,China
7,303,Asia,India
8,303,Asia,Japan
3,302,Europe,France
2,301,North America,USA
4,302,Europe,Germany


#### dim_dates

In [0]:
%sql
create or replace table datamodel.dwh.dim_dates
as
select 
  row_number() over (order by OrderDate) as dim_DateKey,
  *
from (select distinct
  OrderDate,
  OrderYear,
  OrderMonth
from datamodel.dwh.stag_sales);

num_affected_rows,num_inserted_rows


#### SCD Type - 1

In [0]:
spark.sql("""
  select 
    row_number() over (order by st.OrderDate) + mx.max_key as dim_DateKey, 
    st.*
  from (select distinct
      OrderDate,
      OrderYear,
      OrderMonth
    from datamodel.dwh.stag_sales) st
  cross join (select coalesce(max(dim_DateKey), 0) max_key from datamodel.dwh.dim_dates) mx
""").createOrReplaceTempView("dim_dates_view")

In [0]:
%sql
select * from dim_dates_view;

dim_DateKey,OrderDate,OrderYear,OrderMonth
11,2024-02-11,2024,2
12,2024-02-12,2024,2


##### Upsert to be idempotent

In [0]:
%sql
merge into datamodel.dwh.dim_dates dst
using dim_dates_view src
on dst.OrderDate = src.OrderDate
when matched then update set *
when not matched then insert *

num_affected_rows,num_updated_rows,num_deleted_rows,num_inserted_rows
2,0,0,2


In [0]:
%sql
select * from datamodel.dwh.dim_dates;

dim_DateKey,OrderDate,OrderYear,OrderMonth
1,2024-02-01,2024,2
2,2024-02-02,2024,2
3,2024-02-03,2024,2
4,2024-02-04,2024,2
5,2024-02-05,2024,2
6,2024-02-06,2024,2
7,2024-02-07,2024,2
8,2024-02-08,2024,2
9,2024-02-09,2024,2
10,2024-02-10,2024,2


#### fact_sales

In [0]:
%sql
create or replace table datamodel.dwh.fact_sales (
    OrderID int,
    dim_DateKey int,
    dim_CustomerKey int,
    dim_ProductKey int,
    dim_RegionKey int,
    m_Quantity decimal,
    m_UnitPrice decimal,
    m_TotalAmount decimal,
    m_TotalAmountUSD decimal
)

In [0]:
%sql
create or replace temp view fact_sales_view
as
select
    st.OrderID,
    dt.dim_DateKey,
    cu.dim_CustomerKey,
    pr.dim_ProductKey,
    rg.dim_RegionKey,
    st.Quantity,
    st.UnitPrice,
    st.TotalAmount,
    st.TotalAmountUSD
from datamodel.dwh.stag_sales st
left join datamodel.dwh.dim_customers cu
on st.CustomerID = cu.CustomerID and cu.isCurrent = true
left join datamodel.dwh.dim_products pr
on st.ProductID = pr.ProductID
left join datamodel.dwh.dim_regions rg
on st.RegionID = rg.RegionID and st.Country = rg.Country
left join datamodel.dwh.dim_dates dt
on st.OrderDate = dt.OrderDate
left join datamodel.dwh.fact_sales f
on st.OrderID = f.OrderID
where f.OrderID is null;

In [0]:
%sql
select * from fact_sales_view;

OrderID,dim_DateKey,dim_CustomerKey,dim_ProductKey,dim_RegionKey,Quantity,UnitPrice,TotalAmount,TotalAmountUSD
11,11,1,1,2,2,800.0,1600.0,0.4
12,12,2,10,4,1,500.0,500.0,0.125
13,12,8,5,3,1,400.0,400.0,0.1


#### Incremental loading

In [0]:
%sql
insert into datamodel.dwh.fact_sales
select * from fact_sales_view;

num_affected_rows,num_inserted_rows
3,3


In [0]:
%sql
select * from datamodel.dwh.fact_sales;

OrderID,dim_DateKey,dim_CustomerKey,dim_ProductKey,dim_RegionKey,m_Quantity,m_UnitPrice,m_TotalAmount,m_TotalAmountUSD
1,1,1,1,2,2,800,1600,0
2,2,2,2,4,1,500,500,0
3,3,3,3,7,3,300,900,0
4,4,1,4,2,1,150,150,0
5,5,4,5,3,1,400,400,0
6,6,2,6,6,2,200,400,0
7,7,5,1,1,1,800,800,0
8,8,6,7,5,2,250,500,0
9,9,7,8,8,3,100,300,0
10,10,4,9,2,1,50,50,0


In [0]:
%sql
describe history datamodel.dwh.fact_sales;

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
7,2025-08-13T03:47:51.000Z,78362573187593,phyominnthwin@gmail.com,WRITE,"Map(mode -> Append, statsOnLoad -> true, partitionBy -> [])",,,0813-021143-gaucu8mq-v2n,6.0,WriteSerializable,False,"Map(numFiles -> 1, numOutputRows -> 4, numOutputBytes -> 2618)",,Databricks-Runtime/17.0.x-aarch64-photon-scala2.13
6,2025-08-13T02:23:34.000Z,78362573187593,phyominnthwin@gmail.com,WRITE,"Map(mode -> Append, statsOnLoad -> true, partitionBy -> [])",,,0813-021143-gaucu8mq-v2n,5.0,WriteSerializable,False,"Map(numFiles -> 1, numOutputRows -> 10, numOutputBytes -> 2779)",,Databricks-Runtime/17.0.x-aarch64-photon-scala2.13
5,2025-08-13T02:23:13.000Z,78362573187593,phyominnthwin@gmail.com,CREATE OR REPLACE TABLE,"Map(partitionBy -> [], clusterBy -> [], description -> null, isManaged -> true, properties -> {""delta.enableDeletionVectors"":""true""}, statsOnLoad -> false)",,,0813-021143-gaucu8mq-v2n,4.0,WriteSerializable,False,Map(),,Databricks-Runtime/17.0.x-aarch64-photon-scala2.13
4,2025-08-12T17:54:45.000Z,78362573187593,phyominnthwin@gmail.com,WRITE,"Map(mode -> Append, statsOnLoad -> false, partitionBy -> [])",,,0812-152854-mjevjyvi-v2n,3.0,WriteSerializable,False,"Map(numFiles -> 0, numOutputRows -> 0, numOutputBytes -> 0)",,Databricks-Runtime/17.0.x-aarch64-photon-scala2.13
3,2025-08-12T16:28:29.000Z,78362573187593,phyominnthwin@gmail.com,WRITE,"Map(mode -> Append, statsOnLoad -> true, partitionBy -> [])",,,0812-152854-mjevjyvi-v2n,2.0,WriteSerializable,False,"Map(numFiles -> 1, numOutputRows -> 10, numOutputBytes -> 2779)",,Databricks-Runtime/17.0.x-aarch64-photon-scala2.13
2,2025-08-12T16:28:14.000Z,78362573187593,phyominnthwin@gmail.com,CREATE OR REPLACE TABLE,"Map(partitionBy -> [], clusterBy -> [], description -> null, isManaged -> true, properties -> {""delta.enableDeletionVectors"":""true""}, statsOnLoad -> false)",,,0812-152854-mjevjyvi-v2n,1.0,WriteSerializable,False,Map(),,Databricks-Runtime/17.0.x-aarch64-photon-scala2.13
1,2025-08-12T16:23:04.000Z,78362573187593,phyominnthwin@gmail.com,CREATE OR REPLACE TABLE,"Map(partitionBy -> [], clusterBy -> [], description -> null, isManaged -> true, properties -> {""delta.enableDeletionVectors"":""true""}, statsOnLoad -> false)",,,0812-152854-mjevjyvi-v2n,0.0,WriteSerializable,False,Map(),,Databricks-Runtime/17.0.x-aarch64-photon-scala2.13
0,2025-08-12T16:12:20.000Z,78362573187593,phyominnthwin@gmail.com,CREATE TABLE,"Map(partitionBy -> [], clusterBy -> [], description -> null, isManaged -> true, properties -> {""delta.enableDeletionVectors"":""true""}, statsOnLoad -> false)",,,0812-152854-mjevjyvi-v2n,,WriteSerializable,True,Map(),,Databricks-Runtime/17.0.x-aarch64-photon-scala2.13


In [0]:
%sql
restore table datamodel.dwh.fact_sales to version as of 6;

table_size_after_restore,num_of_files_after_restore,num_removed_files,num_restored_files,removed_files_size,restored_files_size
2779,1,1,0,2618,0
