<a href="https://colab.research.google.com/github/hossein-khalilian/Machine-Vision/blob/main/Image_Segmentation_Transfer_Learning.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Image segmentation

In [None]:
import os

from google.colab import drive

# Mount Google Drive inside the Colab VM, then make the project folder the
# working directory so every relative path used later in the notebook
# (dataset/, ./log, checkpoint folders) is resolved inside it.
PROJECT_DIR = "/content/drive/My Drive/app/Image_Segmentation"

drive.mount('/content/drive/')
os.chdir(PROJECT_DIR)

In [None]:
# One-time dataset download; uncomment both lines on the first run.
# !wget https://s3.eu-central-1.amazonaws.com/avg-kitti/data_semantics.zip
# !unzip data_semantics.zip

In [None]:
!pip install -q git+https://github.com/tensorflow/examples.git

In [None]:
%tensorflow_version 2.x
import glob
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt
import tensorflow_datasets as tfds
from IPython.display import clear_output
from sklearn.model_selection import train_test_split
from __future__ import absolute_import, division, print_function, unicode_literals
from tensorflow_examples.models.pix2pix import pix2pix
tfds.disable_progress_bar()

%tensorflow_version 2.x
import os
import cv2
import glob
import scipy.io
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from google.colab.patches import cv2_imshow
from sklearn.model_selection import train_test_split

from sklearn.metrics import confusion_matrix
from keras.callbacks import TensorBoard

### Import dataset


In [None]:
# Target size handed to cv2.resize as `dsize`, i.e. (width, height).
sh = (256, 128)

image_paths = sorted(glob.glob("dataset/training/image_2/*.png"))
label_paths = sorted(glob.glob("dataset/training/semantic/*.png"))
# Images and label maps are paired purely by their sorted order, so a count
# mismatch would silently pair images with the wrong labels.
assert len(image_paths) == len(label_paths), "image / label file counts differ"

# Input images: resize, stack into one array and scale pixel values by 1/255.
# NOTE(review): cv2.imread yields BGR channel order while plt.imshow assumes RGB,
# so previews will show swapped colours - confirm whether that matters here.
x_train = [cv2.resize(cv2.imread(file), dsize=sh) for file in image_paths]
x_train = np.array(x_train)/255.0
print(x_train.shape)

# Label maps store class ids, so they must be resized with nearest-neighbour
# interpolation: the default (bilinear) averages neighbouring ids and produces
# values that belong to unrelated classes along every object boundary.
y_train = [cv2.resize(cv2.imread(file), dsize=sh, interpolation=cv2.INTER_NEAREST) for file in label_paths]
y_train = np.array(y_train)
y_train = y_train[:,:,:,0:1]  # keep only the first channel, preserving the channel axis
y_train = y_train.astype('uint8')
print(y_train.shape)

# x_test  = [cv2.resize(cv2.imread(file), dsize=sh) for file in sorted(glob.glob("dataset/testing/image_2/*.png"))]
# x_test = np.array(x_test)/255.0
# print(x_test.shape)

# Shuffle images and labels with one shared permutation so pairs stay aligned.
# The permutation length follows the data instead of a hard-coded sample count.
index = np.random.permutation(len(x_train))
x_train = x_train[index]
y_train = y_train[index]

### Transfer Learning

In [None]:
BATCH_SIZE = 5
BUFFER_SIZE = 200

# Hold out 20% of the samples for validation; x_train / y_train are rebound to
# the remaining 80%, so re-running this cell splits the already-split data again.
x_train, x_valid, y_train, y_valid = train_test_split(x_train, y_train, test_size=0.2)

# Both pipelines are built the same way: slice per sample, shuffle, then batch.
train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
train_dataset = train_dataset.shuffle(BUFFER_SIZE).batch(BATCH_SIZE)

valid_dataset = tf.data.Dataset.from_tensor_slices((x_valid, y_valid))
valid_dataset = valid_dataset.shuffle(BUFFER_SIZE).batch(BATCH_SIZE)

In [None]:
# Encoder: ResNet50 with ImageNet weights and without its classification head,
# built for the shape of a single training image.
input_shape = x_train[0].shape
res_net = tf.keras.applications.ResNet50(
    include_top=False,
    weights='imagenet',
    input_shape=input_shape,
)

# Freeze the encoder so that only the decoder is trained in the first stage.
res_net.trainable = False
# res_net.summary()

### Model

In [None]:
# Number of output channels = highest label id present + 1. The maximum is turned
# into a plain Python int first because y_train is uint8 and the "+1" must not be
# evaluated in that 8-bit type.
output_channels = int(y_train.max()) + 1
sh = x_train[0].shape


def _upsample_block(x, filters, strides):
    """Apply transposed conv -> batch norm -> LeakyReLU -> dropout(0.3) to ``x``."""
    x = layers.Conv2DTranspose(filters, 3, strides=strides, padding='same', use_bias=False)(x)
    x = layers.BatchNormalization()(x)
    x = layers.LeakyReLU()(x)
    return layers.Dropout(0.3)(x)


# The decoder consumes the encoder's feature map, so its input shape is read from
# res_net (batch dimension dropped) instead of being hard-coded.
decoder_input = keras.Input(shape=res_net.output_shape[1:])
x = decoder_input
x = layers.BatchNormalization()(x)
x = layers.LeakyReLU()(x)

# Strides multiply to 2 * 2 * 4 = 16 here; the output layer adds a final factor of 2.
x = _upsample_block(x, 128, strides=2)
x = _upsample_block(x, 64, strides=2)
x = _upsample_block(x, 32, strides=4)

# The output layer emits raw per-class scores (no activation). The model is compiled
# with SparseCategoricalCrossentropy(from_logits=True), which expects logits; a
# sigmoid here would feed the loss squashed values instead.
decoder_output = layers.Conv2DTranspose(output_channels, 3, strides=2, padding='same', use_bias=False)(x)

decoder = keras.Model(decoder_input, decoder_output, name='decoder')
decoder.summary()

# Full model: image -> ResNet50 encoder -> decoder.
autoencoder_input = keras.Input(shape=sh, name='img')
encoded_img = res_net(autoencoder_input)
decoded_img = decoder(encoded_img)
autoencoder = keras.Model(autoencoder_input, decoded_img, name='tl_autoencoder')
autoencoder.summary()

keras.utils.plot_model(autoencoder, 'my_first_model_with_shape_info.png', show_shapes=True)


In [None]:
def display(display_list):
    """Show every item of ``display_list`` side by side in a single row.

    Args:
        display_list: sequence of objects that ``plt.imshow`` can render;
            one subplot is created per item, in order.
    """
    panel_count = len(display_list)
    plt.figure(figsize=(15, 15))
    for position, panel in enumerate(display_list, start=1):
        # Subplot indices are 1-based.
        plt.subplot(1, panel_count, position)
        plt.imshow(panel)
    plt.show()
        

def show_predictions():
    """Plot the first training image, its label map and the model's prediction.

    Reads the notebook globals ``autoencoder``, ``x_train`` and ``y_train``.
    The predicted class of each pixel is the argmax over the last axis of the
    model output.
    """
    first_sample = x_train[0:1]  # slice (not index) keeps the leading batch axis
    scores = autoencoder(first_sample)
    y_pred = np.argmax(scores, axis=-1)
    panels = [x_train[0], y_train[0, :, :, 0], y_pred[0]]
    display(panels)


class DisplayCallback(tf.keras.callbacks.Callback):
    """Keras callback that previews a prediction and saves periodic checkpoints.

    Relies on the notebook globals ``checkpoint`` and ``checkpoint_prefix``,
    which must be defined before training starts.
    """

    def on_epoch_end(self, epoch, logs=None):
        """Refresh the preview and, when due, write a checkpoint.

        Args:
            epoch: zero-based index of the epoch that just finished.
            logs: metrics dict supplied by Keras (unused).
        """
        # Preview whenever the zero-based epoch index is a multiple of 5.
        preview_due = epoch % 5 == 0
        if preview_due:
            clear_output(wait=True)
            show_predictions()
            print ('\nSample Prediction after epoch {}\n'.format(epoch+1))

        # Checkpoint after every 50th completed epoch.
        completed_epochs = epoch + 1
        if completed_epochs % 50 == 0:
            checkpoint.save(file_prefix = checkpoint_prefix)

In [None]:
# from_logits=True makes the loss treat the model output as unnormalised
# per-class scores.
# NOTE(review): the decoder's final layer should then not apply an activation
# of its own - confirm that the two agree.
segmentation_loss = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)

autoencoder.compile(
    optimizer=tf.keras.optimizers.Adam(),
    loss=segmentation_loss,
    metrics=['accuracy'],
)

In [None]:
# Checkpoint bookkeeping for the first (frozen-encoder) training stage.
checkpoint_dir = './training_checkpoints_image_segmentation_with_transfer_learning'
checkpoint = tf.train.Checkpoint(model=autoencoder)
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt")

# Resume from the most recent checkpoint found in checkpoint_dir.
latest_checkpoint_path = tf.train.latest_checkpoint(checkpoint_dir)
checkpoint.restore(latest_checkpoint_path)

In [None]:
skip_training = False
# batch_size is not passed to fit() below; batching already happens in the datasets.
batch_size = 5
epochs = 300

logdir = './log'+'/transfer_learning'
tensorboard_callback = tf.keras.callbacks.TensorBoard(logdir, histogram_freq=1)

if skip_training:
    # Skip the fit and load the latest saved weights instead.
    checkpoint.restore(tf.train.latest_checkpoint(checkpoint_dir))
else:
    training_callbacks = [DisplayCallback(), tensorboard_callback]
    model_history = autoencoder.fit(
        train_dataset,
        validation_data=valid_dataset,
        verbose=True,
        epochs=epochs,
        callbacks=training_callbacks,
    )
#     autoencoder.save('autoencoder_image_segmentation.h5')

### Fine tuning

In [None]:
# Make the whole encoder trainable, then re-freeze its early layers below.
res_net.trainable = True
print("Number of layers in the base model: ", len(res_net.layers))

# Index of the first encoder layer that stays trainable.
fine_tune_at = 150

# Everything before `fine_tune_at` keeps its pretrained weights fixed.
frozen_layers = res_net.layers[:fine_tune_at]
for frozen_layer in frozen_layers:
    frozen_layer.trainable = False

In [None]:
# Recompile after changing which encoder layers are trainable.
# NOTE(review): Adam() is created with its default learning rate here, exactly as
# in the first stage - confirm whether a smaller rate is wanted for fine-tuning.
fine_tune_loss = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)

autoencoder.compile(
    optimizer=tf.keras.optimizers.Adam(),
    loss=fine_tune_loss,
    metrics=['accuracy'],
)
autoencoder.summary()

In [None]:
# Checkpoint bookkeeping for the fine-tuning stage. It uses a directory of its own,
# so first-stage checkpoints are left untouched.
checkpoint_dir = './training_checkpoints_image_segmentation_with_transfer_learning_fine_tune'
checkpoint = tf.train.Checkpoint(model=autoencoder)
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt")

# checkpoint.restore(tf.train.latest_checkpoint(checkpoint_dir))

In [None]:
skip_training = False
# batch_size is not passed to fit() below; batching already happens in the datasets.
batch_size = 5
epochs = 300

# Fine-tuning gets a TensorBoard run directory of its own. Reusing the first
# stage's './log/transfer_learning' would mix both runs' event files in one run,
# with both curves starting again from epoch 0.
logdir = './log'+'/transfer_learning_fine_tune'
tensorboard_callback = tf.keras.callbacks.TensorBoard(logdir, histogram_freq=1)

if skip_training:
    # Skip the fit and load the latest fine-tuning checkpoint instead.
    checkpoint.restore(tf.train.latest_checkpoint(checkpoint_dir))
else:
    model_history = autoencoder.fit(
        train_dataset,
        validation_data=valid_dataset,
        verbose=True,
        epochs=epochs,
        callbacks=[DisplayCallback(), tensorboard_callback],
    )
#     autoencoder.save('autoencoder_image_segmentation.h5')

In [None]:
# print(model_history.history['val_acc'][-1])
# print(model_history.history['acc'][-1])

# Last recorded value of each metric: training accuracy first, then validation.
# model_history only exists if one of the training cells actually ran fit().
for metric_key in ('accuracy', 'val_accuracy'):
    print(model_history.history[metric_key][-1])