### Environment Setup

In [None]:
# Mount Google Drive at /content/drive so the dataset archives stored there can be read.
from google.colab import drive
drive.mount('/content/drive')
# Install keras-vggface from its GitHub repository, plus keras_applications.
# NOTE(review): neither install is version-pinned — consider pinning for reproducibility.
!pip install git+https://github.com/rcmalli/keras-vggface.git
!pip install keras_applications

In [None]:
# Extract the train and test archives stored on Drive into local /content folders
# (one destination folder per archive).
!unzip "/content/drive/MyDrive/data/xmy2vts_reduced_train.zip" -d "/content/xmy2vts_reduced_train"
!unzip "/content/drive/MyDrive/data/xmy2vts_reduced_test.zip" -d "/content/xmy2vts_reduced_test"

### Dataset loading functions

In [None]:
import os
import cv2
import numpy as np
from tensorflow.keras.utils import normalize, to_categorical
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split

# Root directories of the extracted train / test groups; read by load_dataset().
train_path = "/content/xmy2vts_reduced_train"
test_path = "/content/xmy2vts_reduced_test"


# Load the frame sequence that belongs to one video.
def load_sequence(video_path):
  """Read every frame image of a video directory, in file-name order.

  Args:
    video_path: Directory whose entries are the frame image files of one video.

  Returns:
    List with one colour image per frame (as returned by ``cv2.imread``),
    ordered by file name.

  Raises:
    ValueError: If an entry cannot be decoded as an image. ``cv2.imread``
      signals failure by returning ``None`` instead of raising, which would
      otherwise end up inside the sequence and corrupt the array built later.
  """
  seq = []
  for frame in sorted(os.listdir(video_path)):
    frame_path = os.path.join(video_path, frame)
    img = cv2.imread(frame_path, cv2.IMREAD_COLOR)
    if img is None:
      # fail early, naming the offending file, rather than storing None
      raise ValueError('Cannot read frame image: ' + frame_path)
    seq.append(img)
  return seq


# Load every frame sequence found in the given directory.
def load_videos(groupX, groupY, dir_path):
  """Append each video of a directory, and its label, to the given lists.

  Args:
    groupX: List extended in place with one frame sequence per video.
    groupY: List extended in place with one label per video.
    dir_path: Directory whose entries are the per-video frame folders.

  The label is the slice ``dir_path[-5:-2]`` of the directory path
  (presumably the subject id encoded in the folder name — confirm against
  the dataset layout).
  """
  for video_name in sorted(os.listdir(dir_path)):
    # read this video's frames from disk
    frames = load_sequence('/'.join((dir_path, video_name)))
    groupX.append(frames)

    # the label is taken from the directory path, not from the video name
    groupY.append(dir_path[-5:-2])


# Load one data group (train set or test set).
def load_group(group_path):
  """Collect every sequence and label found under a group directory.

  Args:
    group_path: Directory whose entries are each handed to ``load_videos``.

  Returns:
    Tuple ``(groupX, groupY)`` of NumPy arrays built from the accumulated
    sequences and labels.
  """
  sequences, labels = [], []
  for entry in sorted(os.listdir(group_path)):
    load_videos(sequences, labels, '/'.join((group_path, entry)))
  return np.array(sequences), np.array(labels)


# Load the whole dataset (train and test groups) with one-hot encoded labels.
def load_dataset():
  """Load both groups from ``train_path`` / ``test_path`` and encode the labels.

  Returns:
    Tuple ``(trainX, trainY, testX, testY)``. The X arrays come from
    ``load_group``; the Y arrays are one-hot matrices that share the same
    label-to-column mapping and the same width.
  """
  # load both groups from their directories
  trainX, trainY = load_group(train_path)
  testX, testY = load_group(test_path)

  # Label pre-processing: fit ONE encoder on the labels of both groups.
  # Re-fitting on the test labels could assign a different integer to the
  # same identity whenever the two label sets are not identical.
  label_encoder = LabelEncoder()
  label_encoder.fit(np.concatenate((trainY, testY)))
  n_classes = len(label_encoder.classes_)

  # a fixed num_classes keeps both one-hot matrices the same width
  trainY = to_categorical(label_encoder.transform(trainY), num_classes=n_classes)
  testY = to_categorical(label_encoder.transform(testY), num_classes=n_classes)

  return trainX, trainY, testX, testY


def load_dataset_npz(train_path, test_path):
  """Load a pre-split dataset from two ``.npz`` archives.

  Args:
    train_path: Path of the archive holding the 'trainX' and 'trainY' arrays.
    test_path: Path of the archive holding the 'testX' and 'testY' arrays.

  Returns:
    Tuple ``(trainX, trainY, testX, testY)`` with the arrays read from the
    two archives.
  """
  # The context manager closes each archive's file handle as soon as its
  # arrays have been read, instead of relying on garbage collection.
  with np.load(train_path) as train_data:
    trainX = train_data['trainX']
    trainY = train_data['trainY']

  with np.load(test_path) as test_data:
    testX = test_data['testX']
    testY = test_data['testY']

  return trainX, trainY, testX, testY

### Model 1
VGG16 w/ Imagenet

In [None]:
import keras
from keras.models import Sequential, Input, Model, load_model
from keras.layers import Dropout, Flatten, Dense
from keras.layers import LSTM, TimeDistributed
from keras import applications
from keras.optimizers import Adam
from keras.applications.vgg16 import VGG16
from keras_vggface.vggface import VGGFace

# hyperparams
# number of units of the LSTM layer
n_unit = 512
# batch size passed to model.fit
batch_size = 4
# learning rate of the Adam optimizer
learning_rate = 0.0001
# number of epochs passed to model.fit
epochs = 375
# epoch tag of the weights file to resume from when tuning; 0 skips loading
epochs_load = 0
# epoch tag used in the name of the weights file written after tuning
epochs_save = epochs_load+epochs


def get_cnn_lstm(tuning=False):
  """Build and compile the VGG16 (ImageNet weights) + LSTM sequence classifier.

  Args:
    tuning: When true and ``epochs_load`` is non-zero, weights are restored
      from '/content/epochs<epochs_load>.h5' before compiling.

  Returns:
    The compiled Keras model; the VGG16 layers are marked non-trainable.
  """
  backbone = VGG16(weights='imagenet', include_top=False, input_shape=(200,200,3))
  seq_in = Input(shape=(50, 200, 200, 3), name='seq_input')

  # backbone wrapped in TimeDistributed, features flattened, then an LSTM
  features = TimeDistributed(backbone)(seq_in)
  features = TimeDistributed(Flatten())(features)
  features = LSTM(n_unit)(features)
  probs = Dense(46, activation='softmax')(features)

  model = Model(inputs=[seq_in], outputs=probs)
  model.summary()

  resume = tuning and epochs_load
  if resume:
    weights_file = '/content/epochs'+str(epochs_load)+'.h5'
    print('Loading epochs '+str(epochs_load)+' weights...')
    model.load_weights(weights_file)
    print("...loaded!")

  # freeze the pre-trained convolutional layers
  for backbone_layer in backbone.layers:
    backbone_layer.trainable = False

  model.compile(optimizer=Adam(learning_rate=learning_rate),
                loss='categorical_crossentropy',
                metrics=['accuracy'])

  return model


def train_test_model(trainX, trainY, testX, testY, tuning=False, save=False):
  """Train the CNN-LSTM on the train split and evaluate it on the test split.

  Args:
    trainX, trainY: Training sequences and their one-hot labels.
    testX, testY: Evaluation sequences and their one-hot labels.
    tuning: Forwarded to ``get_cnn_lstm``; when true the trained weights are
      also written to '/content/epochs<epochs_save>.h5'.
    save: When true, the full model and the four arrays are written under
      /content. Accepted here so the experiment cells, which pass
      ``save=True``, work with this model exactly as with the VGGFace one.

  Returns:
    The accuracy reported by ``model.evaluate`` on the test split.
  """
  model = get_cnn_lstm(tuning=tuning)

  model.fit(trainX, trainY,
            epochs=epochs,
            batch_size=batch_size,
            verbose=1)

  if tuning:
    model.save_weights('/content/epochs'+str(epochs_save)+'.h5')

  if save:
    print('Saving model...')
    model.save('/content/type3_model')
    print('Saving datas...')
    np.savez('/content/type3_datas', trainX=trainX, trainY=trainY,
             testX=testX, testY=testY)

  _, accuracy = model.evaluate(testX, testY, batch_size=1)
  return accuracy

### Model 2
VGG16 w/ VGGFace

In [None]:
import keras
from keras.models import Sequential, Input, Model, load_model
from keras.layers import Dropout, Flatten, Dense
from keras.layers import LSTM, TimeDistributed
from keras import applications
from keras.optimizers import Adam
from keras.applications.vgg16 import VGG16
from keras_vggface.vggface import VGGFace

# hyperparams
# number of units of the LSTM layer
n_unit = 512
# batch size passed to model.fit
batch_size = 4
# learning rate of the Adam optimizer
learning_rate = 0.0001
# number of epochs passed to model.fit
epochs = 325
# epoch tag of the weights file to resume from when tuning; 0 skips loading
epochs_load = 0
# epoch tag used in the name of the weights file written after tuning
epochs_save = epochs_load+epochs


def get_cnn_lstm(tuning=False):
  """Build and compile the VGGFace + LSTM sequence classifier.

  Args:
    tuning: When true and ``epochs_load`` is non-zero, weights are restored
      from '/content/epochs<epochs_load>.h5' before compiling.

  Returns:
    The compiled Keras model; the VGGFace layers are marked non-trainable.
  """
  face_cnn = VGGFace(include_top=False, input_shape=(200,200,3))
  frames_in = Input(shape=(50, 200, 200, 3), name='seq_input')

  # VGGFace wrapped in TimeDistributed, features flattened, then an LSTM
  hidden = TimeDistributed(face_cnn)(frames_in)
  hidden = TimeDistributed(Flatten())(hidden)
  hidden = LSTM(n_unit)(hidden)
  class_probs = Dense(46, activation='softmax')(hidden)

  model = Model(inputs=[frames_in], outputs=class_probs)
  model.summary()

  should_restore = tuning and epochs_load
  if should_restore:
    checkpoint = '/content/epochs'+str(epochs_load)+'.h5'
    print('Loading epochs '+str(epochs_load)+' weights...')
    model.load_weights(checkpoint)
    print("...loaded!")

  # freeze the pre-trained convolutional layers
  for cnn_layer in face_cnn.layers:
    cnn_layer.trainable = False

  model.compile(optimizer=Adam(learning_rate=learning_rate),
                loss='categorical_crossentropy',
                metrics=['accuracy'])

  return model


def train_test_model(trainX, trainY, testX, testY,
                     tuning=False, save=False):
  """Train the CNN-LSTM on the train split and evaluate it on the test split.

  Args:
    trainX, trainY: Training sequences and their one-hot labels.
    testX, testY: Evaluation sequences and their one-hot labels.
    tuning: Forwarded to ``get_cnn_lstm``; when true the trained weights are
      also written to '/content/epochs<epochs_save>.h5'.
    save: When true, the full model and the four arrays are written under
      /content.

  Returns:
    The accuracy reported by ``evaluate`` on the test split.
  """
  net = get_cnn_lstm(tuning=tuning)

  fit_options = dict(epochs=epochs, batch_size=batch_size, verbose=1)
  net.fit(trainX, trainY, **fit_options)

  if tuning:
    weights_file = '/content/epochs'+str(epochs_save)+'.h5'
    net.save_weights(weights_file)

  if save:
    # persist the model and the exact data it was trained / evaluated on
    print('Saving model...')
    net.save('/content/type3_model')
    print('Saving datas...')
    np.savez('/content/type3_datas',
             trainX=trainX, trainY=trainY, testX=testX, testY=testY)

  loss_value, accuracy = net.evaluate(testX, testY, batch_size=1)
  return accuracy

### Load Dataset

In [None]:
# Train split: sequences without a face mask.
# Test split: sequences with a face mask.
trainX, trainY, testX, testY = load_dataset()

# show the shape of each array, one per line
print(*(part.shape for part in (trainX, trainY, testX, testY)), sep='\n')

In [None]:
from sklearn.model_selection import train_test_split

# Pool every sequence (with and without mask) into X and every label into Y.
X = np.concatenate((trainX, testX))
Y = np.concatenate((trainY, testY))

# Drop the original splits to free memory.
# NOTE(review): X and Y themselves stay alive after the split below.
del trainX, trainY, testX, testY

# Randomly re-split the pooled sequences into train and test (30% held out),
# stratified on the labels and with a fixed seed.
trainX, testX, trainY, testY = train_test_split(
    X, Y, stratify=Y, test_size=0.30, random_state=42)

# show the shape of each array, one per line
print(*(part.shape for part in (trainX, trainY, testX, testY)), sep='\n')

**Per caricare il dataset da array numpy**

In [None]:
# Load xm2vts from the saved numpy archives because the dataset is too heavy.
trainX, trainY, testX, testY = load_dataset_npz(
    train_path='drive/MyDrive/data/xmy2vts_train70.npz',
    test_path='drive/MyDrive/data/xmy2vts_test30.npz')

# show the shape of each array, one per line
print(*(part.shape for part in (trainX, trainY, testX, testY)), sep='\n')

In [None]:
# cleanup: release the four dataset arrays
del trainX, trainY, testX, testY

### Tuning



In [None]:
# Run the tuning pass used to pick the best hyper-parameters;
# the accuracy is reported as a percentage.
accuracy = 100.0 * train_test_model(trainX, trainY, testX, testY, tuning=True)
print('> %.3f' % accuracy)

### Experiments
Esecuzione degli esperimenti di tipo I, II e III

In [None]:
# Type I and Type III experiments: train on the train split, evaluate on the
# test split, and save the model and data; accuracy is reported as a percentage.
accuracy = 100.0 * train_test_model(trainX, trainY, testX, testY, tuning=False, save=True)
print('> %.3f' % accuracy)

In [None]:
# Type II experiment: the roles are swapped — train on the test split and
# evaluate on the train split; accuracy is reported as a percentage.
accuracy = 100.0 * train_test_model(testX, testY, trainX, trainY, tuning=False, save=True)
print('> %.3f' % accuracy)

### Metrics

In [None]:
import sys
# Put the Drive root first on the import path
# (presumably where the plotGraph module lives — confirm).
sys.path.insert(0,'/content/drive/MyDrive')

# Turn one-hot encoded labels into integer class indices.
def multi_onehot_to_int(y):
  """Return the column index of every entry equal to 1, scanning row by row.

  Args:
    y: Sequence of rows (list of lists or 2-D array) of one-hot labels.

  Returns:
    Flat list of integer indices. A row contributes one index per entry
    equal to 1, so a row with no 1 contributes nothing and a row with
    several 1s contributes several indices.
  """
  return [column
          for row in y
          for column, value in enumerate(row)
          if value == 1]

In [None]:
from plotGraph import plotGraf
from sklearn import metrics
import matplotlib.pyplot as plt
import numpy as np
import keras

path = 'drive/MyDrive/xm2vts_reduced_exp'
exp_type = 'type3'
model_path = path+'/'+exp_type+'_model'
data_path = path+'/'+exp_type+'_datas.npz'


# load the saved model and the test split stored next to it
model = keras.models.load_model(model_path)
data = np.load(data_path)
testX = data['testX']
# integer class index of every one-hot test label
testY = np.array(multi_onehot_to_int(data['testY']))

# predict labels
scoreY = model.predict(testX, batch_size=1)
predY = np.argmax(scoreY, axis=1)

# Number of classes = number of score columns produced by the model.
# Deriving it from max(testY) + 1 under-counts whenever the highest class
# does not occur in this test split.
n_output = scoreY.shape[1]
class_ids = np.arange(n_output)

# metrics
report = metrics.classification_report(testY, predY)
print("Report \n%s" % (report))

# confusion matrix, pinned to the full label set so that its size always
# matches the tick positions below
matrix = metrics.confusion_matrix(testY, predY, labels=class_ids)
fig = plt.gcf()
fig.set_size_inches(17, 17)

plt.imshow(matrix)
plt.colorbar()
plt.xticks(class_ids, fontsize=10)
plt.yticks(class_ids, fontsize=10)
# confusion_matrix puts true labels on rows and predictions on columns
plt.xlabel('Predicted class')
plt.ylabel('True class')
plt.title('Confusion matrix (' + exp_type + ')')

# Optional per-cell counts, left disabled:
# for i in range(n_output):
#   for j in range(n_output):
#     c = matrix[j,i]
#     if c > int(len(testY) / (2 * n_output)):
#       color='black'
#     else:
#       color='white'
#     plt.text(i, j, str(c), va='center', ha='center', color=color)

plt.show()

# plot ROC and CMC curves
# (the instance gets its own name so the imported plotGraf class is not shadowed)
graph_plotter = plotGraf()
graph_plotter.plot(testY, predY, scoreY, exp_type, n_output)

In [None]:
# cleanup: release the loaded archive and the test arrays
del data, testX, testY