In [1]:
import time

### Pipeline Timings

In [2]:
import itertools
import glob
import os
import typing as tp
import tqdm

import imageio.v3 as iio
import mediapipe as mp
import numpy as np
import torch

import model.classifiers as classifiers
import model.transforms as transforms

import utils.utils_camera_systems as utils_camera_systems
import utils.utils_kalman_filter as utils_kalman_filter
import utils.utils_mediapipe as utils_mediapipe
import utils.utils_unified_format as utils_unified_format
from config import DATA_CONFIG, TRAIN_CONFIG, KALMAN_FILTER_CONFIG

In [3]:
### MediaPipe Extractor
### ------------------------------
mp_holistic = mp.solutions.holistic

# Dataset root, gesture list and camera view used for this timing run.
DEPTH_FOLDER = DATA_CONFIG.dataset.undistorted
GESTURES = TRAIN_CONFIG.gesture_set.gestures
CAMERA = 'center'

# Keyword arguments forwarded verbatim to the Holistic solver.
mp_solver_settings = {
    'static_image_mode': False,
    'model_complexity': 2,
    'min_detection_confidence': 0.5,
    'min_tracking_confidence': 0.5,
}
mp_solver = mp_holistic.Holistic(**mp_solver_settings)

# Number of path components in the dataset root; used as an offset when
# indexing into split frame paths.
path_depth = len(DEPTH_FOLDER.split(os.path.sep))

# By default only one recording is timed: the sub-path below selects a single
# trial folder for the chosen camera.
single_trial = ('G101', 'select', 'right', 'trial1', f'cam_{CAMERA}')
folder_paths = sorted(glob.glob(os.path.join(DEPTH_FOLDER, *single_trial)))

# Alternative: collect the trial folders of every gesture in GESTURES instead
# of the single trial above.
# folder_paths = []
# for gesture in GESTURES:
#     folder_paths.extend(sorted(glob.glob(os.path.join(
#         DEPTH_FOLDER,
#         'G*',
#         gesture,
#         '*',
#         'trial*',
#         f'cam_{CAMERA}',
#     ))))


### Filtration
### ------------------------------
WINDOW_SIZE = 7

# Camera model and the windowed depth reader built from the calibration file
# of the selected camera.
CAMERA_PARAMS_PATH = DATA_CONFIG.cameras[f'{CAMERA}_camera_params']
image_size, intrinsic = utils_camera_systems.get_camera_params(CAMERA_PARAMS_PATH)
camera_systems = utils_camera_systems.CameraSystems(image_size, intrinsic)
depth_extractor = utils_camera_systems.DepthExtractor(WINDOW_SIZE)

KALMAN_PARAMS = KALMAN_FILTER_CONFIG.init_params.as_dict()
KALMAN_HEURISTICS_FUNC = KALMAN_FILTER_CONFIG.heuristics.as_dict()

# One Kalman filter per unified-format point.
kfs = []
for point_idx in range(utils_unified_format.TOTAL_POINTS_COUNT):
    # Points with index 18 and above reuse the `sigma_u_points` entry at index 4.
    # NOTE(review): presumably these are the hand points sharing the wrist
    # setting -- confirm against the unified point layout.
    sigma_idx = point_idx if point_idx < 18 else 4

    # A fresh parameter dict is requested for every filter because it is
    # modified below (key popped, `init_Q` replaced).
    kf_params = KALMAN_FILTER_CONFIG.init_params.as_dict()
    sigma_u = kf_params.pop('sigma_u_points')[sigma_idx]
    kf_params['sigma_u'] = sigma_u
    # Scale a copy of the base `init_Q` by the squared per-point sigma.
    kf_params['init_Q'] = np.copy(kf_params['init_Q']) * (sigma_u ** 2)

    kfs.append(
        utils_kalman_filter.KalmanFilter(**kf_params, **KALMAN_HEURISTICS_FUNC)
    )
kalman_filters = utils_kalman_filter.KalmanFilters(kfs)


### Classifier Inference
### ------------------------------
exp_id = 1
device = 'cpu'

# Checkpoint of the selected experiment: <output_data>/experiment_XXX/checkpoint.pth
experiment_dir = f'experiment_{str(exp_id).zfill(3)}'
checkpoint_path = os.path.join(
    TRAIN_CONFIG.train_params.output_data,
    experiment_dir,
    'checkpoint.pth',
)

# Label mappings of the gesture set taken from the training configuration.
gesture_set = TRAIN_CONFIG.gesture_set
label_map = gesture_set.label_map
inv_label_map = gesture_set.inv_label_map

# Transform settings taken from the training configuration.
transforms_params = TRAIN_CONFIG.transforms_params
to_keep = transforms_params.to_keep
shape_limit = transforms_params.shape_limit

test_transforms = transforms.TestTransforms(
    to_keep=to_keep, shape_limit=shape_limit, device=device,
)
label_transforms = transforms.LabelsTransforms(
    shape_limit=shape_limit, device=device,
)

# Input width is the sum of `to_keep`; output width is the number of labels.
n_features = sum(to_keep)
n_classes = len(label_map)
model = classifiers.LSTMClassifier(n_features, n_classes)
model.to(device)

# NOTE(review): torch.load unpickles the file -- only load trusted checkpoints.
state_dict = torch.load(checkpoint_path, map_location=device)
model.load_state_dict(state_dict)

# Kept as the last expression so the notebook displays the model summary.
model.eval()

LSTMClassifier(
  (positional_embeddings): PositionalEncoding()
  (linear1): Linear(in_features=30, out_features=256, bias=True)
  (lstm1): LSTM(256, 256, num_layers=2, batch_first=True)
  (linear2): Linear(in_features=256, out_features=3, bias=True)
)

In [4]:
# Per-frame durations (seconds) of the three pipeline stages, collected over
# every frame of every trial in `folder_paths`.
timings_mp = []
timings_filter = []
timings_model = []

for trial_path in tqdm.tqdm(folder_paths):
    color_paths = sorted(glob.glob(os.path.join(trial_path, 'color', '*.jpg')))
    depth_paths = sorted(glob.glob(os.path.join(trial_path, 'depth', '*.png')))

    # A trial folder without color frames has nothing to time; skipping it
    # also avoids an IndexError on `color_paths[0]` below.
    if not color_paths:
        continue

    path_info = color_paths[0].split(os.path.sep)
    path_info[path_depth+1] = path_info[path_depth+1].replace('_', '-')

    ### MediaPipe Extractor
    ### ------------------------------
    mp_solver.reset()

    ### Filtration
    ### ------------------------------
    # `None` marks the first frame of a trial: the Kalman filters are
    # re-initialised from the first measured depths.
    predicted = None

    # NOTE: zip stops at the shorter list, so unmatched trailing color or
    # depth frames are silently ignored.
    for image_path, depth_path in zip(color_paths, depth_paths):
        # Image loading is deliberately outside the timed sections.
        color_image = iio.imread(image_path)
        depth_image = iio.imread(depth_path).T

        ### MediaPipe Extractor
        ### ------------------------------
        # perf_counter is monotonic and high-resolution, unlike time.time(),
        # so short intervals are not distorted by wall-clock adjustments.
        start_ts = time.perf_counter()  ###

        landmarks = mp_solver.process(color_image)

        # Missing detections are replaced by the EMPTY_* placeholders so that
        # the chained sequence is built the same way for every frame.
        joined_landmarks = itertools.chain(
            landmarks.pose_landmarks.landmark if landmarks.pose_landmarks is not None else utils_mediapipe.EMPTY_POSE,
            landmarks.left_hand_landmarks.landmark if landmarks.left_hand_landmarks is not None else utils_mediapipe.EMPTY_HAND,
            landmarks.right_hand_landmarks.landmark if landmarks.right_hand_landmarks is not None else utils_mediapipe.EMPTY_HAND,
        )
        # Keep the first three columns of every landmark row and flatten.
        frame_points = utils_mediapipe.landmarks_to_array(joined_landmarks)[:, :3]
        mp_points = frame_points.reshape(-1)

        end_ts = time.perf_counter()  ###
        timings_mp.append(end_ts - start_ts)  ###


        ### Filtration
        ### ------------------------------
        start_ts = time.perf_counter()  ###

        # Convert from the MediaPipe point layout to the unified layout.
        mp_points = utils_mediapipe.mediapipe_to_unified(
            mp_points.reshape(-1, utils_mediapipe.TOTAL_POINTS_COUNT, 3)
        ).reshape(-1, 3 * utils_unified_format.TOTAL_POINTS_COUNT)

        # One row of three values per point.
        frame_points = mp_points.reshape(-1, 3)
        frame_points = camera_systems.zero_points_outside_screen(
            frame_points,
            is_normalized=True,
            inplace=True,
        )
        frame_points = camera_systems.normalized_to_screen(
            frame_points,
            inplace=True,
        )

        # Read a depth per point from the depth image; the previous Kalman
        # prediction (None on the first frame) is passed along as well.
        depths = depth_extractor.get_depth_in_window(
            depth_image,
            frame_points,
            predicted,
        )

        if predicted is None:
            # First frame of the trial: seed each filter with a 2x1 state.
            # NOTE(review): presumably [depth, velocity] -- confirm.
            kalman_filters.reset([
                np.array([[point], [0]])
                for point in depths
            ])
        depths_filtered = kalman_filters.update(
            depths,
            use_heuristic=True,
            projection=0,
        )

        predicted = kalman_filters.predict(projection=0)
        # Static-typing hints only; `cast` does nothing at runtime.
        depths_filtered = tp.cast(tp.List[float], depths_filtered)
        predicted = tp.cast(tp.List[float], predicted)

        # Replace the third column with the filtered depths before converting
        # the points to world coordinates.
        frame_points[:, 2] = depths_filtered
        frame_points = camera_systems.screen_to_world(
            frame_points,
            inplace=True,
        )

        end_ts = time.perf_counter()  ###
        timings_filter.append(end_ts - start_ts)  ###


        ### Classifier Inference
        ### ------------------------------
        start_ts = time.perf_counter()  ###

        # Single-frame input with a leading axis of length one.
        points = test_transforms(frame_points.flatten()[None, ...])
        with torch.no_grad():
            # NOTE(review): `use_hidden=True` presumably carries the LSTM state
            # between frames; it is not reset between trials here -- confirm.
            prediction = model(points, use_hidden=True)

        prediction_probs, prediction_labels = prediction.max(dim=-1)

        end_ts = time.perf_counter()  ###
        timings_model.append(end_ts - start_ts)  ###

100%|██████████| 1/1 [00:09<00:00,  9.64s/it]


In [5]:
# Print the mean per-frame duration (seconds) of each stage; a stage with no
# recorded samples is skipped so the division never sees an empty list.
for stage_label, stage_timings in (
    ('MP: ', timings_mp),
    ('Filter: ', timings_filter),
    ('Model: ', timings_model),
):
    if stage_timings:
        print(stage_label, sum(stage_timings) / len(stage_timings))

MP:  0.05851172415678166
Filter:  0.0029274510943199977
Model:  0.0015342905501688807
