In [None]:
import numpy as np
import tensorflow as tf
import tensorflow as keras
import matplotlib.pyplot as plt
import seaborn as sns
import os
from zipfile import ZipFile
from tensorflow.keras import layers

In [None]:
# Glob patterns for the Monet and photo TFRecord shards under /kaggle/input.
monet_tfrec_pattern = "/kaggle/input/gan-getting-started/monet_tfrec/*.tfrec"
photo_tfrec_pattern = "/kaggle/input/gan-getting-started/photo_tfrec/*.tfrec"

# Datasets of the file names matching each pattern; they are expanded into
# records by the interleave() calls in the dataset-building cell.
monet_files = tf.data.Dataset.list_files(monet_tfrec_pattern)
photo_files = tf.data.Dataset.list_files(photo_tfrec_pattern)


In [None]:
def parse_tfrecord(tfrecord):
    """Parse one serialized example and extract its ``'image'`` feature.

    Args:
        tfrecord: A serialized example, as yielded by ``tf.data.TFRecordDataset``.

    Returns:
        A dict with the single key ``'image'`` holding a scalar string tensor.
    """
    schema = {'image': tf.io.FixedLenFeature([], tf.string)}
    return tf.io.parse_single_example(serialized=tfrecord, features=schema)

def preprocess_image(features):
    """Decode a parsed example into a 256x256 RGB image scaled to [-1, 1].

    Args:
        features: Dict produced by ``parse_tfrecord``; only ``features['image']``
            (JPEG bytes) is read.

    Returns:
        The resized image tensor, with 0..255 pixel values mapped to -1..1.
    """
    decoded = tf.image.decode_jpeg(features['image'], channels=3)
    resized = tf.image.resize(decoded, [256, 256])
    # 0 -> -1 and 255 -> +1, matching the generator's tanh output range.
    return (resized / 127.5) - 1


In [None]:
def preprocess_image(features, image_size=(256, 256)):
    """Decode a parsed example into an RGB image scaled to [-1, 1].

    This definition replaces the earlier ``preprocess_image`` in the notebook,
    so it is the one the dataset pipelines actually use. It previously resized
    to 32x32, which does not match the DCGAN discriminator
    (``input_shape=[256, 256, 3]``), the generator output (16 -> 256 after four
    stride-2 upsamplings), or the ``256 * 256`` pixel count assumed by
    ``plot_color_distribution``. The default is therefore 256x256; pass
    ``image_size`` explicitly for a smaller, faster experiment.

    Args:
        features: Dict produced by ``parse_tfrecord``; only ``features['image']``
            (JPEG bytes) is read.
        image_size: Target ``(height, width)`` in pixels.

    Returns:
        The resized image tensor, with 0..255 pixel values mapped to -1..1.
    """
    image = tf.image.decode_jpeg(features['image'], channels=3)
    image = tf.image.resize(image, list(image_size))
    image = (image / 127.5) - 1  # 0 -> -1, 255 -> +1
    return image


In [None]:
BATCH_SIZE = 32


def _build_image_dataset(files, batch_size=BATCH_SIZE, shuffle_buffer=1000):
    """Turn a dataset of TFRecord file names into shuffled image batches.

    Records of each file are parsed with ``parse_tfrecord``, converted with
    ``preprocess_image``, shuffled, batched and prefetched, in that order.
    """
    records = files.interleave(
        lambda path: tf.data.TFRecordDataset(path).map(parse_tfrecord),
        num_parallel_calls=tf.data.AUTOTUNE)
    images = records.map(preprocess_image)
    return images.shuffle(shuffle_buffer).batch(batch_size).prefetch(tf.data.AUTOTUNE)


# Both image domains go through the same input pipeline.
monet_dataset = _build_image_dataset(monet_files)

photo_dataset = _build_image_dataset(photo_files)


In [None]:
def show_images(dataset, title):
    """Show up to nine images from the first batch of ``dataset`` in a 3x3 grid.

    Args:
        dataset: Dataset of image batches whose pixel values are in [-1, 1];
            they are mapped back to [0, 1] for display.
        title: Caption placed above every displayed image.
    """
    plt.figure(figsize=(12, 12))
    for images in dataset.take(1):
        # A batch can hold fewer than nine images (small batch size or a short
        # final batch); indexing past its end would raise.
        shown = min(9, int(images.shape[0]))
        for i in range(shown):
            plt.subplot(3, 3, i + 1)
            plt.imshow((images[i] + 1) / 2)
            plt.title(title)
            plt.axis('off')
    plt.show()

# Preview one batch from each image domain.
show_images(dataset=monet_dataset, title="Monet Paintings")

show_images(dataset=photo_dataset, title="Photos")


In [None]:
def make_generator_model():
    """Build the DCGAN generator: a 100-d noise vector -> 256x256x3 image.

    A dense projection is reshaped to 16x16x256 and then upsampled by four
    stride-2 transposed convolutions (16 -> 32 -> 64 -> 128 -> 256). The final
    layer uses ``tanh``, so outputs lie in [-1, 1].

    Returns:
        An uncompiled ``tf.keras.Sequential`` model.
    """
    stack = [
        layers.Dense(16*16*256, use_bias=False, input_shape=(100,)),
        layers.BatchNormalization(),
        layers.LeakyReLU(),
        layers.Reshape((16, 16, 256)),
    ]

    # Three identical upsampling stages that differ only in filter count.
    for filters in (128, 64, 32):
        stack.extend([
            layers.Conv2DTranspose(filters, (5, 5), strides=(2, 2), padding='same', use_bias=False),
            layers.BatchNormalization(),
            layers.LeakyReLU(),
        ])

    # Output stage: three colour channels squashed to [-1, 1].
    stack.append(
        layers.Conv2DTranspose(3, (5, 5), strides=(2, 2), padding='same', use_bias=False, activation='tanh'))

    return tf.keras.Sequential(stack)


In [None]:

def make_discriminator_model():
    """Build the DCGAN discriminator: 256x256x3 image -> one real/fake logit.

    Two stride-2 convolution stages (each followed by LeakyReLU and 30% dropout)
    feed a flattened dense layer with a single unactivated output.

    Returns:
        An uncompiled ``tf.keras.Sequential`` model.
    """
    return tf.keras.Sequential([
        layers.Conv2D(64, (5, 5), strides=(2, 2), padding='same', input_shape=[256, 256, 3]),
        layers.LeakyReLU(),
        layers.Dropout(0.3),
        layers.Conv2D(128, (5, 5), strides=(2, 2), padding='same'),
        layers.LeakyReLU(),
        layers.Dropout(0.3),
        layers.Flatten(),
        layers.Dense(1),  # raw logit; pair with a from_logits=True loss
    ])

# Instantiate the DCGAN generator/discriminator pair used by the training cells below.
generator = make_generator_model()
discriminator = make_discriminator_model()


In [None]:
# Binary cross-entropy on raw logits (the discriminator's Dense(1) output has no activation).
cross_entropy = tf.keras.losses.BinaryCrossentropy(from_logits=True)

def discriminator_loss(real_output, fake_output):
    """Discriminator loss: real samples should score 1, generated samples 0.

    Previously an unimplemented stub that returned ``None``.

    Args:
        real_output: Discriminator logits for real images.
        fake_output: Discriminator logits for generated images.

    Returns:
        Scalar tensor: the sum of the cross-entropy on real and on fake logits.
    """
    real_loss = cross_entropy(tf.ones_like(real_output), real_output)
    fake_loss = cross_entropy(tf.zeros_like(fake_output), fake_output)
    return real_loss + fake_loss

def generator_loss(fake_output):
    """Generator loss: generated samples should be scored as real (label 1).

    Previously an unimplemented stub that returned ``None``.

    Args:
        fake_output: Discriminator logits for generated images.

    Returns:
        Scalar cross-entropy between an all-ones target and ``fake_output``.
    """
    return cross_entropy(tf.ones_like(fake_output), fake_output)

# Separate Adam optimizers (learning rate 1e-4) so each network keeps its own optimizer state.
generator_optimizer = tf.keras.optimizers.Adam(1e-4)
discriminator_optimizer = tf.keras.optimizers.Adam(1e-4)

In [None]:
# Length of the generator's noise vector; matches input_shape=(100,) in make_generator_model.
noise_dim = 100
# Number of full passes over the dataset performed by train().
EPOCHS = 50

@tf.function
def train_step(images):
    """Run one adversarial update of the DCGAN generator and discriminator.

    Previously an unimplemented stub, so ``train`` iterated over the data
    without changing any weights.

    Args:
        images: Batch of real images scaled to [-1, 1].

    Returns:
        Tuple ``(gen_loss, disc_loss)`` of scalar tensors for this step.
    """
    # Bring the real batch to the discriminator's spatial size so the step works
    # whatever resolution the input pipeline produced.
    target_hw = discriminator.input_shape[1:3]
    images = tf.image.resize(images, target_hw)

    # One noise vector per real image; the last batch may be short.
    noise = tf.random.normal([tf.shape(images)[0], noise_dim])

    with tf.GradientTape() as gen_tape, tf.GradientTape() as disc_tape:
        generated_images = generator(noise, training=True)

        real_output = discriminator(images, training=True)
        fake_output = discriminator(generated_images, training=True)

        # Generator wants fakes classified as real; the discriminator wants
        # real -> 1 and fake -> 0.
        gen_loss = cross_entropy(tf.ones_like(fake_output), fake_output)
        disc_loss = (cross_entropy(tf.ones_like(real_output), real_output)
                     + cross_entropy(tf.zeros_like(fake_output), fake_output))

    gen_grads = gen_tape.gradient(gen_loss, generator.trainable_variables)
    disc_grads = disc_tape.gradient(disc_loss, discriminator.trainable_variables)

    generator_optimizer.apply_gradients(zip(gen_grads, generator.trainable_variables))
    discriminator_optimizer.apply_gradients(zip(disc_grads, discriminator.trainable_variables))

    return gen_loss, disc_loss

def train(dataset, epochs):
    """Call ``train_step`` on every batch of ``dataset`` for ``epochs`` passes.

    Args:
        dataset: Iterable of image batches.
        epochs: Number of complete passes over ``dataset``.
    """
    for _ in range(epochs):
        for batch in dataset:
            train_step(batch)

# Train the DCGAN on the Monet batches only.
train(dataset=monet_dataset, epochs=EPOCHS)


In [None]:
def generate_and_save_images(model, num_examples_to_generate, output_dir):
    """Sample images from a generator and write them to ``output_dir`` as PNGs.

    Args:
        model: Generator mapping a batch of noise vectors to images in [-1, 1].
        num_examples_to_generate: Number of images to sample and save.
        output_dir: Destination directory; created if it does not exist.

    Files are named ``generated_image_0000.png``, ``generated_image_0001.png``, ...
    """
    # Read the latent size from the model rather than hard-coding 100, so the
    # function keeps working if the generator's input width changes. Fall back
    # to the historical value when the model does not expose an input shape.
    try:
        latent_dim = int(model.input_shape[-1])
    except (AttributeError, TypeError):
        latent_dim = 100

    noise = tf.random.normal([num_examples_to_generate, latent_dim])

    generated_images = model(noise, training=False)

    # Map the tanh range [-1, 1] to [0, 1] before conversion.
    generated_images = (generated_images + 1) / 2

    os.makedirs(output_dir, exist_ok=True)

    for i in range(num_examples_to_generate):
        # NOTE(review): array_to_img rescales each image to 0..255 by default
        # (scale=True), which stretches per-image contrast - confirm intended.
        img = tf.keras.preprocessing.image.array_to_img(generated_images[i])
        img.save(os.path.join(output_dir, f'generated_image_{i:04d}.png'))


In [None]:
# NOTE(review): '/path/to/save/images' looks like a template placeholder -
# point it at a real writable directory before running this cell.
output_folder = '/path/to/save/images'

generate_and_save_images(model=generator, num_examples_to_generate=100, output_dir=output_folder)


# **Average Color Distribution in Monet Paintings and in Test Photos**

In [None]:
def plot_color_distribution(dataset, title):
    """Plot the mean red/green/blue intensity over the first ten batches.

    Args:
        dataset: Dataset of image batches shaped (batch, height, width, 3) with
            pixel values in [-1, 1]; values are mapped to [0, 1] before averaging.
        title: Name of the image set, inserted into the plot title.
    """
    # float64 accumulator: many small values are summed before the division.
    colors = np.zeros(3, dtype=np.float64)
    total_pixels = 0

    for images in dataset.take(10):
        batch = (images.numpy() + 1) / 2
        colors += batch.sum(axis=(0, 1, 2))
        # Count the pixels actually seen instead of assuming 256x256 images;
        # the old fixed divisor gave wrong means at any other resolution.
        total_pixels += batch.shape[0] * batch.shape[1] * batch.shape[2]

    # An empty dataset leaves the means at zero rather than dividing 0 by 0.
    if total_pixels:
        colors /= total_pixels

    plt.bar(['Red', 'Green', 'Blue'], colors)
    plt.title(f'Average Color Distribution in {title}')
    plt.show()

# Compare average channel intensity between the two image domains.
plot_color_distribution(dataset=monet_dataset, title="Monet Paintings")
plot_color_distribution(dataset=photo_dataset, title="Other Photos")

In [None]:
def plot_intensity_histogram(dataset, title):
    """Overlay per-channel histograms of pixel values from the first batch.

    Values are plotted as stored in the dataset (no rescaling is applied here).

    Args:
        dataset: Dataset of image batches whose last axis holds the three channels.
        title: Name of the image set, inserted into the plot title.
    """
    plt.figure(figsize=(10, 4))
    for batch in dataset.take(1):
        pixels = batch.numpy()
        # One translucent histogram per channel, drawn in that channel's colour.
        for channel_index, channel_color in enumerate(('r', 'g', 'b')):
            values = pixels[..., channel_index].flatten()
            sns.histplot(values, color=channel_color, alpha=0.5, bins=50)
    plt.title(f'Pixel Intensity Distribution in {title}')
    plt.show()

# Compare pixel-value distributions between the two image domains.
plot_intensity_histogram(dataset=monet_dataset, title="Monet Paintings")
plot_intensity_histogram(dataset=photo_dataset, title="Other Photos")


In [None]:
# Show one more batch from each domain.
show_images(dataset=monet_dataset, title="Sample Monet Paintings")
show_images(dataset=photo_dataset, title="Sample Photos")

# **CycleGAN**

In [None]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import tensorflow_addons as tfa

from kaggle_datasets import KaggleDatasets
import matplotlib.pyplot as plt
import numpy as np

# Use a TPU when one can be reached; otherwise fall back to the default
# (single-device) distribution strategy.
try:
    tpu = tf.distribute.cluster_resolver.TPUClusterResolver()
    print('Device:', tpu.master())
    tf.config.experimental_connect_to_cluster(tpu)
    tf.tpu.experimental.initialize_tpu_system(tpu)
    # Non-experimental name of the strategy the notebook used before.
    strategy = tf.distribute.TPUStrategy(tpu)
except Exception as tpu_error:
    # Catch Exception rather than using a bare except, so Ctrl-C still
    # interrupts the cell, and report why the TPU path was skipped.
    print('TPU not available, using default strategy:', tpu_error)
    strategy = tf.distribute.get_strategy()
print('Number of replicas:', strategy.num_replicas_in_sync)

# Same constant the first half of the notebook uses via tf.data.AUTOTUNE.
AUTOTUNE = tf.data.AUTOTUNE

print(tf.__version__)

In [None]:
# Base path under which the monet_tfrec/ and photo_tfrec/ shards are globbed below.
# NOTE(review): get_gcs_path() is called without a dataset name - confirm it resolves
# to the intended competition dataset.
CycleGAN_PATH = KaggleDatasets().get_gcs_path()

In [None]:
# Expand the shard patterns into concrete file lists and report how many were found.
MONET_FILE = tf.io.gfile.glob(f"{CycleGAN_PATH}/monet_tfrec/*.tfrec")
print('Monet TFRecord Files:', len(MONET_FILE))

PHOTO_FILENAMES = tf.io.gfile.glob(f"{CycleGAN_PATH}/photo_tfrec/*.tfrec")
print('Photo TFRecord Files:', len(PHOTO_FILENAMES))


In [None]:
# Height and width that decode_image reshapes every decoded image to.
IMAGE_SIZE = [256, 256]

def decode_image(image):
    """Decode JPEG bytes into a float32 RGB tensor scaled to [-1, 1].

    Args:
        image: Scalar string tensor containing JPEG-encoded bytes.

    Returns:
        Tensor reshaped to ``[*IMAGE_SIZE, 3]``. The image is not resized, so the
        reshape fails if the decoded image has a different number of pixels.
    """
    pixels = tf.cast(tf.image.decode_jpeg(image, channels=3), tf.float32)
    scaled = (pixels / 127.5) - 1  # 0 -> -1, 255 -> +1
    return tf.reshape(scaled, [*IMAGE_SIZE, 3])

def read_tfrecord(example):
    """Parse one serialized example and return its decoded image.

    The schema requires the string features ``image_name``, ``image`` and
    ``target``; only ``image`` is used.

    Args:
        example: A serialized example from a TFRecord file.

    Returns:
        The image tensor produced by ``decode_image``.
    """
    feature_spec = {
        key: tf.io.FixedLenFeature([], tf.string)
        for key in ("image_name", "image", "target")
    }
    parsed = tf.io.parse_single_example(example, feature_spec)
    return decode_image(parsed['image'])

In [None]:
def load_dataset(filenames, labeled=True, ordered=False):
    """Build a dataset of decoded images from TFRecord files.

    Args:
        filenames: TFRecord path(s) to read.
        labeled: Accepted for interface compatibility; not used.
        ordered: Accepted for interface compatibility; not used.

    Returns:
        An unbatched ``tf.data.Dataset`` yielding one image tensor per record.
    """
    records = tf.data.TFRecordDataset(filenames)
    return records.map(read_tfrecord, num_parallel_calls=AUTOTUNE)

In [None]:
# CycleGAN inputs: one image per batch for each domain.
monet_ds = load_dataset(MONET_FILE, labeled=True).batch(1)
photo_ds = load_dataset(PHOTO_FILENAMES, labeled=True).batch(1)

In [None]:
# Take the first batch (a single image) of each domain for previews and smoke tests.
sample_monet = next(iter(monet_ds))
sample_photo = next(iter(photo_ds))

In [None]:
# Side-by-side preview; `* 0.5 + 0.5` maps [-1, 1] back to [0, 1] for imshow.
plt.subplot(1, 2, 1)
plt.title('Photo')
plt.imshow(sample_photo[0] * 0.5 + 0.5)

plt.subplot(1, 2, 2)
plt.title('Monet')
plt.imshow(sample_monet[0] * 0.5 + 0.5)

**Now I will be building the generator**

In [None]:
# Number of channels produced by the generator's final layer.
OUTPUT_CHANNELS = 3

def downsample(filters, size, apply_instancenorm=True):
    """Build a block that halves spatial size: Conv2D -> [InstanceNorm] -> LeakyReLU.

    Args:
        filters: Number of convolution filters.
        size: Convolution kernel size.
        apply_instancenorm: Whether to insert instance normalization after the
            convolution.

    Returns:
        A ``keras.Sequential`` holding the block's layers.
    """
    kernel_init = tf.random_normal_initializer(0., 0.02)
    norm_gamma_init = keras.initializers.RandomNormal(mean=0.0, stddev=0.02)

    block_layers = [
        layers.Conv2D(filters, size, strides=2, padding='same',
                      kernel_initializer=kernel_init, use_bias=False),
    ]
    if apply_instancenorm:
        block_layers.append(
            tfa.layers.InstanceNormalization(gamma_initializer=norm_gamma_init))
    block_layers.append(layers.LeakyReLU())

    return keras.Sequential(block_layers)

In [None]:
def upsample(filters, size, apply_dropout=False):
    """Build a block that doubles spatial size.

    Layer order: Conv2DTranspose -> InstanceNorm -> [Dropout(0.5)] -> ReLU.

    Args:
        filters: Number of transposed-convolution filters.
        size: Kernel size.
        apply_dropout: Whether to insert 50% dropout after the normalization.

    Returns:
        A ``keras.Sequential`` holding the block's layers.
    """
    kernel_init = tf.random_normal_initializer(0., 0.02)
    norm_gamma_init = keras.initializers.RandomNormal(mean=0.0, stddev=0.02)

    block_layers = [
        layers.Conv2DTranspose(filters, size, strides=2,
                               padding='same',
                               kernel_initializer=kernel_init,
                               use_bias=False),
        tfa.layers.InstanceNormalization(gamma_initializer=norm_gamma_init),
    ]
    if apply_dropout:
        block_layers.append(layers.Dropout(0.5))
    block_layers.append(layers.ReLU())

    return keras.Sequential(block_layers)

In [None]:
def generator_model():
    """Build the U-Net style CycleGAN generator (256x256x3 -> 256x256x3).

    Eight stride-2 ``downsample`` blocks reduce the input to a bottleneck, and
    seven ``upsample`` blocks plus a final transposed convolution restore the
    resolution. Each upsample output is concatenated with the encoder feature
    map of matching size (skip connection). The output uses ``tanh``.

    Returns:
        An uncompiled ``keras.Model``.
    """
    inputs = layers.Input(shape=[256, 256, 3])

    # Encoder: the first block skips instance normalization.
    encoder = [downsample(64, 4, apply_instancenorm=False)]
    encoder += [downsample(width, 4) for width in (128, 256, 512, 512, 512, 512, 512)]

    # Decoder: dropout only in the three deepest blocks.
    decoder = [upsample(512, 4, apply_dropout=True) for _ in range(3)]
    decoder += [upsample(width, 4) for width in (512, 256, 128, 64)]

    kernel_init = tf.random_normal_initializer(0., 0.02)
    output_layer = layers.Conv2DTranspose(OUTPUT_CHANNELS, 4, strides=2, padding='same',
                                          kernel_initializer=kernel_init, activation='tanh')

    x = inputs

    encoder_outputs = []
    for block in encoder:
        x = block(x)
        encoder_outputs.append(x)

    # Pair each decoder block with the encoder outputs from deepest to
    # shallowest, leaving out the bottleneck itself (the last encoder output).
    for block, skip in zip(decoder, encoder_outputs[-2::-1]):
        x = block(x)
        x = layers.Concatenate()([x, skip])

    x = output_layer(x)

    return keras.Model(inputs=inputs, outputs=x)



In [None]:
def create_discriminator():
    """Build the PatchGAN-style CycleGAN discriminator (256x256x3 -> 30x30x1 logits).

    Fixes two defects in the previous version: ``initializer`` and ``gamma_init``
    were never defined inside the function (NameError), and the downsampling
    blocks were never applied to ``inputs`` - the last block object itself was
    passed to ``ZeroPadding2D`` instead of a tensor.

    Returns:
        An uncompiled ``tf.keras.Model`` whose output is a map of unactivated
        real/fake scores, one per image patch.
    """
    initializer = tf.random_normal_initializer(0., 0.02)
    gamma_init = keras.initializers.RandomNormal(mean=0.0, stddev=0.02)

    inputs = layers.Input(shape=[256, 256, 3], name='input_image')

    # Downsampling: 256 -> 128 -> 64 -> 32
    down_stack = [
        downsample(64, 4, apply_instancenorm=False),
        downsample(128, 4),
        downsample(256, 4),
    ]

    x = inputs
    for down in down_stack:
        x = down(x)

    zero_pad1 = layers.ZeroPadding2D()(x)  # 32 -> 34
    conv = layers.Conv2D(512, 4, strides=1,
                         kernel_initializer=initializer,
                         use_bias=False)(zero_pad1)  # 34 -> 31

    norm1 = tfa.layers.InstanceNormalization(gamma_initializer=gamma_init)(conv)
    leaky_relu = layers.LeakyReLU()(norm1)

    zero_pad2 = layers.ZeroPadding2D()(leaky_relu)  # 31 -> 33

    last = layers.Conv2D(1, 4, strides=1,
                         kernel_initializer=initializer)(zero_pad2)  # 33 -> 30

    return tf.keras.Model(inputs=inputs, outputs=last)


In [None]:
with strategy.scope():
    # Generators: photo -> Monet and Monet -> photo.
    monet_gen = generator_model()
    photo_gen = generator_model()

    # Discriminators must be built with create_discriminator(); they were
    # previously built with generator_model(), so the "discriminators" emitted
    # full tanh images instead of real/fake logits.
    monet_dis = create_discriminator()
    photo_dis = create_discriminator()

In [None]:
# Smoke test: run the sample photo through monet_gen before any CycleGAN training
# (cycle_gan.fit is only called in a later cell).
to_monet = monet_gen(sample_photo)

plt.subplot(1, 2, 1)
plt.title("Original Photo")
plt.imshow(sample_photo[0] * 0.5 + 0.5)

plt.subplot(1, 2, 2)
plt.title("Monet-esque Photo")
plt.imshow(to_monet[0] * 0.5 + 0.5)
plt.show()

In [None]:
class CycleGanModel(keras.Model):
    """Keras model that trains two generators and two discriminators jointly.

    ``monet_gen`` maps photos to Monet-style images and ``photo_gen`` maps Monet
    images to photos; ``monet_disc`` and ``photo_disc`` score images of the
    corresponding domain. The loss functions and optimizers are supplied through
    ``compile``.
    """

    def __init__(self, monet_gen, photo_gen, monet_disc, photo_disc, lambda_cycle=10):
        """Store the four sub-models and the weight passed to the cycle/identity losses."""
        super(CycleGanModel, self).__init__()
        self.monet_gen = monet_gen
        self.photo_gen = photo_gen
        self.monet_disc = monet_disc
        self.photo_disc = photo_disc
        self.lambda_cycle = lambda_cycle

    def compile(self, monet_gen_opt, photo_gen_opt, monet_disc_opt, photo_disc_opt, gen_loss, disc_loss, cycle_loss, identity_loss):
        """Attach one optimizer per sub-model and the four loss callables.

        As used in ``train_step``: ``gen_loss(fake_logits)``,
        ``disc_loss(real_logits, fake_logits)``, and
        ``cycle_loss`` / ``identity_loss`` called as ``(real, generated, lambda)``.
        """
        super(CycleGanModel, self).compile()
        self.m_gen_opt = monet_gen_opt
        self.p_gen_opt = photo_gen_opt
        self.m_disc_opt = monet_disc_opt
        self.p_disc_opt = photo_disc_opt
        self.gen_loss = gen_loss
        self.disc_loss = disc_loss
        self.cycle_loss = cycle_loss
        self.identity_loss = identity_loss

    def train_step(self, data):
        """Run one optimization step on a ``(real_monet, real_photo)`` pair.

        Returns:
            Dict with the total loss of each generator and the loss of each
            discriminator for this step.
        """
        real_monet, real_photo = data

        # persistent=True because tape.gradient() is called four times below.
        with tf.GradientTape(persistent=True) as tape:
            # Translate each domain, then translate back to close the cycle.
            fake_monet = self.monet_gen(real_photo, training=True)
            fake_photo = self.photo_gen(real_monet, training=True)
            cycled_monet = self.monet_gen(fake_photo, training=True)
            cycled_photo = self.photo_gen(fake_monet, training=True)

            # Identity mapping: each generator is fed an image of its own target domain.
            same_monet = self.monet_gen(real_monet, training=True)
            same_photo = self.photo_gen(real_photo, training=True)

            # Discriminator scores for real and generated images of each domain.
            disc_real_monet = self.monet_disc(real_monet, training=True)
            disc_real_photo = self.photo_disc(real_photo, training=True)
            disc_fake_monet = self.monet_disc(fake_monet, training=True)
            disc_fake_photo = self.photo_disc(fake_photo, training=True)

            monet_gen_loss = self.gen_loss(disc_fake_monet)
            photo_gen_loss = self.gen_loss(disc_fake_photo)
            monet_disc_loss = self.disc_loss(disc_real_monet, disc_fake_monet)
            photo_disc_loss = self.disc_loss(disc_real_photo, disc_fake_photo)
            # The summed cycle loss is shared by both generators' totals.
            total_cycle_loss = self.cycle_loss(real_monet, cycled_monet, self.lambda_cycle) + self.cycle_loss(real_photo, cycled_photo, self.lambda_cycle)

            total_monet_gen_loss = monet_gen_loss + total_cycle_loss + self.identity_loss(real_monet, same_monet, self.lambda_cycle)
            total_photo_gen_loss = photo_gen_loss + total_cycle_loss + self.identity_loss(real_photo, same_photo, self.lambda_cycle)

        # Each loss is differentiated only with respect to its own model's variables.
        monet_gen_grads = tape.gradient(total_monet_gen_loss, self.monet_gen.trainable_variables)
        photo_gen_grads = tape.gradient(total_photo_gen_loss, self.photo_gen.trainable_variables)
        monet_disc_grads = tape.gradient(monet_disc_loss, self.monet_disc.trainable_variables)
        photo_disc_grads = tape.gradient(photo_disc_loss, self.photo_disc.trainable_variables)

        self.m_gen_opt.apply_gradients(zip(monet_gen_grads, self.monet_gen.trainable_variables))
        self.p_gen_opt.apply_gradients(zip(photo_gen_grads, self.photo_gen.trainable_variables))
        self.m_disc_opt.apply_gradients(zip(monet_disc_grads, self.monet_disc.trainable_variables))
        self.p_disc_opt.apply_gradients(zip(photo_disc_grads, self.photo_disc.trainable_variables))

        return {
            "monet_gen_loss": total_monet_gen_loss,
            "photo_gen_loss": total_photo_gen_loss,
            "monet_disc_loss": monet_disc_loss,
            "photo_disc_loss": photo_disc_loss
        }


In [None]:
with strategy.scope():
    def calc_discriminator_loss(real_output, fake_output):
        """Discriminator loss: real logits should score 1, generated logits 0.

        The loss object uses ``Reduction.NONE`` and the mean is taken explicitly.
        With the default ``AUTO`` reduction, Keras raises a ValueError when a
        loss is called from a custom ``train_step`` under a non-default
        ``tf.distribute`` strategy (e.g. on TPU). The value equals the old
        mean-reduced result.

        Args:
            real_output: Discriminator logits for real images.
            fake_output: Discriminator logits for generated images.

        Returns:
            Scalar tensor: the average of the real and fake cross-entropies.
        """
        bce = tf.keras.losses.BinaryCrossentropy(
            from_logits=True, reduction=tf.keras.losses.Reduction.NONE)

        real_loss = tf.reduce_mean(bce(tf.ones_like(real_output), real_output))

        fake_loss = tf.reduce_mean(bce(tf.zeros_like(fake_output), fake_output))

        total_loss = (real_loss + fake_loss) / 2
        return total_loss


In [None]:
with strategy.scope():
    def calc_generator_loss(fake_output):
        """Generator loss: generated images should be scored as real (label 1).

        The loss object uses ``Reduction.NONE`` and the mean is taken explicitly.
        With the default ``AUTO`` reduction, Keras raises a ValueError when a
        loss is called from a custom ``train_step`` under a non-default
        ``tf.distribute`` strategy (e.g. on TPU). The value equals the old
        mean-reduced result.

        Args:
            fake_output: Discriminator logits for generated images.

        Returns:
            Scalar mean cross-entropy against an all-ones target.
        """
        bce = tf.keras.losses.BinaryCrossentropy(
            from_logits=True, reduction=tf.keras.losses.Reduction.NONE)
        return tf.reduce_mean(bce(tf.ones_like(fake_output), fake_output))


In [None]:
with strategy.scope():
    def calculate_cycle_consistency_loss(original_image, cycled_image, lambda_value):
        """Weighted mean absolute error between an image and its round-trip translation.

        Args:
            original_image: The real input image.
            cycled_image: The image after translating to the other domain and back.
            lambda_value: Multiplier applied to the mean absolute error.

        Returns:
            Scalar tensor ``lambda_value * mean(|original_image - cycled_image|)``.
        """
        absolute_error = tf.abs(original_image - cycled_image)
        return lambda_value * tf.reduce_mean(absolute_error)


In [None]:
with strategy.scope():
    def calculate_identity_loss(original_image, reconstructed_image, lambda_value):
        """Half-weighted mean absolute error between an image and its identity mapping.

        Args:
            original_image: The real input image.
            reconstructed_image: Generator output to compare against it.
            lambda_value: Base weight; the result is scaled by half of it.

        Returns:
            Scalar tensor
            ``lambda_value * 0.5 * mean(|original_image - reconstructed_image|)``.
        """
        absolute_error = tf.abs(original_image - reconstructed_image)
        mean_error = tf.reduce_mean(absolute_error)
        return lambda_value * 0.5 * mean_error


In [None]:
with strategy.scope():
    learning_rate = 2e-4
    beta = 0.5

    # Four independent Adam instances with identical settings, one per network.
    (monet_gen_optimizer,
     photo_gen_optimizer,
     monet_disc_optimizer,
     photo_disc_optimizer) = (
        tf.keras.optimizers.Adam(learning_rate, beta_1=beta) for _ in range(4)
    )


In [None]:
with strategy.scope():
    # Wire the generators and discriminators into one trainable model.
    cycle_gan = CycleGanModel(monet_gen, photo_gen, monet_dis, photo_dis)

    # Custom compile(): one optimizer per sub-model plus the four loss callables.
    cycle_gan.compile(
        monet_gen_opt=monet_gen_optimizer,
        photo_gen_opt=photo_gen_optimizer,
        monet_disc_opt=monet_disc_optimizer,
        photo_disc_opt=photo_disc_optimizer,
        gen_loss=calc_generator_loss,
        disc_loss=calc_discriminator_loss,
        cycle_loss=calculate_cycle_consistency_loss,
        identity_loss=calculate_identity_loss
    )

    # Each training element is a (monet, photo) pair; zip() stops when the
    # shorter of the two datasets is exhausted.
    cycle_gan.fit(
        tf.data.Dataset.zip((monet_ds, photo_ds)),
        epochs=5
    )


In [None]:
# Show five photos (left column) next to their Monet-style translations (right column).
fig, axes = plt.subplots(5, 2, figsize=(12, 12))

for i, img in enumerate(photo_ds.take(5)):
    # Generator output is in [-1, 1]; convert to 0..255 uint8 for display.
    monet_style_img = monet_gen(img, training=False)[0].numpy()
    monet_style_img = ((monet_style_img * 127.5) + 127.5).astype(np.uint8)

    # Same conversion for the input photo.
    input_img = ((img[0] * 127.5) + 127.5).numpy().astype(np.uint8)

    axes[i, 0].imshow(input_img)
    axes[i, 1].imshow(monet_style_img)

    axes[i, 0].set_title("Input Photo")
    axes[i, 1].set_title("Monet-esque")
    axes[i, 0].axis("off")
    axes[i, 1].axis("off")

plt.show()


In [None]:
import PIL
import shutil
import os

# Directory for the generated images. This was "../image" (singular), which
# left a stray empty folder that no other cell reads; the export cell writes to
# "../images", so the same name is used here.
image_dir = "../images"

if os.path.exists(image_dir):
    print(f"Directory '{image_dir}' already exists.")
else:
    # exist_ok avoids failing if the directory appears between the check and this call.
    os.makedirs(image_dir, exist_ok=True)


In [None]:
import PIL.Image
import os

# Destination for the generated JPEGs, relative to the current working directory.
# This rebinds the `output_folder` name assigned in an earlier cell.
output_folder = "../images"
os.makedirs(output_folder, exist_ok=True)

# Translate every photo (photo_ds yields one image per batch) and save it as
# "<index>.jpg", with the index starting at 1.
for i, img in enumerate(photo_ds, start=1):
    monet_style_img = monet_gen(img, training=False)[0].numpy()
    # [-1, 1] -> 0..255 uint8, the format PIL.Image.fromarray is given here.
    monet_style_img = ((monet_style_img * 127.5) + 127.5).astype(np.uint8)
    image = PIL.Image.fromarray(monet_style_img)
    image.save(os.path.join(output_folder, f"{i}.jpg"))


In [None]:
# Zip the directory the images were actually written to. The hard-coded
# "/kaggle/images" only matched "../images" when the working directory
# happened to be /kaggle/working.
shutil.make_archive("/kaggle/working/images", 'zip', output_folder)