## **STREAMING - OpenWeatherMap**

In [None]:
import json
import pyspark
from pyspark.sql.functions import from_json, col, when, avg, window
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType, FloatType

In [None]:
# Load runtime settings from /variables.json. JSON text is UTF-8 by
# specification, so decode it explicitly instead of relying on the platform's
# default locale encoding.
with open("/variables.json", "r", encoding="utf-8") as file:
    data = json.load(file)

# Not used in this section; presumably consumed by other cells/producers.
apikey = data["apikey"]
locations = data["locations"]
plants = data["plants"]

# Kafka configuration: broker address and the topic carrying weather records.
kafka_broker = data["kafka"]["broker"]
kafka_topic = data["kafka"]["topic"]

In [None]:
# Spark configuration for this notebook:
#   * cluster master at spark://spark:7077,
#   * hadoop-aws and spark-sql-kafka connector packages fetched at startup,
#   * shuffle partitions set to 10.
conf = (
    pyspark.SparkConf()
    .setAppName("SparkApp")
    .setMaster("spark://spark:7077")
    .set(
        "spark.jars.packages",
        "org.apache.hadoop:hadoop-aws:3.3.4,org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.3",
    )
    .set("spark.sql.shuffle.partitions", "10")
)

# Reuse an already running SparkContext in this kernel, otherwise create one.
spark_context = pyspark.SparkContext.getOrCreate(conf=conf)

# NOTE(review): SQLContext is deprecated since Spark 3.0 in favour of
# SparkSession; kept because later cells read streams via sql_context.
sql_context = pyspark.sql.SQLContext(spark_context)

In [None]:
# Schema of the JSON weather record carried in each Kafka message value.
# Every field is declared nullable.
# NOTE(review): "dt" is read as a 32-bit integer and later cast to a timestamp,
# so it is presumably Unix epoch seconds -- confirm against the producer.
schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("dt", IntegerType()),
        ("day", IntegerType()),
        ("month", IntegerType()),
        ("year", IntegerType()),
        ("hour", IntegerType()),
        ("minute", IntegerType()),
        ("clouds", IntegerType()),
        ("temp", FloatType()),
        ("feels_like", FloatType()),
        ("humidity", FloatType()),
        ("visibility", IntegerType()),
    )
])

# Streaming source: subscribe to the configured Kafka topic and start from the
# earliest available offset. No checkpoint location is configured on the sink,
# so each fresh start of the query replays the topic from the beginning.
raw_stream = (
    sql_context.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", kafka_broker)
    .option("subscribe", kafka_topic)
    .option("startingOffsets", "earliest")
    .load()
)

# Decode the Kafka value bytes as a JSON string, unpack it with `schema`, and
# flatten the struct into top-level columns. "dt" is cast to a Spark timestamp
# (an integer cast is interpreted as seconds since the epoch); every other
# field keeps its schema type and name.
parsed_stream = (
    raw_stream
    .select(col("value").cast("string").alias("message"))
    .select(from_json(col("message"), schema).alias("data"))
    .select(
        col("data.dt").cast(TimestampType()).alias("dt"),
        *[
            col(f"data.{field}").alias(field)
            for field in (
                "day",
                "month",
                "year",
                "hour",
                "minute",
                "clouds",
                "temp",
                "feels_like",
                "humidity",
                "visibility",
            )
        ],
    )
)

# Add a half-day flag and a per-(day, half-day) grouping key, then declare an
# event-time watermark on "dt" so windowed aggregation state can be finalized
# and late rows (more than 10 minutes behind the latest event time) discarded.
transformed_stream = (
    parsed_stream
    # morning = 1 when hour > 13, otherwise 2.
    # NOTE(review): the column name suggests 1 should mean "morning", but it is
    # assigned to hours above 13 -- confirm the intended threshold/labels.
    .withColumn("morning", when(col("hour") > 13, 1).otherwise(2))
    # hash = (year*1_000_000 + month*100 + day) * 10 + morning, e.g.
    # 2024-12-31 afternoon-flag 2 -> 20240012312 (11 digits). That exceeds the
    # 32-bit int range, so compute in 64-bit: with plain IntegerType columns the
    # product silently wraps to a garbage key (or raises under ANSI mode).
    .withColumn(
        "hash",
        ((col("year").cast("long") * 10000 + col("month")) * 100 + col("day")) * 10
        + col("morning"),
    )
    .withWatermark("dt", "10 minutes")
)

# Mean of each weather metric per (hash, 5-minute event-time window).
# NOTE: window() with only a duration produces fixed, non-overlapping (tumbling)
# windows, so despite the variable name this is not a sliding/rolling mean.
# Output columns: hash, window, avg_clouds, avg_temp, avg_feels_like,
# avg_humidity, avg_visibility.
rolling_average = transformed_stream.groupBy(
    "hash", window(col("dt"), "5 minutes")
).agg(
    *[
        avg(metric).alias(f"avg_{metric}")
        for metric in ("clouds", "temp", "feels_like", "humidity", "visibility")
    ]
)

# Sink: print aggregated rows to the console without truncating wide columns.
# With "append" output mode each (hash, window) row is printed once, after the
# watermark has moved past the end of that window.
query = (
    rolling_average.writeStream
    .outputMode("append")
    .format("console")
    .option("truncate", "false")
    .start()
)

# Block this cell until the query terminates (or the kernel is interrupted).
query.awaitTermination()

In [None]:
# Stop the streaming query started earlier. In this notebook it runs only after
# the blocking awaitTermination() call has returned or been interrupted.
query.stop()