In [0]:
from pyspark.sql.functions import*
from pyspark.sql.types import*

# CREATE FLAG PARAMETER

In [0]:
# Declare the 'incremental_flag' notebook parameter as a text widget (default value '0');
# its value is read back in the next cell.
dbutils.widgets.text('incremental_flag', '0')

In [0]:
# Read the current value of the 'incremental_flag' widget (widget values come back as strings)
# and echo it so the active mode is visible in the cell output.
incremental_flag = dbutils.widgets.get('incremental_flag')
print(incremental_flag)

0


# CREATING DIMENSION MODEL

FETCH RELEVANT COLUMNS

In [0]:
# Distinct (Model_ID, model_category) pairs read straight from the silver-layer carsales Parquet data.
src_query = '''
    SELECT DISTINCT Model_ID, model_category 
    FROM PARQUET.`abfss://silver@azstrgaccenterprisecars.dfs.core.windows.net/carsales`
'''
df_src = spark.sql(src_query)

DIM MODEL SINK (INITIAL AND INCREMENTAL LOADS)

In [0]:
# Rows already in the gold dimension (surrogate key + natural key + attribute).
# On the first run the table does not exist yet, so build an empty frame with the same columns
# from the silver source using an always-false predicate.
if not spark.catalog.tableExists('cars_catalog.gold.dim_model'):
    df_sink = spark.sql('''
        SELECT 1 as dim_model_key, Model_ID, model_category 
        FROM PARQUET.`abfss://silver@azstrgaccenterprisecars.dfs.core.windows.net/carsales`
        WHERE 1=0
    ''')
else:
    df_sink = spark.sql('''
        SELECT dim_model_key, Model_ID, model_category 
        FROM cars_catalog.gold.dim_model
    ''')

In [0]:
# Preview the current sink contents (empty on the first run).
display(df_sink)

dim_model_key,Model_ID,model_category


### Filter new and existing records

In [0]:
# LEFT JOIN every source row to the existing dimension on the natural key (Model_ID).
# Models already in the dimension pick up their dim_model_key; unseen models get NULL there.
join_condition = df_src.Model_ID == df_sink.Model_ID
df_filter = (
    df_src
    .join(df_sink, join_condition, how='left')
    .select(df_src.Model_ID, df_src.model_category, df_sink.dim_model_key)
)


### df_filter_old

In [0]:
# Rows whose Model_ID matched an existing dimension row, i.e. already carry a surrogate key.
df_filter_old = df_filter.where(df_filter.dim_model_key.isNotNull())
df_filter_old.display()

Model_ID,model_category,dim_model_key


### df_filter_new

In [0]:
# Models with no matching dimension row; keep only the natural key and attribute,
# since their surrogate key is assigned further down.
df_filter_new = (
    df_filter
    .where(df_filter.dim_model_key.isNull())
    .select(df_filter.Model_ID, df_filter.model_category)
)

### Create surrogate key

**Fetch the max surrogate key from the existing table**

In [0]:
# Surrogate keys for new models start one past the largest dim_model_key already in the gold table,
# or at 1 when there is nothing to continue from.
# NOTE: this is decided by whether the table actually exists, not by incremental_flag alone:
#   - flag '0' (the widget default) on a re-run would restart at 1 and produce keys that collide
#     with existing rows, breaking the MERGE on dim_model_key later in this notebook;
#   - any other flag value on a first run would query a table that does not exist yet.
if spark.catalog.tableExists('cars_catalog.gold.dim_model'):
    # COALESCE covers an existing-but-empty table, where MAX() returns NULL (None + 1 would raise).
    max_value_df = spark.sql('''
      SELECT COALESCE(MAX(dim_model_key), 0) + 1 AS max_value
      FROM cars_catalog.gold.dim_model
    ''')
    max_value = max_value_df.collect()[0][0]
else:
    max_value = 1

# Surface a flag/table mismatch instead of silently ignoring it.
if incremental_flag == '0' and max_value != 1:
    print(f"incremental_flag is '0' but cars_catalog.gold.dim_model already holds keys; "
          f"new keys continue from {max_value}")


Create the surrogate key column, offsetting it by the max existing surrogate key

In [0]:
from pyspark.sql.functions import monotonically_increasing_id

In [0]:
from pyspark.sql.functions import lit, row_number
from pyspark.sql.window import Window

# Give the new models consecutive surrogate keys max_value, max_value + 1, ...
# row_number() over a window ordered by the natural key yields dense, reproducible keys.
# monotonically_increasing_id() only guarantees uniqueness: its values jump by 2**33 between
# partitions and may differ if the DataFrame is recomputed.
# The window has no partitionBy, so all new rows are ranked in one partition; this presumably
# stays small for a dimension load, but revisit if the number of new models grows large.
new_key_order = Window.orderBy('Model_ID', 'model_category')
df_filter_new = df_filter_new.withColumn(
    'dim_model_key',
    # Cast to bigint to keep the key type stable across runs and aligned with a long-typed key column.
    (lit(max_value) + row_number().over(new_key_order) - 1).cast('bigint'),
)


Create the final DataFrame: union of df_filter_old and df_filter_new

In [0]:
# Combine rows that already have surrogate keys with the newly keyed rows.
# unionByName matches columns by name rather than by position, so a reordered select in either
# input cannot silently shift values into the wrong column.
df_final = df_filter_old.unionByName(df_filter_new)

### SCD Type 1 (upsert)

In [0]:
from delta.tables import DeltaTable

In [0]:
# SCD Type 1 load of the gold model dimension.
# First run: create the table as a Delta table stored at the given ADLS path (external table).
# Later runs: MERGE on the surrogate key, overwriting attributes of matched rows and inserting new keys.
if not spark.catalog.tableExists('cars_catalog.gold.dim_model'):
    (
        df_final.write.format('delta')
        .mode('overwrite')
        .option("path", "abfss://gold@azstrgaccenterprisecars.dfs.core.windows.net/dim_model")
        .saveAsTable('cars_catalog.gold.dim_model')
    )
else:
    # Resolve the MERGE target by the same table name the existence check used, rather than by a
    # duplicated storage path that could drift from the table's registered location.
    delta_tbl = DeltaTable.forName(spark, 'cars_catalog.gold.dim_model')
    (
        delta_tbl.alias("trg")
        .merge(df_final.alias("src"), "trg.dim_model_key = src.dim_model_key")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )

In [0]:
%sql
-- Inspect the contents of the gold model dimension after the load.
SELECT * FROM cars_catalog.gold.dim_model

org.apache.spark.sql.catalyst.ExtendedAnalysisException: [TABLE_OR_VIEW_NOT_FOUND] The table or view `cars_catalog`.`gold`.`dim_model` cannot be found. Verify the spelling and correctness of the schema and catalog.
If you did not qualify the name with a schema, verify the current_schema() output, or qualify the name with the correct schema and catalog.
To tolerate the error on drop use DROP VIEW IF EXISTS or DROP TABLE IF EXISTS. SQLSTATE: 42P01; line 1 pos 14;
'Project [*]
+- 'UnresolvedRelation [cars_catalog, gold, dim_model], [], false

	at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.tableNotFound(package.scala:90)
	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis0$2(CheckAnalysis.scala:252)
	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis0$2$adapted(CheckAnalysis.scala:216)
	at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:293)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$an