
# Thumbs Up/Down 분류 파이프라인 (캡처 ↔ 학습 ↔ 실시간 분류)
요구 반영:
- **카메라 창 크게** 띄우기 (`cv2.WINDOW_NORMAL + resizeWindow(1280x720)`)
- **데이터 촬영**과 **모델 분류(실시간)** **완전히 분리**
- PyTorch `resnet18` 전이학습 기반 (원본 코드 구조/하이퍼파라미터 최대한 유지)



## 0) 설정
- 데이터는 `data_dir/라벨/파일.jpg` 형태로 저장 (`THUMBS_UP`, `THUMBS_DOWN`)
- 모델 가중치는 `./models/` 아래에 저장
- Jetson Orin Nano에서도 동작하도록 카메라 초기화 루틴(재시도) 포함


In [1]:

import os, time, cv2, torch, numpy as np
from pathlib import Path
from datetime import datetime

# Dataset root (one sub-folder per label) and checkpoint output folder.
data_dir = Path("./thumbs_data")
model_dir = Path("./models")
model_dir.mkdir(parents=True, exist_ok=True)

# Capture-key index -> label name; CLASS_DIRS lists the names in key order.
CLASS_NAMES = {0: "THUMBS_UP", 1: "THUMBS_DOWN"}
CLASS_DIRS = [name for _, name in sorted(CLASS_NAMES.items())]  # ["THUMBS_UP", "THUMBS_DOWN"]
for c in CLASS_DIRS:
    data_dir.joinpath(c).mkdir(parents=True, exist_ok=True)

# Preview window title and the capture format requested from the camera.
WINDOW_NAME = "Camera"
REQ_W = 1280
REQ_H = 720
REQ_FPS = 30

# Run on the GPU when CUDA is available, otherwise on the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print("device:", device)


device: cpu



## 1) 카메라 초기화 (재시도 포함)
여러 백엔드/포맷 조합으로 시도 → 안정적으로 프레임이 읽히는 설정을 자동 선택합니다.


In [2]:

def setup_camera_with_retry():
    """Open camera 0, trying several backend / pixel-format combinations.

    Each candidate is opened, configured with the requested resolution, frame
    rate and buffer size, and then probed with five test reads. The first
    candidate that yields at least three valid frames is returned. Every
    candidate that fails (cannot open, unstable frames, or raises) has its
    capture handle released before the next one is tried.

    Returns:
        tuple: ``(cap, name)`` where ``cap`` is the opened ``cv2.VideoCapture``
        and ``name`` is the label of the configuration that worked, or
        ``(None, None)`` when every candidate failed. On success the caller
        owns ``cap`` and is responsible for releasing it.
    """
    # Settings shared by every candidate; V4L2 candidates add a pixel format.
    base_settings = {
        'width': REQ_W,
        'height': REQ_H,
        'fps': REQ_FPS,
        'buffersize': 1,
    }
    methods = [
        {
            'name': 'V4L2_YUYV',
            'backend': cv2.CAP_V4L2,
            'settings': {
                'fourcc': cv2.VideoWriter_fourcc('Y', 'U', 'Y', 'V'),
                **base_settings,
            },
        },
        {
            'name': 'V4L2_MJPEG',
            'backend': cv2.CAP_V4L2,
            'settings': {
                'fourcc': cv2.VideoWriter_fourcc('M', 'J', 'P', 'G'),
                **base_settings,
            },
        },
        {
            'name': 'DEFAULT',
            'backend': None,
            'settings': dict(base_settings),
        },
    ]
    for method in methods:
        name = method['name']
        print(f"Attempting {name}...")
        cap = None
        try:
            if method['backend'] is None:
                cap = cv2.VideoCapture(0)
            else:
                cap = cv2.VideoCapture(0, method['backend'])
            if not cap.isOpened():
                print(f"Failed to open camera with {name}")
            else:
                st = method['settings']
                if 'fourcc' in st:
                    cap.set(cv2.CAP_PROP_FOURCC, st['fourcc'])
                cap.set(cv2.CAP_PROP_FRAME_WIDTH, st['width'])
                cap.set(cv2.CAP_PROP_FRAME_HEIGHT, st['height'])
                cap.set(cv2.CAP_PROP_FPS, st['fps'])
                cap.set(cv2.CAP_PROP_BUFFERSIZE, st['buffersize'])
                # Pause before probing (presumably lets the driver settle).
                time.sleep(1.0)
                ok_count = 0
                for _ in range(5):
                    ret, frm = cap.read()
                    if ret and frm is not None:
                        ok_count += 1
                    time.sleep(0.05)
                if ok_count >= 3:
                    print(f"Camera initialized successfully with {name}")
                    return cap, name
                print(f"Frame unstable with {name}")
        except Exception as e:
            # Best effort: report and fall through to the next candidate.
            print(f"Error with {name}: {e}")
        # Only reached when this attempt failed: free the device so the next
        # candidate (or a later cell) can open it.
        if cap is not None:
            cap.release()
    return None, None



## 2) 데이터 촬영 전용 셀
- 창을 **크게** 띄우고, 라벨은 키보드로 선택 후 `s`를 눌러 저장
- `0`: THUMBS_UP, `1`: THUMBS_DOWN, `q` 또는 `ESC`: 종료


In [3]:

import cv2
from collections import deque

# Data-capture cell: show a mirrored live preview, pick the label with the
# digit keys, press `s` to save the current frame, `q`/ESC to quit.
label_idx = 0  # currently selected key of CLASS_NAMES (0:UP, 1:DOWN)
# Key code -> label index for every single-digit entry of CLASS_NAMES.
label_keys = {ord(str(i)): i for i in CLASS_NAMES if len(str(i)) == 1}

cap, method = setup_camera_with_retry()
if cap is None:
    raise RuntimeError("Camera initialization failed. Try reloading kernel or checking permissions.")

saved = 0
try:
    # Window setup lives inside the try so the camera is released even if the
    # GUI backend fails.
    cv2.namedWindow(WINDOW_NAME, cv2.WINDOW_NORMAL)
    cv2.resizeWindow(WINDOW_NAME, REQ_W, REQ_H)

    print("데이터 촬영 시작: 0/1 라벨 선택, s=저장, q=종료")
    while True:
        ret, frame = cap.read()
        if not ret or frame is None:
            print("Frame read failed."); time.sleep(0.1); continue
        frame = cv2.flip(frame, 1)  # mirror so the preview behaves like a mirror

        # Draw the status text on a copy: the saved image must stay clean,
        # otherwise the label name is burned into the training data and the
        # classifier can simply read the text.
        preview = frame.copy()
        msg = f"Label[{label_idx}] {CLASS_NAMES[label_idx]} | s=save | q=quit | method={method}"
        cv2.putText(preview, msg, (20, 40), cv2.FONT_HERSHEY_SIMPLEX, 1.0, (0,255,0), 2, cv2.LINE_AA)
        cv2.imshow(WINDOW_NAME, preview)

        k = cv2.waitKey(1) & 0xFF
        if k == ord('q') or k == 27:  # 27 == ESC
            break
        if k in label_keys:
            label_idx = label_keys[k]
        if k == ord('s'):
            # Millisecond-resolution timestamp as the file name.
            ts = datetime.now().strftime("%Y%m%d_%H%M%S_%f")[:-3]
            out_path = data_dir / CLASS_NAMES[label_idx] / f"{ts}.jpg"
            # imwrite reports failure through its return value, not an exception.
            if cv2.imwrite(str(out_path), frame):
                saved += 1
                print(f"Saved: {out_path} (total {saved})")
            else:
                print(f"Save failed: {out_path}")
finally:
    cap.release()
    cv2.destroyAllWindows()
    print("촬영 종료")


Attempting V4L2_YUYV...
Failed to open camera with V4L2_YUYV
Attempting V4L2_MJPEG...
Failed to open camera with V4L2_MJPEG
Attempting DEFAULT...
Camera initialized successfully with DEFAULT
데이터 촬영 시작: 0/1 라벨 선택, s=저장, q=종료
Saved: thumbs_data\THUMBS_UP\20250816_195324_346.jpg (total 1)
Saved: thumbs_data\THUMBS_UP\20250816_195324_922.jpg (total 2)
Saved: thumbs_data\THUMBS_UP\20250816_195325_291.jpg (total 3)
Saved: thumbs_data\THUMBS_UP\20250816_195325_515.jpg (total 4)
Saved: thumbs_data\THUMBS_UP\20250816_195325_722.jpg (total 5)
Saved: thumbs_data\THUMBS_UP\20250816_195325_962.jpg (total 6)
Saved: thumbs_data\THUMBS_UP\20250816_195326_155.jpg (total 7)
Saved: thumbs_data\THUMBS_UP\20250816_195326_362.jpg (total 8)
Saved: thumbs_data\THUMBS_UP\20250816_195326_555.jpg (total 9)
Saved: thumbs_data\THUMBS_UP\20250816_195326_794.jpg (total 10)
Saved: thumbs_data\THUMBS_UP\20250816_195326_987.jpg (total 11)
Saved: thumbs_data\THUMBS_UP\20250816_195327_195.jpg (total 12)
Saved: thumbs_dat


## 3) 모델 학습 (ResNet18 전이학습)
- `torchvision.models.resnet18(weights=ResNet18_Weights.DEFAULT)` (ImageNet 사전학습 가중치) 사용
- FC를 `num_classes=2`로 교체
- `ImageFolder`로 `thumbs_data/THUMBS_UP`, `thumbs_data/THUMBS_DOWN`에서 학습/검증 분리


In [4]:

import torch, torch.nn as nn, torch.optim as optim
from torchvision import datasets, transforms, models
from torch.utils.data import random_split, DataLoader

# Hyper-parameters.
num_classes = 2
epochs = 5
batch_size = 16
lr = 1e-4

# Transforms: training adds a random horizontal flip, validation does not.
# Both resize to 224x224 and normalize with the given mean/std.
train_tf = transforms.Compose([
    transforms.Resize((224,224)),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485,0.456,0.406], std=[0.229,0.224,0.225])
])
val_tf = transforms.Compose([
    transforms.Resize((224,224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485,0.456,0.406], std=[0.229,0.224,0.225])
])

# Datasets: two views of the same folder, one per transform. Both subsets
# returned by random_split share a single underlying dataset object, so
# overwriting `val_ds.dataset.transform` would silently strip the augmentation
# from the training subset as well.
full_ds = datasets.ImageFolder(str(data_dir), transform=train_tf)
val_view = datasets.ImageFolder(str(data_dir), transform=val_tf)
if len(full_ds.classes) != 2 or set(full_ds.classes) != set(CLASS_DIRS):
    print("경고: 폴더 클래스가 기대와 다릅니다:", full_ds.classes, "기대:", CLASS_DIRS)
if len(full_ds) < 2:
    raise ValueError(
        f"Need at least 2 images to build a train/val split, found {len(full_ds)}."
    )

val_ratio = 0.2
val_len = max(1, int(len(full_ds)*val_ratio))
train_len = len(full_ds) - val_len
train_ds, val_split = random_split(full_ds, [train_len, val_len])
# Validation uses the same held-out indices but reads them through the
# augmentation-free view.
val_ds = torch.utils.data.Subset(val_view, val_split.indices)

# Pinned host memory only helps host->GPU copies; skip it on CPU-only runs.
use_pin = device.type == "cuda"
train_loader = DataLoader(train_ds, batch_size=batch_size, shuffle=True, num_workers=2, pin_memory=use_pin)
val_loader = DataLoader(val_ds, batch_size=batch_size, shuffle=False, num_workers=2, pin_memory=use_pin)

# Model: pretrained ResNet18 with the final FC layer replaced by a fresh
# `num_classes`-way head.
resnet = models.resnet18(weights=models.ResNet18_Weights.DEFAULT)
resnet.fc = nn.Linear(resnet.fc.in_features, num_classes)
resnet = resnet.to(device)

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(resnet.parameters(), lr=lr)

def evaluate(model, loader):
    """Compute the mean loss and accuracy of ``model`` over ``loader``.

    The model is switched to eval mode and gradients are disabled. Uses the
    module-level ``device`` and ``criterion``.

    Args:
        model: network to evaluate; called as ``model(x)``.
        loader: iterable of ``(inputs, targets)`` batches.

    Returns:
        tuple: ``(mean_loss, accuracy_percent)``; ``(0.0, 0.0)`` when the
        loader yields no samples.
    """
    model.eval()
    n_samples = 0
    n_correct = 0
    weighted_loss = 0.0
    with torch.no_grad():
        for inputs, targets in loader:
            inputs = inputs.to(device)
            targets = targets.to(device)
            logits = model(inputs)
            batch_loss = criterion(logits, targets)
            # Weight each batch loss by its batch size.
            weighted_loss += batch_loss.item() * inputs.size(0)
            hits = logits.argmax(dim=1) == targets
            n_correct += hits.sum().item()
            n_samples += targets.size(0)
    if n_samples > 0:
        return weighted_loss / n_samples, n_correct / n_samples * 100.0
    return 0.0, 0.0

# Fine-tune for `epochs` epochs; after each epoch evaluate on the validation
# loader and write a checkpoint whenever validation accuracy strictly improves.
best_acc = 0.0; last_train_loss = None
for ep in range(1, epochs+1):
    resnet.train()  # evaluate() leaves the model in eval mode
    epoch_loss = 0.0; seen = 0
    for x, y in train_loader:
        x, y = x.to(device), y.to(device)
        optimizer.zero_grad()
        out = resnet(x)
        loss = criterion(out, y)
        loss.backward()
        optimizer.step()
        # Weight each batch loss by its batch size to get a per-sample mean.
        epoch_loss += loss.item()*x.size(0)
        seen += x.size(0)
    last_train_loss = epoch_loss/seen if seen>0 else None
    val_loss, val_acc = evaluate(resnet, val_loader)
    # `None` cannot be formatted with ':.4f'; print a placeholder when the
    # training loader produced no samples.
    train_loss_txt = f"{last_train_loss:.4f}" if last_train_loss is not None else "n/a"
    print(f"[{ep}/{epochs}] train_loss={train_loss_txt} | val_loss={val_loss:.4f} | val_acc={val_acc:.1f}%")
    if val_acc > best_acc:
        best_acc = val_acc
        stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        best_path = model_dir / f"thumbs_resnet18_best_{stamp}.pth"
        # Store the class list next to the weights so inference can map the
        # output index back to the label used at training time.
        torch.save({
            "state_dict": resnet.state_dict(),
            "classes": full_ds.classes,
            "val_acc": best_acc,
            "timestamp": stamp
        }, best_path)
        print("✅ Best model saved:", best_path)




[1/5] train_loss=0.1963 | val_loss=0.0059 | val_acc=100.0%
✅ Best model saved: models\thumbs_resnet18_best_20250816_195435.pth
[2/5] train_loss=0.0071 | val_loss=0.0014 | val_acc=100.0%
[3/5] train_loss=0.0022 | val_loss=0.0004 | val_acc=100.0%
[4/5] train_loss=0.0010 | val_loss=0.0004 | val_acc=100.0%
[5/5] train_loss=0.0014 | val_loss=0.0004 | val_acc=100.0%



## 4) 실시간 분류 (학습된 모델 사용)
- 저장된 best 모델 경로를 지정하거나, 폴더에서 가장 최근 파일을 자동 선택
- 카메라 창은 **크게**
- `q` 또는 `ESC`로 종료


In [5]:

import glob, os, torch, cv2, numpy as np
from torchvision import models, transforms
import torch.nn as nn

# Pick the most recent best checkpoint: the file names embed a
# YYYYmmdd_HHMMSS stamp, so lexicographic order is chronological order.
candidates = sorted(glob.glob(str(model_dir / "thumbs_resnet18_best_*.pth")))
if not candidates:
    # Explicit raise instead of `assert`, which is stripped under `python -O`.
    raise FileNotFoundError("저장된 모델이 없습니다. 먼저 학습 셀을 실행하세요.")
model_path = candidates[-1]
print("로드 모델:", model_path)

ckpt = torch.load(model_path, map_location=device)
# Class names in output-index order. The training cell stores
# `ImageFolder.classes`, which is alphabetically sorted, so the fallback must
# be sorted too (CLASS_DIRS itself is in capture-key order).
classes = ckpt.get("classes", sorted(CLASS_DIRS))

# Every weight comes from the checkpoint, so no pretrained download is needed.
resnet = models.resnet18(weights=None)
resnet.fc = nn.Linear(resnet.fc.in_features, len(classes))
# Strict loading (the default) fails loudly on a key mismatch rather than
# silently running with partially initialised weights.
resnet.load_state_dict(ckpt["state_dict"])
resnet = resnet.to(device).eval()
print("클래스:", classes)

# Same preprocessing as the validation transform, preceded by an
# ndarray -> PIL conversion for camera frames.
infer_tf = transforms.Compose([
    transforms.ToPILImage(),
    transforms.Resize((224,224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485,0.456,0.406], std=[0.229,0.224,0.225])
])

# Live classification: grab frames, classify each one, overlay the predicted
# label, confidence and FPS, and quit on `q`/ESC.
cap, method = setup_camera_with_retry()
if cap is None:
    raise RuntimeError("Camera init failed.")

fps, t0, cnt = 0.0, time.time(), 0
try:
    # Window setup lives inside the try so the camera is released even if the
    # GUI backend fails.
    cv2.namedWindow(WINDOW_NAME, cv2.WINDOW_NORMAL)
    cv2.resizeWindow(WINDOW_NAME, REQ_W, REQ_H)

    while True:
        ret, frame = cap.read()
        if not ret or frame is None:
            time.sleep(0.05); continue
        frame = cv2.flip(frame, 1)  # mirror, matching the capture cell

        # OpenCV delivers BGR frames, while the training images were loaded
        # through ImageFolder as RGB; convert so the channel order fed to the
        # network matches training.
        rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        x = infer_tf(rgb).unsqueeze(0).to(device)
        with torch.no_grad():
            out = resnet(x)
            prob = torch.softmax(out, dim=1)[0].detach().cpu().numpy()
        idx = int(np.argmax(prob)); conf = float(prob[idx])
        label = classes[idx] if idx<len(classes) else str(idx)

        # Refresh the FPS estimate once every 20 processed frames.
        cnt += 1
        if cnt % 20 == 0:
            now = time.time(); fps = 20.0/(now - t0); t0 = now

        cv2.putText(frame, f"{label} {conf:.2f} | {fps:.1f} FPS", (20, 40),
                    cv2.FONT_HERSHEY_SIMPLEX, 1.1, (0,255,255), 2, cv2.LINE_AA)
        cv2.imshow(WINDOW_NAME, frame)
        k = cv2.waitKey(1) & 0xFF
        if k == ord('q') or k == 27:  # 27 == ESC
            break
finally:
    cap.release()
    cv2.destroyAllWindows()
    print("종료")


로드 모델: models\thumbs_resnet18_best_20250816_195435.pth
클래스: ['THUMBS_DOWN', 'THUMBS_UP']
Attempting V4L2_YUYV...
Failed to open camera with V4L2_YUYV
Attempting V4L2_MJPEG...
Failed to open camera with V4L2_MJPEG
Attempting DEFAULT...
Camera initialized successfully with DEFAULT
종료
