In [None]:
from confluent_kafka import Consumer, KafkaException, Producer
import json
import joblib
import time

# Kafka consumer ve producer config
conf = {
    'bootstrap.servers': 'localhost:9092',  # Kafka'nın çalıştığı adres
    'group.id': 'consumer_group',  # Consumer group ID
    'auto.offset.reset': 'earliest'  # İlk veriden başlamak için
}

# Consumer oluştur
consumer = Consumer(conf)
print("Consumer oluşturuldu.")

# Producer için config oluştur
producer = Producer({'bootstrap.servers': 'localhost:9092'})  # Producer config
print("Producer oluşturuldu.")

# Modeli yükle
rf_model = joblib.load('random_forest_model.pkl')
print("Model yüklendi.")

# Kafka topic'ten veri al
consumer.subscribe(['input_topic'])
print("Topic abonesi olundu.")

# Veriyi işle ve tahmin yap
def process_data_and_predict(data):
    data = json.loads(data)
    features = [data['V1'], data['V6'], data['V8'], data['V13'], data['V15'], data['V19'], data['V20'], data['V21'],
                data['V23'], data['V24'], data['V25'], data['V26'], data['V27'], data['V28'], data['year'], data['month'],
                data['day'], data['hour'], data['weekday'], data['log_amount']]
    print(f"Öznitelikler: {features}")
    prediction = rf_model.predict([features])
    print(f"Model tahmini yapıldı: {prediction[0]}")
    return prediction[0]  # 0: Normal, 1: Anomali

# Anlamlı mesajı yazdırma ve Kafka'ya gönderme
def print_prediction(prediction, msg):
    if prediction == 1:
        print(f"Anomali Tespit Edildi: {msg.value()}")
        producer.produce('anomaly_topic', key=str(msg.key()), value=msg.value())
    else:
        print(f"Normal Veri: {msg.value()}")
        producer.produce('normal_topic', key=str(msg.key()), value=msg.value())

# Veriyi al ve işleme
def process_messages():
    while True:
        # Kafka'dan veri alırken timeout süresini artırıyoruz
        msg = consumer.poll(timeout=1.0)  # 1 saniye bekleyerek veri al

        if msg is None:  # Veri yoksa devam et
            print("Topic'te mesaj yok veya timeout süresi doldu.")
            continue
        if msg.error():
            print(f"Hata oluştu: {msg.error()}")
            raise KafkaException(msg.error())

        # Veriyi işleyip tahmin yapma
        prediction = process_data_and_predict(msg.value())

        # Tahmin sonrası sonuçları yazdırma ve Kafka'ya gönderme
        print_prediction(prediction, msg)

# Consumer'ı başlatma
try:
    print("Mesaj işleme başlıyor...")
    process_messages()

except KeyboardInterrupt:
    print("İşlem durduruldu.")
finally:
    consumer.close()
    print("Consumer kapatıldı.")


Consumer oluşturuldu.
Producer oluşturuldu.
Model yüklendi.
Topic abonesi olundu.
Mesaj işleme başlıyor...
Öznitelikler: [0.7403182357420065, 0.9070454626339304, 0.6353753923764834, 0.9824513544309549, 0.9514798334086078, 0.9258564757137899, 0.7869180946953968, 0.32969714973725583, 0.08688671413745153, 0.46874680698582905, 0.11137714299501644, 0.28866912295196445, 0.04478550286516203, 0.7690030479226413, 0, 1, 1, 1, 1, 0.9485304200617772]
Model tahmini yapıldı: 0
Normal Veri: b'{"V1": 0.7403182357420065, "V6": 0.9070454626339304, "V8": 0.6353753923764834, "V13": 0.9824513544309549, "V15": 0.9514798334086078, "V19": 0.9258564757137899, "V20": 0.7869180946953968, "V21": 0.32969714973725583, "V23": 0.08688671413745153, "V24": 0.46874680698582905, "V25": 0.11137714299501644, "V26": 0.28866912295196445, "V27": 0.04478550286516203, "V28": 0.7690030479226413, "year": 0, "month": 1, "day": 1, "hour": 1, "weekday": 1, "log_amount": 0.9485304200617772, "Class": 0}'
Öznitelikler: [0.2395177380953