### Fact - Sale - Incremental

This cell reads incremental raw data from _CSV files_ in the bronze zone, adds the computed columns `Year`, `Quarter` and `Month` (derived from `InvoiceDateKey`), and registers the result as a temporary Spark view.

In [1]:
# Import all sql functions
from pyspark.sql.functions import *
# Read the incremental sales data from the bronze zone. The files are CSV with
# a header row; column types are inferred by Spark.
# NOTE(review): the original comment said "three months" while the folder is
# named "fact_sale_1y_incremental" - confirm which period the files cover.
df = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("Files/wwi/incremental/fact_sale_1y_incremental")
)

# Add the calendar columns derived from InvoiceDateKey. The MERGE statement
# later in this notebook filters the target table on Year and Quarter.
df = (
    df.withColumn("Year", year(col("InvoiceDateKey")))
    .withColumn("Quarter", quarter(col("InvoiceDateKey")))
    .withColumn("Month", month(col("InvoiceDateKey")))
)

# Register the DataFrame as a temporary view so the %%sql cells can use it as
# the MERGE source. createOrReplaceTempView() returns None, so its result must
# NOT be assigned back to `df` - doing so would discard the DataFrame.
df.createOrReplaceTempView("fact_sale_incremental")

# To remove the view again, run:
# spark.catalog.dropTempView("fact_sale_incremental")

StatementMeta(, d5a22bcd-d05a-4b1d-b3b0-0e08cf2ca57b, 3, Finished, Available, Finished)

In [2]:
%%sql
-- Row counts per Year/Quarter/Month before the merge.
-- The table name is schema-qualified so this query reads exactly the same
-- table that the MERGE and the post-merge count query use.
SELECT Year, Quarter, Month, count(*)
FROM wwi_silver.fact_sale
GROUP BY Year, Quarter, Month
ORDER BY Year, Quarter, Month

StatementMeta(, d5a22bcd-d05a-4b1d-b3b0-0e08cf2ca57b, 4, Finished, Available, Finished)

<Spark SQL result set with 11 rows and 4 fields>

In [3]:
# Disabled alternative: the same upsert written with the Delta Lake Python API
# instead of the SQL MERGE used later in this notebook. Run only ONE of the two.
# Notes before enabling:
#   * `df` must still be the incremental DataFrame (createOrReplaceTempView()
#     returns None, so its result must not have been assigned to `df`).
#   * whenMatchedUpdateAll()/whenNotMatchedInsertAll() copy every source column,
#     whereas the SQL MERGE lists the updated columns explicitly.
#
# from delta.tables import DeltaTable 
# deltaTable=DeltaTable.forName(spark,"wwi_silver.fact_sale")

# deltaTable.alias("target").merge(
#     df.alias("source"),
#       """
#     target.SaleKey = source.SaleKey AND
#     target.InvoiceDateKey = source.InvoiceDateKey AND
#     target.Year = 2000 AND
#     target.Quarter = 4
#     """
# ).whenMatchedUpdateAll()\
#  .whenNotMatchedInsertAll()\
#  .execute()

StatementMeta(, d5a22bcd-d05a-4b1d-b3b0-0e08cf2ca57b, 5, Finished, Available, Finished)

This cell uses the Delta Lake MERGE statement to upsert data into a target Delta table. In the ON clause, you specify the key columns used to match records on both sides, and you add predicates on the target table's partition columns so that only the partitions needed for this upsert are scanned. Pruning all other partitions improves the performance of the merge statement.

In [4]:
%%sql
-- Upsert the rows of the fact_sale_incremental view into wwi_silver.fact_sale.
-- Rows are matched on (SaleKey, InvoiceDateKey); the extra predicates on the
-- target's Year and Quarter restrict the merge to the partition being loaded.
MERGE INTO wwi_silver.fact_sale AS tgt
USING fact_sale_incremental AS src
ON  src.SaleKey = tgt.SaleKey
AND src.InvoiceDateKey = tgt.InvoiceDateKey   -- unique key for update identification
AND tgt.Year = 2000                           -- partition pruning
AND tgt.Quarter = 4                           -- partition pruning
WHEN MATCHED THEN UPDATE SET
    tgt.CityKey           = src.CityKey,
    tgt.CustomerKey       = src.CustomerKey,
    tgt.BillToCustomerKey = src.BillToCustomerKey,
    tgt.StockItemKey      = src.StockItemKey,
    tgt.DeliveryDateKey   = src.DeliveryDateKey,
    tgt.SalespersonKey    = src.SalespersonKey,
    tgt.WWIInvoiceID      = src.WWIInvoiceID,
    tgt.Description       = src.Description,
    tgt.Package           = src.Package,
    tgt.Quantity          = src.Quantity,
    tgt.UnitPrice         = src.UnitPrice,
    tgt.TaxRate           = src.TaxRate,
    tgt.TotalExcludingTax = src.TotalExcludingTax,
    tgt.TaxAmount         = src.TaxAmount,
    tgt.Profit            = src.Profit,
    tgt.TotalIncludingTax = src.TotalIncludingTax,
    tgt.TotalDryItems     = src.TotalDryItems,
    tgt.TotalChillerItems = src.TotalChillerItems,
    tgt.LineageKey        = src.LineageKey
WHEN NOT MATCHED THEN INSERT (
    SaleKey,
    CityKey,
    CustomerKey,
    BillToCustomerKey,
    StockItemKey,
    InvoiceDateKey,
    DeliveryDateKey,
    SalespersonKey,
    WWIInvoiceID,
    Description,
    Package,
    Quantity,
    UnitPrice,
    TaxRate,
    TotalExcludingTax,
    TaxAmount,
    Profit,
    TotalIncludingTax,
    TotalDryItems,
    TotalChillerItems,
    LineageKey,
    Year,
    Quarter,
    Month
) VALUES (
    src.SaleKey,
    src.CityKey,
    src.CustomerKey,
    src.BillToCustomerKey,
    src.StockItemKey,
    src.InvoiceDateKey,
    src.DeliveryDateKey,
    src.SalespersonKey,
    src.WWIInvoiceID,
    src.Description,
    src.Package,
    src.Quantity,
    src.UnitPrice,
    src.TaxRate,
    src.TotalExcludingTax,
    src.TaxAmount,
    src.Profit,
    src.TotalIncludingTax,
    src.TotalDryItems,
    src.TotalChillerItems,
    src.LineageKey,
    src.Year,
    src.Quarter,
    src.Month
)

StatementMeta(, d5a22bcd-d05a-4b1d-b3b0-0e08cf2ca57b, 6, Finished, Available, Finished)

<Spark SQL result set with 1 rows and 4 fields>

In [5]:
%%sql
-- Row counts per Year/Quarter/Month in the silver fact table.
SELECT
    Year,
    Quarter,
    Month,
    count(*)
FROM
    wwi_silver.fact_sale
GROUP BY
    Year, Quarter, Month
ORDER BY
    Year, Quarter, Month;

StatementMeta(, d5a22bcd-d05a-4b1d-b3b0-0e08cf2ca57b, 7, Finished, Available, Finished)

<Spark SQL result set with 12 rows and 4 fields>

In [6]:
%%sql
-- Show the commit history of the Delta table (one row per table version).
-- The version numbers listed here are what the time-travel reads in the next
-- cell refer to via the "versionAsOf" option.
DESCRIBE HISTORY wwi_silver.fact_sale

StatementMeta(, d5a22bcd-d05a-4b1d-b3b0-0e08cf2ca57b, 8, Finished, Available, Finished)

<Spark SQL result set with 2 rows and 15 fields>

In [7]:
# Delta time travel: show the same sale row as it exists in table version 0
# and then in version 1 (presumably before and after the MERGE - confirm
# against the DESCRIBE HISTORY output).
for version in (0, 1):
    df_silver_fact_sale = (
        spark.read.format("delta")
        .option("versionAsOf", version)
        .load("Tables/fact_sale")
        .filter("SaleKey = 45655215")
    )
    display(df_silver_fact_sale)

StatementMeta(, d5a22bcd-d05a-4b1d-b3b0-0e08cf2ca57b, 9, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 24020150-ce85-4a3a-93fe-d3cfa91840c2)

SynapseWidget(Synapse.DataFrame, 27f11bfb-df49-4d0c-89b6-7df602965d49)