In [2]:
# Starting point for the gold layer: the sales rows held in the silver table.
silver_table_name = "Sales.dbo.sales_silver"
df = spark.read.table(silver_table_name)

StatementMeta(, 95f7f4ad-0cbf-46da-a620-ee6ccb06a1d8, 4, Finished, Available, Finished)

In [4]:
from pyspark.sql.types import *
from delta.tables import*

# Declare the dimdate_gold Delta table: the order date plus its derived calendar attributes.
dimdate_columns = [
    ("OrderDate", DateType()),
    ("Day", IntegerType()),
    ("Month", IntegerType()),
    ("Year", IntegerType()),
    ("mmmyyyy", StringType()),
    ("yyyymm", StringType()),
]

# createIfNotExists makes this cell safe to re-run once the table is in place.
builder = DeltaTable.createIfNotExists(spark).tableName("sales.dbo.dimdate_gold")
for column_name, column_type in dimdate_columns:
    builder = builder.addColumn(column_name, column_type)
builder.execute()

StatementMeta(, 95f7f4ad-0cbf-46da-a620-ee6ccb06a1d8, 6, Finished, Available, Finished)

<delta.tables.DeltaTable at 0x7232c383f350>

In [5]:
from pyspark.sql.functions import col, dayofmonth, month, year, date_format

# Build the date dimension: one row per distinct OrderDate, enriched with
# day / month / year numbers and two formatted month labels, sorted by date.
order_date = col("OrderDate")

dfdimDate_gold = (
    df.dropDuplicates(["OrderDate"])
    .select(
        order_date,
        dayofmonth(order_date).alias("Day"),
        month(order_date).alias("Month"),
        year(order_date).alias("Year"),
        date_format(order_date, "MMM-yyyy").alias("mmmyyyy"),
        date_format(order_date, "yyyyMM").alias("yyyymm"),
    )
    .orderBy("OrderDate")
)

# Preview the first 10 rows of the result
display(dfdimDate_gold.head(10))

StatementMeta(, 95f7f4ad-0cbf-46da-a620-ee6ccb06a1d8, 7, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, d80d43e5-8bf0-45e5-850c-b8af6ba0f61d)

In [7]:
from delta.tables import *

# Load new dates into dimdate_gold: every OrderDate in dfdimDate_gold that the
# table does not hold yet is inserted; dates already present are left untouched.
deltaTable = DeltaTable.forPath(spark, 'Tables/dbo/dimdate_gold')

dfUpdates = dfdimDate_gold

# Deliberately no whenMatched clause: an update with an empty `set` changes no
# column, yet it stops the merge from being insert-only and makes Delta rewrite
# the matched rows. Insert-only keeps re-runs cheap.
(
    deltaTable.alias('gold')
    .merge(
        dfUpdates.alias('updates'),
        'gold.OrderDate = updates.OrderDate'
    )
    .whenNotMatchedInsert(values=
        {
            "OrderDate": "updates.OrderDate",
            "Day": "updates.Day",
            "Month": "updates.Month",
            "Year": "updates.Year",
            "mmmyyyy": "updates.mmmyyyy",
            "yyyymm": "updates.yyyymm"
        }
    )
    .execute()
)

StatementMeta(, 95f7f4ad-0cbf-46da-a620-ee6ccb06a1d8, 9, Finished, Available, Finished)

In [9]:
from pyspark.sql.types import *
from delta.tables import *

# Declare the dimcustomer_gold Delta table: the customer's natural attributes
# (name, email, first/last name) plus the CustomerID surrogate key.
dimcustomer_columns = [
    ("CustomerName", StringType()),
    ("Email", StringType()),
    ("First", StringType()),
    ("Last", StringType()),
    ("CustomerID", LongType()),
]

# createIfNotExists makes this cell safe to re-run once the table is in place.
builder = DeltaTable.createIfNotExists(spark).tableName("sales.dbo.dimcustomer_gold")
for column_name, column_type in dimcustomer_columns:
    builder = builder.addColumn(column_name, column_type)
builder.execute()

StatementMeta(, 95f7f4ad-0cbf-46da-a620-ee6ccb06a1d8, 11, Finished, Available, Finished)

<delta.tables.DeltaTable at 0x7232c19536d0>

In [10]:
from pyspark.sql.functions import col, split

# Customer candidates for the dimension: the distinct (CustomerName, Email) pairs
# of the silver data, with the name split on single spaces into
# First (token 0) and Last (token 1).
name_parts = split(col("CustomerName"), " ")

dfdimCustomer_silver = (
    df.dropDuplicates(["CustomerName", "Email"])
    .select(
        col("CustomerName"),
        col("Email"),
        name_parts.getItem(0).alias("First"),
        name_parts.getItem(1).alias("Last"),
    )
)

# Preview the first 10 rows of the result
display(dfdimCustomer_silver.head(10))

StatementMeta(, 95f7f4ad-0cbf-46da-a620-ee6ccb06a1d8, 12, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 0f347490-b071-453c-8618-e401af9c6f8a)

In [11]:
from pyspark.sql.functions import monotonically_increasing_id, col, when, coalesce, max, lit

# Current content of the customer dimension, used to find out which customers are new.
dfdimCustomer_temp = spark.read.table("Sales.dbo.dimCustomer_gold")

# Highest CustomerID handed out so far; coalesce turns the NULL of an empty table into 0.
max_customer_id_row = dfdimCustomer_temp.select(
    coalesce(max(col("CustomerID")), lit(0)).alias("MAXCustomerID")
).first()
MAXCustomerID = max_customer_id_row[0]

# A silver customer is "known" when both its name and its email equal an existing row.
is_known_customer = (
    (dfdimCustomer_silver.CustomerName == dfdimCustomer_temp.CustomerName)
    & (dfdimCustomer_silver.Email == dfdimCustomer_temp.Email)
)

# The anti join keeps only the customers not yet in the dimension; they are then
# numbered above the current maximum. monotonically_increasing_id yields unique,
# increasing values that are not guaranteed to be consecutive.
dfdimCustomer_gold = (
    dfdimCustomer_silver
    .join(dfdimCustomer_temp, is_known_customer, "left_anti")
    .withColumn("CustomerID", monotonically_increasing_id() + MAXCustomerID + 1)
)

# Preview the first 10 rows of the result
display(dfdimCustomer_gold.head(10))

StatementMeta(, 95f7f4ad-0cbf-46da-a620-ee6ccb06a1d8, 13, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, d3d8d24b-57ef-497c-9a0c-51cfc89fa130)

In [13]:
from delta.tables import *

# Load new customers into dimcustomer_gold: every (CustomerName, Email) pair in
# dfdimCustomer_gold that the table does not hold yet is inserted; customers
# already present are left untouched.
deltaTable = DeltaTable.forPath(spark, 'Tables/dbo/dimcustomer_gold')

dfUpdates = dfdimCustomer_gold

# Deliberately no whenMatched clause: an update with an empty `set` changes no
# column, yet it stops the merge from being insert-only and makes Delta rewrite
# the matched rows. Insert-only keeps re-runs cheap.
(
    deltaTable.alias('gold')
    .merge(
        dfUpdates.alias('updates'),
        'gold.CustomerName = updates.CustomerName AND gold.Email = updates.Email'
    )
    .whenNotMatchedInsert(values=
        {
            "CustomerName": "updates.CustomerName",
            "Email": "updates.Email",
            "First": "updates.First",
            "Last": "updates.Last",
            "CustomerID": "updates.CustomerID"
        }
    )
    .execute()
)

StatementMeta(, 95f7f4ad-0cbf-46da-a620-ee6ccb06a1d8, 15, Finished, Available, Finished)

In [15]:
from pyspark.sql.types import *
from delta.tables import *

# Declare the dimproduct_gold Delta table: item name, ItemID surrogate key and
# the free-text item info.
dimproduct_columns = [
    ("ItemName", StringType()),
    ("ItemID", LongType()),
    ("ItemInfo", StringType()),
]

# createIfNotExists makes this cell safe to re-run once the table is in place.
builder = DeltaTable.createIfNotExists(spark).tableName("sales.dbo.dimproduct_gold")
for column_name, column_type in dimproduct_columns:
    builder = builder.addColumn(column_name, column_type)
builder.execute()

StatementMeta(, 95f7f4ad-0cbf-46da-a620-ee6ccb06a1d8, 17, Finished, Available, Finished)

<delta.tables.DeltaTable at 0x7232c198fa50>

In [16]:
from pyspark.sql.functions import col, split, lit, when

# Product candidates for the dimension: the distinct Item strings of the silver
# data, split on ", " into ItemName (token 0) and ItemInfo (token 1).
item_parts = split(col("Item"), ", ")
item_info = item_parts.getItem(1)

# A missing or empty second token is normalised to the empty string.
item_info_or_blank = when(item_info.isNull() | (item_info == ""), lit("")).otherwise(item_info)

dfdimProduct_silver = (
    df.dropDuplicates(["Item"])
    .select(col("Item"))
    .withColumn("ItemName", item_parts.getItem(0))
    .withColumn("ItemInfo", item_info_or_blank)
)

# Preview the first 10 rows of the result
display(dfdimProduct_silver.head(10))

StatementMeta(, 95f7f4ad-0cbf-46da-a620-ee6ccb06a1d8, 18, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 262e6c44-8418-47c9-ae7b-40d90f8dfc37)

In [17]:
from pyspark.sql.functions import monotonically_increasing_id, col, lit, max, coalesce

# Current content of the product dimension, used to find out which products are new.
dfdimProduct_temp = spark.read.table("Sales.dbo.dimProduct_gold")

# Highest ItemID handed out so far; coalesce turns the NULL of an empty table into 0.
max_item_id_row = dfdimProduct_temp.select(
    coalesce(max(col("ItemID")), lit(0)).alias("MAXItemID")
).first()
MAXProductID = max_item_id_row[0]

# A silver product is "known" when both its name and its info equal an existing row.
is_known_product = (
    (dfdimProduct_silver.ItemName == dfdimProduct_temp.ItemName)
    & (dfdimProduct_silver.ItemInfo == dfdimProduct_temp.ItemInfo)
)

# The anti join keeps only the products not yet in the dimension; they are then
# numbered above the current maximum. monotonically_increasing_id yields unique,
# increasing values that are not guaranteed to be consecutive.
dfdimProduct_gold = (
    dfdimProduct_silver
    .join(dfdimProduct_temp, is_known_product, "left_anti")
    .withColumn("ItemID", monotonically_increasing_id() + MAXProductID + 1)
)

# Preview the first 10 rows of the result
display(dfdimProduct_gold.head(10))

StatementMeta(, 95f7f4ad-0cbf-46da-a620-ee6ccb06a1d8, 19, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 0a9a70ad-b0f7-480e-ae4d-030f2ce136bd)

In [18]:
from delta.tables import *

# Load new products into dimproduct_gold: every (ItemName, ItemInfo) pair in
# dfdimProduct_gold that the table does not hold yet is inserted; products
# already present are left untouched.
deltaTable = DeltaTable.forPath(spark, 'Tables/dbo/dimproduct_gold')

dfUpdates = dfdimProduct_gold

# Deliberately no whenMatched clause: an update with an empty `set` changes no
# column, yet it stops the merge from being insert-only and makes Delta rewrite
# the matched rows. Insert-only keeps re-runs cheap.
(
    deltaTable.alias('gold')
    .merge(
        dfUpdates.alias('updates'),
        'gold.ItemName = updates.ItemName AND gold.ItemInfo = updates.ItemInfo'
    )
    .whenNotMatchedInsert(values=
        {
            "ItemName": "updates.ItemName",
            "ItemInfo": "updates.ItemInfo",
            "ItemID": "updates.ItemID"
        }
    )
    .execute()
)

StatementMeta(, 95f7f4ad-0cbf-46da-a620-ee6ccb06a1d8, 20, Finished, Available, Finished)

In [19]:
from pyspark.sql.types import *
from delta.tables import *

# Declare the factsales_gold Delta table: the two dimension keys, the order
# date and the sales measures.
factsales_columns = [
    ("CustomerID", LongType()),
    ("ItemID", LongType()),
    ("OrderDate", DateType()),
    ("Quantity", IntegerType()),
    ("UnitPrice", FloatType()),
    ("Tax", FloatType()),
]

# createIfNotExists makes this cell safe to re-run once the table is in place.
builder = DeltaTable.createIfNotExists(spark).tableName("sales.dbo.factsales_gold")
for column_name, column_type in factsales_columns:
    builder = builder.addColumn(column_name, column_type)
builder.execute()

StatementMeta(, 95f7f4ad-0cbf-46da-a620-ee6ccb06a1d8, 21, Finished, Available, Finished)

<delta.tables.DeltaTable at 0x7232c182ba10>

In [20]:
# Import every function this cell uses, so it does not depend on names that
# earlier cells happened to import into the kernel.
from pyspark.sql.functions import col, lit, split, when

# Dimension tables that supply the surrogate keys for the fact rows.
dfdimCustomer_temp = spark.read.table("Sales.dbo.dimCustomer_gold")
dfdimProduct_temp = spark.read.table("Sales.dbo.dimProduct_gold")

# Split Item on ", " into ItemName (token 0) and ItemInfo (token 1; missing or
# empty becomes "") so the sales rows can be matched to the product dimension.
# withColumn replaces a column of the same name, so re-running this cell is harmless.
item_parts = split(col("Item"), ", ")
item_info = item_parts.getItem(1)
df = (
    df.withColumn("ItemName", item_parts.getItem(0))
    .withColumn("ItemInfo", when(item_info.isNull() | (item_info == ""), lit("")).otherwise(item_info))
)

# Create Sales_gold dataframe.
# Both joins are left joins: every sales row is kept, and a row with no matching
# customer or product gets a NULL CustomerID / ItemID.
dffactSales_gold = (
    df.alias("df1")
    .join(
        dfdimCustomer_temp.alias("df2"),
        (df.CustomerName == dfdimCustomer_temp.CustomerName) & (df.Email == dfdimCustomer_temp.Email),
        "left",
    )
    .join(
        dfdimProduct_temp.alias("df3"),
        (df.ItemName == dfdimProduct_temp.ItemName) & (df.ItemInfo == dfdimProduct_temp.ItemInfo),
        "left",
    )
    .select(
        col("df2.CustomerID"),
        col("df3.ItemID"),
        col("df1.OrderDate"),
        col("df1.Quantity"),
        col("df1.UnitPrice"),
        col("df1.Tax"),
    )
    .orderBy(col("df1.OrderDate"), col("df2.CustomerID"), col("df3.ItemID"))
)

# Display the first 10 rows of the dataframe to preview your data
display(dffactSales_gold.head(10))

StatementMeta(, 95f7f4ad-0cbf-46da-a620-ee6ccb06a1d8, 22, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, da9954dc-710f-40d6-a8e6-c03da50f8101)

In [21]:
from delta.tables import *

# Load new sales into factsales_gold: every (OrderDate, CustomerID, ItemID)
# combination in dffactSales_gold that the table does not hold yet is inserted;
# combinations already present are left untouched.
deltaTable = DeltaTable.forPath(spark, 'Tables/dbo/factsales_gold')

dfUpdates = dffactSales_gold

# Deliberately no whenMatched clause. An update with an empty `set` changes no
# column, but any update clause makes Delta reject a merge in which several
# source rows match the same target row. The source here is not de-duplicated
# on the merge key, so that can happen on a re-run. Insert-only avoids that
# failure and the rewrite of matched rows.
# NOTE(review): `=` never matches NULL, so rows whose CustomerID or ItemID is
# NULL are inserted again on every run - confirm whether null-safe `<=>` is wanted.
(
    deltaTable.alias('gold')
    .merge(
        dfUpdates.alias('updates'),
        'gold.OrderDate = updates.OrderDate AND gold.CustomerID = updates.CustomerID AND gold.ItemID = updates.ItemID'
    )
    .whenNotMatchedInsert(values=
        {
            "CustomerID": "updates.CustomerID",
            "ItemID": "updates.ItemID",
            "OrderDate": "updates.OrderDate",
            "Quantity": "updates.Quantity",
            "UnitPrice": "updates.UnitPrice",
            "Tax": "updates.Tax"
        }
    )
    .execute()
)

StatementMeta(, 95f7f4ad-0cbf-46da-a620-ee6ccb06a1d8, 23, Finished, Available, Finished)