analysis (model building and training), result, and discussion/conclusion. 

# Description

A GAN consists of at least two neural networks: a generator model and a discriminator model. The generator is a neural network that creates the images. For our competition, you should generate images in the style of Monet. This generator is trained using a discriminator.

The two models will work against each other, with the generator trying to trick the discriminator, and the discriminator trying to accurately classify the real vs. generated images.

Your task is to build a GAN that generates 7,000 to 10,000 Monet-style images.
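
The opposing objectives described above can be made concrete with a small plain-Python sketch. It assumes the discriminator outputs one raw logit per image and that both networks are scored with binary cross-entropy; the helper names and the toy logits are made up for illustration and are not part of the competition code.

```python
import math

def bce_from_logit(logit, target):
    """Binary cross-entropy for one raw logit and a target in [0, 1]."""
    # Numerically stable form: max(x, 0) - x * t + log(1 + exp(-|x|))
    return max(logit, 0.0) - logit * target + math.log1p(math.exp(-abs(logit)))

def toy_discriminator_loss(real_logits, fake_logits):
    """The discriminator wants real -> 1 and generated -> 0."""
    real = sum(bce_from_logit(x, 1.0) for x in real_logits) / len(real_logits)
    fake = sum(bce_from_logit(x, 0.0) for x in fake_logits) / len(fake_logits)
    return real + fake

def toy_generator_loss(fake_logits):
    """The generator wants the discriminator to label its images as 1."""
    return sum(bce_from_logit(x, 1.0) for x in fake_logits) / len(fake_logits)

# Toy logits (made up): the discriminator is confident about both groups.
real_logits = [3.0, 2.5]
fake_logits = [-3.0, -2.0]

d_loss = toy_discriminator_loss(real_logits, fake_logits)
g_loss = toy_generator_loss(fake_logits)
print(f"D loss: {d_loss:.3f}, G loss: {g_loss:.3f}")
```

The same fake logits give the discriminator a small loss and the generator a large one, which is the tug-of-war the training loop relies on.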

# EDA

In [None]:
import os
from PIL import Image
import matplotlib.pyplot as plt
import random
import numpy as np


import tensorflow as tf
from tensorflow.keras import models, layers, losses, optimizers
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.losses import BinaryCrossentropy
from tensorflow_addons.layers import InstanceNormalization

In [None]:
def display_images(image_paths, title):
    plt.figure(figsize=(12, 8))
    for i, img_path in enumerate(image_paths):
        img = Image.open(img_path)
        plt.subplot(2, 3, i + 1)
        plt.imshow(img)
        plt.title(title)
        plt.axis('off')
    plt.show()

monet_images = random.sample(os.listdir('./monet_jpg'), 3)
photo_images = random.sample(os.listdir('./photo_jpg'), 3)

monet_image_paths = [os.path.join('./monet_jpg', img) for img in monet_images]
photo_image_paths = [os.path.join('./photo_jpg', img) for img in photo_images]


display_images(monet_image_paths, 'Monet Paintings')


display_images(photo_image_paths, 'Photos')

# These are for the generator
def generate_images(generator, z_dim, num_images=10):
    noise = tf.random.normal([num_images, z_dim])
    generated_images = generator.predict(noise)
    generated_images = (generated_images * 127.5) + 127.5
    generated_images = generated_images.astype(np.uint8)

    plt.figure(figsize=(10, 10))
    for i in range(num_images):
        plt.subplot(num_images // 5 + 1, 5, i+1)
        plt.imshow(generated_images[i])
        plt.axis('off')
    plt.show()

def visualize_feature_maps(generator, z_dim, num_images=1):
    noise = tf.random.normal([num_images, z_dim])

    # Collect the output of every (transposed) convolution layer
    conv_layers = [layer for layer in generator.layers if 'conv' in layer.name]
    layer_outputs = [layer.output for layer in conv_layers]
    activation_model = tf.keras.models.Model(inputs=generator.input, outputs=layer_outputs)

    feature_maps = activation_model.predict(noise)

    for layer, feature_map in zip(conv_layers, feature_maps):
        size = feature_map.shape[1]
        n_features = feature_map.shape[-1]
        n_cols = n_features // 16
        if n_cols == 0:
            # Fewer than 16 channels (e.g. the 3-channel output layer): nothing to tile
            continue
        display_grid = np.zeros((size * n_cols, size * 16))

        for col in range(n_cols):
            for row in range(16):
                # Standardize each channel of the first sample, then map it to 0-255 for display
                channel_image = feature_map[0, :, :, col * 16 + row]
                channel_image -= channel_image.mean()
                channel_image /= (channel_image.std() + 1e-5)  # avoid dividing by zero on flat channels
                channel_image *= 64
                channel_image += 128
                channel_image = np.clip(channel_image, 0, 255).astype('uint8')
                display_grid[col * size: (col + 1) * size, row * size: (row + 1) * size] = channel_image

        scale = 20. / n_features
        plt.figure(figsize=(scale * 16, scale * n_cols))
        plt.title(layer.name)
        plt.grid(False)
        plt.imshow(display_grid, aspect='auto', cmap='viridis')

In [1]:
def get_all_image_paths(image_dir):
    image_files = os.listdir(image_dir)
    image_paths = [os.path.join(image_dir, img) for img in image_files]
    return image_paths

monet_image_paths = get_all_image_paths('./monet_jpg')
photo_image_paths = get_all_image_paths('./photo_jpg')


In [None]:
def preprocess_image(path):
    ''' Preprocess a single image.
        Decodes the JPEG as RGB, resizes it to (256, 256),
        and scales pixel values from [0, 255] to [-1, 1].
    '''
    image = tf.io.read_file(path)
    image = tf.image.decode_jpeg(image, channels=3)
    image = tf.image.resize(image, [256, 256])
    image = tf.cast(image, tf.float32)
    image = (image / 127.5) - 1
    return image

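def augment_image(image):
    # Random flips and a random multiple-of-90-degree rotation
    image = tf.image.random_flip_left_right(image)
    image = tf.image.random_flip_up_down(image)
    image = tf.image.rot90(image, tf.random.uniform(shape=[], minval=0, maxval=4, dtype=tf.int32))
    # The brightness shift can push values outside [-1, 1], so clip back into range
    image = tf.image.random_brightness(image, max_delta=0.1)
    image = tf.clip_by_value(image, -1.0, 1.0)
    return image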

def prepare_and_augment(image_path):
    image = preprocess_image(image_path)
    return tf.data.Dataset.from_tensors(image).map(augment_image)


monet_dataset = tf.data.Dataset.from_tensor_slices(monet_image_paths)
monet_dataset = monet_dataset.map(preprocess_image, num_parallel_calls=tf.data.AUTOTUNE)

photo_dataset = tf.data.Dataset.from_tensor_slices(photo_image_paths)
photo_dataset = photo_dataset.map(preprocess_image, num_parallel_calls=tf.data.AUTOTUNE)

for image in monet_dataset.take(1):
    print(image.numpy().min(), image.numpy().max())

for image in photo_dataset.take(1):
    print(image.numpy().min(), image.numpy().max()) 
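
As a quick check of the scaling used in `preprocess_image` and undone in `generate_images`, the round trip can be sketched in plain Python. The helper names `to_model_range` and `to_pixel_range` are hypothetical, and they work on single values rather than tensors.

```python
def to_model_range(pixel):
    """Map a pixel value in [0, 255] to [-1, 1], as preprocess_image does."""
    return pixel / 127.5 - 1.0

def to_pixel_range(value):
    """Invert the scaling and clamp to valid 8-bit values before display."""
    pixel = round(value * 127.5 + 127.5)
    return max(0, min(255, pixel))

for pixel in (0, 64, 255):
    scaled = to_model_range(pixel)
    print(pixel, "->", round(scaled, 3), "->", to_pixel_range(scaled))
```

The clamp is an extra safety step in this sketch: values slightly outside [-1, 1] would otherwise fall outside the 8-bit range when converted back.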

# Model Building

Originally, I built my own GAN in which the generator only upsamples and the discriminator only downsamples. While tuning it, however, I discovered the CycleGAN approach and implemented that architecture further below. The code in this section is kept for reference only.

In [None]:
def build_generator(z_dim):
    model = models.Sequential()
    model.add(layers.Dense(8*8*256, use_bias=False, input_shape=(z_dim,)))
    model.add(layers.BatchNormalization())
    model.add(layers.LeakyReLU())
    model.add(layers.Reshape((8, 8, 256)))

    # Upsample to 16x16
    model.add(layers.Conv2DTranspose(128, (5, 5), strides=(2, 2), padding='same', use_bias=False))
    model.add(layers.BatchNormalization())
    model.add(layers.LeakyReLU())

    # Upsample to 32x32
    model.add(layers.Conv2DTranspose(128, (5, 5), strides=(2, 2), padding='same', use_bias=False))
    model.add(layers.BatchNormalization())
    model.add(layers.LeakyReLU())

    # Upsample to 64x64
    model.add(layers.Conv2DTranspose(64, (5, 5), strides=(2, 2), padding='same', use_bias=False))
    model.add(layers.BatchNormalization())
    model.add(layers.LeakyReLU())

    # Upsample to 128x128
    model.add(layers.Conv2DTranspose(64, (5, 5), strides=(2, 2), padding='same', use_bias=False))
    model.add(layers.BatchNormalization())
    model.add(layers.LeakyReLU())

    # Upsample to 256x256
    model.add(layers.Conv2DTranspose(3, (5, 5), strides=(2, 2), padding='same', use_bias=False, activation='tanh'))

    return model

# Discriminator model
def build_discriminator(image_shape):
    model = models.Sequential()
    model.add(layers.Conv2D(64, (5, 5), strides=(2, 2), padding='same', input_shape=image_shape))
    model.add(layers.LeakyReLU())
    model.add(layers.Dropout(0.3))

    # 128x128
    model.add(layers.Conv2D(128, (5, 5), strides=(2, 2), padding='same'))
    model.add(layers.LeakyReLU())
    model.add(layers.Dropout(0.3))

    # 64x64
    model.add(layers.Conv2D(256, (5, 5), strides=(2, 2), padding='same'))
    model.add(layers.LeakyReLU())
    model.add(layers.Dropout(0.3))

    # 32x32
    model.add(layers.Conv2D(512, (5, 5), strides=(2, 2), padding='same'))
    model.add(layers.LeakyReLU())
    model.add(layers.Dropout(0.3))

    # 16x16
    model.add(layers.Conv2D(1024, (5, 5), strides=(2, 2), padding='same'))
    model.add(layers.LeakyReLU())
    model.add(layers.Dropout(0.3))

    # Flatten and output layer
    model.add(layers.Flatten())
    model.add(layers.Dense(1))  # No activation bc we'll use from_logits in loss function

    return model

generator_init = build_generator(z_dim)
discriminator_init = build_discriminator((256, 256, 3))

generator_init.summary()
discriminator_init.summary()

discriminator_init.compile(
    optimizer=Adam(learning_rate=disc_learning_rate, beta_1=beta_1),
    loss=BinaryCrossentropy(from_logits=True),
    metrics=['accuracy']
)
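
The size comments in the builders above can be checked with a little arithmetic. With `padding='same'`, a stride-2 `Conv2DTranspose` doubles the height and width, and a stride-2 `Conv2D` halves them (rounding up). The helper names below are illustrative and not part of the notebook.

```python
import math

def upsampled_sizes(start, num_layers, stride=2):
    """Spatial size after each 'same'-padded transposed convolution."""
    sizes = [start]
    for _ in range(num_layers):
        sizes.append(sizes[-1] * stride)
    return sizes

def downsampled_sizes(start, num_layers, stride=2):
    """Spatial size after each 'same'-padded strided convolution."""
    sizes = [start]
    for _ in range(num_layers):
        sizes.append(math.ceil(sizes[-1] / stride))
    return sizes

# build_generator: an 8x8 seed followed by five stride-2 transposed convolutions
gen_sizes = upsampled_sizes(8, 5)
print(gen_sizes)

# build_discriminator: a 256x256 input followed by five stride-2 convolutions
disc_sizes = downsampled_sizes(256, 5)
print(disc_sizes)

# Number of values entering the final Dense layer (the last conv has 1024 filters)
flattened = disc_sizes[-1] ** 2 * 1024
print(flattened)
```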

In [None]:
def build_gan(generator, discriminator, z_dim):
    gan_input = layers.Input(shape=(z_dim,))
    fake_image = generator(gan_input)
    discriminator.trainable = False
    gan_output = discriminator(fake_image)
    
    gan = models.Model(gan_input, gan_output)
    
    gan.compile(
        optimizer=Adam(learning_rate=gen_learning_rate, beta_1=beta_1),
        loss=BinaryCrossentropy(from_logits=True)
    )
    return gan

In [None]:
gan_init = build_gan(generator_init, discriminator_init, z_dim)

# Model Training

In [None]:
def train_gan(gan, dataset, batch_size, z_dim, smooth_factor=0.1, epochs=1):
    generator, discriminator = gan.layers[1:]

    for epoch in range(epochs):
        print(f"Epoch {epoch+1}/{epochs}")
        for real_images, _ in dataset:
            batch_size = real_images.shape[0]

            # Train real
            real_labels = tf.ones((batch_size, 1)) * (1 - smooth_factor)  # Smooth labels for real images
            d_loss_real = discriminator.train_on_batch(real_images, real_labels)

            # Train fake
            noise = tf.random.normal(shape=(batch_size, z_dim))
            fake_images = generator(noise)
            fake_labels = tf.zeros((batch_size, 1)) + smooth_factor  # Smooth labels
            d_loss_fake = discriminator.train_on_batch(fake_images, fake_labels)

            # Train the generator (thru gan)
            noise = tf.random.normal(shape=(batch_size, z_dim))
            gan_labels = tf.ones((batch_size, 1))
            g_loss = gan.train_on_batch(noise, gan_labels)

            print(f"D Loss Real: {d_loss_real}, D Loss Fake: {d_loss_fake}, G Loss: {g_loss}")

        # Show sample images once per qualifying epoch rather than once per batch
        if (epoch + 1) % 100 == 0:
            generate_images(generator, z_dim, 3)
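
The label smoothing in `train_gan` replaces the hard targets 1 and 0 with `1 - smooth_factor` and `smooth_factor`. Here is a plain-Python sketch of what that does to the targets and to the loss. `smoothed_labels` and `bce_from_probability` are hypothetical helpers, and the probabilities are made-up examples.

```python
import math

def smoothed_labels(batch_size, smooth_factor):
    """Return (real, fake) target lists as used for the discriminator."""
    real = [1.0 - smooth_factor] * batch_size
    fake = [0.0 + smooth_factor] * batch_size
    return real, fake

def bce_from_probability(p, target):
    """Binary cross-entropy for a predicted probability p in (0, 1)."""
    return -(target * math.log(p) + (1.0 - target) * math.log(1.0 - p))

real, fake = smoothed_labels(batch_size=4, smooth_factor=0.1)
print(real, fake)

# With a smoothed target of 0.9, an over-confident prediction is penalized
# more than a prediction that matches the target.
loss_at_target = bce_from_probability(0.9, 0.9)
loss_overconfident = bce_from_probability(0.999, 0.9)
print(loss_at_target < loss_overconfident)
```

One consequence is that the discriminator loss no longer bottoms out at 0: with smoothed targets, its minimum is the entropy of the target itself.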

In [None]:
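# Map the datasets to labels and combine.
# Note: the *_tfrec datasets are not created in the cells shown here; they must already exist before this cell runs.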
monet_dataset = monet_dataset.map(lambda x: (x, tf.ones((1,))))
monet_dataset_tfrec = monet_dataset_tfrec.map(lambda x: (x, tf.ones((1,))))

photo_dataset = photo_dataset.map(lambda x: (x, tf.zeros((1,))))
photo_dataset_tfrec = photo_dataset_tfrec.map(lambda x: (x, tf.zeros((1,))))

# Combine
combined_dataset = monet_dataset.concatenate(photo_dataset)
combined_dataset_tfrec = monet_dataset_tfrec.concatenate(photo_dataset_tfrec)

combined_dataset = combined_dataset.shuffle(buffer_size=1024).batch(batch_size)
combined_dataset_tfrec = combined_dataset_tfrec.shuffle(buffer_size=1024).batch(batch_size)

In [None]:
train_gan(gan_init, combined_dataset, batch_size, z_dim, epochs=30)

# Tuning

It looks like the above architecture is overkill and severely overfits. We can tell because the losses start low and keep falling to essentially 0.

We want the D Real and D Fake losses to go down, meaning the discriminator is getting better at telling whether an image is a Monet or not. The G loss, on the other hand, should be high at the beginning, because the discriminator can easily tell when a picture is fake. That pressure pushes the generator to create better pictures that fool the discriminator, so the G loss then comes down while the discriminator's losses rise. Ideally the two then settle into a back-and-forth power struggle.
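
A useful reference point when reading these numbers, assuming binary cross-entropy on logits as in the code above: a discriminator that cannot tell real from fake predicts a probability of 0.5, and its loss on either class is ln 2, about 0.693. Losses far below that mean the discriminator is confident and correct. The sketch below is only a rough heuristic; `describe_discriminator` and its tolerance are my own assumptions, not a standard diagnostic.

```python
import math

CHANCE_LOSS = math.log(2)  # BCE when the discriminator predicts 0.5

def describe_discriminator(d_loss_real, d_loss_fake, tolerance=0.1):
    """Rough, heuristic reading of a pair of discriminator losses."""
    average = (d_loss_real + d_loss_fake) / 2
    if average < tolerance:
        return "discriminator dominates (losses near 0)"
    if abs(average - CHANCE_LOSS) < tolerance:
        return "roughly balanced (losses near ln 2)"
    return "in between"

print(round(CHANCE_LOSS, 3))
print(describe_discriminator(0.01, 0.02))
print(describe_discriminator(0.70, 0.68))
```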

### Revision: I was actually training on a much smaller subset of the photos, and that is why my model was overfitting. I am no longer sure the conclusion above holds.

In [None]:
def build_generator_tuning(z_dim):
    model = models.Sequential()
    model.add(layers.Dense(4 * 4 * 256, use_bias=False, input_shape=(z_dim, )))
    model.add(layers.BatchNormalization())
    model.add(layers.LeakyReLU())
    model.add(layers.Reshape((4, 4, 256)))

    # Upsample to 8x8
    model.add(layers.Conv2DTranspose(128, (5, 5), strides=(2, 2), padding='same', use_bias=False))
    model.add(layers.BatchNormalization())
    model.add(layers.LeakyReLU())

    # Upsample to 16x16
    model.add(layers.Conv2DTranspose(64, (5, 5), strides=(2, 2), padding='same', use_bias=False))
    model.add(layers.BatchNormalization())
    model.add(layers.LeakyReLU())

    # Upsample to 32x32
    model.add(layers.Conv2DTranspose(32, (5, 5), strides=(2, 2), padding='same', use_bias=False))
    model.add(layers.BatchNormalization())
    model.add(layers.LeakyReLU())

    # Upsample to 64x64
    model.add(layers.Conv2DTranspose(16, (5, 5), strides=(2, 2), padding='same', use_bias=False))
    model.add(layers.BatchNormalization())
    model.add(layers.LeakyReLU())

    # Upsample to 128x128
    model.add(layers.Conv2DTranspose(8, (5, 5), strides=(2, 2), padding='same', use_bias=False))
    model.add(layers.BatchNormalization())
    model.add(layers.LeakyReLU())

    # Upsample to 256x256
    model.add(layers.Conv2DTranspose(3, (5, 5), strides=(2, 2), padding='same', use_bias=False, activation='tanh'))
    return model

def build_discriminator_tuning(image_shape):
    model = models.Sequential()
    model.add(layers.Conv2D(64, (5, 5), strides=(2, 2), padding='same', input_shape=image_shape))
    model.add(layers.LeakyReLU())
    model.add(layers.Dropout(0.3))

    # 128x128
    model.add(layers.Conv2D(128, (5, 5), strides=(2, 2), padding='same'))
    model.add(layers.LeakyReLU())
    model.add(layers.Dropout(0.3))

    # 64x64
    model.add(layers.Conv2D(128, (5, 5), strides=(2, 2), padding='same'))
    model.add(layers.LeakyReLU())
    model.add(layers.Dropout(0.3))

    # 32x32
    model.add(layers.Conv2D(128, (5, 5), strides=(2, 2), padding='same'))
    model.add(layers.LeakyReLU())
    model.add(layers.Dropout(0.3))

    # 16x16
    model.add(layers.Conv2D(128, (5, 5), strides=(2, 2), padding='same'))
    model.add(layers.LeakyReLU())
    model.add(layers.Dropout(0.3))

    # Flatten
    model.add(layers.Flatten())
    model.add(layers.Dense(1))  # No activation because we'll use from_logits in the loss function

    return model

In [None]:
generator_tuning = build_generator_tuning(z_dim)
discriminator_tuning = build_discriminator_tuning((256, 256, 3))

generator_tuning.summary()
discriminator_tuning.summary()

discriminator_tuning.compile(
    optimizer=Adam(learning_rate=disc_learning_rate, beta_1=beta_1),
    loss=BinaryCrossentropy(from_logits=True),
    metrics=['accuracy']
)

In [None]:
gan_tuning = build_gan(generator_tuning, discriminator_tuning, z_dim)

In [None]:
train_gan(gan_tuning, combined_dataset, batch_size, z_dim, smooth_factor, epochs=4000)
generate_images(generator_tuning, z_dim, num_images=10)
visualize_feature_maps(generator_tuning, z_dim, num_images=5)

# CycleGAN Architecture

Please note that the hyperparameters were moved into the cell below. The cells above reference them, which is why that code does not run from top to bottom. The code in this section is the important part. I did not run a grid search, as it is computationally expensive; instead I spent a few days tuning by trial and error.

In [None]:
# hyperparameters
z_dim = 600
disc_learning_rate = 0.002
gen_learning_rate = 0.002
beta_1 = 0.5 # 0.9 (or maybe 0.8) might give slower but more controlled convergence
batch_size = 32
smooth_factor = 0.03
lambda_cycle = 10 # Weight for cycle consistency loss

In [None]:
def build_generator_cyclegan():
    model = models.Sequential()

    # Initial convolution layer
    model.add(layers.Conv2D(64, (4, 4), strides=(2, 2), padding='same', input_shape=(256, 256, 3)))
    model.add(layers.LeakyReLU())
    model.add(InstanceNormalization())

    # Downsampling
    model.add(layers.Conv2D(128, (4, 4), strides=(2, 2), padding='same', kernel_initializer='he_normal'))
    model.add(layers.LeakyReLU())
    model.add(InstanceNormalization())

    model.add(layers.Conv2D(256, (4, 4), strides=(2, 2), padding='same', kernel_initializer='he_normal'))
    model.add(layers.LeakyReLU())
    model.add(InstanceNormalization())

    model.add(layers.Conv2D(512, (4, 4), strides=(2, 2), padding='same',kernel_initializer='he_normal'))
    model.add(layers.LeakyReLU())
    model.add(InstanceNormalization())

    model.add(layers.Conv2D(512, (4, 4), strides=(2, 2), padding='same',kernel_initializer='he_normal'))
    model.add(layers.LeakyReLU())
    model.add(InstanceNormalization())

    model.add(layers.Conv2D(512, (4, 4), strides=(2, 2), padding='same',kernel_initializer='he_normal'))
    model.add(layers.LeakyReLU())
    model.add(InstanceNormalization())

    model.add(layers.Conv2D(512, (4, 4), strides=(2, 2), padding='same', kernel_initializer='he_normal'))
    model.add(layers.LeakyReLU())
    model.add(InstanceNormalization())

    model.add(layers.Conv2D(512, (4, 4), strides=(2, 2), padding='same', kernel_initializer='he_normal'))
    model.add(layers.LeakyReLU())
    model.add(InstanceNormalization())

    # Upsampling
    model.add(layers.Conv2DTranspose(512, (4, 4), strides=(2, 2), padding='same', kernel_initializer='he_normal'))
    model.add(layers.LeakyReLU())
    model.add(InstanceNormalization())
    model.add(layers.Dropout(0.5))

    model.add(layers.Conv2DTranspose(512, (4, 4), strides=(2, 2), padding='same', kernel_initializer='he_normal'))
    model.add(layers.LeakyReLU())
    model.add(InstanceNormalization())
    model.add(layers.Dropout(0.5))

    model.add(layers.Conv2DTranspose(512, (4, 4), strides=(2, 2), padding='same', kernel_initializer='he_normal'))
    model.add(layers.LeakyReLU())
    model.add(InstanceNormalization())
    model.add(layers.Dropout(0.5))

    model.add(layers.Conv2DTranspose(512, (4, 4), strides=(2, 2), padding='same', kernel_initializer='he_normal'))
    model.add(layers.LeakyReLU())
    model.add(InstanceNormalization())

    model.add(layers.Conv2DTranspose(256, (4, 4), strides=(2, 2), padding='same', kernel_initializer='he_normal'))
    model.add(layers.LeakyReLU())
    model.add(InstanceNormalization())
    
    model.add(layers.Conv2DTranspose(128, (4, 4), strides=(2, 2), padding='same', kernel_initializer='he_normal'))
    model.add(layers.LeakyReLU())
    model.add(InstanceNormalization())

    model.add(layers.Conv2DTranspose(64, (4, 4), strides=(2, 2), padding='same', kernel_initializer='he_normal'))
    model.add(layers.LeakyReLU())
    model.add(InstanceNormalization())

    # Output layer
    model.add(layers.Conv2DTranspose(3, (4, 4), strides=(2, 2), padding='same', activation='tanh'))

    return model

def build_discriminator_cyclegan(image_shape):
    model = models.Sequential()
    model.add(layers.Conv2D(64, (5, 5), strides=(2, 2), padding='same', input_shape=image_shape))
    model.add(layers.LeakyReLU())
    model.add(layers.Dropout(0.3))

    # 128x128
    model.add(layers.Conv2D(128, (5, 5), strides=(2, 2), padding='same', kernel_initializer='he_normal'))
    model.add(layers.LeakyReLU())
    model.add(layers.Dropout(0.3))

    # 64x64
    model.add(layers.Conv2D(128, (5, 5), strides=(2, 2), padding='same', kernel_initializer='he_normal'))
    model.add(layers.LeakyReLU())
    model.add(layers.Dropout(0.3))

    # 32x32
    model.add(layers.Conv2D(128, (5, 5), strides=(2, 2), padding='same', kernel_initializer='he_normal'))
    model.add(layers.LeakyReLU())
    model.add(layers.Dropout(0.3))

    # 16x16
    model.add(layers.Conv2D(128, (5, 5), strides=(2, 2), padding='same', kernel_initializer='he_normal'))
    model.add(layers.LeakyReLU())
    model.add(layers.Dropout(0.3))

    # Flatten
    model.add(layers.Flatten())
    model.add(layers.Dense(1))  # No activation because we'll use from_logits in the loss function

    return model

def generate_images_cyclegan(generator, test_input):
    generated_images = generator(test_input, training=False)
    generated_images = (generated_images * 127.5) + 127.5
    generated_images = generated_images.numpy().astype(np.uint8)

    test_input = (test_input * 127.5) + 127.5
    test_input = test_input.numpy().astype(np.uint8)

    plt.figure(figsize=(10, 10))
    for i in range(generated_images.shape[0]):
        plt.subplot(2, generated_images.shape[0], i+1)
        plt.imshow(test_input[i])
        plt.title("Original" if i == 0 else "")  # Only add the title to the first image for clarity
        plt.axis('off')

        # AI Generated images
        plt.subplot(2, generated_images.shape[0], i + 1 + generated_images.shape[0])
        plt.imshow(generated_images[i])
        plt.title("AI Generated" if i == 0 else "")  # Only add the title to the first image for clarity
        plt.axis('off')

    plt.show()

def visualize_feature_maps_cyclegan(generator, test_input):
    # Collect the output of every (transposed) convolution layer
    conv_layers = [layer for layer in generator.layers if 'conv' in layer.name]
    layer_outputs = [layer.output for layer in conv_layers]
    activation_model = tf.keras.models.Model(inputs=generator.input, outputs=layer_outputs)

    feature_maps = activation_model.predict(test_input)

    for layer, feature_map in zip(conv_layers, feature_maps):
        size = feature_map.shape[1]
        n_features = feature_map.shape[-1]
        n_cols = n_features // 16
        if n_cols == 0:
            # Fewer than 16 channels (e.g. the 3-channel output layer): nothing to tile
            continue
        display_grid = np.zeros((size * n_cols, size * 16))

        for col in range(n_cols):
            for row in range(16):
                # Standardize each channel of the first sample, then map it to 0-255 for display
                channel_image = feature_map[0, :, :, col * 16 + row]
                channel_image -= channel_image.mean()
                channel_image /= (channel_image.std() + 1e-5)  # avoid dividing by zero on flat channels
                channel_image *= 64
                channel_image += 128
                channel_image = np.clip(channel_image, 0, 255).astype('uint8')
                display_grid[col * size: (col + 1) * size, row * size: (row + 1) * size] = channel_image

        scale = 20. / n_features
        plt.figure(figsize=(scale * 16, scale * n_cols))
        plt.title(layer.name)
        plt.grid(False)
        plt.imshow(display_grid, aspect='auto', cmap='viridis')

In [None]:
# Recreate the CycleGAN setup from my research: two generators and two discriminators
photo_to_monet_generator = build_generator_cyclegan()
monet_to_photo_generator = build_generator_cyclegan()
photo_discriminator = build_discriminator_cyclegan((256, 256, 3))
monet_discriminator = build_discriminator_cyclegan((256, 256, 3))

gen_lr_schedule = tf.keras.optimizers.schedules.ExponentialDecay(
    gen_learning_rate,
    decay_steps=5000,
    decay_rate=0.94,
    staircase=True)
disc_lr_schedule = tf.keras.optimizers.schedules.ExponentialDecay(
    disc_learning_rate,
    decay_steps=4000,
    decay_rate=0.85,
    staircase=True)

# Use the decay schedule defined above (it was created but never passed to the optimizer)
generator_optimizer = tf.keras.optimizers.legacy.Adam(learning_rate=gen_lr_schedule, beta_1=beta_1)
discriminator_optimizer = tf.keras.optimizers.legacy.Adam(learning_rate=disc_lr_schedule, beta_1=beta_1)

photo_discriminator.compile(optimizer=discriminator_optimizer, loss=losses.BinaryCrossentropy(from_logits=True))
monet_discriminator.compile(optimizer=discriminator_optimizer, loss=losses.BinaryCrossentropy(from_logits=True))

#generator_optimizer.build(photo_to_monet_generator.trainable_variables)
#generator_optimizer.build(monet_to_photo_generator.trainable_variables)
#discriminator_optimizer.build(photo_discriminator.trainable_variables)
#discriminator_optimizer.build(monet_discriminator.trainable_variables)
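
For reference, a staircase `ExponentialDecay` schedule computes `initial_rate * decay_rate ** floor(step / decay_steps)`. The plain-Python sketch below reproduces that formula for the discriminator settings above. `staircase_decay` is a hypothetical helper, and the step counts are only examples.

```python
def staircase_decay(initial_rate, decay_steps, decay_rate, step):
    """Learning rate after `step` optimizer updates with staircase decay."""
    return initial_rate * decay_rate ** (step // decay_steps)

# Discriminator schedule from the cell above: 0.002, 4000 steps, 0.85
for step in (0, 3999, 4000, 8000):
    print(step, staircase_decay(0.002, 4000, 0.85, step))
```

Because the rate only changes every `decay_steps` optimizer updates, a short run may never reach the first decay. For example, assuming about 300 Monet images at a batch size of 32 (9 full batches per epoch), 25 epochs is only a few hundred updates, well short of 4000.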

In [None]:
def cycle_consistency_loss(real_images, generated_images, lambda_cycle=lambda_cycle):
    return lambda_cycle * tf.reduce_mean(tf.abs(real_images - generated_images))

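def generator_loss(disc_generated_output):
    # The generator wants its fakes classified as real (label 1).
    # Reduce to a scalar mean so the scale matches discriminator_loss and the logged sums stay scalars.
    per_sample_loss = tf.keras.losses.BinaryCrossentropy(from_logits=True, reduction=tf.keras.losses.Reduction.NONE)(tf.ones_like(disc_generated_output), disc_generated_output)
    return tf.reduce_mean(per_sample_loss)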

def discriminator_loss(disc_real_output, disc_generated_output):
    real_loss = tf.keras.losses.BinaryCrossentropy(from_logits=True, reduction=tf.keras.losses.Reduction.NONE)(tf.ones_like(disc_real_output), disc_real_output)
    generated_loss = tf.keras.losses.BinaryCrossentropy(from_logits=True, reduction=tf.keras.losses.Reduction.NONE)(tf.zeros_like(disc_generated_output), disc_generated_output)
    total_disc_loss = (real_loss + generated_loss) * 0.5
    return tf.reduce_mean(total_disc_loss)

def identity_loss(real_image, same_image, lambda_identity=lambda_cycle/2):
    return lambda_identity * tf.reduce_mean(tf.abs(real_image - same_image))

def train_cyclegan(photo_dataset, monet_dataset, epochs=1):
    # Keep five fixed samples from the first batch of each domain for the periodic previews
    sample_photos = next(iter(photo_dataset))[:5]
    sample_monets = next(iter(monet_dataset))[:5]
    for epoch in range(epochs):
        photo_gen_loss_sum = monet_gen_loss_sum = photo_disc_loss_sum = monet_disc_loss_sum = cycle_loss_sum = 0
        num_batches = 0

        for photo_images, monet_images in zip(photo_dataset, monet_dataset):
            num_batches += 1
            with tf.GradientTape(persistent=True) as tape:
                # Generate images
                fake_monet = photo_to_monet_generator(photo_images, training=True)
                cycled_photo = monet_to_photo_generator(fake_monet, training=True)

                fake_photo = monet_to_photo_generator(monet_images, training=True)
                cycled_monet = photo_to_monet_generator(fake_photo, training=True)

                # Identity mapping: feed each generator an image that is already in its target domain
                same_monet = photo_to_monet_generator(monet_images, training=True)
                same_photo = monet_to_photo_generator(photo_images, training=True)

                # Discriminator output
                disc_real_photo = photo_discriminator(photo_images, training=True)
                disc_fake_photo = photo_discriminator(fake_photo, training=True)

                disc_real_monet = monet_discriminator(monet_images, training=True)
                disc_fake_monet = monet_discriminator(fake_monet, training=True)

                # Calculate losses
                photo_gen_loss = generator_loss(disc_fake_monet)
                monet_gen_loss = generator_loss(disc_fake_photo)
                photo_disc_loss = discriminator_loss(disc_real_photo, disc_fake_photo)
                monet_disc_loss = discriminator_loss(disc_real_monet, disc_fake_monet)
                total_cycle_loss = cycle_consistency_loss(photo_images, cycled_photo) + cycle_consistency_loss(monet_images, cycled_monet)

                # Calculate identity loss (a generator fed an image from its target domain should leave it unchanged)
                photo_identity_loss = identity_loss(photo_images, same_photo, lambda_identity=lambda_cycle/2)
                monet_identity_loss = identity_loss(monet_images, same_monet, lambda_identity=lambda_cycle/2)

                # Total generator loss with identity loss.
                # same_monet comes from photo_to_monet_generator and same_photo from monet_to_photo_generator,
                # so each generator is paired with the identity loss that actually depends on its weights.
                total_photo_gen_loss = photo_gen_loss + total_cycle_loss + monet_identity_loss
                total_monet_gen_loss = monet_gen_loss + total_cycle_loss + photo_identity_loss

                # Accumulate losses for logging
                photo_gen_loss_sum += photo_gen_loss.numpy()
                monet_gen_loss_sum += monet_gen_loss.numpy()
                photo_disc_loss_sum += photo_disc_loss.numpy()
                monet_disc_loss_sum += monet_disc_loss.numpy()
                cycle_loss_sum += total_cycle_loss.numpy()

            # Calculate the gradients and apply them
            photo_generator_gradients = tape.gradient(total_photo_gen_loss, photo_to_monet_generator.trainable_variables)
            monet_generator_gradients = tape.gradient(total_monet_gen_loss, monet_to_photo_generator.trainable_variables)
            photo_discriminator_gradients = tape.gradient(photo_disc_loss, photo_discriminator.trainable_variables)
            monet_discriminator_gradients = tape.gradient(monet_disc_loss, monet_discriminator.trainable_variables)

            generator_optimizer.apply_gradients(zip(photo_generator_gradients, photo_to_monet_generator.trainable_variables))
            generator_optimizer.apply_gradients(zip(monet_generator_gradients, monet_to_photo_generator.trainable_variables))
            discriminator_optimizer.apply_gradients(zip(photo_discriminator_gradients, photo_discriminator.trainable_variables))
            discriminator_optimizer.apply_gradients(zip(monet_discriminator_gradients, monet_discriminator.trainable_variables))

            # A persistent tape holds its resources until it is released explicitly
            del tape

        if (epoch + 1) % 10 == 0:
            print("Generated images at epoch", epoch + 1)
            generate_images_cyclegan(photo_to_monet_generator, sample_photos)
            generate_images_cyclegan(monet_to_photo_generator, sample_monets)

            
        # Print the average losses
        print(f'Epoch {epoch + 1}/{epochs}')
        print(f'    Photo Generator Loss: {photo_gen_loss_sum / num_batches}')
        print(f'    Monet Generator Loss: {monet_gen_loss_sum / num_batches}')
        print(f'    Photo Discriminator Loss: {photo_disc_loss_sum / num_batches}')
        print(f'    Monet Discriminator Loss: {monet_disc_loss_sum / num_batches}')
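        # The identity losses are not accumulated above, so report the values from the last batch
        print(f'    Photo Identity Loss (last batch): {photo_identity_loss.numpy()}')
        print(f'    Monet Identity Loss (last batch): {monet_identity_loss.numpy()}')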
        print(f'    Cycle Consistency Loss: {cycle_loss_sum / num_batches}')

In [None]:
monet_dataset_cyclegan = tf.data.Dataset.from_tensor_slices(monet_image_paths)
monet_dataset_cyclegan_augmented = monet_dataset_cyclegan.flat_map(prepare_and_augment)
monet_dataset_cyclegan_augmented = monet_dataset_cyclegan_augmented.shuffle(buffer_size=1024).batch(batch_size, drop_remainder=True)

photo_dataset_cyclegan = tf.data.Dataset.from_tensor_slices(photo_image_paths)
photo_dataset_cyclegan_augmented = photo_dataset_cyclegan.flat_map(prepare_and_augment)
photo_dataset_cyclegan_augmented = photo_dataset_cyclegan_augmented.shuffle(buffer_size=1024).batch(batch_size, drop_remainder=True)

In [None]:
train_cyclegan(photo_dataset_cyclegan_augmented, monet_dataset_cyclegan_augmented, epochs=25)

In [None]:
os.makedirs("./final_images", exist_ok=True)  # saving fails if the folder is missing

plt.figure(figsize=(20, 20))

# Translate the first image of each of the first four batches, then display and save it
for i, img in enumerate(photo_dataset_cyclegan_augmented.take(4), start=1):
    prediction = photo_to_monet_generator(img, training=False)[0].numpy()
    prediction = (prediction * 127.5 + 127.5).astype(np.uint8)

    plt.subplot(1, 4, i)
    plt.imshow(prediction)
    plt.axis('off')

    im = Image.fromarray(prediction)
    im.save("./final_images/" + str(i) + ".jpg")

plt.show()

# Result

Unfortunately, my GAN did not get to the point of producing good images. The cycle consistency loss is always above 10; because it is weighted by lambda_cycle = 10, the unweighted loss (summed over both translation directions) is rarely below 1. It is as if the losses are not being applied to the gradients properly, but I have double-checked the training loop multiple times. I used the Keras CycleGAN example to help tune and debug the model, yet my model does not come close to its performance: the example produces a proper picture after 5 epochs, while mine trained for 5000 epochs (10 hours overnight) and produced static.
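
To put that number in perspective: images are scaled to [-1, 1], so the per-pixel absolute error can be at most 2, and each cycle term is the mean absolute error times `lambda_cycle`. Here is a plain-Python sketch of the arithmetic, using flat lists in place of image tensors. The sample values are made up, and `weighted_l1` is a hypothetical stand-in for `cycle_consistency_loss`.

```python
def weighted_l1(real, reconstructed, weight=10.0):
    """Mean absolute error between two equal-length sequences, times weight."""
    total = sum(abs(r - c) for r, c in zip(real, reconstructed))
    return weight * total / len(real)

real = [-1.0, -0.5, 0.0, 0.5, 1.0]

perfect = weighted_l1(real, real)                  # perfect reconstruction
mid_gray = weighted_l1(real, [0.0] * len(real))    # constant mid-gray output
inverted = weighted_l1(real, [-v for v in real])   # fully inverted output

print(perfect, mid_gray, inverted)
```

Under these assumptions, a logged value above 10 for the two directions combined means each reconstruction is off by about 0.5 per pixel on average, a quarter of the full range, which is far from a faithful round trip.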

# Conclusion & Discussion

I tried dozens of hyperparameter permutations and tuned for 2 days. At one point the generator was producing images, but it still suffered from mode collapse: it had learned to draw one specific picture. In an effort to fix that, I switched to recreating CycleGAN, using the architecture I found in the literature. I do a few things differently that may affect the model, such as not initializing my neural network layers and instance normalizations optimally. I also had a much smaller dataset, only 300 Monets. I think that could have been a big factor, so I implemented augmentation to increase the data diversity. Overall, this was very frustrating due to constant mode collapse, but I learned a lot!

### EDIT: I'm dumb! In my data loading I was only using 3 samples! Crazy! I retrained for 15 epochs and each epoch took much longer, which makes sense.
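
A cheap guard against this kind of mistake is to assert on the number of files before training. The sketch below is one way to do it; `check_dataset_size` and the minimum count are assumptions for illustration, not part of the original notebook.

```python
def check_dataset_size(image_paths, name, minimum):
    """Raise early if a dataset has suspiciously few images."""
    count = len(image_paths)
    if count < minimum:
        raise ValueError(
            f"{name}: found {count} images, expected at least {minimum}")
    return count

# Stand-in path lists; in the notebook these would be
# monet_image_paths and photo_image_paths.
full_list = [f"./monet_jpg/{i}.jpg" for i in range(300)]
sampled_list = full_list[:3]

print(check_dataset_size(full_list, "monet", minimum=100))
try:
    check_dataset_size(sampled_list, "monet", minimum=100)
except ValueError as error:
    print(error)
```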

# Resources

- https://developers.google.com/machine-learning/gan/applications
- https://openaccess.thecvf.com/content_ICCV_2017/papers/Zhu_Unpaired_Image-To-Image_Translation_ICCV_2017_paper.pdf
- https://www.kaggle.com/code/amyjang/monet-cyclegan-tutorial#Visualize-our-Monet-esque-photos