In [1]:
import os
# Configure CUDA before any GPU library initialises: enumerate devices in
# PCI bus order and expose only device "0" to this process.
os.environ.update({
    "CUDA_DEVICE_ORDER": "PCI_BUS_ID",
    "CUDA_VISIBLE_DEVICES": "0",
})

In [10]:
import shutil 
from argparse import ArgumentParser
from sklearn.model_selection import train_test_split
import glob
import random

from torch.utils.data import Dataset , DataLoader
from PIL import Image
import torchvision.transforms as tranforms
import torch.nn as nn
import torch.nn.functional as F


import time
import datetime
import sys

from torch.autograd import Variable
import torch
import numpy as np
import itertools
from tqdm import tqdm_notebook as tqdm
import torchvision.utils as vutils
import matplotlib.pyplot as plt
import matplotlib.animation as animation
from IPython.display import HTML
from torchsummary import summary
from torchvision.utils import save_image
import pandas as pd
from IPython.display import Image as ImageDisplay

In [11]:
# Dataset locations. These are machine-specific absolute Windows paths and must
# be adjusted before running the notebook on another machine.
# Folder the copy step reads the source images from.
image_dir = r'C:\Users\EmincanY\Desktop\Pythorch\GAN\img_celeba\images'
# CSV file loaded into the attribute DataFrame.
attributes_file = r'C:\Users\EmincanY\Desktop\Pythorch\GAN\img_celeba\list_attr_celeba.csv'
# Root folder under which the train/ and test/ domain folders are created.
output_dir = r'C:\Users\EmincanY\Desktop\Pythorch\GAN\img_celeba\preprocessed_dataset_celeba'

In [14]:
# Load the per-image attribute table (one row per image, attributes stored as 1 / -1).
# NOTE(review): the "Unnamed: 0" column in the output below suggests the CSV was
# written together with its index; consider reading it with index_col=0.
df = pd.read_csv(attributes_file)
df.head()

Unnamed: 0,image_id,5_o_Clock_Shadow,Arched_Eyebrows,Attractive,Bags_Under_Eyes,Bald,Bangs,Big_Lips,Big_Nose,Black_Hair,...,Sideburns,Smiling,Straight_Hair,Wavy_Hair,Wearing_Earrings,Wearing_Hat,Wearing_Lipstick,Wearing_Necklace,Wearing_Necktie,Young
0,000001.jpg,-1,1,1,-1,-1,-1,-1,-1,-1,...,-1,1,1,-1,1,-1,1,-1,-1,1
1,000002.jpg,-1,-1,-1,1,-1,-1,-1,1,-1,...,-1,1,-1,-1,-1,-1,-1,-1,-1,1
2,000003.jpg,-1,-1,-1,-1,-1,-1,1,-1,-1,...,-1,-1,-1,1,-1,-1,-1,-1,-1,1
3,000004.jpg,-1,-1,1,-1,-1,-1,-1,-1,-1,...,-1,-1,1,-1,1,-1,1,1,-1,1
4,000005.jpg,-1,1,1,-1,-1,-1,1,-1,-1,...,-1,-1,-1,-1,-1,-1,1,-1,-1,1


In [15]:
# Draw 1000 female images per hair colour (attributes are encoded as 1 / -1).
# Every comparison is parenthesised because `&` binds tighter than `==`: the
# unparenthesised form evaluates `Black_Hair == (1 & (Male == -1))`, which only
# yields the intended rows by accident of the 1 / -1 encoding.
# A fixed random_state makes the sampled subsets reproducible across runs, so
# re-running the notebook does not mix different samples into the output folders.
df_blackHairFemale = df.loc[(df['Black_Hair'] == 1) & (df['Male'] == -1)].sample(1000, random_state=53)
df_blondHairFemale = df.loc[(df['Blond_Hair'] == 1) & (df['Male'] == -1)].sample(1000, random_state=53)

In [16]:
# File names of the two image domains: A = black-haired sample, B = blond-haired sample.
# Reading the column directly replaces the slow row-by-row iterrows() loops and
# produces the same lists in the same order.
domainA = df_blackHairFemale['image_id'].tolist()
domainB = df_blondHairFemale['image_id'].tolist()

In [17]:
# Hold out 1% of each domain for testing; the shared seed keeps both splits repeatable.
A_train, A_test = train_test_split(domainA, test_size=0.01, random_state=53)
B_train, B_test = train_test_split(domainB, test_size=0.01, random_state=53)

In [25]:
# Truncate both training lists to the length of the shorter one so that the
# two domains contain the same number of images.
N = min(map(len, (A_train, B_train)))
A_train, B_train = A_train[:N], B_train[:N]

print(f'Images in A {len(A_train)} and B {len(B_train)}')

Images in A 990 and B 990


In [26]:
# Truncate both test lists to the length of the shorter one so that the
# two domains contain the same number of images.
N = min(map(len, (A_test, B_test)))
A_test, B_test = A_test[:N], B_test[:N]

print(f'Images in A {len(A_test)} and B {len(B_test)}')

Images in A 10 and B 10


In [22]:
# Write the split to disk as <output_dir>/<train|test>/<A|B>/<image file>.

# --- training split ---
A_train_dir = os.path.join(output_dir, 'train/A')
B_train_dir = os.path.join(output_dir, 'train/B')
for folder in (A_train_dir, B_train_dir):
    os.makedirs(folder, exist_ok=True)  # no error if the folder already exists

# Copy pair by pair: the A image first, then the B image (zip stops at the shorter list).
for imageA, imageB in zip(A_train, B_train):
    for name, folder in ((imageA, A_train_dir), (imageB, B_train_dir)):
        shutil.copy(os.path.join(image_dir, name), os.path.join(folder, name))

# --- test split ---
A_test_dir = os.path.join(output_dir, 'test/A')
B_test_dir = os.path.join(output_dir, 'test/B')
for folder in (A_test_dir, B_test_dir):
    os.makedirs(folder, exist_ok=True)

for imageA, imageB in zip(A_test, B_test):
    for name, folder in ((imageA, A_test_dir), (imageB, B_test_dir)):
        shutil.copy(os.path.join(image_dir, name), os.path.join(folder, name))

In [27]:
class ImaageDataSet(Dataset):
    """Two-domain image dataset read from ``<root>/<mode>/A`` and ``<root>/<mode>/B``.

    Each item is a dict ``{'A': image_a, 'B': image_b}`` with both images passed
    through the composed transform pipeline.

    Args:
        root: Dataset root folder.
        transforms_: Sequence of transforms that is composed and applied to every
            image. ``None`` (the default) means an empty pipeline.
        unaligned: If true, the B image is picked at random for every item
            instead of by index.
        mode: Sub-folder of ``root`` to read from (default ``'train'``).
    """

    def __init__(self, root, transforms_=None, unaligned=False, mode='train'):
        # Compose(None) would fail as soon as it is called, so fall back to an
        # empty pipeline when no transforms are supplied.
        self.transform = tranforms.Compose(transforms_ or [])
        self.unaligned = unaligned

        # Sorted so that the index -> file mapping is stable between runs.
        self.files_A = sorted(glob.glob(os.path.join(root, f'{mode}/A') + '/*.*'))
        self.files_B = sorted(glob.glob(os.path.join(root, f'{mode}/B') + '/*.*'))

    def __getitem__(self, index):
        """Return the transformed A/B pair for ``index`` (indices wrap around)."""
        item_A = self.transform(Image.open(self.files_A[index % len(self.files_A)]))

        if self.unaligned:
            # random.randint is inclusive on both ends, hence the "- 1".
            random_index = random.randint(0, len(self.files_B) - 1)
            item_B = self.transform(Image.open(self.files_B[random_index]))
        else:
            item_B = self.transform(Image.open(self.files_B[index % len(self.files_B)]))
        return {'A': item_A, 'B': item_B}

    def __len__(self):
        """Length of the larger domain; the smaller one is cycled via modulo."""
        return max(len(self.files_A), len(self.files_B))

![](2022-11-17-19-21-04.png)

In [29]:
class ResidualBlock(nn.Module):
    """Residual block: two reflection-padded 3x3 convolutions plus an identity skip.

    Args:
        in_features: Number of input channels; the output has the same number.
    """

    def __init__(self, in_features):
        super(ResidualBlock, self).__init__()

        conv_block = [nn.ReflectionPad2d(1),
                      nn.Conv2d(in_features, in_features, 3),
                      nn.InstanceNorm2d(in_features),
                      nn.ReLU(inplace=True),
                      # Pad by 1 (previously the undefined name `i`) so that the
                      # unpadded 3x3 convolution keeps the spatial size.
                      nn.ReflectionPad2d(1),
                      nn.Conv2d(in_features, in_features, 3),
                      nn.InstanceNorm2d(in_features)
                      ]

        self.conv_block = nn.Sequential(*conv_block)

    # forward must be a method of the class; when it was nested inside __init__
    # the module fell back to nn.Module's unimplemented forward().
    def forward(self, x):
        """Return ``x`` plus the convolutional branch applied to ``x``."""
        return x + self.conv_block(x)
        
class Generator(nn.Module):
    """Convolutional image-to-image generator.

    Layer layout: 7x7 stem convolution -> two stride-2 downsampling convolutions
    -> ``n_residual_blocks`` residual blocks -> two stride-2 transposed
    convolutions -> 7x7 output convolution followed by tanh.

    Args:
        input_nc: Number of channels of the input image.
        output_nc: Number of channels of the generated image.
        n_residual_blocks: How many residual blocks sit between the
            downsampling and upsampling stages.
    """

    def __init__(self, input_nc, output_nc, n_residual_blocks=9):
        super(Generator, self).__init__()

        # Stem: reflection padding of 3 offsets the unpadded 7x7 convolution.
        layers = [
            nn.ReflectionPad2d(3),
            nn.Conv2d(input_nc, 64, 7),
            nn.InstanceNorm2d(64),
            nn.ReLU(inplace=True),
        ]

        # Downsampling: each stage doubles the channel count (64 -> 128 -> 256).
        channels = 64
        for _ in range(2):
            doubled = channels * 2
            layers.extend([
                nn.Conv2d(channels, doubled, 3, stride=2, padding=1),
                nn.InstanceNorm2d(doubled),
                nn.ReLU(inplace=True),
            ])
            channels = doubled

        # Residual stage at the bottleneck width.
        layers.extend(ResidualBlock(channels) for _ in range(n_residual_blocks))

        # Upsampling: each stage halves the channel count (256 -> 128 -> 64).
        for _ in range(2):
            halved = channels // 2
            layers.extend([
                nn.ConvTranspose2d(channels, halved, 3, stride=2, padding=1, output_padding=1),
                nn.InstanceNorm2d(halved),
                nn.ReLU(inplace=True),
            ])
            channels = halved

        # Output head: tanh bounds the generated values to [-1, 1].
        layers.extend([
            nn.ReflectionPad2d(3),
            nn.Conv2d(64, output_nc, 7),
            nn.Tanh(),
        ])
        self.model = nn.Sequential(*layers)

    def forward(self, x):
        """Run ``x`` through the whole layer stack."""
        return self.model(x)

![](2022-11-17-19-52-40.png)

In [31]:
class Discriminator(nn.Module):
    """Fully convolutional discriminator that returns one score per input image.

    Five 4x4 convolutions (the first three with stride 2) reduce the image to a
    single-channel score map, which ``forward`` averages over its spatial
    dimensions.

    Args:
        input_nc: Number of channels of the input image.
    """

    def __init__(self, input_nc):
        super(Discriminator, self).__init__()

        # A bunch of convolutions one after another
        model = [nn.Conv2d(input_nc, 64, 4, stride=2, padding=1),
                 nn.LeakyReLU(0.2, inplace=True)]

        model += [nn.Conv2d(64, 128, 4, stride=2, padding=1),
                  nn.LeakyReLU(0.2, inplace=True)]

        model += [nn.Conv2d(128, 256, 4, stride=2, padding=1),
                  nn.InstanceNorm2d(256),
                  # Keyword was misspelled as `inplae`, which raised TypeError.
                  nn.LeakyReLU(0.2, inplace=True)]

        model += [nn.Conv2d(256, 512, 4, padding=1),
                  nn.InstanceNorm2d(512),
                  nn.LeakyReLU(0.2, inplace=True)]

        # FCN Classification layer
        model += [nn.Conv2d(512, 1, 4, padding=1)]

        self.model = nn.Sequential(*model)

    def forward(self, x):
        """Return a ``(batch, 1)`` tensor: the spatial mean of the score map."""
        x = self.model(x)
        # Average pooling and flatten
        return F.avg_pool2d(x, x.size()[2:]).view(x.size()[0], -1)

In [32]:
def tensor2image(tensor):
    """Convert the first image of a batch to a ``uint8`` array.

    Values are mapped linearly from [-1, 1] to [0, 255]. A single-channel image
    is repeated along the channel axis to give three channels.

    Args:
        tensor: Batch tensor; only ``tensor[0]`` is converted. May live on any
            device and may require grad.

    Returns:
        ``numpy.ndarray`` of dtype ``uint8`` with the channel axis first.
    """
    # detach(): .numpy() raises on tensors that are part of an autograd graph.
    image = 127.5 * (tensor[0].detach().cpu().float().numpy() + 1.0)
    if image.shape[0] == 1:
        image = np.tile(image, (3, 1, 1))
    # Clip before the cast so out-of-range values saturate instead of wrapping.
    return np.clip(image, 0, 255).astype(np.uint8)

class ReplayBuffer():
    """Bounded pool of previously seen samples.

    Until the pool holds ``max_size`` samples every incoming sample is stored
    and returned unchanged. Afterwards each incoming sample is, with
    probability 0.5, swapped with a randomly chosen stored sample (the stored
    one is returned); otherwise it is returned directly and not stored.

    Args:
        max_size: Maximum number of samples kept in the pool (must be > 0).
    """

    def __init__(self, max_size=50):
        assert (max_size > 0), 'Empty buffer or trying to create a black hole. Be careful'
        self.max_size = max_size
        self.data = []

    def push_and_pop(self, data):
        """Feed a batch through the pool and return a batch of the same size.

        Args:
            data: Batch tensor; it is iterated along its first dimension.

        Returns:
            Tensor built by concatenating one sample per input sample.
        """
        to_return = []
        # .data iterates over values that are detached from the autograd graph.
        for element in data.data:
            element = torch.unsqueeze(element, 0)
            if len(self.data) < self.max_size:
                self.data.append(element)
                to_return.append(element)
            elif random.uniform(0, 1) > 0.5:
                # Was random.uniform(0.1), which raised TypeError (missing upper bound).
                i = random.randint(0, self.max_size - 1)
                to_return.append(self.data[i].clone())
                self.data[i] = element
            else:
                to_return.append(element)
        return torch.cat(to_return)


# LambdaLR and weights_init_normal live at module level: indented inside
# ReplayBuffer they were only reachable as attributes of that class.
class LambdaLR():
    """Learning-rate multiplier: constant 1.0, then a linear decay to 0.

    Args:
        n_epochs: Total number of epochs.
        offset: Value added to the epoch passed to ``step``.
        decay_start_epoch: Epoch at which the linear decay starts.
    """

    def __init__(self, n_epochs, offset, decay_start_epoch):
        assert ((n_epochs - decay_start_epoch) > 0), "Decay must start before the training session ends !"
        self.n_epochs = n_epochs
        self.offset = offset
        self.decay_start_epoch = decay_start_epoch

    def step(self, epoch):
        """Return the multiplier for ``epoch``."""
        return 1.0 - max(0, epoch + self.offset - self.decay_start_epoch) / (self.n_epochs - self.decay_start_epoch)


def weights_init_normal(m):
    """Initialise module ``m`` in place, dispatching on its class name.

    Classes whose name contains ``Conv`` get weights drawn from N(0, 0.02);
    classes whose name contains ``BatchNorm2d`` get weights from N(1, 0.02)
    and a zero bias. Any other module is left untouched.
    """
    classname = m.__class__.__name__
    if classname.find('Conv') != -1:
        torch.nn.init.normal_(m.weight.data, 0.0, 0.02)
    elif classname.find('BatchNorm2d') != -1:
        torch.nn.init.normal_(m.weight.data, 1.0, 0.02)
        # constant_ is the in-place initialiser; plain `constant` is deprecated.
        torch.nn.init.constant_(m.bias.data, 0.0)

In [34]:
# Training Parameters
# NOTE(review): none of these values are consumed in the visible cells; the
# meanings below are inferred from the names and should be confirmed against
# the training loop once it exists.

# presumably the epoch to start (or resume) training from
epoch = 0
# presumably the total number of training epochs
n_epochs = 50
# presumably the DataLoader batch size
batchSize = 1
# Relative dataset root. NOTE(review): output_dir above is an absolute path;
# the two only point to the same folder for a matching working directory.
dataroot = './img_celeba/preprocessed_dataset_celeba/' 
# presumably the initial optimizer learning rate
lr = 0.0002
# presumably the epoch at which learning-rate decay starts — TODO confirm
decay_epoch = 3
# presumably the side length (pixels) images are resized/cropped to
size = 256
# presumably the channel counts of input and generated images (3 = RGB)
input_nc = 3
output_nc = 3
# presumably toggles GPU execution
cuda = True
# presumably the number of DataLoader worker processes
n_cpu = 8

In [None]:
# Done for now