In [41]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from delta.tables import DeltaTable
from pyspark.sql.window import Window


In [42]:
# Notebook parameters: the schema names and the two key column names used by
# the cells below.
source_schema, target_schema = 'silver_sales', 'gold_sales'

# Surrogate key column of the dimension, and the source column used to join
# incoming rows to rows already present in the dimension.
surrogate_Key, key_col = 'DimResellerKey', 'CustomerID'

In [43]:
# Build the reseller source rows: one reseller record joined to its store,
# address (AddressTypeID = 3), geography, contact person and phone details.
# Every row is stamped as a fresh "current" version (StartDate = now,
# EndDate = 3000-12-31, IsCurrent = 'Y').
#
# The schema comes from `source_schema` instead of being hard-coded, and the
# WHERE filter is qualified with the reseller alias so it cannot become
# ambiguous if another joined table gains a StoreID column.
#
# NOTE(review): the `pnt.Name = 'Work'` predicate sits in the ON clause of a
# LEFT JOIN, so it only blanks PhoneNumberType for non-work phones; it does not
# filter PersonPhone rows. A person with several phone numbers would produce
# several rows per CustomerID - confirm this cannot happen in the data.
df_src = spark.sql(f"""
SELECT
    c.CustomerID, c.PersonID, c.StoreID, c.AccountNumber,
    sto.`Name` AS StoreName, sto.SalesPersonID,
    a.AddressLine1, a.City, a.PostalCode,
    sp.`Name` AS StateProvinceName, sp.StateProvinceCode,
    cr.`Name` AS CountryName, st.CountryRegionCode AS CountryCode,
    st.`Name` AS Territory, st.`Group`,
    adt.`Name` AS AddressType,
    p.PersonType, p.Title, p.FirstName, p.MiddleName, p.LastName,
    pp.PhoneNumber,
    pnt.`Name` AS PhoneNumberType,
    CURRENT_TIMESTAMP() AS StartDate,
    CAST('3000-12-31 00:00:00' AS TIMESTAMP) AS EndDate,
    'Y' AS IsCurrent
FROM {source_schema}.reseller c
LEFT JOIN {source_schema}.BusinessEntityAddress bea
    ON c.StoreID = bea.BusinessEntityID AND bea.AddressTypeID = 3
LEFT JOIN {source_schema}.Address a
    ON bea.AddressID = a.AddressID
LEFT JOIN {source_schema}.StateProvince sp
    ON a.StateProvinceID = sp.StateProvinceID
LEFT JOIN {source_schema}.SalesTerritory st
    ON sp.TerritoryID = st.TerritoryID
LEFT JOIN {source_schema}.CountryRegion cr
    ON st.CountryRegionCode = cr.CountryRegionCode
LEFT JOIN {source_schema}.AddressType adt
    ON bea.AddressTypeID = adt.AddressTypeID
LEFT JOIN {source_schema}.Person p
    ON c.PersonID = p.BusinessEntityID
LEFT JOIN {source_schema}.PersonPhone pp
    ON p.BusinessEntityID = pp.BusinessEntityID
LEFT JOIN {source_schema}.PhoneNumberType pnt
    ON pp.PhoneNumberTypeID = pnt.PhoneNumberTypeID AND pnt.`Name` = 'Work'
LEFT JOIN {source_schema}.Store sto
    ON c.StoreID = sto.BusinessEntityID
WHERE c.StoreID IS NOT NULL
""")

In [44]:
# Load the existing key mapping (source key -> surrogate key, plus audit
# timestamps) from the dimension, or an empty frame with the same columns when
# the dimension has not been created yet.
dim_table = f"{target_schema}.DimReseller"

if spark.catalog.tableExists(dim_table):
    # Only the current version of each reseller is read. The dimension keeps
    # expired versions (IsCurrent = 'N') that share the same source key, so
    # reading every row would duplicate each source row once per historical
    # version in the join that follows, and those duplicates would then reach
    # the MERGE as multiple source rows for one target row.
    df_trg = spark.sql(f"""
                    SELECT {key_col}, {surrogate_Key}, CreatedAt, UpdatedAt
                    FROM {dim_table}
                    WHERE IsCurrent = 'Y'
                    """)

else:
    # First load: `WHERE 1=0` returns no rows, so every source row is treated
    # as new while the join still has the columns it expects.
    df_trg = spark.sql(f"""
                    SELECT
                    CAST('0' AS INT) AS {key_col},
                    CAST('0' AS INT) AS {surrogate_Key},
                    CAST('1900-01-01 00:00:00' AS TIMESTAMP) AS CreatedAt,
                    CAST('1900-01-01 00:00:00' AS TIMESTAMP) AS UpdatedAt
                    WHERE 1=0
                    """)

In [45]:
# Register both frames as temp views under the names used by the query below.
for view_name, frame in (("src", df_src), ("trg", df_trg)):
    frame.createOrReplaceTempView(view_name)

# Keep every source row and attach the surrogate key and audit timestamps of
# the matching target row; unmatched source rows get NULLs in those columns.
join_query = f"""
                SELECT 
                src.*,
                trg.{surrogate_Key},
                trg.CreatedAt,
                trg.UpdatedAt
                FROM
                src LEFT JOIN trg
                ON src.{key_col} = trg.{key_col}
            """

df_join = spark.sql(join_query)

In [46]:
# Split the joined rows by whether the left join found a surrogate key:
# rows with a key are already in the dimension, rows without one are new.
has_surrogate_key = col(surrogate_Key).isNotNull()

df_old = df_join.where(has_surrogate_key)
df_new = df_join.where(~has_surrogate_key)

In [47]:
# Rows that already have a surrogate key keep their CreatedAt and get a fresh
# UpdatedAt timestamp.
df_old_enr = df_old.withColumn(
    "UpdatedAt",
    current_timestamp(),
)

In [48]:
# Assign surrogate keys to the rows that are not in the dimension yet. Keys
# continue after the highest key already stored, or start at 1 when the
# dimension does not exist or holds no rows.
dim_table = f"{target_schema}.DimReseller"

if spark.catalog.tableExists(dim_table):
    # MAX() over an empty table returns NULL (None after collect()). Falling
    # back to 0 keeps `row_number() + lit(...)` from evaluating to NULL, which
    # would leave every new row without a surrogate key.
    max_surrogate_key = spark.sql(f"""
                        SELECT MAX({surrogate_Key}) from {dim_table}
                        """).collect()[0][0] or 0
else:
    max_surrogate_key = 0

# row_number() needs an ordered window; monotonically_increasing_id() supplies
# an ordering. The window has no partitionBy, so Spark evaluates it over a
# single partition.
w = Window.orderBy(monotonically_increasing_id())

# The enrichment is the same for the first load and for incremental loads;
# only the starting offset differs.
df_new_enr = (
    df_new
    .withColumn(surrogate_Key, row_number().over(w) + lit(max_surrogate_key))
    .withColumn("CreatedAt", current_timestamp())
    .withColumn("UpdatedAt", current_timestamp())
)

max_surrogate_key

In [49]:
# Stack the existing and the newly keyed rows (columns matched by name) and
# publish the result to Spark SQL as the `df_final` temp view.
df_union = df_old_enr.unionByName(df_new_enr)
df_union.createOrReplaceTempView("df_final")

In [50]:
# Apply the prepared rows (temp view `df_final`) to the dimension.
#   * Table exists: expire current rows whose tracked attributes changed, then
#     insert every source row that has no current row in the dimension (new
#     resellers and the replacement versions of the rows just expired).
#   * Table missing: create it from the prepared rows.
dim_table = f"{target_schema}.DimReseller"

if spark.catalog.tableExists(dim_table):
    # Attributes whose change triggers a new version.
    # NOTE(review): AddressType, PersonType and Title are loaded into the
    # dimension but are not part of this list - confirm that is intentional.
    tracked_cols = [
        "PersonID", "StoreID", "AccountNumber", "StoreName", "SalesPersonID",
        "AddressLine1", "City", "PostalCode",
        "StateProvinceName", "StateProvinceCode",
        "CountryName", "CountryCode", "Territory", "Group",
        "FirstName", "MiddleName", "LastName",
        "PhoneNumber", "PhoneNumberType",
    ]

    # `a <> b` evaluates to NULL when either side is NULL, so a change from or
    # to NULL would never be detected (many of these columns come from LEFT
    # JOINs). `<=>` is the null-safe equality operator, so NOT (a <=> b) is
    # true exactly when the two values differ, NULLs included. Column names are
    # back-quoted because `Group` is a reserved word.
    change_predicate = " OR ".join(
        f"NOT (trg.`{c}` <=> src.`{c}`)" for c in tracked_cols
    )

    # Step 1: close the current version of every changed reseller.
    spark.sql(f"""
            MERGE INTO {dim_table} AS trg
            USING df_final AS src
            ON trg.{surrogate_Key} = src.{surrogate_Key}
            AND trg.IsCurrent = 'Y'

            WHEN MATCHED AND ({change_predicate})
            THEN UPDATE SET
                trg.IsCurrent = 'N',
                trg.EndDate = Current_Timestamp()
    """)

    # Step 2: rows expired in step 1 no longer satisfy IsCurrent = 'Y', so
    # their source rows are NOT MATCHED here and are inserted as the new
    # current version, together with resellers seen for the first time.
    spark.sql(f"""
            MERGE INTO {dim_table} AS trg
            USING df_final AS src
            ON trg.{surrogate_Key} = src.{surrogate_Key}
            AND trg.IsCurrent = 'Y'
            WHEN NOT MATCHED THEN INSERT *
    """)

else:
    # First load: write the rows as a Delta table at the given storage path
    # and register it under the target schema.
    df_union.write.format("delta")\
                    .mode("append")\
                    .option("path", "abfss://gold@dlcontoso.dfs.core.windows.net/sales/DimReseller")\
                    .saveAsTable(dim_table)