In [None]:
from sklearn.datasets import load_files       
from keras.utils import np_utils
import numpy as np
from glob import glob
from keras.preprocessing import image                  
from tqdm import tqdm
from PIL import ImageFile  
from sklearn.model_selection import train_test_split
from keras.layers import Conv2D, MaxPooling2D, GlobalAveragePooling2D
from keras.layers import Dropout, Flatten, Dense
from keras.models import Sequential
from keras.callbacks import ModelCheckpoint  
from keras.callbacks import EarlyStopping
from keras import optimizers
from keras.layers import BatchNormalization
from keras.regularizers import l1, l2
from keras.layers import GaussianNoise
from sklearn.model_selection import train_test_split
from keras.preprocessing.image import ImageDataGenerator
from keras.applications.resnet50 import ResNet50
from keras.applications.resnet50 import preprocess_input
import keras 
from keras.models import Model
from keras.applications import MobileNet
from keras.applications import MobileNetV2
import time

# define function to load train, test, and validation datasets
def load_dataset(path):
    data = load_files(path)
    images = np.array(data['filenames'])
    images_targets = np_utils.to_categorical(np.array(data['target']), 5) # [1,0] is no_punch, [0,1] is right_punch
    return images, images_targets

#load training dataset
train_images, train_targets = load_dataset('../Images/train') 
test_images, test_targets = load_dataset('../Images/test')

print('There are %s total training images.\n' % len(train_images))
print('There are %s total testing images.\n' % len(test_images))

In [None]:
# Create functions to load the images into 4D numpy tensors, load data into tensors and create a training-validation split
# out of the training data.

# Functions to load images into 4d Numpy arrays.
def path_to_tensor(img_path):
    # loads RGB image as PIL.Image.Image type
    img = image.load_img(img_path, target_size=(224, 224))
    # convert PIL.Image.Image type to 3D tensor with shape (224, 224, 3)
    x = image.img_to_array(img)
    # convert 3D tensor to 4D tensor with shape (1, 224, 224, 3) and return 4D tensor
    return np.expand_dims(x, axis=0)

def paths_to_tensor(img_paths):
    list_of_tensors = [path_to_tensor(img_path) for img_path in tqdm(img_paths)]
    return np.vstack(list_of_tensors)

ImageFile.LOAD_TRUNCATED_IMAGES = True                 

# pre-process and load the data for Keras
train_tensors = paths_to_tensor(train_images).astype('float32')/255
test_tensors = paths_to_tensor(test_images).astype('float32')/255

In [None]:
import matplotlib.pyplot as plt


plt.imshow(train_tensors[416, :,:,:])
plt.show()

train_targets[416]



In [None]:
# Visualize some of the training tensors
import matplotlib.pyplot as plt

for i in range (400,425):
    plt.imshow(train_tensors[i, :,:,:])
    plt.show()
    print(train_targets[i])

In [None]:

# Split the training data into training and validation sets.
train_tensors, valid_tensors, train_targets, valid_targets = train_test_split(train_tensors, train_targets, 
                                                                             test_size=0.2, 
                                                                             random_state=42)

In [None]:
# Set some adjustable variables outside the function and setup CNN compiling, fiting and testing function with early-stopping
# and model-checkpointing. Reload best weights and test model with best weights on testing set.

epochs = 1000
patience = 20
rmsprop_lowLR = optimizers.RMSprop(lr=0.00001)
optimizer= rmsprop_lowLR

def model_compile_fit(model):
    
    model.compile(optimizer=optimizer, loss='categorical_crossentropy', metrics=['accuracy'])
    print("Optimizer:", optimizer)
    print("Patience:", patience)
    
    checkpointer = ModelCheckpoint(filepath='saved_models/weights.best.from_scratch.hdf5', 
                               verbose=1, save_best_only=True)
    
    early_stopping = EarlyStopping(monitor='val_loss', min_delta=0, patience=patience, verbose=0, mode='auto', 
          baseline=None, restore_best_weights=False)
    
    model.fit(train_tensors, train_targets, 
          validation_data=(valid_tensors, valid_targets),
          epochs=epochs, batch_size=20, callbacks=[checkpointer,
          early_stopping], verbose=2)
    
    model.load_weights('saved_models/weights.best.from_scratch.hdf5')
    
    # get index of predicted action for each image in test set
    action_predictions = [np.argmax(model.predict(np.expand_dims(tensor, axis=0))) for tensor in test_tensors]

    # report test accuracy
    test_accuracy = 100*np.sum(np.array(action_predictions)==np.argmax(test_targets, axis=1))/len(2) #len(2) is len(diff actions)
    print('Test accuracy: %.4f%%' % test_accuracy)
              


In [None]:
#Same function as above but with ImageDataGenerator to allow image augmentation.

def augmented_data_model_compile_fit(model):
    
    model.compile(optimizer=optimizer, loss='categorical_crossentropy', metrics=['accuracy'])
    print("Optimizer:", optimizer)
    print("Patience:", patience)
    
    checkpointer = ModelCheckpoint(filepath='saved_models/weights.best.from_scratch.hdf5', 
                               verbose=1, save_best_only=True)
    
    early_stopping = EarlyStopping(monitor='val_loss', min_delta=0, patience=patience, verbose=0, mode='auto', 
          baseline=None, restore_best_weights=False)
    
    #Create and configure augmented image generator

    datagen =ImageDataGenerator(width_shift_range=0.1, # randomly shift images horizontally (10% of total width)
                                height_shift_range=0.1, # randomly shift images vertically (10% of total height)
                                zoom_range=0.2) # range for random zoom)
              
    
    #fit augmented image generator on data
    datagen.fit(train_tensors)
    
    batch_size = 20
    
    # train the model
    model.fit_generator(datagen.flow(train_tensors, train_targets, batch_size=batch_size),
                          steps_per_epoch=3*(train_tensors.shape[0]//batch_size),
                          epochs=epochs, verbose=2, callbacks=[checkpointer, early_stopping],
                         validation_data=(valid_tensors, valid_targets))
    
    # load the weights that generated the best validation accuracy
    model.load_weights('saved_models/weights.best.from_scratch.hdf5')
       
    # get index of predicted action for each image in test set
    action_predictions = [np.argmax(model.predict(np.expand_dims(tensor, axis=0))) for tensor in test_tensors]

    # report test accuracy
    test_accuracy = 100*np.sum(np.array(action_predictions)==np.argmax(test_targets, axis=1))/len(action_predictions) 
    print('Test accuracy: %.4f%%' % test_accuracy)

In [None]:
patience= 15

aug_model9 = Sequential()
aug_model9.add(Conv2D(input_shape=(224,224,3), filters=16, kernel_size=(5,5), padding='same', activation='relu', strides=1))
aug_model9.add(MaxPooling2D(pool_size=(3,3)))
aug_model9.add(BatchNormalization())
aug_model9.add(Conv2D(input_shape=train_tensors.shape, filters=32, kernel_size=(4,4), padding='same', activation='relu', strides=1))
aug_model9.add(MaxPooling2D(pool_size=(2,2)))
aug_model9.add(BatchNormalization())
aug_model9.add(Conv2D(input_shape=train_tensors.shape, filters=64, kernel_size=(3,3), padding='same', activation='relu'))
aug_model9.add(MaxPooling2D(pool_size=(2,2), strides=1))
aug_model9.add(BatchNormalization())
aug_model9.add(Conv2D(input_shape=(224,224,3), filters=128, kernel_size=(3,3), padding='same', activation='relu'))
aug_model9.add(MaxPooling2D(pool_size=(2,2), strides=1))
aug_model9.add(BatchNormalization())
aug_model9.add(Conv2D(input_shape=(224,224,3), filters=256, kernel_size=(3,3), padding='same', activation='relu'))
aug_model9.add(MaxPooling2D(pool_size=(2,2), strides=1))
aug_model9.add(BatchNormalization())
aug_model9.add(Conv2D(input_shape=(224,224,3), filters=512, kernel_size=(3,3), padding='same', activation='relu'))
aug_model9.add(MaxPooling2D(pool_size=(2,2), strides=1))
aug_model9.add(BatchNormalization())
aug_model9.add(Conv2D(input_shape=(224,224,3), filters=1024, kernel_size=(3,3), padding='same', activation='relu'))
aug_model9.add(MaxPooling2D(pool_size=(2,2), strides=1))
aug_model9.add(BatchNormalization())
aug_model9.add(GlobalAveragePooling2D(data_format=None))
aug_model9.add(Dense(3, activation = 'softmax'))

aug_model9.summary()
augmented_data_model_compile_fit(aug_model9)

In [None]:
# load the weights that generated the best validation accuracy
aug_model9.load_weights('saved_models/weights.best.from_scratch.hdf5')
       
# get index of predicted action for each image in test set
action_predictions = [np.argmax(aug_model9.predict(np.expand_dims(tensor, axis=0))) for tensor in test_tensors]

# report test accuracy
test_accuracy = 100*np.sum(np.array(action_predictions)==np.argmax(test_targets, axis=1))/len(action_predictions) 
print('Test accuracy: %.4f%%' % test_accuracy)

In [None]:
# load the weights that generated the best validation accuracy
transfer_model.load_weights('saved_models/weights.best.transfer.hdf5')
       
# get index of predicted action for each image in test set
action_predictions = [np.argmax(transfer_model.predict(np.expand_dims(tensor, axis=0))) for tensor in test_bottlenecks]

# report test accuracy
test_accuracy = 100*np.sum(np.array(action_predictions)==np.argmax(test_targets, axis=1))/len(action_predictions) 
print('Test accuracy: %.4f%%' % test_accuracy)

In [None]:
###### Fine tuning MobileNetV1
patience = 15

# create the base pre-trained model
base_model = MobileNet(weights='imagenet', include_top=False)

base_model.summary()

In [None]:
x = base_model.output
x = GlobalAveragePooling2D()(x)
x = Dense(1024, activation='relu')(x)
predictions = Dense(3, activation='softmax')(x)

# this is the model we will train
mobilenetV1_model = Model(inputs=base_model.input, outputs=predictions)
mobilenetV1_model.summary()

# let's visualize layer names and layer indices to see how many layers
# we should freeze:
for i, layer in enumerate(fine_tuned_model.layers):
    print(i, layer.name)

In [None]:
# we chose to train the top 2 inception blocks, i.e. we will freeze
# the first 249 layers and unfreeze the rest:
for layer in mobilenetV1_model.layers[:20]:
    layer.trainable = False
for layer in mobilenetV1_model.layers[20:]:
    layer.trainable = True

In [None]:

# compile the model (should be done *after* setting layers to non-trainable)
mobilenetV1_model.compile(loss='categorical_crossentropy', optimizer=keras.optimizers.Adam(lr=0.0003), metrics=['accuracy'])

early_stopping = EarlyStopping(monitor='val_loss', min_delta=0, patience=patience, verbose=0, mode='auto', 
          baseline=None, restore_best_weights=False)

checkpointer = ModelCheckpoint(filepath='saved_models/weights.best.MobileNetV1.hdf5', 
                               verbose=1, save_best_only=True)

mobilenetV1_model.fit(train_tensors, train_targets, 
          validation_data=(valid_tensors, valid_targets),
          epochs=epochs, batch_size=20, callbacks=[checkpointer, early_stopping], verbose=2)

In [None]:
# load the weights that generated the best validation accuracy
mobilenetV1_model.load_weights('saved_models/weights.best.MobileNetV1.hdf5')
       
# get index of predicted action for each image in test set
action_predictions = [np.argmax(mobilenetV1_model.predict(np.expand_dims(tensor, axis=0))) for tensor in test_tensors]

# report test accuracy
test_accuracy = 100*np.sum(np.array(action_predictions)==np.argmax(test_targets, axis=1))/len(action_predictions) 
print('Test accuracy: %.4f%%' % test_accuracy)

#Compute average prediction time
computational_times = []

for i in range(50):
    
    start = time.time()
    mobilenetV1_model.predict(np.expand_dims(test_tensors[i], axis=0))
    end = time.time()
    
    elapsed = end - start
    computational_times.append(elapsed)

print("Average computational time per prediction:", np.sum(computational_times)/len(computational_times)*1000, "ms")

In [None]:
###### Fine tuning MobileNetV2
patience = 10

# create the base pre-trained model
base_model = MobileNetV2(weights='imagenet', include_top=False)

base_model.summary()

In [None]:
x = base_model.output
x = GlobalAveragePooling2D()(x)
x = Dense(1024, activation='relu')(x)
predictions = Dense(3, activation='softmax')(x)

# this is the model we will train
mobilenetV2_model = Model(inputs=base_model.input, outputs=predictions)
mobilenetV2_model.summary()

# let's visualize layer names and layer indices to see how many layers
# we should freeze:
for i, layer in enumerate(fine_tuned_model.layers):
    print(i, layer.name)

In [None]:
# we chose to train the top 2 inception blocks, i.e. we will freeze
# the first 249 layers and unfreeze the rest:
for layer in mobilenetV2_model.layers[:70]:
    layer.trainable = False
for layer in mobilenetV2_model.layers[70:]:
    layer.trainable = True

In [None]:
# compile the model (should be done *after* setting layers to non-trainable)
mobilenetV2_model.compile(loss='categorical_crossentropy', optimizer=keras.optimizers.Adam(lr=0.0003), metrics=['accuracy'])

early_stopping = EarlyStopping(monitor='val_loss', min_delta=0, patience=patience, verbose=0, mode='auto', 
          baseline=None, restore_best_weights=False)

checkpointer = ModelCheckpoint(filepath='saved_models/weights.best.MobileNetV2.hdf5', 
                               verbose=1, save_best_only=True)

mobilenetV2_model.fit(train_tensors, train_targets, 
          validation_data=(valid_tensors, valid_targets),
          epochs=epochs, batch_size=20, callbacks=[checkpointer, early_stopping], verbose=2)

In [None]:
# load the weights that generated the best validation accuracy
mobilenetV2_model.load_weights('saved_models/weights.best.MobileNetV2.hdf5')
       
# get index of predicted action for each image in test set
action_predictions = [np.argmax(mobilenetV2_model.predict(np.expand_dims(tensor, axis=0))) for tensor in test_tensors]

# report test accuracy
test_accuracy = 100*np.sum(np.array(action_predictions)==np.argmax(test_targets, axis=1))/len(action_predictions) 
print('Test accuracy: %.4f%%' % test_accuracy)

#Compute average prediction time
computational_times = []

for i in range(50):
    
    start = time.time()
    mobilenetV2_model.predict(np.expand_dims(test_tensors[i], axis=0))
    end = time.time()
    
    elapsed = end - start
    computational_times.append(elapsed)

print("Average computational time per prediction: {.3f} ms".format(np.sum(computational_times)/len(computational_times)*1000) m")