In [1]:
import tensorflow as tf
import numpy as np
import cv2
import os
import glob
import time
import matplotlib.pyplot as plt
from PIL import Image
from tensorflow.keras.layers import Input, Conv2D, Conv2DTranspose, concatenate, BatchNormalization, LeakyReLU, ReLU
from tensorflow.keras.models import Model

from Data_processing_functions import *
from Pix2Pix_model import *


## Dataset Creation

In [8]:
import tensorflow as tf
import numpy as np
import cv2
import os

# Load and preprocess image
def load_and_preprocess_image(image_path, target_size, is_grayscale=False):
    img = tf.io.read_file(image_path)
    channels = 1 if is_grayscale else 3
    img = tf.image.decode_jpeg(img, channels=channels)
    img = tf.image.resize(img, target_size)
    img = img / 255.0
    return img

# Convert RGB image to LAB using OpenCV (requires numpy conversion)
def convert_to_lab(image):
    image_np = image.numpy() * 255.0  # Convert to 0-255 range for OpenCV
    image_np = image_np.astype(np.uint8)
    lab_image = cv2.cvtColor(image_np, cv2.COLOR_RGB2LAB)
    lab_image = lab_image.astype(np.float32) / 255.0  # Normalize to 0-1
    return lab_image

# Wrapper to use convert_to_lab in TensorFlow pipeline
def preprocess_lab_image(image):
    lab_image = tf.py_function(func=convert_to_lab, inp=[image], Tout=tf.float32)
    lab_image.set_shape([None, None, 3])  # Set shape to avoid shape issues in TensorFlow
    return lab_image

# Load and preprocess images
def load_and_preprocess(gray_path, lab_path, target_size):
    gray_img = load_and_preprocess_image(gray_path, target_size, is_grayscale=True)
    rgb_img = load_and_preprocess_image(lab_path, target_size, is_grayscale=False)
    lab_img = preprocess_lab_image(rgb_img)
    return gray_img, lab_img

# Create dataset
def create_dataset(gray_image_paths, lab_image_paths, target_size=(256, 256), batch_size=32):
    gray_image_paths = tf.constant(gray_image_paths, dtype=tf.string)
    lab_image_paths = tf.constant(lab_image_paths, dtype=tf.string)
    
    # Check if paths are correctly loaded
    if len(gray_image_paths) == 0 or len(lab_image_paths) == 0:
        raise ValueError("No image paths provided.")
    
    # Define the dataset from image paths
    dataset = tf.data.Dataset.from_tensor_slices((gray_image_paths, lab_image_paths))
    
    # Map the dataset to preprocess images
    def map_fn(gray_path, lab_path):
        gray_img, lab_img = tf.py_function(
            func=lambda x, y: load_and_preprocess(x, y, target_size),
            inp=[gray_path, lab_path],
            Tout=[tf.float32, tf.float32]
        )
        gray_img.set_shape([target_size[0], target_size[1], 1])
        lab_img.set_shape([target_size[0], target_size[1], 3])
        

        return gray_img, lab_img
    
    dataset = dataset.map(map_fn, num_parallel_calls=tf.data.AUTOTUNE)
    dataset = dataset.batch(batch_size).prefetch(tf.data.AUTOTUNE)
    
    return dataset

# # Extract edges from an image
# def extract_edges(image):
#     edges = cv2.Canny((image * 255).astype(np.uint8), 100, 200)
#     return edges / 255.0

# # Process a single grayscale image: extract edges and combine with the original image
# def process_single_image(gray_image):
#     edges = extract_edges(gray_image)
#     edges = tf.convert_to_tensor(edges, dtype=tf.float32)[..., tf.newaxis]
#     combined = tf.concat([tf.expand_dims(gray_image, axis=-1), edges], axis=-1)
#     return combined

# # Combine grayscale image with edges
# def combine_gray_and_edges(gray_img, lab_img):
#     combined_images = []
#     for i in range(gray_img.shape[0]):
#         single_gray = gray_img[i].numpy()  # Convert to NumPy array
#         combined_images.append(process_single_image(single_gray))
#     combined_batch = tf.stack(combined_images, axis=0)
#     return combined_batch, lab_img

# # Create dataset with edges
# def create_dataset_with_edges(original_dataset):
#     dataset_with_edges = original_dataset.map(
#         lambda gray_img, lab_img: tf.py_function(
#             func=lambda x, y: combine_gray_and_edges(x, y),
#             inp=[gray_img, lab_img],
#             Tout=[tf.float32, tf.float32]
#         ),
#         num_parallel_calls=tf.data.AUTOTUNE
#     )
#     return dataset_with_edges

## Hyperparameters

In [2]:
# Hyperparameters
hyperparams = {
    'initial_filters': 48,         # Starting number of filters in the first layer
    'kernel_size': 5,              # Size of the convolutional kernel
    'num_layers': 5,               # Number of convolutional layers
    'dropout_rate': 0.5,           # Dropout rate for regularization
    'batch_norm': True,            # Use of batch normalization
    'lambda_l1': 100,              # L1 regularization parameter
    'learning_rate': 1e-3,         # Learning rate for the optimizer
    'beta_1': 0.5,                 # Beta1 hyperparameter for the Adam optimizer
    'batch_size': 32,               # Batch size for training
    'epochs': 50,                 # Number of epochs for training
    'dropout': True,               # Whether to use dropout
    'input_shape': (256, 256, 1)   # Input shape of the images (3 channels for RGB)
}


## Model 

In [6]:
import tensorflow as tf
import numpy as np
import cv2
import os
import glob
import time
import matplotlib.pyplot as plt
from tensorflow.keras.layers import Input, Conv2D, Conv2DTranspose, concatenate, BatchNormalization, LeakyReLU, ReLU
from tensorflow.keras.models import Model

# Hyperparameters
hyperparams = {
    'initial_filters': 48,
    'kernel_size': 5,
    'num_layers': 5,
    'dropout_rate': 0.5,
    'batch_norm': True,
    'lambda_l1': 100,
    'learning_rate': 1e-3,
    'beta_1': 0.5,
    'batch_size': 1,
    'epochs': 50,
    'dropout': True,
    'input_shape': (256, 256, 1)
}

# Define the downsampling and upsampling blocks
def downsample(filters, size, apply_batchnorm=True):
    initializer = tf.random_normal_initializer(0., 0.02)
    result = tf.keras.Sequential()
    result.add(tf.keras.layers.Conv2D(filters, size, strides=2, padding='same', kernel_initializer=initializer, use_bias=False))
    if apply_batchnorm:
        result.add(tf.keras.layers.BatchNormalization())
    result.add(tf.keras.layers.LeakyReLU())
    return result

def upsample(filters, size, apply_dropout=False):
    initializer = tf.random_normal_initializer(0., 0.02)
    result = tf.keras.Sequential()
    result.add(tf.keras.layers.Conv2DTranspose(filters, size, strides=2, padding='same', kernel_initializer=initializer, use_bias=False))
    result.add(tf.keras.layers.BatchNormalization())
    if apply_dropout:
        result.add(tf.keras.layers.Dropout(hyperparams['dropout_rate']))
    result.add(tf.keras.layers.ReLU())
    return result

# Define the Generator model
def Generator(hyperparams):
    inputs = tf.keras.layers.Input(shape=hyperparams['input_shape'])

    down_stack = [downsample(hyperparams['initial_filters'] * (2 ** i), hyperparams['kernel_size'], apply_batchnorm=hyperparams['batch_norm']) for i in range(hyperparams['num_layers'])]
    up_stack = [upsample(hyperparams['initial_filters'] * (2 ** i), hyperparams['kernel_size'], apply_dropout=hyperparams['dropout']) for i in range(hyperparams['num_layers']-1, 0, -1)]
    initializer = tf.random_normal_initializer(0., 0.02)
    last = tf.keras.layers.Conv2DTranspose(3, hyperparams['kernel_size'], strides=2, padding='same', kernel_initializer=initializer, activation='tanh')

    x = inputs
    skips = []
    for down in down_stack:
        x = down(x)
        skips.append(x)

    skips = reversed(skips[:-1])
    for up, skip in zip(up_stack, skips):
        x = up(x)
        x = tf.keras.layers.concatenate([x, skip])

    x = last(x)

    return tf.keras.Model(inputs=inputs, outputs=x)

# Define the Discriminator model
def Discriminator(hyperparams):
    initializer = tf.random_normal_initializer(0., 0.02)
    
    inp = tf.keras.layers.Input(shape=hyperparams['input_shape'], name='input_image')
    tar = tf.keras.layers.Input(shape=[*hyperparams['input_shape'][:2], 3], name='target_image')

    x = tf.keras.layers.concatenate([inp, tar])

    down1 = downsample(64, 4, False)(x)
    down2 = downsample(128, 4)(down1)
    down3 = downsample(256, 4)(down2)

    zero_pad1 = tf.keras.layers.ZeroPadding2D()(down3)
    conv = tf.keras.layers.Conv2D(512, 4, strides=1, kernel_initializer=initializer, use_bias=False)(zero_pad1)

    batchnorm1 = tf.keras.layers.BatchNormalization()(conv)
    leaky_relu = tf.keras.layers.LeakyReLU()(batchnorm1)

    zero_pad2 = tf.keras.layers.ZeroPadding2D()(leaky_relu)
    last = tf.keras.layers.Conv2D(1, 4, strides=1, kernel_initializer=initializer)(zero_pad2)

    return tf.keras.Model(inputs=[inp, tar], outputs=last)

# Loss functions
loss_object = tf.keras.losses.BinaryCrossentropy(from_logits=True)

def generator_loss(disc_generated_output, gen_output, target):
    gan_loss = loss_object(tf.ones_like(disc_generated_output), disc_generated_output)
    l1_loss = tf.reduce_mean(tf.abs(target - gen_output))
    total_gen_loss = gan_loss + (hyperparams['lambda_l1'] * l1_loss)
    return total_gen_loss, gan_loss, l1_loss

def discriminator_loss(disc_real_output, disc_generated_output):
    real_loss = loss_object(tf.ones_like(disc_real_output), disc_real_output)
    generated_loss = loss_object(tf.zeros_like(disc_generated_output), disc_generated_output)
    total_disc_loss = real_loss + generated_loss
    return total_disc_loss

# PSNR metric
def psnr_metric(y_true, y_pred):
    max_pixel = 1.0
    psnr_value = tf.image.psnr(y_true, y_pred, max_val=max_pixel)
    return tf.reduce_mean(psnr_value)

# Training step function
def train_step(generator, discriminator, input_image, target, generator_optimizer, discriminator_optimizer):
    with tf.GradientTape() as gen_tape, tf.GradientTape() as disc_tape:
        gen_output = generator(input_image, training=True)
        disc_real_output = discriminator([input_image, target], training=True)
        disc_generated_output = discriminator([input_image, gen_output], training=True)

        gen_total_loss, gen_gan_loss, gen_l1_loss = generator_loss(disc_generated_output, gen_output, target)
        disc_loss = discriminator_loss(disc_real_output, disc_generated_output)

    generator_gradients = gen_tape.gradient(gen_total_loss, generator.trainable_variables)
    discriminator_gradients = disc_tape.gradient(disc_loss, discriminator.trainable_variables)

    generator_optimizer.apply_gradients(zip(generator_gradients, generator.trainable_variables))
    discriminator_optimizer.apply_gradients(zip(discriminator_gradients, discriminator.trainable_variables))

    return gen_total_loss, disc_loss

# Training function without validation
def model_fit(train_ds, hyperparams, checkpoint_prefix):
    gen_losses = []
    disc_losses = []

    generator = Generator(hyperparams)
    discriminator = Discriminator(hyperparams)

    # Define the optimizers
    generator_optimizer = tf.keras.optimizers.Adam(learning_rate=hyperparams['learning_rate'], beta_1=hyperparams['beta_1'])
    discriminator_optimizer = tf.keras.optimizers.Adam(learning_rate=hyperparams['learning_rate'], beta_1=hyperparams['beta_1'])

    # Define checkpoints
    checkpoint = tf.train.Checkpoint(generator_optimizer=generator_optimizer,
                                     discriminator_optimizer=discriminator_optimizer,
                                     generator=generator,
                                     discriminator=discriminator)

    for epoch in range(hyperparams['epochs']):
        start = time.time()
        epoch_gen_loss = 0
        epoch_disc_loss = 0

        # Progress bar
        progbar = tf.keras.utils.Progbar(len(train_ds), stateful_metrics=['loss'])

        for step, (input_image, target) in enumerate(train_ds):
            gen_total_loss, disc_loss = train_step(generator, discriminator, input_image, target, generator_optimizer, discriminator_optimizer)
            epoch_gen_loss += gen_total_loss
            epoch_disc_loss += disc_loss

            # Update progress bar
            progbar.update(step + 1, [('gen_loss', gen_total_loss), ('disc_loss', disc_loss)])

        gen_losses.append(epoch_gen_loss / len(train_ds))
        disc_losses.append(epoch_disc_loss / len(train_ds))

        # Save checkpoint
        checkpoint.save(file_prefix=checkpoint_prefix)

        print(f'Epoch {epoch+1}, Gen Loss: {gen_losses[-1]}, Disc Loss: {disc_losses[-1]}, Time: {time.time() - start}')

        # Early stopping logic
        if len(gen_losses) > 1 and gen_losses[-1] > gen_losses[-2]:
            print("Early stopping triggered")
            break

    return gen_losses, disc_losses

# Visualize losses
def visualize_losses(gen_losses, disc_losses):
    plt.figure(figsize=(12, 6))
    
    # Plot Generator and Discriminator Losses
    plt.subplot(1, 1, 1)
    plt.plot(gen_losses, label='Generator Loss')
    plt.plot(disc_losses, label='Discriminator Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()
    plt.title('Generator and Discriminator Losses')

    plt.tight_layout()
    plt.show()

## Data Preparing

In [5]:
# Define Data Directory

dir_path = 'data'


color_dir = os.path.join(dir_path, 'train_color')
black_dir = os.path.join(dir_path, 'train_black')


# List all images
color_images_paths = glob.glob(os.path.join(color_dir, '*.jpg'))
black_images_paths = glob.glob(os.path.join(black_dir, '*.jpg'))

# Ensure there are images in both directories
if not color_images_paths or not black_images_paths:
    raise ValueError("No images found in one or both directories.")

# Create dataset
dataset = create_dataset(black_images_paths, color_images_paths, target_size=(256, 256), batch_size=hyperparams['batch_size'])

# Shuffle the dataset with a smaller buffer size and prefetch
dataset = dataset.shuffle(buffer_size=1000, reshuffle_each_iteration=True)
dataset = dataset.prefetch(buffer_size=tf.data.experimental.AUTOTUNE)

#Print dataset shapes for verification
for gray_img, lab_img in dataset.take(1):
    print("Train Dataset Shapes:")
    print(f"Input Image Shape: {gray_img.shape}")
    print(f"Target Shape: {lab_img.shape}")



Train Dataset Shapes:
Input Image Shape: (32, 256, 256, 1)
Target Shape: (32, 256, 256, 3)


## Model Summary

In [6]:
# Initialize models
generator = Generator(hyperparams)
discriminator = Discriminator(hyperparams)

# Print model summaries
print("Generator Summary:")
generator.summary()

print("\nDiscriminator Summary:")
discriminator.summary()

Generator Summary:



Discriminator Summary:


## Training

In [8]:
import tensorflow as tf
import numpy as np
import cv2
import os
import time
import glob
from tensorflow.keras.layers import Input, Conv2D, Conv2DTranspose, concatenate, BatchNormalization, LeakyReLU, ReLU
from tensorflow.keras.models import Model
import matplotlib.pyplot as plt

from Data_processing_functions import *
from Pix2Pix_model import *

# Hyperparameters
hyperparams = {
    'initial_filters': 48,
    'kernel_size': 5,
    'num_layers': 5,
    'dropout_rate': 0.5,
    'batch_norm': True,
    'lambda_l1': 100,
    'learning_rate': 1e-3,
    'beta_1': 0.5,
    'batch_size': 32,
    'epochs': 50,
    'dropout': True,
    'input_shape': (256, 256, 1)
}

# Define Data Directory
dir_path = 'data'
color_dir = os.path.join(dir_path, 'train_color')
black_dir = os.path.join(dir_path, 'train_black')

# List all images
color_images_paths = glob.glob(os.path.join(color_dir, '*.jpg'))
black_images_paths = glob.glob(os.path.join(black_dir, '*.jpg'))

# Ensure there are images in both directories
if not color_images_paths or not black_images_paths:
    raise ValueError("No images found in one or both directories.")

# Create dataset
dataset = create_dataset(black_images_paths, color_images_paths, target_size=(256, 256), batch_size=hyperparams['batch_size'])

# Shuffle the dataset
dataset = dataset.shuffle(buffer_size=len(color_images_paths), reshuffle_each_iteration=True)

# Initialize models
generator = Generator(hyperparams)
discriminator = Discriminator(hyperparams)

# Print model summaries
print("Generator Summary:")
generator.summary()

print("\nDiscriminator Summary:")
discriminator.summary()

# Define the optimizers
generator_optimizer = tf.keras.optimizers.Adam(learning_rate=hyperparams['learning_rate'], beta_1=hyperparams['beta_1'])
discriminator_optimizer = tf.keras.optimizers.Adam(learning_rate=hyperparams['learning_rate'], beta_1=hyperparams['beta_1'])

# Define the checkpoint directory
checkpoint_dir = './training_checkpoints'
os.makedirs(checkpoint_dir, exist_ok=True)
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt")

# Create a checkpoint object
checkpoint = tf.train.Checkpoint(generator_optimizer=generator_optimizer,
                                 discriminator_optimizer=discriminator_optimizer,
                                 generator=generator,
                                 discriminator=discriminator)

# Restore the latest checkpoint if it exists
latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
if latest_checkpoint:
    checkpoint.restore(latest_checkpoint)
    print(f"Restored from checkpoint: {latest_checkpoint}")
else:
    print("Starting fresh training")

# Train the model
gen_losses, disc_losses = model_fit(dataset, hyperparams, checkpoint_prefix)



# Define the directory path
model_dir = './Trained_models'

# Create the directory if it doesn't exist
os.makedirs(model_dir, exist_ok=True)

# Save the models
generator.save('./Trained_models/pix2pix_model_generator.keras')
discriminator.save('./Trained_models/pix2pix_model_discriminator.keras')

# Visualize losses
def visualize_losses(gen_losses, disc_losses):
    plt.figure(figsize=(12, 6))
    
    # Plot Generator and Discriminator Losses
    plt.plot(gen_losses, label='Generator Loss')
    plt.plot(disc_losses, label='Discriminator Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()
    plt.title('Generator and Discriminator Losses')

    plt.tight_layout()
    plt.show()

visualize_losses(gen_losses, disc_losses)


## Validating results

In [None]:
import tensorflow as tf
import numpy as np
import cv2
import os
import glob
import matplotlib.pyplot as plt
import random  # Import random module

from Data_processing_functions import load_and_preprocess_image, preprocess_lab_image, load_and_preprocess

# Create dataset for test images
def create_test_dataset(gray_image_paths, target_size=(256, 256), batch_size=32):
    gray_image_paths = tf.constant(gray_image_paths, dtype=tf.string)
    
    # Define the dataset from image paths
    dataset = tf.data.Dataset.from_tensor_slices(gray_image_paths)
    
    # Map the dataset to preprocess images
    dataset = dataset.map(lambda gray_path: load_and_preprocess_image(gray_path, target_size, is_grayscale=True), num_parallel_calls=tf.data.AUTOTUNE)
    
    dataset = dataset.batch(batch_size).prefetch(tf.data.AUTOTUNE)
    return dataset

# Function to generate colorful images
def generate_images(model, test_input):
    prediction = model(test_input, training=False)  # Set training=False for inference
    return prediction

# Function to convert LAB to RGB
def lab_to_rgb(lab_image):
    lab_image_np = lab_image.numpy() * 255.0  # Convert to 0-255 range for OpenCV
    lab_image_np = lab_image_np.astype(np.uint8)
    rgb_image = cv2.cvtColor(lab_image_np, cv2.COLOR_LAB2RGB)
    rgb_image = rgb_image / 255.0  # Normalize to [0, 1]
    return rgb_image

# Function to show images
def show_images(gray_images, color_images, predicted_images, num_images):
    plt.figure(figsize=(18, 12))  # Adjusted height for 3 rows

    for i in range(num_images):
        # Display grayscale images (first row)
        plt.subplot(3, num_images, i + 1)
        plt.imshow(tf.squeeze(gray_images[i]), cmap='gray')
        plt.axis('off')
        plt.title('Grayscale Image')

        # Display original color images (second row)
        plt.subplot(3, num_images, i + 1 + num_images)
        plt.imshow(tf.squeeze(color_images[i]))
        plt.axis('off')
        plt.title('Original Color Image')

        # Display predicted images (third row)
        predicted_rgb = lab_to_rgb(predicted_images[i])  # Convert LAB to RGB
        plt.subplot(3, num_images, i + 1 + 2 * num_images)
        plt.imshow(predicted_rgb)
        plt.axis('off')
        plt.title('Predicted Color Image')

    plt.show()

# Define directories
black_test_dir = 'data/test_black'  # Update with your actual path
color_test_dir = 'data/test_color'   # Update with your actual path

# Load image paths
black_image_paths = glob.glob(os.path.join(black_test_dir, '*.jpg'))
color_image_paths = glob.glob(os.path.join(color_test_dir, '*.jpg'))

# Randomly select five image paths
black_images_test_paths = random.sample(black_image_paths, 5)
color_images_test_paths = random.sample(color_image_paths, 5)

# Ensure these paths are not empty
assert len(black_images_test_paths) > 0, "No black and white test images found."
assert len(color_images_test_paths) > 0, "No color test images found."



# Create the test dataset
test_dataset = create_test_dataset(black_images_test_paths, target_size=(256, 256), batch_size=32)

# Load the trained generator model
generator = tf.keras.models.load_model('./Trained_models/pix2pix_model_generator.keras')



# Collect images for display
predicted_images_list = []
original_images_list = []
gray_images_list = []

for gray_img_batch in test_dataset:
    generated_images = generate_images(generator, gray_img_batch)
    
    for i in range(generated_images.shape[0]):
        predicted_images_list.append(generated_images[i])
        
        # Get the grayscale image tensor from the dataset (assuming batches are consistent)
        gray_img = gray_img_batch[i]
        gray_images_list.append(gray_img)
        
        # Match the color image paths to generated images
        original_color_img_path = color_images_test_paths[i % len(color_images_test_paths)]
        original_color_img = load_and_preprocess_image(original_color_img_path, target_size=(256, 256), is_grayscale=False)
        original_images_list.append(original_color_img)

# Ensure the number of images to show is consistent
num_images = min(len(predicted_images_list), len(original_images_list), len(gray_images_list))
show_images(gray_images=gray_images_list, color_images=original_images_list, predicted_images=predicted_images_list, num_images=num_images)

In [2]:
ls

 Volume in drive C is OS
 Volume Serial Number is A056-5119

 Directory of C:\Users\mohds\OneDrive\Desktop\MS_AAI\Course_501_Intro_AI_A2\image-colorization-via-transfer-learning

07-08-2024  01:32    <DIR>          .
05-08-2024  20:55    <DIR>          ..
05-08-2024  19:41                54 .gitignore
07-08-2024  01:32    <DIR>          .ipynb_checkpoints
07-08-2024  00:52    <DIR>          __pycache__
05-08-2024  20:56    <DIR>          data
03-08-2024  23:15         7,765,010 Data_Processing.ipynb
07-08-2024  00:54             3,850 Data_processing_functions.py
07-08-2024  01:28           208,558 Full_model_pipeline.ipynb
03-08-2024  23:08             1,091 LICENSE
05-08-2024  20:56    <DIR>          Models
07-08-2024  00:55             7,943 Pix2Pix_model.py
07-08-2024  00:49             3,315 Pix2Pix_train.py
03-08-2024  23:08                 2 README.md
05-08-2024  20:56    <DIR>          Trained_models
06-08-2024  00:22    <DIR>          training_checkpoints
06-08-2024  11:52    