In [None]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Layout of each JSON weather message consumed from Kafka.
# Fields are appended in the order they are expected in the parsed struct;
# every field is nullable (the default for StructType.add). The two time
# fields are declared as plain strings, so no timestamp parsing happens here.
schema = (
    StructType()
    .add("city", StringType())
    .add("temp", DoubleType())
    .add("humidity", IntegerType())
    .add("wind_speed", DoubleType())
    .add("local_time", StringType())
    .add("last_updated", StringType())
)

StatementMeta(, a3363a14-13a2-4f43-b1b7-2a383c70be68, 20, Finished, Available, Finished)

In [None]:
# Kafka bootstrap address exposed through the Ngrok tunnel.
# Replace the "NGROK-ADDRESS" placeholder with the host:port shown in the
# Ngrok terminal (e.g. 0.tcp.eu.ngrok.io:12345) before running this cell.
ngrok_address = "NGROK-ADDRESS"

# Streaming Kafka source subscribed to weather_topic, starting from the
# latest offsets.
df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", ngrok_address)
    .option("subscribe", "weather_topic")
    .option("startingOffsets", "latest")
    .load()
)

# The Kafka `value` column arrives as raw bytes: cast it to a string, parse it
# as JSON using `schema`, then flatten the parsed struct into one column per
# schema field.
weather_df = (
    df
    .select(from_json(col("value").cast("string"), schema).alias("data"))
    .select("data.*")
)

StatementMeta(, a3363a14-13a2-4f43-b1b7-2a383c70be68, 21, Finished, Available, Finished)

In [None]:
# Stamp every record with Spark's current timestamp; this column (not a time
# taken from the message payload) drives the watermark and windowing below.
# NOTE(review): local_time / last_updated from the payload are not used as
# event time here - confirm that processing time is the intended basis.
weather_with_ts = weather_df.withColumn("timestamp", current_timestamp())

# Sliding window: each window spans 5 minutes and a new one starts every minute.
sliding_window = window("timestamp", "5 minutes", "1 minute")

# Average temperature per window, with a 10-minute watermark on the timestamp
# column. The window struct is flattened into its start/end bounds so the
# result has three plain columns: start, end, avg_temperature.
avg_temp_df = (
    weather_with_ts
    .withWatermark("timestamp", "10 minutes")
    .groupBy(sliding_window)
    .agg(avg(col("temp")).alias("avg_temperature"))
    .select("window.start", "window.end", "avg_temperature")
)

StatementMeta(, a3363a14-13a2-4f43-b1b7-2a383c70be68, 22, Finished, Available, Finished)

In [21]:
# Start streaming the windowed averages into the avg_temperature Delta table.
# Rows are written in append mode, and streaming progress is tracked under the
# checkpoint directory given below. toTable() launches the query and returns
# its handle, which is kept in `query`.
query = (
    avg_temp_df.writeStream
    .outputMode("append")
    .format("delta")
    .option("checkpointLocation", "Files/checkpoint/final_test")
    .toTable("avg_temperature")
)

StatementMeta(, a3363a14-13a2-4f43-b1b7-2a383c70be68, 23, Finished, Available, Finished)

In [None]:
# Register avg_temperature as a Delta table located at Tables/avg_temperature,
# unless a table with that name already exists (IF NOT EXISTS makes this safe
# to re-run).
spark.sql(
    "CREATE TABLE IF NOT EXISTS avg_temperature "
    "USING DELTA "
    "LOCATION 'Tables/avg_temperature'"
)

StatementMeta(, a3363a14-13a2-4f43-b1b7-2a383c70be68, 24, Finished, Available, Finished)

DataFrame[]

In [None]:
# Query every row currently in the avg_temperature table and render the result
# with the notebook's rich display.
avg_temperature_rows = spark.sql("SELECT * FROM avg_temperature")
display(avg_temperature_rows)

StatementMeta(, a3363a14-13a2-4f43-b1b7-2a383c70be68, 25, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 026906cb-4727-4cef-98d5-889c3f0a1181)