In [None]:
pip install tensorflow-addons

In [None]:
!pip install -q git+https://github.com/tensorflow/examples.git

In [None]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import tensorflow_addons as tfa
import tensorflow_datasets as tfds


import matplotlib.pyplot as plt
import numpy as np

from functools import partial
from albumentations import (
    Compose, RandomBrightness, JpegCompression, HueSaturationValue, RandomContrast, HorizontalFlip,
    Rotate
)

# Use a TPU when the runtime exposes one; otherwise fall back to TensorFlow's
# default distribution strategy.
try:
    tpu = tf.distribute.cluster_resolver.TPUClusterResolver()
    print('Device:', tpu.master())
    tf.config.experimental_connect_to_cluster(tpu)
    tf.tpu.experimental.initialize_tpu_system(tpu)
    strategy = tf.distribute.TPUStrategy(tpu)
except Exception as tpu_error:
    # Catch Exception rather than everything so Ctrl-C / SystemExit still
    # propagate, and report why the TPU path was not taken.
    print('TPU not available, using the default strategy:', tpu_error)
    strategy = tf.distribute.get_strategy()
print('Number of replicas:', strategy.num_replicas_in_sync)

# Sentinel that lets tf.data choose parallelism / buffer sizes at runtime.
AUTOTUNE = tf.data.experimental.AUTOTUNE

print(tf.__version__)

In [None]:
# Mount Google Drive at /content/drive; the dataset folders read below live
# under /content/drive/MyDrive. Requires the google.colab runtime.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
import tensorflow as tf
from os import listdir
import tensorflow_datasets as tfds
from tensorflow_examples.models.pix2pix import pix2pix
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
import os
import time
import matplotlib.pyplot as plt
from IPython.display import clear_output
from numpy import asarray
from numpy import vstack
from keras.preprocessing.image import img_to_array
from keras.preprocessing.image import load_img
from numpy import savez_compressed

# Rebinds the AUTOTUNE name already set in the first code cell, this time via
# the non-experimental tf.data alias.
AUTOTUNE = tf.data.AUTOTUNE

# Number of channels produced by the generator's final layer.
OUTPUT_CHANNELS = 3
# NOTE(review): LATENT_DIM is not referenced by any of the visible cells.
LATENT_DIM = 1024

# load all images in a directory into memory
def load_images(path, size=(256,256)):
    """Load every file in a directory as an image and stack them in one array.

    Args:
        path: Directory to read. Every entry is assumed to be an image file.
            A trailing path separator is optional.
        size: ``(height, width)`` passed to ``load_img`` as ``target_size``,
            so all images share one shape.

    Returns:
        A NumPy array with one entry per file, ordered by filename.
    """
    # listdir returns entries in arbitrary order; sorting makes the array
    # layout reproducible from run to run.
    filenames = sorted(listdir(path))
    # os.path.join builds a valid path whether or not `path` ends in a separator.
    return asarray([
        img_to_array(load_img(os.path.join(path, filename), target_size=size))
        for filename in filenames
    ])


# load dataset A (every file in train_A, resized to the load_images default of 256x256)
dataA = load_images('/content/drive/MyDrive/datasets/train_A/')


print('Loaded dataA: ', dataA.shape)
# load dataset B (same procedure for the second image domain)
dataB = load_images('/content/drive/MyDrive/datasets/train_B/')

print('Loaded dataB: ', dataB.shape)
# save both arrays into one compressed .npz archive in the working directory;
# they are passed positionally, so NumPy stores them under its default
# keys (arr_0 for dataA, arr_1 for dataB)
filename = './cycleGAN_colorizationofimages.npz'
savez_compressed(filename, dataA, dataB)
print('Saved dataset: ', filename)

In [None]:
def numpy_to_dataset(data):
    """Wrap ``data`` in a ``tf.data.Dataset`` with one element per slice along the first axis."""
    return tf.data.Dataset.from_tensor_slices(data)

# One tf.data pipeline per image domain, built from the in-memory arrays.
train_A1, train_B1 = numpy_to_dataset(dataA), numpy_to_dataset(dataB)



def preprocess_image(image):
    """Resize ``image`` to 256x256 and rescale it with ``x / 127.5 - 1``.

    For pixel values in [0, 255] the result lies in [-1, 1].
    """
    resized = tf.image.resize(image, [256, 256])
    # Dividing by 127.5 maps [0, 255] onto [0, 2]; subtracting 1 centres it on zero.
    return (resized / 127.5) - 1

# Normalise every image once and keep the result in memory, then draw the
# images in a fresh random order each epoch before batching them one at a time.
# Without the shuffle, zip() below would pair the same A[i] with the same B[i]
# in the same order on every epoch.
train_A1 = (
    train_A1
    .map(preprocess_image, num_parallel_calls=tf.data.AUTOTUNE)
    .cache()
    .shuffle(max(len(dataA), 1))  # buffer spans the whole set; size must be >= 1
    .batch(1)
)
train_B1 = (
    train_B1
    .map(preprocess_image, num_parallel_calls=tf.data.AUTOTUNE)
    .cache()
    .shuffle(max(len(dataB), 1))
    .batch(1)
)

# Each training element is an (A batch, B batch) tuple; iteration stops when
# the shorter of the two datasets is exhausted. Prefetching overlaps input
# preparation with the training step.
train_dataset1 = tf.data.Dataset.zip((train_A1, train_B1)).prefetch(tf.data.AUTOTUNE)

In [None]:
def downsample(filters, size, apply_instancenorm=True):
    """Build one encoder block as a ``keras.Sequential``.

    Layer order: Conv2D ('same' padding, no bias) -> MaxPool2D ->
    InstanceNormalization (optional) -> LeakyReLU.

    Args:
        filters: Number of convolution filters.
        size: Convolution kernel size.
        apply_instancenorm: Whether to include the InstanceNormalization layer.

    Returns:
        The assembled ``keras.Sequential`` block.
    """
    block_layers = [
        layers.Conv2D(filters, size, padding='same',
                      kernel_initializer=tf.random_normal_initializer(0., 0.02),
                      use_bias=False),
        layers.MaxPool2D(),
    ]

    if apply_instancenorm:
        block_layers.append(tfa.layers.InstanceNormalization(
            gamma_initializer=keras.initializers.RandomNormal(mean=0.0, stddev=0.02)))

    block_layers.append(layers.LeakyReLU())

    return keras.Sequential(block_layers)

def upsample(filters, size, apply_dropout=False):
    """Build one decoder block as a ``keras.Sequential``.

    Layer order: Conv2DTranspose (stride 2, 'same' padding, no bias) ->
    InstanceNormalization -> Dropout(0.5) (optional) -> LeakyReLU.

    Args:
        filters: Number of transposed-convolution filters.
        size: Kernel size of the transposed convolution.
        apply_dropout: Whether to include the Dropout layer.

    Returns:
        The assembled ``keras.Sequential`` block.
    """
    block_layers = [
        layers.Conv2DTranspose(filters, size, strides=2,
                               padding='same',
                               kernel_initializer=tf.random_normal_initializer(0., 0.02),
                               use_bias=False),
        tfa.layers.InstanceNormalization(
            gamma_initializer=keras.initializers.RandomNormal(mean=0.0, stddev=0.02)),
    ]

    if apply_dropout:
        block_layers.append(layers.Dropout(0.5))

    block_layers.append(layers.LeakyReLU())

    return keras.Sequential(block_layers)


def CycleGenerator():
    """Build the encoder-decoder generator with skip connections.

    Input and output are both 256x256x3. Eight ``downsample`` blocks reduce
    the input to 1x1; seven ``upsample`` blocks grow it back to 128x128, each
    followed by a concatenation with the encoder output of matching
    resolution; a final stride-2 transposed convolution with ``tanh``
    restores 256x256 with ``OUTPUT_CHANNELS`` channels.
    """
    inputs = layers.Input(shape=[256,256,3])

    # Encoder (bs = batch size): (bs, 128, 128, 64) -> (bs, 64, 64, 128) ->
    # (bs, 32, 32, 256) -> (bs, 16, 16, 512) -> ... -> (bs, 1, 1, 512).
    # Only the first block is built without instance normalisation.
    down_stack = [downsample(64, 4, apply_instancenorm=False)]
    down_stack += [downsample(n_filters, 4)
                   for n_filters in (128, 256, 512, 512, 512, 512, 512)]

    # Decoder; the first three blocks use dropout. Shapes after each
    # concatenation run from (bs, 2, 2, 1024) up to (bs, 128, 128, 128).
    up_stack = [upsample(512, 4, apply_dropout=True) for _ in range(3)]
    up_stack += [upsample(n_filters, 4) for n_filters in (512, 256, 128, 64)]

    initializer = tf.random_normal_initializer(0., 0.02)
    last = layers.Conv2DTranspose(OUTPUT_CHANNELS, 4,
                                  strides=2,
                                  padding='same',
                                  kernel_initializer=initializer,
                                  activation='tanh') # (bs, 256, 256, 3)

    # Run the encoder, remembering every intermediate output.
    features = [inputs]
    for down in down_stack:
        features.append(down(features[-1]))

    x = features[-1]
    # Skip tensors from deepest to shallowest, excluding the bottleneck
    # (features[-1]) and the raw input (features[0]).
    skips = features[-2:0:-1]

    # Run the decoder, concatenating the matching encoder output after each block.
    for up, skip in zip(up_stack, skips):
        x = layers.Concatenate()([up(x), skip])

    return keras.Model(inputs=inputs, outputs=last(x))


def CycleDiscriminator():
    """Build the discriminator: a 256x256x3 image in, one sigmoid score out.

    Three ``downsample`` blocks are followed by two zero-padded 4x4
    convolutions; the resulting 30x30x1 map goes through LeakyReLU, is
    flattened, and is reduced to a single value by a sigmoid Dense layer.
    """
    kernel_init = tf.random_normal_initializer(0., 0.02)
    norm_gamma_init = keras.initializers.RandomNormal(mean=0.0, stddev=0.02)

    inp = layers.Input(shape=[256, 256, 3], name='input_image')

    # (bs, 128, 128, 64) -> (bs, 64, 64, 128) -> (bs, 32, 32, 256);
    # the first block is built without instance normalisation.
    features = inp
    for n_filters, use_norm in ((64, False), (128, True), (256, True)):
        features = downsample(n_filters, 4, use_norm)(features)

    features = layers.ZeroPadding2D()(features) # (bs, 34, 34, 256)
    features = layers.Conv2D(512, 4, strides=1,
                             kernel_initializer=kernel_init,
                             use_bias=False)(features) # (bs, 31, 31, 512)
    features = tfa.layers.InstanceNormalization(gamma_initializer=norm_gamma_init)(features)
    features = layers.LeakyReLU()(features)

    features = layers.ZeroPadding2D()(features) # (bs, 33, 33, 512)
    score_map = layers.Conv2D(1, 4, strides=1,
                              kernel_initializer=kernel_init)(features) # (bs, 30, 30, 1)

    # Collapse the score map into a single probability-like output.
    score_map = layers.LeakyReLU(alpha=0.2)(score_map)
    flattened = layers.Flatten()(score_map)
    verdict = layers.Dense(1, activation='sigmoid')(flattened)

    return tf.keras.Model(inputs=inp, outputs=verdict)

In [None]:
# Create the four networks inside the distribution strategy's scope.
# In CycleGan.train_step, Generator1 (m_gen) is applied to the second element
# of each batch and Generator2 (p_gen) to the first.
with strategy.scope():
    Generator1 = CycleGenerator() # transforms colourized images to greyscale
    Generator2 = CycleGenerator() # transforms greyscale images to colourized

    Discriminator1 = CycleDiscriminator()
    Discriminator2 = CycleDiscriminator()

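## CycleGAN model

A Keras `Model` subclass that holds both generators and both discriminators and trains them together in a custom `train_step`.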


In [None]:
class CycleGan(keras.Model):
    """Trains two generators and two discriminators jointly.

    Each batch is a pair ``(first, second)`` of image batches. ``m_gen`` is
    applied to ``second`` to produce an image judged by ``m_disc`` against
    ``first``; ``p_gen`` is applied to ``first`` to produce an image judged
    by ``p_disc`` against ``second``.
    """

    def __init__(
        self,
        generator1,
        generator2,
        discriminator1,
        discriminator2,
        lambda_cycle=10,
        real_label=.5
    ):
        """Store the four networks and the loss hyperparameters.

        Args:
            generator1: Stored as ``m_gen``.
            generator2: Stored as ``p_gen``.
            discriminator1: Stored as ``m_disc``.
            discriminator2: Stored as ``p_disc``.
            lambda_cycle: Weight passed to the cycle and identity loss functions.
            real_label: Base value of the "real" target handed to the
                adversarial loss functions.
        """
        super().__init__()
        self.m_gen, self.p_gen = generator1, generator2
        self.m_disc, self.p_disc = discriminator1, discriminator2
        self.lambda_cycle = lambda_cycle
        self.real_label = real_label

    def compile(
        self,
        m_gen_optimizer,
        p_gen_optimizer,
        m_disc_optimizer,
        p_disc_optimizer,
        gen_loss_fn,
        disc_loss_fn,
        cycle_loss_fn,
        identity_loss_fn
    ):
        """Attach one optimizer per network and the four loss callables.

        ``gen_loss_fn`` and ``disc_loss_fn`` are called as
        ``fn(disc_output_real, disc_output_fake, labels_real)``;
        ``cycle_loss_fn`` and ``identity_loss_fn`` as
        ``fn(real_image, reconstructed_image, lambda_cycle)``.
        """
        super().compile()
        self.m_gen_optimizer = m_gen_optimizer
        self.p_gen_optimizer = p_gen_optimizer
        self.m_disc_optimizer = m_disc_optimizer
        self.p_disc_optimizer = p_disc_optimizer
        self.gen_loss_fn = gen_loss_fn
        self.disc_loss_fn = disc_loss_fn
        self.cycle_loss_fn = cycle_loss_fn
        self.identity_loss_fn = identity_loss_fn

    def train_step(self, batch_data):
        """Run one optimisation step on a ``(first, second)`` batch pair.

        Returns:
            A dict with the total loss of each generator and the loss of
            each discriminator for this step.
        """
        domain_a, domain_b = batch_data

        # "Real" target: real_label plus uniform noise in [0, 0.05), one value per example.
        n_examples = tf.shape(domain_b)[0]
        labels_real = tf.zeros((n_examples, 1)) + self.real_label
        labels_real += 0.05 * tf.random.uniform(tf.shape(labels_real))

        # Persistent tape: four separate gradient computations are taken from it below.
        with tf.GradientTape(persistent=True) as tape:
            # second -> first -> second again
            fake_a = self.m_gen(domain_b, training=True)
            cycled_b = self.p_gen(fake_a, training=True)

            # first -> second -> first again
            fake_b = self.p_gen(domain_a, training=True)
            cycled_a = self.m_gen(fake_b, training=True)

            # Each generator fed an image that is already in its output domain.
            same_a = self.m_gen(domain_a, training=True)
            same_b = self.p_gen(domain_b, training=True)

            # Discriminator outputs on real images ...
            disc_real_a = self.m_disc(domain_a, training=True)
            disc_real_b = self.p_disc(domain_b, training=True)

            # ... and on generated images.
            disc_fake_a = self.m_disc(fake_a, training=True)
            disc_fake_b = self.p_disc(fake_b, training=True)

            # Adversarial part of each generator's loss.
            gen_a_adversarial = self.gen_loss_fn(disc_real_a, disc_fake_a, labels_real)
            gen_b_adversarial = self.gen_loss_fn(disc_real_b, disc_fake_b, labels_real)

            # Cycle-consistency loss, summed over both directions and shared by both generators.
            cycle_a = self.cycle_loss_fn(domain_a, cycled_a, self.lambda_cycle)
            cycle_b = self.cycle_loss_fn(domain_b, cycled_b, self.lambda_cycle)
            total_cycle_loss = cycle_a + cycle_b

            # Total generator losses: adversarial + cycle + identity.
            total_gen_a_loss = (gen_a_adversarial + total_cycle_loss
                                + self.identity_loss_fn(domain_a, same_a, self.lambda_cycle))
            total_gen_b_loss = (gen_b_adversarial + total_cycle_loss
                                + self.identity_loss_fn(domain_b, same_b, self.lambda_cycle))

            # Discriminator losses.
            disc_a_loss = self.disc_loss_fn(disc_real_a, disc_fake_a, labels_real)
            disc_b_loss = self.disc_loss_fn(disc_real_b, disc_fake_b, labels_real)

        # (loss, network, optimizer) for every network being trained.
        updates = (
            (total_gen_a_loss, self.m_gen, self.m_gen_optimizer),
            (total_gen_b_loss, self.p_gen, self.p_gen_optimizer),
            (disc_a_loss, self.m_disc, self.m_disc_optimizer),
            (disc_b_loss, self.p_disc, self.p_disc_optimizer),
        )

        # Compute every gradient before any variable is updated.
        all_gradients = [
            tape.gradient(loss, network.trainable_variables)
            for loss, network, _ in updates
        ]

        # Apply the updates in the same order.
        for gradients, (_, network, optimizer) in zip(all_gradients, updates):
            optimizer.apply_gradients(zip(gradients, network.trainable_variables))

        return {
            "gen_1_loss": total_gen_a_loss,
            "gen_2_loss": total_gen_b_loss,
            "disc_1_loss": disc_a_loss,
            "disc_2_loss": disc_b_loss
        }

In [None]:
with strategy.scope():
    def discriminator_loss(predictions_real, predictions_gen, labels_real):
        """Least-squares loss on discriminator outputs taken relative to the other batch's mean.

        Generated predictions, offset by the mean real prediction, are pushed
        towards ``-labels_real``; real predictions, offset by the mean
        generated prediction, are pushed towards ``+labels_real``. The two
        mean squared errors are averaged.
        """
        gen_term = (predictions_gen - tf.reduce_mean(predictions_real) + labels_real) ** 2
        real_term = (predictions_real - tf.reduce_mean(predictions_gen) - labels_real) ** 2
        return (tf.reduce_mean(gen_term) + tf.reduce_mean(real_term))/2

    def generator_loss(predictions_real, predictions_gen, labels_real):
        """Mirror image of ``discriminator_loss`` with the two targets swapped.

        Real predictions, offset by the mean generated prediction, are pushed
        towards ``-labels_real``; generated predictions, offset by the mean
        real prediction, are pushed towards ``+labels_real``.
        """
        real_term = (predictions_real - tf.reduce_mean(predictions_gen) + labels_real) ** 2
        gen_term = (predictions_gen - tf.reduce_mean(predictions_real) - labels_real) ** 2
        return (tf.reduce_mean(real_term) + tf.reduce_mean(gen_term)) / 2

In [None]:
with strategy.scope():
    def calc_cycle_loss(real_image, cycled_image, LAMBDA):
        """Return ``LAMBDA`` times the mean absolute difference between the two images."""
        return LAMBDA * tf.reduce_mean(tf.abs(real_image - cycled_image))

In [None]:
with strategy.scope():
    def identity_loss(real_image, same_image, LAMBDA):
        """Return ``LAMBDA * 0.5`` times the mean absolute difference between the two images."""
        return LAMBDA * 0.5 * tf.reduce_mean(tf.abs(real_image - same_image))

In [None]:
# Training hyperparameters.
EPOCHS = 50      # number of epochs passed to fit()
LR_G = 2e-4      # generator learning rate
LR_D = 2e-4      # discriminator learning rate
beta_1 = .5      # Adam beta_1

# NOTE(review): neither value below is read by the visible cells; the CycleGan
# instance is constructed with a literal real_label=0.66 instead.
real_label = .9
fake_label = 0

In [None]:
with strategy.scope():
    # One Adam optimizer per network. All four take their momentum term from
    # the `beta_1` hyperparameter, so changing that constant affects every
    # optimizer rather than only one of them.
    generator1_optimizer = tf.keras.optimizers.Adam(LR_G, beta_1=beta_1)
    generator2_optimizer = tf.keras.optimizers.Adam(LR_G, beta_1=beta_1)
    discriminator1_optimizer = tf.keras.optimizers.Adam(LR_D, beta_1=beta_1)
    discriminator2_optimizer = tf.keras.optimizers.Adam(LR_D, beta_1=beta_1)

In [None]:
# Assemble the combined model and attach optimizers and loss functions.
with strategy.scope():
    # NOTE(review): real_label is given here as the literal 0.66, not the
    # module-level `real_label` (.9) defined with the other hyperparameters —
    # confirm which value is intended.
    cycle_gan_model = CycleGan(
        Generator1, Generator2, Discriminator1, Discriminator2, real_label=0.66
    )

    # Generator1/Discriminator1 fill the m_* slots, Generator2/Discriminator2 the p_* slots.
    cycle_gan_model.compile(
        m_gen_optimizer = generator1_optimizer,
        p_gen_optimizer = generator2_optimizer,
        m_disc_optimizer = discriminator1_optimizer,
        p_disc_optimizer = discriminator2_optimizer,
        gen_loss_fn = generator_loss,
        disc_loss_fn = discriminator_loss,
        cycle_loss_fn = calc_cycle_loss,
        identity_loss_fn = identity_loss
    )

In [None]:
# Train for EPOCHS passes over the zipped (A, B) dataset using CycleGan.train_step.
cycle_gan_model.fit(train_dataset1,epochs=EPOCHS)

In [None]:
# Persist the trained networks as HDF5 files in the current working directory.
# Weights only, for the two generators:
Generator1.save_weights('Generator-colour-to-grey.h5')
Generator2.save_weights('greyscale_to_colorize_weights.h5')
# Full models; Generator1.h5 is the file name the evaluation cell loads
# (from /content — presumably the working directory on Colab; verify).
Discriminator1.save('Discriminator1.h5')
Discriminator2.save('Discriminator2.h5')
Generator1.save('Generator1.h5')
Generator2.save('Generator2.h5')

In [None]:
# Print the layer-by-layer summary of each network, generators first.
for _network in (Generator1, Generator2, Discriminator1, Discriminator2):
    _network.summary()

In [None]:
import tensorflow as tf
import numpy as np
import os
import random
from keras.preprocessing.image import load_img, img_to_array, array_to_img
import matplotlib.pyplot as plt
import tensorflow_addons as tfa

# Load the saved generator model with the custom layer.
# InstanceNormalization comes from tensorflow_addons, so it has to be supplied
# through custom_objects for Keras to rebuild the model. compile=False loads
# the model for inference only.
generator_model_path = '/content/Generator1.h5'  # update with your model path
custom_objects = {'InstanceNormalization': tfa.layers.InstanceNormalization}
generator = tf.keras.models.load_model(generator_model_path, custom_objects=custom_objects, compile=False)

# Function to preprocess a single image
def preprocess_image(image_path, target_size=(256, 256)):
    """Load an image file and turn it into a single-image batch for the generator.

    Note: this rebinds the name ``preprocess_image`` that the dataset cell
    earlier in the notebook uses for its tensor-based version.

    Args:
        image_path: Path of the image file to load.
        target_size: Size the image is resized to on load.

    Returns:
        The image as an array rescaled with ``x / 127.5 - 1`` and with a
        leading batch axis of length 1.
    """
    pixels = img_to_array(load_img(image_path, target_size=target_size))
    if pixels.shape[-1] == 1:
        # Single-channel input: replicate the channel three times.
        pixels = np.concatenate([pixels] * 3, axis=-1)
    normalised = (pixels / 127.5) - 1
    return normalised[np.newaxis, ...]

# Function to post-process the generated image
def postprocess_image(predicted_img):
    """Convert generator output back to 8-bit pixel values.

    Applies ``(x + 1) * 127.5``, clips the result to [0, 255] and casts it
    to ``uint8`` (the cast truncates the fractional part).
    """
    rescaled = (predicted_img + 1) * 127.5
    return np.clip(rescaled, 0, 255).astype('uint8')

# Function to calculate PSNR
def calculate_psnr(original_image, generated_image):
    """Peak signal-to-noise ratio, in dB, between two images on a 0-255 scale.

    Args:
        original_image: Reference image (array-like).
        generated_image: Image to compare, same shape as ``original_image``.

    Returns:
        ``20 * log10(255 / sqrt(mse))``, or ``inf`` when the images are identical.
    """
    # Promote both inputs to float64 before subtracting: with two uint8 arrays
    # the difference would wrap around modulo 256 and corrupt the error.
    reference = np.asarray(original_image, dtype=np.float64)
    candidate = np.asarray(generated_image, dtype=np.float64)

    mse = np.mean((reference - candidate) ** 2)
    if mse == 0:
        return float('inf')
    max_pixel = 255.0
    return 20 * np.log10(max_pixel / np.sqrt(mse))

# Function to randomly select 5 images from a folder
def select_random_images(folder_path, num_images=5):
    """Pick up to ``num_images`` image files at random from ``folder_path``.

    Files count as images when their name ends in .png, .jpg or .jpeg,
    compared case-insensitively.

    Args:
        folder_path: Directory to search (not recursive).
        num_images: Maximum number of paths to return.

    Returns:
        A list of distinct file paths. It is shorter than ``num_images`` when
        the folder holds fewer matching files, and empty when it holds none.
    """
    image_extensions = ('.png', '.jpg', '.jpeg')
    # Sort the listing so that, under a fixed random seed, the same files are chosen.
    all_images = [
        os.path.join(folder_path, name)
        for name in sorted(os.listdir(folder_path))
        if name.lower().endswith(image_extensions)
    ]
    # random.sample raises ValueError when asked for more items than exist.
    sample_size = min(num_images, len(all_images))
    return random.sample(all_images, sample_size)

# Folder containing the test images.
# NOTE(review): this is the same train_B directory that is loaded for training above.
test_images_folder = '/content/drive/MyDrive/datasets/train_B/'  # update with your folder path

# Select random images (the helper's default count is used)
random_images = select_random_images(test_images_folder)
if not random_images:
    raise ValueError(f'No images found in {test_images_folder}')

# One column per selected image, so the figure layout follows the sample size.
n_cols = len(random_images)

total_psnr = 0.0

plt.figure(figsize=(15, 6))

for i, image_path in enumerate(random_images):
    # Preprocess the test image
    test_image = preprocess_image(image_path)

    # Run the loaded generator on the image
    generated_image = generator(test_image, training=False)

    # Post-process the generated image (drop the batch axis first)
    generated_image = postprocess_image(generated_image[0])

    # Reload the input image at the same size, on the 0-255 scale
    original_image = img_to_array(load_img(image_path, target_size=(256, 256)))

    # PSNR between the generator's input image and its output
    psnr = calculate_psnr(original_image, generated_image)
    total_psnr += psnr

    # Top row: input images
    plt.subplot(2, n_cols, i + 1)
    plt.title(f'Original Image\nPSNR: {psnr:.2f}')
    plt.imshow(array_to_img(original_image))
    plt.axis('off')

    # Bottom row: generator outputs, directly below the matching input
    plt.subplot(2, n_cols, n_cols + i + 1)
    plt.title('Generated Image')
    plt.imshow(array_to_img(generated_image))
    plt.axis('off')

avg_psnr = total_psnr / len(random_images)
print(f'Average PSNR: {avg_psnr:.2f}')

plt.show()
