In [1]:
# Install the required packages: a headless JDK 11, the Spark 3.5.0 binary
# distribution (downloaded and unpacked into the current working directory),
# and the Python libraries findspark, pyspark 3.5.0 and kafka-python.
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz
!tar xf spark-3.5.0-bin-hadoop3.tgz
!pip install -q findspark pyspark==3.5.0 kafka-python

# Environment configuration
import os
# JAVA_HOME is set to the JDK 11 path expected from the apt package above.
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
# SPARK_HOME must match where the tarball was unpacked; this hard-coded path
# assumes the notebook's working directory is /content — TODO confirm.
os.environ["SPARK_HOME"] = "/content/spark-3.5.0-bin-hadoop3"

import findspark
# Make the Spark installation configured above importable from this kernel.
findspark.init()

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m4.9 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m285.4/285.4 kB[0m [31m17.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Building wheel for pyspark (setup.py) ... [?25l[?25hdone


In [2]:
# Завантаження Kafka
!wget https://archive.apache.org/dist/kafka/3.6.0/kafka_2.13-3.6.0.tgz
!tar -xzf kafka_2.13-3.6.0.tgz

# Запуск Zookeeper і Kafka
os.system("nohup kafka_2.13-3.6.0/bin/zookeeper-server-start.sh kafka_2.13-3.6.0/config/zookeeper.properties &")
os.system("nohup kafka_2.13-3.6.0/bin/kafka-server-start.sh kafka_2.13-3.6.0/config/server.properties &")

--2025-04-20 09:29:29--  https://archive.apache.org/dist/kafka/3.6.0/kafka_2.13-3.6.0.tgz
Resolving archive.apache.org (archive.apache.org)... 65.108.204.189, 2a01:4f9:1a:a084::2
Connecting to archive.apache.org (archive.apache.org)|65.108.204.189|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 113257079 (108M) [application/x-gzip]
Saving to: ‘kafka_2.13-3.6.0.tgz’


2025-04-20 09:29:34 (23.2 MB/s) - ‘kafka_2.13-3.6.0.tgz’ saved [113257079/113257079]



0

In [3]:
# Install the Kafka Python library
# (kafka-python is also part of the pip install command in the first cell).
!pip install kafka-python



In [4]:
# Створення топіків
!kafka_2.13-3.6.0/bin/kafka-topics.sh --create --topic atr_building_sensors --bootstrap-server localhost:9092
!kafka_2.13-3.6.0/bin/kafka-topics.sh --create --topic atr_alerts --bootstrap-server localhost:9092

Created topic atr_building_sensors.
Created topic atr_alerts.


In [5]:
# Verify that the topics were created
from kafka.admin import KafkaAdminClient

admin_client = KafkaAdminClient(bootstrap_servers=['localhost:9092'])
try:
    # Show only this notebook's topics (their names contain "atr").
    print([topic for topic in admin_client.list_topics() if "atr" in topic])
finally:
    # The admin client holds open broker connections; release them even if
    # listing the topics fails.
    admin_client.close()

['atr_alerts', 'atr_building_sensors']


In [6]:
# Load the alert parameters from CSV
import pandas as pd

# CSV text with one alert rule per row: humidity and temperature bounds,
# an alert code and a message. The alert-matching cell treats -999 as
# "this bound is not used by the rule".
params_csv = """id,humidity_min,humidity_max,temperature_min,temperature_max,code,message
1,0,40,-999,-999,101,"It's too dry"
2,60,100,-999,-999,102,"It's too wet"
3,-999,-999,-300,30,103,"It's too cold"
4,-999,-999,40,300,104,"It's too hot"
"""

# Persist the rules to disk, then read them back through pandas.
alert_params_path = "alert_params.csv"
with open(alert_params_path, mode="w") as csv_out:
    csv_out.write(params_csv)

alerts_df = pd.read_csv(alert_params_path)
alerts_df.head()

Unnamed: 0,id,humidity_min,humidity_max,temperature_min,temperature_max,code,message
0,1,0,40,-999,-999,101,It's too dry
1,2,60,100,-999,-999,102,It's too wet
2,3,-999,-999,-300,30,103,It's too cold
3,4,-999,-999,40,300,104,It's too hot


In [7]:
# Spark session
from pyspark.sql import SparkSession

# Build (or reuse) the session; the Kafka connector package is requested
# through spark.jars.packages.
spark = (
    SparkSession.builder
    .appName("IoT_Alerts")
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0")
    .getOrCreate()
)

# Limit Spark's log output to errors.
spark.sparkContext.setLogLevel("ERROR")

In [8]:
# Script that sends readings to the sensor topic, sensor 1
from kafka import KafkaProducer
import json, random, time
from datetime import datetime

# Values are serialized as UTF-8 encoded JSON.
producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

# One random id identifies this simulated sensor for the whole run.
sensor_id = random.randint(1000, 9999)

try:
    # 30 readings, one every 2 seconds.
    for _ in range(30):
        data = {
            "sensor_id": sensor_id,
            "timestamp": datetime.now().isoformat(),
            "temperature": random.randint(25, 45),
            "humidity": random.randint(15, 85)
        }
        producer.send("atr_building_sensors", data)
        producer.flush()  # send immediately
        # Log only after the flush, so the message reflects a completed send.
        print(f"Sensor {sensor_id} data sent:", data)
        time.sleep(2)
finally:
    # Release the producer's connections even if the loop is interrupted.
    producer.close()

Sensor 3551 data sent: {'sensor_id': 3551, 'timestamp': '2025-04-20T09:33:14.505970', 'temperature': 30, 'humidity': 27}
Sensor 3551 data sent: {'sensor_id': 3551, 'timestamp': '2025-04-20T09:33:16.683490', 'temperature': 39, 'humidity': 59}
Sensor 3551 data sent: {'sensor_id': 3551, 'timestamp': '2025-04-20T09:33:18.688438', 'temperature': 37, 'humidity': 75}
Sensor 3551 data sent: {'sensor_id': 3551, 'timestamp': '2025-04-20T09:33:20.693770', 'temperature': 42, 'humidity': 63}
Sensor 3551 data sent: {'sensor_id': 3551, 'timestamp': '2025-04-20T09:33:22.698653', 'temperature': 42, 'humidity': 48}
Sensor 3551 data sent: {'sensor_id': 3551, 'timestamp': '2025-04-20T09:33:24.703515', 'temperature': 27, 'humidity': 85}
Sensor 3551 data sent: {'sensor_id': 3551, 'timestamp': '2025-04-20T09:33:26.709576', 'temperature': 25, 'humidity': 19}
Sensor 3551 data sent: {'sensor_id': 3551, 'timestamp': '2025-04-20T09:33:28.714201', 'temperature': 32, 'humidity': 77}
Sensor 3551 data sent: {'sensor_

In [9]:
# Script that sends readings to the sensor topic, sensor 2
from kafka import KafkaProducer
import json, random, time
from datetime import datetime

# Values are serialized as UTF-8 encoded JSON.
producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

# A fresh random id, so this run acts as a second simulated sensor.
sensor_id = random.randint(1000, 9999)

try:
    # 30 readings, one every 2 seconds.
    for _ in range(30):
        data = {
            "sensor_id": sensor_id,
            "timestamp": datetime.now().isoformat(),
            "temperature": random.randint(25, 45),
            "humidity": random.randint(15, 85)
        }
        producer.send("atr_building_sensors", data)
        producer.flush()  # send immediately
        # Log only after the flush, so the message reflects a completed send.
        print(f"Sensor {sensor_id} data sent:", data)
        time.sleep(2)
finally:
    # Release the producer's connections even if the loop is interrupted.
    producer.close()

Sensor 2327 data sent: {'sensor_id': 2327, 'timestamp': '2025-04-20T09:34:29.870233', 'temperature': 30, 'humidity': 29}
Sensor 2327 data sent: {'sensor_id': 2327, 'timestamp': '2025-04-20T09:34:31.979818', 'temperature': 39, 'humidity': 39}
Sensor 2327 data sent: {'sensor_id': 2327, 'timestamp': '2025-04-20T09:34:33.984553', 'temperature': 39, 'humidity': 20}
Sensor 2327 data sent: {'sensor_id': 2327, 'timestamp': '2025-04-20T09:34:35.988871', 'temperature': 35, 'humidity': 44}
Sensor 2327 data sent: {'sensor_id': 2327, 'timestamp': '2025-04-20T09:34:37.993492', 'temperature': 35, 'humidity': 42}
Sensor 2327 data sent: {'sensor_id': 2327, 'timestamp': '2025-04-20T09:34:39.997362', 'temperature': 39, 'humidity': 53}
Sensor 2327 data sent: {'sensor_id': 2327, 'timestamp': '2025-04-20T09:34:42.001586', 'temperature': 36, 'humidity': 75}
Sensor 2327 data sent: {'sensor_id': 2327, 'timestamp': '2025-04-20T09:34:44.006932', 'temperature': 39, 'humidity': 28}
Sensor 2327 data sent: {'sensor_

In [10]:
# Spark Structured Streaming (sliding window + watermark)
from pyspark.sql.functions import from_json, col, window, avg, current_timestamp, to_json, struct
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType

# Schema of the JSON payload written by the producer cells.
schema = StructType([
    StructField("sensor_id", IntegerType()),
    StructField("timestamp", TimestampType()),
    StructField("temperature", IntegerType()),
    StructField("humidity", IntegerType())
])

sensor_df = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "atr_building_sensors")
    # The producer cells run to completion before this cell starts, so
    # "latest" would skip every record they wrote and the stream would stay
    # empty. "earliest" reads the topic from the beginning.
    .option("startingOffsets", "earliest")
    .load()
)

# Kafka delivers the value as bytes: cast to string, parse the JSON and
# flatten the parsed struct into top-level columns.
sensor_values = (
    sensor_df.selectExpr("CAST(value AS STRING)")
    .select(from_json(col("value"), schema).alias("data"))
    .select("data.*")
)

# Average temperature and humidity over 1-minute windows that slide every
# 30 seconds, with a 10-second watermark on the event timestamp.
agg_df = (
    sensor_values
    .withWatermark("timestamp", "10 seconds")
    .groupBy(window(col("timestamp"), "1 minute", "30 seconds"))
    .agg(avg("temperature").alias("t_avg"), avg("humidity").alias("h_avg"))
)

# Query with console logging
query = (
    agg_df.writeStream.outputMode("update")
    .format("console")
    .option("truncate", "false")
    .start()
)

try:
    # Let the query run for at most 180 seconds.
    query.awaitTermination(180)
finally:
    # Always stop the query so it does not keep running in the background.
    query.stop()

In [11]:
# Check the aggregates against the alert rules and write matches to Kafka
from pyspark.sql.functions import current_timestamp, struct, to_json

alerts_spark_df = spark.createDataFrame(alerts_df)

# Each rule row gives the range in which its alert applies (for example
# "It's too dry" covers humidity 0..40); -999 marks a bound the rule does not
# use. An alert fires when the windowed average lies INSIDE the rule's range.
# The previous predicate fired when the average was outside the range, which
# attached each message to the opposite condition.
humidity_match = (
    (col("humidity_min") != -999)
    & (col("humidity_max") != -999)
    & col("h_avg").between(col("humidity_min"), col("humidity_max"))
)
temperature_match = (
    (col("temperature_min") != -999)
    & (col("temperature_max") != -999)
    & col("t_avg").between(col("temperature_min"), col("temperature_max"))
)

# Pair every aggregated window with every rule and keep the matching pairs.
alerts_final_df = agg_df.crossJoin(alerts_spark_df).where(
    humidity_match | temperature_match
).select(
    "window", "t_avg", "h_avg", "code", "message", current_timestamp().alias("timestamp")
)

# Stream to Kafka: each alert row is serialized to one JSON string in "value".
# No outputMode is set on this query, so Spark's default applies.
query_kafka = (
    alerts_final_df.select(to_json(struct("*")).alias("value")).writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("topic", "atr_alerts")
    .option("checkpointLocation", "/tmp/checkpoint")
    .start()
)

# Stream to the console
query_console = (
    alerts_final_df.writeStream.outputMode("update")
    .format("console")
    .option("truncate", "false")
    .start()
)

try:
    # The waits run one after the other, each for at most 180 seconds.
    query_kafka.awaitTermination(180)
    query_console.awaitTermination(180)
finally:
    # Stop both queries even if one of the waits raises.
    query_kafka.stop()
    query_console.stop()

In [13]:
# Check the alerts that were written to Kafka
from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    'atr_alerts',
    bootstrap_servers=['localhost:9092'],
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    auto_offset_reset='earliest',
    consumer_timeout_ms=30000  # wait up to 30 seconds for new messages
)

try:
    # Iteration ends once no message arrives within consumer_timeout_ms.
    for message in consumer:
        print("🔥 ALERT:", message.value)
finally:
    # Close the consumer even if a message fails to deserialize.
    consumer.close()

