In [1]:
import os
import glob
import json
import cv2
import random
from math import ceil
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras import models, layers, optimizers, Input, Model
from matplotlib import pyplot as plt
import numpy as np
from PIL import Image, ImageDraw
from google.colab.patches import cv2_imshow

#PATHS
path_train_images = '/content/drive/My Drive/images/'
path_train_jsons = '/content/drive/My Drive/jsons/'
path_predict_images = '/content/drive/My Drive/predict_images/'

#READ FILES
image_files = glob.glob(os.path.join(path_train_images, "*.jpg"))
json_files = glob.glob(os.path.join(path_train_jsons, "*.json"))
predict_images = glob.glob(os.path.join(path_predict_images, "*.JPG"))

assert len(image_files)==len(json_files), "Number of files is not equal to masks"

#PREPARE TRAIN, TEST, VALIDATION SETS
#set size
train = 0.7
test = round((1-train)/2, 2)
validation = round(1-train-test, 2)

#divide images on sets
random.shuffle(image_files)
train_image_files = image_files[:round(len(image_files)*train)]
diff = np.setdiff1d(image_files, train_image_files)
test_image_files = diff[:round(len(diff)/2)]
val_image_files = diff[round(len(diff)/2):]
print(len(train_image_files), len(test_image_files), len(val_image_files))

#prepare json files for train set
def filename_detection(path):
    filename_with_ext = os.path.basename(path)
    filename, file_extension = os.path.splitext(filename_with_ext)
    return filename

def pick_json_files(files):
    set_json_files = []
    for i in range(0, len(files)):
        filename = filename_detection(files[i])
        set_json_files.append(path_train_jsons + filename +'.json')
    return set_json_files

train_json_files = pick_json_files(train_image_files)
test_json_files = pick_json_files(test_image_files)
val_json_files = pick_json_files(val_image_files)
print(len(train_json_files), len(test_json_files), len(val_json_files))

#PREPARE IMAGE AND MASK TRAIN SETS AS ARRAYS
def create_img_mask_arrays(json_files, image_files):
    img = None
    mask = None
    for i in range(0, len(json_files)):
        json_file = json_files[i]
        img_file = image_files[i]
        img_temp = cv2.imread(img_file)
        img_temp = cv2.resize(img_temp, dsize=(256, 256), interpolation=cv2.INTER_CUBIC)
        img_temp = np.array([img_temp])
        #print(img_temp.shape)
        if img is None:
            img = img_temp
        else:
            img = np.concatenate((img, img_temp), axis=0)
        with open(json_file, "r") as read_file:
            f = read_file.read()
            obj = json.loads(f)
        h = obj['imageHeight']
        w = obj['imageWidth']
        download_img = Image.new(size = (w, h), mode = 'RGB')
        for j in range(0, len(obj['shapes'])):
            poly = obj['shapes'][j]['points']
            a = []
            for m in range(0, len(poly)):
                poly_temp = tuple(poly[m])
                a.append(poly_temp)
            ImageDraw.Draw(download_img).polygon(a, outline=(255, 255, 255), fill=(255, 255, 255))
            download_img.show()
        mask_temp = np.array(download_img)
        mask_temp = cv2.resize(mask_temp, dsize=(256, 256), interpolation=cv2.INTER_CUBIC)
        mask_temp = cv2.cvtColor(mask_temp, cv2.COLOR_BGR2GRAY)
        (threshi, mask_temp) = cv2.threshold(mask_temp, 0, 255, cv2.THRESH_BINARY | cv2.THRESH_OTSU)
        mask_temp = np.array([mask_temp])
        mask_temp = np.moveaxis(mask_temp, 0, -1)
        mask_temp = mask_temp[np.newaxis, :]
        #print(mask_temp.shape)
        if mask is None:
            mask = mask_temp
        else:
            mask = np.concatenate((mask, mask_temp), axis=0)
    return img, mask

train_img_array, train_mask_array = create_img_mask_arrays(train_json_files, train_image_files)
val_img_array, val_mask_array = create_img_mask_arrays(val_json_files, val_image_files)
test_img_array, test_mask_array = create_img_mask_arrays(test_json_files, test_image_files)
print(train_mask_array.shape)
print(train_img_array.shape)

47 10 10
47 10 10
(47, 256, 256, 1)
(47, 256, 256, 3)


In [2]:
#AUGMENTATION
bs = 4 #batch size
'''
batch mode: batch size = dataset size
mini-batch mode: 1 < batch size < dataset size - better to use this
stochastic mode: batch size = 1
'''
#for future: try to add deformations 
train_datagen = dict(rescale=1./255,
                      rotation_range=40,
                      width_shift_range=0.2,
                      height_shift_range=0.2,
                      shear_range=0.2,
                      zoom_range=0.2,
                      horizontal_flip=True,
                      fill_mode='nearest')
test_datagen = dict(rescale=1./255)

train_image_datagen = ImageDataGenerator(**train_datagen)
train_mask_datagen = ImageDataGenerator(**train_datagen)
val_image_datagen = ImageDataGenerator(**train_datagen)
val_mask_datagen = ImageDataGenerator(**train_datagen)
test_image_datagen = ImageDataGenerator(**test_datagen)
test_mask_datagen = ImageDataGenerator(**test_datagen)

seed = 1 #to apply equal augmentations to image and mask
train_image_datagen.fit(train_img_array, augment=True, seed=seed) #augment=True allows to augment data randomly; rounds - number of augmented images derived from original image
train_mask_datagen.fit(train_mask_array, augment=True, seed=seed)
train_image_generator = train_image_datagen.flow(train_img_array, seed=seed, batch_size=bs)
train_mask_generator = train_mask_datagen.flow(train_mask_array, seed=seed, batch_size=bs)
train_generator = (pair for pair in zip(train_image_generator, train_mask_generator))

val_image_datagen.fit(val_img_array, augment=True, seed=seed)
val_mask_datagen.fit(val_mask_array, augment=True, seed=seed)
val_image_generator = val_image_datagen.flow(val_img_array, seed=seed, batch_size=bs)
val_mask_generator =val_mask_datagen.flow(val_mask_array, seed=seed, batch_size=bs)
val_generator = (pair for pair in zip(val_image_generator, val_mask_generator))

test_image_datagen.fit(test_img_array, augment=True, seed=seed)
test_mask_datagen.fit(test_mask_array, augment=True, seed=seed)
test_image_generator = test_image_datagen.flow(test_img_array, seed=seed, batch_size=bs)
test_mask_generator = test_mask_datagen.flow(test_mask_array, seed=seed, batch_size=bs)
test_generator = (pair for pair in zip(test_image_generator, test_mask_generator))

#if we run the code below we can see the shape of generated batches
'''for x, y in train_generator:
  print(x.shape)
  print(y.shape)'''

'for x, y in train_generator:\n  print(x.shape)\n  print(y.shape)'

In [0]:
#MODEL PARAMETERS
train_samples = len(train_img_array)
val_samples = len(val_img_array)
neuron_number = [16, 32, 64, 128, 256, 512, 1024]
dropout = [0.1, 0.1, 0.2, 0.2, 0.3, 0.3, 0.3] 

In [4]:
#MODEL DEFINITION
import tensorflow as tf
from keras.models import Model, load_model
from keras.layers import Input, BatchNormalization
from keras.layers.core import Dropout, Lambda
from keras.layers.convolutional import Conv2D, Conv2DTranspose
from keras.layers.pooling import MaxPooling2D
from keras.layers.merge import concatenate
from keras.callbacks import EarlyStopping, ModelCheckpoint
from keras import backend as K

'''
- padding = 'same' to make output size of conv2D equal to input size
- Initializations define the way to set the initial random weights of layers
- DROPOUT: More recent research has shown some value in applying dropout also to convolutional layers, although at much lower levels: 
  p=0.1 or 0.2. Dropout was used after the activation function of each convolutional layer: CONV->RELU->DROP.
- BatchNormalization is used after ReLu, but before Dropout
'''
#c_storage = [] - do not forged to empty this variable in cycle while looking for the best combination of model parameters (if not using "find best combination" block, un-# this line)
def conv_down(inputs, activation, kernel, depth):
  for i in range(0, depth-1):
    c = Conv2D(neuron_number[i], (3, 3), activation=activation, kernel_initializer=kernel, padding='same')(inputs)
    c = BatchNormalization()(c)
    c = Dropout(dropout[i])(c)
    c = Conv2D(neuron_number[i], (3, 3), activation=activation, kernel_initializer=kernel, padding='same')(c)
    c = BatchNormalization()(c)
    c = Dropout(dropout[i])(c)
    c_storage.append(c)
    p = MaxPooling2D((2, 2))(c)
  return p

def bottom (input, activation, kernel, depth):
  use_neuron_number = neuron_number[depth]
  c = Conv2D(use_neuron_number, (3, 3), activation=activation, kernel_initializer=kernel, padding='same')(input)
  c = BatchNormalization()(c)
  c = Dropout(dropout[depth])(c)
  c = Conv2D(use_neuron_number, (3, 3), activation=activation, kernel_initializer=kernel, padding='same')(c)
  c = BatchNormalization()(c)
  c = Dropout(dropout[depth])(c)
  return c

def conv_up (input, activation, activation_last_layer, kernel, depth):
  for i in reversed(range(0, depth-1)):
    u = Conv2DTranspose(neuron_number[i], (2, 2), strides=(2, 2), padding='same')(input)
    u = concatenate([u, c_storage[i]])
    c = Conv2D(neuron_number[i], (3, 3), activation=activation, kernel_initializer=kernel, padding='same')(u)
    c = BatchNormalization()(c)
    c = Dropout(dropout[i])(c)
    c = Conv2D(neuron_number[i], (3, 3), activation=activation, kernel_initializer=kernel, padding='same')(c)
    c = BatchNormalization()(c)
    c = Dropout(dropout[i])(c)
  output = Conv2D(1, (1, 1), activation=activation_last_layer)(c)
  return output

def unet_model(depth, activation, activation_last_layer, kernel, optimizer, loss, metric):
  inputs = Input(shape=(256, 256, 3))
  p_down = conv_down(inputs, activation, kernel, depth)
  c_bottom = bottom(p_down, activation, kernel, depth)
  outputs = conv_up(c_bottom, activation, activation_last_layer, kernel, depth)
  model = Model(inputs=[inputs], outputs=[outputs])
  model.compile(optimizer=optimizer, loss=loss, metrics=[metric])
  model.summary()
  results = model.fit_generator(train_generator, validation_data=val_generator, validation_steps=ceil(val_samples/bs), 
                              steps_per_epoch = ceil(train_samples/bs), epochs=75)
  return model, results


Using TensorFlow backend.


In [0]:
#SINGLE MODEL RUN
c_storage = []
depth = 5
activation = 'relu'
activation_last_layer = 'tanh'
kernel = 'he_normal'
optimizer = 'adam'
loss = 'binary_crossentropy'
metric = 'binary_accuracy'
model, result = unet_model(depth, activation, activation_last_layer, kernel, optimizer, loss, metric)

In [0]:
#MULTIPLE MODEL RUN - FIND BEST COMBINATION
metr_result = {}
loss_result = {}
metr_val_result = {}
loss_val_result = {}
kernels = ['Zeros', 'Ones', 'RandomNormal', 'RandomUniform', 'TruncatedNormal', 'VarianceScaling', 'he_uniform', 'Orthogonal', 
           'Identity', 'lecun_uniform', 'glorot_normal', 'glorot_uniform', 'he_normal', 'lecun_normal']
activations = ['selu', 'elu', 'relu', 'linear', 'exponential', 'hard_sigmoid', 'sigmoid', 'tanh', 'softsign', 'softplus', 'softmax']
optimizers = ['SGD', 'RMSprop', 'Adagrad', 'Adadelta', 'Adam', 'Adamax', 'Nadam']
losses = ['mean_squared_error', 'mean_absolute_error', 'mean_absolute_percentage_error', 'mean_squared_logarithmic_error', 'squared_hinge',
          'hinge', 'logcosh', 'huber_loss', 'binary_crossentropy', 'kullback_leibler_divergence', 'poisson', 'cosine_proximity']
activations_last_layer = ['tanh', 'sigmoid']
metrics = ['accuracy', 'binary_accuracy', 'cosine_proximity']
depths = [2, 3, 4, 5, 6]
'''
depths = [4, 5]
kernels = kernels[0:1]
activations = activations[0:1]
optimizers = optimizers[0:1]
losses = losses[0:1]
activations_last_layer = activations_last_layer[0:1]
metrics = metrics[0:2]
'''
for depth in depths:
  for activation_last_layer in activations_last_layer:
    for kernel in kernels:
      for activation in activations:
        for optimizer in optimizers:
          for loss in losses:
            for metric in metrics:
              c_storage = []
              model, result = unet_model(depth, activation, activation_last_layer, kernel, optimizer, loss, metric)
              loss_result.update({str(depth)+' '+str(activation)+' '+str(activation_last_layer)+' '+str(kernel)+' '+str(optimizer)+' '+str(loss)+' '+str(metric):result.history['loss']})
              loss_val_result.update({str(depth)+' '+str(activation)+' '+str(activation_last_layer)+' '+str(kernel)+' '+str(optimizer)+' '+str(loss)+' '+str(metric):result.history['val_loss']})
              metr_result.update({str(depth)+' '+str(activation)+' '+str(activation_last_layer)+' '+str(kernel)+' '+str(optimizer)+' '+str(loss)+' '+str(metric):result.history[str(metric)]})
              metr_val_result.update({str(depth)+' '+str(activation)+' '+str(activation_last_layer)+' '+str(kernel)+' '+str(optimizer)+' '+str(loss)+' '+str(metric):result.history['val_'+str(metric)]})

#Найти информацию как это оценить, чтобы найти наилучшую комбинацию

In [0]:
#MULTIPLE MODEL EVELUATION - IN CASE OF LOW NUMBER OF COMBINATIONS - ЕЩЕ НЕ ДОДЕЛАЛА

#print(len(list(acc_result.items())))
#print(len(list(loss_result.items())))
#print(len(list(acc_val_result.items())))
#print(len(list(loss_val_result.items())))
leng = len(list(acc_result.items()))
length = len(list(loss_result.values())[0])
print(length)

epochs = range(1, length + 1)
print(len(epochs))

ax = plt.gca()
#print(list(loss_result.keys())[1])
#print(list(loss_result.values())[1])

for i in range(0, leng):
  print(i)
  color = next(ax._get_lines.prop_cycler)['color']
  plt.plot(epochs, list(loss_result.values())[i], color = color, label='Training loss: '+ str(list(loss_result.keys())[i])) 
plt.title('Training losses')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()
plt.show()



In [0]:
#SINGLE MODEL EVALUATION

acc =result.history['acc'] #['****'] depends on metric
val_acc = result.history['val_acc']
loss = result.history['loss']
val_loss = result.history['val_loss']
print('train acc: ', np.mean(acc))
print('val acc: ', np.mean(val_acc))
print('train loss: ', np.mean(loss))
print('val loss: ', np.mean(val_loss))

epochs = range(1, len(acc) + 1)
plt.plot(epochs, loss, 'bo', label='Training loss') 
plt.plot(epochs, val_loss, 'b', label='Validation loss') 
plt.title('Training and validation loss')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()
plt.show()

plt.plot(epochs, acc, 'bo', label='Training acc')
plt.plot(epochs, val_acc, 'b', label='Validation acc')
plt.title('Training and validation accuracy')
plt.xlabel('Epochs')
plt.ylabel('Accuracy')
plt.legend()
plt.show()

test_samples = len(test_image_files)
test_loss, test_acc = model.evaluate_generator(test_generator, steps=ceil(test_samples/bs))
print('test acc:', np.mean(test_acc))
print('test loss:', np.mean(test_loss))

In [0]:
#PREDICT MASK
#prepare image for prediction
predict_images = glob.glob(os.path.join(path_predict_images, "*.JPG"))
img_predict = cv2.imread(predict_images[0])
img_predict = cv2.resize(img_predict, dsize=(256, 256), interpolation=cv2.INTER_CUBIC)
img_predict = img_predict[np.newaxis, :]

#predict
result = model.predict(img_predict)
result=np.squeeze(result)
result[result > .5] = 255
result[result <= .5] = 0

#show image and predicted mask
cv2_imshow(img_predict)
cv2_imshow(result)
