# Import necessary modules

In [0]:
from pyspark.sql.functions import col,monotonically_increasing_id
from delta.tables import DeltaTable

# Read Clean Data and Fetch Relevant Columns

In [0]:
# Load the customer attributes from the silver layer, one row per Customer_ID.
#
# Why not `SELECT DISTINCT(Customer_ID) AS Customer_ID, ...`: in Spark SQL the
# DISTINCT keyword applies to the entire select list, so the parentheses around
# Customer_ID do nothing. A customer that appears with two different cities or
# segments would come through as two rows, and downstream each of those rows
# would either receive its own surrogate key (initial load) or collide on the
# same dim_customer_key in the Delta MERGE (incremental load).
#
# dropDuplicates(["Customer_ID"]) guarantees key uniqueness. Which of the
# conflicting rows survives is arbitrary.
# TODO(review): if the silver data carries a load/order timestamp, keep the
# most recent row per customer instead of an arbitrary one.
df_src = spark.sql('''
SELECT
    Customer_ID,
    Customer_Name,
    Segment,
    City,
    State,
    Country,
    Region
FROM parquet.`abfss://silver@globalsalestorage.dfs.core.windows.net/clean_e_commerce_data`
''').dropDuplicates(["Customer_ID"])

# Initial and Incremental

In [0]:
# Decide which query supplies the "sink" side of the comparison:
#   * incremental run -> the rows already stored in the gold dimension
#   * initial run     -> an empty frame with the same columns (WHERE 1=0 keeps
#                        the column list but returns no rows)
dim_customer_exists = spark.catalog.tableExists('ecom_catalog.gold.dim_customer')

if dim_customer_exists:
    sink_query = '''
    SELECT DISTINCT(Customer_ID) AS Customer_ID, Customer_Name, Segment, City, State, Country, Region, dim_customer_key
    FROM ecom_catalog.gold.dim_customer
    '''
else:
    sink_query = '''
    SELECT DISTINCT Customer_ID, Customer_Name, Segment, City, State, Country, Region, 1 AS dim_customer_key
    FROM parquet.`abfss://silver@globalsalestorage.dfs.core.windows.net/clean_e_commerce_data`
    WHERE 1=0
    '''

df_sink = spark.sql(sink_query)

# Join Source and Sink

In [0]:
# Left-join source customers to the existing dimension on Customer_ID.
# Every source row is kept; dim_customer_key is NULL for customers that are
# not in the dimension yet.
same_customer = df_src["Customer_ID"] == df_sink["Customer_ID"]

# All descriptive attributes come from the source side of the join.
source_columns = [
    df_src[name]
    for name in (
        "Customer_ID",
        "Customer_Name",
        "Segment",
        "City",
        "State",
        "Country",
        "Region",
    )
]

df_filtering = df_src.join(df_sink, on=same_customer, how="left").select(
    df_sink["dim_customer_key"],
    *source_columns,
)

dim_customer_key,Customer_ID,Customer_Name,Segment,City,State,Country,Region
1,b6083eef,Brandon Allen,Home Office,Riveraland,MO,Bolivia,North
2,386ab6d8,Carol Bentley,Corporate,Matthewshire,KS,Lebanon,West
3,78ce465a,Brandon Salinas,Consumer,North William,ME,Malaysia,North
4,7c951754,Dr. Connie Vasquez,Home Office,West Charleshaven,NE,Congo,North
5,9c62c3e7,Vanessa Ray,Corporate,Lake Benjaminfurt,NJ,Yemen,North
6,ecf890e8,Tony Vaughn,Corporate,South Jamestown,OH,Tanzania,North
7,d0ddb879,Michelle Cohen,Home Office,Ramirezchester,CT,Mali,East
8,c37fa05d,Rodney Montgomery,Corporate,Olsonhaven,OH,Netherlands Antilles,West
9,3a2f5a0a,Justin Wyatt,Consumer,South Darren,CA,Paraguay,North
10,72183476,Jesus Shaw,Home Office,New Mariaton,KY,French Southern Territories,West


# Filtering Old and New Rows

### Filtering Old Rows
 

In [0]:
# Customers that matched a dimension row already carry a surrogate key.
has_surrogate_key = col("dim_customer_key").isNotNull()
df_filter_old = df_filtering.where(has_surrogate_key)

dim_customer_key,Customer_ID,Customer_Name,Segment,City,State,Country,Region
1,b6083eef,Brandon Allen,Home Office,Riveraland,MO,Bolivia,North
2,386ab6d8,Carol Bentley,Corporate,Matthewshire,KS,Lebanon,West
3,78ce465a,Brandon Salinas,Consumer,North William,ME,Malaysia,North
4,7c951754,Dr. Connie Vasquez,Home Office,West Charleshaven,NE,Congo,North
5,9c62c3e7,Vanessa Ray,Corporate,Lake Benjaminfurt,NJ,Yemen,North
6,ecf890e8,Tony Vaughn,Corporate,South Jamestown,OH,Tanzania,North
7,d0ddb879,Michelle Cohen,Home Office,Ramirezchester,CT,Mali,East
8,c37fa05d,Rodney Montgomery,Corporate,Olsonhaven,OH,Netherlands Antilles,West
9,3a2f5a0a,Justin Wyatt,Consumer,South Darren,CA,Paraguay,North
10,72183476,Jesus Shaw,Home Office,New Mariaton,KY,French Southern Territories,West


### Filtering New Rows

In [0]:
# Customers without a match have a NULL key; keep only their source attributes
# so a fresh surrogate key can be attached in the next step.
lacks_surrogate_key = col("dim_customer_key").isNull()
df_filter_new = df_filtering.where(lacks_surrogate_key).select(
    "Customer_ID",
    "Customer_Name",
    "Segment",
    "City",
    "State",
    "Country",
    "Region",
)

Customer_ID,Customer_Name,Segment,City,State,Country,Region


## Create Surrogate Key

In [0]:
# Determine the highest surrogate key already in use, so new keys start above it.
if not spark.catalog.tableExists('ecom_catalog.gold.dim_customer'):
    # Initial load: nothing has been assigned yet.
    max_value = 0
else:
    max_value_df = spark.sql('''
    SELECT MAX(dim_customer_key) AS max_value FROM ecom_catalog.gold.dim_customer
    ''')
    max_value = max_value_df.collect()[0][0]
    # MAX() over an empty table returns NULL, which arrives here as None.
    # Without this fallback the key expression
    # `monotonically_increasing_id() + max_value + 1` would evaluate to NULL
    # for every new row.
    if max_value is None:
        max_value = 0

In [0]:
# Assign surrogate keys to the new customers, starting just above the current
# maximum. monotonically_increasing_id() yields unique, increasing values, but
# they are not guaranteed to be consecutive across partitions.
new_surrogate_key = monotonically_increasing_id() + max_value + 1

# Put the key first so the column order matches df_filter_old.
df_filter_new = (
    df_filter_new
    .withColumn("dim_customer_key", new_surrogate_key)
    .select(
        "dim_customer_key",
        "Customer_ID",
        "Customer_Name",
        "Segment",
        "City",
        "State",
        "Country",
        "Region",
    )
)

In [0]:
# Combine existing-key rows with newly-keyed rows into the frame that gets
# written. union() matches columns by position, not by name; both inputs were
# selected in the same order (dim_customer_key first, then the attributes).
df_final = df_filter_old.union(df_filter_new)



# SCD Type-1 (UPSERT)

In [0]:


# SCD Type-1 write: create the table on the first run, otherwise merge into it.
target_table = 'ecom_catalog.gold.dim_customer'
target_path = "abfss://gold@globalsalestorage.dfs.core.windows.net/dim_customer"

if not spark.catalog.tableExists(target_table):
    # Initial load: write df_final as a Delta table at the gold path and
    # register it in the catalog.
    initial_writer = (
        df_final.write
        .format("delta")
        .mode("overwrite")
        .option("path", target_path)
    )
    initial_writer.saveAsTable(target_table)
else:
    # Incremental load: open the existing Delta table by its storage path.
    existing_dim = DeltaTable.forPath(spark, target_path)

    # Match on the surrogate key: matched rows are overwritten with the source
    # values, and unmatched source rows are inserted.
    upsert = existing_dim.alias("t").merge(
        df_final.alias("s"),
        "t.dim_customer_key = s.dim_customer_key",
    )
    upsert.whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()