In [0]:
from delta.tables import DeltaTable
from pyspark.sql.functions import col, monotonically_increasing_id, row_number
from pyspark.sql.types import *
from pyspark.sql.window import Window

# Create Flag Parameter

In [0]:
# Notebook parameter: incremental-load flag (text widget, empty by default).
incremental_flag_widget = "p_incremental_flag"
dbutils.widgets.text(incremental_flag_widget, "")
v_incre_flag = dbutils.widgets.get(incremental_flag_widget)

In [0]:
# Notebook parameter: ingestion date used to select the source rows
# (text widget, empty by default).
ingestion_date_widget = "p_ingestion_date"
dbutils.widgets.text(ingestion_date_widget, "")
v_ingest_date = dbutils.widgets.get(ingestion_date_widget)

# Create DIMENSIONS MODEL

### Fetch Relevant Columns

In [0]:
# Distinct dealer attributes from the silver `carsales` Parquet data for the
# requested ingestion date.
#
# The widget value is compared as a column literal instead of being spliced into
# SQL text, so a malformed or hostile parameter value cannot change the query.
#
# distinct() applies to the whole (Dealer_ID, DealerName, ingestion_date) row.
# That is also what the previous `SELECT DISTINCT(Dealer_ID) ...` did: the
# parentheses never restricted de-duplication to Dealer_ID alone.
# NOTE(review): a Dealer_ID that appears with two different DealerName values on
# the same date still yields two rows here — confirm whether that can occur
# upstream.
df_src = (
    spark.read.parquet("abfss://silver@cardeprojectdl.dfs.core.windows.net/carsales")
    .where(col("ingestion_date") == v_ingest_date)
    .select("Dealer_ID", "DealerName", "ingestion_date")
    .distinct()
)

In [0]:
# Preview the source dealers selected for this ingestion date.
display(df_src)

Dealer_ID,DealerName,ingestion_date
DLR0062,Ford Australia Motors,2025-05-01
DLR0063,Ford do Brasil Motors,2025-05-01
DLR0060,"Fisker, Karma Motors",2025-05-01
XYZ0063,Datafam Dealers,2025-05-01


### dim_dealer Sink - Initial and Incremental

In [0]:
# Build df_sink: the current contents of the dealer dimension, or an empty frame
# with the expected columns when the dimension has not been created yet.
dim_dealer_exists = spark.catalog.tableExists('cars_catalog.gold.dim_dealer')

if not dim_dealer_exists:
    # Initial run: `WHERE 1=0` returns no rows, so only the column layout
    # (dim_dealer_key, updated_at, Dealer_ID, DealerName) is produced.
    sink_query = '''
                        SELECT 1 as dim_dealer_key, CAST(NULL AS TIMESTAMP) as updated_at, Dealer_ID, DealerName
                        from PARQUET.`abfss://silver@cardeprojectdl.dfs.core.windows.net/carsales`
                        WHERE 1=0
                        '''
else:
    # Incremental run: read the existing gold Delta data by path.
    sink_query = '''
                    SELECT *
                    FROM DELTA.`abfss://gold@cardeprojectdl.dfs.core.windows.net/dim_dealer`
                    '''

df_sink = spark.sql(sink_query)

In [0]:
# Preview the sink (existing dimension rows, or the empty initial frame).
display(df_sink)

### Filtering new records and old records

In [0]:
# Left-join the source dealers to the sink on Dealer_ID. Dealers that are not in
# the sink come back with NULL updated_at / dim_dealer_key.
same_dealer = df_src['Dealer_ID'] == df_sink['Dealer_ID']
df_joined = df_src.join(df_sink, same_dealer, 'left')
df_filter = df_joined.select(
    df_src['Dealer_ID'],
    df_src['DealerName'],
    df_sink['updated_at'],
    df_sink['dim_dealer_key'],
)

In [0]:
# Preview the joined source/sink rows.
display(df_filter)

Dealer_ID,DealerName,updated_at,dim_dealer_key
DLR0062,Ford Australia Motors,2025-05-06 14:50:40.208000,71.0
DLR0063,Ford do Brasil Motors,2025-05-06 14:50:40.208000,233.0
DLR0060,"Fisker, Karma Motors",2025-05-06 14:50:40.208000,137.0
XYZ0063,Datafam Dealers,,


 **df_filter_old**

In [0]:
# Dealers already present in the dimension: the join found a surrogate key.
has_surrogate_key = col('dim_dealer_key').isNotNull()
df_filter_old = df_filter.where(has_surrogate_key)

In [0]:
# Preview the dealers that already have a surrogate key.
display(df_filter_old)

Dealer_ID,DealerName,updated_at,dim_dealer_key
DLR0062,Ford Australia Motors,2025-05-06 14:50:40.208000,71
DLR0063,Ford do Brasil Motors,2025-05-06 14:50:40.208000,233
DLR0060,"Fisker, Karma Motors",2025-05-06 14:50:40.208000,137


 **df_filter_new**

In [0]:
# Dealers not yet in the dimension: the join found no surrogate key. The NULL key
# column is dropped here; a fresh key is assigned in a later cell.
lacks_surrogate_key = col('dim_dealer_key').isNull()
df_filter_new = (
    df_filter
    .where(lacks_surrogate_key)
    .select('Dealer_ID', 'DealerName', 'updated_at')
)
display(df_filter_new)

Dealer_ID,DealerName,updated_at
XYZ0063,Datafam Dealers,


# Create Surrogate Key

### Fetching the max Surrogate key from existing table

The maximum existing surrogate key is the starting point for the keys assigned during incremental loading.

In [0]:
# Compute max_value, the first surrogate key to hand out in this run.
#
# The decision is driven by whether the dimension table exists — the same test
# the sink and upsert cells use — rather than by the widget flag alone:
#   * table missing            -> keys start at 1 (initial load)
#   * table exists, has rows   -> keys continue from max(dim_dealer_key) + 1
#   * table exists, but empty  -> max() is NULL (None in Python), keys start at 1
# Previously flag '0' with an existing table restarted keys at 1 (colliding with
# existing rows in the merge), any other flag with a missing table failed, and an
# empty table raised TypeError on `None + 1`.
if spark.catalog.tableExists('cars_catalog.gold.dim_dealer'):
    max_value_df = spark.sql("select max(dim_dealer_key) from cars_catalog.gold.dim_dealer")
    current_max_key = max_value_df.collect()[0][0]
    max_value = 1 if current_max_key is None else current_max_key + 1
else:
    max_value = 1

### Create the Surrogate Key column, offset by the max existing surrogate key

In [0]:
# Assign surrogate keys to the new dealers: max_value, max_value + 1, ...
#
# row_number() over an ordered window gives dense, repeatable keys.
# monotonically_increasing_id() only guarantees uniqueness: it encodes the
# partition id in the high bits, so keys jump by billions between partitions and
# can differ from run to run.
# The window has no partitionBy, so Spark moves these rows to a single partition;
# this frame holds only the dealers that are new in this run.
# The cast keeps the key column a long, as it was before.
new_key_order = Window.orderBy('Dealer_ID')
df_filter_new = df_filter_new.withColumn(
    'dim_dealer_key',
    (row_number().over(new_key_order) + (max_value - 1)).cast('long'),
)

In [0]:
# Preview the new dealers with their assigned surrogate keys.
display(df_filter_new)

Dealer_ID,DealerName,updated_at,dim_dealer_key
XYZ0063,Datafam Dealers,,268


### Create Final DF = df_filter_old + df_filter_new

In [0]:
# Combine new and existing dealers into the frame that feeds the upsert.
# unionByName matches columns by name; the positional union() would silently put
# values in the wrong columns if either side's column order ever changed.
df_final = df_filter_new.unionByName(df_filter_old)

In [0]:
# Preview the combined frame that will be upserted.
display(df_final)

Dealer_ID,DealerName,updated_at,dim_dealer_key
XYZ0063,Datafam Dealers,,268
DLR0062,Ford Australia Motors,2025-05-06 14:50:40.208000,71
DLR0063,Ford do Brasil Motors,2025-05-06 14:50:40.208000,233
DLR0060,"Fisker, Karma Motors",2025-05-06 14:50:40.208000,137


# SCD - TYPE 1 (UPSERT)

In [0]:
from pyspark.sql.functions import current_timestamp,lit

In [0]:
# SCD Type 1 upsert of df_final into cars_catalog.gold.dim_dealer, followed by a
# refresh of this table's row in cars_catalog.default.metadata_table.
#
# One timestamp is captured up front so the dimension rows and the metadata row
# written by this run carry the same load time.
cur_time_str = spark.sql("SELECT current_timestamp()").collect()[0][0].strftime("%Y-%m-%d %H:%M:%S.%f")

if spark.catalog.tableExists('cars_catalog.gold.dim_dealer'):
    # Incremental load: merge on the surrogate key. Matched rows are overwritten
    # in place (Type 1), unmatched rows are inserted.
    # updated_at is passed as a quoted literal, as before, so it is converted to
    # whatever type the existing table's column has.
    deltaTable = DeltaTable.forPath(spark, "abfss://gold@cardeprojectdl.dfs.core.windows.net/dim_dealer")

    (
        deltaTable.alias("tar")
        .merge(df_final.alias("src"), "tar.dim_dealer_key=src.dim_dealer_key")
        .whenMatchedUpdate(set={
            "Dealer_ID": "src.Dealer_ID",
            "DealerName": "src.DealerName",
            "updated_at": f"'{cur_time_str}'",
        })
        .whenNotMatchedInsert(values={
            "Dealer_ID": "src.Dealer_ID",
            "DealerName": "src.DealerName",
            "dim_dealer_key": "src.dim_dealer_key",
            "updated_at": f"'{cur_time_str}'",
        })
        .execute()
    )
else:
    # Initial run: create the external Delta table from df_final.
    # The cast keeps updated_at a TIMESTAMP, matching the sink schema
    # (CAST(NULL AS TIMESTAMP)); a bare lit(cur_time_str) would replace the
    # column with a STRING and the table would be created with that type.
    df_final = df_final.withColumn("updated_at", lit(cur_time_str).cast("timestamp"))
    df_final.write.mode("overwrite") \
        .format("delta") \
        .option("path", "abfss://gold@cardeprojectdl.dfs.core.windows.net/dim_dealer") \
        .saveAsTable("cars_catalog.gold.dim_dealer")

# Upsert the metadata row for both paths. A plain UPDATE does nothing when the
# row is missing (for example after the metadata table was truncated), and a
# plain INSERT adds a duplicate row when the dimension is dropped and rebuilt.
spark.sql(f"""
    MERGE INTO cars_catalog.default.metadata_table AS tar
    USING (SELECT 'dim_dealer' AS table_name, '{cur_time_str}' AS last_updated_time) AS src
    ON tar.table_name = src.table_name
    WHEN MATCHED THEN
        UPDATE SET tar.last_updated_time = src.last_updated_time
    WHEN NOT MATCHED THEN
        INSERT (table_name, last_updated_time)
        VALUES (src.table_name, src.last_updated_time)
""")

In [0]:
# %sql
# DROP TABLE cars_catalog.gold.dim_dealer;

In [0]:
%sql
-- Inspect the dealer dimension after the upsert.
SELECT *
FROM cars_catalog.gold.dim_dealer;

In [0]:
%sql
-- Inspect the per-table load timestamps.
SELECT *
FROM cars_catalog.default.metadata_table

table_name,last_updated_time
dim_dealer,2025-05-06T14:56:03.186Z
dim_date,2025-05-06T14:17:50.197Z


In [0]:
# %sql
# TRUNCATE TABLE cars_catalog.default.metadata_table;