## Setup (if running in a non-Colab environment)
Begin by installing the necessary libraries; they are imported in the next section.

In [None]:
!pip install opencv-python
!pip install tensorflow
!pip install imageio
!pip install mlxtend

## Import libraries
Import all the libraries that will be used later.

In [None]:
import os
os.environ['PYTHONHASHSEED']='0'

import numpy as np
import pandas as pd
import random
import matplotlib.pyplot as plt
from sklearn.preprocessing import StandardScaler
from sklearn.preprocessing import LabelEncoder

import cv2 # process video files
import imageio
import tensorflow as tf
import math
from google.colab import drive

Check the existing GPU devices

In [None]:
# Ask TensorFlow which GPUs are visible, then report the devices and their count.
physical_devices = tf.config.list_physical_devices('GPU')

print("GPU:", physical_devices)
print("Num GPUs:", len(physical_devices))

## Set Random Seed
Set the random seed to ensure the results are reproducible.

In [None]:
def reset_seed(seed_value):
    """Seed every random number generator this notebook relies on.

    Seeds Python's ``random`` module, NumPy, TensorFlow/Keras and PyTorch with
    the same value, and enables TensorFlow op determinism.

    Args:
        seed_value: Integer seed applied to all generators.
    """
    # Imported locally: the notebook only imports torch in a later cell, but
    # this function is already called before that cell runs.
    import torch

    random.seed(seed_value)
    np.random.seed(seed_value)
    tf.compat.v1.set_random_seed(seed_value)
    tf.random.set_seed(seed_value)
    tf.keras.utils.set_random_seed(seed_value)
    tf.config.experimental.enable_op_determinism()

    # The classifier and its DataLoader shuffling are PyTorch-based, so the
    # PyTorch generator has to be seeded as well.
    torch.manual_seed(seed_value)

# NOTE(review): `K` is not referenced anywhere in the visible part of the notebook.
from keras import backend as K
# Restrict TensorFlow to one intra-op and one inter-op thread.
session_conf = tf.compat.v1.ConfigProto(intra_op_parallelism_threads=1, inter_op_parallelism_threads=1)
# Build a TF1-compat session on the default graph with that config and
# register it as the session used by the Keras backend.
sess = tf.compat.v1.Session(graph=tf.compat.v1.get_default_graph(), config=session_conf)
tf.compat.v1.keras.backend.set_session(sess)

## Access Dataset

In [None]:
# Mount Google Drive; the CSV index and the video files are read from this folder.
drive.mount('/content/drive')
dataset_file_path = "/content/drive/My Drive/"

# Load the CSV that lists the labelled video segments.
dataset_filepath = f"{dataset_file_path}final_exam/input_data_new_2.csv"
df = pd.read_csv(dataset_filepath)

In [None]:
def convert_time_to_seconds(time_str):
    """Convert an ``"H:M:S"`` string into a total number of seconds.

    Args:
        time_str: Text made of exactly three colon-separated integer fields.

    Returns:
        int: ``hours * 3600 + minutes * 60 + seconds``.

    Raises:
        ValueError: If the text does not have exactly three fields or a field
            is not an integer.
    """
    hours, minutes, seconds = (int(field) for field in time_str.split(':'))
    return (hours * 60 + minutes) * 60 + seconds

# Derive numeric start/end offsets (in seconds) from the textual timestamps.
for source_col, target_col in (
    ("Start_Time", "start_time_seconds"),
    ("End_Time", "end_time_seconds"),
):
    df[target_col] = df[source_col].apply(convert_time_to_seconds)

# Distinct movement names; a name's position in this array is used as its class index.
class_list = df["Movement_Name"].unique()

In [None]:
# Every distinct video file referenced by the CSV.
video_list = df["File_Path"].unique()
video_list

## Video Dataset Split
Split the videos into training, validation and test sets.

In [None]:
import random

# Fraction of all videos kept for training + validation; the rest form the test set.
train_split_ratio = 0.8
# Fraction of the training + validation pool that is used for training.
train_validate_ratio = 0.75

# Shuffle a copy with a dedicated generator seeded with 42, so `video_list`
# is left untouched and re-running this cell always yields the same split.
split_rng = random.Random(42)

shuffled_videos = list(video_list)
split_rng.shuffle(shuffled_videos)

split_idx = int(len(shuffled_videos) * train_split_ratio)

# Split data into train(+validation) and test sets
train_videos_list_init = shuffled_videos[:split_idx]
test_videos_list = shuffled_videos[split_idx:]

# Shuffle the pool again before carving out the validation set.
split_rng.shuffle(train_videos_list_init)

split_idx = int(len(train_videos_list_init) * train_validate_ratio)

train_videos_list = train_videos_list_init[:split_idx]
validate_videos_list = train_videos_list_init[split_idx:]

In [None]:
# Videos assigned to the training split.
train_videos_list

In [None]:
# Videos assigned to the validation split.
validate_videos_list

In [None]:
# Videos assigned to the test split.
test_videos_list

## Extract Frame
Extract frames and preprocess the video (frame skipping, frame resizing, colour-space conversion and pixel-value normalisation).

In [None]:
#Extract frame from video
def extract_frame(path, start_time, end_time, sequence_length=24, frame_size=(150, 150)):
  """Sample evenly spaced, preprocessed frames from one time interval of a video.

  Frames between ``start_time`` and ``end_time`` are read at a fixed stride so
  that roughly ``sequence_length`` frames are taken. Each frame is converted
  from BGR to RGB, resized to ``frame_size`` and converted to float32 in
  [0, 1].

  Args:
    path: Path of the video file.
    start_time: Start of the interval, in seconds.
    end_time: End of the interval, in seconds.
    sequence_length: Target number of frames to sample from the interval.
    frame_size: ``(width, height)`` passed to ``cv2.resize``.

  Returns:
    list: The sampled frames in temporal order. The list is shorter than
    ``sequence_length`` when the interval holds fewer frames than that or the
    video ends early, and empty when the video cannot be read.

  Raises:
    ValueError: If ``sequence_length`` is smaller than 1.
  """
  if sequence_length < 1:
    raise ValueError("sequence_length must be at least 1")

  frame_list = []

  # Open the video file for reading
  video_reader = cv2.VideoCapture(path)
  try:
    # Keep fps as a float: truncating e.g. 29.97 to 29 would shift the start
    # and end frames further and further away from the requested times.
    fps = video_reader.get(cv2.CAP_PROP_FPS)
    frame_count = int(video_reader.get(cv2.CAP_PROP_FRAME_COUNT))
    start_frame = int(start_time*fps)
    end_frame = int(end_time*fps)
    interval_frame = end_frame - start_frame

    # Stride between sampled frames: interval length / sequence length, at least 1
    skip_frame_window = max(interval_frame/sequence_length,1)

    # Get the height and width of the video frames
    height = video_reader.get(cv2.CAP_PROP_FRAME_HEIGHT)
    width = video_reader.get(cv2.CAP_PROP_FRAME_WIDTH)
    print(f'fps:{fps}, frame_cnt:{frame_count} interval_frame:{interval_frame}, height:{height}, width:{width}, start_frame:{start_frame},end_frame:{end_frame},skip:{skip_frame_window}')

    # Set the video reader to the start frame
    video_reader.set(cv2.CAP_PROP_POS_FRAMES,start_frame)
    current_frame = start_frame

    # Iterate through frames in the specified interval
    while video_reader.isOpened() and math.ceil(current_frame) < end_frame:
      ret, frame = video_reader.read()
      if not ret:
        break

      # Convert the frame from BGR to RGB
      frame_rgb_opencv = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

      # Resize the frame to the requested size
      resized_frame = cv2.resize(frame_rgb_opencv, frame_size)

      # Normalize data to [0,1] and keep the frame
      frame_list.append(tf.image.convert_image_dtype(resized_frame, tf.float32))

      # Advance by the (possibly fractional) stride and seek to that position
      current_frame += skip_frame_window
      video_reader.set(cv2.CAP_PROP_POS_FRAMES, int(current_frame))
  finally:
    # Always release the capture, even if decoding or preprocessing fails
    video_reader.release()

  # Return the list of extracted frames
  return frame_list

Extract the frames and store them in X_train, X_validate and X_test, and store the corresponding labels in Y_train, Y_validate and Y_test.

In [None]:
reset_seed(42)

from tensorflow.keras.utils import to_categorical

# Movement name -> integer class index (its position in class_list).
class_index = {name: idx for idx, name in enumerate(class_list)}

# Frames and integer labels collected per split.
split_frames = {"train": [], "validate": [], "test": []}
split_labels = {"train": [], "validate": [], "test": []}

train_video_set = set(train_videos_list)
validate_video_set = set(validate_videos_list)

for index,row in df.iterrows():
    start_time = row["start_time_seconds"]
    end_time = row["end_time_seconds"]
    path = dataset_file_path + row["File_Path"] # Get the video file path
    label = row["Movement_Name"] # Get the movement label
    video_path = row["File_Path"]

    # A segment goes to the split its source video was assigned to; anything
    # that is neither a training nor a validation video is test data.
    if video_path in train_video_set:
        split_name = "train"
    elif video_path in validate_video_set:
        split_name = "validate"
    else:
        split_name = "test"

    split_labels[split_name].append(class_index[label])
    frame_list = extract_frame(path,start_time,end_time)
    split_frames[split_name].append(np.asarray(frame_list))

X_train = split_frames["train"] #Save the frame
X_validate = split_frames["validate"] #Save the frame
X_test = split_frames["test"] #Save the frame

# One-hot encode the labels. The width is fixed to the total number of classes
# so every split gets the same encoding, even if a split happens to contain no
# sample of the highest class index.
n_classes = len(class_list)

one_hot_encoded_labels_train = to_categorical(split_labels["train"], num_classes=n_classes)
Y_train = one_hot_encoded_labels_train

one_hot_encoded_labels_validate = to_categorical(split_labels["validate"], num_classes=n_classes)
Y_validate = one_hot_encoded_labels_validate

one_hot_encoded_labels_test = to_categorical(split_labels["test"], num_classes=n_classes)
Y_test = one_hot_encoded_labels_test


## Training Data Augmentation
Augmentation techniques include rotation, flipping, grayscale conversion and colour inversion.

### Flipping

In [None]:
def data_augmentation_flip_bright(X_train,Y_train):
    """Create a horizontally flipped and brightened copy of every clip.

    Each frame is mirrored left-to-right, then brightened by 75/255, with the
    result saturating at 1.

    Args:
        X_train: Iterable of clips; each clip is a sequence of frames laid out
            as (height, width, channels). Values are expected in [0, 1].
        Y_train: Labels, one per clip.

    Returns:
        tuple: ``(X_train_aug, Y_train_aug)`` - a list with one array per clip
        and a list holding the labels in the same order. The inputs are not
        modified.
    """
    brightness_increase = 75/255

    X_train_aug = []
    for frame_list in X_train:
        frame_list_aug = []
        for frame in frame_list:
            # Mirror around the vertical axis (reverse the width dimension).
            flipped_frame = np.flip(np.asarray(frame), axis=1)

            # Brighten and saturate at the top of the normalised range.
            brightened_frame = np.minimum(flipped_frame + brightness_increase, 1)

            # Keep the brightened frame, so both transformations are applied.
            frame_list_aug.append(brightened_frame)
        X_train_aug.append(np.asarray(frame_list_aug))

    Y_train_aug = list(Y_train)

    return X_train_aug,Y_train_aug

### Rotate

In [None]:
def data_augmentation_rotate_contrast(X_train,Y_train, angle=180, alpha=1.5):
    """Create a rotated, contrast-scaled copy of every clip.

    Each frame is rotated by ``angle`` degrees around its centre, multiplied by
    ``alpha`` and clipped to [0, 1].

    Args:
        X_train: Iterable of clips; each clip is a sequence of frames whose
            first two dimensions are (height, width). Values are expected in
            [0, 1].
        Y_train: Labels, one per clip.
        angle: Rotation angle in degrees.
        alpha: Contrast gain (1.0 means no change).

    Returns:
        tuple: ``(X_train_aug, Y_train_aug)`` - a list with one array per clip
        and a list holding the labels in the same order.
    """
    # Contrast gain, wrapped in a one-element array for cv2.multiply.
    contrast_gain = np.array([alpha])

    X_train_aug = []
    for frame_list in X_train:
        frame_list_aug = []
        for frame in frame_list:
            # Get image dimensions for rotation
            height, width = frame.shape[:2]

            # Rotation about the frame centre, without scaling
            rotation_matrix = cv2.getRotationMatrix2D((width / 2, height / 2), angle, 1)

            # Perform the rotation
            rotated_frame = cv2.warpAffine(frame, rotation_matrix, (width, height), flags=cv2.INTER_LINEAR)

            # Scale the contrast, then clip: the gain can push bright pixels
            # above 1.0, outside the normalised range used by the other frames.
            adjusted_frame = cv2.multiply(rotated_frame, contrast_gain)
            adjusted_frame = np.clip(adjusted_frame, 0.0, 1.0)

            frame_list_aug.append(adjusted_frame)
        X_train_aug.append(np.asarray(frame_list_aug))

    Y_train_aug = list(Y_train)

    return X_train_aug,Y_train_aug

### Grayscale

In [None]:
def data_augmentation_grayscale(X_train,Y_train):
    """Create a grayscale copy of every clip, keeping three channels.

    Args:
        X_train: Iterable of clips; each clip is a sequence of RGB frames that
            are scaled by 255 and cast to uint8 before conversion.
        Y_train: Labels, one per clip.

    Returns:
        tuple: ``(X_train_aug, Y_train_aug)`` - a list with one array per clip
        and a list holding the labels in the same order.
    """
    def _gray_as_three_channels(frame):
        # Back to 8-bit for OpenCV, convert to gray, then rescale by 1/255.
        frame_uint8 = (frame * 255).astype(np.uint8)
        gray = cv2.cvtColor(frame_uint8, cv2.COLOR_RGB2GRAY) / 255.0
        # Repeat the single gray plane three times along a new last axis.
        return np.stack([gray, gray, gray], axis=-1)

    X_train_aug = [
        np.asarray([_gray_as_three_channels(frame) for frame in frame_list])
        for frame_list in X_train
    ]
    Y_train_aug = [label for label in Y_train]

    return X_train_aug,Y_train_aug

### Color Inverted

In [None]:
def data_augmentation_inverted(X_train,Y_train):
    """Create a colour-inverted copy of every clip.

    Args:
        X_train: Iterable of clips; each clip is a sequence of frames that are
            scaled by 255 and cast to uint8 before inversion.
        Y_train: Labels, one per clip.

    Returns:
        tuple: ``(X_train_aug, Y_train_aug)`` - a list with one array per clip
        and a list holding the labels in the same order.
    """
    def _invert(frame):
        # Invert the 8-bit version of the frame, then rescale by 1/255.
        frame_uint8 = (frame * 255).astype(np.uint8)
        return cv2.bitwise_not(frame_uint8) / 255.0

    X_train_aug = [
        np.asarray([_invert(frame) for frame in frame_list])
        for frame_list in X_train
    ]
    Y_train_aug = [label for label in Y_train]

    return X_train_aug,Y_train_aug

Apply data augmentation and combine the results with the existing training data.

In [None]:
# Build three augmented copies of the training clips.
# NOTE: this cell overwrites X_train / Y_train, so running it a second time
# would augment the already-augmented data; re-run the extraction cell first.
X_train_aug_1,Y_train_aug_1 = data_augmentation_flip_bright(X_train,Y_train)
X_train_aug_2,Y_train_aug_2 = data_augmentation_rotate_contrast(X_train,Y_train)
X_train_aug_3,Y_train_aug_3 = data_augmentation_grayscale(X_train,Y_train)

# Stack each list of clips into one array.
X_train_aug_1 = np.asarray(X_train_aug_1)
X_train_aug_2 = np.asarray(X_train_aug_2)
X_train_aug_3 = np.asarray(X_train_aug_3)

# Concatenate everything in one pass (original clips first, then each
# augmentation) instead of re-copying the whole dataset after every append.
X_train = np.concatenate(
    [np.asarray(X_train), X_train_aug_1, X_train_aug_2, X_train_aug_3], axis=0
)
Y_train = np.concatenate(
    [np.asarray(Y_train), np.asarray(Y_train_aug_1), np.asarray(Y_train_aug_2), np.asarray(Y_train_aug_3)],
    axis=0,
)

## Data augmentation visualisation

In [None]:
# Second frame of the third training clip, rescaled to 8-bit for display.
ori_images = (X_train[2][1] * 255).clip(0, 255).astype(np.uint8)
plt.imshow(ori_images)

In [None]:
# Same clip and frame index, taken from the flip augmentation.
flip_images = (X_train_aug_1[2][1] * 255).clip(0, 255).astype(np.uint8)
plt.imshow(flip_images)

In [None]:
# Same clip and frame index, taken from the rotate/contrast augmentation.
rotate_images = (X_train_aug_2[2][1] * 255).clip(0, 255).astype(np.uint8)
plt.imshow(rotate_images)

In [None]:
# Same clip and frame index, taken from the grayscale augmentation.
gray_images = (X_train_aug_3[2][1] * 255).clip(0, 255).astype(np.uint8)
plt.imshow(gray_images)

# Construct Model - Using PyTorch ResNet50
To avoid overfitting, the model applies dropout, regularization and batch normalization techniques.


In [None]:
import torch.nn as nn
from torchvision.models import resnet50, ResNet50_Weights


class ResNet50_LSTM(nn.Module):
    """Per-frame ResNet50 feature extractor followed by an LSTM classifier.

    Every frame of a clip is passed through a ResNet50 without its last child
    module, the resulting per-frame feature vectors are fed to a single-layer
    LSTM, and the LSTM output at the last time step is classified by a linear
    layer (after dropout).

    Args:
        num_classes: Number of output classes.
        dropout_rate: Dropout probability applied before the linear layer.
    """

    def __init__(self, num_classes, dropout_rate=0.4):
        super().__init__()

        weights = ResNet50_Weights.DEFAULT
        backbone = resnet50(weights=weights)
        # Drop the last child module so the backbone outputs features
        # instead of class scores.
        self.resnet = nn.Sequential(*list(backbone.children())[:-1])

        self.lstm = nn.LSTM(input_size=2048, hidden_size=256, num_layers=1, batch_first=True)
        self.dropout = nn.Dropout(dropout_rate)
        self.fc = nn.Linear(256, num_classes)

    def forward(self, x):
        """Classify a batch of clips.

        Args:
            x: Tensor of shape (batch, timesteps, height, width, channels),
                i.e. channels-last frames.

        Returns:
            Tensor of shape (batch, num_classes) with one score per class.
        """
        batch_size, timesteps, H, W, C = x.size()

        # The backbone expects channels-first images. The channel axis has to
        # be moved with permute; a plain view/reshape to (N, C, H, W) would
        # only reinterpret the memory and scramble the pixels.
        c_in = x.permute(0, 1, 4, 2, 3).reshape(batch_size * timesteps, C, H, W)

        c_out = self.resnet(c_in)
        # Regroup the per-frame features into one sequence per clip.
        r_out = c_out.view(batch_size, timesteps, -1)
        r_out, (hn, cn) = self.lstm(r_out)

        # Classify from the LSTM output at the last time step.
        r_out = self.dropout(r_out[:, -1, :])
        out = self.fc(r_out)
        return out


In [None]:
from torch.utils.data import DataLoader, TensorDataset
import torch
import numpy as np



batch_size = 8

# The labels are one-hot encoded; recover the integer class indices.
Y_train_indices = np.argmax(Y_train, axis=1)
Y_validate_indices = np.argmax(Y_validate, axis=1)
Y_test_indices = np.argmax(Y_test, axis=1)


def _make_dataset(frames, label_indices):
    """Wrap clips and integer labels in a TensorDataset.

    The clips are stacked into a single NumPy array first: building a tensor
    directly from a Python list of arrays is a very slow conversion path.
    """
    frame_tensor = torch.as_tensor(np.asarray(frames), dtype=torch.float32)
    label_tensor = torch.as_tensor(np.asarray(label_indices), dtype=torch.long)
    return TensorDataset(frame_tensor, label_tensor)


train_data = _make_dataset(X_train, Y_train_indices)
validate_data = _make_dataset(X_validate, Y_validate_indices)
test_data = _make_dataset(X_test, Y_test_indices)


# Only the training batches are shuffled.
train_loader = DataLoader(train_data, shuffle=True, batch_size=batch_size)
validate_loader = DataLoader(validate_data, batch_size=batch_size)
test_loader = DataLoader(test_data, batch_size=batch_size)

## Model training
To avoid overfitting, a learning-rate reduction technique will be used.

In [None]:
def train(model, train_loader, criterion, optimizer, device):
    """Run one training epoch.

    Args:
        model: Network to optimise; switched to training mode.
        train_loader: Iterable of ``(inputs, labels)`` batches that supports ``len()``.
        criterion: Loss function called as ``criterion(outputs, labels)``.
        optimizer: Optimizer stepped once per batch.
        device: Device the batches are moved to.

    Returns:
        tuple: ``(avg_loss, accuracy)`` - the mean of the per-batch losses and
        the fraction of correctly classified samples.
    """
    model.train()
    running_loss, n_correct, n_seen = 0, 0, 0

    for batch_inputs, batch_labels in train_loader:
        batch_inputs = batch_inputs.to(device)
        batch_labels = batch_labels.to(device)

        optimizer.zero_grad()
        logits = model(batch_inputs)
        batch_loss = criterion(logits, batch_labels)
        batch_loss.backward()
        optimizer.step()

        # Statistics use the logits computed before this batch's update.
        running_loss += batch_loss.item()
        predicted = logits.detach().argmax(dim=1)
        n_seen += batch_labels.size(0)
        n_correct += (predicted == batch_labels).sum().item()

    return running_loss / len(train_loader), n_correct / n_seen

In [None]:
def validate(model, test_loader, criterion, device):
    """Evaluate the model on a data loader without updating it.

    Args:
        model: Network to evaluate; switched to evaluation mode.
        test_loader: Iterable of ``(inputs, labels)`` batches that supports ``len()``.
        criterion: Loss function called as ``criterion(outputs, labels)``.
        device: Device the batches are moved to.

    Returns:
        tuple: ``(avg_loss, accuracy)`` - the mean of the per-batch losses and
        the fraction of correctly classified samples.
    """
    model.eval()
    running_loss, n_correct, n_seen = 0, 0, 0

    # No gradients are needed for evaluation.
    with torch.no_grad():
        for batch_inputs, batch_labels in test_loader:
            batch_inputs = batch_inputs.to(device)
            batch_labels = batch_labels.to(device)

            logits = model(batch_inputs)
            running_loss += criterion(logits, batch_labels).item()

            predicted = logits.argmax(dim=1)
            n_seen += batch_labels.size(0)
            n_correct += (predicted == batch_labels).sum().item()

    return running_loss / len(test_loader), n_correct / n_seen

In [None]:
# Seed PyTorch before the model is built so that the initialisation of the new
# layers and the shuffled batch order are repeatable between runs.
torch.manual_seed(42)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
num_classes = len(class_list)
model = ResNet50_LSTM(num_classes).to(device)
criterion = nn.CrossEntropyLoss()
# weight_decay adds L2 regularisation to every parameter update.
optimizer = torch.optim.Adam(model.parameters(), lr=0.001,weight_decay=0.001)

num_epochs = 20
# Per-epoch history, used by the plots at the end of the notebook.
train_losses = []
valid_losses = []
valid_accuracies = []
train_accuracies = []

for epoch in range(num_epochs):
    train_loss, train_accuracy = train(model, train_loader, criterion, optimizer, device)
    valid_loss, valid_accuracy = validate(model, validate_loader, criterion, device)

    train_losses.append(train_loss)
    valid_losses.append(valid_loss)

    train_accuracies.append(train_accuracy)
    valid_accuracies.append(valid_accuracy)

    print(f'Epoch {epoch+1}, Train Loss: {train_loss:.4f}, Validation Loss: {valid_loss:.4f}, Validation Accuracy: {valid_accuracy:.4f}')

In [None]:
def evaluate(model, test_loader, criterion, device):
    """Evaluate the model and collect its predictions.

    Args:
        model: Network to evaluate; switched to evaluation mode.
        test_loader: Iterable of ``(data, target)`` batches that supports ``len()``.
        criterion: Loss function called as ``criterion(output, target)``.
        device: Device the batches are moved to.

    Returns:
        tuple: ``(avg_loss, accuracy, pred_list)`` - the mean of the per-batch
        losses, the fraction of correctly classified samples, and a list with
        one inner list of predicted class indices per batch.
    """
    model.eval()
    loss_sum, hits, seen = 0, 0, 0
    pred_list = []

    with torch.no_grad():
        for data, target in test_loader:
            data = data.to(device)
            target = target.to(device)

            output = model(data)
            loss_sum += criterion(output, target).item()

            pred = torch.argmax(output, dim=1)
            # One entry per batch: the predictions stay grouped by batch.
            pred_list.append(pred.tolist())
            hits += (pred == target).sum().item()
            seen += target.size(0)

    return loss_sum / len(test_loader), hits / seen, pred_list

In [None]:
from sklearn.metrics import confusion_matrix
import numpy as np

def get_all_preds_and_labels(model, data_loader, device):
    """Collect the predicted and true class indices for a whole data loader.

    Args:
        model: Network to evaluate; switched to evaluation mode.
        data_loader: Iterable of ``(data, target)`` batches.
        device: Device the batches are moved to.

    Returns:
        tuple: ``(preds, labels)`` as two NumPy arrays with one entry per sample.
    """
    collected_preds, collected_labels = [], []

    model.eval()
    with torch.no_grad():
        for batch, batch_labels in data_loader:
            batch = batch.to(device)
            batch_labels = batch_labels.to(device)

            # Predicted class = index of the highest score.
            batch_preds = model(batch).argmax(dim=1)

            collected_preds += list(batch_preds.cpu().numpy())
            collected_labels += list(batch_labels.cpu().numpy())

    return np.array(collected_preds), np.array(collected_labels)

# Get the predictions and the labels for the test set
preds, labels = get_all_preds_and_labels(model, test_loader, device)

In [None]:
# Confusion matrix of the test-set predictions (true labels first, predictions second).
conf_matrix = confusion_matrix(labels, preds)
print(conf_matrix)

In [None]:
# Final evaluation on the test set; y_pred holds the predicted class
# indices grouped per batch.
model = model.to(device)
test_loss, test_accuracy, y_pred= evaluate(model, test_loader, criterion, device)
print(f'Test Loss: {test_loss:.4f}, Test Accuracy: {test_accuracy:.4f}')

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns


# Draw the confusion matrix as an annotated heatmap.
ax = sns.heatmap(conf_matrix, annot=True, fmt='d', cmap='Blues')
ax.set_xlabel('Predicted Labels')
ax.set_ylabel('True Labels')
plt.show()

In [None]:
# Training vs. validation accuracy per epoch.
plt.figure(figsize=(12, 4))
ax = plt.subplot(1, 2, 1)
ax.plot(train_accuracies, label='Train Accuracy')
ax.plot(valid_accuracies, label='Validation Accuracy')
ax.set_title('Training and Validation Accuracy')
ax.set_xlabel('Epochs')
ax.set_ylabel('Accuracy')
ax.legend()
plt.show()

In [None]:
# Training vs. validation loss per epoch, labelled like the accuracy plot.
plt.figure(figsize=(12, 4))
plt.subplot(1, 2, 1)
plt.plot(train_losses, label='Train Loss')
plt.plot(valid_losses, label='Validation Loss')
plt.title('Training and Validation Loss')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()
plt.show()