<a href="https://colab.research.google.com/github/giacomogreggio/computer-vision-project/blob/master/FCN8_VOC.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# Mount Google Drive at /content/drive; the unzip cells below read their archives from that mount point.
from google.colab import drive
import os
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [2]:
# Extract the image archive from Drive only when no "JPEGImagesMax" entry is listed under /content yet.
# NOTE(review): unzip is given no target directory, so it extracts into the current working
# directory; the guard presumably assumes that directory is /content — confirm.
if "JPEGImagesMax" not in os.listdir("/content"):
    !unzip "/content/drive/My Drive/Colab Notebooks/dati/JPEGImagesMax.zip"

In [3]:
# Extract the mask archive from Drive only when no "SegmentationClassConvertedMin" entry is listed under /content yet.
# NOTE(review): unzip is given no target directory, so it extracts into the current working
# directory; the guard presumably assumes that directory is /content — confirm.
if "SegmentationClassConvertedMin" not in os.listdir("/content"):
    !unzip "/content/drive/My Drive/Colab Notebooks/dati/SegmentationClassConvertedMin.zip"

In [4]:
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt
import tensorflow.keras.backend as back 
import cv2
from tensorflow import keras
from tensorflow.keras.applications.vgg16 import VGG16
from tensorflow.keras.preprocessing import image
from tensorflow.keras.applications.vgg16 import preprocess_input 
import os
from tensorflow.keras.layers import Conv2D, Conv2DTranspose, Input, MaxPool2D, ZeroPadding2D, Cropping2D, Softmax, Add, Dropout
from tensorflow.keras.regularizers import l2
from tensorflow.keras import Model
from tensorflow.keras.optimizers import SGD, Adam
from tensorflow.keras.models import Sequential
from tensorflow.keras.metrics import MeanIoU
%matplotlib inline
import h5py
from IPython.display import display, clear_output
from PIL import Image
from skimage.io import imshow
from tensorflow.keras.losses import SparseCategoricalCrossentropy, CategoricalCrossentropy
from sklearn.preprocessing import OneHotEncoder
from tqdm.notebook import trange, tqdm 
from time import sleep  
import random

In [5]:
def get_color_map(N=256, normalized=False):
    """Build an N-entry RGB palette by spreading the bits of each index over the channels.

    For every index the bits are consumed three at a time (lowest bits first):
    bit 0 goes to red, bit 1 to green, bit 2 to blue. The first triple lands in
    the most significant bit of each channel, the next triple one bit lower,
    and so on for eight rounds.

    Args:
        N: number of palette entries to generate.
        normalized: when True the palette is float32 and divided by 255,
            otherwise it is uint8.

    Returns:
        Array of shape (N, 3) with one RGB row per index.
    """
    palette = np.zeros((N, 3), dtype='float32' if normalized else 'uint8')

    for label in range(N):
        red = green = blue = 0
        remaining = label
        # shift runs 7, 6, ..., 0: the first bit triple fills the top bit of each channel
        for shift in range(7, -1, -1):
            red |= (remaining & 1) << shift
            green |= ((remaining >> 1) & 1) << shift
            blue |= ((remaining >> 2) & 1) << shift
            remaining >>= 3
        palette[label] = np.array([red, green, blue])

    if normalized:
        palette = palette / 255
    return palette[:N]


def color_map_viz():
    """Draw the palette as a stack of horizontal color bands labelled with class names.

    The first 21 bands use palette entries 0..20; the final band ('void') uses
    the last entry of the default 256-entry palette.
    """
    class_names = ['background', 'aeroplane', 'bicycle', 'bird', 'boat', 'bottle', 'bus', 'car', 'cat', 'chair', 'cow', 'diningtable', 'dog', 'horse', 'motorbike', 'person', 'pottedplant', 'sheep', 'sofa', 'train', 'tvmonitor', 'void']
    n_bands = 21          # bands taken from the start of the palette
    band_height = 50
    band_width = 500

    palette = get_color_map()
    canvas = np.empty((band_height*(n_bands+1), band_width, palette.shape[1]), dtype=palette.dtype)

    for band in range(n_bands):
        top = band*band_height
        canvas[top:top+band_height, :] = palette[band]

    # extra band at the bottom, painted with the last palette entry
    void_top = n_bands*band_height
    canvas[void_top:void_top+band_height, :] = palette[-1]

    imshow(canvas)
    tick_positions = [band_height*k+band_height/2 for k in range(n_bands+1)]
    plt.yticks(tick_positions, class_names)
    plt.xticks([])
    plt.show()


def get_color_dict(n_classes):
    """Map each of the first n_classes palette colors to its class index.

    Args:
        n_classes: number of palette entries to include.

    Returns:
        dict whose keys are (r, g, b) tuples and whose values are the
        corresponding palette indices.
    """
    palette = get_color_map(N=n_classes).tolist()[:n_classes]
    return {tuple(rgb): class_id for class_id, rgb in enumerate(palette)}

In [6]:
def print_img(img, title='Lena'):
    """Show a single image in a 14x7 inch figure with the axes hidden.

    Args:
        img: image array in BGR channel order; it is converted to RGB
            before being handed to matplotlib.
        title: text shown above the image. Defaults to 'Lena', the title
            that used to be hard-coded, so existing calls are unaffected.
    """
    plt.figure(figsize=(14, 7))
    plt.subplot(1, 1, 1)
    plt.imshow(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))
    plt.title(title)
    plt.axis('off')

In [7]:
# Number of palette entries / label ids used below.
nclasses = 22
# First `nclasses` rows of the palette, one RGB row per label id.
colored_map = get_color_map(N=nclasses)
# Reverse lookup: (r, g, b) tuple -> label id, for the same `nclasses` colors.
color_dict = get_color_dict(nclasses)
# Root directory that contains the JPEGImagesMax and SegmentationClassConvertedMin folders.
dataset_path = "/content"

In [8]:
def normalize(input_image):
  """Cast the image to float32 and divide every value by 255.0.

  Returns the result as a TensorFlow tensor.
  """
  as_float = tf.cast(input_image, tf.float32)
  return as_float / 255.0

In [9]:
# In-memory sample lists: normalized images and their label arrays, appended in the same order.
dataset = []
targets = []
# The mask directory listing drives the loop; each image is looked up by the mask's base name.
images = os.listdir(dataset_path + "/SegmentationClassConvertedMin")
for i in tqdm(range(len(images)), desc='Loading Images...'):   
    
    # Base name = everything before the first '.' of the mask file name.
    name = images[i].split(".")[0]
    # Flag 1 makes OpenCV load a 3-channel color image.
    img_x = cv2.imread(dataset_path + '/JPEGImagesMax/' + name + '.jpg',1)
    # normalize() returns a tensor; store it as a NumPy array.
    dataset.append(normalize(img_x).numpy())
    
    with open(dataset_path + '/SegmentationClassConvertedMin/' + name + ".npy", 'rb') as f:
        targets.append(np.load(f))
    # Stop after indices 0..20, i.e. at most 21 samples are loaded.
    # NOTE(review): presumably a memory/debug limit — confirm before relying on `dataset` being complete.
    if i==20:
        break
  

HBox(children=(FloatProgress(value=0.0, description='Loading Images...', max=2223.0, style=ProgressStyle(descr…

In [10]:
# Training hyper-parameters.
BATCH_SIZE = 32
BUFFER_SIZE = 1000
EPOCHS = 100
VAL_SUBSPLITS = 5

# Number of training samples = number of mask files on disk.
# len() must be applied to the directory listing: applied to the path string
# itself it returns the number of characters in the path, which made
# STEPS_PER_EPOCH unrelated to the dataset size.
TRAIN_LENGTH = len(os.listdir('/content/SegmentationClassConvertedMin'))
STEPS_PER_EPOCH = TRAIN_LENGTH // BATCH_SIZE

In [11]:
def display(display_list):
  """Plot the given images side by side in a single row.

  Note: this definition shadows `display` imported from IPython.display in
  the import cell.

  Args:
    display_list: sequence of images in the order input image, true mask,
      predicted mask. The second entry is assumed to be a 2-D (H, W) label
      map and gets a trailing channel axis before plotting. The caller's
      sequence is not modified.
  """
  title = ['Input Image', 'True Mask', 'Predicted Mask']

  # Work on a shallow copy so the caller's list is left untouched.
  panels = list(display_list)
  if len(panels) > 1:
    # array_to_img needs a channel axis on the true mask.
    panels[1] = np.expand_dims(panels[1], axis=2)

  plt.figure(figsize=(15, 15))
  for i, panel in enumerate(panels):
    plt.subplot(1, len(panels), i+1)
    # Entries beyond the three known titles are shown without a title.
    plt.title(title[i] if i < len(title) else '')
    plt.imshow(tf.keras.preprocessing.image.array_to_img(panel))
    plt.axis('off')
  plt.show()

def create_mask(pred_mask):
  """Turn a batch of per-class scores into a label mask for the first sample.

  Takes the argmax over the last axis, appends a trailing axis of size 1
  and returns the entry at index 0 of the leading axis.
  """
  class_ids = tf.argmax(pred_mask, axis=-1)
  with_channel = class_ids[..., tf.newaxis]
  first_sample = with_channel[0]
  return first_sample


def show_predictions(num=1):
  """Display image, true mask and predicted mask for the last samples of `dataset`.

  Uses the module-level `model`, `dataset` and `targets`. Samples are taken
  from the end of `dataset`, newest first.

  Args:
    num: how many samples to show; capped at len(dataset).
  """
  for i in range(min(num, len(dataset))):
    n = len(dataset)-i-1
    # Predict on the same sample that is displayed. Previously the prediction
    # was computed for dataset[i] while dataset[n] / targets[n] were shown,
    # pairing the predicted mask with the wrong image.
    pred_mask = create_mask(model.predict(np.expand_dims(dataset[n],axis=0)))
    display([dataset[n], targets[n], pred_mask])


In [12]:
# Shape of the network input passed to Input(shape=...) in create_base_vgg.
image_shape=(500,500,3)
# Number of output channels of every score / transposed-convolution layer.
n_classes=22
# L2 regularization factor for the head layers; 5**-4 evaluates to 0.0016.
l2_value=5**-4
# Cropping applied after the stride-32 transposed convolution in create_fcn32: 16 on every side.
crop_value=((16,16),(16,16))
def create_base_vgg(trainable=True):
  """Build the five convolutional VGG blocks and load ImageNet weights into them.

  Layers are named "conv2d_b<block>_<index>" and "maxpool_b<block>"
  (e.g. "conv2d_b3_2", "maxpool_b4") so other code can fetch them by name.
  The weights come from a freshly created Keras VGG16 (include_top=False),
  saved to "./weights.h5" and loaded back into this model.

  Args:
    trainable: value assigned to the model's `trainable` attribute.

  Returns:
    Keras Model from the input layer to the output of "maxpool_b5".
  """
  input_layer = Input(shape=image_shape, name="input")

  # (filters, number of 3x3 conv layers) for VGG blocks 1..5
  block_specs = [(64, 2), (128, 2), (256, 3), (512, 3), (512, 3)]

  features = input_layer
  for block_no, (n_filters, n_convs) in enumerate(block_specs, start=1):
    for conv_no in range(1, n_convs + 1):
      features = Conv2D(filters=n_filters, kernel_size=(3,3), padding="same", activation="relu",
                        name="conv2d_b{}_{}".format(block_no, conv_no))(features)
    # every block ends with a 2x2 / stride-2 max pooling
    features = MaxPool2D(pool_size=(2,2), strides=(2,2), name="maxpool_b{}".format(block_no))(features)

  vgg_model = Model(input_layer, features)

  # Transfer the pretrained weights through a weights file on disk.
  pretrained = VGG16(weights="imagenet", include_top=False)
  pretrained.save_weights("./weights.h5")
  vgg_model.load_weights("./weights.h5")
  vgg_model.trainable=trainable

  return vgg_model

In [13]:
def create_fcn32(vgg_trainable=True, include_top=True):
  """Build the FCN-32 model on top of the VGG base.

  Head: 7x7 conv (4096) -> dropout 0.5 -> 1x1 conv (4096) -> dropout 0.5 ->
  1x1 conv (n_classes), all with L2 kernel regularization.

  Args:
    vgg_trainable: forwarded to create_base_vgg as `trainable`.
    include_top: when True, append the stride-32 transposed convolution,
      the crop by `crop_value` and a softmax; when False, the model ends at
      the n_classes score layer.

  Returns:
    Keras Model starting at the VGG input layer.

  NOTE(review): with the 500x500 `image_shape` the include_top output looks
  like it would be 480x480 rather than 500x500 — confirm before training
  this variant directly.
  """
  backbone = create_base_vgg(trainable=vgg_trainable)
  net_input = backbone.layers[0].output
  scores = backbone.layers[-1].output

  scores = Conv2D(4096, kernel_size=(7,7), activation='relu', padding="same", kernel_regularizer=l2(l2_value), name="conv2d_fcn32_1")(scores)
  scores = Dropout(0.5, name="dropout_fcn32_1")(scores)
  scores = Conv2D(4096, kernel_size=(1,1), activation='relu', padding="same", kernel_regularizer=l2(l2_value), name="conv2d_fcn32_2")(scores)
  scores = Dropout(0.5, name="dropout_fcn32_2")(scores)
  scores = Conv2D(n_classes, kernel_size=(1,1), padding="same", kernel_regularizer=l2(l2_value), name="conv2d_fcn32_3")(scores)

  if not include_top:
    return Model(net_input, scores)

  upsampled = Conv2DTranspose(n_classes, kernel_size=(64,64), strides=(32,32), kernel_regularizer=l2(l2_value), name="deconv_fcn32")(scores)
  cropped = Cropping2D(crop_value, name="crop_fcn32")(upsampled)
  probabilities = Softmax(name="softmax_fcn32")(cropped)
  return Model(net_input, probabilities)

In [14]:
def create_fcn16(vgg_trainable=True, include_top=True):
  """Build the FCN-16 model: FCN-32 scores upsampled x2 and fused with pool4.

  The headless FCN-32 output goes through a stride-2 transposed convolution
  and is cropped by one row/column at the top/left; it is then added to a
  1x1 convolution of the "maxpool_b4" output.

  Args:
    vgg_trainable: forwarded to create_fcn32.
    include_top: when True, append the stride-16 transposed convolution,
      a crop of 8 on every side and a softmax; when False, the model ends
      at the fused score map.

  Returns:
    Keras Model starting at the VGG input layer.

  NOTE(review): with the 500x500 `image_shape` the include_top output looks
  like it would be 496x496 rather than 500x500 — confirm.
  """
  headless_fcn32 = create_fcn32(vgg_trainable=vgg_trainable, include_top=False)
  net_input = headless_fcn32.layers[0].output
  pool4 = headless_fcn32.get_layer("maxpool_b4").output
  coarse_scores = headless_fcn32.layers[-1].output

  # x2 upsampling of the coarse scores, trimmed to match the pool4 grid
  upsampled_scores = Conv2DTranspose(n_classes, kernel_size=(4,4), padding="valid", strides=(2,2), kernel_regularizer=l2(l2_value), name="deconv_fcn16_1")(coarse_scores)
  upsampled_scores = Cropping2D(((1,0,),(1,0)), name="crop_fnc16_1")(upsampled_scores)

  # skip connection: per-class scores computed from pool4
  skip_scores = Conv2D(n_classes, kernel_size=(1,1), activation="relu", kernel_regularizer=l2(l2_value), padding="valid", name="conv2d_fcn16_1")(pool4)
  fused = Add(name="add_fcn16")([upsampled_scores, skip_scores])

  if not include_top:
    return Model(net_input, fused)

  full_res = Conv2DTranspose(n_classes, kernel_size=(32,32), strides=(16,16), kernel_regularizer=l2(l2_value), name="deconv_fcn16_2")(fused)
  full_res = Cropping2D(((8,8,),(8,8)), name="crop_fcn16_2")(full_res)
  probabilities = Softmax(name="softmax_fcn16")(full_res)
  return Model(net_input, probabilities)

In [15]:
def create_fcn8(vgg_trainable=True, include_top=True):
  """Build the FCN-8 model: FCN-16 scores upsampled x2 and fused with pool3.

  The headless FCN-16 output goes through a stride-2 transposed convolution
  and is cropped by one pixel on every side; it is then added to a 1x1
  convolution of the "maxpool_b3" output.

  Args:
    vgg_trainable: forwarded to create_fcn16.
    include_top: when True, append the stride-8 transposed convolution,
      a crop of 2 on every side and a softmax; when False, the model ends
      at the fused score map.

  Returns:
    Keras Model starting at the VGG input layer.
  """
  headless_fcn16 = create_fcn16(vgg_trainable=vgg_trainable, include_top=False)
  net_input = headless_fcn16.layers[0].output
  pool3 = headless_fcn16.get_layer("maxpool_b3").output
  mid_scores = headless_fcn16.layers[-1].output

  # x2 upsampling of the FCN-16 scores, trimmed to match the pool3 grid
  upsampled_scores = Conv2DTranspose(n_classes, kernel_size=(4,4), padding="valid", strides=(2,2), kernel_regularizer=l2(l2_value), name="deconv_fnc8_1")(mid_scores)
  upsampled_scores = Cropping2D(((1,1,),(1,1)), name="crop_fnc8_1")(upsampled_scores)

  # skip connection: per-class scores computed from pool3
  skip_scores = Conv2D(n_classes, kernel_size=(1,1), activation="relu", kernel_regularizer=l2(l2_value), padding="valid", name="conv2d_fcn8_1")(pool3)
  fused = Add(name="add_fcn8")([upsampled_scores, skip_scores])

  if not include_top:
    return Model(net_input, fused)

  full_res = Conv2DTranspose(n_classes, kernel_size=(16,16), strides=(8,8), kernel_regularizer=l2(l2_value), name="deconv_fnc8_2")(fused)
  full_res = Cropping2D(((2,2,),(2,2)), name="crop_fcn8_2")(full_res)
  probabilities = Softmax(name="softmax_fcn8")(full_res)
  return Model(net_input, probabilities)

In [16]:
class DisplayCallback(tf.keras.callbacks.Callback):
  """Keras callback that redraws a sample prediction after every epoch."""

  def on_epoch_end(self, epoch, logs=None):
    """Clear the cell output, show predictions and print the epoch number (1-based)."""
    clear_output(wait=True)
    show_predictions()
    finished_epoch = epoch + 1
    print('\nSample Prediction after epoch {}\n'.format(finished_epoch))

In [17]:
class AdaptedMeanIoU(tf.keras.metrics.Metric):
  """Mean IoU metric that accepts per-class scores instead of label ids.

  Wraps a Keras MeanIoU and converts `y_pred` to label ids with an argmax
  over the last axis before updating it.

  Args:
    name: metric name reported by Keras.
    n_classes: number of classes passed to the wrapped MeanIoU.
    **kwargs: forwarded to the base Metric constructor.
  """
  def __init__(self, name='mean_iou', n_classes=3,**kwargs):
    super(AdaptedMeanIoU, self).__init__(name=name, **kwargs)
    self.mean_iou=MeanIoU(n_classes)

  def update_state(self, y_true, y_pred, sample_weight=None):
    """Accumulate one batch; `sample_weight` is accepted but not used."""
    y_pred_converted=tf.argmax(y_pred,axis=-1)
    self.mean_iou.update_state(y_true,y_pred_converted)

  def result(self):
    """Return the current value of the wrapped MeanIoU."""
    return self.mean_iou.result()

  def reset_state(self):
    """Reset the wrapped MeanIoU.

    `reset_state` is the current Keras hook name; the wrapped metric's
    method of the same name is used when it exists, otherwise its older
    `reset_states` is called.
    """
    reset = getattr(self.mean_iou, "reset_state", None) or self.mean_iou.reset_states
    reset()

  def reset_states(self):
    """Alias of reset_state under the older hook name."""
    self.reset_state()

# Divide the learning rate by 10 when the training loss has not improved for one epoch.
# The lower bound is written as the literal 1e-10: the former `1**-10` evaluates
# to 1.0, which set the floor at 1.0 instead of a tiny value.
reduce_lr = tf.keras.callbacks.ReduceLROnPlateau(monitor='loss', factor=0.1, patience=1, min_lr=1e-10)

In [18]:
# Build the FCN-8 network with its default arguments (vgg_trainable=True, include_top=True).
model=create_fcn8()
#model.summary()




In [19]:
# create_fcn8() with its default include_top=True ends in a Softmax layer, so
# the model outputs probabilities. The loss must therefore use
# from_logits=False; from_logits=True would apply a second softmax inside the loss.
model.compile(optimizer=SGD(learning_rate=0.1, momentum=0.9, clipnorm=1),
              loss=SparseCategoricalCrossentropy(from_logits=False), metrics=[AdaptedMeanIoU(n_classes=22)])

In [34]:
# Module-level listing of the mask directory, taken when this cell runs.
file_list = os.listdir(dataset_path + '/SegmentationClassConvertedMin')
def generate_data(directory, batch_size):
    """Replaces Keras' native ImageDataGenerator.

    Endless generator of (images, masks) batches. File names are taken from
    `directory`; for each one the mask is loaded from `<directory>/<name>.npy`
    and the image from `<dataset_path>/JPEGImagesMax/<name>.jpg`, where
    <name> is the file name up to its first '.'. After each full pass the
    file list is reshuffled.

    Args:
        directory: folder holding the .npy mask files.
        batch_size: number of samples per yielded batch.

    Yields:
        Tuple (x, y): x is the array of normalized images, y the array of masks.

    Raises:
        ValueError: if `directory` contains no files.
    """
    file_list = os.listdir(directory)
    if not file_list:
        # An empty listing would otherwise fail with an IndexError below.
        raise ValueError("No files found in directory: {}".format(directory))

    i = 0
    while True:
        image_batch_x = []
        image_batch_y = []
        for _ in range(batch_size):
            if i == len(file_list):
                # one full pass done: restart with a new order
                i = 0
                random.shuffle(file_list)
            sample = file_list[i]
            i += 1
            name = sample.split(".")[0]
            img_x = cv2.imread(dataset_path + '/JPEGImagesMax/' + name + '.jpg',1)
            image_batch_x.append(normalize(img_x).numpy())

            # Read the mask from the directory that was listed, not from a
            # hard-coded folder, so names and files always match.
            with open(os.path.join(directory, name + ".npy"), 'rb') as f:
                image_batch_y.append(np.load(f))

        # Keras only unpacks tuples into (inputs, targets); a list would be
        # treated as inputs alone.
        yield np.array(image_batch_x), np.array(image_batch_y)

class DataGenerator(keras.utils.Sequence):
    """Generates (image, mask) batches for Keras.

    Args:
        list_IDs: sample identifiers; the text before the first '.' of each
            ID is used as the base file name.
        batch_size: number of samples per batch.
        dim: (height, width) of every image and mask.
        n_channels: number of image channels.
        n_classes: number of classes (stored, not used by the generator).
        shuffle: reshuffle the sample order after every epoch when True.
    """
    def __init__(self, list_IDs, batch_size=32, dim=(500,500), n_channels=3,
                 n_classes=22, shuffle=True):
        'Initialization'
        super().__init__()
        self.dim = dim
        self.batch_size = batch_size
        self.list_IDs = list_IDs
        self.n_channels = n_channels
        self.n_classes = n_classes
        self.shuffle = shuffle
        self.on_epoch_end()

    def __len__(self):
        'Denotes the number of batches per epoch'
        return int(np.floor(len(self.list_IDs) / self.batch_size))

    def __getitem__(self, index):
        'Generate one batch of data'
        # Generate indexes of the batch
        indexes = self.indexes[index*self.batch_size:(index+1)*self.batch_size]

        # Find list of IDs
        list_IDs_temp = [self.list_IDs[k] for k in indexes]

        # Generate data
        X, y = self.__data_generation(list_IDs_temp)

        return X, y

    def on_epoch_end(self, logs=None):
        'Updates indexes after each epoch'
        self.indexes = np.arange(len(self.list_IDs))
        if self.shuffle:
            np.random.shuffle(self.indexes)

    def __data_generation(self, list_IDs_temp):
        'Generates data containing one sample per given ID' # X : (n_samples, *dim, n_channels)
        # Size the arrays by the IDs actually given so no row is left
        # uninitialised by np.empty.
        n_samples = len(list_IDs_temp)
        X = np.empty((n_samples, self.dim[0], self.dim[1], self.n_channels))
        y = np.empty((n_samples, self.dim[0], self.dim[1]), dtype=int)

        for i, ID in enumerate(list_IDs_temp):
            # Derive the base name from this sample's ID. Previously `name`
            # was never assigned here, so a leftover global was used and
            # every sample loaded the same files.
            name = ID.split(".")[0]

            # Store sample
            img_path = dataset_path + '/JPEGImagesMax/' + name + '.jpg'
            img = cv2.imread(img_path, 1)
            if img is None:
                # cv2.imread returns None instead of raising on a missing/unreadable file
                raise FileNotFoundError("Could not read image: " + img_path)
            X[i,] = normalize(img)

            # Store class
            with open(dataset_path + '/SegmentationClassConvertedMin/' + name + ".npy", 'rb') as f:
                y[i] = np.load(f)

        return X, y
        
#y_train.shape[0]//BATCH_SIZE//VAL_SUBSPLITS
# Batch generator over the mask file names found on disk.
# NOTE(review): DataGenerator2 is defined in the cell labelled In [33], which appears
# below this cell in the notebook; on a fresh top-to-bottom run the name is not yet
# defined when this line executes.
gen = DataGenerator2(list_IDs=os.listdir('/content/SegmentationClassConvertedMin'), 
                     dataset_path='/content',
                     to_fit=True,
                     batch_size=BATCH_SIZE,
                     dim=(500,500),
                     n_channels=3,
                     n_classes=22,
                     shuffle=True)
#generate_data(dataset_path + '/SegmentationClassConvertedMin', BATCH_SIZE)
# Train from the generator; a sample prediction is drawn after every epoch and the
# learning rate is reduced by the reduce_lr callback.
model_history = model.fit(gen, 
                          epochs=EPOCHS,
                          steps_per_epoch=STEPS_PER_EPOCH,
                          callbacks=[DisplayCallback(),reduce_lr])

Epoch 1/100


ValueError: ignored

In [33]:
import numpy as np
import cv2
from tensorflow.keras.utils import Sequence


class DataGenerator2(Sequence):
    """Generates data for Keras
    Sequence based data generator. Suitable for building data generator for training and prediction.

    :param list_IDs: sample identifiers; the text before the first '.' of each ID is the base file name
    :param dataset_path: root folder containing JPEGImagesMax and SegmentationClassConvertedMin
    :param to_fit: return (X, y) when True, X only when False
    :param batch_size: number of samples per batch
    :param dim: (height, width) of every image and mask
    :param n_channels: number of image channels
    :param n_classes: number of classes (stored, not used by the generator)
    :param shuffle: reshuffle the sample order after every epoch when True
    """
    def __init__(self, list_IDs, dataset_path,
                 to_fit=True, batch_size=32, dim=(256, 256),
                 n_channels=1, n_classes=10, shuffle=True):
        super().__init__()
        self.list_IDs = list_IDs
        self.dataset_path=dataset_path
        self.to_fit = to_fit
        self.batch_size = batch_size
        self.dim = dim
        self.n_channels = n_channels
        self.n_classes = n_classes
        self.shuffle = shuffle
        self.on_epoch_end()

    def __len__(self):
        """Denotes the number of batches per epoch
        :return: number of batches per epoch
        """
        return int(np.floor(len(self.list_IDs) / self.batch_size))

    def __getitem__(self, index):
        """Generate one batch of data
        :param index: index of the batch
        :return: X and y when fitting. X only when predicting
        """
        # Generate indexes of the batch
        indexes = self.indexes[index * self.batch_size:(index + 1) * self.batch_size]

        # Find list of IDs
        list_IDs_temp = [self.list_IDs[k] for k in indexes]

        # Generate data
        X = self._generate_X(list_IDs_temp)

        if self.to_fit:
            y = self._generate_y(list_IDs_temp)
            return X, y
        else:
            return X

    def on_epoch_end(self):
        """Updates indexes after each epoch
        """
        self.indexes = np.arange(len(self.list_IDs))
        if self.shuffle:
            np.random.shuffle(self.indexes)

    def _generate_X(self, list_IDs_temp):
        """Generates data containing one image per given ID
        :param list_IDs_temp: list of label ids to load
        :return: batch of images
        """
        # Size the array by the IDs actually given so no row is left
        # uninitialised by np.empty.
        X = np.empty((len(list_IDs_temp), *self.dim, self.n_channels))

        # Generate data
        for i, ID in enumerate(list_IDs_temp):
            # Derive the base name from this sample's ID and read from the
            # generator's own dataset_path. Previously an unassigned `name`
            # and the global `dataset_path` were used, so every sample
            # resolved to the same leftover file name.
            name = ID.split(".")[0]
            img_path = self.dataset_path + '/JPEGImagesMax/' + name + '.jpg'
            img = cv2.imread(img_path, 1)
            if img is None:
                # cv2.imread returns None instead of raising on a missing/unreadable file
                raise FileNotFoundError("Could not read image: " + img_path)
            X[i,] = normalize(img)

        return X

    def _generate_y(self, list_IDs_temp):
        """Generates data containing one mask per given ID
        :param list_IDs_temp: list of label ids to load
        :return: batch of masks
        """
        y = np.empty((len(list_IDs_temp), *self.dim), dtype=int)

        # Generate data
        for i, ID in enumerate(list_IDs_temp):
            # Same per-sample name derivation as in _generate_X.
            name = ID.split(".")[0]
            with open(self.dataset_path + '/SegmentationClassConvertedMin/' + name + ".npy", 'rb') as f:
                y[i] = np.load(f)

        return y
