In [1]:
import os
import time
import torch
import torchvision
import torchvision.transforms as transforms
import torch.nn as nn
from torch.nn import init
from torch.optim import lr_scheduler
import functools
import numpy as np
import cv2 as cv
from PIL import Image

In [2]:
def make_dataset(directory):
    
    images = []
    assert os.path.isdir(directory), '%s is not a valid directory' % directory

    for root, _, fnames in sorted(os.walk(directory)):
        for fname in fnames:
            path = os.path.join(root, fname)
            images.append(path)
    return images

In [3]:
def __make_power_2(img, base, method=Image.BICUBIC):
    
    ow, oh = img.size
    h = int(round(oh / base) * base)
    w = int(round(ow / base) * base)
    
    if h == oh and w == ow:
        return img

    __print_size_warning(ow, oh, w, h)
    
    return img.resize((w, h), method)

In [4]:
def __print_size_warning(ow, oh, w, h):
    
    if not hasattr(__print_size_warning, 'has_printed'):
        
        print("The image size needs to be a multiple of 4. "
              "The loaded image size was (%d, %d), so it was adjusted to "
              "(%d, %d). This adjustment will be done to all images "
              "whose sizes are not multiples of 4" % (ow, oh, w, h))
        
        __print_size_warning.has_printed = True

In [5]:
def get_transform(method):
    
    transform_list = []
    transform_list.append(transforms.Lambda(lambda img: __make_power_2(img, base=4, method=method)))

    transform_list += [transforms.ToTensor()]
    transform_list += [transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))]
            
    return transforms.Compose(transform_list)

In [6]:
def init_net(net, gpu_ids, init_type='normal', init_gain=0.02):

#     if len(gpu_ids) > 0:
    if gpu_ids[0] != -1:
        
        assert(torch.cuda.is_available())
        net.to(gpu_ids[0])
        net = torch.nn.DataParallel(net, gpu_ids)  # multi-GPUs
    
    def init_func(m): 
        
        classname = m.__class__.__name__
        
        if hasattr(m, 'weight') and (classname.find('Conv') != -1 or classname.find('Linear') != -1):
            
            init.normal_(m.weight.data, 0.0, init_gain)
            
            if hasattr(m, 'bias') and m.bias is not None:
                
                init.constant_(m.bias.data, 0.0)
                
        elif classname.find('BatchNorm2d') != -1: 
            init.normal_(m.weight.data, 1.0, init_gain)
            init.constant_(m.bias.data, 0.0)

    net.apply(init_func)
    
    return net

In [7]:
def get_scheduler(optimizer, epoch_count, n_epochs, n_epochs_decay):

    def lambda_rule(epoch):
        lr_l = 1.0 - max(0, epoch + epoch_count - n_epochs) / float(n_epochs_decay + 1)
        return lr_l
    scheduler = lr_scheduler.LambdaLR(optimizer, lr_lambda=lambda_rule)
    
    return scheduler

In [8]:
def set_requires_grad(nets, requires_grad=False):
    
        if not isinstance(nets, list):
            nets = [nets]
            
        for net in nets:
            
            if net is not None:
                
                for param in net.parameters():
                    
                    param.requires_grad = requires_grad

In [9]:
class ResnetBlock(nn.Module):

    def __init__(self, dim, padding_type, norm_layer, use_dropout, use_bias):

        super(ResnetBlock, self).__init__()
        self.conv_block = self.build_conv_block(dim, padding_type, norm_layer, use_dropout, use_bias)

    def build_conv_block(self, dim, padding_type, norm_layer, use_dropout, use_bias):
       
        conv_block = []
        p = 0
        
        conv_block += [nn.ReflectionPad2d(1)]

        conv_block += [nn.Conv2d(dim, dim, kernel_size=3, padding=p, bias=use_bias), norm_layer(dim), nn.ReLU(True)]
        
        if use_dropout:
            conv_block += [nn.Dropout(0.5)]

        p = 0
       
        conv_block += [nn.ReflectionPad2d(1)]

        conv_block += [nn.Conv2d(dim, dim, kernel_size=3, padding=p, bias=use_bias), norm_layer(dim)]

        return nn.Sequential(*conv_block)
    
    def forward(self, x):

        out = x + self.conv_block(x)  # add skip connections
        
        return out

In [10]:
class ResnetGenerator(nn.Module):

    def __init__(self, input_nc, output_nc, ngf=64, norm_layer=nn.BatchNorm2d, use_dropout=True, n_blocks=9, padding_type='reflect'):

        assert(n_blocks >= 0)
        super(ResnetGenerator, self).__init__()
        
        use_bias=False

        model = [nn.ReflectionPad2d(3),
                 nn.Conv2d(input_nc, ngf, kernel_size=7, padding=0, bias=use_bias),
                 norm_layer(ngf),
                 nn.ReLU(True)]

        n_downsampling = 2
        for i in range(n_downsampling):  # add downsampling layers
            mult = 2 ** i
            model += [nn.Conv2d(ngf * mult, ngf * mult * 2, kernel_size=3, stride=2, padding=1, bias=use_bias),
                      norm_layer(ngf * mult * 2),
                      nn.ReLU(True)]

        mult = 2 ** n_downsampling
        for i in range(n_blocks):       # add ResNet blocks

            model += [ResnetBlock(ngf * mult, padding_type=padding_type, norm_layer=norm_layer, use_dropout=use_dropout, use_bias=use_bias)]

        for i in range(n_downsampling):  # add upsampling layers
            mult = 2 ** (n_downsampling - i)
            model += [nn.ConvTranspose2d(ngf * mult, int(ngf * mult / 2),
                                         kernel_size=3, stride=2,
                                         padding=1, output_padding=1,
                                         bias=use_bias),
                      norm_layer(int(ngf * mult / 2)),
                      nn.ReLU(True)]
        model += [nn.ReflectionPad2d(3)]
        model += [nn.Conv2d(ngf, output_nc, kernel_size=7, padding=0)]
        model += [nn.Tanh()]

        self.model = nn.Sequential(*model)

    def forward(self, input):

        return self.model(input)

In [11]:
class NLayerDiscriminator(nn.Module):

    def __init__(self, input_nc, ndf=64, n_layers=3, norm_layer=nn.BatchNorm2d):

        super(NLayerDiscriminator, self).__init__()
        
        use_bias=False

        kw = 4
        padw = 1
        sequence = [nn.Conv2d(input_nc, ndf, kernel_size=kw, stride=2, padding=padw), nn.LeakyReLU(0.2, True)]
        nf_mult = 1
        nf_mult_prev = 1
        for n in range(1, n_layers):  # gradually increase the number of filters
            nf_mult_prev = nf_mult
            nf_mult = min(2 ** n, 8)
            sequence += [
                nn.Conv2d(ndf * nf_mult_prev, ndf * nf_mult, kernel_size=kw, stride=2, padding=padw, bias=use_bias),
                norm_layer(ndf * nf_mult),
                nn.LeakyReLU(0.2, True)
            ]

        nf_mult_prev = nf_mult
        nf_mult = min(2 ** n_layers, 8)
        sequence += [
            nn.Conv2d(ndf * nf_mult_prev, ndf * nf_mult, kernel_size=kw, stride=1, padding=padw, bias=use_bias),
            norm_layer(ndf * nf_mult),
            nn.LeakyReLU(0.2, True)
        ]

        sequence += [nn.Conv2d(ndf * nf_mult, 1, kernel_size=kw, stride=1, padding=padw)]  # output 1 channel prediction map
        self.model = nn.Sequential(*sequence)

    def forward(self, input):
      
        return self.model(input)

In [12]:
class GANLoss(nn.Module):
   
    def __init__(self, gan_mode='lsgan', target_real_label=1.0, target_fake_label=0.0):
 
        super(GANLoss, self).__init__()
    
        self.register_buffer('real_label', torch.tensor(target_real_label))
        self.register_buffer('fake_label', torch.tensor(target_fake_label))
        
        self.gan_mode = gan_mode

        self.loss = nn.MSELoss()

    def get_target_tensor(self, prediction, target_is_real):

        if target_is_real:
            target_tensor = self.real_label
        else:
            target_tensor = self.fake_label
        return target_tensor.expand_as(prediction)

    def __call__(self, prediction, target_is_real):
       
        target_tensor = self.get_target_tensor(prediction, target_is_real)
        loss = self.loss(prediction, target_tensor)
        
        return loss

In [13]:
class AlignedDataset(torch.utils.data.Dataset):

    def __init__(self, dataroot, phase):
       
        self.dataroot = dataroot
        self.phase = phase
        self.dir_AB = os.path.join(dataroot, phase)  # get the image directory
        self.AB_paths = sorted(make_dataset(self.dir_AB))  # get image paths
                               
    def __getitem__(self, index):
        
        # read a image given a random integer index
        AB_path = self.AB_paths[index]
        AB = Image.open(AB_path).convert('RGB')
                               
        # split AB image into A and B AFTER HAVING BEEN PROCESSED FROM THE SCRIPT "combine_A_and_B.py"
        w, h = AB.size
        w2 = int(w / 2)
        A = AB.crop((0, 0, w2, h))
        B = AB.crop((w2, 0, w, h))

        # apply the same transform to both A and B
        A_transform = get_transform(Image.BICUBIC)
        B_transform = get_transform(Image.BICUBIC)

        A = A_transform(A)
        B = B_transform(B)

        return {'A': A, 'B': B, 'A_paths': AB_path, 'B_paths': AB_path}

    def __len__(self):
       
        return len(self.AB_paths)

In [14]:
class CustomDatasetDataLoader():

    def __init__(self, dataroot, phase):
        
        self.dataroot = dataroot
        self.phase = phase
        self.dataset = AlignedDataset(dataroot, phase)
        
        self.dataloader = torch.utils.data.DataLoader(
            self.dataset,
            batch_size=1,
            shuffle=False,
            num_workers=0)

    def load_data(self):
        return self

    def __len__(self):
        
        return len(self.dataset)

    def __iter__(self):
        
        for i, data in enumerate(self.dataloader):
            yield data

In [15]:
class Pix2PixModel:

    def __init__(self, gpu_ids):
        
        self.gpu_ids = gpu_ids
        self.device = torch.device('cuda:{}'.format(gpu_ids[0])) if gpu_ids[0]!=-1 else torch.device('cpu')

        self.optimizers = []
        
        self.netG = ResnetGenerator(input_nc=3, output_nc=3, ngf=64, norm_layer=nn.BatchNorm2d, use_dropout=True, n_blocks=9)
        init_net(self.netG, gpu_ids, 'normal', 0.02)
        
        self.netD = NLayerDiscriminator(input_nc=6, ndf=64, n_layers=3, norm_layer=nn.BatchNorm2d)
        init_net(self.netD, gpu_ids, 'normal', 0.02)

        self.criterionGAN = GANLoss(gan_mode='lsgan').to(self.device)
        self.criterionL1 = torch.nn.L1Loss()

        self.optimizer_G = torch.optim.Adam(self.netG.parameters(), lr=0.0002, betas=(0.5, 0.999))
        self.optimizer_D = torch.optim.Adam(self.netD.parameters(), lr=0.0002, betas=(0.5, 0.999))
        self.optimizers.append(self.optimizer_G)
        self.optimizers.append(self.optimizer_D)

    def set_input(self, input):
        
        self.input = input
        
        self.real_A = input['A'].to(self.device)
#         self.real_A = torch.unsqueeze(self.real_A, dim=0)

        self.real_B = input['B'].to(self.device)
#         self.real_B = torch.unsqueeze(self.real_B, dim=0)

        self.image_paths = input['A_paths']

    def forward(self):
  
        self.fake_B = self.netG(self.real_A)

    def backward_D(self):
        
        fake_AB = torch.cat((self.real_A, self.fake_B), 1) 
        pred_fake = self.netD(fake_AB.detach())
        self.loss_D_fake = self.criterionGAN(pred_fake, False)
        
        real_AB = torch.cat((self.real_A, self.real_B), 1)
        pred_real = self.netD(real_AB)
        
        self.loss_D_real = self.criterionGAN(pred_real, True)
        
        self.loss_D = (self.loss_D_fake + self.loss_D_real) * 0.5
        self.loss_D.backward()
        
        return (self.loss_D_real, self.loss_D_fake)

    def backward_G(self):

        fake_AB = torch.cat((self.real_A, self.fake_B), 1)
        pred_fake = self.netD(fake_AB)
        self.loss_G_GAN = self.criterionGAN(pred_fake, True)
        
        self.loss_G_L1 = self.criterionL1(self.fake_B, self.real_B) * 100.0
        
        self.loss_G = self.loss_G_GAN + self.loss_G_L1
        self.loss_G.backward()
        
        return (self.loss_G_GAN, self.loss_G_L1)

    def optimize_parameters(self):
        
        self.forward()              
        
        set_requires_grad(nets=self.netD, requires_grad=True)       
        self.optimizer_D.zero_grad()     
        rf_losses = self.backward_D()              
        self.optimizer_D.step()    
        
        set_requires_grad(nets=self.netD, requires_grad=False)  
        self.optimizer_G.zero_grad()        
        gl_losses = self.backward_G()                 
        self.optimizer_G.step() 
        
        return (rf_losses, gl_losses, self.optimizer_G, self.optimizer_D)

In [16]:
# Defining dataset object
# dataset = AlignedDataset('C:/Users/muhammad.ispahani/Desktop/Train_Dir', 'train')

# # Defining the Dataloader
# data_loader = torch.utils.data.DataLoader(dataset, batch_size=1, shuffle=False, num_workers=0)

In [17]:
data_loader = CustomDatasetDataLoader('D:/Data Science Projects/CNIC OCR/GANs/pytorch-CycleGAN-and-pix2pix-master/datasets/cnictopix', 'train')
dataset = data_loader.load_data()

In [18]:
# Initialization of Model

gpu_ids = [-1]
pix2pix = Pix2PixModel(gpu_ids)

In [19]:
epoch_count = 1
n_epochs = 1
n_epochs_decay = 1

schedulers = [get_scheduler(optimizer, epoch_count, n_epochs, n_epochs_decay) for optimizer in pix2pix.optimizers]

In [21]:
total_iters = 0

for epoch in range(epoch_count, n_epochs + n_epochs_decay + 1):  
    
    epoch_start_time = time.time()
 
    epoch_iter = 0
    
    print("Epoch Number on Training: ", epoch, '\n')

    old_lr = pix2pix.optimizers[0].param_groups[0]['lr']
    
    for scheduler in schedulers:
        scheduler.step()
        
    lr = pix2pix.optimizers[0].param_groups[0]['lr']
    
    for i, data in enumerate(dataset):  

        total_iters += 1
        epoch_iter += 1
        
        pix2pix.set_input(data) 
        
        (rf_losses, gl_losses, opt_G, opt_D) = pix2pix.optimize_parameters() 
        
#         if total_iters % 10 == 0: 
        
        losses = [float(gl_losses[0].item()), float(gl_losses[1].item()), float(rf_losses[0].item()), float(rf_losses[1].item())]

        print('Losses for Epoch number ', epoch, ' for iteration ', i+1 , ' are G_GAN:', losses[0], ', G_L1:', losses[1],\
              ', D_real:', losses[2], ', D_fake:', losses[3], '\n')
            
    # Saving Model at each epoch
    
    torch.save({'epoch': epoch,
                'model_G': pix2pix.netG.state_dict(),
                'model_D': pix2pix.netD.state_dict(),
                'optimizer_G_state_dict': opt_G.state_dict(),
                'optimizer_D_state_dict': opt_D.state_dict(),
                'loss_G_GAN': float(gl_losses[0].item()),
                'loss_G_L1': float(gl_losses[1].item()),
                'loss_D_real': float(rf_losses[0].item()),
                'loss_D_fake': float(rf_losses[0].item())}, 'model.pth')
#     torch.save({'epoch': epoch,
#                 'model': pix2pix.netG.state_dict(),
#                 'optimizer': opt_G.state_dict()}, 'model.pth')
    
print('Total time taken for training data on ', epoch, 's is: ', time.time() - epoch_start_time)

Epoch Number on Training:  1 

Losses for Epoch number  1  for iteration  1  are G_GAN: 3.2179808616638184 , G_L1: 64.8541259765625 , D_real: 1.1071852445602417 , D_fake: 1.2025845050811768 

Losses for Epoch number  1  for iteration  2  are G_GAN: 18.001569747924805 , G_L1: 62.300289154052734 , D_real: 3.369034767150879 , D_fake: 3.060173749923706 

Losses for Epoch number  1  for iteration  3  are G_GAN: 66.40013122558594 , G_L1: 54.997802734375 , D_real: 15.453411102294922 , D_fake: 17.869972229003906 

Epoch Number on Training:  2 

Losses for Epoch number  2  for iteration  1  are G_GAN: 245.7174072265625 , G_L1: 89.74800109863281 , D_real: 55.386600494384766 , D_fake: 69.54033660888672 

Losses for Epoch number  2  for iteration  2  are G_GAN: 411.45989990234375 , G_L1: 88.44414520263672 , D_real: 250.62472534179688 , D_fake: 265.6634521484375 

Losses for Epoch number  2  for iteration  3  are G_GAN: 512.1514282226562 , G_L1: 93.44236755371094 , D_real: 380.9671936035156 , D_fak