In [1]:
import json
import csv
import os
from datetime import datetime
from confluent_kafka import Consumer, KafkaError, TopicPartition

In [2]:
# Kafka consumer configuration
consumer_config = {
    'bootstrap.servers': 'kafka2:9093,kafka1:9092',  # Kafka broker address(es)
    'group.id': 'consumo-impacta',                   # Consumer group identifier
    # Fallback policy for when the requested start offset is missing or out of
    # range; the actual start position is the explicit offset assigned below.
    'auto.offset.reset': 'earliest',
    'client.id': 'consumidor_do_kafka'               # Name reported by this client
}
consumer = Consumer(consumer_config)
topic = 'impacta'  # Replace with the name of your Kafka topic
partition = 0
offset = 0  # Starting at 0 means we always consume from the beginning

# Assign the partition and the starting offset to the consumer explicitly.
consumer.assign([TopicPartition(topic, partition, offset)])
# Alternative (group-managed partition assignment): consumer.subscribe([topic])

In [3]:
# Helper that saves a batch of messages to a CSV file
def save_to_csv(messages, filename):
    """Decode JSON messages and write them to ``filename`` as CSV.

    Args:
        messages: Iterable of UTF-8 encoded JSON objects (``bytes``); each
            object becomes one CSV row.
        filename: Path of the CSV file to create (overwritten if it exists).

    Returns:
        None.

    The header is the union of the keys of all messages, in first-seen order,
    so a message with extra or missing fields does not break the export;
    missing fields are written as empty cells. When ``messages`` is empty,
    nothing is written and no file is created.
    """
    rows = [json.loads(message.decode('utf-8')) for message in messages]
    if not rows:
        # Nothing to export; indexing the first row would raise IndexError.
        return
    # dict.fromkeys keeps insertion order, giving a stable, de-duplicated header.
    fieldnames = list(dict.fromkeys(key for row in rows for key in row))
    # Explicit encoding so non-ASCII text is written the same on every platform.
    with open(filename, 'w', newline='', encoding='utf-8') as output_file:
        dict_writer = csv.DictWriter(output_file, fieldnames=fieldnames)
        dict_writer.writeheader()
        dict_writer.writerows(rows)

In [4]:
# State for the consume loop, which saves the buffered messages to CSV every minute:
# the start time of the current window and the messages collected in it.
start_time = datetime.now()
messages = list()

In [5]:
def _flush_batch(batch, batch_start):
    """Write ``batch`` to a CSV file named after ``batch_start`` and report it."""
    filename = f"messages_{batch_start.strftime('%Y%m%d_%H%M')}.csv"
    save_to_csv(batch, filename)
    print(f"Saved {len(batch)} messages to {filename}")


# Number of consecutive polls that returned no message.
contador = 0
try:
    # Keep polling until more than 10 polls in a row come back empty
    # (each poll is called with a timeout argument of 1).
    while (contador <= 10):
        print(contador)
        msg = consumer.poll(1)
        if msg is None:
            contador = contador + 1
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                print('Fim da Partição')
            else:
                print('Erro no Consumidor: {}'.format(msg.error()))
        else:
            print('Recebido: {}'.format(msg.value()))
            # Get the message payload
            data = msg.value()

            messages.append(data)
            current_time = datetime.now()
            # total_seconds() measures the whole elapsed interval; .seconds is
            # only the seconds component and wraps around after one day.
            if (current_time - start_time).total_seconds() >= 60:
                _flush_batch(messages, start_time)
                messages = []
                start_time = current_time
            # A real message arrived, so restart the idle counter.
            contador = 0
    # Save whatever is still buffered; otherwise the last (partial) window
    # would be dropped when the loop ends.
    if messages:
        _flush_batch(messages, start_time)
        messages = []
finally:
    # Always release the consumer, even if the loop fails or is interrupted.
    consumer.close()

0
Recebido: b'{"user": "user2", "action": "post", "message": "This is a comment by user1", "timestamp": "2025-11-19T15:27:27.702562"}'
0
Recebido: b'{"user": "user3", "action": "like", "message": "This is a post by user2", "timestamp": "2025-11-19T15:27:28.703875"}'
0
Recebido: b'{"user": "user1", "action": "like", "message": "This is a post by user1", "timestamp": "2025-11-19T15:27:29.704978"}'
0
Recebido: b'{"user": "user2", "action": "comment", "message": "This is a post by user2", "timestamp": "2025-11-19T15:27:30.707086"}'
0
Recebido: b'{"user": "user3", "action": "like", "message": "This is a like by user1", "timestamp": "2025-11-19T15:27:31.707740"}'
0
Recebido: b'{"user": "user1", "action": "comment", "message": "This is a post by user2", "timestamp": "2025-11-19T15:27:32.709051"}'
0
Recebido: b'{"user": "user3", "action": "post", "message": "This is a like by user3", "timestamp": "2025-11-19T15:27:33.711900"}'
0
Recebido: b'{"user": "user3", "action": "post", "message": "This 