In [9]:
# !pip uninstall tensorflow -y
# !pip install tensorflow==2.15.0
# !pip install tensorflow-addons

In [10]:
import matplotlib.pyplot as plt
import numpy as np
import tensorflow as tf
import tensorflow_addons as tfa
import cv2
import os
from tensorflow.keras import layers, Model
from PIL import Image
import zipfile

# TPU setup (if available).
# Any failure while locating or initializing a TPU falls back to the default
# distribution strategy so the notebook still runs on CPU/GPU.
try:
    tpu = tf.distribute.cluster_resolver.TPUClusterResolver()
    print('Device:', tpu.master())
    tf.config.experimental_connect_to_cluster(tpu)
    tf.tpu.experimental.initialize_tpu_system(tpu)
    strategy = tf.distribute.TPUStrategy(tpu)
except Exception:
    # Deliberate best-effort fallback. Catching `Exception` instead of using a
    # bare `except:` lets KeyboardInterrupt / SystemExit propagate normally.
    strategy = tf.distribute.get_strategy()
print('Number of replicas:', strategy.num_replicas_in_sync)

# `tf.data.AUTOTUNE` is the non-experimental name of the same constant.
AUTOTUNE = tf.data.AUTOTUNE
print(tf.__version__)

# Input directories holding the Monet paintings and the photos (JPEG files).
monet_jpg_directory = '/kaggle/input/gan-getting-started/monet_jpg'
photo_jpg_directory = '/kaggle/input/gan-getting-started/photo_jpg'



Number of replicas: 1
2.15.0


In [11]:
def getImagePaths(path):
    """Return the full path of every file found under ``path``, recursively.

    Args:
        path: Root directory to walk.

    Returns:
        A sorted list of ``os.path.join(dirname, filename)`` strings, one per
        file. The list is empty when the walk yields no files.
    """
    image_names = [
        os.path.join(dirname, filename)
        for dirname, _, filenames in os.walk(path)
        for filename in filenames
    ]
    # os.walk yields entries in an arbitrary, filesystem-dependent order;
    # sorting makes the result reproducible from run to run.
    image_names.sort()
    return image_names

# Collect every file path from the two input directories.
monet_images_path, photo_images_path = (
    getImagePaths(directory)
    for directory in (monet_jpg_directory, photo_jpg_directory)
)

# Report how many files each directory contributed.
for label, paths in (("Monet", monet_images_path), ("Photo", photo_images_path)):
    print(f"Number of {label} images: {len(paths)}")



Number of Monet images: 300
Number of Photo images: 7038


In [12]:
def preprocess_image(image_path):
    """Load one JPEG file and return it as a float32 tensor scaled to [-1, 1].

    Args:
        image_path: Path of the JPEG file to read.

    Returns:
        The decoded 3-channel image, cast to float32 and rescaled with
        ``x / 127.5 - 1``.
    """
    raw_bytes = tf.io.read_file(image_path)
    pixels = tf.image.decode_jpeg(raw_bytes, channels=3)
    # Same arithmetic as (x / 127.5) - 1: maps 0 -> -1 and 255 -> 1.
    return tf.cast(pixels, tf.float32) / 127.5 - 1

# Create the TensorFlow input pipelines.
BATCH_SIZE = 1  # Reverted to 1 for CycleGAN standard

# Monet set: small enough to decode once, keep in memory, and shuffle with a
# buffer that covers the whole set.
monet_ds = (
    tf.data.Dataset.from_tensor_slices(monet_images_path)
    .map(preprocess_image, num_parallel_calls=AUTOTUNE)
    .cache()
    .shuffle(max(len(monet_images_path), 1))
    .batch(BATCH_SIZE)
    .prefetch(AUTOTUNE)
)

# Photo set: shuffle the *paths* (cheap strings) with a full-size buffer before
# decoding. A fixed 1000-element buffer placed after cache() meant that a
# consumer which stops early (for example a zip with the much smaller Monet
# set) only ever drew from the first ~1000 + N photos, and the in-memory cache
# was never read to the end, so it was rebuilt on every pass. Decoding on the
# fly keeps every photo reachable without holding all decoded images in RAM.
photo_ds = (
    tf.data.Dataset.from_tensor_slices(photo_images_path)
    .shuffle(max(len(photo_images_path), 1))
    .map(preprocess_image, num_parallel_calls=AUTOTUNE)
    .batch(BATCH_SIZE)
    .prefetch(AUTOTUNE)
)

def downsample(filters, size, apply_instancenorm=True):
    """Build a stride-2 convolution block as a ``tf.keras.Sequential``.

    Layer order: Conv2D (stride 2, 'same' padding, no bias)
    -> InstanceNormalization (optional) -> LeakyReLU.

    Args:
        filters: Number of output channels of the convolution.
        size: Kernel size of the convolution.
        apply_instancenorm: When False, the normalization layer is omitted.

    Returns:
        The assembled ``tf.keras.Sequential`` block.
    """
    block_layers = [
        layers.Conv2D(filters, size, strides=2, padding='same',
                      kernel_initializer=tf.random_normal_initializer(0., 0.02),
                      use_bias=False),
    ]
    if apply_instancenorm:
        block_layers.append(tfa.layers.InstanceNormalization())
    block_layers.append(layers.LeakyReLU())
    return tf.keras.Sequential(block_layers)

def upsample(filters, size, apply_dropout=False):
    """Build a stride-2 transposed-convolution block as a ``tf.keras.Sequential``.

    Layer order: Conv2DTranspose (stride 2, 'same' padding, no bias)
    -> InstanceNormalization -> Dropout(0.5) (optional) -> ReLU.

    Args:
        filters: Number of output channels of the transposed convolution.
        size: Kernel size of the transposed convolution.
        apply_dropout: When True, a 50% dropout layer is inserted before the
            activation.

    Returns:
        The assembled ``tf.keras.Sequential`` block.
    """
    block_layers = [
        layers.Conv2DTranspose(filters, size, strides=2, padding='same',
                               kernel_initializer=tf.random_normal_initializer(0., 0.02),
                               use_bias=False),
        tfa.layers.InstanceNormalization(),
    ]
    if apply_dropout:
        block_layers.append(layers.Dropout(0.5))
    block_layers.append(layers.ReLU())
    return tf.keras.Sequential(block_layers)

In [13]:
# Generator: encoder/decoder with skip connections and two extra bottleneck convolutions
def Generator():
    """Build the image-to-image generator as a Keras functional ``Model``.

    The input is a (256, 256, 3) image. Eight stride-2 ``downsample`` blocks
    halve the spatial size each time (256 -> 128 -> ... -> 1), two 3x3 ReLU
    convolutions run at the bottleneck, seven ``upsample`` blocks double it
    back up to 128, and a final tanh ``Conv2DTranspose`` produces the
    (256, 256, 3) output. After every ``upsample`` block the matching encoder
    activation is concatenated onto the result.

    Note: the two bottleneck convolutions are applied one after another with
    no skip connection around them, i.e. they are plain stacked convolutions
    rather than residual blocks.

    NOTE(review): the last encoder block applies instance normalization to a
    1x1 feature map - confirm that behaves as intended.

    Returns:
        A ``Model`` mapping a (256, 256, 3) input to a (256, 256, 3) tanh output.
    """
    inputs = layers.Input(shape=[256, 256, 3])

    # Encoder: the first block has no normalization; channels grow 64 -> 512.
    encoder = [downsample(64, 4, apply_instancenorm=False)]  # (bs, 128, 128, 64)
    encoder += [
        downsample(n_filters, 4)  # (bs, 64, 64, 128) ... (bs, 1, 1, 512)
        for n_filters in (128, 256, 512, 512, 512, 512, 512)
    ]

    # Two extra 3x3 convolutions applied at the bottleneck.
    bottleneck_convs = [
        layers.Conv2D(512, 3, strides=1, padding='same', activation='relu',
                      kernel_initializer=tf.random_normal_initializer(0., 0.02))
        for _ in range(2)
    ]

    # Decoder: the first three blocks use dropout; channels shrink 512 -> 64.
    decoder = [
        upsample(512, 4, apply_dropout=True)  # (bs, 2, 2, 512) ... (bs, 8, 8, 512)
        for _ in range(3)
    ]
    decoder += [
        upsample(n_filters, 4)  # (bs, 16, 16, 512) ... (bs, 128, 128, 64)
        for n_filters in (512, 256, 128, 64)
    ]

    to_rgb = layers.Conv2DTranspose(3, 4, strides=2, padding='same',
                                    kernel_initializer=tf.random_normal_initializer(0., 0.02),
                                    activation='tanh')  # (bs, 256, 256, 3)

    x = inputs
    encoder_outputs = []
    for block in encoder:
        x = block(x)
        encoder_outputs.append(x)

    for conv in bottleneck_convs:
        x = conv(x)

    # Pair each decoder block with the encoder outputs in reverse order,
    # leaving out the deepest one (the bottleneck input itself).
    for block, skip in zip(decoder, encoder_outputs[-2::-1]):
        x = block(x)
        x = layers.Concatenate()([x, skip])

    return Model(inputs=inputs, outputs=to_rgb(x))

In [14]:
def Discriminator():
    """Build the patch discriminator as a Keras functional ``Model``.

    A (256, 256, 3) input goes through three ``downsample`` blocks, then
    zero-padding, a 4x4 stride-1 convolution with instance normalization and
    LeakyReLU, another zero-padding and a final single-channel 4x4
    convolution. The output is a (30, 30, 1) map with no activation applied.

    Returns:
        A ``Model`` mapping an image to that (30, 30, 1) score map.
    """
    initializer = tf.random_normal_initializer(0., 0.02)
    inp = layers.Input(shape=[256, 256, 3], name='input_image')

    features = inp
    # (bs, 128, 128, 64) -> (bs, 64, 64, 128) -> (bs, 32, 32, 256)
    for n_filters, use_norm in ((64, False), (128, True), (256, True)):
        features = downsample(n_filters, 4, use_norm)(features)

    features = layers.ZeroPadding2D()(features)  # (bs, 34, 34, 256)
    features = layers.Conv2D(512, 4, strides=1, kernel_initializer=initializer,
                             use_bias=False)(features)  # (bs, 31, 31, 512)
    features = tfa.layers.InstanceNormalization()(features)
    features = layers.LeakyReLU()(features)
    features = layers.ZeroPadding2D()(features)  # (bs, 33, 33, 512)
    patch_scores = layers.Conv2D(1, 4, strides=1,
                                 kernel_initializer=initializer)(features)  # (bs, 30, 30, 1)

    return Model(inputs=inp, outputs=patch_scores)

In [15]:
# Create the four networks and the shared loss object inside the strategy scope.
with strategy.scope():
    # Two generators and two discriminators, one of each per image domain.
    monet_generator, photo_generator = Generator(), Generator()
    monet_discriminator, photo_discriminator = Discriminator(), Discriminator()

    # Binary cross-entropy on raw logits. Reduction is NONE, so the loss
    # helpers that use this object do their own averaging.
    loss_obj = tf.keras.losses.BinaryCrossentropy(
        from_logits=True,
        reduction=tf.keras.losses.Reduction.NONE,
    )

def discriminator_loss(real, generated):
    """Discriminator loss: half the sum of the real and generated BCE terms.

    Args:
        real: Discriminator logits for real images (target label 1).
        generated: Discriminator logits for generated images (target label 0).

    Returns:
        ``0.5 * (mean BCE on real + mean BCE on generated)``.
    """
    loss_on_real = tf.reduce_mean(loss_obj(tf.ones_like(real), real))
    loss_on_generated = tf.reduce_mean(loss_obj(tf.zeros_like(generated), generated))
    return (loss_on_real + loss_on_generated) * 0.5

def generator_loss(generated):
    """Adversarial generator loss: mean BCE of the logits against all-ones targets.

    Args:
        generated: Discriminator logits for generated images.

    Returns:
        The mean binary cross-entropy between ``generated`` and a tensor of ones.
    """
    real_targets = tf.ones_like(generated)
    per_element_loss = loss_obj(real_targets, generated)
    return tf.reduce_mean(per_element_loss)

def calc_cycle_loss(real_image, cycled_image, LAMBDA=10):
    """Cycle-consistency loss: a weighted blend of mean L1 and mean L2 error.

    Args:
        real_image: The original image.
        cycled_image: The image after a round trip through both generators.
        LAMBDA: Overall weight applied to the blended error.

    Returns:
        ``LAMBDA * (0.9 * mean|diff| + 0.1 * mean(diff ** 2))``.
    """
    residual = real_image - cycled_image
    mean_abs_error = tf.reduce_mean(tf.abs(residual))
    mean_sq_error = tf.reduce_mean(tf.square(residual))
    blended_error = 0.9 * mean_abs_error + 0.1 * mean_sq_error
    return LAMBDA * blended_error

def identity_loss(real_image, same_image, LAMBDA=5):
    """Identity loss: mean absolute error scaled by ``LAMBDA * 0.5``.

    Args:
        real_image: An image fed to a generator.
        same_image: That generator's output for ``real_image``.
        LAMBDA: Weight of the term; the effective factor is half this value.

    Returns:
        ``LAMBDA * 0.5 * mean|real_image - same_image|``.
    """
    weight = LAMBDA * 0.5
    mean_abs_error = tf.reduce_mean(tf.abs(real_image - same_image))
    return weight * mean_abs_error

# Step-wise learning-rate schedule: 2e-4, then 10x smaller after 50 epochs'
# worth of optimizer steps, then 100x smaller after 75 epochs' worth.
# The number of steps per epoch is derived from the Monet image count.
initial_learning_rate = 2e-4
steps_per_epoch = len(monet_images_path) // BATCH_SIZE
lr_schedule = tf.keras.optimizers.schedules.PiecewiseConstantDecay(
    boundaries=[50 * steps_per_epoch, 75 * steps_per_epoch],
    values=[
        initial_learning_rate,
        initial_learning_rate * 0.1,
        initial_learning_rate * 0.01,
    ],
)


In [18]:
# One Adam optimizer per network, all sharing the same learning-rate schedule,
# beta_1=0.5 and gradient clipping (clipnorm=1.0).
with strategy.scope():
    (monet_generator_optimizer,
     photo_generator_optimizer,
     monet_discriminator_optimizer,
     photo_discriminator_optimizer) = (
        tf.keras.optimizers.Adam(learning_rate=lr_schedule, beta_1=0.5, clipnorm=1.0)
        for _ in range(4)
    )

@tf.function
def train_step(real_monet, real_photo):
    """Run one optimization step for both generators and both discriminators.

    Args:
        real_monet: A batch of real Monet images.
        real_photo: A batch of real photos.

    Returns:
        A tuple ``(total_monet_gen_loss, total_photo_gen_loss,
        monet_disc_loss, photo_disc_loss)`` for this step.
    """
    # The tape is persistent because four separate gradients are taken from it.
    with tf.GradientTape(persistent=True) as tape:
        # photo -> Monet -> photo
        fake_monet = monet_generator(real_photo, training=True)
        cycled_photo = photo_generator(fake_monet, training=True)
        # Monet -> photo -> Monet
        fake_photo = photo_generator(real_monet, training=True)
        cycled_monet = monet_generator(fake_photo, training=True)
        # Each generator applied to an image already in its target domain.
        same_monet = monet_generator(real_monet, training=True)
        same_photo = photo_generator(real_photo, training=True)

        # Discriminator scores for real and generated images.
        disc_real_monet = monet_discriminator(real_monet, training=True)
        disc_real_photo = photo_discriminator(real_photo, training=True)
        disc_fake_monet = monet_discriminator(fake_monet, training=True)
        disc_fake_photo = photo_discriminator(fake_photo, training=True)

        adversarial_monet = generator_loss(disc_fake_monet)
        adversarial_photo = generator_loss(disc_fake_photo)
        # The cycle term is shared by both generator objectives.
        cycle_term = (calc_cycle_loss(real_monet, cycled_monet)
                      + calc_cycle_loss(real_photo, cycled_photo))

        total_monet_gen_loss = (adversarial_monet + cycle_term
                                + identity_loss(real_monet, same_monet))
        total_photo_gen_loss = (adversarial_photo + cycle_term
                                + identity_loss(real_photo, same_photo))
        monet_disc_loss = discriminator_loss(disc_real_monet, disc_fake_monet)
        photo_disc_loss = discriminator_loss(disc_real_photo, disc_fake_photo)

    updates = (
        (total_monet_gen_loss, monet_generator, monet_generator_optimizer),
        (total_photo_gen_loss, photo_generator, photo_generator_optimizer),
        (monet_disc_loss, monet_discriminator, monet_discriminator_optimizer),
        (photo_disc_loss, photo_discriminator, photo_discriminator_optimizer),
    )

    # Compute every gradient first, then apply them, in the same order.
    all_gradients = [
        tape.gradient(loss, network.trainable_variables)
        for loss, network, _ in updates
    ]
    for gradients, (_, network, optimizer) in zip(all_gradients, updates):
        optimizer.apply_gradients(zip(gradients, network.trainable_variables))

    return total_monet_gen_loss, total_photo_gen_loss, monet_disc_loss, photo_disc_loss

In [None]:
# Training loop
EPOCHS = 150

# Pair one Monet batch with one photo batch per step. Built once and reused:
# every `for` over it starts a fresh iterator, so there is no need to rebuild
# the zipped dataset each epoch.
paired_ds = tf.data.Dataset.zip((monet_ds, photo_ds))

for epoch in range(EPOCHS):
    print(f"Epoch {epoch + 1}/{EPOCHS}")
    # Running sums in train_step's return order:
    # monet generator, photo generator, monet discriminator, photo discriminator.
    loss_sums = np.zeros(4, dtype=np.float64)
    n_batches = 0

    for real_monet, real_photo in paired_ds:
        step_losses = train_step(real_monet, real_photo)
        loss_sums += [loss.numpy() for loss in step_losses]
        n_batches += 1

    # Fail with an explicit message instead of a ZeroDivisionError when the
    # paired dataset yields nothing (for example an empty input directory).
    if n_batches == 0:
        raise RuntimeError(
            "No training batches were produced; check that both image "
            "directories contain files."
        )

    (avg_monet_gen_loss, avg_photo_gen_loss,
     avg_monet_disc_loss, avg_photo_disc_loss) = loss_sums / n_batches

    print(f"Avg Monet Gen Loss: {avg_monet_gen_loss:.4f}, Avg Photo Gen Loss: {avg_photo_gen_loss:.4f}, "
          f"Avg Monet Disc Loss: {avg_monet_disc_loss:.4f}, Avg Photo Disc Loss: {avg_photo_disc_loss:.4f}")

Epoch 1/150
Avg Monet Gen Loss: 4.7156, Avg Photo Gen Loss: 4.6965, Avg Monet Disc Loss: 0.7100, Avg Photo Disc Loss: 0.7131
Epoch 2/150
Avg Monet Gen Loss: 3.1608, Avg Photo Gen Loss: 3.1073, Avg Monet Disc Loss: 0.6705, Avg Photo Disc Loss: 0.6789
Epoch 3/150
Avg Monet Gen Loss: 2.9297, Avg Photo Gen Loss: 2.9168, Avg Monet Disc Loss: 0.6699, Avg Photo Disc Loss: 0.6769
Epoch 4/150
Avg Monet Gen Loss: 2.8040, Avg Photo Gen Loss: 2.7974, Avg Monet Disc Loss: 0.6596, Avg Photo Disc Loss: 0.6668
Epoch 5/150
Avg Monet Gen Loss: 2.7098, Avg Photo Gen Loss: 2.6947, Avg Monet Disc Loss: 0.6636, Avg Photo Disc Loss: 0.6713
Epoch 6/150
Avg Monet Gen Loss: 2.5534, Avg Photo Gen Loss: 2.5621, Avg Monet Disc Loss: 0.6633, Avg Photo Disc Loss: 0.6640
Epoch 7/150
Avg Monet Gen Loss: 2.4769, Avg Photo Gen Loss: 2.4542, Avg Monet Disc Loss: 0.6644, Avg Photo Disc Loss: 0.6633
Epoch 8/150
Avg Monet Gen Loss: 2.4410, Avg Photo Gen Loss: 2.4175, Avg Monet Disc Loss: 0.6554, Avg Photo Disc Loss: 0.6644


In [None]:
def display_generated_images(photo_ds, num_images=5):
    """Plot input photos (top row) above their generated counterparts (bottom row).

    Args:
        photo_ds: Batched dataset of photos; the first image of each batch is
            the one that gets plotted.
        num_images: Number of batches to take, and the number of grid columns.
    """
    plt.figure(figsize=(15, 5))
    for column, photo_batch in enumerate(photo_ds.take(num_images), start=1):
        generated_batch = monet_generator(photo_batch, training=False)
        # x * 0.5 + 0.5 maps the [-1, 1] range to [0, 1] for imshow.
        panels = (
            (column, photo_batch[0] * 0.5 + 0.5, "Original Photo"),
            (column + num_images, generated_batch[0] * 0.5 + 0.5, "Monet Style"),
        )
        for position, image, title in panels:
            plt.subplot(2, num_images, position)
            plt.imshow(image)
            plt.title(title)
            plt.axis('off')
    plt.show()

# Preview a few photos next to the generator's output for them.
display_generated_images(photo_ds)


In [None]:
# Directory that receives the generated JPEGs; created if it does not exist yet.
output_dir = '/kaggle/working/images'
os.makedirs(output_dir, exist_ok=True)

def generate_and_save_images(photo_ds, monet_generator, num_images=7000):
    """Translate photos with ``monet_generator`` and save the results as JPEGs.

    Files are written to the module-level ``output_dir`` as
    ``monet_0000.jpg``, ``monet_0001.jpg``, ... Every image of every batch is
    saved (not only the first one of each batch), so the function also works
    when ``photo_ds`` is batched with a batch size larger than 1.

    Args:
        photo_ds: Batched dataset of photos to translate.
        monet_generator: Model called as ``monet_generator(batch, training=False)``.
        num_images: Maximum number of images to write.
    """
    count = 0
    for photo_batch in photo_ds:
        if count >= num_images:
            break
        fake_batch = monet_generator(photo_batch, training=False)
        # x * 0.5 + 0.5 maps [-1, 1] to [0, 1]; clip, then scale to uint8 [0, 255].
        fake_batch = tf.clip_by_value(fake_batch * 0.5 + 0.5, 0, 1)
        fake_batch = (fake_batch * 255).numpy().astype(np.uint8)
        # Never write more than the images still missing to reach num_images.
        for fake_monet in fake_batch[:num_images - count]:
            img = Image.fromarray(fake_monet)
            img.save(os.path.join(output_dir, f'monet_{count:04d}.jpg'), quality=95)  # Higher quality JPEG
            count += 1
    print(f"Generated {count} Monet-style images.")

# Generate the images, then pack them into a flat zip archive.
generate_and_save_images(photo_ds, monet_generator, num_images=7000)

zip_filename = '/kaggle/working/images.zip'
zipped_count = 0
with zipfile.ZipFile(zip_filename, 'w', compression=zipfile.ZIP_DEFLATED) as zipf:
    for root, _, files in os.walk(output_dir):
        # Sorted so the archive's member order is reproducible.
        for file in sorted(files):
            # arcname=file stores each member at the archive root.
            zipf.write(os.path.join(root, file), arcname=file)
            zipped_count += 1

print(f"Submission file created: {zip_filename}")
# Report what was actually written to the archive, not a directory listing.
print(f"Total images zipped: {zipped_count}")