## **Importing Libraries** 

In [None]:
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
os.environ['GLOG_minloglevel'] = '2'
import gc
import cv2
import json
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
from concurrent.futures import ThreadPoolExecutor
from IPython.display import HTML
import mediapipe as mp
from mediapipe.tasks import python
from mediapipe.tasks.python import vision
import tensorflow as tf

In [2]:
np.random.seed(42)
tf.random.set_seed(42)

---

## **Landmark Detection**

In [3]:
HAND_FILTERS = list(range(21))
POSE_FILTERS = [0, 1, 2, 3, 4, 5, 6, 11, 12, 13, 14, 15, 16]
FACE_FILTERS = [
    61, 185, 40, 39, 37, 0, 267, 269, 270, 409,
    291, 146, 91, 181, 84, 17, 314, 405, 321, 375,
    78, 191, 80, 81, 82, 13, 312, 311, 310, 415,
    95, 88, 178, 87, 14, 317, 402, 318, 324, 308,
]
HAND_FILTERS_LEN = len(HAND_FILTERS) # 21
POSE_FILTERS_LEN = len(POSE_FILTERS) # 13
FACE_FILTERS_LEN = len(FACE_FILTERS) # 40
TOTAL_FILTERS_LEN = HAND_FILTERS_LEN * 2 + POSE_FILTERS_LEN + FACE_FILTERS_LEN

In [None]:
vision_running_mode = vision.RunningMode
base_options = python.BaseOptions

hands_options_image = vision.HandLandmarkerOptions(
    running_mode=vision_running_mode.IMAGE,
    min_hand_detection_confidence=0.55,
    base_options=base_options(model_asset_path='datasets/mediapipe_models/hand_landmarker.task'),
    num_hands=2
)

pose_options_image = vision.PoseLandmarkerOptions(
    running_mode=vision_running_mode.IMAGE,
    min_pose_detection_confidence=0.55,
    base_options=base_options(model_asset_path='datasets/mediapipe_models/pose_landmarker_full.task')
)

face_options_image = vision.FaceLandmarkerOptions(
    running_mode=vision_running_mode.IMAGE,
    min_face_detection_confidence=0.55,
    base_options=base_options(model_asset_path='datasets/mediapipe_models/face_landmarker.task')
)

hands_options_video = vision.HandLandmarkerOptions(
    running_mode=vision_running_mode.VIDEO,
    min_hand_detection_confidence=0.55,
    base_options=base_options(model_asset_path='datasets/mediapipe_models/hand_landmarker.task'),
    num_hands=2
)

pose_options_video = vision.PoseLandmarkerOptions(
    running_mode=vision_running_mode.VIDEO,
    min_pose_detection_confidence=0.55,
    base_options=base_options(model_asset_path='datasets/mediapipe_models/pose_landmarker_full.task')
)

face_options_video = vision.FaceLandmarkerOptions(
    running_mode=vision_running_mode.VIDEO,
    min_face_detection_confidence=0.55,
    base_options=base_options(model_asset_path='datasets/mediapipe_models/face_landmarker.task')
)

hands_detector_image = vision.HandLandmarker.create_from_options(hands_options_image)
pose_detector_image = vision.PoseLandmarker.create_from_options(pose_options_image)
face_detector_image = vision.FaceLandmarker.create_from_options(face_options_image)
IMAGE_DETECTORS = (hands_detector_image, pose_detector_image, face_detector_image)

In [5]:
def detect(detector, image, frame_timestamp=None):
    if frame_timestamp is None:
        return detector.detect(image)
    else:
        return detector.detect_for_video(image, frame_timestamp)

def process_landmarks(landmarks, filters, start_idx, landmarks_array):
    for i in filters:
        landmarks_array[start_idx] = [landmarks[i].x, landmarks[i].y]
        start_idx += 1
    return start_idx

def extract_landmarks_from_image(image, detectors, timestamp=None):
    img_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    mp_image = mp.Image(image_format=mp.ImageFormat.SRGB, data=img_rgb)
    
    hands_detector, pose_detector, face_detector = detectors

    with ThreadPoolExecutor() as executor:
        detect_args = (mp_image, timestamp) if timestamp != None else (mp_image,)
        
        hands_future = executor.submit(detect, hands_detector, *detect_args)
        pose_future = executor.submit(detect, pose_detector, *detect_args)
        face_future = executor.submit(detect, face_detector, *detect_args)
        
        hand_result = hands_future.result()
        pose_result = pose_future.result()
        face_result = face_future.result()

    hand_landmarks = hand_result.hand_landmarks
    pose_landmarks = pose_result.pose_landmarks
    face_landmarks = face_result.face_landmarks

    landmarks_array = np.full((TOTAL_FILTERS_LEN, 2), np.nan)
    arr_idx = 0
    
    if hand_landmarks:
        if hand_result.handedness[0].index == 1:
            arr_idx += HAND_FILTERS_LEN
        
        for landmarks in hand_landmarks:
            arr_idx = process_landmarks(landmarks, HAND_FILTERS, arr_idx, landmarks_array)
        
        if arr_idx == HAND_FILTERS_LEN:
            arr_idx += HAND_FILTERS_LEN
    else:
        arr_idx += HAND_FILTERS_LEN*2

    if pose_landmarks:
        arr_idx = process_landmarks(pose_landmarks[0], POSE_FILTERS, arr_idx, landmarks_array)
    else:
        arr_idx += POSE_FILTERS_LEN

    if face_landmarks:
        arr_idx = process_landmarks(face_landmarks[0], FACE_FILTERS, arr_idx, landmarks_array)
    else:
        arr_idx += FACE_FILTERS_LEN
    
    return landmarks_array

def extract_landmarks_from_video(video_path, start_frame=1, end_frame=-1):
    cap = cv2.VideoCapture(video_path)
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    fps = round(cap.get(cv2.CAP_PROP_FPS))
    frame_duration_ms = 1000 / fps
    
    hands_detector_video = vision.HandLandmarker.create_from_options(hands_options_video)
    pose_detector_video = vision.PoseLandmarker.create_from_options(pose_options_video)
    face_detector_video = vision.FaceLandmarker.create_from_options(face_options_video)
    VIDEO_DETECTORS = (hands_detector_video, pose_detector_video, face_detector_video)
    
    if start_frame < 1:
        start_frame = 1
    elif start_frame > total_frames:
        start_frame = 1
    
    if end_frame < 0 or end_frame > total_frames:
        end_frame = total_frames
    
    cap.set(cv2.CAP_PROP_POS_FRAMES, start_frame - 1)
    video_landmarks = np.zeros((end_frame - start_frame + 1, TOTAL_FILTERS_LEN, 2), dtype=object)
    
    for frame_idx in range(start_frame, end_frame + 1):
        ret, frame = cap.read()
        if not ret:
            break
        
        timestamp = int((frame_idx - 1) * frame_duration_ms)
        
        # landmarks = extract_landmarks_from_image(frame, IMAGE_DETECTORS)
        landmarks = extract_landmarks_from_image(frame, VIDEO_DETECTORS, timestamp)
        video_landmarks[frame_idx - start_frame] = landmarks
    
    cap.release()
    return video_landmarks

In [6]:
def draw_landmarks_image(image, landmarks, dot_size=5):
    for landmark in landmarks:
        x, y = landmark
        if not np.isnan(x) and not np.isnan(y):
            cv2.circle(image, (int(x * image.shape[1]), int(y * image.shape[0])), dot_size, (0, 255, 0), -1)
    return image

def draw_landmarks_video(input_path, output_path, video_landmarks, start_frame=1, end_frame=-1, dot_size=5):
    cap = cv2.VideoCapture(input_path)
    frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = int(cap.get(cv2.CAP_PROP_FPS))
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    
    out = cv2.VideoWriter(output_path, cv2.VideoWriter_fourcc(*'avc1'), fps, (frame_width, frame_height))

    if end_frame == -1 or end_frame > total_frames:
        end_frame = total_frames

    cap.set(cv2.CAP_PROP_POS_FRAMES, start_frame - 1)

    while cap.isOpened():
        frame_number = int(cap.get(cv2.CAP_PROP_POS_FRAMES))
        if frame_number > end_frame:
            break

        ret, frame = cap.read()
        if not ret:
            break

        frame_landmarks = video_landmarks[frame_number - 1]
        annotated_frame = draw_landmarks_image(frame, frame_landmarks, dot_size=dot_size)

        out.write(annotated_frame)

    cap.release()
    out.release()

In [7]:
def plot_frame(frame, ax):
    ax.clear()
    x = frame[:, 0]
    y = frame[:, 1]
    ax.scatter(x, y, color='dodgerblue')
    ax.set_xticks([])
    ax.set_yticks([])
    ax.set_xlim(0, 1)
    ax.set_ylim(0, 1)
    ax.invert_yaxis()
    ax.set_xticklabels([])
    ax.set_yticklabels([])

def animate_frames(data):
    fig, ax = plt.subplots()
    anim = FuncAnimation(fig, lambda frame: plot_frame(data[frame], ax), frames=range(data.shape[0]), interval=100)
    plt.close(fig)
    return HTML(anim.to_jshtml())

---

## **Augmentation**

In [8]:
def padding(X, length=None, pad=0):
    if length is None:
        length = X.shape[0]
    
    if X.shape[0] > length:
        X_padded = X[:length]
    else:
        pad_length = length - X.shape[0]
        X_padded = np.pad(
            X, ((0, pad_length), (0, 0), (0, 0)),
            mode='constant', constant_values=pad
        )
            
    return X_padded

def remove_no_hands(video):
    frames_to_keep = []
    for i, frame in enumerate(video):
        hand_landmarks_data = frame[:43]
        if not np.all(np.isnan(hand_landmarks_data)):
            frames_to_keep.append(i)
    video_with_hands = video[frames_to_keep]
    return video_with_hands

def is_dominant_hand(video):

    left_hand_sum = np.sum(~np.isnan(video[:, slice(0, 21)]), axis=1)
    right_hand_sum = np.sum(~np.isnan(video[:, slice(21, 43)]), axis=1)

    left_dominant_count = np.sum(left_hand_sum >= right_hand_sum)
    right_dominant_count = np.sum(left_hand_sum < right_hand_sum)

    return left_dominant_count > right_dominant_count

def hflip(data):
    data[:, :, 0] = 1 - data[:, :, 0]
    return data

In [9]:
MAX_FRAME_LENGTH = 64

In [10]:
def predict_preprocess(video):
    if not is_dominant_hand(video):
        hflip(video)
    video = remove_no_hands(video)
    np.nan_to_num(video, copy=False, nan=0)
    video = padding(video, MAX_FRAME_LENGTH, -100)
    return video

## **Modelling**

In [11]:
class EarlyLateDropout(tf.keras.layers.Layer):
    def __init__(self, early_rate, late_rate, switch_epoch, **kwargs):
        super().__init__(**kwargs)
        self.supports_masking = True
        self.early_rate = early_rate
        self.late_rate = late_rate
        self.switch_epoch = switch_epoch
        self.dropout = tf.keras.layers.Dropout(early_rate)
    
    def build(self, input_shape):
        super().build(input_shape)
        agg = tf.VariableAggregation.ONLY_FIRST_REPLICA
        self._train_counter = self.add_weight(name="train_counter", shape=[], dtype=tf.int64, aggregation=agg, trainable=False)

    def call(self, inputs, training=False):
        if training:
            dropout_rate = tf.cond(self._train_counter < self.switch_epoch, lambda: self.early_rate, lambda: self.late_rate)
            x = self.dropout(inputs, training=training)
            x = tf.keras.layers.Dropout(dropout_rate)(x, training=training)
            self._train_counter.assign_add(1)
        else:
            x = inputs
        return x

In [12]:
def add_dummy_channel(x, fill_value=0):
    dummy_channel_shape = tf.concat([tf.shape(x)[:-1], [1]], axis=0)
    dummy_channel = tf.fill(dummy_channel_shape, tf.cast(fill_value, x.dtype))
    result = tf.concat([x, dummy_channel], axis=-1)
    return result

In [13]:
def scce_with_ls(y_true, y_pred):
    y_true = tf.cast(y_true, tf.int32)
    y_true = tf.one_hot(y_true, NUM_CLASSES, axis=1)
    y_true = tf.squeeze(y_true, axis=2)
    return tf.keras.losses.categorical_crossentropy(y_true, y_pred, label_smoothing=0.1)

In [14]:
model = tf.keras.models.load_model('a17.h5', custom_objects={'EarlyLateDropout': EarlyLateDropout, 'scce_with_ls': scce_with_ls, 'add_dummy_channel': add_dummy_channel})

In [39]:
video_path = 'working/real_test/shirt.mp4'
output_path = 'working/real_test/percobaan_output_video.mp4'
video = extract_landmarks_from_video(video_path).astype(np.float32)
draw_landmarks_video(video_path, output_path, video, dot_size=5)
animate_frames(video)

I0000 00:00:1718697103.766773     927 task_runner.cc:85] GPU suport is not available: INTERNAL: ; RET_CHECK failure (mediapipe/gpu/gl_context_egl.cc:77) display != EGL_NO_DISPLAYeglGetDisplay() returned error 0x300c
W0000 00:00:1718697103.779351    3451 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.
W0000 00:00:1718697103.788400    3465 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.
I0000 00:00:1718697103.789251     927 task_runner.cc:85] GPU suport is not available: INTERNAL: ; RET_CHECK failure (mediapipe/gpu/gl_context_egl.cc:77) display != EGL_NO_DISPLAYeglGetDisplay() returned error 0x300c
W0000 00:00:1718697103.842770    3476 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.
W0000 00:00:1718697103

In [40]:
gc.collect()

9599

In [41]:
video_preprocessed = predict_preprocess(video.copy())
animate_frames(video_preprocessed)

In [42]:
predictions = model.predict(np.expand_dims(video_preprocessed, axis=0))



In [43]:
def load_decoder(json_file_path):
    with open(json_file_path, 'r') as json_file:
        label_to_int = json.load(json_file)
    
    int_to_label = {v: k for k, v in label_to_int.items()}
    return int_to_label

decoder_dict = load_decoder('label_encoder.json')

In [44]:
def decode_predictions(predictions, decoder_dict, num_predictions=10):
    top_prediction = np.argmax(predictions)
    top_confidence = predictions[top_prediction] * 100
    top_label = decoder_dict[top_prediction]

    top_indices = np.argsort(predictions)[-num_predictions:][::-1]
    top_confidences = predictions[top_indices] * 100
    top_labels = [decoder_dict[i] for i in top_indices]

    top_1_prediction = top_labels[0]
    top_1_confidence = top_confidences[0]

    top_1_prediction = (top_labels[0], top_confidences[0])
    top_n_predictions = [(label, confidence) for label, confidence in zip(top_labels, top_confidences)]

    return top_1_prediction, top_n_predictions

In [45]:
top_1, top_n = decode_predictions(predictions[0], decoder_dict, num_predictions=201)
top_1

('shirt', 48.05009)

In [46]:
for pred in top_n:
    print(pred[0]+',', pred[1])

shirt, 48.05009
hair, 0.5466865
apple, 0.48200902
carrot, 0.44097552
outside, 0.41744745
all, 0.41117686
after, 0.40564737
white, 0.3957626
animal, 0.39343882
if, 0.3874581
home, 0.38442102
loud, 0.38132307
will, 0.377918
napkin, 0.36966747
happy, 0.36439595
give, 0.36403054
fine, 0.3618327
mouth, 0.35738915
later, 0.3564786
blow, 0.35602775
green, 0.34835216
ride, 0.34062478
up, 0.34017807
mouse, 0.33927253
smile, 0.33880353
because, 0.33307156
brown, 0.3297482
on, 0.3285966
scissors, 0.3283438
grandma, 0.32778785
sleep, 0.32571158
child, 0.32514116
chocolate, 0.32474944
alligator, 0.32456598
time, 0.3192051
jacket, 0.31825355
night, 0.31481105
story, 0.31339288
tv, 0.30868882
owl, 0.30729964
beside, 0.30638182
airplane, 0.3057512
arm, 0.30519345
dance, 0.30393332
toothbrush, 0.30184996
person, 0.3017938
head, 0.29920673
go, 0.2984616
drink, 0.29769894
where, 0.29640314
hot, 0.29626185
who, 0.29276168
snow, 0.29123446
bug, 0.29108894
moon, 0.28597376
no, 0.28396064
find, 0.28251836
hu