In [1]:
# prerequisites
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms
from torch.autograd import Variable
from torchvision.utils import save_image

# Device configuration
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [2]:
import os
import numpy as np
import tensorflow as tf

In [3]:
from math import floor

In [4]:
use_bias = False
front_path = './images/fig1'
profile_path = '.images/fig2'
lambda_l1 = 0.001, #'weight of the loss for L1 texture loss') # 0.001
lambda_fea=100 #'weight of the loss for face model feature loss')
lambda_reg= 1e-5# 'weight of the loss for L2 regularitaion loss')
lambda_gan= 1# 'weight of the loss for gan loss')
lambda_gp=10# 'weight of the loss for gradient penalty on parameter of D')

# For training
dataset_size=  1000# 'dataset path')  # casia_aligned_250_250_jpg
profile_list=''# 'train profile list')
front_path=''#front data path')
front_list=''# 'train front list')
test_path=''# 'front data path')
is_train=True# 'train or test')
is_finetune= False# 'finetune') # False, True
face_mode='resnet50.npy'# 'face model path')
checkpoint='checkpoint/fnm'# 'checkpoint directory')
summary_dir= 'log/fnm'# 'logs directory')
checkpoint_ft='checkpoint/fnm/ck-09'#'finetune or test checkpoint path')
batch_size= 2# 'batch size')#was 16
BS = batch_size
epoc=10 # 'epoch')
critic= 1 #'number of D training times')
save_freq= 1000 # 'the frequency of saving model')
lr=1e-4# 'base learning rate')
beta1=0. # 'beta1 momentum term of adam')
beta2=0.9 # 'beta2 momentum term of adam')
stddev= 0.02 # 'stddev for W initializer')
use_bias=False # 'whether to use bias')
results='results/fnm' # 'path for saving results') #
############################
batch_size = 4
############################
#   environment setting    #
############################
device_id='3,4'# 'device id')
ori_height=224 # 'original height of profile images')
ori_width=224 # 'original width of profile images')
height= 224 #'height of images') # do not modified
width= 224 # 'width of images') # do not modified
CHANNEL=3 # 'channel of images')
num_threads=8 # 'number of threads of enqueueing examples')


# Resnet50 model trained on VGGFace2 dataset

In [5]:
from Resnet50_ft_dag import resnet50_ft_dag


# Batch normalisation

In [6]:
# class Batch_norm(nn.Module):
#     def __init__(self, in_channels, epsilon=1e-5, momentum = 0.9, is_train = False):
#         super(Batch_norm, self).__init__()
#         # self.mean  = mosv_dict['mean']
#         self.norm = nn.BatchNorm2d(in_channels, eps = epsilon,  momentum = momentum, track_running_stats = is_train)
#         #self.scale = mosv_dict['scale']
#         #self.variance = mosv_dict['variance']
#         #self.epsilon = 1e-5
#     def forward(self, x):
#         norm = self.norm(x)
#         return norm
            
    

In [7]:
def Batch_norm(in_channels, epsilon=1e-5, momentum = 0.9, is_train = False):
    return nn.BatchNorm2d(in_channels, eps = epsilon,  momentum = momentum, track_running_stats = is_train)

In [8]:
NORM = Batch_norm

# Options: conv2d, res_block 

In [9]:
def calc_conv_pad(input_size, output_size, filter_size = 3, stride = 2):
    return max(0, floor((stride * (output_size - 1) + filter_size - input_size)/2 + 0.5))

In [10]:
def calc_deconv_pad(input_size, output_size, filter_size = 3, stride = 2):
    return max(0, floor((stride * (input_size - 1) + filter_size - output_size)/2 + 0.5))

In [11]:
def conv2d( in_channels , out_channels , kernel_size = 3 , strides = 1  , 
           padding  =  None,  pad_input = None,
           bias = use_bias,dilation_rate = 1, activation = None):
    layers = []
    if padding == None:
        if pad_input is not None:
            padding = calc_conv_pad(pad_input , 2*pad_input, kernel_size , strides)
        else:
            padding = 0
    conv = nn.Conv2d( in_channels , out_channels , bias=bias, kernel_size = kernel_size, stride=strides, padding=padding, dilation=dilation_rate )
    layers.append( conv )
    if activation is not None:
        layers.append( activation )
    return nn.Sequential( *layers )

def deconv2d(in_channels , out_channels , kernel_size = 3 , strides = 1  , 
             padding  = None, pad_input = None,
             bias= use_bias, dilation_rate = 1,
            activation = None):
    if padding == None:
        if pad_input is not None:
            padding = calc_deconv_pad(pad_input , 2*pad_input, kernel_size , strides)
        else:
            padding = 0
    layers = [nn.ConvTranspose2d(in_channels, out_channels, kernel_size =  kernel_size, stride=strides, 
                      padding=padding, bias=bias)]
    if activation is not None:
        layers.append( activation )
    return nn.Sequential( *layers )
    

In [12]:
class res_block(nn.Module):
    def __init__(self,  in_channels , 
                 out_channels , kernel_size = 3, 
                 stride = 1  , padding  =  None, pad_input = None,
                 bias = use_bias,  norm = NORM, activation2 = nn.ReLU, activation = None,):
        super(res_block, self).__init__()
        if padding == None:
            if pad_input is not None:
                padding = calc_conv_pad(pad_input , pad_input, kernel_size , stride)
            else: 
                padding = 0
        self.out_channels = out_channels
        if activation is not None:
            self.activation = activation(out_channels)
        else:
            self.activation = activation
        self.activation2 = activation2(out_channels)
        convs = [conv2d(in_channels , out_channels , kernel_size , stride , padding, bias, activation = self.activation), 
             nn.BatchNorm2d( in_channels ), 
             nn.ReLU(out_channels), 
             conv2d(out_channels, out_channels, kernel_size, stride, padding, bias, activation = self.activation),
             norm(in_channels)]
        self.layers = nn.Sequential(*convs)
        #независимо от передаваемого параметра NORM норма всегда Batch_norm
    def forward(self, x):
        return self.activation2(self.layers(x) + x)

# Load data

In [13]:
import glob
import shutil
import os

src_dir = "your/source/dir"
dst_dir = "your/destination/dir"
profile_set = set()
profile_list = []
front_set = set()
front_list = []

def relocate_data(src_dir, src_dir_front, src_dir_profile):
    for jpgfile in glob.iglob(os.path.join(src_dir, "*.png")):
        tmp = jpgfile.split('\\')[-1].split('.')[0].split('_')
        if (len(tmp) > 6):
            if tmp[0] not in profile_set:
                shutil.copy(jpgfile, src_dir_profile)
                profile_list.append(jpgfile)
                profile_set.add(tmp[0])
        else:
            if tmp[0] not in front_set:
                shutil.copy(jpgfile, src_dir_front)
                front_list.append(jpgfile)
                front_set.add(tmp[0])

In [14]:
src_dir = "C:/Users/d.baranovska/Downloads/15"
src_dir_front = "./TPdata/train_data/front"
src_dir_profile = "./TPdata/train_data/profile"
relocate_data(src_dir, src_dir_front, src_dir_profile)

In [15]:
from torch.utils.data import Dataset
from PIL import Image

In [16]:
PIC_SIZE = (224, 224)
class TrainDataset(Dataset):
    def __init__(self, img_list):
        super(type(self), self).__init__()
        self.img_list = img_list
    def __len__(self):
        return len(self.img_list)
    def __getitem__(self, idx):
        #229_01_01_200_08_cropped_test.png
        img_name = self.img_list[idx]
        with Image.open(img_name) as i:
            im = i.resize(PIC_SIZE)
            our_tensor = transforms.ToTensor()(im)
        return our_tensor

# Generator 

In [17]:
f7_shape = [7, 7, 2048]

#### second variant

In [18]:
class Generator(nn.Module):
    def __init__(self): #, profile, front, train):
        super(Generator, self).__init__() 
#         self.face_model = Resnet50_ft_dag()
#         self.feature_p = self.face_model.forward(profile)
#         self.feature_f = self.face_model.forward(front)
#         self.fc2 = nn.Linear(self.fc1.out_features, self.fc1.out_features*2)
#         self.is_train = train
        
        self.conv1 = nn.Sequential(conv2d(2048, 512, kernel_size=1, strides = 1), NORM(512, is_train), nn.ReLU())
    
        self.res1_1 = res_block(512, 512, norm = NORM, pad_input = 7)
        self.res1_2 = res_block(512, 512, norm = NORM, pad_input = 7)
        self.res1_3 = res_block(512, 512, norm = NORM, pad_input = 7)
        self.res1_4 = res_block(512, 512, norm = NORM, pad_input = 7)
        self.dconv2 = nn.Sequential(deconv2d(512, 256, kernel_size=4, strides = 2, pad_input = 7), NORM(256), nn.ReLU())
        self.res2 = res_block(256, 256, norm = NORM, pad_input = 14)
        self.dconv3 = nn.Sequential(deconv2d(256, 128, kernel_size=4, strides = 2, pad_input = 14) , NORM(128), nn.ReLU())
        self.res3 = res_block(128, 128, norm = NORM, pad_input = 28)
        self.dconv4 = nn.Sequential(deconv2d(128, 64, kernel_size=4, strides = 2, pad_input = 28), NORM(64), nn.ReLU())
        self.res4 = res_block(64, 64, norm = NORM, pad_input = 56)
        self.dconv5 = nn.Sequential(deconv2d(64, 32, kernel_size=4, strides = 2, pad_input = 56), NORM(32), nn.ReLU())
        self.res5 = res_block(32, 32, norm = NORM, pad_input = 112)
        self.dconv6 = nn.Sequential(deconv2d(32, 32, kernel_size=4, strides = 2, pad_input =112),  NORM(32), nn.ReLU())
        self.res6 = res_block(32, 32, norm = NORM,  pad_input = 224)
        self.gen = nn.Sequential(conv2d(32, 3,  kernel_size=1, strides = 1), nn.Tanh())
        
    def forward(self, feature):
        feat7 = feature[0]
        feat7 = self.conv1(feat7)
        pool5 = feature[1]
        res1_1 = self.res1_1(feat7)
        res1_2 = self.res1_2(res1_1)
        res1_3 = self.res1_3(res1_2)
        res1_4 = self.res1_2(res1_3)
        dconv2 = self.dconv2(res1_4)
        res2 = self.res2(dconv2)
        dconv3 = self.dconv3(res2)
        res3 = self.res3(dconv3)
        dconv4 = self.dconv4(res3)
        res4 = self.res4(dconv4)
        dconv5 = self.dconv5(res4)
        res5 = self.res5(dconv5)
        dconv6 = self.dconv6(res5)
        res6 = self.res6(dconv6)
        gen = self.gen(res6)
        return (gen + 1)* 127.5

In [19]:
class Discriminator(nn.Module):
    def __init__(self):
        super(Discriminator, self).__init__()
        pad = 1
        self.h0_0 = nn.Sequential(conv2d(3, 32, kernel_size=3, strides=2, padding = pad), nn.LeakyReLU())
        self.h0_1 = nn.Sequential(conv2d(32, 64, kernel_size=3, strides=2, padding = pad), NORM(64), nn.LeakyReLU())
        self.h0_2 = nn.Sequential(conv2d(64, 128, kernel_size=3, strides=2, padding = pad), NORM(128), nn.LeakyReLU())
        self.h0_3 = nn.Sequential(conv2d(128, 256, kernel_size=3, strides=2, padding = pad), NORM(256), nn.LeakyReLU())
        self.h0_4 = nn.Sequential(conv2d(256, 256, kernel_size=3, strides=2, padding = pad), NORM(256), nn.LeakyReLU())
        self.h0_5 = nn.Linear(12544, 1)
        
        self.h1_0 = nn.Sequential(conv2d(3, 32, kernel_size=3, strides=2, padding = pad), nn.LeakyReLU())
        self.h1_1 = nn.Sequential(conv2d(32, 64, kernel_size=3, strides=2, padding = pad), NORM(64), nn.LeakyReLU())
        self.h1_2 = nn.Sequential(conv2d(64, 128, kernel_size=3, strides=2, padding = pad), NORM(128), nn.LeakyReLU())
        self.h1_3 = nn.Sequential(conv2d(128, 256, kernel_size=3, strides=2, padding = pad), NORM(256), nn.LeakyReLU())
        self.h1_4 = nn.Linear(6144, 1)
        
        self.h2_0 = nn.Sequential(conv2d(3, 32, kernel_size=3, strides=2, padding = pad), nn.LeakyReLU())
        self.h2_1 = nn.Sequential(conv2d(32, 64, kernel_size=3, strides=2, padding = pad), NORM(64), nn.LeakyReLU())
        self.h2_2 = nn.Sequential(conv2d(64, 128, kernel_size=3, strides=2, padding = pad), NORM(128), nn.LeakyReLU())
        self.h2_3 = nn.Sequential(conv2d(128, 256, kernel_size=3, strides=2, padding = pad), NORM(256), nn.LeakyReLU())
        #self.h2_4 = nn.Linear(self.h2_3.out_features, 1)
        self.h2_4 = nn.Linear(3840, 1)
        
        self.h3_0 = nn.Sequential(conv2d(3, 32, kernel_size=3, strides=2, padding = pad), nn.LeakyReLU())
        self.h3_1 = nn.Sequential(conv2d(32, 64, kernel_size=3, strides=2, padding = pad), NORM(64), nn.LeakyReLU())
        self.h3_2 = nn.Sequential(conv2d(64, 128, kernel_size=3, strides=2, padding = pad), NORM(128), nn.LeakyReLU())
        self.h3_3 = nn.Sequential(conv2d(128, 256, kernel_size=3, strides=2, padding = pad), NORM(256), nn.LeakyReLU())
        self.h3_4 = nn.Linear(2560, 1)
        
        self.h4_0 = nn.Sequential(conv2d(3, 32, kernel_size=3, strides=2, padding = pad), nn.LeakyReLU())
        self.h4_1 = nn.Sequential(conv2d(32, 64, kernel_size=3, strides=2, padding = pad), NORM(64), nn.LeakyReLU())
        self.h4_2 = nn.Sequential(conv2d(64, 128, kernel_size=3, strides=2, padding = pad), NORM(128), nn.LeakyReLU())
        self.h4_3 = nn.Sequential(conv2d(128, 256, kernel_size=3, strides=2, padding = pad), NORM(256), nn.LeakyReLU())
        self.h4_4 = nn.Linear(16384, 1)
    
    
    def forward(self, images):
        eyes = images[0:BS,0:CHANNEL,64:100,50:174] #[BS,36,124,CHANNEL])
        nose =  images[0:BS,0:CHANNEL,75:140,90:134]#tf.slice(images, [0,75,90,0], [BS,65,44,CHANNEL])
        mouth = images[0:BS,0:CHANNEL,140:170,75:149]#tf.slice(images, [0,140,75,0], [BS,30,74,CHANNEL])
        face = images[0:BS,0:CHANNEL,64:180,50:174]#[BS,116,124,CHANNEL])
        h0_0 = self.h0_0(images)
        h0_1 = self.h0_1(h0_0)
        h0_2 = self.h0_2(h0_1)
        h0_3 = self.h0_3(h0_2)
        h0_4 = self.h0_4(h0_3)
        h0_5 = self.h0_5(torch.reshape(h0_4, [BS, -1]))
        
        h1_0 = self.h1_0(eyes)
        h1_1 = self.h1_1(h1_0)
        h1_2 = self.h1_2(h1_1)
        h1_3 = self.h1_3(h1_2)
        h1_4 = self.h1_4(torch.reshape(h1_3, [BS, -1]))
        
        h2_0 = self.h2_0(nose)
        h2_1 = self.h2_1(h2_0)
        h2_2 = self.h2_2(h2_1)
        h2_3 = self.h2_3(h2_2)
        h2_4 = self.h2_4(torch.reshape(h2_3, [BS, -1]))
        
        h3_0 = self.h3_0(mouth)
        h3_1 = self.h3_1(h3_0)
        h3_2 = self.h3_2(h3_1)
        h3_3 = self.h3_3(h3_2)
        h3_4 = self.h3_4(torch.reshape(h3_3, [BS, -1]))
        
        h4_0 = self.h4_0(face)
        h4_1 = self.h4_1(h4_0)
        h4_2 = self.h4_2(h4_1)
        h4_3 = self.h4_3(h4_2)
        h4_4 = self.h4_4(torch.reshape(h4_3, [BS, -1]))
        return (h0_5, h1_4, h2_4, h3_4, h4_4)

# G and D

In [20]:
#built network
z_dim = 224*224
mnist_dim = 224*224
#mnist_dim = train_dataset_front.train_data.size(1) * train_dataset.train_data.size(2)

G = Generator()
D = Discriminator()

In [21]:
criterion = nn.BCELoss() 

# optimizer
lr = 0.0002 
G_optimizer = optim.Adam(G.parameters(),betas=(beta1, beta2), weight_decay=0)
D_optimizer = optim.Adam(D.parameters(), betas=(beta1, beta2), weight_decay=0)


# Loss functions

In [22]:
Losses = []
D_losses, G_losses = [], []
D_finalLosses, G_finalLosses = [], []

In [23]:
def reverse(x, dim):
    dim = x.dim() + dim if dim < 0 else dim
    return x[tuple(slice(None, None) if i != dim
             else torch.arange(x.size(i)-1, -1, -1).long()
             for i in range(x.dim()))]

In [24]:
epsilon = 1e-9

In [71]:
def loss(profile, front):
    #=======================Train the generator=======================#
    feature_p = Model.forward(profile) # G_enc(x)
    feature_f = Model.forward(front) # G_enc(y)
    G.zero_grad()
    gen_p = G.forward(feature_p) # ~x
    gen_f = G.forward(feature_f) # ~y
    dr = torch.cat(D(front))
    df1 = torch.cat(D(gen_p))
    df2 = torch.cat(D(gen_f))
    feature_gen_p = Model.forward(gen_p) #G_enc(~x)
    feature_gen_f = Model.forward(gen_f) #G_enc(~y)
    
    pool5_p_norm = feature_p[-1]/(torch.norm(feature_p[-1],dim = 1, keepdim = True) + epsilon)
    pool5_f_norm = feature_f[-1]/(torch.norm(feature_f[-1],dim = 1, keepdim = True) + epsilon)
    
    pool5_gen_p_norm = feature_gen_p[-1]/(torch.norm(feature_gen_p[-1],dim = 1, keepdim = True) + epsilon)
    pool5_gen_f_norm = feature_gen_f[-1]/(torch.norm(feature_gen_f[-1],dim = 1, keepdim = True) + epsilon)
    
     # 1. Frontalization Loss: L1-Norm
    front_loss = torch.mean(torch.sum(torch.abs(front/255. - gen_f/255.), [1,2,3]))
    
    # 2. identity perseption loss l2-norm
    feature_distance = 0.5*(1 - torch.sum(torch.mul(pool5_p_norm, pool5_gen_p_norm), [1])) + \
                                0.5*(1 - torch.sum(torch.mul(pool5_f_norm, pool5_gen_f_norm), [1]))
    feature_loss = torch.mean(feature_distance)
    Losses.append(feature_loss)
    #.
    #trainable var
    #all_vars = torch.autograd.Variables()
    #vars_gen = Variable(G.parameters, requires_grad= True)
    #vars_dis = Variable(D.parameters, requires_grad= True)
    #vars_gen = weights_list=[var for var in self.vars_gen if 'kernel' in var.name]
    
    
    # 3. Regulation loss
#     loss = nn.MSELoss(lambda_reg, reduction='mean')
#     #reg_gen = loss(weights_list=[var for var in vars_gen]) # if 'kernel' in var.name
#     #reg_dis = loss(weights_list=[var for var in vars_dis])# if 'kernel' in var.name
#     target = torch.zeros(G.parameters.shape)
#     reg_gen = loss(G.parameters, target)
#     reg_dis = loss(D.parameters, target)
#     G_losses.append(reg_gen)
#     D_losses.append(reg_dis)
    
    
    # 4. Adversarial Loss
    d_loss = torch.mean(torch.sum(df1)*0.5 + torch.sum(df2)*0.5 - torch.sum(dr)) / 5
    g_loss = torch.mean(torch.sum(df1)*0.5 + torch.sum(df2)*0.5) / 5
    D_losses.append(d_loss)
    G_losses.append(g_loss)
    
    # 5. Symmetric Loss - not applied
    #mirror_p = reverse(gen_p, dim=[2])
    #sym_loss = torch.mean(torch.sum(torchf.abs(mirror_p/225. - gen_p/255.), [1,2,3]))
    
    # Gradient Penalty #
#     alpha = tf.random_uniform((self.gen_p.get_shape().as_list()[0], 1, 1, 1),minval = 0., maxval = 1.,)
#       inter = front + alpha * (self.gen_p - front)
#       d = self.discriminator(inter, reuse=True)
#       grad = tf.gradients([d], [inter])[0]
#       slopes = tf.sqrt(tf.reduce_sum(tf.square(grad), [1,2,3]))
#       self.gradient_penalty = tf.reduce_mean(tf.square(slopes - 1.))
      
     # 6. Drift Loss - not applied
    #drift_loss = 0
    #torch.mean(torch.add(torch.square(df)) + torch.add(torch.square(dr))) / 10
    
    Gen_loss =  torch.mul(front_loss, torch.tensor(lambda_l1)) + torch.mul( feature_loss, torch.tensor(lambda_fea)) + torch.mul(g_loss,torch.tensor(1/lambda_gan))
    Dis_loss = torch.tensor(lambda_gan) * d_loss # + torch.tensor(lambda_gp) * gradient_penalty
    G_finalLosses.append(Gen_loss)
    D_finalLosses.append(Dis_loss)
    Gen_loss.backward()
    Dis_loss.backward()
    
    G_optimizer.step()
    D_optimizer.step()
    return Gen_loss,  Dis_loss
#     z = Variable(torch.randn(bs, z_dim).to(device))
#     y = Variable(torch.ones(bs, 1).to(device))

#     G_output = G(z)
#     D_output = D(G_output)
#     G_loss = criterion(D_output, y)

#     # gradient backprop & optimize ONLY G's parameters
#     G_loss.backward()
#     G_optimizer.step()
        
#     return G_loss.data.item()

In [26]:
sum(p.numel() for p in G.parameters() if p.requires_grad)


24322144

# Main

In [27]:
weights_path= "./resnet50_ft_dag.pth"
Model = resnet50_ft_dag(weights_path=weights_path)

In [28]:
# x = torch.rand([2, 3, 224, 224])
# t = Model(x)
# G = Generator()
# x_gen = G(t)
# print(x_gen.size())

In [29]:
# D = Discriminator()
# x_dis = D(x_gen)
# print(x_dis[0].size(), x_dis[1].size(), x_dis[2].size(), x_dis[3].size(), x_dis[4].size())

In [30]:
Train_front_data = TrainDataset(front_list)
Train_profile_data = TrainDataset(profile_list)

In [31]:
train_loader_front = torch.utils.data.DataLoader(dataset=Train_front_data, batch_size=BS, shuffle=True)
train_loader_profile = torch.utils.data.DataLoader(dataset=Train_profile_data, batch_size=BS, shuffle=True)
#test_loader = torch.utils.data.DataLoader(dataset=test_dataset, batch_size=bs, shuffle=False)

In [32]:
d = list(enumerate(train_loader_front))
print(d[3][1].size())

torch.Size([2, 3, 224, 224])


In [73]:
n_epoch = 20
for epoch in range(1, n_epoch+1):           
    D_losses, G_losses = [], []
    d = list(enumerate(train_loader_front))
    for batch_idx, x in enumerate(train_loader_profile):
        y = d[batch_idx][1]
        g_curr_loss, d_curr_loss = loss(x, y)
        D_losses.append(D_train(x))
        G_losses.append(G_train(x))

    print('[%d/%d]: loss_d: %.3f, loss_g: %.3f' % (
            (epoch), n_epoch, torch.mean(torch.FloatTensor(D_losses)), torch.mean(torch.FloatTensor(G_losses))))

RuntimeError: [enforce fail at ..\c10\core\CPUAllocator.cpp:72] data. DefaultCPUAllocator: not enough memory: you tried to allocate 1204224 bytes. Buy new RAM!
