The data is the CelebA dataset: 30000 photos of celebrity faces, centered on the eyes and cropped to 256x256.
The data is stored on Google Drive.

In [0]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


The photos are converted from the RGB color space to LAB. LAB makes it easy to split a photo into one lightness layer and two layers that carry its color.

The L layer ranges from 0 to 100, the A and B layers from -128 to 128. The separated layers are normalized right away.

So that the model receives data of the same shape, the separated layers are padded with zeros.
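
The normalization and zero padding described above can be sketched on a tiny synthetic array. This is only an illustration: the 2x2 LAB values are made up, and the code assumes the layout used in this notebook (L in channel 0, A and B in channels 1 and 2).

```python
import numpy as np

# Hypothetical 2x2 LAB image: channel 0 is L in [0, 100], channels 1-2 are A, B in [-128, 128]
lab = np.array([[[50.0, 64.0, -32.0], [100.0, 0.0, 0.0]],
                [[0.0, -128.0, 128.0], [25.0, 16.0, 8.0]]])

# Normalize: L to [0, 1], AB to [-1, 1]
l_norm = lab[:, :, 0] / 100
ab_norm = lab[:, :, 1:] / 128

# Zero-pad both parts to three channels so the input and the target share one shape
x = np.zeros(lab.shape)
x[:, :, 0] = l_norm
y = np.zeros(lab.shape)
y[:, :, 1:] = ab_norm

# Undo the normalization and merge the two parts back into one LAB image
restored = np.zeros(lab.shape)
restored[:, :, 0] = x[:, :, 0] * 100
restored[:, :, 1:] = y[:, :, 1:] * 128
```

Because the padded channels are zero, the lightness part and the color part can be merged again without losing information.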

In [0]:
#@title
def load_img_to_lab(img_path):
    img = io.imread(img_path)
    img = np.array(img, dtype=float)
    img = color.rgb2lab(1/255.0*img)
    return img

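def split_to_l_ab(img):
    # Split a LAB image into L scaled to [0, 1] and AB scaled to [-1, 1]
    l = img[:, :, 0]
    ab = img[:, :, 1:]
    l = l / 100
    ab = ab / 128
    return l, ab

def combine_l_ab(l, ab):
    # Both inputs are zero-padded 3-channel images (see make_img_from_l and
    # make_img_from_ab): L sits in channel 0 of l, A and B in channels 1-2 of ab
    shape = (256, 256, 3)
    img = np.zeros(shape)
    l = l * 100
    ab = ab * 128
    img[:, :, 0] = l[:, :, 0]
    img[:, :, 1] = ab[:, :, 1]
    img[:, :, 2] = ab[:, :, 2]
    return img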
  
def display_from_lab(img):
    img=color.lab2rgb(img)
    plt.set_cmap('gray')
    plt.imshow(img)
    
def display_from_l(l):
    shape = (256, 256, 3)
    img = np.zeros(shape)
    l=l*100
    img[:, :, 0] = l[:, :, 0] 
    img = color.lab2rgb(img)
    plt.imshow(img)
    
def make_img_from_l (l):
    shape = (256, 256, 3)
    img = np.zeros(shape)
    img[:, :, 0] = l[:, :]
    return img
    
def make_img_from_ab (ab):
    shape = (256, 256, 3)
    img = np.zeros(shape)
    img[:, :, 1] = ab[:, :, 0]
    img[:, :, 2] = ab[:, :, 1]
    return img    

The dataset was split into 10 parts of 3000 photos each. This size makes it possible to train the model on the Colab platform.
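
As a rough check of the split, the file-index ranges of one part can be computed by hand. The helper name `part_bounds` is hypothetical; the division of each part into three chunks mirrors what `make_dataset_ready` does when it loads the data in thirds.

```python
def part_bounds(part, part_size=3000, chunks=3):
    """Return the (start, stop) file-index ranges loaded for one part."""
    start = part * part_size
    step = part_size // chunks  # integer division keeps the indices whole
    return [(start + i * step, start + (i + 1) * step) for i in range(chunks)]

bounds = part_bounds(4)
print(bounds)  # [(12000, 13000), (13000, 14000), (14000, 15000)]
```

Each `stop` index is exclusive, so consecutive chunks and consecutive parts do not overlap.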

In [0]:
#@title
def load_data_from_path(path, skip=0, limit=0):
    # Load the files with index in [skip, limit) from path; limit=0 means no upper bound
    x_data = []
    y_data = []
    # sorted() keeps the file order stable across calls; os.listdir order is arbitrary
    for index, filename in enumerate(sorted(os.listdir(path))):
        if limit != 0 and index >= limit:
            break
        if index < skip:
            continue
        l, ab = split_to_l_ab(load_img_to_lab(os.path.join(path, filename)))
        x_data.append(make_img_from_l(l))
        y_data.append(make_img_from_ab(ab))
        if (index + 1) % 500 == 0:
            print(str(index + 1) + ' images in!')
    x_data = np.array(x_data, dtype=float)
    y_data = np.array(y_data, dtype=float)
    return x_data, y_data

def make_dataset_from_arrays(x_data, y_data, batch=1):
    dataset = tf.data.Dataset.from_tensor_slices((tf.cast(x_data, tf.float32), tf.cast(y_data, tf.float32)))
    # Dataset.batch returns a new dataset, so its result has to be returned;
    # with batch=1 every element has shape (1, 256, 256, 3)
    return dataset.batch(batch)
  
def make_dataset_ready(data_path, part, part_size=3000):
    x_data, y_data=[],[]
    down_bound = 0 + part_size*part
    up_bound = part_size+part_size*part
    # integer division keeps the bounds as whole file indices
    mid_bound1 = down_bound + part_size//3
    mid_bound2 = mid_bound1 + part_size//3
    
    x_data, y_data = load_data_from_path(data_path,skip=down_bound, limit=mid_bound1)
    x_dataset = make_dataset_from_arrays(x_data, y_data)
    
    x_data, y_data = load_data_from_path(data_path,skip=mid_bound1, limit=mid_bound2)
    x_dataset2 = make_dataset_from_arrays(x_data, y_data)
    x_dataset = x_dataset.concatenate(x_dataset2)
    del x_dataset2
    
    x_data, y_data = load_data_from_path(data_path,skip=mid_bound2, limit=up_bound)
    x_dataset2 = make_dataset_from_arrays(x_data, y_data)
    x_dataset = x_dataset.concatenate(x_dataset2)
    #del those variables to prevent OOM on next iteration
    del x_data
    del y_data
    del x_dataset2
    print('Dataset made! size: ' + str(part_size) + ' part: ' + str(part) + ' photos from ' + str(down_bound) + ' to ' + str(up_bound))
    return x_dataset

The model is based on the pix2pix model from the paper "Image-to-Image Translation with Conditional Adversarial Networks" (https://arxiv.org/abs/1611.07004).

In line with the GAN approach, the model consists of a generator and a discriminator:

The generator is based on the U-Net model:
*   Built from 2 parts: an encoder and a decoder
*   Encoder built from layers: Conv2D -> Leaky ReLU -> Batch Normalization
*   Decoder built from layers: UpSampling -> Conv2D -> Batch Normalization
*   Skip connections link each encoder layer with the corresponding decoder layer
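
The way the encoder, the decoder and the skip connections fit together can be traced with plain arithmetic. The sketch below is an assumption-laden illustration, not the notebook's model: the helper `unet_shapes` is hypothetical, it assumes every block changes the resolution by a factor of 2, caps the filter count at 8 times the base, and gives each decoder block as many filters as the encoder output it is concatenated with (as `build_generator` in this notebook does).

```python
def unet_shapes(size=256, base_filters=64, depth=8):
    """List (height/width, channels) after each encoder and decoder block."""
    encoder = []
    for i in range(depth):
        size //= 2  # every encoder block halves the resolution
        encoder.append((size, base_filters * min(2 ** i, 8)))
    decoder = []
    # Each decoder block doubles the resolution and concatenates the matching
    # encoder output, which doubles the channel count
    for i in range(depth - 2, -1, -1):
        skip_size, skip_channels = encoder[i]
        decoder.append((skip_size, skip_channels * 2))
    return encoder, decoder

encoder, decoder = unet_shapes()
for name, shapes in (("encoder", encoder), ("decoder", decoder)):
    for side, channels in shapes:
        print(name, side, "x", side, "x", channels)
```

With the defaults the encoder goes from 128x128x64 down to 1x1x512, and the decoder returns to 128x128x128 before a final upsampling step restores the 256x256 resolution.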

The discriminator is based on the PatchGAN model:
*   Built from layers: Conv -> BatchNorm -> Leaky ReLU
*   The output returned by the last layer has size 30x30
*   Each element of the 30x30 output corresponds to a 70x70 patch of the input data
*   Two input images are passed to the discriminator:
   *   The image with the photo's real colors, which should be classified as real
   *   The image from the generator, which will be judged
   *   Both images are concatenated
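
The 70x70 figure can be reproduced with the usual receptive-field recurrence, walking the layers from the output back to the input. The layer list `paper_patchgan` is an assumption: it is the commonly cited layout of the 70x70 PatchGAN (4x4 kernels, three stride-2 and two stride-1 convolutions). The second list follows `build_discriminator` in this notebook, which uses four stride-2 layers and one stride-1 layer.

```python
def receptive_field(layers):
    """Receptive field of one output unit for a stack of (kernel, stride) convolutions."""
    rf = 1
    for kernel, stride in reversed(layers):
        rf = (rf - 1) * stride + kernel
    return rf

# Assumed layout of the 70x70 PatchGAN: all kernels are 4x4
paper_patchgan = [(4, 2), (4, 2), (4, 2), (4, 1), (4, 1)]
# Layout of build_discriminator in this notebook
notebook_discriminator = [(4, 2), (4, 2), (4, 2), (4, 2), (4, 1)]

print(receptive_field(paper_patchgan))          # 70
print(receptive_field(notebook_discriminator))  # 94
```

By this arithmetic the discriminator built in this notebook sees a larger patch per output element (94x94) and, with four halvings of a 256x256 input, returns a 16x16 map rather than 30x30.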
   
Training:
*   A grayscale image is fed to the generator, and the model generates an output image
*   The generated image and the real color image are fed to the discriminator
*   The generator and discriminator loss values are then computed
*   The gradients of the losses are then passed to the optimizers
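
The loss computation in the third step can be sketched in NumPy, independent of TensorFlow. This is a minimal illustration under assumptions: the function names `bce_from_logits`, `disc_loss_np` and `gen_loss_np` are hypothetical, the discriminator is assumed to return raw logits, and `LAMBDA = 100` is taken from the notebook's global variables.

```python
import numpy as np

LAMBDA = 100  # weight of the L1 term, as in the notebook's globals

def bce_from_logits(labels, logits):
    """Mean binary cross-entropy computed from raw logits (numerically stable form)."""
    return np.mean(np.maximum(logits, 0) - logits * labels
                   + np.log1p(np.exp(-np.abs(logits))))

def disc_loss_np(real_logits, fake_logits):
    # Real pairs should be scored as 1, generated pairs as 0
    return (bce_from_logits(np.ones_like(real_logits), real_logits)
            + bce_from_logits(np.zeros_like(fake_logits), fake_logits))

def gen_loss_np(fake_logits, gen_output, target):
    # The generator wants its output scored as 1 and close to the target in L1
    gan_loss = bce_from_logits(np.ones_like(fake_logits), fake_logits)
    l1_loss = np.mean(np.abs(target - gen_output))
    return gan_loss + LAMBDA * l1_loss

undecided = np.zeros((1, 16, 16, 1))  # logits of 0 mean a probability of 0.5
print(disc_loss_np(undecided, undecided))  # 2 * ln(2), about 1.386
```

With `LAMBDA = 100` the L1 term dominates whenever the colors are far from the target, so the adversarial term mostly matters once the output is already close.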


Generator and discriminator functions specific to this version:

In [0]:
#@title
def build_generator():
  
        def conv2d(layer_input, filters, f_size=4, bn=True):
            d = Conv2D(filters, kernel_size=f_size, strides=2, padding='same',use_bias=False)(layer_input)
            if bn:
                d = BatchNormalization(momentum=0.8)(d)
            d = LeakyReLU(alpha=0.2)(d)
            return d

        def deconv2d(layer_input, skip_input, filters, f_size=4, dropout_rate=0):
            u = Conv2DTranspose(filters, kernel_size=f_size, strides=2,padding='same',use_bias=False)(layer_input)
            u = BatchNormalization(momentum=0.8)(u)
            if dropout_rate:
                u = Dropout(dropout_rate)(u)
            u = ReLU()(u)
            u = Concatenate()([u, skip_input])
            return u
        
        # Image input
        d0 = Input(shape=INPUT_IMG_SHAPE) #(bs, 256, 256, 3): L channel zero-padded to 3 channels
        
        # Downsampling
        d1 = conv2d(d0, GEN_F, bn=False) #(bs, 128, 128, 64)
        d2 = conv2d(d1, GEN_F*2) #(bs, 64, 64, 128)
        d3 = conv2d(d2, GEN_F*4) #(bs, 32, 32, 256)
        d4 = conv2d(d3, GEN_F*8) #(bs, 16, 16, 512)
        d5 = conv2d(d4, GEN_F*8) #(bs, 8, 8, 512)
        d6 = conv2d(d5, GEN_F*8) #(bs, 4, 4, 512)
        d7 = conv2d(d6, GEN_F*8) #(bs, 2, 2, 512)
        d8 = conv2d(d7, GEN_F*8) #(bs, 1, 1, 512)

        # Upsampling
        # Channel counts below include the concatenated skip connection
        u0 = deconv2d(d8, d7, GEN_F*8, dropout_rate=0.5) #(bs, 2, 2, 1024)
        u1 = deconv2d(u0, d6, GEN_F*8, dropout_rate=0.5) #(bs, 4, 4, 1024)
        u2 = deconv2d(u1, d5, GEN_F*8, dropout_rate=0.5) #(bs, 8, 8, 1024)
        u3 = deconv2d(u2, d4, GEN_F*8) #(bs, 16, 16, 1024)
        u4 = deconv2d(u3, d3, GEN_F*4) #(bs, 32, 32, 512)
        u5 = deconv2d(u4, d2, GEN_F*2) #(bs, 64, 64, 256)
        u6 = deconv2d(u5, d1, GEN_F) #(bs, 128, 128, 128)

        u7 = UpSampling2D(size=2)(u6) # (bs, 256, 256, 128)
        output_img = Conv2D(OUTPUT_CHANNELS, kernel_size=4, strides=1, padding='same', activation='tanh')(u7) # (bs, 256, 256, 3)
        
        return Model(d0, output_img)

def build_discriminator():
  
    def d_layer(layer_input, filters, f_size=4, bn=True):
        d = Conv2D(filters, kernel_size=f_size, strides=2, padding='same')(layer_input)
        if bn:
            d = BatchNormalization(momentum=0.8)(d)
        d = LeakyReLU(alpha=0.2)(d)
        return d


    img_inp = Input(shape=INPUT_IMG_SHAPE, name='input_image')
    img_tar = Input(shape=OUTPUT_IMG_SHAPE, name='target_image')


    # Concatenate image and conditioning image by channels to produce input
    combined_imgs = Concatenate(axis=-1)([img_inp, img_tar]) #(bs, 256, 256, channels*2)

    d1 = d_layer(combined_imgs, DISC_F, bn=False) # (bs, 128, 128, 64)
    d2 = d_layer(d1, DISC_F*2) #(bs, 64, 64, 128)
    d3 = d_layer(d2, DISC_F*4) #(bs, 32, 32, 256)
    d4 = d_layer(d3, DISC_F*8) #(bs, 16, 16, 512)
  
    validity = Conv2D(1, kernel_size=4, strides=1, padding='same')(d4)

    return Model([img_inp, img_tar], validity)

Loss functions and training, not specific to this version:

In [0]:
#@title
def discriminator_loss(disc_real_output, disc_generated_output):
    real_loss = loss_object(tf.ones_like(disc_real_output), disc_real_output)
    generated_loss = loss_object(tf.zeros_like(disc_generated_output), disc_generated_output)
    total_disc_loss = real_loss + generated_loss
    return total_disc_loss

def generator_loss(disc_generated_output, gen_output, target):
    gan_loss = loss_object(tf.ones_like(disc_generated_output), disc_generated_output)
    # mean absolute error
    l1_loss = tf.reduce_mean(tf.abs(target - gen_output))
    total_gen_loss = gan_loss + (LAMBDA * l1_loss)
    return total_gen_loss
  

def train_step(input_image, target, first=0):
    with tf.GradientTape() as gen_tape, tf.GradientTape() as disc_tape:
      gen_output = generator(input_image, training=True)

      disc_real_output = discriminator([input_image, target], training=True)
      disc_generated_output = discriminator([input_image, gen_output], training=True)

      gen_loss = generator_loss(disc_generated_output, gen_output, target)
      disc_loss = discriminator_loss(disc_real_output, disc_generated_output)

    generator_gradients = gen_tape.gradient(gen_loss, generator.trainable_variables)
    discriminator_gradients = disc_tape.gradient(disc_loss,discriminator.trainable_variables)

    generator_optimizer.apply_gradients(zip(generator_gradients,generator.trainable_variables))
    discriminator_optimizer.apply_gradients(zip(discriminator_gradients,discriminator.trainable_variables))
    if first:
      return gen_loss, disc_loss
    else:
      return 0, 0
  
def fit(train_ds, epochs, path, version_str, samples=1 ,save_checkpoint=0):  
    epoch_first_gen_loss=0
    epoch_first_disc_loss=0
    if save_checkpoint:
      checkpoints_path = path + 'checkpoints/'
      if not os.path.isdir(checkpoints_path):
        os.mkdir(checkpoints_path)

    for epoch in range(epochs):
      start = time.time()
      first = 1 
      for input_image, target in train_ds:
        if first:
          epoch_first_gen_loss, epoch_first_disc_loss = train_step(input_image, target, first)
          first=0
        else:
          train_step(input_image, target)

      # generating samples every epoch
      checkpoint_str = version_str + '-epoch-' + str(epoch)
      if samples:
        output1, output2 = make_2samples(generator)
        out_sample1.append(output1)
        out_sample2.append(output2)
        samples_names.append(checkpoint_str)

      # saving (checkpoint) the model every x epochs  
      if (save_checkpoint != 0) and (epoch % save_checkpoint == 0) and (epoch !=0):
        save_model(generator, checkpoints_path, checkpoint_str, model_type='gen')
        save_model(discriminator, checkpoints_path, checkpoint_str, model_type='discr')
      
      loss_vec_epoch.append(epoch)
      loss_vec_gen.append(epoch_first_gen_loss)
      loss_vec_disc.append(epoch_first_disc_loss)
      # D loss is printed first, then G loss, matching the labels in the format string
      print ("[Epoch %d/%d] [D loss: %f] [G loss: %f] time: %s" % (epoch, epochs,
                                                                          epoch_first_disc_loss,
                                                                          epoch_first_gen_loss,
                                                                          time.time()-start))

    
def save_model(model, path, version, model_type=''):
    #no need for JSON, model structure is known
    #save weights only; with the '.m5' suffix tf.keras likely writes the TensorFlow checkpoint format rather than HDF5
    if model_type == 'gen': model_name = 'modelGEN-' + version
    elif model_type == 'discr': model_name = 'modelDISCR-' + version
    else: model_name = 'modelX-' + version
    model.save_weights(path + model_name + '.m5')
    print('Saved model to disk: ' + model_name)

def load_models(path, version, activation=0):
    if activation:
      gen_load_name = 'modelGEN-' + version + '.m5'
      disc_load_name = 'modelDISCR-' + version + '.m5'
      generator.load_weights(path + gen_load_name)
      discriminator.load_weights(path + disc_load_name)

def make_2samples(model):
    # Colorize the two fixed test images with the model passed in
    x_input = in_2samples
    inp = tf.cast(x_input[0], tf.float32)
    gen_output = model(inp[tf.newaxis,...], training=False)
    gen_output = tf.squeeze(gen_output,0)
    output1 = combine_l_ab(inp, gen_output) 
    
    inp = tf.cast(x_input[1], tf.float32)
    gen_output = model(inp[tf.newaxis,...], training=False)
    gen_output = tf.squeeze(gen_output,0)
    output2 = combine_l_ab(inp, gen_output)
    
    return output1, output2

def show_2samples(path, samples1, samples2, samples_names, save=0, show=1):
    x_input = in_2samples
    sample1 = x_input[0]
    sample2 = x_input[1]
    if save:
      if not os.path.isdir(path):
        os.mkdir(path)
    for c, value in enumerate(samples1, 0):
      fig = plt.figure(figsize=(10,10))
      
      fig.add_subplot(1, 2, 1)
      fig.text(-0.25, 0.5, samples_names[c],
      horizontalalignment='center',
      verticalalignment='center',
      fontsize=15)
      display_from_lab(samples1[c])
      plt.axis('off')
      
      fig.add_subplot(1, 2, 2)
      display_from_lab(samples2[c])    
      plt.axis('off')
      
      fig.tight_layout()
      
      if save:
        plt.savefig(path + samples_names[c] + '.png', bbox_inches='tight')
      if show:
        plt.show()
    if save: 
      print("Saved samples to disk")
    
def test_10samples(model, path, model_name='', save=0, show=1):
    x_data=[]
    y_data=[]
    x_data, y_data = load_data_from_path(path+'input/',limit=11)
    if save:
      dir_path = path + model_name + '/'
      if not os.path.isdir(dir_path):
        os.mkdir(dir_path)
    
    for c, value in enumerate(x_data, 0):
      sample = x_data[c]
      real_ab = y_data[c]
      real = combine_l_ab(sample,real_ab)
      
      inp = tf.cast(sample, tf.float32)
      gen_output = model(inp[tf.newaxis,...], training=False)
      gen_output = tf.squeeze(gen_output,0)
      
      title = ['Real Image','Grey Image', 'Colorized Image']
      plt.figure(figsize=(15,15))

      for i in range(3):
        plt.subplot(1, 3, i+1)
        plt.title(title[i])
        if i==0 : 
          display_from_lab(real)
        elif i==1 :
          display_from_l(sample)
        elif i==2 : 
          display_from_lab(combine_l_ab(inp, gen_output))    
        plt.axis('off')
      if save:
        plt.savefig(dir_path + str(c+1) + '.png', bbox_inches='tight')
      if show:
        plt.show()
    
    if save: 
      print("Saved samples to disk")

Libraries:

In [0]:
#@title

import tensorflow as tf

import numpy as np
import pandas as pd 
import matplotlib.pyplot as plt
from skimage import io, color
import os

from tensorflow.keras.layers import Input, Dense, Reshape, Flatten, Dropout, Concatenate
from tensorflow.keras.layers import BatchNormalization, Activation, ZeroPadding2D
from tensorflow.keras.layers import LeakyReLU, ReLU
from tensorflow.keras.layers import UpSampling2D, Conv2D, Conv2DTranspose
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.optimizers import Adam
from keras.models import load_model
import time
import sys

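# TensorFlow 1.x API; in TensorFlow 2.x eager execution is on by default
# The GPU config is passed to eager mode directly, since a separate tf.Session is not used in eager execution
config = tf.ConfigProto()
config.gpu_options.allow_growth = True
tf.enable_eager_execution(config=config)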

Using TensorFlow backend.


Global variables and model construction:

In [0]:
#@title

IMG_ROWS = 256
IMG_COLS = 256
INPUT_CHANNELS = 3
OUTPUT_CHANNELS = 3
INPUT_IMG_SHAPE = (IMG_ROWS, IMG_COLS, INPUT_CHANNELS)
OUTPUT_IMG_SHAPE = (IMG_ROWS, IMG_COLS, OUTPUT_CHANNELS)

# Number of filters in the first layer of G and D
GEN_F = 64
DISC_F = 64
LAMBDA = 100
loss_object = tf.keras.losses.BinaryCrossentropy(from_logits=True)
generator_optimizer = Adam(2e-4, beta_1=0.5)
discriminator_optimizer = Adam(2e-4, beta_1=0.5)

generator = build_generator()
discriminator = build_discriminator()

Paths to the relevant folders and 2 test photos:

In [0]:
#@title
model_path = '/content/drive/My Drive/Models&Checkpoints/'
data_path='/content/drive/My Drive/Celeba-data256x256/'
samples10_path='/content/drive/My Drive/10samples/'
if not os.path.isdir(model_path): 
  os.mkdir(model_path)
  
#2 test images from 2samples:
sample1_path='/content/drive/My Drive/2samples/input/4.jpg'
sample2_path='/content/drive/My Drive/2samples/input/7.jpg'
l_sample1, ab_sample1 = split_to_l_ab(load_img_to_lab(sample1_path))
l_sample2, ab_sample2 = split_to_l_ab(load_img_to_lab(sample2_path))
in_2samples = [make_img_from_l(l_sample1), make_img_from_l(l_sample2)]
out_sample1 = []
out_sample2 = []
samples_names = []

loss_vec_epoch = []
loss_vec_gen = []
loss_vec_disc = []

Cell for training the model:

In [0]:
EPOCHS = 20
PART = 4
model_version = '2.3'

"""old_part_str = '0' + str(PART-1)
old_version_str = 'Version' + model_version + '-epoch' + str(EPOCHS) + '-datapart' + old_part_str 
#before launching check if previous version had same name formula and correct model path
load_models(model_path, old_version_str, PART)"""
generator.load_weights(model_path+ 'model_gen_v2-epoch20-datapart_03.m5')
discriminator.load_weights(model_path+ 'model_discr_v2-epoch20-datapart_03.m5')

part_str = '0' + str(PART)
version_str = 'Version' + model_version + '-epoch' + str(EPOCHS) + '-datapart' + part_str 
path_2samples = '/content/drive/My Drive/2samples/'+ version_str + '/'

x_dataset = make_dataset_ready(data_path, PART, part_size=3000)
fit(x_dataset, EPOCHS, model_path, version_str, samples=1, save_checkpoint=5)

save_model(generator, model_path, version_str, model_type='gen')
save_model(discriminator, model_path, version_str, model_type='discr')

show_2samples(path_2samples, out_sample1, out_sample2, samples_names, save=1, show=1)

gen_model_name = 'modelGEN-' + version_str
test_10samples(generator, samples10_path, gen_model_name, save=1, show=1)



In [0]:
array_size = len(loss_vec_epoch)
loss_array_shape = (array_size,3)
loss_array=np.zeros(loss_array_shape)
loss_array[:,0] = loss_vec_epoch
loss_array[:,1] = loss_vec_gen
loss_array[:,2] = loss_vec_disc
np.save(model_path + 'losses_'+ version_str + '.npy',loss_array)
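
The saved array has one row per epoch and three columns: epoch number, generator loss, discriminator loss. A minimal sketch of reading such a file back is shown below; the loss values and the file name `losses_example.npy` are made up for illustration and are not results from this notebook.

```python
import os
import tempfile
import numpy as np

# Made-up loss values for three epochs, in the same column order as the cell above
loss_array = np.array([[0, 25.1, 1.4],
                       [1, 18.7, 1.2],
                       [2, 15.3, 1.1]])

with tempfile.TemporaryDirectory() as tmp_dir:
    file_path = os.path.join(tmp_dir, 'losses_example.npy')  # hypothetical file name
    np.save(file_path, loss_array)
    loaded = np.load(file_path)

# Split the columns back into separate vectors
epochs, gen_loss, disc_loss = loaded[:, 0], loaded[:, 1], loaded[:, 2]
best_epoch = int(epochs[np.argmin(gen_loss)])
print('Lowest generator loss in epoch', best_epoch)
```

Note that `fit` records only the loss of the first batch of each epoch, so these curves are a coarse indicator rather than an epoch average.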

Cell for loading and testing the model (not used when training the model):

In [0]:
#@title
version_str = 'FinalVersion'

checkpoint_str = 'modelGEN-FinalVersion.m5'
generator.load_weights(model_path + checkpoint_str )
gen_model_name = 'modelGEN-' + version_str
#test_10samples(generator, samples10_path, gen_model_name, save=1, show=1)

myphoto_path = '/content/drive/My Drive/MyPhoto/'
test_10samples(generator, myphoto_path,gen_model_name, save=1, show=1)
plt.show()

NotFoundError: ignored