In [1357]:
# Imports
import numpy as np
import pandas as pd
from sklearn.preprocessing import OneHotEncoder

# Training libs
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import IsolationForest

In [1358]:
# Feature groups
# Input columns, assembled from themed groups. The resulting order is the
# order in which columns are selected from the dataset.
anomaly_detection_input_features = (
    # Non-feature columns
    [
        "transaction_id",  # unique identifier for the transaction (dropped in training)
        "date",            # string: transaction date (YYYY-MM-DD HH:MM:SS)
    ]
    # Feature columns
    + [
        "amount",                     # float: transaction amount
        "transaction_type",           # string: credit vs debit
        "transaction_duration_secs",  # integer: seconds from initiation to settlement
        "is_recurring",               # boolean: recurring (1) vs one-off (0)
        "location",                   # string: city or ZIP code of transaction location
        "device_id",                  # string: unique identifier for the device used
        "merchant_id",                # string: ID of the merchant
        "day_of_week",                # integer: day of week (0=Mon ... 6=Sun)
        "hour_of_day",                # integer: hour of day (0-23)
        "is_weekend",                 # boolean: True if tx on Saturday or Sunday
        "time_since_last_tx_secs",    # float: seconds since previous transaction
        "rolling_mean_3mo_amount",    # float: 3-month trailing mean transaction amount
        "rolling_std_3mo_amount",     # float: 3-month trailing std deviation of amounts
        "customer_age",               # integer: age of customer in years
    ]
    # Other derived features
    + [
        "is_holiday",  # boolean: True if tx on a holiday
    ]
    # Possibly negatively impacting features (least severe to most); disabled
    + [
        # "amount_percentile",  # float: percentile rank in user's historical amounts
        # "channel",            # string: channel used (online, branch, atm, mobile, pos)
    ]
)

# Label and prediction columns.
anomaly_detection_output_features = [
    # boolean: True if the transaction is an anomaly (this is the target)
    "is_anomaly",
    # boolean: result of the anomaly detection, True if an anomaly
    "is_anomaly_result",
    # float: model's confidence (0-1) in that flag
    "anomaly_confidence",
]

In [1359]:
# Read the transactions CSV; the "date" column is parsed into datetimes on load
dataset_file_path = "assets/chatgpt_data/transactions_o3_3.csv"
df = pd.read_csv(filepath_or_buffer=dataset_file_path, parse_dates=["date"])

In [1360]:
# Preprocess features (since this data was generated by gpt, we can assume it is clean)
def preprocess(v_df: pd.DataFrame) -> None:
    """Sort ``v_df`` chronologically by its ``date`` column, in place.

    Callers invoke this function purely for its side effect and ignore the
    return value, so the frame they pass in must itself be reordered;
    rebinding the local name to a sorted copy would leave the caller's frame
    untouched.

    Args:
        v_df: Frame containing a ``date`` column. It is mutated: rows are
            reordered by ascending ``date``. Index labels travel with their
            rows (the index is not reset).

    Returns:
        None.

    Raises:
        KeyError: If ``v_df`` has no ``date`` column.
    """
    #* Sort Data
    # A stable sort keeps rows with identical timestamps in their original
    # relative order, so the result is deterministic.
    v_df.sort_values(["date"], kind="mergesort", inplace=True)

In [1361]:
# Final cleanup: run preprocessing, then keep only the declared input features
# that the loaded frame actually contains (order follows the feature list).
preprocess(df)

final_features = [
    feature
    for feature in anomaly_detection_input_features
    if feature in df.columns
]
df = df[final_features]

In [1362]:
# One hot encode categorical data
def encode(v_df, cols_to_encode):
    """One-hot encode the given columns and return the resulting frame.

    For each column name, a fresh ``OneHotEncoder`` is fitted on that column;
    the original column is removed and the indicator columns, named
    ``"<column>_<category>"``, are appended on the right.

    Args:
        v_df: Source frame; it is not modified.
        cols_to_encode: Iterable of column names present in ``v_df``.

    Returns:
        A frame with each listed column replaced by its indicator columns.
        When ``cols_to_encode`` is empty, ``v_df`` itself is returned.
    """
    encoded = v_df
    for name in cols_to_encode:
        encoder = OneHotEncoder(sparse_output=False, handle_unknown="ignore")
        indicator_values = encoder.fit_transform(encoded[[name]])
        indicator_frame = pd.DataFrame(
            indicator_values,
            columns=[f"{name}_{category}" for category in encoder.categories_[0]],
            index=encoded.index,
        )

        # Swap the raw column for its indicator columns
        encoded = pd.concat(
            [encoded.drop(columns=[name]), indicator_frame],
            axis=1,
        )

    return encoded

In [1363]:
# Convert categorical features to numerical
# Drop non-features. errors="ignore" tolerates columns that are absent from
# the frame (missing in the CSV, or already dropped by an earlier run of this
# cell).
cols_to_drop = ["transaction_id", "date"]
df = df.drop(columns=cols_to_drop, errors="ignore")

# One hot encoding for categorical features.
# Filter on the columns actually present in df: the feature-selection step
# already skips declared features that the CSV lacks, so checking only the
# declared feature list here would hand encode() a missing column (KeyError).
df = encode(df, [c for c in ["merchant_id", "device_id", "channel", "location"]
                  if c in df.columns])

# Other categorical columns
# Convert any boolean columns to 0/1
bool_cols = df.select_dtypes(include="bool").columns
df = df.astype({c: int for c in bool_cols})

# Convert credit/debit to binary.
# Series.map with a dict turns any value outside the mapping into NaN.
if "transaction_type" in df.columns:
    df["transaction_type"] = df["transaction_type"].map({"credit": 1, "debit": 0})

In [1364]:
# Keyword settings passed to the IsolationForest constructor in the training cell
iso_params = dict(
    n_estimators=10000,
    max_samples=1.0,
    max_features=1.0,
    bootstrap=True,
    contamination=0.011,
    n_jobs=-1,
    random_state=42,
    verbose=0,
)

In [1365]:
# Scaling and Training
# Prepare numeric matrix and scale.
# The output columns written at the bottom of this cell are excluded so that
# re-running the cell does not feed the previous run's predictions back into
# the model as features. On a fresh run none of them exist yet, hence
# errors="ignore".
X = df.drop(columns=anomaly_detection_output_features, errors="ignore").astype(float)
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)

# Train IsolationForest. Every key in iso_params is a constructor keyword,
# so the dict is unpacked directly.
iso = IsolationForest(**iso_params)
iso.fit(X_scaled)

# Predict & score
labels     = iso.predict(X_scaled)         # 1 = normal, -1 = anomaly
raw_scores = iso.score_samples(X_scaled)   # higher = more normal

df["is_anomaly_result"] = (labels == -1).astype(int)  # 1 = anomaly, 0 = normal

# Min-max normalise the negated scores to [0, 1] (1 = most anomalous row).
inv = -raw_scores
score_range = inv.max() - inv.min()
if score_range > 0:
    df["anomaly_confidence"] = (inv - inv.min()) / score_range
else:
    # All rows scored identically: avoid 0/0 and report no confidence.
    df["anomaly_confidence"] = 0.0


In [1366]:
# Test the model
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score

# NOTE: this reloads the same file the model was fitted on, so the metrics
# below are in-sample.

# 1. Load & preprocess
df_eval = pd.read_csv(dataset_file_path, parse_dates=['date'])
preprocess(df_eval)

# 2. Extract true labels and drop non-features
y_true = df_eval["is_anomaly"].astype(int)
cols_to_drop = ["transaction_id", "date", "is_anomaly"]
df_eval_model = df_eval.drop(columns=cols_to_drop)

# 3. One-hot encode & boolean/map columns exactly as training.
# Only columns present in the frame are encoded/mapped, mirroring the guards
# in the training cell so a missing optional column does not raise KeyError.
df_eval_model = encode(
    df_eval_model,
    [c for c in ["merchant_id", "location", "device_id", "channel"]
     if c in df_eval_model.columns],
)
eval_bool_cols = df_eval_model.select_dtypes(include="bool").columns
df_eval_model = df_eval_model.astype({c: int for c in eval_bool_cols})
if "transaction_type" in df_eval_model.columns:
    df_eval_model["transaction_type"] = df_eval_model["transaction_type"].map({"debit":0,"credit":1})

# 4. Align columns with training X: keep the training columns in training
# order, create any that are missing filled with 0, and drop everything else.
# A single reindex avoids inserting columns one at a time.
train_cols = X.columns
df_eval_model = df_eval_model.reindex(columns=train_cols, fill_value=0)

# 5. Scale, predict & score
X_eval_scaled = scaler.transform(df_eval_model.astype(float))
labels_eval    = iso.predict(X_eval_scaled)          # 1=normal, -1=anomaly
y_pred         = (labels_eval == -1).astype(int)
scores_eval    = -iso.score_samples(X_eval_scaled)   # higher = more anomalous

# 6. Print metrics
print(f"Accuracy:  {accuracy_score(y_true, y_pred):.4f}")
print(f"Precision: {precision_score(y_true, y_pred, zero_division=0):.4f}")
print(f"Recall:    {recall_score(y_true, y_pred, zero_division=0):.4f}")
print(f"F1 Score:  {f1_score(y_true, y_pred, zero_division=0):.4f}")
# ROC AUC needs both classes to be present in the true labels
if len(np.unique(y_true))>1:
    print(f"ROC AUC:   {roc_auc_score(y_true, scores_eval):.4f}")
else:
    print("ROC AUC:   not defined (single class in true labels)")

Accuracy:  0.9814
Precision: 0.5636
Recall:    0.3100
F1 Score:  0.4000
ROC AUC:   0.9628
