Import the PySpark, Delta Lake, and Databricks utility libraries used in this notebook.

In [0]:
from pyspark.sql.functions import *
from pyspark.sql import functions as F
from pyspark.sql.types import *


from pyspark.sql.window import Window
from delta.tables import DeltaTable
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, LongType, TimestampType
from pyspark.dbutils import DBUtils

In [0]:
# Databricks notebook source


# COMMAND ----------

# Target of this notebook: the silver table <catalog_name>.<db_name>.<table_name>.
# The bronze source table shares the same catalog and table name.
catalog_name = "streaming1"
db_name = "silver"
table_name = "shopify_orders"

# Dropdown widget (default "False"); the flag is True only when "True" is selected.
# NOTE(review): intended to switch the stream write to an availableNow trigger — confirm the
# write cell actually reads `trigger_available_now`.
dbutils.widgets.dropdown("trigger_available_now", "False", ["True", "False"])
trigger_available_now = dbutils.widgets.get("trigger_available_now") == "True"

# Checkpoint folder name = this notebook's file name (last path segment, text before the first ".").
# Renaming the notebook therefore points the stream at a new, empty checkpoint directory.
_notebook_context = DBUtils(spark).notebook.entry_point.getDbutils().notebook().getContext()
_notebook_path = _notebook_context.notebookPath().get()
notebook_name = _notebook_path.rsplit("/", 1)[-1].partition(".")[0]

checkpoint_path = f"/Volumes/{catalog_name}/{db_name}/checkpoints/{notebook_name}/"
print(checkpoint_path)



/Volumes/streaming1/silver/checkpoints/(silver FV) Real-time Data Processing with Azure Databricks (and Event Hubs)-2/


In [0]:

# Ensure the silver schema and its `checkpoints` volume exist before the stream starts.
# Both statements use IF NOT EXISTS, so an already-existing object is NOT an error; an
# exception here means something else went wrong (e.g. missing CREATE privileges).
# This stays best-effort — the objects may already exist even if we may not create them —
# but the real error is now reported instead of being swallowed by a bare `except:`.
for _ddl, _description in [
    (f"create schema if not exists {catalog_name}.{db_name} ;",
     f"schema {catalog_name}.{db_name}"),
    (f"CREATE VOLUME IF NOT EXISTS {catalog_name}.{db_name}.checkpoints;",
     f"volume {catalog_name}.{db_name}.checkpoints"),
]:
    try:
        spark.sql(_ddl)
    except Exception as err:  # not bare: let KeyboardInterrupt/SystemExit propagate
        print(f"Could not create {_description} (continuing; later cells need it to exist): {err}")




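#### Silver Layer

Parse the raw JSON order payloads from the bronze table and flatten them into one row per order line item.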

In [0]:
# Schema of the JSON payload stored in the bronze `body` column (Shopify order events).
# Every leaf field is parsed as a nullable string.
def _nullable_string_fields(*names):
    """Return one nullable ``StringType`` ``StructField`` per name, preserving order."""
    return [StructField(name, StringType(), True) for name in names]


# Top-level scalar order attributes, in payload order.
_order_fields = _nullable_string_fields(
    "id", "cancel_reason", "cancelled_at", "checkout_id", "created_at",
    "customer_locale", "financial_status", "presentment_currency",
    "processed_at", "subtotal_price",
)

# Nested struct types: the billing address and a single line-item element.
_billing_address_type = StructType(_nullable_string_fields("province", "country"))
_line_item_type = StructType(
    _nullable_string_fields("product_id", "fulfillable_quantity", "price")
)

json_schema = StructType(
    _order_fields
    + [
        StructField("billing_address", _billing_address_type, True),
        # ArrayType defaults to containsNull=True.
        StructField("line_items", ArrayType(_line_item_type), True),
    ]
)



In [0]:
# Read & transform: incrementally stream new rows from the bronze Delta table
# `{catalog_name}.bronze.{table_name}`, cast the raw `body` column to a string, parse it with
# `json_schema`, and flatten it to one row per line item. Note that `explode` emits no rows for
# an order whose `line_items` array is null or empty.
df = (
    spark.readStream
    .format("delta")
    .table(f"{catalog_name}.bronze.{table_name}")
    .withColumn("body", col("body").cast("string"))
    .withColumn("body", from_json(col("body"), json_schema))
    .select(
        "body.id", "body.cancel_reason", "body.cancelled_at", "body.checkout_id",
        "body.created_at", "body.customer_locale", "body.financial_status",
        "body.presentment_currency", "body.processed_at", "body.subtotal_price",
        "body.billing_address.province", "body.billing_address.country",
        explode("body.line_items").alias("Input_array"),
    )
    .select(
        "id", "cancel_reason", "cancelled_at", "checkout_id", "created_at",
        "customer_locale", "financial_status", "presentment_currency",
        "processed_at", "subtotal_price", "province", "country",
        F.col("Input_array.product_id").alias("product_id"),
        F.col("Input_array.fulfillable_quantity").alias("fulfillable_quantity"),
        F.col("Input_array.price").alias("price"),
    )
)

# Preview: display() on a streaming DataFrame starts its own streaming query, which would keep
# an available-now (batch/job) run from ever finishing, so only preview in continuous mode.
if not trigger_available_now:
    df.display()

# Write: append the flattened rows to `{catalog_name}.{db_name}.{table_name}`, tracking progress
# in `checkpoint_path` so a restart resumes where it left off. When the `trigger_available_now`
# widget is "True", process everything currently available and stop, blocking until done so
# failures surface in this cell; otherwise the query keeps running continuously.
silver_writer = (
    df.writeStream
    .option("checkpointLocation", checkpoint_path)
    .outputMode("append")
    .format("delta")
)
if trigger_available_now:
    silver_writer = silver_writer.trigger(availableNow=True)

silver_query = silver_writer.toTable(f"{catalog_name}.{db_name}.{table_name}")

if trigger_available_now:
    silver_query.awaitTermination()

id,cancel_reason,cancelled_at,checkout_id,created_at,customer_locale,financial_status,presentment_currency,processed_at,subtotal_price,province,country,product_id,fulfillable_quantity,price
6145856831786,,,37682609324330,2024-09-08T21:41:38-04:00,en-CA,paid,CAD,2024-09-08T21:41:37-04:00,240.0,Quebec,Canada,9433793495338,1,70.0
6145856831786,,,37682609324330,2024-09-08T21:41:38-04:00,en-CA,paid,CAD,2024-09-08T21:41:37-04:00,240.0,Quebec,Canada,9433768526122,1,170.0
6145862598954,,,37682624921898,2024-09-08T21:50:07-04:00,en-CA,paid,CAD,2024-09-08T21:50:06-04:00,360.0,Quebec,Canada,9433791234346,1,140.0
6145862598954,,,37682624921898,2024-09-08T21:50:07-04:00,en-CA,paid,CAD,2024-09-08T21:50:06-04:00,360.0,Quebec,Canada,9433762726186,1,220.0
6145863745834,,,37682627805482,2024-09-08T21:51:47-04:00,en-CA,paid,CAD,2024-09-08T21:51:46-04:00,339.9,Quebec,Canada,9433758531882,1,179.95
6145863745834,,,37682627805482,2024-09-08T21:51:47-04:00,en-CA,paid,CAD,2024-09-08T21:51:46-04:00,339.9,Quebec,Canada,9433760694570,1,159.95
6145554776362,,,37682049941802,2024-09-08T15:49:17-04:00,en-CA,paid,CAD,2024-09-08T15:49:16-04:00,360.0,Quebec,Canada,9433762726186,1,220.0
6145554776362,,,37682049941802,2024-09-08T15:49:17-04:00,en-CA,paid,CAD,2024-09-08T15:49:16-04:00,360.0,Quebec,Canada,9433791234346,1,140.0
6145597210922,,,37682120786218,2024-09-08T16:24:53-04:00,en-CA,paid,CAD,2024-09-08T16:24:52-04:00,170.0,Quebec,Canada,9433768526122,1,170.0
