## Diffusion Model

## Load the dataset

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, transforms
import numpy as np
import matplotlib.pyplot as plt
import cv2

from google.colab import drive
drive.mount('/content/drive')

import os
from PIL import Image
import torch
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as transforms

class CustomImageDataset(Dataset):
    def __init__(self, img_dir, transform=None, target_width=800, target_height=None):
        self.img_dir = img_dir
        self.img_paths = [os.path.join(img_dir, img_name) for img_name in os.listdir(img_dir)]
        self.transform = transform
        self.target_width = target_width
        self.target_height = target_height

    def __len__(self):
        return len(self.img_paths)

    def __getitem__(self, idx):
        img_path = self.img_paths[idx]
        image = Image.open(img_path).convert("RGB")

        aspect_ratio = image.height / image.width
        new_height = int(self.target_width * aspect_ratio)

        # Apply the resizing transform with the calculated new size
        transform = transforms.Compose([
            transforms.Resize((new_height, self.target_width), interpolation=Image.LANCZOS),
            transforms.ToTensor()
        ])

        image = transform(image)
        if self.target_height:
            padding = (0, 0, 0, self.target_height - new_height)  # left, top, right, bottom
            image = F.pad(image, padding, "constant", 0)

        return image

target_width = 1024
target_height = 1024
img_dir = '/content/drive/MyDrive/Input'

dataset = CustomImageDataset(img_dir, target_width=target_width, target_height=target_height)
dataloader = DataLoader(dataset, batch_size=8, shuffle=True)




Mounted at /content/drive


In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from PIL import Image
import os
import torchvision.transforms
import torch.quantization as quantization

# Define the Block, Encoder, Decoder, and UNet classes as before

class Block(nn.Module):
    """(convolution => [BN] => ReLU) * 2"""

    def __init__(self, in_channels, out_channels):
        super().__init__()

        self.double_conv = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(),
            nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1),
            nn.BatchNorm2d(out_channels),
            nn.ReLU()
        )

    def forward(self, x):
        return self.double_conv(x)

class Encoder(nn.Module):
    def __init__(self, chs=(3,64,128,256,512,1024)):
        super().__init__()
        self.enc_blocks = nn.ModuleList([Block(chs[i], chs[i+1]) for i in range(len(chs)-1)])
        self.pool       = nn.MaxPool2d(2)

    def forward(self, x):
        ftrs = []
        for block in self.enc_blocks:
            x = block(x)
            ftrs.append(x)
            x = self.pool(x)
        return ftrs

class Decoder(nn.Module):
    def __init__(self, chs=(1024, 512, 256, 128, 64)):
        super().__init__()
        self.chs         = chs
        self.upconvs    = nn.ModuleList([nn.ConvTranspose2d(chs[i], chs[i+1], 2, 2) for i in range(len(chs)-1)])
        self.dec_blocks = nn.ModuleList([Block(chs[i], chs[i+1]) for i in range(len(chs)-1)])

    def forward(self, x, encoder_features):
        for i in range(len(self.chs)-1):
            x        = self.upconvs[i](x)
            enc_ftrs = self.crop(encoder_features[i], x)
            x        = torch.cat([x, enc_ftrs], dim=1)
            x        = self.dec_blocks[i](x)
        return x

    def crop(self, enc_ftrs, x):
        _, _, H, W = x.shape
        enc_ftrs   = torchvision.transforms.CenterCrop([H, W])(enc_ftrs)
        return enc_ftrs

class UNet(nn.Module):
    def __init__(self, enc_chs=(3,64,128,256,512,1024), dec_chs=(1024, 512, 256, 128, 64), num_class=3, retain_dim=False, out_sz=(572,572)):
        super().__init__()
        self.encoder     = Encoder(enc_chs)
        self.decoder     = Decoder(dec_chs)
        self.head        = nn.Conv2d(dec_chs[-1], num_class, 1)
        self.retain_dim  = retain_dim
        self.out_sz      = out_sz

    def forward(self, x):
        enc_ftrs = self.encoder(x)
        out      = self.decoder(enc_ftrs[::-1][0], enc_ftrs[::-1][1:])
        out      = self.head(out)
        if self.retain_dim:
            out = F.interpolate(out, self.out_sz)
        return out

class ComplexDiffusionModel(nn.Module):
    def __init__(self, timesteps=10, beta_start=0.0001, beta_end=0.0005):
        super(ComplexDiffusionModel, self).__init__()
        self.timesteps = timesteps

        # Define the variance schedule (beta)
        self.betas = torch.linspace(beta_start, beta_end, timesteps).to(device)

        # Precompute alpha and alpha_cumprod
        self.alphas = (1.0 - self.betas).to(device)
        self.alpha_cumprod = torch.cumprod(self.alphas, dim=0).to(device)

        # Define the UNet model
        self.unet = UNet().to(device)

    def forward(self, x, t):
        return self.unet(x)

    def forward_process(self, x0, t):
        alpha_t = self.alpha_cumprod[t].view(-1, 1, 1, 1)
        std_dev = torch.sqrt(1.0 - alpha_t)
        noise = torch.randn_like(x0).to(device) * 0.5  # Scaled down noise
        x_t = alpha_t.sqrt() * x0 + std_dev * noise
        return x_t, noise

    def reverse_process(self, xt, t):
        predicted_noise = self.forward(xt, t)
        alpha_t = self.alpha_cumprod[t].view(-1, 1, 1, 1)
        alpha_prev = self.alpha_cumprod[t - 1].view(-1, 1, 1, 1)

        xt_predicted = (xt - torch.sqrt(1.0 - alpha_t) * predicted_noise) / torch.sqrt(alpha_t)
        x_prev = torch.sqrt(alpha_prev) * xt_predicted + torch.sqrt(1 - alpha_prev) * predicted_noise

        return x_prev

# Helper function to convert tensor to PIL image
def tensor_to_pil_image(tensor):
    tensor = (tensor - tensor.min()) / (tensor.max() - tensor.min())  # Normalize to [0, 1]
    tensor = tensor.mul(255).byte()
    tensor = tensor.permute(1, 2, 0).cpu().numpy()
    return Image.fromarray(tensor)

# Training function
def train(model, dataloader, epochs):
    model.train()
    for epoch in range(epochs):
        epoch_loss = 0
        for batch_idx, x0 in enumerate(dataloader):
            x0 = x0.to(device)
            optimizer.zero_grad()
            t = torch.randint(0, model.timesteps, (x0.size(0),), device=x0.device)
            xt, noise = model.forward_process(x0, t)
            predicted_noise = model.reverse_process(xt, t)
            loss = loss_fn(noise, predicted_noise)
            loss.backward()
            optimizer.step()

            # Accumulate the loss for this epoch
            epoch_loss += loss.item()

            # Save the original and enhanced images
            if (epoch + 1) % 1 == 0 and batch_idx == 0:
                with torch.no_grad():
                    model.eval()
                    x0_sample = tensor_to_pil_image(x0[0])
                    xt_sample = tensor_to_pil_image(xt[0])

                    original_image_path = os.path.join(enhance_folder, f'epoch_{epoch}_batch_{batch_idx}_original.png')
                    enhanced_image_path = os.path.join(enhance_folder, f'epoch_{epoch}_batch_{batch_idx}_enhanced.png')

                    # Ensure the directory exists
                    os.makedirs(enhance_folder, exist_ok=True)

                    x0_sample.save(original_image_path)
                    xt_sample.save(enhanced_image_path)

                    print(f'Saved original image to {original_image_path}')
                    print(f'Saved enhanced image to {enhanced_image_path}')

        # Print the average loss for this epoch after every 20 epochs
        if (epoch + 1) % 1 == 0:
            avg_epoch_loss = epoch_loss / len(dataloader)
            print(f'Epoch {epoch + 1}, Average Loss: {avg_epoch_loss}')

# Usage
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = ComplexDiffusionModel().to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)
loss_fn = nn.MSELoss()
enhance_folder = '/content/drive/MyDrive/enchance'  # Change this to the path where you want to save the images

# Assume `dataloader` is defined and provides data
train(model, dataloader, epochs=20)


Saved original image to /content/drive/MyDrive/enchance/epoch_4_batch_0_original.png
Saved enhanced image to /content/drive/MyDrive/enchance/epoch_4_batch_0_enhanced.png
Epoch 5, Average Loss: 0.3744173910130154
Saved original image to /content/drive/MyDrive/enchance/epoch_9_batch_0_original.png
Saved enhanced image to /content/drive/MyDrive/enchance/epoch_9_batch_0_enhanced.png
Epoch 10, Average Loss: 0.3709434744986621


In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from PIL import Image
import os

class NoiseRemovalModel(nn.Module):
    def __init__(self):
        super(NoiseRemovalModel, self).__init__()

        # Define the neural network layers
        self.conv1 = nn.Conv2d(3, 64, kernel_size=3, stride=1, padding=1)
        self.conv2 = nn.Conv2d(64, 128, kernel_size=3, stride=1, padding=1)
        self.conv3 = nn.Conv2d(128, 128, kernel_size=3, stride=1, padding=1)
        self.conv4 = nn.Conv2d(128, 64, kernel_size=3, stride=1, padding=1)
        self.conv5 = nn.Conv2d(64, 64, kernel_size=3, stride=1, padding=1)
        self.conv6 = nn.Conv2d(64, 3, kernel_size=3, stride=1, padding=1)

    def forward(self, x):
        x = F.relu(self.conv1(x))
        x = F.relu(self.conv2(x))
        x = F.relu(self.conv3(x))
        x = F.relu(self.conv4(x))
        x = F.relu(self.conv5(x))
        x = self.conv6(x)
        return x

    def denoise(self, noisy_image):
        # Pass the noisy image through the network to get the denoised output
        denoised_image = self.forward(noisy_image)
        return denoised_image

# Helper function to convert tensor to PIL image
def tensor_to_pil_image(tensor):
    tensor = (tensor - tensor.min()) / (tensor.max() - tensor.min())  # Normalize to [0, 1]
    tensor = tensor.mul(255).byte()
    tensor = tensor.permute(1, 2, 0).cpu().numpy()
    return Image.fromarray(tensor)

# Training function
def train(model, dataloader, epochs):
    model.train()
    for epoch in range(epochs):
        for batch_idx, x0 in enumerate(dataloader):
            x0 = x0.to(device)
            optimizer.zero_grad()

            # Use the model to predict the clean image from the noisy input
            predicted_clean_image = model.denoise(x0)

            # The target is the original image (without added noise)
            loss = loss_fn(predicted_clean_image, x0)
            loss.backward()
            optimizer.step()
            print(f'Epoch {epoch}, Batch {batch_idx}, Loss: {loss.item()}')

            # Save the original and denoised images
            with torch.no_grad():
                model.eval()
                x0_sample = tensor_to_pil_image(x0[0])
                denoised_sample = tensor_to_pil_image(predicted_clean_image[0])

                original_image_path = os.path.join(enhance_folder, f'epoch_{epoch}_batch_{batch_idx}_original.png')
                denoised_image_path = os.path.join(enhance_folder, f'epoch_{epoch}_batch_{batch_idx}_denoised.png')

                # Ensure the directory exists
                os.makedirs(enhance_folder, exist_ok=True)

                x0_sample.save(original_image_path)
                denoised_sample.save(denoised_image_path)

                print(f'Saved original image to {original_image_path}')
                print(f'Saved denoised image to {denoised_image_path}')

# Usage
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = NoiseRemovalModel().to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-2)
loss_fn = nn.MSELoss()
enhance_folder = '/content/drive/MyDrive/enchance'  # Change this to the path where you want to save the images

# Assume `dataloader` is defined and provides data
train(model, dataloader, epochs=100)


NameError: name 'dataloader' is not defined

In [None]:
from PIL import Image
import torchvision.transforms as transforms

# Example paths (replace with your actual paths)
image_path = '/content/drive/MyDrive/Input/panda.jpg'
output_path = '/content/drive/MyDrive/Input/Resized_Image.jpg'

# Check if the file has a valid extension and exists
if not image_path.lower().endswith(('.png', '.jpg', '.jpeg', '.tiff', '.bmp', '.gif')):
    raise ValueError(f"File extension is not supported: {image_path}")

# Load the image
try:
    image = Image.open(image_path)
except Exception as e:
    print(f"Error loading image: {e}")
    raise

# Resize the image while maintaining the aspect ratio
new_width = 800
aspect_ratio = image.height / image.width
new_height = int(new_width * aspect_ratio)

# Apply the resizing transform
transform = transforms.Compose([
    transforms.Resize((new_height, new_width), interpolation=Image.LANCZOS),
    transforms.ToTensor()
])

transformed_image = transform(image)

# Convert back to PIL image if needed and save
resized_image = transforms.ToPILImage()(transformed_image)

try:
    resized_image.save(output_path, quality=95, subsampling=0)  # Save with high quality
    print(f"Resized image saved at: {output_path}")
except Exception as e:
    print(f"Error saving image: {e}")
    raise


In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from PIL import Image
import os
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as transforms

class CustomImageDataset(Dataset):
    def __init__(self, img_dir, transform=None, target_width=800):
        self.img_dir = img_dir
        self.img_paths = [os.path.join(img_dir, img_name) for img_name in os.listdir(img_dir)]
        self.transform = transform
        self.target_width = target_width

    def __len__(self):
        return len(self.img_paths)

    def __getitem__(self, idx):
        img_path = self.img_paths[idx]
        image = Image.open(img_path).convert("RGB")

        # Apply the resizing transform with the calculated new size
        transform = transforms.Compose([
            transforms.Resize((self.target_width, self.target_width), interpolation=Image.LANCZOS),
            transforms.ToTensor()
        ])

        image = transform(image)
        return image

target_width = 800
img_dir = '/content/drive/MyDrive/Input'

dataset = CustomImageDataset(img_dir, target_width=target_width)
dataloader = DataLoader(dataset, batch_size=8, shuffle=True)

# Define UNet model, training function, and other related code

class Block(nn.Module):
    """(convolution => [BN] => ReLU) * 2"""

    def __init__(self, in_channels, out_channels):
        super().__init__()

        self.double_conv = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(),
            nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1),
            nn.BatchNorm2d(out_channels),
            nn.ReLU()
        )

    def forward(self, x):
        return self.double_conv(x)

class Encoder(nn.Module):
    def __init__(self, chs=(3,64,128,256,512,1024)):
        super().__init__()
        self.enc_blocks = nn.ModuleList([Block(chs[i], chs[i+1]) for i in range(len(chs)-1)])
        self.pool       = nn.MaxPool2d(2)

    def forward(self, x):
        ftrs = []
        for block in self.enc_blocks:
            x = block(x)
            ftrs.append(x)
            x = self.pool(x)
        return ftrs

class Decoder(nn.Module):
    def __init__(self, chs=(1024, 512, 256, 128, 64)):
        super().__init__()
        self.chs         = chs
        self.upconvs    = nn.ModuleList([nn.ConvTranspose2d(chs[i], chs[i+1], 2, 2) for i in range(len(chs)-1)])
        self.dec_blocks = nn.ModuleList([Block(chs[i], chs[i+1]) for i in range(len(chs)-1)])

    def forward(self, x, encoder_features):
        for i in range(len(self.chs)-1):
            x        = self.upconvs[i](x)
            enc_ftrs = self.crop(encoder_features[i], x)
            x        = torch.cat([x, enc_ftrs], dim=1)
            x        = self.dec_blocks[i](x)
        return x

    def crop(self, enc_ftrs, x):
        _, _, H, W = x.shape
        enc_ftrs   = torchvision.transforms.CenterCrop([H, W])(enc_ftrs)
        return enc_ftrs

class UNet(nn.Module):
    def __init__(self, enc_chs=(3,64,128,256,512,1024), dec_chs=(1024, 512, 256, 128, 64), num_class=3, retain_dim=False, out_sz=(572,572)):
        super().__init__()
        self.encoder     = Encoder(enc_chs)
        self.decoder     = Decoder(dec_chs)
        self.head        = nn.Conv2d(dec_chs[-1], num_class, 1)
        self.retain_dim  = retain_dim
        self.out_sz      = out_sz

    def forward(self, x):
        enc_ftrs = self.encoder(x)
        out      = self.decoder(enc_ftrs[::-1][0], enc_ftrs[::-1][1:])
        out      = self.head(out)
        if self.retain_dim:
            out = F.interpolate(out, self.out_sz)
        return out

class ComplexDiffusionModel(nn.Module):
    def __init__(self, timesteps=10, beta_start=0.0001, beta_end=0.0005):
        super(ComplexDiffusionModel, self).__init__()
        self.timesteps = timesteps

        # Define the variance schedule (beta)
        self.betas = torch.linspace(beta_start, beta_end, timesteps)

        # Precompute alpha and alpha_cumprod
        self.alphas = 1.0 - self.betas
        self.alpha_cumprod = torch.cumprod(self.alphas, dim=0)

        # Define the UNet model
        self.unet = UNet()

    def forward(self, x, t):
        return self.unet(x)

    def forward_process(self, x0, t):
        alpha_t = self.alpha_cumprod[t].view(-1, 1, 1, 1)
        std_dev = torch.sqrt(1.0 - alpha_t)
        noise = torch.randn_like(x0) * 0.1  # Scaled down noise
        x_t = alpha_t.sqrt() * x0 + std_dev * noise
        return x_t, noise

    def reverse_process(self, xt, t):
        predicted_noise = self.forward(xt, t)
        alpha_t = self.alpha_cumprod[t].view(-1, 1, 1, 1)
        alpha_prev = self.alpha_cumprod[t - 1].view(-1, 1, 1, 1)

        xt_predicted = (xt - torch.sqrt(1.0 - alpha_t) * predicted_noise) / torch.sqrt(alpha_t)
        x_prev = torch.sqrt(alpha_prev) * xt_predicted + torch.sqrt(1 - alpha_prev) * predicted_noise

        return x_prev

# Helper function to convert tensor to PIL image
def tensor_to_pil_image(tensor):
    tensor = (tensor - tensor.min()) / (tensor.max() - tensor.min())  # Normalize to [0, 1]
    tensor = tensor.mul(255).byte()
    tensor = tensor.permute(1, 2, 0).cpu().numpy()
    return Image.fromarray(tensor)

# Training function
def train(model, dataloader, epochs):
    model.train()
    for epoch in range(epochs):
        epoch_loss = 0
        for batch_idx, x0 in enumerate(dataloader):
            x0 = x0.to(device)
            optimizer.zero_grad()
            t = torch.randint(0, model.timesteps, (x0.size(0),), device=x0.device)
            xt, noise = model.forward_process(x0, t)
            predicted_noise = model.reverse_process(xt, t)
            loss = loss_fn(noise, predicted_noise)
            loss.backward()
            optimizer.step()

            # Accumulate the loss for this epoch
            epoch_loss += loss.item()

            # Save the original and enhanced images
            if (epoch + 1) % 20 == 0 and batch_idx == 0:
                with torch.no_grad():
                    model.eval()
                    x0_sample = tensor_to_pil_image(x0[0])
                    xt_sample = tensor_to_pil_image(xt[0])

                    original_image_path = os.path.join(enhance_folder, f'epoch_{epoch}_batch_{batch_idx}_original.png')
                    enhanced_image_path = os.path.join(enhance_folder, f'epoch_{epoch}_batch_{batch_idx}_enhanced.png')

                    # Ensure the directory exists
                    os.makedirs(enhance_folder, exist_ok=True)

                    x0_sample.save(original_image_path)
                    xt_sample.save(enhanced_image_path)

                    print(f'Saved original image to {original_image_path}')
                    print(f'Saved enhanced image to {enhanced_image_path}')

        # Print the average loss for this epoch after every 200 epochs
        if (epoch + 1) % 20 == 0:
            avg_epoch_loss = epoch_loss / len(dataloader)
            print(f'Epoch {epoch + 1}, Average Loss: {avg_epoch_loss}')

# Usage
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = ComplexDiffusionModel().to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)
loss_fn = nn.MSELoss()
enhance_folder = '/content/drive/MyDrive/enchance'

# Assume `dataloader` is defined and provides data
train(model, dataloader, epochs=20)


Saved original image to /content/drive/MyDrive/enchance/epoch_19_batch_0_original.png
Saved enhanced image to /content/drive/MyDrive/enchance/epoch_19_batch_0_enhanced.png
Epoch 20, Average Loss: 0.2977733314037323


In [None]:
import torch
from torch.utils.data import DataLoader
from PIL import Image
import torchvision.transforms as transforms
import os

# Assuming CustomImageDataset, ComplexDiffusionModel, and tensor_to_pil_image are defined

# Path to the directory containing the new images
new_images_dir = '/path/to/new/images'
enhanced_output_dir = '/path/to/save/enhanced/images'

# Create DataLoader for new images
new_dataset = CustomImageDataset(new_images_dir)
new_dataloader = DataLoader(new_dataset, batch_size=1, shuffle=False)

# Load the trained model
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = ComplexDiffusionModel().to(device)
model.load_state_dict(torch.load('/path/to/saved/model_state_dict.pth'))  # Adjust path
model.eval()

# Ensure the output directory exists
os.makedirs(enhanced_output_dir, exist_ok=True)

# Enhance and save new images
with torch.no_grad():
    for idx, image_tensor in enumerate(new_dataloader):
        image_tensor = image_tensor.to(device)
        t = torch.randint(0, model.timesteps, (image_tensor.size(0),), device=image_tensor.device)
        xt, _ = model.forward_process(image_tensor, t)
        enhanced_image_tensor = model.reverse_process(xt, t)

        # Convert to PIL image and save
        enhanced_image = tensor_to_pil_image(enhanced_image_tensor[0])
        enhanced_image_path = os.path.join(enhanced_output_dir, f'enhanced_{idx}.png')
        enhanced_image.save(enhanced_image_path)

        print(f'Saved enhanced image to {enhanced_image_path}')


NameError: name 'CustomImageDataset' is not defined