### Testing pose detection

In [2]:
import cv2
import mediapipe as mp

In [1]:
# Initialize MediaPipe Pose solution.
mp_pose = mp.solutions.pose
pose = mp_pose.Pose()

# Initialize MediaPipe drawing utility.
mp_drawing = mp.solutions.drawing_utils

# Capture video from the default camera.
cap = cv2.VideoCapture(0)

while cap.isOpened():
    success, image = cap.read()
    if not success:
        print("Ignoring empty camera frame.")
        continue

    # Convert the BGR image to RGB.
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

    # Process the image and detect the pose.
    results = pose.process(image)

    print(results.x, results.y, results.z)

    """
    results object contains the x y and z axes values of the point that will be drawn
                                visibility number which indicated whether that point is clear
    """


    # Draw the pose annotations on the image.
    image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
    if results.pose_landmarks:
        mp_drawing.draw_landmarks(image, results.pose_landmarks, mp_pose.POSE_CONNECTIONS)

    # Display the resulting image.
    cv2.imshow('MediaPipe Pose', image)

    # Break the loop if 'q' is pressed.
    if cv2.waitKey(5) & 0xFF == ord('q'):
        break

pose.close()
cap.release()


NameError: name 'mp' is not defined

### Relevant code : 

In [2]:
import cv2
import mediapipe as mp
import numpy as np
import os
from sklearn.model_selection import train_test_split
from tqdm import tqdm

Function to get the pose from the frame

In [4]:
mp_pose = mp.solutions.pose
pose = mp_pose.Pose(static_image_mode=False, model_complexity=1, enable_segmentation=False, min_detection_confidence=0.5)

def extract_pose_data(frame):
    # Convert the color space from BGR to RGB
    frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    
    # Process the frame to detect the pose
    results = pose.process(frame_rgb)

    # Check if pose landmarks are detected
    if results.pose_landmarks:
        # Extract landmarks data
        pose_data = np.array([[lm.x, lm.y, lm.z, lm.visibility] for lm in results.pose_landmarks.landmark]).flatten()
        return pose_data
    return np.zeros(132)  # Assuming 33 landmarks each with 4 values (x, y, z, visibility)


I0000 00:00:1701150395.601538       1 gl_context.cc:344] GL version: 2.1 (2.1 Metal - 88), renderer: Apple M1 Pro


INFO: Created TensorFlow Lite XNNPACK delegate for CPU.


Generating the sequences from the dataset

In [17]:
def create_sequences(image_folder, label_folder, sequence_length):
    sequences = []
    labels = []

    image_files = sorted(os.listdir(image_folder))
    for i in tqdm(range(len(image_files) - sequence_length + 1), desc="Processing sequences"):
        sequence = []
        sequence_labels = []

        for j in range(i, i + sequence_length):
            img_path = os.path.join(image_folder, image_files[j])
            frame = cv2.imread(img_path)
            if frame is None:
                print(f"Failed to read image: {img_path}")
                break
            pose_data = extract_pose_data(frame)
            sequence.append(pose_data)

            label_path = os.path.join(label_folder, image_files[j].replace('.png', '.txt'))
            with open(label_path, 'r') as file:
                if file is None:
                    print(f"failed to read label: {label_path}")
                fall_label = int(file.read().split()[0])
                sequence_labels.append(fall_label)

        if len(sequence) == sequence_length:
            sequences.append(sequence)
            # Use the most frequent label in the sequence as the sequence label
            labels.append(max(set(sequence_labels), key=sequence_labels.count))

    return np.array(sequences), np.array(labels)

# Example usage
sequence_length = 30
sequences, labels = create_sequences("/Users/varunshankarhoskere/Desktop/Academics/PES/Capstone/Datasets/images", 
                                     "/Users/varunshankarhoskere/Desktop/Academics/PES/Capstone/Datasets/labels", 
                                     sequence_length)


Processing sequences:   0%|          | 0/8375 [00:00<?, ?it/s]

Processing sequences: 100%|██████████| 8375/8375 [1:33:57<00:00,  1.49it/s]


In [5]:
def create_image_sequences(image_folder, sequence_length):
    sequences = []
    image_files = sorted(os.listdir(image_folder))

    for i in tqdm(range(len(image_files) - sequence_length + 1), desc="Processing image sequences"):
        sequence = []

        for j in range(i, i + sequence_length):
            img_path = os.path.join(image_folder, image_files[j])
            frame = cv2.imread(img_path)
            if frame is None:
                print(f"Failed to read image: {img_path}")
                break
            pose_data = extract_pose_data(frame)
            sequence.append(pose_data)

        if len(sequence) == sequence_length:
            sequences.append(sequence)

    return np.array(sequences)

def create_label_sequences(label_folder, sequence_length, image_files):
    labels = []

    for i in tqdm(range(len(image_files) - sequence_length + 1), desc="Processing label sequences"):
        sequence_labels = []

        for j in range(i, i + sequence_length):
            label_path = os.path.join(label_folder, image_files[j].replace('.png', '.txt'))
            with open(label_path, 'r') as file:
                if file is None:
                    print(f"failed to read label: {label_path}")
                fall_label = int(file.read().split()[0])
                sequence_labels.append(fall_label)

        # Use the most frequent label in the sequence as the sequence label
        if sequence_labels:
            labels.append(max(set(sequence_labels), key=sequence_labels.count))

    return np.array(labels)

# Example usage
sequence_length = 30
image_folder = "/Users/varunshankarhoskere/Desktop/Academics/PES/Capstone/Datasets/images"
label_folder = "/Users/varunshankarhoskere/Desktop/Academics/PES/Capstone/Datasets/labels"

image_sequences = create_image_sequences(image_folder, sequence_length)
image_files = sorted(os.listdir(image_folder))  # Ensure this matches the sorting in image sequence creation
label_sequences = create_label_sequences(label_folder, sequence_length, image_files)

# Now image_sequences and label_sequences contain your data


Processing image sequences: 100%|██████████| 8375/8375 [1:33:26<00:00,  1.49it/s]
Processing label sequences: 100%|██████████| 8375/8375 [00:04<00:00, 2025.00it/s]


In [7]:
import torch
from torch.utils.data import DataLoader, TensorDataset

In [9]:
# Convert sequences and labels to PyTorch tensors
sequences_tensor = torch.tensor(image_sequences, dtype=torch.float32)
labels_tensor = torch.tensor(label_sequences, dtype=torch.long)

# Create a dataset and data loader
dataset = TensorDataset(sequences_tensor, labels_tensor)
data_loader = DataLoader(dataset, batch_size=32, shuffle=True)

# LSTM model initialization and training loop
# (Use the FallDetectionLSTM class and training loop provided earlier)


In [10]:
import torch.nn as nn

class FallDetectionLSTM(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, num_classes):
        super(FallDetectionLSTM, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, num_classes)

    def forward(self, x):
        # Initialize hidden and cell states
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
        c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)

        # Forward propagate LSTM
        out, _ = self.lstm(x, (h0, c0))  # out: tensor of shape (batch_size, seq_length, hidden_size)

        # Decode the hidden state of the last time step
        out = self.fc(out[:, -1, :])
        return out


In [11]:
from sklearn.model_selection import train_test_split

# Assuming image_sequences and label_sequences are your data
train_seqs, test_seqs, train_labels, test_labels = train_test_split(
    image_sequences, label_sequences, test_size=0.2, random_state=42
)

train_dataset = TensorDataset(torch.tensor(train_seqs, dtype=torch.float32), torch.tensor(train_labels, dtype=torch.long))
test_dataset = TensorDataset(torch.tensor(test_seqs, dtype=torch.float32), torch.tensor(test_labels, dtype=torch.long))

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)


In [12]:
import torch.optim as optim

In [15]:
def train_model(model, train_loader, criterion, optimizer, num_epochs):
    model.train()
    for epoch in range(num_epochs):
        for sequences, labels in train_loader:
            optimizer.zero_grad()
            outputs = model(sequences)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
        print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}')

def evaluate_model(model, test_loader):
    model.eval()
    total, correct = 0, 0
    with torch.no_grad():
        for sequences, labels in test_loader:
            outputs = model(sequences)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    accuracy = 100 * correct / total
    print(f'Accuracy: {accuracy:.2f}%')

input_size = 132
hidden_size = 132
num_layers = 3
num_classes = 2

# Model Initialization
model = FallDetectionLSTM(input_size, hidden_size, num_layers, num_classes)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Training and Evaluation
train_model(model, train_loader, criterion, optimizer, num_epochs=10)
evaluate_model(model, test_loader)


Epoch [1/10], Loss: 0.2466
Epoch [2/10], Loss: 0.0670
Epoch [3/10], Loss: 0.0312
Epoch [4/10], Loss: 0.1829
Epoch [5/10], Loss: 0.0271
Epoch [6/10], Loss: 0.0004
Epoch [7/10], Loss: 0.0562
Epoch [8/10], Loss: 0.0281
Epoch [9/10], Loss: 0.0881
Epoch [10/10], Loss: 0.2218
Accuracy: 98.51%
