In [4]:
import cv2
import csv
import mediapipe as mp
import numpy as np
import os
import json
class PoseDetector:
    """
    Wrapper around the MediaPipe Pose solution for single-frame pose detection.

    Typical use: call ``find_pose(frame)`` to run detection on a BGR frame,
    then ``get_landmarks(frame)`` to read the detected landmarks back as
    pixel coordinates.
    """
    def __init__(self, static_image_mode=False, model_complexity=1, smooth_landmarks=True,
                 enable_segmentation=False, smooth_segmentation=True,
                 min_detection_confidence=0.5, min_tracking_confidence=0.5):
        """
        Store the configuration and build the underlying MediaPipe Pose object.

        All arguments are forwarded unchanged to ``mp.solutions.pose.Pose``.
        """
        self.static_image_mode = static_image_mode
        self.model_complexity = model_complexity
        self.smooth_landmarks = smooth_landmarks
        self.enable_segmentation = enable_segmentation
        self.smooth_segmentation = smooth_segmentation
        self.min_detection_confidence = min_detection_confidence
        self.min_tracking_confidence = min_tracking_confidence

        # Result of the most recent find_pose() call; None until the first call
        # so get_landmarks() can be used safely before any detection has run.
        self.results = None

        self.mp_pose = mp.solutions.pose
        # Keyword arguments keep each setting bound to the right parameter even
        # if the positional order of Pose() differs between MediaPipe releases.
        self.pose = self.mp_pose.Pose(
            static_image_mode=self.static_image_mode,
            model_complexity=self.model_complexity,
            smooth_landmarks=self.smooth_landmarks,
            enable_segmentation=self.enable_segmentation,
            smooth_segmentation=self.smooth_segmentation,
            min_detection_confidence=self.min_detection_confidence,
            min_tracking_confidence=self.min_tracking_confidence,
        )
        self.mp_draw = mp.solutions.drawing_utils

    def find_pose(self, img, draw=True):
        """
        Run pose detection on a BGR image and remember the result.

        Args:
            img: Image in BGR channel order (it is converted to RGB before
                being handed to MediaPipe).
            draw: When True and a pose was found, the landmarks and their
                connections are drawn onto ``img`` in place.

        Returns:
            The same ``img`` object that was passed in.
        """
        img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        self.results = self.pose.process(img_rgb)
        if self.results.pose_landmarks and draw:
            self.mp_draw.draw_landmarks(img, self.results.pose_landmarks,
                                        self.mp_pose.POSE_CONNECTIONS)
        return img

    def get_landmarks(self, img):
        """
        Return the landmarks of the last ``find_pose`` call in pixel coordinates.

        Args:
            img: The image the pose was detected on; only its height and
                width (``img.shape[:2]``) are used to scale the landmarks.

        Returns:
            A list of ``[landmark_id, x_px, y_px]`` entries, or an empty list
            when ``find_pose`` has not been called yet or found no pose.
        """
        results = getattr(self, "results", None)
        if results is None or not results.pose_landmarks:
            return []

        # Only the first two dimensions are needed, so grayscale (2-D) frames
        # work as well as colour ones; computed once rather than per landmark.
        height, width = img.shape[:2]
        return [
            [lm_id, int(lm.x * width), int(lm.y * height)]
            for lm_id, lm in enumerate(results.pose_landmarks.landmark)
        ]

def calculate_angle(a, b, c):
    """
    Return the angle at vertex ``b`` formed by the points ``a``-``b``-``c``.

    Only the first two coordinates (x, y) of each point are used. The result
    is in degrees and always lies in the range [0, 180].
    """
    start = np.array(a)
    vertex = np.array(b)
    end = np.array(c)

    # Direction of each arm as seen from the vertex, measured from the x-axis.
    heading_to_end = np.arctan2(end[1] - vertex[1], end[0] - vertex[0])
    heading_to_start = np.arctan2(start[1] - vertex[1], start[0] - vertex[0])

    sweep = np.abs((heading_to_end - heading_to_start) * 180.0 / np.pi)

    # Report the interior angle: fold anything past a straight line back.
    return 360 - sweep if sweep > 180.0 else sweep

def _summarize_exercise(total_reps, form_issues):
    """Build the summary entry for one exercise from its rep count and flagged issues."""
    return {
        "total_reps": total_reps,
        # Exactly one issue is appended per badly-formed rep, so the length of
        # the raw list (not the number of distinct issue codes) is the number
        # of reps to subtract.
        "good_form_reps": total_reps - len(form_issues),
        # Sorted so the output order is stable between runs.
        "common_issues": sorted(set(form_issues)),
    }


def _write_reports(output_dir, frame_data, summary_block):
    """Write result.csv (per-frame rows) and summary.json into ``output_dir``."""
    os.makedirs(output_dir, exist_ok=True)

    csv_path = os.path.join(output_dir, "result.csv")
    with open(csv_path, mode='w', newline='') as csv_file:
        # With no analysed frames the CSV is still created, but left empty.
        if frame_data:
            writer = csv.DictWriter(csv_file, fieldnames=list(frame_data[0].keys()))
            writer.writeheader()
            writer.writerows(frame_data)

    summary_path = os.path.join(output_dir, "summary.json")
    with open(summary_path, 'w') as json_file:
        json.dump(summary_block, json_file, indent=4)


def analyze_video(frames, video_id):
    """
    Analyzes a list of video frames for squats and push-ups.

    Each frame is run through a PoseDetector; the left knee angle drives the
    squat counter and the left elbow angle drives the push-up counter. A rep
    is counted when the joint goes from extended (> 160 degrees) to bent
    (< 90 degrees). Frames in which no pose is detected are skipped.

    Side effects:
        Writes ``report/result.csv`` (one row per analysed frame) and
        ``report/summary.json`` (video id plus summary), creating the
        ``report`` directory if needed.

    Args:
        frames: Iterable of BGR images, as accepted by PoseDetector.find_pose.
        video_id: Identifier echoed back in the result and the JSON summary.

    Returns:
        dict with keys ``video_id``, ``summary`` (per-exercise rep counts and
        issues) and ``frame_data`` (list of per-frame dicts).
    """
    detector = PoseDetector()
    squat_counter = 0
    pushup_counter = 0
    squat_stage = None
    pushup_stage = None
    squat_form_issues = []
    pushup_form_issues = []
    frame_data = []

    try:
        for idx, frame in enumerate(frames):
            frame = detector.find_pose(frame, draw=False)
            landmarks = detector.get_landmarks(frame)
            if not landmarks:
                continue
            is_form_ok_this_frame = True

            # Each landmark is [id, x, y]; drop the id to keep just (x, y).
            # Indices are the positions in MediaPipe's landmark list.
            left_shoulder = landmarks[11][1:]
            left_elbow = landmarks[13][1:]
            left_wrist = landmarks[15][1:]
            left_hip = landmarks[23][1:]
            left_knee = landmarks[25][1:]
            left_ankle = landmarks[27][1:]

            # Squat analysis
            knee_angle = calculate_angle(left_hip, left_knee, left_ankle)
            if knee_angle > 160:
                squat_stage = "up"
            if knee_angle < 90 and squat_stage == 'up':
                squat_stage = "down"
                squat_counter += 1
                # Simplified form check on horizontal knee/ankle offset (pixels).
                # NOTE(review): this compares knee vs ankle x-position but is
                # reported as INSUFFICIENT_DEPTH - confirm the intended label.
                if abs(left_knee[0] - left_ankle[0]) > 40:
                    squat_form_issues.append("INSUFFICIENT_DEPTH")
                    is_form_ok_this_frame = False

            # Push-up analysis
            elbow_angle = calculate_angle(left_shoulder, left_elbow, left_wrist)
            if elbow_angle > 160:
                pushup_stage = "up"
            if elbow_angle < 90 and pushup_stage == 'up':
                pushup_stage = "down"
                pushup_counter += 1
                # Simplified form check: vertical hip/shoulder offset (pixels).
                hip_shoulder_offset = abs(left_hip[1] - left_shoulder[1])
                if hip_shoulder_offset > 40:
                    pushup_form_issues.append("BODY_LINE_BREAK")
                    is_form_ok_this_frame = False

            # Per-frame label: a bent knee takes priority over a bent elbow.
            # NOTE(review): this is a per-frame heuristic, so a frame can be
            # labelled with an exercise the video does not contain.
            if knee_angle < 160:
                exercise, rep_id = "squat", squat_counter
            elif elbow_angle < 160:
                exercise, rep_id = "pushup", pushup_counter
            else:
                exercise, rep_id = "none", 0

            frame_data.append({
                "frame_index": idx,
                "exercise": exercise,
                "rep_id": rep_id,
                "is_form_ok": is_form_ok_this_frame,
                "angles": {"knee": knee_angle, "elbow": elbow_angle}
            })
    finally:
        # Release the MediaPipe graph; otherwise every call (e.g. every web
        # request) leaves a Pose instance behind.
        close_pose = getattr(detector.pose, "close", None)
        if callable(close_pose):
            close_pose()

    analysis_summary = {
        "squats": _summarize_exercise(squat_counter, squat_form_issues),
        "pushups": _summarize_exercise(pushup_counter, pushup_form_issues),
    }

    summary_video_id = {"video_id": video_id,
                        "summary": analysis_summary}

    # The directory name previously carried a stray leading space (" report/"),
    # which created a folder literally named " report".
    _write_reports("report", frame_data, summary_video_id)

    return {
        "video_id": video_id,
        "summary": analysis_summary,
        "frame_data": frame_data
    }



In [None]:
def main(video_path):
    """
    Load a video from disk, analyze it, and print the results.

    Every frame of the video is read into memory, handed to
    ``analyze_video`` together with the path (used as the video id), and the
    returned dictionary is printed as indented JSON. If the file cannot be
    opened or yields no frames, an error is printed and nothing is analyzed.

    Args:
        video_path (str): The path to the video file.
    """
    capture = cv2.VideoCapture(video_path)
    if not capture.isOpened():
        print(f"Error: Could not open video file at {video_path}")
        return

    # Pull frames until read() reports failure, which marks the end of the video.
    frames = []
    ok, image = capture.read()
    while ok:
        frames.append(image)
        ok, image = capture.read()
    capture.release()

    if len(frames) == 0:
        print("Error: No frames were read from the video.")
        return

    print(f"Read {len(frames)} frames from the video. Starting analysis...")

    analysis_results = analyze_video(frames, video_path)

    # Dump the full result so it can be inspected directly in the console.
    report_text = json.dumps(analysis_results, indent=4)
    print("\n--- Analysis Complete ---")
    print(report_text)
    print("-----------------------\n")


if __name__ == "__main__":
    # Sample clip analysed when this is run directly; change the name to
    # 'Push-ups.mp4' to analyse the push-up recording instead.
    video_file_path = "Squats.mp4"
    main(video_file_path)



## Web service (Flask API layer)

In [5]:
from flask import Flask, request, jsonify
import tempfile
import cv2
import os
import json

app = Flask(__name__)

@app.route('/analyze', methods=['POST'])
def analyze():
    """
    Handle ``POST /analyze``: analyze an uploaded video and return the result.

    Expects a multipart form upload with the file in the ``video`` field.
    The upload is written to a temporary ``.mp4`` file, decoded frame by frame
    with OpenCV, and passed to ``analyze_video`` with the uploaded filename as
    the video id.

    Returns:
        The analysis result as JSON on success, or a JSON ``{'error': ...}``
        body with status 400 when the file is missing, unnamed, cannot be
        opened, or contains no frames.
    """
    # Check if video is included in the request
    if 'video' not in request.files:
        return jsonify({'error': 'No video file provided'}), 400

    video = request.files['video']

    # Check if a file was selected
    if video.filename == '':
        return jsonify({'error': 'No video file selected'}), 400

    # delete=False because OpenCV reopens the file by name; cleanup is done
    # explicitly in the finally block below.
    temp_file = tempfile.NamedTemporaryFile(suffix='.mp4', delete=False)
    temp_filename = temp_file.name

    try:
        # Saving happens inside the try so a failed upload does not leave the
        # temporary file behind.
        with temp_file:
            video.save(temp_file)

        cap = cv2.VideoCapture(temp_filename)
        try:
            if not cap.isOpened():
                return jsonify({'error': 'Could not open video file'}), 400

            # Read all frames
            frames = []
            while True:
                success, frame = cap.read()
                if not success:
                    break
                frames.append(frame)
        finally:
            # Always release the capture, even if decoding raised: an open
            # handle would leak and can prevent the temp file from being
            # deleted on platforms that lock open files.
            cap.release()

        if not frames:
            return jsonify({'error': 'No frames were read from the video'}), 400

        print(f"Read {len(frames)} frames from the video. Starting analysis...")

        analysis_results = analyze_video(frames, video.filename)

        print("\n--- Analysis Complete ---")
        print(json.dumps(analysis_results, indent=4))
        print("-----------------------\n")

        return jsonify(analysis_results)

    finally:
        # Always remove the temporary file
        if os.path.exists(temp_filename):
            os.remove(temp_filename)


In [6]:
if __name__ == "__main__":
    # Start Flask's built-in server on localhost only, with debug mode on and
    # the auto-reloader off (presumably because this runs inside a notebook
    # kernel - confirm before reusing elsewhere).
    app.run(
        host='127.0.0.1',
        port=5000,
        debug=True,
        use_reloader=False,
    )

 * Serving Flask app '__main__'
 * Debug mode: on


 * Running on http://127.0.0.1:5000
Press CTRL+C to quit


Read 280 frames from the video. Starting analysis...

--- Analysis Complete ---
{
    "video_id": "Squats.mp4",
    "summary": {
        "squats": {
            "total_reps": 5,
            "good_form_reps": 4,
            "common_issues": [
                "INSUFFICIENT_DEPTH"
            ]
        },
        "pushups": {
            "total_reps": 0,
            "good_form_reps": 0,
            "common_issues": []
        }
    },
    "frame_data": [
        {
            "frame_index": 0,
            "exercise": "pushup",
            "rep_id": 0,
            "is_form_ok": true,
            "angles": {
                "knee": 172.72190927040856,
                "elbow": 105.32165123363335
            }
        },
        {
            "frame_index": 1,
            "exercise": "pushup",
            "rep_id": 0,
            "is_form_ok": true,
            "angles": {
                "knee": 171.90696253095572,
                "elbow": 114.17410047024723
            }
        },
        

127.0.0.1 - - [09/Aug/2025 10:06:05] "POST /analyze HTTP/1.1" 200 -


Read 291 frames from the video. Starting analysis...

--- Analysis Complete ---
{
    "video_id": "Push-ups.mp4",
    "summary": {
        "squats": {
            "total_reps": 0,
            "good_form_reps": 0,
            "common_issues": []
        },
        "pushups": {
            "total_reps": 3,
            "good_form_reps": 3,
            "common_issues": []
        }
    },
    "frame_data": [
        {
            "frame_index": 0,
            "exercise": "squat",
            "rep_id": 0,
            "is_form_ok": true,
            "angles": {
                "knee": 152.61702440875786,
                "elbow": 171.91606466566557
            }
        },
        {
            "frame_index": 1,
            "exercise": "none",
            "rep_id": 0,
            "is_form_ok": true,
            "angles": {
                "knee": 162.94343568524138,
                "elbow": 174.1320928848264
            }
        },
        {
            "frame_index": 2,
            "exercis

127.0.0.1 - - [09/Aug/2025 10:10:12] "POST /analyze HTTP/1.1" 200 -


In [None]:
# Call the endpoint from a terminal (the commands below use the HTTPie `http` client):
# http --form POST http://127.0.0.1:5000/analyze video@c:/Users/Islam/Downloads/PoseDetector/input_video/Squats.mp4
# http --form POST http://127.0.0.1:5000/analyze video@c:/Users/Islam/Downloads/PoseDetector/input_video/Push-ups.mp4
