In [1]:
import os
import cv2
import mediapipe as mp
import numpy as np
import pandas as pd
import time
from collections import deque
from tensorflow.keras.models import load_model

In [2]:
# --- Configuration -----------------------------------------------------------
# NOTE(review): absolute, machine-specific path; consider making it relative to
# a configurable directory so the notebook runs on other machines.
MODEL_PATH = r"F:\training_model\4\fold_5_model.h5"
WINDOW_SIZE = 60          # number of frames per sequence fed to the model
CAMERA_INDEX = 1          # index passed to cv2.VideoCapture
FRAME_WIDTH = 1920        # requested capture width in pixels
FRAME_HEIGHT = 1080       # requested capture height in pixels

# The position of each name must match the model's output index: the capture
# loop looks the label up with CLASS_NAMES[argmax(prediction)].
CLASS_NAMES = [
    "correct", "incorrect down half", "incorrect hip up", "incorrect hip up down half",
    "incorrect hip up up half", "incorrect knee bend", "incorrect knee bend down half",
    "incorrect knee bend up half", "incorrect only body up", "incorrect up half"
]

print("Loading model from:", MODEL_PATH)
model = load_model(MODEL_PATH)
print("Model loaded. Input shape expected by model:", model.input_shape)

# Fail fast when the configuration does not match the loaded model; otherwise
# the mismatch only surfaces later as prediction errors inside the capture loop.
_model_input_shape = model.input_shape
if (isinstance(_model_input_shape, tuple) and len(_model_input_shape) >= 2
        and _model_input_shape[1] not in (None, WINDOW_SIZE)):
    raise ValueError(
        f"WINDOW_SIZE={WINDOW_SIZE} does not match the model's sequence length "
        f"{_model_input_shape[1]} (input shape {_model_input_shape})."
    )

_model_output_shape = model.output_shape
if (isinstance(_model_output_shape, tuple) and len(_model_output_shape) >= 1
        and _model_output_shape[-1] not in (None, len(CLASS_NAMES))):
    raise ValueError(
        f"CLASS_NAMES has {len(CLASS_NAMES)} entries but the model outputs "
        f"{_model_output_shape[-1]} classes (output shape {_model_output_shape})."
    )

Loading model from: F:\training_model\4\fold_5_model.h5
Model loaded. Input shape expected by model: (None, 60, 25)


In [3]:
def calculate_angle(a, b, c):
    """Return the angle at vertex ``b`` formed by points ``a`` and ``c``, in degrees.

    Args:
        a, b, c: Coordinate sequences of equal length (e.g. ``(x, y, z)``).

    Returns:
        float: Angle between the vectors ``b -> a`` and ``b -> c``. A small
        epsilon (1e-6) is added to the denominator, so a zero-length vector
        yields 90.0 instead of a division error, and results are biased very
        slightly away from exactly 0 and 180 degrees.
    """
    vertex = np.array(b)
    to_a = np.array(a) - vertex
    to_c = np.array(c) - vertex

    norm_product = np.linalg.norm(to_a) * np.linalg.norm(to_c)
    cosine = np.dot(to_a, to_c) / (norm_product + 1e-6)

    # Clamp before arccos so rounding error cannot push the cosine out of range.
    radians = np.arccos(np.clip(cosine, -1.0, 1.0))
    return float(np.degrees(radians))

def extract_landmarks(frame, prev_landmarks=None):
    """Run pose estimation on a BGR frame and return its landmark list.

    Relies on the notebook globals ``pose_model`` and ``mp_pose``, which must be
    defined before this function is called. On success the full landmark
    container is also stored in the global ``current_pose_landmarks`` so the
    caller can draw it.

    Args:
        frame: BGR image as read from OpenCV.
        prev_landmarks: Value to return when no pose is found in this frame.

    Returns:
        The detected ``pose_landmarks.landmark`` sequence, or ``prev_landmarks``
        (possibly ``None``) when neither detection attempt finds a pose.
    """
    global current_pose_landmarks
    image_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    results = pose_model.process(image_rgb)

    if not results.pose_landmarks:
        # Retry once in static-image mode (may be more sensitive on a single
        # frame). The context manager guarantees the temporary model is closed
        # even if process() raises.
        # NOTE(review): a new Pose object is built for every frame without a
        # detection, which is likely costly; consider reusing one instance.
        with mp_pose.Pose(static_image_mode=True) as pose_tmp:
            results_tmp = pose_tmp.process(image_rgb)
        if results_tmp.pose_landmarks:
            results = results_tmp

    if results.pose_landmarks:
        current_pose_landmarks = results.pose_landmarks  # kept for drawing
        return results.pose_landmarks.landmark
    return prev_landmarks

def detect_side(nose, left_shoulder, right_shoulder):
    """Return 'right' or 'left' from the nose position relative to the shoulders.

    Args:
        nose, left_shoulder, right_shoulder: Objects exposing an ``x`` attribute.

    Returns:
        str: 'right' when the nose's x is strictly greater than the midpoint of
        the two shoulders' x values, otherwise 'left'.
    """
    shoulder_mid_x = (left_shoulder.x + right_shoulder.x)/2
    return 'right' if shoulder_mid_x < nose.x else 'left'

def process_landmarks(landmarks, side):
    """Build the per-frame feature vector from seven selected pose landmarks.

    Args:
        landmarks: Indexable sequence of landmarks exposing ``x``, ``y``, ``z``.
        side: ``'left'`` or ``'right'``; selects which seven indices are read.
            Presumably these follow MediaPipe Pose numbering (ear, shoulder,
            elbow, wrist, hip, knee, ankle) - verify against the landmark map.
            Note that ``'left'`` selects the even indices (8, 12, ...).

    Returns:
        tuple: ``(feature_vector, y_shoulder)`` where ``feature_vector`` is a
        list of 25 floats - four joint angles in degrees followed by the x, y, z
        of each selected landmark relative to the fifth one (named hip here) -
        and ``y_shoulder`` is the second landmark's y minus the fourth's y.

    Raises:
        KeyError: If ``side`` is not ``'left'`` or ``'right'``.
        ValueError: If any of the required indices cannot be read.
    """
    joint_indices = {
        'left': [8, 12, 14, 16, 24, 26, 28],
        'right': [7, 11, 13, 15, 23, 25, 27],
    }[side]

    try:
        joints = [landmarks[index] for index in joint_indices]
    except Exception as e:
        raise ValueError("Landmark index error: " + str(e))

    points = [(joint.x, joint.y, joint.z) for joint in joints]
    ear, shoulder, elbow, wrist, hip, knee, ankle = points

    # Vertical offset of the shoulder relative to the wrist; the caller tracks
    # its frame-to-frame change to detect movement.
    y_shoulder = shoulder[1] - wrist[1]

    feature_vector = [
        calculate_angle(ear, shoulder, hip),      # shoulder angle
        calculate_angle(shoulder, elbow, wrist),  # elbow angle
        calculate_angle(shoulder, hip, knee),     # hip angle
        calculate_angle(hip, knee, ankle),        # knee angle
    ]

    # Append every point expressed relative to the hip.
    hip_x, hip_y, hip_z = hip
    for px, py, pz in points:
        feature_vector.extend([px - hip_x, py - hip_y, pz - hip_z])

    return feature_vector, y_shoulder

## Using the hysteresis method

In [26]:
# --- Tunable parameters for repetition detection -----------------------------
MOVE_THRESHOLD = 0.01      # per-frame change of the shoulder signal that counts as movement
STILL_FRAMES_LIMIT = 4     # more than this many consecutive still frames closes a repetition
MIN_WINDOW_FRAMES = 20     # windows with this many frames or fewer are rejected
PAD_VALUE = -1             # filler value for windows shorter than WINDOW_SIZE
WINDOW_TITLE = "Push-up Classifier"
CSV_OUTPUT_PATH = r"F:\data_test\test\results\25.csv"

mp_pose = mp.solutions.pose
mp_drawing = mp.solutions.drawing_utils
pose_model = mp_pose.Pose(static_image_mode=False,
                          model_complexity=1,
                          enable_segmentation=False,
                          min_detection_confidence=0.6,
                          min_tracking_confidence=0.6)

# Bounded buffer: once WINDOW_SIZE frames are stored, appending drops the oldest
# one, so a long repetition is classified on its last WINDOW_SIZE frames.
frame_window = deque(maxlen=WINDOW_SIZE)
results_log = []
repetition_count = 0
correct = 0
incorrect = 0
prev_landmarks = None
current_prediction = "-"
csv_columns = ["Repetition", "Prediction Latency (s)"] + CLASS_NAMES
current_pose_landmarks = None
prev_shoulder_y = None

# Phase flags of one repetition: first "DOWN", then "UP", then a second "DOWN"
# (or a pause) which closes the repetition.
down_1 = False
up = False
down_2 = False
hys_state = "STABLE"
still_frames = 0


def _show_frame_and_check_quit(image):
    """Display ``image`` in the preview window; return True if 'q' was pressed."""
    cv2.imshow(WINDOW_TITLE, image)
    return cv2.waitKey(1) & 0xFF == ord('q')


def _classify_window(window):
    """Pad ``window`` to WINDOW_SIZE frames and run the model on it.

    Returns ``(probabilities, latency_seconds)``. Any exception raised by
    ``model.predict`` propagates to the caller.
    """
    sequence = list(window)
    pad_len = WINDOW_SIZE - len(sequence)
    if pad_len > 0:
        sequence += [[PAD_VALUE] * len(sequence[0])] * pad_len
    input_data = np.array([sequence], dtype=np.float32)
    # perf_counter is the monotonic, high-resolution clock meant for intervals.
    start_pred = time.perf_counter()
    probabilities = model.predict(input_data, verbose=0)[0]
    return probabilities, time.perf_counter() - start_pred


cap = cv2.VideoCapture(CAMERA_INDEX)
cap.set(cv2.CAP_PROP_FRAME_WIDTH, FRAME_WIDTH)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, FRAME_HEIGHT)

print("Starting capture. Press 'q' window to quit.")
start_t = time.time()
try:
    if not cap.isOpened():
        # Without this message the loop below is skipped silently.
        print(f"Could not open camera index {CAMERA_INDEX}.")

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            print("Frame read failed, exiting.")
            break

        landmarks = extract_landmarks(frame, prev_landmarks)
        if landmarks is None:
            cv2.putText(frame, "No landmarks detected", (20,120), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0,0,255), 2)
            if _show_frame_and_check_quit(frame):
                break
            continue

        prev_landmarks = landmarks

        nose = landmarks[0]
        left_shoulder = landmarks[11]
        right_shoulder = landmarks[12]
        side = detect_side(nose, left_shoulder, right_shoulder)

        try:
            features, shoulder = process_landmarks(landmarks, side)
        except Exception as e:
            print("Processing landmarks failed:", e)
            if _show_frame_and_check_quit(frame):
                break
            continue
        current_shoulder_y = shoulder

        # Frame-to-frame change of the shoulder signal (0.0 on the first frame).
        delta_y = current_shoulder_y - prev_shoulder_y if prev_shoulder_y is not None else 0.0
        if abs(delta_y) < MOVE_THRESHOLD:
            still_frames += 1
        else:
            still_frames = 0

        # Hysteresis: the state only flips when the change exceeds the
        # threshold in the opposite direction; small jitter keeps the state.
        if hys_state in ["STABLE", "UP"] and delta_y < -MOVE_THRESHOLD:
            hys_state = "DOWN"
        elif hys_state in ["STABLE", "DOWN"] and delta_y > MOVE_THRESHOLD:
            hys_state = "UP"

        # Advance the repetition phases at most one step per frame.
        if prev_shoulder_y is not None:
            if hys_state == "DOWN" and not down_1:
                down_1 = True
            elif hys_state == "UP" and down_1 and not up:
                up = True
            elif hys_state == "DOWN" and down_1 and up:
                down_2 = True
            elif still_frames > STILL_FRAMES_LIMIT and down_1 and up:
                down_2 = True
        prev_shoulder_y = current_shoulder_y

        # Frames are collected from the first "DOWN" phase onwards.
        if down_1:
            frame_window.append(features)

        if down_1 and up and down_2:
            collected = len(frame_window)
            if collected <= MIN_WINDOW_FRAMES:
                print(f"Movements are invalid (window <= {MIN_WINDOW_FRAMES}). Collected: {collected}")
            else:
                # collected can never exceed WINDOW_SIZE because frame_window is
                # bounded, so every remaining window is classified.
                try:
                    prediction, latency_pred = _classify_window(frame_window)
                except Exception as e:
                    # A failed prediction must not be counted: substituting
                    # zeros would make argmax pick index 0 ("correct").
                    print("Model prediction error:", e)
                    current_prediction = "unknown"
                else:
                    class_name = CLASS_NAMES[int(np.argmax(prediction))]
                    current_prediction = class_name
                    repetition_count += 1
                    if class_name == "correct":
                        correct += 1
                    else:
                        incorrect += 1

                    print(f"Repetisi #{repetition_count}: {class_name}  -> probs: {np.round(prediction,3)}")
                    results_log.append([repetition_count, latency_pred] + prediction.tolist())

            # Reset for the next repetition regardless of the outcome.
            down_1 = up = down_2 = False
            frame_window.clear()

        if current_pose_landmarks is not None:
            mp_drawing.draw_landmarks(
                frame,
                current_pose_landmarks,
                mp_pose.POSE_CONNECTIONS,
                mp_drawing.DrawingSpec(color=(0,255,0), thickness=2, circle_radius=2),
                mp_drawing.DrawingSpec(color=(0,0,255), thickness=2)
            )

        cv2.putText(frame, f"Repetition: {repetition_count}", (20, 40),
                    cv2.FONT_HERSHEY_SIMPLEX, 1.0, (0,255,0), 2)
        cv2.putText(frame, f"Prediction: {current_prediction}", (20, 80),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0,255,255), 2)
        cv2.putText(frame, f"Correct Movements: {correct}", (20, 120),
                    cv2.FONT_HERSHEY_SIMPLEX, 1.0, (10,10,10), 2)
        cv2.putText(frame, f"Incorrect Movements: {incorrect}", (20, 160),
                    cv2.FONT_HERSHEY_SIMPLEX, 1.0, (10,10,10), 2)

        if _show_frame_and_check_quit(frame):
            break

finally:
    # Always release the camera, the preview window and the pose model.
    cap.release()
    cv2.destroyAllWindows()
    pose_model.close()

if results_log:
    os.makedirs(os.path.dirname(CSV_OUTPUT_PATH), exist_ok=True)
    df = pd.DataFrame(results_log, columns=csv_columns)
    df.to_csv(CSV_OUTPUT_PATH, index=False)
    print("Saved predictions to:", CSV_OUTPUT_PATH)
else:
    print("No repetitions recorded; CSV not written.")


Starting capture. Press 'q' window to quit.
Movements are invalid (window < 20). Collected: 20
Repetisi #1: incorrect knee bend up half  -> probs: [0.    0.    0.    0.    0.    0.    0.    0.999 0.    0.   ]
Repetisi #2: incorrect knee bend up half  -> probs: [0.    0.    0.    0.    0.    0.001 0.    0.999 0.    0.   ]
Repetisi #3: incorrect knee bend up half  -> probs: [0.    0.    0.    0.    0.    0.    0.    0.999 0.    0.   ]
Repetisi #4: incorrect knee bend up half  -> probs: [0.    0.    0.    0.    0.    0.001 0.    0.999 0.    0.   ]
Repetisi #5: incorrect knee bend up half  -> probs: [0.    0.    0.    0.    0.    0.    0.    0.999 0.    0.   ]
Repetisi #6: incorrect knee bend up half  -> probs: [0.    0.    0.    0.    0.    0.001 0.    0.998 0.    0.   ]
Repetisi #7: incorrect knee bend up half  -> probs: [0.    0.    0.    0.    0.    0.    0.    0.999 0.    0.   ]
Repetisi #8: incorrect knee bend up half  -> probs: [0.    0.    0.    0.    0.    0.002 0.    0.997 0.001 