In [None]:
# 1. перевірка що створені топіки в Kafka:

from kafka import KafkaAdminClient

# Підключення до Kafka
admin_client = KafkaAdminClient(
    bootstrap_servers="localhost:9092",
)

# Отримання списку тем
topics = admin_client.list_topics()

# Перевірка, чи існують конкретні теми
expected_topics = ["building-sensors-maxim", "humidity-alerts-maxim", "temperature-alerts-maxim"]

print(f"Available topics: {topics}")

for topic in expected_topics:
    if topic in topics:
        print(f"Topic '{topic}' is created.")
    else:
        print(f"Topic '{topic}' is not created.")


In [None]:
# 2.генерація даних та відправка їх у топік building_sensors_maxim:

from kafka import KafkaProducer
import random
import time
import json
from datetime import datetime

folder_path = "K:/PowerBi/Go IT Magistr/12_Data Engineering/hw_3"

# Конфігурація продюсера
producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

# Генерація випадкових даних
def generate_sensor_data(sensor_id):
    temperature = random.uniform(25, 45)
    humidity = random.uniform(15, 85)
    timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    return {
        'sensor_id': sensor_id,
        'temperature': temperature,
        'humidity': humidity,
        'timestamp': timestamp
    }

# Функція для відправки даних
def send_sensor_data(sensor_id, num_messages=10):
    for _ in range(num_messages):
        data = generate_sensor_data(sensor_id)
        producer.send('building-sensors-maxim', value=data)
        print(f"Data sent: {data}")
        time.sleep(2)  

# Виклик функції для відправки 10 повідомлень
sensor_id = random.randint(1, 1000)  
send_sensor_data(sensor_id, num_messages=10)




In [None]:
#3. обробка даних з building_sensors_maxim: from kafka import KafkaConsumer
from kafka import KafkaConsumer, KafkaProducer
import json

# Конфігурація консьюмера
consumer = KafkaConsumer(
    'building-sensors-maxim',  # Топік, з якого зчитуємо дані
    bootstrap_servers='localhost:9092',  # Сервер Kafka
    group_id='sensor_group',  # Ідентифікатор групи консюмерів
    value_deserializer=lambda m: json.loads(m.decode('utf-8')) 
)

# Конфігурація продюсера для відправки сповіщень
producer = KafkaProducer(
    bootstrap_servers='localhost:9092',  
    value_serializer=lambda v: json.dumps(v).encode('utf-8')  
)

# Функція для перевірки порогових значень і відправки сповіщень
def process_sensor_data():
    for message in consumer:
        data = message.value  
        print(f"Received message: {data}")  

        sensor_id = data['sensor_id']
        temperature = data['temperature']
        humidity = data['humidity']
        timestamp = data['timestamp']
        
        # Перевірка температури
        if temperature > 40:
            alert = {
                'sensor_id': sensor_id,
                'temperature': temperature,
                'timestamp': timestamp,
                'message': 'Temperature exceeds 40°C'
            }
            producer.send('temperature-alerts-maxim', value=alert)  
            print(f"Temperature alert sent: {alert}")

        # Перевірка вологості
        if humidity > 80 or humidity < 20:
            alert = {
                'sensor_id': sensor_id,
                'humidity': humidity,
                'timestamp': timestamp,
                'message': 'Humidity out of range (less than 20% or more than 80%)'
            }
            producer.send('humidity-alerts-maxim', value=alert)  
            print(f"Humidity alert sent: {alert}")

# Виклик функції для обробки даних
process_sensor_data()


In [None]:
#4. отримання та виведення сповіщень з temperature_alerts_maxim та humidity_alerts_maxim: 
from kafka import KafkaConsumer
import json

# Конфігурація консьюмера для топіка temperature_alerts
consumer_temperature = KafkaConsumer(
    'temperature-alerts-maxim',  
    bootstrap_servers='localhost:9092',  
    group_id='alert_group',  
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))  
)

# Конфігурація консьюмера для топіка humidity_alerts
consumer_humidity = KafkaConsumer(
    'humidity-alerts-maxim', 
    bootstrap_servers='localhost:9092', 
    group_id='alert_group',  
    value_deserializer=lambda m: json.loads(m.decode('utf-8')) 
)

# Функція для виведення сповіщень на екран
def display_alerts():
    print("Starting to receive alerts...")
    
    # Читання повідомлень з обох топіків
    for message in consumer_temperature:
        alert = message.value
        print(f"Temperature alert received: {alert}")

    for message in consumer_humidity:
        alert = message.value
        print(f"Humidity alert received: {alert}")

# Виклик функції для обробки та виведення сповіщень
display_alerts()




