In [9]:
from kafka import KafkaConsumer, KafkaProducer
from config import kafka_config
import json

# Створення Kafka Producer
producer = KafkaProducer(
    bootstrap_servers=kafka_config['bootstrap_servers'],
    security_protocol=kafka_config['security_protocol'],
    sasl_mechanism=kafka_config['sasl_mechanism'],
    sasl_plain_username=kafka_config['username'],
    sasl_plain_password=kafka_config['password'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    key_serializer=lambda v: json.dumps(v).encode('utf-8')
)


# Створення Kafka Consumer
consumer = KafkaConsumer(
    bootstrap_servers=kafka_config['bootstrap_servers'],
    security_protocol=kafka_config['security_protocol'],
    sasl_mechanism=kafka_config['sasl_mechanism'],
    sasl_plain_username=kafka_config['username'],
    sasl_plain_password=kafka_config['password'],
    value_deserializer=lambda v: json.loads(v.decode('utf-8')),
    key_deserializer=lambda v: json.loads(v.decode('utf-8')),
    auto_offset_reset='earliest',  # Зчитування повідомлень з початку
    enable_auto_commit=True,       # Автоматичне підтвердження зчитаних повідомлень
    group_id='my_consumer_group_3'   # Ідентифікатор групи споживачів
)

# Назва топіку
my_name = "vasyliev_v"
sensor_topic_name = f'{my_name}_building_sensors'
temp_alert_topic_name = f'{my_name}_temperature_alerts'
humidity_alert_topic_name = f'{my_name}_humidity_alerts'



# Підписка на топік з даними сенсорів
consumer.subscribe([sensor_topic_name])
print(f"Subscribed to topic '{sensor_topic_name}'")

# Обробка повідомлень з топіку з даними сенсорів
try:
    for message in consumer:
        print(f"Received message: {message.value} with key: {message.key}, partition {message.partition}")
        
		# Перевірка на перевищення порогу температури
		# Якщо температура більше 40 градусів
		# Створюємо повідомлення для відправки в топік з попередженням
        if message.value['temperature'] > 40:
            temp_alert_message = {
								"sensor_id": message.value['sensor_id'],
								"timestamp": message.value['timestamp'],
								"temperature": message.value['temperature'],
								"alert_message": "Temperature threshold exceeded"
								}
            print("Temperature alert triggered!")
            producer.send(temp_alert_topic_name, value=temp_alert_message)
            producer.flush()  # Очікування, поки всі повідомлення будуть відправлені
            print(f"Sent alert to topic '{temp_alert_topic_name}': {temp_alert_message}")
                  
		# Перевірка на перевищення порогу вологості
		# Якщо вологість більше 80% або менше 20%
		# Створюємо повідомлення для відправки в топік з попередженням
        if message.value['humidity'] > 80 or message.value['humidity'] < 20:
            humidity_alert_message = {
								"sensor_id": message.value['sensor_id'],
								"timestamp": message.value['timestamp'],
								"humidity": message.value['humidity'],
								"alert_message": "Humidity threshold exceeded"
								}
            print("Humidity alert triggered!")
            producer.send(humidity_alert_topic_name, value=humidity_alert_message)
            producer.flush()  # Очікування, поки всі повідомлення будуть відправлені
            print(f"Sent alert to topic '{humidity_alert_topic_name}': {humidity_alert_message}")		

except Exception as e:
    print(f"An error occurred: {e}")
finally:
    consumer.close()  # Закриття consumer

Subscribed to topic 'vasyliev_v_building_sensors'
Received message: {'timestamp': 1743956450.6756322, 'sensor_id': 'sensor_24', 'temperature': 28, 'humidity': 45} with key: 1, partition 0
Received message: {'timestamp': 1743956452.7360837, 'sensor_id': 'sensor_40', 'temperature': 39, 'humidity': 25} with key: 2, partition 0
Received message: {'timestamp': 1743956454.7936566, 'sensor_id': 'sensor_5', 'temperature': 36, 'humidity': 33} with key: 3, partition 0
Received message: {'timestamp': 1743956463.0320024, 'sensor_id': 'sensor_15', 'temperature': 31, 'humidity': 67} with key: 7, partition 0
Received message: {'timestamp': 1743956465.0926013, 'sensor_id': 'sensor_23', 'temperature': 31, 'humidity': 58} with key: 8, partition 0
Received message: {'timestamp': 1743956471.2761486, 'sensor_id': 'sensor_36', 'temperature': 27, 'humidity': 82} with key: 11, partition 0
Humidity alert triggered!
Sent alert to topic 'vasyliev_v_humidity_alerts': {'sensor_id': 'sensor_36', 'timestamp': 174395

KeyboardInterrupt: 