In [0]:
dbutils.widgets.text('load_type','F')
load = dbutils.widgets.get('load_type')

##Creating temp view on silver data

In [0]:
df_silver = spark.read.format('parquet')\
                .option('inferSchema',True)\
                    .load('abfss://silver@carsalesdeltalake.dfs.core.windows.net/carsales/')

In [0]:
df_silver.createOrReplaceTempView('silver_view')

In [0]:
%sql
select * from silver_view;

##Creating dim_model (SCD Type1) table during initial load

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [0]:

if load == 'F':
    spark.sql('''create table carsales_catalog.gold.dim_model (
        Model_key bigint generated always as identity (start with 100000 increment by 1) not null primary key, 
        Model_ID string,
        Product_Name string
    )
    using DELTA
    LOCATION 'abfss://gold@carsalesdeltalake.dfs.core.windows.net/model'
    ''')

##Merging source data in dim_model

In [0]:
%sql
merge into carsales_catalog.gold.dim_model as tgt
using (select distinct Model_ID,Product_Name from silver_view) as src 
on tgt.Model_ID = src.Model_ID
when matched and src.PRODUCT_NAME <> tgt.Product_Name then
update set tgt.Product_Name = src.Product_Name
when not matched then
insert (Model_ID,Product_Name) values (src.Model_ID,src.Product_Name)


##Creating dim_branch  (SCD Type1) table during initial load

In [0]:
if load=='F':
    spark.sql(''' create table carsales_catalog.gold.dim_branch (
        Branch_key bigint generated always as identity (start with 200000 increment by 1) not null primary key,
        Branch_ID string,
        BranchName string )
        using DELTA
        location 'abfss://gold@carsalesdeltalake.dfs.core.windows.net/branch'
        ''')

##Merging source data in dim_branch

In [0]:
%sql
merge into carsales_catalog.gold.dim_branch as tgt
using (select distinct Branch_ID,BranchName from silver_view) as src
on src.Branch_ID=tgt.Branch_ID
when matched and src.BranchName <> tgt.BranchName then
update set tgt.BranchName=src.BranchName
when not matched then
insert(Branch_ID,BranchName) values (src.Branch_ID,src.BranchName)

In [0]:
%sql
select * from carsales_catalog.gold.dim_branch;

##Creating dim_dealer  (SCD Type1) table during initial load

In [0]:
if load=='F':
    spark.sql(''' create table carsales_catalog.gold.dim_dealer (
        Dealer_key bigint generated always as identity (start with 300000 increment by 1) not null primary key,
        Dealer_ID string,
        DealerName string )
        using DELTA
        location 'abfss://gold@carsalesdeltalake.dfs.core.windows.net/dealer'
        ''')

##Merging source data in dim_dealer

In [0]:
%sql
merge into carsales_catalog.gold.dim_dealer as tgt
using (select distinct Dealer_ID,DealerName from silver_view) as src
on src.Dealer_ID=tgt.Dealer_ID
when matched and src.DealerName <> tgt.DealerName then
update set tgt.DealerName=src.DealerName
when not matched then
insert(Dealer_ID,DealerName) values (src.Dealer_ID,src.DealerName)

##Creating dim_date  (SCD Type1) table during initial load

In [0]:
if load=='F':
    spark.sql(''' create table carsales_catalog.gold.dim_date (
        Date_key bigint generated always as identity (start with 400000 increment by 1) not null primary key,
        Day integer,
        Month integer,
        Year integer,
        Date_of_Sale date)
        using DELTA
        location 'abfss://gold@carsalesdeltalake.dfs.core.windows.net/date'
        ''')

##Merging source data in dim_date

In [0]:
%sql
merge into carsales_catalog.gold.dim_date as tgt
using (select distinct Day,Month,Year,Date_of_Sale from silver_view order by Date_of_Sale) as src 
on tgt.Date_of_Sale=src.Date_of_Sale
when not matched then
insert(Day,Month,Year,Date_of_Sale) values (src.Day,src.Month,src.Year,src.Date_of_Sale)

##Creating fact_sales 

In [0]:
if load == 'F':
  spark.sql('''create table carsales_catalog.gold.fact_sales (
    Sale_id bigint generated always as identity primary key not null,
    Model_key bigint,
    Dealer_key bigint,
    Branch_key bigint,
    Date_key bigint,
    Revenue integer,
    Units_Sold integer,
    constraint fk_model foreign key (Model_key) references carsales_catalog.gold.dim_model(Model_key),
    constraint fk_dealer foreign key (Dealer_key) references carsales_catalog.gold.dim_dealer(Dealer_key),
    constraint fk_branch foreign key (Branch_key) references carsales_catalog.gold.dim_branch(Branch_key),
    constraint fk_date foreign key (Date_key) references carsales_catalog.gold.dim_date(Date_key)
    )
    using delta
    location 'abfss://gold@carsalesdeltalake.dfs.core.windows.net/fact_carsales'
  ''')



##Loading fact table

In [0]:
%sql
insert overwrite carsales_catalog.gold.fact_sales(Revenue, Units_Sold, Model_key, Branch_key, Dealer_key, Date_key)
     select src.Revenue,
       src.Units_Sold,
       dim_model.Model_key,
       dim_branch.Branch_key,
       dim_dealer.Dealer_key,
       dim_date.Date_key
       from silver_view as src
       left outer join carsales_catalog.gold.dim_model on src.Model_ID=dim_model.Model_ID
       left outer join carsales_catalog.gold.dim_branch on src.Branch_ID=dim_branch.Branch_ID
       left outer join carsales_catalog.gold.dim_dealer on src.Dealer_ID=dim_dealer.Dealer_ID
       left outer join carsales_catalog.gold.dim_date on src.Date_of_Sale=dim_date.Date_of_Sale ;


In [0]:
%sql
select * from carsales_catalog.gold.fact_sales;