In [None]:
import os
import cv2
import numpy as np
import tensorflow as tf
from matplotlib import pyplot as plt
import segmentation_models as sm # sm.__version__ '1.0.1'
from tensorflow.keras.metrics import MeanIoU
import random
from PIL import Image
from keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping

sm.set_framework('tf.keras')

In [None]:
'''
Folders structure:
basepath contains train and test folders
in my case .../basepath/code/current_code_folder

img_path - contains patches 128x128 png files
msk_path - contains corresponding masks files

Train contains only images in which the target class is present: the data is
very imbalanced (the target class covers far less area than the background),
so we should focus on the target class.

The test sample is used for quick model evaluation during training and also
contains only those images in which the target class is present.

The validation set contains all images obtained during slicing
and is used for an objective final evaluation of the model.
'''
# The project root is two levels above the current working directory
# (.../basepath/code/current_code_folder). To hard-code it instead:
#basepath = 'C:/Users/UserName/Desktop/basepath/'
_cwd = os.getcwd().replace('\\', '/')  # normalise Windows separators
basepath = os.path.dirname(os.path.dirname(_cwd)) + '/'

# Every folder path ends with '/' because file names are appended by plain
# string concatenation further down.

# train: only patches that contain the target class
train_img_path = f'{basepath}train/patches/img/'
train_msk_path = f'{basepath}train/patches_msk/img/'

# test: only patches that contain the target class
test_img_path = f'{basepath}test/patches/img/'
test_msk_path = f'{basepath}test/patches_msk/img/'

# valid: the parent folders of the test patches
valid_img_path = f'{basepath}test/patches/'
valid_msk_path = f'{basepath}test/patches_msk/'

In [None]:
def sanity_check(img_dir, msk_dir):
    '''
    Plot one randomly chosen image next to the mask at the same position
    in the sorted file listing, to check visually that images and masks match.

    Parameters:
        img_dir: folder that contains the image files.
        msk_dir: folder that contains the mask files.

    Raises:
        ValueError: if img_dir contains no files, if the two folders hold a
            different number of files, or if a chosen file cannot be decoded.
    '''
    def list_files(folder):
        # os.listdir returns names in arbitrary order and also returns
        # sub-directories; keep regular files only and sort them so that the
        # same index refers to the same sample in both folders.
        return sorted(name for name in os.listdir(folder)
                      if os.path.isfile(os.path.join(folder, name)))

    img_list = list_files(img_dir)
    msk_list = list_files(msk_dir)

    if not img_list:
        raise ValueError(f'No image files found in {img_dir!r}')
    if len(img_list) != len(msk_list):
        raise ValueError(
            f'{len(img_list)} images in {img_dir!r} but '
            f'{len(msk_list)} masks in {msk_dir!r}')

    num_images = len(img_list)
    img_num = random.randint(0, num_images-1)

    img_file = os.path.join(img_dir, img_list[img_num])
    msk_file = os.path.join(msk_dir, msk_list[img_num])

    img_for_plot = cv2.imread(img_file, 1)
    mask_for_plot = cv2.imread(msk_file, 0)
    # cv2.imread signals an unreadable file by returning None instead of raising.
    if img_for_plot is None or mask_for_plot is None:
        raise ValueError(f'Could not read {img_file!r} or {msk_file!r}')

    # OpenCV delivers BGR; matplotlib expects RGB.
    img_for_plot = cv2.cvtColor(img_for_plot, cv2.COLOR_BGR2RGB)

    plt.figure(figsize=(12, 8))
    plt.subplot(121)
    plt.imshow(img_for_plot)
    plt.title('Image')
    plt.subplot(122)
    plt.imshow(mask_for_plot, cmap='gray')
    plt.title('Mask')
    plt.show()

In [None]:
# Run this cell a few times to make sure that the masks correspond to the images
# in each of the three data sets.
for img_dir, msk_dir in ((train_img_path, train_msk_path),
                         (test_img_path, test_msk_path),
                         (valid_img_path, valid_msk_path)):
    sanity_check(img_dir, msk_dir)

In [None]:
%%time
def _sorted_files(folder):
    """Return the names of the regular files in `folder`, sorted.

    os.listdir gives no ordering guarantee, so images and masks are both read
    in sorted order to keep position i of the image array paired with
    position i of the mask array.
    """
    return sorted(name for name in os.listdir(folder)
                  if os.path.isfile(folder + name))


# 3 channels because segmentation models expect a 3-channel image.
# Flag 1 = read as 3-channel colour (OpenCV returns the channels in BGR order).
train_images = np.array([cv2.imread(train_img_path + name, 1)
                         for name in _sorted_files(train_img_path)])

# Flag 0 = read as single-channel grayscale.
train_masks = np.array([cv2.imread(train_msk_path + name, 0)
                        for name in _sorted_files(train_msk_path)])/255
y_train = np.expand_dims(train_masks, axis=3).astype(int)

test_images = np.array([cv2.imread(test_img_path + name, 1)
                        for name in _sorted_files(test_img_path)])

test_masks = []
for name in _sorted_files(test_msk_path):
    # Context manager closes the file handle as soon as the pixels are copied.
    with Image.open(test_msk_path + name) as image:
        test_masks.append(np.array(image))

test_masks = np.array(test_masks)/255
y_test = np.expand_dims(test_masks, axis=3).astype(int)

# Every image needs exactly one mask, otherwise the pairing by position is meaningless.
assert len(train_images) == len(y_train), 'train: image/mask count mismatch'
assert len(test_images) == len(y_test), 'test: image/mask count mismatch'

print(train_images.shape)#[0,..., 255]
print(y_train.shape)#[0, 1]
print(test_images.shape)#[0,..., 255]
print(y_test.shape)#[0, 1]

In [None]:
# Settings shared by every model in this notebook.
seed = 42
n_classes, activation = 1, 'sigmoid'

LR = 1e-5
optim = tf.keras.optimizers.Adam(learning_rate=LR)

# Loss = binary focal loss + Dice loss.
#   binary_focal_loss:
#     gamma - float or integer, focusing parameter for the modulating
#             factor (1 - p), library default 2.0 (4.0 is used here).
#     alpha - weighting factor, library default 0.25.
#   See https://segmentation-models.readthedocs.io/en/latest/api.html#losses
dice_loss = sm.losses.DiceLoss()
binary_focal_loss = sm.losses.BinaryFocalLoss(alpha=0.25, gamma=4.0)
total_loss = binary_focal_loss + dice_loss

# IoU and F-score, both computed on predictions thresholded at 0.5.
metrics = [metric_cls(threshold=0.5)
           for metric_cls in (sm.metrics.IOUScore, sm.metrics.FScore)]

In [None]:
# Encoder backbone and the input preprocessing function that
# segmentation_models provides for it.
BACKBONE = 'resnet34'
preprocess_input = sm.get_preprocessing(BACKBONE)

# Apply the backbone preprocessing to the raw train and test images.
X_train, X_test = preprocess_input(train_images), preprocess_input(test_images)

In [None]:
def binarize_mask(x):
    """Map every strictly positive value to 1 and everything else to 0, keeping the dtype of `x`."""
    return np.where(x > 0, 1, 0).astype(x.dtype)


# Geometric augmentations applied to the images.
img_data_gen_args = dict(rotation_range=90,
                         width_shift_range=0.3,
                         height_shift_range=0.3,
                         shear_range=0.5,
                         zoom_range=0.3,
                         horizontal_flip=True,
                         vertical_flip=True,
                         fill_mode='reflect')

# Masks must receive exactly the same geometric transforms as the images, so
# their settings are derived from the image settings instead of being retyped.
# The only addition binarizes the mask output again.
mask_data_gen_args = dict(img_data_gen_args,
                          preprocessing_function=binarize_mask)

image_data_generator = ImageDataGenerator(**img_data_gen_args)
msk_data_generator = ImageDataGenerator(**mask_data_gen_args)

# ImageDataGenerator.fit() is intentionally not called: per the Keras docs it is
# only required for featurewise_center, featurewise_std_normalization or
# zca_whitening, none of which is enabled here, and it would copy and augment
# the full arrays for nothing.

# Image and mask iterators share the same seed so that both draw the same
# shuffling and the same random transforms.
# NOTE(review): no batch_size is passed to flow(), so the Keras default applies
# (presumably 32 - confirm against the batch_size used for steps_per_epoch).
train_img_gen = image_data_generator.flow(X_train, seed=seed)
train_msk_gen = msk_data_generator.flow(y_train, seed=seed)

# NOTE(review): the test iterators reuse the augmenting generators, so the
# in-training validation runs on augmented test patches - confirm this is intended.
test_img_gen = image_data_generator.flow(X_test, seed=seed)
test_msk_gen = msk_data_generator.flow(y_test, seed=seed)

In [None]:
def my_image_mask_generator(image_generator, mask_generator):
    """Yield (image_batch, mask_batch) tuples by stepping both iterables in lockstep.

    Iteration ends as soon as either iterable is exhausted.
    """
    for pair in zip(image_generator, mask_generator):
        yield pair

# Paired (image, mask) batch streams: one for training, one for the
# in-training validation on the test patches.
my_generator = my_image_mask_generator(image_generator=train_img_gen,
                                       mask_generator=train_msk_gen)
validation_datagen = my_image_mask_generator(image_generator=test_img_gen,
                                             mask_generator=test_msk_gen)

In [None]:
# One more sanity check: draw one batch from each training iterator and show
# the first channel of the first image next to the first mask.
x, y = next(train_img_gen), next(train_msk_gen)
image, mask = x[0], y[0]
for slot, sample in enumerate((image, mask), start=1):
    plt.subplot(1, 2, slot)
    plt.imshow(sample[:, :, 0], cmap='gray')
plt.show()

In [None]:
# Define the model: U-Net with the chosen backbone as encoder, started from
# ImageNet weights with encoder_freeze=True.
model1 = sm.Unet(BACKBONE,
                 encoder_weights='imagenet',
                 encoder_freeze=True,
                 decoder_block_type='transpose',  # author's note: transpose consistently gave better results than upsampling
                 classes=n_classes,
                 activation=activation)

# Compile the Keras model with the optimizer, loss and metrics defined earlier.
model1.compile(optimizer=optim,
               loss=total_loss,
               metrics=metrics)

# summary() prints the table itself and returns None, so it is not wrapped in
# print() (that would add a stray "None" line to the output).
model1.summary()

In [None]:
# Checkpoints are written to <basepath>/new_models/saved_models/.
# Create the folder up front so that saving the first checkpoint cannot fail
# on a missing directory.
checkpoint_dir = basepath + 'new_models/' + "saved_models/"
os.makedirs(checkpoint_dir, exist_ok=True)

# File name includes the backbone, the epoch and the validation IoU score.
filepath = checkpoint_dir + BACKBONE + "-{epoch:02d}-{val_iou_score:.2f}.hdf5"

early_stop = EarlyStopping(monitor='val_iou_score',
                           patience=8,
                           mode = 'max',
                           verbose=1)


# Use mode='max' for scores such as IoU/accuracy and mode='min' for losses.
checkpoint = ModelCheckpoint(filepath,
                             monitor='val_iou_score',
                             verbose=1,
                             save_best_only=True,
                             mode='max')

# NOTE(review): early_stop is created above but not registered here - add it
# to this list if early stopping is wanted.
callbacks_list = [checkpoint]

In [None]:
# Steps per epoch: three times the number of training samples, divided by batch_size.
# NOTE(review): the flow() iterators are created without a batch_size argument,
# so the batches they emit may not have this size - confirm.
batch_size = 16
steps_per_epoch = (3 * len(X_train)) // batch_size

In [None]:
# Train on the paired (image, mask) generator.
# batch_size is deliberately NOT passed: the generators already yield complete
# batches, and the Keras documentation says not to specify batch_size for
# generator input (depending on the version it is ignored or rejected).
# NOTE(review): validation_steps reuses the training step count, so every epoch
# validates on as many batches as it trains on - confirm this is intended.
history = model1.fit(my_generator,
                     validation_data=validation_datagen,
                     steps_per_epoch=steps_per_epoch,
                     validation_steps=steps_per_epoch,
                     epochs=1,
                     verbose=1,
                     callbacks=callbacks_list)

In [None]:
# model1.fit(...) already bound its result to `history`. The former
# `history = history1` raised NameError on a fresh kernel because `history1`
# is never assigned in this notebook; keep `history1` available as an alias instead.
history1 = history

In [None]:
# Plot the training and validation loss, then the IoU score, at each epoch.
loss = history.history['loss']
val_loss = history.history['val_loss']
epochs = range(1, len(loss) + 1)
plt.plot(epochs, loss, 'y', label='Training loss')
plt.plot(epochs, val_loss, 'r', label='Validation loss')
plt.title('Training and validation loss')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()
plt.show()

acc = history.history['iou_score']
val_acc = history.history['val_iou_score']

plt.plot(epochs, acc, 'y', label='Training iou_score')
plt.plot(epochs, val_acc, 'r', label='Validation iou_score')
plt.title('Training and validation iou_score')
plt.xlabel('Epochs')
# The curve is the IoU score, not accuracy, so label the axis accordingly.
plt.ylabel('IoU score')
plt.legend()
plt.show()

In [None]:
# Load the validation set. The validation folders also contain sub-folders, so
# only regular files are read. Both listings are sorted because os.listdir
# gives no ordering guarantee and images/masks are paired by position.
valid_images = []

for img in sorted(os.listdir(valid_img_path)):
    if(os.path.isfile(valid_img_path + img)):
        # Flag 1 = 3-channel colour (OpenCV channel order is BGR).
        img = cv2.imread(valid_img_path + img, 1)
        valid_images.append(np.array(img))

valid_images = np.array(valid_images)
X_valid = preprocess_input(valid_images)

valid_masks = []

for img in sorted(os.listdir(valid_msk_path)):
    if(os.path.isfile(valid_msk_path + img)):
        # Context manager closes the file handle as soon as the pixels are copied.
        with Image.open(valid_msk_path + img) as image:
            valid_masks.append(np.array(image))

valid_masks = np.array(valid_masks)/255
y_valid = np.expand_dims(valid_masks, axis=3).astype(int)

# Every image needs exactly one mask, otherwise the final IoU compares unrelated pairs.
assert len(X_valid) == len(y_valid), 'valid: image/mask count mismatch'

In [None]:
###
# Load a previously saved checkpoint for inference only (compile=False, so the
# custom loss and metrics are not needed).
# NOTE(review): this file name does not follow the "<BACKBONE>-<epoch>-<iou>.hdf5"
# pattern used by the checkpoint callback in this notebook - confirm the file exists.
model_name = 'weights-improvement-21-0.71.hdf5'
# The bare name `keras` is never bound in this notebook (only ImageDataGenerator
# is imported from it), so go through the already imported `tf` instead.
model = tf.keras.models.load_model(basepath + 'new_models/saved_models/' + model_name, compile = False)

In [None]:
# Final evaluation: pixel-wise IoU over the whole validation set, with the
# predictions thresholded at 0.5.
# NOTE(review): this evaluates the in-memory `model1`, not the `model` loaded
# from disk in the previous cell - confirm which one is meant.
prediction_ = model1.predict(X_valid)
y_pred_thresholded = prediction_ > 0.5
intersection = np.logical_and(y_valid, y_pred_thresholded)
union = np.logical_or(y_valid, y_pred_thresholded)
union_pixels = np.sum(union)
# With no positive pixel in either the masks or the predictions the IoU is
# undefined; report NaN explicitly instead of dividing by zero.
iou_score = np.sum(intersection) / union_pixels if union_pixels else float('nan')
print(" IoU score is: ", iou_score)