Importing the libraries.

In [0]:
from pyspark.sql.functions import *
from pyspark.sql import functions as F
from pyspark.sql.types import *


from pyspark.sql.window import Window
from delta.tables import DeltaTable
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, LongType, TimestampType
from pyspark.dbutils import DBUtils

In [0]:
# Databricks notebook source


# COMMAND ----------

# Unity Catalog coordinates of the table this notebook writes to.
catalog_name = "streaming1"
db_name = "silver"
table_name = 'weather'

# Widget-driven flag: True only when the "trigger_available_now" dropdown is set to "True"
# (the dropdown defaults to "False").
dbutils.widgets.dropdown("trigger_available_now", "False", ["True", "False"])
trigger_available_now = dbutils.widgets.get("trigger_available_now") == "True"

# The checkpoint folder is named after this notebook: take the last segment of the
# notebook path and cut it at the first "." (if any).
notebook_path = DBUtils(spark).notebook.entry_point.getDbutils().notebook().getContext().notebookPath().get()
notebook_name = notebook_path.rsplit("/", 1)[-1].partition(".")[0]
checkpoint_path = f"/Volumes/{catalog_name}/{db_name}/checkpoints/{notebook_name}/"
print(checkpoint_path)



/Volumes/streaming1/silver/checkpoints/(weather) (silver FV) Real-time Data Processing weather/


In [0]:

# Make sure the target schema and the checkpoint volume exist before the stream starts.
# Both statements use IF NOT EXISTS, so an "already exists" situation does not raise;
# anything caught below is therefore a genuine failure and is reported with its real cause.
# The step stays best-effort (report and continue), but only `Exception` is caught so that
# KeyboardInterrupt / SystemExit are no longer swallowed.
for object_label, ddl_statement in (
    ("schema", f"create schema if not exists {catalog_name}.{db_name} ;"),
    ("checkpoints volume", f"CREATE VOLUME IF NOT EXISTS {catalog_name}.{db_name}.checkpoints;"),
):
    try:
        spark.sql(ddl_statement)
    except Exception as exc:
        print(f"Could not create {db_name} {object_label}: {exc}")




#### Silver Layer

In [0]:
# Schema handed to from_json when parsing the message payload:
# three fields, each a nullable string.
json_schema = StructType(
    [
        StructField(field_name, StringType(), True)
        for field_name in ("temperature", "time", "skycondition")
    ]
)

In [0]:
# Read and transform: stream the bronze Delta table `{catalog_name}.bronze.{table_name}`,
# cast the `body` column to a string, parse it as JSON with `json_schema`, and keep the
# three parsed fields plus `enqueuedTime` (renamed to `timestamp`).
# NOTE(review): the recorded sample output shows temperatures with both "°" and "░";
# this looks like an upstream encoding inconsistency -- confirm with the producer before
# relying on this column.
df = (
    spark.readStream
    .format("delta")
    .table(f"{catalog_name}.bronze.{table_name}")
    .withColumn("body", F.col("body").cast("string"))
    .withColumn("body", F.from_json(F.col("body"), json_schema))
    .select(
        "body.temperature",
        "body.time",
        "body.skycondition",
        F.col("enqueuedTime").alias("timestamp"),
    )
)

# Display the stream for interactive verification only. display() on a streaming DataFrame
# starts its own streaming query, which would keep an available-now run from finishing,
# so it is skipped in that mode.
if not trigger_available_now:
    df.display()

# Write: append the transformed rows to the Delta table
# `{catalog_name}.{db_name}.{table_name}`, checkpointing to `checkpoint_path`.
silver_writer = (
    df.writeStream
    .option("checkpointLocation", checkpoint_path)
    .outputMode("append")
    .format("delta")
)
silver_table = f"{catalog_name}.{db_name}.{table_name}"

if trigger_available_now:
    # Honour the widget: process everything currently available, then stop, and block
    # until that run has finished so the cell reflects the completed load.
    silver_query = silver_writer.trigger(availableNow=True).toTable(silver_table)
    silver_query.awaitTermination()
else:
    # Default: continuously running stream, same as before.
    silver_query = silver_writer.toTable(silver_table)

temperature,time,skycondition,timestamp
22░C,Monday 8:43 p.m.,Clear,2024-09-10T00:44:23.937Z
22°C,Monday 8:44 p.m.,Clear,2024-09-10T00:45:34.033Z
22░C,Monday 8:45 p.m.,Clear,2024-09-10T00:46:44.175Z
22░C,Monday 8:46 p.m.,Clear,2024-09-10T00:47:55.287Z
22░C,Monday 8:48 p.m.,Clear,2024-09-10T00:49:05.805Z
22░C,Monday 8:42 p.m.,Clear,2024-09-10T00:43:14.138Z
22°C,Monday 8:49 p.m.,Clear,2024-09-10T00:50:15.934Z
22°C,Monday 8:50 p.m.,Clear,2024-09-10T00:51:26.39Z
