In [4]:
!pip install fastapi "uvicorn[standard]" opencv-python torch torchvision ultralytics mediapipe python-multipart numpy

Collecting fastapi
  Downloading fastapi-0.121.3-py3-none-any.whl.metadata (30 kB)
Collecting python-multipart
  Downloading python_multipart-0.0.20-py3-none-any.whl.metadata (1.8 kB)
Collecting uvicorn[standard]
  Downloading uvicorn-0.38.0-py3-none-any.whl.metadata (6.8 kB)
Collecting starlette<0.51.0,>=0.40.0 (from fastapi)
  Downloading starlette-0.50.0-py3-none-any.whl.metadata (6.3 kB)
Collecting pydantic!=1.8,!=1.8.1,!=2.0.0,!=2.0.1,!=2.1.0,<3.0.0,>=1.7.4 (from fastapi)
  Downloading pydantic-2.12.4-py3-none-any.whl.metadata (89 kB)
Collecting annotated-doc>=0.0.2 (from fastapi)
  Downloading annotated_doc-0.0.4-py3-none-any.whl.metadata (6.6 kB)
Collecting annotated-types>=0.6.0 (from pydantic!=1.8,!=1.8.1,!=2.0.0,!=2.0.1,!=2.1.0,<3.0.0,>=1.7.4->fastapi)
  Downloading annotated_types-0.7.0-py3-none-any.whl.metadata (15 kB)
Collecting pydantic-core==2.41.5 (from pydantic!=1.8,!=1.8.1,!=2.0.0,!=2.0.1,!=2.1.0,<3.0.0,>=1.7.4->fastapi)
  Downloading pydantic_core-2.41.5-cp310-cp310-

In [5]:
import cv2
import torch
import numpy as np
import mediapipe as mp
from ultralytics import YOLO
from PIL import Image
from torchvision import transforms
import time

class EmotionAnalyzer:
    def __init__(self, model_path):
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        
        # 학습된 모델 로드 (구조에 맞게 불러오기)
        # 만약 YOLO 포맷으로 저장했다면 YOLO(model_path), pytorch dict라면 load_state_dict
        try:
            self.model = YOLO(model_path) # YOLO 포맷인 경우
        except:
            print("YOLO 로드 실패, PyTorch 가중치로 로드 시도")
            # 커스텀 로드 로직 필요 (생략)
            
        self.mp_face_detection = mp.solutions.face_detection
        self.face_detection = self.mp_face_detection.FaceDetection(min_detection_confidence=0.5)
        
        # 웃음 상태 관리
        self.is_laughing = False
        self.laugh_start_time = 0
        self.peak_intensity = 0.0
        self.logs = [] # 결과 저장 리스트

    def process_frame(self, frame):
        # 1. MediaPipe로 얼굴 감지
        img_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        results = self.face_detection.process(img_rgb)
        
        current_intensity = 0.0
        
        if results.detections:
            for detection in results.detections:
                bboxC = detection.location_data.relative_bounding_box
                ih, iw, _ = frame.shape
                x, y, w, h = int(bboxC.xmin * iw), int(bboxC.ymin * ih), int(bboxC.width * iw), int(bboxC.height * ih)
                
                # 얼굴 크롭 및 여백 확보
                x, y = max(0, x), max(0, y)
                face_img = frame[y:y+h, x:x+w]
                
                if face_img.size == 0: continue

                # 2. 모델 추론 (웃음 확률 계산)
                # YOLO Predict 사용 시
                results_cls = self.model.predict(face_img, verbose=False)
                probs = results_cls[0].probs
                
                # happy 클래스의 인덱스가 0이라고 가정 (data.yaml 순서 확인 필수)
                # 만약 happy가 1번이면 probs.data[1]
                happy_score = float(probs.data[0]) if probs is not None else 0.0
                
                current_intensity = happy_score
                
                # 시각화 (얼굴 박스)
                color = (0, 255, 0) if happy_score > 0.5 else (0, 0, 255)
                cv2.rectangle(frame, (x, y), (x + w, y + h), color, 2)
                cv2.putText(frame, f"Smile: {happy_score:.2f}", (x, y - 10), 
                            cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)

        # 3. 타임스탬프 및 강도 기록 로직
        self.update_log(current_intensity)
        
        return frame, current_intensity

    def update_log(self, intensity):
        threshold = 0.6  # 웃음으로 간주할 임계값
        now = time.time()

        if intensity >= threshold:
            if not self.is_laughing:
                # 웃음 시작
                self.is_laughing = True
                self.laugh_start_time = now
                self.peak_intensity = intensity
                print(f"[START] Laugh detected at {now}")
            else:
                # 웃음 유지 중 - 피크 갱신
                self.peak_intensity = max(self.peak_intensity, intensity)
        else:
            if self.is_laughing:
                # 웃음 종료
                self.is_laughing = False
                duration = now - self.laugh_start_time
                log_entry = {
                    "start": self.laugh_start_time,
                    "end": now,
                    "duration": duration,
                    "peak_intensity": self.peak_intensity
                }
                self.logs.append(log_entry)
                print(f"[END] Duration: {duration:.2f}s, Peak: {self.peak_intensity:.2f}")

# --- 실행 예시 ---
# analyzer = EmotionAnalyzer("runs/classify/train/weights/best.pt")
# cap = cv2.VideoCapture(0)
# while cap.isOpened():
#     ret, frame = cap.read()
#     if not ret: break
#     frame, gauge = analyzer.process_frame(frame)
#     cv2.imshow('Smile Challenge', frame)
#     if cv2.waitKey(1) & 0xFF == ord('q'): break
# cap.release()
# cv2.destroyAllWindows()

시작

In [7]:
import cv2
import torch
import numpy as np
import mediapipe as mp
from ultralytics import YOLO
import base64
import uvicorn
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.middleware.cors import CORSMiddleware

# --- 제공해주신 EmotionAnalyzer 클래스 (수정됨) ---
class EmotionAnalyzer:
    def __init__(self, model_path):
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        
        # 모델 로드
        try:
            self.model = YOLO(model_path)
        except Exception as e:
            print(f"모델 로드 실패: {e}")
            self.model = None

        self.mp_face_detection = mp.solutions.face_detection
        self.face_detection = self.mp_face_detection.FaceDetection(min_detection_confidence=0.5)

    def process_frame(self, frame):
        # 이미지 쓰기 가능하도록 복사 (OpenCV/MediaPipe 호환성)
        frame.flags.writeable = True
        img_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        results = self.face_detection.process(img_rgb)
        
        current_intensity = 0.0
        
        if results.detections:
            for detection in results.detections:
                bboxC = detection.location_data.relative_bounding_box
                ih, iw, _ = frame.shape
                x, y, w, h = int(bboxC.xmin * iw), int(bboxC.ymin * ih), int(bboxC.width * iw), int(bboxC.height * ih)
                
                x, y = max(0, x), max(0, y)
                face_img = frame[y:y+h, x:x+w]
                
                if face_img.size == 0: continue

                # YOLO 추론
                if self.model:
                    # verbose=False로 로그 억제
                    results_cls = self.model.predict(face_img, verbose=False, device=self.device)
                    probs = results_cls[0].probs
                    
                    # Happy 클래스가 0번이라고 가정 (데이터 구조 기반)
                    happy_score = float(probs.data[0]) if probs is not None else 0.0
                    current_intensity = happy_score
                    
                    # 시각화
                    color = (0, 255, 0) if happy_score > 0.5 else (0, 0, 255)
                    cv2.rectangle(frame, (x, y), (x + w, y + h), color, 2)
                    label = f"Smile: {happy_score*100:.1f}%"
                    cv2.putText(frame, label, (x, y - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)

        return frame, current_intensity

# --- FastAPI 서버 설정 ---
app = FastAPI()

# CORS 설정 (React에서 접근 허용)
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# 모델 경로 설정 (실제 경로로 수정 필요)
# 'best.pt' 파일이 있는 경로를 지정해주세요.
MODEL_PATH = "best.pt" 
analyzer = EmotionAnalyzer(MODEL_PATH)

@app.websocket("/ws/video")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    try:
        while True:
            # 1. React로부터 Base64 이미지 수신
            data = await websocket.receive_text()
            
            # 2. Base64 -> OpenCV 이미지로 디코딩
            img_data = base64.b64decode(data.split(',')[1])
            nparr = np.frombuffer(img_data, np.uint8)
            frame = cv2.imdecode(nparr, cv2.IMREAD_COLOR)

            # 3. 분석 수행
            processed_frame, intensity = analyzer.process_frame(frame)

            # 4. OpenCV 이미지 -> Base64 인코딩
            _, buffer = cv2.imencode('.jpg', processed_frame)
            processed_base64 = base64.b64encode(buffer).decode('utf-8')

            # 5. 결과 전송 (이미지 + 점수)
            await websocket.send_json({
                "image": f"data:image/jpeg;base64,{processed_base64}",
                "score": intensity
            })

    except WebSocketDisconnect:
        print("Client disconnected")
    except Exception as e:
        print(f"Error: {e}")

if __name__ == "__main__":
    # 실행: python app.py
    uvicorn.run(app, host="0.0.0.0", port=4478)

I0000 00:00:1763837924.118898  507842 gl_context_egl.cc:85] Successfully initialized EGL. Major : 1 Minor: 5
I0000 00:00:1763837924.175194  511187 gl_context.cc:369] GL version: 3.2 (OpenGL ES 3.2 NVIDIA 565.57.01), renderer: NVIDIA A40/PCIe/SSE2


모델 로드 실패: [Errno 2] No such file or directory: 'best.pt'


W0000 00:00:1763837924.184262  511182 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.


RuntimeError: asyncio.run() cannot be called from a running event loop