In [None]:
! pip install visualkeras

In [None]:
import os
import numpy as np
import matplotlib.pyplot as plt
from tqdm import tqdm

import tensorflow as tf

from keras.layers import Conv2D, UpSampling2D, Dense, MaxPooling2D, BatchNormalization, Dropout, BatchNormalization, Conv2DTranspose
from keras.models import Sequential

import random
import visualkeras

In [None]:
dataset_path = "drive/MyDrive/tiny-imagenet-200/"
models_path = 'drive/MyDrive/Università/Magistrale/VCS/models/'

In [None]:
SIZE = 64

## Dataset loading

In [None]:
X_train = np.load(dataset_path+'x_train.npy')
Y_train = np.load(dataset_path+'y_train.npy')

X_valid = np.load(dataset_path+'x_valid.npy')
Y_valid = np.load(dataset_path+'y_valid.npy')

X_test = np.load(dataset_path+'x_test.npy')
Y_test = np.load(dataset_path+'y_test.npy')

In [None]:
X_train = tf.image.convert_image_dtype(X_train, tf.float32)
Y_train = tf.image.convert_image_dtype(Y_train, tf.float32)

X_valid = tf.image.convert_image_dtype(X_valid, tf.float32)
Y_valid = tf.image.convert_image_dtype(Y_valid, tf.float32)

X_test = tf.image.convert_image_dtype(X_test, tf.float32)
Y_test = tf.image.convert_image_dtype(Y_test, tf.float32)

In [None]:
print(f"{len(X_train)} training examples")
print(f"{len(X_valid)} validation examples")
print(f"{len(X_test)} test examples")

## Sample of the dataset

In [None]:
from mpl_toolkits.axes_grid1 import ImageGrid
from skimage.color import lab2rgb

samples = []
for i in range(12):
  samples.append(round(random.random()*len(X_train)))

images = []
for i in samples:    
    
    img = np.zeros((SIZE, SIZE, 3))
    img[:,:,0] = X_train[i][:,:,0]
    img[:,:,1:] = Y_train[i]*128
    images.append(lab2rgb(img))
    
fig = plt.figure(figsize=(8., 8.))
grid = ImageGrid(fig, 111,  
                 nrows_ncols=(4, 3), 
                 axes_pad=0.1,
                 )

for ax, im in zip(grid, images):
    ax.imshow(im)
    ax.set_yticklabels([])
    ax.set_xticklabels([])

plt.show()

## CNN

Baseline model to test the correctness of the learning framework.

In [None]:
model = Sequential(name=("CNN"))

model.add(Conv2D(32, (3, 3), activation='relu', padding='same', input_shape=(SIZE, SIZE, 1)))    
model.add(BatchNormalization())
model.add(Conv2D(64, (3, 3), activation='relu', padding='same'))    
model.add(BatchNormalization())
model.add(Conv2D(128, (3, 3), activation='relu', padding='same'))
model.add(BatchNormalization())

model.add(Conv2D(2, (1, 1), activation='tanh', padding='valid'))    

## Polychromify
Main model: deep convolutional autoencoder.

In [None]:
model = Sequential(name=("Polychromify"))

model.add(Conv2D(64, (3, 3), activation='relu', padding='same', strides=2, input_shape=(SIZE, SIZE, 1)))
model.add(Conv2D(128, (3, 3), activation='relu', strides=2, padding='same'))
model.add(Conv2D(256, (3, 3), activation='relu', strides=2, padding='same'))

model.add(BatchNormalization())

model.add(Dense(128, activation='relu'))
model.add(Dropout(0.5))
model.add(Dense(64, activation='relu'))
model.add(Dropout(0.5))
model.add(Dense(128, activation='relu'))

model.add(BatchNormalization())

model.add(Conv2D(256, (3,3), activation='relu', padding='same'))
model.add(UpSampling2D((2, 2)))
model.add(Conv2D(128, (3,3), activation='relu', padding='same'))
model.add(UpSampling2D((2, 2)))
model.add(Conv2D(64, (3,3), activation='relu', padding='same'))
model.add(UpSampling2D((2, 2)))

# Output layer
model.add(Conv2D(2, (1,1), activation='tanh', padding='valid'))

## Polychromify (overfit)

In [None]:
# I want to test a more aggressive colorization despite having lower generalization metrics values (metrics vs qualitative results problem)
# In this model I simply removed the dropout and trained for 50 epochs, in this way we achieve a complete overfitting.

model = Sequential(name=("Polychromify-overfit"))

model.add(Conv2D(64, (3, 3), activation='relu', padding='same', strides=2, input_shape=(SIZE, SIZE, 1)))
model.add(Conv2D(128, (3, 3), activation='relu', strides=2, padding='same'))
model.add(Conv2D(256, (3, 3), activation='relu', strides=2, padding='same'))

model.add(BatchNormalization())

model.add(Dense(128, activation='relu'))
# model.add(Dropout(0.5))
model.add(Dense(64, activation='relu'))
# model.add(Dropout(0.5))
model.add(Dense(128, activation='relu'))

model.add(BatchNormalization())

model.add(Conv2D(256, (3,3), activation='relu', padding='same'))
model.add(UpSampling2D((2, 2)))
model.add(Conv2D(128, (3,3), activation='relu', padding='same'))
model.add(UpSampling2D((2, 2)))
model.add(Conv2D(64, (3,3), activation='relu', padding='same'))
model.add(UpSampling2D((2, 2)))

# Output layer
model.add(Conv2D(2, (1,1), activation='tanh', padding='valid'))

## Compile & train

In [None]:
def PSNR(y_true,y_pred):
  return tf.image.psnr(y_true, y_pred, max_val=1.0)

In [None]:
model.compile(optimizer="adam", loss='mse', metrics=[PSNR])
model.summary()
visualkeras.layered_view(model)

In [None]:
percentage = 1
training_size = round(len(X_train)*percentage)
print(f"Using {percentage*100}% of the available training data:\ntrain: {training_size}/{len(X_train)}")

In [None]:
history = model.fit(
    X_train[:training_size], Y_train[:training_size],
    validation_data=(X_valid, Y_valid),
    epochs=25,  # Polychromify = 25 | Polychromify-overfit = 50
    batch_size=64)

## Plots, validation & model save

In [None]:
def plot_history(history,model_name):
    fig, axes = plt.subplots(1, 2, figsize=(15, 6))
    ax = axes.ravel()

    # Loss
    ax[0].plot(history['loss'])
    ax[0].plot(history['val_loss'])
    ax[0].set_title('model loss')
    ax[0].set_ylabel('loss')
    ax[0].set_xlabel('epochs')
    ax[0].legend(['train', 'validation'], loc='upper right')
    
    # PSNR
    ax[1].plot(history['PSNR'])
    ax[1].plot(history['val_PSNR'])
    ax[1].set_title('model PSNR')
    ax[1].set_ylabel('PSNR')
    ax[1].set_xlabel('epochs')
    ax[1].legend(['train', 'validation'], loc='upper left')

    fig.tight_layout()
    fig.suptitle(model_name,fontsize=16)
    plt.show()      

In [None]:
plot_history(history.history,model.name)

In [None]:
print("validation")
model.evaluate(X_valid, Y_valid, batch_size=64, verbose=1)
print("test")
model.evaluate(X_test, Y_test, batch_size=64, verbose=1)

In [None]:
model.save(models_path+model.name) 

## Load model

In [None]:
model_name = "Polychromify" # change here to load another one
model_loaded = tf.keras.models.load_model(models_path+model_name, custom_objects={'PSNR':PSNR})

In [None]:
model_loaded.summary()
visualkeras.layered_view(model_loaded)

In [None]:
print("validation")
model_loaded.evaluate(X_valid, Y_valid, batch_size=64, verbose=1)
print("test")
model_loaded.evaluate(X_test, Y_test, batch_size=64, verbose=1)

## Visualization

In [None]:
from skimage.color import rgb2lab, lab2rgb
from skimage.transform import resize
from skimage.metrics import structural_similarity as ssim
from skimage.metrics import peak_signal_noise_ratio as psnr
from keras.preprocessing.image import ImageDataGenerator, img_to_array, load_img, array_to_img

In [None]:
def plot_comparison(img_resized,img_recolored, figsize=(7,3),cast=False):     
    
    fig, axes = plt.subplots(1, 2, figsize=figsize)
    ax = axes.ravel()

    ax[0].imshow(img_resized)
    ax[0].set_title("Resized")
    
    ax[1].imshow(img_recolored)
    ax[1].set_title("Recolored")

    fig.tight_layout()
    plt.show()     

In [None]:
X_data = X_test
Y_data = Y_test
for i, x in enumerate(X_data[:10]):     
    
    img_color = np.array([x], dtype=float)

    output = model.predict(img_color)
    
    result = np.zeros((SIZE, SIZE, 3))
    result[:,:,0] = x[:,:,0]
    result[:,:,1:] = output[0]*128

    recolored = lab2rgb(result)
    
    original = np.zeros((SIZE, SIZE, 3))
    original[:,:,0] = x[:,:,0]
    original[:,:,1:] = Y_data[i]*128
    
    original = lab2rgb(original)
    
    ssim = tf.image.ssim(original, recolored, max_val=1.0, filter_size=11,
                          filter_sigma=1.5, k1=0.01, k2=0.03)
    print(f"SSIM {ssim}")
    print(f"PSNR {PSNR(original,recolored)}")
    plot_comparison(original, recolored)

## Metric evaluation

In [None]:
import warnings
warnings.filterwarnings("ignore")

In [None]:
SSIM_tot = 0
PSNR_tot = 0
size = len(X_test)
for i, x in enumerate(tqdm(X_test[:size])):     
    
    img_color = np.array([x], dtype=float)

    output = model.predict(img_color)

    result = np.zeros((SIZE, SIZE, 3))
    result[:,:,0] = x[:,:,0]
    result[:,:,1:] = output[0]*128

    recolored = lab2rgb(result)
    
    original = np.zeros((SIZE, SIZE, 3))
    original[:,:,0] = x[:,:,0]
    original[:,:,1:] = Y_test[i]*128
    
    original = lab2rgb(original)
    
    ssim = tf.image.ssim(original, recolored, max_val=1.0, filter_size=11,
                          filter_sigma=1.5, k1=0.01, k2=0.03)
    SSIM_tot += ssim
    PSNR_tot += PSNR(original,recolored)

print(f"SSIM {SSIM_tot/size} - PSNR {PSNR_tot/size}")

## Separate training test

In [None]:
model_A = Sequential(name=("Polychromify_A"))

model_A.add(Conv2D(64, (3, 3), activation='relu', strides=2, padding='same', input_shape=(SIZE, SIZE, 1)))
model_A.add(Conv2D(128, (3, 3), activation='relu', strides=2, padding='same'))
model_A.add(Conv2D(256, (3, 3), activation='relu', strides=2, padding='same'))

model_A.add(BatchNormalization())

model_A.add(Dense(128, activation='relu'))
model_A.add(Dropout(0.5))
model_A.add(Dense(64, activation='relu'))
model_A.add(Dropout(0.5))
model_A.add(Dense(128, activation='relu'))

model_A.add(BatchNormalization())

model_A.add(Conv2D(256, (3, 3), activation='relu', padding='same'))
model_A.add(UpSampling2D((2, 2)))
model_A.add(Conv2D(128, (3,3), activation='relu', padding='same'))
model_A.add(UpSampling2D((2, 2)))
model_A.add(Conv2D(64, (3,3), activation='relu', padding='same'))
model_A.add(UpSampling2D((2, 2)))

# Output layer
model_A.add(Conv2D(1, (1, 1), activation='tanh', padding='valid'))

In [None]:
model_B = Sequential(name=("Polychromify_B"))

model_B.add(Conv2D(64, (3, 3), activation='relu', strides=2, padding='same', input_shape=(SIZE, SIZE, 1)))
model_B.add(Conv2D(128, (3, 3), activation='relu', strides=2, padding='same'))
model_B.add(Conv2D(256, (3, 3), activation='relu', strides=2, padding='same'))

model_B.add(BatchNormalization())

model_B.add(Dense(128, activation='relu'))
model_B.add(Dropout(0.5))
model_B.add(Dense(64, activation='relu'))
model_B.add(Dropout(0.5))
model_B.add(Dense(128, activation='relu'))

model_B.add(BatchNormalization())

model_B.add(Conv2D(256, (3, 3), activation='relu', padding='same'))
model_B.add(UpSampling2D((2, 2)))
model_B.add(Conv2D(128, (3,3), activation='relu', padding='same'))
model_B.add(UpSampling2D((2, 2)))
model_B.add(Conv2D(64, (3,3), activation='relu', padding='same'))
model_B.add(UpSampling2D((2, 2)))

# Output layer
model_B.add(Conv2D(1, (1, 1), activation='tanh', padding='valid'))

In [None]:
model_A.compile(optimizer="adam", loss='mse', metrics=[PSNR])
model_B.compile(optimizer="adam", loss='mse', metrics=[PSNR])

In [None]:
model_A.summary()
visualkeras.layered_view(model_A)

In [None]:
model_B.summary()
visualkeras.layered_view(model_B)

In [None]:
history_A = model_A.fit(
    X_train[:training_size], Y_train[:,:,:,0][:training_size],
    validation_data=(X_valid, Y_valid[:,:,:,0]),
    epochs=30, 
    batch_size=64)

In [None]:
history_B = model_B.fit(
    X_train[:training_size], Y_train[:,:,:,1][:training_size],
    validation_data=(X_valid, Y_valid[:,:,:,1]),
    epochs=30, 
    batch_size=64)

In [None]:
plot_history(history_A.history,model_A.name)
plot_history(history_B.history,model_B.name)

In [None]:
model_A.save(models_path+model_A.name)
model_B.save(models_path+model_B.name)

### Visualization

In [None]:
for i, x in enumerate(X_test[:10]):     
    
    img_color = []
    img_color.append(x)
    img_color = np.array(img_color, dtype=float)

    output_A = model_A.predict(img_color)
    output_A = output_A*128

    output_B = model_B.predict(img_color)
    output_B = output_B*128
    
    result = np.zeros((SIZE, SIZE, 3))
    result[:,:,0] = img_color[0][:,:,0]

    result[:,:,1] = output_A[0][:,:,0]
    result[:,:,2] = output_B[0][:,:,0]

    recolored = lab2rgb(result)
    
    original = np.zeros((SIZE, SIZE, 3))
    original[:,:,0] = x[:,:,0]
    original[:,:,1:] = Y_test[i]*128
    
    original = lab2rgb(original)

    ssim = tf.image.ssim(original, recolored, max_val=1.0, filter_size=11,
                          filter_sigma=1.5, k1=0.01, k2=0.03)
    print(f"SSIM {ssim}")
    print(f"PSNR {PSNR(original,recolored)}")

    plot_comparison(original, recolored)

### Metrics

In [None]:
SSIM_tot = 0
PSNR_tot = 0

size = len(X_test)
for i, x in enumerate(tqdm(X_test[:size])):     
    
    img_color = []
    img_color.append(x)
    img_color = np.array(img_color, dtype=float)

    output_A = model_A.predict(img_color)
    output_A = output_A*128

    output_B = model_B.predict(img_color)
    output_B = output_B*128
    
    result = np.zeros((SIZE, SIZE, 3))
    result[:,:,0] = img_color[0][:,:,0]

    result[:,:,1] = output_A[0][:,:,0]
    result[:,:,2] = output_B[0][:,:,0]

    recolored = lab2rgb(result)
    
    original = np.zeros((SIZE, SIZE, 3))
    original[:,:,0] = x[:,:,0]
    original[:,:,1:] = Y_test[i]*128
    
    original = lab2rgb(original)

    ssim = tf.image.ssim(original, recolored, max_val=1.0, filter_size=11,
                          filter_sigma=1.5, k1=0.01, k2=0.03)
    SSIM_tot += ssim
    PSNR_tot += PSNR(original,recolored)
print(f"SSIM {SSIM_tot/size} - PSNR {PSNR_tot/size}")