In [1]:
from torchvision import transforms
from PIL import Image
import matplotlib.pyplot as plt

import torch
import torch.nn as nn
import torch.nn.functional as F

import os
from pathlib import Path

from torch.utils.data import Dataset, DataLoader
from glob import glob

import cv2
import numpy as np


In [2]:
input_file = Path('input_image')
img_A = Path('Data/img_A') # high qualit images
img_B = Path('Data/img_B') # low qualit images

In [3]:
class NoiseAdditionModel(nn.Module):
    def __init__(self, noise_type="gaussian", mean=0.0, std=0.1):
        """
        Model to add noise to images.
        Args:
            noise_type (str): Type of noise ('gaussian', 'salt_pepper').
            mean (float): Mean for Gaussian noise.
            std (float): Standard deviation for Gaussian noise.
        """
        super(NoiseAdditionModel, self).__init__()
        self.noise_type = noise_type
        self.mean = mean
        self.std = std

    def forward(self, x):
        """
        Adds noise to input images.
        Args:
            x (torch.Tensor): Input image tensor of shape (B, C, H, W).
        Returns:
            torch.Tensor: Noisy images of shape (B, C, H, W).
        """
        if self.noise_type == "gaussian":
            noise = torch.randn_like(x) * self.std + self.mean
            return torch.clamp(x + noise, 0, 1)  # Ensure pixel values are in [0, 1]

        elif self.noise_type == "salt_pepper":
            # Generate salt and pepper noise
            prob = 0.05  # Noise probability
            salt = torch.rand_like(x) < (prob / 2)
            pepper = torch.rand_like(x) < (prob / 2)
            noisy_x = x.clone()
            noisy_x[salt] = 1.0
            noisy_x[pepper] = 0.0
            return noisy_x

        else:
            raise ValueError(f"Unsupported noise type: {self.noise_type}")





In [4]:


# Initialize the noise addition model
noise_model = NoiseAdditionModel(noise_type="gaussian", mean=0.0, std=0.1)

# Load an image and preprocess it
transform = transforms.Compose([
    transforms.Resize((256, 256)),
    transforms.ToTensor()
])

for _, _, filenames in os.walk(input_file):
    for img in filenames:

        image = Image.open(f'input_image/{img}').convert("RGB")
        image_tensor = transform(image).unsqueeze(0)  # Add batch dimension

        # Add noise to the image
        noise_model.eval()
        with torch.no_grad():
            noisy_image = noise_model(image_tensor)

        # Convert the noisy image to a displayable format
        to_pil = transforms.ToPILImage()
        original_image = to_pil(image_tensor.squeeze(0))
        noisy_image_display = to_pil(noisy_image.squeeze(0))

        # save the qulait image in the img_A and save low qualit in img_B
        original_image.save(f'img_A/{img}')
        noisy_image_display.save(f'img_B/{img}')

In [5]:
# Create the model
class DEGAN_Generator(nn.Module):
    def __init__(self, input_channels=3, output_channels=3, biggest_layer=512):
        super(DEGAN_Generator, self).__init__()

        # Downsampling layers
        self.down1 = self.conv_block(input_channels, 64)
        self.down2 = self.conv_block(64, 128)
        self.down3 = self.conv_block(128, 256)
        self.down4 = self.conv_block(256, biggest_layer // 2)
        self.down5 = self.conv_block(biggest_layer // 2, biggest_layer)

        # Upsampling layers
        self.up4 = self.upsample_block(biggest_layer, biggest_layer // 2)
        self.up3 = self.upsample_block(biggest_layer // 2, 256)
        self.up2 = self.upsample_block(256, 128)
        self.up1 = self.upsample_block(128, 64)

        # Final output layer
        self.final_conv = nn.Conv2d(64, output_channels, kernel_size=1, stride=1, padding=0)

    def conv_block(self, in_channels, out_channels, kernel_size=3, stride=1, padding=1):
        return nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size, stride, padding),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True)
        )

    def upsample_block(self, in_channels, out_channels):
        return nn.Sequential(
            nn.ConvTranspose2d(in_channels, out_channels, kernel_size=2, stride=2),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True)
        )

    def forward(self, x):
        # Downsampling
        d1 = self.down1(x)
        d2 = self.down2(F.max_pool2d(d1, 2))
        d3 = self.down3(F.max_pool2d(d2, 2))
        d4 = self.down4(F.max_pool2d(d3, 2))
        d5 = self.down5(F.max_pool2d(d4, 2))

        # Upsampling
        u4 = self.up4(d5) + d4
        u3 = self.up3(u4) + d3
        u2 = self.up2(u3) + d2
        u1 = self.up1(u2) + d1

        # Final output
        return torch.sigmoid(self.final_conv(u1))

In [6]:
class DEGAN_Discriminator(nn.Module):
    def __init__(self, input_channels=3):
        super(DEGAN_Discriminator, self).__init__()

        self.model = nn.Sequential(
            self.disc_block(input_channels * 2, 64, normalization=False),
            self.disc_block(64, 128),
            self.disc_block(128, 256),
            self.disc_block(256, 512),
            nn.Conv2d(512, 1, kernel_size=4, stride=1, padding=1)
        )

    def disc_block(self, in_channels, out_channels, kernel_size=4, stride=2, padding=1, normalization=True):
        layers = [nn.Conv2d(in_channels, out_channels, kernel_size, stride, padding)]
        if normalization:
            layers.append(nn.BatchNorm2d(out_channels))
        layers.append(nn.LeakyReLU(0.2, inplace=True))
        return nn.Sequential(*layers)

    def forward(self, img_input, img_target):
        # Concatenate degraded and enhanced images along the channel dimension
        combined_imgs = torch.cat((img_input, img_target), dim=1)
        return self.model(combined_imgs)

In [7]:
def generator_loss(disc_output, gen_output, target, lambda_l1=100):
    adversarial_loss = F.binary_cross_entropy_with_logits(disc_output, torch.ones_like(disc_output))
    l1_loss = F.l1_loss(gen_output, target)
    return adversarial_loss + lambda_l1 * l1_loss

def discriminator_loss(real_output, fake_output):
    real_loss = F.binary_cross_entropy_with_logits(real_output, torch.ones_like(real_output))
    fake_loss = F.binary_cross_entropy_with_logits(fake_output, torch.zeros_like(fake_output))
    return (real_loss + fake_loss) / 2

In [8]:
import torch.optim as optim

# Initialize models
generator = DEGAN_Generator()
discriminator = DEGAN_Discriminator()

# Move models to GPU if available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
generator = generator.to(device)
discriminator = discriminator.to(device)

# Optimizers
g_optimizer = optim.Adam(generator.parameters(), lr=1e-4, betas=(0.5, 0.999))
d_optimizer = optim.Adam(discriminator.parameters(), lr=1e-4, betas=(0.5, 0.999))

In [9]:
# Create the data loader
# ------------------------
# Paired Image Dataset
# ------------------------
class PairedImageDataset(Dataset):
    def __init__(self, high_quality_dir, low_quality_dir, transform=None):
        """
        Dataset for loading paired high-quality and low-quality images.

        Args:
            high_quality_dir (str): Path to the directory of high-quality images.
            low_quality_dir (str): Path to the directory of low-quality images.
            transform (callable, optional): A function/transform to apply to both images.
        """
        self.high_quality_files = sorted(glob(os.path.join(high_quality_dir, '*')))
        self.low_quality_files = sorted(glob(os.path.join(low_quality_dir, '*')))
        self.transform = transform

        if len(self.high_quality_files) != len(self.low_quality_files):
            raise ValueError("Number of high-quality and low-quality images must be the same.")

        # Ensure files match by filenames (if necessary)
        self.high_quality_files = [
            f for f in self.high_quality_files
            if os.path.basename(f) in {os.path.basename(lp) for lp in self.low_quality_files}
        ]
        self.low_quality_files = [
            f for f in self.low_quality_files
            if os.path.basename(f) in {os.path.basename(hp) for hp in self.high_quality_files}
        ]

    def __len__(self):
        return len(self.high_quality_files)

    def __getitem__(self, idx):
        high_quality_path = self.high_quality_files[idx]
        low_quality_path = self.low_quality_files[idx]

        high_quality_image = Image.open(high_quality_path).convert("RGB")
        low_quality_image = Image.open(low_quality_path).convert("RGB")

        if self.transform:
            high_quality_image = self.transform(high_quality_image)
            low_quality_image = self.transform(low_quality_image)

        return low_quality_image, high_quality_image

In [1]:
high_quality_dir = 'Data/img_A'
low_quality_dir = 'Data/img_B'
# Load dataset
dataset = PairedImageDataset(high_quality_dir, low_quality_dir, transform=transform)
dataloader = DataLoader(dataset, batch_size=8, shuffle=True)

NameError: name 'PairedImageDataset' is not defined

In [None]:
# Training loop
epochs = 1
for epoch in range(epochs):
    for degraded, target in dataloader:  # Replace `train_dataloader` with your data loader
        degraded, target = degraded.to(device), target.to(device)

        # Train discriminator
        d_optimizer.zero_grad()
        real_output = discriminator(degraded, target)
        fake_output = discriminator(degraded, generator(degraded).detach())
        d_loss = discriminator_loss(real_output, fake_output)
        d_loss.backward()
        d_optimizer.step()

        # Train generator
        g_optimizer.zero_grad()
        fake_images = generator(degraded)
        fake_output = discriminator(degraded, fake_images)
        g_loss = generator_loss(fake_output, fake_images, target)
        g_loss.backward()
        g_optimizer.step()

    print(f"Epoch [{epoch+1}/{epochs}], Generator Loss: {g_loss.item()}, Discriminator Loss: {d_loss.item()}")

Epoch [1/1], Generator Loss: 12.780847549438477, Discriminator Loss: 0.35651612281799316


In [None]:
# Save the Generator model
torch.save(generator.state_dict(), "noise_generator.pth")