In [1]:
import json
import os
import random
import time

import numpy as np
from kafka import KafkaProducer

## Variables de Ambiente

In [2]:
# Connection settings. Each one can be overridden with an environment
# variable; the literals are the fallbacks used when the variable is unset.
KAFKA_SERVER = os.environ.get('KAFKA_SERVER', 'lab9.alumchat.lol:9092')  # passed to KafkaProducer as bootstrap_servers
TOPIC = os.environ.get('KAFKA_TOPIC', '21438')  # topic that send_data() publishes to

## Simulación de Sensores

In [3]:
def get_temperature(mean_temp: float, std_dev: float) -> float:
    """Simulate one temperature reading drawn from a normal distribution.

    Args:
        mean_temp: Mean of the distribution the reading is sampled from.
        std_dev: Standard deviation of that distribution.

    Returns:
        The sampled value rounded to two decimals, as a built-in ``float``.
    """
    sample = np.random.normal(mean_temp, std_dev, 1)[0]
    # Convert before rounding so callers always get a plain Python float
    # instead of a NumPy scalar.
    return round(float(sample), 2)

def get_humidity(mean_humidity: float, std_dev: float) -> float:
    """Simulate one humidity reading, clamped to the range [0, 100].

    Args:
        mean_humidity: Mean of the normal distribution that is sampled.
        std_dev: Standard deviation of that distribution.

    Returns:
        The sampled value, limited to 0..100 and rounded to two decimals,
        as a built-in ``float``.
    """
    sample = float(np.random.normal(mean_humidity, std_dev, 1)[0])
    # The normal distribution is unbounded, so clamp the draw to 0..100.
    clamped = min(max(sample, 0.0), 100.0)
    return round(clamped, 2)

def get_wind_dir():
    """Pick a wind direction at random.

    Returns:
        One of the abbreviations "N", "NO", "O", "SO", "S", "SE", "E" or
        "NE", selected with ``random.choice``.
    """
    compass_points = ("N", "NO", "O", "SO", "S", "SE", "E", "NE")
    return random.choice(compass_points)

def to_dict(temp: float, humidity: float, wind_dir: str) -> str:
    """Bundle one set of sensor readings into a JSON-encoded string.

    Despite the name, the value returned is the JSON *text* produced by
    ``json.dumps``, not a ``dict``.

    Args:
        temp: Temperature reading, stored under the key "temperatura".
        humidity: Humidity reading, stored under the key "humedad".
        wind_dir: Wind direction, stored under the key "direccion_viento".

    Returns:
        A JSON object serialized as ``str`` with the three keys above, in
        that order.
    """
    return json.dumps({
        "temperatura": temp,
        "humedad": humidity,
        "direccion_viento": wind_dir,
    })

## Envío de Datos al Edge Server
Nota: El stream de datos debe detenerse manualmente (por ejemplo, interrumpiendo el kernel), de forma que se levante un `KeyboardInterrupt`.

In [4]:
def send_data(producer, min_delay: int = 15, max_delay: int = 30, max_messages=None):
    """Publish simulated weather readings to ``TOPIC`` in a loop.

    Each iteration draws a temperature, a humidity and a wind direction,
    serializes them with ``to_dict`` and hands the resulting JSON string to
    ``producer.send``. With the default arguments the loop never ends on its
    own and has to be interrupted by the caller.

    Args:
        producer: Object exposing ``send(topic, value)``; the notebook passes
            a ``KafkaProducer``.
        min_delay: Lower bound, in seconds, of the random pause between two
            messages (inclusive).
        max_delay: Upper bound, in seconds, of that pause (inclusive).
        max_messages: When given, stop after this many messages instead of
            looping forever. ``None`` (the default) keeps the endless loop.
    """
    sent = 0
    while max_messages is None or sent < max_messages:
        # Generate one simulated reading and serialize it
        temperature = get_temperature(mean_temp=15, std_dev=5)
        humidity = get_humidity(mean_humidity=60, std_dev=15)
        wind_dir = get_wind_dir()
        data = to_dict(temperature, humidity, wind_dir)

        # Send the reading and echo it so the cell output shows progress
        producer.send(TOPIC, data)
        print(f"Enviado: {data}")
        sent += 1

        # No point in sleeping after the final message of a bounded run
        if max_messages is not None and sent >= max_messages:
            break

        # Wait a random whole number of seconds before the next reading
        time_to_sleep = random.randint(min_delay, max_delay)
        time.sleep(time_to_sleep)

In [6]:
# Producer connected to the configured broker; every value passed to send()
# goes through json.dumps and is then encoded as UTF-8 bytes.
# NOTE(review): send_data() passes a string that to_dict() has already
# JSON-encoded, so this serializer encodes it a second time and the payload
# is a JSON string literal rather than a JSON object. Confirm how the
# consumer decodes messages before changing either side.
producer = KafkaProducer(
    value_serializer=lambda payload: json.dumps(payload).encode('utf-8'),
    bootstrap_servers=KAFKA_SERVER,
)

try:
    # Runs until the user interrupts the cell
    send_data(producer)
except KeyboardInterrupt:
    # A manual interrupt is the expected way to stop the stream
    print("Process finished successfully")
finally:
    # Push out anything still buffered, then release the connection
    producer.flush()
    producer.close()

Enviado: {"temperatura": 14.25, "humedad": 67.87, "direccion_viento": "N"}
Enviado: {"temperatura": 12.97, "humedad": 66.02, "direccion_viento": "O"}
Enviado: {"temperatura": 18.61, "humedad": 64.81, "direccion_viento": "S"}
Process finished successfully


## Consumir y Desplegar Datos Meteorológicos