In [1]:
from kafka import KafkaConsumer
import json
import matplotlib.pyplot as plt
import time

In [5]:
# Shared plotting buffers: one independent, initially empty list per series.
# They are filled and trimmed in place by the plotting cell.
timestamps, water_temperatures, ph_levels, turbidities, dissolved_oxygen_levels = (
    [] for _ in range(5)
)

In [2]:
# Kafka configuration
def initialize_consumer(kafka_topic="water_quality", kafka_bootstrap_servers=None):
    """Create a KafkaConsumer subscribed to a single topic.

    Args:
        kafka_topic: Name of the topic to subscribe to.
            Defaults to "water_quality".
        kafka_bootstrap_servers: Value passed to KafkaConsumer as
            ``bootstrap_servers``. When None, ["localhost:9092"] is used.

    Returns:
        The KafkaConsumer instance. It is created with
        ``auto_offset_reset='latest'`` and ``enable_auto_commit=True``, and
        each message value is decoded as UTF-8 and parsed as JSON.
    """
    if kafka_bootstrap_servers is None:
        # Build the default per call so callers never share a mutable list.
        kafka_bootstrap_servers = ["localhost:9092"]

    return KafkaConsumer(
        kafka_topic,
        bootstrap_servers=kafka_bootstrap_servers,
        value_deserializer=lambda m: json.loads(m.decode('utf-8')),
        auto_offset_reset='latest',
        enable_auto_commit=True
    )

In [3]:
# Consume one message, record it, and rewrite the saved plot
def _record_reading(sensor_data, max_points):
    """Append one reading to the module-level buffers and trim them to max_points."""
    # Read every field before touching any buffer: if a key is missing the
    # KeyError is raised here, and the lists keep equal lengths.
    reading = (
        sensor_data['timestamp'],
        sensor_data['water_temperature'],
        sensor_data['ph_level'],
        sensor_data['turbidity'],
        sensor_data['dissolved_oxygen'],
    )
    buffers = (
        timestamps,
        water_temperatures,
        ph_levels,
        turbidities,
        dissolved_oxygen_levels,
    )
    for buffer, value in zip(buffers, reading):
        buffer.append(value)
        excess = len(buffer) - max_points
        if excess > 0:
            # Drop the oldest entries so only the newest max_points remain.
            del buffer[:excess]


def _render_plot(output_path):
    """Draw the four buffered series on a 2x2 grid and save the figure to output_path."""
    panels = (
        (water_temperatures, "Water Temperature", "°C", "blue"),
        (ph_levels, "pH Level", "pH", "green"),
        (turbidities, "Turbidity", "NTU", "orange"),
        (dissolved_oxygen_levels, "Dissolved Oxygen", "mg/L", "red"),
    )

    # A new figure is created for every redraw and always closed afterwards,
    # even if plotting or saving fails, so figures do not accumulate.
    fig = plt.figure(figsize=(10, 8))
    try:
        for position, (series, title, unit, color) in enumerate(panels, start=1):
            ax = fig.add_subplot(2, 2, position)
            ax.plot(timestamps, series, label=title, color=color)
            ax.set_title(title)
            ax.set_ylabel(unit)

        fig.tight_layout()
        fig.savefig(output_path)
    finally:
        plt.close(fig)


def update_plot(consumer, max_points=100, output_path="water_quality_plot.png"):
    """Process the next message from ``consumer`` and rewrite the plot image.

    At most one message is handled per call. Its ``value`` is printed, its
    fields are appended to the module-level buffers (``timestamps``,
    ``water_temperatures``, ``ph_levels``, ``turbidities``,
    ``dissolved_oxygen_levels``), the buffers are trimmed to ``max_points``
    entries, and a 2x2 figure of the buffers is saved to ``output_path``.
    If iterating ``consumer`` yields nothing, the call returns without
    changing anything.

    Args:
        consumer: Iterable of messages whose ``value`` supports lookup of the
            keys 'timestamp', 'water_temperature', 'ph_level', 'turbidity'
            and 'dissolved_oxygen'; must also provide ``close()``.
        max_points: Maximum number of readings kept per buffer (default 100).
        output_path: File the figure is saved to.

    Raises:
        KeyError: If the message value lacks one of the expected keys. The
            buffers are left unchanged in that case.
        KeyboardInterrupt: Re-raised after the consumer has been closed, so
            that a caller's loop stops instead of continuing with a closed
            consumer.
    """
    try:
        for message in consumer:
            sensor_data = message.value
            print(f"Received: {sensor_data}")

            _record_reading(sensor_data, max_points)
            _render_plot(output_path)

            break  # Process one message per call
    except KeyboardInterrupt:
        print("Stopped consuming messages.")
        consumer.close()
        raise

In [None]:
# Create the consumer, then keep handling messages until interrupted.
consumer = initialize_consumer()
print("Subscribed to Kafka topic 'water_quality'.")

try:
    while True:
        update_plot(consumer)
except KeyboardInterrupt:
    print("Stopped visualization.")
finally:
    # Close the consumer on every exit path, not only on Ctrl-C, so an
    # unexpected error does not leave the connection open.
    consumer.close()

Subscribed to Kafka topic 'water_quality'.
Received: {'timestamp': 1736421122, 'water_temperature': 26.38055098120148, 'ph_level': 8.057112969540205, 'turbidity': 13.9, 'dissolved_oxygen': 6.45}
Received: {'timestamp': 1736421127, 'water_temperature': 26.429848885812078, 'ph_level': 7.624881651283361, 'turbidity': 11.7, 'dissolved_oxygen': 6.57}
Received: {'timestamp': 1736421132, 'water_temperature': 27.75093966215566, 'ph_level': 8.459510112536124, 'turbidity': 12.27, 'dissolved_oxygen': 8.14}
Received: {'timestamp': 1736421137, 'water_temperature': 26.496794746797935, 'ph_level': 7.638518119603817, 'turbidity': 28.13, 'dissolved_oxygen': 7.16}
Received: {'timestamp': 1736421142, 'water_temperature': 26.197449009873893, 'ph_level': 8.003490327846047, 'turbidity': 24.36, 'dissolved_oxygen': 7.91}
Received: {'timestamp': 1736421147, 'water_temperature': 26.16639025888128, 'ph_level': 8.36526267280038, 'turbidity': 17.46, 'dissolved_oxygen': 8.19}
Received: {'timestamp': 1736421152, 'wa