<a href="https://www.kaggle.com/code/shashankroy568/cnnml6?scriptVersionId=262855121" target="_blank"><img align="left" alt="Kaggle" title="Open in Kaggle" src="https://kaggle.com/static/images/open-in-kaggle.svg"></a>

In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [1]:
# CNN Implementation on MVTec Dataset - FIXED VERSION
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, WeightedRandomSampler
import torchvision.transforms as transforms
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.metrics import roc_auc_score, precision_score, recall_score, f1_score, confusion_matrix
from sklearn.utils.class_weight import compute_class_weight
import time
import kagglehub
from pathlib import Path
from PIL import Image
import os

# Configuration
IMG_HEIGHT = 224
IMG_WIDTH = 224
BATCH_SIZE = 16
EPOCHS = 50
LEARNING_RATE = 0.001

def download_mvtec_dataset():
    """Download MVTec dataset"""
    print("📥 Downloading MVTec dataset...")
    try:
        dataset_path = kagglehub.dataset_download("shashankroy568/mvtec-anomaly-detection")
        print(f"✅ Dataset downloaded to: {dataset_path}")
        
        root_path = Path(dataset_path)
        
        # Find MVTec categories
        mvtec_categories = [
            'bottle', 'cable', 'capsule', 'carpet', 'grid',
            'hazelnut', 'leather', 'metal_nut', 'pill', 'screw',
            'tile', 'toothbrush', 'transistor', 'wood', 'zipper'
        ]
        
        found_categories = {}
        for root, dirs, files in os.walk(root_path):
            for dir_name in dirs:
                if dir_name in mvtec_categories:
                    category_path = Path(root) / dir_name
                    found_categories[dir_name] = category_path
        
        return root_path, found_categories
    except Exception as e:
        print(f"❌ Dataset download error: {e}")
        return None, {}

class MVTecCNNDataset(Dataset):
    """MVTec dataset for CNN training"""
    
    def __init__(self, root_path, category, split='train', transform=None):
        self.root_path = Path(root_path)
        self.category = category
        self.split = split
        self.transform = transform or transforms.Compose([
            transforms.Resize((256, 256)),
            transforms.CenterCrop((224, 224)),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], 
                              std=[0.229, 0.224, 0.225])
        ])
        
        self.samples = self._load_samples()
        print(f"📊 CNN {category}: Loaded {len(self.samples)} {split} samples")
    
    def _load_samples(self):
        """Load samples with train/test split for CNN"""
        samples = []
        
        # Find category path
        possible_paths = [
            self.root_path / self.category,
            self.root_path / "mvtec_anomaly_detection" / self.category,
        ]
        
        category_path = None
        for path in possible_paths:
            if path.exists():
                category_path = path
                break
        
        if not category_path:
            print(f"❌ Category path not found for {self.category}")
            return samples
        
        # Collect all normal samples
        train_good_path = category_path / 'train' / 'good'
        test_good_path = category_path / 'test' / 'good'
        
        all_normal_samples = []
        if train_good_path.exists():
            for ext in ['*.png', '*.jpg', '*.jpeg', '*.bmp']:
                for img_path in train_good_path.rglob(ext):
                    all_normal_samples.append((str(img_path), 0))
        
        if test_good_path.exists():
            for ext in ['*.png', '*.jpg', '*.jpeg', '*.bmp']:
                for img_path in test_good_path.rglob(ext):
                    all_normal_samples.append((str(img_path), 0))
        
        # Collect all anomaly samples
        all_anomaly_samples = []
        test_path = category_path / 'test'
        if test_path.exists():
            for defect_dir in test_path.iterdir():
                if defect_dir.is_dir() and defect_dir.name != 'good':
                    for ext in ['*.png', '*.jpg', '*.jpeg', '*.bmp']:
                        for img_path in defect_dir.rglob(ext):
                            all_anomaly_samples.append((str(img_path), 1))
        
        # Split data for CNN training (70% train, 30% test)
        np.random.seed(42)
        np.random.shuffle(all_normal_samples)
        np.random.shuffle(all_anomaly_samples)
        
        normal_split = int(0.7 * len(all_normal_samples))
        anomaly_split = int(0.7 * len(all_anomaly_samples))
        
        if self.split == 'train':
            samples = all_normal_samples[:normal_split] + all_anomaly_samples[:anomaly_split]
        else:  # test
            samples = all_normal_samples[normal_split:] + all_anomaly_samples[anomaly_split:]
        
        # Count samples
        normal_count = len([s for s in samples if s[1] == 0])
        anomaly_count = len([s for s in samples if s[1] == 1])
        
        print(f"   ✅ {self.category} CNN {self.split}: {normal_count} normal, {anomaly_count} anomaly")
        
        return samples
    
    def __len__(self):
        return len(self.samples)
    
    def __getitem__(self, idx):
        img_path, label = self.samples[idx]
        
        try:
            image = Image.open(img_path).convert('RGB')
            if self.transform:
                image = self.transform(image)
            return image, label
        except Exception as e:
            print(f"⚠️ Error loading {img_path}: {e}")
            dummy_image = torch.zeros((3, 224, 224))
            return dummy_image, label

class CNN_Model(nn.Module):
    """CNN architecture for binary classification"""
    
    def __init__(self, num_classes=2):
        super(CNN_Model, self).__init__()
        
        # Feature extraction layers
        self.features = nn.Sequential(
            # Block 1
            nn.Conv2d(3, 32, kernel_size=3, padding=1),
            nn.BatchNorm2d(32),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(2, 2),
            nn.Dropout2d(0.25),
            
            # Block 2
            nn.Conv2d(32, 64, kernel_size=3, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(2, 2),
            nn.Dropout2d(0.25),
            
            # Block 3
            nn.Conv2d(64, 128, kernel_size=3, padding=1),
            nn.BatchNorm2d(128),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(2, 2),
            nn.Dropout2d(0.3),
            
            # Block 4
            nn.Conv2d(128, 256, kernel_size=3, padding=1),
            nn.BatchNorm2d(256),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(2, 2),
            nn.Dropout2d(0.3),
        )
        
        # Classifier
        self.classifier = nn.Sequential(
            nn.AdaptiveAvgPool2d((7, 7)),
            nn.Flatten(),
            nn.Linear(256 * 7 * 7, 512),
            nn.ReLU(inplace=True),
            nn.Dropout(0.5),
            nn.Linear(512, 128),
            nn.ReLU(inplace=True),
            nn.Dropout(0.3),
            nn.Linear(128, num_classes)
        )
    
    def forward(self, x):
        x = self.features(x)
        x = self.classifier(x)
        return x

def calculate_metrics(y_true, y_pred_probs, threshold=0.5):
    """Calculate comprehensive metrics - FIXED VERSION"""
    # Convert to numpy arrays if needed
    if isinstance(y_true, list):
        y_true = np.array(y_true)
    if isinstance(y_pred_probs, list):
        y_pred_probs = np.array(y_pred_probs)
    
    # Ensure proper data types
    y_true = y_true.astype(int)
    y_pred_probs = y_pred_probs.astype(float)
    
    # Create binary predictions
    y_pred = (y_pred_probs >= threshold).astype(int)
    
    # Calculate metrics
    precision = precision_score(y_true, y_pred, zero_division=0)
    recall = recall_score(y_true, y_pred, zero_division=0)
    f1 = f1_score(y_true, y_pred, zero_division=0)
    auc = roc_auc_score(y_true, y_pred_probs) if len(set(y_true)) > 1 else 0.5
    
    # Confusion matrix
    try:
        tn, fp, fn, tp = confusion_matrix(y_true, y_pred).ravel()
    except ValueError:
        # Handle case where confusion matrix can't be calculated
        tp = fp = tn = fn = 0
    
    specificity = tn / (tn + fp) if (tn + fp) > 0 else 0
    accuracy = (tp + tn) / (tp + tn + fp + fn) if (tp + tn + fp + fn) > 0 else 0
    
    return {
        'auc': auc,
        'precision': precision,
        'recall': recall,
        'f1_score': f1,
        'specificity': specificity,
        'accuracy': accuracy,
        'true_positives': int(tp),
        'true_negatives': int(tn),
        'false_positives': int(fp),
        'false_negatives': int(fn)
    }

def train_cnn_single_category(root_path, category, results_dict):
    """Train CNN on a single category"""
    
    print(f"\n" + "="*50)
    print(f"🧠 TRAINING CNN: {category.upper()}")
    print(f"="*50)
    
    start_time = time.time()
    
    try:
        # Create datasets
        train_dataset = MVTecCNNDataset(root_path, category, 'train')
        test_dataset = MVTecCNNDataset(root_path, category, 'test')
        
        if len(train_dataset) == 0 or len(test_dataset) == 0:
            print(f"⚠️ Insufficient data for {category}")
            results_dict[category] = {'status': 'failed', 'reason': 'insufficient_data'}
            return
        
        # Check class distribution
        train_labels = [train_dataset.samples[i][1] for i in range(len(train_dataset))]
        test_labels = [test_dataset.samples[i][1] for i in range(len(test_dataset))]
        
        if len(set(train_labels)) < 2 or len(set(test_labels)) < 2:
            print(f"⚠️ {category}: Single class detected")
            results_dict[category] = {'status': 'failed', 'reason': 'single_class'}
            return
        
        # Handle class imbalance with weighted sampling
        class_weights = compute_class_weight('balanced', classes=np.unique(train_labels), y=train_labels)
        sample_weights = [class_weights[label] for label in train_labels]
        sampler = WeightedRandomSampler(sample_weights, len(sample_weights))
        
        # Create data loaders
        train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, sampler=sampler, num_workers=2)
        test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=2)
        
        print(f"✅ {category}: Dataset ready - {len(train_dataset)} train, {len(test_dataset)} test")
        
        # Initialize model
        device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        model = CNN_Model(num_classes=2).to(device)
        
        # Loss function and optimizer with class weights
        class_weights_tensor = torch.FloatTensor(class_weights).to(device)
        criterion = nn.CrossEntropyLoss(weight=class_weights_tensor)
        optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE, weight_decay=1e-4)
        scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, patience=5, factor=0.5)
        
        print(f"🔄 Training CNN for {category}...")
        
        # Training loop
        best_auc = 0
        patience_counter = 0
        patience = 10
        best_predictions = []
        best_targets = []
        
        for epoch in range(EPOCHS):
            # Training
            model.train()
            train_loss = 0
            train_correct = 0
            train_total = 0
            
            for batch_idx, (data, target) in enumerate(train_loader):
                data, target = data.to(device), target.to(device)
                
                optimizer.zero_grad()
                output = model(data)
                loss = criterion(output, target)
                loss.backward()
                optimizer.step()
                
                train_loss += loss.item()
                _, predicted = torch.max(output.data, 1)
                train_total += target.size(0)
                train_correct += (predicted == target).sum().item()
            
            train_acc = 100. * train_correct / train_total
            
            # Validation
            model.eval()
            val_predictions = []
            val_targets = []
            
            with torch.no_grad():
                for data, target in test_loader:
                    data, target = data.to(device), target.to(device)
                    output = model(data)
                    prob = torch.softmax(output, dim=1)[:, 1]  # Probability of anomaly class
                    
                    val_predictions.extend(prob.cpu().numpy())
                    val_targets.extend(target.cpu().numpy())
            
            # Calculate AUC
            val_auc = roc_auc_score(val_targets, val_predictions) if len(set(val_targets)) > 1 else 0.5
            scheduler.step(1 - val_auc)  # Use AUC for learning rate scheduling
            
            # Early stopping based on AUC
            if val_auc > best_auc:
                best_auc = val_auc
                patience_counter = 0
                # Save best model predictions
                best_predictions = val_predictions.copy()
                best_targets = val_targets.copy()
            else:
                patience_counter += 1
            
            if epoch % 10 == 0:
                print(f"   Epoch {epoch}: Train Acc = {train_acc:.2f}%, Val AUC = {val_auc:.4f}")
            
            if patience_counter >= patience:
                print(f"   Training completed at epoch {epoch}")
                break
        
        # Calculate final metrics using best model - FIXED VERSION
        if len(best_predictions) > 0 and len(best_targets) > 0:
            metrics = calculate_metrics(best_targets, best_predictions)
        else:
            # Fallback metrics if no best predictions saved
            metrics = {
                'auc': 0.5,
                'precision': 0.0,
                'recall': 0.0,
                'f1_score': 0.0,
                'specificity': 0.0,
                'accuracy': 0.0,
                'true_positives': 0,
                'true_negatives': 0,
                'false_positives': 0,
                'false_negatives': 0
            }
        
        training_time = time.time() - start_time
        
        # Store results
        results_dict[category] = {
            'status': 'success',
            'auc_score': metrics['auc'],
            'precision': metrics['precision'],
            'recall': metrics['recall'],
            'f1_score': metrics['f1_score'],
            'specificity': metrics['specificity'],
            'accuracy': metrics['accuracy'],
            'true_positives': metrics['true_positives'],
            'true_negatives': metrics['true_negatives'],
            'false_positives': metrics['false_positives'],
            'false_negatives': metrics['false_negatives'],
            'train_samples': len(train_dataset),
            'test_samples': len(test_dataset),
            'training_time': training_time,
            'model_type': 'cnn',
            'epochs_trained': epoch + 1
        }
        
        print(f"✅ {category}: AUC Score = {metrics['auc']:.4f}")
        print(f"📊 {category}: Precision = {metrics['precision']:.4f}, Recall = {metrics['recall']:.4f}, F1 = {metrics['f1_score']:.4f}")
        print(f"⏱️  {category}: Training time = {training_time:.1f}s")
        
    except Exception as e:
        print(f"❌ {category}: Training failed - {e}")
        import traceback
        traceback.print_exc()
        results_dict[category] = {'status': 'failed', 'reason': str(e)}

def run_cnn_pipeline():
    """Run CNN training pipeline"""
    
    print("🧠 Starting CNN Training Pipeline")
    print("=" * 50)
    
    # Download dataset
    root_path, categories = download_mvtec_dataset()
    
    if not root_path or not categories:
        print("❌ Cannot proceed without dataset")
        return
    
    # Target categories
    target_categories = ['bottle', 'metal_nut', 'capsule', 'cable', 'screw', 'pill', 'transistor', 'hazelnut']
    available_targets = [cat for cat in target_categories if cat in categories]
    
    if not available_targets:
        print("❌ No target categories found")
        return
    
    print(f"\n🎯 Will train CNN on {len(available_targets)} categories: {available_targets}")
    
    # Train each category
    cnn_results = {}
    total_start_time = time.time()
    
    for i, category in enumerate(available_targets, 1):
        print(f"\n🔄 Progress: {i}/{len(available_targets)} categories")
        train_cnn_single_category(root_path, category, cnn_results)
    
    total_time = time.time() - total_start_time
    
    # Results summary
    print(f"\n" + "="*50)
    print("📋 CNN TRAINING RESULTS")
    print("="*50)
    
    successful_trainings = [cat for cat, res in cnn_results.items() if res.get('status') == 'success']
    failed_trainings = [cat for cat, res in cnn_results.items() if res.get('status') == 'failed']
    
    print(f"✅ Successful: {len(successful_trainings)}/{len(available_targets)} categories")
    print(f"❌ Failed: {len(failed_trainings)}/{len(available_targets)} categories")
    print(f"⏱️  Total time: {total_time:.1f}s")
    
    if successful_trainings:
        print(f"\n📊 DETAILED CNN RESULTS:")
        print("-" * 120)
        print(f"{'Category':<12} {'AUC':<8} {'Precision':<9} {'Recall':<8} {'F1':<8} {'Acc':<8} {'Train':<6} {'Test':<5} {'Time':<6}")
        print("-" * 120)
        
        for category in successful_trainings:
            res = cnn_results[category]
            print(f"{category:<12} {res['auc_score']:<8.4f} {res['precision']:<9.4f} {res['recall']:<8.4f} "
                  f"{res['f1_score']:<8.4f} {res['accuracy']:<8.4f} {res['train_samples']:<6} {res['test_samples']:<5} "
                  f"{res['training_time']:<6.1f}s")
        
        # Calculate averages
        avg_auc = np.mean([cnn_results[cat]['auc_score'] for cat in successful_trainings])
        avg_precision = np.mean([cnn_results[cat]['precision'] for cat in successful_trainings])
        avg_recall = np.mean([cnn_results[cat]['recall'] for cat in successful_trainings])
        avg_f1 = np.mean([cnn_results[cat]['f1_score'] for cat in successful_trainings])
        avg_accuracy = np.mean([cnn_results[cat]['accuracy'] for cat in successful_trainings])
        
        print("-" * 120)
        print(f"{'AVERAGE':<12} {avg_auc:<8.4f} {avg_precision:<9.4f} {avg_recall:<8.4f} "
              f"{avg_f1:<8.4f} {avg_accuracy:<8.4f}")
        print("-" * 120)
    
    # Export results
    if successful_trainings:
        cnn_df = pd.DataFrame([
            {
                'category': category,
                'model': 'CNN',
                'auc_score': cnn_results[category]['auc_score'],
                'precision': cnn_results[category]['precision'],
                'recall': cnn_results[category]['recall'],
                'f1_score': cnn_results[category]['f1_score'],
                'accuracy': cnn_results[category]['accuracy'],
                'train_samples': cnn_results[category]['train_samples'],
                'test_samples': cnn_results[category]['test_samples'],
                'training_time': cnn_results[category]['training_time'],
                'model_type': cnn_results[category]['model_type'],
                'epochs_trained': cnn_results[category]['epochs_trained']
            }
            for category in successful_trainings
        ])
        
        cnn_df.to_csv('mvtec_cnn_results.csv', index=False)
        print(f"\n💾 CNN results exported to: mvtec_cnn_results.csv")
    
    print(f"\n✅ CNN training pipeline completed!")
    
    return cnn_results

# Run the CNN pipeline
if __name__ == "__main__":
    cnn_results = run_cnn_pipeline()


KeyboardInterrupt: 