In [1]:
import numpy as np
import os
import pandas as pd
import tempfile
import tqdm
import sys

import warnings
warnings.filterwarnings("ignore", category=FutureWarning)

import tensorflow as tf
from tensorflow import keras

# Load MoveNet Thunder model
from script.data import BodyPart
from script.ml import Movenet

In [2]:
# Define function to run pose estimation using MoveNet Thunder.  You'll apply MoveNet's cropping algorithm and run inference
# multiple times on the input image to improve pose estimation accuracy.
def detect(input_tensor, inference_count=3, per=None):
  # Detect pose using the full input image
  move_net = Movenet('./model/movenet_thunder')
  move_net.detect(input_tensor.numpy(), reset_crop_region=True)
  # Repeatedly using previous detection result to identify the region of interest and only cropping that region to improve
  # detection accuracy
  for _ in range(inference_count - 1):
    per = move_net.detect(input_tensor.numpy(), reset_crop_region=False)

  return per

In [3]:
import csv

class MoveNetMark(object):
  """Helper class to preprocess pose sample images for classification."""
  def __init__(self, images_in_folder, csvs_out_path):
    self._images_in_folder = images_in_folder
    self._csvs_out_path = csvs_out_path

  def process(self, detection_threshold=0.1):
    # Loop through the classes and preprocess its images
    print('Preprocessing', self._images_in_folder, file=sys.stderr)
    # Detect landmarks in each image and write it to a CSV file
    messages = []
    temp_folder = tempfile.mkdtemp()
    with open(f'{temp_folder}/temp.csv', 'w') as csv_out_file:
      # Get list of images
      image_names = sorted([n for n in os.listdir(self._images_in_folder) if not n.startswith('.')])
      csv_out_writer = csv.writer(csv_out_file, delimiter=',', quoting=csv.QUOTE_MINIMAL)
      # Detect pose landmarks from each image
      valid_image_count = 0
      for image_name in tqdm.tqdm(image_names):
        image_path = os.path.join(self._images_in_folder, image_name)
        try:
          image = tf.io.decode_jpeg(tf.io.read_file(image_path))
          _, _, channel = image.shape
        except:
          messages.append('Skipped ' + image_path + '. Invalid image.')
          continue
        # Skip images that isn't RGB because Movenet requires RGB images
        if channel != 3:
          messages.append('Skipped ' + image_path + '. Image isn\'t in RGB format.')
          continue
        person = detect(image)
        # Save landmarks if all landmarks were detected
        min_landmark_score = min([keypoint.score for keypoint in person.keypoints])
        if not min_landmark_score >= detection_threshold:
          messages.append('Skipped ' + image_path + '. No pose was confidentlly detected.')
          continue
        # Get landmarks and scale it to the same size as the input image
        pose_landmarks = np.array([[keypoint.coordinate.x, keypoint.coordinate.y, keypoint.score]
                                   for keypoint in person.keypoints], dtype=str)
        # Write the landmark coordinates to its per-class CSV file
        csv_out_writer.writerow([image_name] + pose_landmarks.flatten().tolist())
        valid_image_count += 1
      if not valid_image_count:
        raise RuntimeError('No valid images found for the "{}" class.'.format(self._images_in_folder))
    # Print the error message collected during preprocessing.
    print('\n'.join(messages))
    # For the first class, assign its data to the total dataframe
    total_df = pd.read_csv(f'{temp_folder}/temp.csv', header=None) # Combine all per-class CSVs into a single output file
    list_name = [[bodypart.name + '_x', bodypart.name + '_y', bodypart.name + '_score'] for bodypart in BodyPart]
    header_name = ['file_name']
    for columns_name in list_name:
      header_name += columns_name
    header_map = {total_df.columns[i]: header_name[i] for i in range(len(header_name))}
    total_df.rename(header_map, axis=1, inplace=True)
    total_df.to_csv(self._csvs_out_path, index=False)

In [4]:
class MoveNetClassifier(object):
    def __init__(self):
        self.model = None

    def build_model(self, model_path=None):
        self.model = keras.models.load_model(model_path)

    def predict(self, X):
        return self.model.predict(X)

In [5]:
def load_pose_landmarks(csv_path):
  # Load the CSV file
  dataframe = pd.read_csv(csv_path)
  # Convert the input features and labels into the correct format for training.
  return dataframe.pop('file_name'), dataframe.astype('float64')

In [7]:
images_folder = os.path.join("./data", 'predict')
csvs_path = tempfile.mkdtemp() + "/predict.csv"
with open(f'./pose_labels.txt', 'r') as csv_out_file:
  class_names = list(csv_out_file)
movenetmark = MoveNetMark(images_in_folder=images_folder, csvs_out_path=csvs_path)
movenetmark.process()
# Load the data
data_filename, data_value = load_pose_landmarks(csvs_path)

classifier = MoveNetClassifier()
classifier.build_model("./model/classifier")
# Classify pose in the TEST dataset using the trained model
y_pred = classifier.predict(data_value)
y_pred_label = [class_names[i].rstrip() for i in np.argmax(y_pred, axis=1)]

Preprocessing ./data\predict
100%|██████████| 24/24 [00:02<00:00,  8.82it/s]





In [26]:
for f, p in zip(data_filename, y_pred_label):
  print(f"{f} is {p}")

guy3_chair070.jpg is chair
guy3_chair071.jpg is chair
guy3_chair072.jpg is chair
guy3_chair073.jpg is chair
guy3_chair074.jpg is chair
guy3_chair075.jpg is chair
guy3_chair076.jpg is chair
guy3_chair077.jpg is chair
guy3_chair078.jpg is chair
guy3_chair079.jpg is chair
guy3_chair080.jpg is chair
guy3_chair081.jpg is chair
guy3_cobra025.jpg is cobra
guy3_cobra026.jpg is cobra
guy3_cobra027.jpg is cobra
guy3_cobra028.jpg is cobra
guy3_cobra029.jpg is cobra
guy3_cobra030.jpg is cobra
guy3_cobra031.jpg is cobra
guy3_cobra032.jpg is cobra
guy3_cobra033.jpg is cobra
guy3_cobra034.jpg is cobra
guy3_cobra035.jpg is cobra
guy3_cobra036.jpg is cobra
