In [None]:
import mediapipe as mp
from mediapipe.tasks import python
from mediapipe.tasks.python import vision
import cv2
import time
import csv
import queue
from datetime import datetime
import math
import numpy as np

In [None]:
model_path = "/Users/beni/Desktop/Climbing Technique Tracker/Climbing_Tracker/pose_landmarker_full.task"

In [None]:
"""The code in this block is partly taken from the official MediaPipe documentation and can be found 
here: https://ai.google.dev/edge/mediapipe/solutions/vision/pose_landmarker/python#live-stream_1"""

BaseOptions = mp.tasks.BaseOptions
PoseLandmarker = mp.tasks.vision.PoseLandmarker
PoseLandmarkerOptions = mp.tasks.vision.PoseLandmarkerOptions
PoseLandmarkerResult = mp.tasks.vision.PoseLandmarkerResult
VisionRunningMode = mp.tasks.vision.RunningMode

saved_results = queue.Queue()


def print_result(result: PoseLandmarkerResult, output_image: mp.Image, timestamp_ms: int):
    print('pose landmarker result: {}'.format(result))
    saved_results.put(result)
    saved_results.put(timestamp_ms)


options = PoseLandmarkerOptions(
    base_options=BaseOptions(model_asset_path=model_path),
    running_mode=VisionRunningMode.LIVE_STREAM, #sets the running mode to live stream
    result_callback=print_result)


"""The following code is written by myself using the OpenCV and MediaPipe documentation for help which
can be found in the readme file"""

#check how many frames per second Qsense sensors output
frames_per_second = 10
time_between_frames = 1 / frames_per_second


with PoseLandmarker.create_from_options(options) as landmarker:
    #Use openCV VideoCapture to start capturing from the webcam
    start_time = time.time()

    capture = cv2.VideoCapture(1)

    #Create a loop to read the latest frame from the camera using VideoCaptureRead()
    while capture.isOpened():

        ret, frame = capture.read()
     
        if not ret:
            print("broken")
            break

        #Convert the frame receieved from OpenCV to a MediaPipe Image Object
        mp_image = mp.Image(image_format=mp.ImageFormat.SRGB, data=frame)
      
        #landmarker.detect_async(mp_image, timestamp_ms = int(time.time() * 1000))
        #from datetime import datetime
        landmarker.detect_async(mp_image, int(datetime.now().timestamp() * 1000))


        #color = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        cv2.imshow('frame', frame) #first param is window name
        cv2.waitKey(10)

        """The following frame limiter was taken from the following online answer:
         https://www.quora.com/How-do-I-decrease-the-frames-per-second-in-OpenCV-python"""
        
        elapsed_time = time.time() - start_time 
        time_to_wait = time_between_frames - elapsed_time 
        if time_to_wait > 0: 
            time.sleep(time_to_wait)
    

In [None]:
def calculate_elbow_angle(shoulder_landmark, elbow_landmark, wrist_landmark):

    #convert landmark coordinates into vectors
    
    vector_s_e = [shoulder_landmark.x - elbow_landmark.x, shoulder_landmark.y - elbow_landmark.y, shoulder_landmark.z - elbow_landmark.z]
    vector_w_e = [wrist_landmark.x - elbow_landmark.x, wrist_landmark.y - elbow_landmark.y, wrist_landmark.z - elbow_landmark.z]

    #dot product of the two vectors
    dot_product = np.dot(vector_s_e, vector_w_e)

    #magnitude of the two vectors
    magnitude_vector_s_e = np.linalg.norm(vector_s_e)
    magnitude_vector_w_e = np.linalg.norm(vector_w_e)

    #final equation to calculate angle 
    angle = math.degrees(math.acos(dot_product / ((magnitude_vector_s_e) * (magnitude_vector_w_e))))

    return angle

In [None]:
results = []
while not saved_results.empty():
    
    landmark = saved_results.get() #get pose_landmark for each frame
    try:
        landmarks = [] #initialize an array for each indidual landmark in a frame (33 total)
        for i in range(0,32):
            landmark_point = landmark.pose_landmarks[0][i] #get each landmark (0, 32) for the 0th person in the frame
            landmarks.append(landmark_point)
    except:
        print("no landmarks detected") #first couple frames might not detect a person yet

    try:
        elbow_ang = calculate_elbow_angle(landmark.pose_landmarks[0][12], landmark.pose_landmarks[0][14], landmark.pose_landmarks[0][16])
        print(elbow_ang)
    except:
        print('no elbow angle detected')

    
    
    time_stamp = saved_results.get() #get timestamp (comes after each landmark in the queue)
    
    results.append([time_stamp]+ landmarks) #append landmarks + timestamp into results array


print("Collected", len(results), "results")


"""CSV code adapted from the following website: https://www.geeksforgeeks.org/python/writing-csv-files-in-python/"""

#each frame is writen into the CSV, each column are the coordinates for a single landmark
with open('results.csv', 'w', newline='') as csvfile:
    writer = csv.writer(csvfile)
    writer.writerows(results)

In [None]:
#test code

all_landmarks_from_one_frame = saved_results.get()

one_landmark_from_frame = all_landmarks_from_one_frame.pose_landmarks[0][0]

print(one_landmark_from_frame)

x_value = one_landmark_from_frame.x

print(x_value)



#testing angle



shoulder_landmark = all_landmarks_from_one_frame.pose_landmarks[0][12]
elbow_landmark = all_landmarks_from_one_frame.pose_landmarks[0][14]
wrist_landmark = all_landmarks_from_one_frame.pose_landmarks[0][16]

print(shoulder_landmark.x, shoulder_landmark.y, shoulder_landmark.z)
print(elbow_landmark.x, elbow_landmark.y, elbow_landmark.z)
print(wrist_landmark.x, wrist_landmark.y, wrist_landmark.z)

angle = calculate_elbow_angle(shoulder_landmark, elbow_landmark, wrist_landmark)
print(angle)




