# Install and Import Libraries

In [3]:
%pip install kafka-python

Collecting kafka-python
  Using cached kafka_python-2.0.2-py2.py3-none-any.whl (246 kB)
Installing collected packages: kafka-python
Successfully installed kafka-python-2.0.2
Note: you may need to restart the kernel to use updated packages.


In [1]:
from kafka import KafkaConsumer
import sqlite3
import json
import logging

# Criação de uma tabela no SQLite

In [2]:
# Create the `league` table in the SQLite file 'data.sql'.
# `IF NOT EXISTS` makes this cell safe to re-run against an existing database.
# The nested try/finally blocks guarantee that the cursor and the connection
# are released even when the DDL statement or the commit raises, so a failed
# run does not leave an open handle to the database file in the kernel.
conn = sqlite3.connect('data.sql')
try:
    cursor = conn.cursor()
    try:
        cursor.execute('''
CREATE TABLE IF NOT EXISTS league (
    id INTEGER PRIMARY KEY,
    accountId INTEGER,
    puuid TEXT,
    name TEXT,
    profileIconId INTEGER,
    revisionDate DATE,
    summonerLevel INTEGER);
''')
        conn.commit()
    finally:
        cursor.close()

    # Only reached when the statement and the commit both succeeded.
    print("Tabela 'league' criada com sucesso!")
finally:
    conn.close()

Tabela 'league' criada com sucesso!


In [10]:
# Remove every row from `league` while keeping the table definition.
# This cell expects the table to exist already; the DELETE raises otherwise.
# try/finally ensures the cursor and connection are closed even if the
# DELETE or the commit fails.
conn = sqlite3.connect('data.sql')
try:
    cursor = conn.cursor()
    try:
        cursor.execute('DELETE FROM league')
        conn.commit()
    finally:
        cursor.close()

    # Only reached when the delete was committed.
    print("Todos os dados da tabela 'league' foram deletados!")
finally:
    conn.close()

Todos os dados da tabela 'league' foram deletados!


# Leitura e inserção de dados no SQLite

In [11]:
# Subscribe to the 'atividade_kafka' topic on the broker at localhost:9092.
consumer = KafkaConsumer(
    'atividade_kafka',
    bootstrap_servers='localhost:9092',
)

# Open the SQLite database file and a cursor on it.
# NOTE(review): the ingestion cell further down assigns `consumer`, `conn` and
# `cursor` again without closing these handles first - confirm whether this
# cell is still needed.
conn = sqlite3.connect('data.sql')
cursor = conn.cursor()

INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=localhost:9092 <connecting> [IPv4 ('127.0.0.1', 9092)]>: connecting to localhost:9092 [('127.0.0.1', 9092) IPv4]
INFO:kafka.conn:Probing node bootstrap-0 broker version
INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=localhost:9092 <connecting> [IPv4 ('127.0.0.1', 9092)]>: Connection complete.
INFO:kafka.conn:Broker version identified as 2.4.0
INFO:kafka.conn:Set configuration api_version=(2, 4, 0) to skip auto check_version requests on startup
INFO:kafka.consumer.subscription_state:Updating subscribed topics to: ('atividade_kafka',)


In [12]:
# Configure the root logger at INFO level so the logging.info() calls in this cell are emitted.
logging.basicConfig(level=logging.INFO)

def kafka_callback(body, cursor, conn):
    """Store one Kafka message as a row of the ``league`` table.

    The message payload is expected to be a JSON object. The keys ``id``,
    ``accountId``, ``puuid``, ``name``, ``profileIconId``, ``revisionDate``
    and ``summonerLevel`` are copied into the columns of the same name; a
    missing key is stored as NULL. The insert is committed immediately.

    Failures are logged instead of raised so that one bad message does not
    stop the consuming loop:

    * an empty payload is skipped with a warning;
    * a payload that cannot be decoded as JSON is logged as a decode error;
    * valid JSON that is not an object (e.g. a list) is logged and skipped;
    * an SQLite failure (e.g. duplicate primary key) is logged and the
      transaction is rolled back, so no half-open transaction is left on
      ``conn`` for the next message.

    Args:
        body: Message object exposing the raw payload as ``body.value``
            (``str``, ``bytes`` or ``None``).
        cursor: SQLite cursor used to execute the INSERT.
        conn: SQLite connection that owns ``cursor``; used to commit or
            roll back.

    Returns:
        None.
    """
    columns = (
        'id', 'accountId', 'puuid', 'name',
        'profileIconId', 'revisionDate', 'summonerLevel',
    )

    try:
        payload = body.value

        if payload:
            data = json.loads(payload)

            # json.loads also accepts lists, numbers, strings...; only an
            # object can be mapped onto the table columns.
            if not isinstance(data, dict):
                logging.error(
                    "Erro: Mensagem JSON não é um objeto (%s). Ignorando...",
                    type(data).__name__,
                )
                return

            row = tuple(data.get(column) for column in columns)

            cursor.execute('''
            INSERT INTO league (
                id, accountId, puuid, name, profileIconId, revisionDate, summonerLevel
                )
            VALUES (?, ?, ?, ?, ?, ?, ?)
            ''',
            row)

            conn.commit()

            logging.info("Informações do perfil foram inseridas no SQLite.")
        else:
            # Malformed JSON is handled by the decode branch below; this
            # branch is reached only when the payload is empty or None.
            logging.warning("Erro: Mensagem vazia recebida. Ignorando...")

        logging.info("Mensagem do Kafka ouvida: %s", payload)

    except (json.JSONDecodeError, UnicodeDecodeError) as e:
        # UnicodeDecodeError: bytes payload that is not valid UTF-8/16/32.
        logging.error("Erro ao decodificar a mensagem JSON: %s", e)
    except sqlite3.Error as e:
        logging.error("Erro ao inserir no SQLite: %s", e)
        # A failed INSERT leaves the implicitly opened transaction active;
        # undo it so the connection is clean for the next message.
        try:
            conn.rollback()
        except sqlite3.Error as rollback_error:
            logging.error("Erro ao desfazer a transação no SQLite: %s", rollback_error)
    except Exception as e:
        # Deliberate catch-all: the consuming loop must survive any single
        # bad message.
        logging.error("Erro inesperado ao processar a mensagem: %s", e)

if __name__ == "__main__":
    # Consume the 'atividade_kafka' topic and hand every message to
    # kafka_callback until the kernel is interrupted.
    conn = sqlite3.connect('data.sql')
    cursor = None
    consumer = None

    try:
        cursor = conn.cursor()
        # Created inside the try block so that a failure to reach the broker
        # still runs the cleanup below and closes the SQLite connection.
        consumer = KafkaConsumer('atividade_kafka', bootstrap_servers='localhost:9092')

        for body in consumer:
            kafka_callback(body, cursor, conn)
    except KeyboardInterrupt:
        logging.info("Parando o consumer...")
    finally:
        # Nested so that an error while closing the consumer cannot prevent
        # the database handles from being released.
        try:
            if consumer is not None:
                consumer.close()
        finally:
            try:
                if cursor is not None:
                    cursor.close()
            finally:
                conn.close()

INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=localhost:9092 <connecting> [IPv4 ('127.0.0.1', 9092)]>: connecting to localhost:9092 [('127.0.0.1', 9092) IPv4]
INFO:kafka.conn:Probing node bootstrap-0 broker version
INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=localhost:9092 <connecting> [IPv4 ('127.0.0.1', 9092)]>: Connection complete.
INFO:kafka.conn:Broker version identified as 2.4.0
INFO:kafka.conn:Set configuration api_version=(2, 4, 0) to skip auto check_version requests on startup
INFO:kafka.consumer.subscription_state:Updating subscribed topics to: ('atividade_kafka',)
INFO:kafka.consumer.subscription_state:Updated partition assignment: [TopicPartition(topic='atividade_kafka', partition=0)]
INFO:kafka.conn:<BrokerConnection node_id=1 host=localhost:9092 <connecting> [IPv4 ('127.0.0.1', 9092)]>: connecting to localhost:9092 [('127.0.0.1', 9092) IPv4]
INFO:kafka.conn:<BrokerConnection node_id=1 host=localhost:9092 <connecting> [IPv4 ('127.0.0.1', 9092)]>