In [17]:
import pprint

import os
import cv2
import mediapipe as mp
import numpy as np
import tensorflow as tf
from mediapipe.framework.formats import landmark_pb2
from mediapipe.tasks import python
from mediapipe.tasks.python import vision

In [18]:
# shorthand for MediaPipe's drawing helpers (used by draw_landmark below)
drawer = mp.solutions.drawing_utils
# shorthand for the Tasks API running-mode enum (used when building the detector options)
VisionRunningMode = mp.tasks.vision.RunningMode

In [19]:
# base options for the hand and pose landmarker models; each points at a
# .task model bundle via a path relative to the current working directory
hand_base_options = python.BaseOptions(model_asset_path="../tasks/hand_landmarker.task")
pose_base_options = python.BaseOptions(model_asset_path="../tasks/pose_landmarker.task")

In [20]:
# hand landmarker: single-image mode, up to two hands, 0.6 confidence floors
hand_options = vision.HandLandmarkerOptions(
    base_options=hand_base_options,
    running_mode=VisionRunningMode.IMAGE,
    num_hands=2,
    min_hand_detection_confidence=0.6,
    min_hand_presence_confidence=0.6,
    min_tracking_confidence=0.6,
)
hand_detector = vision.HandLandmarker.create_from_options(hand_options)

# pose landmarker: single-image mode, segmentation masks enabled, 0.6 confidence floors
pose_options = vision.PoseLandmarkerOptions(
    base_options=pose_base_options,
    running_mode=VisionRunningMode.IMAGE,
    output_segmentation_masks=True,
    min_pose_detection_confidence=0.6,
    min_pose_presence_confidence=0.6,
    min_tracking_confidence=0.6,
)
pose_detector = vision.PoseLandmarker.create_from_options(pose_options)

In [21]:
# zero-filled templates used when a detector returns nothing for a frame;
# they must never be written to (to_landmark_data works on copies)
empty_hand_landmark = np.zeros((2, 21, 3))  # right hand and left hand
empty_pose_landmark = np.zeros(33 * 3)


def to_landmark_data(
    hand_results: "vision.HandLandmarkerResult",
    pose_results: "vision.PoseLandmarkerResult",
):
    """
    Extract keypoints from pose and hand results for dataset creation.

    Args:
        hand_results: hand landmarker result; `hand_world_landmarks` and
            `handedness` are read. A falsy value is treated as "no hands".
        pose_results: pose landmarker result; only the first entry of
            `pose_world_landmarks` is read.

    Returns:
        A 1-D array of 33 * 3 pose values followed by 2 * 21 * 3 hand values
        (225 in total). Anything that was not detected is left as zeros.
    """
    pose_landmark = empty_pose_landmark

    # Work on a private copy: writing into the shared module-level template
    # would leak one frame's hand coordinates into every later frame,
    # including frames in which no hand is detected at all.
    hand_landmark = empty_hand_landmark.copy()

    if pose_results.pose_world_landmarks:
        pose_landmark = np.array(
            [[lm.x, lm.y, lm.z] for lm in pose_results.pose_world_landmarks[0]]
        ).flatten()

    # if no hand results are available, return the zeroed hand keypoints
    # concatenated with the pose keypoints
    if not hand_results:
        return np.concatenate([pose_landmark, hand_landmark.flatten()])

    # iterate over the detected hand landmarks
    for index, hlm in enumerate(hand_results.hand_world_landmarks):
        # slot for this hand, taken from the handedness category index
        # (0 for right hand, 1 for left hand per the original note)
        handedness = hand_results.handedness[index][0].index

        # extract the keypoints for the current hand and assign them to that slot
        hand_landmark[handedness] = np.array([[lm.x, lm.y, lm.z] for lm in hlm])

    return np.concatenate([pose_landmark, hand_landmark.flatten()])

# short aliases for the protobuf landmark message types used by the drawing helpers below
LandmarkList = landmark_pb2.NormalizedLandmarkList  # container built with a `landmark=[...]` list
NormalizedLandmark = landmark_pb2.NormalizedLandmark  # single landmark built with x, y, z


def to_landmark_list(landmarks):
    """
    Build a LandmarkList whose entries copy the x, y and z of each item in `landmarks`.
    """
    converted = []
    for point in landmarks:
        converted.append(NormalizedLandmark(x=point.x, y=point.y, z=point.z))

    return LandmarkList(landmark=converted)


# Placeholder used when no pose is detected: one zeroed entry per pose
# landmark. A pose has 33 landmarks (matching the 33 * 3 keypoint template),
# not 33 * 4.
empty_pose_landmarks = to_landmark_list(
    [NormalizedLandmark(x=0.0, y=0.0, z=0.0) for _ in range(33)]
)

# Placeholder used for a hand that was not detected: one zeroed entry per hand
# landmark. A hand has 21 landmarks (matching the (2, 21, 3) keypoint
# template), not 21 * 3.
empty_hand_landmarks = to_landmark_list(
    [NormalizedLandmark(x=0.0, y=0.0, z=0.0) for _ in range(21)]
)


def to_drawing_landmark(hand_results, pose_results):
    """
    Turn detector results into LandmarkList objects suitable for drawing.

    Returns a `(pose, hands)` pair. `pose` is the first detected pose, or the
    zeroed placeholder when none was found. `hands` is a two-element list
    indexed by handedness (undetected hands keep the zeroed placeholder), or
    None when `hand_results` is falsy.
    """
    if pose_results.pose_landmarks:
        pose_landmarks = to_landmark_list(pose_results.pose_landmarks[0])
    else:
        pose_landmarks = empty_pose_landmarks

    if not hand_results:
        return pose_landmarks, None

    # start with a placeholder in both slots, then overwrite the detected ones
    per_hand = [empty_hand_landmarks] * 2

    for position, detected_hand in enumerate(hand_results.hand_landmarks):
        # slot for this hand, taken from the handedness category index
        # (0 for right hand, 1 for left hand per the original note)
        slot = hand_results.handedness[position][0].index
        per_hand[slot] = to_landmark_list(detected_hand)

    return pose_landmarks, per_hand


def draw_landmark(image, hand_landmarks, pose_landmarks):
    """
    Draw the pose skeleton and, when provided, each hand skeleton onto `image` in place.

    `hand_landmarks` may be None or empty, in which case only the pose is drawn.
    """
    pose_point_style = drawer.DrawingSpec(color=(80, 22, 10), thickness=2, circle_radius=3)
    pose_line_style = drawer.DrawingSpec(color=(80, 44, 121), thickness=2, circle_radius=2)
    drawer.draw_landmarks(
        image,
        pose_landmarks,
        mp.solutions.pose.POSE_CONNECTIONS,
        pose_point_style,
        pose_line_style,
    )

    if hand_landmarks:
        for single_hand in hand_landmarks:
            hand_point_style = drawer.DrawingSpec(
                color=(121, 22, 76), thickness=2, circle_radius=2
            )
            hand_line_style = drawer.DrawingSpec(
                color=(121, 44, 250), thickness=2, circle_radius=2
            )
            drawer.draw_landmarks(
                image,
                single_hand,
                mp.solutions.hands.HAND_CONNECTIONS,
                hand_point_style,
                hand_line_style,
            )

In [22]:
# labels the classifier can emit, indexed by the predicted class index
ACTIONS = np.array(["selamat pagi"])

In [23]:
def load_model(model_version=None):
    """
    Load a Keras model from the model directory.

    Args:
        model_version: version string such as "005". When given, the file
            "<prefix><model_version>.keras" is loaded. When omitted or empty,
            the file with the highest numeric version among those starting
            with the prefix is loaded.

    Returns:
        A `(filename, model)` tuple: the loaded file's name and the Keras model.

    Raises:
        FileNotFoundError: when auto-selecting and no file with a parseable
            version exists in the model directory.
    """
    model_dir = "../../storage/models/keras"
    prefix = "signdeafspeech_sds_v_"

    if model_version:
        version = f"{prefix}{model_version}.keras"
        ks_file = os.path.join(model_dir, version)

        model = tf.keras.models.load_model(ks_file)

        return version, model

    # Map each candidate file name to its parsed version. Keeping the name
    # and version together avoids indexing the unfiltered directory listing
    # with a position computed on the filtered one.
    versioned = {}
    for file in os.listdir(model_dir):
        if not file.startswith(prefix):
            continue
        parsed = _parse_model_version(file, prefix)
        if parsed is not None:
            versioned[file] = parsed

    if not versioned:
        raise FileNotFoundError(
            f"no model file matching '{prefix}<version>' found in {model_dir}"
        )

    # pick the file with the highest numeric version
    latest_model_path = max(versioned, key=versioned.get)

    model = tf.keras.models.load_model(os.path.join(model_dir, latest_model_path))

    return latest_model_path, model


def _parse_model_version(filename, prefix):
    """Return the version in `filename` as a tuple of ints, or None if it is not numeric."""
    stem = os.path.splitext(filename[len(prefix):])[0]
    try:
        # compare whole numbers ("10" > "9"), not individual digits
        return tuple(int(part) for part in stem.split("."))
    except ValueError:
        return None


# load Keras model version "005" explicitly instead of auto-selecting the latest file
v, model = load_model("005")

In [24]:
# display the file name returned by load_model
f"using model {v}"

'using model signdeafspeech_sds_v_005.keras'

In [25]:
class TFLiteModel:
    """
    Small wrapper around `tf.lite.Interpreter`.

    Typical use: create an instance, call `load_model()` once, then call
    `predict()` for every input batch.
    """

    def __init__(self, prefix="signdeafspeech_sds_v_"):
        """
        Args:
            prefix: file-name prefix shared by all model files in the model directory.
        """
        self.model_dir = "../../storage/models/tflite"
        self.prefix = prefix
        self.interpreter = None
        self.input_details = None
        self.output_details = None

        # NOTE: despite their names these hold the interpreter's input/output
        # tensor *indices* (set in load_model, consumed by predict), not
        # shapes. The names are kept for backward compatibility.
        self.input_shape = None
        self.output_shape = None

    @staticmethod
    def _parse_version(filename, prefix):
        """Return the version in `filename` as a tuple of ints, or None if it is not numeric."""
        stem = os.path.splitext(filename[len(prefix):])[0]
        try:
            # compare whole numbers ("10" > "9"), not individual digits
            return tuple(int(part) for part in stem.split("."))
        except ValueError:
            return None

    def _latest_model_file(self):
        """Return the name of the highest-versioned file in the model directory."""
        # Keep name and version together so the winner is looked up by name,
        # not by a position that is only valid in the filtered list.
        versioned = {}
        for file in os.listdir(self.model_dir):
            if not file.startswith(self.prefix):
                continue
            parsed = self._parse_version(file, self.prefix)
            if parsed is not None:
                versioned[file] = parsed

        if not versioned:
            raise FileNotFoundError(
                f"no model file matching '{self.prefix}<version>' found in {self.model_dir}"
            )

        return max(versioned, key=versioned.get)

    def load_model(self, use_latest=True, version=""):
        """
        Create the interpreter, allocate its tensors and print the tensor details.

        Args:
            use_latest: when True (default) the highest-versioned file in the
                model directory is loaded and `version` is ignored.
            version: version string (e.g. "005") to load when `use_latest` is False.

        Raises:
            ValueError: when `use_latest` is False and no `version` is given.
            FileNotFoundError: when auto-selecting and no versioned file exists.
        """
        if use_latest:
            filename = self._latest_model_file()
        elif version:
            filename = f"{self.prefix}{version}"
            # NOTE(review): assumes model files carry a ".tflite" extension,
            # mirroring how the Keras loader appends ".keras" - confirm.
            if not filename.endswith(".tflite"):
                filename += ".tflite"
        else:
            raise ValueError("a version is required when use_latest is False")

        model_path = os.path.join(self.model_dir, filename)

        self.interpreter = tf.lite.Interpreter(model_path=model_path)
        self.interpreter.allocate_tensors()
        self.input_details = self.interpreter.get_input_details()
        self.output_details = self.interpreter.get_output_details()

        # tensor indices used by set_tensor / get_tensor in predict()
        self.input_shape = self.input_details[0]["index"]
        self.output_shape = self.output_details[0]["index"]

        self.print_model_details()

    def print_model_details(self):
        """Pretty-print the interpreter's input and output tensor details."""
        print("Input details:")
        pprint.pprint(self.input_details)
        print()
        print("Output details:")
        pprint.pprint(self.output_details)
        print("=" * 50)

        input_shape = self.input_details[0]["shape"]
        print("Expected input shape:", input_shape)

    def predict(self, input_data):
        """
        Run one inference pass.

        Args:
            input_data: array written to the model's first input tensor.

        Returns:
            The contents of the model's first output tensor.

        Raises:
            RuntimeError: when called before `load_model()`.
        """
        if self.interpreter is None:
            raise RuntimeError("call load_model() before predict()")

        self.interpreter.set_tensor(self.input_shape, input_data)
        self.interpreter.invoke()

        result = self.interpreter.get_tensor(self.output_shape)

        return result


# create the wrapper and load a model with the default arguments (use_latest=True);
# load_model() also prints the input/output tensor details
tflmodel = TFLiteModel()
tflmodel.load_model()

Input details:
[{'dtype': <class 'numpy.float32'>,
  'index': 0,
  'name': 'serving_default_input_layer_1:0',
  'quantization': (0.0, 0),
  'quantization_parameters': {'quantized_dimension': 0,
                              'scales': array([], dtype=float32),
                              'zero_points': array([], dtype=int32)},
  'shape': array([  1,  60, 225]),
  'shape_signature': array([ -1,  60, 225]),
  'sparsity_parameters': {}}]

Output details:
[{'dtype': <class 'numpy.float32'>,
  'index': 120,
  'name': 'StatefulPartitionedCall_1:0',
  'quantization': (0.0, 0),
  'quantization_parameters': {'quantized_dimension': 0,
                              'scales': array([], dtype=float32),
                              'zero_points': array([], dtype=int32)},
  'shape': array([1, 1]),
  'shape_signature': array([-1,  1]),
  'sparsity_parameters': {}}]
Expected input shape: [  1  60 225]


In [26]:
import random

# one random 3-channel colour per action, used to fill that action's confidence bar
colors = [
    tuple(random.randint(0, 255) for _channel in range(3))
    for _action in ACTIONS
]


def confidence_bar(res, actions, input_frame):
    """
    Show one labelled confidence bar per entry of `res` in the "Confidence Bar" window.

    Each row is 40 px tall: the label "<action>: <percent>%" on the left and a
    filled bar of up to 150 px on the right, coloured from the module-level
    `colors` list. `input_frame` is returned untouched.
    """
    canvas_size = (40 * len(actions), 300, 3)
    confidence_bar_image = np.zeros(canvas_size, dtype=np.uint8)

    for num, prob in enumerate(res):
        row_top = num * 40
        label = f"{actions[num]}: {int(prob * 100)}%"
        bar_start = (150, 10 + row_top)
        bar_end = (150 + int(prob * 150), 30 + row_top)

        # label text
        cv2.putText(
            confidence_bar_image,
            label,
            (10, 30 + row_top),
            cv2.FONT_HERSHEY_SIMPLEX,
            0.5,
            (255, 255, 255),
            1,
            cv2.LINE_AA,
        )
        # filled bar whose width scales with the probability
        cv2.rectangle(confidence_bar_image, bar_start, bar_end, colors[num], -1)

    # the bar lives in its own window; the camera frame is passed through as-is
    cv2.imshow("Confidence Bar", confidence_bar_image)

    return input_frame

In [27]:
import time

In [28]:
# Real-time recognition loop: grab webcam frames, extract pose/hand keypoints,
# classify the most recent `sequence_length` frames and overlay the recognised
# words. Press "q" in the preview window to stop.
cap = cv2.VideoCapture(0)

# set capture properties
cap.set(cv2.CAP_PROP_FRAME_WIDTH, 480)  # set width to 480 pixels
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 600)  # set height to 600 pixels
cap.set(cv2.CAP_PROP_FPS, 60)  # set frame rate to 60 FPS

isQuit = False
sequences = []
sequence = []

sentence = []
predictions = []

sequence_length = 60  # number of frames fed to the model per prediction
prediction_window = 20  # number of recent predictions used for the stability check
threshold = 0.9
result = []
skip_word = "_"

# try/finally guarantees the camera and windows are released even when the
# loop raises (e.g. a detector or model error, or a kernel interrupt)
try:
    while cap.isOpened():
        start = time.time()
        success, image = cap.read()

        if not success:
            print("Ignoring empty camera frame.")
            break

        image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        # convert cv image to mediapipe image format before being
        # passed to the pose and hand detectors
        annotated_image = mp.Image(image_format=mp.ImageFormat.SRGB, data=image_rgb)

        hand_results = hand_detector.detect(image=annotated_image)

        pose_results = pose_detector.detect(image=annotated_image)

        keypoints = to_landmark_data(hand_results, pose_results)
        sequences.append(keypoints)
        sequence = np.array(sequences[-sequence_length:]).astype(np.float32)

        # pose_landmark, hand_landmark = to_drawing_landmark(hand_results, pose_results)

        # draw_landmark(
        #     image_rgb,
        #     hand_landmarks=hand_landmark,
        #     pose_landmarks=pose_landmark,
        # )

        if len(sequence) == sequence_length:

            # predict the action label based on the sequence of keypoints
            result = tflmodel.predict(np.expand_dims(sequence, axis=0))[0]

            predicted = np.argmax(result)  # index of highest result

            predicted_sentence = ACTIONS[predicted]

            predictions.append(predicted)

            # NOTE: Only accept a prediction that equals the most common
            #       prediction over the last `prediction_window` frames, so a
            #       momentary anomaly is ignored. np.unique(...)[0] alone would
            #       be the *smallest* label, so the counts are needed.
            labels, counts = np.unique(
                predictions[-prediction_window:], return_counts=True
            )
            most_common = labels[np.argmax(counts)]

            is_stable = most_common == predicted
            # confidence of the predicted class must reach the threshold
            is_confident = result[predicted] >= threshold
            is_word = predicted_sentence != skip_word
            is_new = not sentence or predicted_sentence != sentence[-1]

            if is_stable and is_confident and is_word and is_new:
                sentence.append(predicted_sentence)

            # keep only the last 5 recognised words so the text fits the banner
            sentence = sentence[-5:]

            # only the most recent frames / predictions are ever read again
            sequences = sequences[-sequence_length:]

            predictions = predictions[-prediction_window:]

        # show the per-action confidence bars (separate window)
        image_rgb = confidence_bar(result, ACTIONS, image_rgb)

        # banner with the recognised sentence
        cv2.rectangle(image_rgb, (0, 0), (640, 40), (245, 117, 16), -1)
        cv2.putText(
            image_rgb,
            " ".join(sentence),
            (3, 30),
            cv2.FONT_HERSHEY_SIMPLEX,
            1,
            (255, 255, 255),
            2,
            cv2.LINE_AA,
        )

        # the frame is RGB at this point; convert back to BGR for cv2.imshow
        cv2.imshow(
            "MediaPipe Detection",
            cv2.cvtColor(image_rgb, cv2.COLOR_RGB2BGR),
        )

        if cv2.waitKey(10) & 0xFF == ord("q"):
            break

        print("exec:", time.time() - start)
finally:
    cap.release()
    cv2.destroyAllWindows()

exec: 0.9284236431121826
exec: 0.09030342102050781
exec: 0.08346939086914062
exec: 0.08645391464233398
exec: 0.0800771713256836
exec: 0.09999918937683105
exec: 0.08357119560241699
exec: 0.07547497749328613
exec: 0.07429265975952148
exec: 0.08378887176513672
exec: 0.09740877151489258
exec: 0.08209705352783203
exec: 0.08057761192321777
exec: 0.07056641578674316
exec: 0.08336424827575684
exec: 0.06643891334533691
exec: 0.08172750473022461
exec: 0.08339929580688477
exec: 0.10033392906188965
exec: 0.0804147720336914
exec: 0.08350229263305664
exec: 0.0939340591430664
exec: 0.0724787712097168
exec: 0.09973645210266113
exec: 0.07863068580627441
exec: 0.10483884811401367
exec: 0.08333301544189453
exec: 0.09991788864135742
exec: 0.09609293937683105
exec: 0.09951305389404297
exec: 0.10027933120727539
exec: 0.09805107116699219
exec: 0.08381152153015137
exec: 0.07546114921569824
exec: 0.07441830635070801
exec: 0.08337998390197754
exec: 0.09493231773376465
exec: 0.07149958610534668
exec: 0.082963228

In [29]:
# release the webcam and close all OpenCV windows; the capture cell makes the
# same two calls when its loop ends, so this is presumably a manual fallback
# for when that cell is interrupted before reaching them
cap.release()
cv2.destroyAllWindows()