# Load Python Packages

In [0]:
# Databricks notebook source
import pyspark.sql.functions as F
from  pyspark.sql.functions import col, struct, to_json, unix_timestamp, lit
from pyspark.sql.types import StructField, StructType, StringType, MapType, IntegerType, DoubleType

# Connect to Kafka Cluster to get Messages from Topic

In [0]:
# Confluent Cloud credentials are read from a Databricks secret scope rather than
# being embedded in the notebook source. Before running this cell, create a secret
# scope named "kafka" holding the keys "confluent-api-key" and "confluent-api-secret".
# SECURITY: the API key/secret that used to be hardcoded here has been exposed in
# source control and must be revoked and rotated in Confluent Cloud.
kafka_api_key = dbutils.secrets.get(scope="kafka", key="confluent-api-key")
kafka_api_secret = dbutils.secrets.get(scope="kafka", key="confluent-api-secret")

# JAAS configuration for SASL/PLAIN authentication, using the shaded Kafka client
# class name that the original cell referenced.
kafka_jaas_config = (
    "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required "
    f'username="{kafka_api_key}" password="{kafka_api_secret}";'
)

# Streaming DataFrame over the Kafka topic; "latest" means only messages produced
# after the query starts are read.
df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "pkc-4j8dq.southeastasia.azure.confluent.cloud:9092")
    .option("subscribe", "weather_forecast_topic")
    .option("startingOffsets", "latest")
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.sasl.jaas.config", kafka_jaas_config)
    .load()
)

In [0]:
# display(df)

In [0]:
# Schema used to parse the JSON payload carried in each Kafka message value.
# The two integer fields come first, followed by the double-valued measurements;
# every field is declared nullable.
json_schema = StructType(
    [StructField(name, IntegerType(), True) for name in ("key_id", "cloud_cover")]
    + [
        StructField(name, DoubleType(), True)
        for name in (
            "wind_speed",
            "wind_gust",
            "humidity",
            "pressure",
            "global_radiation",
            "precipitation",
            "sunshine",
            "temp_mean",
            "temp_min",
            "temp_max",
        )
    ]
)

# Create New Delta Table and Remove Unused Tables

In [0]:
# # List all folders within /FileStore/tables/
# folders = [item.name for item in dbutils.fs.ls("/FileStore/tables/") if item.isDir()]

# # Print the list of folders
# print(folders)


In [0]:
# Start from a clean slate: recursively delete the Delta table's storage directory.
# The second positional argument (True) enables recursive deletion.
delta_table_dir = "/FileStore/tables/weather_forecast_delta_table/"
dbutils.fs.rm(delta_table_dir, True)
# Other paths can be removed the same way, e.g.:
# dbutils.fs.rm("/FileStore/tables/oslo.csv", True)

Out[6]: True

In [0]:
%sql
-- Register the Delta table that the streaming job appends to.
-- IF NOT EXISTS keeps this cell re-runnable: without it, a second run of the
-- notebook fails because the table name is already registered in the metastore.
-- Column names and types mirror the columns of the streaming DataFrame written to this location.
CREATE TABLE IF NOT EXISTS weather_forecast_delta_table (
  key_id INT,
  time_stamp INT,
  BBQ_weather INT,
  cloud_cover INT,
  wind_speed DOUBLE,
  wind_gust DOUBLE,
  humidity DOUBLE,
  pressure DOUBLE,
  global_radiation DOUBLE,
  precipitation DOUBLE,
  sunshine DOUBLE,
  temp_mean DOUBLE,
  temp_min DOUBLE,
  temp_max DOUBLE
)
USING DELTA
LOCATION '/FileStore/tables/weather_forecast_delta_table/'

# Save Dataframe into Delta Table

In [0]:
# Function to write each batch to Delta table
def write_to_delta_table(batch_df, batch_id):
    """Append one micro-batch to the weather forecast Delta table directory.

    Used as the ``foreachBatch`` callback of the streaming write.

    Args:
        batch_df: DataFrame holding the rows of the current micro-batch.
        batch_id: Identifier of the micro-batch; accepted to satisfy the
            callback signature but not used.

    Returns:
        None.
    """
    target_path = "dbfs:/FileStore/tables/weather_forecast_delta_table/"
    delta_writer = batch_df.write.format("delta").mode("append")
    delta_writer.save(target_path)

In [0]:
# Build the frame that is written to the Delta table:
#   * time_stamp  - Kafka record timestamp converted to epoch seconds, cast to integer
#                   to match the INT column of the target table.
#   * BBQ_weather - constant -1 for every row.
#   * remaining   - fields extracted from the JSON payload in the Kafka `value` column.
# NOTE(review): the Kafka source's `timestamp` column is presumably already a timestamp
# type, in which case the pattern argument below has no effect - confirm and simplify.
val_df = (
    df
    .withColumn(
        "time_stamp",
        F.unix_timestamp(F.col("timestamp"), "yyyy-MM-dd'T'HH:mm:ssZ").cast("integer"),
    )
    .withColumn("BBQ_weather", F.lit(-1))
    .withColumn("value", F.from_json(F.col("value").cast("string"), json_schema))
    .select(
        F.col("value.key_id"),
        "time_stamp",
        "BBQ_weather",
        F.col("value.cloud_cover"),
        F.col("value.wind_speed"),
        F.col("value.wind_gust"),
        F.col("value.humidity"),
        F.col("value.pressure"),
        F.col("value.global_radiation"),
        F.col("value.precipitation"),
        F.col("value.sunshine"),
        F.col("value.temp_mean"),
        F.col("value.temp_min"),
        F.col("value.temp_max"),
    )
)
display(val_df)

# Write DataFrame using foreachBatch.
# A checkpoint location gives the query durable progress tracking so it can recover
# after a failure instead of relying on a throw-away temporary checkpoint. It lives
# under the table directory (underscore-prefixed so it is not treated as table data),
# which means the notebook's cleanup cell that deletes that directory also resets it.
# The StreamingQuery handle is kept so the stream can be monitored or stopped later
# (e.g. delta_write_query.stop()).
delta_write_query = (
    val_df.writeStream
    .foreachBatch(write_to_delta_table)
    .option(
        "checkpointLocation",
        "dbfs:/FileStore/tables/weather_forecast_delta_table/_checkpoints/kafka_to_delta",
    )
    .start()
)

key_id,time_stamp,BBQ_weather,cloud_cover,wind_speed,wind_gust,humidity,pressure,global_radiation,precipitation,sunshine,temp_mean,temp_min,temp_max
94602,1714277642,-1,4,6.354334339389647,20.19716624793635,0.2833427120919592,1.0369272214916112,0.7189952564445578,5.357868179702652,1.409730401715974,19.879597030185717,2.6510822383737693,34.04260839559076
94603,1714277645,-1,3,7.625449640520935,16.048118964835258,0.5392202578283631,1.0173187701059192,2.3761611017316158,4.514262345091068,12.159595456269557,16.983588302403113,-2.0847366074574234,27.425118819296152
94604,1714277651,-1,4,7.504974422330644,3.295188466982271,0.4215644828801029,1.039074721403685,0.5018122133174047,4.550774991693873,21.4768258181674,16.13140005607749,-0.3590568026075438,34.37575210143853
94605,1714277668,-1,4,2.464976982649582,20.006129799990024,0.4151845212069111,0.9981785344486682,0.9075902539876908,0.5038429498653959,16.92400555302241,11.400605959587928,3.3122054897918307,25.86734320686496
94606,1714277673,-1,2,7.593816085942606,13.98283288906952,0.8585351702108338,1.0462164254284347,2.152056906719451,3.657391709636155,4.147151867893573,15.14542401503703,2.5678068340081968,27.083225776065007
94607,1714277686,-1,2,5.173197916962663,7.08451339627265,0.4820286526041498,0.9677272191765832,0.1065351565697645,4.259862816683537,21.06538830195858,11.178000794800582,-0.5044363442601147,25.682759004522016
94608,1714277697,-1,1,9.993249628937216,4.235476996463609,0.3538399337929329,0.97143769296632,0.9749466685484556,2.347631333680621,3.49913689110143,13.255591894906525,2.562351606485385,33.32650342148484
94609,1714277703,-1,1,1.914304791594108,12.792704007629917,0.283432089070244,0.9731635463861676,2.7250101742027804,4.532285473143117,6.34520764143122,11.078312168512152,-2.3438151342584064,32.825654355725824
94610,1714277715,-1,5,10.806883610596836,5.631170361696288,0.7768859095607864,1.0152524440672075,3.011726944553172,4.089578331696991,19.20603596950078,15.077626694615732,-5.038192316582684,31.744937667182725
94611,1714277720,-1,0,5.510087970096984,22.336048620860957,0.3819713303517215,1.0052770580474744,0.8227350775264937,3.4511206080079444,11.00054786293709,16.197659491146492,3.9541590755288336,28.778435501332297


In [0]:
val_df.printSchema()

root
 |-- key_id: integer (nullable = true)
 |-- time_stamp: integer (nullable = true)
 |-- BBQ_weather: integer (nullable = false)
 |-- cloud_cover: integer (nullable = true)
 |-- wind_speed: double (nullable = true)
 |-- wind_gust: double (nullable = true)
 |-- humidity: double (nullable = true)
 |-- pressure: double (nullable = true)
 |-- global_radiation: double (nullable = true)
 |-- precipitation: double (nullable = true)
 |-- sunshine: double (nullable = true)
 |-- temp_mean: double (nullable = true)
 |-- temp_min: double (nullable = true)
 |-- temp_max: double (nullable = true)

