In [1]:
import os
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms
from torch.utils.data import DataLoader, Dataset
from torchvision.datasets import ImageFolder
from torch.utils.data import random_split
from torchvision.transforms import ToTensor
from tqdm import tqdm
import numpy as np
import matplotlib.pyplot as plt
from PIL import Image

In [2]:
# Check if GPU is available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

cuda


In [3]:
class CustomDataset(Dataset):
    def __init__(self, root_low, root_high, transform=None):
        self.root_low = root_low
        self.root_high = root_high
        self.low_images = os.listdir(root_low)
        self.high_images = os.listdir(root_high)
        self.transform = transform

    def __len__(self):
        return len(self.low_images)

    def __getitem__(self, idx):
        low_img_name = os.path.join(self.root_low, self.low_images[idx])
        high_img_name = os.path.join(self.root_high, self.low_images[idx])  # Assuming same file names in low and high dirs
        low_img = Image.open(low_img_name).convert('RGB')
        high_img = Image.open(high_img_name).convert('RGB')

        if self.transform:
            low_img = self.transform(low_img)
            high_img = self.transform(high_img)

        return low_img, high_img

# Define transformation for preprocessing
transform = ToTensor()

# Dataset paths
train_low_dir = "C:/Users/scdes/OneDrive/Desktop/archive/lol_dataset/Train/low/"
train_high_dir = "C:/Users/scdes/OneDrive/Desktop/archive/lol_dataset/Train/high/"
test_low_dir = "C:/Users/scdes/OneDrive/Desktop/archive/lol_dataset/Test/low/"
test_high_dir = "C:/Users/scdes/OneDrive/Desktop/archive/lol_dataset/Test/high/"

# Load datasets
train_dataset = CustomDataset(train_low_dir, train_high_dir, transform=transform)
test_dataset = CustomDataset(test_low_dir, test_high_dir, transform=transform)

# Create data loaders
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

In [4]:
# Define the diffusion model
class DiffusionModel(nn.Module):
    def __init__(self):
        super(DiffusionModel, self).__init__()

        # Encoder
        self.encoder_conv1 = nn.Conv2d(3, 64, kernel_size=3, padding=1)
        self.encoder_conv2 = nn.Conv2d(64, 64, kernel_size=3, padding=1)
        self.encoder_pool = nn.MaxPool2d(kernel_size=2, stride=2)

        # Decoder
        self.decoder_upsample = nn.Upsample(scale_factor=2, mode='bilinear', align_corners=True)
        self.decoder_conv1 = nn.Conv2d(128, 64, kernel_size=3, padding=1)
        self.decoder_conv2 = nn.Conv2d(64, 64, kernel_size=3, padding=1)
        self.decoder_conv3 = nn.Conv2d(64, 3, kernel_size=3, padding=1)

    def forward(self, x):
        # Encoder
        x1 = nn.functional.relu(self.encoder_conv1(x))
        x2 = nn.functional.relu(self.encoder_conv2(x1))
        x_pool = self.encoder_pool(x2)

        # Decoder
        x_upsample = self.decoder_upsample(x_pool)
        x_concat = torch.cat([x2, x_upsample], dim=1)
        x3 = nn.functional.relu(self.decoder_conv1(x_concat))
        x4 = nn.functional.relu(self.decoder_conv2(x3))
        x_out = nn.functional.relu(self.decoder_conv3(x4))
        
        return x_out

# Define PSNR function
def psnr(img1, img2):
    mse = torch.mean((img1 - img2) ** 2)
    if mse == 0:
        return float('inf')
    return 20 * torch.log10(1.0 / torch.sqrt(mse))

In [None]:
# Initialize the diffusion model and move it to GPU
model = DiffusionModel().to(device)

# Define optimizer
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Training loop
num_epochs = 10
train_psnr_history = []
test_psnr_history = []
for epoch in range(num_epochs):
    model.train()
    train_psnr_sum = 0.0
    train_num_samples = 0
    for inputs, targets in tqdm(train_loader, desc=f'Epoch {epoch + 1}/{num_epochs} - Training'):
        inputs, targets = inputs.to(device), targets.to(device)  # Move data to GPU
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = torch.mean((outputs - targets)**2)  # Mean Squared Error loss
        loss.backward()
        optimizer.step()

        # Calculate PSNR on training data
        train_psnr_sum += torch.sum(psnr(outputs, targets))
        train_num_samples += inputs.size(0)
    avg_train_psnr = train_psnr_sum / train_num_samples
    train_psnr_history.append(avg_train_psnr.item())

    # Evaluate PSNR on test dataset
    model.eval()
    test_psnr_sum = 0.0
    test_num_samples = 0
    for inputs, targets in tqdm(test_loader, desc=f'Epoch {epoch + 1}/{num_epochs} - Testing'):
        inputs, targets = inputs.to(device), targets.to(device)  # Move data to GPU
        with torch.no_grad():
            outputs = model(inputs)
            for i in range(inputs.size(0)):
                test_psnr_sum += psnr(outputs[i], targets[i])
                test_num_samples += 1
    avg_test_psnr = test_psnr_sum / test_num_samples
    test_psnr_history.append(avg_test_psnr.item())
    print(f'Average PSNR on test dataset: {avg_test_psnr:.2f} dB')


In [5]:
from torch.cuda.amp import GradScaler

# Initialize the scaler for mixed precision training
scaler = GradScaler()

# Initialize the diffusion model and move it to GPU
model = DiffusionModel().to(device)

# Define optimizer
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Training loop
num_epochs = 10
train_psnr_history = []
test_psnr_history = []

# Replace the training loop with this code
for epoch in range(num_epochs):
    model.train()
    train_psnr_sum = 0.0
    train_num_samples = 0
    for inputs, targets in tqdm(train_loader, desc=f'Epoch {epoch + 1}/{num_epochs} - Training'):
        inputs, targets = inputs.to(device), targets.to(device)  # Move data to GPU
        optimizer.zero_grad()
        with torch.cuda.amp.autocast():
            outputs = model(inputs)
            loss = torch.mean((outputs - targets)**2)  # Mean Squared Error loss
        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()

        # Calculate PSNR on training data
        train_psnr_sum += torch.sum(psnr(outputs, targets))
        train_num_samples += inputs.size(0)
    avg_train_psnr = train_psnr_sum / train_num_samples
    train_psnr_history.append(avg_train_psnr.item())

    # Evaluate PSNR on test dataset
    model.eval()
    test_psnr_sum = 0.0
    test_num_samples = 0
    for inputs, targets in tqdm(test_loader, desc=f'Epoch {epoch + 1}/{num_epochs} - Testing'):
        inputs, targets = inputs.to(device), targets.to(device)  # Move data to GPU
        with torch.no_grad():
            outputs = model(inputs)
            for i in range(inputs.size(0)):
                test_psnr_sum += psnr(outputs[i], targets[i])
                test_num_samples += 1
    avg_test_psnr = test_psnr_sum / test_num_samples
    test_psnr_history.append(avg_test_psnr.item())
    print(f'Average PSNR on test dataset: {avg_test_psnr:.2f} dB')


Epoch 1/10 - Training: 100%|███████████████████████████████████████████████████████████| 16/16 [12:08<00:00, 45.53s/it]
Epoch 1/10 - Testing: 100%|██████████████████████████████████████████████████████████████| 1/1 [00:12<00:00, 12.34s/it]


Average PSNR on test dataset: 15.00 dB


Epoch 2/10 - Training:  25%|███████████████                                             | 4/16 [05:08<15:25, 77.09s/it]


OutOfMemoryError: CUDA out of memory. Tried to allocate 938.00 MiB. GPU 0 has a total capacty of 6.00 GiB of which 0 bytes is free. Of the allocated memory 11.10 GiB is allocated by PyTorch, and 1.01 GiB is reserved by PyTorch but unallocated. If reserved but unallocated memory is large try setting max_split_size_mb to avoid fragmentation.  See documentation for Memory Management and PYTORCH_CUDA_ALLOC_CONF

In [None]:
# Save the trained model
torch.save(model.state_dict(), "diffusion_model.pth")

# Plot training and testing PSNR history
plt.plot(train_psnr_history, label='Training PSNR')
plt.plot(test_psnr_history, label='Testing PSNR')
plt.xlabel('Epoch')
plt.ylabel('PSNR (dB)')
plt.title('Training and Testing PSNR')
plt.legend()
plt.grid(True)
plt.savefig('psnr_history.png')
plt.show()



In [None]:
# Print low, predicted, and high-resolution images for all samples in the test set
model.eval()
for inputs, targets in test_loader:
    inputs, targets = inputs.to(device), targets.to(device)  # Move data to GPU
    with torch.no_grad():
        outputs = model(inputs)
        for i in range(inputs.size(0)):
            low_img = transforms.ToPILImage()(inputs[i].cpu())
            pred_img = transforms.ToPILImage()(outputs[i].cpu())
            high_img = transforms.ToPILImage()(targets[i].cpu())
            low_img.show(title='Low-Resolution Image')
            pred_img.show(title='Predicted Image')
            high_img.show(title='High-Resolution Image')