In [12]:
import cv2
import os
import time
import csv
import warnings
import pickle

import numpy as np
import pandas as pd

import mediapipe as mp
from mediapipe.framework.formats import landmark_pb2

mp_drawing = mp.solutions.drawing_utils # Drawing helpers
mp_holistic = mp.solutions.holistic # Mediapipe Solutions


In [13]:
desire_fps = 15
num_frames = 15

original_width, original_height = 500, 500
small_width, small_height = 150, 150

# V3 MODEL DETECTION

In [6]:
with open('../model/v3_model.pkl', 'rb') as f:
    model = pickle.load(f)

In [7]:
def check_direction(landmark):
    direction = "Right"
    smallest_value = min(landmark, key=landmark.get)
    highest_value = max(landmark, key=landmark.get)
    
    if smallest_value == "nose":
        direction = 0
    elif highest_value == "nose":
        direction = 1
    else:
        direction = 2
    
    return direction


In [None]:
camera = cv2.VideoCapture("http://192.168.50.234:5000/video")

frames = []
detected = []
face_direction = []

desire_fps = 5
frame_delay = 1.0 / desire_fps

if not camera.isOpened():
    print("Error: Could not open camera.")
else:
    print("Camera is ready. Press 'q' to quit.")

with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
    start_time = time.time()

    while camera.isOpened():
        delay_frame_time = time.time()
        ret, frame = camera.read()

        if ret:

            frame = cv2.resize(frame, (original_width, original_height))

            if len(frames) < num_frames:
                frames.append(cv2.resize(frame, (small_width, small_height)))

                frame_detected = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                frame_detected.flags.writeable = False

                results = holistic.process(frame_detected)

                frame_detected.flags.writeable = True
                frame_detected = cv2.cvtColor(frame_detected, cv2.COLOR_RGB2BGR)

                nose = results.pose_landmarks.landmark[mp.solutions.holistic.PoseLandmark.NOSE]
                ear_r = results.pose_landmarks.landmark[mp.solutions.holistic.PoseLandmark.RIGHT_EAR]
                ear_l = results.pose_landmarks.landmark[mp.solutions.holistic.PoseLandmark.LEFT_EAR]
                wrist_r = results.pose_landmarks.landmark[mp.solutions.holistic.PoseLandmark.RIGHT_WRIST]
                wrist_l = results.pose_landmarks.landmark[mp.solutions.holistic.PoseLandmark.RIGHT_WRIST]

                temporary_list = []
                temporary_list.append(nose)
                temporary_list.append(ear_l)
                temporary_list.append(ear_r)
                temporary_list.append(wrist_r)
                temporary_list.append(wrist_l)
                
                face_direction.append(check_direction({"nose": nose.x, "ear_r": ear_r.x, "ear_l": ear_l.x}))
                detected.append(temporary_list)
            else:
                frames.pop(0)
                face_direction.pop(0)
                detected.pop(0)

                frames.append(cv2.resize(frame, (small_width, small_height)))

                frame_detected = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                frame_detected.flags.writeable = False

                results = holistic.process(frame_detected)
                
                frame_detected.flags.writeable = True
                frame_detected = cv2.cvtColor(frame_detected, cv2.COLOR_RGB2BGR)
                
                nose = results.pose_landmarks.landmark[mp.solutions.holistic.PoseLandmark.NOSE]
                ear_r = results.pose_landmarks.landmark[mp.solutions.holistic.PoseLandmark.RIGHT_EAR]
                ear_l = results.pose_landmarks.landmark[mp.solutions.holistic.PoseLandmark.LEFT_EAR]
                wrist_r = results.pose_landmarks.landmark[mp.solutions.holistic.PoseLandmark.RIGHT_WRIST]
                wrist_l = results.pose_landmarks.landmark[mp.solutions.holistic.PoseLandmark.RIGHT_WRIST]
                
                temporary_list = []
                temporary_list.append(nose)
                temporary_list.append(ear_l)
                temporary_list.append(ear_r)
                temporary_list.append(wrist_r)
                temporary_list.append(wrist_l)
                
                face_direction.append(check_direction({"nose": nose.x, "ear_r": ear_r.x, "ear_l": ear_l.x}))
                detected.append(temporary_list)
            
            if len(frames) >= num_frames:
                stacked1_frames = np.vstack((frames[0], frames[1], frames[2]))
                stacked2_frames = np.vstack((frames[3], frames[4], frames[5]))
                stacked3_frames = np.vstack((frames[6], frames[7], frames[8]))
                stacked4_frames = np.vstack((frames[9], frames[10], frames[11]))
                stacked5_frames = np.vstack((frames[12], frames[13], frames[14]))

                stacked1_frames = cv2.resize(stacked1_frames, (small_width, original_height))
                stacked2_frames = cv2.resize(stacked2_frames, (small_width, original_height))
                stacked3_frames = cv2.resize(stacked3_frames, (small_width, original_height))
                stacked4_frames = cv2.resize(stacked4_frames, (small_width, original_height))
                stacked5_frames = cv2.resize(stacked5_frames, (small_width, original_height))
                
                final_layout = np.hstack((frame, stacked1_frames, stacked2_frames, stacked3_frames, stacked4_frames, stacked5_frames))
            else:
                final_layout = frame

            image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            image.flags.writeable = False

            flatten_detected = [landmark for sublist in detected for landmark in sublist]

            landmark_list = landmark_pb2.NormalizedLandmarkList()
            landmark_list.landmark.extend(flatten_detected)


            try:
                lv = landmark_list.landmark
                counter = 0
                featureper_frames = 0
                motion_row = []
                
                for lndmrk in lv:
                    motion_row.append(lndmrk.x)
                    motion_row.append(lndmrk.y)
                    motion_row.append(lndmrk.z)
                    motion_row.append(lndmrk.visibility)
                    counter += 1

                    if counter % 5 == 0:
                        motion_row.append(face_direction[featureper_frames])
                        featureper_frames += 1 

                motion_row = list(np.array(motion_row))
                
                motion_detected = pd.DataFrame([motion_row])
                motion_class = model.predict(motion_detected)[0]

                warnings.filterwarnings("ignore", category=UserWarning, module="sklearn")
                
                cv2.putText(final_layout, f'Class: {motion_class}', (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)
            except Exception as e:
                pass


            cv2.imshow("Camera Feed", final_layout)
            
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
        
camera.release()
cv2.destroyAllWindows()



Camera is ready. Press 'q' to quit.


MESA: error: ZINK: failed to choose pdev
I0000 00:00:1757088183.170217   98610 gl_context_egl.cc:85] Successfully initialized EGL. Major : 1 Minor: 5
I0000 00:00:1757088183.225542   99293 gl_context.cc:369] GL version: 3.1 (OpenGL ES 3.1 Mesa 24.0.9-0ubuntu0.3), renderer: D3D12 (AMD Radeon(TM) Graphics)
INFO: Created TensorFlow Lite XNNPACK delegate for CPU.
W0000 00:00:1757088183.821393   99273 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.
W0000 00:00:1757088184.168477   99274 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.
W0000 00:00:1757088184.191760   99273 landmark_projection_calculator.cc:186] Using NORM_RECT without IMAGE_DIMENSIONS is only supported for the square ROI. Provide IMAGE_DIMENSIONS or use PROJECTION_MATRIX.
W0000 00:00:1757088184.197273   99282 inference_feedback_manager.cc:

# V4 MODEL DETECTION

In [16]:
with open('../model/v4_model.pkl', 'rb') as f:
    model = pickle.load(f)

In [17]:
def side_state(state, lndmrkX):
    if state == 0:
        if lndmrkX["nose"] == min(lndmrkX.values()):
            direction = 1 # Kiri
        elif lndmrkX["nose"] == max(lndmrkX.values()):      
            direction = 2 # Kanan
        else:
            direction = 0 # Tengah
    else:
        if lndmrkX["nose"] == min(lndmrkX.values()):
            direction = 2 # Kanan
        elif lndmrkX["nose"] == max(lndmrkX.values()):      
            direction = 1 # Kiri
        else:
            direction = 0 # Tengah
    
    return direction

def hand_state(state, lndmrkZ):
    if state == 0:
        if lndmrkZ["wrist_r"] and lndmrkZ["wrist_l"] < lndmrkZ["nose"]:
            hand = 1 # Terlihat
        else:
            hand = 0 # Tidak Terlihat
    else:
        if lndmrkZ["wrist_r"] and lndmrkZ["wrist_l"] < lndmrkZ["nose"]:
            hand = 0 # Terlihat
        else:
            hand = 1 # Tidak Terlihat
    return hand

def get_extFeature_value(lndmrk):
    noseX, noseY, noseZ = lndmrk["nose"].x, lndmrk["nose"].y, lndmrk["nose"].z
    earLX, earLY, earLZ = lndmrk["ear_l"].x, lndmrk["ear_l"].y, lndmrk["ear_l"].z
    earRX, earRY, earRZ = lndmrk["ear_r"].x, lndmrk["ear_r"].y, lndmrk["ear_r"].z
    wristRX, wristRY, wristRZ = lndmrk["wrist_r"].x, lndmrk["wrist_r"].y, lndmrk["wrist_r"].z
    wristLX, wristLY, wristLZ = lndmrk["wrist_l"].x, lndmrk["wrist_l"].y, lndmrk["wrist_l"].z
    
    lndmrkX = {"nose": noseX, "ear_l": earLX, "ear_r": earRX, "wrist_r": wristRX, "wrist_l": wristLX }
    lndmrkY = {"nose": noseY, "ear_l": earLY, "ear_r": earRY, "wrist_r": wristRY, "wrist_l": wristLY }
    lndmrkZ = {"nose": noseZ, "ear_l": earLZ, "ear_r": earRZ, "wrist_r": wristRZ, "wrist_l": wristLZ }

    if noseZ < (earLZ and earRZ):
        state = 0
        side = side_state(0, lndmrkX)
        hand = hand_state(0, lndmrkZ)
    else:
        state = 1
        side = side_state(1, lndmrkX)
        hand = hand_state(1, lndmrkZ)

    return side, state, hand




In [23]:
camera = cv2.VideoCapture("test/vidio/record_nm.mp4")

frames = []
detected = []

# ==== EXTENDED FEATURE ====
face_direction = []
face_shown = []
hand_shown = []

desire_fps = 5
frame_delay = 1.0 / desire_fps

saved = 0

if not camera.isOpened():
    print("Error: Could not open camera.")
else:
    print("Camera is ready. Press 'q' to quit.")

with mp_holistic.Holistic(min_detection_confidence=0.1, min_tracking_confidence=0.1) as holistic:
    start_time = time.time()
    
    while camera.isOpened:
        delay_frame_time = time.time()
        countdown_time = 1
        
        ret, frame = camera.read()
        
        if ret:

            frame = cv2.resize(frame, (original_width, original_height))

            if len(frames) < num_frames:
                try:
                    frames.append(cv2.resize(frame, (small_width, small_height)))
                    
                    frame_detected = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                    frame_detected.flags.writeable = False
                    results = holistic.process(frame_detected)
                    frame_detected.flags.writeable = True
                    frame_detected = cv2.cvtColor(frame_detected, cv2.COLOR_RGB2BGR)

                    nose = results.pose_landmarks.landmark[mp.solutions.holistic.PoseLandmark.NOSE]
                    ear_r = results.pose_landmarks.landmark[mp.solutions.holistic.PoseLandmark.RIGHT_EAR]
                    ear_l = results.pose_landmarks.landmark[mp.solutions.holistic.PoseLandmark.LEFT_EAR]
                    wrist_r = results.pose_landmarks.landmark[mp.solutions.holistic.PoseLandmark.RIGHT_WRIST]
                    wrist_l = results.pose_landmarks.landmark[mp.solutions.holistic.PoseLandmark.RIGHT_WRIST]

                    temporary_list = []
                    temporary_list.append(nose)
                    temporary_list.append(ear_l)
                    temporary_list.append(ear_r)
                    temporary_list.append(wrist_r)
                    temporary_list.append(wrist_l)
                    
                    extended_feature = get_extFeature_value({"nose": nose, "ear_r": ear_r, "ear_l": ear_l, "wrist_r": wrist_r, "wrist_l": wrist_l})
                    face_direction.append(extended_feature[0])
                    face_shown.append(extended_feature[1])
                    hand_shown.append(extended_feature[2])

                    detected.append(temporary_list)
                    
                except Exception as e:
                    error_text = f"No detection: {e}"
                    # cv2.putText(frame, error_text, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)
                
            else:
                try:
                    frames.append(cv2.resize(frame, (small_width, small_height)))

                    frame_detected = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                    frame_detected.flags.writeable = False
                    results = holistic.process(frame_detected)
                    frame_detected.flags.writeable = True
                    frame_detected = cv2.cvtColor(frame_detected, cv2.COLOR_RGB2BGR)

                    nose = results.pose_landmarks.landmark[mp.solutions.holistic.PoseLandmark.NOSE]
                    ear_r = results.pose_landmarks.landmark[mp.solutions.holistic.PoseLandmark.RIGHT_EAR]
                    ear_l = results.pose_landmarks.landmark[mp.solutions.holistic.PoseLandmark.LEFT_EAR]
                    wrist_r = results.pose_landmarks.landmark[mp.solutions.holistic.PoseLandmark.RIGHT_WRIST]
                    wrist_l = results.pose_landmarks.landmark[mp.solutions.holistic.PoseLandmark.RIGHT_WRIST]
                    
                    temporary_list = []
                    temporary_list.append(nose)
                    temporary_list.append(ear_l)
                    temporary_list.append(ear_r)
                    temporary_list.append(wrist_r)
                    temporary_list.append(wrist_l)

                    extended_feature = get_extFeature_value({"nose": nose, "ear_r": ear_r, "ear_l": ear_l, "wrist_r": wrist_r, "wrist_l": wrist_l})
                    face_direction.append(extended_feature[0])
                    face_shown.append(extended_feature[1])
                    hand_shown.append(extended_feature[2])

                    frames.pop(0)

                    face_direction.pop(0)
                    face_shown.pop(0)
                    hand_shown.pop(0)

                    detected.pop(0)

                    detected.append(temporary_list)
                except Exception as e:
                    error_text = f"No detection: {e}"
                    # cv2.putText(frame, error_text, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)

            if len(frames) >= 15:
                stacked1_frames = np.vstack((frames[0], frames[1], frames[2]))
                stacked2_frames = np.vstack((frames[3], frames[4], frames[5]))
                stacked3_frames = np.vstack((frames[6], frames[7], frames[8]))
                stacked4_frames = np.vstack((frames[9], frames[10], frames[11]))
                stacked5_frames = np.vstack((frames[12], frames[13], frames[14]))

                stacked1_frames = cv2.resize(stacked1_frames, (small_width, original_height))
                stacked2_frames = cv2.resize(stacked2_frames, (small_width, original_height))
                stacked3_frames = cv2.resize(stacked3_frames, (small_width, original_height))
                stacked4_frames = cv2.resize(stacked4_frames, (small_width, original_height))
                stacked5_frames = cv2.resize(stacked5_frames, (small_width, original_height))
                
                final_layout = np.hstack((frame, stacked1_frames, stacked2_frames, stacked3_frames, stacked4_frames, stacked5_frames))
            else:
                final_layout = frame

            image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            image.flags.writeable = False

            flat_detected = [landmark for sublist in detected for landmark in sublist]

            landmark_list = landmark_pb2.NormalizedLandmarkList()
            landmark_list.landmark.extend(flat_detected)
            

            # Print the elapsed time in seconds since the start of the loop
            
            fps = time.time() - delay_frame_time
            sleep_time = max(0, frame_delay - fps)
            
            time.sleep(sleep_time)

            try:
                lv = landmark_list.landmark

                counter = 0
                featureper_frames = 0
                motion_row = []
                
                for lndmrk in lv:
                    motion_row.append(lndmrk.x)
                    motion_row.append(lndmrk.y)
                    motion_row.append(lndmrk.z)
                    motion_row.append(lndmrk.visibility)
                    counter += 1

                    if counter % 5 == 0:
                        motion_row.append(face_direction[featureper_frames])
                        motion_row.append(face_shown[featureper_frames])
                        motion_row.append(hand_shown[featureper_frames])
                        featureper_frames += 1 

                motion_row = list(np.array(motion_row))
                
                motion_detected = pd.DataFrame([motion_row])
                motion_class = model.predict(motion_detected)[0]

                warnings.filterwarnings("ignore", category=UserWarning, module="sklearn")
                cv2.putText(final_layout, f'Class: {motion_class}', (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)

            except Exception as e:
                print(f"Error : {e}")
                cv2.putText(final_layout, f'Len: {len(motion_row)}', (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)
            
            # print(f'Class: {motion_class}')

            cv2.imshow("Camera Capture", final_layout)

            key = cv2.waitKey(1) & 0xFF
            if key == ord('q'):
                break
                
camera.release()
cv2.destroyAllWindows()

Camera is ready. Press 'q' to quit.


I0000 00:00:1757090063.938961  100551 gl_context_egl.cc:85] Successfully initialized EGL. Major : 1 Minor: 5
I0000 00:00:1757090063.976465  106227 gl_context.cc:369] GL version: 3.1 (OpenGL ES 3.1 Mesa 24.0.9-0ubuntu0.3), renderer: D3D12 (AMD Radeon(TM) Graphics)
W0000 00:00:1757090064.388000  106216 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.
W0000 00:00:1757090064.555985  106217 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.
W0000 00:00:1757090064.570517  106221 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.
W0000 00:00:1757090064.571134  106218 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.
W0000 00

Error : X has 23 features, but StandardScaler is expecting 345 features as input.
Error : X has 46 features, but StandardScaler is expecting 345 features as input.
Error : X has 69 features, but StandardScaler is expecting 345 features as input.
Error : X has 92 features, but StandardScaler is expecting 345 features as input.
Error : X has 115 features, but StandardScaler is expecting 345 features as input.
Error : X has 138 features, but StandardScaler is expecting 345 features as input.
Error : X has 161 features, but StandardScaler is expecting 345 features as input.
Error : X has 184 features, but StandardScaler is expecting 345 features as input.
Error : X has 207 features, but StandardScaler is expecting 345 features as input.
Error : X has 230 features, but StandardScaler is expecting 345 features as input.
Error : X has 253 features, but StandardScaler is expecting 345 features as input.
Error : X has 276 features, but StandardScaler is expecting 345 features as input.
Error : 