<a id='1'></a>
# Импортируйте пакеты

In [None]:
# from keras.layers import *
import keras.backend as K
# import tensorflow as tf

In [None]:
import os
import glob
import time
from pathlib import Path
from IPython.display import clear_output

import matplotlib.pyplot as plt
%matplotlib inline

<a id='4'></a>
# Конфигурация

In [None]:
#K.set_learning_phase(1)
#K.set_learning_phase(0) # set to 0 in inference phase

In [None]:
# Number of CPU cores
num_cpus = os.cpu_count()

# Input/Output resolution
RESOLUTION = 64  # 64x64, 128x128, 256x256
assert (RESOLUTION % 64) == 0, "Allowed values for RESOLUTION are 64, 128, or 256."

# Batch size
batch_size = 8
assert (batch_size != 1 and batch_size % 2 == 0), "Batch size should be an even number."

# Use motion blur (data augmentation)
# set True if training data contains images extracted from videos
use_da_motion_blur = False

# Use eye-aware training
# require images generated from prep_binary_masks.ipynb
use_bm_eyes = True

# Probability of random color matching (data augmentation)
prob_random_color_match = 0.5

da_config = {
    "prob_random_color_match": prob_random_color_match,
    "use_da_motion_blur": use_da_motion_blur,
    "use_bm_eyes": use_bm_eyes
}

In [None]:
# Path to training images
img_dir_src = './face_src/rgb' # source face
img_dir_dst = './face_dst/rgb' # target face
img_dir_src_bm_eyes = "./face_src/binary_mask/faceA_eyes"
img_dir_dst_bm_eyes = "./face_dst/binary_mask/faceB_eyes"

# Path to saved model weights
models_dir = "./models"

In [None]:
# Architecture configuration
arch_config = {
    "IMAGE_SHAPE": (RESOLUTION, RESOLUTION, 3),
    "use_self_attn": True,
    "norm": "instancenorm",
    "model_capacity": "standard"
}

In [None]:
# Loss function weights configuration
loss_weights = {
    "w_D": 0.1,
    "w_recon": 1.,
    "w_edge": 0.1,
    "w_eyes": 30.,
    "w_pl": (0.01, 0.1, 0.3, 0.1)
}

# Init. loss config.
loss_config = {
    "gan_training": "mixup_LSGAN",
    "use_PL": False,
    "PL_before_activ": False,
    "use_mask_hinge_loss": False,
    "m_mask": 0.,
    "lr_factor": 1.,
    "use_cyclic_loss": False
}

<a id='5'></a>
# Определение моделей

In [None]:
from networks.faceswap_model import FaceswapModel

In [None]:
model = FaceswapModel(**arch_config)

<a id='6'></a>
# Загрузка весов моделей

Имена файлов веса:
```shell
    encoder.h5
    decoder_A.h5
    deocder_B.h5
    netDA.h5
    netDB.h5
```

In [None]:
model.load_weights(path=models_dir)

# Определите потери и создайте обучающие функции

Если выдает ошибки при создании vggface ResNet (возможно, из-за версии Keras), следующий код - это то, что мы сделали,
чтобы сделать его доступным для работы в Colab.

```shell
!wget "https://github.com/rcmalli/keras-vggface/releases/download/v2.0/rcmalli_vggface_tf_notop_resnet50.h5"
from colab.vggface_models import RESNET50

vggface = RESNET50(include_top=False, weights=None, input_shape=(224, 224, 3))
vggface.load_weights("rcmalli_vggface_tf_notop_resnet50.h5")

```

In [None]:
# https://github.com/rcmalli/keras-vggface
from keras_vggface.vggface import VGGFace

# VGGFace ResNet50
vggface = VGGFace(include_top=False, model='resnet50', input_shape=(224, 224, 3))

#vggface.summary()

model.build_pl_model(vggface_model=vggface, before_activ=loss_config["PL_before_activ"])


In [None]:
model.build_train_functions(loss_weights=loss_weights, **loss_config)

<a id='9'></a>
# DataLoader

In [None]:
from data_loader import DataLoader

In [None]:
from utils import showG, showG_mask, showG_eyes

# Начинайте обучение

In [None]:
# Create ./models directory
Path(f"models").mkdir(parents=True, exist_ok=True)

In [None]:
# Get file names
train_src = glob.glob(f"{img_dir_src}/*.*")
train_dst = glob.glob(f"{img_dir_dst}/*.*")

train_src_n_dst = train_src + train_dst

assert len(train_src), f"Изображение не найдено в {img_dir_src}"
assert len(train_dst), f"Изображение не найдено в {img_dir_dst}"
print(f"Количество изображений в папке A: {str(len(train_src))}")
print(f"Количество изображений в папке B: {str(len(train_dst))}")

if use_bm_eyes:
    assert len(glob.glob(img_dir_src_bm_eyes + "/*.*")), f"Двоичная маска не найдена в {img_dir_src_bm_eyes}"
    assert len(glob.glob(img_dir_dst_bm_eyes + "/*.*")), f"Двоичная маска не найдена в {img_dir_dst_bm_eyes}"
    
    assert len(glob.glob(img_dir_src_bm_eyes + "/*.*")) == len(train_src), (
        "Количество изображений face_src не совпадает с количеством их двоичных масок. "
        "Может быть вызвано любым файлом none изображения в папке."
    )
    assert len(glob.glob(img_dir_dst_bm_eyes + "/*.*")) == len(train_dst), (
        "Количество изображений face_dst не совпадает с количеством их двоичных масок. "
        "Может быть вызвано любым файлом none изображения в папке."
    )
pass


In [None]:
def show_loss_config(loss_conf):
    for config, value in loss_conf.items():
        print(f"{config} = {value}")
        pass
    pass

In [None]:
# Display random binary masks of eyes
train_batch_src = DataLoader(filenames=train_src, all_filenames=train_src_n_dst,
                             batch_size=batch_size, dir_bm_eyes=img_dir_src_bm_eyes,
                             resolution=RESOLUTION, num_cpus=num_cpus, session=K.get_session(),
                             **da_config)
train_batch_dst = DataLoader(filenames=train_dst, all_filenames=train_src_n_dst,
                             batch_size=batch_size, dir_bm_eyes=img_dir_dst_bm_eyes,
                             resolution=RESOLUTION, num_cpus=num_cpus, session=K.get_session(),
                             **da_config)
_, t_src, bm_src = train_batch_src.get_next_batch()
_, t_dst, bm_dst = train_batch_dst.get_next_batch()
showG_eyes(t_src, t_dst, bm_src, bm_dst, batch_size)
del train_batch_src, train_batch_dst

In [None]:

def reset_session(save_path):
    global model, vggface
    global train_batch_src, train_batch_dst
    model.save_weights(path=save_path)
    del model
    del vggface
    del train_batch_src
    del train_batch_dst
    K.clear_session()
    model = FaceswapModel(**arch_config)
    model.load_weights(path=save_path)
    vggface = VGGFace(include_top=False, model='resnet50', input_shape=(224, 224, 3))
    model.build_pl_model(vggface_model=vggface, before_activ=loss_config["PL_before_activ"])
    train_batch_src = DataLoader(filenames=train_src, all_filenames=train_src_n_dst,
                                 batch_size=batch_size, dir_bm_eyes=img_dir_src_bm_eyes,
                                 resolution=RESOLUTION, num_cpus=num_cpus, session=K.get_session(),
                                 **da_config)
    train_batch_dst = DataLoader(filenames=train_dst, all_filenames=train_src_n_dst,
                                 batch_size=batch_size, dir_bm_eyes=img_dir_dst_bm_eyes,
                                 resolution=RESOLUTION, num_cpus=num_cpus, session=K.get_session(),
                                 **da_config)
    pass

In [None]:
# Start training
t0 = time.time()

# Resume training that was interrupted
try:
    gen_iterations
    print(f"Resume training from iter {gen_iterations}.")
except:
    gen_iterations = 0

errGA_sum = errGB_sum = errDA_sum = errDB_sum = 0
errGAs = {}
errGBs = {}
# Dictionaries are ordered in Python 3.6
for k in ['ttl', 'adv', 'recon', 'edge', 'pl']:
    errGAs[k] = 0
    errGBs[k] = 0

display_iters = 300
backup_iters = 5000
TOTAL_ITERS = 20000

global train_batch_src, train_batch_dst

train_batch_src = DataLoader(train_src, train_src_n_dst, batch_size,
                             dir_bm_eyes=img_dir_src_bm_eyes, resolution=RESOLUTION,
                             num_cpus=num_cpus, session=K.get_session(), **da_config)

train_batch_dst = DataLoader(train_dst, train_src_n_dst, batch_size,
                             dir_bm_eyes=img_dir_dst_bm_eyes, resolution=RESOLUTION,
                             num_cpus=num_cpus, session=K.get_session(), **da_config)

while gen_iterations <= TOTAL_ITERS:
    # Loss function automation
    if gen_iterations == (TOTAL_ITERS // 5 - display_iters // 2):
        clear_output()
        loss_config['use_PL'] = True
        loss_config['use_mask_hinge_loss'] = False
        loss_config['m_mask'] = 0.0
        reset_session(models_dir)
        print("Конструкция новых функций потерь...")
        show_loss_config(loss_config)
        model.build_train_functions(loss_weights=loss_weights, **loss_config)
        print("Выполнено.")
        pass
    elif gen_iterations == (TOTAL_ITERS // 5 + TOTAL_ITERS // 10 - display_iters // 2):
        clear_output()
        loss_config['use_PL'] = True
        loss_config['use_mask_hinge_loss'] = True
        loss_config['m_mask'] = 0.5
        reset_session(models_dir)
        print("Конструкция новых функций потерь...")
        show_loss_config(loss_config)
        model.build_train_functions(loss_weights=loss_weights, **loss_config)
        print("Завершено.")
        pass
    elif gen_iterations == (2 * TOTAL_ITERS // 5 - display_iters // 2):
        clear_output()
        loss_config['use_PL'] = True
        loss_config['use_mask_hinge_loss'] = True
        loss_config['m_mask'] = 0.2
        reset_session(models_dir)
        print("Конструкция новых функций потерь...")
        show_loss_config(loss_config)
        model.build_train_functions(loss_weights=loss_weights, **loss_config)
        print("Выполнено.")
        pass
    elif gen_iterations == (TOTAL_ITERS // 2 - display_iters // 2):
        clear_output()
        loss_config['use_PL'] = True
        loss_config['use_mask_hinge_loss'] = True
        loss_config['m_mask'] = 0.4
        loss_config['lr_factor'] = 0.3
        reset_session(models_dir)
        print("Конструкция новых функций потерь...")
        show_loss_config(loss_config)
        model.build_train_functions(loss_weights=loss_weights, **loss_config)
        print("Выполнено.")
        pass
    elif gen_iterations == (2 * TOTAL_ITERS // 3 - display_iters // 2):
        clear_output()
        model.decoder_src.load_weights("models/decoder_B.h5")  # swap decoders
        model.decoder_dst.load_weights("models/decoder_A.h5")  # swap decoders
        loss_config['use_PL'] = True
        loss_config['use_mask_hinge_loss'] = True
        loss_config['m_mask'] = 0.5
        loss_config['lr_factor'] = 1
        reset_session(models_dir)
        print("Конструкция новых функций потерь...")
        show_loss_config(loss_config)
        model.build_train_functions(loss_weights=loss_weights, **loss_config)
        print("Выполнено.")
        pass
    elif gen_iterations == (8 * TOTAL_ITERS // 10 - display_iters // 2):
        clear_output()
        loss_config['use_PL'] = True
        loss_config['use_mask_hinge_loss'] = True
        loss_config['m_mask'] = 0.1
        loss_config['lr_factor'] = 0.3
        reset_session(models_dir)
        print("Конструкция новых функций потерь...")
        show_loss_config(loss_config)
        model.build_train_functions(loss_weights=loss_weights, **loss_config)
        print("Выполнено.")
        pass
    elif gen_iterations == (9 * TOTAL_ITERS // 10 - display_iters // 2):
        clear_output()
        loss_config['use_PL'] = True
        loss_config['use_mask_hinge_loss'] = False
        loss_config['m_mask'] = 0.0
        loss_config['lr_factor'] = 0.1
        reset_session(models_dir)
        print("Конструкция новых функций потерь...")
        show_loss_config(loss_config)
        model.build_train_functions(loss_weights=loss_weights, **loss_config)
        print("Выполнено.")

        pass

    if gen_iterations == 5:
        print("Выполняется.")
        pass

    # Train dicriminators for one batch
    data_src = train_batch_src.get_next_batch()
    data_dst = train_batch_dst.get_next_batch()
    errDA, errDB = model.train_one_batch_disc(data_src, data_dst)
    errDA_sum += errDA[0]
    errDB_sum += errDB[0]

    # Train generators for one batch
    data_src = train_batch_src.get_next_batch()
    data_dst = train_batch_dst.get_next_batch()
    errGA, errGB = model.train_one_batch_gen(data_src, data_dst)
    errGA_sum += errGA[0]
    errGB_sum += errGB[0]
    for i, k in enumerate(['ttl', 'adv', 'recon', 'edge', 'pl']):
        errGAs[k] += errGA[i]
        errGBs[k] += errGB[i]
        pass
    gen_iterations += 1

    # Visualization
    if gen_iterations % display_iters == 0:
        clear_output()

        # Display loss information
        show_loss_config(loss_config)
        print("----------")
        print("[iter %d] Loss_DA: %f Loss_DB: %f Loss_GA: %f Loss_GB: %f time: %f"
              % (gen_iterations, errDA_sum / display_iters, errDB_sum / display_iters,
                 errGA_sum / display_iters, errGB_sum / display_iters, time.time() - t0))
        print("----------")
        print("Детали потерь генератора:")
        print(f"[Adversarial loss]\nGA: {errGAs['adv'] / display_iters:.4f} GB: {errGBs['adv'] / display_iters:.4f}")
        print(f"[Reconstruction loss]\nGA: {errGAs['recon'] / display_iters:.4f} GB: {errGBs['recon'] / display_iters:.4f}")
        print(f"[Edge loss]\nGA: {errGAs['edge'] / display_iters:.4f} GB: {errGBs['edge'] / display_iters:.4f}")
        if loss_config['use_PL']:
            print(f"[Perceptual loss]")
            try:
                print(f"GA: {errGAs['pl'][0] / display_iters:.4f} GB: {errGBs['pl'][0] / display_iters:.4f}")
            except:
                print(f"GA: {errGAs['pl'] / display_iters:.4f} GB: {errGBs['pl'] / display_iters:.4f}")
                pass
            pass

        # Display images
        print("----------")
        w_src, t_src, _ = train_batch_src.get_next_batch()
        w_dst, t_dst, _ = train_batch_dst.get_next_batch()
        print("Преобразованные (замаскированные) результаты:")
        showG(t_src, t_dst, model.path_src, model.path_dst, batch_size)
        print("Маски:")
        showG_mask(t_src, t_dst, model.path_mask_src, model.path_mask_dst, batch_size)
        print("Результаты реконструкции:")
        showG(w_src, w_dst, model.path_bgr_src, model.path_bgr_dst, batch_size)
        errGA_sum = errGB_sum = errDA_sum = errDB_sum = 0
        for k in ['ttl', 'adv', 'recon', 'edge', 'pl']:
            errGAs[k] = 0
            errGBs[k] = 0
            pass

        # Save models
        model.save_weights(path=models_dir)
        pass
    pass

In [None]:
# Display random results

w_src, t_src, _ = train_batch_src.get_next_batch()
w_dst, t_dst, _ = train_batch_dst.get_next_batch()
print("Преобразованные (замаскированные) результаты:")
showG(t_src, t_dst, model.path_src, model.path_dst, batch_size)
print("Маски:")
showG_mask(t_src, t_dst, model.path_mask_src, model.path_mask_dst, batch_size)
print("Результаты реконструкции:")
showG(w_src, w_dst, model.path_bgr_src, model.path_bgr_dst, batch_size)

<a id='tf'></a>
# Преобразование Одного Изображения

In [None]:
from detector.face_detector import MTCNNFaceDetector
from converter.landmarks_alignment import *

In [None]:
mtcnn_weights_dir = "./mtcnn_weights/"
fd = MTCNNFaceDetector(sess=K.get_session(), model_path=mtcnn_weights_dir)

In [None]:
from converter.face_transformer import FaceTransformer

ftrans = FaceTransformer()
ftrans.set_model(model)

In [None]:
# Read input image
input_img = plt.imread("./TEST_IMAGE.jpg")[..., :3]

if input_img.dtype == np.float32:
    print("input_img имеет тип dtype np.float32. Масштабируем его до uint8.")
    input_img = (input_img * 255).astype(np.uint8)
    pass

In [None]:
# Display input image
plt.imshow(input_img)

In [None]:
# Display detected face
face, lms = fd.detect_face(input_img)
aligned_det_face_im = None
(tar_landmarks, src_landmarks) = (None, None)
x0, y1, x1, y0 = (0, 0, 0, 0)
if len(face) == 1:
    x0, y1, x1, y0, _ = face[0]
    det_face_im = input_img[int(x0):int(x1), int(y0):int(y1), :]
    try:
        src_landmarks = get_src_landmarks(x0, x1, y0, y1, lms)
        tar_landmarks = get_tar_landmarks(det_face_im)
        aligned_det_face_im = landmarks_match_mtcnn(det_face_im, src_landmarks, tar_landmarks)
    except:
        print("Во время выравнивания лиц произошла ошибка.")
        aligned_det_face_im = det_face_im
elif len(face) == 0:
    raise ValueError("Ошибка: лицо не обнаружено.")
elif len(face) > 1:
    print(face)
    raise ValueError("Ошибка: обнаружено несколько лиц")

if aligned_det_face_im:
    plt.imshow(aligned_det_face_im)

In [None]:
# Transform detected face
result_img, result_rgb, result_mask = ftrans.transform(
    aligned_det_face_im,
    direction="AtoB",
    roi_coverage=0.93,
    color_correction="adain_xyz",
    IMAGE_SHAPE=(RESOLUTION, RESOLUTION, 3)
)
try:
    result_img = landmarks_match_mtcnn(result_img, tar_landmarks, src_landmarks)
    result_rgb = landmarks_match_mtcnn(result_rgb, tar_landmarks, src_landmarks)
    result_mask = landmarks_match_mtcnn(result_mask, tar_landmarks, src_landmarks)
except:
    print("Во время выравнивания лица произошла ошибка.")
    pass

result_input_img = input_img.copy()
result_input_img[int(x0):int(x1), int(y0):int(y1), :] = (
        result_mask.astype(np.float32) / 255 * result_rgb +
        (1 - result_mask.astype(np.float32) / 255) * result_input_img[
                                                     int(x0):int(x1),
                                                     int(y0):int(y1), :]
)

In [None]:
# Show result face
plt.imshow(result_input_img[int(x0):int(x1), int(y0):int(y1), :])

In [None]:
# Show transformed image before masking
plt.imshow(result_rgb)

In [None]:
# Show alpha mask
plt.imshow(result_mask[..., 0])

In [None]:
# Display interpolations before/after transformation
def interpolate_imgs(im1, im2):
    im1, im2 = map(np.float32, [im1, im2])
    out = [ratio * im1 + (1 - ratio) * im2 for ratio in np.linspace(1, 0, 5)]
    out = map(np.uint8, out)
    return out


plt.figure(figsize=(15, 8))
plt.imshow(np.hstack(interpolate_imgs(input_img, result_input_img)))