## I. INSTALL & IMPORT NECESSARY PACKAGES

In [15]:
pip install -q mediapipe

Note: you may need to restart the kernel to use updated packages.


In [16]:
import requests

url = 'https://storage.googleapis.com/mediapipe-models/pose_landmarker/pose_landmarker_heavy/float16/1/pose_landmarker_heavy.task'
filename = 'pose_landmarker.task'

response = requests.get(url)
with open(filename, 'wb') as file:
    file.write(response.content)
print(f"Downloaded {filename}")


Downloaded pose_landmarker.task


In [17]:
from mediapipe import solutions
from mediapipe.framework.formats import landmark_pb2
import numpy as np
import cv2
import mediapipe as mp
from mediapipe.tasks import python
from mediapipe.tasks.python import vision


# II. FUNCTIONS

In [18]:
profLandT = []

In [19]:
#@markdown To better demonstrate the Pose Landmarker API, we have created a set of visualization tools that will be used in this colab. These will draw the landmarks on a detect person, as well as the expected connections between those markers.


def draw_landmarks_on_image(rgb_image, detection_result):
  pose_landmarks_list = detection_result.pose_landmarks
  annotated_image = np.copy(rgb_image)

  # Loop through the detected poses to visualize.
  for idx in range(len(pose_landmarks_list)):
    pose_landmarks = pose_landmarks_list[idx]

    # Draw the pose landmarks.
    pose_landmarks_proto = landmark_pb2.NormalizedLandmarkList()
    pose_landmarks_proto.landmark.extend([
      landmark_pb2.NormalizedLandmark(x=landmark.x, y=landmark.y, z=landmark.z) for landmark in pose_landmarks
    ])
    solutions.drawing_utils.draw_landmarks(
      annotated_image,
      pose_landmarks_proto,
      solutions.pose.POSE_CONNECTIONS,
      solutions.drawing_styles.get_default_pose_landmarks_style())
  return annotated_image

In [20]:
def save_landmarks_and_timestamps(detection_result, timestamp_ms):
    _temp = []
    # with open(filename, 'a') as file:
    for pose_landmarks in detection_result.pose_landmarks:
        landmarks_data = []
        for landmark in pose_landmarks:
            landmarks_data.append((landmark.x, landmark.y, landmark.z))
        _temp.append([timestamp_ms, landmarks_data])
            # file.write(f"{timestamp_ms} {landmarks_data}\n")

    return _temp



In [21]:
def process_video(input_video_path, output_video_path):
    # STEP 1: Import the necessary modules.
    mp_pose = mp.solutions.pose

    # STEP 2: Create an PoseLandmarker object.
    base_options = python.BaseOptions(model_asset_path='pose_landmarker.task')
    options = vision.PoseLandmarkerOptions(
        running_mode=mp.tasks.vision.RunningMode.VIDEO,
        base_options=base_options,
        output_segmentation_masks=True)
    detector = vision.PoseLandmarker.create_from_options(options)

    # Load the input Video
    cap = cv2.VideoCapture(input_video_path)

    # Load the frame rate of the video using OpenCV’s CV_CAP_PROP_FPS
    fps = int(cap.get(cv2.CAP_PROP_FPS))
    frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

    # Define the codec and create VideoWriter object
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_video_path, fourcc, fps, (frame_width, frame_height))

    frame_count = 0

    # Loop through each frame in the video using VideoCapture#read()
    while cap.isOpened():
        # Read a single frame from the video
        ret, frame = cap.read()
        
        # Check if frame reading was successful
        if not ret:
            break
        
        # Convert the frame received from OpenCV to a MediaPipe’s Image object.
        numpy_frame_from_opencv = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        mp_image = mp.Image(image_format=mp.ImageFormat.SRGB, data=numpy_frame_from_opencv)

        # Detect pose landmarks from the input image.
        timestamp_ms = int((frame_count / fps) * 1000)  # Calculate the timestamp in milliseconds
        detection_result = detector.detect_for_video(mp_image, timestamp_ms)
        
        # Save the detected landmarks and timestamps
        profLandT = save_landmarks_and_timestamps(detection_result, timestamp_ms)
        
        # Process the detection result. In this case, visualize it.
        annotated_image = draw_landmarks_on_image(numpy_frame_from_opencv, detection_result)
        
        # Convert the annotated image back to BGR for OpenCV
        output_frame = cv2.cvtColor(annotated_image, cv2.COLOR_RGB2BGR)
        
        # Write the frame to the output video
        out.write(output_frame)
        
        # Display the resulting frame
        cv2.imshow('Pose Detection', output_frame)
        
        # Press 'q' on the keyboard to exit the loop
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break

        frame_count += 1

    # When everything done, release the video capture and writer objects
    cap.release()
    out.release()
    cv2.destroyAllWindows()


In [22]:
# def parse_string_to_data(string):
#     # Split the string into integer part and list part
#     parts = string.split(' [', 1)
#     integer_part = int(parts[0])
#     list_part = parts[1].rstrip(']')

#     # Parse the list part into a list of tuples
#     tuples_str = list_part.split('), ')
#     tuples = []
#     for tuple_str in tuples_str:
#         tuple_str = tuple_str.strip('()')
#         values = tuple(float(value.strip()) for value in tuple_str.split(','))
#         tuples.append(values)

#     return integer_part, tuples


# print(parse_string_to_data("2666 [(0.394044429063797, 0.3920142352581024, -0.2616501748561859), (0.39692631363868713, 0.38368216156959534, -0.25449275970458984), (0.39889273047447205, 0.38373538851737976, -0.2546490430831909), (0.40079164505004883, 0.3837142586708069, -0.2546323239803314), (0.39042526483535767, 0.3834770917892456, -0.25329750776290894), (0.38848671317100525, 0.3837181627750397, -0.2534577548503876), (0.38666489720344543, 0.38405969738960266, -0.2535247206687927), (0.4024588465690613, 0.3871391713619232, -0.19061695039272308), (0.3842080533504486, 0.3888241648674011, -0.1840096116065979), (0.3979896903038025, 0.4011894166469574, -0.2349274456501007), (0.3907482624053955, 0.4016123116016388, -0.2340621054172516), (0.41958969831466675, 0.44273123145103455, -0.13212648034095764), (0.3706459701061249, 0.44326865673065186, -0.10990206152200699), (0.4432779848575592, 0.4965853691101074, -0.09258905053138733), (0.34769976139068604, 0.4999401867389679, -0.08392289280891418), (0.457807719707489, 0.5577324628829956, -0.12737758457660675), (0.33328983187675476, 0.5596362352371216, -0.13388267159461975), (0.4615366458892822, 0.5755108594894409, -0.14124354720115662), (0.33125269412994385, 0.5781159996986389, -0.14988106489181519), (0.4586290419101715, 0.5786314010620117, -0.162563756108284), (0.33358269929885864, 0.5794336795806885, -0.17228080332279205), (0.45561426877975464, 0.5746517181396484, -0.13596360385417938), (0.33532553911209106, 0.5745226740837097, -0.14453712105751038), (0.41227880120277405, 0.5731089115142822, -0.008170233108103275), (0.38214272260665894, 0.572609007358551, 0.008005061186850071), (0.4079136252403259, 0.6689869165420532, -0.02877371944487095), (0.3814394772052765, 0.6670106649398804, -0.0075009046122431755), (0.40536361932754517, 0.7615974545478821, 0.06746144592761993), (0.38541167974472046, 0.7583732008934021, 0.09151586145162582), (0.403590589761734, 0.7732747197151184, 0.07274346798658371), (0.38811787962913513, 0.7705919146537781, 0.09818940609693527), (0.40423375368118286, 0.7873839735984802, 0.010974000208079815), (0.3804031014442444, 0.7794009447097778, 0.041182342916727066)]"))

In [23]:
# # Function to load saved landmarks and timestamps
# # def load_landmarks_and_timestamps(filename):
# #     landmarks_data = []
# #     with open(filename, 'r') as file:
# #         for line in file:
# #             parts = line.strip().split(' ', 1)
# #             timestamp_ms = int(parts[0])
# #             landmarks_data.append((timestamp_ms, eval(parts[1])))
# #     return landmarks_data
# # def load_landmarks_and_timestamps(filename):
# #     landmarks_data = []
# #     with open(filename, 'r') as file:
# #         for line in file:
# #             parts = line.strip().split(' ', 1)
# #             timestamp_ms = int(parts[0])
# #             landmarks_str = parts[1:]
# #             # Deserialize landmarks data
# #             landmarks = [eval(coord) for coord in landmarks_str.split(', ')]
# #             landmarks_data.append((timestamp_ms, landmarks))
# #     return landmarks_data

# def load_landmarks_and_timestamps(filename):
#     landmarks_data = []
#     with open(filename, 'r') as file:
#         for line in file:
#             timestamp_ms, landmarks = parse_string_to_data(line)
#             print(timestamp_ms, landmarks)
#             landmarks_data.append((timestamp_ms, landmarks))
#     return landmarks_data


In [53]:
# def compute_score_comparison(current_landmarks, saved_landmarks):
#     score = 0
#     # Ensure both lists have the same length
#     # min_length = min(len(current_landmarks), len(saved_landmarks))
#     for i in range(min_length):
#         current_point = current_landmarks[i]
#         saved_point = saved_landmarks[i]

#         # Extract x, y, and z coordinates from each point
#         current_x, current_y, current_z = current_point[:3]  # Assuming first three elements are x, y, and z
#         saved_x, saved_y, saved_z = saved_point[:3]          # Assuming first three elements are x, y, and z

#         # Calculate the Euclidean distance between the points
#         distance = ((current_x - saved_x) ** 2 + (current_y - saved_y) ** 2 + (current_z - saved_z) ** 2) ** 0.5

#         # Example threshold for scoring
#         if distance < 0.1:  # Adjust threshold as needed
#             score += 1

#     # Return the score as a fraction of the number of compared points
#     return score / min_length if min_length > 0 else 0


def compute_score_comparison(current_landmarks, saved_landmarks):
    score = 0
    for i in range(len(current_landmarks)):
        current_point = current_landmarks[i]
        saved_point = saved_landmarks[i]

        # Extract x and y coordinates from each point
        current_x, current_y = current_point[:3]  # Assuming first two elements are x and y
        saved_x, saved_y = saved_point[:3]        # Assuming first two elements are x and y

        

        
         # Calculate the Euclidean distance between the points
        distance = ((current_x - saved_x) ** 2 + (current_y - saved_y) ** 2 + (current_z - saved_z) ** 2) ** 0.5

        # Example threshold for scoring
        if distance < 0.1:  # Adjust threshold as needed
            score += 1

    return score / len(current_landmarks)


# def compute_score_comparison(current_landmarks, saved_landmarks):
#     # Assuming both current_landmarks and saved_landmarks are lists of instances with x and y attributes

#     score = 0
#     for i in range(len(current_landmarks)):
#         current_point = current_landmarks[i]
#         saved_point = saved_landmarks[i]
#         print(current_point)
#         # Extract x and y coordinates from each point
#         current_x, current_y = current_point.x, current_point.y
#         saved_x, saved_y = saved_point.x, saved_point.y

#         # Calculate the Euclidean distance between the points
#         distance = ((current_x - saved_x) ** 2 + (current_y - saved_y) ** 2) ** 0.5

#         # Example threshold for scoring
#         if distance < 0.1:  # Adjust threshold as needed
#             score += 1

#     return score / len(current_landmarks)




# def compute_score_comparison(current_landmarks, saved_landmarks):
#     # Assuming both current_landmarks and saved_landmarks are lists of coordinates

#     score = 0
#     for i in range(len(current_landmarks)):
#         current_point = current_landmarks[i]
#         saved_point = saved_landmarks[i]

#         # Calculate the Euclidean distance between the points
#         distance = ((current_point.x - saved_point.x) ** 2 + (current_point.y - saved_point.y) ** 2) ** 0.5

#         # Example threshold for scoring
#         if distance < 0.1:  # Adjust threshold as needed
#             score += 1

#     return score / len(current_landmarks)

    # score = 0
    # num_points = len(current_landmarks)

    # for i in range(num_points):
    #     current_point = current_landmarks[i]
    #     saved_point = saved_landmarks[i]
        
    #     # Calculate the Euclidean distance between the points
    #     distance = ((current_point[0] - saved_point[0]) ** 2 + (current_point[1] - saved_point[1]) ** 2) ** 0.5
        
    #     # Example threshold for scoring
    #     if distance < 0.1:  # Adjust threshold as needed
    #         score += 1

    # # Normalize the score to be between 0 and 1
    # normalized_score = score / num_points
    # return normalized_score


# # Define a function to compute the score based on landmarks
# def compute_score(landmarks):
#     # Example scoring criteria:
#     # If the nose landmark is above a certain threshold, add 1 to the score
#     # If the left and right shoulders are aligned horizontally, add 1 to the score
#     # If the left and right elbows are above the corresponding shoulders, add 1 to the score
#     # If the left and right wrists are below the corresponding elbows, add 1 to the score
    
#     score = 0
    
#     # Extract landmark coordinates
#     nose = landmarks[0]
#     left_shoulder = landmarks[11]
#     right_shoulder = landmarks[12]
#     left_elbow = landmarks[13]
#     right_elbow = landmarks[14]
#     left_wrist = landmarks[15]
#     right_wrist = landmarks[16]
    
#     # Example scoring criteria
#     if nose[1] < 0.5:  # If nose is above the center of the frame
#         score += 1
    
#     if abs(left_shoulder[0] - right_shoulder[0]) < 0.1:  # If shoulders are aligned horizontally
#         score += 1
    
#     if left_elbow[1] < left_shoulder[1] and right_elbow[1] < right_shoulder[1]:  # If elbows are above shoulders
#         score += 1
    
#     if left_wrist[1] > left_elbow[1] and right_wrist[1] > right_elbow[1]:  # If wrists are below elbows
#         score += 1
    
#     # Normalize the score to be between 0 and 1
#     normalized_score = score / 4.0
#     return normalized_score



Score Video

In [25]:
# def score_video(input_video_path, landmarks_file_path, output_video_path):
#     # Load the saved landmarks and timestamps
#     landmarks_data = load_landmarks_and_timestamps(landmarks_file_path)

#     base_options = python.BaseOptions(model_asset_path='pose_landmarker.task')
#     options = vision.PoseLandmarkerOptions(
#         running_mode=mp.tasks.vision.RunningMode.VIDEO,
#         base_options=base_options,
#         output_segmentation_masks=True)
#     detector = vision.PoseLandmarker.create_from_options(options)

#     # Load the input Video
#     cap = cv2.VideoCapture(input_video_path)
#     fps = int(cap.get(cv2.CAP_PROP_FPS))
#     frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
#     frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

#     # Define the codec and create VideoWriter object
#     fourcc = cv2.VideoWriter_fourcc(*'mp4v')
#     out = cv2.VideoWriter(output_video_path, fourcc, fps, (frame_width, frame_height))

#     frame_count = 0

#     total_score = 0
#     num_frames = 0

#     # Helper function to find the closest landmarks
#     def find_closest_landmarks(timestamp):
#         closest_landmarks = None
#         min_diff = float('inf')
#         for ts, landmarks in landmarks_data:
#             diff = abs(ts - timestamp)
#             if diff < min_diff:
#                 min_diff = diff
#                 closest_landmarks = landmarks
#         return closest_landmarks if min_diff <= 75 else None  # Adjust threshold as needed

#     # Loop through each frame in the video
#     while cap.isOpened():
#         ret, frame = cap.read()

#         if not ret:
#             break

#         numpy_frame_from_opencv = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
#         mp_image = mp.Image(image_format=mp.ImageFormat.SRGB, data=numpy_frame_from_opencv)

#         current_frame_timestamp_ms = int((frame_count / fps) * 1000)
#         detection_result = detector.detect_for_video(mp_image, current_frame_timestamp_ms)

#         # Extract the detected landmarks
#         current_landmarks = detection_result.pose_landmarks

#         score = None
#         closest_landmarks = find_closest_landmarks(current_frame_timestamp_ms)
#         if closest_landmarks is not None:
#             # score = compute_score(closest_landmarks)
#             compute_score_comparison(current_landmarks, closest_landmarks)
#             total_score += score
#             num_frames += 1

#         if score is not None:
#             score_text = f"Score: {score * 100:.2f}%"
#             cv2.putText(frame, score_text, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 4, cv2.LINE_AA)
#         else:
#             print(f"No score computed for frame {frame_count} at timestamp {current_frame_timestamp_ms} ms")

#         # out.write(frame)
#         # Process the detection result. In this case, visualize it.
#         annotated_image = draw_landmarks_on_image(numpy_frame_from_opencv, detection_result)
        
#         # Convert the annotated image back to BGR for OpenCV
#         output_frame = cv2.cvtColor(annotated_image, cv2.COLOR_RGB2BGR)

#         out.write(output_frame)


#         frame_count += 1

#     if num_frames > 0:
#         general_score = total_score / num_frames
#     else:
#         general_score = 0

#     print(f"General Score for the entire video: {general_score * 100:.2f}%")

#     cap.release()
#     out.release()
#     cv2.destroyAllWindows()


TRIAL - prints landmarks on student output but does not calculate score

In [55]:
def score_video(input_video_path, output_video_path):
    # Load the saved landmarks and timestamps
    # landmarks_data = load_landmarks_and_timestamps(landmarks_file_path)
    landmarks_data = profLandT

    base_options = python.BaseOptions(model_asset_path='pose_landmarker.task')
    options = vision.PoseLandmarkerOptions(
        running_mode=mp.tasks.vision.RunningMode.VIDEO,
        base_options=base_options,
        output_segmentation_masks=True)
    detector = vision.PoseLandmarker.create_from_options(options)

    # Load the input Video
    cap = cv2.VideoCapture(input_video_path)
    fps = int(cap.get(cv2.CAP_PROP_FPS))
    frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

    # Define the codec and create VideoWriter object
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_video_path, fourcc, fps, (frame_width, frame_height))

    frame_count = 0

    total_score = 0
    num_frames = 0

    # Helper function to find the closest landmarks
    def find_closest_landmarks(timestamp):
        closest_landmarks = None
        min_diff = float('inf')
        for ts, landmarks in landmarks_data:
            diff = abs(ts - timestamp)
            if diff < min_diff:
                min_diff = diff
                closest_landmarks = landmarks
        return closest_landmarks if min_diff <= 100 else None  # Adjust threshold as needed

    # Loop through each frame in the video
    while cap.isOpened():
        ret, frame = cap.read()

        if not ret:
            break

        numpy_frame_from_opencv = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        mp_image = mp.Image(image_format=mp.ImageFormat.SRGB, data=numpy_frame_from_opencv)

        current_frame_timestamp_ms = int((frame_count / fps) * 1000)
        detection_result = detector.detect_for_video(mp_image, current_frame_timestamp_ms)

        # Extract the detected landmarks
        current_landmarks = detection_result.pose_landmarks


        score = None
        closest_landmarks = find_closest_landmarks(current_frame_timestamp_ms)
        if closest_landmarks is not None:
            score = compute_score_comparison(current_landmarks, closest_landmarks)
            total_score += score  # Accumulate the score
            num_frames += 1

        if score is not None:
            score_text = f"Score: {score * 100:.2f}%"
            cv2.putText(frame, score_text, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 4, cv2.LINE_AA)
        else:
            print(f"No score computed for frame {frame_count} at timestamp {current_frame_timestamp_ms} ms")

        # Write the frame to the output video
        out.write(frame)

        frame_count += 1

    if num_frames > 0:
        general_score = total_score / num_frames
    else:
        general_score = 0

    print(f"General Score for the entire video: {general_score * 100:.2f}%")

    # Release video capture and writer objects
    cap.release()
    out.release()
    cv2.destroyAllWindows()


# III. LOAD PARAMETERS

In [27]:
process_video("prof/full-prof.mp4", "prof/output_prof.mp4")



In [28]:
# # Run the process_video function
# process_video("prof/prof-male.mp4", "prof/output_prof-male.mp4", "prof/prof-male_landmarks.txt")


In [29]:
# score_video("student/student-male02.mp4", "prof/prof-male_landmarks.txt", "student/male01-output-try.mp4")

In [56]:
score_video("student/student-female.mp4", "student/female01-output-try1.mp4")



No score computed for frame 0 at timestamp 0 ms
No score computed for frame 1 at timestamp 33 ms
No score computed for frame 2 at timestamp 66 ms
No score computed for frame 3 at timestamp 100 ms
No score computed for frame 4 at timestamp 133 ms
No score computed for frame 5 at timestamp 166 ms
No score computed for frame 6 at timestamp 200 ms
No score computed for frame 7 at timestamp 233 ms
No score computed for frame 8 at timestamp 266 ms
No score computed for frame 9 at timestamp 300 ms
No score computed for frame 10 at timestamp 333 ms
No score computed for frame 11 at timestamp 366 ms
No score computed for frame 12 at timestamp 400 ms
No score computed for frame 13 at timestamp 433 ms
No score computed for frame 14 at timestamp 466 ms
No score computed for frame 15 at timestamp 500 ms
No score computed for frame 16 at timestamp 533 ms
No score computed for frame 17 at timestamp 566 ms
No score computed for frame 18 at timestamp 600 ms
No score computed for frame 19 at timestamp 6

KeyboardInterrupt: 