# 0. 라이브러리 설치

In [1]:
# 1. 라이브러리 설치 (최초 1회만 실행됨)
!pip install opencv-python mediapipe tensorflow numpy

Collecting mediapipe
  Downloading mediapipe-0.10.21-cp312-cp312-manylinux_2_28_x86_64.whl.metadata (9.7 kB)
INFO: pip is looking at multiple versions of mediapipe to determine which version is compatible with other requirements. This could take a while.
  Downloading mediapipe-0.10.20-cp312-cp312-manylinux_2_28_x86_64.whl.metadata (9.7 kB)
  Downloading mediapipe-0.10.18-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (9.7 kB)
  Downloading mediapipe-0.10.15-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (9.7 kB)
  Downloading mediapipe-0.10.14-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (9.7 kB)
Collecting protobuf<5,>=4.25.3 (from mediapipe)
  Downloading protobuf-4.25.8-cp37-abi3-manylinux2014_x86_64.whl.metadata (541 bytes)
Collecting sounddevice>=0.4.4 (from mediapipe)
  Downloading sounddevice-0.5.3-py3-none-any.whl.metadata (1.6 kB)
Downloading mediapipe-0.10.14-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_

#1. 기본 설정

## 1-1. 피쳐 엔지니어링

In [None]:
import cv2
import mediapipe as mp
import numpy as np
import joblib
import tensorflow as tf
import PIL.Image
import io
import base64
from IPython.display import display, Javascript
from google.colab.output import eval_js
from base64 import b64decode, b64encode
import os

# ==========================================
# 1. 헬퍼 함수 및 Feature Engineering (학습 코드와 동일 유지)
# ==========================================
def calculate_angle(a, b, c):
    reliability = a[3] * b[3] * c[3]
    if a[3] < 0.5 or b[3] < 0.5 or c[3] < 0.5: return None, reliability
    a_xyz, b_xyz, c_xyz = a[:3], b[:3], c[:3]
    try:
        ba = a_xyz - b_xyz
        bc = c_xyz - b_xyz
        cosine_angle = np.dot(ba, bc) / (np.linalg.norm(ba) * np.linalg.norm(bc))
        angle = np.degrees(np.arccos(np.clip(cosine_angle, -1.0, 1.0)))
        return angle, reliability
    except: return None, reliability

def calculate_plane_normal(a, b, c):
    reliability = a[3] * b[3] * c[3]
    if a[3] < 0.5 or b[3] < 0.5 or c[3] < 0.5: return None, reliability
    a_xyz, b_xyz, c_xyz = a[:3], b[:3], c[:3]
    try:
        v1 = b_xyz - a_xyz
        v2 = c_xyz - a_xyz
        normal = np.cross(v1, v2)
        norm_len = np.linalg.norm(normal)
        if norm_len == 0: return None, reliability
        return normal / norm_len, reliability
    except: return None, reliability

def feature_engineering(landmarks):
    """
    landmarks: (33, 4) shape의 numpy array [x, y, z, visibility]
    """
    if landmarks is None: return None
    NOSE = 0
    L_EYE, R_EYE = 2, 5
    L_EAR, R_EAR = 7, 8
    L_SHOULDER, R_SHOULDER = 11, 12
    L_ELBOW, R_ELBOW = 13, 14
    L_WRIST, R_WRIST = 15, 16
    L_HIP, R_HIP = 23, 24

    angle_features, angle_rels = [], []
    normal_features, normal_rels = [], []

    angles_to_calc = [
        (landmarks[L_SHOULDER], landmarks[L_ELBOW], landmarks[L_WRIST]),
        (landmarks[R_SHOULDER], landmarks[R_ELBOW], landmarks[R_WRIST]),
        (landmarks[L_ELBOW], landmarks[L_SHOULDER], landmarks[L_HIP]),
        (landmarks[R_ELBOW], landmarks[R_SHOULDER], landmarks[R_HIP]),
        (landmarks[L_EAR], landmarks[L_SHOULDER], landmarks[L_HIP]),
        (landmarks[R_EAR], landmarks[R_SHOULDER], landmarks[R_HIP])
    ]
    for p1, p2, p3 in angles_to_calc:
        ang, rel = calculate_angle(p1, p2, p3)
        angle_features.append(ang)
        angle_rels.append(rel)

    face_normal, face_rel = calculate_plane_normal(landmarks[NOSE], landmarks[L_EYE], landmarks[R_EYE])
    normal_features.append(face_normal)
    normal_rels.append(face_rel)

    sh_nose_normal, sh_nose_rel = calculate_plane_normal(landmarks[L_SHOULDER], landmarks[R_SHOULDER], landmarks[NOSE])
    normal_features.append(sh_nose_normal)
    normal_rels.append(sh_nose_rel)

    norm_angles_vec = [(val / 360.0) if val is not None else 0.0 for val in angle_features]
    flat_normals_vec = []
    for vec in normal_features:
        if vec is not None: flat_normals_vec.extend(vec)
        else: flat_normals_vec.extend([0.0, 0.0, 0.0])

    final_features = norm_angles_vec + flat_normals_vec + angle_rels + normal_rels
    return np.array(final_features)

## 1-2. 모델 로드

In [16]:
import os
import joblib
import numpy as np
import tensorflow as tf

# ==========================================
# 사용자 설정: 여기서 모델 종류와 경로만 바꾸세요
# ==========================================
# 사용 가능한 옵션: 'rf' (Random Forest), 'svm' (SVM), 'keras' (Keras DL)
CURRENT_MODEL_TYPE = 'rf'

# 각 모델 파일 경로 설정
MODEL_PATHS = {
    'rf': 'pose_classifier_rf.pkl',
    'svm': 'pose_classifier_svm.pkl',
    'keras': 'pose_classifier_keras.h5'
}

# ==========================================
# 2. 모델 로드 함수 (통합)
# ==========================================
def load_pose_model(model_type, paths):
    path = paths.get(model_type)

    if not os.path.exists(path):
        print(f"오류: '{path}' 파일이 없습니다. 경로를 확인하세요.")
        return None

    if model_type in ['rf', 'svm']:
        # Scikit-learn 계열 (joblib 사용)
        model = joblib.load(path)
        print(f"[{model_type.upper()}] Scikit-learn 모델 로드 완료!")
        return model

    elif model_type == 'keras':
        # Deep Learning 계열 (Keras 사용)
        model = tf.keras.models.load_model(path)
        print(f"[{model_type.upper()}] Keras 모델 로드 완료!")
        return model

    else:
        print("지원하지 않는 모델 타입입니다.")
        return None

# 모델 로드 실행
model = load_pose_model(CURRENT_MODEL_TYPE, MODEL_PATHS)

# ==========================================
# 3. 예측 수행 헬퍼 함수 (추후 루프 안에서 사용)
# ==========================================
def predict_pose(model, features, model_type):
    """
    모델 타입에 따라 알맞은 전처리와 예측 결과를 반환합니다.
    """
    if model is None or features is None:
        return None

    # 1) Scikit-learn 계열 (RF, SVM)
    if model_type in ['rf', 'svm']:
        # 입력: 2차원 배열 필요 (1, N) -> reshape(1, -1) 권장
        input_data = features.reshape(1, -1)
        prediction = model.predict(input_data)[0] # 결과가 배열로 나오므로 첫 번째 요소 선택
        return prediction

    # 2) Keras 계열 (Deep Learning)
    elif model_type == 'keras':
        # 입력: 반드시 (1, features_len) 형태여야 함
        input_data = features.reshape(1, -1)

        # 예측: 확률 분포 반환 (예: [0.1, 0.8, 0.1])
        prediction_probs = model.predict(input_data, verbose=0)

        # 가장 높은 확률의 인덱스 추출
        prediction = np.argmax(prediction_probs)
        return prediction

# ==========================================
# MediaPipe 설정 (기존 코드 유지)
# ==========================================
mp_pose = mp.solutions.pose
pose = mp_pose.Pose(min_detection_confidence=0.5, min_tracking_confidence=0.5)
mp_drawing = mp.solutions.drawing_utils

print(f"\n현재 설정된 모델: {CURRENT_MODEL_TYPE}")

[RF] Scikit-learn 모델 로드 완료!

현재 설정된 모델: rf


# 2. 웹캠 통신 함수

In [3]:
def js_to_image(js_reply):
  """JS 객체를 OpenCV 이미지로 변환"""
  image_bytes = b64decode(js_reply.split(',')[1])
  jpg_as_np = np.frombuffer(image_bytes, dtype=np.uint8)
  img = cv2.imdecode(jpg_as_np, flags=1)
  return img

def bbox_to_bytes(bbox_array):
  """OpenCV 이미지를 JS 전송용 byte로 변환"""
  bbox_PIL = PIL.Image.fromarray(bbox_array, 'RGBA')
  iobuf = io.BytesIO()
  bbox_PIL.save(iobuf, format='png')
  bbox_bytes = 'data:image/png;base64,{}'.format((str(b64encode(iobuf.getvalue()), 'utf-8')))
  return bbox_bytes

def video_stream():
  js = Javascript('''
    var video;
    var div = null;
    var stream;
    var captureCanvas;
    var imgElement;
    var labelElement;

    var pendingResolve = null;
    var shutdown = false;

    function removeDom() {
       stream.getVideoTracks()[0].stop();
       video.remove();
       div.remove();
       video = null;
       div = null;
       stream = null;
       imgElement = null;
       captureCanvas = null;
       labelElement = null;
    }

    function onAnimationFrame() {
      if (!shutdown) {
        window.requestAnimationFrame(onAnimationFrame);
      }
      if (pendingResolve) {
        var result = "";
        if (!shutdown) {
          captureCanvas.getContext('2d').drawImage(video, 0, 0, 640, 480);
          result = captureCanvas.toDataURL('image/jpeg', 0.8)
        }
        var lp = pendingResolve;
        pendingResolve = null;
        lp(result);
      }
    }

    async function createDom() {
      if (div !== null) {
        return stream;
      }
      div = document.createElement('div');
      div.style.border = '2px solid black';
      div.style.padding = '3px';
      div.style.width = '100%';
      div.style.maxWidth = '600px';
      document.body.appendChild(div);

      const modelOut = document.createElement('div');
      modelOut.innerHTML = "<span>Status:</span>";
      labelElement = document.createElement('span');
      labelElement.innerText = 'No data';
      labelElement.style.fontWeight = 'bold';
      modelOut.appendChild(labelElement);
      div.appendChild(modelOut);

      video = document.createElement('video');
      video.style.display = 'block';
      video.width = div.clientWidth - 6;
      video.setAttribute('playsinline', '');
      video.onclick = () => { shutdown = true; };
      stream = await navigator.mediaDevices.getUserMedia(
          {video: { facingMode: "user"}});
      div.appendChild(video);

      imgElement = document.createElement('img');
      imgElement.style.position = 'absolute';
      imgElement.style.zIndex = 1;
      imgElement.onclick = () => { shutdown = true; };
      div.appendChild(imgElement);

      const instruction = document.createElement('div');
      instruction.innerHTML =
          '<span style="color: red; font-weight: bold;">' +
          'When finished, click here or on the video to stop this demo</span>';
      div.appendChild(instruction);
      instruction.onclick = () => { shutdown = true; };

      video.srcObject = stream;
      await video.play();

      captureCanvas = document.createElement('canvas');
      captureCanvas.width = 640;
      captureCanvas.height = 480;
      window.requestAnimationFrame(onAnimationFrame);

      return stream;
    }
    async function stream_frame(label, imgData) {
      if (shutdown) {
        removeDom();
        shutdown = false;
        return '';
      }
      var preCreate = Date.now();
      stream = await createDom();

      var preShow = Date.now();
      if (label != "") {
        labelElement.innerHTML = label;
      }
      if (imgData != "") {
        var videoRect = video.getClientRects()[0];
        imgElement.style.top = videoRect.top + "px";
        imgElement.style.left = videoRect.left + "px";
        imgElement.style.width = videoRect.width + "px";
        imgElement.style.height = videoRect.height + "px";
        imgElement.src = imgData;
      }
      var preCapture = Date.now();
      var result = await new Promise(function(resolve, reject) {
        pendingResolve = resolve;
      });
      shutdown = false;

      return {'create': preShow - preCreate,
              'show': preCapture - preShow,
              'capture': Date.now() - preCapture,
              'img': result};
    }
    ''')
  display(js)

def video_frame(label, bbox):
  data = eval_js('stream_frame("{}", "{}")'.format(label, bbox))
  return data

# 3. 메인 실행 (영상)

In [17]:
# ==========================================
# 2. 예측 통합 헬퍼 함수 (핵심 모듈)
# ==========================================
def predict_wrapper(model, features, model_type):
    """
    모델 타입에 따라 적절한 예측 함수를 호출하고,
    (상태 문자열, 'Good'일 확률)을 반환합니다.
    """
    if features is None:
        return "No Pose", 0.0

    # 입력 차원 맞추기 (1, N)
    input_data = features.reshape(1, -1)

    prob_good = 0.0

    try:
        # A. 머신러닝 모델 (RF, SVM)
        if model_type in ['rf', 'svm']:
            # predict_proba 반환값: [[prob_bad, prob_good]]
            probs = model.predict_proba(input_data)[0]
            prob_good = probs[1] # 1번 인덱스가 Good이라고 가정

        # B. 딥러닝 모델 (Keras)
        elif model_type == 'keras':
            # 반환값 형태: [[0.78]] (2차원 배열 안에 값 1개)
            pred = model.predict(input_data, verbose=0)

            prob_good = pred[0][0]

    except Exception as e:
        print(f"Prediction Logic Error: {e}")
        return "Error", 0.0

    # 상태 결정 (임계값 0.5)
    if prob_good > 0.5:
        return "Good", prob_good
    else:
        return "Bad", prob_good

# ==========================================
# 3. 실시간 웹캠 분석 루프
# ==========================================
# 스트리밍 시작 (Colab 환경 가정)
try:
    video_stream() # 이미 정의되어 있다고 가정
except NameError:
    print("video_stream() 함수가 정의되지 않았습니다. 이전 셀을 실행했는지 확인하세요.")

label_html = 'Starting Posture Analysis...'
bbox = ''
count = 0

while True:
    # Colab JS 프록시를 통해 프레임 받기
    try:
        js_reply = video_frame(label_html, bbox)
        if not js_reply: break
    except: break

    # 1. 이미지 변환 및 전처리
    img = js_to_image(js_reply["img"])
    img = cv2.flip(img, 1) # 거울 모드
    img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

    # 2. MediaPipe 분석
    results = pose.process(img_rgb)

    status = "No Pose"
    prediction_prob = 0.0
    color = (200, 200, 200) # 회색 (Default)

    # 3. 랜드마크 감지 시 처리
    if results.pose_landmarks:
        try:
            # (1) 랜드마크 추출 및 Numpy 변환
            landmarks_list = []
            for lm in results.pose_landmarks.landmark:
                landmarks_list.append([lm.x, lm.y, lm.z, lm.visibility])
            landmarks_arr = np.array(landmarks_list)

            # (2) Feature Engineering (기존 함수 사용)
            features = feature_engineering(landmarks_arr)

            if features is not None:
                # (3) [모듈화된 예측 함수 호출]
                # 여기서 모델 타입에 따라 알아서 처리됩니다.
                status, prediction_prob = predict_wrapper(model, features, CURRENT_MODEL_TYPE)

                # 색상 설정
                if status == "Good":
                    color = (0, 255, 0) # 초록
                elif status == "Bad":
                    color = (0, 0, 255) # 빨강
                else:
                    color = (0, 255, 255) # 노랑 (에러 등)

                label_html = f"[{CURRENT_MODEL_TYPE.upper()}] Status: {status} ({prediction_prob*100:.1f}%)"

            # (4) 시각화: 랜드마크 그리기
            mp_drawing.draw_landmarks(
                img,
                results.pose_landmarks,
                mp_pose.POSE_CONNECTIONS,
                mp_drawing.DrawingSpec(color=color, thickness=2, circle_radius=2),
                mp_drawing.DrawingSpec(color=(255,255,255), thickness=2, circle_radius=2)
            )

            # (5) 시각화: 텍스트 오버레이
            cv2.rectangle(img, (0,0), (300, 60), (245, 117, 16), -1)
            cv2.putText(img, f"{status} {prediction_prob*100:.0f}%", (10, 40),
                        cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2)
            cv2.putText(img, f"Model: {CURRENT_MODEL_TYPE}", (10, 15), # 현재 모델 표시
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, (200, 200, 200), 1)

        except Exception as e:
            print(f"Main Loop Error: {e}")
            pass
    else:
        label_html = 'Pose Not Detected'

    # 4. 출력용 이미지 생성 (RGBA)
    img_rgba = cv2.cvtColor(img, cv2.COLOR_BGR2RGBA)
    bbox_bytes = bbox_to_bytes(img_rgba)
    bbox = bbox_bytes

<IPython.core.display.Javascript object>

  bbox_PIL = PIL.Image.fromarray(bbox_array, 'RGBA')
  bbox_PIL = PIL.Image.fromarray(bbox_array, 'RGBA')
  bbox_PIL = PIL.Image.fromarray(bbox_array, 'RGBA')
  bbox_PIL = PIL.Image.fromarray(bbox_array, 'RGBA')
  bbox_PIL = PIL.Image.fromarray(bbox_array, 'RGBA')
  bbox_PIL = PIL.Image.fromarray(bbox_array, 'RGBA')
  bbox_PIL = PIL.Image.fromarray(bbox_array, 'RGBA')
  bbox_PIL = PIL.Image.fromarray(bbox_array, 'RGBA')
  bbox_PIL = PIL.Image.fromarray(bbox_array, 'RGBA')
  bbox_PIL = PIL.Image.fromarray(bbox_array, 'RGBA')
  bbox_PIL = PIL.Image.fromarray(bbox_array, 'RGBA')
  bbox_PIL = PIL.Image.fromarray(bbox_array, 'RGBA')
  bbox_PIL = PIL.Image.fromarray(bbox_array, 'RGBA')
  bbox_PIL = PIL.Image.fromarray(bbox_array, 'RGBA')
  bbox_PIL = PIL.Image.fromarray(bbox_array, 'RGBA')
  bbox_PIL = PIL.Image.fromarray(bbox_array, 'RGBA')
  bbox_PIL = PIL.Image.fromarray(bbox_array, 'RGBA')
  bbox_PIL = PIL.Image.fromarray(bbox_array, 'RGBA')
  bbox_PIL = PIL.Image.fromarray(bbox_array, '