Training notebook for:

Model 1: AE without fusion

Model 2: AE with fusion

In [None]:
# Mount Google Drive at /content/gdrive so the image folder and checkpoints are reachable.
# NOTE(review): later cells use relative paths ('gdrive/MyDrive/...'), which only resolve
# when the working directory is /content — confirm before running elsewhere.
from google.colab import drive
drive.mount('/content/gdrive', force_remount=True)

Mounted at /content/gdrive


Loading libraries:

In [None]:
from keras.layers import Conv2D, Conv2DTranspose, UpSampling2D, Reshape, concatenate, Layer, Flatten, Dense
from keras.layers.core import RepeatVector
from keras.models import Sequential, Model
from keras.utils import plot_model
from keras.preprocessing.image import ImageDataGenerator, img_to_array, load_img
from keras import Input, datasets, metrics, optimizers
from keras.applications.inception_resnet_v2 import InceptionResNetV2, preprocess_input, decode_predictions
from keras.callbacks import EarlyStopping
from keras.callbacks import ModelCheckpoint
from skimage.color import rgb2lab, lab2rgb, grey2rgb, rgb2grey
from skimage.transform import resize
from skimage.io import imsave, imshow, imread
import numpy as np
import tensorflow as tf
import os
from sklearn.model_selection import train_test_split
from google.colab import files
import time
from cv2 import resize, INTER_AREA

In [None]:
# ImageNet-pretrained Inception-ResNet-v2 with its classification head kept (include_top=True).
# Its per-image predictions are saved to disk further down and fed to the fusion model,
# whose embedding input is 1000 values wide.
inception = InceptionResNetV2(weights='imagenet', include_top=True)

Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/inception_resnet_v2/inception_resnet_v2_weights_tf_dim_ordering_tf_kernels.h5


Data loader:

In [None]:
class DataGenerator(tf.keras.utils.Sequence):
    """Keras ``Sequence`` that serves colourisation batches read from image files.

    Every image is converted to the Lab colour space. The network input ``X`` is
    the L channel (shape ``(batch_size, *dim, 1)``) and the target ``y`` is the
    a/b channel pair multiplied by ``1/128`` (shape ``(batch_size, *dim, 2)``).
    With ``fusion=True`` the input becomes ``[X, X_fusion]`` where ``X_fusion``
    holds one precomputed 1000-value embedding per image, loaded with ``np.load``
    from ``fusion_path + ID``.
    """

    def __init__(self, list_IDs, image_path,
                 to_fit=True, batch_size=32, dim=(256, 256), shuffle=True, fusion=False, fusion_path='fusion/'):
        """Initialization

        :param list_IDs: list of all image file names (ids) to use in the generator
        :param image_path: path prefix prepended to every id to locate the image
        :param to_fit: True to return X and y, False to return X only
        :param batch_size: batch size at each iteration
        :param dim: tuple (height, width) every image is resized to
        :param shuffle: True to shuffle the id order after every epoch
        :param fusion: True to return [X, X_fusion] as input, False returns X
        :param fusion_path: path prefix prepended to every id to locate its saved embedding
        """
        self.list_IDs = list_IDs
        self.image_path = image_path
        self.to_fit = to_fit
        self.batch_size = batch_size
        self.dim = dim
        self.shuffle = shuffle
        self.fusion = fusion
        self.fusion_path = fusion_path
        # Build the first index permutation right away.
        self.on_epoch_end()

    def __len__(self):
        """Denotes the number of batches per epoch

        Only full batches are served: trailing ids that do not fill a whole
        batch are left out of the epoch.

        :return: number of batches per epoch
        """
        return len(self.list_IDs) // self.batch_size

    def __getitem__(self, index):
        """Generate one batch of data

        :param index: index of the batch
        :return: X and y when fitting. X only when predicting
        """
        # Positions (into list_IDs) that belong to this batch
        indexes = self.indexes[index * self.batch_size:(index + 1) * self.batch_size]

        # Translate positions into ids
        list_IDs_temp = [self.list_IDs[k] for k in indexes]

        # Generate data
        X = self._generate_X(list_IDs_temp)
        if self.to_fit:
            y = self._generate_y(list_IDs_temp)
            return X, y
        return X

    def on_epoch_end(self):
        """Rebuilds the index array and reshuffles it when ``shuffle`` is set"""
        self.indexes = np.arange(len(self.list_IDs))
        if self.shuffle:
            np.random.shuffle(self.indexes)

    def _generate_X(self, list_IDs_temp):
        """Generates the network input for one batch

        :param list_IDs_temp: list of ids to load
        :return: batch of L-channel images, or [images, embeddings] when fusion is on
        """
        # Buffers are sized for a full batch; __len__ guarantees batches are full.
        X = np.empty((self.batch_size, *self.dim, 1))
        if self.fusion:
            X_fusion = np.empty((self.batch_size, 1000))

        for i, ID in enumerate(list_IDs_temp):
            X[i,] = self._load_lab_grayscale_image(self.image_path + ID)
            if self.fusion:
                X_fusion[i,] = self._load_fusion(self.fusion_path + ID)

        if self.fusion:
            return [X, X_fusion]
        return X

    def _generate_y(self, list_IDs_temp):
        """Generates the colour targets for one batch

        :param list_IDs_temp: list of ids to load
        :return: batch of scaled a/b channel images
        """
        y = np.empty((self.batch_size, *self.dim, 2))

        for i, ID in enumerate(list_IDs_temp):
            y[i,] = self._load_lab_color_image(self.image_path + ID)

        return y

    def _load_lab_image(self, image_path):
        """Reads an image, resizes it to ``self.dim`` and converts it to Lab

        :param image_path: full path of the image file
        :return: Lab image of shape (*dim, 3)
        """
        img = imread(image_path)
        img = img * (1.0 / 255)
        # `resize` in this notebook is cv2.resize (its import comes after, and so
        # shadows, skimage's); cv2 takes the target size as (width, height).
        height, width = self.dim
        img = resize(img, (width, height))
        # Single-channel files come back 2-D; replicate to three channels for rgb2lab.
        if img.ndim == 2:
            img = grey2rgb(img)
        return rgb2lab(img)

    def _load_lab_grayscale_image(self, image_path):
        """Loads the L channel of an image

        :param image_path: full path of the image file
        :return: array of shape (*dim, 1)
        """
        lab = self._load_lab_image(image_path)
        # Slice with 0:1 to keep the trailing channel axis.
        return lab[:, :, 0:1]

    def _load_fusion(self, image_path):
        """Loads a saved embedding written with ``np.save``

        :param image_path: full path of the embedding file
        :return: the stored array
        """
        with open(image_path, 'rb') as f:
            return np.load(f)

    def _load_lab_color_image(self, image_path):
        """Loads the a/b channels of an image, multiplied by 1/128

        :param image_path: full path of the image file
        :return: array of shape (*dim, 2)
        """
        lab = self._load_lab_image(image_path)
        return lab[:, :, 1:] * (1.0 / 128)

Loading the train/validation file lists and precomputing Inception-ResNet-v2 representations (used by the fusion model):

In [None]:
# Read the image file names for each split, one name per line.
# splitlines() drops the line terminator without eating the last character of a
# final line that has no trailing newline; empty lines are skipped.
with open("places_sample_train.txt") as f:
  train_list = [name for name in f.read().splitlines() if name]

with open("places_sample_val.txt") as f:
  val_list = [name for name in f.read().splitlines() if name]

In [None]:
# Directory that receives one saved embedding per image.
# exist_ok=True keeps the cell re-runnable when the directory is already there.
os.makedirs('fusion', exist_ok=True)

In [None]:
# Precompute and save the Inception-ResNet-v2 output for every validation image.
for image in val_list:
    img = imread('./gdrive/MyDrive/val_256/' + image)
    img = img * (1.0 / 255)
    # Drop colour, then replicate the grey channel so the network still gets 3 channels.
    img = grey2rgb(rgb2grey(img))
    img = resize(img, (299, 299))
    # preprocess_input expects pixel values in [0, 255] and maps them to [-1, 1].
    # img is in [0, 1] at this point, so scale it back up first; otherwise every
    # image collapses to values of about -1.
    img = preprocess_input(img * 255.0)
    # Add the batch axis expected by predict().
    img = img.reshape((1,) + img.shape)
    embed = inception.predict(img)
    with open('./fusion/' + image, 'wb') as f:
        np.save(f, embed)

In [None]:
# Precompute and save the Inception-ResNet-v2 output for every training image.
for image in train_list:
    img = imread('./gdrive/MyDrive/val_256/' + image)
    img = img * (1.0 / 255)
    # Drop colour, then replicate the grey channel so the network still gets 3 channels.
    img = grey2rgb(rgb2grey(img))
    img = resize(img, (299, 299))
    # preprocess_input expects pixel values in [0, 255] and maps them to [-1, 1].
    # img is in [0, 1] at this point, so scale it back up first; otherwise every
    # image collapses to values of about -1.
    img = preprocess_input(img * 255.0)
    # Add the batch axis expected by predict().
    img = img.reshape((1,) + img.shape)
    embed = inception.predict(img)
    with open('./fusion/' + image, 'wb') as f:
        np.save(f, embed)

Model 1 AE below:

In [None]:
# Vanilla autoencoder: single-channel 256x256 input -> two-channel 256x256 output.
encoder_input = Input(shape=(256, 256, 1), name="input")

# Encoder: seven 3x3 ReLU convolutions given as (filters, stride).
# The three stride-2 layers take the spatial size from 256 down to 32.
encoder_output = encoder_input
for n_filters, stride in [(64, 2), (128, 1), (128, 2), (256, 1), (256, 2), (512, 1), (256, 1)]:
    encoder_output = Conv2D(
        n_filters, (3, 3), activation='relu', padding='same', strides=stride
    )(encoder_output)

# Decoder: two conv + 2x upsampling stages, ...
decoder_output = encoder_output
for n_filters in (128, 64):
    decoder_output = Conv2D(n_filters, (3, 3), activation='relu', padding='same')(decoder_output)
    decoder_output = UpSampling2D((2, 2))(decoder_output)

# ... then a 32-filter conv, the 2-channel tanh output conv and a last 2x upsampling.
decoder_output = Conv2D(32, (3, 3), activation='relu', padding='same')(decoder_output)
decoder_output = Conv2D(2, (3, 3), activation='tanh', padding='same')(decoder_output)
decoder_output = UpSampling2D((2, 2))(decoder_output)

model = Model(inputs=encoder_input, outputs=decoder_output)
model.compile(optimizer='adam', loss='mse', metrics=['accuracy'])
model.summary()
plot_model(model, show_shapes=True)

Training model 1:

In [None]:
# Both splits read their images from the same Drive folder; no embeddings for model 1.
image_path = "gdrive/MyDrive/val_256/"
train_datagen, val_datagen = (
    DataGenerator(id_list, image_path, batch_size=100, fusion=False)
    for id_list in (train_list, val_list)
)

In [None]:
# Stop once val_loss has not improved for 7 consecutive epochs.
es = EarlyStopping(monitor='val_loss', mode='min', verbose=1, patience=7)
# Keep only the best checkpoint. The model regresses continuous colour channels
# with an MSE loss, so "best" is judged by val_loss (the same quantity early
# stopping watches) rather than by a classification-style accuracy.
mc = ModelCheckpoint('gdrive/MyDrive/colorize_autoencoder_10000_50.model', monitor='val_loss', mode='min', verbose=1, save_best_only=True)
model.fit(train_datagen, validation_data = val_datagen, epochs=100, callbacks=[es, mc])

Model 2 AE with fusion below:

In [None]:
# Vanilla autoencoder + inception-resnet-v2 fusion.
# Inputs: the single-channel 256x256 image and a 1000-value embedding per image.
encoder_input = Input(shape=(256, 256, 1), name="input")
embed_input = Input(shape=(1000,))

# Encoder: seven 3x3 ReLU convolutions given as (filters, stride).
# The three stride-2 layers take the spatial size from 256 down to 32.
encoder_output = encoder_input
for n_filters, stride in [(64, 2), (128, 1), (128, 2), (256, 1), (256, 2), (512, 1), (256, 1)]:
    encoder_output = Conv2D(
        n_filters, (3, 3), activation='relu', padding='same', strides=stride
    )(encoder_output)

# Fusion: copy the embedding to each of the 32x32 encoder positions, stack it on
# the encoder features along the channel axis, and mix back to 256 channels with a 1x1 conv.
tiled_embed = RepeatVector(32 * 32)(embed_input)
tiled_embed = Reshape((32, 32, 1000))(tiled_embed)
fusion_output = concatenate([encoder_output, tiled_embed], axis=3)
fusion_output = Conv2D(256, (1, 1), activation='relu', padding='same')(fusion_output)

# Decoder: two conv + 2x upsampling stages, ...
decoder_output = fusion_output
for n_filters in (128, 64):
    decoder_output = Conv2D(n_filters, (3, 3), activation='relu', padding='same')(decoder_output)
    decoder_output = UpSampling2D((2, 2))(decoder_output)

# ... then a 32-filter conv, the 2-channel tanh output conv and a last 2x upsampling.
decoder_output = Conv2D(32, (3, 3), activation='relu', padding='same')(decoder_output)
decoder_output = Conv2D(2, (3, 3), activation='tanh', padding='same')(decoder_output)
decoder_output = UpSampling2D((2, 2))(decoder_output)

model = Model(inputs=[encoder_input, embed_input], outputs=decoder_output)
model.compile(optimizer='adam', loss='mse', metrics=['accuracy'])
model.summary()
plot_model(model, show_shapes=True)

Training model 2:

In [None]:
# Same image folder as before; fusion=True makes each batch carry the saved embeddings too.
image_path = "gdrive/MyDrive/val_256/"
train_datagen, val_datagen = (
    DataGenerator(id_list, image_path, batch_size=100, fusion=True)
    for id_list in (train_list, val_list)
)

In [None]:
# Stop once val_loss has not improved for 7 consecutive epochs.
es = EarlyStopping(monitor='val_loss', mode='min', verbose=1, patience=7)
# Keep only the best checkpoint. The model regresses continuous colour channels
# with an MSE loss, so "best" is judged by val_loss (the same quantity early
# stopping watches) rather than by a classification-style accuracy.
mc = ModelCheckpoint('gdrive/MyDrive/colorize_autoencoder_fusion_10000_50.model', monitor='val_loss', mode='min', verbose=1, save_best_only=True)
model.fit(train_datagen, validation_data = val_datagen, epochs=100, callbacks=[es, mc])