In [2]:
import mediapipe as mp
from mediapipe.framework.formats import landmark_pb2
import tensorflow as tf

from ultralytics import YOLO
import numpy as np
import cv2
import time
import pandas as pd
import csv
import os
import pickle
import warnings

# --- Configuration ---
FIXED_SIZE = 640
SMALL_SIZE = 150, 150
NUM_FRAMES = 15
model_yolo = YOLO("../../model/yolo/yolo12n.pt") 

# --- Load LSTM Model Artifacts ---
# with open('../../model/trained/lstm_s/lstm_model_v0.pkl', 'rb') as f:
#     artifacts = pickle.load(f)

MODEL_DIR = '../../model/trained/gru/'
MODEL_PKL_NAME = 'gru_model_v2.pkl'

with open(f'{MODEL_DIR}{MODEL_PKL_NAME}', 'rb') as f:
    artifacts = pickle.load(f)

scaler = artifacts['scaler']
label_encoder = artifacts['label_encoder']
keras_model_path = artifacts['model_filename'] # e.g., 'single_lstm_weights.keras'
NUM_CLASSES = artifacts['num_classes']

try:
    # --- Load LTSM Single Keras ---
    # model_pred = tf.keras.models.load_model(f'../../model/trained/lstm_s/single_lstm_weights_v2.best.keras')

    # --- Load LTSM Bi Keras ---
    model_pred = tf.keras.models.load_model(f'{MODEL_DIR}{keras_model_path}')

    print("LSTM Model (Keras) loaded successfully.")
except Exception as e:
    print(f"Error loading Keras model: {e}")
    model_pred = None

mp_drawing = mp.solutions.drawing_utils # Drawing helpers
mp_holistic = mp.solutions.holistic # Mediapipe Solutions

AttributeError: 'MessageFactory' object has no attribute 'GetPrototype'

2025-11-20 15:54:07.178504: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.
I0000 00:00:1763625428.416080     877 gpu_device.cc:2020] Created device /job:localhost/replica:0/task:0/device:GPU:0 with 3213 MB memory:  -> device: 0, name: NVIDIA GeForce RTX 4050 Laptop GPU, pci bus id: 0000:01:00.0, compute capability: 8.9


LSTM Model (Keras) loaded successfully.


In [3]:
def cropped_frame(frame):
    results = model_yolo(frame, classes=[0], verbose=False)
    boxes = results[0].boxes
    plotted_frame = results[0].plot() 

    try:
        if len(boxes) > 0:
            # Get the bounding box coordinates for the first detected object
            x1, y1, x2, y2 = boxes.xyxy[0].cpu().numpy().astype(int)

            cropped_frame = frame[y1:y2, x1:x2]
            crop_h, crop_w = cropped_frame.shape[:2]

            # We want to fit the largest dimension (width or height) to the FIXED_SIZE
            scale = FIXED_SIZE / max(crop_w, crop_h)
            new_w = int(crop_w * scale)
            new_h = int(crop_h * scale)

            # Resize the cropped frame to the new dimensions
            resized_img = cv2.resize(cropped_frame, (new_w, new_h), interpolation=cv2.INTER_AREA)

            # Background frame, that are not filled with boxes
            final_frame = np.full((FIXED_SIZE, FIXED_SIZE, 3), 255, dtype=np.uint8)
            
            # dw and dh are the space left over after placing the image
            dw = FIXED_SIZE - new_w
            dh = FIXED_SIZE - new_h

            # Calculate the starting position (top-left corner) for centering
            top = dh // 2
            bottom = top + new_h
            left = dw // 2
            right = left + new_w

            final_frame[top:bottom, left:right] = resized_img

            return final_frame

        else:
            print("No objects detected in the image.")
    except Exception as e:
        print(f"Error processing frame: {e}")

In [4]:
def side_state(state, lndmrkX):
    if state == 0:
        if lndmrkX["nose"] == min(lndmrkX.values()):
            direction = 1 # Kiri
        elif lndmrkX["nose"] == max(lndmrkX.values()):      
            direction = 2 # Kanan
        else:
            direction = 0 # Tengah
    else:
        if lndmrkX["nose"] == min(lndmrkX.values()):
            direction = 2 # Kanan
        elif lndmrkX["nose"] == max(lndmrkX.values()):      
            direction = 1 # Kiri
        else:
            direction = 0 # Tengah
    
    return direction

def hand_state(state, lndmrkZ):
    if state == 0:
        if lndmrkZ["wrist_r"] and lndmrkZ["wrist_l"] < lndmrkZ["nose"]:
            hand = 1 # Terlihat
        else:
            hand = 0 # Tidak Terlihat
    else:
        if lndmrkZ["wrist_r"] and lndmrkZ["wrist_l"] < lndmrkZ["nose"]:
            hand = 0 # Terlihat
        else:
            hand = 1 # Tidak Terlihat
    return hand

def get_extFeature_value(lndmrk):
    noseX, noseY, noseZ = lndmrk["nose"].x, lndmrk["nose"].y, lndmrk["nose"].z
    earLX, earLY, earLZ = lndmrk["ear_l"].x, lndmrk["ear_l"].y, lndmrk["ear_l"].z
    earRX, earRY, earRZ = lndmrk["ear_r"].x, lndmrk["ear_r"].y, lndmrk["ear_r"].z
    wristRX, wristRY, wristRZ = lndmrk["wrist_r"].x, lndmrk["wrist_r"].y, lndmrk["wrist_r"].z
    wristLX, wristLY, wristLZ = lndmrk["wrist_l"].x, lndmrk["wrist_l"].y, lndmrk["wrist_l"].z
    
    lndmrkX = {"nose": noseX, "ear_l": earLX, "ear_r": earRX, "wrist_r": wristRX, "wrist_l": wristLX }
    lndmrkY = {"nose": noseY, "ear_l": earLY, "ear_r": earRY, "wrist_r": wristRY, "wrist_l": wristLY }
    lndmrkZ = {"nose": noseZ, "ear_l": earLZ, "ear_r": earRZ, "wrist_r": wristRZ, "wrist_l": wristLZ }

    if noseZ < (earLZ and earRZ):
        state = 0
        side = side_state(0, lndmrkX)
        hand = hand_state(0, lndmrkZ)
    else:
        state = 1
        side = side_state(1, lndmrkX)
        hand = hand_state(1, lndmrkZ)

    return side, state, hand

In [9]:
def classficiation_missing_calculated(class_name, percentage, counter, global_class):
    # The new, calculated classification, default to keeping the old one
    new_classification = global_class
    new_counter = counter
    
    # Check if the current frame meets the confidence threshold (>= 75.0%)
    if percentage >= 75.0:
        # 1. The confidence is high enough. We increment the counter.
        new_counter += 1
        
        # 2. Check if the counter has reached the persistence threshold (>= 10)
        if new_counter >= 10:
            # We have 10 consecutive high-confidence frames!
            new_classification = class_name # Update the classification
            new_counter = 0 # Reset the counter to start over
            print(f"Updated Classification to: {new_classification} with confidence {percentage:.2f}%")
    else:
        # The confidence is too low (< 75.0%). Reset the counter regardless of the class.
        new_counter = 0
        
    return new_classification, percentage, new_counter

In [10]:
# Define common parameters for clarity
FONT = cv2.FONT_HERSHEY_SIMPLEX
FONT_SCALE = 1
TEXT_THICKNESS = 2
TEXT_COLOR = (255, 255, 255) # White text
BLUE_COLOR = (255, 0, 0)   # Blue
RED_COLOR = (0, 0, 255) # Red
ORG = (10, 30)

In [11]:
def draw_text_with_background(img, text, org, font, font_scale, text_color, bg_color, thickness):
    # 1. Get the size of the text
    (text_w, text_h), baseline = cv2.getTextSize(text, font, font_scale, thickness)

    # Calculate coordinates for the background rectangle
    # org is the bottom-left corner of the text.
    # We use a small padding (e.g., 5 pixels)
    padding = 5
    
    # Top-left corner of the rectangle
    x1 = org[0] - padding
    y1 = org[1] - text_h - baseline - padding
    
    # Bottom-right corner of the rectangle
    x2 = org[0] + text_w + padding
    y2 = org[1] + baseline + padding

    # 2. Draw the filled rectangle (Background)
    cv2.rectangle(img, (x1, y1), (x2, y2), bg_color, -1) # -1 means filled

    # 3. Draw the text over the rectangle
    cv2.putText(img, text, org, font, font_scale, text_color, thickness)
    
    return img

In [None]:
MODEL_TYPE = 'nn' # Default
if 'lstm_s' in MODEL_DIR or 'lstm_s' in keras_model_path:
    MODEL_TYPE = 'lstm_single'
elif 'lstm_bi' in MODEL_DIR or 'lstm_bi' in keras_model_path:
    MODEL_TYPE = 'lstm_bi'
elif 'gru' in MODEL_DIR or 'gru' in keras_model_path:
    MODEL_TYPE = 'gru'

try:
    cap.release()
    cv2.destroyAllWindows()
except:
    pass
# cap = cv2.VideoCapture('http://192.168.100.197:5000/video')
cap = cv2.VideoCapture(f'../../assets/test/vidio_test_2.mp4')
# cap = cv2.VideoCapture(f'../../assets/dataset/v2/nodong_5.mp4')


frames = []
detected = []

# ==== EXTENDED FEATURE ====
face_direction = []
face_shown = []
hand_shown = []

undetectable_image = None
final_layout = None 

global_class = ""
counter_detection = 0

saved = 0

if not cap.isOpened():
    print("Error opening video file")
else:
    with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
        while cap.isOpened():
            start_time = time.time()
            ret, frame = cap.read()
            if ret:
                frame_cropped = cropped_frame(frame)
                
                # FIX 1: Skip if initial cropping failed (frame_cropped is None)
                if frame_cropped is None:
                    print("Skipping frame: cropped_frame returned None.")
                    continue

                # Prepare the current frame for side display (resized) and an empty list for detection
                current_small_frame = cv2.resize(frame_cropped, (SMALL_SIZE))
                current_detection_list = []
                detection_successful = False

                try:
                    # Attempt Mediapipe detection
                    frames_mp = cv2.cvtColor(frame_cropped, cv2.COLOR_BGR2RGB)
                    frames_mp.flags.writeable = False
                    
                    results = holistic.process(frames_mp)
                    frames_mp.flags.writeable = True
                    frames_mp = cv2.cvtColor(frame_cropped, cv2.COLOR_RGB2BGR) 

                    # Extract landmarks if available
                    if results.pose_landmarks:
                        nose = results.pose_landmarks.landmark[mp.solutions.holistic.PoseLandmark.NOSE]
                        ear_r = results.pose_landmarks.landmark[mp.solutions.holistic.PoseLandmark.RIGHT_EAR]
                        ear_l = results.pose_landmarks.landmark[mp.solutions.holistic.PoseLandmark.LEFT_EAR]
                        wrist_r = results.pose_landmarks.landmark[mp.solutions.holistic.PoseLandmark.RIGHT_WRIST]
                        wrist_l = results.pose_landmarks.landmark[mp.solutions.holistic.PoseLandmark.RIGHT_WRIST]

                        current_detection_list.append(nose)
                        current_detection_list.append(ear_l)
                        current_detection_list.append(ear_r)
                        current_detection_list.append(wrist_r)
                        current_detection_list.append(wrist_l)
                        detection_successful = True
                    else:
                        raise ValueError("No pose landmarks detected.")
                        
                except Exception as e:
                    # Detection failed (Mediapipe error or no landmarks found)
                    error_text = f"No detection: {e}"
                    cv2.putText(frame_cropped, error_text, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)
                    
                if detection_successful:
                    # This block is now ONLY executed if detection_successful is True
                    extended_feature = get_extFeature_value({"nose": nose, "ear_r": ear_r, "ear_l": ear_l, "wrist_r": wrist_r, "wrist_l": wrist_l})

                    if len(frames) < NUM_FRAMES:
                        frames.append(current_small_frame)

                        face_direction.append(extended_feature[0])
                        face_shown.append(extended_feature[1])
                        hand_shown.append(extended_feature[2])
                        
                        detected.append(current_detection_list)
                    else:
                        frames.pop(0)

                        face_direction.pop(0)
                        face_shown.pop(0)
                        hand_shown.pop(0)

                        detected.pop(0)

                        frames.append(current_small_frame)
                        
                        face_direction.append(extended_feature[0])
                        face_shown.append(extended_feature[1])
                        hand_shown.append(extended_feature[2])
                        
                        detected.append(current_detection_list)
                else:
                    print(f"Skipping frame {time.time() - start_time}ms: No pose landmarks detected.")
                    pass
                
                # --- LAYOUT AND DISPLAY LOGIC ---
                if len(frames) == NUM_FRAMES:
                    stacked1_frames = np.vstack((frames[0], frames[1], frames[2]))
                    stacked2_frames = np.vstack((frames[3], frames[4], frames[5]))
                    stacked3_frames = np.vstack((frames[6], frames[7], frames[8]))
                    stacked4_frames = np.vstack((frames[9], frames[10], frames[11]))
                    stacked5_frames = np.vstack((frames[12], frames[13], frames[14]))

                    stacked1_frames = cv2.resize(stacked1_frames, (SMALL_SIZE[0], FIXED_SIZE))
                    stacked2_frames = cv2.resize(stacked2_frames, (SMALL_SIZE[0], FIXED_SIZE))
                    stacked3_frames = cv2.resize(stacked3_frames, (SMALL_SIZE[0], FIXED_SIZE))
                    stacked4_frames = cv2.resize(stacked4_frames, (SMALL_SIZE[0], FIXED_SIZE))
                    stacked5_frames = cv2.resize(stacked5_frames, (SMALL_SIZE[0], FIXED_SIZE))

                    # All components are guaranteed to be valid images here
                    final_layout = np.hstack((frame_cropped, stacked1_frames, stacked2_frames, stacked3_frames, stacked4_frames, stacked5_frames))
                else:
                    final_layout = frame_cropped
                
                if final_layout is not None and final_layout.size > 0:
                    flat_detected = [landmark for sublist in detected for landmark in sublist]
                    # print(len(flat_detected)) # Should always be 5 * NUM_FRAMES when full

                    if len(flat_detected) > 0:
                        landmark_list = landmark_pb2.NormalizedLandmarkList()
                        landmark_list.landmark.extend(flat_detected)
                    
                    try:
                        counter = 0
                        featurePer_frames = 0
                        motion_row = []

                        for lndmrk in landmark_list.landmark:
                            motion_row.append(lndmrk.x)
                            motion_row.append(lndmrk.y)
                            motion_row.append(lndmrk.z)
                            motion_row.append(lndmrk.visibility)
                            counter += 1

                            if counter % 5 == 0:
                                motion_row.append(face_direction[featurePer_frames])
                                motion_row.append(face_shown[featurePer_frames])
                                motion_row.append(hand_shown[featurePer_frames])
                                featurePer_frames += 1

                        motion_row = list(np.array(motion_row))
                        
                        X_data = np.array(motion_row).reshape(1, -1) # Shape (1, 300)
                        X_scaled = scaler.transform(X_data)

                        is_rnn_model = not('nn' in keras_model_path.lower())  # True if not NN model
                        
                        
                        if MODEL_TYPE == 'lstm_bi' or MODEL_TYPE == 'lstm_s':
                            X_final = X_scaled.reshape(1, 1, X_scaled.shape[1])
                        elif MODEL_TYPE == 'nn':
                            X_final = X_scaled
                        else:
                            X_final = X_scaled.reshape(1, 1, X_scaled.shape[1])


                        # X_reshaped = X_scaled.reshape(1, 1, X_scaled.shape[1])

                        try:
                            # 3. Predict the probabilities (for classification)
                            y_pred_probs = model_pred.predict(X_final, verbose=0)
                            # 4. Get the predicted class index
                            y_pred_index = np.argmax(y_pred_probs, axis=1)[0]
                            confidence = y_pred_probs[0][y_pred_index] * 100
                            
                            motion_class = label_encoder.inverse_transform([y_pred_index])[0]

                            warnings.filterwarnings("ignore", category=UserWarning, module="sklearn")

                            # --- Update Text Display ---
    
                            # confidence_text = f" ({confidence:.4f}%)"
                            # display_text = f'Class: {motion_class} | {confidence_text} of Model {MODEL_TYPE}'
                            
                            global_class, confidence_score, counter_detection = classficiation_missing_calculated(motion_class, confidence, counter_detection, global_class)

                            try:
                                text_to_display = f'Class: {global_class} | Confidence: {confidence_score:.2f}%'
                                draw_text_with_background(
                                    final_layout, 
                                    text_to_display, 
                                    ORG, 
                                    FONT, 
                                    FONT_SCALE, 
                                    TEXT_COLOR, 
                                    BLUE_COLOR, 
                                    TEXT_THICKNESS
                                )
                            except:
                                text_to_display = f'Class: {motion_class} | Confidence: {confidence:.2f}%'
                                draw_text_with_background(
                                    final_layout, 
                                    text_to_display, 
                                    ORG, 
                                    FONT, 
                                    FONT_SCALE, 
                                    TEXT_COLOR, 
                                    RED_COLOR, 
                                    TEXT_THICKNESS
                                )
                            
                        except Exception as e:
                            error_text = f"Prediction Error: {e}"
                            cv2.putText(final_layout, error_text, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)

                        # motion_row = list(np.array(motion_row))
                        # motion_detected = pd.DataFrame([motion_row])
                        # motion_class = model_pred.predict(motion_detected)[0]

                        cv2.imshow('Video', final_layout)

                        key = cv2.waitKey(1) & 0xFF

                        if key == ord('q'):
                            break
                    
                            
                    except Exception as e:
                        print(f"Error displaying frame: {e}")
                
            else:
                break

cap.release()
cv2.destroyAllWindows()

I0000 00:00:1763625712.490486     877 gl_context_egl.cc:85] Successfully initialized EGL. Major : 1 Minor: 5
I0000 00:00:1763625712.525358   70609 gl_context.cc:369] GL version: 3.2 (OpenGL ES 3.2 Mesa 25.0.7-0ubuntu0.24.04.2), renderer: llvmpipe (LLVM 20.1.2, 256 bits)
W0000 00:00:1763625713.082372   70599 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.
W0000 00:00:1763625713.854131   70600 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.
W0000 00:00:1763625713.877534   70602 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.
W0000 00:00:1763625713.879833   70607 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.
W

Error displaying frame: X has 23 features, but StandardScaler is expecting 345 features as input.
Error displaying frame: X has 46 features, but StandardScaler is expecting 345 features as input.
Error displaying frame: X has 69 features, but StandardScaler is expecting 345 features as input.
Error displaying frame: X has 92 features, but StandardScaler is expecting 345 features as input.
Error displaying frame: X has 115 features, but StandardScaler is expecting 345 features as input.
Error displaying frame: X has 138 features, but StandardScaler is expecting 345 features as input.
Error displaying frame: X has 161 features, but StandardScaler is expecting 345 features as input.
Error displaying frame: X has 184 features, but StandardScaler is expecting 345 features as input.
Error displaying frame: X has 207 features, but StandardScaler is expecting 345 features as input.
Error displaying frame: X has 230 features, but StandardScaler is expecting 345 features as input.
Error displayi