## Create Fact Table

***Reading silver Table***

In [0]:
# Load the silver-layer `carsales` data directly from its Parquet location
# (path-based query, no catalog table involved) and preview it.
silver_query = "select * from parquet.`abfss://silver@adlscardatalake.dfs.core.windows.net/carsales`"
df_silver = spark.sql(silver_query)
df_silver.display()

## Reading the dimension tables (DIMS)

In [0]:
# Read the four gold-layer dimension tables; each one supplies the surrogate
# key (dim_*_key) that the fact table will reference.
df_model, df_branch, df_dealer, df_date = (
    spark.sql("select * from cars_catalog.gold.dim_model"),
    spark.sql("select * from cars_catalog.gold.dim_branch"),
    spark.sql("select * from cars_catalog.gold.dim_dealer"),
    spark.sql("select * from cars_catalog.gold.dim_date"),
)

***Joining the silver data with the DIMS to bring in their dim_keys and create the fact table***

In [0]:
# Left-join the silver rows to every dimension on its ID column so each row
# picks up the matching dimension key (null when no dimension row matches).
df_joined = (
    df_silver
    .join(df_branch, df_silver.Branch_ID == df_branch.Branch_ID, 'left')
    .join(df_model, df_silver.Model_ID == df_model.Model_ID, 'left')
    .join(df_dealer, df_silver.Dealer_ID == df_dealer.Dealer_ID, 'left')
    .join(df_date, df_silver.Date_ID == df_date.Date_ID, 'left')
)

# Keep only the measures from silver plus the four dimension keys, then drop
# rows that are exact duplicates across all selected columns.
fact_columns = [
    df_silver['Revenue'],
    df_silver['Units_Sold'],
    df_silver['RevenuePerUnit'],
    df_model['dim_model_key'],
    df_branch['dim_branch_key'],
    df_dealer['dim_dealer_key'],
    df_date['dim_date_key'],
]
df_fact = df_joined.select(*fact_columns).dropDuplicates()

df_fact.display()

## Writing Fact Table

In [0]:
from delta.tables import DeltaTable

In [0]:
# Upsert df_fact into the gold fact table: merge when the table already
# exists, otherwise create it as an external Delta table on the gold path.
#
# The existence check uses the fully qualified name so it looks at the same
# table that is merged into / created below, regardless of the session's
# current catalog and schema.
if spark.catalog.tableExists("cars_catalog.gold.factsales"):
    # The Python API needs the SparkSession as the first argument.
    delta_tbl = DeltaTable.forName(spark, "cars_catalog.gold.factsales")

    # merge() accepts a single condition, so the four key comparisons are
    # combined into one AND-ed expression.
    # NOTE(review): keys that are null (no dimension match in the left joins)
    # never satisfy `=`, so such rows are inserted on every run -- confirm
    # whether that is acceptable.
    merge_condition = " AND ".join([
        'trg.dim_model_key = src.dim_model_key',
        'trg.dim_branch_key = src.dim_branch_key',
        'trg.dim_dealer_key = src.dim_dealer_key',
        'trg.dim_date_key = src.dim_date_key',
    ])

    # Python builder methods are whenMatchedUpdateAll / whenNotMatchedInsertAll;
    # nothing is written until execute() is called.
    delta_tbl.alias("trg").merge(df_fact.alias("src"), merge_condition) \
        .whenMatchedUpdateAll() \
        .whenNotMatchedInsertAll() \
        .execute()

else:
    # First run: write the data to the gold container path and register the
    # table in the catalog.
    df_fact.write.format("delta")\
        .mode("overwrite")\
        .option("path",'abfss://gold@adlscardatalake.dfs.core.windows.net/factsales')\
        .saveAsTable("cars_catalog.gold.factsales")

In [0]:
%sql
-- Sanity check: list the distinct rows currently stored in the gold fact table.
select  distinct(*) from cars_catalog.gold.factsales