In [None]:
from confluent_kafka import Consumer, KafkaException, Producer
import json
import joblib
import time

# Kafka consumer ve producer config
conf = {
    'bootstrap.servers': 'localhost:9092',  # Kafka'nın çalıştığı adres
    'group.id': 'consumer_group',  # Consumer group ID
    'auto.offset.reset': 'earliest'  # İlk veriden başlamak için
}

# Consumer oluştur
consumer = Consumer(conf)
print("Consumer oluşturuldu.")

# Producer için config oluştur
producer = Producer({'bootstrap.servers': 'localhost:9092'})  # Producer config
print("Producer oluşturuldu.")

# Modeli yükle
rf_model = joblib.load('random_forest_model.pkl')
print("Model yüklendi.")

# Kafka topic veri alma
consumer.subscribe(['input_topic'])
print("Topic abonesi olundu.")

# Veriyi işle ve tahmin yap
def process_data_and_predict(data):
    data = json.loads(data)
    features = [data['V1'], data['V6'], data['V8'], data['V13'], data['V15'], data['V19'], data['V20'], data['V21'],
                data['V23'], data['V24'], data['V25'], data['V26'], data['V27'], data['V28'], data['log_amount']]
    print(f"Öznitelikler: {features}")
    prediction = rf_model.predict([features])
    print(f"Model tahmini yapıldı: {prediction[0]}")
    return prediction[0]  # 0: Normal, 1: Anomali

# Anlamlı mesajı yazdırma ve Kafka'ya gönderme
def print_prediction(prediction, msg):
    if prediction == 1:
        print(f"Anomali Tespit Edildi: {msg.value()}")
        producer.produce('anomaly_topic', key=str(msg.key()), value=msg.value())
    else:
        print(f"Normal Veri: {msg.value()}")
        producer.produce('normal_topic', key=str(msg.key()), value=msg.value())

# Veriyi al ve işleme
def process_messages():
    while True:
        # Kafka'dan veri alırken timeout süresini artırıyoruz
        msg = consumer.poll(timeout=1.0)  # 1 saniye bekleyerek veri al

        if msg is None:  # Veri yoksa devam et
            print("Topic'te mesaj yok veya timeout süresi doldu.")
            continue
        if msg.error():
            print(f"Hata oluştu: {msg.error()}")
            raise KafkaException(msg.error())

        # Veriyi işleyip tahmin yapma
        prediction = process_data_and_predict(msg.value())

        # Tahmin sonrası sonuçları yazdırma ve Kafka'ya gönderme
        print_prediction(prediction, msg)

# Consumer'ı başlatma
try:
    print("Mesaj işleme başlıyor...")
    process_messages()

except KeyboardInterrupt:
    print("İşlem durduruldu.")
finally:
    consumer.close()
    print("Consumer kapatıldı.")


Consumer oluşturuldu.
Producer oluşturuldu.
Model yüklendi.
Topic abonesi olundu.
Mesaj işleme başlıyor...
Öznitelikler: [0.2535626844636766, 0.4450429693135607, 0.9258490968744894, 0.4594622885301013, 0.6856512160164645, 0.3940534335723689, 0.2816620327201673, 0.697507559914457, 0.5205509738969608, 0.45269915664058225, 0.262433163233025, 0.5604427201757897, 0.8120329325320805, 0.3125382781433541, 0.040659322954422095]
Model tahmini yapıldı: 1
Anomali Tespit Edildi: b'{"V1": 0.2535626844636766, "V6": 0.4450429693135607, "V8": 0.9258490968744894, "V13": 0.4594622885301013, "V15": 0.6856512160164645, "V19": 0.3940534335723689, "V20": 0.2816620327201673, "V21": 0.697507559914457, "V23": 0.5205509738969608, "V24": 0.45269915664058225, "V25": 0.262433163233025, "V26": 0.5604427201757897, "V27": 0.8120329325320805, "V28": 0.3125382781433541, "log_amount": 0.040659322954422095, "Class": 0}'
Öznitelikler: [0.37082747599874055, 0.7037790973351542, 0.9476308241748139, 0.13302741801506612, 0.9529