# Project Spero - RealSense Object Detection & Tracking

### 이 노트북은 데이터 수집, 학습, 추론, 추적 모듈을 통합한 문서입니다.

## 환경 변수 설명

| 변수명 | 기본값 | 설명 |
| -------- | -------- | ------ |
| CI | False | CI/CD 환경 여부. "1", "true", "yes" 중 하나면 True |
| NB_RUN_DATA_COLLECTION | True | 데이터 수집 모듈 실행 여부 |
| NB_RUN_DATASET_SPLIT | True | 데이터셋 분할 모듈 실행 여부 |
| NB_RUN_TRAINING | True | 모델 학습 모듈 실행 여부 |
| NB_RUN_INFERENCE | True | 추론 모듈 실행 여부 |
| NB_RUN_TRACKING | True | 추적 모듈 실행 여부 |

In [None]:

import os
import cv2
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision import models, transforms
from PIL import Image
from tqdm import tqdm
import json
from datetime import datetime
import shutil
import random
import pyrealsense2 as rs
import matplotlib.pyplot as plt

# Check GPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")

# ===== CI / Notebook Execution Controls =====
# These flags let GitHub Actions (papermill/nbclient) run the notebook safely.
CI = os.getenv("CI", "").lower() in ("1", "true", "yes")
RUN_DATA_COLLECTION = os.getenv("NB_RUN_DATA_COLLECTION", "1") == "1"
RUN_DATASET_SPLIT  = os.getenv("NB_RUN_DATASET_SPLIT",  "1") == "1"
RUN_TRAINING       = os.getenv("NB_RUN_TRAINING",       "1") == "1"
RUN_INFERENCE      = os.getenv("NB_RUN_INFERENCE",      "1") == "1"
RUN_TRACKING       = os.getenv("NB_RUN_TRACKING",       "1") == "1"

# When runner is installed as a Windows Service, GUI (cv2.imshow) is usually not available.
HEADLESS = os.getenv("NB_HEADLESS", "1") == "1"

## 1. Data Collection

### 주요 기능
- 실시간 스트리밍: RealSense 카메라의 **컬러 영상**과 **깊이(Depth) 영상을** 640x480 해상도로 시각화합니다.
- ROI(Region of Interest) 선택: 마우스 드래그를 통해 이미지에서 데이터로 추출할 관심 영역을 자유롭게 지정할 수 있습니다.
- 라벨링(Labeling): 수집할 데이터의 클래스 이름을 사용자로부터 입력받아 분류별로 저장합니다.
- 데이터 자동 저장: 지정된 ROI 영역의 컬러 이미지(PNG), 깊이 이미지(16-bit PNG), 그리고 메타데이터(JSON)를 자동으로 생성하고 저장합니다.


### 사용자 조작 가이드 (단축키)

* **[L]** : 현재 수집중인 객체의 **라벨(클래스명) **을 입력합니다. (예: 'car', 'person', 'bike' 등)
* ** 마우스 드래그**: 이미지 위에서 드래그하여 객체가 있는 ROI 영역을 선택합니다.
* **[S]** : 현재 선택된 ROI 영역의 데이터를 저장합니다. (컬러/깊이 이미지 및 메타데이터)
* **[C]** : 선택된 ROI 영역을 초기화(Clear) 합니다.
* **[Q]** : 프로그램을 안전하게 종료합니다
* **닫기 버튼**: 프로그램을 안전하게 종료합니다

In [None]:
import pyrealsense2 as rs
import numpy as np
import cv2
import os
from datetime import datetime
import json

class DataCollector:
    def __init__(self):
        # 전역 변수 초기화
        self.roi_start = None
        self.roi_end = None
        self.is_drawing = False
        self.roi_selected = False
        self.current_label = ""
        self.save_count = 0
        
        # 데이터 저장 경로
        self.BASE_DIR = "dataset"
        self.METADATA_FILE = "metadata.json"
        
        # RealSense 파이프라인 설정
        self.pipeline = rs.pipeline()
        self.config = rs.config()
        self.config.enable_stream(rs.stream.depth, 640, 480, rs.format.z16, 30)
        self.config.enable_stream(rs.stream.color, 640, 480, rs.format.bgr8, 30)
        self.depth_scale = 0

    def mouse_callback(self, event, x, y, flags, param):
        """마우스 이벤트 처리 - ROI 선택"""
        if event == cv2.EVENT_LBUTTONDOWN:
            self.is_drawing = True
            self.roi_start = (x, y)
            self.roi_end = (x, y)
            self.roi_selected = False
            
        elif event == cv2.EVENT_MOUSEMOVE:
            if self.is_drawing:
                self.roi_end = (x, y)
                
        elif event == cv2.EVENT_LBUTTONUP:
            self.is_drawing = False
            self.roi_end = (x, y)
            self.roi_selected = True

    def create_directory_structure(self):
        """데이터셋 디렉토리 구조 생성"""
        if not os.path.exists(self.BASE_DIR):
            os.makedirs(self.BASE_DIR)
            print(f"✓ 디렉토리 생성: {self.BASE_DIR}")

    def save_roi_data(self, color_image, depth_image, roi_coords):
        """ROI 데이터 저장"""
        if not self.current_label:
            print("⚠ 라벨이 설정되지 않았습니다. 먼저 라벨을 입력하세요.")
            return False
        
        x1, y1, x2, y2 = roi_coords
        
        # 클래스별 디렉토리 생성
        class_dir = os.path.join(self.BASE_DIR, self.current_label)
        if not os.path.exists(class_dir):
            os.makedirs(class_dir)
            print(f"✓ 새 클래스 디렉토리 생성: {class_dir}")
        
        # 타임스탬프 생성
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
        
        # ROI 영역 추출
        roi_color = color_image[y1:y2, x1:x2]
        roi_depth = depth_image[y1:y2, x1:x2]
        
        # 파일명 생성
        color_filename = f"{timestamp}_color.png"
        depth_filename = f"{timestamp}_depth.png"
        
        color_path = os.path.join(class_dir, color_filename)
        depth_path = os.path.join(class_dir, depth_filename)
        
        # 이미지 저장
        cv2.imwrite(color_path, roi_color)
        
        # Depth 이미지를 16-bit로 저장
        cv2.imwrite(depth_path, roi_depth)
        
        # 메타데이터 저장
        valid_depth = roi_depth[roi_depth > 0]
        metadata = {
            "timestamp": timestamp,
            "label": self.current_label,
            "roi": [x1, y1, x2, y2],
            "roi_size": [x2 - x1, y2 - y1],
            "color_image": color_filename,
            "depth_image": depth_filename,
            "depth_avg": float(np.mean(valid_depth) * self.depth_scale) if len(valid_depth) > 0 else 0.0,
            "depth_min": float(np.min(valid_depth) * self.depth_scale) if len(valid_depth) > 0 else 0.0,
            "depth_max": float(np.max(valid_depth) * self.depth_scale) if len(valid_depth) > 0 else 0.0,
        }
        
        # 메타데이터 파일에 추가
        metadata_path = os.path.join(class_dir, self.METADATA_FILE)
        metadata_list = []
        
        if os.path.exists(metadata_path):
            with open(metadata_path, 'r', encoding='utf-8') as f:
                metadata_list = json.load(f)
        
        metadata_list.append(metadata)
        
        with open(metadata_path, 'w', encoding='utf-8') as f:
            json.dump(metadata_list, f, indent=2, ensure_ascii=False)
        
        self.save_count += 1
        print(f"✓ 저장 완료 [{self.save_count}]: {self.current_label}/{color_filename}")
        return True

    def draw_ui(self, image, roi_coords=None):
        """UI 요소 그리기"""
        height, width = image.shape[:2]
        
        # 상단 정보 패널
        panel_height = 120
        overlay = image.copy()
        cv2.rectangle(overlay, (0, 0), (width, panel_height), (0, 0, 0), -1)
        cv2.addWeighted(overlay, 0.7, image, 0.3, 0, image)
        
        # 제목
        cv2.putText(image, "Data Collector - RealSense ROI Labeling", (10, 30), 
                   cv2.FONT_HERSHEY_SIMPLEX, 0.8, (255, 255, 255), 2)
        
        # 현재 라벨 표시
        label_text = f"Current Label: {self.current_label if self.current_label else '[Not Set]'}"
        label_color = (0, 255, 0) if self.current_label else (0, 0, 255)
        cv2.putText(image, label_text, (10, 60), 
                   cv2.FONT_HERSHEY_SIMPLEX, 0.7, label_color, 2)
        
        # 저장 카운트
        cv2.putText(image, f"Saved: {self.save_count}", (10, 90), 
                   cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 0), 2)
        
        # 하단 도움말
        help_y = height - 100
        cv2.rectangle(image, (0, help_y), (width, height), (0, 0, 0), -1)
        
        help_texts = [
            "[ L ] Set Label  |  [ S ] Save ROI  |  [ C ] Clear ROI  |  [ Q ] Quit",
            "Drag mouse to select ROI"
        ]
        
        for i, text in enumerate(help_texts):
            cv2.putText(image, text, (10, help_y + 25 + i * 25), 
                       cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1)
        
        # ROI 정보 표시
        if roi_coords:
            x1, y1, x2, y2 = roi_coords
            roi_info = f"ROI: ({x2-x1}x{y2-y1})"
            cv2.putText(image, roi_info, (width - 200, 60), 
                       cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 255), 2)

    def run(self):
        # 디렉토리 구조 생성
        self.create_directory_structure()
        
        # 스트리밍 시작
        try:
            profile = self.pipeline.start(self.config)
        except RuntimeError as e:
            print("=" * 60)
            print("ERROR: RealSense 카메라를 찾을 수 없습니다!")
            print("=" * 60)
            print("\n다음 사항을 확인해주세요:")
            print("  1. RealSense 카메라가 USB 포트에 연결되어 있는지 확인")
            print("  2. 카메라의 LED가 켜져 있는지 확인")
            print("  3. 다른 프로그램에서 카메라를 사용 중인지 확인")
            print("\n원본 에러 메시지:", str(e))
            print("=" * 60)
            return
        
        # 깊이 스케일 가져오기
        self.depth_scale = profile.get_device().first_depth_sensor().get_depth_scale()
        
        # 윈도우 생성 및 마우스 콜백 설정
        cv2.namedWindow('Data Collector')
        cv2.setMouseCallback('Data Collector', self.mouse_callback)
        
        print("\n" + "=" * 60)
        print("데이터 수집 프로그램 시작")
        print("=" * 60)
        print("사용법:")
        print("  1. [L] 키를 눌러 라벨(클래스명) 입력")
        print("  2. 마우스로 드래그하여 ROI 선택")
        print("  3. [S] 키를 눌러 저장")
        print("  4. [C] 키로 ROI 초기화")
        print("  5. [Q] 키로 종료")
        print("=" * 60 + "\n")
        
        try:
            while True:
                # 프레임 대기
                frames = self.pipeline.wait_for_frames()
                depth_frame = frames.get_depth_frame()
                color_frame = frames.get_color_frame()
                if not depth_frame or not color_frame:
                    continue
                
                # 이미지를 numpy 배열로 변환
                depth_image = np.asanyarray(depth_frame.get_data())
                color_image = np.asanyarray(color_frame.get_data())
                
                # 표시용 이미지 복사
                display_image = color_image.copy()
                
                # ROI 그리기
                roi_coords = None
                if self.roi_start is not None and self.roi_end is not None:
                    x1 = max(0, min(self.roi_start[0], self.roi_end[0]))
                    y1 = max(0, min(self.roi_start[1], self.roi_end[1]))
                    x2 = min(639, max(self.roi_start[0], self.roi_end[0]))
                    y2 = min(479, max(self.roi_start[1], self.roi_end[1]))
                    
                    roi_coords = (x1, y1, x2, y2)
                    
                    # ROI 사각형 그리기
                    color = (0, 255, 255) if self.is_drawing else (0, 255, 0)
                    cv2.rectangle(display_image, (x1, y1), (x2, y2), color, 2)
                    
                    # ROI 영역 반투명 오버레이
                    if self.roi_selected and x2 > x1 and y2 > y1:
                        overlay = display_image.copy()
                        cv2.rectangle(overlay, (x1, y1), (x2, y2), (0, 255, 0), -1)
                        cv2.addWeighted(overlay, 0.2, display_image, 0.8, 0, display_image)
                
                # UI 그리기
                self.draw_ui(display_image, roi_coords)
                
                window_name = 'Data Collector'
                # 윈도우 닫기 버튼(X) 클릭 감지
                if cv2.getWindowProperty(window_name, cv2.WND_PROP_VISIBLE) < 1:
                    break

                # 화면 표시
                cv2.imshow(window_name, display_image)
                
                # 키 입력 처리
                key = cv2.waitKey(1) & 0xFF
                
                if key == ord('q'):
                    print("\n프로그램을 종료합니다.")
                    break
                    
                elif key == ord('l'):
                    # 라벨 입력
                    print("\n" + "-" * 40)
                    new_label = input("클래스 라벨을 입력하세요: ").strip()
                    if new_label:
                        self.current_label = new_label
                        print(f"✓ 라벨 설정: {self.current_label}")
                    else:
                        print("⚠ 라벨이 비어있습니다.")
                    print("-" * 40 + "\n")
                    
                elif key == ord('s'):
                    # ROI 저장
                    if self.roi_selected and roi_coords:
                        x1, y1, x2, y2 = roi_coords
                        if x2 > x1 and y2 > y1:
                            self.save_roi_data(color_image, depth_image, roi_coords)
                        else:
                            print("⚠ 유효하지 않은 ROI입니다.")
                    else:
                        print("⚠ ROI를 먼저 선택하세요.")
                        
                elif key == ord('c'):
                    # ROI 초기화
                    self.roi_start = None
                    self.roi_end = None
                    self.is_drawing = False
                    self.roi_selected = False
                    print("✓ ROI 초기화")
        
        finally:
            # 정리
            self.pipeline.stop()
            cv2.destroyAllWindows()
            
            print("\n" + "=" * 60)
            print(f"총 {self.save_count}개의 샘플이 저장되었습니다.")
            print(f"데이터 위치: {os.path.abspath(self.BASE_DIR)}")
            print("=" * 60)

if "RUN_DATA_COLLECTION" in globals() and RUN_DATA_COLLECTION:
    collector = DataCollector()
    collector.run()
else:
    print("Skip data collection.(set RUN_DATA_COLLECTION=1 to run)")


## 2. Dataset Preparation

** "수집된 데이터를 섞고 나누어서, 인공지능이 학습할 수 있는 만반의 준비를 마치는 과정" **이 한 번에 수행됩니다.

### 데이터셋 분할
- 수집된 원본 데이터(dataset/)를 학습(Train), 검증(Val), 테스트(Test)용 폴더(dataset_split/)로 자동 분리합니다.
- 기본적으로 Train(학습): 70%, Val(검증): 15%, Test(테스트): 15% 비율로 나누며, 랜덤 시드(Seed)를 고정하여 항상 동일하게 분할되도록 보장합니다.
- 모델이 학습하지 않은 데이터로 성능을 공정하게 평가할 수 있는 기반을 마련합니다.

### 데이터셋 테스트
- Train 및 Val 데이터셋을 로드해보고, 데이터 개수가 몇 개인지, 어떤 클래스가 감지되었는지 출력하여 정상을 확인합니다.

In [None]:
"""
데이터셋 유틸리티 - train/val/test 분할 및 데이터 로딩
"""
import os
import shutil
import json
from pathlib import Path
import random

def split_dataset(source_dir="dataset", output_dir="dataset_split", 
                  train_ratio=0.7, val_ratio=0.15, test_ratio=0.15, seed=42):
    """
    데이터셋을 train/val/test로 분할
    
    Args:
        source_dir: 원본 데이터셋 디렉토리
        output_dir: 분할된 데이터셋 저장 디렉토리
        train_ratio: 학습 데이터 비율
        val_ratio: 검증 데이터 비율
        test_ratio: 테스트 데이터 비율
        seed: 랜덤 시드
    """
    random.seed(seed)
    
    # 비율 검증
    assert abs(train_ratio + val_ratio + test_ratio - 1.0) < 1e-6, \
        "train_ratio + val_ratio + test_ratio must equal 1.0"
    
    # 출력 디렉토리 생성
    splits = ['train', 'val', 'test']
    for split in splits:
        split_dir = os.path.join(output_dir, split)
        if os.path.exists(split_dir):
            print(f"⚠ {split_dir} 이미 존재합니다. 건너뜁니다.")
        else:
            os.makedirs(split_dir)
    
    # 클래스별 처리
    class_dirs = [d for d in os.listdir(source_dir) 
                  if os.path.isdir(os.path.join(source_dir, d))]
    
    print(f"\n발견된 클래스: {class_dirs}")
    print(f"분할 비율 - Train: {train_ratio}, Val: {val_ratio}, Test: {test_ratio}\n")
    
    total_stats = {'train': 0, 'val': 0, 'test': 0}
    
    for class_name in class_dirs:
        class_path = os.path.join(source_dir, class_name)
        
        # 클래스별 이미지 파일 수집 (color 이미지만)
        color_images = [f for f in os.listdir(class_path) 
                       if f.endswith('_color.png')]
        
        # 타임스탬프 추출 (중복 방지)
        timestamps = list(set([img.replace('_color.png', '') for img in color_images]))
        
        # 셔플
        random.shuffle(timestamps)
        
        # 분할 인덱스 계산
        n_total = len(timestamps)
        n_train = int(n_total * train_ratio)
        n_val = int(n_total * val_ratio)
        
        train_timestamps = timestamps[:n_train]
        val_timestamps = timestamps[n_train:n_train + n_val]
        test_timestamps = timestamps[n_train + n_val:]
        
        # 각 split에 복사
        split_data = {
            'train': train_timestamps,
            'val': val_timestamps,
            'test': test_timestamps
        }
        
        for split, timestamps_list in split_data.items():
            # 클래스 디렉토리 생성
            split_class_dir = os.path.join(output_dir, split, class_name)
            os.makedirs(split_class_dir, exist_ok=True)
            
            # 파일 복사
            for timestamp in timestamps_list:
                # Color 이미지
                color_src = os.path.join(class_path, f"{timestamp}_color.png")
                color_dst = os.path.join(split_class_dir, f"{timestamp}_color.png")
                
                # Depth 이미지
                depth_src = os.path.join(class_path, f"{timestamp}_depth.png")
                depth_dst = os.path.join(split_class_dir, f"{timestamp}_depth.png")
                
                if os.path.exists(color_src):
                    shutil.copy2(color_src, color_dst)
                if os.path.exists(depth_src):
                    shutil.copy2(depth_src, depth_dst)
            
            total_stats[split] += len(timestamps_list)
            print(f"  {class_name}/{split}: {len(timestamps_list)} samples")
    
    print(f"\n총 분할 결과:")
    print(f"  Train: {total_stats['train']} samples")
    print(f"  Val: {total_stats['val']} samples")
    print(f"  Test: {total_stats['test']} samples")
    print(f"  Total: {sum(total_stats.values())} samples")
    print(f"\n✓ 데이터셋 분할 완료: {output_dir}")

def get_class_names(dataset_dir):
    """데이터셋에서 클래스 이름 추출"""
    class_names = sorted([d for d in os.listdir(dataset_dir) 
                         if os.path.isdir(os.path.join(dataset_dir, d))])
    return class_names

def count_samples(dataset_dir):
    """데이터셋의 샘플 수 계산"""
    class_names = get_class_names(dataset_dir)
    stats = {}
    
    for class_name in class_names:
        class_path = os.path.join(dataset_dir, class_name)
        color_images = [f for f in os.listdir(class_path) 
                       if f.endswith('_color.png')]
        stats[class_name] = len(color_images)
    
    return stats

# if __name__ == "__main__":
    # 데이터셋 정보 출력
    print("=" * 60)
    print("데이터셋 분석")
    print("=" * 60)
    
    if os.path.exists("dataset"):
        stats = count_samples("dataset")
        print("\n클래스별 샘플 수:")
        for class_name, count in stats.items():
            print(f"  {class_name}: {count} samples")
        print(f"\n총 샘플 수: {sum(stats.values())}")
        
        # 데이터셋 분할
        print("\n" + "=" * 60)
        print("데이터셋 분할 시작")
        print("=" * 60)
        split_dataset()
    else:
        print("⚠ dataset 폴더를 찾을 수 없습니다.")


"""
PyTorch Dataset 클래스 정의
"""
import os
import cv2
import numpy as np
import torch
from torch.utils.data import Dataset
from torchvision import transforms
from PIL import Image

class RealSenseDataset(Dataset):
    """RealSense RGB-D 데이터셋"""
    
    def __init__(self, root_dir, transform=None, use_depth=False, img_size=224):
        """
        Args:
            root_dir: 데이터셋 루트 디렉토리 (train, val, test 중 하나)
            transform: 이미지 변환 (torchvision.transforms)
            use_depth: Depth 정보 사용 여부
            img_size: 이미지 크기 (정사각형)
        """
        self.root_dir = root_dir
        self.transform = transform
        self.use_depth = use_depth
        self.img_size = img_size
        
        # 클래스 이름 및 인덱스 매핑
        self.classes = sorted([d for d in os.listdir(root_dir) 
                              if os.path.isdir(os.path.join(root_dir, d))])
        self.class_to_idx = {cls_name: idx for idx, cls_name in enumerate(self.classes)}
        
        # 샘플 수집
        self.samples = []
        for class_name in self.classes:
            class_dir = os.path.join(root_dir, class_name)
            color_images = [f for f in os.listdir(class_dir) 
                           if f.endswith('_color.png')]
            
            for color_img in color_images:
                timestamp = color_img.replace('_color.png', '')
                color_path = os.path.join(class_dir, color_img)
                depth_path = os.path.join(class_dir, f"{timestamp}_depth.png")
                
                # Depth 파일 존재 확인
                if self.use_depth and not os.path.exists(depth_path):
                    continue
                
                self.samples.append({
                    'color': color_path,
                    'depth': depth_path if self.use_depth else None,
                    'label': self.class_to_idx[class_name],
                    'class_name': class_name
                })
    
    def __len__(self):
        return len(self.samples)
    
    def __getitem__(self, idx):
        sample = self.samples[idx]
        
        # Color 이미지 로드
        color_image = cv2.imread(sample['color'])
        color_image = cv2.cvtColor(color_image, cv2.COLOR_BGR2RGB)
        
        # 이미지 크기 조정
        color_image = cv2.resize(color_image, (self.img_size, self.img_size))
        
        if self.use_depth:
            # Depth 이미지 로드
            depth_image = cv2.imread(sample['depth'], cv2.IMREAD_UNCHANGED)
            depth_image = cv2.resize(depth_image, (self.img_size, self.img_size))
            
            # Depth 정규화 (0-255 범위로)
            depth_image = cv2.normalize(depth_image, None, 0, 255, cv2.NORM_MINMAX)
            depth_image = depth_image.astype(np.uint8)
            
            # RGB + D = 4채널
            image = np.dstack([color_image, depth_image])
        else:
            image = color_image
        
        # PIL Image로 변환 (transforms 적용을 위해)
        if self.use_depth:
            # 4채널은 별도 처리
            image = torch.from_numpy(image).permute(2, 0, 1).float() / 255.0
        else:
            image = Image.fromarray(image)
            if self.transform:
                image = self.transform(image)
        
        label = sample['label']
        
        return image, label
    
    def get_class_name(self, idx):
        """인덱스로부터 클래스 이름 반환"""
        return self.classes[idx]


def get_transforms(img_size=224, augment=True):
    """
    데이터 변환 정의
    
    Args:
        img_size: 이미지 크기
        augment: 데이터 증강 사용 여부
    """
    if augment:
        # 학습용 변환 (데이터 증강 포함)
        train_transform = transforms.Compose([
            transforms.Resize((img_size, img_size)),
            transforms.RandomHorizontalFlip(p=0.5),
            transforms.RandomRotation(degrees=15),
            transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], 
                               std=[0.229, 0.224, 0.225])
        ])
    else:
        # 검증/테스트용 변환
        train_transform = transforms.Compose([
            transforms.Resize((img_size, img_size)),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], 
                               std=[0.229, 0.224, 0.225])
        ])
    
    val_transform = transforms.Compose([
        transforms.Resize((img_size, img_size)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], 
                           std=[0.229, 0.224, 0.225])
    ])
    
    return train_transform, val_transform


def test_dataset():
    """데이터셋 로드 테스트"""
    print("=" * 60)
    print("데이터셋 테스트")
    print("=" * 60)
    
    if os.path.exists("dataset_split/train"):
        train_transform, val_transform = get_transforms()
        
        train_dataset = RealSenseDataset("dataset_split/train", 
                                        transform=train_transform,
                                        use_depth=False)
        
        print(f"\n클래스: {train_dataset.classes}")
        print(f"클래스 수: {len(train_dataset.classes)}")
        print(f"학습 샘플 수: {len(train_dataset)}")
        
        # 첫 번째 샘플 확인
        if len(train_dataset) > 0:
            image, label = train_dataset[0]
            print(f"\n샘플 확인:")
            print(f"  이미지 shape: {image.shape}")
            print(f"  라벨: {label} ({train_dataset.get_class_name(label)})")
    else:
        print("⚠ dataset_split/train 폴더를 찾을 수 없습니다.")
        print("먼저 dataset_utils.py를 실행하여 데이터셋을 분할하세요.")

if "RUN_DATASET_SPLIT" in globals() and RUN_DATASET_SPLIT:
    split_dataset()

    test_dataset()
else:
    print("Skip dataset split/test (set NB_RUN_DATASET_SPLIT=1 to enable).")        

## 3. Training

### 데이터 준비 (Data Loader)
- dataset.RealSenseDataset 커스텀 데이터셋 사용합니다.
- Train: Augmentation 적용 (Flip, Rotation 등) + Shuffle, 일반화(Generalization) 성능을 확보하고 과적합(Overfitting)을 방지합니다.
- Validation: Resize/Normalize + No Shuffle, 평가의 일관성을 유지합니다.

### 모델 아키텍처 (Model Construction)
- Transfer Learning: Pre-trained (MobileNetV2 등) Backbone 사용
- Fine-tuning: 마지막 FC Layer를 현재 클래스 수(num_classes)에 맞게 교체

### 최적화 전략 (Optimization)
- Loss: CrossEntropyLoss (Multi-class Classification)
- Optimizer: Adam (Adaptive Moment Estimation)
- Scheduler: ReduceLROnPlateau (Validation Loss 정체 시 LR 감소)

### 학습 파이프라인 (Training Loop)
- Train Step: Forward -> Loss -> Backward -> Optimizer Step
- Validation Step: no_grad() 상태로 Inference -> 성능 평가 (Loss/Acc)
- Monitoring: tqdm으로 진행률 및 실시간 지표 표시

### 모델 관리 (Model Management)
- Best Model Saving: Validation Accuracy 최고점 갱신 시 best_model.pth 저장
- Early Stopping: 성능 개선 없을 시 (patience=10) 조기 종료
- Log: 학습 이력(Loss/Acc) JSON 저장

In [None]:
"""
PyTorch 학습 스크립트
"""
import os
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision import models
import numpy as np
from tqdm import tqdm
import json
from datetime import datetime

# from dataset import RealSenseDataset, get_transforms

class Trainer:
    def __init__(self, config):
        self.config = config
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        print(f"사용 디바이스: {self.device}")
        
        # 데이터셋 로드
        self.load_datasets()
        
        # 모델 생성
        self.create_model()
        
        # Loss, Optimizer 설정
        self.criterion = nn.CrossEntropyLoss()
        self.optimizer = optim.Adam(self.model.parameters(), lr=config['learning_rate'])
        self.scheduler = optim.lr_scheduler.ReduceLROnPlateau(
            self.optimizer, mode='min', factor=0.5, patience=5
        )
        
        # 학습 기록
        self.history = {
            'train_loss': [],
            'train_acc': [],
            'val_loss': [],
            'val_acc': []
        }
        
        self.best_val_acc = 0.0
    
    def load_datasets(self):
        """데이터셋 로드"""
        train_transform, val_transform = get_transforms(
            img_size=self.config['img_size'],
            augment=self.config['use_augmentation']
        )
        
        self.train_dataset = RealSenseDataset(
            root_dir=self.config['train_dir'],
            transform=train_transform,
            use_depth=self.config['use_depth'],
            img_size=self.config['img_size']
        )
        
        self.val_dataset = RealSenseDataset(
            root_dir=self.config['val_dir'],
            transform=val_transform,
            use_depth=self.config['use_depth'],
            img_size=self.config['img_size']
        )
        
        self.train_loader = DataLoader(
            self.train_dataset,
            batch_size=self.config['batch_size'],
            shuffle=True,
            num_workers=self.config['num_workers']
        )
        
        self.val_loader = DataLoader(
            self.val_dataset,
            batch_size=self.config['batch_size'],
            shuffle=False,
            num_workers=self.config['num_workers']
        )
        
        self.num_classes = len(self.train_dataset.classes)
        self.class_names = self.train_dataset.classes
        
        print(f"\n데이터셋 로드 완료:")
        print(f"  클래스: {self.class_names}")
        print(f"  학습 샘플: {len(self.train_dataset)}")
        print(f"  검증 샘플: {len(self.val_dataset)}")
    
    def create_model(self):
        """모델 생성"""
        model_name = self.config['model_name']
        
        if model_name == 'resnet18':
            self.model = models.resnet18(pretrained=self.config['use_pretrained'])
            in_features = self.model.fc.in_features
            self.model.fc = nn.Linear(in_features, self.num_classes)
            
        elif model_name == 'resnet50':
            self.model = models.resnet50(pretrained=self.config['use_pretrained'])
            in_features = self.model.fc.in_features
            self.model.fc = nn.Linear(in_features, self.num_classes)
            
        elif model_name == 'mobilenet_v2':
            self.model = models.mobilenet_v2(pretrained=self.config['use_pretrained'])
            in_features = self.model.classifier[1].in_features
            self.model.classifier[1] = nn.Linear(in_features, self.num_classes)
            
        else:
            raise ValueError(f"지원하지 않는 모델: {model_name}")
        
        self.model = self.model.to(self.device)
        print(f"\n모델 생성 완료: {model_name}")
        print(f"  사전 학습 가중치: {'사용' if self.config['use_pretrained'] else '미사용'}")
    
    def train_epoch(self):
        """1 에폭 학습"""
        self.model.train()
        running_loss = 0.0
        correct = 0
        total = 0
        
        pbar = tqdm(self.train_loader, desc='Training')
        for images, labels in pbar:
            images = images.to(self.device)
            labels = labels.to(self.device)
            
            # Forward
            self.optimizer.zero_grad()
            outputs = self.model(images)
            loss = self.criterion(outputs, labels)
            
            # Backward
            loss.backward()
            self.optimizer.step()
            
            # 통계
            running_loss += loss.item()
            _, predicted = outputs.max(1)
            total += labels.size(0)
            correct += predicted.eq(labels).sum().item()
            
            # Progress bar 업데이트
            pbar.set_postfix({
                'loss': f'{running_loss/len(pbar):.4f}',
                'acc': f'{100.*correct/total:.2f}%'
            })
        
        epoch_loss = running_loss / len(self.train_loader)
        epoch_acc = 100. * correct / total
        
        return epoch_loss, epoch_acc
    
    def validate(self):
        """검증"""
        self.model.eval()
        running_loss = 0.0
        correct = 0
        total = 0
        
        with torch.no_grad():
            pbar = tqdm(self.val_loader, desc='Validation')
            for images, labels in pbar:
                images = images.to(self.device)
                labels = labels.to(self.device)
                
                outputs = self.model(images)
                loss = self.criterion(outputs, labels)
                
                running_loss += loss.item()
                _, predicted = outputs.max(1)
                total += labels.size(0)
                correct += predicted.eq(labels).sum().item()
                
                pbar.set_postfix({
                    'loss': f'{running_loss/len(pbar):.4f}',
                    'acc': f'{100.*correct/total:.2f}%'
                })
        
        epoch_loss = running_loss / len(self.val_loader)
        epoch_acc = 100. * correct / total
        
        return epoch_loss, epoch_acc
    
    def train(self):
        """전체 학습 루프"""
        print("\n" + "=" * 60)
        print("학습 시작")
        print("=" * 60)
        
        for epoch in range(self.config['num_epochs']):
            print(f"\nEpoch {epoch+1}/{self.config['num_epochs']}")
            print("-" * 60)
            
            # 학습
            train_loss, train_acc = self.train_epoch()
            
            # 검증
            val_loss, val_acc = self.validate()
            
            # 학습률 조정
            self.scheduler.step(val_loss)
            
            # 기록
            self.history['train_loss'].append(train_loss)
            self.history['train_acc'].append(train_acc)
            self.history['val_loss'].append(val_loss)
            self.history['val_acc'].append(val_acc)
            
            # 결과 출력
            print(f"\n결과:")
            print(f"  Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.2f}%")
            print(f"  Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.2f}%")
            
            # 최고 성능 모델 저장
            if val_acc > self.best_val_acc:
                self.best_val_acc = val_acc
                self.save_checkpoint('best_model.pth', epoch, val_acc)
                print(f"  ✓ 최고 성능 모델 저장 (Val Acc: {val_acc:.2f}%)")
            
            # Early Stopping (선택사항)
            if self.config['early_stopping_patience'] > 0:
                if epoch > self.config['early_stopping_patience']:
                    recent_val_acc = self.history['val_acc'][-self.config['early_stopping_patience']:]
                    if all(acc <= self.best_val_acc for acc in recent_val_acc):
                        print(f"\nEarly stopping at epoch {epoch+1}")
                        break
        
        # 최종 모델 저장
        self.save_checkpoint('final_model.pth', self.config['num_epochs'], val_acc)
        
        # 학습 기록 저장
        self.save_history()
        
        print("\n" + "=" * 60)
        print("학습 완료!")
        print(f"최고 검증 정확도: {self.best_val_acc:.2f}%")
        print("=" * 60)
    
    def save_checkpoint(self, filename, epoch, val_acc):
        """체크포인트 저장"""
        checkpoint = {
            'epoch': epoch,
            'model_state_dict': self.model.state_dict(),
            'optimizer_state_dict': self.optimizer.state_dict(),
            'val_acc': val_acc,
            'class_names': self.class_names,
            'config': self.config
        }
        
        save_path = os.path.join(self.config['save_dir'], filename)
        torch.save(checkpoint, save_path)
    
    def save_history(self):
        """학습 기록 저장"""
        history_path = os.path.join(self.config['save_dir'], 'training_history.json')
        with open(history_path, 'w') as f:
            json.dump(self.history, f, indent=2)
        print(f"학습 기록 저장: {history_path}")


def main():
    # 학습 설정
    config = {
        # 데이터
        'train_dir': 'dataset_split/train',
        'val_dir': 'dataset_split/val',
        'img_size': 224,
        'use_depth': False,  # Depth 정보 사용 여부
        'use_augmentation': True,
        
        # 모델
        'model_name': 'mobilenet_v2',  # 'resnet18', 'resnet50', 'mobilenet_v2'
        'use_pretrained': True,  # Transfer Learning
        
        # 학습
        'batch_size': 16,
        'num_epochs': 50,
        'learning_rate': 0.001,
        'num_workers': 0,  # Windows Jupyter Notebook: 0으로 설정 (멀티프로세싱 이슈 방지)
        'early_stopping_patience': 10,
        
        # 저장
        'save_dir': 'models'
    }
    
    # 저장 디렉토리 생성
    os.makedirs(config['save_dir'], exist_ok=True)
    
    # 설정 출력
    print("=" * 60)
    print("학습 설정")
    print("=" * 60)
    for key, value in config.items():
        print(f"  {key}: {value}")
    
    # 학습 시작
    trainer = Trainer(config)
    trainer.train()

if "RUN_TRAINING" in globals() and RUN_TRAINING:
    main()
else:
    print("Skip training (set NB_RUN_TRAINING=1 to enable).")


## 4. Inference

### 개요
- RealSense 카메라로 실시간 영상을 받아 사용자가 선택한 ROI(관심 영역)의 객체를 분류한다.

### 주요 기능
- 모델 로드: 학습된 모델(ResNet18/50, MobileNetV2) 불러오기
-  ROI 선택: 마우스 드래그로 관심 영역 지정
- 실시간 추론: 선택된 영역의 객체 클래스 분류
- 결과 시각화: 클래스명, 신뢰도, 전체 확률 화면 표시

### 키 조작
- **마우스 드래그**: ROI 영역 선택
- **[C]**:  ROI 초기화
- **[Q]** : 프로그램 종료
- **X 버튼**: 윈도우 닫기로 종료

### 실행 조건
- models/best_model.pth 파일 필요 (학습 완료된 모델)
- RealSense 카메라 연결 필요

In [None]:
"""
실시간 추론 프로그램 - RealSense ROI 판별
"""
import os
import torch
import torch.nn as nn
from torchvision import models, transforms
import cv2
import numpy as np
import pyrealsense2 as rs
from PIL import Image

class RealtimeInference:
    def __init__(self, model_path, img_size=224, use_depth=False):
        """
        Args:
            model_path: 학습된 모델 경로
            img_size: 입력 이미지 크기
            use_depth: Depth 정보 사용 여부
        """
        self.img_size = img_size
        self.use_depth = use_depth
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        
        # 모델 로드
        self.load_model(model_path)
        
        # 이미지 전처리 변환
        self.transform = transforms.Compose([
            transforms.Resize((img_size, img_size)),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], 
                               std=[0.229, 0.224, 0.225])
        ])
        
        # ROI 선택 상태
        self.roi_start = None
        self.roi_end = None
        self.is_drawing = False
        self.roi_selected = False
        
        print(f"✓ 모델 로드 완료")
        print(f"  클래스: {self.class_names}")
        print(f"  디바이스: {self.device}")
    
    def load_model(self, model_path):
        """학습된 모델 로드"""
        checkpoint = torch.load(model_path, map_location=self.device)
        
        # 설정 및 클래스 정보
        self.class_names = checkpoint['class_names']
        self.num_classes = len(self.class_names)
        config = checkpoint['config']
        
        # 모델 생성
        model_name = config['model_name']
        
        if model_name == 'resnet18':
            self.model = models.resnet18(pretrained=False)
            in_features = self.model.fc.in_features
            self.model.fc = nn.Linear(in_features, self.num_classes)
            
        elif model_name == 'resnet50':
            self.model = models.resnet50(pretrained=False)
            in_features = self.model.fc.in_features
            self.model.fc = nn.Linear(in_features, self.num_classes)
            
        elif model_name == 'mobilenet_v2':
            self.model = models.mobilenet_v2(pretrained=False)
            in_features = self.model.classifier[1].in_features
            self.model.classifier[1] = nn.Linear(in_features, self.num_classes)
        
        # 가중치 로드
        self.model.load_state_dict(checkpoint['model_state_dict'])
        self.model = self.model.to(self.device)
        self.model.eval()
    
    def mouse_callback(self, event, x, y, flags, param):
        """마우스 이벤트 처리"""
        if event == cv2.EVENT_LBUTTONDOWN:
            self.is_drawing = True
            self.roi_start = (x, y)
            self.roi_end = (x, y)
            self.roi_selected = False
            
        elif event == cv2.EVENT_MOUSEMOVE:
            if self.is_drawing:
                self.roi_end = (x, y)
                
        elif event == cv2.EVENT_LBUTTONUP:
            self.is_drawing = False
            self.roi_end = (x, y)
            self.roi_selected = True
    
    def preprocess_roi(self, color_image, roi_coords):
        """ROI 전처리"""
        x1, y1, x2, y2 = roi_coords
        roi = color_image[y1:y2, x1:x2]
        
        # BGR to RGB
        roi_rgb = cv2.cvtColor(roi, cv2.COLOR_BGR2RGB)
        
        # PIL Image로 변환
        roi_pil = Image.fromarray(roi_rgb)
        
        # Transform 적용
        roi_tensor = self.transform(roi_pil)
        roi_tensor = roi_tensor.unsqueeze(0)  # 배치 차원 추가
        
        return roi_tensor
    
    def predict(self, roi_tensor):
        """추론 수행"""
        with torch.no_grad():
            roi_tensor = roi_tensor.to(self.device)
            outputs = self.model(roi_tensor)
            
            # Softmax로 확률 계산
            probabilities = torch.nn.functional.softmax(outputs, dim=1)
            confidence, predicted = torch.max(probabilities, 1)
            
            predicted_class = predicted.item()
            confidence_score = confidence.item()
            
            return predicted_class, confidence_score, probabilities[0].cpu().numpy()
    
    def draw_results(self, image, roi_coords, predicted_class, confidence, probabilities):
        """결과 시각화"""
        x1, y1, x2, y2 = roi_coords
        
        # ROI 사각형
        color = (0, 255, 0) if confidence > 0.7 else (0, 165, 255)
        cv2.rectangle(image, (x1, y1), (x2, y2), color, 2)
        
        # 예측 결과 텍스트
        class_name = self.class_names[predicted_class]
        result_text = f"{class_name}: {confidence*100:.1f}%"
        
        # 텍스트 배경
        text_size = cv2.getTextSize(result_text, cv2.FONT_HERSHEY_SIMPLEX, 0.7, 2)[0]
        text_x = x1
        text_y = y1 - 10
        
        if text_y < 30:
            text_y = y2 + 25
        
        # 배경 박스
        cv2.rectangle(image, (text_x - 5, text_y - text_size[1] - 5),
                     (text_x + text_size[0] + 5, text_y + 5),
                     (0, 0, 0), -1)
        
        # 텍스트
        cv2.putText(image, result_text, (text_x, text_y),
                   cv2.FONT_HERSHEY_SIMPLEX, 0.7, color, 2)
        
        # 모든 클래스 확률 표시 (오른쪽 상단)
        prob_y = 30
        for i, (cls_name, prob) in enumerate(zip(self.class_names, probabilities)):
            prob_text = f"{cls_name}: {prob*100:.1f}%"
            prob_color = (0, 255, 0) if i == predicted_class else (200, 200, 200)
            cv2.putText(image, prob_text, (image.shape[1] - 200, prob_y + i * 25),
                       cv2.FONT_HERSHEY_SIMPLEX, 0.5, prob_color, 1)
    
    def run(self):
        """실시간 추론 실행"""
        # RealSense 파이프라인 설정
        pipeline = rs.pipeline()
        config = rs.config()
        
        config.enable_stream(rs.stream.depth, 640, 480, rs.format.z16, 30)
        config.enable_stream(rs.stream.color, 640, 480, rs.format.bgr8, 30)
        
        # 스트리밍 시작
        try:
            pipeline.start(config)
        except RuntimeError as e:
            print("=" * 60)
            print("ERROR: RealSense 카메라를 찾을 수 없습니다!")
            print("=" * 60)
            print("\n다음 사항을 확인해주세요:")
            print("  1. RealSense 카메라가 USB 포트에 연결되어 있는지 확인")
            print("  2. 카메라의 LED가 켜져 있는지 확인")
            print("  3. 다른 프로그램에서 카메라를 사용 중인지 확인")
            print("\n원본 에러 메시지:", str(e))
            print("=" * 60)
            return
        
        # 윈도우 생성 및 마우스 콜백
        cv2.namedWindow('RealSense Inference')
        cv2.setMouseCallback('RealSense Inference', self.mouse_callback)
        
        print("\n" + "=" * 60)
        print("실시간 추론 시작")
        print("=" * 60)
        print("사용법:")
        print("  1. 마우스로 드래그하여 ROI 선택")
        print("  2. 자동으로 추론 결과 표시")
        print("  3. [C] 키로 ROI 초기화")
        print("  4. [Q] 키로 종료")
        print("=" * 60 + "\n")
        
        try:
            while True:
                # 프레임 대기
                frames = pipeline.wait_for_frames()
                depth_frame = frames.get_depth_frame()
                color_frame = frames.get_color_frame()
                if not depth_frame or not color_frame:
                    continue
                
                # 이미지 변환
                depth_image = np.asanyarray(depth_frame.get_data())
                color_image = np.asanyarray(color_frame.get_data())
                
                # 표시용 이미지
                display_image = color_image.copy()
                
                # ROI 그리기 및 추론
                if self.roi_start is not None and self.roi_end is not None:
                    x1 = max(0, min(self.roi_start[0], self.roi_end[0]))
                    y1 = max(0, min(self.roi_start[1], self.roi_end[1]))
                    x2 = min(639, max(self.roi_start[0], self.roi_end[0]))
                    y2 = min(479, max(self.roi_start[1], self.roi_end[1]))
                    
                    roi_coords = (x1, y1, x2, y2)
                    
                    # 그리는 중
                    if self.is_drawing:
                        cv2.rectangle(display_image, (x1, y1), (x2, y2), (0, 255, 255), 2)
                    
                    # 선택 완료 시 추론
                    elif self.roi_selected and x2 > x1 + 10 and y2 > y1 + 10:
                        # ROI 전처리
                        roi_tensor = self.preprocess_roi(color_image, roi_coords)
                        
                        # 추론
                        predicted_class, confidence, probabilities = self.predict(roi_tensor)
                        
                        # 결과 시각화
                        self.draw_results(display_image, roi_coords, 
                                        predicted_class, confidence, probabilities)
                
                # UI 안내
                cv2.putText(display_image, "Drag to select ROI for inference", (10, 30),
                           cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
                cv2.putText(display_image, "[C] Clear | [Q] Quit", (10, 60),
                           cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)
                
                
                window_name = 'RealSense Inference'
                # 윈도우 닫기 버튼(X) 클릭 감지
                if cv2.getWindowProperty(window_name, cv2.WND_PROP_VISIBLE) < 1:
                    break

                # 화면 표시
                cv2.imshow(window_name, display_image)
                
                # 키 입력
                key = cv2.waitKey(1) & 0xFF
                if key == ord('q'):
                    break
                elif key == ord('c'):
                    self.roi_start = None
                    self.roi_end = None
                    self.is_drawing = False
                    self.roi_selected = False
        
        finally:
            pipeline.stop()
            cv2.destroyAllWindows()
            print("\n프로그램 종료")


def inference_main():
    # 모델 경로
    model_path = "models/best_model.pth"
    
    if not os.path.exists(model_path):
        print(f"ERROR: 모델 파일을 찾을 수 없습니다: {model_path}")
        print("먼저 train.py를 실행하여 모델을 학습하세요.")
        return
    
    # 추론 실행
    inference = RealtimeInference(model_path)
    inference.run()


if "RUN_INFERENCE" in globals() and RUN_INFERENCE:
    inference_main()
else:
    print("Skip inference (set NB_RUN_INFERENCE=1 to enable).")


## 5. Tracking

### 개요
RealSense 카메라로 실시간 영상을 받아 객체를 분류하고, 템플릿 매칭 기반으로 해당 객체를 지속 추적한다.

### 주요 기능
- 모델 로드: 학습된 분류 모델(ResNet18/50, MobileNetV2) 불러오기
- ROI 선택: 마우스 드래그로 추적할 객체 영역 지정
- 객체 분류: 선택된 ROI의 클래스 판별
- 템플릿 매칭 추적: 분류된 객체를 프레임마다 자동 추적
- 결과 시각화: 추적 상태, 클래스명, 신뢰도 화면 표시


### 추적 방식
- 사용자가 ROI 선택 시 해당 영역을 템플릿으로 저장
- 매 프레임마다 이전 위치 주변(±50px)에서 템플릿 매칭 수행
- 매칭 신뢰도 0.5 이상이면 추적 성공으로 판정
- 추적 성공 시 템플릿 갱신 (적응형 추적)


### 키 조작
- **마우스 드래그**: ROI 영역 선택
- **[T]**: 추적 시작/중지
- **[C]**: ROI 및 추적 초기화
- **[R]**: 추적 중 객체 재분류
- **[Q]**: 프로그램 종료
- **X 버튼**: 윈도우 닫기로 종료

### Inference와의 차이점
- Inference: ROI 선택 → 1회 분류 → 결과 표시
- Tracking: ROI 선택 → 분류 → 연속 추적 (객체가 이동해도 따라감)

### 실행 조건
- models/best_model.pth 파일 필요
- RealSense 카메라 연결 필요



In [None]:
"""
실시간 추론 + 객체 추적 프로그램
한 번 분류된 객체를 자동으로 추적합니다.
"""
import os
import torch
import torch.nn as nn
from torchvision import models, transforms
import cv2
import numpy as np
import pyrealsense2 as rs
from PIL import Image

class RealtimeInferenceWithTracking:
    def __init__(self, model_path, img_size=224, use_depth=False):
        """
        Args:
            model_path: 학습된 모델 경로
            img_size: 입력 이미지 크기
            use_depth: Depth 정보 사용 여부
        """
        self.img_size = img_size
        self.use_depth = use_depth
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        
        # 모델 로드
        self.load_model(model_path)
        
        # 이미지 전처리 변환
        self.transform = transforms.Compose([
            transforms.Resize((img_size, img_size)),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], 
                               std=[0.229, 0.224, 0.225])
        ])
        
        # ROI 선택 상태
        self.roi_start = None
        self.roi_end = None
        self.is_drawing = False
        self.roi_selected = False
        
        # 추적 상태
        self.template = None
        self.template_size = None
        self.last_bbox = None
        self.tracking = False
        self.tracked_class = None
        self.tracked_confidence = 0.0
        self.tracked_probabilities = None
        
        print(f"✓ 모델 로드 완료")
        print(f"  클래스: {self.class_names}")
        print(f"  디바이스: {self.device}")
    
    def load_model(self, model_path):
        """학습된 모델 로드"""
        checkpoint = torch.load(model_path, map_location=self.device)
        
        # 설정 및 클래스 정보
        self.class_names = checkpoint['class_names']
        self.num_classes = len(self.class_names)
        config = checkpoint['config']
        
        # 모델 생성
        model_name = config['model_name']
        
        if model_name == 'resnet18':
            self.model = models.resnet18(pretrained=False)
            in_features = self.model.fc.in_features
            self.model.fc = nn.Linear(in_features, self.num_classes)
            
        elif model_name == 'resnet50':
            self.model = models.resnet50(pretrained=False)
            in_features = self.model.fc.in_features
            self.model.fc = nn.Linear(in_features, self.num_classes)
            
        elif model_name == 'mobilenet_v2':
            self.model = models.mobilenet_v2(pretrained=False)
            in_features = self.model.classifier[1].in_features
            self.model.classifier[1] = nn.Linear(in_features, self.num_classes)
        
        # 가중치 로드
        self.model.load_state_dict(checkpoint['model_state_dict'])
        self.model = self.model.to(self.device)
        self.model.eval()
    
    def mouse_callback(self, event, x, y, flags, param):
        """마우스 이벤트 처리"""
        if event == cv2.EVENT_LBUTTONDOWN:
            self.is_drawing = True
            self.roi_start = (x, y)
            self.roi_end = (x, y)
            self.roi_selected = False
            
        elif event == cv2.EVENT_MOUSEMOVE:
            if self.is_drawing:
                self.roi_end = (x, y)
                
        elif event == cv2.EVENT_LBUTTONUP:
            self.is_drawing = False
            self.roi_end = (x, y)
            self.roi_selected = True
    
    def preprocess_roi(self, color_image, roi_coords):
        """ROI 전처리"""
        x1, y1, x2, y2 = roi_coords
        roi = color_image[y1:y2, x1:x2]
        
        # BGR to RGB
        roi_rgb = cv2.cvtColor(roi, cv2.COLOR_BGR2RGB)
        
        # PIL Image로 변환
        roi_pil = Image.fromarray(roi_rgb)
        
        # Transform 적용
        roi_tensor = self.transform(roi_pil)
        roi_tensor = roi_tensor.unsqueeze(0)  # 배치 차원 추가
        
        return roi_tensor
    
    def predict(self, roi_tensor):
        """추론 수행"""
        with torch.no_grad():
            roi_tensor = roi_tensor.to(self.device)
            outputs = self.model(roi_tensor)
            
            # Softmax로 확률 계산
            probabilities = torch.nn.functional.softmax(outputs, dim=1)
            confidence, predicted = torch.max(probabilities, 1)
            
            predicted_class = predicted.item()
            confidence_score = confidence.item()
            
            return predicted_class, confidence_score, probabilities[0].cpu().numpy()
    
    def init_tracker(self, frame, bbox):
        """트래커 초기화 - 템플릿 매칭 사용"""
        x, y, w, h = [int(v) for v in bbox]
        
        # 템플릿 저장 (추적할 영역)
        self.template = frame[y:y+h, x:x+w].copy()
        self.template_size = (w, h)
        self.last_bbox = bbox
        self.tracking = True
        
    def update_tracker(self, frame):
        """트래커 업데이트 - 템플릿 매칭으로 위치 찾기"""
        if self.template is None:
            return False, None
        
        # 이전 위치 주변에서 검색 (효율성)
        x, y, w, h = [int(v) for v in self.last_bbox]
        
        # 검색 영역 설정 (이전 위치 ±50 픽셀)
        search_margin = 50
        x1 = max(0, x - search_margin)
        y1 = max(0, y - search_margin)
        x2 = min(frame.shape[1], x + w + search_margin)
        y2 = min(frame.shape[0], y + h + search_margin)
        
        search_region = frame[y1:y2, x1:x2]
        
        # 템플릿 매칭
        try:
            result = cv2.matchTemplate(search_region, self.template, cv2.TM_CCOEFF_NORMED)
            min_val, max_val, min_loc, max_loc = cv2.minMaxLoc(result)
            
            # 신뢰도 체크 (0.5 이상이면 성공)
            if max_val > 0.5:
                # 새로운 위치 계산
                new_x = x1 + max_loc[0]
                new_y = y1 + max_loc[1]
                new_bbox = (new_x, new_y, w, h)
                
                # 위치 업데이트
                self.last_bbox = new_bbox
                
                # 템플릿 업데이트 (적응형 추적)
                self.template = frame[new_y:new_y+h, new_x:new_x+w].copy()
                
                return True, new_bbox
            else:
                return False, None
        except:
            return False, None
    
    def draw_results(self, image, bbox, predicted_class, confidence, probabilities, is_tracking=False):
        """결과 시각화"""
        x, y, w, h = [int(v) for v in bbox]
        x1, y1, x2, y2 = x, y, x + w, y + h
        
        # 박스 색상 (추적 중이면 파란색, 아니면 초록/주황)
        if is_tracking:
            color = (255, 0, 0)  # 파란색 - 추적 중
        else:
            color = (0, 255, 0) if confidence > 0.7 else (0, 165, 255)
        
        cv2.rectangle(image, (x1, y1), (x2, y2), color, 2)
        
        # 예측 결과 텍스트
        class_name = self.class_names[predicted_class]
        status = "[TRACKING]" if is_tracking else "[DETECTED]"
        result_text = f"{status} {class_name}: {confidence*100:.1f}%"
        
        # 텍스트 배경
        text_size = cv2.getTextSize(result_text, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 2)[0]
        text_x = x1
        text_y = y1 - 10
        
        if text_y < 30:
            text_y = y2 + 25
        
        # 배경 박스
        cv2.rectangle(image, (text_x - 5, text_y - text_size[1] - 5),
                     (text_x + text_size[0] + 5, text_y + 5),
                     (0, 0, 0), -1)
        
        # 텍스트
        cv2.putText(image, result_text, (text_x, text_y),
                   cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)
        
        # 모든 클래스 확률 표시 (오른쪽 상단)
        prob_y = 30
        cv2.putText(image, "Class Probabilities:", (image.shape[1] - 220, prob_y),
                   cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1)
        
        for i, (cls_name, prob) in enumerate(zip(self.class_names, probabilities)):
            prob_text = f"{cls_name}: {prob*100:.1f}%"
            prob_color = (0, 255, 0) if i == predicted_class else (200, 200, 200)
            cv2.putText(image, prob_text, (image.shape[1] - 200, prob_y + (i+1) * 25),
                       cv2.FONT_HERSHEY_SIMPLEX, 0.5, prob_color, 1)
    
    def run(self):
        """실시간 추론 + 추적 실행"""
        # RealSense 파이프라인 설정
        pipeline = rs.pipeline()
        config = rs.config()
        
        config.enable_stream(rs.stream.depth, 640, 480, rs.format.z16, 30)
        config.enable_stream(rs.stream.color, 640, 480, rs.format.bgr8, 30)
        
        # 스트리밍 시작
        try:
            pipeline.start(config)
        except RuntimeError as e:
            print("=" * 60)
            print("ERROR: RealSense 카메라를 찾을 수 없습니다!")
            print("=" * 60)
            print("\n다음 사항을 확인해주세요:")
            print("  1. RealSense 카메라가 USB 포트에 연결되어 있는지 확인")
            print("  2. 카메라의 LED가 켜져 있는지 확인")
            print("  3. 다른 프로그램에서 카메라를 사용 중인지 확인")
            print("\n원본 에러 메시지:", str(e))
            print("=" * 60)
            return
        
        # 윈도우 생성 및 마우스 콜백
        cv2.namedWindow('RealSense Tracking')
        cv2.setMouseCallback('RealSense Tracking', self.mouse_callback)
        
        print("\n" + "=" * 60)
        print("실시간 추론 + 추적 시작")
        print("=" * 60)
        print("사용법:")
        print("  1. 마우스로 드래그하여 ROI 선택")
        print("  2. 자동으로 추론 및 추적 시작")
        print("  3. [T] 키로 추적 시작/중지")
        print("  4. [C] 키로 ROI 초기화")
        print("  5. [R] 키로 재분류 (추적 중)")
        print("  6. [Q] 키로 종료")
        print("=" * 60 + "\n")
        
        try:
            while True:
                # 프레임 대기
                frames = pipeline.wait_for_frames()
                depth_frame = frames.get_depth_frame()
                color_frame = frames.get_color_frame()
                if not depth_frame or not color_frame:
                    continue
                
                # 이미지 변환
                depth_image = np.asanyarray(depth_frame.get_data())
                color_image = np.asanyarray(color_frame.get_data())
                
                # 표시용 이미지
                display_image = color_image.copy()
                
                # 추적 모드
                if self.tracking and self.template is not None:
                    # 트래커 업데이트
                    success, bbox = self.update_tracker(color_image)
                    
                    if success:
                        # 추적 성공 - 결과 표시
                        self.draw_results(display_image, bbox, 
                                        self.tracked_class, 
                                        self.tracked_confidence,
                                        self.tracked_probabilities,
                                        is_tracking=True)
                    else:
                        # 추적 실패
                        cv2.putText(display_image, "Tracking Lost! Press [C] to reset", 
                                   (10, 100), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)
                        self.tracking = False
                        self.template = None
                
                # ROI 선택 모드
                elif self.roi_start is not None and self.roi_end is not None:
                    x1 = max(0, min(self.roi_start[0], self.roi_end[0]))
                    y1 = max(0, min(self.roi_start[1], self.roi_end[1]))
                    x2 = min(639, max(self.roi_start[0], self.roi_end[0]))
                    y2 = min(479, max(self.roi_start[1], self.roi_end[1]))
                    
                    roi_coords = (x1, y1, x2, y2)
                    
                    # 그리는 중
                    if self.is_drawing:
                        cv2.rectangle(display_image, (x1, y1), (x2, y2), (0, 255, 255), 2)
                    
                    # 선택 완료 시 추론
                    elif self.roi_selected and x2 > x1 + 10 and y2 > y1 + 10:
                        # ROI 전처리
                        roi_tensor = self.preprocess_roi(color_image, roi_coords)
                        
                        # 추론
                        predicted_class, confidence, probabilities = self.predict(roi_tensor)
                        
                        # 결과 저장 (추적용)
                        self.tracked_class = predicted_class
                        self.tracked_confidence = confidence
                        self.tracked_probabilities = probabilities
                        
                        # bbox 형식으로 변환 (x, y, w, h)
                        bbox = (x1, y1, x2 - x1, y2 - y1)
                        
                        # 결과 시각화
                        self.draw_results(display_image, bbox,
                                        predicted_class, confidence, probabilities,
                                        is_tracking=False)
                
                # UI 안내
                status_y = 30
                if self.tracking:
                    cv2.putText(display_image, "MODE: TRACKING", (10, status_y),
                               cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 0, 0), 2)
                else:
                    cv2.putText(display_image, "MODE: DETECTION", (10, status_y),
                               cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
                
                cv2.putText(display_image, "[T] Track | [C] Clear | [R] Reclassify | [Q] Quit", 
                           (10, status_y + 30),
                           cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)
                
                # 화면 표시
                window_name = 'RealSense Tracking'

                # 윈도우 닫기 버튼(X) 클릭 감지
                if cv2.getWindowProperty(window_name, cv2.WND_PROP_VISIBLE) < 1:
                    break
                
                # imshow() 호출 전에 닫힘을 먼저 확인하면, “닫자마자 imshow가 창을 재생성”하는 상황을 피할 수 있어요.
                cv2.imshow(window_name, display_image)

                # 키 입력
                key = cv2.waitKey(1) & 0xFF

                if key == ord('q'):
                    break
                
                elif key == ord('c'):
                    # 초기화
                    self.roi_start = None
                    self.roi_end = None
                    self.is_drawing = False
                    self.roi_selected = False
                    self.tracking = False
                    self.template = None
                    print("✓ 초기화 완료")
                    
                elif key == ord('t'):
                    # 추적 시작/중지
                    if not self.tracking and self.roi_selected:
                        # 추적 시작
                        x1 = min(self.roi_start[0], self.roi_end[0])
                        y1 = min(self.roi_start[1], self.roi_end[1])
                        x2 = max(self.roi_start[0], self.roi_end[0])
                        y2 = max(self.roi_start[1], self.roi_end[1])
                        
                        bbox = (x1, y1, x2 - x1, y2 - y1)
                        self.init_tracker(color_image, bbox)
                        print(f"✓ 추적 시작: {self.class_names[self.tracked_class]}")
                    else:
                        # 추적 중지
                        self.tracking = False
                        self.template = None
                        print("✓ 추적 중지")
                        
                elif key == ord('r'):
                    # 재분류 (추적 중일 때)
                    if self.tracking and self.template is not None:
                        success, bbox = self.update_tracker(color_image)
                        if success:
                            x, y, w, h = [int(v) for v in bbox]
                            roi_coords = (x, y, x + w, y + h)
                            
                            # 재분류
                            roi_tensor = self.preprocess_roi(color_image, roi_coords)
                            predicted_class, confidence, probabilities = self.predict(roi_tensor)
                            
                            # 결과 업데이트
                            self.tracked_class = predicted_class
                            self.tracked_confidence = confidence
                            self.tracked_probabilities = probabilities
                            print(f"✓ 재분류: {self.class_names[predicted_class]} ({confidence*100:.1f}%)")
        
        finally:
            pipeline.stop()
            cv2.destroyAllWindows()
            print("\n프로그램 종료")


def tracking_main():
    # 모델 경로
    model_path = "models/best_model.pth"
    
    if not os.path.exists(model_path):
        print(f"ERROR: 모델 파일을 찾을 수 없습니다: {model_path}")
        print("먼저 2_2_train.py를 실행하여 모델을 학습하세요.")
        return
    
    # 추론 + 추적 실행
    tracker = RealtimeInferenceWithTracking(model_path)
    tracker.run()


if "RUN_TRACKING" in globals() and RUN_TRACKING:
    tracking_main()
else:
    print("Skip tracking (set NB_RUN_TRACKING=1 to enable).")