In [115]:
import time
import pprint

import os
import cv2
import mediapipe as mp
import numpy as np
import tensorflow as tf
from mediapipe.framework.formats import landmark_pb2
from mediapipe.tasks import python
from mediapipe.tasks.python import vision

import matplotlib.pyplot as plt
import concurrent.futures
from moviepy.editor import VideoFileClip

In [116]:
# shorthand for MediaPipe's drawing helpers (used by draw_landmark)
drawer = mp.solutions.drawing_utils
# running-mode enum passed to the landmarker option objects
VisionRunningMode = mp.tasks.vision.RunningMode

In [117]:
# base options for the hand and pose landmarker models; each one points at a
# .task model file through a path relative to the current working directory
hand_base_options = python.BaseOptions(model_asset_path="../tasks/hand_landmarker.task")
pose_base_options = python.BaseOptions(model_asset_path="../tasks/pose_landmarker.task")

In [118]:
# Both landmarkers use IMAGE running mode and the same 0.6 confidence
# thresholds; each detector is created right after its options.

# hand landmarker: up to two hands per image
hand_options = vision.HandLandmarkerOptions(
    base_options=hand_base_options,
    running_mode=VisionRunningMode.IMAGE,
    num_hands=2,
    min_hand_detection_confidence=0.6,
    min_hand_presence_confidence=0.6,
    min_tracking_confidence=0.6,
)
hand_detector = vision.HandLandmarker.create_from_options(hand_options)

# pose landmarker: segmentation masks are requested as well
pose_options = vision.PoseLandmarkerOptions(
    base_options=pose_base_options,
    running_mode=VisionRunningMode.IMAGE,
    output_segmentation_masks=True,
    min_pose_detection_confidence=0.6,
    min_pose_presence_confidence=0.6,
    min_tracking_confidence=0.6,
)
pose_detector = vision.PoseLandmarker.create_from_options(pose_options)

In [119]:
# Zero-filled templates used when a detector returns nothing.
# They are shared module-level arrays and must never be written to.
empty_hand_landmark = np.zeros((2, 21, 3))  # two hand slots x 21 landmarks x (x, y, z)
empty_pose_landmark = np.zeros(33 * 3)  # 33 landmarks x (x, y, z), flattened


def to_landmark_data(
    hand_results: "vision.HandLandmarkerResult",
    pose_results: "vision.PoseLandmarkerResult",
):
    """
    Extract keypoints from pose and hand results for dataset creation.

    Args:
        hand_results: hand landmarker result; its ``hand_world_landmarks`` and
            ``handedness`` lists are read. A falsy value means "no hands".
        pose_results: pose landmarker result; only the first entry of
            ``pose_world_landmarks`` is used.

    Returns:
        A 1-D array: the flattened pose keypoints followed by the flattened
        hand keypoints. Anything that was not detected is left as zeros
        (99 pose values + 126 hand values when built from the templates).
    """
    pose_landmark = empty_pose_landmark

    # Work on a private copy. Writing detected hands into the shared template
    # would leak them into every later call, so frames without hands would
    # still carry the previously seen hand keypoints.
    hand_landmark = empty_hand_landmark.copy()

    if pose_results.pose_world_landmarks:
        pose_landmark = np.array(
            [[lm.x, lm.y, lm.z] for lm in pose_results.pose_world_landmarks[0]]
        ).flatten()

    # with no hand results the zero-filled hand slots are returned as they are
    if hand_results:
        for index, hlm in enumerate(hand_results.hand_world_landmarks):
            # the handedness category index selects the hand slot
            # (0 for right hand, 1 for left hand per the original notes -
            # TODO confirm against MediaPipe's category order)
            handedness = hand_results.handedness[index][0].index

            # store the (x, y, z) keypoints of this hand in its slot
            hand_landmark[handedness] = np.array([[lm.x, lm.y, lm.z] for lm in hlm])

    return np.concatenate([pose_landmark, hand_landmark.flatten()])

# short aliases for the protobuf landmark types used when drawing
LandmarkList = landmark_pb2.NormalizedLandmarkList
NormalizedLandmark = landmark_pb2.NormalizedLandmark


def to_landmark_list(landmarks):
    """
    Build a LandmarkList from an iterable of landmarks.

    Only the x, y and z attributes of each input landmark are copied;
    an empty iterable produces an empty LandmarkList.
    """
    converted = []
    for point in landmarks:
        converted.append(NormalizedLandmark(x=point.x, y=point.y, z=point.z))

    return LandmarkList(landmark=converted)


# Placeholder landmark lists used for drawing when nothing is detected.
# The sizes are landmark counts (33 pose landmarks, 21 landmarks per hand),
# not flattened feature counts such as 33 * 4 or 21 * 3.
empty_pose_landmarks = to_landmark_list(
    [NormalizedLandmark(x=0.0, y=0.0, z=0.0) for _ in range(33)]
)

empty_hand_landmarks = to_landmark_list(
    [NormalizedLandmark(x=0.0, y=0.0, z=0.0) for _ in range(21)]
)


def to_drawing_landmark(hand_results, pose_results):
    """
    Convert pose and hand detection results into LandmarkLists for drawing.

    Returns a ``(pose_landmarks, hand_landmarks)`` pair. ``hand_landmarks`` is
    ``None`` when ``hand_results`` is falsy, otherwise a two-element list
    indexed by the handedness category index; slots without a detected hand
    hold the empty placeholder list.
    """
    if pose_results.pose_landmarks:
        pose_landmarks = to_landmark_list(pose_results.pose_landmarks[0])
    else:
        pose_landmarks = empty_pose_landmarks

    if not hand_results:
        return pose_landmarks, None

    # both slots start out as the placeholder and are replaced per detection
    per_hand = [empty_hand_landmarks] * 2

    for position, detected_hand in enumerate(hand_results.hand_landmarks):
        # the handedness category index decides which slot this hand fills
        slot = hand_results.handedness[position][0].index
        per_hand[slot] = to_landmark_list(detected_hand)

    return pose_landmarks, per_hand


def draw_landmark(image, hand_landmarks, pose_landmarks):
    """
    Draw the pose landmarks and, when present, every hand landmark list
    onto ``image`` in place.
    """
    pose_point_style = drawer.DrawingSpec(color=(80, 22, 10), thickness=2, circle_radius=3)
    pose_line_style = drawer.DrawingSpec(color=(80, 44, 121), thickness=2, circle_radius=2)

    drawer.draw_landmarks(
        image,
        pose_landmarks,
        mp.solutions.pose.POSE_CONNECTIONS,
        pose_point_style,
        pose_line_style,
    )

    # nothing more to draw when no hand lists were supplied
    if hand_landmarks:
        for single_hand in hand_landmarks:
            drawer.draw_landmarks(
                image,
                single_hand,
                mp.solutions.hands.HAND_CONNECTIONS,
                drawer.DrawingSpec(color=(121, 22, 76), thickness=2, circle_radius=2),
                drawer.DrawingSpec(color=(121, 44, 250), thickness=2, circle_radius=2),
            )

In [120]:
# action labels, one entry per class
ACTIONS = [
    "_",
    "hello",
    "thanks",
    "i-love-you",
    "I",
    "Yes",
    "No",
    "Help",
    "Please",
    "Want",
    "Eat",
    "More",
    "Bathroom",
    "Learn",
    "Sign",
]

# keep only the first 8 labels for preprocessing
# NOTE: update this number to match the number of labels in the dataset (if changed)
ACTIONS = np.asarray(ACTIONS[:8])

In [121]:
def _latest_versioned_file(file_names, prefix):
    """Return the name in ``file_names`` that starts with ``prefix`` and carries
    the highest numeric version, or ``None`` when no name qualifies."""
    best_key, best_name = None, None

    for name in file_names:
        if not name.startswith(prefix):
            continue

        # "singa_slr_v_002.keras" -> "002"; dotted versions such as "1.2" also work
        stem = os.path.splitext(name[len(prefix):])[0]

        try:
            # compare whole numbers, not single digits, so that 10 > 9
            key = tuple(int(part) for part in stem.split("."))
        except ValueError:
            # prefixed file without a numeric version: not a candidate
            continue

        if best_key is None or key > best_key:
            best_key, best_name = key, name

    return best_name


def load_model(model_version=None, model_dir="../../storage/models/keras"):
    """
    Load a Keras model from ``model_dir``.

    Args:
        model_version: version string such as ``"002"``. When given, the file
            ``singa_slr_v_<model_version>.keras`` is loaded. When omitted, the
            prefixed file with the highest numeric version is loaded.
        model_dir: directory that holds the model files.

    Returns:
        ``(file_name, model)`` - the loaded file's name and the Keras model.

    Raises:
        FileNotFoundError: no file in ``model_dir`` matches the prefix with a
            numeric version (only when ``model_version`` is omitted).
    """
    prefix = "singa_slr_v_"

    if model_version:
        version = f"{prefix}{model_version}.keras"
        model = tf.keras.models.load_model(os.path.join(model_dir, version))

        return version, model

    # Select by file name rather than by position. The directory listing may
    # contain unrelated files, so positions in the filtered list do not line
    # up with positions in the raw listing.
    latest_model_path = _latest_versioned_file(os.listdir(model_dir), prefix)

    if latest_model_path is None:
        raise FileNotFoundError(
            f"no model file starting with '{prefix}' found in {model_dir}"
        )

    model = tf.keras.models.load_model(os.path.join(model_dir, latest_model_path))

    return latest_model_path, model


# no version given, so load_model() picks the model file itself;
# v is the loaded file's name, model is the Keras model
v, model = load_model()

In [122]:
# show which model file was loaded
f"using model {v}"

'using model singa_slr_v_002.keras'

In [123]:
class TFLiteModel:
    """
    Small wrapper around ``tf.lite.Interpreter``: finds a model file, loads it
    and runs single predictions.
    """

    def __init__(self, prefix="singa_slr_v_", model_dir="../../storage/models/tflite"):
        """
        Args:
            prefix: file-name prefix shared by all model files.
            model_dir: directory that holds the TFLite model files.
        """
        self.model_dir = model_dir
        self.prefix = prefix
        self.interpreter = None
        self.input_details = None
        self.output_details = None

        # NOTE: despite their names these hold the interpreter's tensor
        # *indices* (the "index" field of the tensor details), not shapes.
        # The names are kept for backward compatibility.
        self.input_shape = None
        self.output_shape = None

    def _latest_model_file(self):
        """Return the prefixed file name in ``model_dir`` with the highest numeric version."""
        best_key, best_name = None, None

        for name in os.listdir(self.model_dir):
            if not name.startswith(self.prefix):
                continue

            # "<prefix>002.tflite" -> "002"; dotted versions such as "1.2" also work
            stem = os.path.splitext(name[len(self.prefix):])[0]

            try:
                # compare whole numbers, not single digits, so that 10 > 9
                key = tuple(int(part) for part in stem.split("."))
            except ValueError:
                # prefixed file without a numeric version: not a candidate
                continue

            if best_key is None or key > best_key:
                best_key, best_name = key, name

        if best_name is None:
            raise FileNotFoundError(
                f"no model file starting with '{self.prefix}' found in {self.model_dir}"
            )

        return best_name

    def load_model(self, use_latest=True, version=""):
        """
        Load a model into a new interpreter and print its tensor details.

        Args:
            use_latest: when True (default) the prefixed file with the highest
                numeric version is loaded and ``version`` is ignored.
            version: used only when ``use_latest`` is False; the file
                ``<prefix><version>`` is loaded, so the value must include the
                file extension.

        Raises:
            ValueError: ``use_latest`` is False and no ``version`` was given.
            FileNotFoundError: no versioned model file exists in ``model_dir``.
        """
        if use_latest:
            model_file = self._latest_model_file()
        elif version:
            model_file = f"{self.prefix}{version}"
        else:
            raise ValueError("a version is required when use_latest is False")

        model_path = os.path.join(self.model_dir, model_file)

        self.interpreter = tf.lite.Interpreter(model_path=model_path)
        self.interpreter.allocate_tensors()
        self.input_details = self.interpreter.get_input_details()
        self.output_details = self.interpreter.get_output_details()

        # tensor indices used by predict() (see the note in __init__)
        self.input_shape = self.input_details[0]["index"]
        self.output_shape = self.output_details[0]["index"]

        self.print_model_details()

    def print_model_details(self):
        """Pretty-print the input/output tensor details and the expected input shape."""
        print("Input details:")
        pprint.pprint(self.input_details)
        print()
        print("Output details:")
        pprint.pprint(self.output_details)
        print("=" * 50)

        input_shape = self.input_details[0]["shape"]
        print("Expected input shape:", input_shape)

    def predict(self, input_data):
        """
        Run one inference and return the first output tensor.

        Args:
            input_data: value written to the model's first input tensor; it
                should match the shape and dtype printed by
                ``print_model_details``.

        Raises:
            RuntimeError: ``load_model`` has not been called yet.
        """
        if self.interpreter is None:
            raise RuntimeError("no model loaded; call load_model() before predict()")

        self.interpreter.set_tensor(self.input_shape, input_data)
        self.interpreter.invoke()

        result = self.interpreter.get_tensor(self.output_shape)

        return result


# create the TFLite wrapper and load a model with the default arguments;
# load_model() also prints the interpreter's input/output tensor details
tflmodel = TFLiteModel()
tflmodel.load_model()

Input details:
[{'dtype': <class 'numpy.float32'>,
  'index': 0,
  'name': 'serving_default_input_1:0',
  'quantization': (0.0, 0),
  'quantization_parameters': {'quantized_dimension': 0,
                              'scales': array([], dtype=float32),
                              'zero_points': array([], dtype=int32)},
  'shape': array([  1,  60, 225]),
  'shape_signature': array([ -1,  60, 225]),
  'sparsity_parameters': {}}]

Output details:
[{'dtype': <class 'numpy.float32'>,
  'index': 78,
  'name': 'StatefulPartitionedCall_1:0',
  'quantization': (0.0, 0),
  'quantization_parameters': {'quantized_dimension': 0,
                              'scales': array([], dtype=float32),
                              'zero_points': array([], dtype=int32)},
  'shape': array([1, 8]),
  'shape_signature': array([-1,  8]),
  'sparsity_parameters': {}}]
Expected input shape: [  1  60 225]


In [124]:
# one colour per action class (passed to confidence_bar, which currently
# draws text only and does not use them)
colors = [
    (245, 117, 16),
    (117, 245, 16),
    (16, 117, 245),
    (117, 117, 16),
    (16, 245, 117),
    (245, 117, 245),
    (77, 213, 43),
    (11, 156, 245),
]


def confidence_bar(res, actions, input_frame, colors):
    """
    Overlay one "<action>: <confidence>" line per class on a copy of the frame.

    Args:
        res: per-class confidence scores; ``res[i]`` belongs to ``actions[i]``.
        actions: class labels.
        input_frame: image to annotate; it is copied, not modified.
        colors: accepted for interface compatibility; currently unused.

    Returns:
        The annotated copy of ``input_frame``.
    """
    output_frame = input_frame.copy()

    for num, prob in enumerate(res):
        # cv2.putText has no glyph for control characters, so a "\t" separator
        # is not drawn as spacing; use a printable separator instead
        label = f"{actions[num]}: {int(prob * 100)}"

        cv2.putText(
            output_frame,
            label,
            (0, 85 + num * 40),  # one text row every 40 pixels
            cv2.FONT_HERSHEY_SIMPLEX,
            1,
            (255, 255, 255),
            2,
            cv2.LINE_AA,
        )

    return output_frame

In [125]:
import time

In [126]:
cap = cv2.VideoCapture(0)
# cap = cv2.VideoCapture("./videos/test_7.mp4")

# requested capture properties
cap.set(cv2.CAP_PROP_FRAME_WIDTH, 480)  # request a frame width of 480 pixels
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 600)  # request a frame height of 600 pixels
cap.set(cv2.CAP_PROP_FPS, 60)  # request a frame rate of 60 FPS

isQuit = False
sequences = []
sequence = []

sentence = []
predictions = []

sequence_length = 60  # number of frames fed to the model per prediction
threshold = 0.5  # minimum confidence for an action to enter the sentence
stability_window = 40  # number of recent predictions used for the stability check

# try/finally guarantees the camera and the window are released even when the
# loop is interrupted or a frame fails to process
try:
    while cap.isOpened():
        start = time.time()
        success, image = cap.read()

        if not success:
            print("No frame received from the capture source; stopping.")
            break

        image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        # convert the cv image to the mediapipe image format before it is
        # passed to the hand and pose detectors
        annotated_image = mp.Image(image_format=mp.ImageFormat.SRGB, data=image_rgb)

        hand_results = hand_detector.detect(image=annotated_image)

        pose_results = pose_detector.detect(image=annotated_image)

        keypoints = to_landmark_data(hand_results, pose_results)
        sequences.append(keypoints)

        # keep only the most recent window so the buffer cannot grow without bound
        sequences = sequences[-sequence_length:]
        sequence = np.array(sequences).astype(np.float32)

        # optional debug overlay of the detected landmarks:
        # pose_landmark, hand_landmark = to_drawing_landmark(hand_results, pose_results)

        # draw_landmark(
        #     image_rgb,
        #     hand_landmarks=hand_landmark,
        #     pose_landmarks=pose_landmark,
        # )

        if len(sequence) == sequence_length:

            # predict the action label based on the sequence of keypoints
            result = tflmodel.predict(np.expand_dims(sequence, axis=0))[0]

            # action class with the highest confidence score
            predicted = int(np.argmax(result))
            predictions.append(predicted)
            predictions = predictions[-stability_window:]

            # NOTE: Only accept the current prediction when it is also the most
            #       common prediction in the recent window, which suggests an
            #       intentional action rather than a momentary anomaly.
            #       (np.unique(...)[0] would return the smallest label in the
            #       window, not the most frequent one.)
            most_common = int(np.bincount(predictions).argmax())

            if most_common == predicted and result[predicted] > threshold:
                action = ACTIONS[predicted]

                # append only when the action differs from the last recognised one
                if not sentence or action != sentence[-1]:
                    sentence.append(action)

            # keep only the last 5 recognised actions so the text fits the banner
            if len(sentence) > 5:
                sentence = sentence[-5:]

            # overlay the per-class confidences on the image
            image_rgb = confidence_bar(result, ACTIONS, image_rgb, colors)

        # banner across the full frame width with the recognised sentence
        cv2.rectangle(image_rgb, (0, 0), (image_rgb.shape[1], 40), (245, 117, 16), -1)
        cv2.putText(
            image_rgb,
            " ".join(sentence),
            (3, 30),
            cv2.FONT_HERSHEY_SIMPLEX,
            1,
            (255, 255, 255),
            2,
            cv2.LINE_AA,
        )

        # the working image is RGB; OpenCV windows expect BGR
        cv2.imshow(
            "MediaPipe Detection",
            cv2.cvtColor(image_rgb, cv2.COLOR_RGB2BGR),
        )

        if cv2.waitKey(10) & 0xFF == ord("q"):
            break

        print("exec:", time.time() - start)
finally:
    cap.release()
    cv2.destroyAllWindows()

exec: 0.5863256454467773
exec: 0.047133445739746094
exec: 0.046491384506225586
exec: 0.04710221290588379
exec: 0.047200918197631836
exec: 0.04890751838684082
exec: 0.05208325386047363
exec: 0.046250343322753906
exec: 0.0483858585357666
exec: 0.04990434646606445
exec: 0.050951242446899414
exec: 0.047786712646484375
exec: 0.050267934799194336
exec: 0.04884076118469238
exec: 0.046944379806518555
exec: 0.049292564392089844
exec: 0.04881906509399414
exec: 0.04671931266784668
exec: 0.04602932929992676
exec: 0.06152081489562988
exec: 0.04910612106323242
exec: 0.046766042709350586
exec: 0.046927690505981445
exec: 0.04613947868347168
exec: 0.04712820053100586
exec: 0.04807114601135254
exec: 0.04630136489868164
exec: 0.0468449592590332
exec: 0.048834800720214844
exec: 0.05106544494628906
exec: 0.048050642013549805
exec: 0.04709172248840332
exec: 0.04793524742126465
exec: 0.048818111419677734
exec: 0.05099034309387207
exec: 0.04963278770446777
exec: 0.05248236656188965
exec: 0.0482027530670166
ex

In [54]:
# manual cleanup: release the capture device and close any OpenCV windows
# (the same two calls that end the capture cell)
cap.release()
cv2.destroyAllWindows()