In [1]:
import mediapipe as mp
import cv2
import pandas as pd
import time
from dollarpy import Recognizer, Template, Point

In [2]:
mp_drawing = mp.solutions.drawing_utils
mp_pose = mp.solutions.pose

In [3]:
recognizer = Recognizer([])

In [4]:
gesture_templates = {}

In [5]:
def detect_exercise(pose_landmarks):
    left_wrist = pose_landmarks.landmark[mp_pose.PoseLandmark.LEFT_WRIST]
    right_wrist = pose_landmarks.landmark[mp_pose.PoseLandmark.RIGHT_WRIST]
    left_shoulder = pose_landmarks.landmark[mp_pose.PoseLandmark.LEFT_SHOULDER]
    right_shoulder = pose_landmarks.landmark[mp_pose.PoseLandmark.RIGHT_SHOULDER]
    left_hip = pose_landmarks.landmark[mp_pose.PoseLandmark.LEFT_HIP]
    right_hip = pose_landmarks.landmark[mp_pose.PoseLandmark.RIGHT_HIP]
    left_knee = pose_landmarks.landmark[mp_pose.PoseLandmark.LEFT_KNEE]
    right_knee = pose_landmarks.landmark[mp_pose.PoseLandmark.RIGHT_KNEE]
    left_ankle = pose_landmarks.landmark[mp_pose.PoseLandmark.LEFT_ANKLE]
    right_ankle = pose_landmarks.landmark[mp_pose.PoseLandmark.RIGHT_ANKLE]
    
    if abs(left_wrist.y - left_shoulder.y) > 0.3 and \
       abs(right_wrist.y - right_shoulder.y) > 0.3 and \
       abs(left_hip.y - left_shoulder.y) < 0.3 and \
       abs(right_hip.y - right_shoulder.y) < 0.3:
        return "pushups"

    # Example condition for squats (knee bending)
    if abs(left_hip.y - left_knee.y) < 0.2 and \
       abs(right_hip.y - right_knee.y) < 0.2 and \
       abs(left_knee.y - left_ankle.y) > 0.3 and \
       abs(right_knee.y - right_ankle.y) > 0.3:
        return "squats"

    return "unknown"
    

In [6]:
def getSquats(videoURL, save_interval=0.5):
    # Open the video file
    cap = cv2.VideoCapture(videoURL)
    if not cap.isOpened():
        print(f"Error: Cannot open video file {videoURL}")
        return

    data = []
    last_save_time = time.time()

    with mp_pose.Pose(min_detection_confidence=0.5, min_tracking_confidence=0.5) as pose:
        while cap.isOpened():
            ret, frame = cap.read()

            if ret:
                # Convert the frame to RGB
                image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                image.flags.writeable = False

                # Process the frame for pose landmarks
                results = pose.process(image)

                image.flags.writeable = True
                image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)

                # Check if pose landmarks are detected
                if results.pose_landmarks:
                    # Draw landmarks on the image
                    mp_drawing.draw_landmarks(image, results.pose_landmarks, mp_pose.POSE_CONNECTIONS)

                    # Detect exercise and extract key points
                    exercise_label = detect_exercise(results.pose_landmarks)
                    points = results.pose_landmarks.landmark
                    key_points = {
                        'LEFT_SHOULDER': (points[mp_pose.PoseLandmark.LEFT_SHOULDER].x, points[mp_pose.PoseLandmark.LEFT_SHOULDER].y),
                        'RIGHT_SHOULDER': (points[mp_pose.PoseLandmark.RIGHT_SHOULDER].x, points[mp_pose.PoseLandmark.RIGHT_SHOULDER].y),
                        'LEFT_HIP': (points[mp_pose.PoseLandmark.LEFT_HIP].x, points[mp_pose.PoseLandmark.LEFT_HIP].y),
                        'RIGHT_HIP': (points[mp_pose.PoseLandmark.RIGHT_HIP].x, points[mp_pose.PoseLandmark.RIGHT_HIP].y),
                        'LEFT_KNEE': (points[mp_pose.PoseLandmark.LEFT_KNEE].x, points[mp_pose.PoseLandmark.LEFT_KNEE].y),
                        'RIGHT_KNEE': (points[mp_pose.PoseLandmark.RIGHT_KNEE].x, points[mp_pose.PoseLandmark.RIGHT_KNEE].y),
                    }

                    # Save data at the specified interval
                    current_time = time.time()
                    if current_time - last_save_time >= save_interval:
                        data.append([exercise_label] + [value for kp in key_points.values() for value in kp])
                        last_save_time = current_time

                # Display the frame with landmarks
                cv2.imshow("Exercise Detection", image)

            if not ret or cv2.waitKey(10) & 0xFF == ord('q'):
                break

    cap.release()
    cv2.destroyAllWindows()

    # Save data to a CSV file
    column_names = ['Exercise'] + [f'{key} X' for key in key_points.keys()] + [f'{key} Y' for key in key_points.keys()]
    df = pd.DataFrame(data, columns=column_names)
    df.to_csv("exercise_squats.csv", index=False)
    print("Data saved to exercise_squats.csv")
    
    import socket 
    soc = socket.socket()
    hostname="localhost"# 127.0.0.1 #0.0.0.0
    port=65436
    soc.bind((hostname,port))
    soc.listen(5)
    conn , addr = soc.accept()
    print("device connected"+str(addr))
    # # Send String
    msg = b"Data Saved\n"
    conn.send(msg)

    soc.close()


In [7]:
def getPushups(videoURL, save_interval=0.5):
    # Open the video file
    cap = cv2.VideoCapture(videoURL)
    if not cap.isOpened():
        print(f"Error: Cannot open video file {videoURL}")
        return

    data = []
    last_save_time = time.time()

    with mp_pose.Pose(min_detection_confidence=0.5, min_tracking_confidence=0.5) as pose:
        while cap.isOpened():
            ret, frame = cap.read()

            if ret:
                # Convert the frame to RGB
                image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                image.flags.writeable = False

                # Process the frame for pose landmarks
                results = pose.process(image)

                image.flags.writeable = True
                image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)

                # Check if pose landmarks are detected
                if results.pose_landmarks:
                    # Draw landmarks on the image
                    mp_drawing.draw_landmarks(image, results.pose_landmarks, mp_pose.POSE_CONNECTIONS)

                    # Detect exercise and extract key points
                    exercise_label = detect_exercise(results.pose_landmarks)
                    points = results.pose_landmarks.landmark
                    key_points = {
                        'LEFT_SHOULDER': (points[mp_pose.PoseLandmark.LEFT_SHOULDER].x, points[mp_pose.PoseLandmark.LEFT_SHOULDER].y),
                        'RIGHT_SHOULDER': (points[mp_pose.PoseLandmark.RIGHT_SHOULDER].x, points[mp_pose.PoseLandmark.RIGHT_SHOULDER].y),
                        'LEFT_HIP': (points[mp_pose.PoseLandmark.LEFT_HIP].x, points[mp_pose.PoseLandmark.LEFT_HIP].y),
                        'RIGHT_HIP': (points[mp_pose.PoseLandmark.RIGHT_HIP].x, points[mp_pose.PoseLandmark.RIGHT_HIP].y),
                        'LEFT_KNEE': (points[mp_pose.PoseLandmark.LEFT_KNEE].x, points[mp_pose.PoseLandmark.LEFT_KNEE].y),
                        'RIGHT_KNEE': (points[mp_pose.PoseLandmark.RIGHT_KNEE].x, points[mp_pose.PoseLandmark.RIGHT_KNEE].y),
                    }

                    # Save data at the specified interval
                    current_time = time.time()
                    if current_time - last_save_time >= save_interval:
                        data.append([exercise_label] + [value for kp in key_points.values() for value in kp])
                        last_save_time = current_time

                # Display the frame with landmarks
                cv2.imshow("Exercise Detection", image)

            if not ret or cv2.waitKey(10) & 0xFF == ord('q'):
                break

    cap.release()
    cv2.destroyAllWindows()

    # Save data to a CSV file
    column_names = ['Exercise'] + [f'{key} X' for key in key_points.keys()] + [f'{key} Y' for key in key_points.keys()]
    df = pd.DataFrame(data, columns=column_names)
    df.to_csv("exercise_pushups.csv", index=False)
    print("Data saved to exercise_pushups.csv")
    
    import socket 
    soc = socket.socket()
    hostname="localhost"# 127.0.0.1 #0.0.0.0
    port=65436
    soc.bind((hostname,port))
    soc.listen(5)
    conn , addr = soc.accept()
    print("device connected"+str(addr))
    # # Send String
    msg = b"Data Saved\n"
    conn.send(msg)

    soc.close()


In [8]:
def calculate_angle(point1, point2, point3):
        import math
        x1, y1 = point1.x, point1.y
        x2, y2 = point2.x, point2.y
        x3, y3 = point3.x, point3.y

        vector1 = (x1 - x2, y1 - y2)
        vector2 = (x3 - x2, y3 - y2)

        dot_product = vector1[0] * vector2[0] + vector1[1] * vector2[1]
        magnitude1 = math.sqrt(vector1[0] ** 2 + vector1[1] ** 2)
        magnitude2 = math.sqrt(vector2[0] ** 2 + vector2[1] ** 2)

        angle = math.acos(dot_product / (magnitude1 * magnitude2))
        return math.degrees(angle)

In [9]:
def classify_body_movement(new_sample):
    points = [Point(x, y) for x, y in new_sample]
    templates = [template for templates in gesture_templates.values() for template in templates]
    best_match = recognizer.recognize(points, templates)
    
    if not best_match:
        return "unknown"
    return best_match.label

In [11]:
vid = "Squats.mp4"
getSquats(vid)



Data saved to exercise_data.csv
device connected('127.0.0.1', 55190)


In [12]:
vid = "Pushups.mp4"
getPushups(vid)

Data saved to exercise_data.csv
device connected('127.0.0.1', 55191)
