In [36]:
import mediapipe as mp
import cv2
import numpy as np
import pandas as pd
import datetime

import pickle

import warnings
warnings.filterwarnings('ignore')

# Drawing helpers
mp_drawing = mp.solutions.drawing_utils
mp_pose = mp.solutions.pose

In [37]:
# Determine important landmarks for plank
IMPORTANT_LMS = [
    "NOSE",
    "LEFT_SHOULDER",
    "RIGHT_SHOULDER",
    "RIGHT_ELBOW",
    "LEFT_ELBOW",
    "RIGHT_WRIST",
    "LEFT_WRIST",
    "LEFT_HIP",
    "RIGHT_HIP",
]

# Generate all columns of the data frame

HEADERS = ["label"] # Label column

for lm in IMPORTANT_LMS:
    HEADERS += [f"{lm.lower()}_x", f"{lm.lower()}_y", f"{lm.lower()}_z", f"{lm.lower()}_v"]

In [38]:
def rescale_frame(frame, percent=50):
    '''
    Rescale a frame from OpenCV to a certain percentage compare to its original frame
    '''
    width = int(frame.shape[1] * percent/ 100)
    height = int(frame.shape[0] * percent/ 100)
    dim = (width, height)
    return cv2.resize(frame, dim, interpolation =cv2.INTER_AREA)


def save_frame_as_image(frame, message: str = None):
    '''
    Save a frame as image to display the error
    '''
    now = datetime.datetime.now()

    if message:
        cv2.putText(frame, message, (50, 150), cv2.FONT_HERSHEY_COMPLEX, 0.4, (0, 0, 0), 1, cv2.LINE_AA)
        
    print("Saving ...")
    cv2.imwrite(f"../data/logs/bicep_{now}.jpg", frame)


def calculate_angle(point1: list, point2: list, point3: list) -> float:
    '''
    Calculate the angle between 3 points
    Unit of the angle will be in Degree
    '''
    point1 = np.array(point1)
    point2 = np.array(point2)
    point3 = np.array(point3)

    # Calculate algo
    angleInRad = np.arctan2(point3[1] - point2[1], point3[0] - point2[0]) - np.arctan2(point1[1] - point2[1], point1[0] - point2[0])
    angleInDeg = np.abs(angleInRad * 180.0 / np.pi)

    angleInDeg = angleInDeg if angleInDeg <= 180 else 360 - angleInDeg
    return angleInDeg


def extract_important_keypoints(results, important_landmarks: list) -> list:
    '''
    Extract important keypoints from mediapipe pose detection
    '''
    landmarks = results.pose_landmarks.landmark

    data = []
    for lm in important_landmarks:
        keypoint = landmarks[mp_pose.PoseLandmark[lm].value]
        data.append([keypoint.x, keypoint.y, keypoint.z, keypoint.visibility])
    
    return np.array(data).flatten().tolist()

In [39]:
class BicepPoseAnalysis:
    def __init__(self, side: str, stage_down_threshold: float, stage_up_threshold: float, peak_contraction_threshold: float, loose_upper_arm_angle_threshold: float, visibility_threshold: float):
        # Initialize thresholds
        self.stage_down_threshold = stage_down_threshold
        self.stage_up_threshold = stage_up_threshold
        self.peak_contraction_threshold = peak_contraction_threshold
        self.loose_upper_arm_angle_threshold = loose_upper_arm_angle_threshold
        self.visibility_threshold = visibility_threshold

        self.side = side
        self.counter = 0
        self.stage = "down"
        self.is_visible = True
        self.detected_errors = {
            "LOOSE_UPPER_ARM": 0,
            "PEAK_CONTRACTION": 0,
        }

        # Params for loose upper arm error detection
        self.loose_upper_arm = False

        # Params for peak contraction error detection
        self.peak_contraction_angle = 1000
        self.peak_contraction_frame = None
        
    def get_joints(self, landmarks) -> bool:
        '''
        Check for joints' visibility then get joints coordinate
        '''
        side = self.side.upper()

        # Check visibility
        joints_visibility = [ landmarks[mp_pose.PoseLandmark[f"{side}_SHOULDER"].value].visibility, landmarks[mp_pose.PoseLandmark[f"{side}_ELBOW"].value].visibility, landmarks[mp_pose.PoseLandmark[f"{side}_WRIST"].value].visibility ]

        is_visible = all([ vis > self.visibility_threshold for vis in joints_visibility ])
        self.is_visible = is_visible

        if not is_visible:
            return self.is_visible
        
        # Get joints' coordinates
        self.shoulder = [ landmarks[mp_pose.PoseLandmark[f"{side}_SHOULDER"].value].x, landmarks[mp_pose.PoseLandmark[f"{side}_SHOULDER"].value].y ]
        self.elbow = [ landmarks[mp_pose.PoseLandmark[f"{side}_ELBOW"].value].x, landmarks[mp_pose.PoseLandmark[f"{side}_ELBOW"].value].y ]
        self.wrist = [ landmarks[mp_pose.PoseLandmark[f"{side}_WRIST"].value].x, landmarks[mp_pose.PoseLandmark[f"{side}_WRIST"].value].y ]

        return self.is_visible
    
    def analyze_pose(self, landmarks, frame):
        '''
        - Bicep Counter
        - Errors Detection
        '''
        self.get_joints(landmarks)

        # Cancel calculation if visibility is poor
        if not self.is_visible:
            return (None, None)

        # * Calculate curl angle for counter
        bicep_curl_angle = int(calculate_angle(self.shoulder, self.elbow, self.wrist))
        if bicep_curl_angle > self.stage_down_threshold:
            self.stage = "down"
        elif bicep_curl_angle < self.stage_up_threshold and self.stage == "down":
            self.stage = "up"
            self.counter += 1
        
        # * Calculate the angle between the upper arm (shoulder & joint) and the Y axis
        shoulder_projection = [ self.shoulder[0], 1 ] # Represent the projection of the shoulder to the X axis
        ground_upper_arm_angle = int(calculate_angle(self.elbow, self.shoulder, shoulder_projection))

        # * Evaluation for LOOSE UPPER ARM error
        if ground_upper_arm_angle > self.loose_upper_arm_angle_threshold:
            # Limit the saved frame
            if not self.loose_upper_arm:
                self.loose_upper_arm = True
                # save_frame_as_image(frame, f"Loose upper arm: {ground_upper_arm_angle}")
                self.detected_errors["LOOSE_UPPER_ARM"] += 1
        else:
            self.loose_upper_arm = False
        
        # * Evaluate PEAK CONTRACTION error
        if self.stage == "up" and bicep_curl_angle < self.peak_contraction_angle:
            # Save peaked contraction every rep
            self.peak_contraction_angle = bicep_curl_angle
            self.peak_contraction_frame = frame
            
        elif self.stage == "down":
            # * Evaluate if the peak is higher than the threshold if True, marked as an error then saved that frame
            if self.peak_contraction_angle != 1000 and self.peak_contraction_angle >= self.peak_contraction_threshold:
                # save_frame_as_image(self.peak_contraction_frame, f"{self.side} - Peak Contraction: {self.peak_contraction_angle}")
                self.detected_errors["PEAK_CONTRACTION"] += 1
            
            # Reset params
            self.peak_contraction_angle = 1000
            self.peak_contraction_frame = None
        
        return (bicep_curl_angle, ground_upper_arm_angle)
    
    

In [40]:
VIDEO_DEMO_PATH = "C:/Users/Rocky/Desktop/pdl_minip/myenv/bc_demo.mp4"

In [41]:
# Load input scaler
with open("C:/Users/Rocky/Desktop/pdl_minip/myenv/scaler.pkl", "rb") as f:
    input_scaler = pickle.load(f)

3.2. Detection with Deep Learning model

In [42]:
# Load model
with open("C:/Users/Rocky/Desktop/pdl_minip/myenv/bicep_dp.pkl", "rb") as f:
    DL_model = pickle.load(f)

In [8]:
import cv2
import mediapipe as mp
import numpy as np
import pandas as pd
import joblib  # Import for loading the scaler

# Initialize Mediapipe
mp_pose = mp.solutions.pose
mp_drawing = mp.solutions.drawing_utils

# Define expected columns for input
EXPECTED_COLUMNS = [
    'NOSE_x', 'NOSE_y', 'NOSE_z',
    'LEFT_SHOULDER_x', 'LEFT_SHOULDER_y', 'LEFT_SHOULDER_z',
    'RIGHT_SHOULDER_x', 'RIGHT_SHOULDER_y', 'RIGHT_SHOULDER_z',
    'RIGHT_ELBOW_x', 'RIGHT_ELBOW_y', 'RIGHT_ELBOW_z',
    'LEFT_ELBOW_x', 'LEFT_ELBOW_y', 'LEFT_ELBOW_z',
    'RIGHT_WRIST_x', 'RIGHT_WRIST_y', 'RIGHT_WRIST_z',
    'LEFT_WRIST_x', 'LEFT_WRIST_y', 'LEFT_WRIST_z',
    'LEFT_HIP_x', 'LEFT_HIP_y', 'LEFT_HIP_z',
    'RIGHT_HIP_x', 'RIGHT_HIP_y', 'RIGHT_HIP_z',
    'label'
]

def extract_important_keypoints(results):
    """ Extracts only the required pose landmarks from Mediapipe. """
    landmarks = results.pose_landmarks.landmark
    keypoints = {col: 0 for col in EXPECTED_COLUMNS[:-1]}  # Default to 0 if keypoint is missing
    
    keypoint_map = {
        'NOSE': mp_pose.PoseLandmark.NOSE,
        'LEFT_SHOULDER': mp_pose.PoseLandmark.LEFT_SHOULDER,
        'RIGHT_SHOULDER': mp_pose.PoseLandmark.RIGHT_SHOULDER,
        'RIGHT_ELBOW': mp_pose.PoseLandmark.RIGHT_ELBOW,
        'LEFT_ELBOW': mp_pose.PoseLandmark.LEFT_ELBOW,
        'RIGHT_WRIST': mp_pose.PoseLandmark.RIGHT_WRIST,
        'LEFT_WRIST': mp_pose.PoseLandmark.LEFT_WRIST,
        'LEFT_HIP': mp_pose.PoseLandmark.LEFT_HIP,
        'RIGHT_HIP': mp_pose.PoseLandmark.RIGHT_HIP,
    }
    
    for key, landmark_enum in keypoint_map.items():
        if landmarks[landmark_enum].visibility > 0.65:  # Apply visibility threshold
            keypoints[f"{key}_x"] = landmarks[landmark_enum].x
            keypoints[f"{key}_y"] = landmarks[landmark_enum].y
            keypoints[f"{key}_z"] = landmarks[landmark_enum].z

    return [keypoints[col] for col in EXPECTED_COLUMNS[:-1]]

# Load trained model and scaler
bicep_dp = joblib.load("C:/Users/Rocky/Desktop/pdl_minip/myenv/bicep_dp.pkl")  # Load trained deep learning model
input_scaler = joblib.load("C:/Users/Rocky/Desktop/pdl_minip/myenv/scaler.pkl")  # Load pre-fitted scaler

# Open webcam (0 for default camera)
cap = cv2.VideoCapture(0)

# Set resolution for webcam
cap.set(3, 640)  # Width
cap.set(4, 480)  # Height

with mp_pose.Pose(min_detection_confidence=0.5, min_tracking_confidence=0.5) as pose:
    while cap.isOpened():
        ret, image = cap.read()
        if not ret:
            break

        # Convert image to RGB
        image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        # Process with Mediapipe
        results = pose.process(image_rgb)
        if not results.pose_landmarks:
            print("No human detected")
            cv2.putText(image, "No human detected", (10, 30),
                        cv2.FONT_HERSHEY_COMPLEX, 0.6, (0, 0, 255), 2)
            cv2.imshow("Workout Analysis", image)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
            continue

        try:
            # Extract keypoints
            row = extract_important_keypoints(results)

            # Convert to DataFrame
            X = pd.DataFrame([row], columns=EXPECTED_COLUMNS[:-1])

            # Scale input using loaded scaler
            X_scaled = pd.DataFrame(input_scaler.transform(X), columns=EXPECTED_COLUMNS[:-1])

            # Predict class
            prediction = bicep_dp.predict(X_scaled)
            predicted_class = np.argmax(prediction, axis=1)[0]

            # Display classification on screen
            if predicted_class == 1:  # Assuming 1 = Correct and 0 = Incorrect
                feedback_text = "Correct"
                color = (0, 255, 0)  # Green for correct
            else:
                feedback_text = "Wrong"
                color = (0, 0, 255)  # Red for wrong

            cv2.putText(image, feedback_text, (10, 50), 
                        cv2.FONT_HERSHEY_SIMPLEX, 1, color, 2)

            # Draw pose landmarks
            mp_drawing.draw_landmarks(image, results.pose_landmarks, mp_pose.POSE_CONNECTIONS)

        except Exception as e:
            print(f"Error: {e}")

        # Show the image
        cv2.imshow("Workout Analysis", image)

        # Press 'q' to exit
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break

cap.release()
cv2.destroyAllWindows()


[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 640ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 47ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 38ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 38ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 38ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 41ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 39ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 37ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 39ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 37ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 37ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 37ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 37ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 3