In [1]:
import logging
logging.basicConfig(level=logging.ERROR)

## Introducción: productores y consumidores usando tópicos

In [2]:
from kafka import KafkaConsumer
SERVER = "kafka_b:9094"
consumer = KafkaConsumer(
    "website_events", 
    bootstrap_servers=SERVER,
)

In [3]:
consumer.poll(timeout_ms=1000)

{}

In [4]:
consumer.subscription()

{'website_events'}

Congelamos el proceso hasta recibir un mensaje y luego volvemos el control:

In [5]:
for event in consumer:
    print(event)
    # Generalmente continuamos procesando eventos, en un loop infinito.
    # Esperar sólo un mensaje lo podemos hacer con .poll()
    break

ConsumerRecord(topic='website_events', partition=1, offset=58, timestamp=1719274347490, timestamp_type=0, key=None, value=b"{'type': 'click', 'location': '/index.html'}", headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=44, serialized_header_size=-1)


Veamos que ya se configuran particiones:

In [6]:
event.partition

1

El consumidor reporta el offset que observa al broker:

In [7]:
event.offset

58

Recibamos unos mensajes más ahora:

In [None]:
for i, event in enumerate(consumer):
    if i >= 4:
        break

print("Terminado!")

Veamos el nuevo offset:

In [13]:
event.offset

64

Podemos ver el estado de las particiones:

In [11]:
from kafka import TopicPartition
from kafka import KafkaProducer
producer = KafkaProdu
partitions = []
for partition in consumer.partitions_for_topic("website_events"):
    partitions.append(TopicPartition("website_events", partition))
    
end_offsets = consumer.end_offsets(partitions)
end_offsets

{TopicPartition(topic='website_events', partition=0): 65,
 TopicPartition(topic='website_events', partition=1): 66}

Esto indica el offset del siguiente mensaje de la partición a recibir. Los offsets son compartidos a nivel tópico.

## Particionamiento

Para implementar distribución de carga 

In [16]:
consumer0 = KafkaConsumer(bootstrap_servers=SERVER)
consumer1 = KafkaConsumer(bootstrap_servers=SERVER)

consumer0.assign([TopicPartition(topic="metrics", partition=0)])
consumer1.assign([TopicPartition(topic="metrics", partition=1)])

Aseguramos el registro en el broker:

In [17]:
consumer0.poll(timeout_ms=1000)
consumer1.poll(timeout_ms=1000)

{}

Luego de hacer send de un mensaje en cada uno recibimos:

In [20]:
consumer0.poll(timeout_ms=1000)

{TopicPartition(topic='metrics', partition=0): [ConsumerRecord(topic='metrics', partition=0, offset=1, timestamp=1719276267314, timestamp_type=0, key=b'0', value=b'hola', headers=[], checksum=None, serialized_key_size=1, serialized_value_size=4, serialized_header_size=-1)]}

In [21]:
consumer1.poll(timeout_ms=1000)

{TopicPartition(topic='metrics', partition=1): [ConsumerRecord(topic='metrics', partition=1, offset=1, timestamp=1719276270398, timestamp_type=0, key=b'1', value=b'mundo', headers=[], checksum=None, serialized_key_size=1, serialized_value_size=5, serialized_header_size=-1)]}

## Grupos de consumidores

Los grupos son una forma de rebalancear automáticamente las particiones entre consumidores que entran y salen del mismo. 

Permiten de manera más transparente mezclar los patrones de cola simple y de pubsub.

Registremos dos consumidores en el mismo grupo, uno en este notebook y otro en el notebook GrupoConsumidor1. 

(Separamos en dos notebooks que hacen polling continuamente para evitar que Kafka rebalancee constantemente el grupo).

In [None]:
group_consumer0 = KafkaConsumer(
    "metrics", 
    group_id="spice1",
    bootstrap_servers=SERVER,
    client_id="client1",
)

for msg in group_consumer0:
    print(msg)

ConsumerRecord(topic='metrics', partition=0, offset=7, timestamp=1719277990174, timestamp_type=0, key=b'0', value=b'hola', headers=[], checksum=None, serialized_key_size=1, serialized_value_size=4, serialized_header_size=-1)
ConsumerRecord(topic='metrics', partition=0, offset=8, timestamp=1719278028241, timestamp_type=0, key=b'0', value=b'hola', headers=[], checksum=None, serialized_key_size=1, serialized_value_size=4, serialized_header_size=-1)
ConsumerRecord(topic='metrics', partition=0, offset=9, timestamp=1719278058128, timestamp_type=0, key=b'0', value=b'hola', headers=[], checksum=None, serialized_key_size=1, serialized_value_size=4, serialized_header_size=-1)
ConsumerRecord(topic='metrics', partition=0, offset=10, timestamp=1719278059425, timestamp_type=0, key=b'0', value=b'hola', headers=[], checksum=None, serialized_key_size=1, serialized_value_size=4, serialized_header_size=-1)
ConsumerRecord(topic='metrics', partition=0, offset=11, timestamp=1719278060021, timestamp_type=0, 

Si tiramos el otro notebook eventualmente veremos que este toma las de la otra partición también :-)