# x, y 좌표를 이용한 운동 분류

In [21]:
import cv2
import numpy as np
import mediapipe as mp
from collections import deque, Counter
from keras.models import load_model
import threading
import time
import tempfile
import os
from gtts import gTTS

def extract_pose_landmarks(results):
    xyz_list = []

    points = [
        11, 12, 13, 14, 15, 16, 23, 24, 25, 26, 27, 28
    ]

    lm = results.pose_landmarks.landmark

    neck_x = (lm[11].x + lm[12].x) / 2
    neck_y = (lm[11].y + lm[12].y) / 2
    #neck_z = (lm[11].z + lm[12].z) / 2

    for idx in points:
        x = lm[idx].x - neck_x
        y = lm[idx].y - neck_y
        #z = lm[idx].z - neck_z

        xyz_list.append([x, y])

    return xyz_list

# ---- TTS Thread ----
class TextToSpeechThread(threading.Thread):
    def __init__(self, text):
        super().__init__()
        self.text = text

    def run(self):
        try:
            tts = gTTS(text=self.text, lang='ko')
            with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as tmpfile:
                tts.save(tmpfile.name)
                os.system(f"mpg123 {tmpfile.name}")
                os.remove(tmpfile.name)
        except Exception as e:
            print(f"[TTS Error] {e}")

# ---- Prediction Thread ----
class PredictionThread(threading.Thread):
    def __init__(self, model, sequence, lock):
        super().__init__()
        self.model = model
        self.sequence = sequence
        self.lock = lock
        self.result = None
        self.running = True

    def run(self):
        while self.running:
            with self.lock:
                if len(self.sequence) == 20:
                    input_data = np.array(list(self.sequence)).reshape(1, 20, 24)
                else:
                    input_data = None

            if input_data is not None:
                prediction = self.model.predict(input_data, verbose=0)
                self.result = prediction
            else:
                time.sleep(0.01)

    def stop(self):
        self.running = False

class AngleGuid():
    def __init__(self, exercise):
        self.vectors = {0: None, 1: None}
        self.initialized = {0: False, 1: False}
        self.count = 0

        self.r = (0, 0, 255)
        self.g = (0, 255, 0)
        self.b = (255, 0, 0)
        self.p = (147, 20, 255)

        self.state = {0: "up", 1: "down"}
        self.exercise = exercise

        self.last_tts_time = 0
        self.last_time = 0
        self.tts_play = False

        if self.exercise == "squat":
            self.squat_init()
        elif self.exercise == "shoulder":
            self.shoulder_press_init()
        elif self.exercise == "knee":
            self.knee_raise_init()
    
    def squat_init(self):
        self.up_angle = 160
        self.down_angle = 130

    def shoulder_press_init(self):
        self.up_angle = 130
        self.down_angle = 50
        self.limit_angle = 110

    def knee_raise_init(self):
        self.up_angle = 70
        self.down_angle = 160

    def update(self, index, angle, passing):
        if passing: return

        if self.exercise == "shoulder":
            if self.state[index] == "up" and angle > self.up_angle:
                self.state[index] = "down"

                if self.state[0] == "down" and self.state[1] == "down":
                    self.count +=1
            elif self.state[index] == "down":
                if angle < self.down_angle:
                    self.state[index] = "up"
        elif self.exercise == "squat":
            if self.state[index] == "down" and angle < self.down_angle:
                self.state[index] = "up"

                if self.state[0] == "up" and self.state[1] == "up":
                    self.count += 1
            elif self.state[index] == "up" and angle > self.up_angle:
                self.state[index] = "down"
        elif self.exercise == "knee":
            if self.state[index] == "up" and angle < self.up_angle:
                self.state[index] = "down"
                self.count += 0.5
            elif self.state[index] == "down" and angle > self.down_angle:
                self.state[index] = "up"

    def to_pixel(self, frame, idx, landmarks):
        return np.array([
            landmarks[idx].x * frame.shape[1],
            landmarks[idx].y * frame.shape[0]
        ], dtype=np.float32)

    def calculate_angle(self, a, b, c):
        a = np.array([a.x, a.y])
        b = np.array([b.x, b.y])
        c = np.array([c.x, c.y])
        ba = a - b
        bc = c - b
        cosine_angle = np.dot(ba, bc) / (np.linalg.norm(ba) * np.linalg.norm(bc) + 1e-6)
        angle = np.arccos(np.clip(cosine_angle, -1.0, 1.0))
        return np.degrees(angle)
    
    def get_point_by_angle(self, origin, center, from_point, angle_deg, length=None):
        vec = from_point - center
        if length is None:
            length = np.linalg.norm(vec)

        # 단위 벡터
        unit_vec = vec / (np.linalg.norm(vec) + 1e-6)

        # 회전 각도: 관절 기준이라 180도에서 빼야 함
        theta = np.radians(180 - angle_deg)

        rot_matrix = np.array([
            [np.cos(theta), -np.sin(theta)],
            [np.sin(theta),  np.cos(theta)]
        ])

        rotated_vec = rot_matrix @ unit_vec
        target_point = origin + rotated_vec * length
        return target_point


    def draw_exercise_line(self, frame, landmarks):
        joint_map = {
            "shoulder": [(12, 14, 16), (11, 13, 15)],
            "squat": [(24, 26, 28), (23, 25, 27)],
            "knee": [(24, 26, 28), (23, 25, 27)]
        }

        guide_angle = {
            "shoulder": lambda idx: -120 * (1 if idx == 0 else -1),
            "squat": lambda idx: 90 * (1 if idx == 1 else -1),
            "knee": lambda idx: -98
        }

        for idx, (a, b, c) in enumerate(joint_map[self.exercise]):
            passing = False
            pt1 = self.to_pixel(frame, a, landmarks)
            pt2 = self.to_pixel(frame, b, landmarks)
            pt3 = self.to_pixel(frame, c, landmarks)

            angle = self.calculate_angle(landmarks[a], landmarks[b], landmarks[c])
            if (landmarks[c].y > landmarks[a].y and self.exercise == "shoulder"):
                passing = True

            self.update(idx, angle, passing)

            if self.exercise == "shoulder":
                origin = pt1
                center = pt2
                point = pt1
            elif self.exercise == "squat":
                origin = pt2
                center = pt2
                point = pt3
            elif self.exercise == "knee":
                origin = pt1
                center = pt2
                point = pt1

            # 초기 가이드 벡터
            if not self.initialized[idx]:
                self.vectors[idx] = self.get_point_by_angle(origin, center, point, guide_angle[self.exercise](idx))
                self.initialized[idx] = True

            # 가이드 라인
            cv2.line(frame, tuple(origin.astype(int)), tuple(self.vectors[idx].astype(int)), self.p, 2)

            # 관절 색
            if self.exercise == "shoulder":
                if angle > self.up_angle and landmarks[c].y < landmarks[a].y:
                    color = self.g 
                else:
                    color = self.r
            elif self.exercise == "knee":
                color = self.g if angle < self.up_angle else self.r
            elif self.exercise == "squat":
                color = self.r if angle > self.down_angle else self.g

            # 관절 라인
            cv2.line(frame, tuple(pt1.astype(int)), tuple(pt2.astype(int)), color, 2)
            cv2.line(frame, tuple(pt2.astype(int)), tuple(pt3.astype(int)), color, 2)

            # 점 & 각도 표시
            for pt in [pt1, pt2, pt3]:
                cv2.circle(frame, tuple(pt.astype(int)), 4, self.b, -1)

            cv2.putText(frame, str(int(angle)), (int(pt2[0]), int(pt2[1])), cv2.FONT_HERSHEY_SIMPLEX, 1, color, 2)


class AngleGuidThreaded:
    def __init__(self, exercise):
        self.guid = AngleGuid(exercise)
        self.lock = threading.Lock()

    def set_exercise(self, exercise):
        with self.lock:
            self.exercise = exercise

    def draw(self, frame, landmarks):
        with self.lock:
            self.guid.draw_exercise_line(frame, landmarks)

    def get_count(self):
        with self.lock:
            return self.guid.count
    
    def set_count(self):
        with self.lock:
            self.guid.count = 0

    def get_current_angles(self):
        with self.lock:
            return self.guid.current_angle
        
    def get_state(self):
        with self.lock:
            return self.guid.state

path = "/home/shin/deeplearning-repo-1/dataset/stand_knee_raise/9.mp4"

cap = cv2.VideoCapture(0)
cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1280)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 720)
mp_pose = mp.solutions.pose.Pose(static_image_mode=False, min_detection_confidence=0.5, min_tracking_confidence=0.5)
sequence = deque(maxlen=20)

execrise_list = ["Standing", "Standing Knee Raise", "Shoulder Press", "Squat", "Side Lunge"]
model = load_model('exercise_classifier.h5')

lock = threading.Lock()
predict_thread = PredictionThread(model, sequence, lock)
predict_thread.start()

last_exercise = None
last_tts_time = 0  # TTS 중복 방지 타이머

guid = AngleGuidThreaded("squat")
start_time = time.time()
countdown_done = False

prev_count = 0

try:
    if cap.isOpened():
        while True:
            ret, image = cap.read()
            if not ret:
                break

            elapsed = time.time() - start_time
            if not countdown_done:
                countdown = max(0, 5 - int(elapsed))
                cv2.putText(image, f"{countdown}...", (image.shape[1] // 2, image.shape[0] // 2), cv2.FONT_HERSHEY_SIMPLEX, 3, (0, 0, 255), 5)
                if countdown == 0:
                    countdown_done = True
            else:
                #image = cv2.resize(image, (640, 480))
                image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
                results = mp_pose.process(image)
                image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
                if results.pose_landmarks is None:
                    continue

                landmarks = extract_pose_landmarks(results)

                with lock:
                    sequence.append(landmarks)

                guid.draw(image, results.pose_landmarks.landmark)
                count = guid.get_count()
                state = guid.get_state()

                if predict_thread.result is not None:
                    predict = predict_thread.result
                    predict_class = int(np.argmax(predict))
                    label = execrise_list[predict_class]

                    # 이전에 예측된 운동과 현재 운동이 다를 경우 다시 예측된 운동명 TTS 재생
                    if label != last_exercise:
                        if time.time() - last_tts_time > 5:
                            tts_thread = TextToSpeechThread(f"{label}")
                            #tts_thread.start()
                            last_exercise = label
                            last_tts_time = time.time()

                    # 화면에 예측 표시
                    #cv2.putText(frame, f"{label}", (10, 45), cv2.FONT_HERSHEY_SIMPLEX, 2, (255, 0, 0), 3)
                    
                cv2.putText(image, f"Count: {int(count)}", (10, 45), cv2.FONT_HERSHEY_SIMPLEX, 2, (0,255,255), 4)
                cv2.putText(image, f"left: {state[0]}, right: {state[1]}", (10, 145), cv2.FONT_HERSHEY_SIMPLEX, 2, (0,255,255), 4)

                if count > prev_count:
                    tts_thread = TextToSpeechThread(f"{count}")
                    tts_thread.start()
                    prev_count = count

            cv2.imshow('Exercise Classifier', image)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
finally:
    predict_thread.stop()
    predict_thread.join()
    cap.release()
    mp_pose.close()
    cv2.destroyAllWindows()

I0000 00:00:1743559938.006961  634561 gl_context_egl.cc:85] Successfully initialized EGL. Major : 1 Minor: 5
I0000 00:00:1743559938.008308  791145 gl_context.cc:369] GL version: 3.2 (OpenGL ES 3.2 Mesa 24.2.8-1ubuntu1~24.04.1), renderer: Mesa Intel(R) UHD Graphics (CML GT2)
W0000 00:00:1743559938.078420  791131 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.
W0000 00:00:1743559938.125898  791132 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.
High Performance MPEG 1.0/2.0/2.5 Audio Player for Layers 1, 2 and 3
	version 1.32.5; written and copyright by Michael Hipp and others
	free software (LGPL) without any warranty but with best wishes

Directory: /tmp/
Playing MPEG stream 1 of 1: tmpgi74y0ft.mp3 ...

MPEG 2.0 L III cbr64 24000 mono

[0:00] Decoding of tmpgi74y0ft.mp3 finished.
High Performance 

High Performance MPEG 1.0/2.0/2.5 Audio Player for Layers 1, 2 and 3
	version 1.32.5; written and copyright by Michael Hipp and others
	free software (LGPL) without any warranty but with best wishes

Directory: /tmp/
Playing MPEG stream 1 of 1: tmp0vol8801.mp3 ...

MPEG 2.0 L III cbr64 24000 mono

[0:00] Decoding of tmp0vol8801.mp3 finished.


# x, y좌표 + 관절 각도를 이용한 운동 분류

In [None]:
import cv2
import numpy as np
import tempfile
import mediapipe as mp
from collections import deque, Counter
import threading
import time
import tempfile
import os
from gtts import gTTS
from tensorflow.keras.models import load_model

def calculate_angle(a, b, c):
    a = np.array(a)
    b = np.array(b)
    c = np.array(c)
    ba = a - b
    bc = c - b
    cosine_angle = np.dot(ba, bc) / (np.linalg.norm(ba) * np.linalg.norm(bc) + 1e-6)
    angle = np.arccos(np.clip(cosine_angle, -1.0, 1.0))
    return np.degrees(angle)

def extract_pose_angles(results, frame):
    lm = results.pose_landmarks.landmark
    image_height, image_width, _ = frame.shape

    neck_x = (lm[11].x + lm[12].x) / 2
    neck_y = (lm[11].y + lm[12].y) / 2
    landmarks = [(lm[i].x - neck_x, lm[i].y - neck_y) for i in range(33)]

    angle_joints = [
        (11, 13, 15), (12, 14, 16),  # 양팔
        (23, 25, 27), (24, 26, 28),  # 양다리
        (13, 11, 23), (14, 12, 24),  # 상체
        (11, 23, 25), (12, 24, 26),  # 골반
    ]

    landmark_points = [11, 12, 13, 14, 15, 16, 23, 24, 25, 26, 27, 28]

    xy_features = []
    for i in landmark_points:
        x, y = landmarks[i]
        xy_features.extend([x, y])

    angles = []
    features_per_video = []
    for a, b, c in angle_joints:
        angle = calculate_angle(landmarks[a], landmarks[b], landmarks[c])
        angles.append(angle)

        cx = int(lm[b].x * image_width)
        cy = int(lm[b].y * image_height)
        cv2.putText(frame, f"{int(angle)}", (cx, cy), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)

    full_feature = xy_features + angles
    features_per_video.append(full_feature)
    
    return features_per_video

class TextToSpeechThread(threading.Thread):
    def __init__(self, text):
        super().__init__()
        self.text = text

    def run(self):
        try:
            tts = gTTS(text=self.text, lang='ko')
            with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as tmpfile:
                tts.save(tmpfile.name)
                os.system(f"mpg123 {tmpfile.name}")
                os.remove(tmpfile.name)
        except Exception as e:
            print(f"[TTS Error] {e}")

class PredictionThread(threading.Thread):
    def __init__(self, model, sequence, lock):
        super().__init__()
        self.model = model
        self.sequence = sequence
        self.lock = lock
        self.result = None
        self.running = True

    def run(self):
        while self.running:
            with self.lock:
                if len(self.sequence) == 20:
                    input_data = np.array(list(self.sequence)).reshape(1, 20, 32)
                else:
                    input_data = None

            if input_data is not None:
                prediction = self.model.predict(input_data, verbose=0)
                self.result = prediction
            else:
                time.sleep(0.01)

    def stop(self):
        self.running = False

cap = cv2.VideoCapture(0)
cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1280)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 720)
mp_pose = mp.solutions.pose.Pose(static_image_mode=False, min_detection_confidence=0.5, min_tracking_confidence=0.5)
sequence = deque(maxlen=20)

execrise_list = ["Standing", "Standing Knee Raise", "Shoulder Press", "Squat", "Front Lunge"]
model = load_model("exercise_classifier_angle.h5")

lock = threading.Lock()
predict_thread = PredictionThread(model, sequence, lock)
predict_thread.start()

fix_exercise = False
fixed_class = None
last_tts_time = 0
last_exercise = None

guid = AngleGuid()
guid.init("shoulder")
guid.set_direction("up")

try:
    if cap.isOpened():
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            # frame = cv2.resize(frame, (1280, 720))

            image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            results = mp_pose.process(image)
            if results.pose_landmarks is None:
                continue

            features = extract_pose_angles(results, image)
            with lock:
                sequence.append(features)

            if predict_thread.result is not None:
                predict = predict_thread.result
                predict_class = int(np.argmax(predict))
                label = execrise_list[predict_class]

                # 이전에 예측된 운동과 현재 운동이 다를 경우 다시 예측된 운동명 TTS 재생
                if label != last_exercise:
                    if time.time() - last_tts_time > 5:
                        tts_thread = TextToSpeechThread(f"{label}")
                        tts_thread.start()
                        last_tts_time = time.time()
                        last_exercise = label

                cv2.putText(frame, f"{label}", (10, 45), cv2.FONT_HERSHEY_SIMPLEX, 2, (0, 0, 255), 3)

            cv2.imshow('Exercise Classifier', frame)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
finally:
    predict_thread.stop()
    predict_thread.join()
    cap.release()
    mp_pose.close()
    cv2.destroyAllWindows()


2025-03-27 15:41:18.463071: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:467] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
E0000 00:00:1743057678.479619    4383 cuda_dnn.cc:8579] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
E0000 00:00:1743057678.485218    4383 cuda_blas.cc:1407] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
W0000 00:00:1743057678.499100    4383 computation_placer.cc:177] computation placer already registered. Please check linkage and avoid linking the same target more than once.
W0000 00:00:1743057678.499115    4383 computation_placer.cc:177] computation placer already registered. Please check linkage and avoid linking the same target more than once.
W0000 00:00:1743057678.499117    4383 computation_placer.cc:177] computation placer alr