In [7]:
from pyspark.sql.functions import col, current_timestamp, trim, when
from delta.tables import DeltaTable

# Build the Silver products dimension from the three Bronze product CSVs
# (products, subcategories, categories) and upsert it into Tables/Silver/products.

# Paths to Product Bronze CSVs

bronze_products_path = "Files/Bronze/Products"
bronze_subcat_path = "Files/Bronze/Product_Subcategories"
bronze_cat_path = "Files/Bronze/Product_Categories"

# Read raw CSVs. No schema is supplied, so every column arrives as a string
# and the key columns are cast to int explicitly below.

df_products = spark.read.option("header", True).csv(bronze_products_path)
df_subcat  = spark.read.option("header", True).csv(bronze_subcat_path)
df_cat     = spark.read.option("header", True).csv(bronze_cat_path)

# Clean DataFrames: cast keys to int, trim text columns and turn blank strings
# into NULL, then keep one row per key. dropDuplicates does not guarantee
# WHICH of several rows sharing a key is kept.

df_products_clean = (
    df_products
    .withColumn("ProductKey", col("ProductKey").cast("int"))
    # A NULL ProductKey (missing, or not castable to int) can never satisfy the
    # MERGE condition below (NULL = NULL is not true), so such a row would be
    # inserted again on every run. Drop it here.
    .filter(col("ProductKey").isNotNull())
    .withColumn("ProductSubcategoryKey", col("ProductSubcategoryKey").cast("int"))
    .withColumn(
        "ProductSKU",
        when(trim(col("ProductSKU")) == "", None).otherwise(trim(col("ProductSKU")))
    )
    .withColumn(
        "ProductName",
        when(trim(col("ProductName")) == "", None).otherwise(trim(col("ProductName")))
    )
    .withColumn(
        "ModelName",
        when(trim(col("ModelName")) == "", None).otherwise(trim(col("ModelName")))
    )
    .withColumn(
        "ProductDescription",
        when(trim(col("ProductDescription")) == "", None).otherwise(trim(col("ProductDescription")))
    )
    .dropDuplicates(["ProductKey"])
)

df_subcat_clean = (
    df_subcat
    .withColumn("ProductSubcategoryKey", col("ProductSubcategoryKey").cast("int"))
    .withColumn("ProductCategoryKey", col("ProductCategoryKey").cast("int"))
    .withColumn(
        "SubcategoryName",
        when(trim(col("SubcategoryName")) == "", None).otherwise(trim(col("SubcategoryName")))
    )
    .dropDuplicates(["ProductSubcategoryKey"])
)

df_cat_clean = (
    df_cat
    .withColumn("ProductCategoryKey", col("ProductCategoryKey").cast("int"))
    .withColumn(
        "CategoryName",
        when(trim(col("CategoryName")) == "", None).otherwise(trim(col("CategoryName")))
    )
    .dropDuplicates(["ProductCategoryKey"])
)

# Join DataFrames: product -> subcategory -> category. Left joins keep products
# whose subcategory/category is missing (those attributes become NULL).
df_joined = (
    df_products_clean
    .join(df_subcat_clean, "ProductSubcategoryKey", "left")
    .join(df_cat_clean, "ProductCategoryKey", "left")
)

# Rename columns to the Silver naming convention
df_renamed = (
    df_joined
    .withColumnRenamed("ProductCategoryKey", "CategoryKey")
    .withColumnRenamed("ProductSubcategoryKey", "SubcategoryKey")
)

# Enforce schema / column order and add created/modified timestamps

df_final = (
    df_renamed
    .select(
        col("ProductKey").cast("int"),
        col("ProductSKU").cast("string"),
        col("ProductName").cast("string"),
        col("ModelName").cast("string"),
        col("ProductDescription").cast("string"),
        col("CategoryKey").cast("int"),
        col("CategoryName").cast("string"),
        col("SubcategoryKey").cast("int"),
        col("SubcategoryName").cast("string")
    )
    .withColumn("CreatedTS", current_timestamp())
    .withColumn("ModifiedTS", current_timestamp())
)

# UPSERT into Product Delta Table, keyed on ProductKey. Matched rows keep their
# original CreatedTS; only the attributes and ModifiedTS are overwritten.

delta_table_path = "Tables/Silver/products"

delta_table = DeltaTable.forPath(spark, delta_table_path)

delta_table.alias("silver").merge(
    df_final.alias("updates"),
    "silver.ProductKey = updates.ProductKey"
).whenMatchedUpdate(
    set={
        "ProductSKU": "updates.ProductSKU",
        "ProductName": "updates.ProductName",
        "ModelName": "updates.ModelName",
        "ProductDescription": "updates.ProductDescription",
        "CategoryKey": "updates.CategoryKey",
        "CategoryName": "updates.CategoryName",
        "SubcategoryKey": "updates.SubcategoryKey",
        "SubcategoryName": "updates.SubcategoryName",
        "ModifiedTS": "updates.ModifiedTS"
    }
).whenNotMatchedInsert(
    values={
        "ProductKey": "updates.ProductKey",
        "ProductSKU": "updates.ProductSKU",
        "ProductName": "updates.ProductName",
        "ModelName": "updates.ModelName",
        "ProductDescription": "updates.ProductDescription",
        "CategoryKey": "updates.CategoryKey",
        "CategoryName": "updates.CategoryName",
        "SubcategoryKey": "updates.SubcategoryKey",
        "SubcategoryName": "updates.SubcategoryName",
        "CreatedTS": "updates.CreatedTS",
        "ModifiedTS": "updates.ModifiedTS"
    }
).execute()


StatementMeta(, ac5c9a0a-ee4c-42f3-a7c6-6c3532c797ff, 9, Finished, Available, Finished)

In [14]:
# current_timestamp is imported here explicitly; previously this cell only worked
# because an earlier cell had already imported it.
from pyspark.sql.functions import (
    col,
    concat_ws,
    current_timestamp,
    initcap,
    regexp_replace,
    to_date,
    trim,
    when,
)
from delta.tables import DeltaTable

# Build the Silver customers dimension from the Bronze customers CSV and upsert
# it into Tables/Silver/customers.

# Read data from Bronze customers (no schema supplied: every column is a string)

bronze_customers_path = "Files/Bronze/Customers/"

df_customers_raw = (
    spark.read
    .option("header", True)
    .csv(bronze_customers_path)
)

# Clean and enforce schema: cast numerics, trim text (blank -> NULL),
# title-case name parts, parse BirthDate, strip currency formatting.

df_customers_clean = (
    df_customers_raw
    .withColumn("CustomerKey", col("CustomerKey").cast("int"))
    # A NULL CustomerKey never satisfies the MERGE's `=` condition
    # (NULL = NULL is not true), so such a row would be re-inserted every run.
    .filter(col("CustomerKey").isNotNull())
    .withColumn("Prefix", when(trim(col("Prefix")) == "", None).otherwise(trim(col("Prefix"))))
    .withColumn(
        "FirstName",
        when(trim(col("FirstName")) == "", None).otherwise(initcap(trim(col("FirstName"))))
    )
    .withColumn(
        "LastName",
        when(trim(col("LastName")) == "", None).otherwise(initcap(trim(col("LastName"))))
    )
    .withColumn("BirthDate", to_date(col("BirthDate"), "M/d/yyyy"))
    .withColumn("MaritalStatus", when(trim(col("MaritalStatus")) == "", None).otherwise(trim(col("MaritalStatus"))))
    .withColumn("Gender", when(trim(col("Gender")) == "", None).otherwise(trim(col("Gender"))))
    .withColumn("EmailAddress", when(trim(col("EmailAddress")) == "", None).otherwise(trim(col("EmailAddress"))))
    .withColumn(
        "AnnualIncome",
        # Remove "$" and "," so values like "$90,000" can be cast to int.
        regexp_replace(
            regexp_replace(col("AnnualIncome"), "\\$", ""),
            ",",
            ""
        ).cast("int"))
    .withColumn("TotalChildren", col("TotalChildren").cast("int"))
    .withColumn("EducationLevel", when(trim(col("EducationLevel")) == "", None).otherwise(trim(col("EducationLevel"))))
    .withColumn("Occupation", when(trim(col("Occupation")) == "", None).otherwise(trim(col("Occupation"))))
    .withColumn("HomeOwner", when(trim(col("HomeOwner")) == "", None).otherwise(trim(col("HomeOwner"))))
    .withColumnRenamed("EmailAddress", "Email")
    # concat_ws skips NULL inputs, so a missing first/last name does not
    # produce a NULL FullName.
    .withColumn(
        "FullName",
        concat_ws(" ", col("FirstName"), col("LastName")))
    # One row per key; which duplicate survives is not guaranteed.
    .dropDuplicates(["CustomerKey"])
)

# Fix column order/types for the Silver table and add audit timestamps.
df_customers_final = (
    df_customers_clean
    .select(
        col("CustomerKey").cast("int"),
        col("Prefix").cast("string"),
        col("FirstName").cast("string"),
        col("LastName").cast("string"),
        col("FullName").cast("string"),
        col("Gender").cast("string"),
        col("MaritalStatus").cast("string"),
        col("BirthDate").cast("date"),
        col("Email").cast("string"),
        col("AnnualIncome").cast("int"),
        col("TotalChildren").cast("int"),
        col("EducationLevel").cast("string"),
        col("Occupation").cast("string"),
        col("HomeOwner").cast("string")
    )
    .withColumn("CreatedTS", current_timestamp())
    .withColumn("ModifiedTS", current_timestamp())
)

# UPSERT into Customers Delta Table, keyed on CustomerKey. Matched rows keep
# their original CreatedTS; only attributes and ModifiedTS are overwritten.

delta_table_path = "Tables/Silver/customers"

delta_table = DeltaTable.forPath(spark, delta_table_path)

delta_table.alias("silver").merge(
    df_customers_final.alias("updates"),
    "silver.CustomerKey = updates.CustomerKey"
).whenMatchedUpdate(
    set={
        "Prefix": "updates.Prefix",
        "FirstName": "updates.FirstName",
        "LastName": "updates.LastName",
        "FullName": "updates.FullName",
        "Gender": "updates.Gender",
        "MaritalStatus": "updates.MaritalStatus",
        "BirthDate": "updates.BirthDate",
        "Email": "updates.Email",
        "AnnualIncome": "updates.AnnualIncome",
        "TotalChildren": "updates.TotalChildren",
        "EducationLevel": "updates.EducationLevel",
        "Occupation": "updates.Occupation",
        "HomeOwner": "updates.HomeOwner",
        "ModifiedTS": "updates.ModifiedTS"
    }
).whenNotMatchedInsert(
    values={
        "CustomerKey": "updates.CustomerKey",
        "Prefix": "updates.Prefix",
        "FirstName": "updates.FirstName",
        "LastName": "updates.LastName",
        "FullName": "updates.FullName",
        "Gender": "updates.Gender",
        "MaritalStatus": "updates.MaritalStatus",
        "BirthDate": "updates.BirthDate",
        "Email": "updates.Email",
        "AnnualIncome": "updates.AnnualIncome",
        "TotalChildren": "updates.TotalChildren",
        "EducationLevel": "updates.EducationLevel",
        "Occupation": "updates.Occupation",
        "HomeOwner": "updates.HomeOwner",
        "CreatedTS": "updates.CreatedTS",
        "ModifiedTS": "updates.ModifiedTS"
    }
).execute()

StatementMeta(, ac5c9a0a-ee4c-42f3-a7c6-6c3532c797ff, 16, Finished, Available, Finished)

In [16]:
# Explicit imports: `from pyspark.sql.functions import *` would shadow Python
# builtins such as sum/max/min/round for every later cell in the notebook.
from pyspark.sql.functions import col, initcap, trim, when
from delta.tables import DeltaTable

# Build the Silver territories dimension from the Bronze territories CSV and
# upsert it into Tables/Silver/territories.

bronze_path = "Files/Bronze/Territories"

df_territories_raw = (
    spark.read
    .option("header", "true")
    .csv(bronze_path)
)

# Clean: cast the key, trim + title-case the text columns (blank -> NULL).
df_territories_clean = (
    df_territories_raw

    .withColumn("SalesTerritoryKey", col("SalesTerritoryKey").cast("int"))
    # A NULL key never satisfies the MERGE's `=` condition (NULL = NULL is not
    # true), so such a row would be inserted again on every run.
    .filter(col("SalesTerritoryKey").isNotNull())

    .withColumn(
        "Region",
        when(trim(col("Region")) == "", None)
        .otherwise(initcap(trim(col("Region"))))
    )
    .withColumn(
        "Country",
        when(trim(col("Country")) == "", None)
        .otherwise(initcap(trim(col("Country"))))
    )
    .withColumn(
        "Continent",
        when(trim(col("Continent")) == "", None)
        .otherwise(initcap(trim(col("Continent"))))
    )
    # Delta MERGE fails when several source rows match the same target row, so
    # keep a single row per key, as the other dimension cells do. Which
    # duplicate survives is not guaranteed.
    .dropDuplicates(["SalesTerritoryKey"])
)

# Rename to the Silver key name and fix the column order.
df_territories_silver = (
    df_territories_clean
    .withColumnRenamed("SalesTerritoryKey", "TerritoryKey")
)

df_territories_silver = df_territories_silver.select(
    "TerritoryKey",
    "Continent",
    "Country",
    "Region"
)

delta_table_path = "Tables/Silver/territories"

delta_table = DeltaTable.forPath(spark, delta_table_path)

# UPSERT keyed on TerritoryKey. The audit timestamps come from the SQL
# current_timestamp() evaluated by the MERGE itself; CreatedTS is set only
# on insert.
(
    delta_table.alias("silver")
    .merge(
        df_territories_silver.alias("updates"),
        "silver.TerritoryKey = updates.TerritoryKey"
    )
    .whenMatchedUpdate(set={
        "Continent": "updates.Continent",
        "Country": "updates.Country",
        "Region": "updates.Region",
        "ModifiedTS": "current_timestamp()"
    })
    .whenNotMatchedInsert(values={
        "TerritoryKey": "updates.TerritoryKey",
        "Continent": "updates.Continent",
        "Country": "updates.Country",
        "Region": "updates.Region",
        "CreatedTS": "current_timestamp()",
        "ModifiedTS": "current_timestamp()"
    })
    .execute()
)


StatementMeta(, ac5c9a0a-ee4c-42f3-a7c6-6c3532c797ff, 18, Finished, Available, Finished)

In [19]:
# current_timestamp is imported explicitly; previously this cell only worked
# because an earlier cell had already put it in the namespace.
from pyspark.sql.functions import (
    col,
    current_timestamp,
    dayofmonth,
    dayofweek,
    dayofyear,
    month,
    quarter,
    to_date,
    weekofyear,
    year,
)
from delta.tables import DeltaTable

# Build the Silver calendar dimension from the Bronze calendar CSV and upsert
# it into Tables/Silver/calendar, keyed on Date.

bronze_calendar_path = "Files/Bronze/Calendar/"

df_calendar_raw = (
    spark.read
    .option("header", True)
    .csv(bronze_calendar_path)
)

# Parse Date (pattern M/d/yyyy) and keep exactly one row per date. Date is the
# MERGE key: a NULL date never satisfies `=` and would be re-inserted on every
# run, and duplicate dates would make the MERGE fail.
df_calendar = (
    df_calendar_raw
    .withColumn("Date", to_date(col("Date"), "M/d/yyyy"))
    .filter(col("Date").isNotNull())
    .dropDuplicates(["Date"])
)

# Derive calendar attributes. Spark's dayofweek numbers days 1 = Sunday through
# 7 = Saturday, hence {1, 7} marks the weekend.
df_calendar_enriched = (
    df_calendar
    .withColumn("Year", year(col("Date")))
    .withColumn("Month", month(col("Date")))
    .withColumn("Day", dayofmonth(col("Date")))
    .withColumn("WeekOfYear", weekofyear(col("Date")))
    .withColumn("Quarter", quarter(col("Date")))
    .withColumn("DayOfWeek", dayofweek(col("Date")))
    .withColumn("DayOfYear", dayofyear(col("Date")))
    .withColumn("IsWeekend", (col("DayOfWeek").isin([1, 7])).cast("boolean"))
    # Note: the MERGE below writes timestamps via SQL current_timestamp(),
    # not these two columns.
    .withColumn("CreatedTS", current_timestamp())
    .withColumn("ModifiedTS", current_timestamp())
)

df_calendar_final = df_calendar_enriched.select(
    "Date",
    "Year",
    "Quarter",
    "Month",
    "WeekOfYear",
    "Day",
    "DayOfWeek",
    "DayOfYear",
    "IsWeekend",
    "CreatedTS",
    "ModifiedTS"
)

delta_table = DeltaTable.forPath(spark, "Tables/Silver/calendar")

# UPSERT keyed on Date; CreatedTS is only set on insert.
delta_table.alias("silver").merge(
    df_calendar_final.alias("updates"),
    "silver.Date = updates.Date"
).whenMatchedUpdate(set={
    "Year": "updates.Year",
    "Quarter": "updates.Quarter",
    "Month": "updates.Month",
    "WeekOfYear": "updates.WeekOfYear",
    "Day": "updates.Day",
    "DayOfWeek": "updates.DayOfWeek",
    "DayOfYear": "updates.DayOfYear",
    "IsWeekend": "updates.IsWeekend",
    "ModifiedTS": "current_timestamp()"
}).whenNotMatchedInsert(values={
    "Date": "updates.Date",
    "Year": "updates.Year",
    "Quarter": "updates.Quarter",
    "Month": "updates.Month",
    "WeekOfYear": "updates.WeekOfYear",
    "Day": "updates.Day",
    "DayOfWeek": "updates.DayOfWeek",
    "DayOfYear": "updates.DayOfYear",
    "IsWeekend": "updates.IsWeekend",
    "CreatedTS": "current_timestamp()",
    "ModifiedTS": "current_timestamp()"
}).execute()


StatementMeta(, ac5c9a0a-ee4c-42f3-a7c6-6c3532c797ff, 21, Finished, Available, Finished)

In [27]:
# Explicit imports instead of `import *`, which would shadow Python builtins
# (sum, max, min, round, ...) for the rest of the notebook.
from pyspark.sql.functions import col, to_date, when

# Read the Bronze sales CSVs with an explicit schema and shape them into
# df_sales_final, which the next cell upserts into the Silver sales table.

bronze_path = "Files/Bronze/Sales"

# Column types are enforced at read time, so the INT columns need no further
# casts. NOTE(review): Spark file sources generally do not enforce NOT NULL from
# a user-supplied schema; the dropna below is what actually guarantees the keys.
sales_raw_schema = '''
    OrderDate STRING,
    StockDate STRING,
    OrderNumber STRING NOT NULL,
    ProductKey INT NOT NULL,
    CustomerKey INT NOT NULL,
    TerritoryKey INT,
    OrderLineItem INT NOT NULL,
    OrderQuantity INT
'''

df_sales_bronze = (
    spark.read
    .format("csv")
    .option("header", True)
    .schema(sales_raw_schema)
    .load(bronze_path)
)

df_sales_clean = (
    df_sales_bronze
    # Order number + line item form the MERGE key; rows lacking either are unusable.
    .dropna(subset=["OrderNumber", "OrderLineItem"])
    .withColumn("OrderDate", to_date(col("OrderDate"), "M/d/yyyy"))
    .withColumn("StockDate", to_date(col("StockDate"), "M/d/yyyy"))
    # Negative quantities are treated as invalid and nulled out.
    .withColumn("OrderQuantity", when(col("OrderQuantity") < 0, None).otherwise(col("OrderQuantity")))
    .withColumnRenamed("OrderNumber", "SalesOrderNumber")
    .withColumnRenamed("OrderLineItem", "SalesOrderLineNumber")
    .withColumnRenamed("OrderQuantity", "Quantity")
    .withColumnRenamed("ProductKey", "ProductID")
    .withColumnRenamed("CustomerKey", "CustomerID")
    .withColumnRenamed("TerritoryKey", "TerritoryID")
    # Delta MERGE fails when several source rows match the same target row, so
    # keep one row per sales line (which duplicate survives is not guaranteed).
    .dropDuplicates(["SalesOrderNumber", "SalesOrderLineNumber"])
)


silver_column_order = [
    "SalesOrderNumber",
    "SalesOrderLineNumber",
    "OrderDate",
    "StockDate",
    "CustomerID",
    "ProductID",
    "TerritoryID",
    "Quantity"
]

df_sales_final = df_sales_clean.select(*silver_column_order)


StatementMeta(, 3fc31f86-3547-4bd4-8cbb-b7d3ebb8b1e2, 29, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 5a13576f-afc3-4a63-8149-ea603d4ae3d0)

In [30]:
# Upsert the cleaned sales lines (df_sales_final, produced by the previous cell)
# into the Silver sales Delta table, keyed on order number + order line number.
from pyspark.sql.functions import current_timestamp, col
from delta.tables import DeltaTable

delta_table_path = "Tables/Silver/sales"

# Audit columns: CreatedTS is written only on insert, ModifiedTS on both insert
# and update (see the column lists below).
df_to_upsert = (
    df_sales_final
    .withColumn("CreatedTS", current_timestamp())
    .withColumn("ModifiedTS", current_timestamp())
)

deltaTable = DeltaTable.forPath(spark, delta_table_path)

# Business key of a sales line, and the attributes overwritten on every run.
sales_key_columns = ["SalesOrderNumber", "SalesOrderLineNumber"]
sales_attribute_columns = [
    "OrderDate", "StockDate", "CustomerID", "ProductID", "TerritoryID", "Quantity",
]

# Every target column is copied verbatim from the same-named source column.
update_columns = sales_attribute_columns + ["ModifiedTS"]
insert_columns = sales_key_columns + sales_attribute_columns + ["CreatedTS", "ModifiedTS"]

(
    deltaTable.alias("silver")
    .merge(
        df_to_upsert.alias("updates"),
        """
        silver.SalesOrderNumber = updates.SalesOrderNumber
        AND silver.SalesOrderLineNumber = updates.SalesOrderLineNumber
        """
    )
    .whenMatchedUpdate(set={name: "updates." + name for name in update_columns})
    .whenNotMatchedInsert(values={name: "updates." + name for name in insert_columns})
    .execute()
)

StatementMeta(, 3fc31f86-3547-4bd4-8cbb-b7d3ebb8b1e2, 32, Finished, Available, Finished)

In [21]:
from pyspark.sql.functions import col, to_date, current_timestamp, sum as spark_sum
from delta.tables import DeltaTable

# Build the Silver returns table from the Bronze returns CSVs and upsert it into
# Tables/Silver/returns at the grain (ReturnDate, TerritoryKey, ProductKey).

bronze_returns_path = "Files/Bronze/Returns/"

df_returns_raw = (
    spark.read
    .option("header", True)
    .csv(bronze_returns_path)
)

# Parse the date and cast the numeric columns (all columns are read as strings).
df_returns_clean = (
    df_returns_raw
    .withColumn("ReturnDate", to_date(col("ReturnDate"), "M/d/yyyy"))
    .withColumn("TerritoryKey", col("TerritoryKey").cast("int"))
    .withColumn("ProductKey", col("ProductKey").cast("int"))
    .withColumn("ReturnQuantity", col("ReturnQuantity").cast("int"))
)

# Collapse to one row per MERGE key by summing quantities. Delta MERGE fails
# when several source rows match the same target row, so repeated
# (date, territory, product) records in Bronze would break re-runs. When the
# key is already unique this is a no-op. The sum is cast back to int to keep
# the column type the original cell produced.
df_returns_final = (
    df_returns_clean
    .groupBy("ReturnDate", "TerritoryKey", "ProductKey")
    .agg(spark_sum("ReturnQuantity").cast("int").alias("ReturnQuantity"))
    .withColumn("CreatedTS", current_timestamp())
    .withColumn("ModifiedTS", current_timestamp())
)

delta_table_path = "Tables/Silver/returns"

delta_table = DeltaTable.forPath(spark, delta_table_path)

# Null-safe equality (<=>): a row whose key parts are NULL (missing values, or
# values that did not cast/parse) matches its existing target row on re-runs
# instead of being inserted again, which plain `=` would cause.
delta_table.alias("silver").merge(
    df_returns_final.alias("updates"),
    "silver.ReturnDate <=> updates.ReturnDate AND silver.TerritoryKey <=> updates.TerritoryKey AND silver.ProductKey <=> updates.ProductKey"
).whenMatchedUpdate(set={
    "ReturnQuantity": "updates.ReturnQuantity",
    "ModifiedTS": "current_timestamp()"
}).whenNotMatchedInsert(values={
    "ReturnDate": "updates.ReturnDate",
    "TerritoryKey": "updates.TerritoryKey",
    "ProductKey": "updates.ProductKey",
    "ReturnQuantity": "updates.ReturnQuantity",
    "CreatedTS": "current_timestamp()",
    "ModifiedTS": "current_timestamp()"
}).execute()


StatementMeta(, ac5c9a0a-ee4c-42f3-a7c6-6c3532c797ff, 23, Finished, Available, Finished)