In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, transforms
from torchvision.transforms import ToTensor
from torch.utils.data import DataLoader
import matplotlib.pyplot as plt
from google.colab.patches import cv2_imshow
from torchvision.utils import save_image
import numpy as np
import cv2  
import random

In [2]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [3]:
class Flatten(nn.Module):
    def forward(self, inputs):
        return inputs.view(inputs.size(0), -1)


class UnFlatten(nn.Module):
    def forward(self, inputs, size=512):
        return inputs.view(inputs.size(0), 128, 4, 4)


class Discriminator(nn.Module):
    def __init__(self):
        super(Discriminator, self).__init__()

        self.model = nn.Sequential(
            nn.Conv2d(3, 128, kernel_size=5, stride=2, padding=2),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Conv2d(128, 64, kernel_size=5, stride=2, padding=2),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Conv2d(64, 32, kernel_size=5, stride=2, padding=2),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Conv2d(32, 16, kernel_size=5, stride=2, padding=2),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Conv2d(16, 4, kernel_size=5, stride=2, padding=2),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Sigmoid(),
        )

    def forward(self, img):
        validity = self.model(img)
        return (validity)

In [4]:
class ResBlock(nn.Module):
    def __init__(self, n_ch) -> None:
        super().__init__()

        self.resblock_model = nn.Sequential(
            nn.Conv2d(n_ch, n_ch, kernel_size=3, bias=False, padding=1),
            nn.BatchNorm2d(n_ch),
            nn.ReLU(inplace=True),
            nn.Dropout(),
            nn.Conv2d(n_ch, n_ch, kernel_size=3, bias=False, padding=1),
            nn.BatchNorm2d(n_ch)
        )

    def forward(self, inputs):
        return self.resblock_model(inputs) + inputs

In [5]:
class Downscale(nn.Module):
    def __init__(self, in_ch, out_ch, kernel_size=3, padding=1):
        super().__init__()
        self.in_ch = in_ch
        self.out_ch = out_ch
        self.kernel_size = kernel_size
        self.conv = nn.Conv2d(self.in_ch, self.out_ch, kernel_size=self.kernel_size, stride=2, padding=padding)
        self.batch_norm = nn.BatchNorm2d(self.out_ch)
        self.relu = nn.LeakyReLU(0.1)
        self.drop = nn.Dropout2d()

    def forward(self, x):
        x = self.conv(x)
        x = self.batch_norm(x)
        x = self.relu(x)
        x = self.drop(x)
        return x

In [6]:
class Upscale(nn.Module):
    def __init__(self, in_ch, out_ch, kernel_size=5, padding=2):
        super().__init__()
        self.conv = nn.ConvTranspose2d(in_ch, out_ch, kernel_size, stride=2, padding=1)
        self.batch_norm = nn.BatchNorm2d(out_ch)
        self.relu = nn.LeakyReLU(0.1)
        self.drop = nn.Dropout2d()

    def forward(self, x):
        x = self.conv(x)
        x = self.batch_norm(x)
        x = self.relu(x)
        x = self.drop(x)
        return x

In [7]:
class AutoEncoder(nn.Module):

    def __init__(self, image_channels=3, h_dim=2048, z_dim=128):
        super(AutoEncoder, self).__init__()
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.encoder = nn.Sequential(
            Downscale(image_channels, 64),
            Downscale(64, 128),
            # Downscale(128, 128),
            # ResBlock(128),
            Downscale(128, 256),
            # ResBlock(256),
            Downscale(256, 256),
            Downscale(256, 512),
            # ResBlock(512),
            Downscale(512, 512),
            Flatten(),
        )
        # ([32, 2304])

        self.inter_layer = nn.Sequential(
            nn.Linear(h_dim, z_dim),
            nn.Linear(z_dim, z_dim),
            nn.Linear(z_dim, h_dim),
        )

        self.decoder = nn.Sequential(
            UnFlatten(),
            # Upscale(128, 128, kernel_size=4),
            Upscale(128, 256, kernel_size=4),
            # ResBlock(256),
            # ResBlock(128),
            # ResBlock(128),
            Upscale(256, 256, kernel_size=4),
            Upscale(256, 128, kernel_size=4),
            # ResBlock(128),
            Upscale(128, 64, kernel_size=4),
            ResBlock(64),
            Upscale(64, 32, kernel_size=4),
            Upscale(32, 32, kernel_size=4),
            nn.Conv2d(32, image_channels, kernel_size=1, stride=2),
            nn.Sigmoid(),
        )

        self.decoder_b = nn.Sequential(
            UnFlatten(),
            # Upscale(128, 128, kernel_size=4),
            Upscale(128, 256, kernel_size=4),
            # ResBlock(256),
            # ResBlock(128),
            # ResBlock(128),
            Upscale(256, 256, kernel_size=4),
            Upscale(256, 128, kernel_size=4),
            # ResBlock(128),
            Upscale(128, 64, kernel_size=4),
            ResBlock(64),
            Upscale(64, 32, kernel_size=4),
            Upscale(32, 32, kernel_size=4),
            nn.Conv2d(32, image_channels, kernel_size=1, stride=2),
            nn.Sigmoid(),
        )

    def forward(self, x, version='a'):
        z = self.encoder(x)
        z = self.inter_layer(z)
        if version == 'a':
            z = self.decoder(z)
        else:
            z = self.decoder_b(z)
        return z

In [8]:
import os
from math import exp
import torch.nn.functional as F
from torch.autograd import Variable

In [9]:

def gaussian(window_size, sigma):
    gauss = torch.Tensor([exp(-(x - window_size // 2) ** 2 / float(2 * sigma ** 2)) for x in range(window_size)])
    return gauss / gauss.sum()


def create_window(window_size, channel):
    _1D_window = gaussian(window_size, 1.5).unsqueeze(1)
    _2D_window = _1D_window.mm(_1D_window.t()).float().unsqueeze(0).unsqueeze(0)
    window = Variable(_2D_window.expand(channel, 1, window_size, window_size).contiguous())
    return window


def _ssim(img1, img2, window, window_size, channel, size_average=True):
    mu1 = F.conv2d(img1, window, padding=window_size // 2, groups=channel)
    mu2 = F.conv2d(img2, window, padding=window_size // 2, groups=channel)

    mu1_sq = mu1.pow(2)
    mu2_sq = mu2.pow(2)
    mu1_mu2 = mu1 * mu2

    sigma1_sq = F.conv2d(img1 * img1, window, padding=window_size // 2, groups=channel) - mu1_sq
    sigma2_sq = F.conv2d(img2 * img2, window, padding=window_size // 2, groups=channel) - mu2_sq
    sigma12 = F.conv2d(img1 * img2, window, padding=window_size // 2, groups=channel) - mu1_mu2

    C1 = 0.01 ** 2
    C2 = 0.03 ** 2

    ssim_map = ((2 * mu1_mu2 + C1) * (2 * sigma12 + C2)) / ((mu1_sq + mu2_sq + C1) * (sigma1_sq + sigma2_sq + C2))

    if size_average:
        return ssim_map.mean()
    else:
        return ssim_map.mean(1).mean(1).mean(1)

# luminance , contrast , structure
class SSIM(torch.nn.Module):
    def __init__(self, window_size=11, size_average=True):
        super(SSIM, self).__init__()
        self.window_size = window_size
        self.size_average = size_average
        self.channel = 1
        self.window = create_window(window_size, self.channel)

    def forward(self, img1, img2):
        (_, channel, _, _) = img1.size()

        if channel == self.channel and self.window.data.type() == img1.data.type():
            window = self.window
        else:
            window = create_window(self.window_size, channel)

            if img1.is_cuda:
                window = window.cuda(img1.get_device())
            window = window.type_as(img1)

            self.window = window
            self.channel = channel

        return _ssim(img1, img2, window, self.window_size, channel, self.size_average)


def ssim(img1, img2, window_size=11, size_average=True):
    (_, channel, _, _) = img1.size()
    window = create_window(window_size, channel)

    if img1.is_cuda:
        window = window.cuda(img1.get_device())
    window = window.type_as(img1)

    return _ssim(img1, img2, window, window_size, channel, size_average)

In [10]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

print('loading data...')
dataset_a = datasets.ImageFolder(root="/content/drive/MyDrive/Minor_Project/Dataset_2/A" ,transform=transforms.Compose([
    transforms.Resize((128, 128)),
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.ToTensor(),])
)

dataset_b = datasets.ImageFolder(root="/content/drive/MyDrive/Minor_Project/Dataset_2/B", transform=transforms.Compose([
    transforms.Resize((128, 128)),
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.ToTensor(),])
)

dataloader_a = torch.utils.data.DataLoader(dataset_a, batch_size=len(dataset_a), shuffle=True)
dataloader_b = torch.utils.data.DataLoader(dataset_b, batch_size=len(dataset_b), shuffle=True)


train_dataset_array_a = next(iter(dataloader_a))[0].numpy()
train_dataset_array_b = next(iter(dataloader_b))[0].numpy()

np.save('a.npy', train_dataset_array_a)
np.save('b.npy', train_dataset_array_b)

save = True

loading data...


In [11]:
class Iterator:
    def __init__(self, dataset, batch_size=32):
        self.datset = dataset
        self.max = len(dataset)
        self.batch_size = batch_size
        self.idx = 0

    def __iter__(self):
        self.idx = 0
        return self

    def __next__(self):
        if self.idx + + self.batch_size >= self.max - 1:
            np.random.shuffle(self.datset)
            self.idx = 0
        self.idx += self.batch_size
        return self.datset[self.idx:self.idx + self.batch_size]

In [12]:
# 1. Create Model directory 
from pathlib import Path # for writing file paths
MODEL_PATH = Path("models")
MODEL_PATH.mkdir(parents = True, exist_ok = True) # if the directory already exists it won't throw a error after exist_ok = True
MODEL_PATH_2 = Path("models/final")
MODEL_PATH_2.mkdir(parents = True, exist_ok = True)

# 2. Create Model Save Path
model_name = "model.pth" # common convention is to save models using either .pt or .pth file extension
final = "final.pth"
MODEL_SAVE_PATH = MODEL_PATH / model_name # I guess / is overloaded to return a posix path when used with Path() object
MODEL_SAVE_PATH_2 = MODEL_PATH_2 / final
MODEL_SAVE_PATH

PosixPath('models/model.pth')

In [13]:
itera = iter(Iterator(train_dataset_array_a, 16))
iterb = iter(Iterator(train_dataset_array_b, 16))

model = AutoEncoder(image_channels=3).to(device)
model.load_state_dict(torch.load('/content/drive/MyDrive/Minor_Project/saved_models/final.pth'))

discriminator = Discriminator().to(device)

optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
optimizer_b = torch.optim.Adam(model.parameters(), lr=1e-3)

mse = nn.L1Loss()
ssim_loss = SSIM()


def dis_loss(prob_real_is_real, prob_fake_is_real):
    EPS = 1e-12
    return torch.mean(-(torch.log(prob_real_is_real + EPS) + torch.log(1 - prob_fake_is_real + EPS)))


def gen_loss(original, recon_structed, validity):
    ssim_l = -ssim_loss(recon_structed, original)
    if validity.all():
        gen_loss_GAN = torch.mean(-torch.log(validity + 1e-12))
        # gen_loss_L1 = torch.mean(torch.abs(original - recon_structed))
        return 5 * ssim_l + gen_loss_GAN
    else:
        return ssim_l


def train_step(images, version='a'):
    _decoder_image = model(images, version=version)

    #if args.discriminator:
    with torch.no_grad():
       validity = discriminator(_decoder_image)

    _loss = gen_loss(_decoder_image, images, validity)
    #else:
        #_loss = gen_loss(_decoder_image, images)
    
    optimizer.zero_grad()
    _loss.backward(retain_graph=True)
    optimizer.step()
    # if args.discriminator:
    validity = discriminator(_decoder_image.detach())
    real_dis = discriminator(images)

    d_loss = dis_loss(real_dis, validity)
    optimizer_b.zero_grad()
    d_loss.backward(retain_graph=True)
    optimizer_b.step()
    return _loss


In [14]:
torch.cuda.empty_cache()

In [15]:
print('training for {} steps'.format(10000))
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

for epoch in range(10000):
    # for idx, (images, _) in enumerate(dataloader):
    a = next(itera)
    b = next(iterb)
    images_a = torch.tensor(a, device=device).float()
    images_a = images_a.to(device)

    images_b = torch.tensor(b, device=device).float()
    images_b = images_b.to(device)

    loss_a = train_step(images_a, version='a')
    loss_b = train_step(images_b, version='b')
    # loss_c = train_step(images_b, version='a')
    # loss_d = train_step(images_a, version='b') 

    to_print = "Epoch[{}/{}] Loss A:{}, Loss B:{} , Loss C:{} , Loss D:{}".format(epoch+1, 10000, loss_a.data, loss_b.data,loss_c.data,loss_d.data)
    if epoch % 100 == 0:
        print(to_print)
        model_state_dict = model.state_dict()
        model_name = "model_"+str(epoch)
        torch.save(obj = model.state_dict(),f = MODEL_SAVE_PATH)
if save:
    model_state_dict = model.state_dict()
    torch.save(obj = model.state_dict(),f = MODEL_SAVE_PATH_2)
else:
    model.load_state_dict(torch.load(torch.load(f = MODEL_SAVE_PATH_2)))

training for 10000 steps


NameError: ignored

In [17]:
from google.colab import files
# files.download("/content/drive/MyDrive/Minor_Project/saved_models/final.pth")

In [18]:
import os
from PIL import Image

In [19]:
def transfer(model, x, version):
    x = torch.from_numpy(x).unsqueeze(0)
    x = x.to('cuda')
    model.eval()
    if version == 'a':
        out = model(x, version='a')
        return torch.cat([x, out])
    elif version == 'b':
        out = model(x, version='b')
        return torch.cat([x, out])


def write_images(model, image_a, image_b, dir_name):
    if not os.path.exists(dir_name):
        os.makedirs(dir_name)

    compare_x = transfer(model, image_a, 'a')
    save_image(compare_x.data.cpu(), '{}/sample_image_a.png'.format(dir_name))

    compare_x = transfer(model, image_b, 'b')
    save_image(compare_x.data.cpu(), '{}/sample_image_b.png'.format(dir_name))

    compare_x = transfer(model, image_b, 'a')
    save_image(compare_x.data.cpu(), '{}/sample_image_b_to_a.png'.format(dir_name))

    compare_x = transfer(model, image_a, 'b')
    save_image(compare_x.data.cpu(), '{}/sample_image_a_to_b.png'.format(dir_name))

In [23]:
train_dataset_array_a = np.load("/content/a.npy")
train_dataset_array_b = np.load("/content/b.npy")

x_a = train_dataset_array_a[random.randint(1, 10)]
x_b = train_dataset_array_b[random.randint(1, 10)]

x_a = train_dataset_array_a[random.randint(1, 10)]
x_b = train_dataset_array_b[random.randint(1, 10)]

write_images(model, x_a, x_b, "/content/final_images")