In [1]:
import cv2
import numpy as np
import os
from matplotlib import pyplot as plt
import time
import mediapipe as mp

from tensorflow.keras.models import load_model

In [2]:
# Path for exported data, numpy arrays
EXPORT_PATH = os.path.join('MP_Data') 

# Actions that we try to detect
ACTIONS = np.array(['stand', 'squat', 'sit', 'run'])
NUM_VIDEOS = 30
FRAMES_PER_VIDEO = 30

In [3]:
# Initialize MediaPipe Pose
mp_pose = mp.solutions.pose
pose = mp_pose.Pose(static_image_mode=False, min_detection_confidence=0.5, min_tracking_confidence=0.5)

In [4]:
# Function to process image and detect pose landmarks using MediaPipe
def mediapipe_detection(image, model):
    # Convert image to RGB format
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    # Make the image unwriteable for processing
    image.flags.writeable = False
    # Process the image using the MediaPipe Pose model
    results = model.process(image)
    # Make the image writeable again
    image.flags.writeable = True
    # Convert the image back to BGR format
    image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
    return image, results

In [5]:
# Function to extract pose keypoints from MediaPipe results
def extract_keypoints(results):
    # Check if pose landmarks are detected
    if not results.pose_landmarks:
        # Return zeros for all keypoints if no landmarks detected
        return np.zeros(33 * 4)
    # Extract x, y, z, visibility for each landmark and flatten the array
    keypoints = np.array([[landmark.x, landmark.y, landmark.z, landmark.visibility] for landmark in results.pose_landmarks.landmark])
    flattened_keypoints = keypoints.flatten()
    # Ensure a fixed length of keypoints array
    if flattened_keypoints.shape[0] != 33 * 4:
        flattened_keypoints = np.zeros(33 * 4)
    return flattened_keypoints

In [6]:
# Load the saved model
model = load_model('posture_recognition_model.keras')

In [7]:
colors = [(245,117,16), (117,245,16), (16,117,245)]
# Function to visualize probabilities and actions on the frame
def prob_viz(res, actions, input_frame, colors):
    output_frame = input_frame.copy()
    num_colors = len(colors)
    
    for num, prob in enumerate(res):
        # Determine color index based on number of actions and colors
        color_index = num % num_colors
        # Draw rectangle with probability bar and action label
        cv2.rectangle(output_frame, (0, 120 + num * 40), (int(prob * 100), 150 + num * 40), colors[color_index], -1)
        cv2.putText(output_frame, f'{actions[num]}: {prob:.2f}', (10, 145 + num * 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)
    
    return output_frame

In [12]:
# Initialize variables for tracking sequences, predictions, and activity durations
sequence = []
predictions = []
activity_timers = {'stand': 0, 'run': 0, 'squat': 0, 'sit': 0, 'Others': 0}
# not_in_position_timer = 0;
is_first_time = False
threshold = 0.8

# Define the indices of the key points you want to check for visibility
keypoint_indices = list(range(0, 32, 1))
min_visibility_score = 0.7


# Start processing frames and tracking activities
with mp_pose.Pose(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
    # Start capturing video from the camera
    cap = cv2.VideoCapture(0)
    # Set the duration for tracking (1 minute = 60 seconds)
    tracking_duration = 60 

    # Main loop for processing frames
    while cap.isOpened():
        # Read feed
        ret, frame = cap.read()
        if not ret:
            print("Error reading frame")
            break
    
        # Make detections
        image, results = mediapipe_detection(frame, holistic)
        
        # Inside the main loop for processing frames
        if results.pose_landmarks and all(results.pose_landmarks.landmark[i].visibility > min_visibility_score for i in keypoint_indices):
        # if results.pose_landmarks:
            mp.solutions.drawing_utils.draw_landmarks(
                frame, results.pose_landmarks, mp.solutions.pose.POSE_CONNECTIONS)

            # Prediction logic
            keypoints = extract_keypoints(results)
            sequence.append(keypoints)
            sequence = sequence[-30:]
            
            if len(sequence) == 30:
                if(is_first_time == False):
                    is_first_time = True
                    start_time = time.time()

                res = model.predict(np.expand_dims(sequence, axis=0))[0]
                predictions.append(np.argmax(res))
                
                if np.unique(predictions[-10:])[0] == np.argmax(res):
                    if res[np.argmax(res)] > threshold:
                        activity = ACTIONS[np.argmax(res)]
                        activity_timers[activity] += 1
                    else:
                       # Logic for labeling "Others" if below threshold
                        activity_timers["Others"] += 1
                else:
                    activity_timers["Others"] += 1  # Label as "Others" if predictions are inconsistent
                        
                # Visualize probabilities and actions on the frame
                frame = prob_viz(res, ACTIONS, frame, colors)
        else:
            # not_in_position_timer += 1 if is_first_time else 0
            cv2.putText(frame, 'Please adjust your position to be', (70, 200), cv2.FONT_HERSHEY_SIMPLEX, 1, (150, 150, 255), 4, cv2.LINE_AA)
            cv2.putText(frame, 'to be fully visible in the frame', (90, 230), cv2.FONT_HERSHEY_SIMPLEX, 1, (150, 150, 255), 4, cv2.LINE_AA)

        # Update activity timers based on predictions
        current_time = time.time()
        elapsed_time = current_time - start_time if is_first_time else 0
        if elapsed_time >= tracking_duration:
            # Stop processing frames after tracking duration ends
            break
        
        # Update and display the time
        current_time = time.time()
        elapsed_time = current_time - start_time if is_first_time else 0
        time_text = f"Time: {elapsed_time:.2f} sec"
        cv2.putText(frame, time_text, (10, frame.shape[0] - 30),
        cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 1, cv2.LINE_AA)
    
        # Show activity durations in real-time
        activity_text = ', '.join([f"{action}: {duration/8:.2f} sec" for action, duration in activity_timers.items()])
        activity_lines = activity_text.split(', ')
        y_pos = 30
        for line in activity_lines:
            cv2.putText(frame, line, (10, y_pos), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 0), 1, cv2.LINE_AA)
            y_pos += 20  # Increase y-position for next line 
            
        # Show frame to screen
        cv2.imshow('Activity Tracking', frame)
    
        # Break gracefully
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
    
    # Release the camera and close all windows
    cap.release()
    cv2.destroyAllWindows()

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 36ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 37ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 39ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 36ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 42ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 38ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 37ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 38ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 40ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 41ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 38ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 32ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 32ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 32

In [11]:
# Display activity durations after 1 minute
for activity, duration in activity_timers.items():
    print(f"{activity.capitalize()}: {duration/8:.2f} seconds")

Stand: 15.50 seconds
Run: 15.38 seconds
Squat: 13.38 seconds
Sit: 0.00 seconds
Others: 2.25 seconds


In [17]:
# Camera Testing
cap = cv2.VideoCapture(0)  # 0 for default camera, you can change this if needed

with mp_pose.Pose(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            print("Error reading frame")
            break
    
        # Make detections
        image, results = mediapipe_detection(frame, holistic)
        print(results)
            
        # If landmarks are detected, draw them on the frame
        if results.pose_landmarks:
            mp.solutions.drawing_utils.draw_landmarks(
                frame, results.pose_landmarks, mp_pose.POSE_CONNECTIONS)
    
        # Display the frame
        cv2.imshow('Movement Posture Judgement', frame)
    
        # Press 'q' to exit
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
    
    # Release the camera and close all windows
    cap.release()
    cv2.destroyAllWindows()

<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.soluti

In [18]:
# Check the number of landmarks
num_landmarks = len(results.pose_landmarks.landmark)
print(f"Number of landmarks detected: {num_landmarks}")

Number of landmarks detected: 33


In [19]:
    # If landmarks are detected, extract keypoint values
    if results.pose_landmarks:
        keypoint_values = []
        for landmark in results.pose_landmarks.landmark:
            keypoint_values.append([landmark.x, landmark.y, landmark.z, landmark.visibility])
        
        # Print keypoint values (optional)
        print("Keypoint Values:")
        for i, keypoint in enumerate(keypoint_values):
            print(f"Keypoint {i+1}: {keypoint}")

Keypoint Values:
Keypoint 1: [0.7456334829330444, 0.6599693298339844, -2.2478208541870117, 0.9996736645698547]
Keypoint 2: [0.788932740688324, 0.565924882888794, -2.2497165203094482, 0.9992858171463013]
Keypoint 3: [0.8137670755386353, 0.5601624250411987, -2.2489633560180664, 0.9990801811218262]
Keypoint 4: [0.8362611532211304, 0.5537697076797485, -2.2496798038482666, 0.9988441467285156]
Keypoint 5: [0.7104251980781555, 0.5603903532028198, -2.274379253387451, 0.9996227025985718]
Keypoint 6: [0.6795049905776978, 0.5514631271362305, -2.274228096008301, 0.9996753931045532]
Keypoint 7: [0.6486529111862183, 0.5417776703834534, -2.274481773376465, 0.9997235536575317]
Keypoint 8: [0.848926842212677, 0.5226274728775024, -1.7987562417984009, 0.9990894794464111]
Keypoint 9: [0.5763293504714966, 0.5127745270729065, -1.9069585800170898, 0.9999147057533264]
Keypoint 10: [0.7599086761474609, 0.7376497983932495, -2.03947114944458, 0.9997648000717163]
Keypoint 11: [0.6828498840332031, 0.73699432611465

In [23]:
result_test = extract_keypoints(results)
result_test

0.6599693298339844

In [11]:
len(result_test)

132

In [6]:
for action in ACTIONS: 
    for sequence in range(NUM_VIDEOS):
        try: 
            os.makedirs(os.path.join(EXPORT_PATH, action, str(sequence)))
        except:
            pass

In [7]:
# Collect Keypoint Values for Training and Testing
cap = cv2.VideoCapture(0)

with mp_pose.Pose(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
    for action in ACTIONS:
        for video_num in range(NUM_VIDEOS):
            for frame_num in range(FRAMES_PER_VIDEO):
                ret, frame = cap.read()
                if not ret:
                    print("Error reading frame")
                    break
    
                # Make detections
                image, results = mediapipe_detection(frame, holistic)
    
                # If landmarks are detected, draw them on the frame
                if results.pose_landmarks:
                    mp.solutions.drawing_utils.draw_landmarks(
                        frame, results.pose_landmarks, mp_pose.POSE_CONNECTIONS)
    
                # Display the frame with information
                if frame_num == 0:
                    cv2.putText(frame, 'STARTING COLLECTION', (120, 200),
                                cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 4, cv2.LINE_AA)
                    cv2.putText(frame, 'Collecting frames for {} Video Number {}'.format(action, video_num), (15, 12),
                                cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 0, 255), 2, cv2.LINE_AA)
                    # Show to screen
                    cv2.imshow('OpenCV Feed', frame)
                    cv2.waitKey(3000)
                else:
                    cv2.putText(frame, 'Collecting frames for {} Video Number {}'.format(action, video_num), (15, 12),
                                cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 0, 255), 2, cv2.LINE_AA)
                    # Show to screen
                    cv2.imshow('OpenCV Feed', frame)
                    
                # Extract keypoints and add to data
                keypoints = extract_keypoints(results)
                npy_path = os.path.join(EXPORT_PATH, action, str(video_num), str(frame_num))
                np.save(npy_path, keypoints)
    
                # Press 'q' to stop recording
                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break
        
            print(f"Saved {action} video {video_num}")
    
    cv2.destroyAllWindows()
    cap.release()

Saved stand video 0
Saved stand video 1
Saved stand video 2
Saved stand video 3
Saved stand video 4
Saved stand video 5
Saved stand video 6
Saved stand video 7
Saved stand video 8
Saved stand video 9
Saved stand video 10
Saved stand video 11
Saved stand video 12
Saved stand video 13
Saved stand video 14
Saved stand video 15
Saved stand video 16
Saved stand video 17
Saved stand video 18
Saved stand video 19
Saved stand video 20
Saved stand video 21
Saved stand video 22
Saved stand video 23
Saved stand video 24
Saved stand video 25
Saved stand video 26
Saved stand video 27
Saved stand video 28
Saved stand video 29
Saved squat video 0
Saved squat video 1
Saved squat video 2
Saved squat video 3
Saved squat video 4
Saved squat video 5
Saved squat video 6
Saved squat video 7
Saved squat video 8
Saved squat video 9
Saved squat video 10
Saved squat video 11
Saved squat video 12
Saved squat video 13
Saved squat video 14
Saved squat video 15
Saved squat video 16
Saved squat video 17
Saved squat 

In [9]:
from sklearn.model_selection import train_test_split
from tensorflow.keras.utils import to_categorical

In [10]:
label_map = {label:num for num, label in enumerate(ACTIONS)}
sequences, labels = [], []
for action in ACTIONS:
    for sequence in range(NUM_VIDEOS):
        window = []
        for frame_num in range(FRAMES_PER_VIDEO):
            res = np.load(os.path.join(EXPORT_PATH, action, str(sequence), "{}.npy".format(frame_num)))
            window.append(res)
        sequences.append(window)
        labels.append(label_map[action])

In [11]:
X = np.array(sequences)
y = to_categorical(labels).astype(int)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)

In [12]:
# np.array(sequences).shape
# X.shape
# X_train.shape
# y_test.shape
ACTIONS.shape[0]

4

In [13]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout
from tensorflow.keras.callbacks import TensorBoard

In [14]:
log_dir = os.path.join('Logs')
tb_callback = TensorBoard(log_dir=log_dir)

In [20]:
# Define LSTM model
model = Sequential()
model.add(LSTM(64, return_sequences=True, activation='relu', input_shape=(30,132)))
model.add(LSTM(128, return_sequences=True, activation='relu'))
model.add(LSTM(64, return_sequences=False, activation='relu'))
model.add(Dense(64, activation='relu'))
model.add(Dense(32, activation='relu'))
model.add(Dense(ACTIONS.shape[0], activation='softmax'))

# ACTIONS[np.argmax(res)]

# Compile the model
model.compile(optimizer='Adam', loss='categorical_crossentropy', metrics=['categorical_accuracy'])
# Train the model
model.fit(X_train, y_train, epochs=1000, callbacks=[tb_callback])

# Evaluate the model on the test set
test_loss, test_accuracy = model.evaluate(X_test, y_test)
print("Test Loss:", test_loss)
print("Test Accuracy:", test_accuracy)

Epoch 1/1000
[1m3/3[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m6s[0m 31ms/step - categorical_accuracy: 0.2669 - loss: 1.3842
Epoch 2/1000
[1m3/3[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 25ms/step - categorical_accuracy: 0.6224 - loss: 1.3196
Epoch 3/1000
[1m3/3[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 25ms/step - categorical_accuracy: 0.4219 - loss: 1.0679
Epoch 4/1000
[1m3/3[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 25ms/step - categorical_accuracy: 0.6185 - loss: 0.9721
Epoch 5/1000
[1m3/3[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 25ms/step - categorical_accuracy: 0.5964 - loss: 0.7879
Epoch 6/1000
[1m3/3[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 24ms/step - categorical_accuracy: 0.6510 - loss: 0.5782
Epoch 7/1000
[1m3/3[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 25ms/step - categorical_accuracy: 0.7253 - loss: 0.5701
Epoch 8/1000
[1m3/3[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 25ms/ste

In [21]:
model.summary()

In [25]:
# Save the trained model in native Keras format
model.save('posture_recognition_model.keras')

In [36]:
results.pose_landmarks

landmark {
  x: 0.43521938
  y: 0.07314029
  z: -0.5101496
  visibility: 0.9996856
}
landmark {
  x: 0.44927162
  y: 0.04794037
  z: -0.49521646
  visibility: 0.99933124
}
landmark {
  x: 0.45647874
  y: 0.04822528
  z: -0.49511948
  visibility: 0.9993592
}
landmark {
  x: 0.46275038
  y: 0.048444238
  z: -0.49513945
  visibility: 0.9992294
}
landmark {
  x: 0.42723665
  y: 0.045145776
  z: -0.49544796
  visibility: 0.99948394
}
landmark {
  x: 0.41971678
  y: 0.04394225
  z: -0.49550533
  visibility: 0.99956095
}
landmark {
  x: 0.41244507
  y: 0.04290081
  z: -0.49555263
  visibility: 0.9995564
}
landmark {
  x: 0.46801862
  y: 0.051597122
  z: -0.35159343
  visibility: 0.99905205
}
landmark {
  x: 0.40348706
  y: 0.047095038
  z: -0.36420593
  visibility: 0.99958295
}
landmark {
  x: 0.4452041
  y: 0.10023974
  z: -0.4551972
  visibility: 0.99972284
}
landmark {
  x: 0.41967925
  y: 0.096930094
  z: -0.45687485
  visibility: 0.999806
}
landmark {
  x: 0.5173149
  y: 0.19976659
  z: 