In [None]:
import pickle
import numpy as np
import os


# Folder that holds the pickled CIFAR-10 batch files read by the cells below.
DATASET_PATH = '/kaggle/input/cifar-10-new/'

# List the folder contents up front so a wrong path is obvious before any loading starts.
print("Files in dataset folder:")
dataset_entries = os.listdir(DATASET_PATH)
print(dataset_entries)
print()


def load_batch(batch_path):
    """
    Read one pickled CIFAR-10 batch file from disk.

    Args:
        batch_path: Full path to the batch file

    Returns:
        (data, labels): the objects stored under the b'data' and b'labels'
        keys of the unpickled dictionary. For the CIFAR-10 batch files these
        are expected to be a (10000, 3072) array of flattened images and a
        list of 10000 class labels (0-9).
    """
    # encoding='bytes' leaves the dictionary keys as bytes, hence the b'...' lookups below.
    with open(batch_path, 'rb') as batch_file:
        payload = pickle.load(batch_file, encoding='bytes')

    return payload[b'data'], payload[b'labels']



# Number of training batch files to read (data_batch_1 ... data_batch_N).
NUM_TRAIN_BATCHES = 5

# Per-file results are collected here and merged into single arrays in a later cell.
x_train_batches = []
y_train_batches = []

print("Loading training batches...")
for i in range(1, NUM_TRAIN_BATCHES + 1):  # batch files are numbered starting at 1
    batch_filename = f'data_batch_{i}'
    batch_path = os.path.join(DATASET_PATH, batch_filename)

    print(f"  Loading {batch_filename}...", end=' ')
    data, labels = load_batch(batch_path)

    x_train_batches.append(data)
    y_train_batches.append(labels)

    print(f"✓ Loaded {len(data)} images")

# Report the number of batches actually loaded rather than a hard-coded count.
print(f"\n Successfully loaded all {len(x_train_batches)} training batches!")
print(f"   Total batches: {len(x_train_batches)}")
print(f"   Each batch shape: {x_train_batches[0].shape}")
print(f"   Each batch has {len(y_train_batches[0])} labels")


In [None]:
print("\nConcatenating all training batches...")

# Join the per-file pieces along the sample axis; images and labels are
# concatenated in the same order so row i of one matches entry i of the other.
x_train_full = np.concatenate(x_train_batches, axis=0)  # expected (50000, 3072)
y_train_full = np.concatenate(y_train_batches, axis=0)  # expected (50000,)

print(f" Concatenation complete!")
print(f"   Combined image data shape: {x_train_full.shape}  ← (50000 images, 3072 pixel values)")
print(f"   Combined labels shape: {y_train_full.shape}  ← (50000 class labels)")
print()


# Quick sanity checks on the merged arrays before any preprocessing is applied.
print(" Verification:")
print(f"   Total images: {len(x_train_full)}")
print(f"   Pixel values range: [{x_train_full.min()}, {x_train_full.max()}]  ← Still in [0, 255]")
print(f"   Unique class labels: {np.unique(y_train_full)}  ← Should be [0-9]")
print(f"   Label distribution:")
for class_id in range(10):
    count = np.sum(y_train_full == class_id)
    print(f"     Class {class_id}: {count} images")


In [None]:
from sklearn.model_selection import train_test_split


print("Step 1: Reshaping images...")

# Each flat 3072-value row is interpreted as (3, 32, 32) channel-first and then
# moved to channel-last (32, 32, 3).
# The ndim guard makes this cell safe to re-run: reshaping data that is already
# (N, 32, 32, 3) would succeed but silently scramble the pixels.
if x_train_full.ndim != 4:
    x_train_full = x_train_full.reshape(-1, 3, 32, 32)  # (50000, 3, 32, 32)
    x_train_full = x_train_full.transpose(0, 2, 3, 1)   # (50000, 32, 32, 3)

print(f" Reshaped to: {x_train_full.shape}")
print(f"   Format: (num_images, height, width, channels)")
print()

print("Step 2: Normalizing pixel values...")

# The dtype guard serves the same purpose: once the data is float32 it has
# already been scaled, and dividing by 255 a second time would shrink it again.
if x_train_full.dtype != np.float32:
    x_train_full = x_train_full.astype('float32') / 255.0

print(f" Normalized!")
print(f"   Pixel value range: [{x_train_full.min():.3f}, {x_train_full.max():.3f}]")
print(f"   Data type: {x_train_full.dtype}")
print()


print("Step 3: Splitting into train and validation sets...")

x_train, x_val, y_train, y_val = train_test_split(
    x_train_full,
    y_train_full,
    test_size=0.2,          # 20% for validation
    random_state=42,        # For reproducibility
    stratify=y_train_full   # Maintain class balance
)

print(f" Split complete!")
print(f"   Training set: {x_train.shape} ({len(x_train)} images)")
print(f"   Validation set: {x_val.shape} ({len(x_val)} images)")
print()
print(f"   Training labels: {y_train.shape}")
print(f"   Validation labels: {y_val.shape}")
print()

# With stratify= the per-class counts printed below should be balanced in both splits.
print(" Class Distribution Verification:")
print()
print("Training set:")
for class_id in range(10):
    count = np.sum(y_train == class_id)
    print(f"  Class {class_id}: {count} images")

print()
print("Validation set:")
for class_id in range(10):
    count = np.sum(y_val == class_id)
    print(f"  Class {class_id}: {count} images")

print()
print("=" * 60)
print(" PREPROCESSING COMPLETE!")
print("=" * 60)
print(f"Ready for U-Net model:")
print(f"  x_train: {x_train.shape} - float32 in [0,1] range")
print(f"  x_val: {x_val.shape} - float32 in [0,1] range")
print(f"  Format: (N, H, W, C) = (samples, 32, 32, 3)")
print(f"  Next step: Create PyTorch Dataset with noise addition")


In [None]:
import torch
from torch.utils.data import Dataset


class CIFAR10DenoisingDataset(Dataset):
    """
    Dataset that serves (noisy_image, clean_image, label) triples for
    training a denoising autoencoder.

    Clean images and labels are held as tensors. Every access draws fresh
    Gaussian noise, adds it to the clean image, clips the result to [0, 1]
    and returns both images in (C, H, W) layout as expected by Conv2d.
    """

    def __init__(self, images, labels, noise_factor=0.3):
        """
        Args:
            images: numpy array of shape (N, 32, 32, 3) with values in [0, 1]
            labels: numpy array of shape (N,) with class indices 0-9
            noise_factor: standard deviation of the Gaussian noise
        """
        self.images = torch.from_numpy(images).to(torch.float32)  # (N, 32, 32, 3)
        self.labels = torch.from_numpy(labels).to(torch.int64)    # (N,)
        self.noise_factor = noise_factor

    def __len__(self):
        """Number of images held by the dataset."""
        return len(self.images)

    def __getitem__(self, idx):
        """
        Build one noisy/clean pair.

        Args:
            idx: index of the image to retrieve

        Returns:
            noisy_img: clipped noisy image, shape (3, 32, 32)
            clean_img: untouched image, shape (3, 32, 32)
            label: class label (0-9)
        """
        clean_hwc = self.images[idx]  # (32, 32, 3)
        label = self.labels[idx]

        # Noise is sampled anew on every call, so the same index yields a different noisy image each time.
        perturbed = clean_hwc + torch.randn_like(clean_hwc) * self.noise_factor
        noisy_hwc = perturbed.clamp(0.0, 1.0)

        # (H, W, C) -> (C, H, W)
        return noisy_hwc.permute(2, 0, 1), clean_hwc.permute(2, 0, 1), label

print("Creating PyTorch Datasets...")

# Training and validation datasets use the same noise level so their losses are comparable.
train_dataset = CIFAR10DenoisingDataset(
    images=x_train,      # (40000, 32, 32, 3)
    labels=y_train,      # (40000,)
    noise_factor=0.3     # Adjust noise level as needed
)

val_dataset = CIFAR10DenoisingDataset(
    images=x_val,        # (10000, 32, 32, 3)
    labels=y_val,        # (10000,)
    noise_factor=0.3
)

print(f" Datasets created!")
print(f"   Training dataset: {len(train_dataset)} samples")
print(f"   Validation dataset: {len(val_dataset)} samples")
print()

# Pull one sample to confirm layout and value range before building DataLoaders.
print(" Verifying Dataset output format:")
noisy_sample, clean_sample, label_sample = train_dataset[0]
print(f"   Single sample shapes:")
print(f"   - Noisy image: {noisy_sample.shape}  ← (C, H, W) format")
print(f"   - Clean image: {clean_sample.shape}")
# int(...) prints the plain class index instead of a tensor(...) repr.
print(f"   - Label: {int(label_sample)}  ← Class {int(label_sample)}")
print(f"   - Noisy values range: [{noisy_sample.min():.3f}, {noisy_sample.max():.3f}]")
print(f"   - Clean values range: [{clean_sample.min():.3f}, {clean_sample.max():.3f}]")


In [None]:
from torch.utils.data import DataLoader

print("\nCreating PyTorch DataLoaders...")

# Single source of truth for the batch size used by both loaders and the summary below.
BATCH_SIZE = 128

train_loader = DataLoader(
    train_dataset,
    batch_size=BATCH_SIZE,  # Number of images per batch
    shuffle=True,           # Shuffle training data each epoch
    num_workers=2,          # Parallel data loading (adjust based on your system)
    pin_memory=True         # Faster data transfer to GPU
)

val_loader = DataLoader(
    val_dataset,
    batch_size=BATCH_SIZE,  # Same batch size
    shuffle=False,          # Don't shuffle validation data
    num_workers=2,
    pin_memory=True
)

print(f"DataLoaders created!")
print(f"   Training batches per epoch: {len(train_loader)}")
print(f"   Validation batches per epoch: {len(val_loader)}")
print()


# Fetch one batch to confirm the collated shapes and value ranges.
print(" Verifying DataLoader batch format:")

noisy_batch, clean_batch, label_batch = next(iter(train_loader))

print(f"   Batch shapes:")
print(f"   - Noisy batch: {noisy_batch.shape}  ← (batch, C, H, W)")
print(f"   - Clean batch: {clean_batch.shape}")
print(f"   - Label batch: {label_batch.shape}  ← (batch,)")
print(f"   - Noisy batch values: [{noisy_batch.min():.3f}, {noisy_batch.max():.3f}]")
print(f"   - Clean batch values: [{clean_batch.min():.3f}, {clean_batch.max():.3f}]")
print()

print("=" * 60)
print(" DATASET AND DATALOADER SETUP COMPLETE!")
print("=" * 60)
print(f"Ready for U-Net training:")
print(f"  - Train batches: {len(train_loader)} × {BATCH_SIZE} images")
print(f"  - Val batches: {len(val_loader)} × {BATCH_SIZE} images")
print(f"  - Format: (batch, 3, 32, 32) ← Perfect for your U-Net model")
print(f"  - Noise added on-the-fly during training")


In [None]:
import torch
import torch.nn as nn

class UNetDenoisingAutoencoder(nn.Module):
    """
    Two-level U-Net for denoising 3-channel 32x32 images.

    The encoder halves the resolution twice (32 -> 16 -> 8) while widening
    the channels (32 -> 64 -> 128 at the bottleneck). The decoder mirrors it
    with transposed convolutions and concatenates the matching encoder
    activations as skip connections. A 1x1 convolution followed by a sigmoid
    maps back to 3 channels with values in [0, 1].
    """

    def __init__(self):
        super().__init__()

        # Channel widths at full, half and quarter resolution.
        w1, w2, w3 = 32, 64, 128

        def double_conv(c_in, c_out):
            """Two (3x3 conv -> BatchNorm -> ReLU) stages; padding=1 keeps H and W unchanged."""
            stages = [
                nn.Conv2d(c_in, c_out, kernel_size=3, padding=1),
                nn.BatchNorm2d(c_out),
                nn.ReLU(inplace=True),
                nn.Conv2d(c_out, c_out, kernel_size=3, padding=1),
                nn.BatchNorm2d(c_out),
                nn.ReLU(inplace=True),
            ]
            return nn.Sequential(*stages)

        # ---- Encoder ----
        self.enc1 = double_conv(3, w1)         # (3, 32, 32)  -> (32, 32, 32)
        self.pool1 = nn.MaxPool2d(2)           # -> (32, 16, 16)
        self.enc2 = double_conv(w1, w2)        # -> (64, 16, 16)
        self.pool2 = nn.MaxPool2d(2)           # -> (64, 8, 8)

        # ---- Bottleneck ----
        self.bottleneck = double_conv(w2, w3)  # -> (128, 8, 8)

        # ---- Decoder ----
        # Each decoder conv block takes upsampled + skip channels, i.e. twice the target width.
        self.upconv2 = nn.ConvTranspose2d(w3, w2, kernel_size=2, stride=2)  # -> (64, 16, 16)
        self.dec2 = double_conv(w2 + w2, w2)                                # (128, 16, 16) -> (64, 16, 16)
        self.upconv1 = nn.ConvTranspose2d(w2, w1, kernel_size=2, stride=2)  # -> (32, 32, 32)
        self.dec1 = double_conv(w1 + w1, w1)                                # (64, 32, 32) -> (32, 32, 32)

        # ---- Output head ----
        self.final_conv = nn.Conv2d(w1, 3, kernel_size=1)  # -> (3, 32, 32)
        self.sigmoid = nn.Sigmoid()                        # squashes to [0, 1]

    def forward(self, x):
        """
        Run the encoder, bottleneck and decoder with skip connections.

        Input: x of shape (batch, 3, 32, 32)
        Output: denoised image of shape (batch, 3, 32, 32)
        """
        # Encoder: keep the pre-pooling activations for the skip connections.
        skip_full = self.enc1(x)                         # (batch, 32, 32, 32)
        skip_half = self.enc2(self.pool1(skip_full))     # (batch, 64, 16, 16)

        latent = self.bottleneck(self.pool2(skip_half))  # (batch, 128, 8, 8)

        # Decoder level 2: upsample, concatenate the half-resolution skip, refine.
        up_half = torch.cat((self.upconv2(latent), skip_half), dim=1)   # (batch, 128, 16, 16)
        dec_half = self.dec2(up_half)                                   # (batch, 64, 16, 16)

        # Decoder level 1: upsample, concatenate the full-resolution skip, refine.
        up_full = torch.cat((self.upconv1(dec_half), skip_full), dim=1)  # (batch, 64, 32, 32)
        dec_full = self.dec1(up_full)                                    # (batch, 32, 32, 32)

        return self.sigmoid(self.final_conv(dec_full))                   # (batch, 3, 32, 32)

if __name__ == "__main__":
    # Smoke test: push a random batch through a freshly built model and report shapes and size.
    model = UNetDenoisingAutoencoder()

    dummy_input = torch.randn(4, 3, 32, 32)

    # Only the output shape is inspected, so there is no need to record an autograd graph.
    with torch.no_grad():
        output = model(dummy_input)

    print("=" * 60)
    print("U-Net Architecture for CIFAR-10 (32×32)")
    print("=" * 60)
    print(f"Input shape:  {dummy_input.shape}")
    print(f"Output shape: {output.shape}")
    print(f"\nTotal parameters: {sum(p.numel() for p in model.parameters()):,}")
    print(f"Trainable parameters: {sum(p.numel() for p in model.parameters() if p.requires_grad):,}")
    print("\n Model is ready for CIFAR-10 denoising!")


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision.models import vgg16


class PerceptualLoss(nn.Module):
    """
    Perceptual loss based on pre-trained VGG16 features.

    Both images are normalized with the ImageNet mean/std, passed through the
    first 23 layers of VGG16's `features` stack, and the mean squared error
    between the two resulting feature maps is returned. The VGG weights are
    frozen, so only the network producing `output` receives gradients.
    """

    def __init__(self, device='cuda'):
        """
        Args:
            device: device on which the VGG16 feature extractor is placed.
        """
        super(PerceptualLoss, self).__init__()

        # `pretrained=True` is deprecated in torchvision >= 0.13 in favour of
        # `weights=`. Older releases do not accept `weights` and raise
        # TypeError, in which case the legacy argument is used instead.
        try:
            vgg_model = vgg16(weights="IMAGENET1K_V1")
        except TypeError:
            vgg_model = vgg16(pretrained=True)
        vgg = vgg_model.features.to(device)

        # Keep only the first 23 layers and fix them in eval mode.
        self.feature_extractor = nn.Sequential(*list(vgg.children())[:23]).eval()

        # Freeze the extractor: it is a fixed measuring device, not a trainable part.
        for param in self.feature_extractor.parameters():
            param.requires_grad = False

        self.mse_loss = nn.MSELoss()

        # ImageNet channel statistics, shaped (1, 3, 1, 1) to broadcast over (batch, 3, H, W).
        self.register_buffer('mean', torch.tensor([0.485, 0.456, 0.406]).view(1, 3, 1, 1))
        self.register_buffer('std', torch.tensor([0.229, 0.224, 0.225]).view(1, 3, 1, 1))

    def normalize(self, x):
        """Normalize images from [0,1] to ImageNet normalization"""
        # The buffers are not moved in __init__, so align them with the input's device here.
        mean = self.mean.to(x.device)
        std = self.std.to(x.device)
        return (x - mean) / std

    def forward(self, output, target):
        """
        Calculate perceptual loss between output and target images.

        Args:
            output: Denoised images (batch, 3, 32, 32) in [0, 1]
            target: Clean images (batch, 3, 32, 32) in [0, 1]

        Returns:
            Scalar tensor: MSE between the VGG16 feature maps of the two inputs
        """
        output_normalized = self.normalize(output)
        target_normalized = self.normalize(target)

        output_features = self.feature_extractor(output_normalized)
        target_features = self.feature_extractor(target_normalized)

        perceptual_loss = self.mse_loss(output_features, target_features)

        return perceptual_loss


# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")

# Initialize model
model = UNetDenoisingAutoencoder().to(device)

# Define Loss Function (Perceptual Loss only)
criterion = PerceptualLoss(device=device)

print(" Loss function defined: Perceptual Loss")
print(f"   - Based on VGG16 pre-trained features")
print(f"   - Focuses on visual quality and high-level features")

optimizer = optim.Adam(
    model.parameters(),
    lr=0.001,              # Learning rate
    betas=(0.9, 0.999),    # Beta parameters for Adam
    eps=1e-8,              # Epsilon for numerical stability
    weight_decay=1e-5      # L2 regularization
)

print(" Optimizer defined: Adam")
print(f"   - Learning rate: 0.001")
print(f"   - Weight decay: 1e-5")
print(f"   - Model parameters: {sum(p.numel() for p in model.parameters() if p.requires_grad):,}")

# Halve the learning rate when the monitored loss has not improved for 5 epochs.
# The deprecated `verbose` argument is intentionally not passed; read the
# current value from optimizer.param_groups[0]['lr'] instead.
scheduler = optim.lr_scheduler.ReduceLROnPlateau(
    optimizer,
    mode='min',
    factor=0.5,
    patience=5
)

print("Learning rate scheduler defined: ReduceLROnPlateau")

print("\n" + "="*60)
print("LOSS FUNCTION AND OPTIMIZER SETUP COMPLETE!")
print("="*60)
print("Ready for training loop.")


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import time
import matplotlib.pyplot as plt


print("\n" + "="*70)
print("VERIFYING DATALOADERS")
print("="*70)

# Fail fast with an actionable message if the DataLoader cell has not been run in this kernel.
try:
    print(f" train_loader found: {len(train_loader)} batches")
    print(f" val_loader found: {len(val_loader)} batches")
except NameError:
    raise NameError(
        "train_loader and/or val_loader not found!\n"
        "Please run the DataLoader creation code first."
    )


print("\n" + "="*70)
print("STARTING U-NET DENOISING TRAINING ON CIFAR-10 WITH MULTI-GPU DataParallel")
print("="*70)

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"\n📍 Using device: {device}")

# A fresh model is built here, so this cell always trains from scratch.
model = UNetDenoisingAutoencoder().to(device)

if torch.cuda.device_count() > 1:
    print(f"⚙️ {torch.cuda.device_count()} GPUs detected. Using DataParallel.")
    model = nn.DataParallel(model)

print(f" Model loaded with {sum(p.numel() for p in model.parameters() if p.requires_grad):,} parameters")

criterion = PerceptualLoss(device=device)
print(" Loss function: Perceptual Loss (VGG16-based)")

optimizer = optim.Adam(
    model.parameters(),
    lr=0.001,
    betas=(0.9, 0.999),
    eps=1e-8,
    weight_decay=1e-5
)
print(" Optimizer: Adam (lr=0.001)")

# The deprecated `verbose` argument is not passed; the current learning rate
# is printed in every epoch summary below, so reductions remain visible.
scheduler = optim.lr_scheduler.ReduceLROnPlateau(
    optimizer,
    mode='min',
    factor=0.5,
    patience=5
)
print(" Scheduler: ReduceLROnPlateau (reduces LR if validation loss plateaus)")

num_epochs = 50
print(f"\n Training configuration:")
print(f"   - Number of epochs: {num_epochs}")
print(f"   - Training batches: {len(train_loader)}")
print(f"   - Validation batches: {len(val_loader)}")
print(f"   - Batch size: {train_loader.batch_size}")

# Both checkpoints go to the same directory the test cell loads from, so the
# best model is found regardless of the kernel's current working directory.
CHECKPOINT_DIR = '/kaggle/working'
best_model_path = os.path.join(CHECKPOINT_DIR, 'best_unet_denoising_model.pth')
final_model_path = os.path.join(CHECKPOINT_DIR, 'final_unet_denoising_model.pth')

train_losses = []
val_losses = []
best_val_loss = float('inf')

print("\n" + "="*70)
print("TRAINING STARTED")
print("="*70 + "\n")

start_time = time.time()


for epoch in range(num_epochs):
    epoch_start_time = time.time()

    # ---- Training pass ----
    model.train()  # Set model to training mode
    train_loss = 0.0

    for batch_idx, (noisy_batch, clean_batch, labels) in enumerate(train_loader):
        # Labels are not needed for the reconstruction objective, so only the images are moved.
        noisy_batch = noisy_batch.to(device)
        clean_batch = clean_batch.to(device)

        outputs = model(noisy_batch)

        loss = criterion(outputs, clean_batch)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        train_loss += loss.item()

        if (batch_idx + 1) % 50 == 0:
            print(f"   Epoch [{epoch+1}/{num_epochs}] "
                  f"Batch [{batch_idx+1}/{len(train_loader)}] "
                  f"Loss: {loss.item():.6f}")

    # Mean of the per-batch losses for this epoch.
    avg_train_loss = train_loss / len(train_loader)
    train_losses.append(avg_train_loss)

    # ---- Validation pass ----
    model.eval()
    val_loss = 0.0

    with torch.no_grad():
        for noisy_batch, clean_batch, labels in val_loader:
            noisy_batch = noisy_batch.to(device)
            clean_batch = clean_batch.to(device)

            outputs = model(noisy_batch)

            loss = criterion(outputs, clean_batch)

            val_loss += loss.item()

    avg_val_loss = val_loss / len(val_loader)
    val_losses.append(avg_val_loss)

    # The scheduler monitors the validation loss (mode='min').
    scheduler.step(avg_val_loss)

    epoch_time = time.time() - epoch_start_time

    print(f"\n{'='*70}")
    print(f" EPOCH {epoch+1}/{num_epochs} SUMMARY")
    print(f"{'='*70}")
    print(f"   Training Perceptual Loss:   {avg_train_loss:.6f}")
    print(f"   Validation Perceptual Loss: {avg_val_loss:.6f}")
    print(f"   Epoch Time: {epoch_time:.2f}s")
    print(f"   Current Learning Rate: {optimizer.param_groups[0]['lr']:.6f}")

    # Keep a checkpoint of the epoch with the lowest validation loss seen so far.
    # NOTE: when the model is wrapped in DataParallel the saved keys carry a 'module.' prefix.
    if avg_val_loss < best_val_loss:
        best_val_loss = avg_val_loss
        torch.save({
            'epoch': epoch + 1,
            'model_state_dict': model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            'train_loss': avg_train_loss,
            'val_loss': avg_val_loss,
        }, best_model_path)
        print(f"   ✅ Best model saved! (Val Loss: {best_val_loss:.6f})")

    print(f"{'='*70}\n")


total_time = time.time() - start_time
print("\n" + "="*70)
print(" TRAINING COMPLETE!")
print("="*70)
print(f"   Total Training Time: {total_time/60:.2f} minutes")
print(f"   Best Validation Loss: {best_val_loss:.6f}")
print(f"   Final Training Loss: {train_losses[-1]:.6f}")
print(f"   Final Validation Loss: {val_losses[-1]:.6f}")

# Last-epoch checkpoint, including the full loss history for later plotting.
torch.save({
    'epoch': num_epochs,
    'model_state_dict': model.state_dict(),
    'optimizer_state_dict': optimizer.state_dict(),
    'train_loss': train_losses[-1],
    'val_loss': val_losses[-1],
    'train_loss_history': train_losses,
    'val_loss_history': val_losses,
}, final_model_path)

print("\n Models saved:")
print("   - best_unet_denoising_model.pth (lowest validation loss)")
print("   - final_unet_denoising_model.pth (last epoch)")


# The x-axis is derived from the recorded history so it always matches the number of points.
epochs_recorded = range(1, len(train_losses) + 1)

plt.figure(figsize=(10, 6))
plt.plot(epochs_recorded, train_losses, label='Training Loss', marker='o', markersize=4)
plt.plot(epochs_recorded, val_losses, label='Validation Loss', marker='s', markersize=4)
plt.xlabel('Epoch', fontsize=12)
plt.ylabel('Perceptual Loss', fontsize=12)
plt.title('Training and Validation Loss Over Epochs', fontsize=14, fontweight='bold')
plt.legend(fontsize=10)
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.savefig('/kaggle/working/training_loss_plot.png', dpi=300)
print("\n Training loss plot saved: training_loss_plot.png")
plt.show()

print("\n" + "="*70)
print(" ALL DONE! Ready for testing phase.")
print("="*70)


In [None]:
# Kept as one-element lists so the following preprocessing cell can convert them to arrays.
x_test_batches = []
y_test_batches = []

print("Loading testing batch...")

# The test split is read from a single file.
batch_filename = 'test_batch'
batch_path = os.path.join(DATASET_PATH, batch_filename)

print(f"  Loading {batch_filename}...", end=' ')
data, labels = load_batch(batch_path)

x_test_batches.append(data)
y_test_batches.append(labels)

print(f"✓ Loaded {len(data)} images")

print(f"\n Successfully loaded the test batch!")
print(f"   Total batches: {len(x_test_batches)}")
print(f"   Each batch shape: {x_test_batches[0].shape}")
print(f"   Each batch has {len(y_test_batches[0])} labels")

In [None]:
print("Step 1: Reshaping images...")

# Convert the one-element lists from the loading cell into arrays.
# np.asarray leaves the data untouched if this cell is re-run and they are already arrays.
x_test_batches = np.asarray(x_test_batches)
y_test_batches = np.asarray(y_test_batches)

# Interpret each flat 3072-value row as (3, 32, 32) channel-first, then move to channel-last.
# The ndim guard prevents a second run from reshaping already-reshaped data,
# which would succeed but scramble the pixels.
if x_test_batches.ndim != 4:
    x_test_batches = x_test_batches.reshape(-1, 3, 32, 32)  # (10000, 3, 32, 32)
    x_test_batches = x_test_batches.transpose(0, 2, 3, 1)   # (10000, 32, 32, 3)

print(f"Reshaped to: {x_test_batches.shape}")
print(f"   Format: (num_images, height, width, channels)")
print()


print("Step 2: Normalizing pixel values...")

# The dtype guard prevents dividing by 255 twice when the cell is re-run.
if x_test_batches.dtype != np.float32:
    x_test_batches = x_test_batches.astype('float32') / 255.0

# Collapse the leading list dimension so labels are a flat (10000,) vector.
y_test_batches = y_test_batches.flatten()

print(f" Normalized!")
print(f"   Pixel value range: [{x_test_batches.min():.3f}, {x_test_batches.max():.3f}]")
print(f"   Data type: {x_test_batches.dtype}")
print()

In [None]:
import torch
from torch.utils.data import Dataset

class CIFAR10TestDataset(Dataset):
    """
    Dataset used when testing the denoising autoencoder.

    Serves (clean_image, label) pairs; images are returned in (C, H, W)
    layout and no noise is added here.
    """

    def __init__(self, images, labels):
        """
        Args:
            images: numpy array shape (N, 32, 32, 3), values in [0, 1]
            labels: numpy array shape (N,)
        """
        self.images = torch.from_numpy(images).to(torch.float32)  # (N, 32, 32, 3)
        self.labels = torch.from_numpy(labels).to(torch.int64)    # (N,)

    def __len__(self):
        """Number of images held by the dataset."""
        return len(self.images)

    def __getitem__(self, idx):
        """Return the idx-th image as (3, 32, 32) together with its label."""
        image_hwc = self.images[idx]   # (32, 32, 3)
        target = self.labels[idx]
        # (H, W, C) -> (C, H, W) for PyTorch models
        return image_hwc.permute(2, 0, 1), target

# Wrap the preprocessed test arrays: images (10000, 32, 32, 3), labels (10000,).
test_dataset = CIFAR10TestDataset(x_test_batches, y_test_batches)



In [None]:
# Test data is iterated in its stored order (no shuffling).
test_loader = DataLoader(test_dataset, batch_size=128, shuffle=False,
                         num_workers=2, pin_memory=True)

print(f" DataLoaders created!")
print(f"   Testing batches per epoch: {len(test_loader)}")
print()


In [None]:
import torch
import torch.nn as nn
import numpy as np
import matplotlib.pyplot as plt
from collections import defaultdict
import os

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# Must equal the noise_factor the training/validation datasets were created
# with (0.3); evaluating at a different level measures a different task.
NOISE_STD = 0.3
NUM_CLASSES = 10 # CIFAR-10 classes

model_checkpoint_path = os.path.join('/kaggle/working', 'best_unet_denoising_model.pth')

model = UNetDenoisingAutoencoder().to(device)

# Load the weights into the bare model BEFORE any DataParallel wrapping.
# A checkpoint written from a DataParallel model has every key prefixed with
# 'module.'; stripping it makes loading work whether or not the GPU count
# here matches the one used for training.
checkpoint = torch.load(model_checkpoint_path, map_location=device)
state_dict = {
    (key[len('module.'):] if key.startswith('module.') else key): value
    for key, value in checkpoint['model_state_dict'].items()
}
model.load_state_dict(state_dict)

if torch.cuda.device_count() > 1:
    print(f"Multi-GPU detected: {torch.cuda.device_count()} GPUs -- using DataParallel.")
    model = nn.DataParallel(model)
else:
    print(f"Single GPU or CPU detected -- using standard inference.")

model.eval()  # Set to evaluation mode

perceptual_criterion = PerceptualLoss(device=device)

print("Starting UNet denoising testing...")
# class index -> list of per-image perceptual losses
class_loss_counters = defaultdict(list)

with torch.no_grad():
    for batch_idx, (images, labels) in enumerate(test_loader):
        images = images.to(device)      # [B, 3, 32, 32] in [0, 1]

        # Corrupt the clean test images the same way the training dataset does.
        noisy_images = images + (NOISE_STD * torch.randn_like(images))
        noisy_images = torch.clamp(noisy_images, 0.0, 1.0)

        denoised_images = model(noisy_images)

        # Per-image perceptual loss for the whole batch at once: extract the
        # VGG features in one pass and average the squared difference over
        # each image's own feature map. This is the same quantity as calling
        # the criterion on each image separately, without one VGG forward
        # pass per image.
        pred_features = perceptual_criterion.feature_extractor(
            perceptual_criterion.normalize(denoised_images)
        )
        target_features = perceptual_criterion.feature_extractor(
            perceptual_criterion.normalize(images)
        )
        per_image_loss = ((pred_features - target_features) ** 2).flatten(start_dim=1).mean(dim=1)

        # Labels stay on the CPU; they are only used as dictionary keys.
        for class_idx, loss_val in zip(labels.tolist(), per_image_loss.tolist()):
            class_loss_counters[class_idx].append(loss_val)

        if (batch_idx + 1) % 20 == 0:
            print(f"Tested batch {batch_idx + 1}/{len(test_loader)}")

print("Aggregating and plotting per-class test loss...")

# Average the per-image losses of each class; classes with no samples become NaN.
mean_loss_per_class = []
for class_idx in range(NUM_CLASSES):
    losses = class_loss_counters[class_idx]
    if losses:
        mean_loss = np.mean(losses)
    else:
        mean_loss = float('nan')
    mean_loss_per_class.append(mean_loss)

plt.figure(figsize=(9, 6))
plt.bar(np.arange(NUM_CLASSES), mean_loss_per_class, color='royalblue', edgecolor='k', alpha=0.8)
plt.xticks(np.arange(NUM_CLASSES), [str(i) for i in range(NUM_CLASSES)], fontsize=12)
plt.xlabel('Class label', fontsize=14)
plt.ylabel('Average Perceptual Loss', fontsize=14)
plt.title('UNet Denoising Autoencoder\nTest Perceptual Loss by Class', fontsize=16, fontweight='bold')
plt.grid(axis='y', linestyle=':', alpha=0.4)
plt.tight_layout()
plt.savefig('/kaggle/working/test_perceptual_loss_per_class.png', dpi=300)
print("\n Per-class perceptual loss plot saved: /kaggle/working/test_perceptual_loss_per_class.png")
plt.show()

for class_idx, mean_loss in enumerate(mean_loss_per_class):
    print(f"Class {class_idx}: Mean perceptual loss = {mean_loss:.5f}")

print("\n Testing complete and results saved.")



In [None]:
# Same per-class means as the bar chart, drawn as a connected line with markers.
fig, ax = plt.subplots(figsize=(10, 6))
class_positions = np.arange(NUM_CLASSES)
ax.plot(class_positions, mean_loss_per_class,
        marker='o', markersize=5, color='royalblue', linewidth=2,
        label='Test Perceptual Loss')
ax.set_xlabel('Class Label', fontsize=12)
ax.set_ylabel('Average Perceptual Loss', fontsize=12)
ax.set_title('Test Perceptual Loss by Class (UNet Denoising Autoencoder)', fontsize=16, fontweight='bold')
# One tick per class, labelled with the class index.
ax.set_xticks(class_positions)
ax.set_xticklabels([str(i) for i in range(NUM_CLASSES)], fontsize=12)
ax.grid(True, alpha=0.3)
ax.legend(fontsize=11)
fig.tight_layout()
fig.savefig('/kaggle/working/test_perceptual_loss_per_class_lineplot.png', dpi=300)
print("\n Per-class loss line plot saved: /kaggle/working/test_perceptual_loss_per_class_lineplot.png")
plt.show()
