Name: Hassan Mansoor

CMS: 403544

Class: BSCS12A

Deep Learning Assign 03

Imports

In [None]:
import os
import cv2
import numpy as np
from tqdm import tqdm
import matplotlib.pyplot as plt

# **Pose Estimation With MediaPipe**

In [None]:
import mediapipe as mp

In [None]:
# setup
mp_pose = mp.solutions.pose
pose = mp_pose.Pose(static_image_mode=True, model_complexity=1)
mp_drawing = mp.solutions.drawing_utils

def extract_keypoints(image):
    """
    Extract 33 keypoints from image using MediaPipe pose estimation.
    Returns a flat list of (x, y, visibility) for each keypoint or None if not detected.
    """
    results = pose.process(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))
    if results.pose_landmarks:
        keypoints = []
        for lm in results.pose_landmarks.landmark:
            keypoints.extend([lm.x, lm.y, lm.visibility])
        return keypoints
    else:
        return None


# **Process Image Sequences and Save Keypoints**

In [None]:
import glob ##
import pandas as pd

def process_sequence_folder(folder_path, label):
    """
    Processes all PNG images in a sequence folder and returns a list of extracted keypoints.
    """
    frame_data = []
    images = sorted(glob.glob(os.path.join(folder_path, '*.png')))
    for img_path in images:
        image = cv2.imread(img_path)
        keypoints = extract_keypoints(image)
        if keypoints:
            frame_data.append(keypoints)
    return frame_data if frame_data else None


Process Sequence Folders

In [None]:
def process_all_sequences(base_dir, label):
    """
    Process all sequence folders (fall or ADL) and return a list of (sequence, label).
    """
    all_data = []
    folders = sorted(os.listdir(base_dir))
    for folder_name in tqdm(folders, desc=f"Processing {label} data"):
        folder_path = os.path.join(base_dir, folder_name)
        frames = process_sequence_folder(folder_path, label)
        if frames:
            all_data.append((frames, label))
    return all_data

Run The Keypoint Extraction

In [None]:
# Define paths
fall_path = os.path.join(extract_path, "/content/UR dataset/Fall")
adl_path = os.path.join(extract_path, "/content/UR dataset/ADL")

# Extract data
fall_sequences = process_all_sequences(fall_path, label=1)  # Fall = 1
adl_sequences = process_all_sequences(adl_path, label=0)    # ADL = 0

Processing 1 data: 100%|██████████| 30/30 [03:57<00:00,  7.93s/it]
Processing 0 data: 100%|██████████| 30/30 [07:59<00:00, 15.99s/it]


In [None]:
# Combine and save as numpy file
all_sequences = fall_sequences + adl_sequences
np.save("/content/drive/MyDrive/pose_sequences.npy", np.array(all_sequences, dtype=object), allow_pickle=True)


print("✅ Keypoint extraction complete. Data saved!")

✅ Keypoint extraction complete. Data saved!


Verify if saved correctly

In [None]:
loaded_data = np.load("/content/drive/MyDrive/pose_sequences.npy", allow_pickle=True)
print(f"Loaded {len(loaded_data)} sequences")


Loaded 60 sequences


In [None]:
# verify structure of 1st seq
print("Sample sequence [0]:\n")
print(loaded_data[0])

print("Sample sequence [1]:\n")
print(loaded_data[1])

Sample sequence [0]:

[list([[0.653305172920227, 0.259857177734375, 0.9999145269393921, 0.6580387353897095, 0.24872203171253204, 0.9996861219406128, 0.6611379384994507, 0.24825410544872284, 0.9997041821479797, 0.6640633344650269, 0.24804344773292542, 0.9997161030769348, 0.6494731903076172, 0.24979424476623535, 0.9995222091674805, 0.6467543244361877, 0.25006434321403503, 0.9994189739227295, 0.6440809965133667, 0.2503795027732849, 0.9993321299552917, 0.6689975261688232, 0.2523515224456787, 0.9995618462562561, 0.641743004322052, 0.2556453347206116, 0.9981404542922974, 0.6602534651756287, 0.270072877407074, 0.9998698234558105, 0.6490887403488159, 0.2716168165206909, 0.999717652797699, 0.6986157894134521, 0.3083328604698181, 0.9999655485153198, 0.6263695359230042, 0.3215116262435913, 0.9997358918190002, 0.7165960073471069, 0.38035881519317627, 0.9921701550483704, 0.6187381148338318, 0.3930400311946869, 0.9460576176643372, 0.7165890336036682, 0.4466959536075592, 0.9822412133216858, 0.6087667

Sanity ccheck

In [None]:
print(f"Frames in first sequence: {len(loaded_data[0][0])}")
print(f"Length of keypoints in first frame: {len(loaded_data[0][0][0])}")  # Should be 99


Frames in first sequence: 123
Length of keypoints in first frame: 99


looks okay.


# **visualise some keypoints**

In [None]:
sample_img_path = "/content/UR dataset/Fall/fall-01-cam0-rgb/fall-01-cam0-rgb-001.png"

In [None]:
# Read image
image = cv2.imread(sample_img_path)
rgb_image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

# Run MediaPipe pose estimation
results = pose.process(rgb_image)

In [None]:
# Draw the pose landmarks on the image
if results.pose_landmarks:
    annotated_image = image.copy()
    mp_drawing.draw_landmarks(
        annotated_image,
        results.pose_landmarks,
        mp_pose.POSE_CONNECTIONS,
        landmark_drawing_spec=mp_drawing.DrawingSpec(color=(0,255,0), thickness=2, circle_radius=2),
        connection_drawing_spec=mp_drawing.DrawingSpec(color=(255,0,0), thickness=2, circle_radius=2)
    )

    # plot
    plt.figure(figsize=(8, 8))
    plt.imshow(cv2.cvtColor(annotated_image, cv2.COLOR_BGR2RGB))
    plt.title("Pose Keypoints Visualization")
    plt.axis('off')
    plt.show()
else:
    print("No pose detected in the selected frame.")

# **Feature Engineering**

In [None]:
import numpy as np
import math

load saved keypoints

In [None]:
def load_keypoints(file_path):
    """
    Load previously saved keypoints data.
    Args:
        file_path (str): Path to the .npy file containing keypoints.
    Returns:
        list: Loaded keypoints data.
    """
    data = np.load(file_path, allow_pickle=True)
    print(f"Loaded {len(data)} sequences from {file_path}")
    return data

reshape keypoints

In [None]:
def reshape_keypoints(flattened_keypoints):
    """
    Reshape flattened keypoints into a list of [x, y, visibility].
    Args:
        flattened_keypoints (list or np.array): Flattened keypoints for a single frame.
    Returns:
        list: Reshaped keypoints as [ [x, y, visibility], ... ].
    """
    keypoints = np.array(flattened_keypoints).reshape(-1, 3)  # Reshape to (N, 3)
    return keypoints

Joint Angles

In [None]:
def calculate_angle(a, b, c):
    """
    Calculate the angle between three points (a, b, c).
    Args:
        a, b, c (tuple): Coordinates of the points (x, y).
    Returns:
        float: Angle in degrees.
    """
    a = np.array(a[:2])  # Take only x, y
    b = np.array(b[:2])
    c = np.array(c[:2])
    ba = a - b
    bc = c - b
    cosine_angle = np.dot(ba, bc) / (np.linalg.norm(ba) * np.linalg.norm(bc))
    angle = np.arccos(np.clip(cosine_angle, -1.0, 1.0))
    return np.degrees(angle)

In [None]:
def compute_joint_angles(keypoints):
    """
    Compute joint angles for keypoints.
    Args:
        keypoints (list): List of 33 keypoints (x, y, visibility).
    Returns:
        list: Calculated joint angles.
    """
    # Define keypoint indices for joints
    # Example: Shoulder-Elbow-Wrist
    joints = [
        (11, 13, 15),  # Left arm: Shoulder-Elbow-Wrist
        (12, 14, 16),  # Right arm: Shoulder-Elbow-Wrist
        (23, 25, 27),  # Left leg: Hip-Knee-Ankle
        (24, 26, 28),  # Right leg: Hip-Knee-Ankle
    ]
    angles = []
    for joint in joints:
        a, b, c = joint
        if (
            keypoints[a][2] > 0.5 and  # Check visibility
            keypoints[b][2] > 0.5 and
            keypoints[c][2] > 0.5
        ):
            angle = calculate_angle(keypoints[a], keypoints[b], keypoints[c])
            angles.append(angle)
        else:
            angles.append(0)  # If not visible, add 0
    return angles

CoG

In [None]:
def center_of_gravity(keypoints):
    """
    Calculate the center of gravity using shoulder and hip keypoints.
    Args:
        keypoints (list): List of 33 keypoints (x, y, visibility).
    Returns:
        tuple: (x, y) coordinates of the center of gravity.
    """
    torso_indices = [11, 12, 23, 24]  # Shoulders and hips
    valid_points = [keypoints[i][:2] for i in torso_indices if keypoints[i][2] > 0.5]
    if valid_points:
        cog = np.mean(valid_points, axis=0)
    else:
        cog = (0, 0)  # Fallback value
    return cog


Velocity/Accleration

In [None]:
def compute_velocity(keypoints_t1, keypoints_t2, fps):
    """
    Compute velocity of each keypoint between two frames.
    Args:
        keypoints_t1, keypoints_t2 (list): Keypoints of frames t1 and t2.
        fps (int): Frames per second of the video.
    Returns:
        list: Velocity for each keypoint.
    """
    velocities = []
    for p1, p2 in zip(keypoints_t1, keypoints_t2):
        if p1[2] > 0.5 and p2[2] > 0.5:  # Check visibility
            velocity = np.linalg.norm(np.array(p2[:2]) - np.array(p1[:2])) * fps
            velocities.append(velocity)
        else:
            velocities.append(0)  # If not visible, add 0
    return velocities

def compute_acceleration(vel_t1, vel_t2, fps):
    """
    Compute acceleration based on velocities between two frames.
    Args:
        vel_t1, vel_t2 (list): Velocities of frames t1 and t2.
        fps (int): Frames per second of the video.
    Returns:
        list: Acceleration for each keypoint.
    """
    return [(v2 - v1) * fps for v1, v2 in zip(vel_t1, vel_t2)]


Feature Extraction

In [None]:
def extract_features(sequences, fps=30):
    """
    Extract meaningful features from keypoints sequences.
    Args:
        sequences (list): List of sequences, each containing frames of keypoints.
        fps (int): Frames per second of the video.
    Returns:
        list: Extracted features for each sequence.
    """
    all_features = []
    for frames, label in sequences:
        sequence_features = []
        velocities = [None]  # First frame has no velocity
        for i in range(1, len(frames)):
            keypoints_t1 = reshape_keypoints(frames[i - 1])
            keypoints_t2 = reshape_keypoints(frames[i])

            # Joint Angles
            angles = compute_joint_angles(keypoints_t2)

            # Center of Gravity
            cog = center_of_gravity(keypoints_t2)

            # Velocity and Acceleration
            velocity = compute_velocity(keypoints_t1, keypoints_t2, fps)
            velocities.append(velocity)
            acceleration = compute_acceleration(velocities[i - 1], velocity, fps) if i > 1 else [0] * len(velocity)

            # Combine all features
            features = angles + list(cog) + velocity + acceleration
            sequence_features.append(features)

        all_features.append((sequence_features, label))
    return all_features

save features

In [None]:
def save_features(features, file_path):
    """
    Save extracted features to a file.
    Args:
        features (list): Extracted features.
        file_path (str): Path to save the file.
    """
    np.save(file_path, np.array(features, dtype=object), allow_pickle=True)
    print(f"Features saved to {file_path}")

In [None]:
# file paths
keypoints_file = "/content/drive/MyDrive/pose_sequences.npy"
features_file = "/content/drive/MyDrive/pose_features.npy"

In [None]:
# Load keypoints
sequences = load_keypoints(keypoints_file)

Loaded 60 sequences from /content/drive/MyDrive/pose_sequences.npy


In [None]:
# Extract features
features = extract_features(sequences, fps=30)

# Save features
save_features(features, features_file)

Features saved to /content/drive/MyDrive/pose_features.npy


Verify and inspect features

In [None]:
# Inspect the structure of the first sequence
print("Sample sequence [0]:\n")
first_sequence = features[0]
if len(first_sequence) == 2:
    sequence_data, label = first_sequence
    print(f"Label: {label}")
    print(f"Number of frames in sequence: {len(sequence_data)}\n")

    # Inspect the features of the first frame
    if len(sequence_data) > 0:
        print("Features of the first frame:")
        print(sequence_data[0])
        print(f"Total features in the first frame: {len(sequence_data[0])}")
    else:
        print(" No frames found in the first sequence.")
else:
    print("Unexpected structure in the first sequence.")

Sample sequence [0]:

Label: 1
Number of frames in sequence: 122

Features of the first frame:
[np.float64(165.46534551308847), np.float64(177.6568695207616), np.float64(171.1782576305618), np.float64(175.47291294990035), np.float64(0.6628886163234711), np.float64(0.3892749175429344), np.float64(0.11291150272024622), np.float64(0.10299942541478731), np.float64(0.10092931221541285), np.float64(0.09426438151657365), np.float64(0.0936232858924365), np.float64(0.08579124716540454), np.float64(0.0748366947737068), np.float64(0.06664424435716014), np.float64(0.026974312088375054), np.float64(0.10171294779760075), np.float64(0.09329950845632477), np.float64(0.017776317352021526), np.float64(0.015777561644690054), np.float64(0.02484559752036324), np.float64(0.11892463795332328), np.float64(0.045702349912542004), np.float64(0.08737347441489583), np.float64(0.08350327637441009), np.float64(0.025827888960580133), np.float64(0.07232321748168018), np.float64(0.0173413788542151), np.float64(0.052086

In [None]:
# sanity check# Perform sanity checks
if len(features) > 0 and len(first_sequence) == 2:
    print("The structure of the features file looks correct.")
else:
    print("The structure of the features file seems incorrect. Please check.")

The structure of the features file looks correct.


# ***Why This Output Is Good?***


The structure of the output matches the expected format:
Sequence Data: A list of feature vectors for each frame.
Label: Indicates whether the sequence is a fall (1) or not (0).
The number of features is consistent across frames (72 features per frame).
The extracted features are meaningful and useful for training a machine learning model.

# **Creating Temporal Sequences**

Since the sequences already contain individual frame-level features, we need to group them into temporal sequences. In the context of fall detection, a temporal sequence would mean a fixed-length window of features that represent the keypoints of a person over time. We’ll use a sliding window approach to create these sequences.

For example, let’s use a window size of 5 frames. Each window will contain 5 consecutive frames of features, and each window will be labeled according to whether the person is performing a fall (1) or an Activity of Daily Life (ADL) (0).

In [85]:
def create_temporal_sequences(features, window_size=5):
    """
    Create temporal sequences from a list of frame-level features.
    Args:
        features (list): List of feature vectors for each sequence.
        window_size (int): Number of frames in each temporal window.
    Returns:
        list: Temporal sequences, each containing features of a window.
    """
    temporal_sequences = []
    labels = []

    for sequence_data, label in features:
        for i in range(len(sequence_data) - window_size + 1):
            window = sequence_data[i:i + window_size]  # A window of 'window_size' frames
            temporal_sequences.append(window)
            labels.append(label)

    return np.array(temporal_sequences), np.array(labels)

In [86]:
# create temporal sequences
temporal_sequences, labels = create_temporal_sequences(features, window_size=5)

In [87]:
# inspiect/verify
print("Sample temporal sequence [0]:")
print(temporal_sequences[0])
print("Label:", labels[0])

Sample temporal sequence [0]:
[[ 1.65465346e+02  1.77656870e+02  1.71178258e+02  1.75472913e+02
   6.62888616e-01  3.89274918e-01  1.12911503e-01  1.02999425e-01
   1.00929312e-01  9.42643815e-02  9.36232859e-02  8.57912472e-02
   7.48366948e-02  6.66442444e-02  2.69743121e-02  1.01712948e-01
   9.32995085e-02  1.77763174e-02  1.57775616e-02  2.48455975e-02
   1.18924638e-01  4.57023499e-02  8.73734744e-02  8.35032764e-02
   2.58278890e-02  7.23232175e-02  1.73413789e-02  5.20864075e-02
   5.05928164e-02  1.29904975e-02  3.83343011e-02  1.04875448e-01
   3.13046344e-02  1.99386791e-01  2.31284250e-01  0.00000000e+00
   0.00000000e+00  2.24627876e-01  1.90982267e-02  0.00000000e+00
   0.00000000e+00  0.00000000e+00  0.00000000e+00  0.00000000e+00
   0.00000000e+00  0.00000000e+00  0.00000000e+00  0.00000000e+00
   0.00000000e+00  0.00000000e+00  0.00000000e+00  0.00000000e+00
   0.00000000e+00  0.00000000e+00  0.00000000e+00  0.00000000e+00
   0.00000000e+00  0.00000000e+00  0.00000000e

This looks perfect!!

# Building a Classifier

Neural Network Architecture:

We use two Dense layers with ReLU activation followed by a sigmoid output layer for binary classification (fall vs. ADL).

In [88]:
import tensorflow as tf
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import classification_report

In [89]:
# Reshape the input for the neural network (number of samples, number of frames, number of features)
X = temporal_sequences.reshape(temporal_sequences.shape[0], -1)  # flatten

In [90]:
# split into train/test split
X_train, X_test, y_train, y_test = train_test_split(X, labels, test_size=0.2, random_state=42)

# normalize the features
scaler = StandardScaler()
X_train = scaler.fit_transform(X_train)
X_test = scaler.transform(X_test)

Build the network

In [91]:
model = tf.keras.Sequential([
    tf.keras.layers.InputLayer(input_shape=(X_train.shape[1],)),  # Flattened feature vector
    tf.keras.layers.Dense(128, activation='relu'),  # First dense layer
    tf.keras.layers.Dense(64, activation='relu'),   # Second dense layer
    tf.keras.layers.Dense(1, activation='sigmoid')  # Output layer (binary classification)
])

# Compile the model
model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])



In [92]:
# Train the model
history = model.fit(X_train, y_train, epochs=10, batch_size=32, validation_data=(X_test, y_test))

Epoch 1/10
[1m165/165[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m7s[0m 8ms/step - accuracy: 0.7626 - loss: 0.5337 - val_accuracy: 0.8621 - val_loss: 0.3557
Epoch 2/10
[1m165/165[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 6ms/step - accuracy: 0.8945 - loss: 0.2951 - val_accuracy: 0.8962 - val_loss: 0.3286
Epoch 3/10
[1m165/165[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 6ms/step - accuracy: 0.9276 - loss: 0.2297 - val_accuracy: 0.9023 - val_loss: 0.2725
Epoch 4/10
[1m165/165[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 6ms/step - accuracy: 0.9451 - loss: 0.1565 - val_accuracy: 0.9136 - val_loss: 0.2683
Epoch 5/10
[1m165/165[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 5ms/step - accuracy: 0.9549 - loss: 0.1267 - val_accuracy: 0.9242 - val_loss: 0.2426
Epoch 6/10
[1m165/165[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 5ms/step - accuracy: 0.9688 - loss: 0.0965 - val_accuracy: 0.9242 - val_loss: 0.2284
Epoch 7/10
[1m165/165[0m 

# **Evaluate**

In [93]:
y_pred = (model.predict(X_test) > 0.5).astype("int32")  # classifying threshold

# Print classification report
print("Classification Report:")
print(classification_report(y_test, y_pred))

[1m42/42[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 10ms/step
Classification Report:
              precision    recall  f1-score   support

           0       0.96      0.94      0.95       847
           1       0.89      0.93      0.91       473

    accuracy                           0.93      1320
   macro avg       0.93      0.93      0.93      1320
weighted avg       0.93      0.93      0.93      1320



# **Results:**

The classification model achieved a high overall accuracy of 93% on the fall detection task. It demonstrated strong performance on both classes, with a precision of 96% for non-fall (class 0) and 89% for fall (class 1). The model also maintained a balanced recall of 94% and 93%, respectively, indicating effective detection of fall events while minimizing false alarms. The macro and weighted averages further confirm consistent performance across both classes.

----------------------- end