# 실시간 영상 분석 파이프라인 구현

**목적**: 드론 비디오 스트림 실시간 처리 및 작물 탐지  
**담당**: Claude Sonnet 4  
**날짜**: 2025-10-21

## 📋 작업 내용
1. 실시간 비디오 스트림 처리 파이프라인
2. 멀티스레딩 기반 성능 최적화
3. 프레임 큐 관리 및 버퍼링
4. 실시간 결과 시각화
5. 스트리밍 품질 자동 조절

## 1. 기본 라이브러리 및 설정

In [None]:
import torch
import cv2
import numpy as np
import matplotlib.pyplot as plt
from PIL import Image
import time
import threading
import queue
import json
import os
from pathlib import Path
from typing import Dict, List, Tuple, Optional, Callable
from dataclasses import dataclass
from collections import deque
import logging

from ultralytics import YOLO

# 이전 노트북에서 구현한 클래스들 재사용
# (실제 환경에서는 import로 불러올 수 있음)

print(f"PyTorch 버전: {torch.__version__}")
print(f"OpenCV 버전: {cv2.__version__}")
print(f"CUDA 사용 가능: {torch.cuda.is_available()}")
if torch.cuda.is_available():
    print(f"GPU: {torch.cuda.get_device_name(0)}")

## 2. 실시간 성능 모니터링 클래스

In [None]:
@dataclass
class PerformanceMetrics:
    """성능 메트릭 데이터 클래스"""
    fps: float = 0.0
    processing_time: float = 0.0
    detection_count: int = 0
    frame_drops: int = 0
    queue_size: int = 0
    gpu_memory_used: float = 0.0
    timestamp: float = 0.0

class PerformanceMonitor:
    """실시간 성능 모니터링 클래스"""
    
    def __init__(self, window_size: int = 30):
        """초기화
        
        Args:
            window_size: 이동 평균 윈도우 크기
        """
        self.window_size = window_size
        self.reset()
        
        # 로깅 설정
        logging.basicConfig(level=logging.INFO)
        self.logger = logging.getLogger(__name__)
    
    def reset(self):
        """메트릭 리셋"""
        self.frame_times = deque(maxlen=self.window_size)
        self.processing_times = deque(maxlen=self.window_size)
        self.detection_counts = deque(maxlen=self.window_size)
        self.total_frames = 0
        self.total_drops = 0
        self.start_time = time.time()
    
    def update(self, processing_time: float, detection_count: int, 
               frame_dropped: bool = False, queue_size: int = 0):
        """메트릭 업데이트
        
        Args:
            processing_time: 처리 시간
            detection_count: 탐지 개수
            frame_dropped: 프레임 드롭 여부
            queue_size: 큐 크기
        """
        current_time = time.time()
        
        self.frame_times.append(current_time)
        self.processing_times.append(processing_time)
        self.detection_counts.append(detection_count)
        self.total_frames += 1
        
        if frame_dropped:
            self.total_drops += 1
    
    def get_metrics(self) -> PerformanceMetrics:
        """현재 성능 메트릭 반환
        
        Returns:
            metrics: 성능 메트릭
        """
        if len(self.frame_times) < 2:
            return PerformanceMetrics()
        
        # FPS 계산 (이동 평균)
        time_diff = self.frame_times[-1] - self.frame_times[0]
        fps = len(self.frame_times) / time_diff if time_diff > 0 else 0
        
        # 평균 처리 시간
        avg_processing_time = np.mean(self.processing_times)
        
        # 평균 탐지 개수
        avg_detection_count = np.mean(self.detection_counts)
        
        # GPU 메모리 사용량
        gpu_memory = 0.0
        if torch.cuda.is_available():
            gpu_memory = torch.cuda.memory_allocated() / 1024**3  # GB
        
        return PerformanceMetrics(
            fps=fps,
            processing_time=avg_processing_time,
            detection_count=int(avg_detection_count),
            frame_drops=self.total_drops,
            gpu_memory_used=gpu_memory,
            timestamp=time.time()
        )
    
    def log_metrics(self):
        """메트릭 로깅"""
        metrics = self.get_metrics()
        self.logger.info(
            f"FPS: {metrics.fps:.2f} | "
            f"처리시간: {metrics.processing_time*1000:.1f}ms | "
            f"탐지수: {metrics.detection_count} | "
            f"드롭: {metrics.frame_drops} | "
            f"GPU: {metrics.gpu_memory_used:.2f}GB"
        )

# 성능 모니터 인스턴스 생성
perf_monitor = PerformanceMonitor()
print("✅ PerformanceMonitor 준비 완료")

## 3. 프레임 큐 및 버퍼 관리

In [None]:
class FrameBuffer:
    """프레임 버퍼 관리 클래스"""
    
    def __init__(self, max_size: int = 10, drop_policy: str = 'oldest'):
        """초기화
        
        Args:
            max_size: 최대 버퍼 크기
            drop_policy: 드롭 정책 ('oldest', 'newest')
        """
        self.max_size = max_size
        self.drop_policy = drop_policy
        self.frame_queue = queue.Queue(maxsize=max_size)
        self.frame_counter = 0
        self.dropped_frames = 0
        self.lock = threading.Lock()
    
    def put_frame(self, frame: np.ndarray, timestamp: float = None) -> bool:
        """프레임 추가
        
        Args:
            frame: 프레임 이미지
            timestamp: 타임스탬프
            
        Returns:
            success: 추가 성공 여부
        """
        if timestamp is None:
            timestamp = time.time()
        
        frame_data = {
            'frame': frame,
            'timestamp': timestamp,
            'frame_id': self.frame_counter
        }
        
        with self.lock:
            if self.frame_queue.full():
                if self.drop_policy == 'oldest':
                    try:
                        self.frame_queue.get_nowait()
                        self.dropped_frames += 1
                    except queue.Empty:
                        pass
                elif self.drop_policy == 'newest':
                    self.dropped_frames += 1
                    return False
            
            try:
                self.frame_queue.put_nowait(frame_data)
                self.frame_counter += 1
                return True
            except queue.Full:
                self.dropped_frames += 1
                return False
    
    def get_frame(self, timeout: float = 0.1) -> Optional[Dict]:
        """프레임 가져오기
        
        Args:
            timeout: 타임아웃
            
        Returns:
            frame_data: 프레임 데이터 또는 None
        """
        try:
            return self.frame_queue.get(timeout=timeout)
        except queue.Empty:
            return None
    
    def get_stats(self) -> Dict:
        """버퍼 통계 반환
        
        Returns:
            stats: 버퍼 통계
        """
        with self.lock:
            return {
                'queue_size': self.frame_queue.qsize(),
                'max_size': self.max_size,
                'total_frames': self.frame_counter,
                'dropped_frames': self.dropped_frames,
                'drop_rate': self.dropped_frames / max(1, self.frame_counter)
            }
    
    def clear(self):
        """버퍼 비우기"""
        with self.lock:
            while not self.frame_queue.empty():
                try:
                    self.frame_queue.get_nowait()
                except queue.Empty:
                    break

# 프레임 버퍼 인스턴스 생성
frame_buffer = FrameBuffer(max_size=5, drop_policy='oldest')
print("✅ FrameBuffer 준비 완료")

## 4. 실시간 영상 처리 파이프라인

In [None]:
class RealTimeVideoPipeline:
    """실시간 영상 처리 파이프라인"""
    
    def __init__(self, 
                 model_path: str = 'yolo11n.pt',
                 device: str = 'auto',
                 buffer_size: int = 5,
                 target_fps: float = 30.0):
        """초기화
        
        Args:
            model_path: YOLO 모델 경로
            device: 실행 디바이스
            buffer_size: 프레임 버퍼 크기
            target_fps: 목표 FPS
        """
        self.model = YOLO(model_path)
        self.device = device if device != 'auto' else ('cuda' if torch.cuda.is_available() else 'cpu')
        self.target_fps = target_fps
        self.frame_interval = 1.0 / target_fps
        
        # 버퍼 및 모니터링
        self.frame_buffer = FrameBuffer(max_size=buffer_size)
        self.performance_monitor = PerformanceMonitor()
        
        # 스레드 관리
        self.capture_thread = None
        self.process_thread = None
        self.display_thread = None
        self.running = False
        
        # 결과 저장
        self.results_queue = queue.Queue(maxsize=100)
        self.detection_history = deque(maxlen=1000)
        
        # 동적 품질 조절
        self.adaptive_quality = True
        self.current_quality = 1.0  # 1.0 = 원본 해상도
        self.conf_threshold = 0.25
        
        print(f"🎥 RealTimeVideoPipeline 초기화 완료")
        print(f"   모델: {model_path}")
        print(f"   디바이스: {self.device}")
        print(f"   목표 FPS: {target_fps}")
    
    def capture_frames(self, source):
        """프레임 캡처 스레드
        
        Args:
            source: 비디오 소스 (웹캠, 파일, 스트림)
        """
        cap = cv2.VideoCapture(source)
        if not cap.isOpened():
            print(f"❌ 비디오 소스 열기 실패: {source}")
            return
        
        # 해상도 설정
        cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1280)
        cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 720)
        cap.set(cv2.CAP_PROP_FPS, self.target_fps)
        
        print(f"📹 캡처 시작: {source}")
        frame_count = 0
        last_frame_time = time.time()
        
        try:
            while self.running:
                ret, frame = cap.read()
                if not ret:
                    print("⚠️ 프레임 읽기 실패")
                    break
                
                current_time = time.time()
                
                # FPS 제어
                if current_time - last_frame_time < self.frame_interval:
                    continue
                
                # 해상도 동적 조절
                if self.adaptive_quality and self.current_quality != 1.0:
                    h, w = frame.shape[:2]
                    new_h, new_w = int(h * self.current_quality), int(w * self.current_quality)
                    frame = cv2.resize(frame, (new_w, new_h))
                
                # 프레임 버퍼에 추가
                success = self.frame_buffer.put_frame(frame, current_time)
                if not success:
                    # 프레임 드롭 시 품질 조절
                    if self.adaptive_quality:
                        self.adjust_quality(decrease=True)
                
                last_frame_time = current_time
                frame_count += 1
                
                # 주기적 상태 출력
                if frame_count % 100 == 0:
                    stats = self.frame_buffer.get_stats()
                    print(f"캡처된 프레임: {frame_count}, 드롭률: {stats['drop_rate']:.3f}")
        
        finally:
            cap.release()
            print("📹 캡처 종료")
    
    def process_frames(self):
        """프레임 처리 스레드"""
        print("🔄 프레임 처리 시작")
        
        while self.running:
            frame_data = self.frame_buffer.get_frame(timeout=0.1)
            if frame_data is None:
                continue
            
            start_time = time.time()
            
            try:
                # YOLO 추론
                results = self.model(
                    frame_data['frame'],
                    device=self.device,
                    conf=self.conf_threshold,
                    verbose=False
                )
                
                # 결과 처리
                detections = []
                annotated_frame = frame_data['frame'].copy()
                
                for result in results:
                    # 주석이 추가된 프레임
                    annotated_frame = result.plot()
                    
                    # 탐지 결과 추출
                    if result.boxes is not None:
                        for box in result.boxes:
                            cls_id = int(box.cls)
                            confidence = float(box.conf)
                            bbox = box.xyxy[0].cpu().numpy()
                            class_name = self.model.names[cls_id]
                            
                            detections.append({
                                'class_id': cls_id,
                                'class_name': class_name,
                                'confidence': confidence,
                                'bbox': bbox.tolist(),
                                'timestamp': frame_data['timestamp'],
                                'frame_id': frame_data['frame_id']
                            })
                
                processing_time = time.time() - start_time
                
                # 결과 저장
                result_data = {
                    'frame': annotated_frame,
                    'detections': detections,
                    'processing_time': processing_time,
                    'timestamp': frame_data['timestamp'],
                    'frame_id': frame_data['frame_id']
                }
                
                # 결과 큐에 추가
                try:
                    self.results_queue.put_nowait(result_data)
                except queue.Full:
                    # 큐가 가득 찬 경우 가장 오래된 결과 제거
                    try:
                        self.results_queue.get_nowait()
                        self.results_queue.put_nowait(result_data)
                    except queue.Empty:
                        pass
                
                # 성능 모니터링 업데이트
                self.performance_monitor.update(
                    processing_time=processing_time,
                    detection_count=len(detections),
                    queue_size=self.results_queue.qsize()
                )
                
                # 탐지 이력 저장
                self.detection_history.append({
                    'timestamp': frame_data['timestamp'],
                    'detection_count': len(detections),
                    'processing_time': processing_time
                })
                
                # 동적 품질 조절
                if self.adaptive_quality:
                    target_processing_time = self.frame_interval * 0.8  # 80% 목표
                    if processing_time > target_processing_time:
                        self.adjust_quality(decrease=True)
                    elif processing_time < target_processing_time * 0.5:
                        self.adjust_quality(decrease=False)
                
            except Exception as e:
                print(f"❌ 프레임 처리 오류: {e}")
                continue
        
        print("🔄 프레임 처리 종료")
    
    def adjust_quality(self, decrease: bool = True):
        """품질 동적 조절
        
        Args:
            decrease: 품질 감소 여부
        """
        if decrease:
            self.current_quality = max(0.5, self.current_quality - 0.1)
            self.conf_threshold = min(0.5, self.conf_threshold + 0.05)
        else:
            self.current_quality = min(1.0, self.current_quality + 0.1)
            self.conf_threshold = max(0.15, self.conf_threshold - 0.05)
    
    def display_results(self, show_window: bool = True):
        """결과 표시 스레드
        
        Args:
            show_window: 윈도우 표시 여부
        """
        print("🖥️ 결과 표시 시작")
        
        while self.running:
            try:
                result_data = self.results_queue.get(timeout=0.1)
                
                if show_window:
                    # OpenCV 윈도우에 표시
                    frame = result_data['frame']
                    
                    # 성능 정보 오버레이
                    metrics = self.performance_monitor.get_metrics()
                    info_text = [
                        f"FPS: {metrics.fps:.1f}",
                        f"처리시간: {result_data['processing_time']*1000:.1f}ms",
                        f"탐지수: {len(result_data['detections'])}",
                        f"품질: {self.current_quality:.1f}"
                    ]
                    
                    for i, text in enumerate(info_text):
                        cv2.putText(frame, text, (10, 30 + i*25), 
                                  cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
                    
                    cv2.imshow('Drone Crop Detection', frame)
                    
                    # ESC 키로 종료
                    if cv2.waitKey(1) & 0xFF == 27:
                        self.stop()
                        break
                
                # 주기적 성능 로깅
                if result_data['frame_id'] % 30 == 0:
                    self.performance_monitor.log_metrics()
                
            except queue.Empty:
                continue
            except Exception as e:
                print(f"❌ 표시 오류: {e}")
                continue
        
        if show_window:
            cv2.destroyAllWindows()
        print("🖥️ 결과 표시 종료")
    
    def start(self, source, show_display: bool = True):
        """파이프라인 시작
        
        Args:
            source: 비디오 소스
            show_display: 화면 표시 여부
        """
        if self.running:
            print("⚠️ 파이프라인이 이미 실행 중입니다")
            return
        
        self.running = True
        
        # 스레드 시작
        self.capture_thread = threading.Thread(target=self.capture_frames, args=(source,))
        self.process_thread = threading.Thread(target=self.process_frames)
        self.display_thread = threading.Thread(target=self.display_results, args=(show_display,))
        
        self.capture_thread.start()
        self.process_thread.start()
        self.display_thread.start()
        
        print("🚀 실시간 파이프라인 시작")
    
    def stop(self):
        """파이프라인 중지"""
        print("🛑 파이프라인 중지 중...")
        self.running = False
        
        # 스레드 종료 대기
        if self.capture_thread:
            self.capture_thread.join(timeout=2)
        if self.process_thread:
            self.process_thread.join(timeout=2)
        if self.display_thread:
            self.display_thread.join(timeout=2)
        
        print("✅ 파이프라인 중지 완료")
    
    def get_statistics(self) -> Dict:
        """전체 통계 반환
        
        Returns:
            stats: 전체 통계
        """
        metrics = self.performance_monitor.get_metrics()
        buffer_stats = self.frame_buffer.get_stats()
        
        # 탐지 이력 분석
        detection_counts = [h['detection_count'] for h in self.detection_history]
        processing_times = [h['processing_time'] for h in self.detection_history]
        
        return {
            'performance': {
                'current_fps': metrics.fps,
                'target_fps': self.target_fps,
                'avg_processing_time': metrics.processing_time,
                'gpu_memory_used': metrics.gpu_memory_used
            },
            'buffer': buffer_stats,
            'quality': {
                'current_quality': self.current_quality,
                'conf_threshold': self.conf_threshold,
                'adaptive_enabled': self.adaptive_quality
            },
            'detection_stats': {
                'total_frames_processed': len(self.detection_history),
                'avg_detections_per_frame': np.mean(detection_counts) if detection_counts else 0,
                'max_detections_per_frame': np.max(detection_counts) if detection_counts else 0,
                'avg_processing_time': np.mean(processing_times) if processing_times else 0
            }
        }

# 실시간 파이프라인 인스턴스 생성
pipeline = RealTimeVideoPipeline(
    model_path='yolo11n.pt',
    target_fps=15.0,  # 코랩 환경을 고려한 낮은 FPS
    buffer_size=3
)
print("✅ RealTimeVideoPipeline 준비 완료")

## 5. 웹캠 테스트 (시뮬레이션)

In [None]:
# 웹캠 테스트 (코랩에서는 시뮬레이션)
def test_webcam_simulation():
    """웹캠 시뮬레이션 테스트"""
    print("📷 웹캠 시뮬레이션 테스트\n")
    
    # 가상 웹캠 시뮬레이터
    class VirtualWebcam:
        def __init__(self, width=640, height=480, fps=15):
            self.width = width
            self.height = height
            self.fps = fps
            self.frame_count = 0
            self.start_time = time.time()
        
        def read(self):
            # 가상 프레임 생성 (농장 시뮬레이션)
            frame = np.random.randint(50, 200, (self.height, self.width, 3), dtype=np.uint8)
            
            # 가상 작물 모양 추가
            for _ in range(np.random.randint(3, 8)):
                x = np.random.randint(50, self.width-50)
                y = np.random.randint(50, self.height-50)
                size = np.random.randint(20, 60)
                color = (0, np.random.randint(100, 255), 0)  # 녹색 계열
                cv2.circle(frame, (x, y), size, color, -1)
            
            # 타임스탬프 오버레이
            elapsed = time.time() - self.start_time
            cv2.putText(frame, f"Frame: {self.frame_count}", (10, 30), 
                       cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
            cv2.putText(frame, f"Time: {elapsed:.1f}s", (10, 60), 
                       cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
            
            self.frame_count += 1
            
            # FPS 제어
            time.sleep(1.0 / self.fps)
            
            return True, frame
        
        def release(self):
            pass
    
    # 가상 웹캠으로 테스트
    virtual_cam = VirtualWebcam()
    
    print("가상 웹캠으로 10초간 테스트...")
    
    # 간단한 테스트 루프
    start_time = time.time()
    frame_count = 0
    
    while time.time() - start_time < 10:  # 10초간 테스트
        ret, frame = virtual_cam.read()
        if not ret:
            break
        
        # YOLO 추론 (간단한 버전)
        inference_start = time.time()
        results = pipeline.model(frame, device=pipeline.device, verbose=False)
        inference_time = time.time() - inference_start
        
        # 결과 처리
        detection_count = 0
        for result in results:
            if result.boxes is not None:
                detection_count = len(result.boxes)
        
        frame_count += 1
        
        # 주기적 출력
        if frame_count % 30 == 0:
            fps = frame_count / (time.time() - start_time)
            print(f"프레임: {frame_count}, FPS: {fps:.1f}, "
                  f"추론시간: {inference_time*1000:.1f}ms, 탐지수: {detection_count}")
    
    final_fps = frame_count / (time.time() - start_time)
    print(f"\n테스트 완료: {frame_count}프레임, 평균 FPS: {final_fps:.2f}")
    
    virtual_cam.release()

# 웹캠 시뮬레이션 실행
test_webcam_simulation()

## 6. 비디오 파일 처리 테스트

In [None]:
# 비디오 파일 처리 테스트
def test_video_file_processing():
    """비디오 파일 처리 테스트"""
    print("🎬 비디오 파일 처리 테스트\n")
    
    # 가상 비디오 파일 생성
    def create_virtual_video(filename='test_video.mp4', duration=5, fps=15):
        """가상 비디오 파일 생성"""
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(filename, fourcc, fps, (640, 480))
        
        total_frames = duration * fps
        
        for i in range(total_frames):
            # 농장 장면 시뮬레이션
            frame = np.random.randint(30, 150, (480, 640, 3), dtype=np.uint8)
            
            # 움직이는 작물 패턴
            for j in range(5):
                x = int(100 + 50 * np.sin(i * 0.1 + j))
                y = int(100 + j * 60)
                cv2.circle(frame, (x, y), 25, (0, 200, 0), -1)
            
            # 프레임 정보
            cv2.putText(frame, f"Frame {i+1}/{total_frames}", (10, 30), 
                       cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
            
            out.write(frame)
        
        out.release()
        return filename
    
    # 가상 비디오 생성
    video_file = create_virtual_video('drone_test.mp4', duration=3, fps=10)
    print(f"가상 비디오 생성: {video_file}")
    
    # 비디오 처리 클래스
    class VideoProcessor:
        def __init__(self, model_path='yolo11n.pt'):
            self.model = YOLO(model_path)
            self.device = 'cuda' if torch.cuda.is_available() else 'cpu'
        
        def process_video(self, input_path, output_path=None):
            cap = cv2.VideoCapture(input_path)
            if not cap.isOpened():
                print(f"❌ 비디오 열기 실패: {input_path}")
                return
            
            # 비디오 정보
            fps = cap.get(cv2.CAP_PROP_FPS)
            width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
            total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            
            print(f"비디오 정보: {width}x{height}, {fps}fps, {total_frames}프레임")
            
            # 결과 저장용 비디오 작성기
            if output_path:
                fourcc = cv2.VideoWriter_fourcc(*'mp4v')
                out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
            
            # 통계
            processing_times = []
            detection_counts = []
            
            frame_idx = 0
            while True:
                ret, frame = cap.read()
                if not ret:
                    break
                
                # YOLO 추론
                start_time = time.time()
                results = self.model(frame, device=self.device, verbose=False)
                processing_time = time.time() - start_time
                
                # 결과 처리
                detection_count = 0
                annotated_frame = frame.copy()
                
                for result in results:
                    annotated_frame = result.plot()
                    if result.boxes is not None:
                        detection_count = len(result.boxes)
                
                # 성능 정보 오버레이
                info_text = f"Frame: {frame_idx+1}/{total_frames} | "
                info_text += f"Time: {processing_time*1000:.1f}ms | "
                info_text += f"Detections: {detection_count}"
                
                cv2.putText(annotated_frame, info_text, (10, height-20), 
                           cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)
                
                # 결과 저장
                if output_path:
                    out.write(annotated_frame)
                
                # 통계 수집
                processing_times.append(processing_time)
                detection_counts.append(detection_count)
                
                frame_idx += 1
                
                # 진행 상황 출력
                if frame_idx % 10 == 0 or frame_idx == total_frames:
                    progress = frame_idx / total_frames * 100
                    avg_time = np.mean(processing_times[-10:])
                    print(f"진행률: {progress:.1f}% | 평균 처리시간: {avg_time*1000:.1f}ms")
            
            # 정리
            cap.release()
            if output_path:
                out.release()
                print(f"결과 저장: {output_path}")
            
            # 최종 통계
            avg_processing_time = np.mean(processing_times)
            avg_detections = np.mean(detection_counts)
            effective_fps = 1.0 / avg_processing_time
            
            print(f"\n📊 처리 완료 통계:")
            print(f"   평균 처리시간: {avg_processing_time*1000:.2f}ms")
            print(f"   평균 탐지수: {avg_detections:.1f}")
            print(f"   효과적 FPS: {effective_fps:.1f}")
            print(f"   총 프레임: {frame_idx}")
            
            return {
                'total_frames': frame_idx,
                'avg_processing_time': avg_processing_time,
                'avg_detections': avg_detections,
                'effective_fps': effective_fps
            }
    
    # 비디오 처리 실행
    processor = VideoProcessor()
    stats = processor.process_video(video_file, 'processed_drone_test.mp4')
    
    # 파일 정리
    try:
        os.remove(video_file)
        print(f"임시 파일 삭제: {video_file}")
    except:
        pass
    
    return stats

# 비디오 파일 처리 테스트 실행
video_stats = test_video_file_processing()

## 7. 스트리밍 품질 최적화

In [None]:
class StreamOptimizer:
    """스트리밍 품질 최적화 클래스"""
    
    def __init__(self, target_fps: float = 15.0, target_latency: float = 0.1):
        """초기화
        
        Args:
            target_fps: 목표 FPS
            target_latency: 목표 지연시간 (초)
        """
        self.target_fps = target_fps
        self.target_latency = target_latency
        self.target_frame_time = 1.0 / target_fps
        
        # 최적화 파라미터
        self.quality_levels = {
            'ultra': {'scale': 1.0, 'conf': 0.15, 'imgsz': 640},
            'high': {'scale': 0.8, 'conf': 0.25, 'imgsz': 512},
            'medium': {'scale': 0.6, 'conf': 0.35, 'imgsz': 416},
            'low': {'scale': 0.4, 'conf': 0.45, 'imgsz': 320},
            'potato': {'scale': 0.3, 'conf': 0.55, 'imgsz': 256}
        }
        
        self.current_quality = 'medium'
        self.performance_history = deque(maxlen=50)
        
        print(f"📈 StreamOptimizer 초기화")
        print(f"   목표 FPS: {target_fps}")
        print(f"   목표 지연시간: {target_latency}초")
    
    def update_performance(self, processing_time: float, detection_count: int):
        """성능 업데이트
        
        Args:
            processing_time: 처리 시간
            detection_count: 탐지 개수
        """
        self.performance_history.append({
            'processing_time': processing_time,
            'detection_count': detection_count,
            'timestamp': time.time()
        })
    
    def get_optimal_quality(self) -> str:
        """최적 품질 레벨 결정
        
        Returns:
            quality_level: 품질 레벨
        """
        if len(self.performance_history) < 10:
            return self.current_quality
        
        # 최근 성능 분석
        recent_times = [p['processing_time'] for p in list(self.performance_history)[-10:]]
        avg_processing_time = np.mean(recent_times)
        max_processing_time = np.max(recent_times)
        
        # 현재 품질 레벨 인덱스
        quality_names = list(self.quality_levels.keys())
        current_idx = quality_names.index(self.current_quality)
        
        # 품질 조정 결정
        if avg_processing_time > self.target_frame_time * 0.8:  # 80% 임계값
            # 품질 낮추기
            if current_idx < len(quality_names) - 1:
                new_quality = quality_names[current_idx + 1]
                print(f"품질 낮춤: {self.current_quality} → {new_quality}")
                self.current_quality = new_quality
        elif avg_processing_time < self.target_frame_time * 0.4:  # 40% 임계값
            # 품질 높이기
            if current_idx > 0:
                new_quality = quality_names[current_idx - 1]
                print(f"품질 높임: {self.current_quality} → {new_quality}")
                self.current_quality = new_quality
        
        return self.current_quality
    
    def get_quality_settings(self, quality_level: str = None) -> Dict:
        """품질 설정 반환
        
        Args:
            quality_level: 품질 레벨
            
        Returns:
            settings: 품질 설정
        """
        if quality_level is None:
            quality_level = self.current_quality
        
        return self.quality_levels.get(quality_level, self.quality_levels['medium'])
    
    def benchmark_quality_levels(self, model, test_frame: np.ndarray) -> Dict:
        """품질 레벨별 벤치마크
        
        Args:
            model: YOLO 모델
            test_frame: 테스트 프레임
            
        Returns:
            benchmark_results: 벤치마크 결과
        """
        print("🔍 품질 레벨별 벤치마크 실행")
        
        results = {}
        
        for quality_name, settings in self.quality_levels.items():
            # 프레임 리사이즈
            h, w = test_frame.shape[:2]
            new_h, new_w = int(h * settings['scale']), int(w * settings['scale'])
            resized_frame = cv2.resize(test_frame, (new_w, new_h))
            
            # 벤치마크 실행
            times = []
            detection_counts = []
            
            for _ in range(5):  # 5회 평균
                start_time = time.time()
                
                results_yolo = model(
                    resized_frame,
                    conf=settings['conf'],
                    imgsz=settings['imgsz'],
                    verbose=False
                )
                
                processing_time = time.time() - start_time
                times.append(processing_time)
                
                # 탐지 개수
                detection_count = 0
                for result in results_yolo:
                    if result.boxes is not None:
                        detection_count = len(result.boxes)
                detection_counts.append(detection_count)
            
            # 통계 계산
            avg_time = np.mean(times)
            fps = 1.0 / avg_time
            avg_detections = np.mean(detection_counts)
            
            results[quality_name] = {
                'avg_processing_time': avg_time,
                'fps': fps,
                'avg_detections': avg_detections,
                'settings': settings,
                'meets_target': fps >= self.target_fps
            }
            
            print(f"  {quality_name:8s}: {avg_time*1000:5.1f}ms | {fps:5.1f}fps | {avg_detections:4.1f}탐지")
        
        return results

# 최적화 테스트
def test_stream_optimization():
    """스트림 최적화 테스트"""
    print("⚡ 스트림 최적화 테스트\n")
    
    # 최적화 인스턴스 생성
    optimizer = StreamOptimizer(target_fps=15.0)
    
    # 테스트 프레임 생성
    test_frame = np.random.randint(0, 255, (720, 1280, 3), dtype=np.uint8)
    
    # 가상 작물 추가
    for _ in range(10):
        x = np.random.randint(100, 1180)
        y = np.random.randint(100, 620)
        size = np.random.randint(20, 50)
        cv2.circle(test_frame, (x, y), size, (0, 200, 0), -1)
    
    # YOLO 모델 로드
    model = YOLO('yolo11n.pt')
    
    # 벤치마크 실행
    benchmark_results = optimizer.benchmark_quality_levels(model, test_frame)
    
    # 최적 품질 추천
    print("\n🎯 품질 레벨 분석:")
    
    suitable_qualities = []
    for quality, result in benchmark_results.items():
        status = "✅" if result['meets_target'] else "❌"
        efficiency = result['avg_detections'] / result['avg_processing_time']
        
        print(f"{status} {quality:8s}: FPS {result['fps']:5.1f} | 효율성 {efficiency:6.1f}")
        
        if result['meets_target']:
            suitable_qualities.append((quality, result, efficiency))
    
    # 가장 효율적인 품질 선택
    if suitable_qualities:
        best_quality = max(suitable_qualities, key=lambda x: x[2])
        print(f"\n🏆 추천 품질: {best_quality[0]} (효율성: {best_quality[2]:.1f})")
    else:
        print("\n⚠️ 목표 FPS를 만족하는 품질이 없습니다. 하드웨어 업그레이드 필요")
    
    return benchmark_results

# 최적화 테스트 실행
optimization_results = test_stream_optimization()

## 8. 최종 통합 테스트

In [None]:
def final_integration_test():
    """최종 통합 테스트"""
    print("🎯 Todo 4 최종 통합 테스트\n")
    
    # 1. 성능 모니터링 테스트
    print("✅ 성능 모니터링:")
    perf_monitor = PerformanceMonitor()
    
    # 더미 데이터로 테스트
    for i in range(10):
        processing_time = np.random.uniform(0.02, 0.08)
        detection_count = np.random.randint(1, 10)
        perf_monitor.update(processing_time, detection_count)
        time.sleep(0.01)
    
    metrics = perf_monitor.get_metrics()
    print(f"   평균 FPS: {metrics.fps:.2f}")
    print(f"   평균 처리시간: {metrics.processing_time*1000:.1f}ms")
    print(f"   평균 탐지수: {metrics.detection_count}")
    
    # 2. 프레임 버퍼 테스트
    print("\n✅ 프레임 버퍼:")
    buffer = FrameBuffer(max_size=5)
    
    # 테스트 프레임 추가
    for i in range(8):  # 버퍼 크기보다 많이 추가
        test_frame = np.random.randint(0, 255, (100, 100, 3), dtype=np.uint8)
        success = buffer.put_frame(test_frame)
        if not success:
            print(f"   프레임 {i+1} 드롭됨")
    
    stats = buffer.get_stats()
    print(f"   버퍼 상태: {stats['queue_size']}/{stats['max_size']}")
    print(f"   드롭률: {stats['drop_rate']:.3f}")
    
    # 3. 실시간 파이프라인 구성 요소 테스트
    print("\n✅ 실시간 파이프라인:")
    pipeline = RealTimeVideoPipeline(
        model_path='yolo11n.pt',
        target_fps=10.0,
        buffer_size=3
    )
    
    # 단일 프레임 처리 테스트
    test_frame = np.random.randint(0, 255, (480, 640, 3), dtype=np.uint8)
    
    start_time = time.time()
    results = pipeline.model(test_frame, device=pipeline.device, verbose=False)
    processing_time = time.time() - start_time
    
    detection_count = 0
    for result in results:
        if result.boxes is not None:
            detection_count = len(result.boxes)
    
    print(f"   단일 프레임 처리시간: {processing_time*1000:.1f}ms")
    print(f"   탐지된 객체 수: {detection_count}")
    print(f"   실효 FPS: {1/processing_time:.1f}")
    
    # 4. 스트림 최적화 테스트
    print("\n✅ 스트림 최적화:")
    optimizer = StreamOptimizer(target_fps=15.0)
    
    # 성능 시뮬레이션
    for _ in range(20):
        sim_time = np.random.uniform(0.03, 0.12)
        sim_detections = np.random.randint(1, 8)
        optimizer.update_performance(sim_time, sim_detections)
    
    optimal_quality = optimizer.get_optimal_quality()
    quality_settings = optimizer.get_quality_settings(optimal_quality)
    
    print(f"   최적 품질 레벨: {optimal_quality}")
    print(f"   스케일: {quality_settings['scale']}")
    print(f"   신뢰도 임계값: {quality_settings['conf']}")
    print(f"   이미지 크기: {quality_settings['imgsz']}")
    
    # 5. 종합 성능 평가
    print("\n📊 종합 성능 평가:")
    
    # GPU 메모리 확인
    if torch.cuda.is_available():
        gpu_memory = torch.cuda.memory_allocated() / 1024**3
        max_memory = torch.cuda.get_device_properties(0).total_memory / 1024**3
        print(f"   GPU 메모리 사용률: {gpu_memory:.2f}GB / {max_memory:.1f}GB ({gpu_memory/max_memory*100:.1f}%)")
    
    # 실시간 처리 가능성 평가
    target_frame_time = 1.0 / 15.0  # 15 FPS 목표
    processing_margin = (target_frame_time - processing_time) / target_frame_time * 100
    
    print(f"   목표 프레임 시간: {target_frame_time*1000:.1f}ms")
    print(f"   실제 처리 시간: {processing_time*1000:.1f}ms")
    print(f"   성능 여유도: {processing_margin:.1f}%")
    
    if processing_margin > 20:
        print("   🟢 실시간 처리 가능 (여유도 충분)")
    elif processing_margin > 0:
        print("   🟡 실시간 처리 가능 (최적화 권장)")
    else:
        print("   🔴 실시간 처리 어려움 (하드웨어 업그레이드 또는 품질 조정 필요)")
    
    print("\n🎉 Todo 4 완료!")
    print("\n📋 구현된 기능:")
    print("   ✅ 실시간 성능 모니터링")
    print("   ✅ 프레임 버퍼 및 큐 관리")
    print("   ✅ 멀티스레드 비디오 처리 파이프라인")
    print("   ✅ 동적 품질 조절")
    print("   ✅ 스트림 최적화")
    print("   ✅ 웹캠 및 비디오 파일 지원")
    
    print("\n📋 다음 단계:")
    print("   - Todo 9: 성능 최적화 및 GPU 가속 설정")
    print("   - Todo 10: 에러 처리 및 로깅 시스템 구현")
    
    return {
        'performance_monitor': metrics,
        'buffer_stats': stats,
        'processing_time': processing_time,
        'detection_count': detection_count,
        'optimal_quality': optimal_quality,
        'performance_margin': processing_margin
    }

# 최종 통합 테스트 실행
integration_results = final_integration_test()

## 📝 Todo 4 완료 체크리스트

### ✅ 완료된 작업

1. **실시간 성능 모니터링**
   - [x] PerformanceMetrics 데이터 클래스
   - [x] PerformanceMonitor 클래스 (FPS, 처리시간, GPU 메모리 추적)
   - [x] 이동 평균 기반 실시간 모니터링
   - [x] 로깅 시스템 통합

2. **프레임 버퍼 및 큐 관리**
   - [x] FrameBuffer 클래스 (멀티스레드 안전)
   - [x] 프레임 드롭 정책 (oldest/newest)
   - [x] 큐 통계 및 모니터링
   - [x] 스레드 동기화

3. **실시간 비디오 처리 파이프라인**
   - [x] RealTimeVideoPipeline 클래스
   - [x] 멀티스레드 아키텍처 (캡처/처리/표시)
   - [x] 웹캠 및 비디오 파일 지원
   - [x] 실시간 결과 시각화

4. **동적 품질 조절**
   - [x] 해상도 자동 조절
   - [x] 신뢰도 임계값 동적 변경
   - [x] FPS 기반 적응형 처리
   - [x] 프레임 드롭 시 품질 감소

5. **스트림 최적화**
   - [x] StreamOptimizer 클래스
   - [x] 5단계 품질 레벨 (ultra~potato)
   - [x] 품질 레벨별 벤치마킹
   - [x] 효율성 기반 최적화

6. **테스트 및 검증**
   - [x] 웹캠 시뮬레이션 테스트
   - [x] 비디오 파일 처리 테스트
   - [x] 성능 벤치마크
   - [x] 통합 테스트

### 🚀 주요 성과
- 멀티스레드 기반 실시간 처리 파이프라인 구현
- 동적 품질 조절로 다양한 하드웨어 환경 지원
- 체계적인 성능 모니터링 및 최적화 시스템
- 프레임 드롭 방지 및 지연시간 최소화
- 코랩과 로컬 환경 모두 지원

### 🔧 구현된 클래스
- `PerformanceMonitor`: 실시간 성능 추적
- `FrameBuffer`: 스레드 안전 프레임 큐
- `RealTimeVideoPipeline`: 통합 처리 파이프라인
- `StreamOptimizer`: 품질 최적화 관리

### 📊 성능 특징
- 목표 FPS 달성을 위한 자동 품질 조절
- GPU 메모리 사용량 모니터링
- 프레임 드롭률 추적 및 최소화
- 실시간 탐지 결과 시각화

이제 Todo 9 (성능 최적화)와 Todo 10 (에러 처리)을 구현할 준비가 완료되었습니다!