### CSI 4106 - Introduction to Artificial Intelligence - Project W2022

### Simon Paquette - spaqu044@uottawa.ca - 300044038

### Image Colorization


In [None]:
import shutil
import time
from pathlib import Path
from typing import Dict, List, Tuple

import cv2 as cv
import matplotlib.pyplot as plt
import numpy as np
import tensorflow as tf
from keras.layers import (
    BatchNormalization,
    Conv2D,
    Input,
    LeakyReLU,
    MaxPooling2D,
    ReLU,
    RepeatVector,
    Reshape,
    UpSampling2D,
    concatenate,
)
from keras.models import Model, load_model
from sklearn.model_selection import train_test_split

np.random.seed(42)


In [None]:
# my gpu: GTX1650 4gb
import GPUtil

GPUs = GPUtil.getGPUs()
for i, gpu in enumerate(GPUs):
    print(
        "GPU {:d} ... Mem Free: {:.0f}MB / {:.0f}MB | Utilization {:3.0f}%".format(
            i, gpu.memoryFree, gpu.memoryTotal, gpu.memoryUtil * 100
        )
    )


In [None]:
WIDTH = 128
HEIGHT = 128
SIZE = (WIDTH, HEIGHT)
BATCH = 8


def make_dir(name: str) -> Path:
    """
    Create and return directory name

    Args:
        name (str): directory name

    Returns:
        Path: directory name
    """
    path = Path(name)
    if not path.exists():
        path.mkdir()
    return path


IMAGES_DIR = make_dir("images")
MODELS_DIR = make_dir("models")
LOGS_DIR = make_dir("logs")
TMP_DIR = make_dir("tmp")

# List of callback to be applied in model.fit()
early_stop = tf.keras.callbacks.EarlyStopping(
    patience=5, restore_best_weights=True, verbose=1
)
checkpoints = tf.keras.callbacks.ModelCheckpoint(
    TMP_DIR.joinpath(str(int(time.time()))), verbose=1
)
tensorboard = tf.keras.callbacks.TensorBoard(LOGS_DIR.joinpath(str(int(time.time()))))
reduce_lr = tf.keras.callbacks.ReduceLROnPlateau(patience=3, verbose=1)

CALLBACKS = [early_stop, checkpoints, tensorboard, reduce_lr]


In [None]:
def del_log_tmp():
    """
    Delete the files within logs folder (tensorboard log) and tmp folder (model checkpoint)
    """
    logs = list(LOGS_DIR.iterdir())
    tmps = list(TMP_DIR.iterdir())
    names = logs + tmps
    for name in names:
        if name.is_dir():
            shutil.rmtree(name)
        elif name.is_file():
            name.unlink()


del_log_tmp()


In [None]:
def preprocess_image(image_path: Path) -> Tuple[np.ndarray, np.ndarray]:
    """
    Open, resize, convert to LAB, and normalize the image

    Args:
        image_path (Path): image to be opened

    Returns:
        Tuple[np.ndarray, np.ndarray]: L channel and AB channel
    """
    image = cv.imread(str(image_path), cv.IMREAD_COLOR)
    image = cv.resize(image, SIZE)
    image = image.astype(np.uint8)
    image = cv.cvtColor(image, cv.COLOR_BGR2LAB)
    image = image.astype(np.float32)
    image /= 255.0
    l, a, b = cv.split(image)
    ab = cv.merge([a, b])
    return (l, ab)


def construct_image(l_channel: np.ndarray, ab_channel: np.ndarray) -> np.ndarray:
    """
    Merge LAB, clip, range value between 0-255, and convert to RGB

    Args:
        l_channel (np.ndarray): L* lightness
        ab_channel (np.ndarray): a* green-red && b* yellow-blue

    Returns:
        np.ndarray: RGB image
    """
    image = cv.merge([l_channel, ab_channel])
    image = np.clip(image, 0, 1)
    image *= 255
    image = image.astype(np.uint8)
    image = cv.cvtColor(image, cv.COLOR_LAB2RGB)
    return image


def load_dataset(directory: Path) -> Tuple:
    """
    Extract features (L channel) and labels (ab channels)

    Args:
        directory (Path): folder with images dataset

    Returns:
        Tuple: x_train, x_test, y_train, y_test
    """
    pathnames = list(directory.iterdir())
    x, y = [], []
    for pathname in pathnames:
        l_channel, ab_channel = preprocess_image(pathname)
        x.append(l_channel)
        y.append(ab_channel)
    features = np.array(x)
    labels = np.array(y)
    # train(70)/val(15)/test(15)
    x_train, x_test, y_train, y_test = train_test_split(
        features, labels, test_size=0.15, random_state=42
    )
    return (x_train, x_test, y_train, y_test)


def plot_images(image_dict: Dict[str, np.ndarray]):
    """
    Show the given images (1-4) side by side.

    Args:
        image_dict (Dict[str, np.ndarray]): title, image
    """

    keys = list(image_dict.keys())
    n = len(image_dict)
    assert n <= 4, "Limit of 4 images side by side"
    fig, images = plt.subplots(1, n)
    fig.set_size_inches(n * 5, 5)
    if n == 1:
        images = [images]
    for index, im in enumerate(images):
        im.axis("off")
        title = keys[index]
        im.set_title(title)
        image = image_dict[title]
        if image.ndim != 3:
            im.imshow(image, cmap="gray")
        else:
            im.imshow(image)


def create_model_colorization(model_func):
    """
    Define the colorization model

    Args:
        model_func (_type_): a function that create a TF model with different testing layer

    Returns:
        _type_: a TF colorization model with layers
    """
    input_layer = Input(shape=(HEIGHT, WIDTH, 1))
    output_layer = model_func(input_layer)
    model_colorization = Model(inputs=input_layer, outputs=output_layer)
    model_colorization.summary()
    return model_colorization


def train_pipeline(
    model_func, compile_func, fit_func, features: np.ndarray, labels: np.ndarray
):
    """
    A pipeline to apply a training step. A model creation, compilation and fit from training data and labels

    Args:
        model_func (_type_): a function that create a TF model with different testing layers
        compile_func (_type_): a function that compile a TF model with different testing optimizers, losses and metrics
        fit_func (_type_): a function that fit a TF model with different testing parameters like epochs
        data (np.ndarray): training data
        labels (np.ndarray): training labels

    Returns:
        _type_: a trained TF colorization model
    """
    model = create_model_colorization(model_func)
    compile_func(model)
    fit_func(model, features, labels)
    return model


def predict_ab_channel(l_channel: np.ndarray, model) -> np.ndarray:
    """
    Color prediction (ab_channel) from an image (using the grayscale L*) created by the model

    Args:
        l_channel (np.ndarray): input
        model (_type_): a TF model for colorization

    Returns:
        np.ndarray: a new predicted ab channel
    """
    grayscale = l_channel.reshape(1, HEIGHT, WIDTH, 1)
    ab_prediction = model.predict(grayscale, batch_size=BATCH)
    ab_prediction = ab_prediction.reshape(HEIGHT, WIDTH, 2)
    return ab_prediction


def test_eval(model, features: np.ndarray, labels: np.ndarray):
    """
    Evaluate the model with testing data and labels

    Args:
        model (_type_): a trained TF colorization model
        features (np.ndarray): testing features
        labels (np.ndarray): testing labels
    """
    model.evaluate(features, labels, batch_size=BATCH)


def save_my_model(model, name: str):
    """
    Save a model into the folder MODELS_DIR

    Args:
        model (_type_): a trained model
        name (str): model name
    """
    model.save(f"{MODELS_DIR}/{name}.h5")


def load_my_model(name: str):
    """
    Load a model from the folder MODELS_DIR

    Args:
        name (str): model name

    Returns:
        _type_: a trained model
    """
    model = load_model(f"{MODELS_DIR}/{name}.h5")
    return model


In [None]:
def model_classic(input_layer):
    model_ = Conv2D(16, (3, 3), padding="same", strides=1)(input_layer)
    model_ = LeakyReLU()(model_)
    # model_ = Conv2D(64,(3,3), activation='relu',strides=1)(model_)
    model_ = Conv2D(32, (3, 3), padding="same", strides=1)(model_)
    model_ = LeakyReLU()(model_)
    model_ = BatchNormalization()(model_)
    model_ = MaxPooling2D(pool_size=(2, 2), padding="same")(model_)
    model_ = Conv2D(64, (3, 3), padding="same", strides=1)(model_)
    model_ = LeakyReLU()(model_)
    model_ = BatchNormalization()(model_)
    model_ = MaxPooling2D(pool_size=(2, 2), padding="same")(model_)
    model_ = Conv2D(128, (3, 3), padding="same", strides=1)(model_)
    model_ = LeakyReLU()(model_)
    model_ = BatchNormalization()(model_)
    model_ = Conv2D(256, (3, 3), padding="same", strides=1)(model_)
    model_ = LeakyReLU()(model_)
    model_ = BatchNormalization()(model_)
    model_ = UpSampling2D((2, 2))(model_)
    model_ = Conv2D(128, (3, 3), padding="same", strides=1)(model_)
    model_ = LeakyReLU()(model_)
    model_ = BatchNormalization()(model_)
    model_ = UpSampling2D((2, 2))(model_)
    model_ = Conv2D(64, (3, 3), padding="same", strides=1)(model_)
    model_ = LeakyReLU()(model_)
    # model_ = BatchNormalization()(model_)
    concat_ = concatenate([model_, input_layer])
    model_ = Conv2D(64, (3, 3), padding="same", strides=1)(concat_)
    model_ = LeakyReLU()(model_)
    model_ = BatchNormalization()(model_)
    model_ = Conv2D(32, (3, 3), padding="same", strides=1)(model_)
    model_ = LeakyReLU()(model_)
    # model_ = BatchNormalization()(model_)
    # NOTE: BECAUSE MY VALUES ARE BETWEEN 0-1,  model_ = Conv2D(2, (3, 3), activation="sigmoid", padding="same", strides=1)(model_)
    model_ = Conv2D(2, (3, 3), activation="tanh", padding="same", strides=1)(model_)
    return model_


def model_basic_conv(input_layer):
    model = Conv2D(16, (3, 3), activation="relu", padding="same", strides=1)(
        input_layer
    )
    model = Conv2D(32, (3, 3), activation="relu", padding="same", strides=1)(model)
    model = Conv2D(64, (3, 3), activation="relu", padding="same", strides=1)(model)
    model = Conv2D(32, (3, 3), activation="relu", padding="same", strides=1)(model)
    model = Conv2D(16, (3, 3), activation="relu", padding="same", strides=1)(model)
    model = Conv2D(2, (3, 3), activation="relu", padding="same", strides=1)(model)
    return model


def model_conv_sampling(input_layer):
    model = Conv2D(64, (3, 3), activation="relu", padding="same")(input_layer)
    model = Conv2D(64, (3, 3), activation="relu", padding="same", strides=2)(model)
    model = Conv2D(128, (3, 3), activation="relu", padding="same")(model)
    model = Conv2D(128, (3, 3), activation="relu", padding="same", strides=2)(model)
    model = Conv2D(256, (3, 3), activation="relu", padding="same")(model)
    model = Conv2D(256, (3, 3), activation="relu", padding="same", strides=2)(model)
    model = Conv2D(512, (3, 3), activation="relu", padding="same")(model)
    model = Conv2D(256, (3, 3), activation="relu", padding="same")(model)
    model = Conv2D(128, (3, 3), activation="relu", padding="same")(model)
    model = UpSampling2D((2, 2))(model)
    model = Conv2D(64, (3, 3), activation="relu", padding="same")(model)
    model = UpSampling2D((2, 2))(model)
    model = Conv2D(32, (3, 3), activation="relu", padding="same")(model)
    model = Conv2D(2, (3, 3), activation="relu", padding="same")(model)
    model = UpSampling2D((2, 2))(model)
    return model


def compile_adam_mse(model):
    model.compile(optimizer="adam", loss="mse")


def compile_adam_rmse(model):
    model.compile(optimizer="adam", loss="rmse")


def compile_adam_mae(model):
    model.compile(optimizer="adam", loss="mae")


def compile_rmsprop_mse(model):
    model.compile(optimizer="rmsprop", loss="mse")


def compile_rmsprop_rmse(model):
    model.compile(optimizer="rmsprop", loss="rmse")


def compile_rmsprop_mae(model):
    model.compile(optimizer="rmsprop", loss="mae")


def fit_callbacks(model, features, labels):
    # train(70)/val(15)/test(15)
    model.fit(
        features,
        labels,
        epochs=50,
        validation_split=0.15,
        batch_size=BATCH,
        callbacks=CALLBACKS,
    )


In [None]:
# Get training and testing dataset from "./images"
x_train, x_test, y_train, y_test = load_dataset(IMAGES_DIR)


In [None]:
# Model parameters
model_name = "test"
model_layers = model_basic_conv
model_compile = compile_adam_mse
model_fit = fit_callbacks

# Model construction
model = train_pipeline(model_layers, model_compile, model_fit, x_train, y_train)
save_my_model(model, model_name)
print("\n\nTEST\n----")
test_eval(model, x_test, y_test)


In [None]:
# Show some results images
START = 20
N_IMAGES = 2
for n_jpg in range(START, START + N_IMAGES):
    image_path = Path(IMAGES_DIR, f"{n_jpg}.jpg")

    name_1 = "test"
    name_2 = "test"

    model_1 = load_my_model(name_1)

    l_channel, ab_channel = preprocess_image(image_path)
    new_ab_channel = predict_ab_channel(l_channel, model_1)
    original_image = construct_image(l_channel, ab_channel)
    new_image = construct_image(l_channel, new_ab_channel)

    im_show = {
        "REAL IMAGE": original_image,
        "GRAYSCALE": l_channel,
        "PREDICT": new_image,
    }
    plot_images(im_show)
