In [15]:
# ======================
# Librerías
# ======================
from confluent_kafka.admin import AdminClient, NewTopic  # Para crear y administrar topics en Kafka
from confluent_kafka import Producer, Consumer, KafkaError  # Para enviar y recibir mensajes
import time  # Para pausas y temporizadores
import random  # Para generar temperaturas simuladas
import json  # Para codificar/decodificar mensajes en formato JSON
from datetime import datetime  # Para agregar fecha y hora a los mensajes

# Ejercicio: Kafka con Producer y Consumer en Python

# ======================
# Configuración (VARIABLES DE CONEXIÓN)
# ======================
BROKER_HOST = "localhost"  # Host donde corre Kafka
BROKER_PORT = 9092  # Puerto del broker Kafka
TOPIC_NAME = "kafkatemperaturas"  # Nombre del topic que vamos a crear/usar

BOOTSTRAP_SERVERS = f"{BROKER_HOST}:{BROKER_PORT}"  # Dirección completa del broker

# ======================
# Crear cliente Admin
# ======================
# Este cliente nos permite crear topics directamente desde Python
admin_client = AdminClient({'bootstrap.servers': BOOTSTRAP_SERVERS})

# ======================
# Definir tema (ESAS SON LAS VARIABLES NECESARIAS)
# ======================
# Creamos la configuración del nuevo topic: 1 partición y replicación 1
new_topic = NewTopic(
    topic=TOPIC_NAME,
    num_partitions=1,
    replication_factor=1
)

# ======================
# Crear el tema
# ======================
# Intentamos crear el topic en Kafka. Si ya existe, avisa del error
fs = admin_client.create_topics([new_topic])

# ======================
# Comprobar resultado
# ======================
for topic, f in fs.items():
    try:
        f.result()  # Bloquea hasta que la creación del topic se complete
        print(f"Tema '{topic}' creado correctamente ")
    except Exception as e:
        # Si ya existía o hubo otro error, lo informamos por pantalla
        print(f" Error creando el tema {topic}: {e}")


 Error creando el tema kafkatemperaturas: KafkaError{code=TOPIC_ALREADY_EXISTS,val=36,str="Topic 'kafkatemperaturas' already exists."}


In [26]:
# 2. Implementar un script en Python que funcione como Producer y envíe temperaturas
# simuladas al tema `kafkatemperaturas` cada 5 segundos

# ======================
# VARIABLES QUE SE USAN AHORA, QUE ESTÁN DECLARADAS MÁS ARRIBA
# ======================
# BROKER_HOST = "localhost"
# BROKER_PORT = 9092
# TOPIC_NAME = "kafkatemperaturas"
# BOOTSTRAP_SERVERS = f"{BROKER_HOST}:{BROKER_PORT}"
# Estas variables ya las definimos antes. Las necesitamos para conectar el Producer al broker.

# ======================
# Crear Producer
# ======================
# Creamos un productor Kafka que enviará mensajes al broker especificado
producer = Producer({'bootstrap.servers': BOOTSTRAP_SERVERS})

# Función de callback para informar si el envío del mensaje fue exitoso o si hubo error
def delivery_report(err, msg):
    if err is not None:
        # Si ocurrió un error al enviar el mensaje, lo mostramos por pantalla
        print(f" Error al enviar mensaje: {err}")
    else:
        # Si el mensaje se envió correctamente, mostramos a qué topic y partición fue enviado
        print(f" Mensaje enviado a {msg.topic()} [{msg.partition()}]")

# ======================
# Pedimos al usuario la cantidad de mensajes que quiere enviar
# ======================
cantidad_mensajes = int(input("¿Cuántos mensajes quieres enviar? "))

# ======================
# Loop para enviar temperaturas simuladas
# ======================
try:
    for i in range(cantidad_mensajes):
        # Generamos una temperatura simulada entre 15 y 35 grados, con 2 decimales
        temperatura = round(random.uniform(15, 35), 2)

        # Creamos un mensaje JSON con la temperatura y la unidad (°C)
        mensaje = json.dumps({"temperatura": temperatura, "unidad": "°C"})

        # Enviamos el mensaje al topic definido. 
        # Callback 'delivery_report' nos dirá si se envió bien
        producer.produce(TOPIC_NAME, value=mensaje, callback=delivery_report)

        # 'flush' asegura que el mensaje se envíe antes de continuar
        producer.flush()

        # Se espera 2 segundos antes de enviar la siguiente temperatura
        # Esto simula lecturas periódicas como si vinieran de un sensor real
        time.sleep(2)

    print(f"Se enviaron los {cantidad_mensajes} mensajes solicitados. Producer finalizado.")

except KeyboardInterrupt:
    # Permite detener el bucle de manera limpia
    print(" Producer detenido por usuario")


 Mensaje enviado a kafkatemperaturas [0]
 Mensaje enviado a kafkatemperaturas [0]
 Mensaje enviado a kafkatemperaturas [0]
 Mensaje enviado a kafkatemperaturas [0]
Se enviaron los 4 mensajes solicitados. Producer finalizado.


In [27]:
# ======================
# Configuración
# ======================
GRUPO_CONSUMIDOR = "grupo_temperaturas"  # Identificador del grupo de consumidores

# ======================
# Crear Consumer
# ======================
consumer = Consumer({
    'bootstrap.servers': BOOTSTRAP_SERVERS,  # Broker al que nos conectamos
    'group.id': GRUPO_CONSUMIDOR,           # Grupo de consumidores (permite manejar offsets)
    'auto.offset.reset': 'earliest'         # Empieza a leer desde el principio si el grupo no tiene offsets previos
})

# Nos suscribimos al topic que queremos leer
consumer.subscribe([TOPIC_NAME])

# ======================
# Fichero donde guardar los datos
# ======================
FICHERO_JSON = "temperaturas_recibidas.json"  # Archivo donde se guardarán las temperaturas

print(f"Esperando mensajes del topic '{TOPIC_NAME}'... Se detendrá automáticamente si no hay más mensajes")

# ======================
# Loop de consumo
# ======================
try:
    intentos_sin_mensajes = 0          # Contador de intentos consecutivos sin mensajes
    MAX_INTENTOS_SIN_MENSAJES = 5      # Número de intentos antes de asumir que no hay más mensajes

    while True:
        # Esperamos 1 segundo para recibir un mensaje
        msg = consumer.poll(1.0)

        if msg is None:
            # Si no llegó ningún mensaje, incrementamos el contador
            intentos_sin_mensajes += 1
            # Si superamos el límite de intentos sin mensajes, cerramos el bucle
            if intentos_sin_mensajes >= MAX_INTENTOS_SIN_MENSAJES:
                print("No se recibieron más mensajes. Cerrando consumer automáticamente.")
                break
            continue
        else:
            # Reiniciamos el contador si llega un mensaje
            intentos_sin_mensajes = 0

        if msg.error():
            # Si hay algún error en el mensaje
            if msg.error().code() == KafkaError._PARTITION_EOF:
                # Fin de la partición, no es un error grave
                continue
            else:
                # Otro error, lo mostramos por pantalla
                print(f" Error en el consumidor: {msg.error()}")
                continue

        # Decodificamos el mensaje JSON recibido del topic
        data = json.loads(msg.value().decode('utf-8'))

        # Se añade la hora exacta en que el Consumer recibe el mensaje
        data["fecha_hora"] = datetime.now().strftime("%d/%m/%Y %H:%M:%S")

        # Muestro en consola que se ha recibido
        print(f"Mensaje recibido: {data}")

        # Lo guardo en un JSON
        try:
            # Try catch para intentar leer los registros existentes
            with open(FICHERO_JSON, "r") as f:
                registros = json.load(f)
        except FileNotFoundError:
            # Si el archivo no existe, se crean los registros vacíos
            registros = []

        # Se añade el nuevo mensaje
        registros.append(data)

        # Se guarda todo lo nuevo en el JSON
        with open(FICHERO_JSON, "w") as f:
            json.dump(registros, f, indent=4, ensure_ascii=False)

        # Pausa breve para diferenciar segundos entre mensajes
        # Esto evita que todos los mensajes procesados muy rápidos tengan la misma hora exacta
        time.sleep(1)

except KeyboardInterrupt:
    # Permite cerrar el consumer de forma limpia cuando se presiona Ctrl+C
    print("Consumer detenido por el usuario.")

finally:
    # Cerramos el consumer para liberar recursos
    consumer.close()


Esperando mensajes del topic 'kafkatemperaturas'... Se detendrá automáticamente si no hay más mensajes
Mensaje recibido: {'temperatura': 17.95, 'unidad': '°C', 'fecha_hora': '12/11/2025 23:19:37'}
Mensaje recibido: {'temperatura': 22.69, 'unidad': '°C', 'fecha_hora': '12/11/2025 23:19:38'}
Mensaje recibido: {'temperatura': 19.36, 'unidad': '°C', 'fecha_hora': '12/11/2025 23:19:39'}
Mensaje recibido: {'temperatura': 30.14, 'unidad': '°C', 'fecha_hora': '12/11/2025 23:19:40'}
No se recibieron más mensajes. Cerrando consumer automáticamente.
