<a href="https://colab.research.google.com/github/biaaoliveira/Data_Girl/blob/main/DE_Processamento_de_dados_Pratica.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

#### Parte 1:

Execute o codigo abaixo:

```shell
python kafka_producer.py
```

In [None]:
# kafka_producer.py

from confluent_kafka import Producer
import sys

# Função de callback para notificar sobre o status da entrega da mensagem
def delivery_callback(err, msg):
    if err:
        print(f"Falha ao entregar mensagem: {err}")
    else:
        print(f"Mensagem entregue em {msg.topic()} [{msg.partition()}] offset {msg.offset()}")

# Configurações do produtor Kafka
conf = {
    'bootstrap.servers': 'localhost:9092'  # Endereço do broker Kafka local
}

# Cria uma instância do Producer
producer = Producer(conf)

# Envia mensagens lidas do terminal para o Kafka
print("Digite mensagens para enviar ao Kafka. Pressione Ctrl+C para sair.")
try:
    for line in sys.stdin:
        # Remove o caractere de nova linha e envia a mensagem
        producer.produce(
            'meu_topico',  # Nome do tópico
            value=line.rstrip(),  # Mensagem
            callback=delivery_callback  # Função de callback para entrega
        )
        # Processa eventos de entrega de mensagens
        producer.poll(0)
except KeyboardInterrupt:
    print("\nEncerrando produtor...")

# Aguarda a entrega de todas as mensagens pendentes antes de sair
producer.flush()
print("Produtor finalizado.")

Em um novo terminal, execute:
```shell
python kafka_consumer.py
```

In [None]:
from confluent_kafka import Consumer

# Configurações do consumidor Kafka
conf = {
    'bootstrap.servers': 'localhost:9092',  # Endereço do broker Kafka local
    'group.id': 'meu_grupo_consumidor',     # ID do grupo de consumidores
    'auto.offset.reset': 'earliest'         # Começa a ler do início do tópico
}

# Cria uma instância do Consumer
consumer = Consumer(conf)

# Inscreve o consumidor no tópico desejado
consumer.subscribe(['meu_topico'])

print("Consumidor iniciado. Aguardando mensagens... (Ctrl+C para sair)")
try:
    while True:
        # Aguarda por novas mensagens (timeout de 1 segundo)
        msg = consumer.poll(1.0)
        if msg is None:
            continue  # Nenhuma mensagem recebida
        if msg.error():
            print(f"Erro: {msg.error()}")
        else:
            # Exibe a mensagem recebida no terminal
            print(f"Recebido: {msg.value().decode('utf-8')}")
except KeyboardInterrupt:
    print("\nEncerrando consumidor...")

# Fecha a conexão do consumidor com o Kafka
consumer.close()
print("Consumidor finalizado.")


#### Parte 2:

Execute o codigo abaixo:

```shell
python kafka_producer_mult.py
```

In [None]:
from confluent_kafka import Producer
import sys

# Lista de tópicos disponíveis
TOPICOS_DISPONIVEIS = ['topico_a', 'topico_b', 'topico_c']

def delivery_callback(err, msg):
    if err:
        print(f"Falha ao entregar mensagem: {err}")
    else:
        print(f"Mensagem entregue em {msg.topic()} [{msg.partition()}] offset {msg.offset()}")

conf = {
    'bootstrap.servers': 'localhost:9092'
}

producer = Producer(conf)

print("Tópicos disponíveis para envio:")
for idx, topico in enumerate(TOPICOS_DISPONIVEIS):
    print(f"{idx}: {topico}")

print("\nDigite o número do tópico desejado e a mensagem, separados por dois pontos (ex: 1:Olá Kafka!)")
print("Pressione Ctrl+C para sair.")

try:
    for line in sys.stdin:
        # Exemplo de entrada: 1:Mensagem para o tópico B
        if ':' not in line:
            print("Formato inválido. Use <numero_topico>:<mensagem>")
            continue

        idx_str, mensagem = line.split(':', 1)
        idx_str = idx_str.strip()
        mensagem = mensagem.strip()

        # Validação do índice do tópico
        if not idx_str.isdigit() or int(idx_str) not in range(len(TOPICOS_DISPONIVEIS)):
            print("Índice de tópico inválido.")
            continue

        topico_escolhido = TOPICOS_DISPONIVEIS[int(idx_str)]

        producer.produce(
            topico_escolhido,
            value=mensagem,
            callback=delivery_callback
        )
        producer.poll(0)
except KeyboardInterrupt:
    print("\nEncerrando produtor...")

producer.flush()
print("Produtor finalizado.")


In [None]:
from confluent_kafka import Consumer

# Lista de tópicos disponíveis
TOPICOS_DISPONIVEIS = ['topico_a', 'topico_b', 'topico_c']

conf = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'meu_grupo_consumidor',
    'auto.offset.reset': 'earliest'
}

consumer = Consumer(conf)

print("Tópicos disponíveis para assinatura:")
for idx, topico in enumerate(TOPICOS_DISPONIVEIS):
    print(f"{idx}: {topico}")

# Usuário escolhe os tópicos que deseja assinar (pode escolher múltiplos, separados por vírgula)
indices = input("\nDigite os números dos tópicos que deseja assinar (ex: 0,2): ")
indices = [i.strip() for i in indices.split(',') if i.strip().isdigit() and int(i.strip()) in range(len(TOPICOS_DISPONIVEIS))]
topicos_escolhidos = [TOPICOS_DISPONIVEIS[int(i)] for i in indices]

if not topicos_escolhidos:
    print("Nenhum tópico válido selecionado. Encerrando consumidor.")
    consumer.close()
    exit(0)

consumer.subscribe(topicos_escolhidos)
print(f"\nConsumidor iniciado. Assinando os tópicos: {', '.join(topicos_escolhidos)} (Ctrl+C para sair)")

try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None:
            continue
        if msg.error():
            print(f"Erro: {msg.error()}")
        else:
            print(f"[{msg.topic()}] Recebido: {msg.value().decode('utf-8')}")
except KeyboardInterrupt:
    print("\nEncerrando consumidor...")

consumer.close()
print("Consumidor finalizado.")
