In [1]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers, Model
from tensorflow.keras.preprocessing.image import load_img, img_to_array

import matplotlib.pyplot as plt
from matplotlib.ticker import FuncFormatter

from PIL import Image, ImageChops, ImageEnhance, ImageFilter
from skimage.metrics import structural_similarity as ssim

import os
import cv2
import shutil
import random
import numpy as np
import zipfile
import time
import sys




In [2]:
# Ask TensorFlow to grow GPU memory on demand instead of reserving it all up front.
gpus = tf.config.list_physical_devices('GPU')
try:
    # Iterating an empty list is a no-op, so no explicit "any GPUs?" check is needed.
    for gpu in gpus:
        tf.config.experimental.set_memory_growth(gpu, True)
except RuntimeError as e:
    # Report the failure and keep going.
    print(e)

print("GPU device :", gpus)

GPU device : []


In [3]:
# One-line status message: is at least one GPU visible to TensorFlow?
print("✅ GPU is available" if tf.config.list_physical_devices('GPU') else "❌ GPU is unavailable ")

❌ GPU is unavailable 


In [4]:
# Summarise the runtime environment (default strategy, library versions, GPUs).
strategy = tf.distribute.get_strategy()
environment_report = [
    ("Number of replicas : ", strategy.num_replicas_in_sync),
    ('Tensorflow : ', tf.__version__),
    ('Python : ', sys.version.split()[0]),
    ("Num GPUs Available : ", len(tf.config.list_physical_devices('GPU'))),
    ("Using device : ", tf.test.gpu_device_name()),
]
for label, value in environment_report:
    print(label, value)

Number of replicas :  1
Tensorflow :  2.15.0
Python :  3.11.13
Num GPUs Available :  0
Using device :  


In [5]:
# NOTE(review): tensorflow.python.* looks like an internal namespace - confirm it
# is still available before upgrading TensorFlow.
from tensorflow.python.client import device_lib

# Dump every device (CPU/GPU) TensorFlow can see on this machine.
local_devices = device_lib.list_local_devices()
print(local_devices)

[name: "/device:CPU:0"
device_type: "CPU"
memory_limit: 268435456
locality {
}
incarnation: 4330845919668094905
xla_global_id: -1
]


In [6]:
# Dataset root folder. Set the GAN_DATA_DIR environment variable to run the
# notebook on another machine; the default keeps the original local location.
DATA_ROOT = os.environ.get('GAN_DATA_DIR', r'D:\MATTHEW\Data Modelling\gan-getting-started')
monet_jpg_directory = os.path.join(DATA_ROOT, 'monet_jpg')
photo_jpg_directory = os.path.join(DATA_ROOT, 'photo_jpg')

In [7]:
def getImagePaths(path, extensions=None):
    """Recursively collect the full paths of the files stored under ``path``.

    Args:
        path: Root directory to walk (sub-directories are included).
        extensions: Optional iterable of file suffixes (e.g. ``('.jpg', '.jpeg')``).
            When given, only files whose name ends with one of them
            (case-insensitive) are returned. ``None`` (default) keeps every file.

    Returns:
        A sorted list of file paths. Sorting makes the order independent of the
        filesystem's directory listing order, so runs are reproducible.
        A missing or empty directory yields an empty list.
    """
    suffixes = None
    if extensions is not None:
        suffixes = tuple(ext.lower() for ext in extensions)

    image_names = []
    for dirname, _, filenames in os.walk(path):
        for filename in filenames:
            if suffixes is not None and not filename.lower().endswith(suffixes):
                continue
            image_names.append(os.path.join(dirname, filename))
    return sorted(image_names)

# Gather the file lists for both domains, then report how many were found.
monet_images_path, photo_images_path = map(getImagePaths, (monet_jpg_directory, photo_jpg_directory))

for label, paths in (("Monet", monet_images_path), ("Photo", photo_images_path)):
    print(f"{label} images: {len(paths)}")

Monet images: 300
Photo images: 7038


In [8]:
# Shorthand for tf.data's automatic tuning constant; passed to map()/prefetch() below.
AUTOTUNE = tf.data.AUTOTUNE

class InstanceNormalization(layers.Layer):
    """Instance normalization with a learnable per-channel scale and offset.

    Each sample is normalized independently using the mean and variance taken
    over axes 1 and 2 of the input, then scaled by ``gamma`` and shifted by ``beta``.

    Args:
        epsilon: Small constant added to the variance for numerical stability.
        **kwargs: Standard Keras ``Layer`` keyword arguments (``name``, ``dtype``, ...).
    """

    def __init__(self, epsilon=1e-5, **kwargs):
        # Forward kwargs so that name/dtype and deserialization via from_config work.
        super().__init__(**kwargs)
        self.epsilon = epsilon

    def build(self, input_shape):
        """Create one scale (gamma) and one offset (beta) per channel (last axis)."""
        channels = input_shape[-1]
        self.gamma = self.add_weight(name='gamma', shape=(channels,), initializer='ones', trainable=True)
        self.beta = self.add_weight(name='beta', shape=(channels,), initializer='zeros', trainable=True)
        super().build(input_shape)

    def call(self, x):
        """Normalize ``x`` over axes 1 and 2, then apply gamma and beta."""
        mean, variance = tf.nn.moments(x, axes=[1, 2], keepdims=True)
        return self.gamma * (x - mean) / tf.sqrt(variance + self.epsilon) + self.beta

    def get_config(self):
        """Return the constructor arguments so the layer can be saved and cloned."""
        config = super().get_config()
        config.update({'epsilon': self.epsilon})
        return config

In [9]:
def preprocess_image(image_path):
    """Load a JPEG file and return it as a 256x256x3 float32 tensor in [-1, 1].

    Args:
        image_path: Path of the JPEG file to read.

    Returns:
        The decoded RGB image, resized to 256x256 and rescaled with ``x / 127.5 - 1``.
    """
    raw_bytes = tf.io.read_file(image_path)
    pixels = tf.image.decode_jpeg(raw_bytes, channels=3)
    resized = tf.cast(tf.image.resize(pixels, [256, 256]), tf.float32)
    # Map 0..255 pixel values onto -1..1.
    return (resized / 127.5) - 1

# Batch and shuffle
BATCH_SIZE = 1


def build_image_dataset(image_paths, batch_size=BATCH_SIZE):
    """Build a shuffled, batched and prefetched dataset of preprocessed images.

    The file paths are shuffled *before* decoding, with a buffer covering the
    whole list. Shuffling strings is cheap, so every image has the same chance
    of appearing early in an epoch. Shuffling decoded images with a buffer of
    1000 would keep roughly 1000 * 256 * 256 * 3 float32 values (~786 MB) in
    memory and would only ever mix neighbouring parts of a larger dataset.

    Args:
        image_paths: List of image file paths.
        batch_size: Number of images per batch.

    Returns:
        A ``tf.data.Dataset`` yielding float32 batches of shape
        (batch_size, 256, 256, 3), reshuffled on every iteration.
    """
    paths_ds = tf.data.Dataset.from_tensor_slices(image_paths)
    # shuffle() requires a positive buffer size, hence the max(..., 1).
    paths_ds = paths_ds.shuffle(max(len(image_paths), 1))
    images_ds = paths_ds.map(preprocess_image, num_parallel_calls=AUTOTUNE)
    return images_ds.batch(batch_size).prefetch(AUTOTUNE)


monet_ds = build_image_dataset(monet_images_path)
photo_ds = build_image_dataset(photo_images_path)

# Downsampling block
def downsample(filters, size, apply_instancenorm=True):
    """Return a Sequential block: stride-2 Conv2D -> optional instance norm -> LeakyReLU.

    Args:
        filters: Number of convolution filters.
        size: Convolution kernel size.
        apply_instancenorm: Whether to insert ``InstanceNormalization`` after the convolution.
    """
    kernel_init = tf.random_normal_initializer(0., 0.02)
    block_layers = [
        layers.Conv2D(filters, size, strides=2, padding='same',
                      kernel_initializer=kernel_init, use_bias=False),
    ]
    if apply_instancenorm:
        block_layers.append(InstanceNormalization())
    block_layers.append(layers.LeakyReLU())
    return tf.keras.Sequential(block_layers)

# Upsampling block
def upsample(filters, size, apply_dropout=False):
    """Return a Sequential block: stride-2 Conv2DTranspose -> instance norm -> optional Dropout(0.5) -> ReLU.

    Args:
        filters: Number of transposed-convolution filters.
        size: Kernel size.
        apply_dropout: Whether to insert a 50% dropout layer before the activation.
    """
    kernel_init = tf.random_normal_initializer(0., 0.02)
    block_layers = [
        layers.Conv2DTranspose(filters, size, strides=2, padding='same',
                               kernel_initializer=kernel_init, use_bias=False),
        InstanceNormalization(),
    ]
    if apply_dropout:
        block_layers.append(layers.Dropout(0.5))
    block_layers.append(layers.ReLU())
    return tf.keras.Sequential(block_layers)

In [10]:
def Generator():
    """Build the encoder/decoder generator mapping a 256x256x3 image to a 256x256x3 image.

    Structure:
      * five stride-2 ``downsample`` blocks (32 -> 512 filters; the first has no instance norm),
      * two 3x3, 512-filter ReLU convolutions applied at the bottleneck
        (plain convolutions - no residual addition is performed),
      * four ``upsample`` blocks, each concatenated with the encoder output of
        matching resolution,
      * a final stride-2 transposed convolution with ``tanh`` producing 3 channels.

    Returns:
        An uncompiled Keras ``Model``.
    """
    inputs = layers.Input(shape=[256, 256, 3])

    # Encoder: only the very first block skips instance normalization.
    down_stack = [
        downsample(n_filters, 4, apply_instancenorm=(position > 0))
        for position, n_filters in enumerate((32, 64, 128, 256, 512))
    ]

    bottleneck_init = lambda: tf.random_normal_initializer(0., 0.02)
    res_blocks = [
        layers.Conv2D(512, 3, strides=1, padding='same',
                      activation='relu',
                      kernel_initializer=bottleneck_init())
        for _ in range(2)
    ]

    # Decoder mirrors the encoder, minus the last stage, which `last` handles.
    up_stack = [upsample(n_filters, 4) for n_filters in (256, 128, 64, 32)]

    last = layers.Conv2DTranspose(3, 4, strides=2, padding='same',
                                  kernel_initializer=tf.random_normal_initializer(0., 0.02),
                                  activation='tanh')

    # Run the encoder, remembering every intermediate output for the skip links.
    x = inputs
    encoder_outputs = []
    for block in down_stack:
        x = block(x)
        encoder_outputs.append(x)

    for conv in res_blocks:
        x = conv(x)

    # Skip the deepest encoder output (it is the decoder's input) and walk the
    # remaining ones from deepest to shallowest.
    for block, skip in zip(up_stack, encoder_outputs[-2::-1]):
        x = block(x)
        x = layers.Concatenate()([x, skip])

    return Model(inputs=inputs, outputs=last(x))

In [11]:
def Discriminator():
    """Build the patch discriminator for 256x256x3 images.

    Three stride-2 ``downsample`` blocks (64, 128, 256 filters; the first has
    no instance norm) are followed by zero-padding, a 4x4 'valid' convolution
    with 512 filters, instance norm and LeakyReLU, then zero-padding and a
    final 4x4 'valid' convolution producing a single-channel map of logits
    (no activation is applied).

    Returns:
        An uncompiled Keras ``Model``.
    """
    init = tf.random_normal_initializer(0., 0.02)
    image_in = layers.Input(shape=[256, 256, 3], name='input_image')

    features = image_in
    for n_filters, use_norm in ((64, False), (128, True), (256, True)):
        features = downsample(n_filters, 4, apply_instancenorm=use_norm)(features)

    features = layers.ZeroPadding2D()(features)
    features = layers.Conv2D(512, 4, strides=1, padding='valid',
                             kernel_initializer=init, use_bias=False)(features)
    features = InstanceNormalization()(features)
    features = layers.LeakyReLU()(features)

    features = layers.ZeroPadding2D()(features)
    patch_logits = layers.Conv2D(1, 4, strides=1, padding='valid',
                                 kernel_initializer=init)(features)

    return Model(inputs=image_in, outputs=patch_logits)

In [12]:
# Instantiate models
strategy = tf.distribute.MirroredStrategy()
with strategy.scope():
    # Two generators (photo -> Monet, Monet -> photo) and one discriminator per domain.
    monet_generator, photo_generator = Generator(), Generator()
    monet_discriminator, photo_discriminator = Discriminator(), Discriminator()

    # Loss functions
    # Unreduced BCE on logits; the loss helpers below take the mean themselves.
    loss_obj = tf.keras.losses.BinaryCrossentropy(
        from_logits=True,
        reduction=tf.keras.losses.Reduction.NONE,
    )

def discriminator_loss(real, generated):
    """Half the sum of the BCE for real outputs (target 1) and generated outputs (target 0).

    Args:
        real: Discriminator logits for real images.
        generated: Discriminator logits for generated images.
    """
    real_term = tf.reduce_mean(loss_obj(tf.ones_like(real), real))
    fake_term = tf.reduce_mean(loss_obj(tf.zeros_like(generated), generated))
    return (real_term + fake_term) * 0.5

def generator_loss(generated):
    """Mean BCE between the discriminator logits for generated images and an all-ones target."""
    wanted = tf.ones_like(generated)
    per_element = loss_obj(wanted, generated)
    return tf.reduce_mean(per_element)

def calc_cycle_loss(real_image, cycled_image, LAMBDA=10):
    """Cycle-consistency loss: ``LAMBDA * (0.9 * mean|diff| + 0.1 * mean(diff^2))``.

    Args:
        real_image: Original image batch.
        cycled_image: The same batch after a round trip through both generators.
        LAMBDA: Overall weight of the cycle term.
    """
    mean_abs_error = tf.reduce_mean(tf.abs(real_image - cycled_image))
    mean_sq_error = tf.reduce_mean(tf.square(real_image - cycled_image))
    blended = 0.9 * mean_abs_error + 0.1 * mean_sq_error
    return LAMBDA * blended

def identity_loss(real_image, same_image, LAMBDA=5):
    """Identity loss: ``LAMBDA * 0.5 * mean|real_image - same_image|``.

    Args:
        real_image: Image batch from the generator's target domain.
        same_image: The generator's output for that same batch.
        LAMBDA: Weight applied (together with the fixed 0.5 factor) to the L1 term.
    """
    return LAMBDA * 0.5 * tf.reduce_mean(tf.abs(real_image - same_image))

# Learning rate scheduler
initial_learning_rate = 2e-4
# Optimizer steps per pass over the Monet images.
_steps_per_epoch = len(monet_images_path) // BATCH_SIZE
# Step decay: full rate, then x0.1 after 50 epochs' worth of steps, then x0.01 after 75.
lr_schedule = tf.keras.optimizers.schedules.PiecewiseConstantDecay(
    boundaries=[50 * _steps_per_epoch, 75 * _steps_per_epoch],
    values=[initial_learning_rate, initial_learning_rate * 0.1, initial_learning_rate * 0.01],
)

INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:CPU:0',)



In [13]:
# Optimizers with gradient clipping
# All four networks use the same Adam settings (shared schedule, beta_1=0.5, clipnorm=1.0).
adam_settings = dict(learning_rate=lr_schedule, beta_1=0.5, clipnorm=1.0)
with strategy.scope():
    monet_generator_optimizer = tf.keras.optimizers.Adam(**adam_settings)
    photo_generator_optimizer = tf.keras.optimizers.Adam(**adam_settings)
    monet_discriminator_optimizer = tf.keras.optimizers.Adam(**adam_settings)
    photo_discriminator_optimizer = tf.keras.optimizers.Adam(**adam_settings)

@tf.function
def train_step(real_monet, real_photo):
    """Run one optimization step for both generators and both discriminators.

    Args:
        real_monet: Batch of real Monet images.
        real_photo: Batch of real photo images.

    Returns:
        Tuple ``(total_monet_gen_loss, total_photo_gen_loss, monet_disc_loss,
        photo_disc_loss)`` for this batch.

    NOTE(review): the training loop calls this function directly rather than
    through ``strategy.run`` - presumably fine for a single replica; confirm
    before relying on multi-device execution.
    """
    # persistent=True because tape.gradient() is called four times below.
    with tf.GradientTape(persistent=True) as tape:
        # photo -> Monet -> photo
        fake_monet = monet_generator(real_photo, training=True)
        cycled_photo = photo_generator(fake_monet, training=True)
        # Monet -> photo -> Monet
        fake_photo = photo_generator(real_monet, training=True)
        cycled_monet = monet_generator(fake_photo, training=True)
        # Each generator applied to an image already in its target domain (identity term).
        same_monet = monet_generator(real_monet, training=True)
        same_photo = photo_generator(real_photo, training=True)
        # Discriminator outputs for real and generated images of each domain.
        disc_real_monet = monet_discriminator(real_monet, training=True)
        disc_real_photo = photo_discriminator(real_photo, training=True)
        disc_fake_monet = monet_discriminator(fake_monet, training=True)
        disc_fake_photo = photo_discriminator(fake_photo, training=True)
        monet_gen_loss = generator_loss(disc_fake_monet)
        photo_gen_loss = generator_loss(disc_fake_photo)
        # The summed cycle loss of both directions is added to both generator losses.
        total_cycle_loss = calc_cycle_loss(real_monet, cycled_monet) + calc_cycle_loss(real_photo, cycled_photo)
        total_monet_gen_loss = monet_gen_loss + total_cycle_loss + identity_loss(real_monet, same_monet)
        total_photo_gen_loss = photo_gen_loss + total_cycle_loss + identity_loss(real_photo, same_photo)
        monet_disc_loss = discriminator_loss(disc_real_monet, disc_fake_monet)
        photo_disc_loss = discriminator_loss(disc_real_photo, disc_fake_photo)

    # Each loss is differentiated only with respect to its own network's variables.
    monet_generator_gradients = tape.gradient(total_monet_gen_loss, monet_generator.trainable_variables)
    photo_generator_gradients = tape.gradient(total_photo_gen_loss, photo_generator.trainable_variables)
    monet_discriminator_gradients = tape.gradient(monet_disc_loss, monet_discriminator.trainable_variables)
    photo_discriminator_gradients = tape.gradient(photo_disc_loss, photo_discriminator.trainable_variables)

    # All gradients were computed above, before any weights are updated here.
    monet_generator_optimizer.apply_gradients(zip(monet_generator_gradients, monet_generator.trainable_variables))
    photo_generator_optimizer.apply_gradients(zip(photo_generator_gradients, photo_generator.trainable_variables))
    monet_discriminator_optimizer.apply_gradients(zip(monet_discriminator_gradients, monet_discriminator.trainable_variables))
    photo_discriminator_optimizer.apply_gradients(zip(photo_discriminator_gradients, photo_discriminator.trainable_variables))

    return total_monet_gen_loss, total_photo_gen_loss, monet_disc_loss, photo_disc_loss

In [14]:
# Per-epoch average losses, one entry appended per completed epoch.
history = {
    'monet_gen_loss': [],
    'photo_gen_loss': [],
    'monet_disc_loss': [],
    'photo_disc_loss': []
}

EPOCHS = 1

# Pair one Monet batch with one photo batch per step. zip() stops at the
# shorter dataset. The zipped dataset is built once and re-iterated each epoch.
paired_ds = tf.data.Dataset.zip((monet_ds, photo_ds))

for epoch in range(EPOCHS):
    print(f"Epoch {epoch + 1}/{EPOCHS}")
    start_time = time.time()
    total_monet_gen_loss = 0
    total_photo_gen_loss = 0
    total_monet_disc_loss = 0
    total_photo_disc_loss = 0
    n_batches = 0

    for real_monet, real_photo in paired_ds:
        monet_gen_loss, photo_gen_loss, monet_disc_loss, photo_disc_loss = train_step(real_monet, real_photo)
        total_monet_gen_loss += monet_gen_loss.numpy()
        total_photo_gen_loss += photo_gen_loss.numpy()
        total_monet_disc_loss += monet_disc_loss.numpy()
        total_photo_disc_loss += photo_disc_loss.numpy()
        n_batches += 1

    # Fail with an actionable message instead of a bare ZeroDivisionError below.
    if n_batches == 0:
        raise RuntimeError(
            "No training batches were produced; check that both image directories contain files."
        )

    avg_monet_gen_loss = total_monet_gen_loss / n_batches
    avg_photo_gen_loss = total_photo_gen_loss / n_batches
    avg_monet_disc_loss = total_monet_disc_loss / n_batches
    avg_photo_disc_loss = total_photo_disc_loss / n_batches

    history['monet_gen_loss'].append(avg_monet_gen_loss)
    history['photo_gen_loss'].append(avg_photo_gen_loss)
    history['monet_disc_loss'].append(avg_monet_disc_loss)
    history['photo_disc_loss'].append(avg_photo_disc_loss)

    elapsed_time = time.time() - start_time

    print(f"{elapsed_time:.2f}s" , f"Avg Monet Gen Loss : {avg_monet_gen_loss:.4f}, Avg Photo Gen Loss : {avg_photo_gen_loss:.4f}, "
          f"Avg Monet Disc Loss : {avg_monet_disc_loss:.4f}, Avg Photo Disc Loss : {avg_photo_disc_loss:.4f}")


Epoch 1/1
391.12s Avg Monet Gen Loss : 5.0866, Avg Photo Gen Loss : 5.0989, Avg Monet Disc Loss : 0.6898, Avg Photo Disc Loss : 0.6960


In [None]:
def plot_generator_loss(history):
    """Plot the per-epoch Monet and photo generator losses side by side.

    Args:
        history: Dict with the lists ``'monet_gen_loss'`` and ``'photo_gen_loss'``
            (one value per epoch). The x axis uses 0-based epoch indices.
    """
    plt.figure(figsize=(16, 6))

    epochs = range(len(history['monet_gen_loss']))
    # Tick every second epoch, and always label the last plotted epoch.
    xticks = list(range(0, len(epochs), 2))
    if len(epochs) > 0 and (len(epochs) - 1) not in xticks:
        # The last valid index is len(epochs) - 1; len(epochs) would put a
        # tick one step past the data.
        xticks.append(len(epochs) - 1)

    plt.subplot(1, 2, 1)
    plt.plot(epochs, history['monet_gen_loss'], label='Monet Generator Loss', color='orange', linewidth=2)
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.title('Monet Generator Loss')
    plt.xticks(xticks)
    plt.grid(True)

    plt.subplot(1, 2, 2)
    plt.plot(epochs, history['photo_gen_loss'], label='Photo Generator Loss', color='blue', linewidth=2)
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.title('Photo Generator Loss')
    plt.xticks(xticks)
    plt.grid(True)

    plt.tight_layout()
    plt.show()

# Draw the generator loss curves from the averages recorded in `history`.
plot_generator_loss(history)

In [None]:
def plot_discriminator_loss(history):
    """Plot the per-epoch Monet and photo discriminator losses side by side.

    Args:
        history: Dict with the lists ``'monet_disc_loss'`` and ``'photo_disc_loss'``
            (one value per epoch). The x axis uses 0-based epoch indices.
    """
    plt.figure(figsize=(16, 6))

    epochs = range(len(history['monet_disc_loss']))
    # Tick every second epoch, and always label the last plotted epoch
    # (index len(epochs) - 1), matching the generator loss plot.
    xticks = list(range(0, len(epochs), 2))
    if len(epochs) > 0 and (len(epochs) - 1) not in xticks:
        xticks.append(len(epochs) - 1)

    plt.subplot(1, 2, 1)
    plt.plot(epochs, history['monet_disc_loss'], label='Monet Discriminator Loss', color='orange', linewidth=2)
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.title('Monet Discriminator Loss')
    plt.xticks(xticks)
    plt.grid(True)

    plt.subplot(1, 2, 2)
    plt.plot(epochs, history['photo_disc_loss'], label='Photo Discriminator Loss', color='blue', linewidth=2)
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.title('Photo Discriminator Loss')
    plt.xticks(xticks)
    plt.grid(True)

    plt.tight_layout()
    plt.show()

# Draw the discriminator loss curves from the averages recorded in `history`.
plot_discriminator_loss(history)

In [None]:
def display_generated_images_SSIM(photo_ds, num_images=5, start_from=1):
    """Show photos next to their Monet-style translations and score each pair with SSIM.

    The top row holds the input photos and the bottom row the outputs of the
    global ``monet_generator``; each output is titled with its SSIM against
    the input, expressed as a percentage.

    Args:
        photo_ds: Dataset of photo batches scaled to [-1, 1]; only the first
            image of each batch is used.
        num_images: Number of batches (columns) to display.
        start_from: 1-based position of the first batch to take from ``photo_ds``.

    Returns:
        List with one SSIM percentage per displayed image.
    """
    plt.figure(figsize=(20, 10))
    batch_window = photo_ds.skip(start_from - 1).take(num_images)

    def to_unit_range(batch):
        # First image of the batch, mapped from [-1, 1] back to [0, 1] for display.
        return (batch[0] * 0.5 + 0.5).numpy()

    ssim_scores = []
    for column, photo in enumerate(batch_window, start=1):
        generated = monet_generator(photo, training=False)
        generated_img = to_unit_range(generated)
        photo_img = to_unit_range(photo)

        ssim_value = tf.image.ssim(photo_img, generated_img, max_val=1.0).numpy()
        ssim_percent = ssim_value * 100
        ssim_scores.append(ssim_percent)

        # Top row: source photo.
        plt.subplot(2, num_images, column)
        plt.imshow(photo_img)
        plt.title("Original Photo", fontsize=20)
        plt.axis('off')

        # Bottom row: generated image, directly below its source.
        plt.subplot(2, num_images, column + num_images)
        plt.imshow(generated_img)
        plt.title(f"Monet Style\nSSIM : {ssim_percent:.3f}%", fontsize=18)
        plt.axis('off')

    plt.tight_layout(rect=[0, 0.03, 1, 1])
    plt.show()

    return ssim_scores

In [None]:
# Show three rows of five translated photos, each followed by a blank spacer line.
ssim_batches = []
for first_index in (1, 6, 11):
    ssim_batches.append(display_generated_images_SSIM(photo_ds, num_images=5, start_from=first_index))
    print(" " * 100)
ssim_1, ssim_2, ssim_3 = ssim_batches

# Average SSIM over every displayed image, rendered as a text-only figure.
all_ssim = [*ssim_1, *ssim_2, *ssim_3]
avg_ssim_all = sum(all_ssim) / len(all_ssim)
plt.figure(figsize=(20, 1))
plt.axis('off')
plt.figtext(0.5, 0.5, f"Average SSIM : {avg_ssim_all:.3f}%", ha='center', fontsize=20, fontweight='bold')
plt.show()