!!! Le generateur ne progresse pas et les discriminateurs sont a zeros tous les deux

Cela peut etre du a la conversion en tensorflow 2.4

In [1]:
%matplotlib inline

import numpy as np
import tensorflow as tf
from tensorflow.keras.datasets import mnist
from tensorflow.keras import layers
from tensorflow.keras import optimizers
from tensorflow import keras
from tensorflow.keras.utils import plot_model
import time

from IPython import display

gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
    tf.config.experimental.set_memory_growth(gpu, True)

In [2]:
# example of progressive growing gan on celebrity faces dataset
from math import sqrt
from numpy import load
from numpy import asarray
from numpy import zeros
from numpy import ones
from numpy.random import randn
from numpy.random import randint
from skimage.transform import resize
from matplotlib import pyplot
from tqdm import trange
from tqdm import tqdm

In [3]:
# weighted sum output
class WeightedSum(layers.Add):
    # init with default value
    def __init__(self, alpha=0.0, **kwargs):
        super(WeightedSum, self).__init__(**kwargs)
        self.alpha = tf.Variable(alpha, name='ws_alpha')

    # output a weighted sum of inputs
    def _merge_function(self, inputs):
        # only supports a weighted sum of two inputs
        assert (len(inputs) == 2)
        # ((1-a) * input1) + (a * input2)
        output = ((1.0 - self.alpha) * inputs[0]) + (self.alpha * inputs[1])
        return output
    
    #def get_config(self):
#
#        config = super().get_config().copy()
#        config.update({
#            'alpha': self.alpha
#        })
#        return config

In [4]:
# calculate wasserstein loss
def wasserstein_loss(y_true, y_pred):
    return tf.math.reduce_mean(y_true * y_pred)

In [5]:
# add a discriminator block
def add_discriminator_block(old_model, n_input_layers=3):
    # get shape of existing model
    in_shape = list(old_model.input.shape)
    # define new input shape as double the size
    input_shape = (in_shape[-2]*2, in_shape[-2]*2, in_shape[-1])
    in_image = layers.Input(shape=input_shape)
    # define new input processing layer
    d = layers.Conv2D(64, (1,1), padding='same', kernel_initializer='he_normal')(in_image)
    d = layers.LeakyReLU(alpha=0.2)(d)
    # define new block
    d = layers.Conv2D(64, (3,3), padding='same', kernel_initializer='he_normal')(d)
    d = layers.BatchNormalization()(d)
    d = layers.LeakyReLU(alpha=0.2)(d)
    d = layers.Conv2D(64, (3,3), padding='same', kernel_initializer='he_normal')(d)
    d = layers.BatchNormalization()(d)
    d = layers.LeakyReLU(alpha=0.2)(d)
    d = layers.AveragePooling2D()(d)
    block_new = d
    # skip the input, 1x1 and activation for the old model
    for i in range(n_input_layers, len(old_model.layers)):
        d = old_model.layers[i](d)
    # define straight-through model
    model1 = keras.models.Model(in_image, d)
    # compile model
    model1.compile(loss='mse', optimizer=optimizers.Adam(lr=0.001, beta_1=0, beta_2=0.99, epsilon=10e-8))
    # downsample the new larger image
    downsample = layers.AveragePooling2D()(in_image)
    # connect old input processing to downsampled new input
    block_old = old_model.layers[1](downsample)
    block_old = old_model.layers[2](block_old)
    # fade in output of old model input layer with new input
    d = WeightedSum()([block_old, block_new])
    # skip the input, 1x1 and activation for the old model
    for i in range(n_input_layers, len(old_model.layers)):
        d = old_model.layers[i](d)
    # define straight-through model
    model2 = keras.models.Model(in_image, d)
    # compile model
    model2.compile(loss='mse', optimizer=optimizers.Adam(lr=0.001, beta_1=0, beta_2=0.99, epsilon=10e-8))
    return [model1, model2]

In [6]:
# define the discriminator models for each image resolution
def define_discriminator(n_blocks, input_shape=(4,4,3)):
    print('Building the Discriminator')
    model_list = list()
    # base model input
    in_image = layers.Input(shape=input_shape)
    # conv 1x1
    d = layers.Conv2D(64, (1,1), padding='same', kernel_initializer='he_normal')(in_image)
    d = layers.LeakyReLU(alpha=0.2)(d)
    # conv 3x3 (output block)
    d = layers.Conv2D(128, (3,3), padding='same', kernel_initializer='he_normal')(d)
    d = layers.BatchNormalization()(d)
    d = layers.LeakyReLU(alpha=0.2)(d)
    # conv 4x4
    d = layers.Conv2D(128, (4,4), padding='same', kernel_initializer='he_normal')(d)
    d = layers.BatchNormalization()(d)
    d = layers.LeakyReLU(alpha=0.2)(d)
    # dense output layer
    d = layers.Flatten()(d)
    out_class = layers.Dense(1)(d)
    # define model
    model = keras.models.Model(in_image, out_class)
    # compile model
    model.compile(loss='mse', optimizer=optimizers.Adam(lr=0.001, beta_1=0, beta_2=0.99, epsilon=10e-8))
    # store model
    model_list.append([model, model])
    # create submodels
    for i in range(1, n_blocks):
        # get prior model without the fade-on
        old_model = model_list[i - 1][0]
        # create new model for next resolution
        models = add_discriminator_block(old_model)
        # store model
        model_list.append(models)
    print('* Discriminator built')
    return model_list

In [7]:
# add a generator block
def add_generator_block(old_model):
    # get the end of the last block
    block_end = old_model.layers[-2].output
    # upsample, and define new block
    upsampling = layers.UpSampling2D()(block_end)
    g = layers.Conv2D(64, (3,3), padding='same', kernel_initializer='he_normal')(upsampling)
    g = layers.BatchNormalization()(g)
    g = layers.LeakyReLU(alpha=0.2)(g)
    g = layers.Conv2D(64, (3,3), padding='same', kernel_initializer='he_normal')(g)
    g = layers.BatchNormalization()(g)
    g = layers.LeakyReLU(alpha=0.2)(g)
    # add new output layer
    out_image = layers.Conv2D(3, (1,1), padding='same', kernel_initializer='he_normal')(g)
    # define model
    model1 = keras.models.Model(old_model.input, out_image)
    # get the output layer from old model
    out_old = old_model.layers[-1]
    # connect the upsampling to the old output layer
    out_image2 = out_old(upsampling)
    # define new output image as the weighted sum of the old and new models
    merged = WeightedSum()([out_image2, out_image])
    # define model
    model2 = keras.models.Model(old_model.input, merged)
    return [model1, model2]

In [8]:
# define generator models
def define_generator(latent_dim, n_blocks, in_dim=4):
    print('Building the Generator')
    model_list = list()
    # base model latent input
    in_latent = layers.Input(shape=(latent_dim,))
    # linear scale up to activation maps
    g  = layers.Dense(128 * in_dim * in_dim, kernel_initializer='he_normal')(in_latent)
    g = layers.Reshape((in_dim, in_dim, 128))(g)
    # conv 4x4, input block
    g = layers.Conv2D(128, (3,3), padding='same', kernel_initializer='he_normal')(g)
    g = layers.BatchNormalization()(g)
    g = layers.LeakyReLU(alpha=0.2)(g)
    # conv 3x3
    g = layers.Conv2D(128, (3,3), padding='same', kernel_initializer='he_normal')(g)
    g = layers.BatchNormalization()(g)
    g = layers.LeakyReLU(alpha=0.2)(g)
    # conv 1x1, output block
    out_image = layers.Conv2D(3, (1,1), padding='same', kernel_initializer='he_normal')(g)
    # define model
    model = keras.models.Model(in_latent, out_image)
    # store model
    model_list.append([model, model])
    # create submodels
    for i in range(1, n_blocks):
        # get prior model without the fade-on
        old_model = model_list[i - 1][0]
        # create new model for next resolution
        models = add_generator_block(old_model)
        # store model
        model_list.append(models)
    print('* Generator built')
    return model_list

In [9]:
# define composite models for training generators via discriminators
def define_composite(discriminators, generators):
    model_list = list()
    # create composite models
    for i in range(len(discriminators)):
        g_models, d_models = generators[i], discriminators[i]
        # straight-through model
        d_models[0].trainable = False
        model1 = tf.keras.Sequential()
        model1.add(g_models[0])
        model1.add(d_models[0])
        model1.compile(loss='mse', optimizer=optimizers.Adam(lr=0.001, beta_1=0, beta_2=0.99, epsilon=10e-8))
        # fade-in model
        d_models[1].trainable = False
        model2 = tf.keras.Sequential()
        model2.add(g_models[1])
        model2.add(d_models[1])
        model2.compile(loss='mse', optimizer=optimizers.Adam(lr=0.001, beta_1=0, beta_2=0.99, epsilon=10e-8))
        # store
        model_list.append([model1, model2])
    return model_list

In [10]:
# load dataset
def load_real_samples(filename):
    # load dataset
    data = load(filename)
    # extract numpy array
    X = data['arr_0']
    # convert from ints to floats
    X = X.astype('float32')
    # scale from [0,255] to [-1,1]
    X = (X - 127.5) / 127.5
    return X

In [11]:
# select real samples
def generate_real_samples(dataset, n_samples):
    # choose random instances
    ix = randint(0, dataset.shape[0], n_samples)
    # select images
    X = dataset[ix]
    # generate class labels
    y = ones((n_samples, 1))
    return X, y

In [12]:
# generate points in latent space as input for the generator
def generate_latent_points(latent_dim, n_samples):
    # generate points in the latent space
    x_input = randn(latent_dim * n_samples)
    # reshape into a batch of inputs for the network
    x_input = x_input.reshape(n_samples, latent_dim)
    return x_input

In [13]:
# use the generator to generate n fake examples, with class labels
def generate_fake_samples(generator, latent_dim, n_samples):
    # generate points in latent space
    x_input = generate_latent_points(latent_dim, n_samples)
    # predict outputs
    X = generator.predict(x_input)
    # create class labels
    y = -ones((n_samples, 1))
    return X, y

In [14]:
# update the alpha value on each instance of WeightedSum
def update_fadein(models, step, n_steps):
    # calculate current alpha (linear from 0 to 1)
    alpha = step / float(n_steps - 1)
    # update the alpha for each model
    for model in models:
        for layer in model.layers:
            if isinstance(layer, WeightedSum):
                #tf.set_value(layer.alpha, alpha)
                #layer.alpha.assign(alpha)
                keras.backend.set_value(layer.alpha, alpha)

In [15]:
# train a generator and discriminator
def train_epochs(g_model, d_model, gan_model, dataset, n_epochs, n_batch, fadein=False):
    # calculate the number of batches per training epoch
    bat_per_epo = int(dataset.shape[0] / n_batch)
    # calculate the number of training iterations
    n_steps = bat_per_epo * n_epochs
    # calculate the size of half a batch of samples
    half_batch = int(n_batch / 2)
    # manually enumerate epochs
    print('* Batch per epoch:', bat_per_epo)
    print('* Steps:', n_steps)
    pbar = trange(n_steps, desc='Epoch runs', leave=True)
    for i in pbar:
        # update alpha for all WeightedSum layers when fading in new blocks
        if fadein:
            update_fadein([g_model, d_model, gan_model], i, n_steps)
        # prepare real and fake samples
        X_real, y_real = generate_real_samples(dataset, half_batch)
        X_fake, y_fake = generate_fake_samples(g_model, latent_dim, half_batch)
        # update discriminator model
        d_loss1 = d_model.train_on_batch(X_real, y_real)
        d_loss2 = d_model.train_on_batch(X_fake, y_fake)
        # update the generator via the discriminator's error
        z_input = generate_latent_points(latent_dim, n_batch)
        y_real2 = ones((n_batch, 1))
        g_loss = gan_model.train_on_batch(z_input, y_real2)
        # summarize loss on this batch
        #t.set_description('>{}, d1={0.3f}, d2={0.3f} g={0.3f}'.format(i+1, d_loss1, d_loss2, g_loss), refresh=True)
        pbar.set_postfix({'d1':"%.3f" % d_loss1, 'd2': "%.3f" % d_loss2, 'g':"%.3f" % g_loss})
        #print('>%d, d1=%.3f, d2=%.3f g=%.3f' % (i+1, d_loss1, d_loss2, g_loss))

In [16]:
# scale images to preferred size
def scale_dataset(images, new_shape):
    images_list = list()
    print('Scaling image from {} to {}'.format(images[0].shape, new_shape))
    for image in tqdm(images):
        # resize with nearest neighbor interpolation
        new_image = resize(image, new_shape, 0)
        # store
        images_list.append(new_image)
        
    print('Images scaled')
    return asarray(images_list)

In [17]:
# generate samples and save as a plot and save the model
def summarize_performance(status, g_model, latent_dim, n_samples=25):
    print("Summarize Performance")
    # devise name
    gen_shape = g_model.output_shape
    name = '%03dx%03d-%s' % (gen_shape[1], gen_shape[2], status)
    # generate images
    X, _ = generate_fake_samples(g_model, latent_dim, n_samples)
    # normalize pixel values to the range [0,1]
    X = (X - X.min()) / (X.max() - X.min())
    # plot real images
    square = int(sqrt(n_samples))
    for i in range(n_samples):
        pyplot.subplot(square, square, 1 + i)
        pyplot.axis('off')
        pyplot.imshow(X[i])
    pyplot.show()
    # save plot to file
    filename1 = 'plot_%s.png' % (name)
    pyplot.savefig(filename1)
    pyplot.close()
    # save the generator model
    filename2 = 'model_%s.h5' % (name)
    g_model.save(filename2)
    print('>Saved: %s and %s' % (filename1, filename2))

In [18]:
# train the generator and discriminator
def train(g_models, d_models, gan_models, dataset, latent_dim, e_norm, e_fadein, n_batch):
    print("fit the baseline model")
    g_normal, d_normal, gan_normal = g_models[0][0], d_models[0][0], gan_models[0][0]
    print("scale dataset to appropriate size:", g_normal.output_shape[1:])
    gen_shape = g_normal.output_shape
    scaled_data = scale_dataset(dataset, gen_shape[1:])
    print("train normal or straight-through models. Norm: {} - Batch: {}".format(e_norm[0], n_batch[0]))
    train_epochs(g_normal, d_normal, gan_normal, scaled_data, e_norm[0], n_batch[0])
    summarize_performance('tuned', g_normal, latent_dim)
    print("process each level of growth. Nb of levels:".format(len(g_models)))
    for i in range(1, len(g_models)):
        # retrieve models for this level of growth
        [g_normal, g_fadein] = g_models[i]
        [d_normal, d_fadein] = d_models[i]
        [gan_normal, gan_fadein] = gan_models[i]
        # scale dataset to appropriate size
        gen_shape = g_normal.output_shape
        scaled_data = scale_dataset(dataset, gen_shape[1:])
        print('Scaled Data', scaled_data.shape)
        # train fade-in models for next level of growth
        train_epochs(g_fadein, d_fadein, gan_fadein, scaled_data, e_fadein[i], n_batch[i], True)
        summarize_performance('faded', g_fadein, latent_dim)
        # train normal or straight-through models
        train_epochs(g_normal, d_normal, gan_normal, scaled_data, e_norm[i], n_batch[i])
        summarize_performance('tuned', g_normal, latent_dim)

In [19]:
# number of growth phases, e.g. 9 == [4, 8, 16, 32, 64, 128, 512, 1024, 2048]
n_blocks = 6
# size of the latent space
latent_dim = 100
# define models
d_models = define_discriminator(n_blocks)

Building the Discriminator
* Discriminator built


In [None]:
# define models
g_models = define_generator(latent_dim, n_blocks)
# define composite models
gan_models = define_composite(d_models, g_models)
# load image data
dataset = load_real_samples('img_align_celeba_128.npz')
print('Loaded', dataset.shape)

In [None]:
# load the prepared dataset
from numpy import load
from matplotlib import pyplot

# plot a list of loaded faces
def plot_faces(faces, n):
    for i in range(n * n):
        # define subplot
        pyplot.subplot(n, n, 1 + i)
        # turn off axis
        pyplot.axis('off')
        # plot raw pixel data
        pyplot.imshow(faces[i].astype('uint8'))
    pyplot.show()

# load the face dataset
data = load('img_align_celeba_128.npz')
faces = data['arr_0']
print('Loaded: ', faces.shape)
plot_faces(faces, 10)

In [None]:
def sample_images(generator, batch, image_grid_rows=1, image_grid_columns=1):
    print('Printing Sample image')
    #z = np.random.normal(0, 1, (image_grid_rows * image_grid_columns, z_dim))
    z = generate_latent_points(latent_dim, n_batch)
    gen_imgs = generator.predict(z)

    gen_imgs = 0.5 * gen_imgs * 0.5

    fig, axs = plt.subplots(image_grid_rows, image_grid_columns, figsize=(4, 4), sharey=True, sharex=True)

    cnt = 0
    for i in range(image_grid_rows):
        for j in range(image_grid_columns):
            axs[i, j].imshow(gen_imgs[cnt, :, :, 0])#, cmap='gray')
            axs[i, j].axis('off')
            cnt += 1
    plt.show()
    print('Printed')

In [None]:
d_models[0][0].summary()

In [None]:
# train model
n_batch = [16, 16, 16, 8, 4, 4] # [16384, 8192, 8192, 1024, 512, 512]
# 10 epochs == 500K images per training phase
n_epochs = [1, 8, 8, 10, 10, 10] # [1000, 1000, 2000, 3000, 4000, 6000]
train(g_models, d_models, gan_models, dataset, latent_dim, n_epochs, n_epochs, n_batch)