In [None]:
import joblib
from kafka import KafkaConsumer
import json
from alert_service import send_slack_alert
import csv
import os
import pandas as pd

try:
    model = joblib.load("rf_fraud_model.pkl")
except FileNotFoundError:
    print("Error: rf_fraud_model.pkl not found. Please check the file path.")
    exit(1)

consumer = KafkaConsumer(
    'transactions',
    bootstrap_servers='localhost:9092',
    auto_offset_reset='latest',
    group_id='fraud-detectors',
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

print("Listening for transactions...")

header_written = False
log_file = 'fraud_logs_2.csv'

for msg in consumer:
    try:
        txn = msg.value

        required_fields = ["user_id", "amount", "location", "timestamp"]
        if not all(field in txn for field in required_fields):
            print(f"Missing fields in transaction: {txn}")
            continue

        loc_code = {'NY': 0, 'LDN': 1, 'SG': 2, 'DEL': 3, 'DXB': 4, 'HKG': 5, 'AMS': 6, 'CPT': 7}.get(txn['location'], -1)
        if loc_code == -1:
            print(f"Invalid location code for transaction: {txn}")
            continue

        features = pd.DataFrame([{
            "amount": txn["amount"],
            "loc_code": loc_code
        }])

        prediction = model.predict(features)[0]

        if prediction == 1:
            send_slack_alert(txn)
            print(f"Fraudulent txn detected: {txn}")
        else:
            print(f"Legit txn: {txn}")

        with open(log_file, 'a', newline='') as file:
            writer = csv.writer(file)
            if not header_written:
                writer.writerow(["user_id", "amount", "location", "timestamp", "prediction"])
                header_written = True
            writer.writerow([
                txn["user_id"],
                txn["amount"],
                txn["location"],
                txn["timestamp"],
                int(prediction)
            ])
            print(f"Logged transaction to {log_file}")

    except Exception as e:
        print(f"Exception: {e}, Transaction: {txn}")
        continue

print(f"Saving transaction logs to: {os.path.abspath(log_file)}")

Listening for transactions...
Legit txn: {'transaction_id': 969323, 'user_id': 9894, 'amount': 5853.04, 'timestamp': '2025-08-19T16:43:45.581784', 'location': 'LDN', 'status': 'success'}
Logged transaction to fraud_logs_2.csv
Legit txn: {'transaction_id': 821719, 'user_id': 7209, 'amount': 6491.05, 'timestamp': '2025-08-19T16:43:46.083221', 'location': 'LDN', 'status': 'success'}
Logged transaction to fraud_logs_2.csv
Legit txn: {'transaction_id': 565879, 'user_id': 6857, 'amount': 8638.32, 'timestamp': '2025-08-19T16:43:46.584446', 'location': 'DEL', 'status': 'success'}
Logged transaction to fraud_logs_2.csv
Legit txn: {'transaction_id': 554641, 'user_id': 4723, 'amount': 3451.05, 'timestamp': '2025-08-19T16:43:47.086421', 'location': 'LDN', 'status': 'success'}
Logged transaction to fraud_logs_2.csv
Legit txn: {'transaction_id': 511428, 'user_id': 2227, 'amount': 1059.43, 'timestamp': '2025-08-19T16:43:47.588471', 'location': 'SG', 'status': 'success'}
Logged transaction to fraud_lo

KeyboardInterrupt: 