Производитель (датчики)

In [6]:
from confluent_kafka import Producer
import time
import json
import random

# Broker address the Kafka producer connects to.
producer_config = {'bootstrap.servers': 'localhost:9092'}

# Single producer instance, used as a module-level global by send_sensor_data.
producer = Producer(producer_config)

def generate_sensor_data(sensor_type, sensor_name):
    """Build one simulated reading for the given sensor.

    Args:
        sensor_type: Label stored under the ``'sensor_type'`` key.
        sensor_name: Identifier stored under the ``'sensor_name'`` key.

    Returns:
        dict with the keys ``'timestamp'`` (current Unix time truncated to
        whole seconds), ``'sensor_type'``, ``'sensor_name'`` and ``'value'``
        (a uniform random float between 0 and 100, rounded to two decimals),
        in that order.
    """
    now_seconds = int(time.time())
    reading = round(random.uniform(0, 100), 2)
    return dict(
        timestamp=now_seconds,
        sensor_type=sensor_type,
        sensor_name=sensor_name,
        value=reading,
    )

def send_sensor_data(sensor_type, sensor_name, topic='iot_sensor_data',
                     max_messages=None, stop_event=None):
    """Publish simulated readings for one sensor to a Kafka topic.

    Each reading is produced by ``generate_sensor_data``, serialized to JSON
    and handed to the module-level ``producer``. Between readings the function
    pauses for a random 0.1-2.0 seconds to simulate variable delays.

    With the default arguments the loop never ends (same as before), so run it
    in a background thread or pass ``max_messages`` / ``stop_event``.

    Args:
        sensor_type: Label forwarded to ``generate_sensor_data``.
        sensor_name: Identifier forwarded to ``generate_sensor_data``.
        topic: Kafka topic to publish to.
        max_messages: Stop after this many readings; ``None`` means no limit.
        stop_event: Optional object with ``is_set()`` and ``wait(timeout)``
            (for example ``threading.Event``). When it is set the loop exits,
            and the pause between readings is interrupted as soon as it is set.

    Returns:
        The number of readings handed to the producer.
    """
    def _report_delivery(err, msg):
        # Delivery results arrive asynchronously; without a callback a failed
        # delivery would go unnoticed.
        if err is not None:
            print(f'Delivery failed for {sensor_name}: {err}')

    sent = 0
    try:
        while max_messages is None or sent < max_messages:
            if stop_event is not None and stop_event.is_set():
                break

            data = generate_sensor_data(sensor_type, sensor_name)
            data_str = json.dumps(data)

            # Produce message to Kafka topic
            producer.produce(topic, value=data_str, callback=_report_delivery)
            # Serve pending delivery callbacks without blocking on every send.
            producer.poll(0)
            sent += 1

            delay = random.uniform(0.1, 2.0)  # Simulate variable delays
            if stop_event is not None:
                stop_event.wait(delay)
            else:
                time.sleep(delay)
    finally:
        # Wait for anything still queued before returning or propagating.
        producer.flush()
    return sent
    

In [None]:
# Start two sensor threads (you can adjust the number of sensors)
import threading

# daemon=True: the sensor loops never return, so they must not keep the
# kernel process alive when it is shut down.
thread1 = threading.Thread(target=send_sensor_data, args=('temperature', 'sensor1'), daemon=True)
thread2 = threading.Thread(target=send_sensor_data, args=('pressure', 'sensor2'), daemon=True)

thread1.start()
thread2.start()

# Deliberately no join(): send_sensor_data loops forever by default, so
# joining would block this cell indefinitely and the consumer cells below
# could never run. The threads keep producing in the background.


Потребитель (обработчик)

In [8]:
from confluent_kafka import Consumer
import pandas as pd
import json

# Settings handed to the Kafka consumer: broker address, consumer group id
# and the offset-reset policy.
consumer_config = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'sensor_consumer_group',
    'auto.offset.reset': 'earliest',
}

# Build the consumer and subscribe it to the sensor topic.
consumer = Consumer(consumer_config)
consumer.subscribe(['iot_sensor_data'])

# Decoded readings accumulate here; process_messages() appends to this list.
sensor_data = list()

def process_messages(max_polls=100, poll_timeout=1.0):
    """Poll the module-level consumer and append decoded readings to ``sensor_data``.

    The loop is bounded by the number of poll attempts, not by the number of
    messages received: a poll that times out or returns an error still uses up
    one attempt, so fewer than ``max_polls`` readings may be collected.

    Args:
        max_polls: Maximum number of ``consumer.poll`` calls to make.
        poll_timeout: Seconds each poll waits for a message.

    Returns:
        The number of readings appended to ``sensor_data`` by this call.
    """
    appended = 0
    for _ in range(max_polls):
        msg = consumer.poll(poll_timeout)  # Timeout in seconds

        if msg is None:
            continue
        if msg.error():
            print(f'Consumer error: {msg.error()}')
            continue

        payload = msg.value()
        if payload is None:
            # A message without a value has nothing to decode.
            continue

        try:
            data = json.loads(payload.decode('utf-8'))
        except (UnicodeDecodeError, json.JSONDecodeError) as exc:
            # One malformed message should not abort the whole batch.
            print(f'Skipping malformed message: {exc}')
            continue

        sensor_data.append(data)
        appended += 1
    return appended

In [9]:
# Pull a batch of readings from Kafka into sensor_data.
process_messages()

# Convert accumulated data to Pandas DataFrame
df = pd.DataFrame(sensor_data)

if df.empty:
    # Without rows the frame has no 'sensor_type' / 'sensor_name' columns, so
    # the groupby calls below would raise KeyError.
    print('No sensor data received; nothing to average.')
else:
    # Group by sensor type and sensor name, calculate average value
    avg_by_type = df.groupby('sensor_type')['value'].mean()
    avg_by_name = df.groupby('sensor_name')['value'].mean()

    print('Average by Sensor Type:')
    print(avg_by_type)

    print('\nAverage by Sensor Name:')
    print(avg_by_name)


Average by Sensor Type:
sensor_type
pressure       41.686429
temperature    56.566000
Name: value, dtype: float64

Average by Sensor Name:
sensor_name
sensor1    56.566000
sensor2    41.686429
Name: value, dtype: float64


%4|1702838948.589|MAXPOLL|rdkafka#consumer-5| [thrd:main]: Application maximum poll interval (300000ms) exceeded by 259ms (adjust max.poll.interval.ms for long-running message processing): leaving group
%6|1702981667.473|FAIL|rdkafka#consumer-5| [thrd:localhost:9092/bootstrap]: localhost:9092/1: Disconnected (after 143063073ms in state UP)
%6|1702981667.474|FAIL|rdkafka#producer-4| [thrd:localhost:9092/bootstrap]: localhost:9092/1: Disconnected (after 143493864ms in state UP)
%6|1702981669.048|FAIL|rdkafka#producer-4| [thrd:localhost:9092/bootstrap]: localhost:9092/1: Disconnected while requesting ApiVersion: might be caused by incorrect security.protocol configuration (connecting to a SSL listener?) or broker version is < 0.10 (see api.version.request) (after 1570ms in state APIVERSION_QUERY)
%6|1702981669.049|FAIL|rdkafka#consumer-5| [thrd:localhost:9092/bootstrap]: localhost:9092/1: Disconnected while requesting ApiVersion: might be caused by incorrect security.protocol configuratio