In [None]:
from tensorflow.keras.models import Sequential, Model, load_model
from tensorflow.keras.layers import Conv2D, Dense, Dropout, Activation, Input, Reshape, UpSampling2D, ReLU, GaussianNoise
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.initializers import RandomNormal
from tensorflow.keras.utils import to_categorical, plot_model
from tensorflow.keras.layers import Layer
from tensorflow.keras.losses import binary_crossentropy
from tensorflow.python.ops import math_ops
from tensorflow.python.framework import ops
#from tensorflow.keras.applications import ResNet50V2 as pretrained
from tensorflow.keras.applications import Xception as pretrained
#from efficientnet.tfkeras import EfficientNetB0 as pretrained
import tensorflow.keras.backend as K
import tensorflow as tf
from tensorflow.keras.utils import multi_gpu_model

import numpy as np
import os
from tqdm.notebook import tqdm
import matplotlib.pyplot as plt
import cv2
from multiprocessing import Pool
import random
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix
from timeout_decorator import timeout, TimeoutError
import itertools

from utils import *

In [None]:
# Expose only the GPU with index 1 to CUDA, so TensorFlow allocates on that
# device alone. NOTE(review): this runs after `import tensorflow`; it takes
# effect only if no GPU has been initialised yet -- confirm on a fresh kernel.
os.environ["CUDA_VISIBLE_DEVICES"]="1"

In [None]:
# --- Data -------------------------------------------------------------------
folder = "./faces"      # directory listed with os.listdir() to find training images

# --- Model / training hyper-parameters ---------------------------------------
n_estimators = 3        # forwarded to D_model; one target array is built per estimator
batch_size = 32         # real images per step (before horizontal-flip augmentation)
noise_length = 512      # length of the generator's input noise vector

In [None]:
# Shared kernel initializer for the generator's 3x3 convolutions:
# zero-mean normal with a standard deviation of 0.02.
conv_init = RandomNormal(mean=0, stddev=0.02)


def G_model_begin(noise_length, channel=256):
    """Build the 4x4 base stage of the generator.

    The input noise vector is split in two. The first ``noise_length // 2``
    entries pass through a stack of eight Dense(512) + ReLU layers to form the
    style vector ``z``. The remaining entries are projected by a Dense layer
    and reshaped into a ``(4, 4, channel)`` feature map ``x``, which is then
    processed by GaussianNoise / AdaINBlock / Conv2D layers.

    Args:
        noise_length: Length of the input noise vector.
        channel: Number of channels of the 4x4 feature map and of the 3x3
            convolution in this stage.

    Returns:
        A Keras ``Model`` named ``"generator"`` that maps the noise input to
        the pair ``[x, z]`` (feature map, style vector). ``G_model_expand`` and
        ``G_model_attach`` both unpack this pair from ``g.output``.
    """
    # Imported locally so this function does not rely on `Lambda` leaking into
    # the notebook namespace through `from utils import *`.
    from tensorflow.keras.layers import Lambda

    stddev = 0.1
    use_bias = True
    mapping_depth = 8     # number of Dense+ReLU layers producing the style vector
    mapping_units = 512   # width of each of those Dense layers
    style_len = noise_length // 2
    content_len = noise_length - style_len

    zin = Input((noise_length,))

    # Style branch: first half of the noise vector -> Dense/ReLU stack.
    z = Lambda(lambda t: t[:, :style_len], output_shape=(style_len,))(zin)
    #z = zin
    for _ in range(mapping_depth):
        z = Dense(mapping_units, use_bias=use_bias)(z)
        z = ReLU()(z)

    # Content branch: second half of the noise vector -> 4x4 feature map.
    #x = Constant(4*4*channel, trainable=False)(None)
    x = Lambda(lambda t: t[:, style_len:], output_shape=(content_len,))(zin)
    x = Dense(4*4*channel, activation="relu")(x)
    x = Reshape((4, 4, channel))(x)

    # AdaINBlock comes from `utils`; it is called with the feature map and the
    # style vector (presumably adaptive instance normalisation -- confirm there).
    x = GaussianNoise(stddev)(x)
    x = AdaINBlock(x, z, use_bias=use_bias)
    x = Conv2D(channel, 3, strides=1, padding="same", kernel_initializer=conv_init, use_bias=use_bias)(x)
    x = GaussianNoise(stddev)(x)
    x = AdaINBlock(x, z, use_bias=use_bias)

    model = Model(inputs=zin, outputs=[x, z], name="generator")
    return model

def G_model_expand(g, channel=256):
    """Append one upsampling stage to a core generator.

    Takes the ``[feature map, style vector]`` outputs of ``g``, upsamples the
    feature map with ``UpSampling2D`` and applies two identical
    Conv2D(3x3) -> ReLU -> GaussianNoise -> AdaINBlock stages.

    Args:
        g: Core generator model whose output is the pair ``[x, z]``.
        channel: Number of filters for both convolutions of the new stage.

    Returns:
        A new Keras ``Model`` sharing ``g``'s input and again producing
        ``[x, z]``, so it can be expanded or given an output head later.
    """
    noise_std = 0.1
    with_bias = True

    features, style = g.output
    features = UpSampling2D()(features)

    # Two identical refinement stages, each conditioned on the style vector.
    for _ in range(2):
        features = Conv2D(
            channel,
            3,
            strides=1,
            padding="same",
            kernel_initializer=conv_init,
            use_bias=with_bias,
        )(features)
        features = ReLU()(features)
        features = GaussianNoise(noise_std)(features)
        features = AdaINBlock(features, style, use_bias=with_bias)

    return Model(inputs=g.input, outputs=[features, style])

def G_model_attach(g, channel=3):
    """Attach the image output head to a core generator.

    Args:
        g: Core generator model whose output is the pair ``[x, z]``.
        channel: Number of channels of the produced image.

    Returns:
        A Keras ``Model`` named ``"generator"`` that maps ``g``'s input to a
        tanh-activated image (output layer named ``"generator_output"``).
    """
    # Only the feature map feeds the head; the style vector is not used here.
    features, _style = g.output

    # (A Dropout(0.1) on the features was tried here and is disabled.)
    to_image = Conv2D(channel, 1, strides=1, padding="same", use_bias=False)
    squash = Activation("tanh", name="generator_output")
    image = squash(to_image(features))

    return Model(inputs=g.input, outputs=image, name="generator")

def mini_batch(g):
    """Wrap model ``g`` so that its output passes through a ``MinibatchStd`` layer.

    Args:
        g: Keras model with a single output tensor.

    Returns:
        A new Keras ``Model`` with ``g``'s input and ``MinibatchStd()(g.output)``
        as output.
    """
    return Model(inputs=g.input, outputs=MinibatchStd()(g.output))

In [None]:
#def bin_custom(y_true, y_pred):
    #return binary_crossentropy(y_true, y_pred) + 1.0/(K.std(y_pred)+0.0001)

In [None]:
# Progressive-growing training driver.
#
# Each pass of the outer `while True` loop trains one resolution stage:
#   1. attach an image head (+ MinibatchStd) to the core generator `g`,
#   2. build a matching discriminator `d` and the combined model `c`,
#   3. load the training images at the current resolution,
#   4. train for `epochs` epochs, saving checkpoints and preview images,
#   5. grow `g` by one upsampling stage and raise the epoch budget by 20.
# The loop never terminates on its own; interrupt the kernel to stop.
ch = 64  # generator feature-map width; must NOT be overwritten inside the loop
g = G_model_begin(noise_length, ch)
#g = load_model("models/generator_core_8_8_20.h5", custom_objects={"Constant":Constant})
epochs = 20
while True:
    g_out = G_model_attach(g)
    g_out = mini_batch(g_out)
    height = g_out.layers[-1].output.shape[1]
    width = g_out.layers[-1].output.shape[2]
    # Channel count of what the discriminator sees. Kept in its own variable:
    # previously this was assigned to `ch`, so the later G_model_expand(g, ch)
    # grew the generator with the image channel count instead of 64.
    out_channels = g_out.layers[-1].output.shape[3]
    d = D_model(Height=height, Width=width, n_estimators=n_estimators, channel=out_channels)
    c = Combined_model(g=g_out, d=d)

    d_opt = Adam(lr=1e-4, beta_1=0.5, beta_2=0.9)
    g_opt = Adam(lr=1e-4, beta_1=0.5, beta_2=0.9)

    g_out.compile(loss='binary_crossentropy', optimizer=g_opt)
    # Compile the combined model with the discriminator frozen, so generator
    # updates through `c` leave `d` untouched ...
    d.trainable = False
    for layer in d.layers:
        layer.trainable = False
    c.compile(loss='binary_crossentropy', optimizer=g_opt)

    # ... then unfreeze and compile `d` on its own for discriminator updates.
    d.trainable = True
    for layer in d.layers:
        layer.trainable = True
    d.compile(loss='binary_crossentropy', optimizer=d_opt)

    # Per-epoch loss history of the current stage (plotted by later cells).
    g_losses = []
    d_losses = []

    # Load every file in `folder` at the current resolution, in parallel.
    X_train = [{"filepath": os.path.join(folder, path), "height": height, "width": width} for path in os.listdir(folder)]
    with Pool() as p:
        imap = p.imap(read_file_wrap, X_train)
        X_train = list(tqdm(imap, total=len(X_train)))

    # Entries that come back as None are dropped.
    X_train = [img for img in X_train if img is not None]
    if len(X_train) < batch_size:
        raise RuntimeError(
            "need at least {0} readable images in {1}, found {2}".format(batch_size, folder, len(X_train))
        )

    # Preview grid of random training samples.
    # NOTE(review): the figure is closed without plt.show()/savefig, so whether
    # it is ever rendered depends on the notebook backend -- confirm intent.
    row = 3
    col = 4
    fig = plt.figure(figsize=(col*3, row*3))
    plt.suptitle("train shape: {0} * {1}".format(width, height), fontsize=20)
    for i in range(row * col):
        plt.subplot(row, col, i+1)
        img = random.choice(X_train)
        plt.imshow(img.astype(np.uint8))
        plt.axis('off')
    plt.close(fig)

    n_batches = len(X_train) // batch_size
    gen_batch = batch_size * 5
    # One all-ones target array per discriminator estimator (loop-invariant).
    # Arrays rather than nested Python lists, matching the discriminator targets.
    g_targets = [np.ones(gen_batch)] * n_estimators

    for epoch in tqdm(range(1, epochs+1)):
        _g_losses = []
        _d_losses = []
        np.random.shuffle(X_train)
        # Batch index starts at 0 so the first batch is used and the last
        # slice is always a full batch (the old 1-based range skipped the
        # first batch and ended on a partial or empty slice).
        for ite in tqdm(range(n_batches)):
            # Discriminator training: real batch + horizontally flipped copies,
            # scaled to [-1, 1], against an equal number of generated images.
            y = X_train[ite * batch_size: (ite+1) * batch_size]
            y += [cv2.flip(img, 1) for img in y]
            y = np.asarray(y)
            y = (y.astype(np.float32)-127.5)/127.5
            y = MinibatchStd()(y)
            input_noise = np.random.uniform(-1, 1, size=(y.shape[0], noise_length))
            g_output = g_out.predict(input_noise, verbose=0)
            _X_train = np.concatenate((y, g_output))
            _Y_train = [np.array([1.0] * y.shape[0] + [0.0] * g_output.shape[0])] * n_estimators
            d_loss = d.train_on_batch(_X_train, _Y_train)
            _d_losses.append(d_loss)
            # Generator training: push the frozen discriminator towards "real".
            input_noise = np.random.uniform(-1, 1, size=(gen_batch, noise_length))
            g_loss = c.train_on_batch(input_noise, g_targets)
            _g_losses.append(g_loss)

        # Checkpoint on epochs 1, 6, 11, ... and on the final epoch of the stage.
        if epoch%5==1 or epoch==epochs:
            g_out.save("./models/generator_{0}_{1}_{2}.h5".format(width, height, epoch))
            g.save("./models/generator_core_{0}_{1}_{2}.h5".format(width, height, epoch))
            d.save("./models/discriminator_{0}_{1}_{2}.h5".format(width, height, epoch))
        g_losses.append(np.mean(_g_losses))
        d_losses.append(np.mean(_d_losses))
        print("g loss = {0}, d loss = {1}".format(g_losses[-1], d_losses[-1]))
        # The file name is tagged with the generator width `ch`.
        visualize(g_out, d, epoch, row=1, col=3, save="output_images/{0}_{1}_{2}epoch{3}.png".format(width, height, ch, epoch))

    epochs += 20
    #for layer in g.layers[20:]:
        #layer.trainable=False
    g = G_model_expand(g, ch)

In [None]:
# Draw one noise vector, run it through the current generator and show the
# result rescaled from the tanh range [-1, 1] back to 8-bit pixel values.
noise = np.random.uniform(-1, 1, size=(1, noise_length))
generated = g_out.predict(noise, verbose=0)[0]
plt.imshow((generated * 127.5 + 127.5).astype(np.uint8))

In [None]:
# Visualise with the image-producing generator `g_out`, as the training loop
# does. `g` is the core model whose outputs are [feature map, style vector],
# not an image, and it may already be one stage larger than `d` expects.
visualize(g_out, d)

In [None]:
# Mean generator loss per epoch for the most recent resolution stage.
fig, ax = plt.subplots()
ax.plot(g_losses)
ax.set(title="Generator loss", xlabel="epoch", ylabel="mean loss")
plt.show()

In [None]:
# Mean discriminator loss per epoch for the most recent resolution stage.
fig, ax = plt.subplots()
ax.plot(d_losses)
ax.set(title="Discriminator loss", xlabel="epoch", ylabel="mean loss")
plt.show()

In [None]:
# Debug check: show the value of `ch` left behind by the training cell.
print(ch)