In [1]:
import os
import glob
import cv2
import numpy as np
from sklearn.model_selection import train_test_split

from google.colab import drive
import os

# Step 1: Mount Google Drive
drive.mount('/content/drive')

# Step 2: Navigate to the current directory containing the notebook
# Assuming the notebook is in '/content/drive/My Drive/Colab Notebooks'
notebook_directory = '/content/drive/My Drive/Colab Notebooks'  # Adjust this path as needed

# Change to the notebook directory
os.chdir(notebook_directory)

# Verify the current directory
print("Current directory:", os.getcwd())

# Step 3: Define the path to the 'test' directory and navigate to it
test_directory = 'test'
os.chdir(test_directory)

# Verify the current directory
print("Current directory:", os.getcwd())

# Step 4: Define the paths to the high and low quality directories
high_quality_dir = 'high'
low_quality_dir = 'low'

# Verify the directories exist
if os.path.exists(high_quality_dir) and os.path.exists(low_quality_dir):
    print(f"'high' directory is located at: {os.path.abspath(high_quality_dir)}")
    print(f"'low' directory is located at: {os.path.abspath(low_quality_dir)}")
else:
    print("One or both of the directories do not exist. Please check the paths.")



Mounted at /content/drive
Current directory: /content/drive/My Drive/Colab Notebooks
Current directory: /content/drive/My Drive/Colab Notebooks/test
'high' directory is located at: /content/drive/My Drive/Colab Notebooks/test/high
'low' directory is located at: /content/drive/My Drive/Colab Notebooks/test/low


In [2]:
# Load image file paths
high_quality_images = sorted(glob.glob(os.path.join(high_quality_dir, '*.png')))
low_quality_images = sorted(glob.glob(os.path.join(low_quality_dir, '*.png')))

# Ensure both directories contain the same number of images
assert len(high_quality_images) == len(low_quality_images), "The number of images in both directories must be the same."

# Print the number of images found
print(f"Found {len(high_quality_images)} high-quality images.")
print(f"Found {len(low_quality_images)} low-quality images.")

# Function to load and pair images
def load_image_pairs(high_quality_images, low_quality_images):
    pairs = []
    for hq_img_path, lq_img_path in zip(high_quality_images, low_quality_images):
        hq_img = cv2.imread(hq_img_path, cv2.IMREAD_COLOR)
        lq_img = cv2.imread(lq_img_path, cv2.IMREAD_COLOR)
        if hq_img is None or lq_img is None:
            print(f"Error reading images: {hq_img_path}, {lq_img_path}")
        else:
            pairs.append((hq_img, lq_img))
    return pairs

# Load image pairs
image_pairs = load_image_pairs(high_quality_images, low_quality_images)

# Check if image pairs are loaded correctly
if len(image_pairs) == 0:
    print("No image pairs were loaded. Check your file paths and image formats.")
else:
    print(f"Successfully loaded {len(image_pairs)} image pairs.")


Found 485 high-quality images.
Found 485 low-quality images.
Successfully loaded 485 image pairs.


In [3]:
# Function to normalize images
def normalize_image(image):
    return image.astype(np.float32) / 255.0

# Normalize all image pairs
normalized_pairs = [(normalize_image(hq), normalize_image(lq)) for hq, lq in image_pairs]

# Check normalization
print(f"First normalized pair shapes: {normalized_pairs[0][0].shape}, {normalized_pairs[0][1].shape}")

# Extract high-quality and low-quality images from pairs
high_quality_images, low_quality_images = zip(*normalized_pairs)

# Split the dataset
hq_train, hq_test, lq_train, lq_test = train_test_split(
    high_quality_images, low_quality_images, test_size=0.2, random_state=42)

hq_train, hq_val, lq_train, lq_val = train_test_split(
    hq_train, lq_train, test_size=0.25, random_state=42)  # 0.25 * 0.8 = 0.2

# Convert lists to numpy arrays for easier processing later
hq_train, hq_val, hq_test = np.array(hq_train), np.array(hq_val), np.array(hq_test)
lq_train, lq_val, lq_test = np.array(lq_train), np.array(lq_val), np.array(lq_test)

print("Training set size:", len(hq_train))
print("Validation set size:", len(hq_val))
print("Test set size:", len(hq_test))


First normalized pair shapes: (400, 600, 3), (400, 600, 3)
Training set size: 291
Validation set size: 97
Test set size: 97


In [4]:
import torch
from torch.utils.data import Dataset, DataLoader

# Custom dataset class
class ImagePairDataset(Dataset):
    def __init__(self, hq_images, lq_images):
        self.hq_images = hq_images
        self.lq_images = lq_images

    def __len__(self):
        return len(self.hq_images)

    def __getitem__(self, idx):
        hq_img = self.hq_images[idx]
        lq_img = self.lq_images[idx]
        hq_img = torch.from_numpy(hq_img.transpose((2, 0, 1)))
        lq_img = torch.from_numpy(lq_img.transpose((2, 0, 1)))
        return hq_img, lq_img

# Create data loaders
batch_size = 16
train_dataset = ImagePairDataset(hq_train, lq_train)
val_dataset = ImagePairDataset(hq_val, lq_val)
test_dataset = ImagePairDataset(hq_test, lq_test)

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)


In [5]:
import torch
import torch.nn as nn
import torch.optim as optim

# Define your denoising model (example architecture)
class DenoisingCNN(nn.Module):
    def __init__(self):
        super(DenoisingCNN, self).__init__()
        self.conv1 = nn.Conv2d(3, 64, kernel_size=3, padding=1)
        self.conv2 = nn.Conv2d(64, 64, kernel_size=3, padding=1)
        self.conv3 = nn.Conv2d(64, 3, kernel_size=3, padding=1)
        self.relu = nn.ReLU()

    def forward(self, x):
        x = self.relu(self.conv1(x))
        x = self.relu(self.conv2(x))
        x = self.conv3(x)
        return x


In [6]:
def initialize_model():
    model = DenoisingCNN()
    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    return model, criterion, optimizer


In [7]:
from torch.utils.data import Dataset, DataLoader

class ImagePairDataset(Dataset):
    def __init__(self, hq_images, lq_images):
        self.hq_images = hq_images
        self.lq_images = lq_images

    def __len__(self):
        return len(self.hq_images)

    def __getitem__(self, idx):
        hq_img = self.hq_images[idx]
        lq_img = self.lq_images[idx]
        hq_img = torch.from_numpy(hq_img.transpose((2, 0, 1))).float()
        lq_img = torch.from_numpy(lq_img.transpose((2, 0, 1))).float()
        return hq_img, lq_img

def create_dataloaders(hq_train, lq_train, hq_val, lq_val, batch_size=16):
    train_dataset = ImagePairDataset(hq_train, lq_train)
    val_dataset = ImagePairDataset(hq_val, lq_val)
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
    return train_loader, val_loader


In [12]:
def train_model(model, train_loader, val_loader, criterion, optimizer, num_epochs=3, device=torch.device('cuda' if torch.cuda.is_available() else 'cpu')):
    model.to(device)
    for epoch in range(num_epochs):
        model.train()
        train_loss = 0.0
        for inputs, targets in train_loader:
            print('It is working')
            inputs, targets = inputs.to(device), targets.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, targets)
            loss.backward()
            optimizer.step()
            train_loss += loss.item() * inputs.size(0)

        train_loss /= len(train_loader.dataset)

        model.eval()
        val_loss = 0.0
        with torch.no_grad():
            for inputs_val, targets_val in val_loader:
                inputs_val, targets_val = inputs_val.to(device), targets_val.to(device)
                outputs_val = model(inputs_val)
                loss = criterion(outputs_val, targets_val)
                val_loss += loss.item() * inputs_val.size(0)

        val_loss /= len(val_loader.dataset)

        print(f'Epoch [{epoch+1}/{num_epochs}], Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}')
    return model


In [18]:
# Example usage
# Assuming hq_train, lq_train, hq_val, and lq_val are defined numpy arrays of your images
batch_size = 16
train_loader, val_loader = create_dataloaders(hq_train, lq_train, hq_val, lq_val, batch_size=batch_size)

# Initialize model, criterion, and optimizer
model, criterion, optimizer = initialize_model()

# Train the model
trained_model = train_model(model, train_loader, val_loader, criterion, optimizer, num_epochs=2)

# Save the trained model
torch.save(trained_model.state_dict(), '/content/denoising_model.pth')


It is working
It is working
It is working
It is working
It is working
It is working
It is working
It is working
It is working
It is working
It is working
It is working
It is working
It is working
It is working
It is working
It is working
It is working
It is working
Epoch [1/2], Train Loss: 0.0039, Val Loss: 0.0026
It is working
It is working
It is working
It is working
It is working
It is working
It is working
It is working
It is working
It is working
It is working
It is working
It is working
It is working
It is working
It is working
It is working
It is working
It is working
Epoch [2/2], Train Loss: 0.0035, Val Loss: 0.0026


# New Section

In [19]:
# Load the saved model
model = DenoisingCNN()
model.load_state_dict(torch.load('denoising_model.pth'))
model.eval()

# Evaluate on test set
test_loss = 0.0
with torch.no_grad():
    for inputs_test, targets_test in test_loader:
        outputs_test = model(inputs_test.float())
        loss = criterion(outputs_test, targets_test.float())
        test_loss += loss.item() * inputs_test.size(0)

test_loss /= len(test_loader.dataset)
print(f'Test Loss: {test_loss:.4f}')


Test Loss: 0.0035


In [28]:
from skimage.metrics import peak_signal_noise_ratio, structural_similarity

def calculate_metrics(denoised, original):
    psnr_value = peak_signal_noise_ratio(original, denoised)
    # Adjust win_size to be smaller than the image size
    ssim_value, _ = structural_similarity(original, denoised, win_size=5, full=True, multichannel=True)
    return psnr_value, ssim_value

psnr_values = []
ssim_values = []

with torch.no_grad():
    for idx, (lq_tensor, original_img) in enumerate(test_loader):
        denoised_tensor = model(lq_tensor.float())
        denoised_img = denoised_tensor.squeeze().cpu().numpy()
        original_img = original_img.squeeze().cpu().numpy()
        #psnr_value, ssim_value = calculate_metrics(denoised_img, original_img)
        #psnr_values.append(psnr_value)
        #ssim_values.append(ssim_value)
        #print(f"Image {idx + 1}/{len(test_loader)} - PSNR: {psnr_value:.2f} dB, SSIM: {ssim_value:.4f}")

# Calculate average PSNR and SSIM over all test images
#avg_psnr = np.mean(psnr_values)
#avg_ssim = np.mean(ssim_values)
#print(f"\nAverage PSNR on test set: {avg_psnr:.2f} dB")
#print(f"Average SSIM on test set: {avg_ssim:.4f}")

In [31]:
import skimage.transform

In [32]:
# Resize the images to be larger than the win_size
denoised_img = skimage.transform.resize(denoised_img, (500, 700))
original_img = skimage.transform.resize(original_img, (500, 700))

In [34]:
# Adjust the win_size to be smaller than the size of the images
psnr_value, ssim_value = calculate_metrics(denoised_img, original_img)

  ssim_value, _ = structural_similarity(original, denoised, win_size=5, full=True, multichannel=True)


In [35]:
psnr_value

26.158963424072354