In [39]:
import numpy as np
import cv2
from numpy.core.fromnumeric import trace
import mediapipe_utils as mpu
from pathlib import Path
from FPS import FPS, now
import depthai as dai
import marshal
import sys
from string import Template
from math import sin, cos
from BlazeposeDepthaiEdge import BlazeposeDepthai
from BlazeposeRenderer import BlazeposeRenderer
import time as time


In [40]:
# Indices (looked up in mpu.KEYPOINT_DICT) of the keypoints handed to the
# renderer as `my_keypoints` when the skeleton is drawn.
important_landmarks = [
    mpu.KEYPOINT_DICT[keypoint_name]
    for keypoint_name in (
        'left_shoulder', 'right_shoulder',
        'left_elbow', 'right_elbow',
        'left_wrist', 'right_wrist',
        'left_hip', 'right_hip',
        'left_knee', 'right_knee',
        'left_ankle', 'right_ankle',
        'left_heel', 'right_heel',
        'left_foot_index', 'right_foot_index',
    )
]

In [42]:
# --- Session configuration -------------------------------------------------
# is_press:     True -> overhead-press mode, False -> squat mode.
# is_recording: when True, every rendered frame is appended to a video file.
# is_down:      state flag of the rep counter (set/cleared inside the main loop).
is_press = False
is_recording = True
is_down = False

# Pose tracker; all model paths are left at None so the tracker picks its own.
tracker = BlazeposeDepthai(
                input_src="rgb",
                pd_model=None,
                pd_score_thresh=0.5,
                pp_model=None,
                lm_model=None,
                lm_score_thresh=0.7,
                xyz = True,
                crop=False,
                smoothing= True,
                filter_window_size=5,
                filter_velocity_scale=10,
                stats=False,
                internal_fps=None,
                internal_frame_height=1080,
                trace=False,
                force_detection=False)

renderer = BlazeposeRenderer(
                tracker)

# Reset coordinates: every landmark used by the main loop starts out as
# "not seen yet" so a re-run of this cell never reuses stale positions.
LEFT_SHOULDER = None
RIGHT_SHOULDER = None
LEFT_ELBOW = None
RIGHT_ELBOW = None
LEFT_WRIST = None
RIGHT_WRIST = None
LEFT_HIP = None
RIGHT_HIP = None
LEFT_KNEE = None
RIGHT_KNEE = None
LEFT_ANKLE = None
RIGHT_ANKLE = None
LEFT_HEEL = None
RIGHT_HEEL = None
LEFT_FOOT_INDEX = None
RIGHT_FOOT_INDEX = None

# Press target geometry, all relative to the shoulder-to-shoulder vector:
# radius scale, offset along the rotated vector, offset along the vector itself.
press_target_size, press_target_v, press_target_h= 5, .1, .7

left_pos_queue = []
right_pos_queue = []

left_vel_queue = []
right_vel_queue = []

queue_size = 5

squat_count = 0
press_count = 0

# Pixel offsets above / below each knee that bound the squat box.
squat_box_up = 40
squat_box_down = 70

# Horizontal pixel shift applied to the on-frame counter text.
dx = -15

# 'mp4v' is a FourCC that matches the .mp4 container used below ('XVID' is
# meant for .avi files and is rejected/substituted by the MP4 muxer).
vid_cod = cv2.VideoWriter_fourcc(*'mp4v')
if is_recording:
    # VideoWriter does not create missing directories and fails silently,
    # so make sure the target folder exists before opening the file.
    Path("output").mkdir(parents=True, exist_ok=True)
    output = cv2.VideoWriter("output/cam_video_1.mp4", vid_cod, 20.0,  (1920,1080))
    if not output.isOpened():
        # Writing to an unopened writer is a silent no-op; tell the user now.
        print("Warning: could not open output/cam_video_1.mp4 for writing; nothing will be recorded.")

# Size (width, height) of the frames handed to the recorder; it has to be the
# same size the VideoWriter was opened with, otherwise frames are not stored.
RECORD_SIZE = (1920, 1080)

# Duration in seconds of the most recently finished hold (shown in squat mode).
last_hold_duration = 0.0


def _visible_landmark(body, name):
    """Return landmark `name` of `body`, or None when the renderer reports it as not present."""
    idx = mpu.KEYPOINT_DICT[name]
    return body.landmarks[idx] if renderer.is_present(body, idx) else None


def _overlay_anchor(left_shoulder, right_shoulder, lift, fallback=(30, 60)):
    """Return the (x, y) pixel position of the counter text.

    The text is placed above the midpoint of the two shoulders, raised by
    `lift` times the 2D shoulder distance and shifted horizontally by
    `dx - 50`. If either shoulder is None, `fallback` is returned so the
    counter can still be drawn.
    """
    if left_shoulder is None or right_shoulder is None:
        return fallback
    shoulder_width = np.linalg.norm(np.array([left_shoulder[0] - right_shoulder[0],
                                              left_shoulder[1] - right_shoulder[1]]))
    x = int(np.mean([left_shoulder[0], right_shoulder[0]]) + dx - 50)
    y = int(np.mean([left_shoulder[1], right_shoulder[1]]) - shoulder_width * lift)
    return x, y


try:
    while True:
        # Run blazepose on next frame
        frame, body = tracker.next_frame()
        if frame is None:
            break

        if body:
            # Landmarks of the current frame (None when not present).
            LEFT_SHOULDER = _visible_landmark(body, 'left_shoulder')
            RIGHT_SHOULDER = _visible_landmark(body, 'right_shoulder')
            LEFT_ELBOW = _visible_landmark(body, 'left_elbow')
            RIGHT_ELBOW = _visible_landmark(body, 'right_elbow')
            LEFT_WRIST = _visible_landmark(body, 'left_wrist')
            RIGHT_WRIST = _visible_landmark(body, 'right_wrist')
            LEFT_HIP = _visible_landmark(body, 'left_hip')
            RIGHT_HIP = _visible_landmark(body, 'right_hip')
            LEFT_KNEE = _visible_landmark(body, 'left_knee')
            RIGHT_KNEE = _visible_landmark(body, 'right_knee')
            LEFT_ANKLE = _visible_landmark(body, 'left_ankle')
            RIGHT_ANKLE = _visible_landmark(body, 'right_ankle')
            LEFT_HEEL = _visible_landmark(body, 'left_heel')
            RIGHT_HEEL = _visible_landmark(body, 'right_heel')
            LEFT_FOOT_INDEX = _visible_landmark(body, 'left_foot_index')
            RIGHT_FOOT_INDEX = _visible_landmark(body, 'right_foot_index')

            if is_press:
                arm_points = (LEFT_SHOULDER, RIGHT_SHOULDER, LEFT_ELBOW,
                              RIGHT_ELBOW, LEFT_WRIST, RIGHT_WRIST)
                if all(p is not None for p in arm_points):
                    left_theta = mpu.angle(LEFT_SHOULDER, LEFT_ELBOW, LEFT_WRIST)
                    right_theta = mpu.angle(RIGHT_SHOULDER, RIGHT_ELBOW, RIGHT_WRIST)

                    # Shoulder-to-shoulder vector and the same vector rotated by 90 degrees.
                    v = np.array([LEFT_SHOULDER[0] - RIGHT_SHOULDER[0], LEFT_SHOULDER[1] - RIGHT_SHOULDER[1]])
                    theta = np.deg2rad(90)
                    c, s = np.cos(theta), np.sin(theta)
                    rot = np.array(((c, -s), (s, c)))
                    v_rot = np.dot(rot, v)

                    # Target circles: one outside each shoulder, radius tied to shoulder width.
                    ellipse_r = int(np.sqrt(np.linalg.norm(v))*press_target_size)
                    # NOTE(review): the x term subtracts v_rot[0] while the y term adds
                    # v_rot[1]; the left zone adds both. Kept as-is -- confirm intent.
                    right_zone_pos = (int(RIGHT_SHOULDER[0] - v[0]*press_target_h - v_rot[0]*press_target_v), int(RIGHT_SHOULDER[1] - v[1]*press_target_h + v_rot[1]*press_target_v))
                    left_zone_pos = (int(LEFT_SHOULDER[0] + v[0]*press_target_h + v_rot[0]*press_target_v),  int(LEFT_SHOULDER[1]  + v[1]*press_target_h + v_rot[1]*press_target_v))

                    # A wrist "hits" its target when it lies inside the circle.
                    right_in_zone = np.linalg.norm(np.array(RIGHT_WRIST[0:2]) - np.array(right_zone_pos)) < ellipse_r
                    left_in_zone = np.linalg.norm(np.array(LEFT_WRIST[0:2]) - np.array(left_zone_pos)) < ellipse_r

                    # Filled circle (-1) when hit, 2 px outline otherwise.
                    cv2.ellipse(frame, right_zone_pos, (ellipse_r, ellipse_r), 0, 0, 360, (0, 255, 0), -1 if right_in_zone else 2)
                    cv2.ellipse(frame, left_zone_pos, (ellipse_r, ellipse_r), 0, 0, 360, (0, 255, 0), -1 if left_in_zone else 2)

                    # Count number of presses: a rep is counted when both wrists
                    # leave their targets after staying inside for more than 0.5 s.
                    if right_in_zone and left_in_zone:
                        if not is_down:
                            is_down = True
                            down_time = time.time()
                    elif is_down:
                        is_down = False
                        up_time = time.time()
                        last_hold_duration = up_time - down_time
                        if last_hold_duration > 0.5:
                            press_count += 1

                    # Display number of presses in frame
                    cv2.putText(frame, "Press Count: " + str(press_count), _overlay_anchor(LEFT_SHOULDER, RIGHT_SHOULDER, .8), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2, cv2.LINE_AA)
            else:
                # Code for squats
                leg_points = (LEFT_HIP, RIGHT_HIP, LEFT_KNEE,
                              RIGHT_KNEE, LEFT_ANKLE, RIGHT_ANKLE)
                if all(p is not None for p in leg_points):
                    left_theta = mpu.angle(LEFT_HIP, LEFT_KNEE, LEFT_ANKLE)
                    right_theta = mpu.angle(RIGHT_HIP, RIGHT_KNEE, RIGHT_ANKLE)

                    # Bounding box around the knees; int32 is the dtype the
                    # OpenCV polygon drawing functions require.
                    points = np.array([[LEFT_KNEE[0], LEFT_KNEE[1]-squat_box_up],
                                       [RIGHT_KNEE[0], RIGHT_KNEE[1]-squat_box_up],
                                       [RIGHT_KNEE[0], RIGHT_KNEE[1]+squat_box_down],
                                       [LEFT_KNEE[0], LEFT_KNEE[1]+squat_box_down]], dtype=np.int32)

                    # Outline while both hips are above the top of the box, filled otherwise.
                    if LEFT_HIP[1]<=LEFT_KNEE[1]-squat_box_up and RIGHT_HIP[1]<=RIGHT_KNEE[1]-squat_box_up:
                        cv2.polylines(frame, [points], True, (0, 255, 0), 2)
                    else:
                        cv2.fillPoly(frame, [points], (0, 255, 0))

                    # A rep is counted when either knee angle drops to 90 or
                    # below after both stayed above 90 for more than 0.5 s.
                    # NOTE(review): assumes mpu.angle returns degrees -- confirm.
                    if left_theta>90 and right_theta>90:
                        if not is_down:
                            is_down = True
                            down_time = time.time()
                    elif is_down:
                        is_down = False
                        up_time = time.time()
                        last_hold_duration = up_time - down_time
                        if last_hold_duration > 0.5:
                            squat_count += 1
                            print("Squat count: ", squat_count)
                            print("Time: ", last_hold_duration)
                            print("Left angle: ", left_theta)
                            print("Right angle: ", right_theta)
                            print("")

                # Display squat count and time in frame. The anchor is derived
                # from the shoulders only, so it no longer depends on variables
                # that exist solely in press mode or after the first rep.
                text_x, text_y = _overlay_anchor(LEFT_SHOULDER, RIGHT_SHOULDER, .5)
                cv2.putText(frame, "Squat count: " + str(squat_count), (text_x, text_y), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2, cv2.LINE_AA)
                cv2.putText(frame, "Time: " + f"{last_hold_duration:.2f}", (text_x, text_y + 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2, cv2.LINE_AA)

        # Draw 2d skeleton
        frame = renderer.draw(frame, body, my_keypoints=important_landmarks)

        # Resize to the exact recording size instead of relying on scale
        # factors that are only right for one particular camera resolution.
        frame = cv2.resize(frame, RECORD_SIZE)

        if is_recording:
            output.write(frame)

        key = renderer.waitKey(delay=1)

        # ESC or 'q' stops the session.
        if key == 27 or key == ord('q'):
            break
except Exception as e:
    # Best effort: report the problem and fall through to the cleanup below.
    print(f"{type(e).__name__}: {e}")
finally:
    # Always release the renderer, tracker, video file and windows, even when
    # the loop is stopped by an error or a kernel interrupt.
    renderer.exit()
    tracker.exit()

    if is_recording:
        output.release()
    cv2.destroyAllWindows()


Pose detection blob file : D:\StrongAI\models\pose_detection_sh4.blob
Landmarks using blob file : D:\StrongAI\models\pose_landmark_full_sh4.blob
Internal camera FPS set to: 18
Internal camera image size: 1792 x 1008 - pad_h: 392
Creating pipeline...
Creating Color Camera...
Creating MonoCameras, Stereo and SpatialLocationCalculator nodes...
RGB calibration lens position: 0
Creating Pose Detection pre processing image manip...
Creating Pose Detection Neural Network...
Creating Pose Detection post processing Neural Network...
Creating Landmark pre processing image manip...
Creating DiveideBy255 Neural Network...
Creating Landmark Neural Network...
Pipeline created.
Pipeline started - USB speed: HIGH
Squat count:  1
Time:  3.7872979640960693
Left angle:  87.84016578919203
Right angle:  86.87530852103392

Squat count:  2
Time:  1.595045804977417
Left angle:  87.35081756767627
Right angle:  84.73044185225731

Squat count:  3
Time:  1.4158976078033447
Left angle:  83.59820362430241
Right ang