**Build the noisy dataset by adding synthetic Gaussian noise to the available image dataset**

In [None]:
import os
import cv2
import numpy as np
from google.colab import drive

# Mount Google Drive
# (left disabled here; the paths below live under /content/drive, so the
# drive has to be mounted some other way before this cell runs)
#drive.mount('/content/drive')

# Paths
# input_folder: source images that get read and corrupted with noise.
# output_folder: root under which one sub-folder per noise level is created.
input_folder = '/content/drive/MyDrive/Dataset_noisy/Kodak24'
output_folder = '/content/drive/MyDrive/Dataset_noisy/noisy_kodak24'

# Ensure output folder exists (exist_ok=True makes re-running the cell harmless)
os.makedirs(output_folder, exist_ok=True)

# Function to add Gaussian noise
def add_gaussian_noise(image, std_dev, rng=None):
    """Return a copy of ``image`` corrupted by additive zero-mean Gaussian noise.

    Args:
        image: NumPy array of intensities expected to lie in ``[0, 255]``
            (for example the ``uint8`` array returned by ``cv2.imread``).
        std_dev: Standard deviation of the noise, in intensity levels.
        rng: Optional ``numpy.random.Generator`` used to draw the noise so
            results can be reproduced. When omitted, the global NumPy random
            state is used.

    Returns:
        A ``uint8`` array with the same shape as ``image``.
    """
    if rng is None:
        noise = np.random.normal(0, std_dev, image.shape)
    else:
        noise = rng.normal(0, std_dev, image.shape)

    noisy_image = image.astype(np.float32) + noise.astype(np.float32)
    # Round before casting: a bare uint8 cast truncates toward zero, which
    # shifts every pixel down by ~0.5 levels on average and biases the noise.
    noisy_image = np.rint(np.clip(noisy_image, 0, 255)).astype(np.uint8)
    return noisy_image

# Noise levels (passed to add_gaussian_noise as the noise standard deviation)
noise_levels = [10, 20, 30, 40]

# Extensions treated as images. The leading dot stops names that merely end
# in "png"/"jpg" from matching, and ".jpeg" is included because the training
# dataset classes also accept it.
image_extensions = ('.jpg', '.jpeg', '.png', '.bmp')

# Create one output sub-folder per noise level up front
noise_output_folders = {}
for noise_level in noise_levels:
    noise_output_folder = os.path.join(output_folder, f'noise_{noise_level}')
    os.makedirs(noise_output_folder, exist_ok=True)
    noise_output_folders[noise_level] = noise_output_folder

# Process images: decode every source image once and reuse it for all levels
for filename in sorted(os.listdir(input_folder)):
    if not filename.lower().endswith(image_extensions):
        continue

    file_path = os.path.join(input_folder, filename)

    # Read image (cv2.imread returns None when the file cannot be decoded)
    image = cv2.imread(file_path)
    if image is None:
        print(f"Skipping file {filename} (not an image)")
        continue

    for noise_level in noise_levels:
        # Add Gaussian noise
        noisy_image = add_gaussian_noise(image, noise_level)

        # Save noisy image under the original file name so that noisy/clean
        # pairs can later be matched by name.
        # NOTE(review): for .jpg/.jpeg sources the noisy image is re-encoded
        # lossily, which alters the synthetic noise; consider a lossless format.
        noisy_image_path = os.path.join(noise_output_folders[noise_level], filename)
        if not cv2.imwrite(noisy_image_path, noisy_image):
            print(f"Failed to save noisy image: {noisy_image_path}")
            continue
        print(f"Saved noisy image: {noisy_image_path}")

print("All noisy images have been created and saved.")


# Install the additional libraries needed by the model

In [None]:
pip install torchsummary


In [None]:
pip install pillow


# Code to build the model and print its layer/parameter summary.

In [None]:
import torch
import torch.nn as nn
import torchsummary

# Define CBAM attention module
class CBAM(nn.Module):
    """Channel attention followed by spatial attention.

    The channel branch squeezes the input with global average pooling, passes
    it through a two-layer 1x1-conv bottleneck and rescales the channels with
    the resulting sigmoid gate. The spatial branch builds a 2-channel map from
    the per-pixel channel mean and max and turns it into a sigmoid mask with
    a 7x7 convolution.

    Args:
        channels: Number of channels of the input feature map.
        reduction: Bottleneck reduction factor of the channel branch.
    """

    def __init__(self, channels, reduction=16):
        super().__init__()
        # Integer division yields 0 when channels < reduction, which would
        # create a bottleneck with no channels; keep at least one.
        hidden_channels = max(1, channels // reduction)
        self.channel_attention = nn.Sequential(
            nn.AdaptiveAvgPool2d(1),
            nn.Conv2d(channels, hidden_channels, kernel_size=1, stride=1, padding=0),
            nn.ReLU(),
            nn.Conv2d(hidden_channels, channels, kernel_size=1, stride=1, padding=0),
            nn.Sigmoid()
        )
        self.spatial_attention = nn.Sequential(
            nn.Conv2d(2, 1, kernel_size=7, stride=1, padding=3),
            nn.Sigmoid()
        )

    def forward(self, x):
        """Apply channel then spatial gating; the output has the shape of ``x``."""
        # Channel Attention
        channel_att = self.channel_attention(x)
        x = x * channel_att

        # Spatial Attention: statistics are taken over the channel dimension
        avg_pool = torch.mean(x, dim=1, keepdim=True)
        max_pool, _ = torch.max(x, dim=1, keepdim=True)
        spatial_input = torch.cat([avg_pool, max_pool], dim=1)
        spatial_att = self.spatial_attention(spatial_input)
        x = x * spatial_att

        return x

# Define Attentive Dilated Convolution Block
class AttentiveDilatedConvBlock(nn.Module):
    """Stack of dilated 3x3 conv -> ReLU -> BatchNorm stages followed by CBAM.

    Args:
        in_channels: Channels of the input feature map.
        out_channels: Channels produced by every stage of the block.
        dilation_rates: One dilation rate per conv stage. Any non-empty
            sequence is accepted; the default builds three stages.

    Raises:
        ValueError: If ``dilation_rates`` is empty.
    """

    def __init__(self, in_channels, out_channels, dilation_rates=(1, 2, 3)):
        super().__init__()
        if len(dilation_rates) == 0:
            raise ValueError("dilation_rates must contain at least one rate")

        layers = []
        stage_in = in_channels
        for rate in dilation_rates:
            # padding == dilation keeps the spatial size unchanged for a 3x3 kernel
            layers.append(nn.Conv2d(stage_in, out_channels, kernel_size=3, dilation=rate, padding=rate))
            layers.append(nn.ReLU())
            layers.append(nn.BatchNorm2d(out_channels))
            stage_in = out_channels
        self.dilated_convs = nn.Sequential(*layers)
        self.cbam = CBAM(out_channels)

    def forward(self, x):
        """Run the dilated conv stack, then re-weight the result with CBAM."""
        x = self.dilated_convs(x)
        x = self.cbam(x)
        return x

# Define the full model
class DenoisingModel(nn.Module):
    """Input conv, a chain of attentive dilated blocks, and an output conv.

    Args:
        num_blocks: Number of AttentiveDilatedConvBlock modules in the chain.
        in_channels: Channels of the input image.
        out_channels: Channels of the produced image.
        block_channels: Feature width used inside the chain.
    """

    def __init__(self, num_blocks=15, in_channels=3, out_channels=3, block_channels=64):
        super().__init__()
        # Both boundary convolutions are 3x3 with padding 1 (size preserving).
        edge_conv = dict(kernel_size=3, stride=1, padding=1)
        self.initial_conv = nn.Conv2d(in_channels, block_channels, **edge_conv)

        stages = []
        for _ in range(num_blocks):
            stages.append(AttentiveDilatedConvBlock(block_channels, block_channels))
        self.blocks = nn.Sequential(*stages)

        self.final_conv = nn.Conv2d(block_channels, out_channels, **edge_conv)

    def forward(self, x):
        """Map an image batch through the input conv, the blocks and the output conv."""
        features = self.initial_conv(x)
        features = self.blocks(features)
        return self.final_conv(features)

# Instantiate and summarize the model
# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# Built with the constructor defaults (15 blocks, 3 -> 3 channels, width 64).
model = DenoisingModel().to(device)

# Print model summary
# input_size is (channels, height, width), without a batch dimension.
torchsummary.summary(model, input_size=(3, 128, 128))  # Assuming input image size is 128x128


# code to train the model

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from torchvision import transforms
from PIL import Image
import os

# ========== ADC BLOCK ==========
class ADCBlock(nn.Module):
    """Residual block: conv3x3 -> ReLU -> conv3x3, added to a 1x1-conv shortcut."""

    def __init__(self, in_channels, out_channels):
        super().__init__()
        # 3x3 kernels with padding 1 keep the spatial size unchanged.
        same_size = dict(kernel_size=3, padding=1)
        self.conv1 = nn.Conv2d(in_channels, out_channels, **same_size)
        self.relu = nn.ReLU()
        self.conv2 = nn.Conv2d(out_channels, out_channels, **same_size)
        # The shortcut is always projected so the channel counts line up.
        self.skip_connection = nn.Conv2d(in_channels, out_channels, kernel_size=1)

    def forward(self, x):
        """Return the main-path features plus the projected input."""
        shortcut = self.skip_connection(x)
        features = self.conv2(self.relu(self.conv1(x)))
        return features + shortcut

# ========== MODEL ==========
class DenoisingModel(nn.Module):
    """Chain of ADCBlock modules (3 -> 64 -> ... -> 64) with a 1x1 conv back to 3 channels."""

    def __init__(self, num_blocks):
        super().__init__()
        # The first block lifts the 3 input channels to 64; it is always present.
        stages = [ADCBlock(3, 64)]
        # The remaining num_blocks - 1 blocks keep the width at 64.
        stages.extend(ADCBlock(64, 64) for _ in range(num_blocks - 1))
        self.blocks = nn.Sequential(*stages)
        self.final_conv = nn.Conv2d(64, 3, kernel_size=1)

    def forward(self, x):
        """Run the block chain and project the features to a 3-channel output."""
        features = self.blocks(x)
        return self.final_conv(features)

# ========== DATASET ==========
class ImageDataset(Dataset):
    """Dataset of (noisy, clean) image pairs read from two folders.

    Files are paired by file name: only names present in both folders are
    used, so a missing or extra file in one folder can no longer shift every
    later pair out of alignment. If the two folders share no file name at
    all, the sorted listings are paired by position instead.

    Args:
        noisy_dir: Folder with the noisy images.
        clean_dir: Folder with the clean reference images.
        transform: Callable applied to each PIL image; defaults to
            ``transforms.ToTensor()``.
    """

    # Matched case-insensitively against the file name.
    _IMAGE_EXTENSIONS = ('.png', '.jpg', '.jpeg', '.bmp')

    def __init__(self, noisy_dir, clean_dir, transform=None):
        noisy_names = self._list_images(noisy_dir)
        clean_names = self._list_images(clean_dir)

        shared_names = sorted(set(noisy_names) & set(clean_names))
        if shared_names:
            noisy_names = clean_names = shared_names
        # else: no common names, keep both sorted listings (positional pairing)

        self.noisy_images = [os.path.join(noisy_dir, f) for f in noisy_names]
        self.clean_images = [os.path.join(clean_dir, f) for f in clean_names]
        self.transform = transform if transform else transforms.ToTensor()

    @classmethod
    def _list_images(cls, folder):
        """Return the sorted image file names found directly in ``folder``."""
        return sorted(f for f in os.listdir(folder) if f.lower().endswith(cls._IMAGE_EXTENSIONS))

    def __len__(self):
        return min(len(self.noisy_images), len(self.clean_images))

    def __getitem__(self, idx):
        """Return the transformed (noisy, clean) pair at position ``idx``."""
        noisy_image = Image.open(self.noisy_images[idx]).convert('RGB')
        clean_image = Image.open(self.clean_images[idx]).convert('RGB')
        return self.transform(noisy_image), self.transform(clean_image)

# ========== TRAIN FUNCTION ==========
def train_model(model, train_loader, optimizer, criterion, num_epochs, checkpoint_path, device):
    """Train ``model`` and periodically write checkpoints.

    Args:
        model: Network to optimise; it is switched to train mode every epoch.
        train_loader: Iterable yielding ``(noisy_images, clean_images)`` batches.
        optimizer: Optimizer stepping the model parameters.
        criterion: Loss called as ``criterion(outputs, clean_images)``.
        num_epochs: Number of passes over ``train_loader``.
        checkpoint_path: Directory that receives ``checkpoint_epoch_<n>.pth``
            files (created if missing).
        device: Device the batches are moved to.

    Raises:
        ValueError: If ``train_loader`` yields no batches.
    """
    os.makedirs(checkpoint_path, exist_ok=True)
    print("‚úÖ Starting training...")

    for epoch in range(num_epochs):
        print(f" Starting epoch {epoch+1}")
        model.train()
        running_loss = 0.0
        num_batches = 0

        for batch_idx, (noisy_images, clean_images) in enumerate(train_loader):
            noisy_images = noisy_images.to(device)
            clean_images = clean_images.to(device)

            optimizer.zero_grad()
            outputs = model(noisy_images)
            loss = criterion(outputs, clean_images)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()
            num_batches += 1
            if batch_idx % 5 == 0:
                print(f"   Batch {batch_idx}, Loss: {loss.item():.4f}")

        # Counting batches (instead of len(train_loader)) gives a clear error
        # for an empty loader rather than a ZeroDivisionError.
        if num_batches == 0:
            raise ValueError("train_loader produced no batches; check the dataset paths")

        avg_loss = running_loss / num_batches
        print(f" Epoch [{epoch+1}/{num_epochs}], Avg Loss: {avg_loss:.4f}")

        # Save checkpoint every 10 epochs, and always after the last epoch so
        # the final weights are kept even when num_epochs is not a multiple of 10.
        if (epoch + 1) % 10 == 0 or epoch + 1 == num_epochs:
            checkpoint = {
                'epoch': epoch + 1,
                'model_state_dict': model.state_dict(),
                'optimizer_state_dict': optimizer.state_dict(),
                'loss': avg_loss,
            }
            filename = os.path.join(checkpoint_path, f'checkpoint_epoch_{epoch+1}.pth')
            torch.save(checkpoint, filename)
            print(f"üíæ Checkpoint saved at {filename}")

# ========== MAIN SETUP ==========
# Paths
# noisy_dir / clean_dir are the two folders ImageDataset pairs up;
# checkpoint_path is the directory train_model writes its .pth files to.
noisy_dir = "/content/drive/MyDrive/Dataset_noisy/noisy_bsd68/noise_10"
clean_dir = "/content/drive/MyDrive/Datasets/BSD68"
checkpoint_path = "/content/drive/MyDrive/Dataset_noisy/validation/training1"

# Device: GPU when available, CPU otherwise
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f" Using device: {device}")

# Transform
# Every image is resized to 224x224 and converted to a tensor. The same
# transform is applied to the noisy and the clean image of a pair.
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor()
])

# Dataset and Dataloader
dataset = ImageDataset(noisy_dir, clean_dir, transform=transform)
print(f" Number of training images: {len(dataset)}")
train_loader = DataLoader(dataset, batch_size=16, shuffle=True)

# Model, optimizer, and loss
model = DenoisingModel(num_blocks=15).to(device)
optimizer = optim.Adam(model.parameters(), lr=0.001)
criterion = nn.MSELoss()

# Train (train_model saves a checkpoint every 10 epochs into checkpoint_path)
train_model(model, train_loader, optimizer, criterion, num_epochs=1000, checkpoint_path=checkpoint_path, device=device)


# Continue training from the previously saved checkpoint, carrying the learned weights forward to the next noise level

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from torchvision import transforms
from PIL import Image
import os

# ========== ADC BLOCK ==========
class ADCBlock(nn.Module):
    """Residual block: conv3x3 -> ReLU -> conv3x3, added to a 1x1-conv shortcut."""

    def __init__(self, in_channels, out_channels):
        super().__init__()
        # 3x3 kernels with padding 1 keep the spatial size unchanged.
        same_size = dict(kernel_size=3, padding=1)
        self.conv1 = nn.Conv2d(in_channels, out_channels, **same_size)
        self.relu = nn.ReLU()
        self.conv2 = nn.Conv2d(out_channels, out_channels, **same_size)
        # The shortcut is always projected so the channel counts line up.
        self.skip_connection = nn.Conv2d(in_channels, out_channels, kernel_size=1)

    def forward(self, x):
        """Return the main-path features plus the projected input."""
        shortcut = self.skip_connection(x)
        features = self.conv2(self.relu(self.conv1(x)))
        return features + shortcut

# ========== MODEL ==========
class DenoisingModel(nn.Module):
    """Chain of ADCBlock modules (3 -> 64 -> ... -> 64) with a 1x1 conv back to 3 channels."""

    def __init__(self, num_blocks):
        super().__init__()
        # The first block lifts the 3 input channels to 64; it is always present.
        stages = [ADCBlock(3, 64)]
        # The remaining num_blocks - 1 blocks keep the width at 64.
        stages.extend(ADCBlock(64, 64) for _ in range(num_blocks - 1))
        self.blocks = nn.Sequential(*stages)
        self.final_conv = nn.Conv2d(64, 3, kernel_size=1)

    def forward(self, x):
        """Run the block chain and project the features to a 3-channel output."""
        features = self.blocks(x)
        return self.final_conv(features)

# ========== DATASET ==========
class ImageDataset(Dataset):
    """Dataset of (noisy, clean) image pairs read from two folders.

    Files are paired by file name: only names present in both folders are
    used, so a missing or extra file in one folder can no longer shift every
    later pair out of alignment. If the two folders share no file name at
    all, the sorted listings are paired by position instead.

    Args:
        noisy_dir: Folder with the noisy images.
        clean_dir: Folder with the clean reference images.
        transform: Callable applied to each PIL image; defaults to
            ``transforms.ToTensor()``.
    """

    # Matched case-insensitively against the file name.
    _IMAGE_EXTENSIONS = ('.png', '.jpg', '.jpeg', '.bmp')

    def __init__(self, noisy_dir, clean_dir, transform=None):
        noisy_names = self._list_images(noisy_dir)
        clean_names = self._list_images(clean_dir)

        shared_names = sorted(set(noisy_names) & set(clean_names))
        if shared_names:
            noisy_names = clean_names = shared_names
        # else: no common names, keep both sorted listings (positional pairing)

        self.noisy_images = [os.path.join(noisy_dir, f) for f in noisy_names]
        self.clean_images = [os.path.join(clean_dir, f) for f in clean_names]
        self.transform = transform if transform else transforms.ToTensor()

    @classmethod
    def _list_images(cls, folder):
        """Return the sorted image file names found directly in ``folder``."""
        return sorted(f for f in os.listdir(folder) if f.lower().endswith(cls._IMAGE_EXTENSIONS))

    def __len__(self):
        return min(len(self.noisy_images), len(self.clean_images))

    def __getitem__(self, idx):
        """Return the transformed (noisy, clean) pair at position ``idx``."""
        noisy_image = Image.open(self.noisy_images[idx]).convert('RGB')
        clean_image = Image.open(self.clean_images[idx]).convert('RGB')
        return self.transform(noisy_image), self.transform(clean_image)

# ========== TRAIN FUNCTION ==========
def train_model(model, train_loader, optimizer, criterion, num_epochs, checkpoint_path, device, resume_checkpoint=None):
    """Train ``model``, optionally starting from a saved checkpoint.

    Args:
        model: Network to optimise; it is switched to train mode every epoch.
        train_loader: Iterable yielding ``(noisy_images, clean_images)`` batches.
        optimizer: Optimizer stepping the model parameters.
        criterion: Loss called as ``criterion(outputs, clean_images)``.
        num_epochs: Number of passes over ``train_loader``.
        checkpoint_path: Directory that receives ``checkpoint_epoch_<n>.pth``
            files (created if missing).
        device: Device the batches and the loaded checkpoint are mapped to.
        resume_checkpoint: Optional path of a checkpoint whose model and
            optimizer state are loaded first. The epoch counter still starts
            at 1.

    Raises:
        ValueError: If ``train_loader`` yields no batches.
    """
    if resume_checkpoint and os.path.exists(resume_checkpoint):
        print(f"üîÅ Loading checkpoint from {resume_checkpoint}")
        checkpoint = torch.load(resume_checkpoint, map_location=device)
        model.load_state_dict(checkpoint['model_state_dict'])
        optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
        print("‚úÖ Weights and optimizer state loaded. Starting training fresh from epoch 1.")
    elif resume_checkpoint:
        # A wrong path used to be ignored without a word, so a long run could
        # start from the wrong weights unnoticed. Keep going, but say so.
        print(f"WARNING: resume checkpoint not found at {resume_checkpoint}; "
              "continuing with the model's current weights.")

    os.makedirs(checkpoint_path, exist_ok=True)

    for epoch in range(1, num_epochs + 1):
        print(f"üöÄ Epoch {epoch}/{num_epochs}")
        model.train()
        running_loss = 0.0
        num_batches = 0

        for batch_idx, (noisy_images, clean_images) in enumerate(train_loader):
            noisy_images = noisy_images.to(device)
            clean_images = clean_images.to(device)

            optimizer.zero_grad()
            outputs = model(noisy_images)
            loss = criterion(outputs, clean_images)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()
            num_batches += 1
            if batch_idx % 5 == 0:
                print(f"   Batch {batch_idx}, Loss: {loss.item():.4f}")

        # Counting batches (instead of len(train_loader)) gives a clear error
        # for an empty loader rather than a ZeroDivisionError.
        if num_batches == 0:
            raise ValueError("train_loader produced no batches; check the dataset paths")

        avg_loss = running_loss / num_batches
        print(f"üìâ Avg Loss: {avg_loss:.4f}")

        # Save checkpoint every 10 epochs, and always after the last epoch so
        # the final weights are kept even when num_epochs is not a multiple of 10.
        if epoch % 10 == 0 or epoch == num_epochs:
            save_path = os.path.join(checkpoint_path, f"checkpoint_epoch_{epoch}.pth")
            torch.save({
                'epoch': epoch,
                'model_state_dict': model.state_dict(),
                'optimizer_state_dict': optimizer.state_dict(),
                'loss': avg_loss
            }, save_path)
            print(f"üíæ Checkpoint saved at: {save_path}")

# ========== MAIN SETUP ==========
if __name__ == "__main__":
    # Paths
    noisy_dir = "/content/drive/MyDrive/Dataset_noisy/noisy_bsd68/noise_20"
    clean_dir = "/content/drive/MyDrive/Datasets/BSD68"
    # New checkpoints for this stage are written here.
    checkpoint_path = "/content/drive/MyDrive/Dataset_noisy/validation/training2"
    # Last checkpoint of the previous stage. It is an absolute path, so it is
    # given directly: joining it onto checkpoint_path would just discard
    # checkpoint_path and suggest the file lives inside it.
    resume_checkpoint = "/content/drive/MyDrive/Dataset_noisy/validation/training1/checkpoint_epoch_1000.pth"

    # Device: GPU when available, CPU otherwise
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"Using device: {device}")

    # Data: every image is resized to 224x224 and converted to a tensor
    transform = transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor()
    ])
    dataset = ImageDataset(noisy_dir, clean_dir, transform=transform)
    train_loader = DataLoader(dataset, batch_size=16, shuffle=True)
    print(f"Total training images: {len(dataset)}")

    # Model
    model = DenoisingModel(num_blocks=15).to(device)
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    criterion = nn.MSELoss()

    # Train for a new 1000 epochs (starting from epoch 1, resuming weights)
    train_model(
        model=model,
        train_loader=train_loader,
        optimizer=optimizer,
        criterion=criterion,
        num_epochs=1000,
        checkpoint_path=checkpoint_path,
        device=device,
        resume_checkpoint=resume_checkpoint
    )


The same code is reused to train on each remaining noise level; only the data, checkpoint, and resume paths change.

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from torchvision import transforms
from PIL import Image
import os

# ========== ADC BLOCK ==========
class ADCBlock(nn.Module):
    """Residual block: conv3x3 -> ReLU -> conv3x3, added to a 1x1-conv shortcut."""

    def __init__(self, in_channels, out_channels):
        super().__init__()
        # 3x3 kernels with padding 1 keep the spatial size unchanged.
        same_size = dict(kernel_size=3, padding=1)
        self.conv1 = nn.Conv2d(in_channels, out_channels, **same_size)
        self.relu = nn.ReLU()
        self.conv2 = nn.Conv2d(out_channels, out_channels, **same_size)
        # The shortcut is always projected so the channel counts line up.
        self.skip_connection = nn.Conv2d(in_channels, out_channels, kernel_size=1)

    def forward(self, x):
        """Return the main-path features plus the projected input."""
        shortcut = self.skip_connection(x)
        features = self.conv2(self.relu(self.conv1(x)))
        return features + shortcut

# ========== MODEL ==========
class DenoisingModel(nn.Module):
    """Chain of ADCBlock modules (3 -> 64 -> ... -> 64) with a 1x1 conv back to 3 channels."""

    def __init__(self, num_blocks):
        super().__init__()
        # The first block lifts the 3 input channels to 64; it is always present.
        stages = [ADCBlock(3, 64)]
        # The remaining num_blocks - 1 blocks keep the width at 64.
        stages.extend(ADCBlock(64, 64) for _ in range(num_blocks - 1))
        self.blocks = nn.Sequential(*stages)
        self.final_conv = nn.Conv2d(64, 3, kernel_size=1)

    def forward(self, x):
        """Run the block chain and project the features to a 3-channel output."""
        features = self.blocks(x)
        return self.final_conv(features)

# ========== DATASET ==========
class ImageDataset(Dataset):
    """Dataset of (noisy, clean) image pairs read from two folders.

    Files are paired by file name: only names present in both folders are
    used, so a missing or extra file in one folder can no longer shift every
    later pair out of alignment. If the two folders share no file name at
    all, the sorted listings are paired by position instead.

    Args:
        noisy_dir: Folder with the noisy images.
        clean_dir: Folder with the clean reference images.
        transform: Callable applied to each PIL image; defaults to
            ``transforms.ToTensor()``.
    """

    # Matched case-insensitively against the file name.
    _IMAGE_EXTENSIONS = ('.png', '.jpg', '.jpeg', '.bmp')

    def __init__(self, noisy_dir, clean_dir, transform=None):
        noisy_names = self._list_images(noisy_dir)
        clean_names = self._list_images(clean_dir)

        shared_names = sorted(set(noisy_names) & set(clean_names))
        if shared_names:
            noisy_names = clean_names = shared_names
        # else: no common names, keep both sorted listings (positional pairing)

        self.noisy_images = [os.path.join(noisy_dir, f) for f in noisy_names]
        self.clean_images = [os.path.join(clean_dir, f) for f in clean_names]
        self.transform = transform if transform else transforms.ToTensor()

    @classmethod
    def _list_images(cls, folder):
        """Return the sorted image file names found directly in ``folder``."""
        return sorted(f for f in os.listdir(folder) if f.lower().endswith(cls._IMAGE_EXTENSIONS))

    def __len__(self):
        return min(len(self.noisy_images), len(self.clean_images))

    def __getitem__(self, idx):
        """Return the transformed (noisy, clean) pair at position ``idx``."""
        noisy_image = Image.open(self.noisy_images[idx]).convert('RGB')
        clean_image = Image.open(self.clean_images[idx]).convert('RGB')
        return self.transform(noisy_image), self.transform(clean_image)

# ========== TRAIN FUNCTION ==========
def train_model(model, train_loader, optimizer, criterion, num_epochs, checkpoint_path, device, resume_checkpoint=None):
    """Train ``model``, optionally starting from a saved checkpoint.

    Args:
        model: Network to optimise; it is switched to train mode every epoch.
        train_loader: Iterable yielding ``(noisy_images, clean_images)`` batches.
        optimizer: Optimizer stepping the model parameters.
        criterion: Loss called as ``criterion(outputs, clean_images)``.
        num_epochs: Number of passes over ``train_loader``.
        checkpoint_path: Directory that receives ``checkpoint_epoch_<n>.pth``
            files (created if missing).
        device: Device the batches and the loaded checkpoint are mapped to.
        resume_checkpoint: Optional path of a checkpoint whose model and
            optimizer state are loaded first. The epoch counter still starts
            at 1.

    Raises:
        ValueError: If ``train_loader`` yields no batches.
    """
    if resume_checkpoint and os.path.exists(resume_checkpoint):
        print(f"üîÅ Loading checkpoint from {resume_checkpoint}")
        checkpoint = torch.load(resume_checkpoint, map_location=device)
        model.load_state_dict(checkpoint['model_state_dict'])
        optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
        print("‚úÖ Weights and optimizer state loaded. Starting training fresh from epoch 1.")
    elif resume_checkpoint:
        # A wrong path used to be ignored without a word, so a long run could
        # start from the wrong weights unnoticed. Keep going, but say so.
        print(f"WARNING: resume checkpoint not found at {resume_checkpoint}; "
              "continuing with the model's current weights.")

    os.makedirs(checkpoint_path, exist_ok=True)

    for epoch in range(1, num_epochs + 1):
        print(f"üöÄ Epoch {epoch}/{num_epochs}")
        model.train()
        running_loss = 0.0
        num_batches = 0

        for batch_idx, (noisy_images, clean_images) in enumerate(train_loader):
            noisy_images = noisy_images.to(device)
            clean_images = clean_images.to(device)

            optimizer.zero_grad()
            outputs = model(noisy_images)
            loss = criterion(outputs, clean_images)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()
            num_batches += 1
            if batch_idx % 5 == 0:
                print(f"   Batch {batch_idx}, Loss: {loss.item():.4f}")

        # Counting batches (instead of len(train_loader)) gives a clear error
        # for an empty loader rather than a ZeroDivisionError.
        if num_batches == 0:
            raise ValueError("train_loader produced no batches; check the dataset paths")

        avg_loss = running_loss / num_batches
        print(f"üìâ Avg Loss: {avg_loss:.4f}")

        # Save checkpoint every 10 epochs, and always after the last epoch so
        # the final weights are kept even when num_epochs is not a multiple of 10.
        if epoch % 10 == 0 or epoch == num_epochs:
            save_path = os.path.join(checkpoint_path, f"checkpoint_epoch_{epoch}.pth")
            torch.save({
                'epoch': epoch,
                'model_state_dict': model.state_dict(),
                'optimizer_state_dict': optimizer.state_dict(),
                'loss': avg_loss
            }, save_path)
            print(f"üíæ Checkpoint saved at: {save_path}")

# ========== MAIN SETUP ==========
if __name__ == "__main__":
    # Paths
    noisy_dir = "/content/drive/MyDrive/Dataset_noisy/noisy_bsd68/noise_30"
    clean_dir = "/content/drive/MyDrive/Datasets/BSD68"
    # New checkpoints for this stage are written here.
    checkpoint_path = "/content/drive/MyDrive/Dataset_noisy/validation/training3"
    # Last checkpoint of the previous stage. It is an absolute path, so it is
    # given directly: joining it onto checkpoint_path would just discard
    # checkpoint_path and suggest the file lives inside it.
    resume_checkpoint = "/content/drive/MyDrive/Dataset_noisy/validation/training2/checkpoint_epoch_1000.pth"

    # Device: GPU when available, CPU otherwise
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"Using device: {device}")

    # Data: every image is resized to 224x224 and converted to a tensor
    transform = transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor()
    ])
    dataset = ImageDataset(noisy_dir, clean_dir, transform=transform)
    train_loader = DataLoader(dataset, batch_size=16, shuffle=True)
    print(f"Total training images: {len(dataset)}")

    # Model
    model = DenoisingModel(num_blocks=15).to(device)
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    criterion = nn.MSELoss()

    # Train for a new 1000 epochs (starting from epoch 1, resuming weights)
    train_model(
        model=model,
        train_loader=train_loader,
        optimizer=optimizer,
        criterion=criterion,
        num_epochs=1000,
        checkpoint_path=checkpoint_path,
        device=device,
        resume_checkpoint=resume_checkpoint
    )


# Testing the model

In [None]:
import torch
import os
from torch.utils.data import DataLoader
from torchvision import transforms
from skimage.metrics import structural_similarity as ssim
from skimage.metrics import peak_signal_noise_ratio as psnr
from PIL import Image
import numpy as np

# Load your trained model
def load_model(checkpoint_path, model, device):
    """Restore ``model`` from a checkpoint file and prepare it for inference.

    The checkpoint is expected to be a mapping with a ``'model_state_dict'``
    entry. The model is moved to ``device``, put in eval mode, and returned
    (it is the same object that was passed in).
    """
    saved_state = torch.load(checkpoint_path, map_location=device)
    weights = saved_state['model_state_dict']
    model.load_state_dict(weights)

    model.to(device)
    model.eval()

    print(f"‚úÖ Model loaded from {checkpoint_path}")
    return model

# Dataset for Testing
class TestImageDataset(torch.utils.data.Dataset):
    """Dataset of (noisy, clean) test image pairs read from two folders.

    Files are paired by file name: only names present in both folders are
    used, so a missing or extra file in one folder can neither misalign the
    pairs nor index past the end of the shorter listing. If the two folders
    share no file name at all, the sorted listings are paired by position.

    Args:
        noisy_dir: Folder with the noisy images.
        clean_dir: Folder with the clean reference images.
        transform: Callable applied to each PIL image; defaults to
            ``transforms.ToTensor()``.
    """

    # Matched case-insensitively against the file name.
    _IMAGE_EXTENSIONS = ('.png', '.jpg', '.jpeg', '.bmp')

    def __init__(self, noisy_dir, clean_dir, transform=None):
        noisy_names = self._list_images(noisy_dir)
        clean_names = self._list_images(clean_dir)

        shared_names = sorted(set(noisy_names) & set(clean_names))
        if shared_names:
            noisy_names = clean_names = shared_names
        # else: no common names, keep both sorted listings (positional pairing)

        self.noisy_images = [os.path.join(noisy_dir, f) for f in noisy_names]
        self.clean_images = [os.path.join(clean_dir, f) for f in clean_names]
        self.transform = transform if transform else transforms.ToTensor()

    @classmethod
    def _list_images(cls, folder):
        """Return the sorted image file names found directly in ``folder``."""
        return sorted(f for f in os.listdir(folder) if f.lower().endswith(cls._IMAGE_EXTENSIONS))

    def __len__(self):
        # Bounded by the shorter list so __getitem__ can never run past it.
        return min(len(self.noisy_images), len(self.clean_images))

    def __getitem__(self, idx):
        """Return the transformed (noisy, clean) pair at position ``idx``."""
        noisy = Image.open(self.noisy_images[idx]).convert('RGB')
        clean = Image.open(self.clean_images[idx]).convert('RGB')
        return self.transform(noisy), self.transform(clean)

# Evaluation Function
def evaluate_model(model, test_loader, device):
    """Compute the average PSNR and SSIM of ``model`` over ``test_loader``.

    Args:
        model: Denoising network called on each noisy batch. Its train/eval
            mode is not changed here.
        test_loader: Iterable yielding ``(noisy_imgs, clean_imgs)`` batches.
        device: Device the batches are moved to before inference.

    Returns:
        Tuple ``(avg_psnr, avg_ssim)`` averaged over every image seen.

    Raises:
        ValueError: If ``test_loader`` yields no images.
    """
    psnr_total, ssim_total = 0, 0
    count = 0

    with torch.no_grad():
        for noisy_imgs, clean_imgs in test_loader:
            noisy_imgs = noisy_imgs.to(device)
            clean_imgs = clean_imgs.to(device)

            outputs = model(noisy_imgs)

            for i in range(outputs.size(0)):
                # Channel-first tensors -> channel-last arrays for skimage
                pred = outputs[i].cpu().numpy().transpose(1, 2, 0)
                target = clean_imgs[i].cpu().numpy().transpose(1, 2, 0)

                # Both metrics are computed with data_range=1.0, so clamp to [0, 1]
                pred = np.clip(pred, 0, 1)
                target = np.clip(target, 0, 1)

                psnr_val = psnr(target, pred, data_range=1.0)
                ssim_val = ssim(target, pred, data_range=1.0, channel_axis=2, win_size=7)

                psnr_total += psnr_val
                ssim_total += ssim_val
                count += 1

    # An empty loader used to surface as a bare ZeroDivisionError below.
    if count == 0:
        raise ValueError("test_loader produced no images; check the dataset paths")

    avg_psnr = psnr_total / count
    avg_ssim = ssim_total / count
    print(f"üìà Average PSNR: {avg_psnr:.4f}, Average SSIM: {avg_ssim:.4f}")
    return avg_psnr, avg_ssim

# Example Usage
# NOTE(review): DenoisingModel is neither defined nor imported in this cell;
# it has to be available already (e.g. from a training cell run earlier).
if __name__ == "__main__":
    # Folders with the noisy inputs and their clean references, and the
    # checkpoint file whose weights are evaluated.
    noisy_test_dir = "/content/drive/MyDrive/Dataset_noisy/noisy_classic/noise_20"
    clean_test_dir = "/content/drive/MyDrive/Dataset_noisy/classic5"
    checkpoint_path = "/content/drive/MyDrive/Dataset_noisy/validation/training cbsd4/checkpoint_epoch_1000.pth"

    # GPU when available, CPU otherwise
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Define transforms
    # Images are resized to 224x224, so the metrics are computed on the
    # resized images rather than at the original resolution.
    transform = transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor()
    ])

    # Dataset and DataLoader (one image per batch, in a fixed order)
    test_dataset = TestImageDataset(noisy_test_dir, clean_test_dir, transform=transform)
    test_loader = DataLoader(test_dataset, batch_size=1, shuffle=False)

    # Load model
    model = DenoisingModel(num_blocks=15)
    model = load_model(checkpoint_path, model, device)

    # Evaluate (prints and returns the average PSNR and SSIM)
    evaluate_model(model, test_loader, device)


# Visualization of the model output

In [None]:
import os

import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.nn as nn
from PIL import Image
from skimage.metrics import peak_signal_noise_ratio as psnr
from skimage.metrics import structural_similarity as ssim
from torch.utils.data import DataLoader
from torchvision import transforms

# Define your model architecture here (replace this with your actual model)

class ADCBlock(nn.Module):
    """Residual block: conv3x3 -> ReLU -> conv3x3, added to a 1x1-conv shortcut."""

    def __init__(self, in_channels, out_channels):
        super().__init__()
        # 3x3 kernels with padding 1 keep the spatial size unchanged.
        same_size = dict(kernel_size=3, padding=1)
        self.conv1 = nn.Conv2d(in_channels, out_channels, **same_size)
        self.relu = nn.ReLU()
        self.conv2 = nn.Conv2d(out_channels, out_channels, **same_size)
        # The shortcut is always projected so the channel counts line up.
        self.skip_connection = nn.Conv2d(in_channels, out_channels, kernel_size=1)

    def forward(self, x):
        """Return the main-path features plus the projected input."""
        shortcut = self.skip_connection(x)
        features = self.conv2(self.relu(self.conv1(x)))
        return features + shortcut

# ========== MODEL ==========
class DenoisingModel(nn.Module):
    """Chain of ADCBlock modules (3 -> 64 -> ... -> 64) with a 1x1 conv back to 3 channels."""

    def __init__(self, num_blocks):
        super().__init__()
        # The first block lifts the 3 input channels to 64; it is always present.
        stages = [ADCBlock(3, 64)]
        # The remaining num_blocks - 1 blocks keep the width at 64.
        stages.extend(ADCBlock(64, 64) for _ in range(num_blocks - 1))
        self.blocks = nn.Sequential(*stages)
        self.final_conv = nn.Conv2d(64, 3, kernel_size=1)

    def forward(self, x):
        """Run the block chain and project the features to a 3-channel output."""
        features = self.blocks(x)
        return self.final_conv(features)

# Load trained model
def load_model(checkpoint_path, model, device):
    """Restore ``model`` from a checkpoint file and prepare it for inference.

    The checkpoint is expected to be a mapping with a ``'model_state_dict'``
    entry. The model is moved to ``device``, put in eval mode, and returned
    (it is the same object that was passed in).
    """
    saved_state = torch.load(checkpoint_path, map_location=device)
    weights = saved_state['model_state_dict']
    model.load_state_dict(weights)

    model.to(device)
    model.eval()

    print(f"‚úÖ Model loaded from {checkpoint_path}")
    return model

# Dataset for Testing
class TestImageDataset(torch.utils.data.Dataset):
    """Dataset of (noisy, clean) test image pairs read from two folders.

    Files are paired by file name: only names present in both folders are
    used, so a missing or extra file in one folder can neither misalign the
    pairs nor index past the end of the shorter listing. If the two folders
    share no file name at all, the sorted listings are paired by position.

    Args:
        noisy_dir: Folder with the noisy images.
        clean_dir: Folder with the clean reference images.
        transform: Callable applied to each PIL image; defaults to
            ``transforms.ToTensor()``.
    """

    # Matched case-insensitively against the file name.
    _IMAGE_EXTENSIONS = ('.png', '.jpg', '.jpeg', '.bmp')

    def __init__(self, noisy_dir, clean_dir, transform=None):
        noisy_names = self._list_images(noisy_dir)
        clean_names = self._list_images(clean_dir)

        shared_names = sorted(set(noisy_names) & set(clean_names))
        if shared_names:
            noisy_names = clean_names = shared_names
        # else: no common names, keep both sorted listings (positional pairing)

        self.noisy_images = [os.path.join(noisy_dir, f) for f in noisy_names]
        self.clean_images = [os.path.join(clean_dir, f) for f in clean_names]
        self.transform = transform if transform else transforms.ToTensor()

    @classmethod
    def _list_images(cls, folder):
        """Return the sorted image file names found directly in ``folder``."""
        return sorted(f for f in os.listdir(folder) if f.lower().endswith(cls._IMAGE_EXTENSIONS))

    def __len__(self):
        # Bounded by the shorter list so __getitem__ can never run past it.
        return min(len(self.noisy_images), len(self.clean_images))

    def __getitem__(self, idx):
        """Return the transformed (noisy, clean) pair at position ``idx``."""
        noisy = Image.open(self.noisy_images[idx]).convert('RGB')
        clean = Image.open(self.clean_images[idx]).convert('RGB')
        return self.transform(noisy), self.transform(clean)

# Evaluation Function with image display and saving
def evaluate_model(model, test_loader, device, show_images=True, save_output=False, output_dir="outputs"):
    """Compute average PSNR/SSIM and optionally show or save comparison figures.

    Args:
        model: Denoising network called on each noisy batch. Its train/eval
            mode is not changed here.
        test_loader: Iterable yielding ``(noisy_imgs, clean_imgs)`` batches.
        device: Device the batches are moved to before inference.
        show_images: Display a noisy / denoised / ground-truth figure per image.
        save_output: Save that figure as ``output_<n>.png`` in ``output_dir``.
        output_dir: Folder for saved figures (created when saving).

    Returns:
        Tuple ``(avg_psnr, avg_ssim)`` averaged over every image seen.

    Raises:
        ValueError: If ``test_loader`` yields no images.
    """
    if save_output:
        os.makedirs(output_dir, exist_ok=True)

    psnr_total, ssim_total = 0, 0
    count = 0

    with torch.no_grad():
        for noisy_imgs, clean_imgs in test_loader:
            noisy_imgs = noisy_imgs.to(device)
            clean_imgs = clean_imgs.to(device)

            outputs = model(noisy_imgs)

            for i in range(outputs.size(0)):
                # Channel-first tensors -> channel-last arrays for skimage/matplotlib
                pred = outputs[i].cpu().numpy().transpose(1, 2, 0)
                target = clean_imgs[i].cpu().numpy().transpose(1, 2, 0)
                noisy = noisy_imgs[i].cpu().numpy().transpose(1, 2, 0)

                # Both metrics are computed with data_range=1.0, so clamp to [0, 1]
                pred = np.clip(pred, 0, 1)
                target = np.clip(target, 0, 1)
                noisy = np.clip(noisy, 0, 1)

                psnr_val = psnr(target, pred, data_range=1.0)
                ssim_val = ssim(target, pred, data_range=1.0, channel_axis=2, win_size=7)

                psnr_total += psnr_val
                ssim_total += ssim_val
                count += 1

                # Number images with the running count, not the batch index:
                # with batch_size > 1 the batch index repeats within a batch,
                # so saved figures used to overwrite each other.
                image_number = count
                print(f"Image {image_number}: PSNR = {psnr_val:.4f}, SSIM = {ssim_val:.4f}")

                if show_images or save_output:
                    fig, axs = plt.subplots(1, 3, figsize=(15, 5))
                    axs[0].imshow(noisy)
                    axs[0].set_title("Noisy Input")
                    axs[0].axis("off")
                    axs[1].imshow(pred)
                    axs[1].set_title("Denoised Output")
                    axs[1].axis("off")
                    axs[2].imshow(target)
                    axs[2].set_title("Ground Truth")
                    axs[2].axis("off")

                    plt.suptitle(f"PSNR: {psnr_val:.2f}, SSIM: {ssim_val:.4f}")

                    if save_output:
                        plt.savefig(os.path.join(output_dir, f"output_{image_number}.png"))
                    if show_images:
                        plt.show()
                    # Close the figure so open figures do not pile up over a long run
                    plt.close()

    # An empty loader used to surface as a bare ZeroDivisionError below.
    if count == 0:
        raise ValueError("test_loader produced no images; check the dataset paths")

    avg_psnr = psnr_total / count
    avg_ssim = ssim_total / count
    print(f"\nüìà Average PSNR: {avg_psnr:.4f}, Average SSIM: {avg_ssim:.4f}")
    return avg_psnr, avg_ssim


# Main Entry Point
if __name__ == "__main__":
    # Folders with the noisy inputs and their clean references, and the
    # checkpoint file whose weights are visualised.
    noisy_test_dir = "/content/drive/MyDrive/Dataset_noisy/noisy_kodak24/noise_30"
    clean_test_dir = "/content/drive/MyDrive/Dataset_noisy/Kodak24"
    checkpoint_path = "/content/drive/MyDrive/Dataset_noisy/validation/training cbsd4/checkpoint_epoch_50.pth"

    # GPU when available, CPU otherwise
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Images are resized to 224x224, so figures and metrics refer to the
    # resized images rather than the original resolution.
    transform = transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor()
    ])

    # One image per batch, in a fixed order
    test_dataset = TestImageDataset(noisy_test_dir, clean_test_dir, transform=transform)
    test_loader = DataLoader(test_dataset, batch_size=1, shuffle=False)

    model = DenoisingModel(num_blocks=15)
    model = load_model(checkpoint_path, model, device)

    # Run evaluation with display (figures are shown, nothing is written to disk)
    evaluate_model(model, test_loader, device, show_images=True, save_output=False)
