## 1. Daten simulieren & als Stream speichern (in memory)

In [0]:
from pyspark.sql.functions import expr, current_timestamp

# Erzeuge kontinuierlich neue Daten mit Spark's 'rate' Quelle
# Diese Quelle erzeugt automatisch 1 Zeile pro Sekunde mit einer Zählspalte "value"
simulierter_stream = (
    spark.readStream
    .format("rate")
    .option("rowsPerSecond", 1)  # 1 Messwert pro Sekunde
    .load()
    .withColumn("zeitstempel", current_timestamp())
    .withColumn("sensor_id", expr("concat('sensor_', CAST(rand() * 5 + 1 AS INT))"))
    .withColumn("temperatur", expr("round(20 + rand() * 15, 2)"))
)

# Schreibe den Stream in eine temporäre In-Memory-View namens "live_daten"
stream_query = (
    simulierter_stream.writeStream
    .format("memory")
    .queryName("live_daten")
    .outputMode("append")
    .start()
)


### Kontrollanzeige der Live-Daten

In [0]:
# Zeige aktuelle Live-Daten aus der In-Memory View
spark.sql("SELECT * FROM live_daten").show()


+--------------------+-----+--------------------+---------+----------+
|           timestamp|value|         zeitstempel|sensor_id|temperatur|
+--------------------+-----+--------------------+---------+----------+
|2025-06-24 18:11:...|    0|2025-06-24 18:11:...| sensor_1|     24.17|
|2025-06-24 18:11:...|    4|2025-06-24 18:11:...| sensor_4|     23.62|
|2025-06-24 18:11:...|    8|2025-06-24 18:11:...| sensor_2|     31.41|
|2025-06-24 18:11:...|    1|2025-06-24 18:11:...| sensor_2|     22.18|
|2025-06-24 18:11:...|    5|2025-06-24 18:11:...| sensor_4|     32.43|
|2025-06-24 18:11:...|    9|2025-06-24 18:11:...| sensor_5|     20.64|
|2025-06-24 18:11:...|    2|2025-06-24 18:11:...| sensor_2|     22.41|
|2025-06-24 18:11:...|    6|2025-06-24 18:11:...| sensor_1|     27.42|
|2025-06-24 18:11:...|   10|2025-06-24 18:11:...| sensor_3|     32.44|
|2025-06-24 18:11:...|    3|2025-06-24 18:11:...| sensor_2|     22.23|
|2025-06-24 18:11:...|    7|2025-06-24 18:11:...| sensor_4|     24.32|
|2025-

## 2. Aggregation über Zeitfenster – Durchschnittstemperatur pro Sensor und Minute

In [0]:
from pyspark.sql.functions import window, avg

# Aggregiere direkt den bestehenden Streaming-DataFrame
aggregierte_daten = (
    simulierter_stream
    .groupBy(
        window("zeitstempel", "1 minute"),  # Zeitfenster von 1 Minute
        "sensor_id"
    )
    .agg(
        avg("temperatur").alias("durchschnitt_temperatur")
    )
    .selectExpr(
        "window.start as fenster_start",
        "window.end as fenster_ende",
        "sensor_id",
        "round(durchschnitt_temperatur, 2) as temperatur_avg"
    )
)


### Ergebnis in eine neue In-Memory-View

In [0]:

# Starte neuen Streaming-Job für die aggregierten Ergebnisse
aggregations_stream = (
    aggregierte_daten.writeStream
    .format("memory")
    .queryName("temperatur_pro_minute")
    .outputMode("complete")
    .start()
)


### Ergebnisse der agregierten Werte

In [0]:
# Zeige aggregierte Werte 
spark.sql("SELECT * FROM temperatur_pro_minute").show()


+-------------------+-------------------+---------+--------------+
|      fenster_start|       fenster_ende|sensor_id|temperatur_avg|
+-------------------+-------------------+---------+--------------+
|2025-06-24 18:16:00|2025-06-24 18:17:00| sensor_4|         28.33|
|2025-06-24 18:17:00|2025-06-24 18:18:00| sensor_1|         25.77|
|2025-06-24 18:11:00|2025-06-24 18:12:00| sensor_4|         28.06|
|2025-06-24 18:13:00|2025-06-24 18:14:00| sensor_2|         28.02|
|2025-06-24 18:15:00|2025-06-24 18:16:00| sensor_2|         25.98|
|2025-06-24 18:15:00|2025-06-24 18:16:00| sensor_4|         25.63|
|2025-06-24 18:17:00|2025-06-24 18:18:00| sensor_4|         26.91|
|2025-06-24 18:11:00|2025-06-24 18:12:00| sensor_1|         28.89|
|2025-06-24 18:16:00|2025-06-24 18:17:00| sensor_5|         26.87|
|2025-06-24 18:18:00|2025-06-24 18:19:00| sensor_4|         25.29|
|2025-06-24 18:17:00|2025-06-24 18:18:00| sensor_2|         27.39|
|2025-06-24 18:17:00|2025-06-24 18:18:00| sensor_5|         26

## 3. Kritische Werte filtern

In [0]:
# Filtere alle Temperaturwerte über 28 Grad (z. B. als Anomalie)
kritische_temperaturen = simulierter_stream.filter("temperatur > 28")

# Starte neuen Streaming-Job für die kritischen Werte
kritisch_stream = (
    kritische_temperaturen.writeStream
    .format("memory")
    .queryName("kritische_temperaturen")
    .outputMode("append")
    .start()
)


### Kontrolle

In [0]:
# Zeige gefilterte Werte mit Temperatur > 28 °C
spark.sql("SELECT * FROM kritische_temperaturen").show()


+--------------------+-----+--------------------+---------+----------+
|           timestamp|value|         zeitstempel|sensor_id|temperatur|
+--------------------+-----+--------------------+---------+----------+
|2025-06-24 18:11:...|    0|2025-06-24 18:11:...| sensor_3|     29.94|
|2025-06-24 18:11:...|    1|2025-06-24 18:11:...| sensor_5|     29.75|
|2025-06-24 18:11:...|    2|2025-06-24 18:11:...| sensor_5|     28.03|
|2025-06-24 18:11:...|    3|2025-06-24 18:11:...| sensor_4|     28.61|
|2025-06-24 18:11:...|    4|2025-06-24 18:11:...| sensor_3|     31.63|
|2025-06-24 18:11:...|   12|2025-06-24 18:11:...| sensor_5|      33.7|
|2025-06-24 18:11:...|   16|2025-06-24 18:11:...| sensor_2|     29.01|
|2025-06-24 18:11:...|    5|2025-06-24 18:11:...| sensor_2|     28.56|
|2025-06-24 18:11:...|   21|2025-06-24 18:11:...| sensor_2|     31.27|
|2025-06-24 18:11:...|   10|2025-06-24 18:11:...| sensor_4|     34.62|
|2025-06-24 18:11:...|   14|2025-06-24 18:11:...| sensor_3|     33.76|
|2025-

## 4. Temperaturklassifikation in Status-Kategorien

In [0]:
from pyspark.sql.functions import when, col

# Erweitere den ursprünglichen Stream um eine Status-Klassifikation
klassifizierter_stream = simulierter_stream.withColumn(
    "status",
    when(col("temperatur") > 28, "kritisch")
    .when(col("temperatur") < 22, "niedrig")
    .otherwise("normal")
)

# Schreibe diesen erweiterten Stream in eine neue In-Memory-View
status_stream = (
    klassifizierter_stream.writeStream
    .format("memory")
    .queryName("sensor_daten_mit_status")
    .outputMode("append")
    .start()
)


### Kontrolle

In [0]:
# Zeige klassifizierte Live-Daten mit Status
spark.sql("SELECT * FROM sensor_daten_mit_status").show()


+--------------------+-----+--------------------+---------+----------+--------+
|           timestamp|value|         zeitstempel|sensor_id|temperatur|  status|
+--------------------+-----+--------------------+---------+----------+--------+
|2025-06-24 18:11:...|    0|2025-06-24 18:11:...| sensor_3|     25.91|  normal|
|2025-06-24 18:11:...|    4|2025-06-24 18:11:...| sensor_1|     33.54|kritisch|
|2025-06-24 18:11:...|    8|2025-06-24 18:11:...| sensor_2|     27.58|  normal|
|2025-06-24 18:11:...|   12|2025-06-24 18:11:...| sensor_4|     23.21|  normal|
|2025-06-24 18:11:...|   16|2025-06-24 18:11:...| sensor_3|     34.67|kritisch|
|2025-06-24 18:11:...|   20|2025-06-24 18:11:...| sensor_2|     21.59| niedrig|
|2025-06-24 18:11:...|    1|2025-06-24 18:11:...| sensor_2|     23.18|  normal|
|2025-06-24 18:11:...|    5|2025-06-24 18:11:...| sensor_5|     31.85|kritisch|
|2025-06-24 18:11:...|    9|2025-06-24 18:11:...| sensor_3|     23.66|  normal|
|2025-06-24 18:11:...|   13|2025-06-24 1

## 5. Visualiserung in Databricks

In [0]:
display(spark.sql("""
  SELECT status, COUNT(*) as anzahl
  FROM sensor_daten_mit_status
  GROUP BY status
"""))


status,anzahl
normal,177
kritisch,193
niedrig,53


In [0]:
display(spark.sql("""
SELECT 
  status,
  COUNT(*) AS anzahl_messwerte,
  ROUND(AVG(temperatur), 2) AS durchschnitt_temperatur,
  MAX(temperatur) AS max_temperatur
FROM sensor_daten_mit_status
WHERE zeitstempel >= current_timestamp() - INTERVAL 1 HOUR
GROUP BY status
ORDER BY status
"""))


status,anzahl_messwerte,durchschnitt_temperatur,max_temperatur
kritisch,193,31.31,34.99
niedrig,53,21.03,21.99
normal,177,24.97,28.0


## 6. Persistente Speicherung (nicht möglich da keine Schreibrechte)

In [0]:
#kritische_temperaturen.writeStream \
#    .format("parquet") \
#    .option("path", "/mnt/data/kritisch/") \
 #   .option("checkpointLocation", "/mnt/checkpoint/kritisch/") \
  #  .outputMode("append") \
   # .start()


## 7. Aggregation für Trend

In [0]:
from pyspark.sql.functions import window, avg

# Aggregiere Temperaturdaten pro Sensor in 1-Minuten-Zeitfenstern
# Dies dient als Grundlage für eine Trendlinie (z. B. Liniendiagramm)
trend_stream = (
    simulierter_stream
    .groupBy(
        window("zeitstempel", "1 minute"),  # Nutzt das gleiche Zeitfenster wie in Schritt 2
        "sensor_id"
    )
    .agg(
        avg("temperatur").alias("temperatur_avg")
    )
    .selectExpr(
        "window.start as fenster_start",
        "sensor_id",
        "round(temperatur_avg, 2) as temperatur_durchschnitt"
    )
)

# Schreibe den Trend-Aggregations-Stream in eine neue In-Memory-View
trend_query = (
    trend_stream.writeStream
    .format("memory")
    .queryName("temperatur_trend")
    .outputMode("complete")
    .start()
)


In [0]:
display(spark.sql("SELECT * FROM temperatur_trend"))


fenster_start,sensor_id,temperatur_durchschnitt
2025-06-24T18:16:00Z,sensor_4,27.55
2025-06-24T18:17:00Z,sensor_1,26.15
2025-06-24T18:11:00Z,sensor_4,25.93
2025-06-24T18:13:00Z,sensor_2,29.12
2025-06-24T18:15:00Z,sensor_2,26.66
2025-06-24T18:15:00Z,sensor_4,27.99
2025-06-24T18:17:00Z,sensor_4,28.95
2025-06-24T18:11:00Z,sensor_1,27.17
2025-06-24T18:16:00Z,sensor_5,27.59
2025-06-24T18:17:00Z,sensor_2,28.74


Databricks visualization. Run in Databricks to view.