# Tutorial: Kafka com Python usando confluent-kafka

Este notebook mostra como configurar um Producer e um Consumer para Apache Kafka usando a biblioteca `confluent-kafka`.

## Pré-requisitos
1. **Kafka e ZooKeeper**: Configure o Kafka usando Docker Compose.
2. **Bibliotecas Python**: Certifique-se de instalar os pacotes necessários:
   ```bash
   pip install confluent-kafka python-dotenv
   ```

## 1. Configuração do Docker Compose para Kafka e ZooKeeper

Crie um arquivo `docker-compose.yml` com o seguinte conteúdo para configurar Kafka e ZooKeeper:

```yaml
services:
  zookeeper:
    image: bitnami/zookeeper:3.8.0
    container_name: zookeeper
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
    ports:
      - "2181:2181"
    networks:
      - kafka_network

  kafka-1:
    image: bitnami/kafka:3.5.2
    container_name: kafka-1
    environment:
      - KAFKA_CFG_BROKER_ID=1
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,INTERNAL://:29092
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://${KAFKA_ADVERTISED_LISTENER_IP}:9092,INTERNAL://kafka-1:29092
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,INTERNAL:PLAINTEXT
      - KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=2
      - KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=2
      - KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR=1
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "29092:29092"
    networks:
      - kafka_network

  kafka-2:
    image: bitnami/kafka:3.5.2
    container_name: kafka-2
    environment:
      - KAFKA_CFG_BROKER_ID=2
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9093,INTERNAL://:29093
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://${KAFKA_ADVERTISED_LISTENER_IP}:9093,INTERNAL://kafka-2:29093
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,INTERNAL:PLAINTEXT
      - KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=2
      - KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=2
      - KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR=1
    depends_on:
      - zookeeper
    ports:
      - "9093:9093"
      - "29093:29093"
    networks:
      - kafka_network

  kafka-3:
    image: bitnami/kafka:3.5.2
    container_name: kafka-3
    environment:
      - KAFKA_CFG_BROKER_ID=3
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9094,INTERNAL://:29094
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://${KAFKA_ADVERTISED_LISTENER_IP}:9094,INTERNAL://kafka-3:29094
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,INTERNAL:PLAINTEXT
      - KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=2
      - KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=2
      - KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR=1
    depends_on:
      - zookeeper
    ports:
      - "9094:9094"
      - "29094:29094"
    networks:
      - kafka_network

networks:
  kafka_network:
    name: kafka_network
    driver: bridge
```

Execute o seguinte comando para iniciar os serviços:

```bash
docker-compose up -d
```

Certifique-se de que os contêineres Kafka e ZooKeeper estão em execução.

## 2. Instalando a Biblioteca `confluent-kafka`

Antes de configurar o **Producer**, é necessário instalar a biblioteca Python `confluent-kafka`. Esta biblioteca é uma interface cliente baseada em **librdkafka**, que permite interagir de forma eficiente com o Apache Kafka.

#### Passo 1: Instalar a Biblioteca
Execute o comando abaixo no terminal para instalar a biblioteca:

```bash
pip install confluent-kafka


## 3. Configurando o Producer (Produtor)

O Producer envia mensagens para o Kafka. Abaixo está um código exemplo para configurá-lo usando python:

In [None]:
from confluent_kafka import Producer


def delivery_report(err, msg):
    """Callback para confirmar o envio da mensagem."""
    if err:
        print(f"Erro ao enviar mensagem: {err}")
    else:
        print(f"Mensagem enviada para o tópico {msg.topic()} [partição {msg.partition()}] offset {msg.offset()}")


def produce_message():
    config = {
        'bootstrap.servers': 'localhost:9092',  # Endereço do Kafka
    }

    producer = Producer(config)

    topic = 'test-topic'
    message = "Hello, Kafka from Python!"

    try:
        producer.produce(topic, value=message, callback=delivery_report)
        producer.flush()  # Garante o envio antes de sair
    except Exception as e:
        print(f"Erro ao produzir mensagem: {e}")


produce_message()

## 4. Configurando o Consumer (Consumidor)

O Consumer lê as mensagens do Kafka. Abaixo está o código para configurá-lo:

In [None]:
from confluent_kafka import Consumer, KafkaException


def consume_messages():
    config = {
        'bootstrap.servers': 'localhost:9092',
        'group.id': 'python-consumer-group',  # Grupo de consumidores
        'auto.offset.reset': 'earliest',  # Começa do início do tópico
    }

    consumer = Consumer(config)
    topic = 'test-topic'

    try:
        consumer.subscribe([topic])

        print(f"Consumindo mensagens do tópico: {topic}")
        while True:
            msg = consumer.poll(timeout=1.0)  # Espera por mensagens

            if msg is None:
                continue
            if msg.error():
                if msg.error().code() == KafkaException._PARTITION_EOF:
                    print("Fim da partição.")
                else:
                    print(f"Erro: {msg.error()}")
                continue

            print(f"Recebido: chave = {msg.key()}, valor = {msg.value()}")
    except KeyboardInterrupt:
        print("Encerrando o consumidor.")
    finally:
        consumer.close()


consume_messages()

## 5. Testando o Producer e Consumer

1. **Inicie o Producer:**
   Execute o código da célula do Producer para enviar mensagens ao Kafka.
2. **Inicie o Consumer:**
   Execute o código da célula do Consumer para consumir as mensagens enviadas pelo Producer.

Certifique-se de que ambos estejam funcionando corretamente. Você verá as mensagens do Producer no terminal do Consumer.

## 6. Conclusão

Neste tutorial, aprendemos como configurar um ambiente Kafka com Docker Compose e interagir com ele usando Python. Criamos um Producer para enviar mensagens e um Consumer para recebê-las.

### Próximos passos:
- Explore particionamento de tópicos e grupos de consumidores.
- Use ferramentas como Prometheus e Grafana para monitorar o Kafka.

### FIM