In [1]:
from kafka import KafkaConsumer, KafkaProducer
import pandas as pd
import json
import joblib

# Configuração do Kafka
bootstrap_servers = ['localhost:29092']
topicName = 'source.public.transacoes'  # Tópico de entrada
topic_elasticsearch = 'transacao_elastic'  # Tópico Kafka que envia para o Elasticsearch

# Inicializar o consumidor Kafka
consumer = KafkaConsumer(topicName, auto_offset_reset='earliest', 
                         bootstrap_servers=bootstrap_servers)

# Inicializar o produtor Kafka para enviar dados ao tópico Elasticsearch
producer = KafkaProducer(bootstrap_servers=bootstrap_servers,
                          value_serializer=lambda v: json.dumps(v).encode('utf-8'))

# Carregar modelo treinado
modelo = joblib.load(r"C:\Users\Matheus\Desktop\Estudo\Kafka\Transação_Credito\Previsao\models\fraud_detection.pkl")

print("🚀 Aguardando transações...")

for msg in consumer:
    try:
        # Ler mensagem do Kafka
        data = json.loads(msg.value.decode('utf-8'))  # Decodificar bytes para string e carregar como JSON
        print(f"📥 Transação recebida: {data}")

        # Transformar em DataFrame para passar ao modelo
        df_transacao = pd.DataFrame([data])

        # Selecionar apenas as colunas que o modelo precisa
        features = ['distance_from_home', 'distance_from_last_transaction', 
                    'ratio_to_median_purchase_price', 'repeat_retailer', 
                    'used_chip', 'used_pin_number', 'online_order']

        # Verificar se as colunas estão presentes no DataFrame
        if all(col in df_transacao.columns for col in features):
            # Fazer a previsão
            predicao = modelo.predict(df_transacao[features])[0]

            # Adicionar o resultado da previsão ao dicionário de dados
            data['resultado_previsao'] = 'fraude' if predicao == 1 else 'aprovada'

            # Enviar os dados com a previsão para o tópico do Elasticsearch
            producer.send(topic_elasticsearch, value=data)
            print(f"📤 Transação enviada para o Elasticsearch: {data}")
        else:
            print("❌ Colunas ausentes no DataFrame. Verifique a mensagem do Kafka.")
    except Exception as e:
        print(f"❌ Erro ao processar mensagem: {e}")


🚀 Aguardando transações...
📥 Transação recebida: {'distance_from_home': 57.87785658389723, 'distance_from_last_transaction': 0.3111400080477545, 'ratio_to_median_purchase_price': 1.9459399775518595, 'repeat_retailer': 1.0, 'used_chip': 1.0, 'used_pin_number': 0.0, 'online_order': 0.0}
📤 Transação enviada para o Elasticsearch: {'distance_from_home': 57.87785658389723, 'distance_from_last_transaction': 0.3111400080477545, 'ratio_to_median_purchase_price': 1.9459399775518595, 'repeat_retailer': 1.0, 'used_chip': 1.0, 'used_pin_number': 0.0, 'online_order': 0.0, 'resultado_previsao': 'aprovada'}
📥 Transação recebida: {'distance_from_home': 10.829942699255543, 'distance_from_last_transaction': 0.1755915022816658, 'ratio_to_median_purchase_price': 1.294218810619857, 'repeat_retailer': 1.0, 'used_chip': 0.0, 'used_pin_number': 0.0, 'online_order': 0.0}
📤 Transação enviada para o Elasticsearch: {'distance_from_home': 10.829942699255543, 'distance_from_last_transaction': 0.1755915022816658, 'ra

KeyboardInterrupt: 