<a href="https://colab.research.google.com/github/AbdulSheffa/IRP/blob/main/FYP_GAN.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# Mount Google Drive at /content/drive; the dataset and output paths used by the
# later cells all live under /content/drive/MyDrive.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [2]:
# Fetch the reference CycleGAN/pix2pix implementation and make it the working
# directory so that `models.networks` can be imported by the cells below.
# NOTE(review): both commands use relative paths, so re-running this cell without
# restarting the runtime would clone again inside the already-cloned repo — confirm
# before re-executing.
!git clone https://github.com/junyanz/pytorch-CycleGAN-and-pix2pix.git
%cd pytorch-CycleGAN-and-pix2pix


Cloning into 'pytorch-CycleGAN-and-pix2pix'...
remote: Enumerating objects: 2516, done.[K
remote: Total 2516 (delta 0), reused 0 (delta 0), pack-reused 2516 (from 1)[K
Receiving objects: 100% (2516/2516), 8.20 MiB | 27.35 MiB/s, done.
Resolving deltas: 100% (1575/1575), done.
/content/pytorch-CycleGAN-and-pix2pix


In [3]:
# Install the cloned repository's dependencies; requirements.txt is resolved
# relative to the working directory set by the previous cell.
!pip install -r requirements.txt


Collecting dominate>=2.4.0 (from -r requirements.txt (line 3))
  Downloading dominate-2.9.1-py2.py3-none-any.whl.metadata (13 kB)
Collecting visdom>=0.1.8.8 (from -r requirements.txt (line 4))
  Downloading visdom-0.2.4.tar.gz (1.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.4/1.4 MB[0m [31m28.3 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Downloading dominate-2.9.1-py2.py3-none-any.whl (29 kB)
Building wheels for collected packages: visdom
  Building wheel for visdom (setup.py) ... [?25l[?25hdone
  Created wheel for visdom: filename=visdom-0.2.4-py3-none-any.whl size=1408196 sha256=5b0ab0d5a44139ee35d1f5304f9f3a21a1192a4ed0456823d42ee91ab4d9a61b
  Stored in directory: /root/.cache/pip/wheels/fa/a4/bb/2be445c295d88a74f9c0a4232f04860ca489a5c7c57eb959d9
Successfully built visdom
Installing collected packages: dominate, visdom
Successfully installed dominate-2.9.1 visdom-0.2.4


In [None]:
import os
from PIL import Image

# Each source file is treated as a side-by-side pair: the left half is written to
# the "rainy" folder and the right half to the "non_rainy" folder.

# Define dataset path
dataset_path = "/content/drive/MyDrive/Khabeer - IRP/Dataset/DID-MDN-datasets/DID-MDN-test"
output_path = "/content/drive/MyDrive/Khabeer - IRP/Dataset/DID-MDN-datasets/Split-DID-MDN"

# Create output directories for rainy and non-rainy
rainy_path = os.path.join(output_path, "rainy")
non_rainy_path = os.path.join(output_path, "non_rainy")

os.makedirs(rainy_path, exist_ok=True)
os.makedirs(non_rainy_path, exist_ok=True)

# Extensions are compared in lower case so that e.g. "IMG.JPG" is not skipped.
IMAGE_EXTENSIONS = ('.png', '.jpg', '.jpeg')

# Files that could not be split; reported once at the end so they are not lost
# in the per-file log.
failed_files = []

# Split each image (sorted so the processing order is the same on every run)
for filename in sorted(os.listdir(dataset_path)):
    if not filename.lower().endswith(IMAGE_EXTENSIONS):  # Process only image files
        continue

    file_path = os.path.join(dataset_path, filename)
    try:
        # The context manager closes the underlying file even if cropping or
        # saving fails.
        with Image.open(file_path) as img:
            width, height = img.size
            half_width = width // 2

            # Split the image into two halves
            rainy_part = img.crop((0, 0, half_width, height))  # Left half
            non_rainy_part = img.crop((half_width, 0, width, height))  # Right half

            # Save the split images
            rainy_output = os.path.join(rainy_path, f"rainy_{filename}")
            non_rainy_output = os.path.join(non_rainy_path, f"non_rainy_{filename}")

            rainy_part.save(rainy_output)
            non_rainy_part.save(non_rainy_output)

        print(f"Processed: {filename}")
    except Exception as e:
        # Best effort: one unreadable file must not abort the whole split.
        failed_files.append(filename)
        print(f"Error processing {filename}: {e}")

if failed_files:
    print(f"{len(failed_files)} file(s) could not be processed: {failed_files}")

print(f"Splitting complete! Rainy images saved in {rainy_path}, Non-rainy images saved in {non_rainy_path}.")


In [4]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from models.networks import ResnetGenerator, NLayerDiscriminator  # Use predefined architectures

class DeRainCycleGAN:
    """CycleGAN-style model translating between rainy (A) and clean (B) images.

    ``netG_A`` maps rainy -> clean and ``netG_B`` maps clean -> rainy.
    ``netD_A`` scores clean images and ``netD_B`` scores rainy images.
    Adversarial terms use an MSE objective; cycle-consistency terms use L1
    scaled by 10.
    """

    def __init__(self, input_nc, output_nc, ngf, ndf, device):
        """Create the networks, loss functions and optimizers.

        Args:
            input_nc: Channel count of rainy (domain A) images.
            output_nc: Channel count of clean (domain B) images.
            ngf: Value forwarded to both ``ResnetGenerator`` constructors.
            ndf: Value forwarded to both ``NLayerDiscriminator`` constructors.
            device: Device every network is moved to.
        """
        self.device = device

        # Initialize ResNet-based generators (Rainy -> Clean and Clean -> Rainy)
        self.netG_A = ResnetGenerator(input_nc, output_nc, ngf).to(device)  # Rainy -> Clean
        self.netG_B = ResnetGenerator(output_nc, input_nc, ngf).to(device)  # Clean -> Rainy

        # Initialize PatchGAN-based discriminators
        self.netD_A = NLayerDiscriminator(output_nc, ndf).to(device)  # For clean images
        self.netD_B = NLayerDiscriminator(input_nc, ndf).to(device)  # For rainy images

        # Define loss functions
        self.criterionGAN = nn.MSELoss()  # For adversarial loss
        self.criterionCycle = nn.L1Loss()  # For cycle consistency loss

        # Define optimizers (one shared by both generators, one by both discriminators)
        self.optimizer_G = torch.optim.Adam(
            list(self.netG_A.parameters()) + list(self.netG_B.parameters()), lr=0.0002, betas=(0.5, 0.999)
        )
        self.optimizer_D = torch.optim.Adam(
            list(self.netD_A.parameters()) + list(self.netD_B.parameters()), lr=0.0002, betas=(0.5, 0.999)
        )

    @staticmethod
    def _set_requires_grad(nets, requires_grad):
        """Enable or disable gradient tracking for every parameter of ``nets``."""
        for net in nets:
            for param in net.parameters():
                param.requires_grad = requires_grad

    def forward(self, real_A, real_B):
        """Run both translation cycles.

        Args:
            real_A: Batch of rainy images.
            real_B: Batch of clean images.

        Returns:
            Tuple ``(fake_B, rec_A, fake_A, rec_B)``: the de-rained version of
            ``real_A``, its reconstruction back to rainy, the rained version of
            ``real_B`` and its reconstruction back to clean.
        """
        # Forward cycle (Rainy -> Clean -> Rainy)
        fake_B = self.netG_A(real_A)  # Rainy -> Clean
        rec_A = self.netG_B(fake_B)  # Clean -> Rainy -> Reconstruct Rainy

        # Backward cycle (Clean -> Rainy -> Clean)
        fake_A = self.netG_B(real_B)  # Clean -> Rainy
        rec_B = self.netG_A(fake_A)  # Rainy -> Clean -> Reconstruct Clean

        return fake_B, rec_A, fake_A, rec_B

    def backward_G(self, real_A, real_B, fake_B, rec_A, fake_A, rec_B):
        """Compute the generator loss and back-propagate it.

        Returns:
            The total generator loss (two adversarial terms plus two weighted
            cycle-consistency terms).
        """
        # GAN loss. The "real" target must have the shape of the discriminator
        # prediction, not of the generated image: the discriminator output is
        # generally not image-shaped, so an image-shaped target does not match.
        pred_fake_B = self.netD_A(fake_B)
        pred_fake_A = self.netD_B(fake_A)
        loss_G_A = self.criterionGAN(pred_fake_B, torch.ones_like(pred_fake_B))  # Rainy -> Clean
        loss_G_B = self.criterionGAN(pred_fake_A, torch.ones_like(pred_fake_A))  # Clean -> Rainy

        # Cycle consistency loss
        loss_cycle_A = self.criterionCycle(rec_A, real_A) * 10.0  # Rainy -> Clean -> Rainy
        loss_cycle_B = self.criterionCycle(rec_B, real_B) * 10.0  # Clean -> Rainy -> Clean

        # Total generator loss
        loss_G = loss_G_A + loss_G_B + loss_cycle_A + loss_cycle_B
        loss_G.backward()
        return loss_G

    def backward_D(self, netD, real, fake):
        """Compute one discriminator's loss and back-propagate it.

        Args:
            netD: Discriminator to train.
            real: Batch of real images from the discriminator's domain.
            fake: Batch of generated images for that domain; detached here so
                no gradient reaches the generators.

        Returns:
            The mean of the real and fake losses.
        """
        # Real loss
        pred_real = netD(real)
        loss_D_real = self.criterionGAN(pred_real, torch.ones_like(pred_real))

        # Fake loss
        pred_fake = netD(fake.detach())
        loss_D_fake = self.criterionGAN(pred_fake, torch.zeros_like(pred_fake))

        # Total discriminator loss
        loss_D = (loss_D_real + loss_D_fake) * 0.5
        loss_D.backward()
        return loss_D

    def optimize_parameters(self, real_A, real_B):
        """Run one training iteration: a generator step, then a discriminator step.

        Returns:
            Tuple ``(loss_G, loss_D_A, loss_D_B)`` of loss tensors.
        """
        # Forward pass
        fake_B, rec_A, fake_A, rec_B = self.forward(real_A, real_B)

        # Update G (Generators). The discriminators are frozen meanwhile so no
        # gradients are computed for parameters that this step does not update.
        # NOTE: they are re-enabled unconditionally afterwards.
        discriminators = [self.netD_A, self.netD_B]
        self._set_requires_grad(discriminators, False)
        try:
            self.optimizer_G.zero_grad()
            loss_G = self.backward_G(real_A, real_B, fake_B, rec_A, fake_A, rec_B)
            self.optimizer_G.step()
        finally:
            self._set_requires_grad(discriminators, True)

        # Update D_A (clean images) and D_B (rainy images). Both share one
        # optimizer, so their gradients are accumulated first and the optimizer
        # is stepped exactly once per iteration.
        self.optimizer_D.zero_grad()
        loss_D_A = self.backward_D(self.netD_A, real_B, fake_B)
        loss_D_B = self.backward_D(self.netD_B, real_A, fake_A)
        self.optimizer_D.step()

        return loss_G, loss_D_A, loss_D_B


In [5]:
import os
import random
import torch
import torch.nn as nn
import torch.nn.functional as F
from torchvision import transforms
from torch.utils.data import DataLoader, Dataset
from torchvision.utils import save_image
from PIL import Image
from models.networks import ResnetGenerator, NLayerDiscriminator  # Use predefined architectures

class RainDataset(Dataset):
    """Paired dataset of rainy inputs and clean targets stored in two folders.

    Files are paired by their position in the sorted listing of each folder,
    so the two folders must contain the same number of files and their names
    must sort into matching order.
    """

    def __init__(self, input_dir, target_dir, transform=None, sample_size=None):
        """Index both folders and optionally keep a random subset of pairs.

        Args:
            input_dir: Folder containing the rainy images.
            target_dir: Folder containing the clean images.
            transform: Optional callable applied to the input and, separately,
                to the target image.
            sample_size: If given, number of pairs to keep, drawn at random.
                Values larger than the dataset keep every pair.

        Raises:
            ValueError: If the two folders contain a different number of files
                (positional pairing would then be wrong), or if
                ``sample_size`` is negative.
        """
        self.input_dir = input_dir
        self.target_dir = target_dir
        self.transform = transform
        self.input_images = self._list_files(input_dir)
        self.target_images = self._list_files(target_dir)

        if len(self.input_images) != len(self.target_images):
            raise ValueError(
                f"Cannot pair images by position: {len(self.input_images)} file(s) in "
                f"{input_dir!r} but {len(self.target_images)} in {target_dir!r}"
            )

        # Randomly sample a subset if sample_size is provided
        if sample_size is not None:
            # Clamp so that asking for more pairs than exist keeps everything
            # instead of raising from random.sample.
            sample_size = min(sample_size, len(self.input_images))
            indices = random.sample(range(len(self.input_images)), sample_size)
            # The same indices are applied to both lists to keep pairs aligned.
            self.input_images = [self.input_images[i] for i in indices]
            self.target_images = [self.target_images[i] for i in indices]

    @staticmethod
    def _list_files(directory):
        """Return the sorted names of the regular files directly inside ``directory``.

        Sub-directories (for example a notebook checkpoint folder) are skipped
        because they cannot be opened as images.
        """
        return sorted(
            name for name in os.listdir(directory)
            if os.path.isfile(os.path.join(directory, name))
        )

    def __len__(self):
        """Return the number of image pairs."""
        return len(self.input_images)

    def __getitem__(self, idx):
        """Load the pair at ``idx``.

        Returns:
            Tuple ``(input_image, target_image, input_filename)``; the images
            are RGB and passed through ``transform`` when one was supplied.
        """
        input_img_path = os.path.join(self.input_dir, self.input_images[idx])
        target_img_path = os.path.join(self.target_dir, self.target_images[idx])

        # convert() returns an independent image, so the source file can be
        # closed as soon as the conversion is done.
        with Image.open(input_img_path) as raw_input:
            input_image = raw_input.convert("RGB")
        with Image.open(target_img_path) as raw_target:
            target_image = raw_target.convert("RGB")

        if self.transform:
            input_image = self.transform(input_image)
            target_image = self.transform(target_image)

        return input_image, target_image, self.input_images[idx]

class DeRainCycleGAN:
    """CycleGAN-style model translating between rainy (A) and clean (B) images.

    ``netG_A`` maps rainy -> clean and ``netG_B`` maps clean -> rainy.
    ``netD_A`` scores clean images and ``netD_B`` scores rainy images.
    Adversarial terms use an MSE objective; cycle-consistency terms use L1
    scaled by 10.
    """

    def __init__(self, input_nc, output_nc, ngf, ndf, device):
        """Create the networks, loss functions and optimizers.

        Args:
            input_nc: Channel count of rainy (domain A) images.
            output_nc: Channel count of clean (domain B) images.
            ngf: Value forwarded to both ``ResnetGenerator`` constructors.
            ndf: Value forwarded to both ``NLayerDiscriminator`` constructors.
            device: Device every network is moved to.
        """
        self.device = device

        # Initialize ResNet-based generators (Rainy -> Clean and Clean -> Rainy)
        self.netG_A = ResnetGenerator(input_nc, output_nc, ngf).to(device)  # Rainy -> Clean
        self.netG_B = ResnetGenerator(output_nc, input_nc, ngf).to(device)  # Clean -> Rainy

        # Initialize PatchGAN-based discriminators
        self.netD_A = NLayerDiscriminator(output_nc, ndf).to(device)  # For clean images
        self.netD_B = NLayerDiscriminator(input_nc, ndf).to(device)  # For rainy images

        # Define loss functions
        self.criterionGAN = nn.MSELoss()  # For adversarial loss
        self.criterionCycle = nn.L1Loss()  # For cycle consistency loss

        # Define optimizers (one shared by both generators, one by both discriminators)
        self.optimizer_G = torch.optim.Adam(
            list(self.netG_A.parameters()) + list(self.netG_B.parameters()), lr=0.0002, betas=(0.5, 0.999)
        )
        self.optimizer_D = torch.optim.Adam(
            list(self.netD_A.parameters()) + list(self.netD_B.parameters()), lr=0.0002, betas=(0.5, 0.999)
        )

    @staticmethod
    def _set_requires_grad(nets, requires_grad):
        """Enable or disable gradient tracking for every parameter of ``nets``."""
        for net in nets:
            for param in net.parameters():
                param.requires_grad = requires_grad

    def forward(self, real_A, real_B):
        """Run both translation cycles.

        Args:
            real_A: Batch of rainy images.
            real_B: Batch of clean images.

        Returns:
            Tuple ``(fake_B, rec_A, fake_A, rec_B)``: the de-rained version of
            ``real_A``, its reconstruction back to rainy, the rained version of
            ``real_B`` and its reconstruction back to clean.
        """
        # Forward cycle (Rainy -> Clean -> Rainy)
        fake_B = self.netG_A(real_A)  # Rainy -> Clean
        rec_A = self.netG_B(fake_B)  # Clean -> Rainy -> Reconstruct Rainy

        # Backward cycle (Clean -> Rainy -> Clean)
        fake_A = self.netG_B(real_B)  # Clean -> Rainy
        rec_B = self.netG_A(fake_A)  # Rainy -> Clean -> Reconstruct Clean

        return fake_B, rec_A, fake_A, rec_B

    def backward_G(self, real_A, real_B, fake_B, rec_A, fake_A, rec_B):
        """Compute the generator loss and back-propagate it.

        Returns:
            The total generator loss (two adversarial terms plus two weighted
            cycle-consistency terms).
        """
        # GAN loss. Each discriminator is evaluated once and its prediction is
        # reused to build the all-ones target, instead of running a second
        # forward pass only to obtain the target's shape.
        pred_fake_B = self.netD_A(fake_B)
        pred_fake_A = self.netD_B(fake_A)
        loss_G_A = self.criterionGAN(pred_fake_B, torch.ones_like(pred_fake_B))  # Rainy -> Clean
        loss_G_B = self.criterionGAN(pred_fake_A, torch.ones_like(pred_fake_A))  # Clean -> Rainy

        # Cycle consistency loss
        loss_cycle_A = self.criterionCycle(rec_A, real_A) * 10.0  # Rainy -> Clean -> Rainy
        loss_cycle_B = self.criterionCycle(rec_B, real_B) * 10.0  # Clean -> Rainy -> Clean

        # Total generator loss
        loss_G = loss_G_A + loss_G_B + loss_cycle_A + loss_cycle_B
        loss_G.backward()
        return loss_G

    def backward_D(self, netD, real, fake):
        """Compute one discriminator's loss and back-propagate it.

        Args:
            netD: Discriminator to train.
            real: Batch of real images from the discriminator's domain.
            fake: Batch of generated images for that domain; detached here so
                no gradient reaches the generators.

        Returns:
            The mean of the real and fake losses.
        """
        # Real loss
        pred_real = netD(real)
        loss_D_real = self.criterionGAN(pred_real, torch.ones_like(pred_real))

        # Fake loss
        pred_fake = netD(fake.detach())
        loss_D_fake = self.criterionGAN(pred_fake, torch.zeros_like(pred_fake))

        # Total discriminator loss
        loss_D = (loss_D_real + loss_D_fake) * 0.5
        loss_D.backward()
        return loss_D

    def optimize_parameters(self, real_A, real_B):
        """Run one training iteration: a generator step, then a discriminator step.

        Returns:
            Tuple ``(loss_G, loss_D_A, loss_D_B)`` of loss tensors.
        """
        # Forward pass
        fake_B, rec_A, fake_A, rec_B = self.forward(real_A, real_B)

        # Update G (Generators). The discriminators are frozen meanwhile so no
        # gradients are computed for parameters that this step does not update.
        # NOTE: they are re-enabled unconditionally afterwards.
        discriminators = [self.netD_A, self.netD_B]
        self._set_requires_grad(discriminators, False)
        try:
            self.optimizer_G.zero_grad()
            loss_G = self.backward_G(real_A, real_B, fake_B, rec_A, fake_A, rec_B)
            self.optimizer_G.step()
        finally:
            self._set_requires_grad(discriminators, True)

        # Update D_A (clean images) and D_B (rainy images). Both share one
        # optimizer, so their gradients are accumulated first and the optimizer
        # is stepped exactly once per iteration.
        self.optimizer_D.zero_grad()
        loss_D_A = self.backward_D(self.netD_A, real_B, fake_B)
        loss_D_B = self.backward_D(self.netD_B, real_A, fake_A)
        self.optimizer_D.step()

        return loss_G, loss_D_A, loss_D_B


In [7]:

# Initialize dataset paths and DataLoader
input_dir = "/content/drive/MyDrive/Khabeer - IRP/Dataset/Rain100L/input"
target_dir = "/content/drive/MyDrive/Khabeer - IRP/Dataset/Rain100L/target"
output_dir = "/content/drive/MyDrive/Khabeer - IRP/Dataset/Rain100L/results_3/"

os.makedirs(output_dir, exist_ok=True)

# Resize to 256x256 and scale pixel values from [0, 1] to [-1, 1]
transform = transforms.Compose([
    transforms.Resize((256, 256)),
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
])

# Limit the dataset to a random subset of 100 pairs for trial purposes
train_loader = DataLoader(RainDataset(input_dir, target_dir, transform, sample_size=100), batch_size=8, shuffle=True)

# Initialize and train the model
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)
model = DeRainCycleGAN(input_nc=3, output_nc=3, ngf=64, ndf=64, device=device)

# Add learning rate schedulers (halve the learning rate every 20 epochs)
scheduler_G = torch.optim.lr_scheduler.StepLR(model.optimizer_G, step_size=20, gamma=0.5)
scheduler_D = torch.optim.lr_scheduler.StepLR(model.optimizer_D, step_size=20, gamma=0.5)

NUM_EPOCHS = 60   # Total number of training epochs
SAVE_EVERY = 30   # De-rained samples are written every SAVE_EVERY epochs and after the last one

# Training Loop
for epoch in range(NUM_EPOCHS):
    model.netG_A.train()
    model.netG_B.train()
    epoch_loss_G, epoch_loss_D_A, epoch_loss_D_B = 0, 0, 0

    for i, (real_A, real_B, img_name) in enumerate(train_loader):
        real_A, real_B = real_A.to(device), real_B.to(device)

        # Forward pass and optimization
        loss_G, loss_D_A, loss_D_B = model.optimize_parameters(real_A, real_B)
        epoch_loss_G += loss_G.item()
        epoch_loss_D_A += loss_D_A.item()
        epoch_loss_D_B += loss_D_B.item()

        # Per-batch log line (regular output also keeps the Colab runtime from disconnecting)
        print(f"Epoch {epoch}/{NUM_EPOCHS}, Batch {i}/{len(train_loader)}, Image: {img_name[0]}, Loss_G: {loss_G:.4f}, Loss_D_A: {loss_D_A:.4f}, Loss_D_B: {loss_D_B:.4f}")

    # Report the mean losses of the epoch; max() guards against an empty loader
    num_batches = max(len(train_loader), 1)
    print(f"Epoch {epoch} mean losses - Loss_G: {epoch_loss_G / num_batches:.4f}, Loss_D_A: {epoch_loss_D_A / num_batches:.4f}, Loss_D_B: {epoch_loss_D_B / num_batches:.4f}")

    # Step the scheduler
    scheduler_G.step()
    scheduler_D.step()

    # Save generated images every SAVE_EVERY epochs and at the final epoch
    if epoch % SAVE_EVERY == 0 or epoch == NUM_EPOCHS - 1:
        model.netG_A.eval()  # Use the Rainy -> Clean generator
        with torch.no_grad():
            for real_A, _, img_names in train_loader:  # Iterate over all batches
                real_A = real_A.to(model.device)
                fake_B = model.netG_A(real_A)  # Rainy -> Clean

                # Denormalize before saving: the inputs were normalized to
                # [-1, 1], and save_image expects values in [0, 1], so writing
                # the raw output would clamp every negative value to black.
                fake_B = fake_B * 0.5 + 0.5

                # Save each image in the batch under its source file name
                for j in range(real_A.size(0)):
                    save_path = os.path.join(output_dir, f"epoch_{epoch}_de_rained_{img_names[j]}")
                    save_image(fake_B[j], save_path)  # Save individual images






cuda
Epoch 0/60, Batch 0/13, Image: 96.png, Loss_G: 13.0750, Loss_D_A: 0.8836, Loss_D_B: 0.7776
Epoch 0/60, Batch 1/13, Image: 72.png, Loss_G: 9.0179, Loss_D_A: 1.9938, Loss_D_B: 1.1945
Epoch 0/60, Batch 2/13, Image: 17.png, Loss_G: 8.0362, Loss_D_A: 0.8726, Loss_D_B: 0.8749
Epoch 0/60, Batch 3/13, Image: 42.png, Loss_G: 8.7849, Loss_D_A: 2.1532, Loss_D_B: 1.0016
Epoch 0/60, Batch 4/13, Image: 28.png, Loss_G: 9.2781, Loss_D_A: 0.4794, Loss_D_B: 1.0131
Epoch 0/60, Batch 5/13, Image: 30.png, Loss_G: 8.3922, Loss_D_A: 0.4736, Loss_D_B: 0.6537
Epoch 0/60, Batch 6/13, Image: 53.png, Loss_G: 7.1460, Loss_D_A: 0.3698, Loss_D_B: 0.4094
Epoch 0/60, Batch 7/13, Image: 82.png, Loss_G: 6.9584, Loss_D_A: 0.3112, Loss_D_B: 0.3227
Epoch 0/60, Batch 8/13, Image: 93.png, Loss_G: 5.5604, Loss_D_A: 0.2816, Loss_D_B: 0.2725
Epoch 0/60, Batch 9/13, Image: 92.png, Loss_G: 5.2531, Loss_D_A: 0.2592, Loss_D_B: 0.2660
Epoch 0/60, Batch 10/13, Image: 94.png, Loss_G: 4.7053, Loss_D_A: 0.2564, Loss_D_B: 0.2468
Epo

In [None]:
# Initialize dataset paths and DataLoader
input_dir = "/content/drive/MyDrive/Khabeer - IRP/Dataset/DID-MDN-datasets/Split-DID-MDN/rainy"
target_dir = "/content/drive/MyDrive/Khabeer - IRP/Dataset/DID-MDN-datasets/Split-DID-MDN/non_rainy"
output_dir = "/content/drive/MyDrive/Khabeer - IRP/Dataset/DID-MDN-datasets/Split-DID-MDN/results_6/"

os.makedirs(output_dir, exist_ok=True)

# Define transformation with normalization
transform = transforms.Compose([
    transforms.Resize((256, 256)),
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))  # Normalizing between [-1, 1]
])

# Load dataset (random subset of 150 pairs)
train_loader = DataLoader(RainDataset(input_dir, target_dir, transform, sample_size=150), batch_size=8, shuffle=True)

# Initialize and train the model
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)
model = DeRainCycleGAN(input_nc=3, output_nc=3, ngf=64, ndf=64, device=device)

# Add learning rate schedulers (halve the learning rate every 20 epochs)
scheduler_G = torch.optim.lr_scheduler.StepLR(model.optimizer_G, step_size=20, gamma=0.5)
scheduler_D = torch.optim.lr_scheduler.StepLR(model.optimizer_D, step_size=20, gamma=0.5)

# Total number of training epochs; used by the loop, the log line and the
# final-epoch check so the three can no longer drift apart.
NUM_EPOCHS = 60

# Training Loop
for epoch in range(NUM_EPOCHS):
    model.netG_A.train()
    model.netG_B.train()
    epoch_loss_G, epoch_loss_D_A, epoch_loss_D_B = 0, 0, 0

    for i, (real_A, real_B, img_name) in enumerate(train_loader):
        real_A, real_B = real_A.to(device), real_B.to(device)

        # Forward pass and optimization
        loss_G, loss_D_A, loss_D_B = model.optimize_parameters(real_A, real_B)
        epoch_loss_G += loss_G.item()
        epoch_loss_D_A += loss_D_A.item()
        epoch_loss_D_B += loss_D_B.item()

        # Per-batch log line (regular output also keeps the Colab runtime from disconnecting)
        print(f"Epoch {epoch}/{NUM_EPOCHS}, Batch {i}/{len(train_loader)}, Image: {img_name[0]}, Loss_G: {loss_G:.4f}, Loss_D_A: {loss_D_A:.4f}, Loss_D_B: {loss_D_B:.4f}")

    # Report the mean losses of the epoch; max() guards against an empty loader
    num_batches = max(len(train_loader), 1)
    print(f"Epoch {epoch} mean losses - Loss_G: {epoch_loss_G / num_batches:.4f}, Loss_D_A: {epoch_loss_D_A / num_batches:.4f}, Loss_D_B: {epoch_loss_D_B / num_batches:.4f}")

    # Step the scheduler
    scheduler_G.step()
    scheduler_D.step()

    # Save generated images once, after the final epoch
    if epoch == NUM_EPOCHS - 1:
        model.netG_A.eval()  # Use the Rainy -> Clean generator
        with torch.no_grad():
            for real_A, _, img_names in train_loader:  # Iterate over all batches
                real_A = real_A.to(model.device)
                fake_B = model.netG_A(real_A)  # Rainy -> Clean

                # Denormalize before saving
                fake_B = fake_B * 0.5 + 0.5  # Scale from [-1, 1] back to [0, 1]

                # Save each image in the batch under its source file name
                for j in range(real_A.size(0)):
                    save_path = os.path.join(output_dir, f"epoch_{epoch}_de_rained_{img_names[j]}")
                    save_image(fake_B[j], save_path)  # Save individual images

print("Training completed.")


cuda
Epoch 0/60, Batch 0/19, Image: rainy_1150.jpg, Loss_G: 12.3574, Loss_D_A: 0.6425, Loss_D_B: 0.5356
Epoch 0/60, Batch 1/19, Image: rainy_574.jpg, Loss_G: 11.2411, Loss_D_A: 2.9521, Loss_D_B: 2.8910
Epoch 0/60, Batch 2/19, Image: rainy_208.jpg, Loss_G: 9.3204, Loss_D_A: 1.0918, Loss_D_B: 1.3594
Epoch 0/60, Batch 3/19, Image: rainy_74.jpg, Loss_G: 9.8790, Loss_D_A: 0.3953, Loss_D_B: 0.6878
Epoch 0/60, Batch 4/19, Image: rainy_644.jpg, Loss_G: 8.9314, Loss_D_A: 0.6915, Loss_D_B: 0.6177
Epoch 0/60, Batch 5/19, Image: rainy_349.jpg, Loss_G: 8.7904, Loss_D_A: 1.0965, Loss_D_B: 0.3643
Epoch 0/60, Batch 6/19, Image: rainy_181.jpg, Loss_G: 6.6188, Loss_D_A: 0.3779, Loss_D_B: 0.3356
Epoch 0/60, Batch 7/19, Image: rainy_567.jpg, Loss_G: 6.9580, Loss_D_A: 0.3196, Loss_D_B: 0.2952
Epoch 0/60, Batch 8/19, Image: rainy_54.jpg, Loss_G: 6.3864, Loss_D_A: 0.2951, Loss_D_B: 0.3144
Epoch 0/60, Batch 9/19, Image: rainy_1171.jpg, Loss_G: 5.9655, Loss_D_A: 0.2937, Loss_D_B: 0.3192
Epoch 0/60, Batch 10/19

In [13]:
import os
import torch
import torch.nn as nn
from torchvision.utils import save_image
from torchvision import transforms, models
from PIL import Image

class AutoencoderWithSkipConnections(nn.Module):
    """Convolutional encoder/decoder whose decoder stages reuse encoder features.

    Four stride-2 convolutions halve the resolution down to the bottleneck and
    four stride-2 transposed convolutions double it back. The last three
    decoder stages receive the previous decoder output concatenated with the
    encoder feature map of the same resolution. The output passes through
    ``Tanh``. Input height and width should be multiples of 16 so that the
    concatenated feature maps have matching sizes.
    """

    def __init__(self, input_nc):
        """Build the layers.

        Args:
            input_nc: Number of channels of the input (and of the output).
        """
        super().__init__()

        def downsample(channels_in, channels_out):
            # Stride-2 convolution followed by an in-place ReLU.
            return nn.Sequential(
                nn.Conv2d(channels_in, channels_out, kernel_size=4, stride=2, padding=1),
                nn.ReLU(True),
            )

        def upsample(channels_in, channels_out, activation):
            # Stride-2 transposed convolution followed by the given activation.
            return nn.Sequential(
                nn.ConvTranspose2d(channels_in, channels_out, kernel_size=4, stride=2, padding=1),
                activation,
            )

        # Encoder
        self.encoder1 = downsample(input_nc, 64)
        self.encoder2 = downsample(64, 128)
        self.encoder3 = downsample(128, 256)
        # Bottleneck
        self.bottleneck = downsample(256, 512)
        # Decoder: from decoder2 onwards the input width is doubled by the
        # concatenated skip connection.
        self.decoder1 = upsample(512, 256, nn.ReLU(True))
        self.decoder2 = upsample(256 + 256, 128, nn.ReLU(True))
        self.decoder3 = upsample(128 + 128, 64, nn.ReLU(True))
        self.decoder4 = upsample(64 + 64, input_nc, nn.Tanh())

    def forward(self, x):
        """Encode ``x``, decode it with skip connections and return the result."""
        skip1 = self.encoder1(x)
        skip2 = self.encoder2(skip1)
        skip3 = self.encoder3(skip2)

        out = self.decoder1(self.bottleneck(skip3))
        # Concatenate along the channel axis, deepest skip first.
        for decoder, skip in (
            (self.decoder2, skip3),
            (self.decoder3, skip2),
            (self.decoder4, skip1),
        ):
            out = decoder(torch.cat([out, skip], dim=1))
        return out

class PerceptualLoss(nn.Module):
    """MSE between feature maps of a frozen, ImageNet-pretrained VGG-19.

    Only the first 16 layers of ``vgg19.features`` are used, kept in eval mode
    with gradients disabled for their parameters.
    """

    def __init__(self):
        """Load the pretrained VGG-19 feature extractor and freeze it."""
        super(PerceptualLoss, self).__init__()
        # `pretrained=True` is deprecated in recent torchvision releases in
        # favour of the `weights=` argument; IMAGENET1K_V1 is the checkpoint the
        # legacy flag selected. Fall back to the legacy flag on versions that
        # do not provide the weights enum.
        weights_enum = getattr(models, "VGG19_Weights", None)
        if weights_enum is not None:
            backbone = models.vgg19(weights=weights_enum.IMAGENET1K_V1)
        else:
            backbone = models.vgg19(pretrained=True)
        vgg19 = backbone.features[:16].eval()
        for param in vgg19.parameters():
            param.requires_grad = False
        self.vgg19 = vgg19
        self.criterion = nn.MSELoss()

    def forward(self, x, y):
        """Return the MSE between the VGG features of ``x`` and of ``y``.

        Gradients still flow back to ``x`` and ``y``; only the VGG weights are
        frozen.
        """
        x_features = self.vgg19(x)
        y_features = self.vgg19(y)
        return self.criterion(x_features, y_features)

# Use the GPU when one is available and fall back to the CPU otherwise, the
# same selection the CycleGAN training cells make.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Initialize Autoencoder
input_channels = 3
autoencoder = AutoencoderWithSkipConnections(input_channels).to(device)

# Define Loss and Optimizer
mse_loss = nn.MSELoss()
perceptual_loss = PerceptualLoss().to(device)
optimizer = torch.optim.Adam(autoencoder.parameters(), lr=0.0002)

# Load GAN-generated images. The listing is sorted so the file order is the same
# on every run, and extensions are matched case-insensitively.
gan_output_path = "/content/drive/MyDrive/Khabeer - IRP/Dataset/DID-MDN-datasets/Split-DID-MDN/results_6"
rainy_images = sorted(
    f for f in os.listdir(gan_output_path)
    if f.lower().endswith(('.png', '.jpg', '.jpeg'))
)

def load_images(image_list, root_path):
    """Read the named images from ``root_path`` into one stacked tensor.

    Args:
        image_list: Iterable of file names relative to ``root_path``.
        root_path: Directory containing the files.

    Returns:
        Tuple ``(batch, filenames)``: ``batch`` stacks the RGB images, each
        converted with ``transforms.ToTensor``, in the order given;
        ``filenames`` lists the names in that same order.
    """
    def read_as_tensor(name):
        # Force three channels so that every image stacks to the same depth.
        picture = Image.open(os.path.join(root_path, name)).convert("RGB")
        return transforms.ToTensor()(picture)

    # Tensor and name are collected together in a single pass so the two
    # returned sequences always line up.
    loaded = [(read_as_tensor(name), name) for name in image_list]
    tensors = [tensor for tensor, _ in loaded]
    names = [name for _, name in loaded]
    return torch.stack(tensors), names

train_data, train_filenames = load_images(rainy_images, gan_output_path)

# Pair every image with its position in `train_filenames`. The loader shuffles,
# so the position of a sample inside a batch says nothing about which file it
# came from; the index travels with the image instead.
BATCH_SIZE = 16
indexed_train_data = torch.utils.data.TensorDataset(train_data, torch.arange(len(train_filenames)))
train_loader = torch.utils.data.DataLoader(indexed_train_data, batch_size=BATCH_SIZE, shuffle=True)

# Feed batches to whichever device the autoencoder's parameters live on.
train_device = next(autoencoder.parameters()).device

# Train the autoencoder
epochs = 60
final_epoch_reconstructions = {}

for epoch in range(epochs):
    for images, sample_indices in train_loader:
        images = images.to(train_device)
        reconstructed = autoencoder(images)
        mse_loss_value = mse_loss(reconstructed, images)
        perceptual_loss_value = perceptual_loss(reconstructed, images)
        total_loss = mse_loss_value + 0.01 * perceptual_loss_value

        optimizer.zero_grad()
        total_loss.backward()
        optimizer.step()

        if epoch == epochs - 1:
            # Detach and move to the CPU so the stored outputs do not keep the
            # autograd graph (and GPU memory) alive until they are saved.
            detached = reconstructed.detach().cpu()
            for j, sample_index in enumerate(sample_indices.tolist()):
                final_epoch_reconstructions[train_filenames[sample_index]] = detached[j].unsqueeze(0)

    # Losses shown are those of the last batch of the epoch
    print(f"Epoch [{epoch+1}/{epochs}], MSE Loss: {mse_loss_value.item()}, Perceptual Loss: {perceptual_loss_value.item()}")

output_dir = "/content/drive/MyDrive/Khabeer - IRP/Dataset/DID-MDN-datasets/reconstructed_autoencoder_8"
os.makedirs(output_dir, exist_ok=True)

for filename, reconstructed_image in final_epoch_reconstructions.items():
    # splitext drops only the extension, so names containing extra dots are
    # not truncated at the first one.
    save_path = os.path.join(output_dir, f"{os.path.splitext(filename)[0]}_refined.png")
    save_image(reconstructed_image, save_path)

print("Autoencoder training completed.")


Epoch [1/60], MSE Loss: 0.20659932494163513, Perceptual Loss: 4.693155765533447
Epoch [2/60], MSE Loss: 0.13648764789104462, Perceptual Loss: 2.8803763389587402
Epoch [3/60], MSE Loss: 0.055457036942243576, Perceptual Loss: 3.4117679595947266
Epoch [4/60], MSE Loss: 0.03448586165904999, Perceptual Loss: 2.597916841506958
Epoch [5/60], MSE Loss: 0.019618110731244087, Perceptual Loss: 2.0173041820526123
Epoch [6/60], MSE Loss: 0.018136020749807358, Perceptual Loss: 1.9500073194503784
Epoch [7/60], MSE Loss: 0.017502862960100174, Perceptual Loss: 2.065246343612671
Epoch [8/60], MSE Loss: 0.012627768330276012, Perceptual Loss: 1.7568596601486206
Epoch [9/60], MSE Loss: 0.015493995510041714, Perceptual Loss: 1.8104727268218994
Epoch [10/60], MSE Loss: 0.009665400721132755, Perceptual Loss: 1.7112977504730225
Epoch [11/60], MSE Loss: 0.01640314981341362, Perceptual Loss: 1.7680519819259644
Epoch [12/60], MSE Loss: 0.009533348493278027, Perceptual Loss: 1.5512104034423828
Epoch [13/60], MSE L

In [None]:
# Define Reconstruction Loss Function
class ReconstructionLoss(nn.Module):
    """Loss between an autoencoder's output and a target image."""

    def __init__(self, autoencoder, criterion=None):
        """Store the autoencoder and the criterion.

        Args:
            autoencoder: Module applied to the input image.
            criterion: Callable ``criterion(prediction, target)`` returning the
                loss. Defaults to a fresh ``nn.MSELoss()`` per instance, so
                instances never share one criterion module.
        """
        super(ReconstructionLoss, self).__init__()
        self.autoencoder = autoencoder
        self.criterion = nn.MSELoss() if criterion is None else criterion

    def forward(self, input_image, target_image):
        """
        Compute the reconstruction loss between the autoencoder output and target image.

        Args:
        input_image (torch.Tensor): The image generated by the GAN.
        target_image (torch.Tensor): The original clean image.

        Returns:
        tuple: ``(loss, refined_image)`` - the reconstruction loss value and the
        autoencoder output it was computed from.
        """
        refined_image = self.autoencoder(input_image)
        loss = self.criterion(refined_image, target_image)
        return loss, refined_image

# Example Usage
# Assuming `autoencoder`, `gan_output`, and `clean_target` are available
# NOTE(review): `gan_output` and `clean_target` are not assigned in any cell visible
# here, so this cell fails on a fresh kernel unless they are defined first — confirm
# where they are meant to come from.
reconstruction_loss_fn = ReconstructionLoss(autoencoder=autoencoder)
# The call returns both the loss and the autoencoder's output for `gan_output`.
loss_reconstruction, refined_image = reconstruction_loss_fn(gan_output, clean_target)

print(f"Reconstruction Loss: {loss_reconstruction.item()}")


In [None]:
from torchvision.models import vgg19
class PerceptualLoss(nn.Module):
    """MSE between feature maps of a frozen, ImageNet-pretrained VGG-19.

    Only the first 16 layers of ``vgg19.features`` are used, kept in eval mode
    with gradients disabled for their parameters. The extractor is placed on
    the GPU when one is available and on the CPU otherwise.
    """

    def __init__(self):
        """Load the pretrained VGG-19 feature extractor and freeze it."""
        super(PerceptualLoss, self).__init__()
        # `pretrained=True` is deprecated in recent torchvision releases in
        # favour of the `weights=` argument; IMAGENET1K_V1 is the checkpoint the
        # legacy flag selected. Older releases lack the enum, hence the fallback.
        try:
            from torchvision.models import VGG19_Weights
            backbone = vgg19(weights=VGG19_Weights.IMAGENET1K_V1)
        except ImportError:
            backbone = vgg19(pretrained=True)

        # An unconditional .cuda() raises on a runtime without a GPU.
        target_device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.vgg = backbone.features[:16].eval().to(target_device)
        for param in self.vgg.parameters():
            param.requires_grad = False

        # Built once instead of on every forward call.
        self.criterion = nn.MSELoss()

    def forward(self, x, y):
        """Return the MSE between the VGG features of ``x`` and of ``y``."""
        return self.criterion(self.vgg(x), self.vgg(y))


In [None]:
import torch

# Select the compute device: CUDA when PyTorch can see a GPU, otherwise the CPU
cuda_ready = torch.cuda.is_available()
device = torch.device("cuda" if cuda_ready else "cpu")
print(f"Using device: {device}")

# With a GPU present, also report its name and current memory usage (device 0)
if cuda_ready:
    bytes_per_gib = 1024**3
    allocated_gib = torch.cuda.memory_allocated(0) / bytes_per_gib
    reserved_gib = torch.cuda.memory_reserved(0) / bytes_per_gib
    print(f"GPU Name: {torch.cuda.get_device_name(0)}")
    print(f"GPU Memory Allocated: {allocated_gib:.2f} GB")
    print(f"GPU Memory Cached: {reserved_gib:.2f} GB")


Using device: cuda
GPU Name: Tesla T4
GPU Memory Allocated: 0.00 GB
GPU Memory Cached: 0.00 GB
