<a href="https://colab.research.google.com/github/AbrahamAzizi/ComputerVision/blob/main/ImageSegmentation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"
import os

# Install TensorFlow
try:
  # %tensorflow_version only exists in Colab.
  %tensorflow_version 2.x
except Exception:
  pass
# os.environ["CUDA_VISIBLE_DEVICES"]="-1" 
import tensorflow as tf
import numpy as np

# Set the seed for random operations. 
# This let our experiments to be reproducible. 
SEED = 1234
tf.random.set_seed(SEED)  
#np.random.seed = SEED


device_name = tf.test.gpu_device_name()
if device_name != '/device:GPU:0': 
  raise SystemError('GPU device not found')
print('Found GPU at: {}'.format(device_name))

device_name = tf.test.gpu_device_name()
if device_name != '/device:GPU:0':
  print(
      '\n\nThis error most likely means that this notebook is not '
      'configured to use a GPU.  Change this in Notebook Settings via the '
      'command palette (cmd/ctrl-shift-P) or the Edit menu.\n\n')
  raise SystemError('GPU device not found')

Colab only includes TensorFlow 2.x; %tensorflow_version has no effect.
Found GPU at: /device:GPU:0


In [None]:
# ImageDataGenerator
# ------------------

from tensorflow.keras.preprocessing.image import ImageDataGenerator
validation_split = 0.01
apply_data_augmentation = False

# Create training ImageDataGenerator object
# We need two different generators for images and corresponding masks
if apply_data_augmentation:
    train_img_data_gen = ImageDataGenerator(rotation_range=10,
                                            width_shift_range=10,
                                            height_shift_range=10,
                                            zoom_range=0.3,
                                            horizontal_flip=True,
                                            vertical_flip=True,
                                            fill_mode='constant',
                                            cval=0,
                                            rescale=1./255,
                                            validation_split=validation_split)
    
    train_mask_data_gen = ImageDataGenerator(rotation_range=10,
                                             width_shift_range=10,
                                             height_shift_range=10,
                                             zoom_range=0.3,
                                             horizontal_flip=True,
                                             vertical_flip=True,
                                             fill_mode='constant',
                                             cval=0,
                                             rescale=1./255,
                                             validation_split=validation_split)
else:
    train_img_data_gen = ImageDataGenerator(rescale=1./255,validation_split=validation_split)
    train_mask_data_gen = ImageDataGenerator(rescale=1./255,validation_split=validation_split)

# Create validation and test ImageDataGenerator objects
#valid_img_data_gen = ImageDataGenerator(rescale=1./255)
#valid_mask_data_gen = ImageDataGenerator()
test_img_data_gen = ImageDataGenerator(rescale=1./255)
test_mask_data_gen = ImageDataGenerator()

In [None]:
# Create generators to read images from dataset directory
# -------------------------------------------------------

# Get current working directory
#cwd = os.getcwd()
from google.colab import drive
drive.mount('/content/gdrive')
cwd = '/content/gdrive/My Drive'
dataset_dir = os.path.join(cwd, 'Segmentation_Dataset')



# Batch size
bs = 10

# img shape
img_h = 256
img_w = 256

# Training
# Two different generators for images and masks
# ATTENTION: here the seed is important!! We have to give the same SEED to both the generator
# to apply the same transformations/shuffling to images and corresponding masks
training_dir = os.path.join(dataset_dir, 'training')
train_img_gen = train_img_data_gen.flow_from_directory(os.path.join(training_dir, 'images'),
                                                       target_size=(img_h, img_w),
                                                       batch_size=bs, 
                                                       color_mode='rgb',
                                                       class_mode=None, # Because we have no class subfolders in this case
                                                       shuffle=True,
                                                       interpolation='bilinear',
                                                       subset='training',
                                                       seed=SEED)  
train_mask_gen = train_mask_data_gen.flow_from_directory(os.path.join(training_dir, 'masks'),
                                                         target_size=(img_h, img_w),
                                                         batch_size=bs,
                                                         color_mode='grayscale',
                                                         class_mode=None, # Because we have no class subfolders in this case
                                                         shuffle=True,
                                                         interpolation='bilinear',
                                                         subset='training',
                                                         seed=SEED)
train_gen = zip(train_img_gen, train_mask_gen)

# Validation
validation_dir = os.path.join(dataset_dir, 'validation')
valid_img_gen = train_img_data_gen.flow_from_directory(os.path.join(training_dir, 'images'),
                                                       target_size=(img_h, img_w),
                                                       batch_size=bs, 
                                                       color_mode='rgb',
                                                       class_mode=None, # Because we have no class subfolders in this case
                                                       shuffle=False,
                                                       interpolation='bilinear',
                                                       subset='validation',
                                                       seed=SEED)
valid_mask_gen = train_mask_data_gen.flow_from_directory(os.path.join(training_dir, 'masks'),
                                                         target_size=(img_h, img_w),
                                                         batch_size=bs, 
                                                         color_mode='grayscale',
                                                         class_mode=None, # Because we have no class subfolders in this case
                                                         shuffle=False,
                                                         interpolation='bilinear',
                                                         subset='validation',
                                                         seed=SEED)
valid_gen = zip(valid_img_gen, valid_mask_gen)

# Test
test_dir = os.path.join(dataset_dir, 'test')
test_dir = os.path.join(test_dir, 'images')
test_img_gen = test_img_data_gen.flow_from_directory(test_dir,
                                                     target_size=(img_h, img_w),
                                                     batch_size=1, 
                                                     class_mode=None, # Because we have no class subfolders in this case
                                                     shuffle=False,
                                                     interpolation='bilinear',
                                                     seed=SEED)


#not important but to avoid error
test_mask_gen = test_mask_data_gen.flow_from_directory(test_dir,
                                                       target_size=(img_h, img_w),
                                                       batch_size=bs, 
                                                       class_mode=None, # Because we have no class subfolders in this case
                                                       shuffle=False,
                                                       interpolation='bilinear',
                                                       seed=SEED)

test_gen = zip(test_img_gen, test_mask_gen)

Mounted at /content/gdrive
Found 7571 images belonging to 1 classes.
Found 7571 images belonging to 1 classes.
Found 76 images belonging to 1 classes.
Found 76 images belonging to 1 classes.
Found 1234 images belonging to 1 classes.
Found 1234 images belonging to 1 classes.


In [None]:
# Create Dataset objects
# ----------------------

def prepare_target(x_, y_):
    
    y_ = tf.cast(y_,tf.int32)
    
    return x_, y_


# Training
# --------
train_dataset = tf.data.Dataset.from_generator(lambda: train_gen,
                                               output_types=(tf.float32, tf.float32),
                                               output_shapes=([None, img_h, img_w, 3], [None, img_h, img_w, 1]))

train_dataset = train_dataset.map(prepare_target)

# Repeat
train_dataset = train_dataset.repeat()

# Validation
# ----------
valid_dataset = tf.data.Dataset.from_generator(lambda: valid_gen, 
                                               output_types=(tf.float32, tf.float32),
                                               output_shapes=([None, img_h, img_w, 3], [None, img_h, img_w, 1]))
valid_dataset = valid_dataset.map(prepare_target)

# Repeat
valid_dataset = valid_dataset.repeat()

# Test
# ----
test_dataset = tf.data.Dataset.from_generator(lambda: test_gen, output_types=(tf.float32, tf.float32), output_shapes=([None, img_h, img_w, 3], [None, img_h, img_w, 1]))
test_dataset = test_dataset.map(prepare_target)
#test_dataset = valid_dataset.repeat()




In [None]:
# Create Model
# ------------


def create_model(depth, start_f, num_classes, dynamic_input_shape):

    model = tf.keras.Sequential()
    
    # Encoder
    # -------
    for i in range(depth):
        
        if i == 0:
            if dynamic_input_shape:
                input_shape = [None, None, 3]
            else:
                input_shape = [img_h, img_w, 3]
        else:
            input_shape=[None]
        
        model.add(tf.keras.layers.Conv2D(filters=start_f, 
                                         kernel_size=(3, 3),
                                         strides=(1, 1),
                                         padding='same',
                                         input_shape=input_shape))
        model.add(tf.keras.layers.ReLU())
        model.add(tf.keras.layers.MaxPool2D(pool_size=(2, 2)))

        start_f *= 2

    # Decoder
    # -------
    for i in range(depth):
        model.add(tf.keras.layers.UpSampling2D(2, interpolation='bilinear'))
        model.add(tf.keras.layers.Conv2D(filters=start_f // 2,
                                         kernel_size=(3, 3),
                                         strides=(1, 1),
                                         padding='same'))

        model.add(tf.keras.layers.ReLU())

        start_f = start_f // 2

    # Prediction Layer
    # ----------------
    model.add(tf.keras.layers.Conv2D(filters=num_classes,
                                     kernel_size=(1, 1),
                                     strides=(1, 1),
                                     padding='same',
                                     activation='sigmoid')) # sigmoid
    
    return model

 

In [None]:
# Optimization params
# -------------------
model = create_model(depth=4, start_f=4, num_classes=1, dynamic_input_shape=False)   

def my_IoU(y_true, y_pred):
    # from pobability to predicted class {0, 1}
    y_pred = tf.cast(y_pred > 0.5, tf.float32) # when using sigmoid. Use argmax for softmax

    # A and B
    intersection = tf.reduce_sum(y_true * y_pred)
    # A or B
    union = tf.reduce_sum(y_true) + tf.reduce_sum(y_pred) - intersection
    # IoU
    return intersection / union

# Loss
# Sparse Categorical Crossentropy to use integers (mask) instead of one-hot encoded labels
#loss = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True) 
loss = tf.keras.losses.BinaryCrossentropy() 
# learning rate
lr = 1e-3
#lr = 5e-4
optimizer = tf.keras.optimizers.Adam(learning_rate=lr)
#optimizer=keras.optimizers.Adadelta(lr)
# -------------------
#
# Validation metrics
# ------------------

metrics = ['accuracy']
# ------------------

# Compile Model
model.compile(optimizer=optimizer, loss=loss, metrics=[my_IoU])

In [None]:
import os
from datetime import datetime

epochs = 100  #### set repeat in training dataset


# from tensorflow.compat.v1 import ConfigProto
# from tensorflow.compat.v1 import InteractiveSession

# config = ConfigProto()
# config.gpu_options.allow_growth = True
# session = InteractiveSession(config=config)

exps_dir = os.path.join(cwd, 'segmentation_experiments')
if not os.path.exists(exps_dir):
    os.makedirs(exps_dir)

now = datetime.now().strftime('%b%d_%H-%M-%S')

model_name = 'CNN'

exp_dir = os.path.join(exps_dir, model_name + '_' + str(now))
if not os.path.exists(exp_dir):
    os.makedirs(exp_dir)
    
callbacks = []

# Model checkpoint
# ----------------
ckpt_dir = os.path.join(exp_dir, 'ckpts')
if not os.path.exists(ckpt_dir):
    os.makedirs(ckpt_dir)

ckpt_callback = tf.keras.callbacks.ModelCheckpoint(filepath=os.path.join(ckpt_dir, 'cp_{epoch:02d}.ckpt'), 
                                                   save_weights_only=True)  # False to save the model directly
callbacks.append(ckpt_callback)

# Visualize Learning on Tensorboard
# ---------------------------------
tb_dir = os.path.join(exp_dir, 'tb_logs')
if not os.path.exists(tb_dir):
    os.makedirs(tb_dir)
    
# By default shows losses and metrics for both training and validation
tb_callback = tf.keras.callbacks.TensorBoard(log_dir=tb_dir,
                                             profile_batch=0,
                                             histogram_freq=0)  # if 1 shows weights histograms
callbacks.append(tb_callback)

# Early Stopping
# --------------
early_stop = False
if early_stop:
    es_callback = tf.keras.callback.EarlyStopping(monitor='val_loss', patience=10)
    callbacks.append(es_callback)


with tf.device('/device:GPU:0'):
    model.fit(x=train_dataset,
          epochs=epochs,  #### set repeat in training dataset
          steps_per_epoch=len(train_img_gen),
          validation_data=valid_dataset,
          validation_steps=len(valid_img_gen), 
          callbacks=callbacks)




Epoch 1/100
159/758 [=====>........................] - ETA: 2:49:12 - loss: 0.6151 - my_IoU: 0.0047

In [None]:
# Just for exercise try to restore a model after training it
# !! Use this just when restoring model.. 
# ---------------------------------------
from google.colab import drive
drive.mount('/content/gdrive')
cwd = '/content/gdrive/My Drive'
restore_model = True
if restore_model:
    model = create_model(depth=4, #4
                         start_f=4, #4
                         num_classes=1, #3
                         dynamic_input_shape=True)

    model.compile(loss=tf.keras.losses.BinaryCrossentropy(from_logits=True) , 
                  metrics=['accuracy'])  # Needed for loading weights

    load_this = os.path.join( cwd, 'segmentation_experiments', 'CNN_Dec16_20-07-25', 'ckpts', 'cp_100.ckpt')
    print('Loading: ' + load_this)
    model.load_weights(load_this)  # use this if you want to restore saved model
# ----------------------------------------


Drive already mounted at /content/gdrive; to attempt to forcibly remount, call drive.mount("/content/gdrive", force_remount=True).
Loading: /content/gdrive/My Drive/segmentation_experiments/CNN_Dec16_20-07-25/ckpts/cp_100.ckpt


<tensorflow.python.training.tracking.util.CheckpointLoadStatus at 0x7f9be701ec88>

In [None]:
from PIL import Image
import os
from datetime import datetime


csv_fname = cwd + '/results_'
csv_fname += datetime.now().strftime('%b%d_%H-%M-%S') + '.csv'



test_img_dir = os.path.join(test_dir, 'img')
img_filenames = next(os.walk(test_img_dir))[2]
results={}
i=0


def rle_encode(img):
      # Flatten column-wise
      pixels = img.T.flatten()
      pixels = np.concatenate([[0], pixels, [0]])
      runs = np.where(pixels[1:] != pixels[:-1])[0] + 1
      runs[1::2] -= runs[::2]
      return ' '.join(str(x) for x in runs)

      
with open(csv_fname, 'w') as f:
  
  f.write('ImageId,EncodedPixels,Width,Height\n')
  for img_filename in img_filenames:

     img = Image.open(os.path.join(test_img_dir, img_filename))
     img = img.resize((img_h, img_w))
     img_arr = np.expand_dims(np.array(img), 0)  
     activations = model.predict(x=img_arr / 255.)
    
     #print (out_softmax[0,:,:,1])
     #print (out_softmax.shape)
     #break
     # Get predicted class as the index corresponding to the maximum value in the vector probability
    

     #predicted_class = tf.argmax(activation, -1)
     predicted_class = tf.where(activations>0.5,1,0) #1,0
     #print (predicted_class)
     #break
     #print (predicted_class)

     predicted_class = predicted_class[0]
     #prediction_img = np.zeros([img_h, img_w, 3])
     #prediction_img[np.where(predicted_class == 1)] = 1
     #prediction_img[np.where(predicted_class == 2)] = 1

     image_encoded = rle_encode(predicted_class.numpy())
     results[img_filename] =   image_encoded
     split = img_filename.split('.')
     img_id = split[0]
     i += 1
     #print ('\n' + i.__str__() + ') \"' + img_filename+'\" : ' + image_encoded)
     print ('\n' + i.__str__() + ') \"' + img_filename)
     write_output = f.write(img_id + ',' + image_encoded + ',' + '256' + ',' + '256' + '\n')

     #break
     #print (predicted_class)

#create_csv(results,results_dir=gdrive_cwd)


35


1) "2148.tif

2) "2150.tif

3) "2156.tif

4) "2157.tif

5) "2149.tif

6) "2152.tif


KeyboardInterrupt: ignored

In [None]:
import os
from datetime import datetime

def create_csv(results, results_dir='./'):

    csv_fname = gdrive_cwd + '/results_'
    csv_fname += datetime.now().strftime('%b%d_%H-%M-%S') + '.csv'

    with open(csv_fname, 'w') as f:

      f.write('ImageId,EncodedPixels,Width,Height\n')

      for key, value in results.items():
          f.write(key + ',' + str(value) + ',' + '256' + ',' + '256' + '\n')