In [179]:
import matplotlib.pyplot as plt
import pandas as pd
import torch
import torch.nn as nn

import numpy as np
import torch.optim as optim
import torch.utils.data as data
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler, LabelEncoder
import ast
from torch.utils.data import DataLoader, TensorDataset
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score


In [143]:
dataset = pd.read_csv('../landmark_extraction_mechanism/labeled_dataset.csv')

In [144]:
unlabel = pd.read_csv('../landmark_extraction_mechanism/dataset.csv')

In [145]:
unlabel['label'] = dataset['Label']
dataset = unlabel

In [146]:
def is_float(num):
    try:
        float(num)
        return True
    except ValueError:
        return False

In [147]:
dataset['landmarks'] = dataset['landmarks'].apply(lambda arr: np.array([float(n) for n in arr.split() if is_float(n)]))

### Dataset csv

Each row contains a video (so we can label the data), also the timestamp of the frame where that sequence set of landmarks was captured and 

In [149]:

# Group the data by 'video' and 'group'
grouped_data = dataset.groupby(['video', 'group'])

# Define the sequence length
sequence_length = 10

# Create lists to store the sequences and labels
sequences = []
labels = []

# Iterate over each group
for group, data in grouped_data:
    landmarks = data['landmarks'].tolist()
    group_labels = data['label'].tolist()
    
    # Create sequences of landmarks
    for i in range(len(landmarks) - sequence_length + 1):
        sequence = landmarks[i:i+sequence_length]
        sequences.append(sequence)
        labels.append(group_labels[i+sequence_length-1])

In [154]:
sequences = np.array(sequences)

In [155]:
scaler = MinMaxScaler()
normalized_sequences = np.zeros_like(sequences)

for i in range(sequences.shape[0]):
    for j in range(sequences.shape[1]):
        # Flatten the landmarks for each set within the sequence
        landmarks_flattened = np.reshape(sequences[i, j], (-1, 1))
        # Normalize the landmarks
        landmarks_normalized = scaler.fit_transform(landmarks_flattened)
        # Reshape the normalized landmarks back to the original shape
        normalized_landmarks = np.reshape(landmarks_normalized, sequences[i, j].shape)
        # Update the normalized landmarks in the sequences array
        normalized_sequences[i, j] = normalized_landmarks

In [164]:
label_encoder = LabelEncoder()
labels_encoded = label_encoder.fit_transform(labels)

In [166]:
train_X, test_X, train_y, test_y = train_test_split(normalized_sequences, labels_encoded, test_size=0.2, shuffle=True)

In [167]:
print(train_X.shape)
print(train_y)

(35, 10, 57)
[1 0 1 0 0 0 0 0 0 1 1 0 0 0 0 0 0 0 1 1 0 1 0 1 0 0 1 0 1 0 0 0 1 0 1]


In [168]:
train_X_tensor = torch.Tensor(train_X)
train_y_tensor = torch.Tensor(train_y)

In [171]:
train_dataset = TensorDataset(train_X_tensor, train_y_tensor)
train_dataloader = DataLoader(train_dataset, batch_size=16, shuffle=True)

In [169]:
class LSTMModel(nn.Module):
    def __init__(self, input_size, hidden_size, num_classes):
        super(LSTMModel, self).__init__()
        self.lstm = nn.LSTM(input_size, hidden_size, batch_first=True)
        self.dropout = nn.Dropout(0.2)
        self.fc1 = nn.Linear(hidden_size, 32)
        self.fc2 = nn.Linear(32, num_classes)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        _, (h_n, _) = self.lstm(x)
        x = self.dropout(h_n[-1])
        x = self.fc1(x)
        x = self.fc2(x)
        x = self.sigmoid(x)
        return x

In [172]:
input_size = train_X.shape[2]
hidden_size = 64
num_classes = 1
num_epochs = 10
learning_rate = 0.001

# Instantiate the model
model = LSTMModel(input_size, hidden_size, num_classes)

In [174]:
# Define the loss function and optimizer
criterion = nn.BCELoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

In [175]:

# Train the model
for epoch in range(num_epochs):
    for inputs, labels in train_dataloader:
        # Zero the gradients
        optimizer.zero_grad()

        # Forward pass
        outputs = model(inputs)
        loss = criterion(outputs.squeeze(), labels)

        # Backward pass and optimization
        loss.backward()
        optimizer.step()

    # Print the loss for every epoch
    print(f'Epoch {epoch+1}/{num_epochs}, Loss: {loss.item()}')

Epoch 1/10, Loss: 0.6634728312492371
Epoch 2/10, Loss: 0.7621971964836121
Epoch 3/10, Loss: 0.5117874145507812
Epoch 4/10, Loss: 0.48220014572143555
Epoch 5/10, Loss: 0.6094920635223389
Epoch 6/10, Loss: 0.3894810974597931
Epoch 7/10, Loss: 0.31822773814201355
Epoch 8/10, Loss: 0.30327558517456055
Epoch 9/10, Loss: 0.7119357585906982
Epoch 10/10, Loss: 0.6103209853172302


In [178]:
test_X_tensor = torch.Tensor(test_X)
test_y_tensor = torch.Tensor(test_y)

test_dataset = TensorDataset(test_X_tensor, test_y_tensor)
test_dataloader = DataLoader(test_dataset, batch_size=16, shuffle=False)  # Set shuffle to False for evaluation


In [182]:

# Set the model to evaluation mode
model.eval()

# Create lists to store the predicted labels and ground truth labels
predicted_labels = []
true_labels = []

# Iterate over the test data
for inputs, labels in test_dataloader:
    # Forward pass
    outputs = model(inputs)

    # Get the predicted labels by taking the maximum probability
    _, predicted = torch.max(outputs, dim=1)

    # Append the predicted labels and true labels to the lists
    predicted_labels.extend(predicted.tolist())
    true_labels.extend(labels.tolist())

# Convert the predicted labels and true labels to numpy arrays
predicted_labels = np.array(predicted_labels)
true_labels = np.array(true_labels)

# Calculate accuracy
accuracy = accuracy_score(true_labels, predicted_labels)

# Calculate precision, recall, and F1 score
precision = precision_score(true_labels, predicted_labels, average='weighted')
recall = recall_score(true_labels, predicted_labels, average='weighted')
f1 = f1_score(true_labels, predicted_labels, average='weighted')

# Print the metrics
print(f"Accuracy: {accuracy:.2f}")
print(f"Precision: {precision:.2f}")
print(f"Recall: {recall:.2f}")
print(f"F1 Score: {f1:.2f}")


Accuracy: 0.67
Precision: 0.44
Recall: 0.67
F1 Score: 0.53


  _warn_prf(average, modifier, msg_start, len(result))


In [183]:
model

LSTMModel(
  (lstm): LSTM(57, 64, batch_first=True)
  (dropout): Dropout(p=0.2, inplace=False)
  (fc1): Linear(in_features=64, out_features=32, bias=True)
  (fc2): Linear(in_features=32, out_features=1, bias=True)
  (sigmoid): Sigmoid()
)