## Fetching new data from silver layer

In [0]:
from pyspark.sql.functions import *  
from pyspark.sql.types import *
from delta.tables import DeltaTable

# Fully qualified name of the gold dealer dimension table.
gold_table_name = "carsales_catalog.gold.dealer_dimension"
# Schema used to build an empty stand-in DataFrame when the gold table does not exist yet.
gold_table_schema = StructType([
    StructField("dim_dealer_key", LongType(), True),
    StructField("DealerId", StringType(), False),
    StructField("DealerName", StringType(), False)
])

# The widget value is a string; only "true" (case-insensitive) enables incremental mode.
is_incremental = dbutils.widgets.get("is_incremental").lower() == "true"

# Default to a pass-through predicate. DataFrame.filter() accepts only a Column
# or a SQL string, so "keep every row" must be lit(True) rather than a bare bool.
filter_condition = lit(True)
if is_incremental and spark.catalog.tableExists(gold_table_name):
    # `max` here is pyspark.sql.functions.max (brought in by the star import),
    # so this yields the newest commit timestamp in the gold table's history.
    # NOTE(review): DESCRIBE HISTORY lists every commit on the table, so this
    # is the time of the most recent operation of any kind - confirm that it
    # is an acceptable watermark if maintenance jobs also touch the table.
    last_load_on_gold = (
        spark.sql(f"DESCRIBE HISTORY {gold_table_name}")
        .select(max("timestamp"))
        .collect()[0][0]
    )
    # Only pick up silver rows loaded after the last gold commit.
    filter_condition = col("silver_load_timestamp") > last_load_on_gold

path = "abfss://silver@carsalesdatalake04ajaz.dfs.core.windows.net/transformed_data"
silver_df = spark.read.format("delta").load(path).filter(filter_condition)

# One row per distinct (DealerId, DealerName) pair found in the selected silver rows.
dim_src_df = silver_df.select("DealerId", "DealerName").distinct()
dim_src_df.head(5)


## Creating Surrogate key for new records after segregation

### Getting existing table - dealer dim

In [0]:
# Offset used when assigning surrogate keys to new dealers. It is added to
# monotonically_increasing_id(), which starts at 0, so it must be the first
# key to hand out: 1 when the dimension is missing or empty.
max_surrogate_key = 1

if spark.catalog.tableExists(gold_table_name):
    gold_model_df = spark.table(gold_table_name)
    # `max` is pyspark.sql.functions.max; the aggregate is None for an empty table.
    current_max_key = gold_model_df.select(max("dim_dealer_key")).collect()[0][0]
    if current_max_key is not None:
        # Start one past the largest existing key so a new dealer can never
        # reuse (and later overwrite) the key of an existing row.
        max_surrogate_key = current_max_key + 1
    print(max_surrogate_key)
else:
    # No gold table yet: use an empty DataFrame with the expected schema so the
    # downstream joins work unchanged on the first load.
    gold_model_df = spark.createDataFrame([], gold_table_schema)

### Filter new data and updated data to prepare upserted data

In [0]:
# Column that identifies a dealer in both the source and the gold table.
bussiness_key = "DealerId"

# Dealers absent from gold: anti-join on the business key, then derive a
# surrogate key from the offset plus monotonically_increasing_id().
unseen_dealers = dim_src_df.join(gold_model_df, bussiness_key, "leftanti")
new_dim_data_df = unseen_dealers.withColumn(
    "dim_dealer_key",
    max_surrogate_key + monotonically_increasing_id(),
)

# Dealers already in gold: keep the incoming attributes and carry over the
# surrogate key found on the gold side of the join.
matched_dealers = (
    dim_src_df.alias('src')
    .join(gold_model_df.alias('sink'), bussiness_key, "left")
    .filter(col("sink.dim_dealer_key").isNotNull())
)
updated_dim_data_df = matched_dealers.select("src.*", "sink.dim_dealer_key")

# Stack both sets (positional union) into the staging frame for the upsert.
src_staging_df = new_dim_data_df.union(updated_dim_data_df)
src_staging_df.display()


## Implement SCD-1 by upserting changes to Delta Lake

In [0]:
# External storage location backing the gold dealer dimension table.
adls_gold_path = "abfss://gold@carsalesdatalake04ajaz.dfs.core.windows.net/dealer_dimension"
if is_incremental and spark.catalog.tableExists(gold_table_name):
    # Incremental run against an existing table: SCD type 1 upsert.
    # DeltaTable.forName resolves a metastore table by name. A DeltaTable is
    # not subscriptable, so the merge condition refers to the two sides
    # through aliases instead of column indexing.
    sink_df = DeltaTable.forName(spark, gold_table_name)
    (
        sink_df.alias("sink")
        .merge(
            src_staging_df.alias("src"),
            "sink.dim_dealer_key = src.dim_dealer_key",
        )
        .whenMatchedUpdateAll()      # existing key: overwrite attributes in place
        .whenNotMatchedInsertAll()   # unseen key: insert as a new dimension row
        .execute()
    )
else:
    # Full load, or first run: (re)create the table from the staged data.
    (
        src_staging_df.write.format('delta')
        .mode('overwrite')
        .option("path", adls_gold_path)
        .saveAsTable(gold_table_name)
    )

## Testing

In [0]:
# Smoke check: query up to ten rows from the gold table and show the first five.
preview_query = f"select * from {gold_table_name} limit 10"
spark.sql(preview_query).head(5)