In [None]:
import json
from kafka import KafkaConsumer, KafkaProducer

# Налаштування Kafka
BROKER = "localhost:9092"
SENSOR_TOPIC = "building_sensors_your_name"  # Замініть 'your_name' на ваш ідентифікатор
TEMPERATURE_ALERTS_TOPIC = "temperature_alerts_your_name"
HUMIDITY_ALERTS_TOPIC = "humidity_alerts_your_name"

# Ініціалізація споживача Kafka
consumer = KafkaConsumer(
    SENSOR_TOPIC,
    bootstrap_servers=BROKER,
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
)

# Ініціалізація продюсера Kafka для сповіщень
producer = KafkaProducer(
    bootstrap_servers=BROKER, value_serializer=lambda v: json.dumps(v).encode("utf-8")
)

try:
    for message in consumer:
        data = message.value
        sensor_id = data["sensor_id"]
        temperature = data["temperature"]
        humidity = data["humidity"]
        timestamp = data["timestamp"]

        # Перевірка температури
        if temperature > 40.0:
            alert = {
                "sensor_id": sensor_id,
                "timestamp": timestamp,
                "temperature": temperature,
                "message": "Перевищено допустимий рівень температури!",
            }
            producer.send(TEMPERATURE_ALERTS_TOPIC, value=alert)
            print(f"Відправлено сповіщення про температуру: {alert}")

        # Перевірка вологості
        if humidity > 80.0 or humidity < 20.0:
            alert = {
                "sensor_id": sensor_id,
                "timestamp": timestamp,
                "humidity": humidity,
                "message": "Вологість виходить за допустимі рамки!",
            }
            producer.send(HUMIDITY_ALERTS_TOPIC, value=alert)
            print(f"Відправлено сповіщення про вологість: {alert}")

except KeyboardInterrupt:
    print("Завершення роботи скрипту...")
finally:
    consumer.close()
    producer.close()