In [None]:
import pandas as pd
import numpy as np
from sklearn.ensemble import IsolationForest
import collections


user_data = collections.defaultdict(lambda: {"transactions": [], "model": None})

def add_transaction(user_id, amount):
    """
    Add a transaction for a user and check for anomalies.
    """
    user = user_data[user_id]
    user["transactions"].append(amount)

    if len(user["transactions"]) <= 20:
        print(f"Learning: Recorded {len(user['transactions'])} transactions for user {user_id}.")
        if len(user["transactions"]) == 20:
            train_user_model(user_id)
        return None


    is_anomalous = predict_transaction(user_id, amount)
    return is_anomalous

def train_user_model(user_id):
    """
    Train the model for a user using the first 20 transactions.
    """
    user = user_data[user_id]
    transactions = np.array(user["transactions"]).reshape(-1, 1)

    model = IsolationForest(n_estimators=50, contamination=0.1, random_state=42)
    model.fit(transactions)

    user["model"] = model
    print(f"Model trained for user {user_id}.")

def predict_transaction(user_id, amount):
    """
    Predict whether a transaction is anomalous or not for a given user.
    """
    user = user_data[user_id]
    model = user["model"]
    if model is None:
        print(f"No model available for user {user_id}. Cannot predict anomalies.")
        return None

    prediction = model.predict([[amount]])[0]
    if prediction == -1:
        print(f"Anomalous transaction detected for user {user_id}: {amount}")
    else:
        print(f"Transaction is normal for user {user_id}: {amount}")
    return prediction == -1

6
if __name__ == "__main__":
    print("Real-Time Anomaly Detection System")
    while True:
        try:
            user_id = input("Enter user ID (or type 'exit' to quit): ")
            if user_id.lower() == "exit":
                break
            amount = float(input(f"Enter transaction amount for user {user_id}: "))
            result = add_transaction(user_id, amount)
            if result is not None:
                print(f"Transaction anomaly status: {'Anomalous' if result else 'Normal'}")
        except ValueError:
            print("Invalid input. Please try again.")


Real-Time Anomaly Detection System
