AI Prediction testing with mediapipe coordinates

In [17]:
import mediapipe as mp
import cv2
import numpy as np
import os
from collections import Counter

from sklearn.ensemble import RandomForestClassifier
from sklearn.neighbors import KNeighborsClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix

import joblib
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader


In [10]:
### Import the data, or locate

# Extract frames and label them
def extract_skeleton(video):
    mp_pose = mp.solutions.pose
    pose  = mp_pose.Pose(static_image_mode=False)
    cap = cv2.VideoCapture(video)
    keypoints_sequence = []

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        result = pose.process(rgb)

        if result.pose_landmarks:
            landmarks = result.pose_landmarks.landmark
            keypoints = []
            for lm in landmarks:
                keypoints.extend((lm.x, lm.y, lm.z, lm.visibility))
            if len(keypoints) != 132:
                keypoints = [0.0] * 132
        else:
            keypoints = [0.0] * 132

        keypoints_sequence.append(keypoints)
    
    cap.release()
    pose.close()
    return np.array(keypoints_sequence)

# (Collect the angles that we thought about, might be useful)

In [14]:
def build_dataset(folder_path):
    X, y = [], []
    label_counts = Counter()
    video_stats = []

    for root, _, files in os.walk(folder_path):
        for file in files:
            if not file.lower().endswith(".mp4"):
                continue

            video_path = os.path.join(root, file)
            label = 1 if "enter" in file.lower() else 0
            skeleton = extract_skeleton(video_path)

            num_frames = skeleton.shape[0]
            video_stats.append((file, num_frames, label))
            if num_frames == 0:
                print(f"Warning: No frames extracted for {file}")
                continue

            feature = skeleton.flatten()
            X.append(feature)
            y.append(label)
            label_counts[label] += 1

    print("Label Distribution:", label_counts)
    print("Frame stats per video:")
    for name, frames, label in video_stats:
        print(f"{name}: {frames} frames, label: {label}")
        
    return np.array(X), np.array(y)

In [13]:
def train_and_evaluate_model(X, y):
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    rf = RandomForestClassifier(n_estimators=100, random_state=42)
    rf.fit(X_train, y_train)
    rf_preds = rf.predict(X_test)
    print("Random Forest:")
    print(classification_report(y_test, rf.predict(X_test)))

    knn = KNeighborsClassifier(n_neighbors=5)
    knn.fit(X_train, y_train)
    knn_preds = knn.predict(X_test)
    print("KNN:")
    print(classification_report(y_test, knn.predict(X_test)))

    joblib.dump(rf, 'rf_model.pkl')
    joblib.dump(knn, 'knn_model.pkl')

    return X_test, y_test, rf_preds, knn_preds

In [15]:
trimmed_videos_path = "C:/Users/hanna/Documents/Thesis/datainsamling/data/processed_videos/"
X, y = build_dataset(trimmed_videos_path)
X_test, y_test, rf_preds, knn_preds = train_and_evaluate_model(X, y)

Label Distribution: Counter({0: 80, 1: 79})
Frame stats per video:
front_enter_10_trimmed.mp4: 20 frames, label: 1
front_enter_11_trimmed.mp4: 20 frames, label: 1
front_enter_11_trimmed_1.mp4: 20 frames, label: 1
front_enter_12_trimmed.mp4: 20 frames, label: 1
front_enter_12_trimmed_1.mp4: 20 frames, label: 1
front_enter_13_trimmed.mp4: 20 frames, label: 1
front_enter_13_trimmed_1.mp4: 20 frames, label: 1
front_enter_14_trimmed.mp4: 20 frames, label: 1
front_enter_14_trimmed_1.mp4: 20 frames, label: 1
front_enter_15_trimmed.mp4: 20 frames, label: 1
front_enter_15_trimmed_1.mp4: 20 frames, label: 1
front_enter_16_trimmed.mp4: 20 frames, label: 1
front_enter_16_trimmed_1.mp4: 20 frames, label: 1
front_enter_17_trimmed.mp4: 20 frames, label: 1
front_enter_17_trimmed_1.mp4: 20 frames, label: 1
front_enter_18_trimmed.mp4: 20 frames, label: 1
front_enter_18_trimmed_1.mp4: 20 frames, label: 1
front_enter_19_trimmed.mp4: 20 frames, label: 1
front_enter_19_trimmed_1.mp4: 20 frames, label: 1
fro

In [16]:
# rf false classifications

# False positives
rf_fp_indices = np.where((rf_preds == 1) & (y_test == 0))[0]

# False negatives
rf_fn_indices = np.where((rf_preds == 0) & (y_test == 1))

print(f"False Positives for RF (predicted 'enter' but actually 'pass'): {len(rf_fp_indices)}")
print(f"False Negatives for RF (predicted 'pass' but actually 'enter'): {len(rf_fn_indices)}")


# knn false classifications

# False positives
knn_fp_indices = np.where((knn_preds == 1) & (y_test == 0))[0]

# False negatives
knn_fn_indices = np.where((knn_preds == 0) & (y_test == 1))

print(f"False Positives for KNN (predicted 'enter' but actually 'pass'): {len(knn_fp_indices)}")
print(f"False Negatives for KNN (predicted 'pass' but actually 'enter'): {len(knn_fn_indices)}")

False Positives for RF (predicted 'enter' but actually 'pass'): 0
False Negatives for RF (predicted 'pass' but actually 'enter'): 1
False Positives for KNN (predicted 'enter' but actually 'pass'): 4
False Negatives for KNN (predicted 'pass' but actually 'enter'): 1


In [18]:
# LSTM

def build_timeseries_dataset(folder_path, max_seq_len=100):
    X, y = [], []
    label_counts = Counter()

    for root, _, files in os.walk(folder_path):
        for file in files:
            if not file.lower().endswith(".mp4"):
                continue
            video_path = os.path.join(root, file)
            label = 1 if "enter" in os.path.basename(root).lower() else 0
            skeleton = extract_skeleton(video_path)

            # Pad or truncate
            if skeleton.shape[0] < max_seq_len:
                pad_len = max_seq_len - skeleton.shape[0]
                padding = np.zeros((pad_len, skeleton.shape[1]))
                skeleton = np.vstack([skeleton, padding])
            else:
                skeleton = skeleton[:max_seq_len]

            X.append(skeleton)
            y.append(label)
            label_counts[label] += 1
    
    print("Label distribution: ", label_counts)
    return np.array(X), np.array(y)

In [19]:
class PoseDataset(Dataset):
    def __init__(self, X, y):
        self.X = torch.tensor(X, dtype=torch.float32)
        self.y = torch.tensor(y, dtype=torch.long)

    def __len__(self):
        return len(self.y)
    
    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]


In [20]:

class LSTMClassifier(nn.Module):

    def __init__(self, input_size, hidden_size=64, num_layers=1, num_classes=2):
        super(LSTMClassifier, self).__init__()
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, num_classes)

    def forward(self, x):
        out, _ = self.lstm(x)
        out = out[:, -1, :]
        out = self.fc(out)
        return out

In [21]:
def train_lstm(X, y, epochs=20, batch_size=16, lr=0.001):
    dataset = PoseDataset(X,y)
    dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

    input_size = X.shape[2]
    model = LSTMClassifier(input_size)
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)

    model.train()
    for epoch in range(epochs):
        total_loss = 0
        for batch_x, batch_y in dataloader:
            optimizer.zero_grad()
            outputs = model(batch_x)
            loss = criterion(outputs, batch_y)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
        print(f"Epoch {epoch+1}/{epochs}, Loss: {total_loss/len(dataloader):.4f}")
    
    return model

In [22]:
def evaluate_lstm(model, X, y):
    dataset = PoseDataset(X, y)
    dataloader = DataLoader(dataset, batch_size=16, shuffle=False)

    model.eval()
    all_preds = []
    all_labels = []

    with torch.no_grad():
        for batch_x, batch_y in dataloader:
            outputs = model(batch_x)
            _, preds = torch.max(outputs, 1)
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(batch_y.cpu().numpy())

    print(classification_report(all_labels, all_preds))

In [23]:
trimmed_videos_path = "C:/Users/hanna/Documents/Thesis/datainsamling/data/processed_videos/"
X, y = build_timeseries_dataset(trimmed_videos_path, max_seq_len=100)
model = train_lstm(X, y, epochs=20)
evaluate_lstm(model, X, y)

Label distribution:  Counter({0: 80, 1: 79})
Epoch 1/20, Loss: 0.6944
Epoch 2/20, Loss: 0.6935
Epoch 3/20, Loss: 0.6935
Epoch 4/20, Loss: 0.6932
Epoch 5/20, Loss: 0.6935
Epoch 6/20, Loss: 0.6940
Epoch 7/20, Loss: 0.6932
Epoch 8/20, Loss: 0.6936
Epoch 9/20, Loss: 0.6934
Epoch 10/20, Loss: 0.6933
Epoch 11/20, Loss: 0.6931
Epoch 12/20, Loss: 0.6938
Epoch 13/20, Loss: 0.6934
Epoch 14/20, Loss: 0.6932
Epoch 15/20, Loss: 0.6932
Epoch 16/20, Loss: 0.6934
Epoch 17/20, Loss: 0.6936
Epoch 18/20, Loss: 0.6933
Epoch 19/20, Loss: 0.6936
Epoch 20/20, Loss: 0.6936
              precision    recall  f1-score   support

           0       0.50      1.00      0.67        80
           1       0.00      0.00      0.00        79

    accuracy                           0.50       159
   macro avg       0.25      0.50      0.33       159
weighted avg       0.25      0.50      0.34       159



  _warn_prf(average, modifier, f"{metric.capitalize()} is", result.shape[0])
  _warn_prf(average, modifier, f"{metric.capitalize()} is", result.shape[0])
  _warn_prf(average, modifier, f"{metric.capitalize()} is", result.shape[0])
