## Gold Notebook

In [0]:
#Add libraries
from pyspark.sql.functions import *
from pyspark.sql.types import *

## Creating FLAG Parameter

In [0]:
# Declare a text widget named 'incremental_flag' with default value '0'.
dbutils.widgets.text('incremental_flag', '0')

In [0]:
# Read the widget value and show both the value and its Python type
# (the recorded output below shows the value arrives as a str).
incremental_flag = dbutils.widgets.get('incremental_flag')
print(incremental_flag, type(incremental_flag), sep='\n')

0
<class 'str'>


## Creating Dimensional Model

### Our Silver Table:

In [0]:
%sql
-- Preview every column of the car sales parquet data stored in the silver container.
SELECT *
FROM parquet.`abfss://silver@datalake.dfs.core.windows.net/carsales`

### Creating the first dimension (Dim_Model)
**Step 1: Create a new DataFrame with `spark.sql` containing the distinct model ID and model category from the silver table.**

In [0]:
# Distinct (Model_ID, model_category) pairs read straight from the silver parquet files.
model_source_query = '''
SELECT DISTINCT  
    (Model_ID) as Model_ID
    ,model_category
 FROM parquet.`abfss://silver@datalake.dfs.core.windows.net/carsales`
 '''
df_source = spark.sql(model_source_query)

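**Step 2: Load the existing dimension table into `df_sink` (or an empty DataFrame with the same columns on the initial load) so that new and existing records can be told apart.**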

In [0]:
%python
# Example of incremental load schema:
if spark.catalog.tableExists('carsproject_catalog.gold.dim_model'):  # this runs the incremental load.
    df_sink = spark.sql('''
    SELECT 
      1 as dim_model_key, 
      Model_ID, 
      model_category 
    FROM carsproject_catalog.gold.dim_model
    ''')
else:  # this runs the initial load if the table does not already exist.
    df_sink = spark.sql('''
    SELECT 
      1 as dim_model_key, 
      Model_ID, 
      model_category 
    FROM parquet.`abfss://silver@datalake.dfs.core.windows.net/carsales`
    WHERE 1=0 
    ''')

In [0]:
%sql
-- Commented-out scratch query kept for reference; nothing in this cell executes.
--SELECT 
--  dim_model_key, 
--  Model_ID, 
--  model_category 
--FROM parquet.`abfss://silver@datalake.dfs.core.windows.net/carsales`
--WHERE 1=0 -- this condition is always false, so the query returns no rows, only the column schema.

**Filtering new records and old records**

In [0]:
# Left-join source models to the sink on Model_ID; dim_model_key is null for
# every source row that has no match in the sink.
model_id_match = df_source['Model_ID'] == df_sink['Model_ID']
df_filter = (
    df_source
    .join(df_sink, model_id_match, 'left')
    .select(
        df_source['Model_ID'],
        df_source['model_category'],
        df_sink['dim_model_key'],
    )
)

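**df_filter_old** — records that already exist in the dimension table (their `dim_model_key` is not null after the left join).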

In [0]:
# Rows that matched the sink in the join, i.e. their surrogate key is populated.
has_existing_key = col('dim_model_key').isNotNull()
df_filter_old = df_filter.where(has_existing_key)

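**df_filter_new** — records that are not yet in the dimension table (their `dim_model_key` is null after the left join).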

In [0]:
# Rows with no match in the sink; the null key column is dropped here and
# regenerated in the surrogate-key step below.
lacks_key = col('dim_model_key').isNull()
df_filter_new = (
    df_filter
    .where(lacks_key)
    .select(df_source['Model_ID'], df_source['model_category'])
)

In [0]:
# Show the rows that still need a surrogate key.
display(df_filter_new)

Model_ID,model_category


## Create Surrogate Key

**Fetch the max Surrogate Key from existing table**

In [0]:
# Choose the first surrogate key to hand out in this run.
#
# The decision follows table existence, the same test the sink-loading and
# merge cells use, rather than incremental_flag alone:
#   * restarting at 1 while the table already exists would give new rows keys
#     that collide with stored ones, and the merge would overwrite those rows;
#   * querying the table when it does not exist would fail.
if not spark.catalog.tableExists('carsproject_catalog.gold.dim_model'):
    max_value = 1
else:
    current_max_key = spark.sql(
        "select max(dim_model_key) from carsproject_catalog.gold.dim_model"
    ).collect()[0][0]
    # max() over an empty table yields NULL (None in Python); start at 1 then.
    max_value = 1 if current_max_key is None else current_max_key + 1



**Create Surrogate Key Column and ADD the max surrogate key**

In [0]:
# Give each new row a key of max_value plus a Spark-generated id.
# NOTE(review): per the PySpark docs, monotonically_increasing_id() is unique
# and increasing but not guaranteed consecutive, so keys may have gaps.
new_key_expr = max_value + monotonically_increasing_id()
df_filter_new = df_filter_new.withColumn('dim_model_key', new_key_expr)

## Create Final DF - df_filter_old + df_filter_new

In [0]:
# Stack new and existing rows into one frame. unionByName matches columns by
# name (Model_ID, model_category, dim_model_key), so the result stays correct
# even if the column order of either input changes; a positional union would
# silently put values under the wrong column in that case.
df_final = df_filter_new.unionByName(df_filter_old)

In [0]:
# Show the combined frame that will be written to the gold table.
display(df_final)

## SCD TYPE - 1 (UPSERT)
- SCD (Slowly Changing Dimension)
- UPSERT (Update + Insert)

In [0]:
from delta.tables import DeltaTable

In [0]:
%python
from pyspark.sql.functions import col

# Remove duplicates from the source DataFrame
df_final_deduped = df_final.dropDuplicates(["dim_model_key"])

# Incremental RUN
if spark.catalog.tableExists('carsproject_catalog.gold.dim_model'):
    delta_tbl = DeltaTable.forPath(spark, "abfss://gold@datalake.dfs.core.windows.net/dim_model")
    delta_tbl.alias("trg").merge(
        df_final_deduped.alias("src"),
        "trg.dim_model_key = src.dim_model_key"
    ).whenMatchedUpdateAll()\
     .whenNotMatchedInsertAll()\
     .execute()

# Initial RUN
else:
    df_final_deduped.write.format("delta")\
        .mode("overwrite")\
        .option("path", "abfss://gold@datalake.dfs.core.windows.net/dim_model")\
        .saveAsTable("carsproject_catalog.gold.dim_model")

In [0]:
%sql
-- Inspect the dimension table after the write/merge.
SELECT *
FROM carsproject_catalog.gold.dim_model

Model_ID,model_category,dim_model_key
Che-M47,Che,2
Toy-M205,Toy,3
BMW-M249,BMW,4
Mer-M122,Mer,5
Hon-M215,Hon,6
Nis-M82,Nis,7
Toy-M206,Toy,8
Mar-M139,Mar,9
Mar-M146,Mar,10
Mar-M141,Mar,11
