In [1]:
import os
import shutil
import random
from glob import glob

In [2]:
# Define the base directory where noisy image folders are located
base_dir = 'Data/Images/'


# Iterate through each subfolder in the root directory
for folder_name in os.listdir(base_dir):
    folder_path = os.path.join(base_dir, folder_name)
    
    # Check if it's a directory
    if os.path.isdir(folder_path):
        # Count the number of files in the directory
        file_count = len([f for f in os.listdir(folder_path) if os.path.isfile(os.path.join(folder_path, f))])
        print(f"{folder_name}: {file_count} images")

In [3]:
# Input directory containing noise-type severity folders
input_dir = 'Data/Images/'
output_dir = 'dataset'

In [4]:
# Define the noise types and severity levels
noise_types = [
    "Brightness", "Contrast", "Defocus-blur", "Elastic", "Fog",
    "Frost", "Gaussian-noise", "Impulse-noise", "JPEG-compression", "Motion-Blur",
    "Pixelate", "Rain", "Saturation", "Shot-noise", "Snow", "Spatter",
    "Speckle-noise", "Zoom-Blur"
]

In [79]:
# # the split ratio for train, validation, and test
train_ratio = 0.7
val_ratio = 0.15
test_ratio = 0.15

In [5]:
import os
from glob import glob

# Define the path to your dataset directory
dataset_dir = 'dataset/' 

# Define the splits (train, val, test)
splits = ['train', 'val', 'test']

# Function to count the number of images in each noise type folder
def count_images_in_each_folder():
    # Iterate over each split (train, val, test)
    for split in splits:
        print(f"--- {split.upper()} ---")
        
        # Get the list of noise type folders inside each split directory
        noise_type_folders = os.listdir(os.path.join(dataset_dir, split))
        
        # Iterate over each noise type folder
        for noise_type in noise_type_folders:
            # Get the path to the folder
            folder_path = os.path.join(dataset_dir, split, noise_type)
            
            # Count the number of images in the folder (recursively if there are subfolders)
            images = glob(os.path.join(folder_path, "**", "*.jpg"), recursive=True)  # Assuming images are .jpg files
            
            # Print the number of images for the current noise type folder
            print(f"{noise_type}: {len(images)} images")
        print("\n")

# Run the function to count and display the images in each folder
count_images_in_each_folder()

In [6]:
import torch
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
import torchvision.models as models
import torch.nn as nn
import torch.optim as optim
import time
from PIL import Image
import random

In [7]:
print(torch.cuda.is_available())

In [8]:
# Define transformations
train_transforms = transforms.Compose([
    transforms.Resize((224, 224)),  # Resize images to 224x224
    transforms.RandomHorizontalFlip(),  # Data augmentation: random horizontal flip
    transforms.ToTensor(),  # Convert image to PyTorch tensor
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])  # Normalize with ImageNet values
])

test_val_transforms = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

In [9]:
# Custom Dataset class
class NoiseClassificationDataset(Dataset):
    def __init__(self, root_dir, transform=None):
        """
        Args:
            root_dir (string): Directory with all the images, split into subfolders by class.
            transform (callable, optional): Optional transform to be applied on a sample.
        """
        self.root_dir = root_dir
        self.transform = transform
        self.images = []
        self.labels = []
        self.noise_types = sorted(os.listdir(root_dir))  # Subfolders represent classes
        
        # Load images and labels
        for noise_type_idx, noise_type in enumerate(self.noise_types):
            noise_folder = os.path.join(root_dir, noise_type)
            for img_name in os.listdir(noise_folder):
                img_path = os.path.join(noise_folder, img_name)
                self.images.append(img_path)
                self.labels.append(noise_type_idx)  # Label is index of noise type
    
    def __len__(self):
        return len(self.images)
    
    def __getitem__(self, idx):
        img_path = self.images[idx]
        label = self.labels[idx]
        image = Image.open(img_path).convert('RGB')  # Ensure it's an RGB image
        
        if self.transform:
            image = self.transform(image)
        
        return image, label

In [10]:
# Define dataset directories
train_dir = 'dataset/train/'
val_dir = 'dataset/val/'
test_dir = 'dataset/test/'

In [11]:
# Create datasets
train_dataset = NoiseClassificationDataset(root_dir=train_dir, transform=train_transforms)
val_dataset = NoiseClassificationDataset(root_dir=val_dir, transform=test_val_transforms)
test_dataset = NoiseClassificationDataset(root_dir=test_dir, transform=test_val_transforms)

In [13]:
# Define batch size
batch_size = 30

In [14]:
# Create DataLoaders
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=4)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, num_workers=4)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, num_workers=4)

In [15]:
print(train_loader)

In [16]:
# Print dataset sizes
print(f"Train dataset size: {len(train_dataset)} images")
print(f"Validation dataset size: {len(val_dataset)} images")
print(f"Test dataset size: {len(test_dataset)} images")

In [17]:
# Example: iterate through the DataLoader
for images, labels in train_loader:
    print(f"Batch size: {images.size(0)}, Image shape: {images.size()}, Labels shape: {labels.size()}")
    break

In [18]:
# Set the device to GPU if available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
device

In [19]:
from torchvision.models import ResNet50_Weights

In [20]:
import torch
print(torch.cuda.is_available())
print(torch.cuda.current_device())

In [None]:
# Load a pre-trained ResNet model and modify the final layer for noise classification
num_classes = 18  # Number of noise types
resnet_model = models.resnet50(weights=ResNet50_Weights.DEFAULT)

# Replace the final fully connected layer to match the number of classes (noise types)
resnet_model.fc = nn.Linear(resnet_model.fc.in_features, num_classes)

# Move the model to the GPU (if available)
resnet_model = resnet_model.to(device)
resnet_model

In [None]:
# Define the loss function (CrossEntropyLoss for classification tasks)
criterion = nn.CrossEntropyLoss()

# Define the optimizer (Adam or SGD)
optimizer = optim.Adam(resnet_model.parameters(), lr=0.001)

# Number of epochs
num_epochs = 40

## Resnet50 Training

In [None]:
# Function to train the model
def train_model_resnet(model, train_loader, val_loader, criterion, optimizer, num_epochs, patience=10):
    best_val_loss = float('inf')  # Initialize best validation loss as infinity
    early_stop_counter = 0  # Count epochs with no improvement

    for epoch in range(num_epochs):
        start_time = time.time()

        # --- TRAINING PHASE ---
        model.train()  # Set the model to training mode
        train_loss = 0.0
        correct_train = 0
        total_train = 0
        
        for images, labels in train_loader:
            # Move images and labels to the GPU
            images, labels = images.to(device), labels.to(device)

            # Forward pass: Get model predictions
            outputs = model(images)
            loss = criterion(outputs, labels)

            # Backward pass: Compute gradients and update weights
            optimizer.zero_grad()  # Clear previous gradients
            loss.backward()
            optimizer.step()

            # Accumulate the training loss
            train_loss += loss.item() * images.size(0)

            # Track accuracy
            _, predicted = outputs.max(1)
            correct_train += predicted.eq(labels).sum().item()
            total_train += labels.size(0)

        avg_train_loss = train_loss / total_train
        train_accuracy = 100.0 * correct_train / total_train

        # --- VALIDATION PHASE ---
        model.eval()  # Set the model to evaluation mode
        val_loss = 0.0
        correct_val = 0
        total_val = 0

        # We don't need gradients in the validation phase
        with torch.no_grad():
            for images, labels in val_loader:
                images, labels = images.to(device), labels.to(device)

                # Forward pass: Get model predictions
                outputs = model(images)
                loss = criterion(outputs, labels)

                # Accumulate the validation loss
                val_loss += loss.item() * images.size(0)

                # Track validation accuracy
                _, predicted = outputs.max(1)
                correct_val += predicted.eq(labels).sum().item()
                total_val += labels.size(0)

        avg_val_loss = val_loss / total_val
        val_accuracy = 100.0 * correct_val / total_val

        # --- EARLY STOPPING CHECK ---
        if avg_val_loss < best_val_loss:
            best_val_loss = avg_val_loss
            early_stop_counter = 0  # Reset counter if improvement
            torch.save(model.state_dict(), 'Resnet50_New_TrainingTime.pt')  # Save the best model
            print(f'Validation loss improved. Saving model at epoch {epoch+1}')
        else:
            early_stop_counter += 1  # Increment counter if no improvement

        if early_stop_counter >= patience:
            print(f'Early stopping at epoch {epoch+1}. Best validation loss: {best_val_loss:.4f}')
            break

        # Print the losses and accuracies for this epoch
        print(f'Epoch [{epoch+1}/{num_epochs}]')
        print(f'Train Loss: {avg_train_loss:.4f}, Train Accuracy: {train_accuracy:.2f}%')
        print(f'Val Loss: {avg_val_loss:.4f}, Val Accuracy: {val_accuracy:.2f}%')
        print(f'Time taken: {time.time() - start_time:.2f} seconds\n')

    print("Training Complete!")
    # Load the best model before returning
    # model.load_state_dict(torch.load('Resnet50.pt'))

In [108]:
# Start training the model
train_model_resnet(resnet_model, train_loader, val_loader, criterion, optimizer, num_epochs)

In [None]:
def evaluate_model_on_test(model, test_loader, criterion):
    # Load the best model saved during training
    model.load_state_dict(torch.load('Resnet50_New_TrainingTime.pt'))
    model.to(device)  # Ensure the model is on the same device
    model.eval()  # Set the model to evaluation mode
    
    test_loss = 0.0
    correct_test = 0
    total_test = 0

    # We don't need gradients for evaluation
    with torch.no_grad():
        for images, labels in test_loader:
            images, labels = images.to(device), labels.to(device)
            
            # Forward pass
            outputs = model(images)
            loss = criterion(outputs, labels)
            
            # Accumulate the test loss
            test_loss += loss.item() * images.size(0)

            # Track accuracy
            _, predicted = outputs.max(1)
            correct_test += predicted.eq(labels).sum().item()
            total_test += labels.size(0)

    avg_test_loss = test_loss / total_test
    test_accuracy = 100.0 * correct_test / total_test
    
    print(f'Test Loss: {avg_test_loss:.4f}')
    print(f'Test Accuracy: {test_accuracy:.2f}%')

    return avg_test_loss, test_accuracy

In [110]:
evaluate_model_on_test(resnet_model, test_loader, criterion)

In [112]:
resnet_model.load_state_dict(torch.load('Resnet50_New.pt'))
resnet_model.eval()