In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
import os
import pathlib
import time
import datetime
import tensorflow as tf
from tensorflow.keras import layers, models
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.losses import BinaryCrossentropy
from matplotlib import pyplot as plt
from IPython import display
import PIL

# Passed as `num_parallel_calls` to `Dataset.map` and as the buffer size to
# `Dataset.prefetch` in the tf.data pipelines further down.
AUTOTUNE = tf.data.experimental.AUTOTUNE
# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory
def list_files(root):
    """Return the paths of all files below ``root`` (recursively), sorted.

    ``os.walk`` yields entries in an arbitrary, filesystem-dependent order, so
    the result is sorted to make the file lists reproducible between runs.
    A ``root`` that does not exist yields an empty list.
    """
    return sorted(
        os.path.join(dirname, filename)
        for dirname, _, filenames in os.walk(root)
        for filename in filenames
    )


# NOTE: despite their names these are the two image domains, not a train/test
# split: TRAIN_FILES holds the Monet TFRecords, TEST_FILES the photo TFRecords.
TRAIN_FILES = list_files('../input/gan-getting-started/monet_tfrec')
TEST_FILES = list_files('../input/gan-getting-started/photo_tfrec')

TRAIN_FILES

1. **Data Understanding and Augmentation:**
* Goal: Enhance the dataset to create more robust training data.
* Action: Use the jittering, cropping, and normalization steps you've already implemented. Consider additional augmentation methods like rotation, brightness adjustment, and contrast changes to further diversify the dataset.
2. **Model Selection:**
* Goal: Choose appropriate deep learning architectures that align with your task (e.g., image translation, classification).
* Options:
    * Generative Adversarial Networks (GANs) for style transfer if you're working with Monet paintings vs. photos.
    * Convolutional Neural Networks (CNNs) for classification tasks.
    * Transfer Learning using pre-trained models like VGG, ResNet, or Inception if the dataset size is limited.
* Action: Start with simpler models and incrementally move to more complex ones as needed.
3. **Training the Model:**
* Goal: Optimize model training by tuning hyperparameters such as learning rate, batch size, and number of epochs.
* Action:
    * Split the dataset into training, validation, and test sets.
    * Apply the pre-processing pipeline to all datasets to ensure consistency.
    * Use callbacks like early stopping to prevent overfitting.
4. **Evaluation Metrics:**
* Goal: Measure the performance of your model effectively.
* Metrics:
    * For image generation: Inception Score, Fréchet Inception Distance (FID).
    * For classification: Accuracy, Precision, Recall, F1-score.
    * Visual evaluation: Compare generated vs. real images side-by-side.
* Action: Implement metric-specific evaluation functions and generate visual comparisons to assess qualitative performance.
5. **Error Analysis:**
* Goal: Identify areas where the model performs poorly and understand why.
* Action:
    * Review misclassified images or poorly generated samples.
    * Check for biases in training data or misalignments during pre-processing.
    * Use techniques like Grad-CAM for CNNs to interpret what the model is focusing on.
6. **Model Fine-tuning and Optimization:**
* Goal: Refine the model for better performance.
* Action:
    * Perform hyperparameter tuning using grid search or random search.
    * Experiment with different layers or architectures.
    * Consider ensemble techniques if using classification models.

# Step 1: Data Understanding, Introduction and Augmentation

**Problem Description:**<br>
The "I'm Something of a Painter Myself" Kaggle challenge focuses on generating Monet-style artworks using generative deep learning models, specifically GANs (Generative Adversarial Networks). The main goal is to train a model that can transform regular photos into paintings that mimic Monet's distinctive style. This task combines artistic style transfer with deep learning, pushing participants to explore advanced generative modeling techniques.

**Generative Deep Learning Models:**<br>
Generative deep learning models, such as GANs, consist of two main components: a generator and a discriminator. The generator creates new data instances (in this case, Monet-style images), while the discriminator evaluates them against real Monet paintings to distinguish between authentic and generated artworks. The iterative competition between these two models helps improve the quality of the generated images.

**Dataset Overview:**<br>
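The dataset contains four directories (`monet_jpg`, `monet_tfrec`, `photo_jpg`, `photo_tfrec`) covering two image collections, each provided in two formats: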
* Monet Images: Includes 300 Monet paintings sized 256x256 pixels available in both JPEG and TFRecord formats.
* Photo Images: Contains 7,028 photos sized 256x256 pixels, also provided in JPEG and TFRecord formats.

In [None]:
# Define function to parse TFRecord images
def parse_tfrecord_fn(example):
    """Turn one serialized TFRecord example into a 256x256, 3-channel image tensor.

    The record is parsed with a schema containing the scalar string features
    ``image`` and ``image_name``; the ``image`` bytes are JPEG-decoded and the
    result is resized to 256x256. Only the image is returned.
    """
    schema = {
        key: tf.io.FixedLenFeature([], tf.string)
        for key in ('image', 'image_name')
    }
    parsed = tf.io.parse_single_example(example, schema)
    decoded = tf.image.decode_jpeg(parsed['image'], channels=3)
    return tf.image.resize(decoded, [256, 256])

# Load the Monet and photo datasets
def load_dataset(filenames):
    """Build a dataset of decoded images from one or more TFRecord files.

    Args:
        filenames: path, or list of paths, of the TFRecord files to read.

    Returns:
        A ``tf.data.Dataset`` yielding one image tensor per record, as
        produced by ``parse_tfrecord_fn``.
    """
    options = tf.data.Options()
    # Element order is irrelevant here, so allow tf.data to emit elements as
    # soon as they are ready. `deterministic` is the supported replacement for
    # the deprecated `experimental_deterministic` option.
    options.deterministic = False
    dataset = tf.data.TFRecordDataset(filenames)
    dataset = dataset.with_options(options)
    return dataset.map(parse_tfrecord_fn, num_parallel_calls=AUTOTUNE)

# TRAIN_FILES lists the Monet TFRecords and TEST_FILES the photo TFRecords
# (see where both lists are built at the top of the notebook).
monet_ds = load_dataset(TRAIN_FILES)
photo_ds = load_dataset(TEST_FILES)

# Function to display images from a dataset
def display_samples(dataset, n_samples):
    """Plot the first ``n_samples`` images of ``dataset`` in a single row.

    Each element is converted with ``.numpy().astype("uint8")`` before being
    drawn, and the axes are hidden.
    """
    plt.figure(figsize=(12, 12))
    column = 0
    for image in dataset.take(n_samples):
        column += 1
        plt.subplot(1, n_samples, column)
        pixels = image.numpy().astype("uint8")
        plt.imshow(pixels)
        plt.axis("off")
    plt.show()

# Display some Monet paintings
display_samples(monet_ds, 5)

# Display some photos
display_samples(photo_ds, 5)

In [None]:
def compute_image_stats(dataset):
    """Return the mean and (population) standard deviation of all pixel values.

    The statistics are accumulated with running sums, so memory use stays at
    one image at a time instead of materialising every pixel of the dataset in
    a single tensor.

    Args:
        dataset: iterable of images; each element is either array-like or an
            object exposing ``.numpy()`` (e.g. an eager tensor).

    Returns:
        ``(mean, stddev)`` as Python floats, computed over every value of
        every image.

    Raises:
        ValueError: if ``dataset`` yields no pixels at all.
    """
    count = 0
    total = 0.0
    total_sq = 0.0
    for image in dataset:
        raw = image.numpy() if hasattr(image, "numpy") else image
        # float64 accumulators keep the sums exact enough for billions of values
        pixels = np.asarray(raw, dtype=np.float64).ravel()
        count += pixels.size
        total += float(pixels.sum())
        total_sq += float(np.square(pixels).sum())

    if count == 0:
        raise ValueError("compute_image_stats() received an empty dataset")

    mean = total / count
    # E[x^2] - E[x]^2; clamp tiny negative values caused by rounding
    variance = max(total_sq / count - mean * mean, 0.0)
    return mean, float(np.sqrt(variance))

# Compute stats for Monet paintings
# (monet_ds yields images as returned by parse_tfrecord_fn, i.e. before the
# [-1, 1] normalization used for training is applied)
monet_mean, monet_stddev = compute_image_stats(monet_ds)
print(f"Monet Paintings - Mean pixel value: {monet_mean}, StdDev: {monet_stddev}")

# Compute stats for photos (iterates over the whole photo dataset)
photo_mean, photo_stddev = compute_image_stats(photo_ds)
print(f"Photos - Mean pixel value: {photo_mean}, StdDev: {photo_stddev}")

# Preprocessing

Setting up the datasets

In [None]:
# Datasets of still-serialized records, one per image domain.
raw_monet_ds = tf.data.TFRecordDataset(TRAIN_FILES)
# NOTE: despite its name this dataset reads TEST_FILES, i.e. the photo TFRecords.
raw_painting_ds = tf.data.TFRecordDataset(TEST_FILES)


In [None]:
# Inspect one raw Monet record: map every feature key to the dtype of its values.
# `result` is created up front and the feature loop is nested inside the record
# loop, so the cell no longer relies on names leaking out of the loop body.
result = {}
for raw_record in raw_monet_ds.take(1):
    example = tf.train.Example()
    example.ParseFromString(raw_record.numpy())

    # example.features.feature is the dictionary
    for key, feature in example.features.feature.items():
        # The values are the Feature objects which contain a `kind` which contains:
        # one of three fields: bytes_list, float_list, int64_list
        kind = feature.WhichOneof('kind')
        result[key] = np.array(getattr(feature, kind).value).dtype

# Show the discovered schema as the cell output
result

In [None]:
# Create a description of the features: three scalar string features,
# each falling back to '' when absent from a record.
feature_description = {
    name: tf.io.FixedLenFeature([], tf.string, default_value='')
    for name in ('target', 'image_name', 'image')
}

def _parse_image(example_proto):
    """Parse one serialized ``tf.train.Example`` into a dict of feature tensors.

    The module-level ``feature_description`` is used as the schema.
    """
    parsed = tf.io.parse_single_example(example_proto, feature_description)
    return parsed

# Parsed (not yet JPEG-decoded) records for both raw datasets
monet_train_ds = raw_monet_ds.map(_parse_image)
painting_test_ds = raw_painting_ds.map(_parse_image)

In [None]:
# Each image is 256x256 in size
IMAGE_SIZE = 256
# Number of colour channels requested when decoding the JPEGs
CHANNELS = 3
# Number of epochs passed to the final `train(...)` call
EPOCHS = 10

def random_crop(image):
    """Return a random IMAGE_SIZE x IMAGE_SIZE x CHANNELS crop of ``image``."""
    target_shape = [IMAGE_SIZE, IMAGE_SIZE, CHANNELS]
    return tf.image.random_crop(image, size=target_shape)

# normalizing the images to [-1, 1]
def normalize(image):
    """Cast ``image`` to float32 and rescale it with ``x / 127.5 - 1``.

    For inputs in [0, 255] the result lies in [-1, 1].
    """
    as_float = tf.cast(image, tf.float32)
    return (as_float / 127.5) - 1

def random_jitter(image):
    """Training-time augmentation: enlarge, crop at random, mirror at random.

    The image is resized to 286x286 with nearest-neighbour interpolation,
    cropped back by ``random_crop`` and finally flipped left-right at random.
    """
    enlarged = tf.image.resize(
        image, [286, 286], method=tf.image.ResizeMethod.NEAREST_NEIGHBOR
    )
    cropped = random_crop(enlarged)
    return tf.image.random_flip_left_right(cropped)


In [None]:
# Shuffle buffer size; 300 matches the number of Monet paintings in the dataset
BUFFER_SIZE = 300
# Number of images per batch in both pipelines
BATCH_SIZE = 30

def preprocess_image_train(batch):
    """Decode, augment and normalize the ``'image'`` entry of a parsed record.

    ``batch`` is a mapping whose ``'image'`` value holds JPEG-encoded bytes.
    The decoded image goes through ``random_jitter`` and then ``normalize``.
    """
    decoded = tf.io.decode_jpeg(batch['image'], channels=CHANNELS)
    augmented = random_jitter(decoded)
    return normalize(augmented)

def preprocess_image_test(batch):
    """Decode and normalize the ``'image'`` entry of a parsed record.

    ``batch`` is a mapping whose ``'image'`` value holds JPEG-encoded bytes.
    Unlike the training variant, no random augmentation is applied.
    """
    decoded = tf.io.decode_jpeg(batch['image'], channels=CHANNELS)
    return normalize(decoded)

def create_dataset_train(dataset):
    """Build the augmented, shuffled and batched training pipeline.

    Args:
        dataset: dataset of parsed records accepted by ``preprocess_image_train``.

    Returns:
        A prefetching dataset of image batches of size ``BATCH_SIZE``.
    """
    # Cache the parsed records *before* the random augmentation. Caching after
    # `preprocess_image_train` would store the first epoch's random crops and
    # flips and replay exactly the same images in every later epoch.
    ds = dataset.cache()
    ds = ds.map(preprocess_image_train, num_parallel_calls=AUTOTUNE)
    ds = ds.shuffle(BUFFER_SIZE).batch(BATCH_SIZE)
    return ds.prefetch(AUTOTUNE)

def create_dataset_test(dataset):
    """Build the shuffled and batched pipeline without augmentation.

    Args:
        dataset: dataset of parsed records accepted by ``preprocess_image_test``.

    Returns:
        A prefetching dataset of image batches of size ``BATCH_SIZE``.
    """
    # Cache the parsed (still JPEG-encoded) records rather than the decoded
    # float32 images: a decoded IMAGE_SIZE x IMAGE_SIZE x CHANNELS float32
    # image takes 256 * 256 * 3 * 4 bytes (~0.75 MiB), which adds up to several
    # GiB when thousands of images are cached.
    ds = dataset.cache()
    ds = ds.map(preprocess_image_test, num_parallel_calls=AUTOTUNE)
    ds = ds.shuffle(BUFFER_SIZE).batch(BATCH_SIZE)
    return ds.prefetch(AUTOTUNE)

# Batched pipelines for the two image domains.
# NOTE: `train_ds` and `test_ds` are not a train/test split; both are passed
# to `train(...)` below.
train_ds = create_dataset_train(monet_train_ds)
test_ds = create_dataset_test(painting_test_ds)

# Step 2: Model Selection
For model selection in the context of our current project, we need to choose models that are well-suited for handling images, especially considering the extensive augmentation and preprocessing pipeline you've developed. Here are the key considerations and potential models that align with your task:

1. Convolutional Neural Networks (CNNs)
* Why Use CNNs? CNNs are highly effective for image classification and generation tasks due to their ability to capture spatial hierarchies and patterns within images.
* Popular Architectures:
    * ResNet (Residual Networks): Known for handling deep models with residual connections, making them less prone to vanishing gradients.
    * VGGNet: Provides simplicity and effectiveness with stacked convolutional layers, though it has a higher number of parameters.
    * Inception (GoogLeNet): Efficient thanks to 1x1 bottleneck convolutions and parallel branches with multiple filter sizes that capture features at varying scales simultaneously.
    * MobileNet: Lightweight architecture suitable for tasks requiring efficiency, especially useful if training on limited computational resources.
2. Transfer Learning Models
* Why Use Transfer Learning? <br>
    * Leveraging pre-trained models, such as those trained on large datasets like ImageNet, can significantly speed up training and improve performance with limited data. You can fine-tune these models with your data to enhance their capability for specific tasks.
* Models to Consider:
    * ResNet50: Effective for image classification, suitable for transfer learning with relatively few parameters compared to other deep networks.
    * EfficientNet: Provides a good trade-off between model size and accuracy, scaling width, depth, and resolution systematically.
    * DenseNet: Uses dense connections between layers to improve feature reuse, leading to better parameter efficiency.
3. Generative Adversarial Networks (GANs)
* Why Use GANs? 
    * If your task involves image generation or style transfer, GANs can be incredibly powerful. They work by training two neural networks (generator and discriminator) in a game-theoretic setting, leading to realistic image outputs.
* Models to Consider:
    * Pix2Pix: For paired image-to-image translation tasks, where input-output pairs are available.
    * CycleGAN: For unpaired image-to-image translation, allowing transformation between two domains without paired examples.
4. Vision Transformers (ViTs)
* Why Use ViTs? 
    * Vision Transformers have recently gained popularity due to their ability to capture long-range dependencies and are state-of-the-art in many image classification tasks. They split images into patches and process them similarly to how transformers handle sequences in NLP.
* Models to Consider:
    * ViT-Base: The base version of the Vision Transformer, good for classification tasks.
    * DeiT (Data-efficient Image Transformers): Designed to be more efficient and effective when trained with less data compared to traditional ViTs.
5. Model Evaluation Metrics
* Accuracy: Basic metric for classification tasks, though not always the best if class imbalances exist.
* Precision, Recall, and F1-score: Useful for tasks where the balance between false positives and false negatives matters.
* Mean Absolute Error (MAE) and Mean Squared Error (MSE): Common for regression-based evaluations.
* Inception Score (IS) and Fréchet Inception Distance (FID): Specific to evaluating the quality of generated images from GANs.

**CycleGAN**

Building a CycleGAN involves implementing two kinds of components, a generator and a discriminator, for each domain (e.g., Monet paintings and photos). The key difference from other GANs is its ability to translate images between two domains without paired data, using two sets of generators and discriminators.

In [None]:
# Build the downsampler (encoder)
# Structure: Convolution -> Instance normalization -> Leaky ReLU
def downsample(filters, size, apply_norm=True):
    """Return a Sequential block built around a stride-2 convolution.

    Args:
        filters: number of convolution filters.
        size: convolution kernel size.
        apply_norm: when true, insert ``GroupNormalization(groups=-1)`` (used
            here as the instance normalization) after the convolution.
    """
    block_layers = [
        layers.Conv2D(filters, size, strides=2, padding='same',
                      kernel_initializer=tf.random_normal_initializer(0., 0.02),
                      use_bias=False)
    ]
    if apply_norm:
        block_layers.append(layers.GroupNormalization(groups=-1))
    block_layers.append(layers.LeakyReLU())

    block = models.Sequential()
    for layer in block_layers:
        block.add(layer)
    return block

In [None]:
# Build the upsampler (decoder)
# Structure: Transposed convolution -> Instance normalization -> Dropout (applied to the first 3 blocks) -> ReLU
def upsample(filters, size, apply_dropout=False):
    """Return a Sequential block built around a stride-2 transposed convolution.

    Args:
        filters: number of transposed-convolution filters.
        size: kernel size.
        apply_dropout: when true, add ``Dropout(0.5)`` between the
            normalization and the ReLU.
    """
    block_layers = [
        layers.Conv2DTranspose(filters, size, strides=2, padding='same',
                               kernel_initializer=tf.random_normal_initializer(0., 0.02),
                               use_bias=False),
        layers.GroupNormalization(groups=-1),
    ]
    if apply_dropout:
        block_layers.append(layers.Dropout(0.5))
    block_layers.append(layers.ReLU())

    block = models.Sequential()
    for layer in block_layers:
        block.add(layer)
    return block

In [None]:
# Build the generator with the encoder and decoder
def make_generator():
    """Build the encoder/decoder generator with skip connections.

    The input has shape ``[IMAGE_SIZE, IMAGE_SIZE, CHANNELS]``. Eight stride-2
    ``downsample`` blocks form the encoder, seven ``upsample`` blocks plus a
    final stride-2 transposed convolution form the decoder. After every
    ``upsample`` block the output is concatenated with the matching encoder
    output (the innermost encoder output is not used as a skip).

    Returns:
        A Keras ``Model`` whose output has ``CHANNELS`` channels and a ``tanh``
        activation.
    """
    inputs = layers.Input(shape=[IMAGE_SIZE, IMAGE_SIZE, CHANNELS])

    down_stack = [
        downsample(64, 4, apply_norm=False),
        downsample(128, 4),
        downsample(256, 4),
        downsample(512, 4),
        downsample(512, 4),
        downsample(512, 4),
        downsample(512, 4),
        downsample(512, 4),
    ]

    up_stack = [
        upsample(512, 4, apply_dropout=True),
        upsample(512, 4, apply_dropout=True),
        upsample(512, 4, apply_dropout=True),
        upsample(512, 4),
        upsample(256, 4),
        upsample(128, 4),
        upsample(64, 4),
    ]

    initializer = tf.random_normal_initializer(0., 0.02)
    # Emit as many channels as the input has (CHANNELS) instead of a literal 3,
    # so input and output stay compatible if CHANNELS is ever changed.
    last = layers.Conv2DTranspose(CHANNELS, 4, strides=2, padding='same',
                                  kernel_initializer=initializer, activation='tanh')

    # Encoder: keep every intermediate output for the skip connections
    x = inputs
    skips = []
    for down in down_stack:
        x = down(x)
        skips.append(x)

    # Decoder: drop the innermost output (it is the decoder's input) and pair
    # the remaining ones, innermost first, with the upsampling blocks
    skips = reversed(skips[:-1])
    for up, skip in zip(up_stack, skips):
        x = up(x)
        x = layers.Concatenate()([x, skip])

    x = last(x)
    return models.Model(inputs=inputs, outputs=x)

In [None]:
# Build the discriminator based on a CNN-based image classifier:
def make_discriminator():
    """Build the convolutional discriminator.

    Three ``downsample`` blocks are followed by two zero-padded stride-1
    convolutions. The last convolution has a single filter and no activation,
    so the model returns a map of raw scores (logits) rather than one
    probability per image.

    Returns:
        A Keras ``Model`` taking ``[IMAGE_SIZE, IMAGE_SIZE, CHANNELS]`` images.
    """
    initializer = tf.random_normal_initializer(0., 0.02)
    # Use CHANNELS (as the generator input does) instead of a literal 3
    inputs = layers.Input(shape=[IMAGE_SIZE, IMAGE_SIZE, CHANNELS], name='input_image')

    x = inputs
    x = downsample(64, 4, False)(x)
    x = downsample(128, 4)(x)
    x = downsample(256, 4)(x)
    x = layers.ZeroPadding2D()(x)
    x = layers.Conv2D(512, 4, strides=1, kernel_initializer=initializer, use_bias=False)(x)
    x = layers.GroupNormalization(groups=-1)(x)
    x = layers.LeakyReLU()(x)
    x = layers.ZeroPadding2D()(x)
    x = layers.Conv2D(1, 4, strides=1, kernel_initializer=initializer)(x)
    return models.Model(inputs=inputs, outputs=x)


In [None]:
# Create the required generators and discriminators:
# two independent generator instances and two independent discriminator instances
monet_generator = make_generator()
photo_generator = make_generator()
monet_discriminator = make_discriminator()
photo_discriminator = make_discriminator()

In [None]:
# Build the loss functions
# Shared binary cross-entropy. `from_logits=True` because the discriminators'
# final convolution has no activation, i.e. they output raw scores.
cross_entropy = tf.keras.losses.BinaryCrossentropy(from_logits=True)

def discriminator_loss(real_output, fake_output):
    """Half the sum of the cross-entropy on real and on fake discriminator outputs.

    Real outputs are compared against all-ones targets, fake outputs against
    all-zeros targets.
    """
    loss_on_real = cross_entropy(tf.ones_like(real_output), real_output)
    loss_on_fake = cross_entropy(tf.zeros_like(fake_output), fake_output)
    return (loss_on_real + loss_on_fake) * 0.5

def generator_loss(fake_output):
    """Cross-entropy between the discriminator output for fakes and all-ones targets."""
    wanted = tf.ones_like(fake_output)
    return cross_entropy(wanted, fake_output)

# Weight of the cycle-consistency loss; the identity loss uses half of it
LAMBDA = 10

def compute_cycle_loss(real_img, cycled_img):
    """Mean absolute difference between ``real_img`` and ``cycled_img``, scaled by LAMBDA."""
    mean_abs_diff = tf.reduce_mean(tf.abs(real_img - cycled_img))
    return LAMBDA * mean_abs_diff

def identity_loss(real_img, same_img):
    """Mean absolute difference between ``real_img`` and ``same_img``, scaled by LAMBDA / 2."""
    mean_abs_diff = tf.reduce_mean(tf.abs(real_img - same_img))
    return LAMBDA * 0.5 * mean_abs_diff

In [None]:
# Initialize the optimizers for all the generators and the discriminators.
# Each model gets its own Adam instance (learning rate 2e-4, beta_1 = 0.5),
# so optimizer state is never shared between models.
monet_generator_optimizer = Adam(2e-4, beta_1=0.5)
photo_generator_optimizer = Adam(2e-4, beta_1=0.5)
monet_discriminator_optimizer = Adam(2e-4, beta_1=0.5)
photo_discriminator_optimizer = Adam(2e-4, beta_1=0.5)

In [None]:
# Creating a checkpoint that tracks all four models and their optimizers
CHECKPOINT_DIR = '../working/checkpoints/train'

ckpt = tf.train.Checkpoint(monet_generator=monet_generator,
                           photo_generator=photo_generator,
                           monet_discriminator=monet_discriminator,
                           photo_discriminator=photo_discriminator,
                           monet_generator_optimizer=monet_generator_optimizer,
                           photo_generator_optimizer=photo_generator_optimizer,
                           monet_discriminator_optimizer=monet_discriminator_optimizer,
                           photo_discriminator_optimizer=photo_discriminator_optimizer)

# Keep only the five most recent checkpoints in CHECKPOINT_DIR
ckpt_manager = tf.train.CheckpointManager(ckpt, CHECKPOINT_DIR, max_to_keep=5)

# if a checkpoint exists, restore the latest checkpoint.
# NOTE: when this happens, the training cell below continues from the
# restored weights instead of starting from freshly initialized models.
if ckpt_manager.latest_checkpoint:
    ckpt.restore(ckpt_manager.latest_checkpoint)
    print ('Latest checkpoint restored!!')

In [None]:
# Create the training step for the definition of the loop later
@tf.function
def train_step(real_monet, real_photo):
    """Run one CycleGAN optimisation step on a Monet batch and a photo batch.

    Direction convention (it matches how the generators are used after
    training, where ``monet_generator`` is applied to photos, and the identity
    terms, which ask ``monet_generator`` to leave real Monets unchanged):

    * ``monet_generator``: photo -> Monet, judged by ``monet_discriminator``
    * ``photo_generator``: Monet -> photo, judged by ``photo_discriminator``

    Previously the translation and adversarial terms trained the generators in
    the opposite direction, which contradicted both the identity terms and the
    inference code.

    Args:
        real_monet: batch of real Monet images.
        real_photo: batch of real photos.

    Returns:
        None. The four models are updated in place through their optimizers.
    """
    with tf.GradientTape(persistent=True) as tape:
        # real photo -> fake monet
        fake_monet = monet_generator(real_photo, training=True)
        # fake monet -> photo
        cycled_photo = photo_generator(fake_monet, training=True)
        # real monet -> fake photo
        fake_photo = photo_generator(real_monet, training=True)
        # fake photo -> monet
        cycled_monet = monet_generator(fake_photo, training=True)
        # use for identity loss: an image already in the generator's target
        # domain should come out unchanged
        same_monet = monet_generator(real_monet, training=True)
        same_photo = photo_generator(real_photo, training=True)

        # The discriminators evaluate both real and fake images in each domain
        disc_real_monet = monet_discriminator(real_monet, training=True)
        disc_real_photo = photo_discriminator(real_photo, training=True)
        disc_fake_monet = monet_discriminator(fake_monet, training=True)
        disc_fake_photo = photo_discriminator(fake_photo, training=True)

        # Generator losses: each generator is scored on the discriminator
        # output for the images that this generator produced
        monet_gen_loss = generator_loss(disc_fake_monet)
        photo_gen_loss = generator_loss(disc_fake_photo)
        # Cycle consistency loss: Ensures that cycled images are similar to originals
        total_cycle_loss = compute_cycle_loss(real_monet, cycled_monet) + compute_cycle_loss(real_photo, cycled_photo)
        # Identity loss: Encourages generators to preserve input when it's already in the target domain
        total_monet_gen_loss = monet_gen_loss + total_cycle_loss + identity_loss(real_monet, same_monet)
        total_photo_gen_loss = photo_gen_loss + total_cycle_loss + identity_loss(real_photo, same_photo)
        # Discriminator losses: How well the discriminators distinguish real from fake images
        monet_disc_loss = discriminator_loss(disc_real_monet, disc_fake_monet)
        photo_disc_loss = discriminator_loss(disc_real_photo, disc_fake_photo)

    # Computes the gradients for each model with respect to its loss.
    # (The tape is persistent because it is queried four times.)
    monet_generator_gradients = tape.gradient(total_monet_gen_loss, monet_generator.trainable_variables)
    photo_generator_gradients = tape.gradient(total_photo_gen_loss, photo_generator.trainable_variables)
    monet_discriminator_gradients = tape.gradient(monet_disc_loss, monet_discriminator.trainable_variables)
    photo_discriminator_gradients = tape.gradient(photo_disc_loss, photo_discriminator.trainable_variables)

    # Updates the model parameters using the calculated gradients.
    monet_generator_optimizer.apply_gradients(zip(monet_generator_gradients, monet_generator.trainable_variables))
    photo_generator_optimizer.apply_gradients(zip(photo_generator_gradients, photo_generator.trainable_variables))
    monet_discriminator_optimizer.apply_gradients(zip(monet_discriminator_gradients, monet_discriminator.trainable_variables))
    photo_discriminator_optimizer.apply_gradients(zip(photo_discriminator_gradients, photo_discriminator.trainable_variables))

In [None]:
from IPython.display import clear_output

def train(monet_dataset, photo_dataset, epochs):
    """Run ``train_step`` over the zipped datasets for ``epochs`` epochs.

    A dot is printed on every tenth step (counted across epochs), the cell
    output is cleared at the end of each epoch, a checkpoint is saved through
    ``ckpt_manager`` on every fifth epoch, and the epoch duration is printed.
    """
    step = 0
    for epoch_index in range(epochs):
        epoch_number = epoch_index + 1
        started_at = time.time()

        paired_batches = tf.data.Dataset.zip((monet_dataset, photo_dataset))
        for monet_image, photo_image in paired_batches:
            train_step(monet_image, photo_image)
            if step % 10 == 0:
                print ('.', end='')
            step += 1

        clear_output(wait=True)
        if epoch_number % 5 == 0:
            ckpt_save_path = ckpt_manager.save()
            print ('Saving checkpoint for epoch {} at {}'.format(epoch_number, ckpt_save_path))

        elapsed = time.time() - started_at
        print(f'Time taken for epoch {epoch_number} is {elapsed} sec')

In [None]:
def generate_images(model, test_input):
    """Plot the first image of ``test_input`` next to the model output for it.

    ``model`` is called on the whole ``test_input``; only element 0 of the
    input and of the prediction is drawn. Both are mapped through
    ``x * 0.5 + 0.5`` before display.
    """
    prediction = model(test_input)
    plt.figure(figsize=(12, 12))
    panels = (
        ('Input Image', test_input[0]),
        ('Monet-style Image', prediction[0]),
    )
    for position, (caption, picture) in enumerate(panels, start=1):
        plt.subplot(1, 2, position)
        plt.title(caption)
        plt.imshow(picture * 0.5 + 0.5)
        plt.axis('off')
    plt.show()

# Train the model
# (`train_ds` is the Monet pipeline, `test_ds` the photo pipeline)
train(train_ds, test_ds, epochs=EPOCHS)

# Show some sample output: for five photo batches, the first photo of the
# batch next to the output of `monet_generator` for it
for photo in test_ds.take(5):
    generate_images(monet_generator, photo)

In [None]:
! mkdir ../images

In [None]:
import shutil

import PIL.Image  # `import PIL` alone does not guarantee that the Image submodule is loaded

# Make sure the output directory exists even if the `mkdir` cell was skipped
os.makedirs("../images", exist_ok=True)

# `test_ds` yields *batches* of photos, so every image of every batch has to be
# written. Saving only element [0] of each prediction would export a single
# image per batch and silently drop all the others.
i = 1
for photo_batch in test_ds:
    predictions = monet_generator(photo_batch, training=False).numpy()
    # Map the generator's tanh output range [-1, 1] back to 8-bit pixel values
    predictions = (predictions * 127.5 + 127.5).astype(np.uint8)
    for prediction in predictions:
        im = PIL.Image.fromarray(prediction)
        im.save("../images/" + str(i) + ".jpg")
        i += 1

shutil.make_archive("/kaggle/working/images", 'zip', "/kaggle/images")