# Multiplos brokers

![kafka-architecture](https://s3-sa-east-1.amazonaws.com/lcpi/032ec305-71c0-4b07-8f87-f56849c021c4.png)

Em primeiro lugar, inicie o servidor Zookeeper.

---
```bash
$KAFKA_HOME/bin/zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties
```
---

Para inicializar um cluster Kafka com $id=1,2,3,...,N$ brokers, é preciso executar o seguinte procedimento para cada servidor $id$. 


1. Crie uma cópia do arquivo de configuração padrão `server.properties`, que se encontra em `$KAFKA_HOME/config/`
    ```bash
        cp $KAFKA_HOME/config/server.properties $KAFKA_HOME/config/server-<id>.properties
    ```
2. Altere as seguintes configurações do arquivo criado: 
   ```bash
       nano $KAFKA_HOME/config/server-<id>.properties
    ```

    ```nano
        broker.id=<id>
        listeners=PLAINTEXT://localhost:909<2+id> 
        log.dirs=/tmp/kafka-logs-<id>
    ```

3. Inicie o servidor
```bash
    $KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server-<id>.properties
```

Substitua `<id>` pelo ID do broker correspondente (0, 1, 2 etc). É importante que cada broker seja iniciado com seu próprio arquivo de configuração.

# Configurando os tópicos

Vamos criar alguns tópicos com diferentes características para em seguida explorar suas propriedades.

---
`topic_create`
```python
from kafka.admin import KafkaAdminClient, NewTopic

# Configura os brokers
bootstrap_servers = ['localhost:9092', 'localhost:9093', 'localhost:9094']

# Cria o objeto KafkaAdminClient
admin_client = KafkaAdminClient(bootstrap_servers=bootstrap_servers)

# Cria a lista de tópicos
topic_list = [NewTopic(name='topic_1', num_partitions=4, replication_factor=1),
              NewTopic(name='topic_2', num_partitions=3, replication_factor=2),
              NewTopic(name='topic_3', num_partitions=2, replication_factor=3)]


# Cria os tópicos
for topic in topic_list:
    try:
        admin_client.create_topics(new_topics=[topic], validate_only=False)
        print(f'Topic {topic.name} created successfully.')
        print('\n')

    except Exception as ex:
        print(f'Erro: {ex}')
        print('\n')


# Encerra o objeto KafkaAdminClient
admin_client.close()
```
---

# Investigando o servidor

## Topic describe

Para entendermos a dinâmica do Kafka, iremos frequentemente olhar a descrição dos tópicos. No terminal, usamos o código

---
```bash
$KAFKA_HOME/bin//kafka-topics.sh --describe  --bootstrap-server localhost:9092,localhost:9093,localhost:9094 --topic topic_1
```
---

onde o último argumento é opcional, ou seja, podemos removê-lo para ver as descrição de todos os tópicos existentes.

![topic_describe](https://s3-sa-east-1.amazonaws.com/lcpi/813511b3-4d7e-4798-9428-8f205e75a5d5.png)

O comando `kafka-topics --describe  --bootstrap-server <bootstrap_servers> --topic <topic_name>`  é usado para obter informações sobre as partições e replicação de um tópico específico em um cluster Kafka. O resultado deste comando fornece informações detalhadas sobre o tópico, como o número de partições, fator de replicação, líder da partição, replicas em sincronia (in-sync replicas, ISR), bem como algumas informações adicionais, como configurações do tópico e permissões.

- **Topic**: O nome do tópico.

- **PartitionCount**: O número de partições do tópico.

- **ReplicationFactor**: O número de réplicas para cada partição. Importante para garantir que os dados não sejam perdidos se um ou mais brokers falharem.

- **Configs**: As configurações do tópico, que podem incluir informações sobre compactação de dados, retenção de dados, tamanho máximo de mensagem, entre outros.

- **Partition**: O número da partição.

- **Leader**: O ID do broker líder para a partição, responsável por coordenar o processo de leitura e gravação na partição.

- **Replicas**: O ID dos brokers que armazenam as réplicas da partição, que são cópias dos dados armazenados em vários brokers no cluster Kafka.

- **Isr**: O ID dos brokers que armazenam as réplicas em sincronia (in-sync replicas, ISR) da partição, ou seja, são réplicas que estão atualizadas com o líder da partição e podem assumir a liderança caso o líder falhe.

Para facilitar e flexibilizar nossa interação com o servidor, podemos usar o script `08_topic_describe.py`, que reproduz parte dos resultados mostrados em linha de comando. Note que esse script pode ser expandido conforme sua necessidade.

---
`topic_describe`
```python
from kafka.admin import KafkaAdminClient

# Cria uma instância do KafkaAdminClient
admin_client = KafkaAdminClient(
    bootstrap_servers=['localhost:9092', 'localhost:9093', 'localhost:9094'],
)

# Define o nome do tópico a ser descrito
topic_names = admin_client.list_topics()
topic_names = ['topic_1','topic_2','topic_3']

# Descreve o tópico
topic_description = admin_client.describe_topics(topic_names)


print(f'{"Tópico":<9} {"Partição"} {"Líder"} {"Réplicas":<15} {"In-Sync Replicas (isr)":<10}  {"offline_replicas"} ')

for topic in topic_description:
    for partition in topic["partitions"]:
    
        print(f'{topic["topic"]:<12}  {partition["partition"]:<5}  {partition["leader"]:<5}  {partition["replicas"] } {"  ":10} {partition["isr"] } {"  ":20} {partition["offline_replicas"] } ' )
          
```
---

## Group describe

Com frequência nos perguntamos quaL o estado de escrita e leitura em uma determinada partição. Conforme discutido anteriormente, essa é uma informação inerente ao consumidor e não ao servidor. O servidor `zookeeper` só armazena os metadados para referência!

No terminal, podemos usar o comando

---
```bash
$KAFKA_HOME/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092,localhost:9093,localhost:9094 --describe --group 1

```
---

que resulta em

![group_offset](https://s3-sa-east-1.amazonaws.com/lcpi/ad2b997b-6ef2-453f-92b5-f4b5dc1b6252.png)

O comando "kafka-consumer-groups --bootstrap-server localhost:9092 --group my-group --describe" retorna informações detalhadas sobre o grupo de consumidores "my-group" registrado no cluster Kafka com o broker na porta 9092. 

O resultado consiste em uma tabela com as seguintes colunas:

- `GROUP`: nome do grupo de consumidores;

- `TOPIC`: nome do tópico;

- `PARTITION`: número da partição do tópico;

- `CURRENT-OFFSET`: último offset consumido pelo grupo de consumidores;

- `LOG-END-OFFSET`: próximo offset disponível no tópico e partição;

- `LAG`: número de mensagens que ainda não foram consumidas pelo grupo de consumidores na partição;

- `CONSUMER-ID`: ID do consumidor dentro do grupo;

- `HOST`: endereço IP do host que executa o consumidor.

O valor em LAG é especialmente útil para entender o quão "atrasado" está o grupo de consumidores em relação às mensagens mais recentes no tópico e pode ser utilizado para detectar possíveis gargalos ou problemas de desempenho no consumo de mensagens.

Como no caso anterior, usaremos um script em python para facilitar nossa interação. O script `08_group_offset.py` pode ser ajustado para melhor atender suas necessidades e sua versão inicial é dada abaixo.

---
```python
from kafka import KafkaConsumer, TopicPartition


groups = ['1']

for group_id_ in groups:

    # Configurações do consumidor
    consumer = KafkaConsumer(
        bootstrap_servers=['localhost:9092', 'localhost:9093', 'localhost:9094'],
        auto_offset_reset='earliest',
        group_id=group_id_,
        )

    topics = ['topic_1', 'topic_2','topic_3']

    # Define o tópico e a partição desejada
    for topic in topics:
        partitions = consumer.partitions_for_topic(topic)
        
        try:
            for partition in partitions:

            
            
                # Definir a partição que deseja consultar
                tp = TopicPartition(topic, partition)

                # Atribuir a partição ao consumidor
                consumer.assign([tp])
                
                # Obter a posição atual do offset
                current_offset = consumer.position(tp)
            
                # Obter o offset mais recente da partição
                end_offset = consumer.end_offsets([tp])[tp]

                # Calcular a diferença entre a posição atual e o offset mais recente
                unconsumed_messages = end_offset - current_offset
                
                print(f'group_id: {group_id_} topic: {topic} partition: {partition:3} current_offset: {current_offset:5} end_offset: {end_offset:5} unconsumed_messages: {unconsumed_messages:5}')
        except Exception as ex:
            print(f'Erro: {ex}')
    print('\n')
```
---

**Pergunta:** Com múltiplos brokers, tópicos e partições, como decidir para onde enviar uma mensagem ou mesmo saber para onde ela foi enviada?

# Future

O método `send()` do `KafkaProducer` em Python retorna um objeto `Future` que representa o resultado da chamada assíncrona para enviar uma mensagem ao Kafka broker. Este objeto `Future` possui vários métodos que podem ser usados para interagir com o resultado da chamada, incluindo:

- `get()`: Espera até que a mensagem seja enviada com sucesso (ou falhe) e retorna o resultado.

- `add_callback(callback)`: Adiciona um callback a ser executado quando a mensagem for enviada com sucesso.

- `add_errback(callback)`: Adiciona um callback a ser executado se ocorrer um erro ao enviar a mensagem.

Aqui está um exemplo de como usar o método `send()` com o objeto `Future` retornado pelo método:

---
`producer`
```python
from kafka import KafkaProducer   # Importa a classe KafkaProducer da biblioteca kafka-python
import datetime as dt   # Importa a biblioteca datetime para trabalhar com datas e horas
import time   # Importa a biblioteca time para adicionar atrasos ao envio das mensagens
import random

# Configura os brokers
bootstrap_servers = ['localhost:9092', 'localhost:9093', 'localhost:9094']

# Cria o produtor
producer = KafkaProducer(
    bootstrap_servers=bootstrap_servers,
    value_serializer=lambda x: x.encode('utf-8')
)


# Funções de callback
def on_send_success(record_metadata):
    print(f'Mensagem enviada com sucesso para o tópico {record_metadata.topic} na partição {record_metadata.partition} com offset {record_metadata.offset}')

def on_send_error(ex):
    print(f'Erro ao enviar mensagem: {ex}')

# Lista de tópicos e partições
topic_partitions = [
    {'topic': 'topic_1', 'partitions': 4},
    {'topic': 'topic_2', 'partitions': 3},
    {'topic': 'topic_3', 'partitions': 2}
]

# Envia mensagens aleatoriamente para tópicos e partições
while True:

    # Escolhe o tópico
    topic_partition = random.choice(topic_partitions)
    topic = topic_partition['topic']

    # Cria a mensagem
    time_stamp = dt.datetime.strftime(dt.datetime.now(), format='%Y-%m-%d %H:%M:%S.%f') 
    value_ = f'{time_stamp} Essa mensagem é enviada por KafkaProducer.'

    # Envia a mensagem para o Kafka
    future = producer.send(topic, value=value_)

    # Espera a confirmação da entrega
    try:
        record_metadata = future.get(timeout=10)
        on_send_success(record_metadata)
    except Exception as ex:
        on_send_error(ex)
    
    time.sleep(1)

# Encerra o produtor
producer.flush()
producer.close()
```
---

Observe que os callbacks são executados em outra thread, portanto, se você precisar de sincronia, é necessário aguardar a execução do callback ou usar algum mecanismo de sincronização, como uma fila ou uma variável de condição.


A classe `RecordMetadata` do KafkaProducer retorna informações sobre a gravação de uma mensagem em um tópico, incluindo o tópico, a partição e o offset atribuídos à mensagem.


- `topic`: o nome do tópico em que a mensagem foi gravada.

- `partition`: o número da partição do tópico em que a mensagem foi gravada.

- `offset`: o número do offset da mensagem na partição do tópico.

- `serializedKeySize`: o tamanho da chave serializada da mensagem.

- `serializedValueSize`: o tamanho do valor serializado da mensagem.

- `timestamp`: o timestamp da mensagem, se o produtor definiu um timestamp.

- `checksum`: o checksum da mensagem, se o tópico estiver configurado para usar verificação de integridade.

- `serializedHeaderSize`: o tamanho total dos headers serializados da mensagem.

Esses atributos podem ser úteis para monitorar o desempenho e a integridade do sistema e para depurar problemas de produção e consumo de mensagens.

No exemplo anterior, deixamos o Kafka decidir em qual partição gravar a mensagem, mas poderíamos decidir nos mesmo. Além disso, podemos definir uma chave única para cada mensagem, a fim de identificá-la quando necessário.

Para isso, adicionamos as linhas a seguir no nosso produtor.

---
`producer`
```python
# Cria o produtor
producer = KafkaProducer(
    ...,
    key_serializer=lambda x: x.encode('utf-8'),
)


# Envia mensagens aleatoriamente para tópicos e partições
while True:
    ...
    partition_ = random.randint(0, topic_partition['partitions']-1)

    # Cria a chave e valor da mensagem
    key_ = f'{topic}-partition-{partition_}'
    value_ = ...

    # Envia a mensagem para o Kafka
    future = producer.send(..., key=key_, partition=partition_)
```
---


Podemos ainda verificar que o atributo `timestamp` se refere ao envio da mensagem e não à geração da mensagem.

---
`producer`
```python
# Funções de callback
def on_send_success(record_metadata):
    delivery_time = dt.datetime.fromtimestamp(record_metadata.timestamp / 1e3)
    print(f'Mensagem enviada com sucesso em {delivery_time} para o tópico {record_metadata.topic} na partição {record_metadata.partition} com offset {record_metadata.offset}')
    print('\n')


value_ = ...
print(value_)
```
---


**Pergunta:** Temos três brokers, três tópicos e até quatro partições. Sabendo que cada mensagem é enviada para uma partição específica (de forma aleatória ou não), como passar ao consumidor o local exato para leitura?

# Consumer

Para entendermos o funcionamento do Kafka explorando seus tópicos, precisamos primeiramente ter um consumidor associado a um grupo. 

Em primeiro lugar, repare que o consumidor pode ler dos três brokers, mas indicamos apenas um tópico e uma partição!

---
`consumer`
```python
# Importa a classe KafkaConsumer da biblioteca kafka-python
from kafka import KafkaConsumer, TopicPartition
import time

# Cria uma instância de um consumidor Kafka e configura o endereço do servidor de bootstrap e o nome do tópico a ser consumido
consumer = KafkaConsumer(
    bootstrap_servers=['localhost:9092', 'localhost:9093', 'localhost:9094'],
    auto_offset_reset='earliest',
    group_id='1'
)

# Atribuir a partição e o offset desejados
tp = TopicPartition('topic_1', 0)  # partição 0 de 'my_topic'
consumer.assign([tp])
consumer.seek(tp, 0)  # move para o offset X

current_offset = consumer.position(tp)
end_offset = consumer.end_offsets([tp])[tp]

print(f'current_offset: {current_offset} - end_offset: {end_offset}')

while True:
    
    current_offset = consumer.position(tp)
    end_offset = consumer.end_offsets([tp])[tp]

    print(f'current_offset: {current_offset} - end_offset: {end_offset}')

    # Lê as mensagens do Kafka em lotes, com um limite de X mensagens por lote.
    messages = consumer.poll(max_records=5, timeout_ms=3000)
    
    
    # Itera pelos lotes de mensagens lidos.
    for tp, msgs in messages.items():
        print(f'------ Batch limit ------')

        # Itera pelas mensagens de cada lote.
        for msg in msgs:
        
            print(f"Offset: {msg.offset}, Chave: {msg.key}, Valor: {msg.value.decode('utf-8')}")

            # commita o offset da última mensagem consumida para todas as partições atribuídas
            consumer.commit()
    
    time.sleep(1)
```
---

# Exercícios

Alta disponibilidade
---

1. Crie os tópicos `topic_1`, `topic_2` e `topic_3` e assegure-se que estejam vazios.
2. Inicie o produtor.
3. Inicie o consumidor acima.
4. Execute o arquivo `08_topic_offset.py` .

**Pergunta:** Quais partições foram lidas? Você esperava esse resultado?

5. Execute o arquivo `08_topic_describe.py`.
6. Derrube um broker com `Ctrl+C` na respectiva janela.

**Pergunta:** Produtor e consumidor continuam funcionando? 

7. Execute o arquivo `08_topic_describe.py` novamente.

**Pergunta:** O que significam os valores nas colunas `In-Sync Replica` e `Offline Replicas`?


8. Reinicie ese broker e repita o processo para os outros.
9. Execute o arquivo `08_topic_describe.py` novamente.


**Pergunta:**  Explique por que os valores de `In-Sync Replica` e `Offline Replicas` mudaram novamente?

# Múltiplos consumidores

1. Certifique-se que todos os brokers estejam ligados e sincronizados.

**Pergunta:** Como verificar a última condição?

2. Desligue o consumidor.
3. Verifique os offsets de todos os tópicos e partições com o script `08_topic_offset.py`.
4. Ajuste o consumidor com as configurações abaixo e ligue-o novamente.
   - `group_id`=1
   - `topic`=topic_1
   - `partition`=0
5.  Ajuste o consumidor com as configurações abaixo e ligue-o em um novo terminal.
   - `group_id`=1
   - `topic`=topic_3
   - `partition`=1

**Pergunta:** Quantas partições temos ao todo? Em quantas estamos escrevendo dados? Quantas estão sendo consumidas?

6.  Ajuste o consumidor com as configurações abaixo e ligue-o em um novo terminal.
   - `group_id`=2
   - `topic`=topic_1
   - `partition`=0
  
7. Ajuste o consumidor com as configurações abaixo e ligue-o em um novo terminal.
   - `group_id`=2
   - `topic`=topic_3
   - `partition`=1

**Pergunta:** Verifique se os consumidores paralelos (aqueles que leem da mesma partição) interferem nas leituras uns dos outros.

**Pergunta:** Como descobrir o offset atual de cada consumidor?



# Encerrando os servidores

$KAFKA_HOME/bin/kafka-server-stop.sh $KAFKA_HOME/config/server.properties

$KAFKA_HOME/bin/zookeeper-server-stop.sh $KAFKA_HOME/config/zookeeper.properties