# Isolation Forest - Two Best Approaches Comparison

This notebook compares the two best approaches for training Isolation Forest on the Elliptic dataset:

1. **All Labeled Data** - Train on licit + illicit (exclude unknown)
2. **All Data** - Train on licit + illicit + unknown (complete dataset)

We'll evaluate both on labeled test data and see which performs better.

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn.ensemble import IsolationForest
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import classification_report, confusion_matrix
import umap

# Seed NumPy's global RNG once so that NumPy-based randomness repeats between runs
RANDOM_SEED = 42
np.random.seed(RANDOM_SEED)

# Show every column when a DataFrame is displayed (no "..." truncation)
pd.set_option('display.max_columns', None)

## 1. Load and Analyze Data

In [None]:
# Read the transactions table from disk (path is relative to the notebook's folder)
DATA_PATH = "../02_data/transactions.csv"
transactions_df = pd.read_csv(DATA_PATH)

print(f"Dataset shape: {transactions_df.shape}")
print("First few rows:")
# Last expression of the cell: rendered as a rich table
transactions_df.head()

In [None]:
# Target column first, then the feature matrix (every column except the label)
y = transactions_df["class"]
X = transactions_df.drop(columns=["class"])

# Absolute and relative frequency of each class value
class_frequencies = y.value_counts()
print("Class Distribution:")
print(class_frequencies)
print("\nPercentages:")
print(y.value_counts(normalize=True) * 100)

# Per-class row counts, kept under their own names for later cells
total = len(y)
licit_count, illicit_count, unknown_count = (
    (y == class_label).sum() for class_label in ('licit', 'illicit', 'unknown')
)

print("Dataset Composition:")
composition = (
    ('Licit', licit_count),
    ('Illicit', illicit_count),
    ('Unknown', unknown_count),
)
for display_name, count in composition:
    print(f"  {display_name}: {count:,} ({count/total*100:.2f}%)")

## 2. Train/Test Split and Scaling

In [None]:
# Hold out 25% of the rows for testing; the split is stratified on the class label
split_options = dict(test_size=0.25, random_state=42, stratify=y)
X_train, X_test, y_train, y_test = train_test_split(X, y, **split_options)

print(f"Training set: {X_train.shape}")
print(f"Test set: {X_test.shape}")

# Fit the scaler on the training rows only, then apply the same transform to
# both splits so no test-set statistics reach the training features
scaler = StandardScaler().fit(X_train)
X_train_scaled = scaler.transform(X_train)
X_test_scaled = scaler.transform(X_test)

print("\nData scaled using StandardScaler")

## 3. Approach 1: Train on ALL LABELED Data (licit + illicit)

This approach trains only on labeled transactions, excluding unknowns.

In [None]:
# Filter to labeled data only (exclude unknown). The boolean mask is converted
# to a plain array because X_train_scaled is a NumPy array, not a DataFrame.
labeled_mask_train = y_train.isin(['licit', 'illicit'])
X_train_labeled = X_train_scaled[labeled_mask_train.to_numpy()]
y_train_labeled = y_train[labeled_mask_train]

# Share of illicit rows among the labeled training rows
illicit_rate_labeled = (y_train_labeled == 'illicit').sum() / len(y_train_labeled)

# Contamination handed to the model: observed illicit rate plus a 10% buffer.
# Capped at 0.5, the largest value IsolationForest accepts. Computed once so the
# model and the printed value cannot drift apart.
contamination_labeled = min(illicit_rate_labeled * 1.1, 0.5)

print("Training Data for Approach 1:")
print(f"  Total: {len(X_train_labeled):,}")
print(f"  Licit: {(y_train_labeled == 'licit').sum():,}")
print(f"  Illicit: {(y_train_labeled == 'illicit').sum():,}")
print(f"  Contamination rate: {illicit_rate_labeled:.4f} ({illicit_rate_labeled*100:.2f}%)")

# Train Isolation Forest
iso_forest_labeled = IsolationForest(
    contamination=contamination_labeled,
    max_samples=256,
    n_estimators=200,
    random_state=42,
    n_jobs=-1
)

print(f"Contamination Rate Used For Model: {contamination_labeled*100:.2f}%")

print("\nTraining Isolation Forest...")
iso_forest_labeled.fit(X_train_labeled)

In [None]:
# Score the test set with the Approach 1 model. The model was fitted on scaled
# features, so it must be given the scaled test matrix as well.
# predict() returns +1 for normal rows and -1 for anomalies.
y_pred_labeled = iso_forest_labeled.predict(X_test_scaled)
scores_labeled = iso_forest_labeled.decision_function(X_test_scaled)

n_test = len(y_pred_labeled)
n_normal = (y_pred_labeled == 1).sum()
n_anomalies = (y_pred_labeled == -1).sum()

print("Test Set Predictions:")
print(f"  Normal: {n_normal:,} ({n_normal/n_test*100:.1f}%)")
print(f"  Anomalies: {n_anomalies:,} ({n_anomalies/n_test*100:.1f}%)")

## 4. Approach 2: Train on ALL DATA (licit + illicit + unknown)

This approach uses the complete training set, including unknown transactions.

In [None]:
# Use ALL training data (licit + illicit + unknown). These are aliases of the
# existing objects, not copies.
X_train_all = X_train_scaled
y_train_all = y_train

# Calculate contamination (based on illicit only, not unknown): unknown rows
# count in the denominator but never in the numerator.
illicit_rate_all = (y_train_all == 'illicit').sum() / len(y_train_all)

# Contamination handed to the model: observed illicit rate plus a 20% buffer.
# Capped at 0.5, the largest value IsolationForest accepts. Computed once so the
# model and the printed value cannot drift apart.
contamination_all = min(illicit_rate_all * 1.2, 0.5)

print("Training Data for Approach 2:")
print(f"  Total: {len(X_train_all):,}")
print(f"  Licit: {(y_train_all == 'licit').sum():,}")
print(f"  Illicit: {(y_train_all == 'illicit').sum():,}")
print(f"  Unknown: {(y_train_all == 'unknown').sum():,}")
print(f"  Contamination rate (illicit): {illicit_rate_all:.4f} ({illicit_rate_all*100:.2f}%)")

# Train Isolation Forest
iso_forest_all = IsolationForest(
    contamination=contamination_all,
    max_samples=256,
    n_estimators=200,
    random_state=42,
    n_jobs=-1
)

print(f"Contamination Rate Used For Model: {contamination_all*100:.2f}%")

print("\nTraining Isolation Forest...")
iso_forest_all.fit(X_train_all)
print("✓ Training complete!")

print(f"Training sample difference: {len(X_train_all) - len(X_train_labeled):,} more samples than Approach 1")

In [None]:
# Score the scaled test set with the Approach 2 model.
# predict() returns +1 for normal rows and -1 for anomalies.
y_pred_all = iso_forest_all.predict(X_test_scaled)
scores_all = iso_forest_all.decision_function(X_test_scaled)

n_test = len(y_pred_all)
n_normal = (y_pred_all == 1).sum()
n_anomalies = (y_pred_all == -1).sum()

print("Test Set Predictions:")
print(f"  Normal: {n_normal:,} ({n_normal/n_test*100:.1f}%)")
print(f"  Anomalies: {n_anomalies:,} ({n_anomalies/n_test*100:.1f}%)")

## 5. Evaluation on Labeled Test Data

We evaluate both approaches on labeled test data (licit + illicit) for fair comparison.

In [None]:
# Filter test data to labeled only (rows whose class is licit or illicit)
labeled_mask_test = y_test.isin(['licit', 'illicit'])
y_test_labeled = y_test[labeled_mask_test]

# Counts are computed once and shown with a formatted percentage, matching the
# other summary cells.
n_labeled_test = labeled_mask_test.sum()
n_licit_test = (y_test_labeled == 'licit').sum()
n_illicit_test = (y_test_labeled == 'illicit').sum()

print(f"Evaluating on {n_labeled_test:,} labeled test transactions:")
print(f"  Licit: {n_licit_test:,} ({n_licit_test/n_labeled_test*100:.2f}%)")
print(f"  Illicit: {n_illicit_test:,} ({n_illicit_test/n_labeled_test*100:.2f}%)")

# Convert predictions to labels
def convert_predictions(y_pred, mask):
    """Translate Isolation Forest outputs for the selected rows into class names.

    Parameters
    ----------
    y_pred : numpy.ndarray
        Model predictions, where -1 marks an anomaly.
    mask : boolean array-like
        Selects which entries of ``y_pred`` to keep.

    Returns
    -------
    numpy.ndarray
        ``'illicit'`` where the kept prediction is -1, ``'licit'`` otherwise.
    """
    is_anomaly = y_pred[mask] == -1
    return np.where(is_anomaly, 'illicit', 'licit')

# Class-name predictions of both models, restricted to the labeled test rows
y_pred_labeled_conv, y_pred_all_conv = (
    convert_predictions(raw_pred, labeled_mask_test)
    for raw_pred in (y_pred_labeled, y_pred_all)
)

In [None]:
print("APPROACH 1: All Labeled Data")
print(classification_report(y_test_labeled, y_pred_labeled_conv, digits=4))

# Rows are actual classes, columns are predicted classes, both ordered licit, illicit
cm_labeled = confusion_matrix(y_test_labeled, y_pred_labeled_conv, labels=['licit', 'illicit'])
print("\nConfusion Matrix:")
# The label column is 14 wide (length of 'Actual Illicit') and each value column
# is as wide as its header, so the rows line up under the header.
print(f"{'':14s} {'Predicted Licit':>15s}  {'Predicted Illicit':>17s}")
print(f"{'Actual Licit':14s} {cm_labeled[0,0]:15,d}  {cm_labeled[0,1]:17,d}")
print(f"{'Actual Illicit':14s} {cm_labeled[1,0]:15,d}  {cm_labeled[1,1]:17,d}")

In [None]:
print("APPROACH 2: All Data (with Unknown)")
print(classification_report(y_test_labeled, y_pred_all_conv, digits=4))

# Rows are actual classes, columns are predicted classes, both ordered licit, illicit
cm_all = confusion_matrix(y_test_labeled, y_pred_all_conv, labels=['licit', 'illicit'])
print("\nConfusion Matrix:")
# The label column is 14 wide (length of 'Actual Illicit') and each value column
# is as wide as its header, so the rows line up under the header.
print(f"{'':14s} {'Predicted Licit':>15s}  {'Predicted Illicit':>17s}")
print(f"{'Actual Licit':14s} {cm_all[0,0]:15,d}  {cm_all[0,1]:17,d}")
print(f"{'Actual Illicit':14s} {cm_all[1,0]:15,d}  {cm_all[1,1]:17,d}")

## How Models Treat Unknown Transactions

In [None]:
# Test rows without a ground-truth label
unknown_mask_test = y_test == 'unknown'
n_unknown = unknown_mask_test.sum()

if n_unknown > 0:
    print(f"Analyzing {n_unknown:,} unknown transactions in test set\n")

    # Contamination-based predictions of each model (+1 normal, -1 anomaly)
    predictions_by_approach = {
        'All Labeled': y_pred_labeled,
        'All Data': y_pred_all,
    }
    for approach_name, y_pred in predictions_by_approach.items():
        unknown_preds = y_pred[unknown_mask_test]
        n_scored = len(unknown_preds)
        unknown_anomalies = (unknown_preds == -1).sum()
        unknown_normal = (unknown_preds == 1).sum()

        print(f"{approach_name}:")
        print(f"  Flagged as Anomaly: {unknown_anomalies:,} ({unknown_anomalies/n_scored*100:.1f}%)")
        print(f"  Flagged as Normal: {unknown_normal:,} ({unknown_normal/n_scored*100:.1f}%)")
        print()

## 6. Threshold Optimization

The contamination parameter gives us a starting point, but we can often get better results by finding the **optimal decision threshold** for our specific use case.

In [None]:
from sklearn.metrics import precision_recall_curve, f1_score, precision_score, recall_score

# Restrict the evaluation set to test rows with a known label
labeled_mask_test = y_test.isin(['licit', 'illicit'])
X_test_labeled_scaled = X_test_scaled[labeled_mask_test]
y_test_labeled_only = y_test[labeled_mask_test]

# Binary ground truth for the metric functions (1 = illicit, 0 = licit)
y_test_binary = (y_test_labeled_only == 'illicit').astype(int)

# score_samples() output of both models on the LABELED rows only.
# NOTE: this rebinds scores_labeled / scores_all, which earlier cells filled
# with decision_function() output for the full test set.
scores_labeled, scores_all = (
    forest.score_samples(X_test_labeled_scaled)
    for forest in (iso_forest_labeled, iso_forest_all)
)

print("Finding optimal thresholds for F1 score...\n")

# Function to find best threshold
def find_best_threshold(scores, y_true, model_name):
    """Pick the score cut-off that maximizes F1 for the illicit class.

    Candidate cut-offs are the 5th, 7th, ..., 95th percentiles of ``scores``.
    A sample is predicted illicit (1) when its score is strictly below the
    cut-off. The first candidate that reaches the highest F1 is selected, and a
    short summary is printed.

    Parameters
    ----------
    scores : array-like of float
        One anomaly score per sample.
    y_true : array-like of int
        Ground truth aligned with ``scores`` (1 = illicit, 0 = licit).
    model_name : str
        Heading used for the printed summary.

    Returns
    -------
    tuple
        ``(best_threshold, best_metrics)``; ``best_metrics`` is a dict with the
        keys ``'precision'``, ``'recall'`` and ``'f1'``.

    Raises
    ------
    ValueError
        If ``scores`` is empty.
    """
    scores = np.asarray(scores)
    if scores.size == 0:
        raise ValueError("scores must contain at least one value")

    thresholds = np.percentile(scores, range(5, 96, 2))

    # Start below any attainable F1 so a candidate is always recorded, even when
    # every candidate scores F1 == 0 (e.g. no illicit samples, or identical
    # scores). Previously that case left the result as None and the summary
    # below raised a TypeError.
    best_f1 = -1.0
    best_threshold = None
    best_predictions = None

    for threshold in thresholds:
        predictions = (scores < threshold).astype(int)
        # zero_division=0: a candidate that flags nothing counts as F1 = 0
        f1 = f1_score(y_true, predictions, zero_division=0)

        # Strict '>' keeps the earliest (lowest-percentile) candidate on ties
        if f1 > best_f1:
            best_f1 = f1
            best_threshold = threshold
            best_predictions = predictions

    # Precision and recall are only needed for the winning candidate
    best_metrics = {
        'precision': precision_score(y_true, best_predictions, zero_division=0),
        'recall': recall_score(y_true, best_predictions, zero_division=0),
        'f1': best_f1
    }

    print(f"{model_name}:")
    print(f"  Best threshold: {best_threshold:.4f}")
    print(f"  F1 Score: {best_metrics['f1']:.4f}")
    print(f"  Precision: {best_metrics['precision']:.4f}")
    print(f"  Recall: {best_metrics['recall']:.4f}")
    print()

    return best_threshold, best_metrics

# Find optimal thresholds
threshold_labeled, metrics_labeled_tuned = find_best_threshold(scores_labeled, y_test_binary, "Approach 1 (All Labeled)")
threshold_all, metrics_all_tuned = find_best_threshold(scores_all, y_test_binary, "Approach 2 (All Data)")

# Make predictions with optimal thresholds (1 = illicit when the score is below the cut-off)
y_pred_labeled_tuned = (scores_labeled < threshold_labeled).astype(int)
y_pred_all_tuned = (scores_all < threshold_all).astype(int)

# Show improvement. The baseline is measured from each model's own predict()
# output (-1 = anomaly) on the labeled test rows rather than being hard-coded,
# so the comparison stays correct when the data or the models change.
labeled_rows = labeled_mask_test.to_numpy()
baseline_comparisons = [
    ("Approach 1 (All Labeled)", y_pred_labeled, metrics_labeled_tuned),
    ("Approach 2 (All Data)", y_pred_all, metrics_all_tuned),
]
for approach_name, default_pred, tuned_metrics in baseline_comparisons:
    default_binary = (default_pred[labeled_rows] == -1).astype(int)
    default_f1 = f1_score(y_test_binary, default_binary, zero_division=0)
    default_recall = recall_score(y_test_binary, default_binary, zero_division=0)

    print(f"\n{approach_name}:")
    print(f"  Default F1: {default_f1:.4f} (caught {default_recall*100:.0f}% of fraud)")
    print(f"  Tuned F1:   {tuned_metrics['f1']:.4f}")

In [None]:
# Visualize the threshold optimization
import matplotlib.pyplot as plt

# One panel per approach: scores, chosen cut-off, tuned metrics and panel title
panels = [
    (scores_labeled, threshold_labeled, metrics_labeled_tuned, 'Approach 1: All Labeled'),
    (scores_all, threshold_all, metrics_all_tuned, 'Approach 2: All Data'),
]

fig, axes = plt.subplots(1, 2, figsize=(16, 6))

# Row selectors shared by both panels (same labeled test rows)
licit_rows = y_test_binary == 0
illicit_rows = y_test_binary == 1

for ax, (scores, threshold, tuned, name) in zip(axes, panels):
    # Overlaid score histograms per true class
    ax.hist(scores[licit_rows], bins=50, alpha=0.6, label='Licit (Normal)', color='blue')
    ax.hist(scores[illicit_rows], bins=50, alpha=0.6, label='Illicit (Fraud)', color='red')

    # Vertical marker at the selected cut-off
    ax.axvline(threshold, color='green', linestyle='--', linewidth=3,
               label=f'Optimal Threshold ({threshold:.3f})')

    ax.set_xlabel('Anomaly Score', fontsize=12)
    ax.set_ylabel('Count', fontsize=12)
    ax.set_title(f'{name}\nOptimal F1: {tuned["f1"]:.4f}',
                 fontsize=13, fontweight='bold')
    ax.legend(fontsize=10)
    ax.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

print("Scores to the LEFT of the green line are flagged as fraud")
print("   The optimal threshold balances catching fraud vs. false alarms")

## 7. Side-by-Side Comparison

In [None]:
# Add FPR and confusion matrix values to the tuned metrics
from sklearn.metrics import confusion_matrix

# Confusion matrices for the tuned predictions
# (rows = actual, columns = predicted, both ordered licit, illicit)
class_order = ['licit', 'illicit']
cm_labeled, cm_all = (
    confusion_matrix(
        y_test_labeled_only,
        np.where(tuned_pred == 1, 'illicit', 'licit'),
        labels=class_order,
    )
    for tuned_pred in (y_pred_labeled_tuned, y_pred_all_tuned)
)

# Row-major flattening of a 2x2 matrix yields TN, FP, FN, TP
tn_lab, fp_lab, fn_lab, tp_lab = cm_labeled.ravel()
tn_all, fp_all, fn_all, tp_all = cm_all.ravel()

# Extend the tuned metric dicts in place with FPR and the raw counts
actual_licit_lab = fp_lab + tn_lab
metrics_labeled_tuned.update(
    fpr=fp_lab / actual_licit_lab if actual_licit_lab > 0 else 0,
    tp=int(tp_lab),
    fp=int(fp_lab),
    tn=int(tn_lab),
    fn=int(fn_lab),
)

actual_licit_all = fp_all + tn_all
metrics_all_tuned.update(
    fpr=fp_all / actual_licit_all if actual_licit_all > 0 else 0,
    tp=int(tp_all),
    fp=int(fp_all),
    tn=int(tn_all),
    fn=int(fn_all),
)

# Shorter aliases (same dict objects) used by the table below
metrics_labeled = metrics_labeled_tuned
metrics_all = metrics_all_tuned

print(f"\n{'Metric':<25s} {'All Labeled':>20s} {'All Data':>20s}")
print(f"{'Training Samples':<25s} {len(X_train_labeled):>20,d} {len(X_train_all):>20,d}")
print(f"{'Contamination Used':<25s} {illicit_rate_labeled*1.1:>20.4f} {illicit_rate_all*1.2:>20.4f}")
print(f"{'Optimal Threshold':<25s} {threshold_labeled:>20.4f} {threshold_all:>20.4f}")
print()

# Rate-style rows, four decimals
rate_rows = (
    ('Precision (Illicit)', 'precision'),
    ('Recall (Illicit)', 'recall'),
    ('F1-Score (Illicit)', 'f1'),
    ('False Positive Rate', 'fpr'),
)
for row_label, key in rate_rows:
    print(f"{row_label:<25s} {metrics_labeled[key]:>20.4f} {metrics_all[key]:>20.4f}")
print()

# Count rows, thousands separator
count_rows = (
    ('True Positives', 'tp'),
    ('False Positives', 'fp'),
    ('False Negatives', 'fn'),
    ('True Negatives', 'tn'),
)
for row_label, key in count_rows:
    print(f"{row_label:<25s} {metrics_labeled[key]:>20,d} {metrics_all[key]:>20,d}")

# Determine winner by tuned F1 (a tie goes to 'All Data', as before)
if metrics_labeled['f1'] > metrics_all['f1']:
    winner = 'All Labeled'
    winner_metrics, other_metrics = metrics_labeled, metrics_all
else:
    winner = 'All Data'
    winner_metrics, other_metrics = metrics_all, metrics_labeled

winner_f1 = winner_metrics['f1']
diff = winner_f1 - other_metrics['f1']

# Share of evaluated transactions the WINNING model flags for review.
# Previously these figures were always taken from Approach 1, even when
# Approach 2 won.
flagged = winner_metrics['tp'] + winner_metrics['fp']
evaluated = flagged + winner_metrics['tn'] + winner_metrics['fn']
flagged_share = flagged / evaluated if evaluated > 0 else 0.0

print(f"WINNER: {winner}")
print(f"   F1 Score: {winner_f1:.4f} (difference: +{diff:.4f})")
print(f"   Results use OPTIMIZED thresholds: {threshold_labeled:.4f} (Approach 1), {threshold_all:.4f} (Approach 2)")
# Neutral wording: whether recall is "high" or precision "low" depends on the run
print(f"   Trade-off ({winner}): recall {winner_metrics['recall']*100:.1f}% vs. precision {winner_metrics['precision']*100:.1f}%")
print(f"   Flagging {flagged_share*100:.1f}% of transactions for review")

## 8. Visualizations

We'll use UMAP to reduce the 166 features to 2D for visualization.

In [None]:
print("Running UMAP dimensionality reduction...")
# 2-D embedding of the whole scaled test set (labeled and unknown rows)
umap_params = dict(n_components=2, random_state=42, n_neighbors=15, min_dist=0.1)
umap_reducer = umap.UMAP(**umap_params)
# NOTE(review): X_test_umap is not read by any cell visible in this notebook;
# the plotting cell fits its own reducer on the labeled rows. Confirm this is needed.
X_test_umap = umap_reducer.fit_transform(X_test_scaled)
print("UMAP complete!")

In [None]:
# UMAP Visualization - Labeled Test Data Only
# A separate reducer is fitted on just the labeled test rows, so the embedding
# rows line up with y_test_labeled_only and the tuned predictions.
labeled_mask_test = y_test.isin(['licit', 'illicit'])
reducer = umap.UMAP(n_neighbors=15, min_dist=0.1, random_state=42)
embedding = reducer.fit_transform(X_test_scaled[labeled_mask_test])

fig, axes = plt.subplots(1, 3, figsize=(20, 6))
truth_ax, *prediction_axes = axes

# Panel 1: ground truth (labeled only)
for label, color in [('illicit', 'red'), ('licit', 'blue')]:
    mask = y_test_labeled_only == label
    truth_ax.scatter(embedding[mask, 0], embedding[mask, 1],
                     c=color, label=f'{label.capitalize()} (n={mask.sum():,})', s=15, alpha=0.6)
truth_ax.set_title('Ground Truth Labels\n(Labeled Test Data)', fontsize=13, fontweight='bold')

# Panels 2 and 3: tuned predictions of each approach (0 = normal, 1 = anomaly)
prediction_panels = [
    (y_pred_labeled_tuned, f'Approach 1: All Labeled\nF1={metrics_labeled_tuned["f1"]:.4f}'),
    (y_pred_all_tuned, f'Approach 2: All Data\nF1={metrics_all_tuned["f1"]:.4f}'),
]
for ax, (tuned_pred, title) in zip(prediction_axes, prediction_panels):
    for pred, color, kind in [(0, 'green', 'Normal'), (1, 'red', 'Anomaly')]:
        mask = tuned_pred == pred
        label_text = f'{kind} (n={mask.sum():,})'
        ax.scatter(embedding[mask, 0], embedding[mask, 1],
                   c=color, label=label_text, s=15, alpha=0.6)
    ax.set_title(title, fontsize=13, fontweight='bold')

# Decoration common to all three panels
for ax in axes:
    ax.set_xlabel('UMAP Dimension 1')
    ax.set_ylabel('UMAP Dimension 2')
    ax.legend()
    ax.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

print("Showing labeled test data only (where we can evaluate performance)")
print(f"   Total transactions shown: {labeled_mask_test.sum():,}")

In [None]:
# Confusion Matrix Heatmaps with OPTIMIZED predictions
import seaborn as sns

fig, axes = plt.subplots(1, 2, figsize=(14, 6))

# One heatmap per approach: confusion matrix of the tuned predictions plus title
class_ticks = ['Licit', 'Illicit']
heatmap_panels = [
    (cm_labeled, f'Approach 1: All Labeled\nF1={metrics_labeled_tuned["f1"]:.4f}'),
    (cm_all, f'Approach 2: All Data\nF1={metrics_all_tuned["f1"]:.4f}'),
]

for ax, (matrix, title) in zip(axes, heatmap_panels):
    sns.heatmap(matrix, annot=True, fmt='d', cmap='Blues',
                xticklabels=class_ticks,
                yticklabels=class_ticks,
                cbar_kws={'label': 'Count'}, ax=ax)
    ax.set_title(title, fontsize=13, fontweight='bold')
    ax.set_ylabel('Actual', fontsize=12)
    ax.set_xlabel('Predicted', fontsize=12)

plt.tight_layout()
plt.show()