## **Exercise 2.7**


**Statement**: Modify the file water_data_consumer_pr_2.ipynb so that the consumers share a common group_id. Each sensor_id must produce a plot with the data produced by that sensor_id. Create a copy of the modified water_data_consumer_pr_2.ipynb and attach a screenshot of the two modified notebooks running at the same time, handling different sensor_id values, together with the resulting plots.
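One possible way to get one plot per sensor_id on the consumer side is to split the incoming readings into per-sensor series before plotting. The sketch below shows only that grouping step on plain dictionaries shaped like the producer's messages; the helper name `group_by_sensor` and the sample values are illustrative and are not taken from water_data_consumer_pr_2.ipynb.

```python
from collections import defaultdict

def group_by_sensor(readings, metric="water_temperature"):
    """Return {sensor_id: (timestamps, values)} for a single metric."""
    series = defaultdict(lambda: ([], []))
    for reading in readings:
        timestamps, values = series[reading["sensor_id"]]
        timestamps.append(reading["timestamp"])
        values.append(reading[metric])
    return dict(series)

# Hypothetical sample readings (made-up values, same fields as the producer output)
sample = [
    {"sensor_id": 0, "timestamp": 1, "water_temperature": 27.0},
    {"sensor_id": 1, "timestamp": 1, "water_temperature": 28.0},
    {"sensor_id": 0, "timestamp": 2, "water_temperature": 27.5},
]
grouped = group_by_sensor(sample)
print(grouped)
```

Each entry of `grouped` can then be drawn on its own figure, so a consumer that only receives some of the sensors only produces plots for those sensors.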

Since one plot must be generated per sensor, there will be as many producers as sensors. Suppose we have 6 sensors; in that case we need to implement 6 producers, all of which write to the same topic, 'water_quality'. The topic is split into 2 partitions.
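With 6 sensor keys and only 2 partitions, several sensors necessarily share a partition. For keyed messages, kafka-python's default partitioner hashes the key and takes the result modulo the number of partitions, so a given key always lands in the same partition. The sketch below illustrates that idea with `zlib.crc32` as a stand-in hash. This is an assumption made for illustration only: the real partitioner uses a murmur2 hash, so the actual partition numbers will differ.

```python
import zlib

def toy_partition(key: bytes, num_partitions: int) -> int:
    """Stand-in for a key-based partitioner: hash the key, then take the modulo."""
    return zlib.crc32(key) % num_partitions

# Map each of the 6 sensor keys to one of the 2 partitions
assignment = {}
for sensor_id in range(6):
    key = str(sensor_id).encode("utf-8")
    assignment.setdefault(toy_partition(key, 2), []).append(sensor_id)

print(assignment)
```

Because the mapping is deterministic, two consumers in the same group that each own one partition would each see a fixed subset of the sensors.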


In [12]:
from kafka import KafkaProducer
from kafka import KafkaAdminClient
from kafka.admin import NewTopic

import json
import time
import random
from math import cos, pi
import numpy as np

In [13]:
def generate_sensor_data(sensor_id):
    timestamp = int(time.time())

    # Simulate IoT sensor data for water quality metrics with realistic patterns
    water_temperature = random.uniform(1, 3) + round(20 + 5 * (1 + 0.5 * (1 + cos((timestamp % 86400) / 86400 * 2 * pi))), 2)
    ph_level = random.uniform(0, 1) + round(7.5 + 0.2 * (1 + cos((timestamp % 86400) / 86400 * 2 * pi)), 2)
    turbidity = round(random.uniform(5, 50), 2)  # Turbidity in NTU (Nephelometric Turbidity Units)
    dissolved_oxygen = round(random.uniform(5, 12), 2)  # Dissolved Oxygen in mg/L
    
    return {
        "sensor_id": sensor_id,
        "timestamp": timestamp,
        "water_temperature": water_temperature,
        "ph_level": ph_level,
        "turbidity": turbidity,
        "dissolved_oxygen": dissolved_oxygen
    }
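The cosine terms give both metrics a 24-hour cycle on top of the random noise. As a rough check of the ranges involved, the sketch below recomputes only the deterministic part of the two formulas. The helper name `daily_baseline` is illustrative and is not part of the notebook.

```python
from math import cos, pi

def daily_baseline(timestamp):
    """Deterministic part of water_temperature and ph_level, without the random noise."""
    phase = (timestamp % 86400) / 86400 * 2 * pi  # position within the 24 h cycle
    temperature = round(20 + 5 * (1 + 0.5 * (1 + cos(phase))), 2)
    ph = round(7.5 + 0.2 * (1 + cos(phase)), 2)
    return temperature, ph

peak = daily_baseline(0)        # start of the cycle, cos = 1
trough = daily_baseline(43200)  # half a day later, cos = -1
print(peak, trough)
```

The baseline temperature moves between 25 and 30 and the baseline pH between 7.5 and 7.9. After adding the noise terms `random.uniform(1, 3)` and `random.uniform(0, 1)`, water_temperature falls between 26 and 33 and ph_level between 7.5 and 8.9.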


In [14]:
### CODE TO DELETE THE TOPIC SO THAT IT CAN BE RECREATED WITH "N" PARTITIONS
kafka_topic = "water_quality"
kafka_bootstrap_servers = ["localhost:9092"]

admin_client = KafkaAdminClient(bootstrap_servers=kafka_bootstrap_servers)
existing_topics = admin_client.list_topics()

if kafka_topic in existing_topics:
    print(f"Topic '{kafka_topic}' already exists. Deleting it...")
    admin_client.delete_topics([kafka_topic])
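Topic deletion is carried out asynchronously by the broker, so recreating the topic immediately afterwards can fail because the old topic still exists. A minimal sketch of one way to wait for the deletion is to poll `list_topics()` until the name disappears. The helper `wait_until_topic_deleted` and the `_FakeAdmin` stand-in (used here only so the example runs without a broker) are hypothetical names, not part of the notebook or of kafka-python.

```python
import time

def wait_until_topic_deleted(admin_client, topic, timeout_s=30.0, poll_s=0.5):
    """Poll list_topics() until `topic` is gone; return False if the timeout expires."""
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        if topic not in admin_client.list_topics():
            return True
        time.sleep(poll_s)
    return False

class _FakeAdmin:
    """Stand-in admin client: reports the topic for the first two calls only."""
    def __init__(self):
        self.calls = 0

    def list_topics(self):
        self.calls += 1
        return ["water_quality"] if self.calls <= 2 else []

fake_admin = _FakeAdmin()
deleted = wait_until_topic_deleted(fake_admin, "water_quality", timeout_s=5, poll_s=0.01)
print(deleted, fake_admin.calls)
```

In the notebook, the real `admin_client` would be passed in place of the stand-in, right after the `delete_topics` call.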

In [15]:
kafka_topic = "water_quality"
kafka_bootstrap_servers = ["localhost:9092"] 

### Create the topic with 2 partitions (fewer than the 6 sensors, so several sensor keys share a partition).
partitions = 2
# Use the Kafka admin client for Python
admin_client = KafkaAdminClient(bootstrap_servers=kafka_bootstrap_servers)
topic = NewTopic(name=kafka_topic, num_partitions=partitions, replication_factor=1)

admin_client.create_topics([topic])

print(f"Topic '{kafka_topic}' created with {partitions} partitions.")

Topic 'water_quality' created with 2 partitions.


In [16]:
# Kafka configuration
kafka_topic = "water_quality"
kafka_bootstrap_servers = ["localhost:9092"] 

# 6 sensors, one producer per sensor
producer_number = 6
producers = []
producer_ids = []

for i in range(producer_number):
    # The sensor index, as a string, is later used as the message key
    producer_ids.append(str(i))
    # Create a Kafka producer that serializes each value as UTF-8 JSON
    producer = KafkaProducer(
        bootstrap_servers=kafka_bootstrap_servers,
        value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    )
    producers.append(producer)

print(f"Producing messages to Kafka topic '{kafka_topic}'...")

try:
    while True:
        for i in range(producer_number):
            # Generate sensor data
            sensor_data = generate_sensor_data(i)
            print(f'Sensor: {i} sends data: {sensor_data}')
            # Publish sensor data to Kafka
            print(f"keyyy: {producer_ids[i].encode('utf-8')}")
            producers[i].send(kafka_topic, key=producer_ids[i].encode('utf-8'), value=sensor_data)

        time.sleep(1)
except KeyboardInterrupt:
    print("Stopped producing messages.")
finally:
    for producer in producers:
        producer.close()

Producing messages to Kafka topic 'water_quality'...
Sensor: 0 sends data: {'sensor_id': 0, 'timestamp': 1741186007, 'water_temperature': 27.06355093495869, 'ph_level': 8.537601663239226, 'turbidity': 6.9, 'dissolved_oxygen': 10.94}
keyyy: b'0'
Sensor: 1 sends data: {'sensor_id': 1, 'timestamp': 1741186007, 'water_temperature': 28.018991117858352, 'ph_level': 7.975691113243694, 'turbidity': 23.65, 'dissolved_oxygen': 9.36}
keyyy: b'1'
Sensor: 2 sends data: {'sensor_id': 2, 'timestamp': 1741186007, 'water_temperature': 27.198155961558488, 'ph_level': 8.510099427001753, 'turbidity': 7.39, 'dissolved_oxygen': 8.93}
keyyy: b'2'
Sensor: 3 sends data: {'sensor_id': 3, 'timestamp': 1741186007, 'water_temperature': 27.830472030408888, 'ph_level': 7.700855897675119, 'turbidity': 22.57, 'dissolved_oxygen': 5.93}
keyyy: b'3'
Sensor: 4 sends data: {'sensor_id': 4, 'timestamp': 1741186007, 'water_temperature': 28.527388280805134, 'ph_level': 8.409207246586742, 'turbidity': 14.94, 'dissolved_oxygen'