In [1]:
import time
import json
import random
from datetime import datetime
from kafka import KafkaProducer, KafkaConsumer
import pandas as pd
import matplotlib.pyplot as plt
from collections import deque
from IPython.display import clear_output

producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

def generate_sensor_data(sensor_id):
    return {
        "timestamp": datetime.utcnow().isoformat(),
        "sensor_id": sensor_id,
        "temperature": round(random.uniform(20, 40), 2),
        "humidity": round(random.uniform(30, 90), 2),
        "pressure": round(random.uniform(950, 1050), 2)
    }

for _ in range(100):
    data = generate_sensor_data(sensor_id=random.randint(1, 5))
    producer.send('sensor-data', value=data)
    print("Sent:", data)
    time.sleep(0.5) 

producer.flush()

consumer = KafkaConsumer(
    'sensor-data',
    bootstrap_servers='localhost:9092',
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='sensor-group',
    value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)

data_list = []

print("Listening for sensor data...")
for i, message in enumerate(consumer):
    data = message.value
    data_list.append(data)
    print(data)
    
    if i >= 50:  
        break

df = pd.DataFrame(data_list)
df['timestamp'] = pd.to_datetime(df['timestamp'])
df.head()

def detect_anomalies(df):
    conditions = (
        (df['temperature'] > 35) | 
        (df['temperature'] < 22) |
        (df['humidity'] > 85) |
        (df['pressure'] < 960)
    )
    df['anomaly'] = conditions
    return df

df = detect_anomalies(df)
df[df['anomaly'] == True]

plt.figure(figsize=(12, 6))
plt.plot(df['timestamp'], df['temperature'], label='Temperature (Â°C)', color='tomato')
plt.plot(df['timestamp'], df['humidity'], label='Humidity (%)', color='skyblue')
plt.plot(df['timestamp'], df['pressure'], label='Pressure (hPa)', color='green')
plt.title('Sensor Data Stream')
plt.xlabel('Timestamp')
plt.ylabel('Sensor Readings')
plt.legend()
plt.grid(True)
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()

ModuleNotFoundError: No module named 'kafka'