In [1]:
# Weather Data Streaming Analysis with Spark Structured Streaming
# This notebook consumes weather data from Kafka and calculates 5-minute average temperatures

# Import required libraries
import os

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from datetime import datetime

# Build the Spark session for this job (getOrCreate returns the already-active
# session when the notebook runtime has started one).
spark = (
    SparkSession.builder
    .appName("WeatherStreamingAnalysis")
    .getOrCreate()
)

# Checkpoint location for fault tolerance. Structured Streaming keeps the
# consumed Kafka offsets and the window aggregation state here, so keep the
# path stable across restarts of the query.
checkpoint_path = "Files/checkpoints/weather_streaming"


def normalize_bootstrap_servers(servers: str) -> str:
    """Return a Kafka ``bootstrap.servers`` value in ``host:port[,host:port...]`` form.

    ngrok reports TCP tunnels as URLs such as ``tcp://6.tcp.eu.ngrok.io:11349``,
    while Kafka documents ``bootstrap.servers`` as a comma-separated list of bare
    ``host:port`` pairs. Each entry is stripped of surrounding whitespace and of
    any ``scheme://`` prefix; empty entries are dropped.

    Args:
        servers: One or more broker addresses separated by commas.

    Returns:
        The normalized comma-separated ``host:port`` list (``""`` for empty input).
    """
    entries = (entry.strip() for entry in servers.split(","))
    return ",".join(entry.split("://", 1)[-1] for entry in entries if entry)


# Kafka configuration. Set the KAFKA_BROKER / KAFKA_TOPIC environment variables
# to override these defaults instead of editing the notebook each time the
# ngrok tunnel address changes.
KAFKA_BROKER = normalize_bootstrap_servers(
    os.environ.get("KAFKA_BROKER", "tcp://6.tcp.eu.ngrok.io:11349")  # Replace with your actual Ngrok address
)
KAFKA_TOPIC = os.environ.get("KAFKA_TOPIC", "weather_data")

# Schema used to decode the JSON payload in each Kafka message value.
# Every field is declared nullable (third StructField argument is True);
# the time-related fields are kept as raw strings and are not parsed here.
weather_schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("city", StringType()),
        ("temperature_c", DoubleType()),
        ("temperature_f", DoubleType()),
        ("humidity", DoubleType()),
        ("wind_speed_kph", DoubleType()),
        ("wind_speed_mph", DoubleType()),
        ("local_time", StringType()),
        ("last_updated", StringType()),
        ("timestamp", StringType()),
        ("condition", StringType()),
    )
])

# Read the raw Kafka records as an unbounded stream. "latest" only applies when
# the query starts without a checkpoint; afterwards it resumes from the offsets
# stored in the checkpoint.
raw_stream_df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", KAFKA_BROKER)
    .option("subscribe", KAFKA_TOPIC)
    .option("startingOffsets", "latest")
    .load()
)

# Decode the JSON value into typed columns, keep the Kafka record timestamp, and
# stamp every row with the processing time used for windowing.
# Rows without a city are dropped: payloads that do not parse against
# weather_schema come back from from_json as null / all-null fields, and they
# would otherwise be aggregated into a meaningless null-city group that inflates
# message_count.
parsed_stream_df = (
    raw_stream_df
    .select(
        from_json(col("value").cast("string"), weather_schema).alias("data"),
        col("timestamp").alias("kafka_timestamp"),
    )
    .select("data.*", "kafka_timestamp")
    .where(col("city").isNotNull())
    .withColumn("processing_timestamp", current_timestamp())
)

# Display the incoming data for monitoring (optional)
# query_display = parsed_stream_df.writeStream \
#     .outputMode("append") \
#     .format("console") \
#     .start()

# Per-city averages over sliding windows: each window spans 5 minutes and a new
# one starts every minute, so each row falls into five overlapping windows.
# Windows are keyed on processing_timestamp (when Spark processed the row), not
# on the payload's `timestamp` field or the Kafka record timestamp. The 10-minute
# watermark bounds how long window state is retained.
windowed_avg_df = (
    parsed_stream_df
    .withWatermark("processing_timestamp", "10 minutes")
    .groupBy(window("processing_timestamp", "5 minutes", "1 minute"), "city")
    .agg(
        avg(col("temperature_c")).alias("avg_temperature_c"),
        avg(col("temperature_f")).alias("avg_temperature_f"),
        avg(col("humidity")).alias("avg_humidity"),
        avg(col("wind_speed_kph")).alias("avg_wind_speed_kph"),
        count("*").alias("message_count"),
    )
    .select(
        col("window")["start"].alias("window_start"),
        col("window")["end"].alias("window_end"),
        "city",
        "avg_temperature_c",
        "avg_temperature_f",
        "avg_humidity",
        "avg_wind_speed_kph",
        "message_count",
        current_timestamp().alias("computation_time"),
    )
)

# foreachBatch sink: fold each micro-batch of window averages into a Delta table.
def write_to_delta(df, epoch_id, table_name="avg_temperature"):
    """Upsert one micro-batch of windowed averages into a Delta table.

    In ``update`` output mode every micro-batch re-emits the latest partial
    aggregate for each (window, city) whose values changed. Appending those rows
    would store several partial rows per window. Merging on the window bounds and
    the city instead keeps exactly one row per key, holding the most recent value.

    Args:
        df: Micro-batch DataFrame with ``window_start``, ``window_end``, ``city``
            and the aggregate columns.
        epoch_id: Micro-batch id supplied by Spark. It is unused here: the MERGE
            overwrites matching rows, so a replayed batch does not duplicate data.
        table_name: Target Delta table. When it does not exist yet, it is created
            from the schema and rows of this batch.
    """
    # Use the micro-batch's own session; temp views registered on it are not
    # guaranteed to be visible from the notebook's global session.
    session = df.sparkSession

    if not session.catalog.tableExists(table_name):
        df.write \
            .format("delta") \
            .mode("append") \
            .saveAsTable(table_name)
        return

    source_view = "weather_window_updates"
    df.createOrReplaceTempView(source_view)
    # `<=>` is null-safe equality, so a null city still matches its existing row
    # instead of being re-inserted on every batch.
    session.sql(f"""
        MERGE INTO {table_name} AS target
        USING {source_view} AS source
        ON target.window_start = source.window_start
           AND target.window_end = source.window_end
           AND target.city <=> source.city
        WHEN MATCHED THEN UPDATE SET *
        WHEN NOT MATCHED THEN INSERT *
    """)

# Start the streaming query. In "update" mode each micro-batch hands
# write_to_delta only the (window, city) rows whose aggregates changed.
# The 1-minute processing-time trigger makes the refresh cadence match the
# "every 1 minute" promise printed below; without a trigger, micro-batches
# start as soon as the previous one finishes.
streaming_query = (
    windowed_avg_df.writeStream
    .outputMode("update")
    .trigger(processingTime="1 minute")
    .foreachBatch(write_to_delta)
    .option("checkpointLocation", checkpoint_path)
    .start()
)

print("🚀 Spark Streaming started successfully!")
print("📊 Streaming weather data from Kafka...")
print("⏰ Calculating 5-minute average temperatures every 1 minute")
print("💾 Saving results to Delta table: avg_temperature")

# Block this cell until the query stops; if the query fails, the exception is
# re-raised here.
streaming_query.awaitTermination()

StatementMeta(, a60827ec-e921-492c-8745-b107ed45afbc, 3, Submitted, Running, Running)

🚀 Spark Streaming started successfully!
📊 Streaming weather data from Kafka...
⏰ Calculating 5-minute average temperatures every 1 minute
💾 Saving results to Delta table: avg_temperature


In [6]:
# Check the latest records in the Delta table (10 most recent windows).
latest_windows = (
    spark.table("avg_temperature")
    .select(
        "window_start",
        "window_end",
        "city",
        "avg_temperature_c",
        "message_count",
        "computation_time",
    )
    .orderBy("window_start", ascending=False)
    .limit(10)
)
display(latest_windows)

StatementMeta(, ff934409-fb7f-4961-be0e-bbfd1e5fec47, 8, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, b79f0aee-bd5c-4839-bccd-9171a93dc6e7)