# 04. Anomaly Detection
## Smart Wafer Yield Optimization Project

This notebook implements advanced anomaly detection techniques for identifying defective wafers and process anomalies in semiconductor manufacturing.

### Objectives:
- Implement multiple anomaly detection algorithms
- Visualize anomalies in 2D and 3D space
- Analyze anomaly characteristics and patterns
- Compare different detection methods
- Save trained anomaly detectors for production use

### Algorithms to Implement:
1. **Isolation Forest**: Tree-based anomaly detection
2. **One-Class SVM**: Support vector machine for novelty detection
3. **Local Outlier Factor (LOF)**: Density-based anomaly detection
4. **Statistical Methods**: Z-score and modified Z-score
5. **Ensemble Methods**: Combining multiple detectors

### Visualization Techniques:
- t-SNE for 2D anomaly visualization
- PCA for dimensionality reduction
- Anomaly score distributions
- Process parameter analysis


In [None]:
# Import required libraries
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.ensemble import IsolationForest
from sklearn.svm import OneClassSVM
from sklearn.neighbors import LocalOutlierFactor
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.manifold import TSNE
from sklearn.metrics import classification_report, confusion_matrix
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import warnings
warnings.filterwarnings('ignore')

# Import our utility functions.
# The notebook may be started from inside a directory whose path ends with
# "notebooks"; in that case switch to its parent directory (treated as the
# project root) so the `app` package and relative paths resolve from there.
import sys
import os
notebook_path = os.path.abspath("")
if notebook_path.endswith("notebooks"):
    project_root = os.path.dirname(notebook_path)
    os.chdir(project_root)

# Put the working directory on sys.path explicitly so the `app` import below
# does not depend on the interpreter implicitly searching the current
# directory (that implicit entry is not guaranteed in every kernel setup).
if os.getcwd() not in sys.path:
    sys.path.insert(0, os.getcwd())
from app.utils import load_data, preprocess_data

print("Libraries imported successfully!")
print("Ready to begin anomaly detection...")


## 1. Load and Prepare Data


In [None]:
# Load the SECOM data, preferring the already-cleaned CSV when it exists,
# then split off the optional target column and standardize the features.
print("Loading preprocessed SECOM data...")
import os
import time  # used by the timing code in the algorithm-training cell

# The setup cell changes the working directory to the project root when the
# notebook is started from `notebooks/`, so the cleaned file is looked up
# relative to the current directory first. The original `../` location is
# kept as a fallback for other launch directories.
processed_candidates = (
    'data/processed/secom_cleaned.csv',
    '../data/processed/secom_cleaned.csv',
)
processed_path = next(
    (path for path in processed_candidates if os.path.exists(path)),
    None,
)

if processed_path is not None:
    data = pd.read_csv(processed_path)
    print("✅ Loaded preprocessed data")
else:
    # Only read the raw data when it is actually needed; previously it was
    # loaded unconditionally and then discarded if the cleaned CSV existed.
    print("⚠️ No preprocessed data found, preprocessing now...")
    data = preprocess_data(load_data(), method='knn')

print(f"Dataset shape: {data.shape}")
print(f"Missing values: {data.isnull().sum().sum()}")

# For anomaly detection, we'll use features only (no target needed).
# When a 'target' column is present it is set aside in `y_true` so it can be
# used for evaluation; it is never passed to the detectors.
if 'target' in data.columns:
    X = data.drop('target', axis=1)
    y_true = data['target']  # Keep for evaluation if available
    print(f"Features: {X.shape[1]}, Using target for evaluation: {y_true.value_counts().to_dict()}")
else:
    X = data
    y_true = None
    print("No target variable found - unsupervised anomaly detection")

# Standardize features; the fitted scaler is kept because it is saved
# alongside the model later in the notebook.
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)
print("✅ Data prepared for anomaly detection")


## 2. Implement Multiple Anomaly Detection Algorithms


In [None]:
# Implement multiple anomaly detection algorithms:
# fit each detector on the standardized features, collect its labels and
# scores in `results`, and print a comparison table.
print("Implementing anomaly detection algorithms...")

# Expected share of anomalous samples. One shared value keeps the three
# detectors comparable and gives a single place to tune it.
expected_anomaly_rate = 0.1

# Define algorithms
algorithms = {
    'Isolation Forest': IsolationForest(
        contamination=expected_anomaly_rate,  # Expected proportion of anomalies
        random_state=42,
    ),
    'One-Class SVM': OneClassSVM(
        nu=expected_anomaly_rate,  # Proportion of outliers
        kernel='rbf',
        gamma='scale',
    ),
    'Local Outlier Factor': LocalOutlierFactor(
        n_neighbors=20,
        contamination=expected_anomaly_rate,
    ),
}

# Train and evaluate each algorithm.
# results[name] holds: the fitted estimator, binary labels (1 = normal,
# 0 = anomaly), raw scores, anomaly count/rate and the elapsed time.
results = {}

for name, algorithm in algorithms.items():
    print(f"\nTraining {name}...")

    # The measured time covers fitting plus labelling/scoring the training set.
    start_time = time.time()
    if isinstance(algorithm, LocalOutlierFactor):
        # LOF (in its default outlier-detection mode) labels the training
        # data via fit_predict: -1 for outliers, 1 for inliers. Its scores
        # are exposed through the fitted attribute rather than a method.
        anomaly_labels = algorithm.fit_predict(X_scaled)
        anomaly_scores = algorithm.negative_outlier_factor_
    else:
        # Other algorithms: fit, then predict and score the same data.
        algorithm.fit(X_scaled)
        anomaly_labels = algorithm.predict(X_scaled)
        anomaly_scores = algorithm.decision_function(X_scaled)

    training_time = time.time() - start_time

    # Convert to binary (1 = normal, 0 = anomaly)
    binary_labels = (anomaly_labels == 1).astype(int)

    # Store builtin int/float instead of NumPy scalars: these values are
    # written to JSON later, and json cannot serialize numpy.int64.
    n_anomalies = int(np.count_nonzero(binary_labels == 0))
    anomaly_rate = float(n_anomalies / len(binary_labels))

    results[name] = {
        'algorithm': algorithm,
        'labels': binary_labels,
        'scores': anomaly_scores,
        'n_anomalies': n_anomalies,
        'anomaly_rate': anomaly_rate,
        'training_time': training_time,
    }

    print(f"✅ {name} - Anomalies detected: {n_anomalies} ({anomaly_rate*100:.1f}%)")

# Display summary (one row per detector)
print("\n📊 Anomaly Detection Summary:")
summary_df = pd.DataFrame.from_dict(
    {
        name: {
            'Anomalies': result['n_anomalies'],
            'Anomaly Rate': f"{result['anomaly_rate']*100:.1f}%",
            'Training Time': f"{result['training_time']:.2f}s",
        }
        for name, result in results.items()
    },
    orient='index',
)

print(summary_df)


## 3. Visualize Anomalies in 2D Space


In [None]:
# Reduce dimensionality for visualization, then draw one scatter plot per
# detector showing which points it flagged as anomalous.
print("Reducing dimensionality for visualization...")

# Apply PCA first for efficiency.
# PCA cannot keep more components than min(n_samples, n_features), so the
# target of 50 is clamped to what the data actually allows.
n_pca_components = min(50, X_scaled.shape[0], X_scaled.shape[1])
pca = PCA(n_components=n_pca_components)
X_pca = pca.fit_transform(X_scaled)

# Apply t-SNE for 2D visualization.
# t-SNE requires perplexity < n_samples; clamp the default of 30 so small
# data sets do not raise.
print("Applying t-SNE...")
tsne_perplexity = min(30, max(1, X_pca.shape[0] - 1))
tsne = TSNE(n_components=2, random_state=42, perplexity=tsne_perplexity)
X_2d = tsne.fit_transform(X_pca)

print("✅ Dimensionality reduction completed")

# Create visualizations for each algorithm.
# The grid is sized from the number of detectors (two per row) instead of a
# fixed 2x2, so adding or removing a detector cannot index past the axes.
n_cols = 2
n_rows = max(1, (len(results) + n_cols - 1) // n_cols)
fig, axes = plt.subplots(n_rows, n_cols, figsize=(16, 6 * n_rows), squeeze=False)
flat_axes = axes.ravel()

# Plot for each algorithm
for ax, (name, result) in zip(flat_axes, results.items()):
    # Labels are binary: 1 = normal, 0 = anomaly
    normal_mask = result['labels'] == 1
    anomaly_mask = result['labels'] == 0

    ax.scatter(X_2d[normal_mask, 0], X_2d[normal_mask, 1],
               c='blue', alpha=0.6, s=20, label='Normal')
    ax.scatter(X_2d[anomaly_mask, 0], X_2d[anomaly_mask, 1],
               c='red', alpha=0.8, s=30, label='Anomaly')

    ax.set_title(f'{name}\nAnomalies: {result["n_anomalies"]} ({result["anomaly_rate"]*100:.1f}%)')
    ax.set_xlabel('t-SNE 1')
    ax.set_ylabel('t-SNE 2')
    ax.legend()
    ax.grid(True, alpha=0.3)

# Hide every subplot that has no detector assigned to it
for unused_ax in flat_axes[len(results):]:
    unused_ax.set_visible(False)

plt.suptitle('Anomaly Detection Results in 2D Space', fontsize=16)
plt.tight_layout()
plt.show()

print("✅ Anomaly visualization completed")


## 4. Save Best Anomaly Detector


In [None]:
# Save the best anomaly detector (Isolation Forest is typically most robust)
# together with the fitted scaler and a small JSON metadata file.
import os
import json
import joblib

# The setup cell changes the working directory to the project root when the
# notebook is started from `notebooks/`, so '../models' would land outside
# the project. When the `app` package directory is visible from the current
# directory we are at the project root and use 'models'; otherwise fall back
# to the original relative location.
models_dir = 'models' if os.path.isdir('app') else '../models'
os.makedirs(models_dir, exist_ok=True)

# Choose Isolation Forest as the best detector
best_algorithm_name = 'Isolation Forest'
best_result = results[best_algorithm_name]
best_algorithm = best_result['algorithm']

# Save the model
model_path = os.path.join(models_dir, 'anomaly_detector.pkl')
joblib.dump(best_algorithm, model_path)

# Save the scaler as well: new data must be standardized with the same
# fitted scaler before it is passed to the detector.
scaler_path = os.path.join(models_dir, 'anomaly_scaler.pkl')
joblib.dump(scaler, scaler_path)

print(f"✅ Best anomaly detector ({best_algorithm_name}) saved to {model_path}")
print(f"✅ Scaler saved to {scaler_path}")

# Save metadata.
# Values are cast to builtin int/float because the counts come from NumPy
# and json.dump raises TypeError on numpy.int64.
metadata = {
    'algorithm_name': best_algorithm_name,
    'n_anomalies': int(best_result['n_anomalies']),
    'anomaly_rate': float(best_result['anomaly_rate']),
    'training_time': float(best_result['training_time']),
    'n_features': int(X.shape[1]),
    'n_samples': int(X.shape[0]),
}

metadata_path = os.path.join(models_dir, 'anomaly_metadata.json')
with open(metadata_path, 'w') as f:
    json.dump(metadata, f, indent=2)

print("✅ Anomaly detection metadata saved")
print("📊 Final Anomaly Detection Results:")
print(f"   Algorithm: {metadata['algorithm_name']}")
print(f"   Anomalies Detected: {metadata['n_anomalies']}")
print(f"   Anomaly Rate: {metadata['anomaly_rate']*100:.1f}%")
print(f"   Features: {metadata['n_features']}")
print(f"   Samples: {metadata['n_samples']}")

print("\n🎯 Anomaly detection pipeline completed successfully!")
