In [None]:
# Directory holding the ear images. The trailing slash matters: later cells build
# file paths by plain string concatenation (f'{ears_base_dir}{file_name}').
# NOTE(review): this points into Google Drive, so Drive must be mounted before any
# cell lists or reads this directory.
ears_base_dir = "/content/drive/My Drive/ears/ears/"

In [None]:
import numpy as np
import cv2

from sklearn.metrics import plot_confusion_matrix, accuracy_score
from sklearn.model_selection import train_test_split

from collections import Counter

import tensorflow as tf
from tensorflow import keras
import matplotlib.pyplot as plt

from keras.applications.imagenet_utils import preprocess_input
from tensorflow.keras.utils import load_img, save_img, img_to_array, to_categorical

from keras.optimizers.optimizer_v1 import Optimizer
from keras.models import Model, Sequential
from keras.layers import Input, Convolution2D, ZeroPadding2D, MaxPooling2D, Flatten, Dense, Dropout, Activation

# Data download

In [None]:
# !gdown 1CPSeum3HpopfomUEK1gybeuIVoeJT_Eo

In [None]:
# Mount Google Drive at /content/drive; the dataset directory and the pretrained
# weights file used by later cells are read from paths under this mount point.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
# Copy the dataset from Drive to the local runtime.
# NOTE(review): ears_base_dir still points at the Drive path, so the images are
# read from Drive and this local copy appears unused - confirm which is intended.
!cp -r "/content/drive/My Drive/ears/ears" "ears/"

In [None]:
# Copy the pretrained weights file from Drive to the working directory.
# NOTE(review): loadVggFaceModel() loads the weights from the Drive path, not
# from this local copy - confirm which is intended.
!cp "/content/drive/My Drive/vgg_face_weights.h5" "./vgg_face_weights.h5"

In [None]:
# !cp "/content/drive/My Drive/trained_weights_2022_10_14_19_04_48.h5" "./trained_weights_2022_10_14_19_04_48.h5"

# Data loading

In [None]:
def preprocess_image(image_path):
  """Load one image file and return it as a preprocessed array.

  The image is resized to 224x224 on load, converted to an array and passed
  through `preprocess_input` with its default arguments.

  NOTE: per the Keras documentation the default mode of
  `imagenet_utils.preprocess_input` is 'caffe' (RGB -> BGR, per-channel
  ImageNet mean subtraction, no scaling), so the result is NOT in [-1, +1].
  Apply the same preprocessing at prediction time.

  Args:
    image_path: path of the image file to load.

  Returns:
    The preprocessed image array for a single image (no batch axis is added).
  """
  pixels = img_to_array(load_img(image_path, target_size=(224, 224)))
  return preprocess_input(pixels)

In [None]:
# Random augmentation pipeline applied to single images in the loading loop:
# a random rotation (factor 0.1) followed by a random translation (factor 0.1
# along each axis). The layers are taken from `tf.keras.layers` directly; the
# `layers.experimental.preprocessing` namespace is a deprecated alias that newer
# Keras releases no longer ship.
data_augmentation = tf.keras.Sequential([
    tf.keras.layers.RandomRotation(0.1),
    tf.keras.layers.RandomTranslation(0.1, 0.1)
])

In [None]:
from os import listdir
import re


def _list_ear_files(filenames, view):
  """Return the sorted names in `filenames` of the form 'NNN_<view>_ear.jpg'.

  The pattern is a raw string with an escaped dot and is matched against the
  whole name, so stray files such as '001_front_ear.jpg.bak' are ignored.
  """
  pattern = re.compile(rf'\d\d\d_{view}_ear\.jpg')
  return sorted(name for name in filenames if pattern.fullmatch(name))


ears = listdir(ears_base_dir)
front_ears = _list_ear_files(ears, 'front')
back_ears = _list_ear_files(ears, 'back')
left_ears = _list_ear_files(ears, 'left')
right_ears = _list_ear_files(ears, 'right')
# Previously this list was built with the 'front' pattern (copy-paste slip), so
# every front image was loaded twice and the 'up' images were never used.
up_ears = _list_ear_files(ears, 'up')
zoom_ears = _list_ear_files(ears, 'zoom')
down_ears = _list_ear_files(ears, 'down')

# The subject id is the three-digit prefix of the file name.
ids_ears = [int(s[:3]) for s in front_ears]

# zip() pairs the lists purely by position and silently truncates to the
# shortest one, so make sure every view covers exactly the same subject ids.
_views_by_name = {
    'back': back_ears, 'left': left_ears, 'right': right_ears,
    'up': up_ears, 'zoom': zoom_ears, 'down': down_ears,
}
for _view_name, _view_files in _views_by_name.items():
  if [int(s[:3]) for s in _view_files] != ids_ears:
    raise ValueError(
        f"'{_view_name}' images do not cover the same subject ids as the "
        f"'front' images in {ears_base_dir!r}; labels would be misaligned")

subjects = list(zip(ids_ears, front_ears, back_ears, left_ears, right_ears, up_ears, zoom_ears, down_ears))
len(subjects)

100

In [None]:
# Start with empty containers for the images and their labels.
X_train, y_train = [], []

In [None]:
X_train = []
y_train = []

# Number of augmented copies generated for each augmentable view.
augmentation_rounds = 1

for subject in subjects:
  # Each entry is (subject id, front, back, left, right, up, zoom, down file names).
  ids_ear, *view_files = subject
  front_ear, back_ear, left_ear, right_ear, up_ear, zoom_ear, down_ear = [
      preprocess_image(f'{ears_base_dir}{file_name}') for file_name in view_files
  ]

  # Original images, in the same order as before.
  originals = [front_ear, back_ear, left_ear, right_ear, up_ear, zoom_ear, down_ear]
  X_train.extend(originals)
  y_train.extend([ids_ear] * len(originals))

  # The zoom view is not augmented (unchanged from the original cell;
  # presumably deliberate - confirm).
  augmentable = [front_ear, back_ear, left_ear, right_ear, up_ear, down_ear]
  for _ in range(augmentation_rounds):
    for image in augmentable:
      # The augmentation layers return a tensor; store a NumPy array so that
      # X_train holds a single element type before it is stacked.
      X_train.append(np.asarray(data_augmentation(image, training=True)))
    y_train.extend([ids_ear] * len(augmentable))

In [None]:
# Stack the collected images and labels into NumPy arrays.
X_train, y_train = np.asarray(X_train), np.asarray(y_train)

In [None]:
# The stored images went through preprocess_input, whose default 'caffe' mode
# (per the Keras docs) converts RGB -> BGR and subtracts the ImageNet channel
# means. imshow clips such float data, so undo both steps for display only.
_CAFFE_BGR_MEAN = np.array([103.939, 116.779, 123.68], dtype=np.float32)
_preview = (X_train[0] + _CAFFE_BGR_MEAN)[..., ::-1]  # add means back, BGR -> RGB
plt.imshow(np.clip(_preview, 0, 255).astype(np.uint8));

In [None]:
# X.shape

In [None]:
# Hold out 15% for testing, then 15% of the remainder for validation.
# Both splits are seeded (reproducible) and stratified on the subject id so
# every subject is represented in each part.
# NOTE(review): X_train already contains the augmented copies at this point, so
# augmented variants of one source photo can end up on both sides of a split.
# NOTE: this cell overwrites X_train/y_train; re-running it shrinks them again.
SPLIT_SEED = 42

X_train, X_test, y_train, y_test = train_test_split(
    X_train, y_train, test_size=0.15, shuffle=True,
    stratify=y_train, random_state=SPLIT_SEED)

X_train, X_val, y_train, y_val = train_test_split(
    X_train, y_train, test_size=0.15, shuffle=True,
    stratify=y_train, random_state=SPLIT_SEED)

X_train = np.asarray(X_train)
X_test = np.asarray(X_test)
X_val = np.asarray(X_val)
y_train = np.asarray(y_train)
y_test = np.asarray(y_test)
y_val = np.asarray(y_val)

In [None]:
# Secondary 60/20/20 partition of the *training* data (X_train / y_train).
#
# The resulting subsets get their own `_split` names. Previously the last part
# was assigned to X_test / y_test, which replaced the held-out test set with
# samples that model.fit(X_train, ...) trains on and made every later "test"
# metric a measurement on training data.
n = X_train.shape[0]  # total number of samples
indices = np.random.default_rng(42).permutation(n)  # seeded shuffle of the indices

# sizes of the train and validation parts; the remainder is the third part
train_size = int(0.6 * n)
val_size = int(0.2 * n)

# split the shuffled indices into the three parts
train_indices = indices[:train_size]
val_indices = indices[train_size:train_size+val_size]
test_indices = indices[train_size+val_size:]

# materialise the subsets
X_train_split = X_train[train_indices]
y_train_split = y_train[train_indices]
X_val_split = X_train[val_indices]
y_val_split = y_train[val_indices]
X_test_split = X_train[test_indices]
y_test_split = y_train[test_indices]

In [None]:
# Summarise the first training sample instead of dumping its raw pixel values.
X_train[0].shape, X_train[0].dtype, X_train[0].min(), X_train[0].max()

# Model

In [None]:
def loadVggFaceModel(weights_path="/content/drive/My Drive/vgg_face_weights.h5", num_classes=107):
  """Build the pretrained convolutional network with a new trainable classifier head.

  The base network consists of five stages of zero-padded 3x3 convolutions,
  each followed by 2x2 max pooling, and a convolutional head (7x7 and 1x1
  convolutions ending in 2622 channels). Weights for the base are loaded from
  `weights_path`, every base layer is frozen, and two new Dense layers
  (40 ReLU units, then `num_classes` softmax units) are appended.

  NOTE: no Flatten layer precedes the Dense layers, so they act on the last
  axis of the (1, 1, 2622) feature map and the model output has shape
  (batch, 1, 1, num_classes). This is kept as-is because code that consumes
  the predictions may rely on that shape.

  Args:
    weights_path: HDF5 weights file for the base network. Defaults to the
      Drive location used previously.
    num_classes: number of output classes of the final softmax layer.
      Labels are used as class indices, so it must exceed the largest label.

  Returns:
    The (uncompiled) Sequential model.
  """
  # (filters, number of 3x3 convolutions) for each convolutional stage
  conv_stages = [(64, 2), (128, 2), (256, 3), (512, 3), (512, 3)]

  model = Sequential()
  is_first_layer = True
  for filters, conv_count in conv_stages:
    for _ in range(conv_count):
      if is_first_layer:
        # only the very first layer declares the input shape
        model.add(ZeroPadding2D((1,1), input_shape=(224,224, 3)))
        is_first_layer = False
      else:
        model.add(ZeroPadding2D((1,1)))
      model.add(Convolution2D(filters, (3, 3), activation='relu'))
    model.add(MaxPooling2D((2,2), strides=(2,2)))

  # convolutional head of the pretrained network
  model.add(Convolution2D(4096, (7, 7), activation='relu'))
  model.add(Dropout(0.5))
  model.add(Convolution2D(4096, (1, 1), activation='relu'))
  model.add(Dropout(0.5))
  model.add(Convolution2D(2622, (1, 1)))

  # weights must be loaded before the new layers are added so that the layer
  # structure matches the weights file
  model.load_weights(weights_path)

  # freeze the pretrained layers; only the Dense layers added below are trained
  for layer in model.layers:
    layer.trainable = False

  model.add(Dense(40, activation='relu'))
  model.add(Dense(num_classes, activation='softmax'))

  return model

In [None]:
# Build the network: pretrained, frozen base plus the new trainable Dense layers.
model = loadVggFaceModel()
# model.summary()


# Training

In [None]:
# Labels are integer subject ids, hence the sparse categorical cross-entropy loss.
# Optimiser: plain SGD with an initial learning rate of 0.01.
model.compile(
    loss='sparse_categorical_crossentropy',
    optimizer=keras.optimizers.SGD(learning_rate=0.01),
    # NOTE(review): 'accuracy' is resolved by Keras from the loss/output shapes and
    # presumably duplicates 'sparse_categorical_accuracy' here - confirm.
    metrics=['sparse_categorical_accuracy', 'accuracy']
)

In [None]:
def scheduler(epoch, lr, hold_epochs=30, decay_rate=0.1):
  """Learning-rate schedule: constant at first, then exponential decay.

  Args:
    epoch: index of the current epoch (0-based).
    lr: learning rate currently in use.
    hold_epochs: number of initial epochs during which `lr` is left unchanged.
    decay_rate: from `hold_epochs` onwards, `lr` is multiplied by
      exp(-decay_rate) on every call.

  Returns:
    The learning rate for this epoch as a plain Python float. Newer Keras
    versions reject a tensor return value, which the previous
    `lr * tf.math.exp(-0.1)` produced.
  """
  if epoch < hold_epochs:
    return float(lr)
  return float(lr * np.exp(-decay_rate))

# Keras callback that uses scheduler(epoch, lr) to set the learning rate; it is
# passed to model.fit in the training cell.
callback = tf.keras.callbacks.LearningRateScheduler(scheduler)

In [None]:
from keras.callbacks import Callback

class MyCallback(Callback):
    """Print test and train accuracy (and the test predictions) after every epoch.

    The data is read from the notebook-level variables X_test, y_test, X_train
    and y_train at the time the callback runs.
    """

    def on_epoch_end(self, epoch, logs=None):
        """Run the model on the test and train data and print the accuracies."""
        net = self.model
        # predictions are made on the test data first, then on the train data
        test_scores = net.predict(X_test)
        train_scores = net.predict(X_train)
        # argmax over the flattened per-sample output gives the class index
        predicted_test = list(map(np.argmax, test_scores))
        predicted_train = list(map(np.argmax, train_scores))
        accuracy_test = accuracy_score(predicted_test, y_test)
        accuracy_train = accuracy_score(predicted_train, y_train)
        print('Test Accuracy:', accuracy_test)
        print('Train Accuracy:', accuracy_train)
        print(predicted_test)

my_callback = MyCallback()

In [None]:
# Train for 80 epochs in mini-batches of 8, validating on (X_val, y_val) and
# running the learning-rate scheduler and the accuracy-printing callback.
history = model.fit(
    X_train,
    y_train,
    validation_data=(X_val, y_val),
    epochs=80,
    batch_size=8,
    callbacks=[callback, my_callback],
)
# model.load_weights('./trained_weights_2022_10_14_19_04_48.h5')

In [None]:
from datetime import datetime

# Timestamp (YYYY_MM_DD_HH_MM_SS) used to name saved weight files.
now = datetime.now()
timestamp = f"{now:%Y_%m_%d_%H_%M_%S}"
# model.save_weights(f'./trained_weights_{timestamp}.h5')

In [None]:
from sklearn.metrics import accuracy_score

# Raw model outputs for the train data first, then for the test data.
pred_train, pred_test = (model.predict(samples) for samples in (X_train, X_test))



In [None]:
def _to_class_ids(scores):
  """Return the predicted class index for every sample in `scores`.

  The class axis is the last one. Any singleton axes between the batch axis
  and the class axis (the model currently outputs (batch, 1, 1, classes)) are
  collapsed by taking the first position, which matches the previous
  `x[0][0]` indexing but also works for a plain (batch, classes) output.
  """
  scores = np.asarray(scores)
  class_ids = np.argmax(scores, axis=-1)
  return class_ids.reshape(len(scores), -1)[:, 0].tolist()


predicted_train = _to_class_ids(pred_train)
predicted_test = _to_class_ids(pred_test)

In [None]:
# Accuracy of the predicted labels on the train data and on the test data.
train_acc, test_acc = (
    accuracy_score(expected, predicted)
    for expected, predicted in ((y_train, predicted_train), (y_test, predicted_test))
)

print(f'Train accuracy: {train_acc}')
print(f'Test accuracy: {test_acc}')

Train accuracy: 0.500990099009901
Test accuracy: 0.2857142857142857


In [None]:
def true_positive(y_true, y_pred):
    """Count the positions where both the label and the prediction equal 1.

    The two sequences are paired position by position; extra elements of the
    longer one are ignored.
    """
    return sum(
        1
        for actual, predicted in zip(y_true, y_pred)
        if actual == 1 and predicted == 1
    )

def true_negative(y_true, y_pred):
    """Count the positions where both the label and the prediction equal 0.

    The two sequences are paired position by position; extra elements of the
    longer one are ignored.
    """
    return sum(
        1
        for actual, predicted in zip(y_true, y_pred)
        if actual == 0 and predicted == 0
    )

def false_positive(y_true, y_pred):
    """Count the positions where the label is 0 but the prediction is 1.

    The two sequences are paired position by position; extra elements of the
    longer one are ignored.
    """
    return sum(
        1
        for actual, predicted in zip(y_true, y_pred)
        if actual == 0 and predicted == 1
    )

def false_negative(y_true, y_pred):
    """Count the positions where the label is 1 but the prediction is 0.

    The two sequences are paired position by position; extra elements of the
    longer one are ignored.
    """
    return sum(
        1
        for actual, predicted in zip(y_true, y_pred)
        if actual == 1 and predicted == 0
    )

def macro_precision(y_true, y_pred):
    """Macro-averaged precision over the classes that occur in `y_true`.

    For every class in `y_true` the one-vs-rest precision tp / (tp + fp) is
    computed and the unweighted mean over those classes is returned. A class
    that is never predicted contributes a precision of 0.

    Compared with the previous version, no epsilon is added to the
    denominator (a perfect prediction now yields exactly 1.0), empty input
    returns 0.0 instead of raising, and the data is scanned once instead of
    once per class.

    Args:
        y_true: sequence of true class labels.
        y_pred: sequence of predicted class labels, paired with `y_true`
            position by position.

    Returns:
        The macro-averaged precision as a float.
    """
    classes = np.unique(y_true)
    if len(classes) == 0:
        return 0.0

    predicted_count = Counter()  # class -> number of times it was predicted (tp + fp)
    correct_count = Counter()    # class -> number of correct predictions (tp)
    for actual, predicted in zip(y_true, y_pred):
        predicted_count[predicted] += 1
        if actual == predicted:
            correct_count[predicted] += 1

    total = 0.0
    for class_ in classes:
        # a class without any prediction has zero precision by convention
        if predicted_count[class_]:
            total += correct_count[class_] / predicted_count[class_]

    return total / len(classes)

def micro_precision(y_true, y_pred):
    """Micro-averaged precision over the classes that occur in `y_true`.

    True and false positives are pooled over all classes present in `y_true`
    and a single precision tp / (tp + fp) is returned. As before, a
    prediction of a class that never occurs in `y_true` is counted neither as
    a true nor as a false positive.

    Returns 0.0 when there is nothing to count (e.g. empty input) instead of
    raising ZeroDivisionError; the data is scanned once instead of once per
    class.

    Args:
        y_true: sequence of true class labels.
        y_pred: sequence of predicted class labels, paired with `y_true`
            position by position.

    Returns:
        The micro-averaged precision as a float.
    """
    classes = set(np.unique(y_true).tolist())

    tp = 0
    fp = 0
    for actual, predicted in zip(y_true, y_pred):
        if predicted not in classes:
            # predictions outside the known classes are not counted
            continue
        if actual == predicted:
            tp += 1
        else:
            fp += 1

    if tp + fp == 0:
        return 0.0
    return tp / (tp + fp)

def macro_recall(y_true, y_pred):
    """Macro-averaged recall over the classes that occur in `y_true`.

    For every class in `y_true` the one-vs-rest recall tp / (tp + fn) is
    computed and the unweighted mean over those classes is returned.

    Compared with the previous version, no epsilon is added to the
    denominator (a perfect prediction now yields exactly 1.0), empty input
    returns 0.0 instead of raising, and the data is scanned once instead of
    once per class.

    Args:
        y_true: sequence of true class labels.
        y_pred: sequence of predicted class labels, paired with `y_true`
            position by position.

    Returns:
        The macro-averaged recall as a float.
    """
    classes = np.unique(y_true)
    if len(classes) == 0:
        return 0.0

    support = Counter()        # class -> number of paired true samples (tp + fn)
    correct_count = Counter()  # class -> number of correctly predicted samples (tp)
    for actual, predicted in zip(y_true, y_pred):
        support[actual] += 1
        if actual == predicted:
            correct_count[actual] += 1

    total = 0.0
    for class_ in classes:
        # support can only be 0 if y_pred is shorter than y_true
        if support[class_]:
            total += correct_count[class_] / support[class_]

    return total / len(classes)

def micro_recall(y_true, y_pred):
    """Micro-averaged recall over the classes that occur in `y_true`.

    True positives and false negatives are pooled over all classes in
    `y_true`. Every paired sample is either a true positive (label equals
    prediction) or a false negative for its own class, so the result equals
    the fraction of paired samples that are predicted correctly.

    Returns 0.0 for empty input instead of raising ZeroDivisionError; the
    data is scanned once instead of once per class.

    Args:
        y_true: sequence of true class labels.
        y_pred: sequence of predicted class labels, paired with `y_true`
            position by position.

    Returns:
        The micro-averaged recall as a float.
    """
    pairs = list(zip(y_true, y_pred))
    if not pairs:
        return 0.0

    tp = sum(1 for actual, predicted in pairs if actual == predicted)
    # tp + fn is the number of paired samples
    return tp / len(pairs)

In [None]:
# Macro-averaged metrics on the test predictions, followed by the accuracies
# computed in the earlier cell.
test_macro_precision = macro_precision(y_test, predicted_test)
test_macro_recall = macro_recall(y_test, predicted_test)

print(f"Macro-averaged Precision score : {test_macro_precision}")
# print(f"Micro-averaged Precision score : {micro_precision(y_test, predicted_test)}")
print(f"Macro-averaged recall score : {test_macro_recall}")
print(f"Train accuracy: {train_acc}")
print(f"Test accuracy: {test_acc}")

# print(f"Micro-averaged recall score : {micro_recall(y_test, predicted_test)}")


In [None]:
# print(y_train[:50])
# print(predicted_train[:50])

In [None]:
# !cp './trained_weights_2022_10_18_15_28_37.h5' '/content/drive/MyDrive'