In [3]:
import json
from itertools import islice

import joblib
import pandas as pd
from kafka import KafkaConsumer
from sklearn.ensemble import IsolationForest

In [4]:

# Collect a bounded snapshot of raw transactions from Kafka for training.
TRAINING_TOPIC = 'bank-transactions-ml'
MAX_MESSAGES = 1000      # upper bound on transactions pulled for training
IDLE_TIMEOUT_MS = 5000   # stop iterating after 5s with no new messages

consumer = KafkaConsumer(
    TRAINING_TOPIC,
    bootstrap_servers='localhost:9092',
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    # Start from the beginning of the topic on every run.
    auto_offset_reset='earliest',
    # No group_id is configured, and kafka-python does not commit offsets
    # without a group. Stating it explicitly avoids implying progress is saved.
    enable_auto_commit=False,
    consumer_timeout_ms=IDLE_TIMEOUT_MS,
)

print("📥 Collecting training data from Kafka...")
try:
    # islice stops after MAX_MESSAGES records; consumer_timeout_ms ends the
    # iteration earlier if the topic runs dry.
    data = [message.value for message in islice(consumer, MAX_MESSAGES)]
finally:
    # Release the broker connection even if deserialization raises.
    consumer.close()

assert data, f"No messages received from '{TRAINING_TOPIC}' within {IDLE_TIMEOUT_MS} ms"

# NOTE: the raw frame includes customer email addresses (PII); clear this
# cell's output before sharing or committing the notebook.
df = pd.DataFrame(data)
df.head()

📥 Collecting training data from Kafka...
                          transactionId accountId    type        amount  \
0  72255aab-2756-4ee6-9778-0d645b08a196   ACC4964  CREDIT  33711.676386   
1  147025d8-11a2-4f5a-bbac-3805b0f89343   ACC9706  CREDIT   4257.083251   
2  afe944fe-c8c6-40e9-989f-bbf6cf1ebafb   ACC2635  CREDIT  43616.042779   
3  ea0f4a14-fab6-4210-bfff-675c46e23081   ACC6301  CREDIT  14224.955741   
4  6da4b388-59ec-4d7f-87c5-9e3536e47ee5   ACC3343  CREDIT  21679.047254   

      timestamp     location                      email  
0  1.756754e+09       London  arohirajput3850@gmail.com  
1  1.755947e+09     New York  vivekrajput8244@gmail.com  
2  1.755673e+09       Mumbai  arohirajput3850@gmail.com  
3  1.756121e+09        Paris     vrrajput1720@gmail.com  
4  1.756883e+09  Pune, India  vivekrajput8244@gmail.com  


In [5]:

# Build the model's feature matrix from the raw Kafka records.
# Rebuilding from `data` instead of mutating `df` in place keeps this cell
# idempotent. Re-running it no longer fails on columns a previous run dropped.
features = pd.DataFrame(data)

# Epoch seconds -> datetime (naive, UTC-based), then time-of-day and
# day-of-week signals.
features['timestamp'] = pd.to_datetime(features['timestamp'], unit='s')
features['hour'] = features['timestamp'].dt.hour
features['day'] = features['timestamp'].dt.dayofweek  # Monday=0 ... Sunday=6

# Email domain; non-string or '@'-less values fall into "unknown".
features['email_domain'] = features['email'].apply(
    lambda x: x.split('@')[-1] if isinstance(x, str) and '@' in x else "unknown"
)

# One-hot encode categorical vars. drop_first=True removes the first category
# of each column. A column with a single observed value (e.g. only one email
# domain) therefore contributes no indicator columns at all.
features = pd.get_dummies(
    features, columns=['type', 'location', 'email_domain'], drop_first=True
)

# Drop identifiers plus the raw columns already converted into features above.
features = features.drop(columns=['transactionId', 'accountId', 'timestamp', 'email'])

# IsolationForest cannot fit on missing values; fail here with a clear message.
assert not features.isna().any().any(), "feature matrix contains missing values"

df = features
print("✅ Final shape:", df.shape)

✅ Final shape: (100, 9)


In [6]:
# Fit an unsupervised IsolationForest on the engineered features.
# Columns this cell adds to `df` are excluded from the training matrix. Otherwise
# a re-run would try to fit on the string labels in `fraud_pred`.
PREDICTION_COLUMNS = ['fraud_pred']
CONTAMINATION = 0.05   # assumed share of fraudulent transactions (5%)
N_ESTIMATORS = 200
SEED = 42

X = df.drop(columns=PREDICTION_COLUMNS, errors='ignore')

iso = IsolationForest(
    n_estimators=N_ESTIMATORS,
    contamination=CONTAMINATION,
    random_state=SEED,
)
iso.fit(X)

# predict() returns -1 for anomalies and 1 for inliers. "Fraud" here means
# "flagged as anomalous", not a confirmed fraud label.
df['fraud_pred'] = iso.predict(X)
df['fraud_pred'] = df['fraud_pred'].map({1: "Normal", -1: "Fraud"})

df[['amount', 'fraud_pred']].head(20)


          amount fraud_pred
0   33711.676386     Normal
1    4257.083251     Normal
2   43616.042779     Normal
3   14224.955741     Normal
4   21679.047254     Normal
5   42965.773808     Normal
6   48738.422878     Normal
7   49654.567238     Normal
8    6092.086254     Normal
9     264.157941     Normal
10  33988.501063     Normal
11  39498.724007     Normal
12  34571.359945     Normal
13  15351.227360     Normal
14  28040.669417     Normal
15  19041.391066     Normal
16  45250.357077     Normal
17   8679.622083      Fraud
18  10769.156427     Normal
19  26328.122260     Normal


In [7]:
import joblib

# Persist the fitted IsolationForest to disk; dump() returns the written filenames.
model_path = "fraud_model.pkl"
saved_paths = joblib.dump(iso, model_path)
saved_paths

['fraud_model.pkl']

In [10]:
# Preview the first 20 rows: features plus the model's fraud_pred label.
df.iloc[:20]

Unnamed: 0,amount,hour,day,type_DEBIT,location_London,location_Mumbai,location_New York,location_Paris,"location_Pune, India",fraud_pred
0,33711.676386,19,0,False,True,False,False,False,False,Normal
1,4257.083251,11,5,False,False,False,True,False,False,Normal
2,43616.042779,6,2,False,False,True,False,False,False,Normal
3,14224.955741,11,0,False,False,False,False,True,False,Normal
4,21679.047254,7,2,False,False,False,False,False,True,Normal
5,42965.773808,9,2,True,False,True,False,False,False,Normal
6,48738.422878,21,5,True,False,False,False,True,False,Normal
7,49654.567238,23,5,True,False,False,False,False,False,Normal
8,6092.086254,14,0,False,False,False,False,False,False,Normal
9,264.157941,8,4,False,False,False,True,False,False,Normal


In [None]:
# Score a single incoming transaction with the persisted model.
# NOTE: joblib files are pickles, and loading one can execute arbitrary code.
# Only load model files produced by a trusted source.
model = joblib.load("fraud_model.pkl")


def preprocess_new_txn(txn: dict, feature_names) -> pd.DataFrame:
    """Turn one raw transaction dict into a one-row feature frame.

    Repeats the training feature engineering (hour, day-of-week, email domain,
    one-hot encoding), then aligns the columns to ``feature_names``, the
    columns the model was fitted on. Any indicator column absent for this row
    is filled with False. That covers the category dropped by
    ``drop_first=True`` during training and categories never seen in training.
    """
    row = pd.DataFrame([txn])
    ts = pd.to_datetime(row['timestamp'], unit='s')
    row['hour'] = ts.dt.hour
    row['day'] = ts.dt.dayofweek
    row['email_domain'] = row['email'].apply(
        lambda x: x.split('@')[-1] if isinstance(x, str) and '@' in x else "unknown"
    )
    row = pd.get_dummies(row, columns=['type', 'location', 'email_domain'])
    return row.reindex(columns=list(feature_names), fill_value=False)


# Example: score the most recently collected transaction. Replace with a real
# incoming record when wiring this into the stream.
new_txn = data[-1]
# feature_names_in_ is available because the model was fitted on a DataFrame.
txn_features = preprocess_new_txn(new_txn, model.feature_names_in_)
prediction = model.predict(txn_features)  # -1 = anomaly ("Fraud"), 1 = normal
prediction