In [179]:
import cv2
import numpy as np
import mediapipe as mp
from mediapipe.tasks import python
from mediapipe.tasks.python import vision
from mediapipe.framework.formats import landmark_pb2
import csv
import os

In [3]:
model_path = "C:\\Users\\kyles\\Desktop\\CS5100\\Final_Project\\pose_landmarker_heavy.task"
min_pose_detection_confidence = 0.5
min_pose_presence_confidence = 0.5
min_tracking_confidence = 0.5
num_poses = 1
last_timestamp_ms = 0
labels = [
    'Right Knee',
    'Left Knee',
    'Right Ankle',
    'Left Ankle',
    'Right Heel',
    'Left Heel',
    'Right Foot Index',
    'Left Foot Index'
]


In [6]:
base_options = python.BaseOptions(model_asset_path=model_path)
options = vision.PoseLandmarkerOptions(base_options=base_options,
                                      running_mode=vision.RunningMode.VIDEO,
                                      num_poses=num_poses,
                                      min_pose_detection_confidence=min_pose_detection_confidence,
                                      min_pose_presence_confidence=min_pose_presence_confidence,
                                      min_tracking_confidence=min_tracking_confidence,
                                      output_segmentation_masks=False)

In [4]:
def draw_skeleton(detection_result):
    pose_landmarks_list = detection_result.pose_landmarks
    blank = np.zeros((500, 800, 3), np.uint8)
    
    for idx in range(len(pose_landmarks_list)):
        pose_landmarks = pose_landmarks_list[idx]

        pose_landmarks_proto = landmark_pb2.NormalizedLandmarkList()
        pose_landmarks_proto.landmark.extend([
            landmark_pb2.NormalizedLandmark(
                x=landmark.x,
                y=landmark.y,
                z=landmark.z) for landmark in pose_landmarks
        ])
        mp.solutions.drawing_utils.draw_landmarks(
            blank,
            pose_landmarks_proto,
            mp.solutions.pose.POSE_CONNECTIONS,
            mp.solutions.drawing_styles.get_default_pose_landmarks_style())
    return blank

In [5]:
def draw_landmarks_on_image(rgb_image, detection_result):
    pose_landmarks_list = detection_result.pose_landmarks
    annotated_image = np.copy(rgb_image)

    # Loop through the detected poses to visualize.
    for idx in range(len(pose_landmarks_list)):
        pose_landmarks = pose_landmarks_list[idx]

        pose_landmarks_proto = landmark_pb2.NormalizedLandmarkList()
        pose_landmarks_proto.landmark.extend([
            landmark_pb2.NormalizedLandmark(
                x=landmark.x,
                y=landmark.y,
                z=landmark.z) for landmark in pose_landmarks
        ])
        mp.solutions.drawing_utils.draw_landmarks(
            annotated_image,
            pose_landmarks_proto,
            mp.solutions.pose.POSE_CONNECTIONS,
            mp.solutions.drawing_styles.get_default_pose_landmarks_style())
    return annotated_image

In [420]:
def get_lower_half_data(pose_data, x, y):
    lower_half_data = []
    trimmed_data = pose_data[x:y]

    for i in range(len(trimmed_data)):
        for each in trimmed_data[i].pose_landmarks:
            lower_half_data.append(each[23::])
    return lower_half_data


In [421]:
def test_image_range(data, x, y, save):
    test_range = data[x:y]
    path = r'C:\Users\kyles\Desktop\CS5100\Final_Project\skeleton_data'
    for i in range(len(test_range)):
        out_file = shortened + '_' + str(i) + '.jpg'
        test_output = draw_skeleton(test_range[i])
        if save:
            cv2.imwrite(os.path.join(path, out_file), test_output)
            continue
        while True:
            cv2.imshow('result', test_output)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
    cv2.destroyAllWindows()


In [422]:
def create_output(labels, data):
    output = []
    line = []
    
    for i in range(len(data)):
        for j in range(len(labels)):
            data_point = [labels[j], data[i][j]]
            line.append(data_point)
        output.append(line)
        line = []
            
    return output

In [7]:
'''

1) Resize and convert frame from RGB to BGR
2) Detect poses
    - z represents distance -- we want closer (penalty taker) pose
    - append relevant data to lists for feature extraction
3) Display image with pose draw onto it

'''

'\n\n1) Resize and convert frame from RGB to BGR\n2) Detect poses\n    - z represents distance -- we want closer (penalty taker) pose\n    - append relevant data to lists for feature extraction\n3) Display image with pose draw onto it\n\n'

In [743]:
pose_data = []
mp_data = []
pose_draw_data = []
time_stamps = []

video_source = "C:\\Users\\kyles\\Desktop\\CS5100\\Final_Project\\Project_Data\\PEN70.mp4"
shortened = video_source[-9:-4]
with vision.PoseLandmarker.create_from_options(options) as landmarker:
    # Use OpenCV’s VideoCapture to start capturing from the webcam.
    cap = cv2.VideoCapture(video_source)
    # Create a loop to read the latest frame from the camera using VideoCapture#read()
    while cap.isOpened():
        success, image = cap.read()
        if not success:
            
            break
           
        #image = cv2.flip(image, 1)
        # Convert the frame received from OpenCV to a MediaPipe’s Image object.
        mp_image = mp.Image(
            image_format=mp.ImageFormat.SRGB,
            data=cv2.cvtColor(image[200:800, 500:1200], cv2.COLOR_BGR2RGB))
        timestamp_ms = int(cv2.getTickCount() / cv2.getTickFrequency() * 1000)
        result = landmarker.detect_for_video(mp_image, timestamp_ms)
        pose_data.append(result.pose_world_landmarks)
        annotated_image = draw_landmarks_on_image(mp_image.numpy_view(), result)
        
        if result.pose_world_landmarks:
            if result.pose_world_landmarks[0][0].z > -.3 or result.pose_world_landmarks[0][1].z > -.3:
                # get frame_ms -- append to list
                frame_ms = cap.get(cv2.CAP_PROP_POS_MSEC)
                time_stamps.append(frame_ms / 1000)
                # draw frames
                pose_draw_data.append(result)
                mp_data.append(mp_image)
               
            
        
        cv2.imshow("MediaPipe Pose Landmark", cv2.cvtColor(annotated_image, cv2.COLOR_BGR2RGB))

        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
    
    cap.release()
    cv2.destroyAllWindows()

In [751]:
x = 28
y = 33
test_image_range(pose_draw_data, x, y, False)

lower_output = get_lower_half_data(pose_draw_data, x, y)
final_output = create_output(labels, lower_output)
len(final_output)

5

In [752]:
selected_times = time_stamps[x:y]
output_with_times = list(zip(selected_times, final_output))
len(output_with_times)

5

In [753]:
def write_to_csv(data, filename):
    with open(filename, 'a', newline='') as file:
        writer = csv.writer(file)
        writer.writerow([])
        writer.writerows(data)
    

In [754]:
filename = 'data.csv'
write_to_csv(output_with_times, filename)