In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, avg, window
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

In [3]:
# Get (or reuse) the Spark session for this notebook, registered under the
# application name "Streaming-IoT".
session_builder = SparkSession.builder.appName("Streaming-IoT")
spark = session_builder.getOrCreate()

In [4]:
# Streaming source: text lines read from a TCP socket on localhost:9999.
# Downstream cells parse the `value` column of this stream as JSON.
socket_reader = (
    spark.readStream
    .format("socket")
    .option("host", "localhost")
    .option("port", 9999)
)
dataframe_stream = socket_reader.load()

In [5]:
# Layout of one JSON sensor record: (field name, Spark type) in payload order.
# The timestamp is kept as a string here, exactly as it arrives.
field_types = [
    ("timestamp", StringType),
    ("sensor_id", StringType),
    ("equipment_id", StringType),
    ("temperature", DoubleType),
    ("vibration", DoubleType),
    ("pressure", DoubleType),
    ("status", StringType),
]
schema = StructType([StructField(name, dtype()) for name, dtype in field_types])

# Decode the raw `value` column into a struct, then flatten it to one column
# per schema field.
json_record = from_json(col("value"), schema).alias("data")
dataframe_parsed = dataframe_stream.select(json_record).select("data.*")

# Per-sensor mean of each numeric metric over 5-minute windows keyed on the
# record timestamp. Output columns: avg_temperature, avg_vibration,
# avg_pressure.
five_minute_window = window(col("timestamp"), "5 minutes")
metric_means = [
    avg(metric).alias(f"avg_{metric}")
    for metric in ("temperature", "vibration", "pressure")
]
dataframe_agg = (
    dataframe_parsed
    .groupBy(five_minute_window, col("sensor_id"))
    .agg(*metric_means)
)

In [None]:
# Print the full, continuously updated 5-minute aggregate table to the
# console ("complete" mode re-emits every window on each trigger).
query = dataframe_agg.writeStream.outputMode("complete").format("console").start()
try:
    # Blocks this cell until the query terminates or the kernel is interrupted.
    query.awaitTermination()
finally:
    # Interrupting the cell only stops the wait, not the query itself. Stop it
    # explicitly so it does not keep running in the background (still writing
    # to the console and reading from the socket) while later cells start
    # their own queries on the same source.
    query.stop()

---
## Fenêtre fixe (tumbling) de 1 minute

In [None]:
# Give the stream a real event-time column and a watermark before aggregating.
# - The schema parses `timestamp` as a string; withWatermark requires an actual
#   timestamp type, hence the explicit cast.
# - The watermark is what allows this aggregation to be written in "append"
#   mode (as the Parquet cell below does); without it Spark rejects the query.
# - The 1-minute lateness tolerance is a tunable choice: a window is finalized
#   once event time has advanced 1 minute past its end.
events_1min = (
    dataframe_parsed
    .withColumn("timestamp", col("timestamp").cast("timestamp"))
    .withWatermark("timestamp", "1 minute")
)

# Mean temperature per sensor over 1-minute tumbling windows.
# The schema field is named `temperature` (there is no `temp` column).
dataframe_agg_1min = events_1min.groupBy(
    window(col("timestamp"), "1 minute"),
    col("sensor_id")
).agg(
    avg("temperature").alias("avg_temp")
)

# Console sink in "complete" mode: the whole result table is re-printed on
# every trigger, with untruncated column values.
query_1min = dataframe_agg_1min.writeStream \
    .outputMode("complete") \
    .format("console") \
    .option("truncate", "false") \
    .start()

# Left non-blocking so the following cells can run; uncomment to block here.
# query_1min.awaitTermination()

---
## Sauvegarde des résultats en Parquet

In [None]:
# Destination directory for the Parquet files produced by the stream.
output_path = "data/output/streaming_temp.parquet"

# Persist the 1-minute aggregates as Parquet in "append" mode, with the
# progress of the query tracked in the checkpoint directory.
# NOTE(review): append mode on a streaming aggregation requires an event-time
# watermark on the upstream stream -- confirm dataframe_agg_1min has one.
parquet_writer = (
    dataframe_agg_1min.writeStream
    .outputMode("append")
    .format("parquet")
    .option("path", output_path)
    .option("checkpointLocation", "data/output/checkpoint")
)
query_parquet = parquet_writer.start()

# Left non-blocking; uncomment to block this cell until the query terminates.
# query_parquet.awaitTermination()