In [1]:
import os
import cv2
import numpy as np
import pandas as pd
from pathlib import Path
import warnings
warnings.filterwarnings('ignore')

from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.svm import OneClassSVM
from sklearn.ensemble import IsolationForest, AdaBoostClassifier
from sklearn.neighbors import LocalOutlierFactor, NearestNeighbors
from sklearn.covariance import EllipticEnvelope
from sklearn.cluster import DBSCAN, KMeans
from sklearn.mixture import GaussianMixture
from sklearn.tree import DecisionTreeClassifier

from sklearn.metrics import (precision_score, recall_score, f1_score, 
                             confusion_matrix, roc_auc_score, roc_curve)

try:
    from tensorflow.keras import layers, models, losses, optimizers
    from tensorflow.keras.callbacks import EarlyStopping
    import tensorflow as tf
    KERAS_AVAILABLE = True
except:
    print("TensorFlow/Keras not available. Deep learning methods will be skipped.")
    KERAS_AVAILABLE = False

2025-11-09 10:15:11.299286: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:477] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
E0000 00:00:1762683311.530505      13 cuda_dnn.cc:8310] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
E0000 00:00:1762683311.599908      13 cuda_blas.cc:1418] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered


AttributeError: 'MessageFactory' object has no attribute 'GetPrototype'

AttributeError: 'MessageFactory' object has no attribute 'GetPrototype'

AttributeError: 'MessageFactory' object has no attribute 'GetPrototype'

AttributeError: 'MessageFactory' object has no attribute 'GetPrototype'

AttributeError: 'MessageFactory' object has no attribute 'GetPrototype'

In [2]:
# SECTION 1: AITEX DATASET LOADER

class AITEXDatasetLoader:
    """Load AITEX fabric dataset with specific directory structure"""
    
    def __init__(self, dataset_root_path):
        """
        Initialize loader for AITEX dataset
        """
        self.dataset_root = Path(dataset_root_path)
        self.defect_dir = self.dataset_root / "Defect_images"
        self.no_defect_dir = self.dataset_root / "NODefect_images"
        self.mask_dir = self.dataset_root / "Mask_images"
        
        self._validate_structure()
    
    def _validate_structure(self):
        """Validate that required directories exist"""
        required_dirs = [self.defect_dir, self.no_defect_dir]
        
        for dir_path in required_dirs:
            if not dir_path.exists():
                raise ValueError(f"Required directory not found: {dir_path}")
        
        print(f"✓ Dataset structure validated")
        print(f"  Defect images: {self.defect_dir}")
        print(f"  NODefect images: {self.no_defect_dir}")
        if self.mask_dir.exists():
            print(f"  Mask images: {self.mask_dir}")
    
    def load_defect_images(self):
        """Load all defect images"""
        print("\nLoading defect images...")
        images = []
        paths = []
        
        image_files = sorted(list(self.defect_dir.glob("*.png")))
        
        for idx, image_file in enumerate(image_files):
            if (idx + 1) % 50 == 0 or idx == 0:
                print(f"  Loading: {idx + 1}/{len(image_files)}")
            
            try:
                img = cv2.imread(str(image_file), cv2.IMREAD_GRAYSCALE)
                if img is not None:
                    img = cv2.resize(img, (256, 256))
                    images.append(img)
                    paths.append(str(image_file))
            except Exception as e:
                print(f"  Error loading {image_file}: {e}")
        
        print(f"Loaded {len(images)} defect images")
        return images, paths
    
    def load_no_defect_images(self):
        """Load all non-defect images from subcategories"""
        print("\nLoading non-defect images...")
        images = []
        paths = []
        
        # Get all subdirectories (fabric types)
        subdirs = sorted([d for d in self.no_defect_dir.iterdir() if d.is_dir()])
        print(f"Found {len(subdirs)} fabric types")
        
        total_images = sum(len(list(d.glob("*.png"))) for d in subdirs)
        loaded = 0
        
        for subdir in subdirs:
            fabric_type = subdir.name
            image_files = sorted(list(subdir.glob("*.png")))
            
            for image_file in image_files:
                try:
                    img = cv2.imread(str(image_file), cv2.IMREAD_GRAYSCALE)
                    if img is not None:
                        img = cv2.resize(img, (256, 256))
                        images.append(img)
                        paths.append(str(image_file))
                        loaded += 1
                        
                        if loaded % 100 == 0:
                            print(f"  Loaded: {loaded}/{total_images}")
                except Exception as e:
                    print(f"  Error loading {image_file}: {e}")
        
        print(f"Loaded {len(images)} non-defect images")
        return images, paths
    
    def load_masks(self):
        """Load mask images for defect localization"""
        print("\nLoading mask images...")
        masks = {}
        
        if not self.mask_dir.exists():
            print("Mask directory not found. Skipping.")
            return masks
        
        mask_files = sorted(list(self.mask_dir.glob("*.png")))
        
        for mask_file in mask_files:
            try:
                mask = cv2.imread(str(mask_file), cv2.IMREAD_GRAYSCALE)
                if mask is not None:
                    mask = cv2.resize(mask, (256, 256))
                    masks[mask_file.stem] = mask
            except Exception as e:
                print(f"Error loading {mask_file}: {e}")
        
        print(f"Loaded {len(masks)} mask images")
        return masks
    
    def get_dataset_summary(self):
        """Print dataset summary"""
        defect_count = len(list(self.defect_dir.glob("*.png")))
        
        no_defect_count = 0
        for subdir in self.no_defect_dir.iterdir():
            if subdir.is_dir():
                no_defect_count += len(list(subdir.glob("*.png")))
        
        mask_count = len(list(self.mask_dir.glob("*.png"))) if self.mask_dir.exists() else 0
        
        print("\n" + "="*70)
        print("AITEX DATASET SUMMARY")
        print("="*70)
        print(f"Defect images:     {defect_count}")
        print(f"Non-defect images: {no_defect_count}")
        print(f"Mask images:       {mask_count}")
        print(f"Total images:      {defect_count + no_defect_count}")
        print("="*70)
        
        return {
            'defect': defect_count,
            'no_defect': no_defect_count,
            'masks': mask_count,
            'total': defect_count + no_defect_count
        }

In [3]:
# SECTION 2: FEATURE EXTRACTION (from previous implementation)

class ImageFeatureExtractor:
    """Extract multiple types of features from fabric images"""
    
    def __init__(self, image_size=(256, 256)):
        self.image_size = image_size
        self.scaler = StandardScaler()
    
    def edge_detection(self, img):
        """Extract edge features using Canny edge detection"""
        edges = cv2.Canny(img, 50, 150)
        edge_features = [
            np.sum(edges) / edges.size,
            np.std(edges),
            np.count_nonzero(edges) / edges.size,
        ]
        return np.array(edge_features), edges
    
    def corner_detection_harris(self, img):
        """Extract corner features using Harris corner detection"""
        corners = cv2.cornerHarris(img, 2, 3, 0.04)
        corners = cv2.dilate(corners, None)
        
        corner_features = [
            np.sum(corners) / corners.size,
            np.max(corners),
            np.count_nonzero(corners > 0.01 * corners.max()) / corners.size,
            np.std(corners),
        ]
        return np.array(corner_features), corners
    
    def blob_detection(self, img):
        """Extract blob features using SimpleBlobDetector"""
        params = cv2.SimpleBlobDetector_Params()
        params.filterByArea = True
        params.minArea = 10
        params.maxArea = 5000
        params.filterByCircularity = True
        params.minCircularity = 0.1
        detector = cv2.SimpleBlobDetector_create(params)
        
        keypoints = detector.detect(img)
        
        blob_features = [
            len(keypoints),
            np.mean([kp.size for kp in keypoints]) if keypoints else 0,
            np.std([kp.size for kp in keypoints]) if len(keypoints) > 1 else 0,
        ]
        return np.array(blob_features), keypoints
    
    def ridge_detection(self, img):
        """Extract ridge features using Sobel derivatives"""
        sobelx = cv2.Sobel(img, cv2.CV_64F, 1, 0, ksize=3)
        sobely = cv2.Sobel(img, cv2.CV_64F, 0, 1, ksize=3)
        
        ridge_strength = np.sqrt(sobelx**2 + sobely**2)
        
        ridge_features = [
            np.mean(ridge_strength),
            np.std(ridge_strength),
            np.percentile(ridge_strength, 95),
        ]
        return np.array(ridge_features), ridge_strength
    
    def sift_features(self, img):
        """Extract SIFT keypoints and descriptors"""
        sift = cv2.SIFT_create()
        keypoints, descriptors = sift.detectAndCompute(img, None)
        
        if descriptors is None or len(descriptors) == 0:
            sift_features = np.zeros(128)  # Fixed size: only mean features
        else:
            # Use only mean of descriptors for fixed-size output
            sift_features = np.mean(descriptors, axis=0)
        
        return sift_features, keypoints, descriptors
    
    def surf_features(self, img):
        """Extract SURF keypoints and descriptors"""
        try:
            surf = cv2.xfeatures2d.SURF_create(400)
            keypoints, descriptors = surf.detectAndCompute(img, None)
            
            if descriptors is None or len(descriptors) == 0:
                surf_features = np.zeros(64)  # Fixed size
            else:
                surf_features = np.mean(descriptors, axis=0)
        except:
            surf_features = np.zeros(64)
            keypoints = []
            descriptors = None
        
        return surf_features, keypoints, descriptors
    
    def orb_features(self, img):
        """Extract ORB keypoints and descriptors"""
        orb = cv2.ORB_create(nfeatures=500)
        keypoints, descriptors = orb.detectAndCompute(img, None)
        
        if descriptors is None or len(descriptors) == 0:
            orb_features = np.zeros(32)  # Fixed size
        else:
            descriptors_float = descriptors.astype(np.float32)
            orb_features = np.mean(descriptors_float, axis=0)
        
        return orb_features, keypoints, descriptors
    
    def texture_features(self, img):
        """Extract texture features using LBP-like statistics"""
        log = cv2.Laplacian(img, cv2.CV_64F)
        
        texture_features = [
            np.mean(log),
            np.std(log),
            np.percentile(log, 25),
            np.percentile(log, 75),
        ]
        return np.array(texture_features)
    
    def extract_all_features(self, img):
        """Extract all features from an image"""
        edge_feat, _ = self.edge_detection(img)
        corner_feat, _ = self.corner_detection_harris(img)
        blob_feat, _ = self.blob_detection(img)
        ridge_feat, _ = self.ridge_detection(img)
        texture_feat = self.texture_features(img)
        
        sift_feat, _, _ = self.sift_features(img)
        surf_feat, _, _ = self.surf_features(img)
        orb_feat, _, _ = self.orb_features(img)
        
        all_features = np.concatenate([
            edge_feat, corner_feat, blob_feat, ridge_feat, texture_feat,
            sift_feat, surf_feat, orb_feat
        ])
        
        return all_features.reshape(1, -1)

In [4]:
class AutoencoderAnomalyDetector:
    """
    Autoencoder for Anomaly Detection
    
    Theory:
    - Autoencoders learn a compressed representation of normal data
    - Reconstruction error = ||input - reconstruction||²
    - Anomalies have higher reconstruction error
    - Works with unsupervised learning (no labels needed)
    
    Architecture:
    - Input layer → Encoder layers → Bottleneck → Decoder layers → Output layer
    - Compression ratio determines bottleneck size
    - Deeper networks capture more complex patterns
    
    Advantages:
    - Learns complex, non-linear patterns
    - Unsupervised (no labels needed)
    - Handles high-dimensional data well
    - Can detect novel anomalies
    
    Disadvantages:
    - Needs large training data (>1000 samples)
    - Slow training and inference
    - Risk of overfitting
    - Hyperparameter tuning complex
    - Requires GPU for speed
    
    Parameters:
    - input_dim: Number of input features
    - encoding_dim: Bottleneck dimension
    - epochs: Training epochs (50-200)
    - batch_size: Batch size (32-128)
    - contamination: Expected anomaly fraction
    """
    
    def __init__(self, input_dim, encoding_dim=32, contamination=0.1):
        self.input_dim = input_dim
        self.encoding_dim = encoding_dim
        self.contamination = contamination
        self.model = None
        self.scaler = StandardScaler()
        self.threshold = None
    
    def build_model(self):
        if not KERAS_AVAILABLE:
            raise ImportError("TensorFlow/Keras not installed")
        
        # Encoder
        input_layer = layers.Input(shape=(self.input_dim,))
        
        encoded = layers.Dense(128, activation='relu')(input_layer)
        encoded = layers.Dropout(0.2)(encoded)
        encoded = layers.Dense(64, activation='relu')(encoded)
        encoded = layers.Dropout(0.2)(encoded)
        encoded = layers.Dense(self.encoding_dim, activation='relu')(encoded)
        
        # Decoder
        decoded = layers.Dense(64, activation='relu')(encoded)
        decoded = layers.Dropout(0.2)(decoded)
        decoded = layers.Dense(128, activation='relu')(decoded)
        decoded = layers.Dropout(0.2)(decoded)
        decoded = layers.Dense(self.input_dim, activation='sigmoid')(decoded)
        
        # Autoencoder model
        self.model = models.Model(input_layer, decoded)
        
        # Compile
        self.model.compile(
            optimizer=optimizers.Adam(learning_rate=0.001),
            loss=losses.MeanSquaredError()
        )
        
        return self
    
    def fit(self, X_train, epochs=50, batch_size=32, validation_split=0.1, verbose=1):
        if self.model is None:
            self.build_model()
        
        # Normalize data
        X_train_scaled = self.scaler.fit_transform(X_train)
        
        # Early stopping callback
        early_stop = EarlyStopping(
            monitor='val_loss',
            patience=5,
            restore_best_weights=True
        )
        
        # Train
        history = self.model.fit(
            X_train_scaled, X_train_scaled,
            epochs=epochs,
            batch_size=batch_size,
            validation_split=validation_split,
            callbacks=[early_stop],
            verbose=verbose
        )
        
        # Set threshold based on training data
        train_predictions = self.model.predict(X_train_scaled, verbose=0)
        train_mse = np.mean((X_train_scaled - train_predictions) ** 2, axis=1)
        self.threshold = np.percentile(train_mse, (1 - self.contamination) * 100)
        
        return history
    
    def predict(self, X_test):
        """
        Returns:
            predictions: -1 (anomaly) or 1 (normal)
            reconstruction_errors: MSE for each sample
        """
        if self.model is None:
            raise ValueError("Model not fitted yet")
        
        # Normalize
        X_test_scaled = self.scaler.transform(X_test)
        
        # Get reconstructions
        reconstructions = self.model.predict(X_test_scaled, verbose=0)
        
        # Compute reconstruction error (MSE)
        reconstruction_errors = np.mean(
            (X_test_scaled - reconstructions) ** 2,
            axis=1
        )
        
        # Classify as anomaly if error > threshold
        predictions = np.where(
            reconstruction_errors > self.threshold,
            -1,  # anomaly
            1    # normal
        )
        
        return predictions, reconstruction_errors
    
    def get_reconstruction_visualization(self, X_test, num_samples=5):
        """
        Returns:
            originals: Original data
            reconstructions: Reconstructed data
        """
        X_test_scaled = self.scaler.transform(X_test[:num_samples])
        reconstructions = self.model.predict(X_test_scaled, verbose=0)
        
        return X_test_scaled, reconstructions

In [5]:
# SECTION 3: ANOMALY DETECTION METHODS

class AnomalyDetectionMethods:
    """Comprehensive anomaly detection using multiple algorithms"""
    
    def __init__(self):
        self.models = {}
        self.results = {}
        self.scaler = StandardScaler()
    
    def knn_anomaly_detection(self, X_train, X_test, n_neighbors=5, percentile=95):
        """Detect anomalies using K-Nearest Neighbors"""
        model = NearestNeighbors(n_neighbors=n_neighbors)
        model.fit(X_train)
        distances, _ = model.kneighbors(X_test)
        anomaly_scores = distances[:, -1]
        threshold = np.percentile(distances[:, -1], percentile)
        predictions = (anomaly_scores > threshold).astype(int)
        predictions = np.where(predictions == 1, -1, 1)
        return model, predictions, anomaly_scores
    
    def pca_anomaly_detection(self, X_train, X_test, variance_explained=0.95, percentile=95):
        """Detect anomalies using PCA reconstruction error"""
        pca = PCA(n_components=variance_explained)
        X_train_pca = pca.fit_transform(X_train)
        X_test_pca = pca.transform(X_test)
        
        X_train_reconstructed = pca.inverse_transform(X_train_pca)
        X_test_reconstructed = pca.inverse_transform(X_test_pca)
        
        train_errors = np.mean((X_train - X_train_reconstructed) ** 2, axis=1)
        test_errors = np.mean((X_test - X_test_reconstructed) ** 2, axis=1)
        
        threshold = np.percentile(train_errors, percentile)
        predictions = (test_errors > threshold).astype(int)
        predictions = np.where(predictions == 1, -1, 1)
        
        return pca, predictions, test_errors
    
    def isolation_forest_detection(self, X_train, X_test, contamination=0.1):
        """Detect anomalies using Isolation Forest"""
        model = IsolationForest(contamination=contamination, random_state=42)
        model.fit(X_train)
        predictions = model.predict(X_test)
        scores = model.score_samples(X_test)
        return model, predictions, scores
    
    def local_outlier_factor_detection(self, X_train, X_test, n_neighbors=20, contamination=0.1, novelty=True):
        """Detect anomalies using Local Outlier Factor"""
        model = LocalOutlierFactor(n_neighbors=n_neighbors, contamination=contamination, novelty=novelty)
        X_combined = np.vstack([X_train, X_test])
        model.fit(X_combined)
        predictions = model.predict(X_test)
        scores = model.negative_outlier_factor_
        return model, predictions, scores[-len(X_test):]
    
    def one_class_svm_detection(self, X_train, X_test, nu=0.1):
        """Detect anomalies using One-Class SVM"""
        model = OneClassSVM(kernel='rbf', nu=nu)
        model.fit(X_train)
        predictions = model.predict(X_test)
        scores = model.decision_function(X_test)
        return model, predictions, scores
    
    def elliptic_envelope_detection(self, X_train, X_test, contamination=0.1):
        """Detect anomalies using Elliptic Envelope"""
        model = EllipticEnvelope(contamination=contamination, random_state=42)
        model.fit(X_train)
        predictions = model.predict(X_test)
        scores = model.decision_function(X_test)
        return model, predictions, scores
    
    def dbscan_anomaly_detection(self, X_train, X_test, eps=0.5, min_samples=5):
        """Detect anomalies using DBSCAN"""
        X_combined = np.vstack([X_train, X_test])
        model = DBSCAN(eps=eps, min_samples=min_samples)
        labels = model.fit_predict(X_combined)
        predictions = labels[-len(X_test):]
        predictions = np.where(predictions == -1, -1, 1)
        return model, predictions, labels[-len(X_test):]
    
    def gaussian_mixture_anomaly_detection(self, X_train, X_test, n_components=10, percentile=95):
        """Detect anomalies using Gaussian Mixture Model"""
        model = GaussianMixture(n_components=n_components, random_state=42)
        model.fit(X_train)
        train_scores = -model.score_samples(X_train)
        test_scores = -model.score_samples(X_test)
        threshold = np.percentile(train_scores, percentile)
        predictions = (test_scores > threshold).astype(int)
        predictions = np.where(predictions == 1, -1, 1)
        return model, predictions, test_scores
    
    def adaboost_detection(self, X_train, y_train, X_test,
                           n_estimators=100, learning_rate=0.5):
        X_train_scaled = self.scaler.fit_transform(X_train)
        X_test_scaled = self.scaler.transform(X_test)
        
        # Create and train model
        base_estimator = DecisionTreeClassifier(max_depth=1, random_state=42)
        model = AdaBoostClassifier(
            estimator=base_estimator,
            n_estimators=n_estimators,
            learning_rate=learning_rate,
            random_state=42
        )
        model.fit(X_train_scaled, y_train)
        
        # Predict
        probabilities = model.predict_proba(X_test_scaled)[:, 1]
        predictions = np.where(probabilities > 0.5, -1, 1)
        
        return model, predictions, probabilities
    
    def autoencoder_detection(self, X_train, X_test, encoding_dim=32,
                              epochs=50, batch_size=32,
                              contamination=0.1):
        input_dim = X_train.shape[1]

        # Create detector
        detector = AutoencoderAnomalyDetector(
            input_dim=input_dim,
            encoding_dim=encoding_dim,
            contamination=contamination
        )

        # Train
        detector.fit(X_train, epochs=epochs, batch_size=batch_size, verbose=0)

        # Predict
        predictions, reconstruction_errors = detector.predict(X_test)
        return detector, predictions, reconstruction_errors
    
    def kmeans_anomaly_detection(self, X_train, X_test, n_clusters=5, contamination=0.1):
        X_train_scaled = self.scaler.fit_transform(X_train)
        X_test_scaled = self.scaler.transform(X_test)

        model = KMeans(n_clusters=n_clusters, init='k-means++', random_state=42)
        model.fit(X_train_scaled)

        train_labels = model.predict(X_train_scaled)
        train_distances = np.sqrt(np.sum((X_train_scaled - model.cluster_centers_[train_labels])**2, axis=1))
        
        threshold = np.percentile(train_distances, (1 - contamination) * 100)

        test_labels = model.predict(X_test_scaled)
        test_distances = np.sqrt(np.sum((X_test_scaled - model.cluster_centers_[test_labels])**2, axis=1))

        predictions = np.where(test_distances > threshold, -1, 1)
        return model, predictions, test_distances

In [6]:
# SECTION 4: MAIN PIPELINE FOR AITEX DATASET

class AITEXAnomalyDetectionPipeline:
    """Complete pipeline for AITEX fabric anomaly detection"""
    
    def __init__(self, dataset_path):
        self.dataset_path = dataset_path
        self.loader = AITEXDatasetLoader(dataset_path)
        self.extractor = ImageFeatureExtractor()
        self.anomaly_detector = AnomalyDetectionMethods()
        self.results_df = None
    
    def load_and_prepare_dataset(self, train_ratio=0.7, limit_no_defect=None):
        """Load AITEX dataset and prepare for training"""
        print("Loading AITEX dataset...")
        
        # Get dataset summary
        summary = self.loader.get_dataset_summary()
        
        # Load defect images
        defect_images, defect_paths = self.loader.load_defect_images()
        defect_labels = np.ones(len(defect_images))  # 1 for defect
        
        # Load non-defect images
        no_defect_images, no_defect_paths = self.loader.load_no_defect_images()
        no_defect_labels = np.zeros(len(no_defect_images))  # 0 for normal
        
        # Limit non-defect if needed
        if limit_no_defect and len(no_defect_images) > limit_no_defect:
            indices = np.random.choice(len(no_defect_images), limit_no_defect, replace=False)
            no_defect_images = [no_defect_images[i] for i in indices]
            no_defect_paths = [no_defect_paths[i] for i in indices]
            no_defect_labels = no_defect_labels[indices]
        
        print(f"\nDataset loaded:")
        print(f"  Defect images: {len(defect_images)}")
        print(f"  Non-defect images: {len(no_defect_images)}")
        
        # Extract features
        print("\nExtracting features...")
        all_images = defect_images + no_defect_images
        all_labels = np.concatenate([defect_labels, no_defect_labels])
        all_paths = defect_paths + no_defect_paths
        
        features_list = []
        for idx, img in enumerate(all_images):
            if (idx + 1) % 100 == 0 or idx == 0:
                print(f"  Extracting: {idx + 1}/{len(all_images)}")
            
            try:
                features = self.extractor.extract_all_features(img)
                features_list.append(features)
            except Exception as e:
                print(f"  Error extracting features from image {idx}: {e}")
                features_list.append(np.zeros((1, 241)))
        
        X = np.vstack(features_list)
        y = all_labels
        
        # Split into train and test
        # Training: mostly normal images
        normal_indices = np.where(y == 0)[0]
        defect_indices = np.where(y == 1)[0]
        
        n_train_normal = int(len(normal_indices) * train_ratio)
        train_indices = np.concatenate([
            normal_indices[:n_train_normal],
            np.random.choice(defect_indices, size=min(len(defect_indices)//3, 5), replace=False)
        ])
        
        test_indices = np.concatenate([
            normal_indices[n_train_normal:],
            defect_indices[len(defect_indices)//3:]
        ])
        
        X_train = X[train_indices]
        X_test = X[test_indices]
        y_train = y[train_indices]
        y_test = y[test_indices]
        
        # Normalize
        self.anomaly_detector.scaler.fit(X_train)
        X_train = self.anomaly_detector.scaler.transform(X_train)
        X_test = self.anomaly_detector.scaler.transform(X_test)
        
        print(f"\nDataset prepared:")
        print(f"  Training: {len(X_train)} samples")
        print(f"  Testing: {len(X_test)} samples")
        print(f"  Features: {X.shape[1]}")
        
        return X_train, X_test, y_train, y_test, all_paths
    
    def run_all_methods(self, X_train, X_test, y_train, y_test):
        """Run all anomaly detection methods"""
        print("\n" + "="*70)
        print("Running Anomaly Detection Methods")
        print("="*70)
        
        results = {
            'Method': [],
            'Precision': [],
            'Recall': [],
            'F1-Score': [],
            'ROC-AUC': []
        }
        
        methods = [
            ('K-Nearest Neighbors', self._run_knn),
            ('PCA', self._run_pca),
            ('Isolation Forest', self._run_isolation_forest),
            ('Local Outlier Factor', self._run_lof),
            ('One-Class SVM', self._run_ocsvm),
            ('Elliptic Envelope', self._run_elliptic),
            ('DBSCAN', self._run_dbscan),
            ('Gaussian Mixture Model', self._run_gmm),
            ('K-Means', self._run_kmeans),
            ('Autoencoder', self._run_autoencoder)
        ]
        
        for method_name, method_func in methods:
            print(f"\n{method_name}...")
            try:
                metrics = method_func(X_train, X_test, y_test)
                self._print_results(method_name, metrics)
                results = self._append_results(results, method_name, metrics)
            except Exception as e:
                print(f"  Error: {e}")
        
        methods = [('AdaBoost', self._run_adaboost)]
        for method_name, method_func in methods:
            print(f"\n{method_name}...")
            try:
                metrics = method_func(X_train, y_train, X_test, y_test)
                self._print_results(method_name, metrics)
                results = self._append_results(results, method_name, metrics)
            except Exception as e:
                print(f"  Error: {e}")
        
        self.results_df = pd.DataFrame(results)
        return self.results_df
    
    def _run_knn(self, X_train, X_test, y_test):
        """Run KNN method"""
        model, predictions, scores = self.anomaly_detector.knn_anomaly_detection(X_train, X_test)
        return self._compute_metrics(y_test, predictions, scores)
    
    def _run_pca(self, X_train, X_test, y_test):
        """Run PCA method"""
        model, predictions, scores = self.anomaly_detector.pca_anomaly_detection(X_train, X_test)
        return self._compute_metrics(y_test, predictions, scores)
    
    def _run_isolation_forest(self, X_train, X_test, y_test):
        """Run Isolation Forest method"""
        model, predictions, scores = self.anomaly_detector.isolation_forest_detection(X_train, X_test)
        return self._compute_metrics(y_test, predictions, scores)
    
    def _run_lof(self, X_train, X_test, y_test):
        """Run LOF method"""
        model, predictions, scores = self.anomaly_detector.local_outlier_factor_detection(X_train, X_test)
        return self._compute_metrics(y_test, predictions, scores)
    
    def _run_ocsvm(self, X_train, X_test, y_test):
        """Run One-Class SVM method"""
        model, predictions, scores = self.anomaly_detector.one_class_svm_detection(X_train, X_test)
        return self._compute_metrics(y_test, predictions, scores)
    
    def _run_elliptic(self, X_train, X_test, y_test):
        """Run Elliptic Envelope method"""
        model, predictions, scores = self.anomaly_detector.elliptic_envelope_detection(X_train, X_test)
        return self._compute_metrics(y_test, predictions, scores)
    
    def _run_dbscan(self, X_train, X_test, y_test):
        """Run DBSCAN method"""
        model, predictions, scores = self.anomaly_detector.dbscan_anomaly_detection(X_train, X_test)
        return self._compute_metrics(y_test, predictions, scores)
    
    def _run_gmm(self, X_train, X_test, y_test):
        """Run Gaussian Mixture Model method"""
        model, predictions, scores = self.anomaly_detector.gaussian_mixture_anomaly_detection(X_train, X_test)
        return self._compute_metrics(y_test, predictions, scores)
    
    def _run_kmeans(self, X_train, X_test, y_test):
        model, predictions, scores = self.anomaly_detector.kmeans_anomaly_detection(X_train, X_test)
        return self._compute_metrics(y_test, predictions, scores)
    
    def _run_autoencoder(self, X_train, X_test, y_test):
        model, predictions, scores = self.anomaly_detector.autoencoder_detection(X_train, X_test)
        return self._compute_metrics(y_test, predictions, scores)
    
    def _run_adaboost(self, X_train, y_train, X_test, y_test):
        model, predictions, scores = self.anomaly_detector.adaboost_detection(X_train, y_train, X_test)
        return self._compute_metrics(y_test, predictions, scores)

    def _compute_metrics(self, y_true, predictions, scores):
        """Compute evaluation metrics"""
        y_pred_binary = np.where(predictions == -1, 1, 0)
        
        precision = precision_score(y_true, y_pred_binary, zero_division=0)
        recall = recall_score(y_true, y_pred_binary, zero_division=0)
        f1 = f1_score(y_true, y_pred_binary, zero_division=0)
        
        try:
            roc_auc = roc_auc_score(y_true, scores)
        except:
            roc_auc = 0.0
        
        return {
            'precision': precision,
            'recall': recall,
            'f1': f1,
            'roc_auc': roc_auc
        }
    
    def _print_results(self, method_name, metrics):
        """Print results for a method"""
        print(f"  Precision: {metrics['precision']:.4f}")
        print(f"  Recall: {metrics['recall']:.4f}")
        print(f"  F1-Score: {metrics['f1']:.4f}")
        print(f"  ROC-AUC: {metrics['roc_auc']:.4f}")
    
    def _append_results(self, results, method_name, metrics):
        """Append results to dictionary"""
        results['Method'].append(method_name)
        results['Precision'].append(metrics['precision'])
        results['Recall'].append(metrics['recall'])
        results['F1-Score'].append(metrics['f1'])
        results['ROC-AUC'].append(metrics['roc_auc'])
        return results
    
    def save_results(self, output_file='aitex_anomaly_results.csv'):
        """Save results to CSV"""
        if self.results_df is not None:
            self.results_df.to_csv(output_file, index=False)
            print(f"\nResults saved to {output_file}")
            print("\nResults Summary:")
            print(self.results_df.to_string(index=False))

In [7]:
if __name__ == "__main__":
    # Configuration
    DATASET_ROOT = "/kaggle/input/aitex-fabric-image-database"
    LIMIT_NO_DEFECT = None  # Set to a number to limit non-defect images
    
    try:
        pipeline = AITEXAnomalyDetectionPipeline(DATASET_ROOT)
        
        # Load and prepare dataset
        X_train, X_test, y_train, y_test, paths = pipeline.load_and_prepare_dataset(
            train_ratio=0.7,
            limit_no_defect=LIMIT_NO_DEFECT
        )
        
        # Run all anomaly detection methods
        results = pipeline.run_all_methods(X_train, X_test, y_train, y_test)
        
        # Save results
        pipeline.save_results("aitex_anomaly_results.csv")
        
        # Print summary
        print("ANALYSIS COMPLETE")
        best_idx = results['F1-Score'].idxmax()
        print(f"Best method: {results.loc[best_idx, 'Method']}")
        print(f"Best F1-Score: {results.loc[best_idx, 'F1-Score']:.4f}")
        print(f"Best ROC-AUC: {results.loc[best_idx, 'ROC-AUC']:.4f}")
        
    except Exception as e:
        print(f"Error: {e}")
        import traceback
        traceback.print_exc()

✓ Dataset structure validated
  Defect images: /kaggle/input/aitex-fabric-image-database/Defect_images
  NODefect images: /kaggle/input/aitex-fabric-image-database/NODefect_images
  Mask images: /kaggle/input/aitex-fabric-image-database/Mask_images
Loading AITEX dataset...

AITEX DATASET SUMMARY
Defect images:     106
Non-defect images: 141
Mask images:       107
Total images:      247

Loading defect images...
  Loading: 1/106
  Loading: 50/106
  Loading: 100/106
Loaded 106 defect images

Loading non-defect images...
Found 7 fabric types
  Loaded: 100/141
Loaded 141 non-defect images

Dataset loaded:
  Defect images: 106
  Non-defect images: 141

Extracting features...
  Extracting: 1/247
  Extracting: 100/247
  Extracting: 200/247

Dataset prepared:
  Training: 103 samples
  Testing: 114 samples
  Features: 241

Running Anomaly Detection Methods

K-Nearest Neighbors...
  Precision: 0.5000
  Recall: 0.0423
  F1-Score: 0.0779
  ROC-AUC: 0.4720

PCA...
  Precision: 0.5816
  Recall: 0.80

2025-11-09 10:16:00.751567: E external/local_xla/xla/stream_executor/cuda/cuda_driver.cc:152] failed call to cuInit: INTERNAL: CUDA error: Failed call to cuInit: UNKNOWN ERROR (303)


  Precision: 0.3051
  Recall: 0.2535
  F1-Score: 0.2769
  ROC-AUC: 0.1323

AdaBoost...
  Precision: 0.9000
  Recall: 0.1268
  F1-Score: 0.2222
  ROC-AUC: 0.5244

Results saved to aitex_anomaly_results.csv

Results Summary:
                Method  Precision   Recall  F1-Score  ROC-AUC
   K-Nearest Neighbors   0.500000 0.042254  0.077922 0.471995
                   PCA   0.581633 0.802817  0.674556 0.378972
      Isolation Forest   0.346154 0.253521  0.292683 0.825090
  Local Outlier Factor   0.777778 0.197183  0.314607 0.380609
         One-Class SVM   0.533333 0.676056  0.596273 0.682607
     Elliptic Envelope   0.609091 0.943662  0.740331 0.586309
                DBSCAN   0.622807 1.000000  0.767568 0.500000
Gaussian Mixture Model   0.609091 0.943662  0.740331 0.389781
               K-Means   0.629630 0.478873  0.544000 0.432362
           Autoencoder   0.305085 0.253521  0.276923 0.132329
              AdaBoost   0.900000 0.126761  0.222222 0.524402
ANALYSIS COMPLETE
Best method: DB