In [26]:
pip install kafka-python

Note: you may need to restart the kernel to use updated packages.


In [27]:
from kafka import KafkaConsumer
import sqlite3
import json

# Conexão ao SQLite
conn = sqlite3.connect('movies.db')
cursor = conn.cursor()

# Criação da tabela com colunas correspondentes aos campos do JSON
cursor.execute('''
CREATE TABLE IF NOT EXISTS movies (
    id INTEGER PRIMARY KEY,
    adult BOOLEAN,
    backdrop_path TEXT,
    belongs_to_collection TEXT,
    budget INTEGER,
    homepage TEXT,
    imdb_id TEXT,
    original_language TEXT,
    original_title TEXT,
    overview TEXT,
    popularity REAL,
    poster_path TEXT,
    video BOOLEAN,
    vote_average REAL,
    vote_count INTEGER,
    name TEXT, 
    url TEXT  
);
''')
conn.commit()

print("Tabela 'movies' criada com sucesso!")
cursor.close()
conn.close()


Tabela 'movies' criada com sucesso!


In [28]:
# Conexão ao SQLite para deletar todos os registros (pode ser ajustado)
conn = sqlite3.connect('movies.db')
cursor = conn.cursor()
cursor.execute('DELETE FROM movies')
conn.commit()

print("Todos os dados da tabela 'movies' foram deletados!")
cursor.close()
conn.close()

Todos os dados da tabela 'movies' foram deletados!


### Inicialização do "ouvinte" da fila que, a cada leitura, faz uma inserção no banco de dados.

In [29]:
from kafka import KafkaConsumer
import sqlite3
import json

# Configurações do Kafka Consumer
consumer = KafkaConsumer('atividade_kafka', bootstrap_servers='localhost:9092')

# Conexão ao SQLite
conn = sqlite3.connect('movies.db')
cursor = conn.cursor()

# Função para verificar se um valor é numérico
def is_numeric(value):
    try:
        float(value)
        return True
    except (ValueError, TypeError):
        return False

# Função para tratar os dados de cada coluna
def process_data(data):
    # Verifica e trata cada coluna
    data['id'] = data.get('id', None)
    data['adult'] = bool(data.get('adult', False))
    data['backdrop_path'] = data.get('backdrop_path', None)
    data['belongs_to_collection'] = data.get('belongs_to_collection', None)
    data['budget'] = data.get('budget', None)
    data['homepage'] = data.get('homepage', None)
    data['imdb_id'] = data.get('imdb_id', None)
    data['original_language'] = data.get('original_language', None)
    data['original_title'] = data.get('original_title', None)
    data['overview'] = data.get('overview', None)
    data['popularity'] = data.get('popularity', None)
    data['poster_path'] = data.get('poster_path', None)
    data['video'] = bool(data.get('video', False))
    data['vote_average'] = data.get('vote_average', None)
    data['vote_count'] = data.get('vote_count', None)
    data['name'] = data.get('name', 'Desconhecido')
    data['url'] = data.get('url', None)

    # Adicione verificações e tratamentos adicionais para outras colunas, se necessário

    return data

# Função callback para processar mensagens do Kafka
def kafka_callback(message):
    try:
        if message.value:
            data = json.loads(message.value)

            if isinstance(data, dict):
                # Processa os dados antes da inserção
                data = process_data(data)

                cursor.execute('''
                INSERT INTO movies (
                    id, adult, backdrop_path, belongs_to_collection, budget, homepage,
                    imdb_id, original_language, original_title, overview, popularity,
                    poster_path, video, vote_average, vote_count, name, url
                )
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                ''', (
                    data['id'], data['adult'], data['backdrop_path'], 
                    data['belongs_to_collection'], data['budget'], data['homepage'],
                    data['imdb_id'], data['original_language'], data['original_title'], 
                    data['overview'], data['popularity'], data['poster_path'],
                    data['video'], data['vote_average'], data['vote_count'], 
                    data['name'], data['url']
                ))
                conn.commit()

                print(f"Dados do filme {data['name']} foram inseridos no SQLite.")
            else:
                print("Erro: Mensagem não está em formato JSON válido. Ignorando...")

        print("Mensagem do Kafka ouvida: ", message.value)

    except json.JSONDecodeError as e:
        print(f"Erro ao decodificar a mensagem JSON: {e}")
    except Exception as e:
        print(f"Erro ao inserir no SQLite: {e}")

# Loop para consumir mensagens do Kafka
for message in consumer:
    kafka_callback(message)

# Fechando a conexão do SQLite
conn.close()


Dados do filme Muzzle foram inseridos no SQLite.
Mensagem do Kafka ouvida:  b'{"adult": false, "backdrop_path": "/eSsMzJpzAwCa69tm6Wco2il44aJ.jpg", "belongs_to_collection": null, "budget": 0, "homepage": "https://muzzlemovie.com", "id": 939335, "imdb_id": "tt17663876", "original_language": "en", "original_title": "Muzzle", "overview": "LAPD K-9 officer Jake Rosser has just witnessed the shocking murder of his dedicated partner by a mysterious assailant. As he investigates the shooter\\u2019s identity, he uncovers a vast conspiracy that has a chokehold on the city in this thrilling journey through the tangled streets of Los Angeles and the corrupt bureaucracy of the LAPD.", "popularity": 1451.551, "poster_path": "/qXChf7MFL36BgoLkiB3BzXiwW82.jpg", "video": false, "vote_average": 6.292, "vote_count": 48, "name": "Muzzle", "url": "https://api.themoviedb.org/3/movie/939335?api_key=faa82585bb153b3362a84dc30fe51c35"}'
Dados do filme Desperation Road foram inseridos no SQLite.
Mensagem do Kaf

KeyboardInterrupt: 