In [72]:
# Welcome to your new notebook
# Type here in the cell editor to add code!
#https://microsoftlearning.github.io/mslearn-fabric/Instructions/Labs/03b-medallion-lakehouse.html




StatementMeta(, 95dc5ac9-e4a4-41ad-b059-2adfbace481d, 74, Finished, Available)

### Create a lakehouse and upload data to the Landing Zone layer
Now that you have a workspace, it's time to switch to the Data engineering experience in the Fabric portal and create a data lakehouse for the data you're going to analyze.

At the bottom left of the Power BI portal, select the Power BI icon and switch to the Data Engineering experience. If you do not see the data engineering experience, contact your Fabric administrator with a request to enable Fabric.

In the Synapse Data Engineering home page, create a new Lakehouse named Sales.

After a minute or so, a new empty lakehouse will be created. You need to ingest some data into the data lakehouse for analysis. There are multiple ways to do this, but in this exercise you'll simply download a text file to your local computer (or lab VM if applicable) and then upload it to your lakehouse.

Download the data file for this exercise from https://github.com/MicrosoftLearning/dp-data/blob/main/orders.zip. Extract the files and save them with their original names on your local computer (or lab VM if applicable). There should be 3 files containing sales data for 3 years: 2019.csv, 2020.csv, and 2021.csv.

Return to the web browser tab containing your lakehouse. In the ... menu for the Files folder in the Explorer pane, select New subfolder and create a folder named LandingZone (the notebook below reads its input from Files/LandingZone).

In the ... menu for the LandingZone folder, select Upload and Upload files, and then upload the 3 files (2019.csv, 2020.csv, and 2021.csv) from your local computer (or lab VM if applicable) to the lakehouse. Hold the Shift key to select all 3 files at once.



## Landing Zone to Raw Lakehouse ETL Pipeline

In [73]:
 # Read every CSV file in the lakehouse LandingZone folder using an explicit schema,
 # so column types are fixed rather than inferred from the data.
 from pyspark.sql.types import (
     StructType, StructField, StringType, IntegerType, DateType, FloatType,
 )

 # Glob of the raw sales-order files uploaded to the LandingZone folder.
 LANDING_ZONE_PATH = "Files/LandingZone/*.csv"

 # Schema of the raw sales-order files.
 orderSchema = StructType([
     StructField("SalesOrderNumber", StringType()),
     StructField("SalesOrderLineNumber", IntegerType()),
     StructField("OrderDate", DateType()),
     StructField("CustomerName", StringType()),
     StructField("Email", StringType()),
     StructField("Item", StringType()),
     StructField("Quantity", IntegerType()),
     StructField("UnitPrice", FloatType()),
     StructField("Tax", FloatType())
 ])

 # Load all landing-zone files into one DataFrame; "header" = "true" treats the first
 # line of each file as column names.
 df = spark.read.format("csv").option("header", "true").schema(orderSchema).load(LANDING_ZONE_PATH)

 # Preview the first 10 rows.
 display(df.head(10))

StatementMeta(, 95dc5ac9-e4a4-41ad-b059-2adfbace481d, 75, Finished, Available)

SynapseWidget(Synapse.DataFrame, a07b0e74-3b3e-4862-87ec-705f715ca1f9)

In [74]:
 from pyspark.sql.functions import when, lit, col, current_timestamp, input_file_name

 # Enrich each landing-zone row with lineage / audit columns:
 #   FileName              - path of the file the row was read from
 #   IsFlagged             - True when OrderDate is before 2019-08-01, otherwise False
 #   CreatedTS, ModifiedTS - the load timestamp
 df = (
     df.withColumn("FileName", input_file_name())
       .withColumn("IsFlagged", when(col("OrderDate") < '2019-08-01', True).otherwise(False))
       .withColumn("CreatedTS", current_timestamp())
       .withColumn("ModifiedTS", current_timestamp())
 )

 # A null or empty CustomerName is replaced by the placeholder "Unknown".
 df = df.withColumn(
     "CustomerName",
     when(
         col("CustomerName").isNull() | (col("CustomerName") == ""),
         lit("Unknown"),
     ).otherwise(col("CustomerName")),
 )

StatementMeta(, 95dc5ac9-e4a4-41ad-b059-2adfbace481d, 76, Finished, Available)

In [75]:
# Create the bronze Delta table deltalakehouse_bronze.sales_bronze if it does not exist yet.
# Its columns mirror the landing-zone schema plus the FileName / IsFlagged / CreatedTS /
# ModifiedTS audit columns added in the previous cell.
from pyspark.sql.types import StringType, IntegerType, FloatType, DateType, BooleanType, TimestampType
from delta.tables import DeltaTable

# CreatedTS / ModifiedTS are filled with current_timestamp(), so they are declared as
# TimestampType; a DateType column would drop the time-of-day when rows are inserted.
# NOTE: as with CREATE TABLE IF NOT EXISTS, an already-existing table is not altered by this
# call - an existing table keeps its original column types until it is altered or recreated.
DeltaTable.createIfNotExists(spark)\
    .tableName("deltalakehouse_bronze.sales_bronze")\
    .addColumn("SalesOrderNumber", StringType())\
    .addColumn("SalesOrderLineNumber", IntegerType())\
    .addColumn("OrderDate", DateType())\
    .addColumn("CustomerName", StringType())\
    .addColumn("Email", StringType())\
    .addColumn("Item", StringType())\
    .addColumn("Quantity", IntegerType())\
    .addColumn("UnitPrice", FloatType())\
    .addColumn("Tax", FloatType())\
    .addColumn("FileName", StringType())\
    .addColumn("IsFlagged", BooleanType())\
    .addColumn("CreatedTS", TimestampType())\
    .addColumn("ModifiedTS", TimestampType())\
    .execute()

# Equivalent Spark SQL: CREATE TABLE IF NOT EXISTS deltalakehouse_bronze.sales_bronze (...) USING delta



StatementMeta(, 95dc5ac9-e4a4-41ad-b059-2adfbace481d, 77, Finished, Available)

<delta.tables.DeltaTable at 0x7f3d25a86f50>

In [76]:
# Upsert the enriched landing-zone rows (df) into the bronze table.
# Rows are matched on SalesOrderNumber + OrderDate + CustomerName + Item:
#   - no match -> the row is inserted with all of its columns;
#   - match    -> the update set is empty, so the existing row is left unchanged.
# In effect only order lines not already in bronze are appended.
from delta.tables import DeltaTable

# Resolve the table by the same name it was created with, rather than by a path relative
# to the notebook's default lakehouse ('Tables/sales_bronze').
deltaTable = DeltaTable.forName(spark, "deltalakehouse_bronze.sales_bronze")

dfUpdates = df

deltaTable.alias('bronze') \
  .merge(
    dfUpdates.alias('updates'),
    'bronze.SalesOrderNumber = updates.SalesOrderNumber and bronze.OrderDate = updates.OrderDate and bronze.CustomerName = updates.CustomerName and bronze.Item = updates.Item'
  ) \
  .whenMatchedUpdate(set={}) \
  .whenNotMatchedInsert(values =
    {
      "SalesOrderNumber": "updates.SalesOrderNumber",
      "SalesOrderLineNumber": "updates.SalesOrderLineNumber",
      "OrderDate": "updates.OrderDate",
      "CustomerName": "updates.CustomerName",
      "Email": "updates.Email",
      "Item": "updates.Item",
      "Quantity": "updates.Quantity",
      "UnitPrice": "updates.UnitPrice",
      "Tax": "updates.Tax",
      "FileName": "updates.FileName",
      "IsFlagged": "updates.IsFlagged",
      "CreatedTS": "updates.CreatedTS",
      "ModifiedTS": "updates.ModifiedTS"
    }
  ) \
  .execute()

StatementMeta(, 95dc5ac9-e4a4-41ad-b059-2adfbace481d, 78, Finished, Available)

## Create the Bronze to Silver Pipeline

In [77]:
 # Re-read the persisted bronze table; it is the starting point for the silver-layer cells.
 df = spark.table("deltalakehouse_bronze.sales_bronze")

StatementMeta(, 95dc5ac9-e4a4-41ad-b059-2adfbace481d, 79, Finished, Available)

In [78]:
from pyspark.sql.types import *
from delta.tables import *

# Silver date dimension table (created only if missing): the order date plus its
# day / month / year parts and two formatted period labels.
(
    DeltaTable.createIfNotExists(spark)
    .tableName("deltalakehouse_silver.dimdate_silver")
    .addColumn("OrderDate", DateType())
    .addColumn("Day", IntegerType())
    .addColumn("Month", IntegerType())
    .addColumn("Year", IntegerType())
    .addColumn("mmmyyyy", StringType())
    .addColumn("yyyymm", StringType())
    .execute()
)

StatementMeta(, 95dc5ac9-e4a4-41ad-b059-2adfbace481d, 80, Finished, Available)

<delta.tables.DeltaTable at 0x7f3d25a809a0>

In [79]:
from pyspark.sql.functions import col, dayofmonth, month, year, date_format

# Date dimension rows: one per distinct OrderDate, with Day / Month / Year and two labels
# formatted as "MMM-yyyy" and "yyyyMM", sorted chronologically.
dfdimDate_silver = (
    df.dropDuplicates(["OrderDate"])
    .select(
        col("OrderDate"),
        dayofmonth("OrderDate").alias("Day"),
        month("OrderDate").alias("Month"),
        year("OrderDate").alias("Year"),
        date_format(col("OrderDate"), "MMM-yyyy").alias("mmmyyyy"),
        date_format(col("OrderDate"), "yyyyMM").alias("yyyymm"),
    )
    .orderBy("OrderDate")
)

# Preview the first 10 dates.
display(dfdimDate_silver.head(10))


StatementMeta(, 95dc5ac9-e4a4-41ad-b059-2adfbace481d, 81, Finished, Available)

SynapseWidget(Synapse.DataFrame, 388fbfe4-b369-408e-a1cd-2335e16d163e)

In [80]:
# Keep the silver date dimension in step with newly arrived data.
# Merge on OrderDate: dates not yet in deltalakehouse_silver.dimdate_silver are inserted with
# all six columns; for dates already present the update set is empty, so those rows stay as-is.
from delta.tables import *

# Handle to the silver date dimension, resolved by table name.
deltaTable = DeltaTable.forName(spark, "deltalakehouse_silver.dimdate_silver")

# Source rows for the merge.
dfUpdates = dfdimDate_silver

# Insert mapping: each target column takes the same-named column from the source.
insert_values = {
    name: f"updates.{name}"
    for name in ("OrderDate", "Day", "Month", "Year", "mmmyyyy", "yyyymm")
}

(
    deltaTable.alias('silver')
    .merge(dfUpdates.alias('updates'), 'silver.OrderDate = updates.OrderDate')
    .whenMatchedUpdate(set={})
    .whenNotMatchedInsert(values=insert_values)
    .execute()
)

StatementMeta(, 95dc5ac9-e4a4-41ad-b059-2adfbace481d, 82, Finished, Available)

In [81]:
from pyspark.sql.types import *
from delta.tables import *

# Silver customer dimension table (created only if missing): name, email, the two name
# parts and a surrogate CustomerID.
customer_builder = DeltaTable.createIfNotExists(spark).tableName("deltalakehouse_silver.dimcustomer_silver")
for column_name, column_type in [
    ("CustomerName", StringType()),
    ("Email", StringType()),
    ("First", StringType()),
    ("Last", StringType()),
    ("CustomerID", LongType()),
]:
    customer_builder = customer_builder.addColumn(column_name, column_type)
customer_builder.execute()

StatementMeta(, 95dc5ac9-e4a4-41ad-b059-2adfbace481d, 83, Finished, Available)

<delta.tables.DeltaTable at 0x7f3d25a87850>

In [82]:
from pyspark.sql.functions import col, split

# Candidate customer dimension rows from bronze: one row per distinct (CustomerName, Email)
# pair, with First / Last taken from CustomerName split on " ".
# NOTE: First is token 0 and Last is token 1, so a single-word name has no Last and a
# three-part name (e.g. "Mary Jane Smith") yields its middle token as Last.
name_parts = split(col("CustomerName"), " ")
dfdimCustomer_silver = (
    df.dropDuplicates(["CustomerName", "Email"])
    .select(col("CustomerName"), col("Email"))
    .withColumn("First", name_parts.getItem(0))
    .withColumn("Last", name_parts.getItem(1))
)

# Preview the first 10 customer rows.
display(dfdimCustomer_silver.head(10))

StatementMeta(, 95dc5ac9-e4a4-41ad-b059-2adfbace481d, 84, Finished, Available)

SynapseWidget(Synapse.DataFrame, b81493e1-7fe5-46d2-944b-ffe3356b32b1)

In [83]:
# Assign surrogate CustomerIDs to customers that are not yet in dimcustomer_silver.
# pyspark's max is imported under an alias so Python's built-in max() is not shadowed.
from pyspark.sql.functions import monotonically_increasing_id, col, coalesce, lit, max as spark_max

dfdimCustomer_temp = spark.read.table("deltalakehouse_silver.dimcustomer_silver")

# Highest CustomerID already assigned (0 when the table is empty).
MAXCustomerID = dfdimCustomer_temp.select(
    coalesce(spark_max(col("CustomerID")), lit(0)).alias("MAXCustomerID")
).first()[0]

# Keep only customers not already in the dimension. eqNullSafe lets a null Email match an
# existing null Email; with plain '=' such a customer would never match and would be given
# a fresh ID on every run.
dfdimCustomer_silver = dfdimCustomer_silver.join(
    dfdimCustomer_temp,
    dfdimCustomer_silver.CustomerName.eqNullSafe(dfdimCustomer_temp.CustomerName)
    & dfdimCustomer_silver.Email.eqNullSafe(dfdimCustomer_temp.Email),
    "left_anti",
)

# New IDs start above MAXCustomerID. monotonically_increasing_id() is unique per row but not
# consecutive, so the assigned IDs can contain gaps.
dfdimCustomer_silver = dfdimCustomer_silver.withColumn(
    "CustomerID", monotonically_increasing_id() + MAXCustomerID + 1
)

# Preview the first 10 new customer rows.
display(dfdimCustomer_silver.head(10))

StatementMeta(, 95dc5ac9-e4a4-41ad-b059-2adfbace481d, 85, Finished, Available)

SynapseWidget(Synapse.DataFrame, 2fe7cd8e-a63e-4243-a50d-493bda5acbcd)

In [84]:
# Merge the newly identified customers into the silver customer dimension.
# Match on CustomerName + Email: unmatched rows are inserted with their new CustomerID;
# matched rows get an empty update set, so existing rows (and their IDs) stay unchanged.
from delta.tables import *

# Handle to 'deltalakehouse_silver.dimcustomer_silver', resolved by table name.
deltaTable = DeltaTable.forName(spark, "deltalakehouse_silver.dimcustomer_silver")

dfUpdates = dfdimCustomer_silver

deltaTable.alias('silver') \
    .merge(
        dfUpdates.alias('updates'),
        'silver.CustomerName = updates.CustomerName AND silver.Email = updates.Email'
    ) \
    .whenMatchedUpdate(set={}) \
    .whenNotMatchedInsert(values =
    {
        "CustomerName": "updates.CustomerName",
        "Email": "updates.Email",
        "First": "updates.First",
        "Last": "updates.Last",
        "CustomerID": "updates.CustomerID"
    }
    ) \
    .execute()

StatementMeta(, 95dc5ac9-e4a4-41ad-b059-2adfbace481d, 86, Finished, Available)

In [85]:
    # Silver product dimension table (created only if missing): item name, surrogate
    # ItemID and the optional item detail text.
    from pyspark.sql.types import *
    from delta.tables import *

    (
        DeltaTable.createIfNotExists(spark)
        .tableName("deltalakehouse_silver.dimproduct_silver")
        .addColumn("ItemName", StringType())
        .addColumn("ItemID", LongType())
        .addColumn("ItemInfo", StringType())
        .execute()
    )

StatementMeta(, 95dc5ac9-e4a4-41ad-b059-2adfbace481d, 87, Finished, Available)

<delta.tables.DeltaTable at 0x7f3d25a87130>

In [86]:
# Candidate product dimension rows from bronze: one row per distinct Item, split on ", "
# into ItemName (the text before the first ", ") and ItemInfo (the next segment, or ""
# when there is none). Any further ", "-separated segments are not kept.
# Every function used is imported here rather than relied on from earlier cells.
from pyspark.sql.functions import col, split, lit, coalesce

item_parts = split(col("Item"), ", ")
dfdimProduct_silver = (
    df.dropDuplicates(["Item"])
    .select(col("Item"))
    .withColumn("ItemName", item_parts.getItem(0))
    # A missing second segment becomes "" so ItemInfo is never null (it is a join key).
    .withColumn("ItemInfo", coalesce(item_parts.getItem(1), lit("")))
)

# Preview the first 10 product rows.
display(dfdimProduct_silver.head(10))

StatementMeta(, 95dc5ac9-e4a4-41ad-b059-2adfbace481d, 88, Finished, Available)

SynapseWidget(Synapse.DataFrame, ce6ec640-d427-41e3-a69b-e40862a1ac36)

In [87]:
# Assign surrogate ItemIDs to products not yet in dimproduct_silver:
#   1. read the existing dimension,
#   2. take its highest ItemID (0 when the table is empty),
#   3. drop candidate rows already present (left anti join on ItemName + ItemInfo),
#   4. number the remaining rows starting above that maximum.
# pyspark's max is imported under an alias so Python's built-in max() is not shadowed.
from pyspark.sql.functions import monotonically_increasing_id, col, max as spark_max

print("Step 1: Reading the existing dimProduct_silver table into a DataFrame.")
dfdimProduct_temp = spark.read.table("deltalakehouse_silver.dimproduct_silver")

print("Step 2: Calculating the maximum ItemID.")
MAXProductID = dfdimProduct_temp.select(spark_max(col("ItemID")).alias("MAXItemID")).first()[0]
if MAXProductID is None:
    MAXProductID = 0  # empty table -> new IDs start at 1
print(f"Max Product ID is {MAXProductID}")

print("Step 3: Performing a left anti join to identify new records.")
# eqNullSafe: a row whose ItemName is null (null Item) still matches its existing row
# instead of being treated as new on every run.
dfdimProduct_silver = dfdimProduct_silver.join(
    dfdimProduct_temp,
    dfdimProduct_silver.ItemName.eqNullSafe(dfdimProduct_temp.ItemName)
    & dfdimProduct_silver.ItemInfo.eqNullSafe(dfdimProduct_temp.ItemInfo),
    "left_anti",
)

print("Step 4: Adding ItemID to new records.")
# monotonically_increasing_id() is unique per row but not consecutive, so IDs may have gaps.
dfdimProduct_silver = dfdimProduct_silver.withColumn("ItemID", monotonically_increasing_id() + MAXProductID + 1)

print("Step 5: Displaying the DataFrame.")
dfdimProduct_silver.show(10)



StatementMeta(, 95dc5ac9-e4a4-41ad-b059-2adfbace481d, 89, Finished, Available)

Step 1: Reading the existing dimProduct_silver table into a DataFrame.
Step 2: Calculating the maximum ItemID.
Max Product ID is 130
Step 3: Performing a left anti join to identify new records.
Step 4: Adding ItemID to new records.
Step 5: Displaying the DataFrame.
+----+--------+--------+------+
|Item|ItemName|ItemInfo|ItemID|
+----+--------+--------+------+
+----+--------+--------+------+



In [88]:
# Merge the newly numbered products (dfdimProduct_silver) into the silver product dimension.
# Match on ItemName + ItemInfo: unmatched rows are inserted with their new ItemID; matched
# rows get an empty update set, so existing products keep their IDs.
from delta.tables import DeltaTable

print("Step 6: Initializing the DeltaTable object.")
deltaTable = DeltaTable.forName(spark, "deltalakehouse_silver.dimproduct_silver")

print("Step 7: Executing the merge operation.")
deltaTable.alias("existing") \
    .merge(
        dfdimProduct_silver.alias("new"),
        "existing.ItemName = new.ItemName AND existing.ItemInfo = new.ItemInfo"
    ) \
    .whenMatchedUpdate(set={}) \
    .whenNotMatchedInsert(values={
        "ItemName": "new.ItemName",
        "ItemInfo": "new.ItemInfo",
        "ItemID": "new.ItemID"
    }) \
    .execute()

print("Merge operation complete.")


StatementMeta(, 95dc5ac9-e4a4-41ad-b059-2adfbace481d, 90, Finished, Available)

Step 6: Initializing the DeltaTable object.
Step 7: Executing the merge operation.
Merge operation complete.


In [89]:
# With the dimensions in place, create the silver sales fact table (only if missing):
# surrogate keys to the customer and product dimensions plus the order measures.
from pyspark.sql.types import *
from delta.tables import *

(
    DeltaTable.createIfNotExists(spark)
    .tableName("deltalakehouse_silver.factsales_silver")
    .addColumn("CustomerID", LongType())
    .addColumn("ItemID", LongType())
    .addColumn("OrderDate", DateType())
    .addColumn("Quantity", IntegerType())
    .addColumn("UnitPrice", FloatType())
    .addColumn("Tax", FloatType())
    .execute()
)

StatementMeta(, 95dc5ac9-e4a4-41ad-b059-2adfbace481d, 91, Finished, Available)

<delta.tables.DeltaTable at 0x7f3d25a84940>

In [90]:
# Build the silver sales fact from the bronze order lines. Each line is resolved to its
# surrogate CustomerID (via CustomerName + Email) and ItemID (via ItemName + ItemInfo), and
# OrderDate, Quantity, UnitPrice and Tax are kept. Both joins are left joins, so a line
# with no matching dimension row is kept with a null CustomerID / ItemID.
# Every function used is imported here rather than relied on from earlier cells.
from pyspark.sql.functions import col, split, lit, coalesce

dfdimCustomer_temp = spark.read.table("deltalakehouse_silver.dimcustomer_silver")
dfdimProduct_temp = spark.read.table("deltalakehouse_silver.dimproduct_silver")

# Derive the product join keys with the same rule as the product dimension:
# ItemName = text before the first ", ", ItemInfo = the next segment ("" when absent).
item_parts = split(col("Item"), ", ")
df = (
    df.withColumn("ItemName", item_parts.getItem(0))
    .withColumn("ItemInfo", coalesce(item_parts.getItem(1), lit("")))
)

# Create Sales_silver dataframe
dffactSales_silver = df.alias("df1").join(dfdimCustomer_temp.alias("df2"),(df.CustomerName == dfdimCustomer_temp.CustomerName) & (df.Email == dfdimCustomer_temp.Email), "left") \
        .join(dfdimProduct_temp.alias("df3"),(df.ItemName == dfdimProduct_temp.ItemName) & (df.ItemInfo == dfdimProduct_temp.ItemInfo), "left") \
    .select(col("df2.CustomerID") \
        , col("df3.ItemID") \
        , col("df1.OrderDate") \
        , col("df1.Quantity") \
        , col("df1.UnitPrice") \
        , col("df1.Tax") \
    ).orderBy(col("df1.OrderDate"), col("df2.CustomerID"), col("df3.ItemID"))

# Preview the first 10 fact rows.
display(dffactSales_silver.head(10))

StatementMeta(, 95dc5ac9-e4a4-41ad-b059-2adfbace481d, 92, Finished, Available)

SynapseWidget(Synapse.DataFrame, c9ea7511-6785-404a-80cc-34f98addd222)

In [91]:
# Keep the silver sales fact table up to date with the rows in dffactSales_silver.
# Match on OrderDate + CustomerID + ItemID: unmatched rows are inserted; matched rows get an
# empty update set, so existing fact rows are left unchanged (no values are overwritten).
# NOTE(review): rows with a null CustomerID or ItemID (possible because the fact is built
# with left joins) never satisfy the '=' condition and would be inserted again on every run.
# Also, two order lines with the same date, customer and item share one merge key - confirm
# this grain is intended.
from delta.tables import *

deltaTable = DeltaTable.forName(spark, "deltalakehouse_silver.factsales_silver")

dfUpdates = dffactSales_silver

deltaTable.alias('silver') \
    .merge(
        dfUpdates.alias('updates'),
        'silver.OrderDate = updates.OrderDate AND silver.CustomerID = updates.CustomerID AND silver.ItemID = updates.ItemID'
    ) \
    .whenMatchedUpdate(set={}) \
    .whenNotMatchedInsert(values =
    {
        "CustomerID": "updates.CustomerID",
        "ItemID": "updates.ItemID",
        "OrderDate": "updates.OrderDate",
        "Quantity": "updates.Quantity",
        "UnitPrice": "updates.UnitPrice",
        "Tax": "updates.Tax"
    }
    ) \
    .execute()

StatementMeta(, 95dc5ac9-e4a4-41ad-b059-2adfbace481d, 93, Finished, Available)

In [92]:


# Preview up to 1000 rows of the silver customer dimension.
df = spark.table("deltalakehouse_silver.dimcustomer_silver").limit(1000)
display(df)



StatementMeta(, 95dc5ac9-e4a4-41ad-b059-2adfbace481d, 94, Finished, Available)

SynapseWidget(Synapse.DataFrame, 8edd73c3-8f6e-49f2-8497-c572da0a2b81)

In [93]:
# Preview up to 1000 rows of the silver date dimension.
df = spark.table("deltalakehouse_silver.dimdate_silver").limit(1000)
display(df)

StatementMeta(, 95dc5ac9-e4a4-41ad-b059-2adfbace481d, 95, Finished, Available)

SynapseWidget(Synapse.DataFrame, 485f58aa-f131-4ede-95bf-f06e3885725a)

In [94]:
# Preview up to 1000 rows of the silver product dimension.
df = spark.table("deltalakehouse_silver.dimproduct_silver").limit(1000)
display(df)

StatementMeta(, 95dc5ac9-e4a4-41ad-b059-2adfbace481d, 96, Finished, Available)

SynapseWidget(Synapse.DataFrame, 7d211410-ec4c-4295-862e-c59c4dfc8c89)

In [95]:
# Preview up to 1000 rows of the silver sales fact.
df = spark.table("deltalakehouse_silver.factsales_silver").limit(1000)
display(df)

StatementMeta(, 95dc5ac9-e4a4-41ad-b059-2adfbace481d, 97, Finished, Available)

SynapseWidget(Synapse.DataFrame, 32bd8614-db85-463f-9fb6-53c66f0d716b)

## Create Silver to Gold Pipeline

In [96]:
# Silver -> Gold for the date dimension: create deltalakehouse_gold.dimdate_gold if missing,
# then merge the silver rows into it on OrderDate. Dates not yet in gold are inserted;
# matched rows get an empty update set and are left unchanged.

from pyspark.sql.types import *
from pyspark.sql.functions import col, dayofmonth, month, year, date_format
from delta.tables import *

# Source: the silver date dimension.
dfdimDate_silver = spark.read.table("deltalakehouse_silver.dimdate_silver")

# Target table; its schema mirrors the silver table.
(
    DeltaTable.createIfNotExists(spark)
    .tableName("deltalakehouse_gold.dimdate_gold")
    .addColumn("OrderDate", DateType())
    .addColumn("Day", IntegerType())
    .addColumn("Month", IntegerType())
    .addColumn("Year", IntegerType())
    .addColumn("mmmyyyy", StringType())
    .addColumn("yyyymm", StringType())
    .execute()
)

deltaTableGold = DeltaTable.forName(spark, "deltalakehouse_gold.dimdate_gold")

# Every gold column is copied from the same-named silver column.
gold_date_columns = ["OrderDate", "Day", "Month", "Year", "mmmyyyy", "yyyymm"]

(
    deltaTableGold.alias('gold')
    .merge(dfdimDate_silver.alias('silver'), 'gold.OrderDate = silver.OrderDate')
    .whenMatchedUpdate(set={})
    .whenNotMatchedInsert(values={c: f"silver.{c}" for c in gold_date_columns})
    .execute()
)


StatementMeta(, 95dc5ac9-e4a4-41ad-b059-2adfbace481d, 98, Finished, Available)

In [97]:
df = spark.sql("SELECT * FROM deltalakehouse_gold.dimdate_gold LIMIT 1000")
display(df)

StatementMeta(, 95dc5ac9-e4a4-41ad-b059-2adfbace481d, 99, Finished, Available)

SynapseWidget(Synapse.DataFrame, 3073f895-111f-4363-a7ad-072de470a2ba)

In [98]:
# Silver -> Gold for the customer dimension (direct column mapping).
# Creates deltalakehouse_gold.dimcustomer_gold if missing, then merges the silver rows on
# CustomerName + Email: customers not yet in gold are inserted with their silver CustomerID,
# and matched rows get an empty update set, so they are left unchanged.
# Only the names actually used are imported; in particular pyspark's max is not imported,
# so this cell does not shadow Python's built-in max().
# NOTE(review): a null Email never satisfies 'gold.Email = silver.Email', so such customers
# would be re-inserted on every run - consider '<=>' if Email can be null.
from pyspark.sql.types import StringType, LongType
from delta.tables import DeltaTable

# Create customer_gold dimension delta table in Gold layer
DeltaTable.createIfNotExists(spark) \
    .tableName("deltalakehouse_gold.dimcustomer_gold") \
    .addColumn("CustomerName", StringType()) \
    .addColumn("Email",  StringType()) \
    .addColumn("First", StringType()) \
    .addColumn("Last", StringType()) \
    .addColumn("CustomerID", LongType()) \
    .execute()

# Source: the silver customer dimension.
dfdimCustomer_silver = spark.read.table("deltalakehouse_silver.dimcustomer_silver")

deltaTableGold = DeltaTable.forName(spark, "deltalakehouse_gold.dimcustomer_gold")

# Insert-new-only merge from Silver to Gold.
deltaTableGold.alias('gold') \
    .merge(
        dfdimCustomer_silver.alias('silver'),
        'gold.CustomerName = silver.CustomerName AND gold.Email = silver.Email'
    ) \
    .whenMatchedUpdate(set = {}) \
    .whenNotMatchedInsert(values = {
        "CustomerName": "silver.CustomerName",
        "Email": "silver.Email",
        "First": "silver.First",
        "Last": "silver.Last",
        "CustomerID": "silver.CustomerID"
    }) \
    .execute()


StatementMeta(, 95dc5ac9-e4a4-41ad-b059-2adfbace481d, 100, Finished, Available)

In [99]:
# Silver -> Gold for the product dimension (direct column mapping).
# Creates deltalakehouse_gold.dimproduct_gold if missing, then merges the silver rows on
# ItemName + ItemInfo: products not yet in gold are inserted with their silver ItemID, and
# matched rows get an empty update set, so they are left unchanged.
# Only the names actually used are imported; in particular pyspark's max is not imported,
# so this cell does not shadow Python's built-in max().
from pyspark.sql.types import StringType, LongType
from delta.tables import DeltaTable

# Create product_gold dimension delta table in the Gold layer
DeltaTable.createIfNotExists(spark) \
    .tableName("deltalakehouse_gold.dimproduct_gold") \
    .addColumn("ItemName", StringType()) \
    .addColumn("ItemID", LongType()) \
    .addColumn("ItemInfo", StringType()) \
    .execute()

# Source: the silver product dimension.
dfdimProduct_silver = spark.read.table("deltalakehouse_silver.dimproduct_silver")

deltaTableGold = DeltaTable.forName(spark, "deltalakehouse_gold.dimproduct_gold")

# Insert-new-only merge from Silver to Gold.
deltaTableGold.alias('gold') \
    .merge(
        dfdimProduct_silver.alias('silver'),
        'gold.ItemName = silver.ItemName AND gold.ItemInfo = silver.ItemInfo'
    ) \
    .whenMatchedUpdate(set = {}) \
    .whenNotMatchedInsert(values = {
        "ItemName": "silver.ItemName",
        "ItemID": "silver.ItemID",
        "ItemInfo": "silver.ItemInfo"
    }) \
    .execute()


StatementMeta(, 95dc5ac9-e4a4-41ad-b059-2adfbace481d, 101, Finished, Available)

In [100]:
# Silver -> Gold for the sales fact (direct column mapping): create
# deltalakehouse_gold.factsales_gold if missing, then merge the silver rows on
# OrderDate + CustomerID + ItemID. Unmatched rows are inserted; matched rows get an empty
# update set and are left unchanged.

from pyspark.sql.types import *
from pyspark.sql.functions import col, split, when, lit
from delta.tables import *

# Target table; its schema mirrors the silver fact table.
(
    DeltaTable.createIfNotExists(spark)
    .tableName("deltalakehouse_gold.factsales_gold")
    .addColumn("CustomerID", LongType())
    .addColumn("ItemID", LongType())
    .addColumn("OrderDate", DateType())
    .addColumn("Quantity", IntegerType())
    .addColumn("UnitPrice", FloatType())
    .addColumn("Tax", FloatType())
    .execute()
)

# Source: the silver sales fact.
dffactSales_silver = spark.read.table("deltalakehouse_silver.factsales_silver")

deltaTableGold = DeltaTable.forName(spark, "deltalakehouse_gold.factsales_gold")

# Every gold column is copied from the same-named silver column.
gold_fact_columns = ["CustomerID", "ItemID", "OrderDate", "Quantity", "UnitPrice", "Tax"]
fact_match_condition = 'gold.OrderDate = silver.OrderDate AND gold.CustomerID = silver.CustomerID AND gold.ItemID = silver.ItemID'

(
    deltaTableGold.alias('gold')
    .merge(dffactSales_silver.alias('silver'), fact_match_condition)
    .whenMatchedUpdate(set={})
    .whenNotMatchedInsert(values={c: f"silver.{c}" for c in gold_fact_columns})
    .execute()
)


StatementMeta(, 95dc5ac9-e4a4-41ad-b059-2adfbace481d, 102, Finished, Available)