### Libraries 📚⬇

In [1]:
# Mount Google Drive at /content/drive (Colab-only); later cells read the dataset
# and change directory to paths underneath this mount point.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
import numpy as np
import pandas as pd
import os, math, sys
import glob, itertools
import argparse, random

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Variable
from torchvision.models import vgg19
import torchvision.transforms as transforms
from torch.utils.data import DataLoader, Dataset
from torchvision.utils import save_image, make_grid

import plotly
import plotly.express as px
import plotly.graph_objects as go
import matplotlib.pyplot as plt

from PIL import Image
from tqdm import tqdm_notebook as tqdm

# Seed every random number generator this notebook relies on, not just Python's:
# NumPy and PyTorch drive weight initialisation and DataLoader shuffling, so
# leaving them unseeded makes each run produce different results.
random.seed(42)
np.random.seed(42)
torch.manual_seed(42)

# Silence library warnings to keep cell output readable.
import warnings
warnings.filterwarnings("ignore")

In [3]:
# ---- Checkpointing -------------------------------------------------------
load_pretrained_models = False   # whether to load pretrained models

# ---- Training schedule ---------------------------------------------------
n_epochs = 20                    # number of training epochs
batch_size = 16                  # samples per batch
decay_epoch = 100                # epoch from which lr decay would start
n_cpu = 8                        # CPU threads for batch generation

# ---- Adam optimizer ------------------------------------------------------
lr = 8e-5                        # learning rate
b1 = 0.5                         # decay of first-order momentum of gradient
b2 = 0.999                       # decay of second-order momentum of gradient

# ---- Image geometry ------------------------------------------------------
hr_height = 512                  # high-resolution image height
hr_width = 512                   # high-resolution image width
channels = 3                     # number of image channels
hr_shape = (hr_height, hr_width)

# ---- Runtime -------------------------------------------------------------
cuda = torch.cuda.is_available()

# Output folders, created relative to the current working directory.
os.makedirs("saved_models", exist_ok=True)
os.makedirs("images", exist_ok=True)

In [4]:
import os

# All relative paths below (dataset globbing, checkpoints, sample images) are
# resolved against the project folder on Drive.
os.chdir('/content/drive/MyDrive/project')

# The configuration cell created these folders *before* the working directory
# changed, so they may not exist here yet; without them the relative
# "saved_models/..." checkpoint writes during training would fail.
os.makedirs("images", exist_ok=True)
os.makedirs("saved_models", exist_ok=True)

In [5]:
# Show the entries of the dataset directory.
os.listdir('/content/drive/MyDrive/project/7-Detection data')

['Mobile_crane',
 '.ipynb_checkpoints',
 'Tower_crane',
 'low_Mobile_crane',
 'low_Tower_crane']

### Settings ⚙️

### Define Dataset Class

In [6]:
import glob
import numpy as np
from PIL import Image
from sklearn.model_selection import train_test_split
from torchvision import transforms
from torch.utils.data import Dataset, DataLoader

# # Normalization parameters
# mean = np.array([0.485, 0.456, 0.406])
# std = np.array([0.229, 0.224, 0.225])

class ImageDataset(Dataset):
    """Dataset of (low-resolution, high-resolution) image pairs.

    Item ``i`` pairs ``low_paths[i]`` with ``high_paths[i]``, so both lists are
    expected to be ordered consistently. If the lists differ in length, the
    shorter one is cycled through with a modulo index.

    Args:
        low_paths: sequence of file paths to the low-resolution inputs.
        high_paths: sequence of file paths to the high-resolution targets.
        lr_size: size passed to ``transforms.Resize`` for the inputs
            (default ``(64, 64)``).
        hr_size: size passed to ``transforms.Resize`` for the targets
            (default ``(512, 512)``).

    Each item is a dict ``{"lr": tensor, "hr": tensor}`` produced by
    ``Resize`` followed by ``ToTensor``.
    """

    def __init__(self, low_paths, high_paths, lr_size=(64, 64), hr_size=(512, 512)):
        # Transform applied to the low-resolution input image.
        self.transform_lr = transforms.Compose([
            transforms.Resize(lr_size),
            transforms.ToTensor(),
        ])

        # Transform applied to the high-resolution target image.
        self.transform_hr = transforms.Compose([
            transforms.Resize(hr_size),
            transforms.ToTensor(),
        ])

        self.low_paths = low_paths
        self.high_paths = high_paths

    @staticmethod
    def _load_rgb(path):
        """Open ``path`` and return an in-memory 3-channel RGB copy.

        Forcing RGB keeps every sample at 3 channels (grayscale or RGBA files
        would otherwise yield tensors that cannot be stacked into a batch), and
        the context manager closes the underlying file handle immediately.
        """
        with Image.open(path) as img:
            return img.convert("RGB")

    def __getitem__(self, index):
        img_low = self._load_rgb(self.low_paths[index % len(self.low_paths)])
        img_high = self._load_rgb(self.high_paths[index % len(self.high_paths)])

        img_lr = self.transform_lr(img_low)
        img_hr = self.transform_hr(img_high)

        return {"lr": img_lr, "hr": img_hr}

    def __len__(self):
        # The longer list determines the dataset length (see modulo indexing above).
        return max(len(self.low_paths), len(self.high_paths))

In [7]:
# Number of entries in the Mobile_crane folder.
len(os.listdir('/content/drive/MyDrive/project/7-Detection data/Mobile_crane'))

2009

In [8]:
# Number of entries in the Tower_crane folder.
len(os.listdir('/content/drive/MyDrive/project/7-Detection data/Tower_crane'))

2022

In [9]:
import glob

dataset_path = '7-Detection data'

# Low-resolution inputs live in the "low_*" folders, high-resolution targets in
# the folders without the prefix.
low_paths1 = glob.glob(f"{dataset_path}/low_Mobile_crane/*.*")
low_paths2 = glob.glob(f"{dataset_path}/low_Tower_crane/*.*")
high_paths1 = glob.glob(f"{dataset_path}/Mobile_crane/*.*")
high_paths2 = glob.glob(f"{dataset_path}/Tower_crane/*.*")

# Both lists are sorted independently; ImageDataset pairs them by index, so the
# two orderings are presumed to line up file-for-file (verify the file names).
low_paths = sorted([*low_paths1, *low_paths2])
high_paths = sorted([*high_paths1, *high_paths2])

print(len(low_paths), len(high_paths), sep="\n")

4031
4031


In [None]:
# Keep only the first 400 entries of each list.
low_paths, high_paths = low_paths[:400], high_paths[:400]

In [None]:
# Split both lists in ONE call so every low-res path stays with the high-res
# path at the same index. Two separate calls only stay aligned while the lists
# have equal length and the seed matches; the joint call yields the same
# partition in that case and raises if the lengths ever diverge.
train_low_paths, test_low_paths, train_high_paths, test_high_paths = train_test_split(
    low_paths, high_paths, test_size=0.2, random_state=42
)

# NOTE: these override the batch_size / n_cpu values from the configuration cell.
batch_size = 4
n_cpu = 1

In [None]:
# Training split sizes: low-res count, then high-res count.
print(len(train_low_paths), len(train_high_paths), sep="\n")

320
320


In [None]:
# Test split sizes: low-res count, then high-res count.
print(len(test_low_paths), len(test_high_paths), sep="\n")

80
80


### Get Train/Test Dataloaders

In [None]:
# Wrap the path splits in paired LR/HR datasets.
train_dataset = ImageDataset(train_low_paths, train_high_paths)
test_dataset = ImageDataset(test_low_paths, test_high_paths)

# Training batches are reshuffled every epoch; evaluation keeps a fixed order.
train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
# Evaluation uses 75% of the training batch size, floored at 1 because
# int(1 * 0.75) == 0 and DataLoader rejects a batch size of 0.
test_dataloader = DataLoader(test_dataset, batch_size=max(1, int(batch_size * 0.75)), shuffle=False)

### Define Model Classes

In [None]:
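# NOTE: earlier draft of the model classes, kept commented out for reference only.
# It differs from the active definitions in the next cell (e.g. two upsampling
# stages here instead of three); nothing in this cell is executed.
# class FeatureExtractor(nn.Module):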
#     def __init__(self):
#         super(FeatureExtractor, self).__init__()
#         vgg19_model = vgg19(pretrained=True)
#         self.feature_extractor = nn.Sequential(*list(vgg19_model.features.children())[:18])

#     def forward(self, img):
#         return self.feature_extractor(img)


# class ResidualBlock(nn.Module):
#     def __init__(self, in_features):
#         super(ResidualBlock, self).__init__()
#         self.conv_block = nn.Sequential(
#             nn.Conv2d(in_features, in_features, kernel_size=3, stride=1, padding=1),
#             nn.BatchNorm2d(in_features, 0.8),
#             nn.PReLU(),
#             nn.Conv2d(in_features, in_features, kernel_size=3, stride=1, padding=1),
#             nn.BatchNorm2d(in_features, 0.8),
#         )

#     def forward(self, x):
#         return x + self.conv_block(x)


# class GeneratorVGG(nn.Module):
#     def __init__(self, in_channels=3, out_channels=3, n_residual_blocks=16):
#         super(GeneratorVGG, self).__init__()

#         # First layer
#         self.conv1 = nn.Sequential(nn.Conv2d(in_channels, 64, kernel_size=9, stride=1, padding=4), nn.PReLU())

#         # Residual blocks
#         res_blocks = []
#         for _ in range(n_residual_blocks):
#             res_blocks.append(ResidualBlock(64))
#         self.res_blocks = nn.Sequential(*res_blocks)

#         # Second conv layer post residual blocks
#         self.conv2 = nn.Sequential(nn.Conv2d(64, 64, kernel_size=3, stride=1, padding=1), nn.BatchNorm2d(64, 0.8))

#         # Upsampling layers
#         upsampling = []
#         for out_features in range(2):
#             upsampling += [
#                 # nn.Upsample(scale_factor=2),
#                 nn.Conv2d(64, 256, 3, 1, 1),
#                 nn.BatchNorm2d(256),
#                 nn.PixelShuffle(upscale_factor=2),
#                 nn.PReLU(),
#             ]
#         self.upsampling = nn.Sequential(*upsampling)

#         # Final output layer
#         self.conv3 = nn.Sequential(nn.Conv2d(64, out_channels, kernel_size=9, stride=1, padding=4), nn.Sigmoid())

#     def forward(self, x):
#         out1 = self.conv1(x)
#         out = self.res_blocks(out1)
#         out2 = self.conv2(out)
#         out = torch.add(out1, out2)
#         out = self.upsampling(out)
#         out = self.conv3(out)
#         # Clamp the output to [0, 1] range
#         out = torch.clamp(out, min=0.0, max=1.0)
#         return out


# class Discriminator(nn.Module):
#     def __init__(self, input_shape):
#         super(Discriminator, self).__init__()

#         self.input_shape = input_shape
#         in_channels, in_height, in_width = self.input_shape
#         patch_h, patch_w = int(in_height / 2 ** 4), int(in_width / 2 ** 4)
#         self.output_shape = (1, patch_h, patch_w)

#         def discriminator_block(in_filters, out_filters, first_block=False):
#             layers = []
#             layers.append(nn.Conv2d(in_filters, out_filters, kernel_size=3, stride=1, padding=1))
#             if not first_block:
#                 layers.append(nn.BatchNorm2d(out_filters))
#             layers.append(nn.LeakyReLU(0.2, inplace=True))
#             layers.append(nn.Conv2d(out_filters, out_filters, kernel_size=3, stride=2, padding=1))
#             layers.append(nn.BatchNorm2d(out_filters))
#             layers.append(nn.LeakyReLU(0.2, inplace=True))
#             return layers

#         layers = []
#         in_filters = in_channels
#         for i, out_filters in enumerate([64, 128, 256, 512]):
#             layers.extend(discriminator_block(in_filters, out_filters, first_block=(i == 0)))
#             in_filters = out_filters

#         layers.append(nn.Conv2d(out_filters, 1, kernel_size=3, stride=1, padding=1))

#         self.model = nn.Sequential(*layers)

#     def forward(self, img):
#         return self.model(img)

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torchvision.models import vgg19
class FeatureExtractor(nn.Module):
    """Fixed VGG-19 feature extractor used for the content (perceptual) loss.

    Wraps the first 18 layers of the pretrained ``vgg19().features`` stack.
    The weights are frozen: this network is only a measuring device, so no
    gradients are needed for its parameters. Gradients still flow through it
    to its *input*, which is what the generator update relies on.
    """

    def __init__(self):
        super(FeatureExtractor, self).__init__()
        vgg19_model = vgg19(pretrained=True)
        self.feature_extractor = nn.Sequential(*list(vgg19_model.features.children())[:18])

        # Freeze the VGG weights so backward passes skip computing (and storing)
        # gradients for parameters that no optimizer ever updates.
        for param in self.feature_extractor.parameters():
            param.requires_grad = False

    def forward(self, img):
        """Return the VGG feature maps for the image batch ``img``."""
        return self.feature_extractor(img)

class ResidualBlock(nn.Module):
    """Conv-BN-PReLU-Conv-BN branch added back onto its own input.

    Args:
        in_features: number of channels; the block keeps channel count and
            spatial size unchanged (3x3 convolutions, stride 1, padding 1).
    """

    def __init__(self, in_features):
        super().__init__()

        def conv3x3():
            # Shape-preserving 3x3 convolution.
            return nn.Conv2d(in_features, in_features, kernel_size=3, stride=1, padding=1)

        branch = [
            conv3x3(),
            nn.BatchNorm2d(in_features),
            nn.PReLU(),
            conv3x3(),
            nn.BatchNorm2d(in_features),
        ]
        self.conv_block = nn.Sequential(*branch)

    def forward(self, x):
        """Return ``x`` plus the output of the convolutional branch."""
        residual = self.conv_block(x)
        return x + residual


class GeneratorVGG(nn.Module):
    """SRGAN-style generator: residual trunk followed by PixelShuffle upsampling.

    Args:
        in_channels: channels of the low-resolution input.
        out_channels: channels of the generated image.
        n_residual_blocks: number of ``ResidualBlock`` modules in the trunk.
        n_upsample_blocks: number of x2 PixelShuffle stages; the overall
            upscaling factor is ``2 ** n_upsample_blocks`` (default 3 -> x8,
            e.g. 64x64 -> 512x512).

    The output passes through a Sigmoid, so values lie in [0, 1].
    """

    def __init__(self, in_channels=3, out_channels=3, n_residual_blocks=16, n_upsample_blocks=3):
        super(GeneratorVGG, self).__init__()

        # First layer: wide 9x9 receptive field, lifts the input to 64 feature maps.
        self.conv1 = nn.Sequential(nn.Conv2d(in_channels, 64, kernel_size=9, stride=1, padding=4), nn.PReLU())

        # Residual trunk.
        self.res_blocks = nn.Sequential(*[ResidualBlock(64) for _ in range(n_residual_blocks)])

        # Conv layer after the trunk; its output is summed with conv1's output.
        self.conv2 = nn.Sequential(nn.Conv2d(64, 64, kernel_size=3, stride=1, padding=1), nn.BatchNorm2d(64))

        # Upsampling: each stage expands 64 -> 256 channels, then PixelShuffle(2)
        # rearranges them into 64 channels at twice the spatial resolution.
        upsampling = []
        for _ in range(n_upsample_blocks):
            upsampling += [
                nn.Conv2d(64, 256, 3, 1, 1),
                nn.BatchNorm2d(256),
                nn.PixelShuffle(upscale_factor=2),
                nn.PReLU(),
            ]
        self.upsampling = nn.Sequential(*upsampling)

        # Final output layer; Sigmoid bounds the image to [0, 1].
        self.conv3 = nn.Sequential(nn.Conv2d(64, out_channels, kernel_size=9, stride=1, padding=4), nn.Sigmoid())

    def forward(self, x):
        """Upscale the low-resolution batch ``x`` and return the generated image."""
        out1 = self.conv1(x)
        out2 = self.conv2(self.res_blocks(out1))
        # Long skip connection around the whole residual trunk.
        out = self.upsampling(torch.add(out1, out2))
        # No extra clamp needed: the Sigmoid in conv3 already keeps values in [0, 1].
        return self.conv3(out)

class Discriminator(nn.Module):
    """Patch discriminator: four conv blocks, each halving the spatial size.

    Args:
        input_shape: ``(channels, height, width)`` of the images to classify.

    Attributes:
        input_shape: the shape passed to the constructor.
        output_shape: ``(1, patch_h, patch_w)`` - per-sample shape of the score
            map returned by ``forward``; used by callers to build target tensors.
    """

    def __init__(self, input_shape):
        super(Discriminator, self).__init__()

        self.input_shape = input_shape
        in_channels, in_height, in_width = self.input_shape
        block_filters = [64, 128, 256, 512]

        # Every block ends with a 3x3 / stride-2 / padding-1 convolution, which maps
        # a side of length n to ceil(n / 2). Applying that once per block gives the
        # true output size; a plain int(n / 16) is only right when n is divisible
        # by 16 and would otherwise mismatch the tensor returned by forward().
        patch_h, patch_w = int(in_height), int(in_width)
        for _ in block_filters:
            patch_h, patch_w = (patch_h + 1) // 2, (patch_w + 1) // 2
        self.output_shape = (1, patch_h, patch_w)

        def discriminator_block(in_filters, out_filters, first_block=False):
            """Conv(stride 1) -> [BN] -> LeakyReLU -> Conv(stride 2) -> BN -> LeakyReLU."""
            layers = []
            layers.append(nn.Conv2d(in_filters, out_filters, kernel_size=3, stride=1, padding=1))
            # The very first convolution is not followed by batch norm.
            if not first_block:
                layers.append(nn.BatchNorm2d(out_filters))
            layers.append(nn.LeakyReLU(0.2, inplace=True))
            layers.append(nn.Conv2d(out_filters, out_filters, kernel_size=3, stride=2, padding=1))
            layers.append(nn.BatchNorm2d(out_filters))
            layers.append(nn.LeakyReLU(0.2, inplace=True))
            return layers

        layers = []
        in_filters = in_channels
        for i, out_filters in enumerate(block_filters):
            layers.extend(discriminator_block(in_filters, out_filters, first_block=(i == 0)))
            in_filters = out_filters

        # Collapse to a single-channel score map (one score per patch).
        layers.append(nn.Conv2d(in_filters, 1, kernel_size=3, stride=1, padding=1))

        self.model = nn.Sequential(*layers)

    def forward(self, img):
        """Return the patch score map for the image batch ``img``."""
        return self.model(img)

In [None]:
# Build the three networks: the trainable generator/discriminator pair and the
# VGG feature extractor that feeds the content loss.
generator = GeneratorVGG()
discriminator = Discriminator(input_shape=(channels, *hr_shape))
feature_extractor = FeatureExtractor()

# The feature extractor is never trained, so keep it in inference mode.
feature_extractor.eval()

# Loss functions: MSE on discriminator scores, L1 on VGG features.
criterion_GAN = torch.nn.MSELoss()
criterion_content = torch.nn.L1Loss()

# Move everything onto the GPU when one is available.
if cuda:
    generator, discriminator, feature_extractor = (
        net.cuda() for net in (generator, discriminator, feature_extractor)
    )
    criterion_GAN, criterion_content = criterion_GAN.cuda(), criterion_content.cuda()

# One Adam optimizer per trainable network, sharing lr and betas.
optimizer_G = torch.optim.Adam(
    generator.parameters(),
    lr=lr,
    betas=(b1, b2),
)
optimizer_D = torch.optim.Adam(
    discriminator.parameters(),
    lr=lr,
    betas=(b1, b2),
)

# Tensor type used to cast batches onto the active device.
if cuda:
    Tensor = torch.cuda.FloatTensor
else:
    Tensor = torch.Tensor

Downloading: "https://download.pytorch.org/models/vgg19-dcbb9e9d.pth" to /root/.cache/torch/hub/checkpoints/vgg19-dcbb9e9d.pth
100%|██████████| 548M/548M [00:05<00:00, 95.9MB/s]


### Train Super Resolution GAN (SRGAN)

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms
from torch.autograd import Variable
from torch.utils.data import DataLoader
from torchvision.datasets import ImageFolder
from torchvision.utils import make_grid, save_image
from tqdm import tqdm
import numpy as np
from skimage.metrics import peak_signal_noise_ratio as PSNR
from skimage.metrics import structural_similarity as SSIM

# Relies on objects created in earlier cells: generator, discriminator,
# feature_extractor, criterion_GAN, criterion_content, optimizer_G, optimizer_D,
# Tensor, train_dataloader, test_dataloader and n_epochs.

# Per-epoch metric histories; entry i is the mean over all batches of epoch i.
train_gen_losses, train_disc_losses, train_psnr_values = [], [], []
test_gen_losses, test_disc_losses, test_psnr_values = [], [], []


def _run_epoch(dataloader, description, training):
    """Run one full pass over ``dataloader`` and return its mean metrics.

    Args:
        dataloader: yields dicts with "lr" and "hr" image batches.
        description: label shown on the progress bar.
        training: if True, update generator and discriminator; if False, only
            evaluate (eval mode, autograd disabled, no optimizer steps).

    Returns:
        Tuple ``(mean_generator_loss, mean_discriminator_loss, mean_psnr)``.
    """
    # Set the mode once per pass instead of once per batch.
    generator.train(training)
    discriminator.train(training)

    epoch_gen_loss, epoch_disc_loss, epoch_psnr = 0, 0, 0
    tqdm_bar = tqdm(dataloader, desc=description, total=len(dataloader))

    # Disable autograd during evaluation: no backward pass happens there, so
    # building the graph for 512x512 outputs would only waste memory and time.
    with torch.set_grad_enabled(training):
        for batch_idx, imgs in enumerate(tqdm_bar):
            # Configure model input
            imgs_lr = imgs["lr"].type(Tensor)
            imgs_hr = imgs["hr"].type(Tensor)

            # Adversarial ground truths, shaped like the discriminator's score map
            target_shape = (imgs_lr.size(0), *discriminator.output_shape)
            valid = torch.ones(target_shape, device=imgs_lr.device)
            fake = torch.zeros(target_shape, device=imgs_lr.device)

            ### Generator
            if training:
                optimizer_G.zero_grad()
            # Generate a high resolution image from low resolution input
            gen_hr = generator(imgs_lr)

            # Adversarial loss: the generator wants its output scored as "valid"
            loss_GAN = criterion_GAN(discriminator(gen_hr), valid)
            # Content loss on VGG features
            gen_features = feature_extractor(gen_hr)
            real_features = feature_extractor(imgs_hr)
            loss_content = criterion_content(gen_features, real_features.detach())
            # Total loss
            loss_G = loss_content + 1e-3 * loss_GAN
            if training:
                loss_G.backward()
                optimizer_G.step()

            ### Discriminator
            if training:
                optimizer_D.zero_grad()
            # Loss of real and fake images; detach so no gradient reaches the generator
            loss_real = criterion_GAN(discriminator(imgs_hr), valid)
            loss_fake = criterion_GAN(discriminator(gen_hr.detach()), fake)
            # Total loss
            loss_D = (loss_real + loss_fake) / 2
            if training:
                loss_D.backward()
                optimizer_D.step()

            # Accumulate losses
            epoch_gen_loss += loss_G.item()
            epoch_disc_loss += loss_D.item()

            # PSNR over the whole batch (images are expected in [0, 1])
            imgs_hr_np = imgs_hr.cpu().detach().numpy()
            gen_hr_np = gen_hr.cpu().detach().numpy()
            epoch_psnr += PSNR(imgs_hr_np, gen_hr_np, data_range=1.0)

            tqdm_bar.set_postfix(gen_loss=epoch_gen_loss/(batch_idx+1), disc_loss=epoch_disc_loss/(batch_idx+1), PSNR=f"{epoch_psnr/(batch_idx+1):.4f}")

    n_batches = len(dataloader)
    return epoch_gen_loss / n_batches, epoch_disc_loss / n_batches, epoch_psnr / n_batches


for epoch in range(n_epochs):

    ### Training
    gen_loss, disc_loss, psnr = _run_epoch(train_dataloader, f'Training Epoch {epoch} ', training=True)
    train_gen_losses.append(gen_loss)
    train_disc_losses.append(disc_loss)
    train_psnr_values.append(psnr)

    ### Testing (Validation)
    gen_loss, disc_loss, psnr = _run_epoch(test_dataloader, f'Testing Epoch {epoch} ', training=False)
    test_gen_losses.append(gen_loss)
    test_disc_losses.append(disc_loss)
    test_psnr_values.append(psnr)

    # Save checkpoints whenever this epoch achieved the best validation generator loss so far
    if np.argmin(test_gen_losses) == len(test_gen_losses) - 1:
        # Make sure the target folder exists in the *current* working directory.
        os.makedirs("saved_models", exist_ok=True)
        torch.save(generator.state_dict(), "saved_models/generator.pth")
        torch.save(discriminator.state_dict(), "saved_models/discriminator.pth")


Training Epoch 0 :  29%|██▉       | 23/80 [49:05<1:59:58, 126.29s/it, PSNR=10.8797, disc_loss=0.212, gen_loss=0.387]

In [None]:
import torch
from torch.autograd import Variable
from torchvision.utils import save_image
import matplotlib.pyplot as plt


# Rebuild the generator on the CPU and restore the best checkpoint.
# map_location lets a checkpoint that was saved from GPU tensors load on a
# CPU-only runtime as well.
generator = GeneratorVGG()
generator.load_state_dict(torch.load('saved_models/generator.pth', map_location='cpu'))
generator.eval()

n_rows = 4
fig, axs = plt.subplots(n_rows, 2, figsize=(10, 20))

# Hide every axis up front so rows stay blank if the loader has fewer batches.
for ax in axs.ravel():
    ax.axis('off')

# Iterate the loader ONCE. Calling next(iter(test_dataloader)) inside the loop
# would restart it every time and plot the first (unshuffled) batch repeatedly.
for idx, imgs in zip(range(n_rows), test_dataloader):
    imgs_lr = imgs["lr"].float()

    with torch.no_grad():
        gen_hr = generator(imgs_lr)

    # Show the first sample of the batch, converted from CHW to HWC for matplotlib.
    original_hr = imgs["hr"][0].permute(1, 2, 0).numpy()
    generated_hr = gen_hr[0].cpu().permute(1, 2, 0).numpy()

    # Plot original HR image
    axs[idx, 0].imshow(original_hr)
    axs[idx, 0].set_title('Original HR')

    # Plot generated HR image
    axs[idx, 1].imshow(generated_hr)
    axs[idx, 1].set_title('Generated HR')

plt.tight_layout()
plt.show()