## Baseline receiver stateless monitoring 

In [1]:
import os

os.environ["PYSPARK_SUBMIT_ARGS"] = (
    "--packages "
    "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.6,"
    "org.apache.spark:spark-streaming-kafka-0-10_2.12:3.5.6 "
    "pyspark-shell"
)

In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
        .appName("read_test_straeam") \
        .getOrCreate()

:: loading settings :: url = jar:file:/home/guest/spark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/guest/.ivy2/cache
The jars for the packages stored in: /home/guest/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
org.apache.spark#spark-streaming-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-78b5969e-b9d8-47ad-9c55-5499e236b17e;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.5.6 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.5.6 in central
	found org.apache.kafka#kafka-clients;3.4.1 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.10.5 in central
	found org.slf4j#slf4j-api;2.0.7 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.4 in central
	found org.apache.hadoop#hadoop-client-api;3.3.4 in central
	found commons-logging#commons-logging;1.1.3 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central
	found org.apache.commons#commons-pool2;2.11.1 in c

In [3]:
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType

KAFKA_BOOTSTRAP_SERVERS = "127.0.0.1:9092"
KAFKA_TOPIC = "sensors"


schema = StructType().add("sensor", StringType())\
                     .add("info", StructType()\
                          .add("timestamp_sent", StringType())\
                          .add("timestamp_received", StringType())\
                          .add("obs", StringType())\
                          .add("drift", StringType()))

df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS) \
  .option("subscribe", KAFKA_TOPIC) \
  .option("startingOffsets", "earliest") \
  .load()\
  .select(from_json(col("value").cast("string"), schema).alias("parsed_value"))\
            .select(col("parsed_value.sensor"), \
                    col("parsed_value.info.timestamp_sent"),\
                    col("parsed_value.info.timestamp_received"),\
                    col("parsed_value.info.obs"),\
                    col("parsed_value.info.drift"))

In [4]:
df.printSchema()

root
 |-- sensor: string (nullable = true)
 |-- timestamp_sent: string (nullable = true)
 |-- timestamp_received: string (nullable = true)
 |-- obs: string (nullable = true)
 |-- drift: string (nullable = true)



In [5]:
watermark=True
WATERMARK_SIZE="1 minute"
WINDOW_SIZE="1 minute"

In [6]:
from pyspark.sql.functions import window, avg, count, desc, stddev, min, max, isnull, col, sum, isnan, unix_timestamp, round
from pyspark.sql.functions import from_unixtime, to_utc_timestamp, date_format

df = df.withColumn("timestamp_received", to_utc_timestamp("timestamp_received", "Europe/Rome"))\
         .withColumn("timestamp_sent", to_utc_timestamp("timestamp_received", "Europe/Rome"))\
            .withColumn("time_diff", (col("timestamp_received").cast("double") - col("timestamp_sent").cast("double")))\


if watermark:
    df = df.withWatermark("timestamp_received", WATERMARK_SIZE)

eventAvg = df.groupBy(window(df.timestamp_received, WINDOW_SIZE), df.sensor)\
                    .agg(
                        count("obs").alias("count_obs"),
                        round(avg("obs"),3).alias("avg_obs"),
                        round(stddev("obs"),3).alias("+/-std_obs"),
                        round(max("obs"),3).alias("max_obs"),
                        round(min("obs"),3).alias("min_obs"),
                        round(sum((isnull("obs") | isnan("obs")).cast("int")),3).alias("sum_miss_obs"),
                        round(avg("time_diff"),3).alias("avg_time_delay")
                        )\
                    .withColumn("window_start", col("window.start")) \
                    .withColumn("window_end", col("window.end"))\
                    .withColumn("day", date_format("window_start", "yyyy-MM-dd")) \
                    .withColumn("hour", date_format("window_start", "HH")) \
                    .drop("window") #.sort(desc("window"))

In [None]:
eventAvg.writeStream \
        .outputMode("append") \
        .trigger(processingTime="30 seconds") \
        .format("csv") \
        .option("path", "/home/guest/notebooks/data/output/") \
        .option("checkpointLocation", "/home/guest/notebooks/data/checkpoints") \
        .option("header", "true") \
        .partitionBy("day", "hour") \
        .start().awaitTermination()

In [9]:
final_df = spark.read.csv("/home/guest/notebooks/data/output/day=2025-06-20/hour=15", header=True, inferSchema=True)
final_df.take(5)

[Row(sensor='sensor-1', count_obs=12, avg_obs=20.811, +/-std_obs=14.149, max_obs=33.189, min_obs=-0.509, sum_miss_obs=0, avg_time_delay=7200.0, window_start=datetime.datetime(2025, 6, 20, 15, 9), window_end=datetime.datetime(2025, 6, 20, 15, 10)),
 Row(sensor='sensor-3', count_obs=12, avg_obs=7.772, +/-std_obs=11.624, max_obs=4.503, min_obs=-0.577, sum_miss_obs=0, avg_time_delay=7200.0, window_start=datetime.datetime(2025, 6, 20, 15, 9), window_end=datetime.datetime(2025, 6, 20, 15, 10)),
 Row(sensor='sensor-3', count_obs=12, avg_obs=17.059, +/-std_obs=12.081, max_obs=5.17, min_obs=-1.783, sum_miss_obs=0, avg_time_delay=7200.0, window_start=datetime.datetime(2025, 6, 20, 15, 10), window_end=datetime.datetime(2025, 6, 20, 15, 11)),
 Row(sensor='sensor-3', count_obs=12, avg_obs=0.524, +/-std_obs=2.329, max_obs=6.031, min_obs=-0.224, sum_miss_obs=0, avg_time_delay=7200.0, window_start=datetime.datetime(2025, 6, 20, 15, 11), window_end=datetime.datetime(2025, 6, 20, 15, 12)),
 Row(sensor='

                                                                                

In [None]:
final_df.orderBy(col("window_start").asc()).coalesce(1).write \
    .option("header", "true") \
    .mode("overwrite") \
    .csv("/home/guest/notebooks/final_output")

In [13]:
import pandas as pd

pd.read_csv("/home/guest/notebooks/final_output/part-00000-e5615ba8-438b-4c07-aabd-8473b786aff0-c000.csv")

Unnamed: 0,sensor,count_obs,avg_obs,+/-std_obs,max_obs,min_obs,sum_miss_obs,avg_time_delay,window_start,window_end
0,sensor-1,12,3.546,3.254,7.364,-0.425,0,7200.0,2025-06-20T15:08:00.000Z,2025-06-20T15:09:00.000Z
1,sensor-3,12,4.534,6.962,5.362,-3.946,0,7200.0,2025-06-20T15:08:00.000Z,2025-06-20T15:09:00.000Z
2,sensor-2,6,4.945,2.057,8.19,1.914,6,7200.0,2025-06-20T15:08:00.000Z,2025-06-20T15:09:00.000Z
3,sensor-1,12,20.811,14.149,33.189,-0.509,0,7200.0,2025-06-20T15:09:00.000Z,2025-06-20T15:10:00.000Z
4,sensor-3,12,7.772,11.624,4.503,-0.577,0,7200.0,2025-06-20T15:09:00.000Z,2025-06-20T15:10:00.000Z
5,sensor-2,6,3.768,5.914,4.84,-1.129,6,7200.0,2025-06-20T15:09:00.000Z,2025-06-20T15:10:00.000Z
6,sensor-3,12,17.059,12.081,5.17,-1.783,0,7200.0,2025-06-20T15:10:00.000Z,2025-06-20T15:11:00.000Z
7,sensor-1,12,9.34,12.913,7.663,-3.17,0,7200.0,2025-06-20T15:10:00.000Z,2025-06-20T15:11:00.000Z
8,sensor-2,5,5.13,2.421,8.972,3.026,7,7200.0,2025-06-20T15:10:00.000Z,2025-06-20T15:11:00.000Z
9,sensor-3,12,0.524,2.329,6.031,-0.224,0,7200.0,2025-06-20T15:11:00.000Z,2025-06-20T15:12:00.000Z


25/06/20 17:18:30 ERROR MicroBatchExecution: Query [id = ba4eda95-f62b-41a3-9b13-2c74a66b7dab, runId = 1ebcaae4-05fa-4093-af3f-0ed1bfce8f06] terminated with error
java.io.FileNotFoundException: File file:/home/guest/notebooks/data/checkpoints/offsets does not exist
	at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:779)
	at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:1100)
	at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:769)
	at org.apache.hadoop.fs.DelegateToFileSystem.getFileStatus(DelegateToFileSystem.java:128)
	at org.apache.hadoop.fs.DelegateToFileSystem.createInternal(DelegateToFileSystem.java:93)
	at org.apache.hadoop.fs.ChecksumFs$ChecksumFSOutputSummer.<init>(ChecksumFs.java:353)
	at org.apache.hadoop.fs.ChecksumFs.createInternal(ChecksumFs.java:400)
	at org.apache.hadoop.fs.AbstractFileSystem.create(AbstractFileSystem.java:626)
	at org.apache.hadoop.fs.File

In [None]:
#results = eventAvg.writeStream \
#         .outputMode('complete')\
#         .format("console") \
#         .start() \
#         .awaitTermination()