# Consumer - ETL | Kafka

Este notebook tem por objetivo conectar no kafka, consumir a mensagem do ano selecionado e enviar os dados brutos e tratados para o Postgres.

In [1]:
# !pip install confluent-kafka psycopg2-binary

In [2]:
from confluent_kafka import Consumer, KafkaError, TopicPartition
import psycopg2
import pandas as pd
from sqlalchemy import create_engine

In [3]:
ano="2019"

In [4]:
def f_cria_tabela(tabela):
    # Configurações de conexão com o PostgreSQL
    tb = tabela
    db_conf = {
        'host': 'teste-postgres-compose',
        'database': 'enem',
        'user': 'postgres',
        'password': 'Postgres2019!'
    }
    conn = psycopg2.connect(**db_conf)
    cur = conn.cursor()
    # cria a tabela, se nao existir (f1_schema.laps_{ano})
    query_create = f"""CREATE TABLE IF NOT EXISTS {tb}
    (
        col_1 text COLLATE pg_catalog."default"
    )
    TABLESPACE pg_default;
    ALTER TABLE IF EXISTS {tb}
        OWNER to postgres;
    TRUNCATE TABLE {tb};
    """
    cur.execute(query_create)
    conn.commit()
    cur.close()

def f_trunca_tabela(tabela):
    # Configurações de conexão com o PostgreSQL
    tb = tabela
    db_conf = {
        'host': 'teste-postgres-compose',
        'database': 'enem',
        'user': 'postgres',
        'password': 'Postgres2019!'
    }
    conn = psycopg2.connect(**db_conf)
    cur = conn.cursor()
    # cria a tabela, se nao existir (f1_schema.laps_{ano})
    query_trunc = f"TRUNCATE TABLE {tb};"
    cur.execute(query_trunc)
    conn.commit()
    cur.close()
    

In [None]:
# kafka configuration
consumer_config = {
    'bootstrap.servers': 'kafka2:9093',  # Endereço do(s) broker(s) Kafka
    'group.id': 'consumo-teste',        # Identificador do grupo de consumidores
    'auto.offset.reset': 'earliest',         # Lê todas as mensagens disponíveis no tópico
    'client.id': 'consumidor_do_kafka'            # nome do client conectado
}
consumer = Consumer(consumer_config)
topic = f'notas-{ano}'  # Substitua pelo nome do seu tópico Kafka
partition = 0
offset = 0  # colocando 0, vmaos consumir sempre desde o inicio

# Atribua a partição e o offset ao consumidor
consumer.assign([TopicPartition(topic, partition, offset)])
# consumer.subscribe([topic])

# Configurações de conexão com o PostgreSQL
db_config = {
    'host': 'teste-postgres-compose',
    'database': 'enem',
    'user': 'postgres',
    'password': 'Postgres2019!'
}
connection = psycopg2.connect(**db_config)
cursor = connection.cursor()
    
# cria a tabela, se nao existir
f_cria_tabela(f"redacao.notas_redacao_{ano}")
# limpa a tabela de destino
# f_trunca_tabela(f"enem.notas_redacao_{ano}")

contador = 0
while (contador <= 10):
    msg = consumer.poll(1)
    if msg is None:
        contador = contador + 1
        continue
    if msg.error():
        if msg.error().code() == KafkaError._PARTITION_EOF:
            print('Fim da Partição')
        else:
            print('Erro no Consumidor: {}'.format(msg.error()))
    else:
        print('Recebido: {}'.format(msg.value()))
        # Pega o valor da mensagem
        data = msg.value()
    
        # Inserir dados brutos no Postgres
        insert_query = f"INSERT INTO redacao.notas_redacao_{ano} (col_1) VALUES (%s);"
        valor = data.decode('utf-8')
        cursor.execute(insert_query, (valor, ))
        connection.commit()
        
        # Inserir dados tratados no Postgres
        registro=[]
        registro.append(valor.replace('\n','').split(';'))
        df = pd.DataFrame(registro, columns=['NO_MUNICIPIO_RESIDENCIA', 'NU_IDADE', 'TP_SEXO', 'NU_NOTA_REDACAO'])
        engine = create_engine('postgresql://postgres:Postgres2019!@teste-postgres-compose:5432/enem') 
        df.to_sql(name=f'notas_redacao_tratada_{ano}', con=engine, schema='redacao', if_exists='append',index=False)

        contador = 0
        print(contador)

consumer.close()

Recebido: b'S\xc3\xa3o Bernardo do Campo;19;M;560.0\n'
0
Recebido: b'S\xc3\xa3o Bernardo do Campo;32;F;0.0\n'
0
Recebido: b'S\xc3\xa3o Bernardo do Campo;20;F;920.0\n'
0
Recebido: b'S\xc3\xa3o Bernardo do Campo;19;M;820.0\n'
0
Recebido: b'S\xc3\xa3o Bernardo do Campo;20;F;0.0\n'
0
Recebido: b'S\xc3\xa3o Bernardo do Campo;18;M;640.0\n'
0
Recebido: b'S\xc3\xa3o Bernardo do Campo;41;F;0.0\n'
0
Recebido: b'S\xc3\xa3o Bernardo do Campo;17;F;560.0\n'
0
Recebido: b'S\xc3\xa3o Bernardo do Campo;33;F;600.0\n'
0
Recebido: b'S\xc3\xa3o Bernardo do Campo;17;F;660.0\n'
0
Recebido: b'S\xc3\xa3o Bernardo do Campo;27;F;0.0\n'
0
Recebido: b'S\xc3\xa3o Bernardo do Campo;24;M;0.0\n'
0
Recebido: b'S\xc3\xa3o Bernardo do Campo;17;F;500.0\n'
0
Recebido: b'S\xc3\xa3o Bernardo do Campo;27;M;580.0\n'
0
Recebido: b'S\xc3\xa3o Bernardo do Campo;17;F;320.0\n'
0
Recebido: b'S\xc3\xa3o Bernardo do Campo;18;F;640.0\n'
0
Recebido: b'S\xc3\xa3o Bernardo do Campo;31;F;0.0\n'
0
Recebido: b'S\xc3\xa3o Bernardo do Campo;23