In [1]:
from google.colab import drive
# Mount Google Drive at /content/drive so the dataset and saved models stored
# there are reachable as ordinary file paths. force_remount=True asks Colab to
# mount again even if a mount is already present, so the cell can be re-run.
drive.mount('/content/drive', force_remount=True)

Mounted at /content/drive


In [2]:
import os
# Make the Drive root the working directory for the rest of the notebook.
os.chdir("/content/drive/My Drive")
# NOTE: despite its name this is a path *prefix*, not a directory. Later cells
# build each batch folder as root_dir + str(batch_num) + '/', which yields
# ".../tensors0/", ".../tensors1/", and so on.
root_dir = "/content/drive/My Drive/tensors"

In [3]:
import numpy as np
import os

def get_training_data(data_path):
    """Load every behaviour clip stored under ``data_path``.

    For each behaviour name in the label table below, the folder
    ``<data_path>/<behaviour>_frames`` is scanned for ``.npz`` files. Each
    file must provide a ``frames`` array and a ``labels`` array.

    Args:
        data_path: Directory containing one ``<behaviour>_frames`` sub-folder
            per behaviour.

    Returns:
        A tuple ``(x_data, y_data)`` of two equally long lists holding the
        ``frames`` and ``labels`` arrays of every non-empty clip. Clips are
        ordered by behaviour (label-table order) and, within a behaviour, by
        file name.

    Raises:
        FileNotFoundError: If one of the expected behaviour folders is missing.
    """
    # Labels dict
    behaviours = {'book': 0,
                'cell_phone': 1,
                'drink_water': 2,
                'speaking': 3,
                'stand_up': 4,
                'sit_down': 5,
                'face_up': 6,
                'face_down': 7,
                'face_left': 8,
                'face_right': 9,
                'face_upper_left': 10,
                'face_upper_right': 11,
                'face_lower_left': 12,
                'face_lower_right': 13,
                'hands_up': 14,
                'look_up': 15,
                'look_down': 16,
                'look_left': 17,
                'look_right': 18,
                'look_upper_left': 19,
                'look_upper_right': 20,
                'look_lower_left': 21,
                'look_lower_right': 22,
                'makefau1': 23,
                'makefau4': 24,
                'makefau5': 25,
                'makefau7': 26,
                'makefau9': 27,
                'makefau17': 28,
                'makefau23': 29,
                'makefau28': 30}

    # lists that collect the loaded clips and their labels
    x_data = []
    y_data = []

    # dicts preserve insertion order, so folders are visited in label order
    for behaviour_name in behaviours:
        folder_path = os.path.join(data_path, behaviour_name + '_frames')
        # os.listdir() returns entries in arbitrary order; sort so that the
        # sample order is reproducible between runs and machines.
        for file_name in sorted(os.listdir(folder_path)):
            if not file_name.endswith('.npz'):
                continue
            file_path = os.path.join(folder_path, file_name)
            # NOTE(security): allow_pickle=True can execute arbitrary code
            # while loading; only use this on trusted dataset files.
            # The context manager closes the archive so file handles are not
            # leaked while iterating over many files.
            with np.load(file_path, allow_pickle=True) as data:
                frames = data['frames']
                labels = data['labels']
            # ignore data in which faces were not detected (stored empty)
            if len(frames) == 0 or len(labels) == 0:
                continue
            x_data.append(frames)
            y_data.append(labels)

    return x_data, y_data

In [4]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout, Conv2D, Flatten, MaxPooling2D, TimeDistributed
from sklearn.model_selection import train_test_split
from sklearn.model_selection import KFold
from keras.optimizers import Adam

def define_model(input_shape, num_classes):
    """Build the per-frame CNN + LSTM sequence classifier.

    A small CNN (Conv2D -> MaxPooling2D -> Flatten) is applied to every frame
    through ``TimeDistributed``; the resulting feature sequence is fed to an
    LSTM, followed by dropout and a softmax classification layer.

    Args:
        input_shape: Shape of the training data. Its last four entries are
            read as ``(frames, height, width, channels)``, so both a full
            array shape such as ``(N, 240, 64, 64, 1)`` and a bare sequence
            shape are accepted. When it is ``None`` or has fewer than four
            entries, the previous hard-coded ``(240, 64, 64, 1)`` is used.
        num_classes: Number of output classes (width of the softmax layer).

    Returns:
        The uncompiled Keras ``Sequential`` model.
    """
    # Previously input_shape was ignored and these values were hard-coded.
    default_sequence_shape = (240, 64, 64, 1)
    if input_shape is not None and len(input_shape) >= 4:
        sequence_shape = tuple(input_shape[-4:])
    else:
        sequence_shape = default_sequence_shape
    frame_shape = sequence_shape[1:]

    # CNN applied independently to each frame
    cnn_model = Sequential()
    cnn_model.add(Conv2D(32, (3, 3), activation='relu', input_shape=frame_shape))
    cnn_model.add(MaxPooling2D(pool_size=(2, 2)))
    cnn_model.add(Flatten())

    # sequence model over the per-frame feature vectors
    model = Sequential()
    model.add(TimeDistributed(cnn_model, input_shape=sequence_shape))
    model.add(LSTM(64))
    model.add(Dropout(0.2))
    model.add(Dense(num_classes, activation='softmax'))
    return model

def model_training(x_data, y_data):
    """Build, compile and fit a model on one batch of data.

    Args:
        x_data: Array of input clips; its ``.shape`` is forwarded to
            ``define_model``.
        y_data: One-hot encoded labels (the loss is categorical
            cross-entropy) with 31 classes.

    Returns:
        The fitted Keras model.
    """
    input_shape = x_data.shape
    num_classes = 31
    model = define_model(input_shape, num_classes)

    # compile the model
    # Pass the configured optimizer object itself. Previously the string
    # 'adam' was passed, so the learning rate set here was silently unused.
    optimizer = Adam(learning_rate=0.001)
    model.compile(loss='categorical_crossentropy', optimizer=optimizer, metrics=['accuracy'])

    # train the model, holding out the last 20% of samples for validation
    model.fit(x_data, y_data, validation_split=0.2, epochs=30, batch_size=4)

    return model

In [None]:
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.layers import Reshape
from sklearn.preprocessing import MinMaxScaler

def training(root_dir, data_batch_num):
    """Load one data batch, preprocess it and train a model on it.

    Args:
        root_dir: Path prefix of the batch folders; the batch is read from
            ``root_dir + str(data_batch_num) + '/'``.
        data_batch_num: Index of the data batch to train on.

    Returns:
        The model returned by ``model_training``.

    Raises:
        ValueError: If the batch contains no usable clips.
    """
    print("processing data batch {}".format(data_batch_num))

    x_data, y_data = get_training_data(root_dir + str(data_batch_num) + '/')
    if not x_data:
        raise ValueError("data batch {} contains no usable clips".format(data_batch_num))

    # Reshape each 3D array in x_data to a 2D array of shape (num_frames, img_height * img_width)
    x_data_2d = [x.reshape(x.shape[0], -1) for x in x_data]

    # Normalize x_data_2d to the range [0, 1]
    # NOTE(review): the scaler is re-fitted on every clip and works column-wise,
    # so each pixel position is scaled by its own min/max over that clip's
    # frames. Confirm this is intended rather than a global intensity scaling.
    scaler = MinMaxScaler()
    x_data_norm_2d = [scaler.fit_transform(x) for x in x_data_2d]

    # Reshape each 2D array back to a 3D array of shape (num_frames, img_height, img_width)
    x_data = [x.reshape(x.shape[0], 64, 64) for x in x_data_norm_2d]

    # Bring every clip to exactly 240 frames (a 10 second video at 24 fps):
    # shorter clips are zero-padded at the end, longer ones are cut at the end.
    # A previous `x_data = x_data[:240]` was removed here: it sliced the list
    # of clips rather than the frames and left y_data out of sync.
    x_padded = pad_sequences(x_data, maxlen=240, dtype='float32', padding='post', truncating='post')

    # y_data squeeze: keep the first label of each clip as the clip label
    y_data = [labels[0] for labels in y_data]

    # add the trailing channel axis expected by the model input
    x_train = np.reshape(x_padded, (len(x_data), 240, 64, 64, 1))
    print(x_train.shape)

    # one-hot encode the integer labels
    y_train = to_categorical(np.array(y_data), num_classes=31)

    # training
    model = model_training(x_train, y_train)

    return model

In [None]:
# using a for loop to train all models would very likely
# cause not enough memory error due to the size of data
# and complexity of model
# when necessary, manually inputting data batch might be required

# for loop training: one model per data batch, saved as model_<i>.h5
num_batch = 157
model_dir = '/content/drive/My Drive/test_models2/'
os.makedirs(model_dir, exist_ok=True)

# number of batches that already have a saved model (informational)
curr_progress = sum(
    os.path.isfile(model_dir + 'model_' + str(i) + '.h5') for i in range(num_batch)
)
print("{} of {} batches already trained".format(curr_progress, num_batch))

for i in range(num_batch):
    model_path = model_dir + 'model_' + str(i) + '.h5'
    # Resume support: skip a batch only when its own model file exists.
    # Counting the .h5 files in the folder (the previous approach) picks the
    # wrong starting point when there are gaps or unrelated .h5 files.
    if os.path.isfile(model_path):
        continue
    model = training(root_dir, i)
    # save model
    model.save(model_path)

# to train a single batch manually instead, comment out the loop above
# and uncomment this part
# model = training(root_dir, 0)
# # save model
# model.save('/content/drive/My Drive/models/model_' + str(0) + '.h5')

[1;30;43mStreaming output truncated to the last 5000 lines.[0m
Epoch 2/30
Epoch 3/30
Epoch 4/30
Epoch 5/30
Epoch 6/30
Epoch 7/30
Epoch 8/30
Epoch 9/30
Epoch 10/30
Epoch 11/30
Epoch 12/30
Epoch 13/30
Epoch 14/30
Epoch 15/30
Epoch 16/30
Epoch 17/30
Epoch 18/30
Epoch 19/30
Epoch 20/30
Epoch 21/30
Epoch 22/30
Epoch 23/30
Epoch 24/30
Epoch 25/30
Epoch 26/30
Epoch 27/30
Epoch 28/30
Epoch 29/30
Epoch 30/30
processing data batch 76
Epoch 1/30
Epoch 2/30
Epoch 3/30
Epoch 4/30
Epoch 5/30
Epoch 6/30
Epoch 7/30
Epoch 8/30
Epoch 9/30
Epoch 10/30
Epoch 11/30
Epoch 12/30
Epoch 13/30
Epoch 14/30
Epoch 15/30
Epoch 16/30
Epoch 17/30
Epoch 18/30
Epoch 19/30
Epoch 20/30
Epoch 21/30
Epoch 22/30
Epoch 23/30
Epoch 24/30
Epoch 25/30
Epoch 26/30
Epoch 27/30
Epoch 28/30
Epoch 29/30
Epoch 30/30
processing data batch 77
Epoch 1/30
Epoch 2/30
Epoch 3/30
Epoch 4/30
Epoch 5/30
Epoch 6/30
Epoch 7/30
Epoch 8/30
Epoch 9/30
Epoch 10/30
Epoch 11/30
Epoch 12/30
Epoch 13/30
Epoch 14/30
Epoch 15/30
Epoch 16/30
Epoch 17/30

In [5]:
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.layers import Reshape
from sklearn.preprocessing import MinMaxScaler


def prepare_test_data(data_path, behaviour_num):
    """Load a data batch and return one preprocessed clip with its label.

    The preprocessing mirrors the steps used for training: per-clip min-max
    scaling, padding/truncation to 240 frames, a trailing channel axis and
    one-hot labels with 31 classes.

    Args:
        data_path: Folder of the data batch, passed to ``get_training_data``.
        behaviour_num: Position of the wanted clip in the loaded batch.

    Returns:
        A tuple ``(x_test, y_test)``: the clip of shape ``(240, 64, 64, 1)``
        and its one-hot label of length 31.

    Raises:
        ValueError: If the batch contains no usable clips.
        IndexError: If ``behaviour_num`` is beyond the number of loaded clips.
    """
    x_data, y_data = get_training_data(data_path)
    if not x_data:
        raise ValueError("no usable clips found under {}".format(data_path))

    # Reshape each 3D array in x_data to a 2D array of shape (num_frames, img_height * img_width)
    x_data_2d = [x.reshape(x.shape[0], -1) for x in x_data]

    # Normalize x_data_2d to the range [0, 1]
    # (the scaler is re-fitted on every clip and scales column-wise)
    scaler = MinMaxScaler()
    x_data_norm_2d = [scaler.fit_transform(x) for x in x_data_2d]

    # Reshape each 2D array back to a 3D array of shape (num_frames, img_height, img_width)
    x_data = [x.reshape(x.shape[0], 64, 64) for x in x_data_norm_2d]

    # Bring every clip to exactly 240 frames (a 10 second video at 24 fps):
    # shorter clips are zero-padded at the end, longer ones are cut at the end.
    # A previous `x_data = x_data[:240]` was removed here: it sliced the list
    # of clips rather than the frames and left y_data out of sync.
    x_padded = pad_sequences(x_data, maxlen=240, dtype='float32', padding='post', truncating='post')

    # y_data squeeze: keep the first label of each clip as the clip label
    y_data = [labels[0] for labels in y_data]

    # add the trailing channel axis expected by the model input
    x_test = np.reshape(x_padded, (len(x_data), 240, 64, 64, 1))

    # one-hot encode the integer labels
    y_test = to_categorical(np.array(y_data), num_classes=31)

    # NOTE(review): the clip is selected by position, which equals the
    # behaviour label only if the batch holds exactly one clip per behaviour,
    # in label order, and none was dropped as empty - confirm for the dataset.
    return x_test[behaviour_num], y_test[behaviour_num]

In [6]:
# Behaviour name -> integer class label. The names are listed in label order,
# so each name's label is simply its position in the tuple.
behaviours = {
    name: label
    for label, name in enumerate((
        'book',
        'cell_phone',
        'drink_water',
        'speaking',
        'stand_up',
        'sit_down',
        'face_up',
        'face_down',
        'face_left',
        'face_right',
        'face_upper_left',
        'face_upper_right',
        'face_lower_left',
        'face_lower_right',
        'hands_up',
        'look_up',
        'look_down',
        'look_left',
        'look_right',
        'look_upper_left',
        'look_upper_right',
        'look_lower_left',
        'look_lower_right',
        'makefau1',
        'makefau4',
        'makefau5',
        'makefau7',
        'makefau9',
        'makefau17',
        'makefau23',
        'makefau28',
    ))
}

def get_key_by_value(dictionary, value):
    """Reverse lookup: return the first key whose value equals ``value``.

    Keys are checked in the dictionary's iteration order. ``None`` is
    returned when no entry matches.
    """
    matching_keys = (key for key, candidate in dictionary.items() if candidate == value)
    return next(matching_keys, None)

In [7]:
import random

def generate_random_batches(exclude_value, num_values, maximum_value):
    """Draw ``num_values`` distinct integers from ``[0, maximum_value)``.

    ``exclude_value`` is removed from the candidates before sampling, so it
    never appears in the result. A ``ValueError`` is raised when
    ``exclude_value`` is not in the range or when more values are requested
    than remain.
    """
    candidates = [*range(maximum_value)]
    candidates.remove(exclude_value)
    return random.sample(candidates, num_values)

In [8]:
import numpy as np
from tensorflow.keras.models import load_model

def evaluate_model(model_num, total_model_num):
    """Score one saved model on datasets it was not trained on.

    The model ``model_<model_num>.h5`` is loaded and evaluated on up to 20
    randomly chosen data batches other than its own. For each batch the
    accuracy is averaged over the 31 behaviours; the result is the mean of
    these per-batch accuracies.

    Args:
        model_num: Index of the model to evaluate (also the index of the data
            batch it was trained on, which is excluded from evaluation).
        total_model_num: Total number of data batches / models.

    Returns:
        The mean accuracy over the sampled batches.

    Raises:
        ValueError: If there is no other batch to evaluate against.
    """
    num_behaviours = 31

    # Sample 20 other datasets, or all of them when fewer than 20 exist
    # (a fixed 20 made random.sample fail for small totals).
    num_eval_batches = min(20, total_model_num - 1)
    if num_eval_batches < 1:
        raise ValueError("need at least one other dataset to evaluate model {}".format(model_num))

    # Load the saved model
    model = load_model('/content/drive/My Drive/test_models2/model_' + str(model_num) + '.h5')

    # the model's own batch is excluded by generate_random_batches
    random_model_batch = generate_random_batches(model_num, num_eval_batches, total_model_num)

    # list to store all evaluation accuracies:
    overall_accuracy = []

    for curr_model_num in random_model_batch:
        print("Evaluating model {} with dataset from model {}".format(model_num, curr_model_num))

        curr_accuracy = 0
        for behaviour_label_num in range(num_behaviours):
            behaviour_name = get_key_by_value(behaviours, behaviour_label_num)
            print("Evaluating behaviour <{}>".format(behaviour_name))

            # NOTE(review): prepare_test_data reloads and preprocesses the
            # whole batch on every call, i.e. 31 times per dataset here.
            x_test, y_test = prepare_test_data(root_dir + str(curr_model_num) + '/', behaviour_label_num)

            # add a leading batch axis of size 1 to the sample and its label
            x_test_reshaped = np.expand_dims(x_test, axis=0)
            y_test_reshaped = np.expand_dims(y_test, axis=0)

            # Evaluate the model on the test data
            loss, accuracy = model.evaluate(x_test_reshaped, y_test_reshaped)

            # add the accuracy (which is either 0 or 1 for a single sample)
            curr_accuracy += accuracy

        # fraction of behaviours classified correctly on this dataset
        overall_accuracy.append(curr_accuracy / num_behaviours)

    # average over the datasets actually evaluated
    return np.sum(overall_accuracy) / len(overall_accuracy)

In [None]:
import numpy as np

model_accuracies = []
total_model_num = 157
weights_file = "/content/drive/My Drive/model_weights/weights_evaluation.txt"

# beware, this chunk can take very long time to run

# Check current progress: one "Model <i>: <accuracy>" line is appended per
# evaluated model, so the number of such lines is the next model index.
# A missing file means nothing has been evaluated yet (previously the first
# run failed with FileNotFoundError).
os.makedirs(os.path.dirname(weights_file), exist_ok=True)
lines = []
if os.path.exists(weights_file):
    with open(weights_file, "r", encoding="utf-8") as f:
        lines = f.readlines()
count = sum(1 for s in lines if s.startswith("M"))

for i in range(count, total_model_num):
    print("Evaluating model {}".format(i))

    # get model accuracy for model i
    model_accuracy = evaluate_model(i, total_model_num)

    # append immediately so an interrupted run can resume from here
    with open(weights_file, "a", encoding="utf-8") as f:
        f.write("Model " + str(i) + ": " + str(model_accuracy) + "\n")

    # add to total accuracy list
    model_accuracies.append(model_accuracy)

    print("Model {}'s accuracy is {}".format(i, model_accuracy))

[1;30;43mStreaming output truncated to the last 5000 lines.[0m
Evaluating behaviour <makefau1>
Evaluating behaviour <makefau4>
Evaluating behaviour <makefau5>
Evaluating behaviour <makefau7>
Evaluating behaviour <makefau9>
Evaluating behaviour <makefau17>
Evaluating behaviour <makefau23>
Evaluating behaviour <makefau28>
Evaluating model 153 with dataset from model 32
Evaluating behaviour <book>
Evaluating behaviour <cell_phone>
Evaluating behaviour <drink_water>
Evaluating behaviour <speaking>
Evaluating behaviour <stand_up>
Evaluating behaviour <sit_down>
Evaluating behaviour <face_up>
Evaluating behaviour <face_down>
Evaluating behaviour <face_left>
Evaluating behaviour <face_right>
Evaluating behaviour <face_upper_left>
Evaluating behaviour <face_upper_right>
Evaluating behaviour <face_lower_left>
Evaluating behaviour <face_lower_right>
Evaluating behaviour <hands_up>
Evaluating behaviour <look_up>
Evaluating behaviour <look_down>
Evaluating behaviour <look_left>
Evaluating behavi

In [9]:
def normalize_weights(array):
    """Scale the values so that they sum to 1.

    Returns a new list in which every element of ``array`` is divided by the
    sum of all elements. An empty input gives an empty list; a non-empty
    input whose sum is zero raises ``ZeroDivisionError``.
    """
    total = sum(array)
    return list(map(lambda weight: weight / total, array))

def get_top_indices(array, num_indices):
    """Return the indices of the ``num_indices`` largest values, best first.

    The values are ranked directly. Normalising them first (as done before)
    cannot improve the ranking, raised ``ZeroDivisionError`` when all values
    were zero and reversed the order when their sum was negative.

    Args:
        array: Sequence of comparable scores.
        num_indices: Maximum number of indices to return.

    Returns:
        A list of indices into ``array`` sorted by descending value; ties keep
        their original order. Shorter than ``num_indices`` when ``array`` has
        fewer elements.
    """
    ranked = sorted(range(len(array)), key=lambda i: array[i], reverse=True)
    return ranked[:num_indices]

# Ensemble weights are derived from each model's overall evaluation accuracy.
# Every line of the evaluation file has the form "Model <i>: <accuracy>";
# only the accuracy after ": " is read.
with open("/content/drive/My Drive/model_weights/weights_evaluation.txt", "r", encoding="utf-8") as f:
    lines = f.readlines()

evaluated_weights = [float(line.split(": ")[1]) for line in lines]

# indices of the 5 models with the highest evaluation accuracy
top_5_models_idx = get_top_indices(evaluated_weights, 5)

# their accuracies, rescaled so that the five weights sum to 1
top_5_model_weights = normalize_weights(
    [evaluated_weights[idx] for idx in top_5_models_idx]
)

print(top_5_model_weights)

[0.2162162162162162, 0.2054054054054054, 0.2, 0.1891891891891892, 0.18918918918918917]


In [48]:
import numpy as np
import random

# Draw a random data batch (0..156) and a random behaviour label (0..30).
random_batch_value = random.randint(0, 156)
random_behaviour_value = random.randint(0, 30)

behaviour_name = get_key_by_value(behaviours, random_behaviour_value)
random_batch_data_path = root_dir + str(random_batch_value) + '/'

print("Testing with dataset {}, behaviour is {}".format(random_batch_value, behaviour_name))

x_test, y_test = prepare_test_data(random_batch_data_path, random_behaviour_value)

# Prepend a batch axis of size 1 so the sample matches the model input shape
x_test_reshaped = x_test[np.newaxis, ...]

# Same for the label, to match the expected shape of the labels
y_test_reshaped = y_test[np.newaxis, ...]


Testing with dataset 127, behaviour is makefau9


In [49]:
%%time
import numpy as np
from tensorflow.keras.models import load_model

# Weighted vote of the top models: each model votes for its most likely
# class, and the vote counts as that class probability times the model weight.
predictions = {}

# top_5_model_weights is aligned with top_5_models_idx, so iterate both together
for model_idx, model_weight in zip(top_5_models_idx, top_5_model_weights):
    model = load_model('/content/drive/My Drive/test_models2/model_' + str(model_idx) + '.h5', compile=False)
    # the batch holds a single sample; take its row of class probabilities
    probabilities = model.predict_on_batch(x_test_reshaped)[0]
    # plain int so the label lookup compares scalars, not a 1-element array
    predicted_class = int(np.argmax(probabilities))
    predicted_class_probability = probabilities[predicted_class] * model_weight
    predicted_label = get_key_by_value(behaviours, predicted_class)
    # accumulate the weighted votes per label
    predictions[predicted_label] = predictions.get(predicted_label, 0.0) + predicted_class_probability

print("Probabilities distribution:")
print(predictions)

# the label with the largest accumulated weighted vote wins
prediction = max(predictions, key=lambda k: predictions[k])
print("Prediction is {}".format(prediction))

Probabilities distribution:
{'face_upper_right': 0.0768909808751699, 'face_upper_left': 0.028195442742592575, 'face_left': 0.02575189769268036}
Prediction is face_upper_right
CPU times: user 3.19 s, sys: 328 ms, total: 3.52 s
Wall time: 3.95 s
