# CREATE FACT TABLE

# Reading Silver Data

## Step 1: Setting Up the Kitchen (Environment)

We need tools and ingredients to make our dish. Here, we import tools from PySpark and DeltaTables to handle data.

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from delta.tables import *

## Step 2: Fetching Ingredients (Reading Source Data)

We go to the silver storage (where raw data is kept) and bring all the sales data.

In [0]:
df_silver = spark.sql('''
select * 
from parquet.`abfss://silver@carprojdatalake.dfs.core.windows.net/carsales`
''')

# Reading all the Dims

## Step 3: Bringing Helpers (Reading Dimension Tables)

We fetch helper tables (dimensions) that give more details about branches, dealers, models, and dates.

In [0]:
df_dealer = spark.sql("select * from cars_catalog.gold.dim_dealer")
df_branch = spark.sql("select * from cars_catalog.gold.dim_branch")
df_model = spark.sql("select * from cars_catalog.gold.dim_model")
df_date = spark.sql("select * from cars_catalog.gold.dim_date")

# Bring Keys to the fact table

## Step 4: Combining Ingredients (Joining Tables)

We mix the main ingredients (sales data) with helpers (dimension tables) to add details using matching IDs:

In [0]:
df_fact = df_silver.join(df_branch, df_silver["Branch_ID"] == df_branch["Branch_ID"],how="left")\
                   .join(df_dealer, df_silver["Dealer_ID"] == df_dealer["Dealer_ID"],how="left")\
                   .join(df_model, df_silver["Model_ID"] == df_model["Model_ID"],how="left")\
                   .join(df_date, df_silver["Date_ID"] == df_date["Date_ID"],how="left")\
                   .select(df_silver["Revenue"],df_silver["Units_Sold"],df_silver["RevPerUnit"],df_branch["dim_branch_key"],df_dealer["dim_dealer_key"],df_model["dim_model_key"],df_date["dim_date_key"])

# Writing Fact Tables

## Step 5: Baking the Dish (Writing the Fact Table)

We check if the table already exists.

If yes, update and add new data.

If no, create a new table.

In [0]:
if spark.catalog.tableExists("fact_sales"):
    deltatb1 = DeltaTable.forPath(spark, "abfss://gold@carprojdatalake.dfs.core.windows.net/factsales")

    deltatb1.alias("trg").merge(df_fact.alias("src"), "trg.dim_branch_key = src.dim_branch_key and trg.dim_dealer_key = src.dim_dealer_key and trg.dim_model_key = src.dim_model_key and trg.dim_date_key = src.dim_date_key")\
             .whenMatchedUpdateAll()\
             .whenNotMatchedInsertAll()\
             .execute()


else:
    df_fact.write.format("delta")\
                 .mode("overwrite")\
                 .option("path","abfss://gold@carprojdatalake.dfs.core.windows.net/factsales")\
                 .saveAsTable("cars_catalog.gold.fact_sales")   

## Step 6: Taste the Dish (Check the Output)

Finally, we check how our dish turned out by viewing the table.

In [0]:
%sql
select * from cars_catalog.gold.fact_sales

Revenue,Units_Sold,RevPerUnit,dim_branch_key,dim_dealer_key,dim_model_key,dim_date_key
13363978,2,6681989.0,418,6,155,825
17376468,3,5792156.0,1557,197,252,825
9664767,3,3221589.0,1058,104,199,752
5525304,3,1841768.0,789,95,183,752
12971088,3,4323696.0,497,231,106,882
7321228,1,7321228.0,1804,41,41,988
11379294,2,5689647.0,734,177,107,988
11611234,2,5805617.0,1211,182,110,1043
19979446,2,9989723.0,116,204,185,1043
14181510,3,4727170.0,116,160,238,826
