In [193]:
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Flatten, Dense, Dropout
from tensorflow.keras.layers import DepthwiseConv2D
from tensorflow.keras.layers import Add
from tensorflow.keras.layers import Input, Conv2D, BatchNormalization, ReLU, Reshape, GlobalAveragePooling2D, Activation,UpSampling2D, AveragePooling2D
from tensorflow.keras.preprocessing.image import ImageDataGenerator

import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix
import itertools

from mediapipe.python.solutions import pose as mp_pose
import cv2
pose_tracker = mp_pose.Pose()

In [194]:
train_path = 'C:/Users/Prasanna P M/Human Project/ML_aug/Human Action Recognition/TRAIN_landmarks_removed/'
valid_path = 'C:/Users/Prasanna P M/Human Project/ML_aug/Human Action Recognition/VALID_landmarks_removed/'
test_path = 'C:/Users/Prasanna P M/Human Project/ML_aug/Human Action Recognition/TEST_landmarks_removed/'

train_batches = ImageDataGenerator(preprocessing_function=tf.keras.applications.mobilenet.preprocess_input).flow_from_directory(
    directory=train_path, target_size=(224,224), batch_size=32)

valid_batches = ImageDataGenerator(preprocessing_function=tf.keras.applications.mobilenet.preprocess_input).flow_from_directory(
    directory=valid_path, target_size=(224, 224), batch_size=32)

test_batches = ImageDataGenerator(preprocessing_function=tf.keras.applications.mobilenet.preprocess_input).flow_from_directory(
    directory=test_path, target_size=(224,224), batch_size=32, shuffle=False)

Found 8376 images belonging to 15 classes.
Found 2412 images belonging to 15 classes.
Found 1197 images belonging to 15 classes.


In [29]:
import os
import cv2
import numpy as np
from mediapipe.python.solutions import pose as mp_pose
from tensorflow.keras.preprocessing.image import ImageDataGenerator

def flow_from_directory_with_landmarks(
    generator,
    directory,
    target_size=(224, 224),
    color_mode="rgb",
    classes=None,
    class_mode="categorical",
    batch_size=32,
    shuffle=True,
    seed=None,
    save_to_dir=None,
    save_prefix="",
    save_format="png",
    follow_links=False,
    subset=None,
    interpolation="nearest",
):
    # Get the original flow_from_directory output
    iterator = generator.flow_from_directory(
        directory,
        target_size=target_size,
        color_mode=color_mode,
        classes=classes,
        class_mode=class_mode,
        batch_size=batch_size,
        shuffle=shuffle,
        seed=seed,
        save_to_dir=save_to_dir,
        save_prefix=save_prefix,
        save_format=save_format,
        follow_links=follow_links,
        subset=subset,
        interpolation=interpolation,
    )

    # Create empty lists to store image data and landmarks
    image_data = []
    landmarks = []
    labels = []

    # Load and preprocess each image with landmarks
    pose_tracker = mp_pose.Pose()
    for image_path, label in zip(iterator.filepaths, iterator.labels):
        # Read the image
        image = cv2.imread(image_path)

        # Extract landmarks using mediapipe.pose
        results = pose_tracker.process(image)
        pose_landmarks = results.pose_landmarks

        # Check if landmarks were detected
        if pose_landmarks is not None:
            # Extract landmark coordinates
            landmark_coordinates = [[lmk.x, lmk.y, lmk.z] for lmk in pose_landmarks.landmark]
            landmarks.append(landmark_coordinates)
            image_data.append(image)
            labels.append(label)

    # Convert the image data and landmarks to numpy arrays
    image_data = np.array(image_data)
    landmarks = np.array(landmarks)
    labels = np.array(labels)

    # Return the modified image data, landmarks, and labels
    return image_data, landmarks, labels


# Usage example
train_path = 'C:/Users/Prasanna P M/Human Project/ML_aug/Human Action Recognition/TRAIN_landmarks/'
train_batches = ImageDataGenerator(preprocessing_function=tf.keras.applications.mobilenet.preprocess_input)

# Generate batches of image data and landmarks from the directory
train_images, train_landmarks, train_labels = flow_from_directory_with_landmarks(
    train_batches,
    directory=train_path,
    target_size=(224, 224),
    batch_size=32,
    class_mode='categorical',
    shuffle=True
)

# Verify the shapes of the data
print(train_images.shape)       # Shape of train_images
print(train_landmarks.shape)    # Shape of train_landmarks
print(train_labels.shape)       # Shape of train_labels

# Convert training labels to categorical format
train_labels_encoded = tf.keras.utils.to_categorical(train_labels, num_classes=15)
print(train_labels_encoded.shape)


Found 10554 images belonging to 15 classes.
(8394, 224, 224, 3)
(8394, 33, 3)
(8394,)
(8394, 15)


In [31]:
val_path = 'C:/Users/Prasanna P M/Human Project/ML_aug/Human Action Recognition/VALID_landmarks/'
val_batches = ImageDataGenerator(preprocessing_function=tf.keras.applications.mobilenet.preprocess_input)

# Generate batches of image data and landmarks from the validation directory
val_images, val_landmarks, val_labels = flow_from_directory_with_landmarks(
    val_batches,
    directory=val_path,
    target_size=(224, 224),
    batch_size=32,
    class_mode='categorical',
    shuffle=True
)


valid_labels_encoded = tf.keras.utils.to_categorical(val_labels, num_classes=15)
# Verify the shapes of the validation data
print(val_images.shape)       # Shape of val_images
print(val_landmarks.shape)    # Shape of val_landmarks
print(valid_labels_encoded.shape)       # Shape of val_labels

Found 2990 images belonging to 15 classes.
(2371, 224, 224, 3)
(2371, 33, 3)
(2371, 15)


In [156]:
print(val_landmarks.shape)

(2371, 33, 3)


In [117]:
class FullBodyPoseEmbedder(object):
  """Converts 3D pose landmarks into 3D embedding."""

  def __init__(self, torso_size_multiplier=2.5):
    # Multiplier to apply to the torso to get minimal body size.
    self._torso_size_multiplier = torso_size_multiplier

    # Names of the landmarks as they appear in the prediction.
    self._landmark_names = [
        'nose',
        'left_eye_inner', 'left_eye', 'left_eye_outer',
        'right_eye_inner', 'right_eye', 'right_eye_outer',
        'left_ear', 'right_ear',
        'mouth_left', 'mouth_right',
        'left_shoulder', 'right_shoulder',
        'left_elbow', 'right_elbow',
        'left_wrist', 'right_wrist',
        'left_pinky_1', 'right_pinky_1',
        'left_index_1', 'right_index_1',
        'left_thumb_2', 'right_thumb_2',
        'left_hip', 'right_hip',
        'left_knee', 'right_knee',
        'left_ankle', 'right_ankle',
        'left_heel', 'right_heel',
        'left_foot_index', 'right_foot_index',
    ]

  def __call__(self, landmarks):
    """Normalizes pose landmarks and converts to embedding
    
    Args:
      landmarks - NumPy array with 3D landmarks of shape (N, 3).

    Result:
      Numpy array with pose embedding of shape (M, 3) where `M` is the number of
      pairwise distances defined in `_get_pose_distance_embedding`.
    """
    assert landmarks.shape[0] == len(self._landmark_names), 'Unexpected number of landmarks: {}'.format(landmarks.shape[0])

    # Get pose landmarks.
    landmarks = np.copy(landmarks)

    # Normalize landmarks.
    landmarks = self._normalize_pose_landmarks(landmarks)

    # Get embedding.
    embedding = self._get_pose_distance_embedding(landmarks)

    return embedding

  def _normalize_pose_landmarks(self, landmarks):
    """Normalizes landmarks translation and scale."""
    landmarks = np.copy(landmarks)

    # Normalize translation.
    pose_center = self._get_pose_center(landmarks)
    landmarks -= pose_center

    # Normalize scale.
    pose_size = self._get_pose_size(landmarks, self._torso_size_multiplier)
    landmarks /= pose_size
    # Multiplication by 100 is not required, but makes it eaasier to debug.
    landmarks *= 100

    return landmarks

  def _get_pose_center(self, landmarks):
    """Calculates pose center as point between hips."""
    left_hip = landmarks[self._landmark_names.index('left_hip')]
    right_hip = landmarks[self._landmark_names.index('right_hip')]
    center = (left_hip + right_hip) * 0.5
    return center

  def _get_pose_size(self, landmarks, torso_size_multiplier):
    """Calculates pose size.
    
    It is the maximum of two values:
      * Torso size multiplied by `torso_size_multiplier`
      * Maximum distance from pose center to any pose landmark
    """
    # This approach uses only 2D landmarks to compute pose size.
    landmarks = landmarks[:, :2]

    # Hips center.
    left_hip = landmarks[self._landmark_names.index('left_hip')]
    right_hip = landmarks[self._landmark_names.index('right_hip')]
    hips = (left_hip + right_hip) * 0.5

    # Shoulders center.
    left_shoulder = landmarks[self._landmark_names.index('left_shoulder')]
    right_shoulder = landmarks[self._landmark_names.index('right_shoulder')]
    shoulders = (left_shoulder + right_shoulder) * 0.5

    # Torso size as the minimum body size.
    torso_size = np.linalg.norm(shoulders - hips)

    # Max dist to pose center.
    pose_center = self._get_pose_center(landmarks)
    max_dist = np.max(np.linalg.norm(landmarks - pose_center, axis=1))

    return max(torso_size * torso_size_multiplier, max_dist)

  def _get_pose_distance_embedding(self, landmarks):
    """Converts pose landmarks into 3D embedding.

    We use several pairwise 3D distances to form pose embedding. All distances
    include X and Y components with sign. We differnt types of pairs to cover
    different pose classes. Feel free to remove some or add new.
    
    Args:
      landmarks - NumPy array with 3D landmarks of shape (N, 3).

    Result:
      Numpy array with pose embedding of shape (M, 3) where `M` is the number of
      pairwise distances.
    """
    embedding = np.array([
        # One joint.

        self._get_distance(
            self._get_average_by_names(landmarks, 'left_hip', 'right_hip'),
            self._get_average_by_names(landmarks, 'left_shoulder', 'right_shoulder')),

        self._get_distance_by_names(landmarks, 'left_shoulder', 'left_elbow'),
        self._get_distance_by_names(landmarks, 'right_shoulder', 'right_elbow'),

        self._get_distance_by_names(landmarks, 'left_elbow', 'left_wrist'),
        self._get_distance_by_names(landmarks, 'right_elbow', 'right_wrist'),

        self._get_distance_by_names(landmarks, 'left_hip', 'left_knee'),
        self._get_distance_by_names(landmarks, 'right_hip', 'right_knee'),

        self._get_distance_by_names(landmarks, 'left_knee', 'left_ankle'),
        self._get_distance_by_names(landmarks, 'right_knee', 'right_ankle'),

        # Two joints.

        self._get_distance_by_names(landmarks, 'left_shoulder', 'left_wrist'),
        self._get_distance_by_names(landmarks, 'right_shoulder', 'right_wrist'),

        self._get_distance_by_names(landmarks, 'left_hip', 'left_ankle'),
        self._get_distance_by_names(landmarks, 'right_hip', 'right_ankle'),

        # Four joints.

        self._get_distance_by_names(landmarks, 'left_hip', 'left_wrist'),
        self._get_distance_by_names(landmarks, 'right_hip', 'right_wrist'),

        # Five joints.

        self._get_distance_by_names(landmarks, 'left_shoulder', 'left_ankle'),
        self._get_distance_by_names(landmarks, 'right_shoulder', 'right_ankle'),
        
        self._get_distance_by_names(landmarks, 'left_hip', 'left_wrist'),
        self._get_distance_by_names(landmarks, 'right_hip', 'right_wrist'),

        # Cross body.

        self._get_distance_by_names(landmarks, 'left_elbow', 'right_elbow'),
        self._get_distance_by_names(landmarks, 'left_knee', 'right_knee'),

        self._get_distance_by_names(landmarks, 'left_wrist', 'right_wrist'),
        self._get_distance_by_names(landmarks, 'left_ankle', 'right_ankle'),

#         Body bent direction.

        self._get_distance(
            self._get_average_by_names(landmarks, 'left_wrist', 'left_ankle'),
            landmarks[self._landmark_names.index('left_hip')]),
        self._get_distance(
            self._get_average_by_names(landmarks, 'right_wrist', 'right_ankle'),
            landmarks[self._landmark_names.index('right_hip')]),
    ])

    return embedding

  def _get_average_by_names(self, landmarks, name_from, name_to):
    lmk_from = landmarks[self._landmark_names.index(name_from)]
    lmk_to = landmarks[self._landmark_names.index(name_to)]
    return (lmk_from + lmk_to) * 0.5

  def _get_distance_by_names(self, landmarks, name_from, name_to):
    lmk_from = landmarks[self._landmark_names.index(name_from)]
    lmk_to = landmarks[self._landmark_names.index(name_to)]
    return self._get_distance(lmk_from, lmk_to)

  def _get_distance(self, lmk_from, lmk_to):
    return lmk_to - lmk_from

In [118]:
class PoseSample(object):

  def __init__(self, name, landmarks, class_name, embedding):
    self.name = name
    self.landmarks = landmarks
    self.class_name = class_name
    
    self.embedding = embedding


class PoseSampleOutlier(object):

  def __init__(self, sample, detected_class, all_classes):
    self.sample = sample
    self.detected_class = detected_class
    self.all_classes = all_classes



In [119]:
import csv
import numpy as np
import os

class PoseClassifier(object):
  """Classifies pose landmarks."""

  def __init__(self,
               pose_samples_folder,
               pose_embedder,
               file_extension='csv',
               file_separator=',',
               n_landmarks=33,
               n_dimensions=3,
               top_n_by_max_distance=30,
               top_n_by_mean_distance=10,
               axes_weights=(1., 1., 0.2)):
    self._pose_embedder = pose_embedder
    self._n_landmarks = n_landmarks
    self._n_dimensions = n_dimensions
    self._top_n_by_max_distance = top_n_by_max_distance
    self._top_n_by_mean_distance = top_n_by_mean_distance
    self._axes_weights = axes_weights

    self._pose_samples = self._load_pose_samples(pose_samples_folder,
                                                 file_extension,
                                                 file_separator,
                                                 n_landmarks,
                                                 n_dimensions,
                                                 pose_embedder)

  def _load_pose_samples(self,
                         pose_samples_folder,
                         file_extension,
                         file_separator,
                         n_landmarks,
                         n_dimensions,
                         pose_embedder):
    """Loads pose samples from a given folder.
    
    Required folder structure:
      neutral_standing.csv
      pushups_down.csv
      pushups_up.csv
      squats_down.csv
      ...

    Required CSV structure:
      sample_00001,x1,y1,z1,x2,y2,z2,....
      sample_00002,x1,y1,z1,x2,y2,z2,....
      ...
    """
    # Each file in the folder represents one pose class.
    file_names = [name for name in os.listdir(pose_samples_folder) if name.endswith(file_extension)]

    pose_samples = []
    for file_name in file_names:
      # Use file name as pose class name.
      class_name = file_name[:-(len(file_extension) + 1)]
      
      # Parse CSV.
      with open(os.path.join(pose_samples_folder, file_name)) as csv_file:
        csv_reader = csv.reader(csv_file, delimiter=file_separator)
        for row in csv_reader:
          assert len(row) == n_landmarks * n_dimensions + 1, 'Wrong number of values: {}'.format(len(row))
          landmarks = np.array(row[1:], np.float32).reshape([n_landmarks, n_dimensions])
          pose_samples.append(PoseSample(
              name=row[0],
              landmarks=landmarks,
              class_name=class_name,
              embedding=pose_embedder(landmarks),
          ))

    return pose_samples

  def find_pose_sample_outliers(self):
    """Classifies each sample against the entire database."""
    # Find outliers in target poses
    outliers = []
    for sample in self._pose_samples:
      # Find nearest poses for the target one.
      pose_landmarks = sample.landmarks.copy()
      pose_classification = self.__call__(pose_landmarks)
      class_names = [class_name for class_name, count in pose_classification.items() if count == max(pose_classification.values())]

      # Sample is an outlier if nearest poses have different class or more than
      # one pose class is detected as nearest.
      if sample.class_name not in class_names or len(class_names) != 1:
        outliers.append(PoseSampleOutlier(sample, class_names, pose_classification))

    return outliers

  def __call__(self, pose_landmarks):
    """Classifies given pose.

    Classification is done in two stages:
      * First we pick top-N samples by MAX distance. It allows to remove samples
        that are almost the same as given pose, but has few joints bent in the
        other direction.
      * Then we pick top-N samples by MEAN distance. After outliers are removed
        on a previous step, we can pick samples that are closes on average.
    
    Args:
      pose_landmarks: NumPy array with 3D landmarks of shape (N, 3).

    Returns:
      Dictionary with count of nearest pose samples from the database. Sample:
        {
          'pushups_down': 8,
          'pushups_up': 2,
        }
    """
    # Check that provided and target poses have the same shape.
    assert pose_landmarks.shape == (self._n_landmarks, self._n_dimensions), 'Unexpected shape: {}'.format(pose_landmarks.shape)

    # Get given pose embedding.
    pose_embedding = self._pose_embedder(pose_landmarks)
    flipped_pose_embedding = self._pose_embedder(pose_landmarks * np.array([-1, 1, 1]))

    # Filter by max distance.
    #
    # That helps to remove outliers - poses that are almost the same as the
    # given one, but has one joint bent into another direction and actually
    # represnt a different pose class.
    max_dist_heap = []
    for sample_idx, sample in enumerate(self._pose_samples):
      max_dist = min(
          np.max(np.abs(sample.embedding - pose_embedding) * self._axes_weights),
          np.max(np.abs(sample.embedding - flipped_pose_embedding) * self._axes_weights),
      )
      max_dist_heap.append([max_dist, sample_idx])

    max_dist_heap = sorted(max_dist_heap, key=lambda x: x[0])
    max_dist_heap = max_dist_heap[:self._top_n_by_max_distance]

    # Filter by mean distance.
    #
    # After removing outliers we can find the nearest pose by mean distance.
    mean_dist_heap = []
    for _, sample_idx in max_dist_heap:
      sample = self._pose_samples[sample_idx]
      mean_dist = min(
          np.mean(np.abs(sample.embedding - pose_embedding) * self._axes_weights),
          np.mean(np.abs(sample.embedding - flipped_pose_embedding) * self._axes_weights),
      )
      mean_dist_heap.append([mean_dist, sample_idx])

    mean_dist_heap = sorted(mean_dist_heap, key=lambda x: x[0])
    mean_dist_heap = mean_dist_heap[:self._top_n_by_mean_distance]

    # Collect results into map: (class_name -> n_samples)
    class_names = [self._pose_samples[sample_idx].class_name for _, sample_idx in mean_dist_heap]
    result = {class_name: class_names.count(class_name) for class_name in set(class_names)}

    return result

In [150]:

import cv2
from matplotlib import pyplot as plt
import numpy as np
import os
from PIL import Image
import sys
import tqdm

from mediapipe.python.solutions import drawing_utils as mp_drawing
from mediapipe.python.solutions import pose as mp_pose


class BootstrapHelper(object):
  """Helps to bootstrap images and filter pose samples for classification."""

  def __init__(self,
               images_in_folder,
               images_out_folder,
               csvs_out_folder):
    self._images_in_folder = images_in_folder
    self._images_out_folder = images_out_folder
    self._csvs_out_folder = csvs_out_folder

    # Get list of pose classes and print image statistics.
    self._pose_class_names = sorted([n for n in os.listdir(self._images_in_folder) if not n.startswith('.')])
    
  def bootstrap(self, per_pose_class_limit=None):
    """Bootstraps images in a given folder.
    
    Required image in folder (same use for image out folder):
      pushups_up/
        image_001.jpg
        image_002.jpg
        ...
      pushups_down/
        image_001.jpg
        image_002.jpg
        ...
      ...

    Produced CSVs out folder:
      pushups_up.csv
      pushups_down.csv

    Produced CSV structure with pose 3D landmarks:
      sample_00001,x1,y1,z1,x2,y2,z2,....
      sample_00002,x1,y1,z1,x2,y2,z2,....
    """
    # Create output folder for CVSs.
    if not os.path.exists(self._csvs_out_folder):
      os.makedirs(self._csvs_out_folder)

    for pose_class_name in self._pose_class_names:
      print('Bootstrapping ', pose_class_name, file=sys.stderr)

      # Paths for the pose class.
      images_in_folder = os.path.join(self._images_in_folder, pose_class_name)
      images_out_folder = os.path.join(self._images_out_folder, pose_class_name)
      csv_out_path = os.path.join(self._csvs_out_folder, pose_class_name + '.csv')
      if not os.path.exists(images_out_folder):
        os.makedirs(images_out_folder)

      with open(csv_out_path, 'w') as csv_out_file:
        csv_out_writer = csv.writer(csv_out_file, delimiter=',', quoting=csv.QUOTE_MINIMAL)
        # Get list of images.
        image_names = sorted([n for n in os.listdir(images_in_folder) if not n.startswith('.')])
        if per_pose_class_limit is not None:
          image_names = image_names[:per_pose_class_limit]

        # Bootstrap every image.
        for image_name in tqdm.tqdm(image_names):
          # Load image.
          input_frame = cv2.imread(os.path.join(images_in_folder, image_name))
          input_frame = cv2.cvtColor(input_frame, cv2.COLOR_BGR2RGB)

          # Initialize fresh pose tracker and run it.
          with mp_pose.Pose() as pose_tracker:
            result = pose_tracker.process(image=input_frame)
            pose_landmarks = result.pose_landmarks

          # Save image with pose prediction (if pose was detected).
          output_frame = input_frame.copy()
          if pose_landmarks is not None:
            mp_drawing.draw_landmarks(
                image=output_frame,
                landmark_list=pose_landmarks,
                connections=mp_pose.POSE_CONNECTIONS)
          output_frame = cv2.cvtColor(output_frame, cv2.COLOR_RGB2BGR)
          cv2.imwrite(os.path.join(images_out_folder, image_name), output_frame)
          
          # Save landmarks if pose was detected.
          if pose_landmarks is not None:
            # Get landmarks.
            frame_height, frame_width = output_frame.shape[0], output_frame.shape[1]
            pose_landmarks = np.array(
                [[lmk.x * frame_width, lmk.y * frame_height, lmk.z * frame_width]
                 for lmk in pose_landmarks.landmark],
                dtype=np.float32)
            assert pose_landmarks.shape == (33, 3), 'Unexpected landmarks shape: {}'.format(pose_landmarks.shape)
            csv_out_writer.writerow([image_name] + pose_landmarks.flatten().astype(np.str).tolist())

          # Draw XZ projection and concatenate with the image.
          projection_xz = self._draw_xz_projection(
              output_frame=output_frame, pose_landmarks=pose_landmarks)
          output_frame = np.concatenate((output_frame, projection_xz), axis=1)

  def _draw_xz_projection(self, output_frame, pose_landmarks, r=0.5, color='red'):
    frame_height, frame_width = output_frame.shape[0], output_frame.shape[1]
    img = Image.new('RGB', (frame_width, frame_height), color='white')

    if pose_landmarks is None:
      return np.asarray(img)

    # Scale radius according to the image width.
    r *= frame_width * 0.01

    draw = ImageDraw.Draw(img)
    for idx_1, idx_2 in mp_pose.POSE_CONNECTIONS:
      # Flip Z and move hips center to the center of the image.
      x1, y1, z1 = pose_landmarks[idx_1] * [1, 1, -1] + [0, 0, frame_height * 0.5]
      x2, y2, z2 = pose_landmarks[idx_2] * [1, 1, -1] + [0, 0, frame_height * 0.5]

      draw.ellipse([x1 - r, z1 - r, x1 + r, z1 + r], fill=color)
      draw.ellipse([x2 - r, z2 - r, x2 + r, z2 + r], fill=color)
      draw.line([x1, z1, x2, z2], width=int(r), fill=color)

    return np.asarray(img)

  def align_images_and_csvs(self, print_removed_items=False):
    """Makes sure that image folders and CSVs have the same sample.
    
    Leaves only intersetion of samples in both image folders and CSVs.
    """
    for pose_class_name in self._pose_class_names:
      # Paths for the pose class.
      images_out_folder = os.path.join(self._images_out_folder, pose_class_name)
      csv_out_path = os.path.join(self._csvs_out_folder, pose_class_name + '.csv')

      # Read CSV into memory.
      rows = []
      with open(csv_out_path) as csv_out_file:
        csv_out_reader = csv.reader(csv_out_file, delimiter=',')
        for row in csv_out_reader:
          rows.append(row)

      # Image names left in CSV.
      image_names_in_csv = []

      # Re-write the CSV removing lines without corresponding images.
      with open(csv_out_path, 'w') as csv_out_file:
        csv_out_writer = csv.writer(csv_out_file, delimiter=',', quoting=csv.QUOTE_MINIMAL)
        for row in rows:
          image_name = row[0]
          image_path = os.path.join(images_out_folder, image_name)
          if os.path.exists(image_path):
            image_names_in_csv.append(image_name)
            csv_out_writer.writerow(row)
          elif print_removed_items:
            print('Removed image from CSV: ', image_path)

      # Remove images without corresponding line in CSV.
      for image_name in os.listdir(images_out_folder):
        if image_name not in image_names_in_csv:
          image_path = os.path.join(images_out_folder, image_name)
          os.remove(image_path)
          if print_removed_items:
            print('Removed image from folder: ', image_path)

  def analyze_outliers(self, outliers):
    """Classifies each sample agains all other to find outliers.
    
    If sample is classified differrrently than the original class - it sould
    either be deleted or more similar samples should be aadded.
    """
    for outlier in outliers:
      image_path = os.path.join(self._images_out_folder, outlier.sample.class_name, outlier.sample.name)

      print('Outlier')
      print('  sample path =    ', image_path)
      print('  sample class =   ', outlier.sample.class_name)
      print('  detected class = ', outlier.detected_class)
      print('  all classes =    ', outlier.all_classes)

      img = cv2.imread(image_path)
      img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
      show_image(img, figsize=(20, 20))

  def remove_outliers(self, outliers):
    """Removes outliers from the image folders."""
    for outlier in outliers:
      image_path = os.path.join(self._images_out_folder, outlier.sample.class_name, outlier.sample.name)
      os.remove(image_path)

  def print_images_in_statistics(self):
    """Prints statistics from the input image folder."""
    self._print_images_statistics(self._images_in_folder, self._pose_class_names)

  def print_images_out_statistics(self):
    """Prints statistics from the output image folder."""
    self._print_images_statistics(self._images_out_folder, self._pose_class_names)

  def _print_images_statistics(self, images_folder, pose_class_names):
    print('Number of images per pose class:')
    for pose_class_name in pose_class_names:
      n_images = len([
          n for n in os.listdir(os.path.join(images_folder, pose_class_name))
          if not n.startswith('.')])
      print('  {}: {}'.format(pose_class_name, n_images))

In [126]:
bootstrap_images_in_folder = 'C:/Users/Prasanna P M/Human Project/ML_aug/Human Action Recognition/TRAIN_landmarks_removed/'

# Output folders for bootstrapped images and CSVs.
bootstrap_images_out_folder = 'C:/Users/Prasanna P M/Human Project/Bootstrap Train Images Out/'
bootstrap_csvs_out_folder = 'C:/Users/Prasanna P M/Human Project/Bootstrap Train Csv/'

In [127]:
bootstrap_helper = BootstrapHelper(
    images_in_folder=bootstrap_images_in_folder,
    images_out_folder=bootstrap_images_out_folder,
    csvs_out_folder=bootstrap_csvs_out_folder,
)

In [128]:
bootstrap_helper.print_images_in_statistics()

Number of images per pose class:
  calling: 801
  clapping: 457
  cycling: 246
  dancing: 433
  drinking: 758
  eating: 449
  fighting: 226
  hugging: 636
  laughing: 945
  listening_to_music: 817
  running: 390
  sitting: 440
  sleeping: 544
  texting: 642
  using_laptop: 592


In [129]:
# Set limit to some small number for debug.
bootstrap_helper.bootstrap(per_pose_class_limit=None)

Bootstrapping  calling
100%|██████████| 801/801 [06:17<00:00,  2.12it/s]
Bootstrapping  clapping
100%|██████████| 457/457 [03:22<00:00,  2.26it/s]
Bootstrapping  cycling
100%|██████████| 246/246 [01:50<00:00,  2.24it/s]
Bootstrapping  dancing
100%|██████████| 433/433 [03:13<00:00,  2.24it/s]
Bootstrapping  drinking
100%|██████████| 758/758 [05:38<00:00,  2.24it/s]
Bootstrapping  eating
100%|██████████| 449/449 [03:17<00:00,  2.27it/s]
Bootstrapping  fighting
100%|██████████| 226/226 [01:42<00:00,  2.21it/s]
Bootstrapping  hugging
100%|██████████| 636/636 [04:27<00:00,  2.38it/s]
Bootstrapping  laughing
100%|██████████| 945/945 [06:49<00:00,  2.31it/s]
Bootstrapping  listening_to_music
100%|██████████| 817/817 [06:01<00:00,  2.26it/s]
Bootstrapping  running
100%|██████████| 390/390 [02:57<00:00,  2.20it/s]
Bootstrapping  sitting
100%|██████████| 440/440 [03:18<00:00,  2.22it/s]
Bootstrapping  sleeping
100%|██████████| 544/544 [03:56<00:00,  2.30it/s]
Bootstrapping  texting
100%|████████

In [147]:
# Check how many images were bootstrapped.
bootstrap_helper.print_images_out_statistics()

Number of images per pose class:
  calling: 801
  clapping: 457
  cycling: 246
  dancing: 433
  drinking: 758
  eating: 449
  fighting: 226
  hugging: 636
  laughing: 945
  listening_to_music: 817
  running: 390
  sitting: 440
  sleeping: 544
  texting: 642
  using_laptop: 592


In [148]:
pose_embedder = FullBodyPoseEmbedder()

# Classifies give pose against database of poses.
pose_classifier = PoseClassifier(
    pose_samples_folder=bootstrap_csvs_out_folder,
    pose_embedder=pose_embedder,
    top_n_by_max_distance=30,
    top_n_by_mean_distance=10)

outliers = pose_classifier.find_pose_sample_outliers()
print('Number of outliers: ', len(outliers))

Number of outliers:  3702


In [152]:
print(pose_classifier)

<__main__.PoseClassifier object at 0x000001C44B47DFD0>


In [153]:
print(dir(pose_classifier))  # Prints the list of attributes and methods of pose_classifier


['__call__', '__class__', '__delattr__', '__dict__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__module__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', '_axes_weights', '_load_pose_samples', '_n_dimensions', '_n_landmarks', '_pose_embedder', '_pose_samples', '_top_n_by_max_distance', '_top_n_by_mean_distance', 'find_pose_sample_outliers']


In [155]:
help(PoseClassifier)  # Prints the documentation for the PoseClassifier class



Help on class PoseClassifier in module __main__:

class PoseClassifier(builtins.object)
 |  PoseClassifier(pose_samples_folder, pose_embedder, file_extension='csv', file_separator=',', n_landmarks=33, n_dimensions=3, top_n_by_max_distance=30, top_n_by_mean_distance=10, axes_weights=(1.0, 1.0, 0.2))
 |  
 |  Classifies pose landmarks.
 |  
 |  Methods defined here:
 |  
 |  __call__(self, pose_landmarks)
 |      Classifies given pose.
 |      
 |      Classification is done in two stages:
 |        * First we pick top-N samples by MAX distance. It allows to remove samples
 |          that are almost the same as given pose, but has few joints bent in the
 |          other direction.
 |        * Then we pick top-N samples by MEAN distance. After outliers are removed
 |          on a previous step, we can pick samples that are closes on average.
 |      
 |      Args:
 |        pose_landmarks: NumPy array with 3D landmarks of shape (N, 3).
 |      
 |      Returns:
 |        Dictionary wit

In [158]:
# Iterate over each pose landmark
for pose_landmarks in val_landmarks:
    # Classify the pose landmarks
    classification_result = pose_classifier(pose_landmarks)

    # Print the classification result
    print(classification_result)


{'sleeping': 1, 'listening_to_music': 3, 'drinking': 1, 'hugging': 1, 'eating': 2, 'laughing': 2}
{'eating': 5, 'sleeping': 1, 'using_laptop': 2, 'sitting': 2}
{'texting': 1, 'listening_to_music': 1, 'drinking': 1, 'eating': 1, 'calling': 6}
{'sleeping': 1, 'texting': 1, 'listening_to_music': 1, 'sitting': 1, 'using_laptop': 1, 'eating': 3, 'calling': 1, 'laughing': 1}
{'sleeping': 2, 'fighting': 1, 'eating': 2, 'calling': 1, 'laughing': 4}
{'sleeping': 1, 'listening_to_music': 3, 'running': 1, 'hugging': 1, 'calling': 3, 'laughing': 1}
{'drinking': 1, 'running': 4, 'sitting': 1, 'eating': 1, 'calling': 3}
{'texting': 2, 'listening_to_music': 2, 'running': 2, 'sitting': 1, 'hugging': 1, 'cycling': 1, 'laughing': 1}
{'sleeping': 1, 'texting': 2, 'listening_to_music': 1, 'laughing': 6}
{'sleeping': 1, 'texting': 1, 'clapping': 2, 'hugging': 2, 'using_laptop': 2, 'laughing': 2}
{'sleeping': 1, 'texting': 1, 'running': 2, 'hugging': 1, 'cycling': 1, 'using_laptop': 2, 'eating': 1, 'laughin

KeyboardInterrupt: 

In [159]:
# Iterate over the validation landmarks and classify each pose
predictions = []
for landmarks in val_landmarks:
    classification_result = pose_classifier(landmarks)
    predicted_class = max(classification_result, key=classification_result.get)
    predictions.append(predicted_class)



Accuracy: 0.0


  accuracy = np.mean(predictions == ground_truth)


In [189]:
class_mapping = {
    'calling': 0,
    'clapping': 1,
    'cycling': 2,
    'dancing': 3,
    'drinking': 4,
    'eating': 5,
    'fighting': 6,
    'hugging': 7,
    'laughing': 8,
    'listening_to_music': 9,
    'running': 10,
    'sitting': 11,
    'sleeping': 12,
    'texting': 13,
    'using_laptop': 14
}

# Convert the predictions from class strings to numeric labels
numeric_predictions = [class_mapping[prediction] for prediction in predictions]


numeric_predictions = np.array(numeric_predictions)

In [190]:
print(val_labels)

[ 0  0  0 ... 14 14 14]


In [191]:
print(numeric_predictions)

[ 9  5  0 ... 14 14  8]


In [192]:
correct_predictions = (numeric_predictions == val_labels)

# Calculate accuracy
accuracy = np.mean(correct_predictions)

# Print accuracy
print("Accuracy:", accuracy)


Accuracy: 0.19105862505272037
