In [None]:
!pip install keras
!pip install scikit-image
!pip install moviepy
!pip install torch
!pip install f2format

In [None]:
import os
import shutil
import cv2
import time
from pathlib import PurePath, Path
from glob import glob
from IPython.display import clear_output
import numpy as np
from matplotlib import pyplot as plt
import tensorflow as tf
from keras import backend as K
from keras.layers import *
from keras import regularizers
from moviepy.editor import VideoFileClip
from tqdm import tqdm

In [None]:
# Only do this once then comment it out
# !git clone https://github.com/shaoanlu/faceswap-GAN.git

In [None]:
# import os
os.chdir('faceswap-GAN')

In [None]:
# Only required if Python version is < 3.6, as f-strings were introduced in 3.6
!f2format networks
!f2format converter
!f2format data_loader
!f2format detector
!f2format utils.py

In [None]:
# Adds import statement to networks/nn_blocks.py
# Only do this once then comment it out
"""temp_str = ''
with open('networks/nn_blocks.py','r') as f:
  temp_str = f.read()

temp_str = 'import keras.backend as K\nfrom keras import regularizers\n'.join(temp_str.split('import keras.backend as K\n'))

with open('networks/nn_blocks.py','w') as f:
  f.write(temp_str)"""

In [None]:
# Only do this once then comment it out
!git clone https://github.com/1adrianb/face-alignment.git
shutil.move('face-alignment/face_alignment','face_alignment')
shutil.rmtree('face-alignment')

In [None]:
# Only do this once then comment it out
!git clone https://github.com/rcmalli/keras-vggface.git
shutil.move('keras-vggface/keras_vggface','keras_vggface')
shutil.rmtree('keras-vggface')

## Choose Videos

In [None]:
# youtube-dl if links available
# !pip install youtube-dl
# !youtube-dl -f best -o <filenameA> <linkA>
# !youtube-dl -f best -o <filenameB> <linkB>

In [None]:
vid1_name = 'ankit.mp4'
vid2_name = 'vaibhav.mp4'

# Face detection for video
Images of detected faces have format `frameXfaceY.jpg`, where `X` represents the Xth frame and `Y` the Yth face in Xth frame. 

In [None]:
from umeyama import umeyama
import mtcnn_detect_face

def create_mtcnn(sess, model_path):
    """Build the three MTCNN stages and load their pretrained weights.

    sess: TensorFlow session handed to each network's ``load`` call.
    model_path: directory holding ``det1.npy``, ``det2.npy`` and ``det3.npy``.
        When falsy, the directory of the defining module is used if
        ``__file__`` exists, otherwise the current working directory.

    Returns the tuple ``(pnet, rnet, onet)``.
    """
    if not model_path:
        # Notebook kernels do not define ``__file__``; reading it directly
        # raised NameError here, so look it up defensively and fall back to
        # the working directory.
        module_file = globals().get('__file__')
        if module_file:
            model_path = os.path.dirname(os.path.realpath(module_file))
        else:
            model_path = os.getcwd()

    # Each stage gets its own variable scope and its own input placeholder.
    with tf.variable_scope('pnet2'):
        data = tf.placeholder(tf.float32, (None,None,None,3), 'input')
        pnet = mtcnn_detect_face.PNet({'data':data})
        pnet.load(os.path.join(model_path, 'det1.npy'), sess)
    with tf.variable_scope('rnet2'):
        data = tf.placeholder(tf.float32, (None,24,24,3), 'input')
        rnet = mtcnn_detect_face.RNet({'data':data})
        rnet.load(os.path.join(model_path, 'det2.npy'), sess)
    with tf.variable_scope('onet2'):
        data = tf.placeholder(tf.float32, (None,48,48,3), 'input')
        onet = mtcnn_detect_face.ONet({'data':data})
        onet.load(os.path.join(model_path, 'det3.npy'), sess)
    return pnet, rnet, onet

WEIGHTS_PATH = "./mtcnn_weights/"

# Build the MTCNN networks inside the session managed by Keras.
# No `global` declarations are needed: these are already module-level names,
# and declaring a name `global` after it has been assigned in the same scope
# is a SyntaxError on Python >= 3.6.
sess = K.get_session()
with sess.as_default():
    pnet, rnet, onet = create_mtcnn(sess, WEIGHTS_PATH)

# Replace each network object by a Keras function that maps its 'data' input
# layer to the output layers listed here.
pnet = K.function([pnet.layers['data']],[pnet.layers['conv4-2'], pnet.layers['prob1']])
rnet = K.function([rnet.layers['data']],[rnet.layers['conv5-2'], rnet.layers['prob1']])
onet = K.function([onet.layers['data']],[onet.layers['conv6-2'], onet.layers['conv6-3'], onet.layers['prob1']])

# Output folder for the aligned face crops written by process_video.
Path("aligned_faces").mkdir(parents=True, exist_ok=True)

def get_src_landmarks(x0, x1, y0, y1, pnts):
    """
    Express five landmarks relative to the (x0, y0) corner of a bbox.

    x0, x1, y0, y1: (smoothed) bbox coord.; only x0 and y0 are read.
    pnts: landmarks predicted by MTCNN; for landmark i the values pnts[i+5][0]
          and pnts[i][0] are used (column 0 only).

    Returns a list of five (int, int) tuples.
    """
    shifted = []
    for k in range(5):
        first = int(pnts[k + 5][0] - x0)
        second = int(pnts[k][0] - y0)
        shifted.append((first, second))
    return shifted

def get_tar_landmarks(img):
    """
    Scale five fixed landmark ratios to the size of a face crop.

    img: detected face image; only img.shape[0] and img.shape[1] are read.

    Returns a list of five (int, int) tuples: the first ratio of each pair is
    multiplied by shape[0], the second by shape[1], then truncated to int.
    """
    ratio_landmarks = (
        (0.31339227236234224, 0.3259269274198092),
        (0.31075140146108776, 0.7228453709528997),
        (0.5523683107816256, 0.5187296867370605),
        (0.7752419985257663, 0.37262483743520886),
        (0.7759613623985877, 0.6772957581740159),
    )

    dims = img.shape
    tar_landmarks = []
    for ratio0, ratio1 in ratio_landmarks:
        tar_landmarks.append((int(ratio0 * dims[0]), int(ratio1 * dims[1])))
    return tar_landmarks

def landmarks_match_mtcnn(src_im, src_landmarks, tar_landmarks):
    """
    Warp src_im with the transform that umeyama estimates from src_landmarks
    to tar_landmarks.

    umeyama(src, dst, estimate_scale)
    landmarks coord. for umeyama should be (width, height) or (y, x)
    """
    def _swapped(points):
        # Flip the order inside every pair and truncate both values to int.
        return np.array([(int(pt[1]), int(pt[0])) for pt in points])

    rows, cols = src_im.shape[0], src_im.shape[1]
    src_pts = _swapped(src_landmarks)
    tar_pts = _swapped(tar_landmarks)
    # Only the first two rows of the estimated matrix are passed to warpAffine.
    affine = umeyama(src_pts, tar_pts, True)[0:2]
    return cv2.warpAffine(src_im, affine, (cols, rows), borderMode=cv2.BORDER_REPLICATE)

def process_mtcnn_bbox(bboxes, im_shape):
    """
    output bbox coordinate of MTCNN is (y0, x0, y1, x1)
    Here we process the bbox coord. to a square bbox with ordering (x0, y1, x1, y0)

    The square is centred on the original box, its side is the mean of the
    original width and height, and it is clipped to [0, im_shape[0]] along x
    and [0, im_shape[1]] along y. bboxes is modified in place and returned.
    """
    for row in range(len(bboxes)):
        y0, x0, y1, x1 = bboxes[row, 0:4]
        side = (int(y1 - y0) + int(x1 - x0)) / 2
        half = side // 2
        centre_x = int((x1 + x0) / 2)
        centre_y = int((y1 + y0) / 2)
        low_x = np.max([0, centre_x - half])
        high_x = np.min([im_shape[0], centre_x + half])
        low_y = np.max([0, centre_y - half])
        high_y = np.min([im_shape[1], centre_y + half])
        bboxes[row, 0:4] = low_x, high_y, high_x, low_y
    return bboxes

def process_video(input_img):
    """Per-frame callback: detect, align and save faces on sampled frames.

    Increments the module-level ``frames`` counter on every call and reads the
    module-level ``save_interval`` and the MTCNN callables ``pnet``, ``rnet``
    and ``onet``. On every ``save_interval``-th frame each detected face is
    cropped, aligned and written to ``./aligned_faces/frame<X>face<Y>.jpg``.

    input_img: frame image, indexed as [rows, cols, channels].

    Returns a 3x3x3 zero array; the processed frame itself is not returned.
    """
    global frames, save_interval
    global pnet, rnet, onet
    minsize = 30 # minimum size of face
    detec_threshold = 0.7
    threshold = [0.6, 0.7, detec_threshold]  # three steps's threshold
    factor = 0.709 # scale factor

    frames += 1
    # A clip with fewer frames than the requested number of images makes
    # save_interval 0, and `frames % 0` raises ZeroDivisionError; sample every
    # frame in that case.
    interval = max(1, int(save_interval))
    if frames % interval == 0:
        faces, pnts = mtcnn_detect_face.detect_face(
            input_img, minsize, pnet, rnet, onet, threshold, factor)
        faces = process_mtcnn_bbox(faces, input_img.shape)

        for idx, (x0, y1, x1, y0, conf_score) in enumerate(faces):
            det_face_im = input_img[int(x0):int(x1),int(y0):int(y1),:]

            # get src/tar landmarks
            # get_src_landmarks only reads column 0, so hand it the column of
            # the current face; passing the whole array aligned every face
            # with the landmarks of face 0.
            # NOTE(review): assumes landmark columns follow the row order of
            # `faces` — confirm against mtcnn_detect_face.detect_face.
            face_pnts = pnts[:, idx:idx+1]
            src_landmarks = get_src_landmarks(x0, x1, y0, y1, face_pnts)
            tar_landmarks = get_tar_landmarks(det_face_im)

            # align detected face
            aligned_det_face_im = landmarks_match_mtcnn(
                det_face_im, src_landmarks, tar_landmarks)

            fname = "./aligned_faces/frame"+str(frames)+"face"+str(idx)+".jpg"
            plt.imsave(fname, aligned_det_face_im, format="jpg")

    return np.zeros((3,3,3))

In [None]:
# Number of images of each face
num_images = 1000
# Create folders for Face A and Face B
Path("faceA").mkdir(parents=True, exist_ok=True)
Path("faceB").mkdir(parents=True, exist_ok=True)

In [None]:
# Extract faces from A

face_letter = 'A'

# configuration
fn_input_video = vid1_name
output = vid1_name.split('.')[0] + '_faces.mp4'

# Fail fast: refuse to run before the slow detection pass instead of after it.
if os.listdir('face' + face_letter):
    raise Exception('face' + face_letter + ' contains faces. Please empty it before using this command.')

# Frame counter read and incremented by process_video.
frames = 0

clip1 = VideoFileClip(fn_input_video)
try:
    # perform face detection every {save_interval} frames; clamp to 1 so that
    # clips with fewer frames than num_images do not produce an interval of 0
    save_interval = max(1, clip1.duration*clip1.fps//num_images)
    clip = clip1.fl_image(process_video) # .subclip(0,3) #NOTE: this function expects color images!!
    clip.write_videofile(output, audio=False)
finally:
    # release the video reader even if detection or writing fails
    clip1.reader.close()

# Move every aligned crop into the folder of this face.
faces_list = os.listdir('aligned_faces')
for face in faces_list:
    os.rename('aligned_faces/'+face,'face'+face_letter+'/'+face)

In [None]:
# Extract faces from B

face_letter = 'B'

# configuration
fn_input_video = vid2_name
output = vid2_name.split('.')[0] + '_faces.mp4'

# Fail fast: refuse to run before the slow detection pass instead of after it.
if os.listdir('face' + face_letter):
    raise Exception('face' + face_letter + ' contains faces. Please empty it before using this command.')

# Frame counter read and incremented by process_video.
frames = 0

clip1 = VideoFileClip(fn_input_video)
try:
    # perform face detection every {save_interval} frames; clamp to 1 so that
    # clips with fewer frames than num_images do not produce an interval of 0
    save_interval = max(1, clip1.duration*clip1.fps//num_images)
    clip = clip1.fl_image(process_video) # .subclip(0,3) #NOTE: this function expects color images!!
    clip.write_videofile(output, audio=False)
finally:
    # release the video reader even if detection or writing fails
    clip1.reader.close()

# Move every aligned crop into the folder of this face.
faces_list = os.listdir('aligned_faces')
for face in faces_list:
    os.rename('aligned_faces/'+face,'face'+face_letter+'/'+face)

In [None]:
# Zip face folders
!zip -r faceA.zip faceA
!zip -r faceB.zip faceB

In [None]:
# Delete face folders
shutil.rmtree('faceA')
shutil.rmtree('faceB')

In [None]:
# Download the zips, manually clean the folders by removing blurry faces or non-faces, then upload them back on the system.

In [None]:
# Unzip uploaded zips
!unzip faceA.zip
!unzip faceB.zip

# Preparation of Binary Masks

Creating high quality binary masks from face data.

In [None]:
import face_alignment

In [None]:
dir_faceA = "./faceA"
dir_faceB = "./faceB"
dir_bm_faceA_eyes = "./binary_masks/faceA_eyes"
dir_bm_faceB_eyes = "./binary_masks/faceB_eyes"

In [None]:
fns_faceA = glob(dir_faceA + "/*.*")
fns_faceB = glob(dir_faceB + "/*.*")

In [None]:
fa = face_alignment.FaceAlignment(face_alignment.LandmarksType._2D, flip_input=False)

In [None]:
# !mkdir -p binary_masks/faceA_eyes
Path("binary_masks/faceA_eyes").mkdir(parents=True, exist_ok=True)
# !mkdir -p binary_masks/faceB_eyes
Path("binary_masks/faceB_eyes").mkdir(parents=True, exist_ok=True)

In [None]:
fns_face_not_detected = []


def _draw_filled_hull(canvas, landmarks, start, stop):
    """Fill the convex hull of landmarks[start:stop] with white on canvas."""
    corner_pts = [(landmarks[i,0], landmarks[i,1]) for i in range(start, stop)]
    hull = cv2.convexHull(np.array(corner_pts)).astype(np.int32)
    return cv2.drawContours(canvas, [hull], 0, (255,255,255), -1)


# Pair each list of face images with the folder that receives its masks.
mask_jobs = zip([fns_faceA, fns_faceB], [dir_bm_faceA_eyes, dir_bm_faceB_eyes])
for fns, save_path in mask_jobs:
    # create binary mask for each training image
    for fn in tqdm(fns):
        raw_fn = PurePath(fn).parts[-1]

        x = cv2.resize(plt.imread(fn), (256,256))
        preds = fa.get_landmarks(x)

        if preds is None:
            # Images without a detectable face are removed from the data set.
            print("No faces were detected in image " + fn + ", deleting.")
            os.remove(fn)
            fns_face_not_detected.append(fn)
            continue

        # Only the first entry of the landmark result is used.
        preds = preds[0]
        mask = np.zeros_like(x)
        mask = _draw_filled_hull(mask, preds, 36, 42)  # right eye
        mask = _draw_filled_hull(mask, preds, 42, 48)  # left eye
        # The mouth (landmarks 48-59) is intentionally left out of the mask.

        # Grow the eye regions, soften their edges, and save under the same
        # file name as the source image.
        mask = cv2.dilate(mask, np.ones((13,13), np.uint8), iterations=1)
        mask = cv2.GaussianBlur(mask, (7,7), 0)
        plt.imsave(fname=save_path+'/'+raw_fn, arr=mask, format="jpg")

In [None]:
# Count the images still present in each face folder; images without a
# detected face were deleted by the mask-generation cell.
num_faceA = len(glob(dir_faceA+"/*.*"))
num_faceB = len(glob(dir_faceB+"/*.*"))

# Summary: remaining images, and how many were dropped for having no face.
print("Nuber of processed images: "+ str(num_faceA + num_faceB))
print("Number of image(s) with no face detected: " + str(len(fns_face_not_detected)))

# Training

Training the model on the two chosen faces

## Configuration


In [None]:
# Number of CPU cores (passed to every DataLoader created below)
num_cpus = os.cpu_count()

# Input/Output resolution
RESOLUTION = 128 # 64x64, 128x128, 256x256
# NOTE(review): this only enforces a multiple of 64, so e.g. 192 passes even
# though the message lists 64/128/256 — confirm which sizes the model accepts.
assert (RESOLUTION % 64) == 0, "RESOLUTION should be 64, 128, or 256."

# Batch size
batchSize = 8
# Any even number passes; the `!= 1` test is already implied by the parity check.
assert (batchSize != 1 and batchSize % 2 == 0) , "batchSize should be an even number."

# Use motion blurs (data augmentation)
# set True if training data contains images extracted from videos
use_da_motion_blur = True 

# Use eye-aware training
# requires the binary eye masks generated in the "Preparation of Binary Masks"
# section (./binary_masks/faceA_eyes and ./binary_masks/faceB_eyes)
use_bm_eyes = True

# Probability of random color matching (data augmentation)
prob_random_color_match = 0.5

# Augmentation options, forwarded to DataLoader as keyword arguments
da_config = {
    "prob_random_color_match": prob_random_color_match,
    "use_da_motion_blur": use_da_motion_blur,
    "use_bm_eyes": use_bm_eyes
}

In [None]:
# Path to training images
img_dirA = './faceA'
img_dirB = './faceB'
img_dirA_bm_eyes = "./binary_masks/faceA_eyes"
img_dirB_bm_eyes = "./binary_masks/faceB_eyes"

# Path to saved model weights
models_dir = "./models"

In [None]:
# Architecture configuration
arch_config = {}
arch_config['IMAGE_SHAPE'] = (RESOLUTION, RESOLUTION, 3)
arch_config['use_self_attn'] = True
arch_config['norm'] = "instancenorm" # instancenorm, batchnorm, layernorm, groupnorm, none
arch_config['model_capacity'] = "standard" # standard, lite

In [None]:
# Loss function weights configuration
# (passed to model.build_train_functions as `loss_weights`)
loss_weights = {}
loss_weights['w_D'] = 0.1 # Discriminator
loss_weights['w_recon'] = 1. # L1 reconstruction loss
loss_weights['w_edge'] = 0.1 # edge loss
loss_weights['w_eyes'] = 30. # reconstruction and edge loss on eyes area
loss_weights['w_pl'] = (0.01, 0.1, 0.3, 0.1) # perceptual loss (0.003, 0.03, 0.3, 0.3)

# Init. loss config.
# These are the starting values only: the training loop overwrites 'use_PL',
# 'use_mask_hinge_loss', 'm_mask' and 'lr_factor' at fixed iteration
# milestones and rebuilds the train functions each time.
loss_config = {}
loss_config["gan_training"] = "mixup_LSGAN" # "mixup_LSGAN" or "relativistic_avg_LSGAN"
loss_config['use_PL'] = False
loss_config["PL_before_activ"] = False
loss_config['use_mask_hinge_loss'] = False
loss_config['m_mask'] = 0.
loss_config['lr_factor'] = 1.
loss_config['use_cyclic_loss'] = False

## Define Models

In [None]:
from networks.faceswap_gan_model import FaceswapGANModel
model = FaceswapGANModel(**arch_config)
# NOTE(review): weights are loaded from a hard-coded checkpoint folder
# (<models_dir>/backup_iter10000, the naming used by the training loop's
# backups). Presumably it does not exist on a first run — confirm how
# load_weights behaves when the files are missing.
model.load_weights(path=models_dir+"/backup_iter10000")

from keras_vggface.vggface import VGGFace

# VGGFace ResNet50
vggface = VGGFace(include_top=False, model='resnet50', input_shape=(224, 224, 3))
model.build_pl_model(vggface_model=vggface, before_activ=loss_config["PL_before_activ"])

# Build the train functions from the current loss weights and loss config.
model.build_train_functions(loss_weights=loss_weights, **loss_config)

# Create ./models directory
Path("models").mkdir(parents=True, exist_ok=True)

In [None]:
from data_loader.data_loader import DataLoader

def show_loss_config(loss_config):
    """Print each entry of loss_config on its own line as ``name = value``."""
    for name, setting in loss_config.items():
        line = name + ' = ' + str(setting)
        print(line)

def reset_session(save_path):
    """Save the model, clear the Keras session, and rebuild everything.

    save_path: location the current weights are saved to and then reloaded
        from.

    Rebinds the module-level ``model``, ``vggface``, ``train_batchA`` and
    ``train_batchB``. The train functions are NOT rebuilt here; the training
    loop calls ``model.build_train_functions`` after this returns.
    """
    global model, vggface
    global train_batchA, train_batchB
    # Persist the weights first: everything below discards the live objects.
    model.save_weights(path=save_path)
    del model
    del vggface
    del train_batchA
    del train_batchB
    K.clear_session()
    # Recreate the model from the weights saved above.
    model = FaceswapGANModel(**arch_config)
    model.load_weights(path=save_path)
    vggface = VGGFace(include_top=False, model='resnet50', input_shape=(224, 224, 3))
    model.build_pl_model(vggface_model=vggface, before_activ=loss_config["PL_before_activ"])
    # New data loaders are created with the session obtained after the reset.
    train_batchA = DataLoader(train_A, train_AnB, batchSize, img_dirA_bm_eyes,
                              RESOLUTION, num_cpus, K.get_session(), **da_config)
    train_batchB = DataLoader(train_B, train_AnB, batchSize, img_dirB_bm_eyes, 
                              RESOLUTION, num_cpus, K.get_session(), **da_config)

## Start Training

In [None]:
from utils import showG, showG_mask, showG_eyes

# Collect the training image paths of both faces.
train_A = glob(img_dirA+"/*.*")
train_B = glob(img_dirB+"/*.*")

train_AnB = train_A + train_B

# Both folders must contain at least one file before anything is reported.
for _img_dir, _found in ((img_dirA, train_A), (img_dirB, train_B)):
    assert len(_found), "No image found in " + str(_img_dir)
for _label, _found in (("A", train_A), ("B", train_B)):
    print ("Number of images in folder " + _label + ": " + str(len(_found)))

if use_bm_eyes:
    # Every training image needs exactly one eye mask in its mask folder.
    _masks_A = glob(img_dirA_bm_eyes+"/*.*")
    _masks_B = glob(img_dirB_bm_eyes+"/*.*")
    assert len(_masks_A), "No binary mask found in " + str(img_dirA_bm_eyes)
    assert len(_masks_B), "No binary mask found in " + str(img_dirB_bm_eyes)
    assert len(_masks_A) == len(train_A), \
    "Number of faceA images does not match number of their binary masks. Can be caused by any none image file in the folder."
    assert len(_masks_B) == len(train_B), \
    "Number of faceB images does not match number of their binary masks. Can be caused by any none image file in the folder."

In [None]:
# Display random binary masks of eyes
train_batchA = DataLoader(train_A, train_AnB, batchSize, img_dirA_bm_eyes, 
                          RESOLUTION, num_cpus, K.get_session(), **da_config)
train_batchB = DataLoader(train_B, train_AnB, batchSize, img_dirB_bm_eyes, 
                          RESOLUTION, num_cpus, K.get_session(), **da_config)
_, tA, bmA = train_batchA.get_next_batch()
_, tB, bmB = train_batchB.get_next_batch()
showG_eyes(tA, tB, bmA, bmB, batchSize)
del train_batchA, train_batchB

In [None]:
# code to send updates to e-mail account of your choice
import smtplib 
from email.mime.multipart import MIMEMultipart 
from email.mime.text import MIMEText 
from email.mime.base import MIMEBase 
from email import encoders

In [None]:
# Mail settings are read from the environment so that no credential is stored
# in the notebook. Set FACESWAP_MAIL_FROM, FACESWAP_MAIL_TO and
# FACESWAP_MAIL_PASSWORD before starting the kernel; the recipient defaults to
# the sender address.
fromaddr = os.environ.get("FACESWAP_MAIL_FROM", "example@gmail.com")
toaddr = os.environ.get("FACESWAP_MAIL_TO", fromaddr)
frompassword = os.environ.get("FACESWAP_MAIL_PASSWORD", "")

In [None]:
# Start training
t0 = time.time()

# Resume training that was accidentally interrupted: if this cell already ran
# in the current kernel, gen_iterations still exists and is kept.
try:
    gen_iterations
    print("Resume training from iter "+str(gen_iterations))
except NameError:
    # Only "name not defined yet" means a fresh start; anything else
    # (e.g. KeyboardInterrupt) must not be swallowed.
    gen_iterations = 0
print('Initializing Errors...')
errGA_sum = errGB_sum = errDA_sum = errDB_sum = 0
errGAs = {}
errGBs = {}
# Dictionaries are ordered in Python 3.6
for k in ['ttl', 'adv', 'recon', 'edge', 'pl']:
    errGAs[k] = 0
    errGBs[k] = 0

display_iters = 5000 # Display results every {display_iters} iterations
backup_iters = 5000 # Backup models every {backup_iters} iterations
TOTAL_ITERS = 50000 # Train model for {TOTAL_ITERS} iterations

print('Creating Dataloaders...')
# Module-level names; reset_session rebinds them through its own `global`.
train_batchA = DataLoader(train_A, train_AnB, batchSize, img_dirA_bm_eyes, 
                          RESOLUTION, num_cpus, K.get_session(), **da_config)
train_batchB = DataLoader(train_B, train_AnB, batchSize, img_dirB_bm_eyes, 
                          RESOLUTION, num_cpus, K.get_session(), **da_config)

def _mail_attachment(path, attach_name):
    """Return the file at `path` as a base64 MIME attachment named `attach_name`.

    Returns None (after printing a warning) when the file cannot be read, so a
    missing preview image never interrupts training.
    """
    try:
        # `with` guarantees the handle is closed after reading.
        with open(path, "rb") as fh:
            payload = fh.read()
    except OSError as err:
        print("Warning: could not read " + path + ": " + str(err))
        return None
    part = MIMEBase('application', 'octet-stream')
    part.set_payload(payload)
    encoders.encode_base64(part)
    part.add_header('Content-Disposition', "attachment; filename= " + attach_name)
    return part


def _send_progress_mail(subject, parts):
    """Mail `parts` (None entries are skipped) from fromaddr to toaddr.

    Delivery is best effort: connection, TLS, login and send errors are
    printed instead of raised, so a mail problem cannot abort training.
    """
    msg = MIMEMultipart()
    msg['From'] = fromaddr
    msg['To'] = toaddr
    msg['Subject'] = subject
    msg.attach(MIMEText("", 'plain'))
    for part in parts:
        if part is not None:
            msg.attach(part)
    try:
        # The context manager issues QUIT and closes the connection.
        with smtplib.SMTP('smtp.gmail.com', 587) as server:
            server.starttls()
            server.login(fromaddr, frompassword)
            server.sendmail(fromaddr, toaddr, msg.as_string())
    except (OSError, smtplib.SMTPException) as err:
        print("Warning: progress e-mail was not sent: " + str(err))


# Loss schedule. Each entry is
#   (trigger iteration, loss_config overrides, swap decoders first?, closing message)
# Entries are tested in order and at most one fires per iteration.
_half_display = display_iters//2
loss_schedule = [
    (TOTAL_ITERS//5 - _half_display,
     {'use_PL': True, 'use_mask_hinge_loss': False, 'm_mask': 0.0},
     False, "Done."),
    (TOTAL_ITERS//5 + TOTAL_ITERS//10 - _half_display,
     {'use_PL': True, 'use_mask_hinge_loss': True, 'm_mask': 0.5},
     False, "Complete."),
    (2*TOTAL_ITERS//5 - _half_display,
     {'use_PL': True, 'use_mask_hinge_loss': True, 'm_mask': 0.2},
     False, "Done."),
    (TOTAL_ITERS//2 - _half_display,
     {'use_PL': True, 'use_mask_hinge_loss': True, 'm_mask': 0.4},
     False, "Done."),
    (2*TOTAL_ITERS//3 - _half_display,
     {'use_PL': True, 'use_mask_hinge_loss': False, 'm_mask': 0., 'lr_factor': 0.3},
     False, "Done."),
    (8*TOTAL_ITERS//10 - _half_display,
     {'use_PL': True, 'use_mask_hinge_loss': True, 'm_mask': 0.1, 'lr_factor': 0.3},
     True, "Done."),
    (9*TOTAL_ITERS//10 - _half_display,
     {'use_PL': True, 'use_mask_hinge_loss': False, 'm_mask': 0.0, 'lr_factor': 0.1},
     False, "Done."),
]

print('Start training...')
while gen_iterations <= TOTAL_ITERS: 
    
    # Loss function automation
    for _trigger, _overrides, _swap_decoders, _done_msg in loss_schedule:
        if gen_iterations != _trigger:
            continue
        clear_output()
        if _swap_decoders:
            model.decoder_A.load_weights("models/decoder_B.h5") # swap decoders
            model.decoder_B.load_weights("models/decoder_A.h5") # swap decoders
        loss_config.update(_overrides)
        # reset_session saves the weights, clears the session and rebuilds the
        # model and data loaders; the train functions are rebuilt right after.
        reset_session(models_dir)
        print("Building new loss funcitons...")
        show_loss_config(loss_config)
        model.build_train_functions(loss_weights=loss_weights, **loss_config)
        print(_done_msg)
        break
    
    if gen_iterations == 5:
        print ("working.")
    
    # Train dicriminators for one batch
    data_A = train_batchA.get_next_batch()
    data_B = train_batchB.get_next_batch()
    errDA, errDB = model.train_one_batch_D(data_A=data_A, data_B=data_B)
    errDA_sum +=errDA[0]
    errDB_sum +=errDB[0]

    # Train generators for one batch
    data_A = train_batchA.get_next_batch()
    data_B = train_batchB.get_next_batch()
    errGA, errGB = model.train_one_batch_G(data_A=data_A, data_B=data_B)
    errGA_sum += errGA[0]
    errGB_sum += errGB[0]
    # NOTE: per-term accumulation is disabled, so errGAs/errGBs stay 0 and the
    # "Generator loss details" printed below are always 0:
    # for i, k in enumerate(['ttl', 'adv', 'recon', 'edge', 'pl']):
    #     errGAs[k] += errGA[i]
    #     errGBs[k] += errGB[i]
    gen_iterations+=1
    
    if gen_iterations % (display_iters//10) == 0:
        print(str(gen_iterations) + ' iterations done in ' + str(time.time()-t0))
    
    # Visualization
    if gen_iterations % display_iters == 0:
        clear_output()
        
        mail_subject = vid1_name[:-4] + ' and ' + vid2_name[:-4] + ": Epochs " + str(gen_iterations)
        mail_parts = []
        
        # Display loss information
        show_loss_config(loss_config)
        print("----------") 
        print('[iter %d] Loss_DA: %f Loss_DB: %f Loss_GA: %f Loss_GB: %f time: %f'
        % (gen_iterations, errDA_sum/display_iters, errDB_sum/display_iters,
           errGA_sum/display_iters, errGB_sum/display_iters, time.time()-t0))  
        print("----------") 
        print("Generator loss details:")
        print('[Adversarial loss]')  
        print('GA: {:.4f} GB: {:.4f}'.format(errGAs["adv"]/display_iters, errGBs["adv"]/display_iters))
        print('[Reconstruction loss]')
        print('GA: {:.4f} GB: {:.4f}'.format(errGAs["recon"]/display_iters, errGBs["recon"]/display_iters))
        print('[Edge loss]')
        print('GA: {:.4f} GB: {:.4f}'.format(errGAs["edge"]/display_iters, errGBs["edge"]/display_iters))
        if loss_config['use_PL']:
            print('[Perceptual loss]')
            try:
                print('GA: {:.4f} GB: {:.4f}'.format(errGAs["pl"][0]/display_iters, errGBs["pl"][0]/display_iters))
            except (TypeError, IndexError):
                # the accumulated value is a plain scalar, not a sequence
                print('GA: {:.4f} GB: {:.4f}'.format(errGAs["pl"]/display_iters, errGBs["pl"]/display_iters))
        
        # Display images
        print("----------") 
        wA, tA, _ = train_batchA.get_next_batch()
        wB, tB, _ = train_batchB.get_next_batch()
        
        # Each preview file is read right after the call that precedes it,
        # because "opimages/showG.jpg" is attached twice (as showG.jpg and as
        # showG1.jpg) — presumably showG rewrites it on every call; confirm
        # in utils.py.
        print("Transformed (masked) results:")
        showG(tA, tB, model.path_A, model.path_B, batchSize)  
        mail_parts.append(_mail_attachment("opimages/showG.jpg", "showG.jpg"))
        
        print("Masks:")
        showG_mask(tA, tB, model.path_mask_A, model.path_mask_B, batchSize)
        mail_parts.append(_mail_attachment("opimages/showG_mask.jpg", "showG_mask.jpg"))
        
        print("Reconstruction results:")
        showG(wA, wB, model.path_bgr_A, model.path_bgr_B, batchSize)
        mail_parts.append(_mail_attachment("opimages/showG.jpg", "showG1.jpg"))
        
        # Best-effort notification; failures are reported, never raised.
        _send_progress_mail(mail_subject, mail_parts)
        
        errGA_sum = errGB_sum = errDA_sum = errDB_sum = 0
        for k in ['ttl', 'adv', 'recon', 'edge', 'pl']:
            errGAs[k] = 0
            errGBs[k] = 0
        
        # Save models
        model.save_weights(path=models_dir)

    # Backup models
    if gen_iterations % backup_iters == 0: 
        bkup_dir = "{}/backup_iter{}".format(models_dir, gen_iterations)
        Path(bkup_dir).mkdir(parents=True, exist_ok=True)
        model.save_weights(path=bkup_dir)

In [None]:
# Display random results
wA, tA, _ = train_batchA.get_next_batch()
wB, tB, _ = train_batchB.get_next_batch()
print("Transformed (masked) results:")
showG(tA, tB, model.path_A, model.path_B, batchSize)   
print("Masks:")
showG_mask(tA, tB, model.path_mask_A, model.path_mask_B, batchSize)  
print("Reconstruction results:")
showG(wA, wB, model.path_bgr_A, model.path_bgr_B, batchSize) 

# Video Conversion

## Model Configuration

In [None]:
!pip install moviepy

In [None]:
import os
import shutil
import cv2
import time
from pathlib import PurePath, Path
from glob import glob
from IPython.display import clear_output
import numpy as np
from matplotlib import pyplot as plt
import tensorflow as tf
from keras import backend as K
from keras.layers import *
from keras import regularizers
from moviepy.editor import VideoFileClip
from tqdm import tqdm
import keras

In [None]:
os.chdir('faceswap-GAN/')

In [None]:
K.set_learning_phase(0)

# Input/Output resolution
RESOLUTION = 128 # 64x64, 128x128, 256x256
assert (RESOLUTION % 64) == 0, "RESOLUTION should be 64, 128, 256"

# Architecture configuration
arch_config = {}
arch_config['IMAGE_SHAPE'] = (RESOLUTION, RESOLUTION, 3)
arch_config['use_self_attn'] = True
arch_config['norm'] = "instancenorm" # instancenorm, batchnorm, layernorm, groupnorm, none
arch_config['model_capacity'] = "standard" # standard, lite

## Define models

In [None]:
from networks.faceswap_gan_model import FaceswapGANModel
model = FaceswapGANModel(**arch_config)
model.load_weights(path="./models")

## Video Conversion

In [None]:
from converter.video_converter import VideoConverter
from detector.face_detector import MTCNNFaceDetector

mtcnn_weights_dir = "./mtcnn_weights/"

fd = MTCNNFaceDetector(sess=K.get_session(), model_path=mtcnn_weights_dir)
vc = VideoConverter()

vc.set_face_detector(fd)
vc.set_gan_model(model)

In [None]:
[f for f in os.listdir() if 'mp4' in f]

### Video conversion configuration


- `use_smoothed_bbox`: 
    - Boolean. Whether to enable smoothed bbox.
- `use_kalman_filter`: 
    - Boolean. Whether to enable Kalman filter.
- `use_auto_downscaling`:
    - Boolean. Whether to enable auto-downscaling in face detection (to prevent OOM error).
- `bbox_moving_avg_coef`: 
    - Float point between 0 and 1. Smoothing coef. used when use_kalman_filter is set False.
- `min_face_area`:
    - int x int. Minimum size of face. Detected faces smaller than min_face_area will not be transformed.
- `IMAGE_SHAPE`:
    - Input/Output resolution of the GAN model
- `kf_noise_coef`:
    - Float point. Increase by 10x if tracking is slow. Decrease by 1/10x if tracking works fine but jitter occurs.
- `use_color_correction`: 
    - String of "adain", "adain_xyz", "hist_match", or "none". The color correction method to be applied.
- `detec_threshold`: 
    - Float point between 0 and 1. Decrease its value if faces are missed. Increase its value to reduce false positives.
- `roi_coverage`: 
    - Float point between 0 and 1 (exclusive). Center area of input images to be cropped (Suggested range: 0.85 ~ 0.95)
- `enhance`: 
    - Float point. A coef. for contrast enhancement in the region of alpha mask (Suggested range: 0. ~ 0.4)
- `output_type`: 
    - Layout format of output video: 1. [ result ], 2. [ source | result ], 3. [ source | result | mask ]
- `direction`: 
    - String of "AtoB" or "BtoA". Direction of face transformation.

### Start video conversion


- `input_fn`: 
    - String. Input video path.
- `output_fn`: 
    - String. Output video path.
- `duration`: 
    - None or a non-negative float tuple: (start_sec, end_sec). Duration of input video to be converted
    - e.g., setting `duration = (5, 7.5)` outputs a 2.5-sec-long video clip corresponding to 5s ~ 7.5s of the input video.

In [None]:
vid1_name = 'ankit.mp4'
vid2_name = 'vaibhav.mp4'

In [None]:
# Convert video 1 with the faces swapped in the A -> B direction.
options = {
    # ===== Fixed =====
    "use_smoothed_bbox": True,       # enable smoothed bbox
    "use_kalman_filter": True,       # enable Kalman filter
    "use_auto_downscaling": False,   # auto-downscaling in face detection (prevents OOM)
    "bbox_moving_avg_coef": 0.65,    # smoothing coef., used when use_kalman_filter is False
    "min_face_area": 35 * 35,        # smaller detected faces are not transformed
    "IMAGE_SHAPE": model.IMAGE_SHAPE,  # input/output resolution of the GAN model
    # ===== Tunable =====
    "kf_noise_coef": 3e-3,           # raise 10x if tracking is slow, lower if jitter occurs
    "use_color_correction": "hist_match",  # "adain", "adain_xyz", "hist_match", or "none"
    "detec_threshold": 0.7,          # lower if faces are missed, raise to cut false positives
    "roi_coverage": 0.9,             # center area of input images to crop (suggested 0.85 ~ 0.95)
    "enhance": 0.,                   # contrast enhancement in the alpha-mask region (suggested 0. ~ 0.4)
    "output_type": 1,                # 1: [result], 2: [source | result], 3: [source | result | mask]
    "direction": "AtoB",             # "AtoB" or "BtoA"
}

input_fn = vid1_name
# Output name: <video1>2<video2>_35_35.mp4, using the text before the first '.' of each name.
output_fn = vid1_name.split('.')[0] + '2' + vid2_name.split('.')[0] +'_35_35.mp4'
duration = None  # None, or a (start_sec, end_sec) tuple
vc.convert(input_fn=input_fn, output_fn=output_fn, options=options, duration=duration)

In [None]:
# Convert video 2 with the faces swapped in the B -> A direction.
options = {
    # ===== Fixed =====
    "use_smoothed_bbox": True,       # enable smoothed bbox
    "use_kalman_filter": True,       # enable Kalman filter
    "use_auto_downscaling": False,   # auto-downscaling in face detection (prevents OOM)
    "bbox_moving_avg_coef": 0.65,    # smoothing coef., used when use_kalman_filter is False
    "min_face_area": 35 * 35,        # smaller detected faces are not transformed
    "IMAGE_SHAPE": model.IMAGE_SHAPE,  # input/output resolution of the GAN model
    # ===== Tunable =====
    "kf_noise_coef": 3e-3,           # raise 10x if tracking is slow, lower if jitter occurs
    "use_color_correction": "hist_match",  # "adain", "adain_xyz", "hist_match", or "none"
    "detec_threshold": 0.7,          # lower if faces are missed, raise to cut false positives
    "roi_coverage": 0.9,             # center area of input images to crop (suggested 0.85 ~ 0.95)
    "enhance": 0.,                   # contrast enhancement in the alpha-mask region (suggested 0. ~ 0.4)
    "output_type": 1,                # 1: [result], 2: [source | result], 3: [source | result | mask]
    "direction": "BtoA",             # "AtoB" or "BtoA"
}

input_fn = vid2_name
# Output name: <video2>2<video1>_35_35.mp4, using the text before the first '.' of each name.
output_fn = vid2_name.split('.')[0] + '2' + vid1_name.split('.')[0] + '_35_35.mp4'
duration = None  # None, or a (start_sec, end_sec) tuple

vc.convert(input_fn=input_fn, output_fn=output_fn, options=options, duration=duration)