Create Master Delta Table

In [0]:
# Mount the Azure Blob Storage container under /mnt/easonblobstorage.
# dbutils.fs.mount raises when the mount point is already in use, so the mount
# is only attempted when it is not present yet; this keeps the cell re-runnable.
blob_mount_point = "/mnt/easonblobstorage"
blob_already_mounted = any(
    mount.mountPoint == blob_mount_point for mount in dbutils.fs.mounts()
)

if not blob_already_mounted:
    dbutils.fs.mount(
        source = "wasbs://data@easonblobstorage.blob.core.windows.net",
        mount_point = blob_mount_point,
        # The storage account key is read from a secret scope rather than hardcoded.
        extra_configs = {"fs.azure.account.key.easonblobstorage.blob.core.windows.net":dbutils.secrets.get(scope = "eason_scope_1", key = "easonblobstorage01_key")})

In [0]:
# Unmount the Azure Blob Storage as needed.
# dbutils.fs.unmount("/mnt/easonblobstorage")

In [0]:
# List the base folder of the telco customer churn data on the mounted storage.
base_folder_listing = dbutils.fs.ls("/mnt/easonblobstorage/telco_customer_churn")
display(base_folder_listing)

path,name,size,modificationTime
dbfs:/mnt/easonblobstorage/telco_customer_churn/bronze_data/,bronze_data/,0,0
dbfs:/mnt/easonblobstorage/telco_customer_churn/silver_data/,silver_data/,0,0


In [0]:
# Read the 100-record bronze CSV extract as the Master Table Spark dataframe.
# The first line of the file is treated as the header row.
# NOTE(review): no schema or inferSchema option is passed, so column types come
# from the CSV reader defaults - confirm that untyped columns are intended.
master_path = "/mnt/easonblobstorage/telco_customer_churn/bronze_data/WA_Fn-UseC_-Telco-Customer-Churn_R2_100.csv"
TELCO_CUSTOMER_CHURN_MASTER = spark.read.option("header", True).csv(master_path)

In [0]:
# Row count of the Master Table (shown as the cell result).
master_row_count = TELCO_CUSTOMER_CHURN_MASTER.count()
master_row_count

In [0]:
# Column count of the Master Table (shown as the cell result).
master_column_names = TELCO_CUSTOMER_CHURN_MASTER.columns
len(master_column_names)

In [0]:
# Display the Master Table.
# Renders the dataframe read from master_path so its columns and values can be
# checked before it is written out as a Delta Table.
display(TELCO_CUSTOMER_CHURN_MASTER)

customerID,gender,SeniorCitizen,Partner,Dependents,Tenure,PhoneService,MultipleLines,InternetService,OnlineSecurity,OnlineBackup,DeviceProtection,TechSupport,StreamingTV,StreamingMovies,Contract,PaperlessBilling,PaymentMethod,MonthlyCharges,TotalCharges,Churn
7590-VHVEG,0,0,1,0,1,0,0,1,0,1,0,0,0,0,0,1,2,29.85,29.85,0
5575-GNVDE,1,0,0,0,34,1,0,1,1,0,1,0,0,0,1,0,3,56.95,1889.5,0
3668-QPYBK,1,0,0,0,2,1,0,1,1,1,0,0,0,0,0,1,3,53.85,108.15,1
7795-CFOCW,1,0,0,0,45,0,0,1,1,0,1,1,0,0,1,0,0,42.3,1840.75,0
9237-HQITU,0,0,0,0,2,1,0,2,0,0,0,0,0,0,0,1,2,70.7,151.65,1
9305-CDSKC,0,0,0,0,8,1,1,2,0,0,1,0,1,1,0,1,2,99.65,820.5,1
1452-KIOVK,1,0,0,1,22,1,1,2,0,1,0,0,1,0,0,1,1,89.1,1949.4,0
6713-OKOMC,0,0,0,0,10,0,0,1,1,0,0,0,0,0,0,0,3,29.75,301.9,0
7892-POOKP,0,0,1,0,28,1,1,2,0,0,1,1,1,1,0,1,2,104.8,3046.05,1
6388-TABGU,1,0,0,1,62,1,0,1,1,1,0,0,0,0,1,0,0,56.15,3487.95,0


In [0]:
# Create a new sub-folder in silver_data folder for Delta Table saving.
# dbutils.fs.mkdirs("/mnt/easonblobstorage/telco_customer_churn/silver_data/telco_customer_churn_silver")

In [0]:
# Save the Master Table dataframe as a Delta Table and register it as a SQL table.
silver_delta_path = "/mnt/easonblobstorage/telco_customer_churn/silver_data/telco_customer_churn_silver/"

# mode("ignore") makes the initial load a no-op when a Delta Table already exists
# at this location. The default mode raises instead, which broke re-runs of this
# cell, and overwriting would discard rows merged in by later daily updates.
TELCO_CUSTOMER_CHURN_MASTER.write.format("delta").mode("ignore").save(silver_delta_path)

# Create a SQL Table from Delta. IF NOT EXISTS keeps the registration re-runnable.
spark.sql(f"CREATE TABLE IF NOT EXISTS TELCO_CUSTOMER_CHURN_SILVER_DELTA USING DELTA LOCATION '{silver_delta_path}'")

In [0]:
# Show the files and folders that make up the Delta Table on storage.
delta_folder_listing = dbutils.fs.ls('/mnt/easonblobstorage/telco_customer_churn/silver_data/telco_customer_churn_silver/')
display(delta_folder_listing)

path,name,size,modificationTime
dbfs:/mnt/easonblobstorage/telco_customer_churn/silver_data/telco_customer_churn_silver/_delta_log/,_delta_log/,0,1649933238000
dbfs:/mnt/easonblobstorage/telco_customer_churn/silver_data/telco_customer_churn_silver/part-00000-55a84a02-fde4-4c29-aea6-aadc6c142ff0-c000.snappy.parquet,part-00000-55a84a02-fde4-4c29-aea6-aadc6c142ff0-c000.snappy.parquet,9173,1649933231000


In [0]:
# Sanity check: run a simple SQL row count against the Delta-backed SQL table.
row_count_query = "SELECT COUNT(*) FROM TELCO_CUSTOMER_CHURN_SILVER_DELTA"
row_count_result = spark.sql(row_count_query)
display(row_count_result)

count(1)
100


In [0]:
# Drop Delta based SQL Table as needed.
# spark.sql("DROP TABLE TELCO_CUSTOMER_CHURN_SILVER_DELTA")

In [0]:
# Remove all Delta Table folders and files as needed.
# dbutils.fs.rm('/mnt/easonblobstorage/telco_customer_churn/silver_data/telco_customer_churn_silver/', True)

In [0]:
# Read the Delta Table back from storage as a Spark dataframe and render it.
TELCO_CUSTOMER_CHURN_SILVER_DELTA_LOAD = spark.read.load(
    "/mnt/easonblobstorage/telco_customer_churn/silver_data/telco_customer_churn_silver/",
    format="delta",
)
display(TELCO_CUSTOMER_CHURN_SILVER_DELTA_LOAD)

customerID,gender,SeniorCitizen,Partner,Dependents,Tenure,PhoneService,MultipleLines,InternetService,OnlineSecurity,OnlineBackup,DeviceProtection,TechSupport,StreamingTV,StreamingMovies,Contract,PaperlessBilling,PaymentMethod,MonthlyCharges,TotalCharges,Churn
7590-VHVEG,0,0,1,0,1,0,0,1,0,1,0,0,0,0,0,1,2,29.85,29.85,0
5575-GNVDE,1,0,0,0,34,1,0,1,1,0,1,0,0,0,1,0,3,56.95,1889.5,0
3668-QPYBK,1,0,0,0,2,1,0,1,1,1,0,0,0,0,0,1,3,53.85,108.15,1
7795-CFOCW,1,0,0,0,45,0,0,1,1,0,1,1,0,0,1,0,0,42.3,1840.75,0
9237-HQITU,0,0,0,0,2,1,0,2,0,0,0,0,0,0,0,1,2,70.7,151.65,1
9305-CDSKC,0,0,0,0,8,1,1,2,0,0,1,0,1,1,0,1,2,99.65,820.5,1
1452-KIOVK,1,0,0,1,22,1,1,2,0,1,0,0,1,0,0,1,1,89.1,1949.4,0
6713-OKOMC,0,0,0,0,10,0,0,1,1,0,0,0,0,0,0,0,3,29.75,301.9,0
7892-POOKP,0,0,1,0,28,1,1,2,0,0,1,1,1,1,0,1,2,104.8,3046.05,1
6388-TABGU,1,0,0,1,62,1,0,1,1,1,0,0,0,0,1,0,0,56.15,3487.95,0


In [0]:
# Row count of the dataframe loaded from the Delta Table (shown as the cell result).
delta_row_count = TELCO_CUSTOMER_CHURN_SILVER_DELTA_LOAD.count()
delta_row_count

In [0]:
# Column count of the dataframe loaded from the Delta Table (shown as the cell result).
delta_column_names = TELCO_CUSTOMER_CHURN_SILVER_DELTA_LOAD.columns
len(delta_column_names)

Obtain Daily Update Dataset and Perform Delta Table Merge

In [0]:
# Import the datetime library for date tracking.
from datetime import datetime

In [0]:
# Capture the current date and render it as DDMMYYYY; this string is used as
# the name of the daily sub-folder in bronze_data.
# NOTE(review): datetime.now() is timezone-naive - confirm the cluster clock
# matches the timezone used when the daily folders are named.
now = datetime.now()
date_string = f"{now:%d%m%Y}"
print(date_string)

In [0]:
# Show what is currently in bronze_data (master CSV plus any daily sub-folders).
bronze_folder_listing = dbutils.fs.ls("/mnt/easonblobstorage/telco_customer_churn/bronze_data/")
display(bronze_folder_listing)

path,name,size,modificationTime
dbfs:/mnt/easonblobstorage/telco_customer_churn/bronze_data/14042022/,14042022/,0,0
dbfs:/mnt/easonblobstorage/telco_customer_churn/bronze_data/WA_Fn-UseC_-Telco-Customer-Churn_R2_100.csv,WA_Fn-UseC_-Telco-Customer-Churn_R2_100.csv,6395,1649932386000


In [0]:
# Build the glob for today's daily folder (named by date_string) and read every
# CSV in it, with a header row, into the Daily Spark dataframe.
daily_data_path = f"/mnt/easonblobstorage/telco_customer_churn/bronze_data/{date_string}/*.csv"
TELCO_CUSTOMER_CHURN_DAILY = spark.read.option("header", True).csv(daily_data_path)

In [0]:
# Row count of the Daily Spark dataframe (shown as the cell result).
daily_row_count = TELCO_CUSTOMER_CHURN_DAILY.count()
daily_row_count

In [0]:
# Column count of the Daily Spark dataframe (shown as the cell result).
daily_column_names = TELCO_CUSTOMER_CHURN_DAILY.columns
len(daily_column_names)

In [0]:
# Display the Daily Spark dataframe.
# Renders the rows read from daily_data_path so they can be checked before
# they are merged into the Delta Table.
display(TELCO_CUSTOMER_CHURN_DAILY)

customerID,gender,SeniorCitizen,Partner,Dependents,Tenure,PhoneService,MultipleLines,InternetService,OnlineSecurity,OnlineBackup,DeviceProtection,TechSupport,StreamingTV,StreamingMovies,Contract,PaperlessBilling,PaymentMethod,MonthlyCharges,TotalCharges,Churn
6380-ARCEH,1,0,0,0,1,1,0,0,2,2,2,2,2,2,0,0,3,20.2,20.2,0
3679-XASPY,0,0,1,1,1,1,0,0,2,2,2,2,2,2,0,0,2,19.45,19.45,0
7123-WQUHX,1,0,0,0,38,1,1,2,0,0,1,1,1,0,1,0,0,95.0,3605.6,0
5386-THSLQ,0,1,1,0,66,0,0,1,0,1,1,0,1,0,1,0,0,45.55,3027.25,0
3192-NQECA,1,0,1,0,68,1,1,2,0,1,1,1,1,1,2,1,0,110.0,7611.85,1
6180-YBIQI,1,0,0,0,5,0,0,1,0,0,0,0,0,0,0,0,3,24.3,100.2,0
6728-DKUCO,0,0,1,1,72,1,1,2,1,1,0,0,1,1,1,1,2,104.15,7303.05,0
9750-BOOHV,0,0,0,0,32,0,0,1,1,0,0,0,0,0,1,0,3,30.15,927.65,0
8597-CWYHH,1,0,0,0,43,1,1,2,0,0,0,0,1,1,1,0,3,94.35,3921.3,0
2848-YXSMW,1,0,1,1,72,1,0,0,2,2,2,2,2,2,2,0,1,19.4,1363.25,0


In [0]:
# Import necessary library for Delta Table operations.
from delta.tables import *

In [0]:
# Open the Delta Table at its storage path as a DeltaTable handle for the merge.
# Note: this rebinds TELCO_CUSTOMER_CHURN_SILVER_DELTA_LOAD, which an earlier
# cell assigned a Spark dataframe, to a DeltaTable object.
TELCO_CUSTOMER_CHURN_SILVER_DELTA_LOAD = DeltaTable.forPath(
    spark,
    "/mnt/easonblobstorage/telco_customer_churn/silver_data/telco_customer_churn_silver/",
)

In [0]:
# Perform Delta Table Merge (upsert) with Delta Table and Daily Spark dataframe.
# Rows are matched on customerID: matched rows have their attribute columns
# overwritten from the daily data; unmatched daily rows are inserted as new rows.
merge_key = "customerID"
attribute_columns = [
    "gender",
    "SeniorCitizen",
    "Partner",
    "Dependents",
    "Tenure",
    "PhoneService",
    "MultipleLines",
    "InternetService",
    "OnlineSecurity",
    "OnlineBackup",
    "DeviceProtection",
    "TechSupport",
    "StreamingTV",
    "StreamingMovies",
    "Contract",
    "PaperlessBilling",
    "PaymentMethod",
    "MonthlyCharges",
    "TotalCharges",
    "Churn",
]

# Target column -> source expression, built once so the update and insert
# clauses cannot drift apart.
update_assignments = {
    column: f"TELCO_CUSTOMER_CHURN_DAILY.{column}" for column in attribute_columns
}

# The insert must also carry the merge key. Previously customerID was left out
# of the insert values, so new rows were written without a customerID and could
# never be matched by a later merge.
insert_values = {
    merge_key: f"TELCO_CUSTOMER_CHURN_DAILY.{merge_key}",
    **update_assignments,
}

(
    TELCO_CUSTOMER_CHURN_SILVER_DELTA_LOAD.alias("TELCO_CUSTOMER_CHURN_SILVER_DELTA_LOAD")
    .merge(
        TELCO_CUSTOMER_CHURN_DAILY.alias("TELCO_CUSTOMER_CHURN_DAILY"),
        "TELCO_CUSTOMER_CHURN_SILVER_DELTA_LOAD.customerID = TELCO_CUSTOMER_CHURN_DAILY.customerID",
    )
    .whenMatchedUpdate(set=update_assignments)
    .whenNotMatchedInsert(values=insert_values)
    .execute()
)

In [0]:
# Re-read the Delta Table after the merge as a Spark dataframe and render it.
# This rebinds TELCO_CUSTOMER_CHURN_SILVER_DELTA_LOAD from the DeltaTable handle
# back to a dataframe.
TELCO_CUSTOMER_CHURN_SILVER_DELTA_LOAD = spark.read.load(
    "/mnt/easonblobstorage/telco_customer_churn/silver_data/telco_customer_churn_silver/",
    format="delta",
)
display(TELCO_CUSTOMER_CHURN_SILVER_DELTA_LOAD)

customerID,gender,SeniorCitizen,Partner,Dependents,Tenure,PhoneService,MultipleLines,InternetService,OnlineSecurity,OnlineBackup,DeviceProtection,TechSupport,StreamingTV,StreamingMovies,Contract,PaperlessBilling,PaymentMethod,MonthlyCharges,TotalCharges,Churn
7590-VHVEG,0,0,1,0,1,0,0,1,0,1,0,0,0,0,0,1,2,29.85,29.85,0
5575-GNVDE,1,0,0,0,34,1,0,1,1,0,1,0,0,0,1,0,3,56.95,1889.5,0
3668-QPYBK,1,0,0,0,2,1,0,1,1,1,0,0,0,0,0,1,3,53.85,108.15,1
7795-CFOCW,1,0,0,0,45,0,0,1,1,0,1,1,0,0,1,0,0,42.3,1840.75,0
9237-HQITU,0,0,0,0,2,1,0,2,0,0,0,0,0,0,0,1,2,70.7,151.65,1
9305-CDSKC,0,0,0,0,8,1,1,2,0,0,1,0,1,1,0,1,2,99.65,820.5,1
1452-KIOVK,1,0,0,1,22,1,1,2,0,1,0,0,1,0,0,1,1,89.1,1949.4,0
6713-OKOMC,0,0,0,0,10,0,0,1,1,0,0,0,0,0,0,0,3,29.75,301.9,0
7892-POOKP,0,0,1,0,28,1,1,2,0,0,1,1,1,1,0,1,2,104.8,3046.05,1
6388-TABGU,1,0,0,1,62,1,0,1,1,1,0,0,0,0,1,0,0,56.15,3487.95,0


In [0]:
# Row count of the merged Delta Table dataframe (shown as the cell result).
merged_row_count = TELCO_CUSTOMER_CHURN_SILVER_DELTA_LOAD.count()
merged_row_count

In [0]:
# Count the number of columns in the Delta Table (Merged) dataframe.
# Uses the merged Delta dataframe; the previous code measured the Daily
# dataframe instead, which did not verify the merged table's schema.
len(TELCO_CUSTOMER_CHURN_SILVER_DELTA_LOAD.columns)