**Step 2: Daily Delta Table Merge**

This sample notebook reads one day's CSV extract from the bronze data folder and merges it into the silver Delta Table.

In [0]:
# Mount the Azure Blob Storage container (uncomment and run only if it is not already mounted).
# dbutils.fs.mount(
#   source = "wasbs://data@easonblobstorage.blob.core.windows.net",
#   mount_point = "/mnt/easonblobstorage",
#   extra_configs = {"fs.azure.account.key.easonblobstorage.blob.core.windows.net":dbutils.secrets.get(scope = "eason_scope_1", key = "easonblobstorage01_key")})

In [0]:
# Unmount the Azure Blob Storage container (uncomment and run only when the mount is no longer needed).
# dbutils.fs.unmount("/mnt/easonblobstorage")

In [0]:
# List the contents of the base data folder on the mounted storage.
base_folder_listing = dbutils.fs.ls("/mnt/easonblobstorage/online_retail")
display(base_folder_listing)

path,name,size,modificationTime
dbfs:/mnt/easonblobstorage/online_retail/bronze_data/,bronze_data/,0,0
dbfs:/mnt/easonblobstorage/online_retail/silver_data/,silver_data/,0,1650424756000


In [0]:
# Import the datetime library for date tracking.
from datetime import datetime

In [0]:
# Choose the processing date, formatted as ddmmyyyy to match the dated
# sub-folder names under bronze_data.
now = datetime.now()

# Demo override: pins the run to the sample folder for 2 December 2010.
# Set this to None to process the folder named after today's date instead.
demo_date_string = "02122010"

date_string = demo_date_string if demo_date_string is not None else now.strftime("%d%m%Y")
print(date_string)

In [0]:
# Build the two locations used for the daily load: the dated bronze folder
# and a glob that selects every CSV file inside it.
daily_data_folder_path = "/mnt/easonblobstorage/online_retail/bronze_data/" + date_string
daily_data_file_path = daily_data_folder_path + "/*.csv"

In [0]:
# List the files in the dated bronze_data sub-folder selected above.
daily_folder_listing = dbutils.fs.ls(daily_data_folder_path)
display(daily_folder_listing)

path,name,size,modificationTime
dbfs:/mnt/easonblobstorage/online_retail/bronze_data/02122010/02122010.csv,02122010.csv,4741249,1650423315000


In [0]:
# Load every CSV in the daily folder into a Spark dataframe, using the first
# line of each file as the column names. No schema is supplied and schema
# inference is not enabled, so Spark reads every column as a string.
ONLINE_RETAIL_DAILY = spark.read.option("header", True).csv(daily_data_file_path)

In [0]:
# Row count of the daily Spark dataframe (triggers a Spark job).
daily_row_count = ONLINE_RETAIL_DAILY.count()
daily_row_count

In [0]:
# Column count of the daily Spark dataframe.
daily_column_names = ONLINE_RETAIL_DAILY.columns
len(daily_column_names)

In [0]:
# Render the daily Spark dataframe as a table so its rows can be inspected
# before they are merged into the Delta Table.
display(ONLINE_RETAIL_DAILY)

InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country
536598,21421,PORCELAIN ROSE LARGE,12,2/12/2010 7:48,1.25,13090.0,United Kingdom
536598,21422,PORCELAIN ROSE SMALL,16,2/12/2010 7:48,0.85,13090.0,United Kingdom
536598,22178,VICTORIAN GLASS HANGING T-LIGHT,24,2/12/2010 7:48,1.25,13090.0,United Kingdom
536598,22617,BAKING SET SPACEBOY DESIGN,24,2/12/2010 7:48,4.25,13090.0,United Kingdom
536599,22968,ROSE COTTAGE KEEPSAKE BOX,8,2/12/2010 7:49,8.5,15694.0,United Kingdom
536599,22727,ALARM CLOCK BAKELIKE RED,4,2/12/2010 7:49,3.75,15694.0,United Kingdom
536599,22726,ALARM CLOCK BAKELIKE GREEN,12,2/12/2010 7:49,3.75,15694.0,United Kingdom
536599,84945,MULTI COLOUR SILVER T-LIGHT HOLDER,36,2/12/2010 7:49,0.85,15694.0,United Kingdom
536599,20749,ASSORTED COLOUR MINI CASES,12,2/12/2010 7:49,6.35,15694.0,United Kingdom
536599,21056,DOCTOR'S BAG SOFT TOY,8,2/12/2010 7:49,8.95,15694.0,United Kingdom


In [0]:
# Import necessary library for Delta Table operations.
from delta.tables import *

In [0]:
# Open a handle to the existing silver Delta Table at its storage path;
# this handle is the target of the merge operation.
silver_table_path = "/mnt/easonblobstorage/online_retail/silver_data/online_retail_silver/"
ONLINE_RETAIL_MASTER_SILVER_DELTA_TABLE_LOAD = DeltaTable.forPath(spark, silver_table_path)

In [0]:
# Upsert the daily Spark dataframe into the silver Delta Table.
#
# Rows are matched at line-item level (InvoiceNo + StockCode). An invoice
# holds several line items that share one InvoiceNo, so matching on InvoiceNo
# alone would pair every daily row of an invoice with every stored row of that
# invoice; Delta rejects a merge in which more than one source row tries to
# update the same target row.
# NOTE(review): this assumes (InvoiceNo, StockCode) identifies a line item.
# If a daily file can repeat that pair, de-duplicate it upstream before
# merging - confirm against the source data.
merge_key_columns = ["InvoiceNo", "StockCode"]
merge_condition = " AND ".join(
    f"ONLINE_RETAIL_MASTER_SILVER_DELTA_TABLE_LOAD.{key} = ONLINE_RETAIL_DAILY.{key}"
    for key in merge_key_columns
)

# Target column -> source expression. The same mapping drives both the update
# of matched rows and the insert of new rows, so it is built only once.
daily_column_mapping = {
    column: f"ONLINE_RETAIL_DAILY.{column}"
    for column in [
        "InvoiceNo",
        "StockCode",
        "Description",
        "Quantity",
        "InvoiceDate",
        "UnitPrice",
        "CustomerID",
        "Country",
    ]
}

(
    ONLINE_RETAIL_MASTER_SILVER_DELTA_TABLE_LOAD.alias("ONLINE_RETAIL_MASTER_SILVER_DELTA_TABLE_LOAD")
    .merge(ONLINE_RETAIL_DAILY.alias("ONLINE_RETAIL_DAILY"), merge_condition)
    .whenMatchedUpdate(set=daily_column_mapping)
    .whenNotMatchedInsert(values=daily_column_mapping)
    .execute()
)

In [0]:
# Read the merged silver Delta Table back as a Spark dataframe and show it.
merged_table_path = "/mnt/easonblobstorage/online_retail/silver_data/online_retail_silver/"
ONLINE_RETAIL_MASTER_SILVER_DELTA_TABLE = spark.read.load(merged_table_path, format="delta")
display(ONLINE_RETAIL_MASTER_SILVER_DELTA_TABLE)

InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country
536365,85123A,WHITE HANGING HEART T-LIGHT HOLDER,6,1/12/2010 8:26,2.55,17850.0,United Kingdom
536365,71053,WHITE METAL LANTERN,6,1/12/2010 8:26,3.39,17850.0,United Kingdom
536365,84406B,CREAM CUPID HEARTS COAT HANGER,8,1/12/2010 8:26,2.75,17850.0,United Kingdom
536365,84029G,KNITTED UNION FLAG HOT WATER BOTTLE,6,1/12/2010 8:26,3.39,17850.0,United Kingdom
536365,84029E,RED WOOLLY HOTTIE WHITE HEART.,6,1/12/2010 8:26,3.39,17850.0,United Kingdom
536365,22752,SET 7 BABUSHKA NESTING BOXES,2,1/12/2010 8:26,7.65,17850.0,United Kingdom
536365,21730,GLASS STAR FROSTED T-LIGHT HOLDER,6,1/12/2010 8:26,4.25,17850.0,United Kingdom
536366,22633,HAND WARMER UNION JACK,6,1/12/2010 8:28,1.85,17850.0,United Kingdom
536366,22632,HAND WARMER RED POLKA DOT,6,1/12/2010 8:28,1.85,17850.0,United Kingdom
536367,84879,ASSORTED COLOUR BIRD ORNAMENT,32,1/12/2010 8:34,1.69,13047.0,United Kingdom


In [0]:
# Row count of the merged Delta Table dataframe (triggers a Spark job).
merged_row_count = ONLINE_RETAIL_MASTER_SILVER_DELTA_TABLE.count()
merged_row_count

In [0]:
# Column count of the merged Delta Table dataframe.
merged_column_names = ONLINE_RETAIL_MASTER_SILVER_DELTA_TABLE.columns
len(merged_column_names)