### Imports

In [32]:
import os
import random
import numpy as np
import tensorflow as tf
import tensorflow_addons as tfa
import statistics
from PIL import Image
from matplotlib import pyplot
from keras.initializers import RandomNormal
from keras.layers import Input, Reshape, Dropout, Dense, Flatten, BatchNormalization, Activation, ZeroPadding2D, LeakyReLU, Concatenate
from keras.layers.convolutional import UpSampling2D, Conv2D
from keras.models import Sequential, Model, load_model
from keras.optimizers import adam_v2
from keras.utils.vis_utils import plot_model

### Pre-processing

In [51]:
IMAGE_SIZE = 256
IMAGE_CHANNELS = 3
PAINTING_TRAINING_SET_SIZE = 200  # we have a total of 300 monet paintings
PAINTINGS_DIR = '/kaggle/input/gan-getting-started/monet_jpg' 

real_paintings_training = []
real_paintings_testing = []


for filename in os.listdir(PAINTINGS_DIR):
    path = os.path.join(PAINTINGS_DIR, filename)
    image = Image.open(path).resize((IMAGE_SIZE, IMAGE_SIZE), Image.ANTIALIAS)
    
    if len(real_paintings_training) < PAINTING_TRAINING_SET_SIZE:
        real_paintings_training.append(np.asarray(image))
    else: 
        real_paintings_testing.append(np.asarray(image))

real_paintings_training = np.reshape(
    real_paintings_training, (-1, IMAGE_SIZE, IMAGE_SIZE, IMAGE_CHANNELS))
real_paintings_training = (real_paintings_training - 127.5) / 127.5

real_paintings_testing = np.reshape(
    real_paintings_testing, (-1, IMAGE_SIZE, IMAGE_SIZE, IMAGE_CHANNELS))
real_paintings_testing = (real_paintings_testing - 127.5) / 127.5

np.save('/kaggle/working/real_paintings_training.npy', real_paintings_training)
np.save('/kaggle/working/real_paintings_testing.npy', real_paintings_testing)

  if sys.path[0] == "":


In [None]:
PHOTOS_DIR = '/kaggle/input/gan-getting-started/photo_jpg'
# We have 7000 images, but kaggle doesn't allow to store all of them in RAM :(
KAGGLE_LIMIT = 300  
PHOTOS_TRAINING_SET_SIZE = 200  

real_photos_training = []
real_photos_testing = []

for filename in os.listdir(PHOTOS_DIR):
    if len(real_photos_training) + len(real_photos_testing) == KAGGLE_LIMIT:
        break
        
    path = os.path.join(PHOTOS_DIR, filename)
    image = Image.open(path).resize((IMAGE_SIZE, IMAGE_SIZE), Image.ANTIALIAS)

    if len(real_photos_training) < PHOTOS_TRAINING_SET_SIZE:
        real_photos_training.append(np.asarray(image))
    else: 
        real_photos_testing.append(np.asarray(image))
        

real_photos_training = np.reshape(
    real_photos_training, (-1, IMAGE_SIZE, IMAGE_SIZE, IMAGE_CHANNELS))
real_photos_training = (real_photos_training - 127.5) / 127.5

real_photos_testing = np.reshape(
    real_photos_testing, (-1, IMAGE_SIZE, IMAGE_SIZE, IMAGE_CHANNELS))
real_photos_testing = (real_photos_testing - 127.5) / 127.5

np.save('/kaggle/working/real_photos_training.npy', real_photos_training)
np.save('/kaggle/working/real_photos_testing.npy', real_photos_testing)

### Discriminator

In [None]:
def build_discriminator(image_shape): # PatchGAN Discriminator: https://machinelearningmastery.com/how-to-implement-pix2pix-gan-models-from-scratch-with-keras/
    weightInit = RandomNormal(stddev=0.02)
    input_image = Input(shape=image_shape)    
    
    layer = tfa.layers.SpectralNormalization(Conv2D(64, (4,4), strides=(2,2), padding='same', kernel_initializer=weightInit))(input_image)
    layer = LeakyReLU(alpha=0.2)(layer)
    
    layer = tfa.layers.SpectralNormalization(Conv2D(128, (4,4), strides=(2,2), padding='same', kernel_initializer=weightInit))(layer)
    layer = LeakyReLU(alpha=0.2)(layer)
    
    layer = tfa.layers.SpectralNormalization(Conv2D(256, (4,4), strides=(2,2), padding='same', kernel_initializer=weightInit))(layer)
    layer = LeakyReLU(alpha=0.2)(layer)
    
    layer = tfa.layers.SpectralNormalization(Conv2D(512, (4,4), strides=(2,2), padding='same', kernel_initializer=weightInit))(layer)
    layer = LeakyReLU(alpha=0.2)(layer)
    
    layer = tfa.layers.SpectralNormalization(Conv2D(512, (4,4), padding='same', kernel_initializer=weightInit))(layer)
    layer = LeakyReLU(alpha=0.2)(layer)
    
    layer = tfa.layers.SpectralNormalization(Conv2D(1, (4,4), padding='same', kernel_initializer=weightInit))(layer)
    patch = Activation('sigmoid')(layer)
    
    model = Model(input_image, patch)
    
    optimizer = adam_v2.Adam(learning_rate=0.0002, beta_1=0.5)
    model.compile(loss='mse', optimizer=optimizer, loss_weights=[0.5])
#     model.summary()
    plot_model(model, to_file='/kaggle/working/Cycle-GAN_discriminator.png', show_shapes=True, show_layer_names=True)
    
    return model

### Generator

In [None]:
def encoder_block(input_layer, num_filters):
    weightInit = RandomNormal(stddev=0.02)
    
    convolution = tfa.layers.SpectralNormalization(
        Conv2D(num_filters, kernel_size=4, strides=2, padding='same', kernel_initializer=weightInit)
    )(input_layer)
    activated_convolution = LeakyReLU(alpha=0.2)(convolution)
    
    return activated_convolution

def decoder_block(input_layer, layer_to_skip_to, num_filters, dropout=True):
    weightInit = RandomNormal(stddev=0.02)
    
    transposed_convolution = tf.keras.layers.Conv2DTranspose(
        num_filters, kernel_size=4, strides=2, padding='same', kernel_initializer=weightInit
    )(input_layer)
    activated_transposed_convolution = BatchNormalization(momentum=0.1)(transposed_convolution)
    
    if dropout:
        activated_transposed_convolution = Dropout(0.5)(activated_transposed_convolution, training=True)
    
    skip_connection = Concatenate()([activated_transposed_convolution, layer_to_skip_to])
    activated_skip_connection = Activation('relu')(skip_connection)
    
    return activated_skip_connection

def build_generator(image_shape):
    weightInit = RandomNormal(stddev=0.02)
    
    input_image = Input(shape=image_shape)
    
    encoder_64  = encoder_block(input_image, 64)
    encoder_128 = encoder_block(encoder_64, 128)
    encoder_256 = encoder_block(encoder_128, 256)
    encoder_512_a = encoder_block(encoder_256, 512)
    encoder_512_b = encoder_block(encoder_512_a, 512)
    encoder_512_c = encoder_block(encoder_512_b, 512)
    encoder_512_d = encoder_block(encoder_512_c, 512)
    
    bottleneck_convolution = Conv2D(512, (4,4), strides=(2,2), padding='same', kernel_initializer=weightInit)(encoder_512_d)
    bottleneck_activation = Activation('relu')(bottleneck_convolution)
    
    decoder_512_d = decoder_block(bottleneck_activation, encoder_512_d, 512)
    decoder_512_c = decoder_block(decoder_512_d, encoder_512_c, 512)
    decoder_512_b = decoder_block(decoder_512_c, encoder_512_b, 512)
    decoder_512_a = decoder_block(decoder_512_b, encoder_512_a, 512, dropout=False)
    decoder_256 = decoder_block(decoder_512_a, encoder_256, 256, dropout=False)
    decoder_128 = decoder_block(decoder_256, encoder_128, 128, dropout=False)
    decoder_64 = decoder_block(decoder_128, encoder_64, 64, dropout=False)
    
    output = tf.keras.layers.Conv2DTranspose(3, kernel_size=4, strides=2, padding='same', kernel_initializer=weightInit)(decoder_64)
    activated_output = Activation('tanh')(output)
    
    model = Model(input_image, activated_output)
#     model.summary()
    plot_model(model, to_file='/kaggle/working/Cycle-GAN_generator.png', show_shapes=True, show_layer_names=True)
    
    return model

### GAN

In [None]:
# Test function to build models into one another, 
# [ This model would work for normal GAN, but not for Cycle, so it's not used. ]
def build_gan(gen_paintings_model, gen_photos_model, disc_paintings_model, disc_photos_model, image_shape):
    in_real_painting = Input(shape=image_shape)
    in_real_photo = Input(shape=image_shape)
    
    generated_painting = gen_paintings_model(in_real_photo)
    disc_paintings = disc_paintings_model([in_real_painting, generated_painting])
    
    generated_photos = gen_photos_model(in_real_painting)
    disc_photos = disc_photos_model([in_real_photo, generated_photos])
    
    gan_model = Model(
        [in_real_painting, in_real_photo], 
        [generated_painting, disc_paintings, generated_photos, disc_photos],
        name="Cycle GAN"
    )
    
    optimizer = Adam(lr=0.0002, beta_1=0.5)
    gan_model.compile(loss=['mse', 'mae', 'mae', 'mae'], optimizer=optimizer, loss_weights=[1, 5, 10, 10])
    
    return gan_model

# We train generators separately for each direction of the cycle to implement cycle loss.
def composite_model(gen_model_1, disc_model, gen_model_2, image_shape):
    gen_model_1.trainable = True
    disc_model.trainable = False
    gen_model_2.trainable = False

    input_gen = Input(shape=image_shape)
    gen_1_output = gen_model_1(input_gen)
    disc_output = disc_model(gen_1_output)

    identity_input = Input(shape=image_shape)
    identitiy_output = gen_model_1(identity_input)

    forward_output = gen_model_2(gen_1_output)

    gen_2_output = gen_model_2(identity_input)
    backward_output = gen_model_1(gen_2_output)

    model = Model([input_gen, identity_input], [disc_output, identitiy_output, forward_output, backward_output])
    optimizer = adam_v2.Adam(learning_rate=0.0002, beta_1=0.5)
    model.compile(loss=['mse', 'mae', 'mae', 'mae'], loss_weights=[1, 5, 10, 10], optimizer=optimizer)

    return model

# Grab real samples & their target classification from disc. POV (Tensor of 1s)
def generate_real_samples(dataset, n_samples, patch_shape):
    ix = np.random.randint(0, dataset.shape[0], n_samples)
    X = dataset[ix]
    y = np.ones((n_samples, patch_shape, patch_shape, 1))
 
    return X, y

# Grab fake samples & their target classification from disc. POV (Tensor of 0s)
def generate_fake_samples(g_model, dataset, patch_shape):
    X = g_model.predict(dataset)
    y = np.zeros((len(X), patch_shape, patch_shape, 1))
    
    return X, y

# The reference material keeps a pool of 50 images to compute loss
# [ Instead of computing loss for each newly generated image, we get an average. ]
# [ This reduces gradient yo-yo-ing. ]
def update_image_pool(pool, images, max_size=50):
    selected = list()
    for image in images:
        if len(pool) < max_size:
            # stock the pool
            pool.append(image)
            selected.append(image)
        elif random.uniform(0, 1) < 0.5:
            # use image, but don't add it to the pool
            selected.append(image)
        else:
            # replace an existing image and use replaced image
            ix = np.random.randint(0, len(pool))
            selected.append(pool[ix])
            pool[ix] = image
    return np.asarray(selected)

### Load Models

In [None]:
image_shape = (256,256,3)

painting_generator = build_generator(image_shape)   # Creates paintings
photo_generator = build_generator(image_shape)      # Creates photos 
painting_disc = build_discriminator(image_shape)    # Classifies paintings
photo_disc = build_discriminator(image_shape)       # Classifies photos

painting_to_photo = composite_model(painting_generator, painting_disc, photo_generator, image_shape)
photo_to_painting = composite_model(photo_generator, photo_disc, painting_generator, image_shape)

### Helper to save images during training / predicting afterwards

In [None]:
def save_images(step, data, isPainting, subpath=""):   
    output_path = '/kaggle/working/output' + subpath
    tag = 'paint'
    
    # GAN algorithmes compute values from [ 0, 1 ], so we bring it back to [ 0, 256 ]
    image = data  * 127.5 + 127.5
    image = (np.rint(image)).astype(np.uint8)

    if not os.path.exists(output_path):
        os.makedirs(output_path)
      
    # Function is used to save paitings & photos
    if not isPainting:
        tag = 'photo'
        
    filename = os.path.join(output_path, f"trained-{step}-{tag}.png")
    im = Image.fromarray(image[0])
    im.save(filename)

### Training

In [None]:
def train(d_model_A, d_model_B, g_model_AtoB, g_model_BtoA, c_model_AtoB, c_model_BtoA, dataset):
    n_epochs, n_batch, = 10,1                # For more complex horse->zebra: 100, 1 
    n_patch = d_model_A.output_shape[1]      # determine the output square shape of the discriminator   
    trainA, trainB = dataset                
    poolA, poolB = list(), list()            
    bat_per_epo = int(len(trainA) / n_batch) # calculate the number of batches per training epoch
    n_steps = bat_per_epo * n_epochs         # calculate the number of training iterations
    print(f"n_steps: {n_steps}")             
    
    # manually enumerate epochs
    for i in range(n_steps):
        print(f"Doing Step: {i}")
        # select a batch of real samples
        X_realA, y_realA = generate_real_samples(trainA, n_batch, n_patch)
        X_realB, y_realB = generate_real_samples(trainB, n_batch, n_patch)
        
        # generate a batch of fake samples
        X_fakeA, y_fakeA = generate_fake_samples(g_model_BtoA, X_realB, n_patch)
        X_fakeB, y_fakeB = generate_fake_samples(g_model_AtoB, X_realA, n_patch)
        
        # Save Outputs
        save_images(i, X_fakeA, True)
        save_images(i, X_realB, False)
        
        # update fakes from pool
        X_fakeA = update_image_pool(poolA, X_fakeA)
        X_fakeB = update_image_pool(poolB, X_fakeB)
        
        # update generator B->A via adversarial and cycle loss
        g_loss2, _, _, _, _  = c_model_BtoA.train_on_batch([X_realB, X_realA], [y_realA, X_realA, X_realB, X_realA])
        
        # update discriminator for A -> [real/fake]
        dA_loss1 = d_model_A.train_on_batch(X_realA, y_realA)
        dA_loss2 = d_model_A.train_on_batch(X_fakeA, y_fakeA)
        
        # update generator A->B via adversarial and cycle loss
        g_loss1, _, _, _, _ = c_model_AtoB.train_on_batch([X_realA, X_realB], [y_realB, X_realB, X_realA, X_realB])
        
        # update discriminator for B -> [real/fake]
        dB_loss1 = d_model_B.train_on_batch(X_realB, y_realB)
        dB_loss2 = d_model_B.train_on_batch(X_fakeB, y_fakeB)

train(painting_disc, photo_disc, painting_generator, photo_generator, painting_to_photo, photo_to_painting, [real_paintings_training, real_photos_training])

### SWD

In [40]:
def sliced_wasserstein(A, B, dir_repeats=128, dirs_per_repeat=4):
    
    A = np.reshape(A, (A.shape[0], -1))
    B = np.reshape(B, (B.shape[0], -1))
    
    assert A.ndim == 2 and A.shape == B.shape                           # (neighborhood, descriptor_component)
    results = []
    for repeat in range(dir_repeats):
        dirs = np.random.randn(A.shape[1], dirs_per_repeat)             # (descriptor_component, direction)
        dirs /= np.sqrt(np.sum(np.square(dirs), axis=0, keepdims=True)) # normalize descriptor components for each direction
        dirs = dirs.astype(np.float32)
        projA = np.matmul(A, dirs)                                      # (neighborhood, direction)
        projB = np.matmul(B, dirs)
        projA = np.sort(projA, axis=0)                                  # sort neighborhood projections for each direction
        projB = np.sort(projB, axis=0)
        dists = np.abs(projA - projB)                                   # pointwise wasserstein distances
        results.append(np.mean(dists))                                  # average over neighborhoods and directions
    return np.mean(results)

### Predict some more results with trained generators

In [53]:
### Grab random photos or paintings and convert them to the other style

patch_shape = painting_disc.output_shape[1]

photos_to_paintings_SWD = []
paintings_to_photos_SWD = []

# Photo to Paintings
for i in range(len(real_photos_testing)):
    ix = np.random.randint(0, real_photos_testing.shape[0], patch_shape)
    data = real_photos_testing[ix]
    image = photo_generator.predict(data)
#     save_images(i, data, False, '/generated_paitings')
#     save_images(i, image, True, '/generated_paitings')
    photos_to_paintings_SWD.append(sliced_wasserstein(data, image))
    
    # Stop Early if needed
    print(i)
    if i == 15:
        break

# Paintings to Photo 
for i in range(len(real_paintings_testing)):
    ix = np.random.randint(0, real_paintings_testing.shape[0], patch_shape)
    data = real_paintings_testing[ix]
    image = painting_generator.predict(data)
#     save_images(i, data, False, '/generated_photos')
#     save_images(i, image, True, '/generated_photos')   
    paintings_to_photos_SWD.append(sliced_wasserstein(data, image))
    
    # Stop Early if needed
    print(i)
    if i == 15:
        break
    

print(f"Mean SWD for photos to paintings: {statistics.mean(photos_to_paintings_SWD)}")
print(f"Mean SWD for paintings to photos: {statistics.mean(paintings_to_photos_SWD)}")


0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Mean SWD for photos to paintings: 0.06677430730273845
Mean SWD for paintings to photos: 0.06010330965283581


In [None]:
# Download data to view locally to view photos (Kaggle doesn't allow that)
os.chdir(r'/kaggle/working')

!tar -czf Out.tar.gz ./

from IPython.display import FileLink

FileLink(r'Out.tar.gz')