# MoveNet Pose Estimation with 17 Keypoints Extraction

## i42 Adaptation

Custom implementation of MoveNet, adapted from the MoveNet codebase.

The helper functions were refactored, and functions for DataFrame conversion were added.

The inference functions were restructured to suit our requirements.

### Preparing functions

#### Install required packages

In [1]:
%pip install opencv-python
%pip install tensorflow
%pip install tensorflow-hub
%pip install numpy
%pip install pandas


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.2.1[0m[39;49m -> [0m[32;49m24.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m
Note: you may need to restart the kernel to use updated packages.

[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.2.1[0m[39;49m -> [0m[32;49m24.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m
Note: you may need to restart the kernel to use updated packages.

[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.2.1[0m[39;49m -> [0m[32;49m24.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m
Note: you may need to restart the kernel to use updated packages.

[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;4

#### Import required packages

In [10]:
import tensorflow as tf
import tensorflow_hub as hub
import numpy as np
import cv2
import pandas as pd

#### Helper functions

In [3]:
# Maps each joint name to its keypoint index. The index is simply the joint's
# position in the ordered tuple below.
KEYPOINT_DICT = {
    joint: index
    for index, joint in enumerate((
        'nose',
        'left_eye',
        'right_eye',
        'left_ear',
        'right_ear',
        'left_shoulder',
        'right_shoulder',
        'left_elbow',
        'right_elbow',
        'left_wrist',
        'right_wrist',
        'left_hip',
        'right_hip',
        'left_knee',
        'right_knee',
        'left_ankle',
        'right_ankle',
    ))
}

#### Loading MoveNet - Single Pose Thunder 4

In [4]:
# Load the MoveNet (single pose, Thunder, version 4) model.
# The input resolution belongs to the chosen model variant, not to the
# download, so it is defined unconditionally: later cells can rely on it even
# when loading fails.
input_size = 256
try:
    module = hub.load("https://www.kaggle.com/models/google/movenet/TensorFlow2/singlepose-thunder/4")
except Exception as error:
    # Keep the notebook running, but leave an explicit marker so that
    # `movenet` can report the real problem instead of a NameError.
    module = None
    print(f"Failed to load the model: {error}")

def movenet(input_image):
    """Runs detection on an input image.

    Args:
        input_image: A [1, height, width, 3] tensor represents the input image
        pixels. Note that the height/width should already be resized and match the
        expected input resolution of the model before passing into this function.

    Returns:
        A [1, 1, 17, 3] float numpy array representing the predicted keypoint
        coordinates and scores.

    Raises:
        RuntimeError: If the model failed to load in the loading step above.
    """
    if module is None:
        raise RuntimeError(
            "MoveNet model is not loaded; re-run the model loading step.")

    model = module.signatures['serving_default']

    # SavedModel format expects tensor type of int32.
    input_image = tf.cast(input_image, dtype=tf.int32)
    # Run model inference.
    outputs = model(input_image)
    # Output is a [1, 1, 17, 3] tensor.
    keypoints_with_scores = outputs['output_0'].numpy()

    return keypoints_with_scores

### Live Stream (Image Sequence)

#### Cropping Algorithm

In [5]:
# Confidence-score threshold used to decide whether a keypoint prediction is
# reliable. It is compared against the score channel (index 2) of each keypoint.
MIN_CROP_KEYPOINT_SCORE = 0.2

def init_crop_region(image_height, image_width):
  """Builds the fallback crop region that covers the whole image.

  The region is a square in pixel terms: the shorter image side is padded
  equally on both ends, so the normalized box extends below 0 and above 1 on
  that axis. It is used whenever a crop cannot be derived reliably from a
  previous prediction.

  Args:
    image_height: Image height in pixels.
    image_width: Image width in pixels.

  Returns:
    A dict with normalized 'y_min', 'x_min', 'y_max', 'x_max', 'height' and
    'width' entries.
  """
  if image_width > image_height:
    # Landscape image: pad vertically.
    region_height, region_width = image_width / image_height, 1.0
    top = (image_height / 2 - image_width / 2) / image_height
    left = 0.0
  else:
    # Portrait or square image: pad horizontally.
    region_height, region_width = 1.0, image_height / image_width
    top = 0.0
    left = (image_width / 2 - image_height / 2) / image_width

  return dict(
      y_min=top,
      x_min=left,
      y_max=top + region_height,
      x_max=left + region_width,
      height=region_height,
      width=region_width,
  )

def torso_visible(keypoints):
  """Tells whether the torso is detected well enough to anchor a crop.

  The torso counts as visible when at least one hip AND at least one shoulder
  have a score above MIN_CROP_KEYPOINT_SCORE.

  Args:
    keypoints: Array indexed as [0, 0, keypoint_index, 2] to read the score of
      a keypoint.

  Returns:
    A truthy value when the condition above holds, a falsy one otherwise.
  """
  def confident(joint):
    # Score channel (index 2) of the requested joint against the threshold.
    return keypoints[0, 0, KEYPOINT_DICT[joint], 2] > MIN_CROP_KEYPOINT_SCORE

  return ((confident('left_hip') or confident('right_hip')) and
          (confident('left_shoulder') or confident('right_shoulder')))

def determine_torso_and_body_range(
    keypoints, target_keypoints, center_y, center_x):
  """Measures how far the torso and the whole body reach from a center point.

  Two sets of joints are measured: the four torso joints (always), and all
  joints in KEYPOINT_DICT whose score is not below MIN_CROP_KEYPOINT_SCORE.
  For each set, the largest absolute vertical and horizontal distance to the
  center is kept. determine_crop_region uses these to size the crop.

  Args:
    keypoints: Array indexed as [0, 0, keypoint_index, 2] to read scores.
    target_keypoints: Dict mapping joint name to a [y, x] pair.
    center_y: Vertical coordinate of the center point.
    center_x: Horizontal coordinate of the center point.

  Returns:
    [max_torso_yrange, max_torso_xrange, max_body_yrange, max_body_xrange].
  """
  def distance_to_center(joint):
    position = target_keypoints[joint]
    return abs(center_y - position[0]), abs(center_x - position[1])

  torso_y_reach = 0.0
  torso_x_reach = 0.0
  for joint in ('left_shoulder', 'right_shoulder', 'left_hip', 'right_hip'):
    delta_y, delta_x = distance_to_center(joint)
    torso_y_reach = max(torso_y_reach, delta_y)
    torso_x_reach = max(torso_x_reach, delta_x)

  body_y_reach = 0.0
  body_x_reach = 0.0
  for joint, index in KEYPOINT_DICT.items():
    # Low-confidence joints must not inflate the body extent.
    if keypoints[0, 0, index, 2] < MIN_CROP_KEYPOINT_SCORE:
      continue
    delta_y, delta_x = distance_to_center(joint)
    body_y_reach = max(body_y_reach, delta_y)
    body_x_reach = max(body_x_reach, delta_x)

  return [torso_y_reach, torso_x_reach, body_y_reach, body_x_reach]

def determine_crop_region(
      keypoints, image_height,
      image_width):
  """Chooses the square region of the image to feed to the model.

  The keypoints are used to estimate a square that encloses the detected
  person, centered on the midpoint between the two hips. Its size comes from
  the joint-to-center distances (torso reach scaled by 1.9, body reach scaled
  by 1.2) and is capped by the largest distance from the center to an image
  border. If the torso is not visible, or the resulting square is larger than
  the longer image side, the default full-image crop is returned instead.

  Args:
    keypoints: Array indexed as [0, 0, keypoint_index, channel], where the
      channels are normalized y, normalized x and score.
    image_height: Image height in pixels.
    image_width: Image width in pixels.

  Returns:
    A dict with normalized 'y_min', 'x_min', 'y_max', 'x_max', 'height' and
    'width' entries.
  """
  # Keypoints in pixel coordinates, stored as [y, x] per joint.
  pixel_keypoints = {
      joint: [
          keypoints[0, 0, index, 0] * image_height,
          keypoints[0, 0, index, 1] * image_width,
      ]
      for joint, index in KEYPOINT_DICT.items()
  }

  if not torso_visible(keypoints):
    return init_crop_region(image_height, image_width)

  left_hip = pixel_keypoints['left_hip']
  right_hip = pixel_keypoints['right_hip']
  center_y = (left_hip[0] + right_hip[0]) / 2
  center_x = (left_hip[1] + right_hip[1]) / 2

  torso_y, torso_x, body_y, body_x = determine_torso_and_body_range(
      keypoints, pixel_keypoints, center_y, center_x)

  half_side = np.amax(
      [torso_x * 1.9, torso_y * 1.9, body_y * 1.2, body_x * 1.2])

  # Cap the half side at the largest center-to-border distance.
  border_distances = np.array(
      [center_x, image_width - center_x, center_y, image_height - center_y])
  half_side = np.amin([half_side, np.amax(border_distances)])

  if half_side > max(image_width, image_height) / 2:
    return init_crop_region(image_height, image_width)

  side = half_side * 2
  top = center_y - half_side
  left = center_x - half_side

  y_min = top / image_height
  x_min = left / image_width
  y_max = (top + side) / image_height
  x_max = (left + side) / image_width
  return {
      'y_min': y_min,
      'x_min': x_min,
      'y_max': y_max,
      'x_max': x_max,
      'height': y_max - y_min,
      'width': x_max - x_min,
  }

def crop_and_resize(image, crop_region, crop_size):
  """Cuts the crop region out of the image and resizes it for the model.

  Args:
    image: Batched image tensor; the crop is taken from batch entry 0.
    crop_region: Dict providing 'y_min', 'x_min', 'y_max' and 'x_max'.
    crop_size: Output size passed through to tf.image.crop_and_resize.

  Returns:
    The result of tf.image.crop_and_resize for the single requested box.
  """
  box = [crop_region[edge] for edge in ('y_min', 'x_min', 'y_max', 'x_max')]
  return tf.image.crop_and_resize(
      image, boxes=[box], box_indices=[0], crop_size=crop_size)

def run_inference(movenet, image, crop_region, crop_size):
  """Runs the model on a crop and maps the result back to the full image.

  The image is cropped to crop_region and resized to crop_size, the model is
  run on that crop, and the y/x coordinates of the 17 keypoints are then
  rewritten in place so that they are relative to the original image instead
  of the crop.

  Args:
    movenet: Callable that takes the cropped input and returns an array
      indexed as [0, 0, keypoint_index, channel].
    image: Unbatched image whose shape unpacks as (height, width, channels).
    crop_region: Dict providing 'y_min', 'x_min', 'height' and 'width' (plus
      the keys crop_and_resize needs).
    crop_size: Target size of the crop handed to the model.

  Returns:
    The model output with the coordinate channels (0 and 1) updated.
  """
  image_height, image_width, _ = image.shape

  batched_image = tf.expand_dims(image, axis=0)
  model_input = crop_and_resize(batched_image, crop_region, crop_size=crop_size)
  keypoints_with_scores = movenet(model_input)

  # Crop origin and extent in pixels; these do not change per keypoint.
  y_origin = crop_region['y_min'] * image_height
  y_extent = crop_region['height'] * image_height
  x_origin = crop_region['x_min'] * image_width
  x_extent = crop_region['width'] * image_width

  for idx in range(17):
    crop_y = keypoints_with_scores[0, 0, idx, 0]
    keypoints_with_scores[0, 0, idx, 0] = (
        y_origin + y_extent * crop_y) / image_height
    crop_x = keypoints_with_scores[0, 0, idx, 1]
    keypoints_with_scores[0, 0, idx, 1] = (
        x_origin + x_extent * crop_x) / image_width

  return keypoints_with_scores

#### Convert the keypoints into Dataframe

In [6]:
def keypoints_to_dataframe(keypoints_with_scores):
  """Converts one set of keypoints into a single-row pandas DataFrame.

  Only the coordinates are exported; the score channel is dropped. All 17
  keypoints are kept (eye and ear columns included).

  Args:
    keypoints_with_scores (numpy.ndarray): Array indexed as
      [0, 0, keypoint_index, channel] with 17 keypoints. Channel 0 is the
      y-coordinate, channel 1 the x-coordinate and channel 2 the score.

  Returns:
    pandas.DataFrame: One row and 34 columns named '<Keypoint>_X' and
    '<Keypoint>_Y', ordered keypoint by keypoint with X before Y
    ('Nose_X', 'Nose_Y', 'Left Eye_X', ...).
  """
  keypoint_names = [
    'Nose', 'Left Eye', 'Right Eye', 'Left Ear', 'Right Ear', 'Left Shoulder', 'Right Shoulder',
    'Left Elbow', 'Right Elbow', 'Left Wrist', 'Right Wrist', 'Left Hip', 'Right Hip',
    'Left Knee', 'Right Knee', 'Left Ankle', 'Right Ankle'
  ]

  # The array stores each keypoint as (y, x). Reversing the coordinate axis
  # makes the flattened row come out as x0, y0, x1, y1, ... directly, so no
  # column reordering is needed afterwards.
  yx = keypoints_with_scores[0, 0, :, :2]
  xy_flat = yx[:, ::-1].flatten()

  columns = [f'{name}_{axis}' for name in keypoint_names for axis in ('X', 'Y')]
  return pd.DataFrame([xy_flat], columns=columns)

#### Inference on Live Video Stream Frame from OpenCV

##### Load stream

In [7]:
def load_stream(stream_path):
    """
    Open a video stream and yield its frames one at a time.

    This is a generator: nothing is opened until iteration starts. The
    capture is released when the stream ends, when a frame cannot be read,
    and also when the consumer stops iterating early or an error interrupts
    the iteration.

    Args:
        stream_path: Source handed to cv2.VideoCapture (for example a file
            path).

    Yields:
        The frames read from the stream, in order.
    """
    stream = cv2.VideoCapture(stream_path)
    if not stream.isOpened():
        print("Error: Could not load stream.")
        stream.release()
        return

    try:
        while stream.isOpened():
            ret, frame = stream.read()
            if not ret:
                print("Reached the end of the stream or could not read the frame.")
                break

            yield frame
    finally:
        # Runs on normal exhaustion and on generator close or error, so the
        # capture is never left open.
        stream.release()
        cv2.destroyAllWindows()

##### Stream Inference

In [8]:
def frame_inference(frame, movenet, input_size, init_crop_region, run_inference, determine_crop_region):
    """
    Run pose estimation on a single frame and return its keypoints.

    Every call starts from the default crop region, so frames are processed
    independently of each other.

    Args:
        frame: Image whose shape unpacks as (height, width, channels), in BGR
            channel order.
        movenet: Model callable forwarded to run_inference.
        input_size: Side length of the square crop handed to the model.
        init_crop_region: Callable (image_height, image_width) returning the
            crop region to use.
        run_inference: Callable (movenet, image, crop_region, crop_size)
            returning an array indexed as [0, 0, keypoint_index, channel].
        determine_crop_region: Kept for interface compatibility and not
            called. Its result could only affect a following frame, and no
            crop state is carried between calls.

    Returns:
        pandas.DataFrame: The result of keypoints_to_dataframe. Coordinates of
        keypoints scoring below MIN_CROP_KEYPOINT_SCORE are set to -99.
    """
    # Sentinel written into the coordinates of low-confidence keypoints.
    low_confidence_value = -99

    # Get the frame dimensions
    image_height, image_width, _ = frame.shape

    # Convert the frame from BGR to RGB and wrap it in a tensor
    frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    frame_tensor = tf.convert_to_tensor(frame_rgb, dtype=tf.uint8)

    # Initialize the crop region
    crop_region = init_crop_region(image_height, image_width)

    # Run inference on the unbatched frame
    keypoints_with_scores = run_inference(
        movenet, frame_tensor, crop_region,
        crop_size=[input_size, input_size])

    # Replace the coordinates of low-confidence keypoints with the sentinel
    low_confidence = keypoints_with_scores[0, 0, :, 2] < MIN_CROP_KEYPOINT_SCORE
    keypoints_with_scores[0, 0, low_confidence, :2] = low_confidence_value

    # Convert keypoints to DataFrame
    return keypoints_to_dataframe(keypoints_with_scores)

In [11]:
# Demo of the frame_inference function: run it on every frame of a stream and
# print the resulting DataFrame.
# Set video_path to the path of a video file, or to 0 for the device camera.

video_path = 'ADL.mp4'
for frame in load_stream(video_path):
    df = frame_inference(frame, movenet, input_size, init_crop_region, run_inference, determine_crop_region)
    print(df)


   Nose_X  Nose_Y  Left Eye_X  Left Eye_Y  Right Eye_X  Right Eye_Y  \
0   -99.0   -99.0       -99.0       -99.0        -99.0        -99.0   

   Left Ear_X  Left Ear_Y  Right Ear_X  Right Ear_Y  ...  Right Hip_X  \
0       -99.0       -99.0        -99.0        -99.0  ...        -99.0   

   Right Hip_Y  Left Knee_X  Left Knee_Y  Right Knee_X  Right Knee_Y  \
0        -99.0        -99.0        -99.0         -99.0         -99.0   

   Left Ankle_X  Left Ankle_Y  Right Ankle_X  Right Ankle_Y  
0         -99.0         -99.0          -99.0          -99.0  

[1 rows x 34 columns]
   Nose_X  Nose_Y  Left Eye_X  Left Eye_Y  Right Eye_X  Right Eye_Y  \
0   -99.0   -99.0       -99.0       -99.0        -99.0        -99.0   

   Left Ear_X  Left Ear_Y  Right Ear_X  Right Ear_Y  ...  Right Hip_X  \
0       -99.0       -99.0        -99.0        -99.0  ...        -99.0   

   Right Hip_Y  Left Knee_X  Left Knee_Y  Right Knee_X  Right Knee_Y  \
0        -99.0        -99.0        -99.0         -99.0  