In [None]:
import tensorflow as tf
from tensorflow import keras
from keras.preprocessing.image import ImageDataGenerator
from sklearn.model_selection import train_test_split
import os
import random
import numpy as np
import pickle
import cv2
import pickle

from skimage.io import imread, imshow
import matplotlib.pyplot as plt

# mount google drive
from google.colab import drive
drive.mount('/content/drive')

# define variables
seed = 1

IMG_WIDTH = 512
IMG_HEIGHT = 512
IMG_CHANNELS = 3

PLOT_SIZE = 20
VERSION = 'version_name'
VALIDATION_SPLIT = 0.15
LR = 0.001
FIRST_CONVS = 16

LOCAL_FILTER = 5
MINIMUM_DISTANCE = 3
TM = 0.2

# define paths
TRAIN_DATA_IMAGES_PATH = '/content/drive/MyDrive/unet/train_data_images/'
TEST_DATA_IMAGES_PATH = '/content/drive/MyDrive/unet/test_data_images/'
PICKLE_PATH = '/content/drive/MyDrive/unet/images_data.pickle'

SAVE_MODEL_PATH = '/content/drive/MyDrive/unet/models' + VERSION + '.h5'

In [None]:
# DEFINE FUNCTIONS
# define function for draw gradiends to a masks
def mark_gradient(x, y, image, radius):
    radius = 5
    for i in range(1, radius*2+1):
        for j in range(1, radius*2+1):
            distance = np.sqrt((i-radius)**2 + (j-radius)**2)
            if distance < radius:
                # linearni interpolace
                gray = int(-(255/radius)*distance + 255)

                # nelinearni interpolace https://www.desmos.com/calculator/fz5tqkzosn
                #nonlinear_parameter = 3 # the higher the more nonlinear
                #gray = -255*(distance/radius)**(1/nonlinear_parameter) + 255

                try:
                    image[i+x-radius, j+y-radius] = gray
                except IndexError:
                    pass
    return image

# define function for augmentation
# some functions inspired by: https://stackoverflow.com/questions/22937589/how-to-add-noise-gaussian-salt-and-pepper-etc-to-image-in-python-with-opencv/30609854#30609854
def random_noise(input_image):
    case = random.randint(1, 4)

    if case == 1: # gaussian noise
        row,col,ch= input_image.shape
        mean = 0
        sigma = 0.5
        gauss = np.random.normal(mean,sigma,(row,col,ch))
        gauss = gauss.reshape((row,col,ch)).astype('uint8')
        noise_image = input_image + gauss

    elif case == 2: # salt and pepper
        row,col,ch = input_image.shape
        s_vs_p = 0.5
        amount = 0.03
        noise_image = np.copy(input_image)

        # Salt mode
        num_salt = np.ceil(amount * input_image.size * s_vs_p)
        coords = [np.random.randint(0, i - 1, int(num_salt))
                for i in input_image.shape]
        noise_image[coords] = 1

        # Pepper mode
        num_pepper = np.ceil(amount* input_image.size * (1. - s_vs_p))
        coords = [np.random.randint(0, i - 1, int(num_pepper))
                for i in input_image.shape]
        noise_image[coords] = 0

    elif case == 3: # speckle
        row,col,ch= input_image.shape
        mean = 0
        sigma = 0.5
        gauss = np.random.normal(mean,sigma,input_image.size)
        gauss = gauss.reshape((row,col,ch)).astype('uint8')
        noise_image = input_image + input_image * gauss

    else:
        noise_image = input_image

    return noise_image

# define function for chceck if there is any other scale nearby
def no_neighbor(point, points, minimum_distance):
    if len(points) == 0:
        return True

    for loop_point in points:
        distance = np.sqrt((point[0] - loop_point[0])**2 + (point[1] - loop_point[1])**2)
        if distance < minimum_distance:
            return False

    return True

# define function for counting scales
def counterpoint(raw_image, local_filter, minimum_distance, tm, centers_to=None):
    image = raw_image*(255/raw_image.max())
    image = image.astype(int)

    if (local_filter % 2) == 0:
        local_filter += 1

    H = image.shape[0]
    W = image.shape[1]

    # MAX FILTER
    add_black = int(local_filter/2)
    padding = np.pad(np.squeeze(image), pad_width=add_black, mode='constant', constant_values=0)
    padding = np.expand_dims(padding, axis=2)
    max_filter = np.zeros((H, W, 1), np.uint8)
    for x in range(0, H):
        for y in range(0, W):
            im_filter = padding[x:x+local_filter, y:y+local_filter]
            max_filter[x][y] = im_filter.max()

    B1 = np.zeros((H, W, 1), np.uint8)
    for x in range(0, H):
        for y in range(0, W):
            if max_filter[x][y] == image[x][y]:
                B1[x][y] = 255
            else:
                B1[x][y] = 0

    # MASK OMEGA
    omega = np.zeros((H, W, 1), np.uint8)
    for x in range(0, H):
        for y in range(0, W):
            if image[x][y] == 0:
                omega[x][y] = 255
            else:
                omega[x][y] = 0

    # EROSION
    kernel = np.ones((local_filter, local_filter), np.uint8)
    erosion = cv2.erode(omega, kernel, iterations = 1)

    # XOR OF EROSION MASK AND MASK1
    B_omega = np.zeros((H, W, 1), np.uint8)
    for x in range(0, H):
        for y in range(0, W):
            if B1[x][y] == erosion[x][y]:
                B_omega[x][y] = 0
            else:
                B_omega[x][y] = 255


    # MASK1
    centroid_map = np.zeros((H, W, 1), np.uint8)
    centers = []
    for x in range(0, H):
        for y in range(0, W):
            if B_omega[x][y] == 255 and image[x][y] >= int(255*tm):
                centroid_map[x][y] = 255
                if no_neighbor((x, y), centers, MINIMUM_DISTANCE):
                    centers.append((x, y))
            else:
                centroid_map[x][y] = 0

    image = image/255

    # devide into class1 (left scales) and class2 (right scales)
    group1 = []
    group2 = []

    most_left = (W, H)
    for point in centers:
        if point[1] < most_left[1]:
            most_left = point

    measured_point = most_left
    distances = []
    nearest_point = (0, 0) # x, y
    
    centers_to_devide = centers[:]
    centers_to_devide.remove(measured_point)
    group1.append(measured_point)

    # filter close points
    while True:
        shortest_dist = H
        for point in centers_to_devide:
            distance = np.sqrt((point[0] - measured_point[0])**2 + (point[1] - measured_point[1])**2)
            if distance < shortest_dist:
                nearest_point = point
                shortest_dist = distance

        measured_point = nearest_point

        if len(distances) == 0:
            pass
        elif shortest_dist > 2*(sum(distances)/len(distances)):
            group2 = centers_to_devide[:]
            break

        distances.append(shortest_dist)
        centers_to_devide.remove(measured_point)
        group1.append(measured_point)

    # decide which group is upper -> class2 (right scales) and which is more down -> class1 (left scales)
    sum_y = 0
    for point in group1:
        sum_y += point[1]
    group1_average_y = sum_y

    sum_y = 0
    for point in group2:
        sum_y += point[1]
    group2_average_y = sum_y

    if group1_average_y > group2_average_y:
        class1 = group1[:]
        class2 = group2[:]
    else:
        class1 = group2[:]
        class2 = group1[:]


    # draw results into images
    final_image = np.append(image, image, axis=2)
    final_image = np.append(final_image, image, axis=2)
    
    for center in class1:
        final_image = cv2.circle(final_image, (center[1],center[0]), 0, (0,0,255), 4)
    for center in class2:
        final_image = cv2.circle(final_image, (center[1],center[0]), 0, (255,0,0), 4)
    
    if centers_to is not None:
        centers_original = centers_to.copy()
        for center in class1:
            centers_original = cv2.circle(centers_original, (center[1],center[0]), 0, (0,0,255), 4)
        for center in class2:
            centers_original = cv2.circle(centers_original, (center[1],center[0]), 0, (255,0,0), 4)

    else:
        centers_original = centers_to.copy()

    return [centers, final_image, centers_original]

In [None]:
# LOAD TEST AND TRAIN IMAGES
# preparing data and unet model architecture inspired by: https://github.com/bnsreenu/python_for_microscopists/blob/master/076-077-078-Unet_nuclei_tutorial.py
try:
    pickle_in = open(PICKLE_PATH, 'rb')
    gradients  = pickle.load(pickle_in)
    pickle_in.close()    

    print('labels for', len(gradients), 'images found')

except FileNotFoundError:
    print('no .pickle file in path: ', PICKLE_PATH)

train_ids = []
test_ids = []

for file_name in os.listdir(TRAIN_DATA_IMAGES_PATH):
    if file_name[-4:] == '.png' and file_name in gradients:
        train_ids.append(file_name)

for file_name in os.listdir(TEST_DATA_IMAGES_PATH):
    if file_name[-4:] == '.png':
        test_ids.append(file_name)

print('train images:', len(train_ids))
print('test images:', len(test_ids))

images = np.zeros((len(train_ids), IMG_HEIGHT, IMG_WIDTH, IMG_CHANNELS), dtype=np.uint8)
masks = np.zeros((len(train_ids), IMG_HEIGHT, IMG_WIDTH, 1), dtype=np.uint8)
test_images = np.zeros((len(test_ids), IMG_HEIGHT, IMG_WIDTH, IMG_CHANNELS), dtype=np.uint8)


for e, id in enumerate(train_ids):
    path = TRAIN_DATA_IMAGES_PATH + id

    img = imread(path)[:,:,:IMG_CHANNELS]
    images[e] = img

    class1 = gradients[id]['class1']
    class2 = gradients[id]['class2']
    mask = np.zeros((IMG_HEIGHT, IMG_WIDTH, 1), dtype=np.uint8)
    
    for point in class1:
        mask = mark_gradient(point[1], point[0], mask, point[2])

    for point in class2:
        mask = mark_gradient(point[1], point[0], mask, point[2])

    masks[e] = mask

for e, id in enumerate(test_ids):
    path = TEST_DATA_IMAGES_PATH + id
    img = imread(path)[:,:,:IMG_CHANNELS]
    test_images[e] = img

# normalize values 0 - 1
masks = masks/255
images = images/255
test_images = test_images/255

print(np.shape(images))
print(np.shape(masks))
print(np.shape(test_images))

In [None]:
# SHOW RANDOM IMAGE AND MASK
image_x = random.randint(0, len(train_ids)-1)

fig = plt.figure(figsize=(PLOT_SIZE, PLOT_SIZE))
counterpoint_results = counterpoint(masks[image_x], LOCAL_FILTER, MINIMUM_DISTANCE, TM, images[image_x])

plt.subplot(131), imshow(counterpoint_results[2])
plt.subplot(132), imshow(np.squeeze(counterpoint_results[1]))
plt.subplot(133), imshow(np.squeeze(masks[image_x]))

plt.show()

In [None]:
# BUILD THE MODEL
inputs = tf.keras.layers.Input((IMG_HEIGHT, IMG_WIDTH, IMG_CHANNELS))

# encoder
c1 = tf.keras.layers.Conv2D(FIRST_CONVS, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(inputs)
c1 = tf.keras.layers.Dropout(0.1)(c1)
c1 = tf.keras.layers.Conv2D(FIRST_CONVS, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(c1)
p1 = tf.keras.layers.MaxPooling2D((2, 2))(c1)

c2 = tf.keras.layers.Conv2D(FIRST_CONVS*2, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(p1)
c2 = tf.keras.layers.Dropout(0.1)(c2)
c2 = tf.keras.layers.Conv2D(FIRST_CONVS*2, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(c2)
p2 = tf.keras.layers.MaxPooling2D((2, 2))(c2)
 
c3 = tf.keras.layers.Conv2D(FIRST_CONVS*4, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(p2)
c3 = tf.keras.layers.Dropout(0.2)(c3)
c3 = tf.keras.layers.Conv2D(FIRST_CONVS*4, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(c3)
p3 = tf.keras.layers.MaxPooling2D((2, 2))(c3)
 
c4 = tf.keras.layers.Conv2D(FIRST_CONVS*8, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(p3)
c4 = tf.keras.layers.Dropout(0.2)(c4)
c4 = tf.keras.layers.Conv2D(FIRST_CONVS*8, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(c4)
p4 = tf.keras.layers.MaxPooling2D(pool_size=(2, 2))(c4)
 
c5 = tf.keras.layers.Conv2D(FIRST_CONVS*16, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(p4)
c5 = tf.keras.layers.Dropout(0.3)(c5)
c5 = tf.keras.layers.Conv2D(FIRST_CONVS*16, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(c5)

# decoder
u6 = tf.keras.layers.Conv2DTranspose(FIRST_CONVS*8, (2, 2), strides=(2, 2), padding='same')(c5)
u6 = tf.keras.layers.concatenate([u6, c4])
c6 = tf.keras.layers.Conv2D(FIRST_CONVS*8, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(u6)
c6 = tf.keras.layers.Dropout(0.2)(c6)
c6 = tf.keras.layers.Conv2D(FIRST_CONVS*8, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(c6)
 
u7 = tf.keras.layers.Conv2DTranspose(FIRST_CONVS*4, (2, 2), strides=(2, 2), padding='same')(c6)
u7 = tf.keras.layers.concatenate([u7, c3])
c7 = tf.keras.layers.Conv2D(FIRST_CONVS*4, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(u7)
c7 = tf.keras.layers.Dropout(0.2)(c7)
c7 = tf.keras.layers.Conv2D(FIRST_CONVS*4, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(c7)
 
u8 = tf.keras.layers.Conv2DTranspose(FIRST_CONVS*2, (2, 2), strides=(2, 2), padding='same')(c7)
u8 = tf.keras.layers.concatenate([u8, c2])
c8 = tf.keras.layers.Conv2D(FIRST_CONVS*2, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(u8)
c8 = tf.keras.layers.Dropout(0.1)(c8)
c8 = tf.keras.layers.Conv2D(FIRST_CONVS*2, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(c8)
 
u9 = tf.keras.layers.Conv2DTranspose(FIRST_CONVS, (2, 2), strides=(2, 2), padding='same')(c8)
u9 = tf.keras.layers.concatenate([u9, c1], axis=3)
c9 = tf.keras.layers.Conv2D(FIRST_CONVS, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(u9)
c9 = tf.keras.layers.Dropout(0.1)(c9)
c9 = tf.keras.layers.Conv2D(FIRST_CONVS, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(c9)
 
outputs = tf.keras.layers.Conv2D(1, (1, 1), activation='sigmoid')(c9)

# compile model and save architecture
model = tf.keras.Model(inputs=[inputs], outputs=[outputs])
opt = keras.optimizers.Adam(learning_rate=LR)
model.compile(optimizer=opt, loss='binary_crossentropy', metrics=['binary_crossentropy'])
model.save(SAVE_MODEL_PATH)
model.summary()

In [None]:
# VALIDATION SPLIT
# validation split and augmentation inspired by: https://github.com/bnsreenu/python_for_microscopists/blob/master/219_unet_small_dataset_using_functional_blocks.py
X_train, X_test, Y_train, Y_test = train_test_split(images, masks, test_size = VALIDATION_SPLIT, random_state = seed)
print(X_train.shape)
print(X_test.shape)

print(Y_train.shape)
print(Y_test.shape)

In [None]:
# AUGMENTATION
img_data_gen_args = dict(rotation_range=5,
                         vertical_flip=True,
                         #width_shift_range=0.1,
                         #height_shift_range=0.1,
                         #shear_range=0.15,
                         zoom_range=[0.7, 1],
                         fill_mode='reflect')
                         #preprocessing_function=random_noise)

mask_data_gen_args = dict(rotation_range=5,
                         vertical_flip=True,
                         #width_shift_range=0.1,
                         #height_shift_range=0.1,
                         #shear_range=0.15,
                         zoom_range=[0.7, 1],
                         fill_mode='reflect')

image_data_generator = ImageDataGenerator(**img_data_gen_args)
image_data_generator.fit(X_train, augment=True, seed=seed)

image_generator = image_data_generator.flow(X_train, seed=seed, batch_size=16)
valid_img_generator = image_data_generator.flow(X_test, seed=seed, batch_size=16)

mask_data_generator = ImageDataGenerator(**mask_data_gen_args)
mask_data_generator.fit(Y_train, augment=True, seed=seed)
mask_generator = mask_data_generator.flow(Y_train, seed=seed, batch_size=16)
valid_mask_generator = mask_data_generator.flow(Y_test, seed=seed, batch_size=16)

def my_image_mask_generator(image_generator, mask_generator):
    train_generator = zip(image_generator, mask_generator)
    for (img, mask) in train_generator:
        yield (img, mask)

my_generator = my_image_mask_generator(image_generator, mask_generator)

validation_datagen = my_image_mask_generator(valid_img_generator, valid_mask_generator)

In [None]:
# SHOW RANDOM AUGMENTED IMAGE
x = image_generator.next()
y = mask_generator.next()
for i in range(0,1):
    fig = plt.figure(figsize=(PLOT_SIZE, PLOT_SIZE))
    image = x[i]
    mask = y[i]
    plt.subplot(1,2,1)
    plt.imshow(image)
    plt.subplot(1,2,2)
    plt.imshow(np.squeeze(mask))
    plt.show()

In [None]:
# SET CHECKPOINTS
callbacks = [tf.keras.callbacks.ModelCheckpoint(SAVE_MODEL_PATH, verbose=1, save_best_only=True, save_weights_only=False, save_freq = 'epoch', monitor='val_loss', mode='min')]

In [None]:
# TRAIN MODEL
# set train parametres
batch_size = 36
steps_per_epoch = 3*(len(X_train))//batch_size
print(steps_per_epoch)

# train model
results = model.fit(my_generator, validation_data=validation_datagen, steps_per_epoch=steps_per_epoch, validation_steps=steps_per_epoch, epochs=500, callbacks=callbacks)

In [None]:
# LOAD MODEL FOR TRAIN ON A SAME WEIGHTS IN CASE OF GOOGLE COLAB SHUTDOWN
model = tf.keras.models.load_model(SAVE_MODEL_PATH)

In [None]:
# LOAD TRAINED MODEL
trained_model = tf.keras.models.load_model(SAVE_MODEL_PATH)

In [None]:
# TEST TRAINED MODEL ON TRAIN DATA
idx = random.randint(0, len(X_train)-2)
print('train image: ', idx)

fig = plt.figure(figsize=(PLOT_SIZE, PLOT_SIZE))

preds_model = trained_model.predict(np.expand_dims(X_train[idx], axis=0))
counterpoint_results = counterpoint(preds_model[0], LOCAL_FILTER, MINIMUM_DISTANCE, TM, X_train[idx])

# show results
plt.subplot(131), imshow(np.squeeze(counterpoint_results[1]))
print('found scales:', len(counterpoint_results[0]))

# show original mask
plt.subplot(132), imshow(np.squeeze(Y_train[idx]))
# show original image
plt.subplot(133), imshow(X_train[idx])
plt.show()

In [None]:
# TEST TRAINED MODEL ON TEST DATA
random.randint(0, len(X_test)-2)
print('test image: ', idx)

fig = plt.figure(figsize=(PLOT_SIZE, PLOT_SIZE))

preds_model = trained_model.predict(np.expand_dims(X_test[idx], axis=0))
counterpoint_results = counterpoint(preds_model[0], LOCAL_FILTER, MINIMUM_DISTANCE, TM, X_test[idx])

# show results
plt.subplot(131), imshow(np.squeeze(counterpoint_results[1]))
print('found scales:', len(counterpoint_results[0]))

# show original image
plt.subplot(122), imshow(X_test[idx])
plt.show()