In [1]:
import tensorflow as tf
import tensorflow_hub as hub
import numpy as np
import pandas as pd 
import os
import csv
import enum
import tqdm
import cv2

In [2]:
#Load moveNet model
model = hub.load("https://tfhub.dev/google/movenet/multipose/lightning/1")
movenet = model.signatures['serving_default']

In [3]:
def get_person(persons,frame):

    scores = {}

    for i in range(len(persons)):

        y, x, c = frame.shape
        shaped = np.squeeze(np.multiply(persons[i], [y,x,1]))
        person_score = np.average(list(shaped[:,2]))
        scores[i]=person_score

    return max(scores, key=scores.get)

In [4]:
class BodyPart(enum.Enum):
    
  """Enum representing human body keypoints detected by pose estimation models."""
  NOSE = 0
  LEFT_EYE = 1
  RIGHT_EYE = 2
  LEFT_EAR = 3
  RIGHT_EAR = 4
  LEFT_SHOULDER = 5
  RIGHT_SHOULDER = 6
  LEFT_ELBOW = 7
  RIGHT_ELBOW = 8
  LEFT_WRIST = 9
  RIGHT_WRIST = 10
  LEFT_HIP = 11
  RIGHT_HIP = 12
  LEFT_KNEE = 13
  RIGHT_KNEE = 14
  LEFT_ANKLE = 15
  RIGHT_ANKLE = 16

In [5]:
class Preprocessor(object):
#     this class preprocess poses samples, it predicts keypoints on the images 
#     and save those keypoints in a csv file for the later use in the classification task 

        def __init__(self, images_in_folder,
                    csvs_out_path):
            self._images_in_folder = images_in_folder
            self._csvs_out_path = csvs_out_path
            self._csvs_out_folder_per_class = 'csv_per_pose'
            self._message = []
            
            if(self._csvs_out_folder_per_class not in os.listdir()):
                os.makedirs(self._csvs_out_folder_per_class)
            
#             get list of pose classes
            self._pose_class_names = sorted(
                [n for n in os.listdir(images_in_folder)]
            )

        def class_names(self):
            return self.pose_class_names

        def process(self):
            
#             Preprocess the images in the given folder
            for pose_class_name in self._pose_class_names:
#                 paths for pose class
                images_in_folder = os.path.join(self._images_in_folder, pose_class_name)
                csv_out_path = os.path.join(self._csvs_out_folder_per_class,
                                               pose_class_name + '.csv'
                                           )

                #               Detect landmarks in each images and write it to the csv files
                with open(csv_out_path, 'w') as csv_out_file:
                    csv_out_writer = csv.writer(csv_out_file,
                                                delimiter=',',
                                                quoting=csv.QUOTE_MINIMAL
                                               )

                    #             get the list of images
                    image_names = sorted(
                        [n for n in os.listdir(images_in_folder)]
                    )
                    for image_name in tqdm.tqdm(image_names):
                        image_path = os.path.join(images_in_folder, image_name)

                        image = cv2.imread(image_path)
                        y, x, c = image.shape
                        img = image.copy()
                        img = tf.image.resize_with_pad(tf.expand_dims(img, axis=0), 192,192)
                        input_img = tf.cast(img, dtype=tf.int32)

                        #get skeleton from movenet
                        results = movenet(input_img)
                        persons = results['output_0'].numpy()[:,:,:51].reshape((6,17,3))

                        i = get_person(persons,image)
                        pose_landmarks = np.squeeze(np.multiply(persons[i], [y,x,1]))

                        # Save landmarks if all landmarks above than the threshold
                        min_landmark_score = min(list(pose_landmarks[:,2]))
                        should_keep_image = min_landmark_score >= 0.1
                        if not should_keep_image:
                            continue

                        # writing the landmark coordinates to its csv files
                        coord = pose_landmarks.flatten().astype(np.str).tolist()
                        csv_out_writer.writerow([image_name] + coord)

            # combine all per-csv class CSVs into a sigle csv file
            all_landmarks_df = self.all_landmarks_as_dataframe()
            all_landmarks_df.to_csv(self._csvs_out_path, index=False)

            
        def all_landmarks_as_dataframe(self):
            # Merging all csv for each class into a single csv file
            total_df = None
            for class_index, class_name in enumerate(self._pose_class_names):
                csv_out_path = os.path.join(self._csvs_out_folder_per_class,
                                               class_name + '.csv'
                                           )
                per_class_df = pd.read_csv(csv_out_path, header=None)
                
                # Add the labels
                per_class_df['class_no'] = [class_index]*len(per_class_df)
                per_class_df['class_name'] = [class_name]*len(per_class_df)
                
                # Append the folder name to the filename first column
                per_class_df[per_class_df.columns[0]] = class_name + '/' +  per_class_df[per_class_df.columns[0]]
                
                if total_df is None:
                    total_df = per_class_df
                else:
                    total_df = pd.concat([total_df, per_class_df], axis=0)
            
            list_name = [[bodypart.name + '_x', bodypart.name + '_y', 
                  bodypart.name + '_score'] for bodypart in BodyPart]
            
            header_name = []
            for columns_name in list_name:
                header_name += columns_name
            header_name = ['filename'] + header_name
            header_map = { total_df.columns[i]: header_name[i]
                             for i in range(len(header_name))
                         }
            
            total_df.rename(header_map, axis=1, inplace=True)
            
            return total_df


In [6]:
# preprocess training data

images_in_folder = os.path.join('data\dataset\strokes', 'test')
csvs_out_path = '5_train_data.csv'
train_preprocessor = Preprocessor(
    images_in_folder,
    csvs_out_path
)

In [7]:
train_preprocessor.process()

Deprecated in NumPy 1.20; for more details and guidance: https://numpy.org/devdocs/release/1.20.0-notes.html#deprecations
  coord = pose_landmarks.flatten().astype(np.str).tolist()
100%|██████████| 398/398 [00:19<00:00, 20.21it/s]
100%|██████████| 1664/1664 [01:08<00:00, 24.32it/s]
100%|██████████| 592/592 [00:24<00:00, 23.84it/s]
100%|██████████| 1560/1560 [01:02<00:00, 25.00it/s]
100%|██████████| 2219/2219 [01:28<00:00, 25.17it/s]
