In [27]:
import numpy as np
import math
import tensorflow as tf
from tensorflow import keras
from keras import layers
import matplotlib.pyplot as plt
import cv2
import os
import random
from skimage.io import imread
from skimage.transform import resize
!pip install tensorflow_addons
import tensorflow_addons as tfa

# tf.config.run_functions_eagerly(True)



Objectif de ce notebook : créer le réseau (GAN) qui apprend un style de peinture (sur une selection de styles).

https://www.kaggle.com/amyjang/monet-cyclegan-tutorial (tuto complet)
https://www.kaggle.com/shivansh002/gentle-introduction-to-gan (tuto basique + compréhensible)

In [28]:
PATH = "/content/drive/Shareddrives/PFE artists/"
DATA_PATH = "/content/drive/Shareddrives/PFE artists/data/wikiart/base/"
STYLES = ["Photo", "Cubism", "Ukiyo_e", "Symbolism", "Expressionism", "Baroque", "Fauvism"]
NB_CHANNELS = 3
IMG_SIZE = (128, 128)
IMG_SHAPE =  (*IMG_SIZE, NB_CHANNELS)

In [29]:
# affichage d'une image
def display_image(image):
  plt.figure(figsize=(6,6))
  plt.imshow(image)

def display_images(tab_images):
  # affiche plusieurs images contenues dans un tableau
  n = len(tab_images)
  n_rows = n//3 +1 if n%3!=0 else n//3
  figure, axs = plt.subplots(nrows=n_rows,ncols=3,figsize=(15,n_rows*5) )
  for i in range(n):
    img = tab_images[i]
    if n<=3 :
      axs[i].imshow(img)
    else :
      k = i//3
      j = i-k*3
      axs[k,j].imshow(img)
  plt.show()

Pour chaque style de peinture :    
- Générateur (réseau de neurones) qui recopie le style (entrainé sur les images du dossier correspondant)
- Discriminateur (réseau de neurones aussi) qui détermine si c'est un tableau de ce style ou non
     -> discriminateur utilisé aussi pour déterminé le style d'un tableau donné ? 
- fonction qui entraine le modèle en boucle

En théorie on peut utiliser les mêmes fonctions pour tous les styles, en les entrainant sur des bases d'images adaptées


In [30]:
# # Downsampling = réduit les dimensions du tableau (longeur et largeur) par 2
# def downsample(filters, size, norm=True):
#   initializer = tf.random_normal_initializer(0., 0.2)
  
#   model = keras.Sequential()
#   model.add(keras.layers.Conv2D(filters, size, strides=2, padding='same', kernel_initializer=initializer))
#   if norm:
#     model.add(keras.layers.BatchNormalization())
#   model.add(keras.layers.LeakyReLU())
#   return model

# # Upsampling : augmente la dimension du tableau par 2
# def upsample(filters, size, drop=False):
#   initializer = tf.random_normal_initializer(0., 0.2)

#   model = keras.Sequential()
#   model.add(keras.layers.Conv2DTranspose(filters, size, strides=2, padding='same', kernel_initializer=initializer, use_bias=False))
#   model.add(keras.layers.BatchNormalization())
#   if drop:
#     model.add(keras.layers.Dropout(0.5))
#   model.add(keras.layers.ReLU())

#   return model

def upsample(filters, size, apply_dropout=False):
    initializer = tf.random_normal_initializer(0., 0.02)
    gamma_init = keras.initializers.RandomNormal(mean=0.0, stddev=0.02)

    result = keras.Sequential()
    result.add(layers.Conv2DTranspose(filters, size, strides=2,
                                      padding='same',
                                      kernel_initializer=initializer,
                                      use_bias=False))

    result.add(tfa.layers.InstanceNormalization(gamma_initializer=gamma_init))

    if apply_dropout:
        result.add(layers.Dropout(0.5))

    result.add(layers.ReLU())

    return result

def downsample(filters, size, apply_instancenorm=True):
    initializer = tf.random_normal_initializer(0., 0.02)
    gamma_init = keras.initializers.RandomNormal(mean=0.0, stddev=0.02)

    result = keras.Sequential()
    result.add(layers.Conv2D(filters, size, strides=2, padding='same',
                             kernel_initializer=initializer, use_bias=False))

    if apply_instancenorm:
        result.add(tfa.layers.InstanceNormalization(gamma_initializer=gamma_init))

    result.add(layers.LeakyReLU())

    return result

In [31]:
# # Nouveau Générateur
# def generator(data_shape=IMG_SHAPE):
#   model = keras.Sequential(name="Generateur")
#   # entrée
#   model.add(keras.layers.Input(shape=data_shape))
#   # downsampling
#   model.add(downsample(64, 4, False))
#   model.add(downsample(128, 4))
#   model.add(downsample(256, 4))
#   model.add(downsample(512, 4))
#   model.add(downsample(512, 4))
#   model.add(downsample(512, 4))
#   model.add(downsample(512, 4))
#   # model.add(downsample(512, 4))
#   # upsampling
#   # model.add(upsample(512, 4, True))
#   model.add(upsample(512, 4, True))
#   model.add(upsample(512, 4, True))
#   model.add(upsample(512, 4, True))
#   model.add(upsample(256, 4))
#   model.add(upsample(128, 4))
#   model.add(upsample(64, 4))
#   # sortie
#   initializer = tf.random_normal_initializer(0., 0.02)
#   model.add(keras.layers.Conv2DTranspose(NB_CHANNELS, 4, strides=2, padding='same', kernel_initializer=initializer, activation='relu'))
#   return model

def discriminator(data_shape=IMG_SHAPE):
  model = keras.Sequential(name="Discriminateur")
  model.add(keras.layers.Input(shape=data_shape))
  model.add(downsample(64, 4, False))
  model.add(downsample(128, 4))
  model.add(downsample(256, 4))
  model.add(keras.layers.ZeroPadding2D())
  initializer = tf.random_normal_initializer(0., 0.2)
  model.add(keras.layers.Conv2D(512, 4, strides=1, kernel_initializer=initializer, use_bias=False))
  model.add(keras.layers.BatchNormalization())
  model.add(keras.layers.LeakyReLU())
  model.add(keras.layers.ZeroPadding2D())
  model.add(keras.layers.Conv2DTranspose(1, 4, strides=1, kernel_initializer=initializer, activation='sigmoid'))
  model.add(keras.layers.Flatten())
  model.add(keras.layers.Dense(1, activation = "sigmoid"))

  return model

In [32]:
# def discriminator():
#     initializer = tf.random_normal_initializer(0., 0.02)
#     gamma_init = keras.initializers.RandomNormal(mean=0.0, stddev=0.02)

#     inp = layers.Input(shape=IMG_SHAPE, name='input_image')

#     x = inp

#     down1 = downsample(64, 4, False)(x) # (bs, 128, 128, 64)
#     down2 = downsample(128, 4)(down1) # (bs, 64, 64, 128)
#     down3 = downsample(256, 4)(down2) # (bs, 32, 32, 256)

#     zero_pad1 = layers.ZeroPadding2D()(down3) # (bs, 34, 34, 256)
#     conv = layers.Conv2D(512, 4, strides=1,
#                          kernel_initializer=initializer,
#                          use_bias=False)(zero_pad1) # (bs, 31, 31, 512)

#     norm1 = tfa.layers.InstanceNormalization(gamma_initializer=gamma_init)(conv)

#     leaky_relu = layers.LeakyReLU()(norm1)

#     zero_pad2 = layers.ZeroPadding2D()(leaky_relu) # (bs, 33, 33, 512)

#     last = layers.Conv2D(1, 4, strides=1,
#                          kernel_initializer=initializer)(zero_pad2) # (bs, 30, 30, 1)

#     return tf.keras.Model(inputs=inp, outputs=last)
  
def generator():
    inputs = layers.Input(shape=IMG_SHAPE)

    # bs = batch size
    down_stack = [
        downsample(64, 4, False), # (bs, 128, 128, 64)
        downsample(128, 4), # (bs, 64, 64, 128)
        downsample(256, 4), # (bs, 32, 32, 256)
        downsample(512, 4), # (bs, 16, 16, 512)
        downsample(512, 4), # (bs, 8, 8, 512)
        downsample(512, 4), # (bs, 4, 4, 512)
        downsample(512, 4), # (bs, 2, 2, 512)
        #downsample(512, 4), # (bs, 1, 1, 512)
    ]

    up_stack = [
        upsample(512, 4, True), # (bs, 2, 2, 1024)
        upsample(512, 4, True), # (bs, 4, 4, 1024)
        upsample(512, 4, True), # (bs, 8, 8, 1024)
        #upsample(512, 4), # (bs, 16, 16, 1024)
        upsample(256, 4), # (bs, 32, 32, 512)
        upsample(128, 4), # (bs, 64, 64, 256)
        upsample(64, 4), # (bs, 128, 128, 128)
    ]

    initializer = tf.random_normal_initializer(0., 0.02)
    last = layers.Conv2DTranspose(NB_CHANNELS, 4,
                                  strides=2,
                                  padding='same',
                                  kernel_initializer=initializer,
                                  activation='tanh') # (bs, 256, 256, 3)
    lastlast = layers.Lambda(tf.math.abs)
    x = inputs

    # Downsampling through the model
    skips = []
    for down in down_stack:
        x = down(x)
        skips.append(x)

    skips = reversed(skips[:-1])

    # Upsampling and establishing the skip connections
    for up, skip in zip(up_stack, skips):
        x = up(x)
        x = layers.Concatenate()([x, skip])

    x = last(x)
    #x = lastlast(x)

    return keras.Model(inputs=inputs, outputs=x)

In [33]:
from tensorflow.python.keras.utils.data_utils import Sequence

def gen_dataset(path, batch_size=32):
  # génère le dataset sous forme d'iterateur
  dataset = tf.keras.utils.image_dataset_from_directory(
    directory = path,
    labels = None,
    color_mode = 'rgb',
    image_size=IMG_SIZE,
    shuffle = True,
    batch_size = batch_size,
  )
  # mutliplication des données et mélange
  dataset.repeat(2)
  dataset.shuffle(1000)
  # normalisation des données
  normalization_layer = tf.keras.layers.Rescaling(1./255)
  normalized_ds = dataset.map(lambda x: (normalization_layer(x)))
  return normalized_ds #.as_numpy_iterator()

def gen_datasets_all(batch_size=32, styles=STYLES):
  datasets_all = []
  for style in styles:
    path = DATA_PATH+style
    dataset = gen_dataset(path, batch_size)
    datasets_all.append(dataset)
  return tuple(datasets_all)


class ZippedDatasets(Sequence):

    def __init__(self, batch_size=32, path=DATA_PATH, styles=STYLES):
        self.path = path
        self.styles = styles
        self.batch_size = batch_size
        self.datasets = {}
        # récup de toutes les images sous forme de datagenerator pour chaque style
        for style in self.styles : 
          self.datasets[style] = gen_dataset(self.path+style, self.batch_size)

    def __len__(self):
      # chacun des datasets a une cardinality = nb d'épochs nécessaires pour balayer toutes les données
      # pour éviter le dernier epoch (qui ne contient pas un batch complet de données), 
        # on set notre len==cardinality à min -1 
      return min(map(lambda x : x.cardinality().numpy(), self.datasets.values()))

    def __getitem__(self, idx):
      batch = {}
      for style in self.styles:
        batch[style] = self.datasets[style].batch(self.batch_size, drop_remainder=True)
      return batch
    
    # def on_epoch_end(self):
    #   # permuter les données ? Surement déjà fait pas les datagenarators de base



# def test_generator(generator, images):
#   # affiche plusieurs images contenues dans un tableau
#   n = len(images)
#   predictions = generator(images)
#   figure, axs = plt.subplots(nrows=n,ncols=2,figsize=(10,n*5) )
#   for i in range(n):
#     img = images[i]
#     pred = predictions[i]
#     if n==1 :
#       axs[0].imshow(img)
#       axs[1].imshow(pred)
#     else :
#       axs[i,0].imshow(img)
#       axs[i,1].imshow(pred)
#   plt.show()

# min(map(lambda x : x.cardinality().numpy(), ZippedDatasets().datasets.values()))


In [34]:
# Métriques (tout sous forme de loss = à minimiser)

def loss_generator(disc_fake):
  # entrée : évaluation du discriminateur sur les fausses images générées par le générateur
  # objectif : si générateur parfait, disc_fake doit être un vecteur de 1
  target = tf.ones_like(disc_fake)
  loss = tf.keras.losses.BinaryCrossentropy(from_logits=False, reduction=tf.keras.losses.Reduction.NONE)(target, disc_fake)
  return loss

def loss_discriminator(disc_real, disc_fake):
  # entrée : évaluation du discriminateur sur les fausses images et sur les vraies
  # objectif : disc_fake doit être un vecteur de 0 // disc_real un vecteur de 1
  uns = tf.ones_like(disc_real)
  zeros = tf.zeros_like(disc_fake)
  loss_real = tf.keras.losses.BinaryCrossentropy(from_logits=False, reduction=tf.keras.losses.Reduction.NONE)(uns, disc_real)
  loss_fake = tf.keras.losses.BinaryCrossentropy(from_logits=False, reduction=tf.keras.losses.Reduction.NONE)(zeros, disc_fake)
  return (loss_real+loss_fake)
  #return tf.keras.losses.BinaryCrossentropy(from_logits=False, reduction=tf.keras.losses.Reduction.NONE)(uns, disc_real)

def loss_cycle(original, cycled):
  # loss par passage d'une image dans un cycle
  # entrée : l'image originale, et la version passée dans 2 générateurs à la suite
  # objectif : les images doivent être identiques
  return tf.reduce_mean(tf.abs(original - cycled))

def loss_identity(original, processed):
  # loss par passage d'une image dans le générateur de son vrai style
  # objectif : l'image ne doit pas avoir été modifiée
  # entrée : l'image d'origine et l'image traité par le générateur
  return tf.reduce_mean(tf.abs(original - processed))


In [35]:
# initialisation des modèles
def build_models(styles=STYLES, batch_size=32, img_shape=IMG_SHAPE):
  discriminators = {}
  generators = {}
  # pour chacun des styles étudiés, on initialise un générateur et un discriminateur
  for style in styles:
    # ex: generators["Photo"] transforme une image en photo
    # ex: discriminators["Photo"] détermine si une image est une photo
    generators[style] = generator()
    #generators[style].build(input_shape=(batch_size,*img_shape))
    discriminators[style] = discriminator()
    #discriminators[style].build(input_shape=(batch_size, *img_shape))
  return generators, discriminators

# définition des optimizers
def build_optimizers(styles=STYLES):
  gen_optimizers = {}
  disc_optimizers = {}
  for style in styles:
    gen_optimizers[style] = tf.keras.optimizers.Adam(2e-4, beta_1=0.5)
    disc_optimizers[style] = tf.keras.optimizers.Adam(2e-4, beta_1=0.5)
  return gen_optimizers, disc_optimizers

In [36]:
from tensorflow.python.ops.gen_math_ops import TruncateMod
# Training fait en override les méthodes de keras
class CycleGan(keras.Model):
  def __init__(self, generators, discriminators, lambda_cycle=10, styles=STYLES, batch_size=32):
    super(CycleGan, self).__init__()
    self.generators = generators
    self.discriminators = discriminators
    self.lambda_cycle = lambda_cycle
    self.styles = styles
    self.batch_size = batch_size
    # self.datasets = datasets
  
  def compile(self, disc_optimizers, gen_optimizers, loss_generator_fn, loss_discriminator_fn, loss_cycle_fn, loss_identity_fn):
    super(CycleGan, self).compile()
    self.disc_optimizers = disc_optimizers
    self.gen_optimizers = gen_optimizers
    self.loss_generator_fn = loss_generator_fn
    self.loss_discriminator_fn = loss_discriminator_fn
    self.loss_cycle_fn = loss_cycle_fn
    self.loss_identity_fn = loss_identity_fn
    # stockage des losses
    self.gen_loss = {}
    self.disc_loss = {}
    self.cycle_loss = {}
    self.id_loss = {}
    # initialiser les poids des optimizers
    # for style in self.styles:
    #   init_g = tf.compat.v1.variables_initializer(gen_optimizers[style].variables())
    #   #init_g.run()
    #   init_d = tf.compat.v1.variables_initializer(disc_optimizers[style].variables())
    #   gen_optimizers[style].set_weights(??)
    #   disc_optimizers[style].set_weights(??)
  
  def call(self, inputs, training=False):
    #super(CycleGan, self).__call__(inputs)
    # call tous les modeles puis return leurs résultats ? 
    inputs = tf.reshape(inputs, (1,*inputs.shape))
    res = {}
    for i in range(len(self.styles)):
      style = self.styles[i]
      res[style] = self.discriminators[style].call(self.generators[style].call(inputs))
    return res

  def build(self, input_shape):
    super(CycleGan, self).build(input_shape)
    for style in self.styles:
      self.generators[style].build(input_shape)
      self.discriminators[style].build(input_shape)
    #   # initialisation des losses
    #   self.gen_loss[style] = self.loss_generator_fn(tf.zeros(self.batch_size))
    #   self.disc_loss[style] = self.loss_discriminator_fn(tf.zeros(self.batch_size), tf.ones(self.batch_size))
    #   self.cycle_loss[style] = self.loss_cycle_fn(np.zeros(input_shape), np.ones(input_shape))
    #   self.id_loss[style] = self.loss_identity_fn(np.zeros(input_shape), np.ones(input_shape))
    #   # premier évaluation des optimizers (pour éviter les ValueErrors dues à des tf.Variables créées dans la boucle)
    # with tf.GradientTape(persistent=False) as tape:
    #   for style in self.styles:
    #     tape.watch(self.generators[style].trainable_variables)
    #     tape.watch(self.generators[style].trainable_variables)
    # for style in self.styles:
    #   generator_gradients = tape.gradient(self.gen_loss[style], self.generators[style].trainable_variables)
    #   self.gen_optimizers[style].apply_gradients(zip(generator_gradients, self.generators[style].trainable_variables))
    #   discriminator_gradients = tape.gradient(self.disc_loss[style], self.discriminators[style].trainable_variables)
    #   self.disc_optimizers[style].apply_gradients(zip(discriminator_gradients, self.discriminators[style].trainable_variables))
  
  def save(self):
    path = PATH+"models/"
    for style in self.styles:
      path_style = path+style
      self.generators[style].save(path_style+"/generator.h5")
      self.discriminators[style].save(path_style+"/discriminator.h5")
  
  def __getitem__(self, batch_data):
    # batch_data = self.datasets.batch(batch_size= self.batch_size, drop_remainder=True)
    batch = {}
    i=0
    for style in self.styles:
      batch[style] = batch_data[i]
      i+=1
    return batch


  def train_step(self, batch_data):
    """ batch = liste des batchs de données pour chaque style """
    n_styles = len(self.styles)
    batch = self.__getitem__(batch_data)
    # batch = batch_data

    with tf.GradientTape(persistent=True) as tape:
      # on selectionne un style k aléatoire à prendre pour référence dans cet epoch
      k = np.random.randint(n_styles)
      style_k = self.styles[k]
      # On entraine chaque style j!= k sur style_k
      for j in range(n_styles):
        if j!=k :
          style_j = self.styles[j]
          # GENERATEUR
          # k to j to k
          # transforme une image de style k en style j
          fake_j = self.generators[style_j](batch[style_k], training=True)
          # retour au style k
          cycled_k = self.generators[style_k](fake_j, training=True)

          # DISCRIMINATEUR
          disc_real_j = self.discriminators[style_j](batch[style_j], training=True)
          disc_fake_j = self.discriminators[style_j](fake_j, training=True)

          # LOSS
          self.gen_loss[style_j] = (self.loss_generator_fn(disc_fake_j))
          self.disc_loss[style_j] = (self.loss_discriminator_fn(disc_real_j, disc_fake_j))
          self.cycle_loss[style_j] = (self.loss_cycle_fn(batch[style_k], cycled_k))
          self.id_loss[style_j] = (self.loss_identity_fn(batch[style_j], fake_j))

    # GRADIENTS
    for j in range(n_styles):
      if j!=k:
        style_j = self.styles[j]
        generator_gradients = tape.gradient(self.gen_loss[style_j], self.generators[style_j].trainable_variables)
        self.gen_optimizers[style_j].apply_gradients(zip(generator_gradients, self.generators[style_j].trainable_variables))
        discriminator_gradients = tape.gradient(self.disc_loss[style_j], self.discriminators[style_j].trainable_variables)
        self.disc_optimizers[style_j].apply_gradients(zip(discriminator_gradients, self.discriminators[style_j].trainable_variables))

    metriques = {
        "k" : style_k,
    }
    #self.disc_loss.update(self.gen_loss)
    #metriques.update(self.disc_loss)
    for j in range(n_styles):
      if j!=k:
        metriques["gen_"+self.styles[j]] = self.gen_loss[self.styles[j]]
        metriques["disc_"+self.styles[j]] = self.disc_loss[self.styles[j]]
    #print("k : ", style_k)
    #print(self.disc_loss)
    return metriques



In [37]:
#D['Photos'](images, training=True) # retourne la prédiction et entraine le modèle au passage ? 
st = ["Photo", "Symbolism", "Expressionism", "Baroque"]
datasets = gen_datasets_all(batch_size=32, styles=st)
# data = ZippedDatasets(styles=st)
G, D = build_models(st)
gen_optimizers, disc_optimizers = build_optimizers(st)

Found 7038 files belonging to 1 classes.
Found 4500 files belonging to 1 classes.
Found 6725 files belonging to 1 classes.
Found 4221 files belonging to 1 classes.


In [41]:
modele = CycleGan(G, D, styles=st)
modele.compile(disc_optimizers, gen_optimizers, loss_generator, loss_discriminator, loss_cycle, loss_identity)

In [39]:
# modele.generators['Photo'].summary()
# modele.build(input_shape=IMG_SHAPE)

In [42]:
# entrainement
history = modele.fit(
    # data,
    tf.data.Dataset.zip(datasets),
    epochs=5,
    steps_per_epoch=2,
    verbose=True,
)

Epoch 1/5


TypeError: ignored

In [None]:
modele.save()

In [None]:
images = gen_dataset(path=DATA_PATH, batch_size=3).as_numpy_iterator()

In [None]:
from numpy.core.numerictypes import maximum_sctype
# ajouter fonction pour sauvegarder tous les modèles
G = modele.generators["Symbolism"]
pred = G.predict(images.next())
print(pred.min(), pred.max())

def test_generator2(generator, images):
  # affiche plusieurs images contenues dans un tableau
  n = len(images)
  predictions = (generator(images)).numpy()
  min = predictions.min()
  max = predictions.max()
  rescale = lambda x : (x-min)/(max-min)
  print(min, max)
  # predictions = (predictions * 127.5 + 127.5).astype(np.uint8)
  predictions = rescale(predictions)
  print(predictions.min(), predictions.max())
  figure, axs = plt.subplots(nrows=n,ncols=2,figsize=(10,n*5) )
  for i in range(n):
    img = images[i]
    pred = (predictions[i])
    if n==1 :
      axs[0].imshow(img)
      axs[1].imshow(pred)
    else :
      axs[i,0].imshow(img)
      axs[i,1].imshow(pred)
  plt.show()

test_generator2(G, images.next())

In [None]:
D = modele.discriminators["Symbolism"]
D.predict(G.predict(images.next()))

In [None]:
# tracer de l'évolution des métriques au long de l'entrainement
def plot_learning_curves(acc, loss):
    #epochs = range(len(acc))

    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(20,5))
    
    ax1.plot(acc)
    ax1.set_title("Accuracy")
    ax1.set_ylabel("Accuracy")
    ax1.set_xlabel("Epoch")
    
    ax2.plot(loss)
    ax2.set_title("Loss")
    ax2.set_ylabel('Loss')
    ax2.set_xlabel('Epoch')

    fig.show()

plot_learning_curves(a_acc, a_loss)
plot_learning_curves(d_acc, d_loss)

- Expressionnism
- Baroque
- Symbolisme
- Cubisme
- Ukiyo_e
- Fauvisme


Pointillisme (peu de data)