In [None]:
# Генерація та надсилання даних від сенсора до Kafka
from kafka import KafkaProducer
import random
import time
import json
from datetime import datetime

# Kafka producer connected to the broker at localhost:9092. Every value passed
# to send() is serialized to JSON text and encoded as UTF-8 bytes.
producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

def generate_sensor_data(sensor_id):
    """Build one simulated reading for the given sensor.

    Args:
        sensor_id: Identifier copied unchanged into the reading.

    Returns:
        dict with the keys ``sensor_id``, ``temperature`` (uniform random in
        [25, 45]), ``humidity`` (uniform random in [15, 85]) and ``timestamp``
        (current local time formatted as ``YYYY-MM-DD HH:MM:SS``).
    """
    reading = {'sensor_id': sensor_id}
    # Temperature is drawn before humidity, so a seeded generator yields the
    # same pair of values in the same order on every run.
    reading['temperature'] = random.uniform(25, 45)
    reading['humidity'] = random.uniform(15, 85)
    now = datetime.now()
    reading['timestamp'] = now.strftime('%Y-%m-%d %H:%M:%S')
    return reading

def send_sensor_data(sensor_id, num_messages=10):
    """Publish simulated readings for one sensor to Kafka.

    Generates ``num_messages`` readings with ``generate_sensor_data`` and sends
    each one to the ``building-sensors-maxim`` topic through the module-level
    ``producer``, pausing two seconds after every message.

    Args:
        sensor_id: Identifier stamped on every generated reading.
        num_messages: How many readings to send (default 10).
    """
    for _ in range(num_messages):
        data = generate_sensor_data(sensor_id)
        producer.send('building-sensors-maxim', value=data)
        print(f"Data sent: {data}")
        time.sleep(2)

    # send() only queues the record for a background sender; flush() blocks
    # until everything queued so far has actually been handed to the broker,
    # so nothing is lost if the process exits right after this call.
    producer.flush()

# Pick one random sensor id in [1, 1000] for this run and publish ten
# readings under it.
sensor_id = random.randint(1, 1000)  
send_sensor_data(sensor_id, num_messages=10)


In [None]:
# Обробка даних з Kafka з використанням ковзного вікна
from kafka import KafkaConsumer
import json
from datetime import datetime
from collections import deque

# Consumer subscribed to the sensor topic on the broker at localhost:9092, in
# consumer group 'sensor_group'. Message bytes are decoded as UTF-8 and parsed
# from JSON before being exposed as message.value.
consumer = KafkaConsumer(
    'building-sensors-maxim',
    bootstrap_servers='localhost:9092',
    group_id='sensor_group',
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

# Window settings read by process_stream():
window_size = 60  # seconds; compared (plus the watermark) against timestamp differences
sliding_interval = 1  # compared against len(window): minimum number of buffered readings before averaging
watermark_duration = 10  # seconds added to window_size before the oldest reading is evicted
window = deque()  # (timestamp, data) tuples in arrival order

def calculate_averages(window):
    """Return the mean temperature and mean humidity of the given readings.

    Args:
        window: Sized collection of dicts, each with numeric ``temperature``
            and ``humidity`` entries.

    Returns:
        Tuple ``(avg_temperature, avg_humidity)``. For an empty collection
        both averages are ``0.0``.
    """
    count = len(window)
    # Guard both averages: a conditional expression placed after the comma
    # would protect only the second tuple element, leaving the first division
    # to raise ZeroDivisionError on an empty window.
    if not count:
        return 0.0, 0.0
    total_temperature = sum(data['temperature'] for data in window)
    total_humidity = sum(data['humidity'] for data in window)
    return total_temperature / count, total_humidity / count

def process_stream():
    """Consume sensor readings and print sliding-window averages.

    For every message from the module-level ``consumer`` the reading is
    appended to the module-level ``window`` deque, readings older than
    ``window_size + watermark_duration`` seconds (relative to the newest
    message's timestamp) are dropped from the front, and the averages over
    what remains are printed.
    """
    print("Starting to process the stream of data...")
    for record in consumer:
        payload = record.value
        print(f"Received message: {payload}")  
        event_time = datetime.strptime(payload['timestamp'], '%Y-%m-%d %H:%M:%S')
        window.append((event_time, payload))

        # Drop entries from the front until the oldest one is recent enough.
        max_age = window_size + watermark_duration
        while window:
            oldest_time = window[0][0]
            if (event_time - oldest_time).total_seconds() <= max_age:
                break
            window.popleft()

        if len(window) >= sliding_interval:
            readings = [entry for _, entry in window]
            avg_temperature, avg_humidity = calculate_averages(readings)
            print(f"Avg Temperature: {avg_temperature:.2f}, Avg Humidity: {avg_humidity:.2f}")

        print(f"Processed data: {payload['sensor_id']} - Temperature: {payload['temperature']}, Humidity: {payload['humidity']} at {payload['timestamp']}")

# Run the consumer loop; this returns only once iteration over `consumer` ends.
process_stream()




In [None]:
# Перевірка умов для алертів на основі отриманих даних
import pandas as pd
from kafka import KafkaConsumer
import json
from datetime import datetime
from collections import deque

# Absolute path to the CSV file holding the alert rules.
# NOTE(review): this is a machine-specific Windows path; the cell fails on any
# other machine unless the file exists at exactly this location.
alerts_conditions_path = r'K:\PowerBi\Go IT Magistr\12_Data Engineering\hw_4\alerts_conditions.csv'

# Consumer subscribed to the sensor topic on the broker at localhost:9092, in
# consumer group 'sensor_group'. Message bytes are decoded as UTF-8 and parsed
# from JSON before being exposed as message.value.
consumer = KafkaConsumer(
    'building-sensors-maxim',
    bootstrap_servers='localhost:9092',
    group_id='sensor_group',
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

# Alert rules table. The code below reads the columns min_temperature,
# max_temperature, min_humidity, max_humidity, alert_code and alert_message.
alert_conditions_df = pd.read_csv(alerts_conditions_path)

# Window settings read by the stream-processing functions:
window_size = 60  # seconds; compared (plus the watermark) against timestamp differences
sliding_interval = 1  # compared against len(window): minimum number of buffered readings before averaging
watermark_duration = 10  # seconds added to window_size before the oldest reading is evicted
window = deque()  # (timestamp, data) tuples in arrival order

def calculate_averages(window):
    """Return the mean temperature and mean humidity of the given readings.

    Args:
        window: Sized collection of dicts, each with numeric ``temperature``
            and ``humidity`` entries.

    Returns:
        Tuple ``(avg_temperature, avg_humidity)``. For an empty collection
        both averages are ``0.0``.
    """
    count = len(window)
    # Guard both averages: a conditional expression placed after the comma
    # would protect only the second tuple element, leaving the first division
    # to raise ZeroDivisionError on an empty window.
    if not count:
        return 0.0, 0.0
    total_temperature = sum(data['temperature'] for data in window)
    total_humidity = sum(data['humidity'] for data in window)
    return total_temperature / count, total_humidity / count

def check_alert_conditions(avg_temperature, avg_humidity, alert_conditions_df):
    """Return the alert rules whose ranges contain the current averages.

    A rule matches when ``min_temperature <= avg_temperature <= max_temperature``
    and ``min_humidity <= avg_humidity <= max_humidity``. A bound equal to
    ``-999`` is treated as "not set" and always passes.

    Args:
        avg_temperature: Current average temperature.
        avg_humidity: Current average humidity.
        alert_conditions_df: DataFrame of rules with the columns
            ``min_temperature``, ``max_temperature``, ``min_humidity`` and
            ``max_humidity``.

    Returns:
        A new DataFrame holding the matching rows, extended with
        ``avg_temperature`` and ``avg_humidity`` columns set to the given
        averages. The input DataFrame is not modified.
    """
    unset = -999  # sentinel marking a bound that must not be checked
    rules = alert_conditions_df

    def at_least(column, value):
        # Lower bound satisfied, or no lower bound configured.
        return (rules[column] <= value) | (rules[column] == unset)

    def at_most(column, value):
        # Upper bound satisfied, or no upper bound configured.
        return (rules[column] >= value) | (rules[column] == unset)

    matches = (
        at_least('min_temperature', avg_temperature)
        & at_most('max_temperature', avg_temperature)
        & at_least('min_humidity', avg_humidity)
        & at_most('max_humidity', avg_humidity)
    )

    # Attach the averages to the filtered result only, so the shared rules
    # table passed in by the caller is never written to.
    return rules[matches].assign(
        avg_temperature=avg_temperature,
        avg_humidity=avg_humidity,
    )

def process_stream_with_alerts():
    """Consume sensor readings, print window averages and report alerts.

    For every message from the module-level ``consumer`` the reading is
    appended to the module-level ``window`` deque, readings older than
    ``window_size + watermark_duration`` seconds (relative to the newest
    message's timestamp) are dropped from the front, and the averages over
    what remains are printed. Every rule in ``alert_conditions_df`` matched by
    those averages is printed as a triggered alert.
    """
    print("Starting to process the stream of data with alerts...")
    for record in consumer:
        payload = record.value
        print(f"Received message: {payload}")  
        event_time = datetime.strptime(payload['timestamp'], '%Y-%m-%d %H:%M:%S')
        window.append((event_time, payload))

        # Drop entries from the front until the oldest one is recent enough.
        max_age = window_size + watermark_duration
        while window:
            oldest_time = window[0][0]
            if (event_time - oldest_time).total_seconds() <= max_age:
                break
            window.popleft()

        if len(window) >= sliding_interval:
            readings = [entry for _, entry in window]
            avg_temperature, avg_humidity = calculate_averages(readings)
            print(f"Avg Temperature: {avg_temperature:.2f}, Avg Humidity: {avg_humidity:.2f}")

            triggered = check_alert_conditions(avg_temperature, avg_humidity, alert_conditions_df)
            # iterrows() yields nothing for a frame without rows, so no
            # separate emptiness check is needed.
            for _, rule in triggered.iterrows():
                print(f"Alert Triggered! {rule['alert_message']} (Code: {rule['alert_code']})")

        print(f"Processed data: {payload['sensor_id']} - Temperature: {payload['temperature']}, Humidity: {payload['humidity']} at {payload['timestamp']}")

# Run the consumer loop with alert checks; this returns only once iteration
# over `consumer` ends.
process_stream_with_alerts()


In [None]:
# Запис алертів у Kafka-топік
from kafka import KafkaProducer
import json
from datetime import datetime

# Kafka producer connected to the broker at localhost:9092. Every value passed
# to send() is serialized to JSON text and encoded as UTF-8 bytes.
producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
def write_alerts_to_kafka(alerts):
    """Publish every alert row to the ``building-alerts-maxim`` topic.

    Args:
        alerts: DataFrame whose rows provide ``window_start``, ``window_end``,
            ``avg_temperature``, ``avg_humidity``, ``alert_code`` and
            ``alert_message``.

    Each row becomes one message sent through the module-level ``producer``;
    the message's ``timestamp`` field is the local time at which it was built.
    """
    for _, row in alerts.iterrows():
        window_bounds = {
            'start': row['window_start'],
            'end': row['window_end'],
        }
        payload = {
            'window': window_bounds,
            't_avg': row['avg_temperature'],
            'h_avg': row['avg_humidity'],
            'code': row['alert_code'],
            'message': row['alert_message'],
            'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
        }
        producer.send('building-alerts-maxim', value=payload)
        print(f"Alert sent to Kafka: {payload}")

def process_stream_with_alerts_and_write_to_kafka():
    """Consume sensor readings, detect alerts and publish them to Kafka.

    For every message from the module-level ``consumer`` the reading is
    appended to the module-level ``window`` deque, readings older than
    ``window_size + watermark_duration`` seconds (relative to the newest
    message's timestamp) are dropped from the front, and the averages over
    what remains are printed. Rules in ``alert_conditions_df`` matched by
    those averages are printed and then sent, once per matching rule, through
    ``write_alerts_to_kafka`` together with the window bounds and averages.
    """
    print("Starting to process the stream of data with alerts...")
    for message in consumer:
        data = message.value
        print(f"Received message: {data}") 
        timestamp = datetime.strptime(data['timestamp'], '%Y-%m-%d %H:%M:%S')

        window.append((timestamp, data))

        # Evict readings that fell out of the window (plus watermark slack).
        while window and (timestamp - window[0][0]).total_seconds() > window_size + watermark_duration:
            window.popleft()

        if len(window) >= sliding_interval:
            avg_temperature, avg_humidity = calculate_averages([d[1] for d in window])
            print(f"Avg Temperature: {avg_temperature:.2f}, Avg Humidity: {avg_humidity:.2f}")

            alerts = check_alert_conditions(avg_temperature, avg_humidity, alert_conditions_df)
            if not alerts.empty:
                for _, alert in alerts.iterrows():
                    print(f"Alert Triggered! {alert['alert_message']} (Code: {alert['alert_code']})")

                # Rows yielded by iterrows() are copies, so values set on them
                # never reach the DataFrame. Attach the window bounds and
                # averages as real columns instead, then publish the batch
                # exactly once rather than once per matching row.
                # NOTE(review): the parsed timestamps are naive; "+03:00" is
                # appended as literal text, not derived from a timezone.
                alerts = alerts.assign(
                    window_start=window[0][0].strftime('%Y-%m-%dT%H:%M:%S.%f+03:00'),
                    window_end=window[-1][0].strftime('%Y-%m-%dT%H:%M:%S.%f+03:00'),
                    avg_temperature=avg_temperature,
                    avg_humidity=avg_humidity,
                )
                write_alerts_to_kafka(alerts)

        print(f"Processed data: {data['sensor_id']} - Temperature: {data['temperature']}, Humidity: {data['humidity']} at {data['timestamp']}")

# Run the consumer loop that also publishes alerts; this returns only once
# iteration over `consumer` ends.
process_stream_with_alerts_and_write_to_kafka()
