In [None]:
import os
import sys
import random
import warnings

import numpy as np
import pandas as pd
import tensorflow as tf
import tensorflow_addons as tfa
import cv2

import matplotlib.pyplot as plt

from tqdm import tqdm
from itertools import chain
from skimage.io import imread, imshow, imread_collection, concatenate_images
from skimage.transform import resize
from skimage.morphology import label

from keras.models import Model, load_model
from keras.layers import Input
from keras.layers.core import Dropout, Lambda
from keras.layers.convolutional import Conv2D, Conv2DTranspose
from keras.layers.pooling import MaxPooling2D
from keras.layers.merge import concatenate
from keras.callbacks import EarlyStopping, ModelCheckpoint
from keras import backend as K

warnings.filterwarnings('ignore', category=UserWarning, module='skimage')
seed = 69
random.seed = seed
np.random.seed = seed

In [None]:
# u-net 128x128, change image parameters to match or otherwise
# IMG_WIDTH = 128
# IMG_HEIGHT = 128
IMG_WIDTH = 224
IMG_HEIGHT = 160
# img_height = 160
# img_width = 224
IMG_CHANNELS = 3
TRAIN_PATH = '/content/drive/MyDrive/L2/training_set'
TEST_PATH = '/content/drive/MyDrive/L2/testing_set'


In [None]:
# trainining images and masks
train_masks_ids = os.listdir("/content/drive/MyDrive/L2/training_set/masks")
train_images_ids = os.listdir("/content/drive/MyDrive/L2/training_set/images")
# sort to match file names
train_masks_ids.sort()
train_images_ids.sort()

X_train = np.zeros((len(train_masks_ids), IMG_HEIGHT, IMG_WIDTH, IMG_CHANNELS), dtype=np.float32)
Y_train = np.zeros((len(train_masks_ids), IMG_HEIGHT, IMG_WIDTH), dtype=np.float32)
print('train images and masks')
sys.stdout.flush()

for n, id_ in tqdm(enumerate(train_masks_ids), total=len(train_masks_ids)):
    path = TRAIN_PATH
    img = imread(path + '/images/' + id_ )[:,:,:IMG_CHANNELS]
    img = resize(img, (IMG_HEIGHT, IMG_WIDTH)) #, mode='constant', preserve_range=True)
    # array = tf.keras.preprocessing.image.img_to_array(img)
    # img_n = tf.cast(array, tf.float32)
    # img_n /= 255
    # img = tf.keras.preprocessing.image.array_to_img(img_n)
    X_train[n] = img
    
    mask_ = imread(path + '/masks/' + id_ )
    mask_ = resize(mask_, (IMG_HEIGHT, IMG_WIDTH), mode='constant', preserve_range=True)
    # mask_array = tf.keras.preprocessing.image.img_to_array(mask_)
    # mask_n = tf.cast(mask_array, tf.float32)
    # mask_n /= 2
    # mask_ = tf.keras.preprocessing.image.array_to_img(mask_n)
    Y_train[n] = mask_


  

train images and masks


100%|██████████| 510/510 [00:07<00:00, 68.82it/s]


In [None]:
 # test images
test_masks_ids = os.listdir("/content/drive/MyDrive/L2/testing_set/masks")
test_images_ids = os.listdir("/content/drive/MyDrive/L2/testing_set/images")
# sort to match file names
test_masks_ids.sort()
test_images_ids.sort()

X_test = np.zeros((len(test_images_ids), IMG_HEIGHT, IMG_WIDTH, IMG_CHANNELS), dtype=np.float32)
Y_test = np.zeros((len(test_masks_ids), IMG_HEIGHT, IMG_WIDTH), dtype=np.float32)

print('test images')
sys.stdout.flush()
for n, id_ in tqdm(enumerate(test_images_ids), total=len(test_images_ids)):
    path = TEST_PATH 
    img = imread(path + '/images/' + id_)[:,:,:IMG_CHANNELS]
    img = resize(img, (IMG_HEIGHT, IMG_WIDTH)) #, mode='constant', preserve_range=True)
    # array = tf.keras.preprocessing.image.img_to_array(img)
    # img_n = tf.cast(array, tf.float32)
    # img_n /= 255
    # img = tf.keras.preprocessing.image.array_to_img(img_n)
    X_test[n] = img

    mask_ = imread(path + '/masks/' + id_ )
    mask_ = resize(mask_, (IMG_HEIGHT, IMG_WIDTH), mode='constant', preserve_range=True)
    Y_test[n] = mask_




test images


100%|██████████| 10/10 [00:00<00:00, 67.55it/s]


In [None]:
# Verify some samples

# # for n, id_ in tqdm(enumerate(train_masks_ids), total=len(train_masks_ids)):
# #     path = TRAIN_PATH
# #     print(id_)
#
i = random.randint(0, len(train_masks_ids))
id_ = train_masks_ids[i]
img = imread(TRAIN_PATH + '/images/' + id_ )[:,:,:IMG_CHANNELS]

img1 = resize(img, (128,128))#, mode='constant', preserve_range=True)

imshow(img)
plt.show()

# imshow(img1)
# plt.show()
# tf.reduce_max(Y_train[i])

In [None]:
# U-Net arch model goes here
inputs = Input((IMG_HEIGHT, IMG_WIDTH, IMG_CHANNELS))
s = Lambda(lambda x: x / 255) (inputs)

c1 = Conv2D(16, (3, 3), activation='elu', kernel_initializer='he_normal', padding='same') (s)
c1 = Dropout(0.1) (c1)
c1 = Conv2D(16, (3, 3), activation='elu', kernel_initializer='he_normal', padding='same') (c1)
p1 = MaxPooling2D((2, 2)) (c1)

c2 = Conv2D(32, (3, 3), activation='elu', kernel_initializer='he_normal', padding='same') (p1)
c2 = Dropout(0.1) (c2)
c2 = Conv2D(32, (3, 3), activation='elu', kernel_initializer='he_normal', padding='same') (c2)
p2 = MaxPooling2D((2, 2)) (c2)

c3 = Conv2D(64, (3, 3), activation='elu', kernel_initializer='he_normal', padding='same') (p2)
c3 = Dropout(0.2) (c3)
c3 = Conv2D(64, (3, 3), activation='elu', kernel_initializer='he_normal', padding='same') (c3)
p3 = MaxPooling2D((2, 2)) (c3)

c4 = Conv2D(128, (3, 3), activation='elu', kernel_initializer='he_normal', padding='same') (p3)
c4 = Dropout(0.2) (c4)
c4 = Conv2D(128, (3, 3), activation='elu', kernel_initializer='he_normal', padding='same') (c4)
p4 = MaxPooling2D(pool_size=(2, 2)) (c4)

c5 = Conv2D(256, (3, 3), activation='elu', kernel_initializer='he_normal', padding='same') (p4)
c5 = Dropout(0.3) (c5)
c5 = Conv2D(256, (3, 3), activation='elu', kernel_initializer='he_normal', padding='same') (c5)

u6 = Conv2DTranspose(128, (3, 3), strides=(2, 2), padding='same') (c5)
u6 = concatenate([u6, c4])
c6 = Conv2D(128, (3, 3), activation='elu', kernel_initializer='he_normal', padding='same') (u6)
c6 = Dropout(0.2) (c6)
c6 = Conv2D(128, (3, 3), activation='elu', kernel_initializer='he_normal', padding='same') (c6)

u7 = Conv2DTranspose(64, (3, 3), strides=(2, 2), padding='same') (c6)
u7 = concatenate([u7, c3])
c7 = Conv2D(64, (3, 3), activation='elu', kernel_initializer='he_normal', padding='same') (u7)
c7 = Dropout(0.2) (c7)
c7 = Conv2D(64, (3, 3), activation='elu', kernel_initializer='he_normal', padding='same') (c7)

u8 = Conv2DTranspose(32, (3, 3), strides=(2, 2), padding='same') (c7)
u8 = concatenate([u8, c2])
c8 = Conv2D(32, (3, 3), activation='elu', kernel_initializer='he_normal', padding='same') (u8)
c8 = Dropout(0.1) (c8)
c8 = Conv2D(32, (3, 3), activation='elu', kernel_initializer='he_normal', padding='same') (c8)

u9 = Conv2DTranspose(16, (3, 3), strides=(2, 2), padding='same') (c8)
u9 = concatenate([u9, c1], axis=3)
c9 = Conv2D(16, (3, 3), activation='elu', kernel_initializer='he_normal', padding='same') (u9)
c9 = Dropout(0.1) (c9)
c9 = Conv2D(16, (3, 3), activation='elu', kernel_initializer='he_normal', padding='same') (c9)

outputs = Conv2D(3, (1, 1), padding="same", activation='sigmoid') (c9)
# outputs = Conv2D(1, (1, 1), padding="same", activation='sigmoid') (c9)

model = Model(inputs=[inputs], outputs=[outputs])
# model.compile(optimizer='adam', loss='binary_crossentropy', metrics=[tf.keras.metrics.MeanIoU(num_classes=2)])
model.compile(optimizer='adam', loss='SparseCategoricalCrossentropy', metrics=['accuracy'])

model.summary()

Model: "model_4"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_5 (InputLayer)            [(None, 160, 224, 3) 0                                            
__________________________________________________________________________________________________
lambda_4 (Lambda)               (None, 160, 224, 3)  0           input_5[0][0]                    
__________________________________________________________________________________________________
conv2d_76 (Conv2D)              (None, 160, 224, 16) 448         lambda_4[0][0]                   
__________________________________________________________________________________________________
dropout_36 (Dropout)            (None, 160, 224, 16) 0           conv2d_76[0][0]                  
____________________________________________________________________________________________

In [None]:
# training the model

# checkpointer for saving best 
# early stopper callback

results = model.fit(X_train, Y_train, validation_split=0.1, batch_size=16, epochs=50, callbacks=None) 
                    # callbacks=[earlystopper, checkpointer]) maybe

Epoch 1/50
Epoch 2/50
Epoch 3/50
Epoch 4/50
Epoch 5/50
Epoch 6/50
Epoch 7/50
Epoch 8/50
Epoch 9/50
Epoch 10/50
Epoch 11/50
Epoch 12/50
Epoch 13/50
Epoch 14/50
Epoch 15/50
Epoch 16/50
Epoch 17/50
Epoch 18/50
Epoch 19/50
Epoch 20/50
Epoch 21/50
Epoch 22/50
Epoch 23/50
Epoch 24/50
Epoch 25/50
Epoch 26/50
Epoch 27/50
Epoch 28/50
Epoch 29/50
Epoch 30/50
Epoch 31/50
Epoch 32/50
Epoch 33/50
Epoch 34/50
Epoch 35/50
Epoch 36/50
Epoch 37/50
Epoch 38/50
Epoch 39/50
Epoch 40/50
Epoch 41/50
Epoch 42/50
Epoch 43/50
Epoch 44/50
Epoch 45/50
Epoch 46/50
Epoch 47/50
Epoch 48/50
Epoch 49/50
Epoch 50/50


In [None]:
# output is a 3 channel mask, consolidate into a single channel mask
def create_mask(pred_arr):
  temp_mask = np.zeros((IMG_HEIGHT, IMG_WIDTH), dtype=np.float32)
  for i in range(IMG_HEIGHT):
    for j in range(IMG_WIDTH):
      if(pred_arr[i,j,0] >= pred_arr[i,j,1]):
        if(pred_arr[i,j,0] >= pred_arr[i,j,2]):
          temp_mask[i,j] = 0
        else:
          temp_mask[i,j] = 2
      else:
        if(pred_arr[i,j,1] >= pred_arr[i,j,2]):
          temp_mask[i,j] = 1
        else:
          temp_mask[i,j] = 2
  
  return temp_mask


# Run predictions
preds_train = model.predict(X_train[:int(X_train.shape[0]*0.9)], verbose=1)
preds_val = model.predict(X_train[int(X_train.shape[0]*0.9):], verbose=1)
preds_test = model.predict(X_test, verbose=1)


# masks for training predictions
predicted_mask_train = np.zeros((len(preds_train),IMG_HEIGHT,IMG_WIDTH),dtype=np.int32)
for i in range(len(preds_train)):
  predicted_mask_train[i] = create_mask(preds_train[i])

# masks for validation predictions
predicted_mask_val = np.zeros((len(preds_val),IMG_HEIGHT,IMG_WIDTH),dtype=np.int32)
for i in range(len(preds_val)):
  predicted_mask_val[i] = create_mask(preds_val[i])

# masks for test predictions
predicted_mask_test = np.zeros((len(preds_test),IMG_HEIGHT,IMG_WIDTH),dtype=np.int32)
for i in range(10):
  predicted_mask_test[i] = create_mask(preds_test[i])



print('Done predicting! \n')





Done predicting! 



In [None]:
# Filter the predicted mask for improving inconsistencies

filt_predicted_mask_test = np.zeros((len(X_test),IMG_HEIGHT, IMG_WIDTH), dtype=np.float32)
for i in range(len(X_test)):
  # filt_predicted_mask_test = cv2.GaussianBlur(predicted_mask_test[i],(3,3),0)
  # filt_predicted_mask_test[i] = tfa.image.gaussian_filter2d(predicted_mask_test[i], (3,3), 1.0, 'CONSTANT', 0, None ) 
  filt_predicted_mask_test[i] = tfa.image.median_filter2d(predicted_mask_test[i], (3,3), 'CONSTANT', 0, None ) # salt and pepper
  filt_predicted_mask_test[i] = tfa.image.gaussian_filter2d(filt_predicted_mask_test[i], (3,3), 1.0, 'CONSTANT', 0, None ) # blurred





In [None]:
# # obtain diameter in both directions, diameter outer using pixel val 2, diameter inner using pixel val 1

def segmenter(array, val):
  [x,y] = array.shape
  temp = np.zeros((x,y), dtype = np.float32)
  for i in range(x):
    for j in range(y):
      if(array[i,j] == val):
        temp[i,j] = 1
      else:
        temp[i,j] = 0
  return temp

# #  boundary coords assuming convex shaped ellipse or circle

def edge_points(array):
  [x,y] = array.shape
  min_x = x
  max_x = 0
  min_y = y
  max_y = 0
  for i in range(x):
    for j in range(y):
      if(array[i,j] == 1):
        if(i <= min_x):
          min_x = i
        if(i >= max_x):
          max_x = i
        if(j <= min_y):
          min_y = j
        if(j >= max_y):
          max_y = j
  
  vert_coord = [min_x,max_x]
  hori_coord = [min_y,max_y]
  return(vert_coord, hori_coord)
  



# Get center coordinates and vert/horizontal diameters
# get dia error as compared to true mask

prediction_array = np.zeros((len(X_test), 8) , dtype = np.float32)
error_test_iris = np.zeros(len(X_test), dtype = np.float32)
error_test_pupil = np.zeros(len(X_test), dtype = np.float32)

for i in range(len(X_test)):
  temp_img1 = Y_test[i]
  temp_img2 = filt_predicted_mask_test[i]

  iris_pred_mask = segmenter(temp_img2, 2)
  pupil_pred_mask = segmenter(temp_img2, 1)

  iris_true_mask = segmenter(temp_img1, 2)
  pupil_true_mask = segmenter(temp_img1, 1)

# iris circle
  [vert,horiz] = edge_points(iris_true_mask)
  [vert1,horiz1] = edge_points(iris_pred_mask)

  diam_pred_vert = vert1[1] - vert1[0]
  diam_pred_horiz = horiz1[1] - horiz1[0]
  center_pred_vert = (vert1[1] + vert1[0])/2
  center_pred_horiz = (horiz1[1] + horiz1[0])/2  

  diam_true_vert = vert[1] - vert[0]
  diam_true_horiz = horiz[1] - horiz[0]
  center_true_vert = (vert[1] + vert[0])/2
  center_true_horiz = (horiz[1] + horiz[0])/2  

# pupil circle
  [vert2,horiz2] = edge_points(pupil_true_mask)
  [vert3,horiz3] = edge_points(pupil_pred_mask)

  diam_pred_vert_pupil = vert3[1] - vert3[0]
  diam_pred_horiz_pupil = horiz3[1] - horiz3[0]
  center_pred_vert_pupil = (vert3[1] + vert3[0])/2
  center_pred_horiz_pupil = (horiz3[1] + horiz3[0])/2  

  diam_true_vert_pupil = vert2[1] - vert2[0]
  diam_true_horiz_pupil = horiz2[1] - horiz2[0]
  center_true_vert_pupil = (vert2[1] + vert2[0])/2
  center_true_horiz_pupil = (horiz2[1] + horiz2[0])/2  


  prediction_array[i] = [diam_pred_horiz, diam_pred_vert, center_pred_horiz, center_pred_vert, diam_pred_horiz_pupil, diam_pred_vert_pupil, center_pred_horiz_pupil, center_pred_vert_pupil]

  error_test_iris[i] = np.sqrt((((diam_true_vert-diam_pred_vert)/diam_true_vert)**2 + ((diam_true_horiz-diam_pred_horiz)/diam_true_horiz)**2)/2)
  
  error_test_pupil[i] = np.sqrt((((diam_true_vert_pupil-diam_pred_vert_pupil)/diam_true_vert_pupil)**2 + ((diam_true_horiz_pupil-diam_pred_horiz_pupil)/diam_true_horiz_pupil)**2)/2)



In [None]:
# average error over all test cases:
total_avg_error_iris = np.sum(error_test_iris)/len(error_test_iris)
total_avg_error_pupil = np.sum(error_test_pupil)/len(error_test_pupil)

print('Average error in iris diameter prediction over all test cases is ', (total_avg_error_iris*100), '% \n')
print('Average error in pupil diameter prediction over all test cases is ', (total_avg_error_pupil*100), '% \n')

Average error in iris diameter prediction over all test cases is  2.8024494647979736 % 

Average error in pupil diameter prediction over all test cases is  9.027206301689148 % 



In [None]:
  # fig = plt.figure()
  # fig.set_figheight(15)
  # fig.set_figwidth(15)
  
  # plt.subplot(1,2,1)
  # plt.title('Input Image')
  # plt.imshow(iris_pred_mask)
  # plt.axis('off')

  # plt.subplot(1,2,2)
  # plt.title('Mask Ground Truth')
  # plt.imshow(iris_true_mask)
  # plt.axis('off')

  # plt.show()

In [None]:
# Displaying Test case predictions:

for i in range(len(X_test)):
  fig = plt.figure()
  fig.set_figheight(15)
  fig.set_figwidth(15)
  
  plt.subplot(1,4,1)
  plt.title('Input Image')
  plt.imshow(X_test[i])
  plt.axis('off')

  plt.subplot(1,4,2)
  plt.title('Mask Ground Truth')
  plt.imshow(Y_test[i])
  plt.axis('off')

  plt.subplot(1,4,3)
  plt.title('Mask Predicted')
  plt.imshow(predicted_mask_test[i])
  plt.axis('off')

  plt.subplot(1,4,4)
  plt.title('Filterd Mask Predicted')
  plt.imshow(filt_predicted_mask_test[i])
  plt.axis('off')

  plt.show()


In [None]:
# training data results:

for i in range(10):
  fig = plt.figure()
  fig.set_figheight(15)
  fig.set_figwidth(15)
  
  plt.subplot(1,3,1)
  plt.title('Input Image')
  plt.imshow(X_train[i])
  plt.axis('off')

  plt.subplot(1,3,2)
  plt.title('Mask Ground Truth')
  plt.imshow(Y_train[i])
  plt.axis('off')

  plt.subplot(1,3,3)
  plt.title('Mask Predicted')
  plt.imshow(predicted_mask_train[i])
  plt.axis('off')

  plt.show()

In [None]:
# # # # Checking for overfit 
# loss function and accuracy trends of training vs validation

acc = results.history['accuracy']
val_acc = results.history['val_accuracy']


loss = results.history['loss']
val_loss = results.history['val_loss']

epochs = range(50)


fig = plt.figure()
fig.set_figheight(10)
fig.set_figwidth(20)

plt.subplot(1, 2, 1)
plt.plot(epochs, acc, label='Training Accuracy')
plt.plot(epochs, val_acc, label='Validation Accuracy')
plt.legend(loc='lower right')
plt.title('Training and Validation Accuracy')

plt.subplot(1, 2, 2)
plt.plot(epochs, loss, label='Training Loss')
plt.plot(epochs, val_loss, label='Validation Loss')
plt.legend(loc='upper right')
plt.title('Training and Validation Loss')
plt.show()



In [None]:

!mkdir -p /content/drive/MyDrive/saved_model
model.save('/content/drive/MyDrive/saved_model/my_model')
# model.save('/content/drive/MyDrive')

INFO:tensorflow:Assets written to: /content/drive/MyDrive/saved_model/my_model/assets
