In [None]:
IMPORTING LIBRARIES

In [8]:
import os
import cv2
import numpy as np
import tensorflow as tf
from tensorflow import keras
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import joblib
import mediapipe as mp

In [None]:
LOADING MODEL

In [5]:
model = joblib.load("/Users/Adeena/Downloads/arm_raise_model.pkl")
print("Model loaded successfully")

Model loaded successfully


In [10]:
# Initialize MediaPipe Pose
mp_pose = mp.solutions.pose
pose = mp_pose.Pose()
mp_drawing = mp.solutions.drawing_utils

# Start webcam
cap = cv2.VideoCapture(0)

while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
        break

    # Flip for mirror effect and convert to RGB
    frame = cv2.flip(frame, 1)
    img_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    results = pose.process(img_rgb)

    if results.pose_landmarks:
        # Extract pose landmarks: 33 keypoints × x, y, z = 99 features
        landmarks = []
        for lm in results.pose_landmarks.landmark:
            landmarks.extend([lm.x, lm.y, lm.z])

        # Predict posture (no scaling)
        prediction = model.predict([landmarks])[0]
        label = "Correct" if prediction == 1 else "Incorrect"
        color = (0, 255, 0) if label == "Correct" else (0, 0, 255)

        # Draw label
        cv2.putText(frame, f"Posture: {label}", (20, 50),
                    cv2.FONT_HERSHEY_SIMPLEX, 1.2, color, 3)

        # Draw skeleton
        mp_drawing.draw_landmarks(frame, results.pose_landmarks, mp_pose.POSE_CONNECTIONS)

    # Display window
    cv2.imshow("PostureGuardian - Live Posture Checker", frame)
    if cv2.waitKey(10) & 0xFF == ord('q'):
        break

cap.release()
cv2.destroyAllWindows()
cv2.waitKey(1)

I0000 00:00:1750803058.646378 1347191 gl_context.cc:369] GL version: 2.1 (2.1 Metal - 89.4), renderer: Apple M4
W0000 00:00:1750803058.707918 1368130 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.
W0000 00:00:1750803058.719893 1368130 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.


-1

In [2]:
import cv2
import mediapipe as mp
import numpy as np
import joblib
from collections import deque
import time

# Load the trained model, scaler, and label encoder
try:
    model = joblib.load('/Users/Adeena/Downloads/posture_classifier.pkl')
    scaler = joblib.load('/Users/Adeena/Downloads/posture_scaler.pkl')
    label_encoder = joblib.load('/Users/Adeena/Downloads/label_encoder.pkl')
    print("Model loaded successfully!")
except FileNotFoundError as e:
    print(f"Error loading model files: {e}")
    print("Make sure you have trained and saved the model first.")
    exit()

# Initialize MediaPipe Pose with same parameters as training
mp_pose = mp.solutions.pose
pose = mp_pose.Pose(
    static_image_mode=False,
    model_complexity=1,
    enable_segmentation=False,
    min_detection_confidence=0.7,  # Same as training
    min_tracking_confidence=0.5
)
mp_drawing = mp.solutions.drawing_utils

def calculate_angle(a, b, c):
    """Calculate angle between three points (same as training)"""
    a, b, c = np.array(a), np.array(b), np.array(c)
    
    radians = np.arctan2(c[1] - b[1], c[0] - b[0]) - np.arctan2(a[1] - b[1], a[0] - b[0])
    angle = np.abs(radians * 180.0 / np.pi)
    
    if angle > 180.0:
        angle = 360 - angle
        
    return angle

def calculate_elevation_angle(reference_point, target_point):
    """Calculate elevation angle relative to reference point (same as training)"""
    ref, target = np.array(reference_point), np.array(target_point)
    
    dy = target[1] - ref[1]
    dx = target[0] - ref[0]
    
    angle = np.arctan2(dy, dx) * 180.0 / np.pi
    return angle

def calculate_pose_features(landmarks_3d):
    """
    Calculate the SAME features as during training
    This is CRITICAL - must match training exactly
    """
    # Convert to numpy array
    points = np.array(landmarks_3d).reshape(-1, 3)
    
    features = []
    
    # Add raw normalized coordinates (first 99 features)
    features.extend(landmarks_3d)
    
    # Calculate the SAME enhanced features as training
    try:
        # Key landmarks for arm raise
        left_shoulder = points[11]   # LEFT_SHOULDER
        left_elbow = points[13]      # LEFT_ELBOW  
        left_wrist = points[15]      # LEFT_WRIST
        right_shoulder = points[12]  # RIGHT_SHOULDER
        right_elbow = points[14]     # RIGHT_ELBOW
        right_wrist = points[16]     # RIGHT_WRIST
        left_hip = points[23]        # LEFT_HIP
        right_hip = points[24]       # RIGHT_HIP
        
        # Calculate angles (same as training)
        left_arm_angle = calculate_angle(left_shoulder, left_elbow, left_wrist)
        right_arm_angle = calculate_angle(right_shoulder, right_elbow, right_wrist)
        
        # Shoulder elevation angles
        hip_midpoint = (left_hip + right_hip) / 2
        left_shoulder_elevation = calculate_elevation_angle(hip_midpoint, left_shoulder)
        right_shoulder_elevation = calculate_elevation_angle(hip_midpoint, right_shoulder)
        
        # Additional features (same as training)
        features.extend([
            left_arm_angle, 
            right_arm_angle,
            left_shoulder_elevation, 
            right_shoulder_elevation,
            abs(left_arm_angle - right_arm_angle),  # Symmetry
            (left_shoulder_elevation + right_shoulder_elevation) / 2  # Average elevation
        ])
        
    except Exception as e:
        # If calculation fails, add zeros (same as training)
        features.extend([0, 0, 0, 0, 0, 0])
    
    return features

def is_pose_valid(landmarks):
    """Check if pose detection is reliable (same as training)"""
    key_landmarks = [11, 12, 13, 14, 15, 16, 23, 24]  # Critical points for arm raise
    
    for idx in key_landmarks:
        if idx < len(landmarks):
            lm = landmarks[idx]
            if lm.x < 0 or lm.x > 1 or lm.y < 0 or lm.y > 1:
                return False
        else:
            return False
    
    return True

# Initialize prediction smoothing
prediction_history = deque(maxlen=5)  # Keep last 5 predictions for smoothing
confidence_threshold = 0.6  # Minimum confidence for prediction

# Performance tracking
fps_counter = 0
start_time = time.time()

# Start webcam
cap = cv2.VideoCapture(0)

# Set camera properties for better performance
cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)
cap.set(cv2.CAP_PROP_FPS, 30)

print("Starting live posture detection...")
print("Press 'q' to quit, 'r' to reset prediction history")

while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
        print("Failed to read from camera")
        break
    
    # Flip for mirror effect
    frame = cv2.flip(frame, 1)
    
    # Convert to RGB for MediaPipe
    img_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    img_rgb.flags.writeable = False
    
    # Process with MediaPipe
    results = pose.process(img_rgb)
    
    # Make frame writable again
    img_rgb.flags.writeable = True
    
    if results.pose_landmarks:
        # Validate pose quality
        if is_pose_valid(results.pose_landmarks.landmark):
            # Extract landmarks the SAME way as training
            landmarks_3d = []
            for lm in results.pose_landmarks.landmark:
                landmarks_3d.extend([lm.x, lm.y, lm.z])
            
            # Calculate enhanced features (SAME as training)
            features = calculate_pose_features(landmarks_3d)
            
            # Convert to numpy array and reshape for prediction
            features_array = np.array(features).reshape(1, -1)
            
            # CRITICAL: Apply the SAME scaling as training
            features_scaled = scaler.transform(features_array)
            
            # Get prediction and probability
            prediction = model.predict(features_scaled)[0]
            prediction_proba = model.predict_proba(features_scaled)[0]
            
            # Get confidence (probability of predicted class)
            confidence = prediction_proba[prediction]
            
            # Convert prediction back to label
            label = label_encoder.inverse_transform([prediction])[0]
            
            # Add to history for smoothing
            prediction_history.append((prediction, confidence))

            # Smooth predictions (majority vote with confidence weighting)
            if len(prediction_history) >= 3:
                # Weight recent predictions more heavily - fix the weights array
                # Make sure weights array matches the possible length of prediction_history
                all_weights = np.array([0.1, 0.15, 0.2, 0.25, 0.3])  # 5 weights for maxlen=5
                weights = all_weights[-len(prediction_history):]  # Take the last N weights
    
                weighted_predictions = []
    
                for i, (pred, conf) in enumerate(prediction_history):
                    if conf > confidence_threshold:
                        # Now i will always be within bounds of weights array
                        weighted_predictions.extend([pred] * int(weights[i] * 10))
    
                if weighted_predictions:
                    # Majority vote
                    smoothed_prediction = max(set(weighted_predictions), key=weighted_predictions.count)
                    smoothed_label = label_encoder.inverse_transform([smoothed_prediction])[0]
                else:
                    smoothed_label = "Uncertain"
                    confidence = 0.5
            else:
                smoothed_label = label if confidence > confidence_threshold else "Uncertain"
            
            # Set colors based on prediction and confidence
            if smoothed_label == "Correct":
                color = (0, 255, 0)  # Green
                status_color = (0, 200, 0)
            elif smoothed_label == "Incorrect":
                color = (0, 0, 255)  # Red  
                status_color = (0, 0, 200)
            else:
                color = (0, 255, 255)  # Yellow for uncertain
                status_color = (0, 200, 200)
            
            # Draw enhanced information
            cv2.putText(frame, f"Posture: {smoothed_label}", (20, 50),
                       cv2.FONT_HERSHEY_SIMPLEX, 1.2, color, 3)
            
            cv2.putText(frame, f"Confidence: {confidence:.2f}", (20, 90),
                       cv2.FONT_HERSHEY_SIMPLEX, 0.8, status_color, 2)
            
            # Show some key angles for debugging
            if len(features) > 99:  # If we have calculated features
                left_angle = features[99]   # left_arm_angle
                right_angle = features[100] # right_arm_angle
                cv2.putText(frame, f"L.Arm: {left_angle:.1f}°", (20, 130),
                           cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 1)
                cv2.putText(frame, f"R.Arm: {right_angle:.1f}°", (20, 150),
                           cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 1)
            
            # Draw skeleton
            mp_drawing.draw_landmarks(
                frame, 
                results.pose_landmarks, 
                mp_pose.POSE_CONNECTIONS,
                landmark_drawing_spec=mp.solutions.drawing_styles.get_default_pose_landmarks_style()
            )
            
        else:
            # Pose quality too low
            cv2.putText(frame, "Pose Detection: Poor Quality", (20, 50),
                       cv2.FONT_HERSHEY_SIMPLEX, 1.0, (0, 255, 255), 2)
    else:
        # No pose detected
        cv2.putText(frame, "No Pose Detected", (20, 50),
                   cv2.FONT_HERSHEY_SIMPLEX, 1.0, (0, 255, 255), 2)
    
    # Calculate and display FPS
    fps_counter += 1
    if fps_counter % 30 == 0:  # Update every 30 frames
        elapsed_time = time.time() - start_time
        fps = fps_counter / elapsed_time
        fps_counter = 0
        start_time = time.time()
    
    try:
        cv2.putText(frame, f"FPS: {fps:.1f}", (frame.shape[1] - 120, 30),
                   cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 1)
    except:
        pass
    
    # Instructions
    cv2.putText(frame, "Press 'q' to quit, 'r' to reset", (20, frame.shape[0] - 20),
               cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1)
    
    # Display the frame
    cv2.imshow("PostureGuardian - Live Posture Checker", frame)
    
    # Handle key presses
    key = cv2.waitKey(1) & 0xFF
    if key == ord('q'):
        break
    elif key == ord('r'):
        prediction_history.clear()
        print("Prediction history reset")

# Cleanup
cap.release()
cv2.destroyAllWindows()
cv2.waitKey(1)
pose.close()
print("PostureGuardian stopped.")

https://scikit-learn.org/stable/model_persistence.html#security-maintainability-limitations
https://scikit-learn.org/stable/model_persistence.html#security-maintainability-limitations
https://scikit-learn.org/stable/model_persistence.html#security-maintainability-limitations
https://scikit-learn.org/stable/model_persistence.html#security-maintainability-limitations
I0000 00:00:1750806909.463081 1426964 gl_context.cc:369] GL version: 2.1 (2.1 Metal - 89.4), renderer: Apple M4
INFO: Created TensorFlow Lite XNNPACK delegate for CPU.
W0000 00:00:1750806909.507704 1428336 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.
W0000 00:00:1750806909.518434 1428336 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.


Model loaded successfully!
Starting live posture detection...
Press 'q' to quit, 'r' to reset prediction history


W0000 00:00:1750806910.708193 1428341 landmark_projection_calculator.cc:186] Using NORM_RECT without IMAGE_DIMENSIONS is only supported for the square ROI. Provide IMAGE_DIMENSIONS or use PROJECTION_MATRIX.


PostureGuardian stopped.
