In [4]:
import os
from io import BytesIO
from zipfile import ZipFile

import numpy as np
import tensorflow as tf
import tensorflow_addons as tfa
from PIL import Image
from tensorflow import keras
from tensorflow.keras import layers


TensorFlow Addons (TFA) has ended development and introduction of new features.
TFA has entered a minimal maintenance and release mode until a planned end of life in May 2024.
Please modify downstream libraries to take dependencies from other repositories in our TensorFlow community (e.g. Keras, Keras-CV, and Keras-NLP). 

For more information see: https://github.com/tensorflow/addons/issues/2807 

 The versions of TensorFlow you are currently using is 2.10.1 and is not supported. 
Some things might work, some things might not.
If you were to encounter a bug, do not file an issue.
If you want to make sure you're using a tested and supported configuration, either change the TensorFlow version or the TensorFlow Addons's version. 
You can find the compatibility matrix in TensorFlow Addon's readme:
https://github.com/tensorflow/addons


# Problem Description

The goal is to train a Generative Adversarial Network (GAN), specifically a CycleGAN, that turns regular photos into images that look like Monet paintings. Along the way, we explain in simple terms how generative models create new data and how GANs pair a generator with a discriminator that learn by competing against each other. By the end, we aim to have a working model and a good sense of how these techniques fit together.

### Checking for the GPUs

In [5]:
# Ask TensorFlow which GPUs are visible and report how many were found
gpus = tf.config.experimental.list_physical_devices('GPU')
print(f"Found {len(gpus)} GPU(s)")

if not gpus:
    # No GPU available: use the default strategy and run on the CPU
    strategy = tf.distribute.get_strategy()
    print("No GPU found, using CPU")
else:
    # Turn on memory growth for every GPU so that TensorFlow does not
    # allocate all GPU memory at once
    try:
        for device in gpus:
            tf.config.experimental.set_memory_growth(device, True)
    except RuntimeError as err:
        # The setting could not be applied; report the reason and carry on
        print(err)
    else:
        print("GPU memory growth enabled")

    # MirroredStrategy is used for single or multiple GPUs:
    # it works efficiently with 1 GPU and scales to multiple GPUs
    strategy = tf.distribute.MirroredStrategy()
    print(f"Using MirroredStrategy with {strategy.num_replicas_in_sync} replica(s)")

Found 1 GPU(s)
GPU memory growth enabled
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
Using MirroredStrategy with 1 replica(s)


# Locating the dataset files

In [6]:
# Root folder of the competition data; the monet_tfrec/ and photo_tfrec/
# sub-folders are looked up inside it. Set the MONET_GAN_DATA_DIR environment
# variable to point the notebook at another location. The original local
# path is kept as the fallback so existing setups keep working unchanged.
gcs_path = os.environ.get(
    "MONET_GAN_DATA_DIR",
    r"C:\Users\david\OneDrive\Documentos\Data Science\Projects\I'm something of a painter myself - GANs\gan-getting-started",
)

In [7]:
# Let tf.data pick the degree of parallelism for dataset transformations
AUTOTUNE = tf.data.experimental.AUTOTUNE
# First two dimensions that every decoded image is reshaped to
IMAGE_SIZE = [256, 256]

# Collect the TFRecord files of each image domain
monet_filenames = tf.io.gfile.glob(gcs_path + '/monet_tfrec/*.tfrec')
photo_filenames = tf.io.gfile.glob(gcs_path + '/photo_tfrec/*.tfrec')

# Fail fast when the data folder is wrong: an empty file list would otherwise
# produce empty datasets and only surface as a confusing error further down
if not monet_filenames:
    raise FileNotFoundError(
        f"No Monet TFRecord files found under {gcs_path!r} (expected monet_tfrec/*.tfrec)"
    )
if not photo_filenames:
    raise FileNotFoundError(
        f"No photo TFRecord files found under {gcs_path!r} (expected photo_tfrec/*.tfrec)"
    )

# EDA

In [8]:
# Schema used to parse each TFRecord example: a single 'image' feature,
# stored as one scalar string containing the encoded image bytes
features = {'image': tf.io.FixedLenFeature([], tf.string)}

def read_tfrecord(example):
    """Turn one serialized TFRecord example into a normalized image tensor.

    Args:
        example: Serialized ``tf.train.Example`` holding an 'image' feature
            with JPEG-encoded bytes.

    Returns:
        A float32 tensor of shape ``[*IMAGE_SIZE, 3]`` with pixel values
        scaled from [0, 255] to [-1, 1].
    """
    parsed = tf.io.parse_single_example(example, features)
    # Decode the JPEG bytes into a 3-channel image
    pixels = tf.image.decode_jpeg(parsed['image'], channels=3)
    # Map [0, 255] onto [-1, 1]
    pixels = tf.cast(pixels, tf.float32) / 127.5 - 1
    # Fix the static shape to the expected image dimensions
    target_shape = list(IMAGE_SIZE) + [3]
    return tf.reshape(pixels, target_shape)

def load_dataset(filenames):
    """Build a dataset of single-image batches from TFRecord files.

    Args:
        filenames: TFRecord file paths to read.

    Returns:
        A ``tf.data.Dataset`` whose elements are batches of one image
        decoded by ``read_tfrecord``.
    """
    records = tf.data.TFRecordDataset(filenames)
    images = records.map(read_tfrecord, num_parallel_calls=AUTOTUNE)
    return images.batch(1)

# Monet paintings and photos go through exactly the same input pipeline
monet_dataset = load_dataset(monet_filenames)
photo_dataset = load_dataset(photo_filenames)

### Data Description

In [9]:
# Count the batches of each dataset by streaming through it once. Elements are
# discarded as soon as they are counted, so the decoded images are never all
# held in memory at the same time (collecting them into a list would keep
# every 256x256x3 float32 image alive at once).
monet_dataset_len = sum(1 for _ in monet_dataset)
photo_dataset_len = sum(1 for _ in photo_dataset)
# Print the lengths of both datasets
print(monet_dataset_len, photo_dataset_len)

300 7038


In [12]:
# Take the first batch of the Monet dataset and display it to inspect
# its shape, dtype and value range
monet_batches = iter(monet_dataset)
example = next(monet_batches)
example

<tf.Tensor: shape=(1, 256, 256, 3), dtype=float32, numpy=
array([[[[-0.69411767, -0.5529412 , -1.        ],
         [-0.5058824 , -0.372549  , -0.8039216 ],
         [-0.1372549 , -0.00392157, -0.35686272],
         ...,
         [-0.05882353,  0.09019613, -0.06666666],
         [-0.14509803,  0.0196079 , -0.12941176],
         [-0.08235294,  0.09803927, -0.05882353]],

        [[-0.27843136, -0.3960784 , -0.85882354],
         [-0.17647058, -0.24705881, -0.6784314 ],
         [ 0.03529418,  0.00392163, -0.36470586],
         ...,
         [-0.01176471,  0.13725495,  0.09019613],
         [-0.08235294,  0.082353  ,  0.02745104],
         [-0.02745098,  0.15294123,  0.09019613]],

        [[-0.11372548, -0.23137254, -0.7411765 ],
         [-0.00392157, -0.09803921, -0.5686275 ],
         [ 0.11372554,  0.05882359, -0.36470586],
         ...,
         [ 0.00392163,  0.18431377,  0.2313726 ],
         [-0.05882353,  0.12941182,  0.16078436],
         [-0.01960784,  0.1686275 ,  0.2000000

# Model Definition

In [13]:
# Define the number of output channels for the generator (e.g., RGB)
OUTPUT_CHANNELS = 3

class CycleGan(keras.Model):
    """CycleGAN that translates between photos and Monet-style images.

    The model owns two U-Net generators (one producing Monet-style images,
    one producing photos) and two PatchGAN discriminators (one per image
    domain). Training is implemented in a custom ``train_step`` that combines
    adversarial, cycle-consistency and identity losses.

    Args:
        lambda_cycle: Weight of the cycle-consistency loss. The identity
            loss uses half of this weight. Defaults to 10.
    """

    def __init__(self, lambda_cycle=10):
        super(CycleGan, self).__init__()
        # Define the Monet and Photo generators
        self.monet_generator = self.create_generator()
        self.photo_generator = self.create_generator()
        # Define the Monet and Photo discriminators
        self.monet_discriminator = self.create_discriminator()
        self.photo_discriminator = self.create_discriminator()
        # Lambda for cycle consistency loss
        self.lambda_cycle = lambda_cycle

    def compile(self, **kwargs):
        """Configure the model and create one Adam optimizer per sub-network.

        Args:
            **kwargs: Optional keyword arguments forwarded unchanged to
                ``keras.Model.compile`` (for example ``run_eagerly``).
                Calling ``compile()`` without arguments behaves as before.
        """
        super(CycleGan, self).compile(**kwargs)
        # Each network gets its own optimizer so that their states stay independent
        self.monet_generator_optimizer = tf.keras.optimizers.Adam(2e-4, beta_1=0.5)
        self.monet_discriminator_optimizer = tf.keras.optimizers.Adam(2e-4, beta_1=0.5)
        self.photo_generator_optimizer = tf.keras.optimizers.Adam(2e-4, beta_1=0.5)
        self.photo_discriminator_optimizer = tf.keras.optimizers.Adam(2e-4, beta_1=0.5)

    def create_downsampler(self, filters, size, apply_instance_norm=True):
        """Build a downsampling block used by generators and discriminators.

        The block is a stride-2 convolution, optionally followed by instance
        normalization, followed by a LeakyReLU activation.

        Args:
            filters: Number of convolution filters.
            size: Convolution kernel size.
            apply_instance_norm: Whether to add instance normalization after
                the convolution.

        Returns:
            A ``keras.Sequential`` model implementing the block.
        """
        model = keras.Sequential()
        model.add(
            layers.Conv2D(
                filters,
                size,
                strides=2,
                padding="same",
                use_bias=False,
                kernel_initializer=tf.random_normal_initializer(0.0, 0.02),
            )
        )
        if apply_instance_norm:
            model.add(
                tfa.layers.InstanceNormalization(
                    gamma_initializer=keras.initializers.RandomNormal(
                        mean=0.0, stddev=0.02
                    )
                )
            )
        model.add(layers.LeakyReLU())
        return model

    def create_upsampler(self, filters, size, apply_dropout=False):
        """Build an upsampling block used by the generators.

        The block is a stride-2 transposed convolution, instance
        normalization, optional dropout (rate 0.5) and a ReLU activation.

        Args:
            filters: Number of transposed-convolution filters.
            size: Kernel size of the transposed convolution.
            apply_dropout: Whether to add dropout after the normalization.

        Returns:
            A ``keras.Sequential`` model implementing the block.
        """
        model = keras.Sequential()
        model.add(
            layers.Conv2DTranspose(
                filters,
                size,
                strides=2,
                padding="same",
                use_bias=False,
                kernel_initializer=tf.random_normal_initializer(0.0, 0.02),
            )
        )
        model.add(
            tfa.layers.InstanceNormalization(
                gamma_initializer=tf.random_normal_initializer(0.0, 0.02)
            )
        )
        if apply_dropout:
            model.add(layers.Dropout(0.5))
        model.add(layers.ReLU())
        return model

    def create_generator(self):
        """Construct the U-Net generator.

        Eight downsampling blocks are followed by seven upsampling blocks,
        each concatenated with the matching downsampling output (skip
        connection), and a final stride-2 transposed convolution with a tanh
        activation that produces ``OUTPUT_CHANNELS`` channels.

        Returns:
            A ``keras.Model`` taking inputs of shape ``[256, 256, 3]``.
        """
        # Downsampling layers
        downsampler_stack = [
            self.create_downsampler(64, 4, apply_instance_norm=False),
            self.create_downsampler(128, 4),
            self.create_downsampler(256, 4),
        ] + [self.create_downsampler(512, 4) for _ in range(5)]
        # Upsampling layers
        upsampler_stack = [
            self.create_upsampler(512, 4, apply_dropout=True) for _ in range(3)
        ] + [
            self.create_upsampler(512, 4),
            self.create_upsampler(256, 4),
            self.create_upsampler(128, 4),
            self.create_upsampler(64, 4),
        ]
        # Input layer
        input_layer = layers.Input(shape=[256, 256, 3])
        x = input_layer
        skips = []
        for downsampler in downsampler_stack:
            # Downsample and store skip connections
            x = downsampler(x)
            skips.append(x)
        # Reverse skip connections, excluding the last (the bottleneck output
        # is the input of the first upsampler, not a skip connection)
        skips = reversed(skips[:-1])
        for upsampler, skip_layer in zip(upsampler_stack, skips):
            # Upsample and concatenate with skip connection
            x = upsampler(x)
            x = layers.Concatenate()([x, skip_layer])
        # Final transposed conv layer to get output image
        last_layer = layers.Conv2DTranspose(
            OUTPUT_CHANNELS,
            4,
            strides=2,
            padding="same",
            kernel_initializer=tf.random_normal_initializer(0.0, 0.02),
            activation="tanh",
        )
        x = last_layer(x)
        return keras.Model(inputs=input_layer, outputs=x)

    def create_discriminator(self):
        """Construct the PatchGAN discriminator.

        Three downsampling blocks are followed by a zero-padded convolution
        with instance normalization and LeakyReLU, and a final zero-padded
        single-filter convolution. The output has no activation; the loss
        functions treat it as logits.

        Returns:
            A ``tf.keras.Model`` taking inputs of shape ``[256, 256, 3]``.
        """
        input_layer = layers.Input(shape=[256, 256, 3], name="input_image")
        x = input_layer
        # Downsampling blocks (no normalization in first block)
        downsampler1 = self.create_downsampler(64, 4, False)(x)
        downsampler2 = self.create_downsampler(128, 4)(downsampler1)
        downsampler3 = self.create_downsampler(256, 4)(downsampler2)
        # Padding and main conv layer
        zero_pad1 = layers.ZeroPadding2D()(downsampler3)
        conv_layer = layers.Conv2D(
            512,
            4,
            strides=1,
            use_bias=False,
            kernel_initializer=tf.random_normal_initializer(0.0, 0.02),
        )(zero_pad1)
        normalization_layer1 = tfa.layers.InstanceNormalization(
            gamma_initializer=tf.random_normal_initializer(0.0, 0.02)
        )(conv_layer)
        leaky_relu_layer = layers.LeakyReLU()(normalization_layer1)
        zero_pad2 = layers.ZeroPadding2D()(leaky_relu_layer)
        # Output layer for patchwise discrimination
        last_layer = layers.Conv2D(
            1, 4, strides=1, kernel_initializer=tf.random_normal_initializer(0.0, 0.02)
        )(zero_pad2)
        return tf.keras.Model(inputs=input_layer, outputs=last_layer)

    def discriminator_loss_fn(self, real, fake):
        """Compute the discriminator loss from its outputs on real and fake images.

        Args:
            real: Discriminator logits for real images (target: ones).
            fake: Discriminator logits for generated images (target: zeros).

        Returns:
            The average of the two binary cross-entropy terms. The loss
            objects use ``Reduction.NONE``, so the result is not reduced to
            a single scalar by the loss object.
        """
        real_loss = tf.keras.losses.BinaryCrossentropy(
            from_logits=True, reduction=tf.keras.losses.Reduction.NONE
        )(tf.ones_like(real), real)
        fake_loss = tf.keras.losses.BinaryCrossentropy(
            from_logits=True, reduction=tf.keras.losses.Reduction.NONE
        )(tf.zeros_like(fake), fake)
        return (real_loss + fake_loss) / 2

    def generator_loss_fn(self, generated_image):
        """Compute the adversarial loss of a generator.

        Args:
            generated_image: Discriminator logits for generated images. The
                generator wants the discriminator to output ones for them.

        Returns:
            Binary cross-entropy against an all-ones target, computed with
            ``Reduction.NONE``.
        """
        return tf.keras.losses.BinaryCrossentropy(
            from_logits=True, reduction=tf.keras.losses.Reduction.NONE
        )(tf.ones_like(generated_image), generated_image)

    def cycle_loss_fn(self, image, cycled_image, lambda_cycle):
        """Compute the cycle-consistency loss.

        Args:
            image: Original image.
            cycled_image: Image after passing through both generators.
            lambda_cycle: Weight applied to the loss.

        Returns:
            Mean absolute difference between the two images times
            ``lambda_cycle``.
        """
        return tf.reduce_mean(tf.abs(image - cycled_image)) * lambda_cycle

    def identity_loss_fn(self, real_photo, photo, lambda_cycle):
        """Compute the identity loss.

        Args:
            real_photo: Real image fed to the generator of its own domain.
            photo: Output of that generator for ``real_photo``.
            lambda_cycle: Cycle weight; the identity loss uses half of it.

        Returns:
            Mean absolute difference between the two images times
            ``lambda_cycle / 2``.
        """
        return tf.reduce_mean(tf.abs(real_photo - photo)) * lambda_cycle / 2

    def train_step(self, batch_data):
        """Run one optimization step on all four networks.

        Args:
            batch_data: Tuple ``(real_monet, real_photo)`` of image batches.

        Returns:
            Dict with the total loss of each generator and the loss of each
            discriminator, keyed by ``monet_generator_loss``,
            ``photo_generator_loss``, ``monet_discriminator_loss`` and
            ``photo_discriminator_loss``.
        """
        real_monet, real_photo = batch_data

        # The tape is persistent because four separate gradients are taken from it
        with tf.GradientTape(persistent=True) as tape:
            # Forward pass: generate fakes and cycled images
            fake_monet = self.monet_generator(real_photo, training=True)
            cycled_photo = self.photo_generator(fake_monet, training=True)
            fake_photo = self.photo_generator(real_monet, training=True)
            cycled_monet = self.monet_generator(fake_photo, training=True)

            # Identity mapping for identity loss
            monet1 = self.monet_generator(real_monet, training=True)
            photo1 = self.photo_generator(real_photo, training=True)

            # Discriminator predictions for real and fake images
            monet_real_discriminated = self.monet_discriminator(
                real_monet, training=True
            )
            monet_fake_discriminated = self.monet_discriminator(
                fake_monet, training=True
            )
            photo_real_discriminated = self.photo_discriminator(
                real_photo, training=True
            )
            photo_fake_discriminated = self.photo_discriminator(
                fake_photo, training=True
            )

            # Generator losses
            monet_generator_loss = self.generator_loss_fn(monet_fake_discriminated)
            photo_generator_loss = self.generator_loss_fn(photo_fake_discriminated)
            # Cycle-consistency loss (sum for both directions)
            cycle_loss = self.cycle_loss_fn(
                real_monet, cycled_monet, self.lambda_cycle
            ) + self.cycle_loss_fn(real_photo, cycled_photo, self.lambda_cycle)
            # Total generator losses (GAN + cycle + identity)
            total_monet_generator_loss = (
                monet_generator_loss
                + cycle_loss
                + self.identity_loss_fn(real_monet, monet1, self.lambda_cycle)
            )
            total_photo_generator_loss = (
                photo_generator_loss
                + cycle_loss
                + self.identity_loss_fn(real_photo, photo1, self.lambda_cycle)
            )
            # Discriminator losses
            monet_discriminator_loss = self.discriminator_loss_fn(
                monet_real_discriminated, monet_fake_discriminated
            )
            photo_discriminator_loss = self.discriminator_loss_fn(
                photo_real_discriminated, photo_fake_discriminated
            )
        # Calculate gradients and apply them for all generator/discriminator networks
        monet_generator_gradients = tape.gradient(
            total_monet_generator_loss, self.monet_generator.trainable_variables
        )
        photo_generator_gradients = tape.gradient(
            total_photo_generator_loss, self.photo_generator.trainable_variables
        )
        monet_discriminator_gradients = tape.gradient(
            monet_discriminator_loss, self.monet_discriminator.trainable_variables
        )
        photo_discriminator_gradients = tape.gradient(
            photo_discriminator_loss, self.photo_discriminator.trainable_variables
        )
        # A persistent tape keeps its resources until it is dropped, so
        # release it explicitly once all gradients have been computed
        del tape
        self.monet_generator_optimizer.apply_gradients(
            zip(monet_generator_gradients, self.monet_generator.trainable_variables)
        )
        self.photo_generator_optimizer.apply_gradients(
            zip(photo_generator_gradients, self.photo_generator.trainable_variables)
        )
        self.monet_discriminator_optimizer.apply_gradients(
            zip(
                monet_discriminator_gradients,
                self.monet_discriminator.trainable_variables,
            )
        )
        self.photo_discriminator_optimizer.apply_gradients(
            zip(
                photo_discriminator_gradients,
                self.photo_discriminator.trainable_variables,
            )
        )
        # Return training losses for monitoring
        return {
            "monet_generator_loss": total_monet_generator_loss,
            "photo_generator_loss": total_photo_generator_loss,
            "monet_discriminator_loss": monet_discriminator_loss,
            "photo_discriminator_loss": photo_discriminator_loss,
        }

# Model Training

In [None]:
# Build the model and create its optimizers (done in compile()) inside the
# scope of the distribution strategy selected earlier
distribution_scope = strategy.scope()
with distribution_scope:
    cycle_gan_model = CycleGan()
    cycle_gan_model.compile()

In [None]:
# Pair each Monet batch with a photo batch. Both datasets are repeated
# indefinitely, so the length of an epoch is set by steps_per_epoch
paired_dataset = tf.data.Dataset.zip(
    (monet_dataset.repeat(-1), photo_dataset.repeat(-1))
)
cycle_gan_model.fit(paired_dataset, steps_per_epoch=300, epochs=30)

# Kaggle Delivery

In [None]:
# Translate every photo with the Monet generator and store the results as
# consecutively numbered JPEG files (1.jpg, 2.jpg, ...) inside images.zip
with ZipFile('images.zip', mode='w') as zip_file:
    for image_number, photo_batch in enumerate(photo_dataset, start=1):
        # Each batch holds a single image; take it out of the batch dimension
        generated = cycle_gan_model.monet_generator(photo_batch, training=False)[0].numpy()
        # Map the generator output from [-1, 1] back to 8-bit pixel values
        pixels = (generated * 127.5 + 127.5).astype(np.uint8)
        # Encode to JPEG in memory and write the bytes straight into the archive
        with BytesIO() as jpeg_buffer:
            Image.fromarray(pixels).save(jpeg_buffer, 'JPEG')
            zip_file.writestr('{}.jpg'.format(image_number), jpeg_buffer.getvalue())

# Conclusion

In this notebook, we trained a CycleGAN model to translate photos into Monet-style paintings. We loaded the Monet and photo datasets from TFRecord files, built and compiled the model (two generators and two discriminators) using TensorFlow and Keras, and carried out the training process. Finally, we generated Monet-like images from the photo dataset and saved the results in a zip file (`images.zip`) suitable for submission. This workflow demonstrates how generative models can be used for artistic style transfer and shows the usefulness of deep learning for creative tasks.
