# Tutorial pr√°ctico de Apache Kafka (KRaft)

## Introducci√≥n y preparaci√≥n del entorno

En este cuaderno se utilizar√° un cl√∫ster de Apache Kafka desplegado mediante contenedores Docker,
basado en la arquitectura **KRaft (Kafka Raft Metadata Mode)**, sin dependencia de ZooKeeper.

Este apartado inicial tiene como finalidad **verificar que el entorno est√° correctamente configurado**
y que podemos comunicarnos con el cl√∫ster Kafka.


## 1. Preparaci√≥n del entorno de trabajo

El cuaderno utiliza Python para interactuar con Kafka mediante la librer√≠a
**confluent-kafka**, un wrapper de alto rendimiento basado en *librdkafka*.

Antes de comenzar, asumimos que:
- El cl√∫ster Kafka est√° en ejecuci√≥n.
- Jupyter tiene acceso a la red donde se encuentran los brokers.
- La variable de entorno con los brokers est√° correctamente definida.

En el siguiente bloque se importar√°n las librer√≠as necesarias
y se comprobar√° la conectividad b√°sica con el cl√∫ster.


In [None]:
#En el caso que la librer√≠a no exista, hay que instalarla
%pip install --upgrade confluent-kafka

from confluent_kafka.admin import AdminClient
from confluent_kafka import KafkaException
import os



### Configuraci√≥n de conexi√≥n al cl√∫ster

Los brokers de Kafka se proporcionan mediante una variable de entorno,
lo que permite desacoplar el c√≥digo del entorno f√≠sico donde se ejecuta.

Esta aproximaci√≥n es habitual en entornos profesionales
y facilita el despliegue del mismo c√≥digo en distintos escenarios
(desarrollo, pruebas o producci√≥n).


In [None]:
# Obtener la lista de brokers desde la variable de entorno
brokers = os.getenv("KAFKA_BROKERS")

if not brokers:
    raise RuntimeError("No se ha definido la variable de entorno KAFKA_BROKERS")

print(f"Brokers configurados: {brokers}")


### Comprobaci√≥n de conectividad con el cl√∫ster

En este primer contacto con Kafka no se producir√°n ni consumir√°n mensajes.
√önicamente se consultar√° la **metadata del cl√∫ster** para verificar que:

- El cl√∫ster est√° accesible.
- Los brokers responden correctamente.
- Podemos obtener informaci√≥n b√°sica de la infraestructura.

Este tipo de comprobaci√≥n es habitual como primer paso
en cualquier aplicaci√≥n que vaya a interactuar con Kafka.


In [None]:
from confluent_kafka.admin import AdminClient
import os

# Crear cliente administrativo
admin_client = AdminClient({'bootstrap.servers': brokers})

# Obtener metadatos completos del cl√∫ster
cluster_metadata = admin_client.list_topics(timeout=10)

print("=== Brokers del cl√∫ster ===")
for broker_id, broker in cluster_metadata.brokers.items():
    print(f"Broker ID: {broker_id}, Host: {broker.host}, Port: {broker.port}")

# Identificar el controller activo
controller_id = cluster_metadata.controller_id
controller = cluster_metadata.brokers[controller_id]

print("\n=== Controller activo ===")
print(f"Broker ID: {controller_id}, Host: {controller.host}, Port: {controller.port}")


### Observaci√≥n en Kafdrop

Para complementar las acciones realizadas desde este cuaderno, utilizaremos **Kafdrop** como herramienta de exploraci√≥n visual del cl√∫ster Kafka.

Kafdrop permite inspeccionar de forma gr√°fica y en tiempo real distintos elementos del cl√∫ster, como:
- Brokers disponibles
- Topics existentes
- N√∫mero de particiones y factor de replicaci√≥n
- L√≠deres de partici√≥n
- Mensajes almacenados

Durante el desarrollo de este tutorial, tras ejecutar los distintos fragmentos de c√≥digo en Python, se recomienda acceder a la interfaz web de Kafdrop para comprobar y analizar los efectos producidos sobre el cl√∫ster.

La interfaz est√° disponible en la siguiente URL:

üëâ **http://localhost:9000/**

Esta observaci√≥n cruzada entre c√≥digo y visualizaci√≥n facilitar√° la comprensi√≥n de los conceptos te√≥ricos que se van introduciendo a lo largo de la sesi√≥n.


## 2. Topics como entidad l√≥gica distribuida

En Apache Kafka, un *topic* es una entidad l√≥gica que representa un flujo de datos. A diferencia de otros sistemas de mensajer√≠a, un topic **no es un recurso f√≠sico localizado en un √∫nico nodo**, sino una estructura distribuida a lo largo del cl√∫ster.

Cada topic se divide en **particiones**, y cada partici√≥n se replica en uno o varios brokers. Esta arquitectura permite:

- Escalar el procesamiento de datos de forma horizontal.
- Garantizar tolerancia a fallos mediante replicaci√≥n.
- Distribuir la carga de lectura y escritura entre distintos brokers.

Desde el punto de vista del cliente (productores y consumidores), el topic se percibe como una √∫nica entidad l√≥gica, aunque internamente est√© repartido entre m√∫ltiples nodos.


### Particiones y r√©plicas

Una **partici√≥n** es la unidad m√≠nima de paralelismo en Kafka. Cada partici√≥n tiene:

- Un **l√≠der**, que gestiona las operaciones de lectura y escritura.
- Una o varias **r√©plicas seguidoras**, que mantienen una copia sincronizada de los datos.

Si el broker que aloja al l√≠der de una partici√≥n deja de estar disponible, Kafka puede **elegir autom√°ticamente un nuevo l√≠der** entre las r√©plicas disponibles, garantizando la continuidad del servicio.

Este comportamiento ser√° observable m√°s adelante tanto desde el c√≥digo como desde la interfaz web de Kafdrop.


In [None]:
from confluent_kafka.admin import AdminClient, NewTopic
import os

# Configuraci√≥n de conexi√≥n
brokers = os.environ.get(
    "KAFKA_BROKERS",
    "kafka1:9092,kafka2:9094,kafka3:9096,kafka4:9098"
)

admin_client = AdminClient({'bootstrap.servers': brokers})

# Definici√≥n del topic
topic_name = "demo-topic-distribuido"

new_topic = NewTopic(
    topic=topic_name,
    num_partitions=3,
    replication_factor=2
)

# Creaci√≥n del topic
fs = admin_client.create_topics([new_topic])

for topic, future in fs.items():
    try:
        future.result()
        print(f"Topic '{topic}' creado correctamente.")
    except Exception as e:
        print(f"Error al crear el topic '{topic}': {e}")


El topic creado tiene las siguientes caracter√≠sticas:

- **3 particiones**, lo que permite paralelizar la producci√≥n y el consumo.
- **Factor de replicaci√≥n 2**, lo que implica que cada partici√≥n estar√° presente en dos brokers distintos.

A partir de este momento, el topic existe f√≠sicamente repartido entre varios brokers del cl√∫ster. Cada partici√≥n tendr√° un broker l√≠der asignado autom√°ticamente por Kafka.


In [None]:
# Obtener metadatos del topic creado
metadata = admin_client.list_topics(timeout=10)

topic_metadata = metadata.topics[topic_name]

print(f"=== Detalle del topic '{topic_name}' ===")

for partition_id, partition in topic_metadata.partitions.items():
    print(
        f"Partici√≥n {partition_id} | "
        f"L√≠der: {partition.leader} | "
        f"R√©plicas: {partition.replicas} | "
        f"ISR: {partition.isrs}"
    )


En la salida anterior se puede observar:

- El **broker l√≠der** de cada partici√≥n.
- La lista de **r√©plicas** asignadas.
- El conjunto ISR (*In-Sync Replicas*), que indica qu√© r√©plicas est√°n correctamente sincronizadas.

Estos valores pueden cambiar din√°micamente si un broker deja de estar disponible o se reincorpora al cl√∫ster.


### Observaci√≥n en Kafdrop

Accede a la interfaz web de Kafdrop:

http://localhost:9000/

Dentro del topic `demo-topic-distribuido`, revisa:

- El n√∫mero de particiones.
- Qu√© broker act√∫a como l√≠der de cada partici√≥n.
- C√≥mo se distribuyen las r√©plicas entre los distintos brokers.

Contrasta esta informaci√≥n con la obtenida mediante el c√≥digo Python para reforzar la comprensi√≥n del car√°cter distribuido de los topics en Kafka.


## 3. Productores: introducci√≥n al env√≠o de mensajes en Kafka

Hasta ahora hemos analizado Kafka desde un punto de vista estructural: el cl√∫ster, los brokers y los topics.
En este apartado damos el siguiente paso natural: **empezar a generar datos reales**.

Un **productor** es el componente encargado de enviar mensajes a Kafka.  
Desde el punto de vista del cliente, Kafka act√∫a como un sistema distribuido capaz de:

- recibir mensajes de forma concurrente,
- almacenarlos de manera tolerante a fallos,
- y distribuirlos entre los brokers del cl√∫ster.

En este primer contacto trabajaremos con productores de forma **intencionalmente sencilla**, centr√°ndonos √∫nicamente en:
- enviar mensajes de texto,
- verificar su llegada al cl√∫ster,
- observar el resultdos posteriores.


### Conceptos clave del productor

Antes de pasar al c√≥digo, conviene fijar algunos conceptos b√°sicos:

- Un productor **no env√≠a mensajes a un broker concreto**, sino a un *topic*.
- Kafka se encarga internamente de decidir:
  - a qu√© partici√≥n se asigna el mensaje,
  - en qu√© broker se almacena.
- El env√≠o de mensajes es **as√≠ncrono** por defecto.
- El productor recibe confirmaci√≥n de que el mensaje ha sido aceptado por el cl√∫ster.

En este punto del tutorial asumimos:
- un cl√∫ster Kafka ya operativo,
- al menos un topic existente,
- acceso al cl√∫ster mediante la librer√≠a `confluent-kafka` desde Python.


In [None]:
from confluent_kafka import Producer
import socket


### Configuraci√≥n b√°sica del productor

Para crear un productor necesitamos, como m√≠nimo, indicar:
- la lista de brokers del cl√∫ster (bootstrap servers),
- un identificador de cliente.

No es necesario especificar particiones ni brokers concretos.
Kafka se encarga de la distribuci√≥n interna de los mensajes.


In [None]:
conf = {
    "bootstrap.servers": "kafka1:9092,kafka2:9094,kafka3:9096,kafka4:9098",
    "client.id": socket.gethostname()
}

producer = Producer(conf)


### Env√≠o de un primer mensaje

Vamos a enviar un mensaje simple de texto a un topic existente.
En este ejemplo utilizaremos un topic previamente creado en el cl√∫ster.

Tras ejecutar el c√≥digo:
- el mensaje ser√° enviado a Kafka,
- podremos comprobar su llegada desde la interfaz web de Kafdrop.


In [None]:
topic_name = "demo-topic"

producer.produce(
    topic=topic_name,
    value="Primer mensaje enviado desde un productor en Python"
)

producer.flush()


### Observaci√≥n en Kafdrop

Accede a la interfaz web de Kafdrop para comprobar el resultado del env√≠o:

http://localhost:9000/

Desde Kafdrop puedes:
- localizar el topic utilizado,
- visualizar los mensajes almacenados,
- comprobar en qu√© partici√≥n se ha escrito elter Kafka.


## 4. El mensaje en Kafka: clave, valor y distribuci√≥n

Hasta este punto del tutorial hemos trabajado con el cl√∫ster, los brokers y los topics como entidades distribuidas.
El siguiente paso natural es entender **qu√© es realmente un mensaje en Kafka** y c√≥mo su estructura influye directamente en el
comportamiento del sistema.

En Kafka, un mensaje no es solo un ‚Äúdato que se env√≠a‚Äù, sino una unidad que:
- se almacena en una partici√≥n concreta,
- sigue reglas de orden,
- y participa en mecanismos de escalabilidad y tolerancia a fallos.

En este apartado nos centraremos en dos elementos clave del mensaje:
- **key (clave)**
- **value (valor)**



### Estructura b√°sica de un mensaje Kafka

Conceptualmente, un mensaje Kafka puede representarse como:

- **key**: valor opcional que Kafka utiliza para decidir la partici√≥n destino.
- **value**: el contenido del mensaje (texto, binario, JSON, CSV, etc.).
- **offset**: posici√≥n del mensaje dentro de la partici√≥n (asignado por Kafka).
- **timestamp**: marca temporal asociada al mensaje.

En este bloque nos centraremos en la relaci√≥n entre **key**, **particiones** y **orden**.


### Clave y particionado

Kafka distribuye los mensajes de un topic entre sus particiones siguiendo estas reglas generales:

- **Mensajes sin clave**  
  Kafka distribuye los mensajes entre las particiones disponibles de forma autom√°tica.
  El objetivo es equilibrar la carga.

- **Mensajes con clave**  
  Kafka calcula un hash de la clave y siempre env√≠a los mensajes con la misma clave
  a la **misma partici√≥n**.

Esto implica dos consecuencias importantes:
- El orden **solo est√° garantizado dentro de una partici√≥n**.
- La clave es una herramienta fundamental para controlar el orden l√≥gico de los mensajes.


In [None]:
from confluent_kafka import Producer
import time

brokers = "kafka1:9092,kafka2:9094,kafka3:9096,kafka4:9098"
topic = "demo-topic-distribuido"

producer = Producer({
    "bootstrap.servers": brokers
})


### Env√≠o de mensajes sin clave

Vamos a enviar varios mensajes **sin clave** al mismo topic.
Kafka decidir√° autom√°ticamente a qu√© partici√≥n va cada uno.

Despu√©s de ejecutar la celda, observa el resultado en Kafdrop:
- topic `demo-topic-distribuido`
- pesta√±a *Topics ‚Üí Partitions*


In [None]:
for i in range(10):
    producer.produce(
        topic=topic,
        value=f"Mensaje sin clave {i}"
    )

producer.flush()


### Observaci√≥n en Kafdrop

Accede a la interfaz web de Kafdrop:

http://localhost:9000/

Observa:
- c√≥mo los mensajes se reparten entre distintas particiones,
- que no existe una relaci√≥n directa entre el contenido del mensaje y la partici√≥n.

Esto es un comportamiento normal cuando **no se define clave**.


### Env√≠o de mensajes con clave

Ahora enviaremos mensajes **con la misma clave**.
El objetivo es comprobar que Kafka los dirige siempre a la misma partici√≥n.


In [None]:
for i in range(10):
    producer.produce(
        topic=topic,
        key="usuario-123",
        value=f"Mensaje con clave usuario-123 #{i}"
    )

producer.flush()


### Observaci√≥n del efecto de la clave

En Kafdrop, revisa nuevamente las particiones del topic.

Deber√≠as observar que:
- todos los mensajes con la clave `usuario-123` est√°n en la **misma partici√≥n**,
- el orden de los mensajes se mantiene dentro de esa partici√≥n.

Esto ilustra una regla fundamental de Kafka:
> El orden est√° garantizado por partici√≥n, no por topic.


### M√∫ltiples claves, m√∫ltiples flujos

Por √∫ltimo, enviaremos mensajes con **claves distintas** para simular varios flujos de datos independientes.


In [None]:
claves = ["cliente-A", "cliente-B", "cliente-C"]

for clave in claves:
    for i in range(5):
        producer.produce(
            topic=topic,
            key=clave,
            value=f"Evento {i} para {clave}"
        )

producer.flush()


### Qu√© debe quedar claro tras este apartado

- La clave no es obligatoria, pero **s√≠ estrat√©gica**.
- Kafka no garantiza orden global, solo **orden por partici√≥n**.
- Elegir bien la clave impacta directamente en:
  - escalabilidad,
  - orden,
  - dise√±o de consumidores.

Estos conceptos ser√°n fundamentales cuando pasemos a:
- consumidores,
- procesamiento paralelo,
- y dise√±o de aplicaciones reales con Kafka.


## 5. El consumidor en Kafka

Hasta ahora hemos visto c√≥mo se producen mensajes y c√≥mo estos se distribuyen en el cl√∫ster.
El siguiente elemento clave es el **consumidor**, responsable de leer los mensajes almacenados en los topics.

En Kafka, consumir no significa ‚Äúrecibir un mensaje y borrarlo‚Äù, sino:
- leer mensajes de una partici√≥n,
- mantener la posici√≥n de lectura (offset),
- y coordinarse con otros consumidores cuando trabajan en grupo.

Este apartado se centrar√° en:
- c√≥mo se leen los mensajes,
- qu√© es un offset,
- y c√≥mo Kafka gestiona el consumo distribuido.


### Conceptos b√°sicos del consumo

Al consumir mensajes en Kafka debemos tener claros los siguientes conceptos:

- **Offset**  
  Es la posici√≥n de lectura dentro de una partici√≥n.
  Kafka no borra los mensajes cuando se consumen; el consumidor simplemente avanza su offset.

- **Lectura secuencial**  
  Los mensajes se leen en orden dentro de cada partici√≥n.

- **Estado del consumidor**  
  Kafka guarda el offset asociado a un consumidor (o grupo) para saber por d√≥nde continuar.


In [None]:
from confluent_kafka import Consumer

### Configuraci√≥n b√°sica de un consumidor

Vamos a crear un consumidor sencillo que:
- se conecte al cl√∫ster,
- lea mensajes de un topic,
- y muestre su contenido por pantalla.

En este primer ejemplo no usaremos grupos complejos.


In [None]:
brokers = "kafka1:9092,kafka2:9094,kafka3:9096,kafka4:9098"
topic = "demo-topic-distribuido"

consumer_conf = {
    "bootstrap.servers": brokers,
    "group.id": "grupo-demo",
    "auto.offset.reset": "earliest"
}

consumer = Consumer(consumer_conf)
consumer.subscribe([topic])


### Lectura de mensajes

A continuaci√≥n leeremos mensajes del topic.
Observa que:
- el consumidor va leyendo mensajes uno a uno,
- cada mensaje pertenece a una partici√≥n concreta,
- y tiene un offset asociado.


In [None]:
print("Leyendo mensajes...")

for _ in range(10):
    msg = consumer.poll(1.0)

    if msg is None:
        continue

    if msg.error():
        print(f"Error: {msg.error()}")
    else:
        print(
            f"Partici√≥n: {msg.partition()} | "
            f"Offset: {msg.offset()} | "
            f"Clave: {msg.key()} | "
            f"Valor: {msg.value().decode('utf-8')}"
        )

consumer.close()


### Observaci√≥n en Kafdrop

Mientras el consumidor est√° activo o despu√©s de ejecutar la celda, revisa en Kafdrop:

http://localhost:9000/

Observa:
- el n√∫mero de mensajes en cada partici√≥n,
- c√≥mo el offset va avanzando,
- que los mensajes no se eliminan tras ser consumidos.


### El offset y la relectura de mensajes

El valor `auto.offset.reset` controla desde d√≥nde empieza a leer un consumidor cuando
no existe un offset previo guardado:

- **earliest**: desde el primer mensaje disponible.
- **latest**: solo mensajes nuevos a partir de ese momento.

Este comportamiento es clave para:
- pruebas,
- reprocesamiento,
- y depuraci√≥n.


In [None]:
# Ejemplo alternativo: comenzar desde los mensajes m√°s recientes

consumer_conf_latest = {
    "bootstrap.servers": brokers,
    "group.id": "grupo-demo-latest",
    "auto.offset.reset": "latest"
}

consumer_latest = Consumer(consumer_conf_latest)
consumer_latest.subscribe([topic])

msg = consumer_latest.poll(2.0)

if msg is None:
    print("No hay mensajes nuevos")
elif msg.error():
    print(f"Error: {msg.error()}")
else:
    print(f"Mensaje recibido: {msg.value().decode('utf-8')}")

consumer_latest.close()


### Qu√© debe quedar claro tras este apartado

- Kafka no borra los mensajes al consumirlos.
- El consumidor controla su progreso mediante offsets.
- El orden est√° garantizado dentro de cada partici√≥n.
- El comportamiento del consumidor depende de su configuraci√≥n.

En el siguiente apartado daremos un paso m√°s:
veremos c√≥mo **varios consumidores pueden trabajar juntos** mediante grupos de consumo.


## 6. Grupos de consumidores y consumo distribuido

Hasta ahora hemos trabajado con un √∫nico consumidor leyendo mensajes de un topic.
Sin embargo, uno de los pilares de Kafka es la **lectura paralela y escalable** de datos.

Esto se consigue mediante los **grupos de consumidores** (consumer groups).

Un grupo de consumidores permite:
- repartir las particiones de un topic entre varios consumidores,
- escalar horizontalmente el procesamiento,
- garantizar que cada mensaje sea procesado una sola vez por grupo.


### Conceptos clave

Antes de pasar a la pr√°ctica, conviene fijar algunos conceptos:

- **Consumer Group**  
  Conjunto de consumidores que comparten un mismo `group.id`.

- **Asignaci√≥n de particiones**  
  Kafka asigna cada partici√≥n de un topic a un √∫nico consumidor dentro del grupo.

- **Paralelismo real**  
  El paralelismo m√°ximo est√° limitado por el n√∫mero de particiones, no por el n√∫mero de consumidores.

Si hay m√°s consumidores que particiones, algunos consumidores quedar√°n inactivos.


### Escenario de prueba

Para este apartado se asume:
- un topic con varias particiones,
- mensajes ya existentes en el topic,
- y varios consumidores que se lanzan con el mismo `group.id`.

El objetivo es observar c√≥mo Kafka reparte el trabajo.


In [None]:
from confluent_kafka import Consumer
import time

### Consumidor A (miembro del grupo)

Este ser√° el primer consumidor del grupo.


In [None]:
brokers = "kafka1:9092,kafka2:9094,kafka3:9096,kafka4:9098"
topic = "demo-topic-distribuido"

consumer_a_conf = {
    "bootstrap.servers": brokers,
    "group.id": "grupo-distribuido",
    "auto.offset.reset": "earliest"
}

consumer_a = Consumer(consumer_a_conf)
consumer_a.subscribe([topic])

print("Consumidor A activo")


### Consumidor B (mismo grupo)

Este segundo consumidor pertenece al mismo grupo.
Kafka redistribuir√° autom√°ticamente las particiones.


In [None]:
consumer_b_conf = {
    "bootstrap.servers": brokers,
    "group.id": "grupo-distribuido",
    "auto.offset.reset": "earliest"
}

consumer_b = Consumer(consumer_b_conf)
consumer_b.subscribe([topic])

print("Consumidor B activo")


### Lectura simult√°nea desde ambos consumidores

Ejecuta esta celda en cada consumidor (A y B) y observa:

- qu√© particiones procesa cada uno,
- c√≥mo no se duplican los mensajes,
- y c√≥mo el reparto depende del n√∫mero de particiones.


In [None]:
def leer_mensajes(consumer, nombre):
    print(f"Leyendo mensajes para: {nombre}")
    for _ in range(5):
        msg = consumer.poll(1.0)

        if msg is None:
            continue

        if msg.error():
            print(f"{nombre} - Error: {msg.error()}")
        else:
            print(
                f"{nombre} | "
                f"Partici√≥n: {msg.partition()} | "
                f"Offset: {msg.offset()} | "
                f"Valor: {msg.value().decode('utf-8')}"
            )


In [None]:
# Lectura desde el consumidor A
leer_mensajes(consumer_a, "Consumidor A")
consumer_a.close()

# Lectura desde el consumidor B
leer_mensajes(consumer_b, "Consumidor B")
consumer_b.close()

### Rebalanceo de consumidores

Kafka gestiona autom√°ticamente los cambios en el grupo:

- si un consumidor se detiene,
- si se a√±ade uno nuevo,
- o si cambia el n√∫mero de particiones.

Este proceso se llama **rebalanceo**.

Durante el rebalanceo:
- las particiones se reasignan,
- los consumidores pausan moment√°neamente la lectura,
- y luego contin√∫an desde el offset correcto.


### Observaci√≥n en Kafdrop

Accede a:

http://localhost:9000/

Observa:
- el n√∫mero de particiones del topic,
- el l√≠der de cada partici√≥n,
- y c√≥mo el consumo se distribuye entre consumidores.

Aunque Kafdrop no muestra directamente los grupos,
s√≠ permite razonar sobre el reparto observado en el c√≥digo.


### Qu√© debe quedar claro tras este apartado

- Un grupo de consumidores permite escalar el procesamiento.
- Cada partici√≥n solo puede ser consumida por un consumidor del grupo.
- El paralelismo depende del n√∫mero de particiones.
- Kafka gestiona autom√°ticamente fallos y redistribuciones.

Con esto se completa el ciclo b√°sico:
topic ‚Üí particiones ‚Üí productor ‚Üí mensajes ‚Üí consumidor ‚Üí grupo.
