<a href="https://colab.research.google.com/github/Subibub/ML/blob/main/Untitled2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [12]:
import os
import json
import random
import time
import numpy as np
import matplotlib.pyplot as plt
from PIL import Image, ImageDraw
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
from torchvision.models.detection import ssdlite320_mobilenet_v3_large
from torchvision.models.detection.ssd import SSDClassificationHead
from torchvision import transforms
from torch.utils.data import Dataset, DataLoader
from pycocotools.coco import COCO
from pycocotools.cocoeval import COCOeval
import glob
import cv2
import xml.etree.ElementTree as ET
from xml.dom import minidom
import shutil
from tqdm import tqdm
import tensorflow as tf

In [2]:
# 랜덤 시드 설정
def set_seed(seed=42):
    """Seed every random number generator used in this notebook.

    Seeds Python's ``random``, NumPy, PyTorch (CPU) and all CUDA devices with
    the same value, then switches cuDNN to deterministic mode and turns off its
    benchmark mode.

    Args:
        seed: Integer seed passed to each generator. Defaults to 42.
    """
    # Same seeding order as always: stdlib, NumPy, torch CPU, torch CUDA.
    seeders = (
        random.seed,
        np.random.seed,
        torch.manual_seed,
        torch.cuda.manual_seed_all,
    )
    for seeder in seeders:
        seeder(seed)

    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False


set_seed()

In [3]:
# Mount Google Drive at /content/drive (Colab only).
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
# Dataset paths (adjust to your own environment)
DATASET_PATH = '/content/drive/MyDrive/DL'  # change this to the actual dataset location
ANNOTATIONS_FILE = os.path.join(DATASET_PATH, 'annotations.json')  # path to the JSON label file
IMAGES_DIR = os.path.join(DATASET_PATH, 'images')  # path to the image folder


In [13]:
# Settings for the YOLO -> SSD dataset conversion
YOLO_DATASET_PATH = '/content/drive/MyDrive/DL/complete_dataset'  # YOLO dataset root
SSD_OUTPUT_PATH = '/content/drive/MyDrive/DL/ssd_dataset'     # where the converted SSD dataset is written
BATCH_SIZE = 500  # number of images handled per processing batch


In [14]:
# Create the output directories for converted images and annotations
# (exist_ok=True makes re-running this cell harmless).
os.makedirs(os.path.join(SSD_OUTPUT_PATH, 'images'), exist_ok=True)
os.makedirs(os.path.join(SSD_OUTPUT_PATH, 'annotations'), exist_ok=True)



In [15]:
# YOLO 포맷에서 bounding box 정보 추출 함수
def convert_yolo_to_ssd_format(yolo_file_path, image_width, image_height, class_names):
    """Convert a YOLO label file into absolute-pixel corner boxes.

    Each non-blank line of the label file is expected to hold
    ``class_id x_center y_center width height`` with the four geometry values
    normalised to the image size. Boxes are clamped to the image bounds.

    Args:
        yolo_file_path: Path to the YOLO ``.txt`` label file.
        image_width: Image width in pixels.
        image_height: Image height in pixels.
        class_names: Sequence mapping a YOLO class id to its name.

    Returns:
        A list of ``(xmin, ymin, xmax, ymax, class_name)`` tuples. Boxes that
        collapse to zero width or height after clamping are omitted.

    Raises:
        ValueError: If a non-blank line has fewer than five fields or a field
            is not numeric.
        IndexError: If a class id is outside ``0 .. len(class_names) - 1``.
    """
    boxes = []

    with open(yolo_file_path, 'r', encoding='utf-8') as f:
        for line_no, line in enumerate(f, start=1):
            parts = line.split()
            if not parts:
                # Blank lines (commonly a trailing newline) carry no annotation.
                continue
            if len(parts) < 5:
                raise ValueError(
                    f"{yolo_file_path}:{line_no}: expected 5 fields, got {len(parts)}"
                )

            class_id = int(parts[0])
            # Reject negative ids explicitly: Python indexing would otherwise
            # wrap around and silently pick a class from the end of the list.
            if not 0 <= class_id < len(class_names):
                raise IndexError(
                    f"{yolo_file_path}:{line_no}: class id {class_id} is out of range"
                )

            x_center = float(parts[1]) * image_width
            y_center = float(parts[2]) * image_height
            width = float(parts[3]) * image_width
            height = float(parts[4]) * image_height

            # Corner coordinates, truncated to whole pixels and clamped to the image.
            xmin = max(0, int(x_center - width / 2))
            ymin = max(0, int(y_center - height / 2))
            xmax = min(image_width, int(x_center + width / 2))
            ymax = min(image_height, int(y_center + height / 2))

            if xmax <= xmin or ymax <= ymin:
                # A zero-area box is not a usable training target.
                continue

            boxes.append((xmin, ymin, xmax, ymax, class_names[class_id]))

    return boxes


In [16]:
# Pascal VOC XML 파일 생성 함수
def create_pascal_voc_xml(image_path, boxes, image_width, image_height, output_path):
    """Write a Pascal VOC style XML annotation file for one image.

    Args:
        image_path: Path of the annotated image; its basename is stored in
            ``<filename>`` and the full value in ``<path>``.
        boxes: Iterable of ``(xmin, ymin, xmax, ymax, class_name)`` tuples.
        image_width: Image width stored in ``<size>/<width>``.
        image_height: Image height stored in ``<size>/<height>``.
        output_path: Destination path of the XML file.
    """

    def _add(parent, tag, text=None):
        # Append a child element and optionally set its text content.
        node = ET.SubElement(parent, tag)
        if text is not None:
            node.text = str(text)
        return node

    root = ET.Element("annotation")
    _add(root, "folder", "images")
    _add(root, "filename", os.path.basename(image_path))
    _add(root, "path", image_path)

    source = _add(root, "source")
    _add(source, "database", "Unknown")

    size = _add(root, "size")
    _add(size, "width", image_width)
    _add(size, "height", image_height)
    _add(size, "depth", "3")  # depth is always written as 3

    _add(root, "segmented", "0")

    for xmin, ymin, xmax, ymax, class_name in boxes:
        object_elem = _add(root, "object")
        _add(object_elem, "name", class_name)
        _add(object_elem, "pose", "Unspecified")
        _add(object_elem, "truncated", "0")
        _add(object_elem, "difficult", "0")

        bndbox = _add(object_elem, "bndbox")
        _add(bndbox, "xmin", xmin)
        _add(bndbox, "ymin", ymin)
        _add(bndbox, "xmax", xmax)
        _add(bndbox, "ymax", ymax)

    # Pretty-print and save. The generated declaration names no encoding, so
    # readers assume UTF-8; write UTF-8 explicitly so non-ASCII class names do
    # not depend on the platform default encoding.
    xml_str = minidom.parseString(ET.tostring(root)).toprettyxml(indent="  ")
    with open(output_path, "w", encoding="utf-8") as f:
        f.write(xml_str)


In [17]:
# TFRecord 파일 생성 함수
def create_tf_example(image_path, xml_path):
    """Build a ``tf.train.Example`` from an image file and its VOC XML annotation.

    The image bytes are stored as-is under ``image/encoded``. Box coordinates
    read from the XML are divided by the image width/height recorded in the
    XML's ``<size>`` element.

    Class ids are looked up in the module-level ``class_names`` list and stored
    1-based, matching the ``id: i+1`` numbering that ``create_label_map`` writes
    to the label map (id 0 is reserved for background by the TF Object
    Detection API).

    Args:
        image_path: Path to the image file to embed.
        xml_path: Path to the Pascal VOC XML annotation for that image.

    Returns:
        The populated ``tf.train.Example``.

    Raises:
        ValueError: If an object's class name is not in ``class_names``.
    """

    def _bytes_feature(values):
        return tf.train.Feature(bytes_list=tf.train.BytesList(value=values))

    def _float_feature(values):
        return tf.train.Feature(float_list=tf.train.FloatList(value=values))

    def _int64_feature(values):
        return tf.train.Feature(int64_list=tf.train.Int64List(value=values))

    with tf.io.gfile.GFile(image_path, 'rb') as fid:
        encoded_image = fid.read()

    # Parse the annotation information from the XML file.
    root = ET.parse(xml_path).getroot()

    size = root.find('size')
    width = int(size.find('width').text)
    height = int(size.find('height').text)

    xmins = []
    ymins = []
    xmaxs = []
    ymaxs = []
    classes_text = []
    classes = []

    for obj in root.findall('object'):
        class_name = obj.find('name').text
        # +1 keeps the record labels aligned with the 1-based label map ids.
        class_id = class_names.index(class_name) + 1

        bbox = obj.find('bndbox')
        xmins.append(float(bbox.find('xmin').text) / width)
        ymins.append(float(bbox.find('ymin').text) / height)
        xmaxs.append(float(bbox.find('xmax').text) / width)
        ymaxs.append(float(bbox.find('ymax').text) / height)
        classes_text.append(class_name.encode('utf8'))
        classes.append(class_id)

    filename = os.path.basename(image_path).encode('utf8')

    tf_example = tf.train.Example(features=tf.train.Features(feature={
        'image/height': _int64_feature([height]),
        'image/width': _int64_feature([width]),
        'image/filename': _bytes_feature([filename]),
        'image/source_id': _bytes_feature([filename]),
        'image/encoded': _bytes_feature([encoded_image]),
        # The format is always recorded as JPEG, whatever the file actually is.
        'image/format': _bytes_feature(['jpeg'.encode('utf8')]),
        'image/object/bbox/xmin': _float_feature(xmins),
        'image/object/bbox/xmax': _float_feature(xmaxs),
        'image/object/bbox/ymin': _float_feature(ymins),
        'image/object/bbox/ymax': _float_feature(ymaxs),
        'image/object/class/text': _bytes_feature(classes_text),
        'image/object/class/label': _int64_feature(classes),
    }))

    return tf_example


In [18]:
# 클래스 이름 로드 (classes.txt 파일에서 로드한다고 가정)
# Class names are read one per line; the line index is the YOLO class id.
class_names = []
with open(os.path.join(YOLO_DATASET_PATH, 'classes.txt'), 'r', encoding='utf-8') as f:
    class_names = [line.strip() for line in f]

# Trailing blank lines would otherwise be counted as extra, empty-named classes.
# Only the tail is trimmed so existing class ids keep their positions.
while class_names and not class_names[-1]:
    class_names.pop()

# 라벨맵 파일 생성
def create_label_map(class_names, output_path):
    """Write a label map file in ``item { id / name }`` text format.

    Ids are assigned in list order starting at 1.

    Args:
        class_names: Sequence of class names.
        output_path: Destination path of the label map file.
    """
    # Explicit UTF-8 so non-ASCII class names do not depend on the platform
    # default encoding.
    with open(output_path, 'w', encoding='utf-8') as f:
        for label_id, name in enumerate(class_names, start=1):
            f.write('item {\n')
            f.write(f'  id: {label_id}\n')
            f.write(f'  name: "{name}"\n')
            f.write('}\n')

# Write the label map into the SSD output directory
create_label_map(class_names, os.path.join(SSD_OUTPUT_PATH, 'label_map.pbtxt'))


FileNotFoundError: [Errno 2] No such file or directory: '/content/drive/MyDrive/DL/complete_dataset/classes.txt'

In [19]:
# 데이터셋 분할 처리
def process_dataset_in_batches():
    """Convert the whole YOLO dataset into train/val TFRecord files.

    Collects every ``.jpg`` / ``.jpeg`` / ``.png`` file under
    ``YOLO_DATASET_PATH/images``, shuffles them, splits 80:20 into train and
    validation sets, and feeds each set to ``process_batch`` in chunks of
    ``BATCH_SIZE``. Records are written to ``train.record`` and ``val.record``
    under ``SSD_OUTPUT_PATH``.
    """
    # Gather all image file paths.
    images_dir = os.path.join(YOLO_DATASET_PATH, 'images')
    image_paths = []
    for pattern in ('*.jpg', '*.jpeg', '*.png'):
        image_paths.extend(glob.glob(os.path.join(images_dir, pattern)))

    # glob order depends on the filesystem; sort first so the shuffle below
    # gives the same split for the same NumPy seed.
    image_paths.sort()

    total_images = len(image_paths)
    print(f"총 {total_images}개의 이미지를 처리합니다.")

    # Train/validation split (80:20).
    np.random.shuffle(image_paths)
    split_idx = int(total_images * 0.8)
    train_images = image_paths[:split_idx]
    val_images = image_paths[split_idx:]

    # Context managers make sure both writers are closed even if a batch raises.
    with tf.io.TFRecordWriter(os.path.join(SSD_OUTPUT_PATH, 'train.record')) as train_writer, \
            tf.io.TFRecordWriter(os.path.join(SSD_OUTPUT_PATH, 'val.record')) as val_writer:
        # Training data.
        for batch_start in range(0, len(train_images), BATCH_SIZE):
            batch_paths = train_images[batch_start:batch_start + BATCH_SIZE]
            process_batch(batch_paths, train_writer, is_training=True)

        # Validation data.
        for batch_start in range(0, len(val_images), BATCH_SIZE):
            batch_paths = val_images[batch_start:batch_start + BATCH_SIZE]
            process_batch(batch_paths, val_writer, is_training=False)


In [21]:
# 배치 단위 처리 함수
def process_batch(image_paths, tf_writer, is_training=True):
    """Convert one batch of images and append them to a TFRecord writer.

    For each image: load it, find the matching YOLO label file under
    ``YOLO_DATASET_PATH/labels``, convert the labels, save the image as
    ``.jpg`` under ``SSD_OUTPUT_PATH/images``, write a VOC XML file under
    ``SSD_OUTPUT_PATH/annotations`` and serialise a ``tf.train.Example`` to
    ``tf_writer``. Images that cannot be loaded, have no label file, cannot be
    saved, or raise during conversion are reported and skipped.

    Args:
        image_paths: Paths of the images in this batch.
        tf_writer: Open TFRecord writer that receives the serialised examples.
        is_training: Only selects the "train"/"val" wording of the progress bar.
    """
    import gc  # local import: only used for the post-batch collection below

    dataset_type = "train" if is_training else "val"

    for img_path in tqdm(image_paths, desc=f"Processing {dataset_type} batch"):
        try:
            # Load the image and read its size.
            img = cv2.imread(img_path)
            if img is None:
                print(f"이미지를 로드할 수 없습니다: {img_path}")
                continue

            # Only the first two dimensions are needed; do not depend on the
            # array having exactly three.
            height, width = img.shape[:2]

            # Path of the YOLO label file for this image.
            base_name = os.path.splitext(os.path.basename(img_path))[0]
            label_path = os.path.join(YOLO_DATASET_PATH, 'labels', f"{base_name}.txt")

            if not os.path.exists(label_path):
                print(f"레이블 파일이 없습니다: {label_path}")
                continue

            # Convert YOLO labels to SSD-style corner boxes.
            boxes = convert_yolo_to_ssd_format(label_path, width, height, class_names)

            # Save the image (re-encoded as JPEG) into the SSD dataset directory.
            dst_img_path = os.path.join(SSD_OUTPUT_PATH, 'images', f"{base_name}.jpg")
            # cv2.imwrite reports failure through its return value, not an exception.
            if not cv2.imwrite(dst_img_path, img):
                print(f"이미지를 저장할 수 없습니다: {dst_img_path}")
                continue

            # Write the VOC XML annotation.
            xml_path = os.path.join(SSD_OUTPUT_PATH, 'annotations', f"{base_name}.xml")
            create_pascal_voc_xml(dst_img_path, boxes, width, height, xml_path)

            # Append the example to the TFRecord.
            tf_example = create_tf_example(dst_img_path, xml_path)
            tf_writer.write(tf_example.SerializeToString())

        except Exception as e:
            # Deliberate best-effort: one bad image must not abort the batch.
            print(f"오류 발생 ({img_path}): {str(e)}")
            continue

    # Free memory held by the processed batch.
    gc.collect()

In [22]:
# MobileNet + SSD 모델 학습 함수
def train_mobilenet_ssd():
    # 필요한 TensorFlow Object Detection API 설치
    if not os.path.exists('/content/models'):
        !git clone --quiet https://github.com/tensorflow/models.git

    # Protobuf 설치 및 컴파일
    !apt-get install -qq protobuf-compiler
    !cd /content/models/research && protoc object_detection/protos/*.proto --python_out=.
    !cd /content/models/research && pip install -q .

    # 모델 설정 파일 생성
    config_text = """
    model {
      ssd {
        num_classes: %NUM_CLASSES%
        image_resizer {
          fixed_shape_resizer {
            height: 300
            width: 300
          }
        }
        feature_extractor {
          type: "ssd_mobilenet_v2"
          depth_multiplier: 1.0
          min_depth: 16
          conv_hyperparams {
            regularizer {
              l2_regularizer {
                weight: 4.0e-05
              }
            }
            initializer {
              truncated_normal_initializer {
                mean: 0.0
                stddev: 0.03
              }
            }
            activation: RELU_6
            batch_norm {
              decay: 0.9997
              center: true
              scale: true
              epsilon: 0.001
              train: true
            }
          }
          override_base_feature_extractor_hyperparams: true
        }
        box_coder {
          faster_rcnn_box_coder {
            y_scale: 10.0
            x_scale: 10.0
            height_scale: 5.0
            width_scale: 5.0
          }
        }
        matcher {
          argmax_matcher {
            matched_threshold: 0.5
            unmatched_threshold: 0.5
            ignore_thresholds: false
            negatives_lower_than_unmatched: true
            force_match_for_each_row: true
          }
        }
        similarity_calculator {
          iou_similarity {
          }
        }
        box_predictor {
          convolutional_box_predictor {
            conv_hyperparams {
              regularizer {
                l2_regularizer {
                  weight: 4.0e-05
                }
              }
              initializer {
                truncated_normal_initializer {
                  mean: 0.0
                  stddev: 0.03
                }
              }
              activation: RELU_6
              batch_norm {
                decay: 0.9997
                center: true
                scale: true
                epsilon: 0.001
                train: true
              }
            }
            min_depth: 0
            max_depth: 0
            num_layers_before_predictor: 0
            use_dropout: false
            dropout_keep_probability: 0.8
            kernel_size: 3
            box_code_size: 4
            apply_sigmoid_to_scores: false
          }
        }
        anchor_generator {
          ssd_anchor_generator {
            num_layers: 6
            min_scale: 0.2
            max_scale: 0.95
            aspect_ratios: 1.0
            aspect_ratios: 2.0
            aspect_ratios: 0.5
            aspect_ratios: 3.0
            aspect_ratios: 0.3333
          }
        }
        post_processing {
          batch_non_max_suppression {
            score_threshold: 1.0e-08
            iou_threshold: 0.6
            max_detections_per_class: 100
            max_total_detections: 100
          }
          score_converter: SIGMOID
        }
        normalize_loss_by_num_matches: true
        loss {
          localization_loss {
            weighted_smooth_l1 {
            }
          }
          classification_loss {
            weighted_sigmoid_focal {
              gamma: 2.0
              alpha: 0.75
            }
          }
          hard_example_miner {
            num_hard_examples: 3000
            iou_threshold: 0.99
            loss_type: CLASSIFICATION
            max_negatives_per_positive: 3
            min_negatives_per_image: 3
          }
          classification_weight: 1.0
          localization_weight: 1.0
        }
      }
    }
    train_config {
      batch_size: 8
      data_augmentation_options {
        random_horizontal_flip {
        }
      }
      data_augmentation_options {
        random_crop_image {
          min_object_covered: 0.0
          min_aspect_ratio: 0.75
          max_aspect_ratio: 3.0
          min_area: 0.75
          max_area: 1.0
          overlap_thresh: 0.0
        }
      }
      optimizer {
        momentum_optimizer {
          learning_rate {
            cosine_decay_learning_rate {
              learning_rate_base: 0.0799999982119
              total_steps: 50000
              warmup_learning_rate: 0.0266660004854
              warmup_steps: 1000
            }
          }
          momentum_optimizer_value: 0.9
        }
        use_moving_average: false
      }
      fine_tune_checkpoint: "/content/models/research/object_detection/ssd_mobilenet_v2_coco_2018_03_29/model.ckpt"
      fine_tune_checkpoint_type: "detection"
      num_steps: 50000
    }
    train_input_reader {
      label_map_path: "%LABEL_MAP_PATH%"
      tf_record_input_reader {
        input_path: "%TRAIN_RECORD_PATH%"
      }
    }
    eval_config {
      num_examples: 8000
      metrics_set: "coco_detection_metrics"
      use_moving_averages: false
    }
    eval_input_reader {
      label_map_path: "%LABEL_MAP_PATH%"
      shuffle: false
      num_readers: 1
      tf_record_input_reader {
        input_path: "%VAL_RECORD_PATH%"
      }
    }
    """

    # 설정 파일에 경로 및 클래스 수 대체
    config_text = config_text.replace("%NUM_CLASSES%", str(len(class_names)))
    config_text = config_text.replace("%LABEL_MAP_PATH%", os.path.join(SSD_OUTPUT_PATH, 'label_map.pbtxt'))
    config_text = config_text.replace("%TRAIN_RECORD_PATH%", os.path.join(SSD_OUTPUT_PATH, 'train.record'))
    config_text = config_text.replace("%VAL_RECORD_PATH%", os.path.join(SSD_OUTPUT_PATH, 'val.record'))

    config_path = os.path.join(SSD_OUTPUT_PATH, 'pipeline.config')
    with open(config_path, 'w') as f:
        f.write(config_text)

    # 사전 훈련된 MobileNet+SSD 모델 다운로드
    !wget http://download.tensorflow.org/models/object_detection/ssd_mobilenet_v2_coco_2018_03_29.tar.gz
    !tar -xzf ssd_mobilenet_v2_coco_2018_03_29.tar.gz

    # 모델 훈련
    !python /content/models/research/object_detection/model_main.py \
      --pipeline_config_path={config_path} \
      --model_dir={os.path.join(SSD_OUTPUT_PATH, 'model')} \
      --alsologtostderr

# Run the conversion, then the training
if __name__ == "__main__":
    # Convert the dataset
    process_dataset_in_batches()

    # Train the model
    train_mobilenet_ssd()

총 0개의 이미지를 처리합니다.
[31mERROR: Directory '.' is not installable. Neither 'setup.py' nor 'pyproject.toml' found.[0m[31m
[0m--2025-03-31 05:33:20--  http://download.tensorflow.org/models/object_detection/ssd_mobilenet_v2_coco_2018_03_29.tar.gz
Resolving download.tensorflow.org (download.tensorflow.org)... 142.251.167.207, 172.253.115.207, 172.253.122.207, ...
Connecting to download.tensorflow.org (download.tensorflow.org)|142.251.167.207|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 187925923 (179M) [application/x-tar]
Saving to: ‘ssd_mobilenet_v2_coco_2018_03_29.tar.gz’


2025-03-31 05:33:21 (131 MB/s) - ‘ssd_mobilenet_v2_coco_2018_03_29.tar.gz’ saved [187925923/187925923]

2025-03-31 05:33:25.272810: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:477] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
E0000 00:00:1743399205.295565   27995 cuda_dnn.cc:8310] Unable to register cuDN

In [None]:
# Action categories (adjust to match the actual dataset).
# Maps an integer label id to the display name drawn by visualize_prediction.
ACTION_CATEGORIES = {
    1: "walking",
    2: "running",
    3: "sitting",
    4: "standing"
}

In [None]:
# COCO 형식 데이터셋을 위한 커스텀 데이터셋 클래스
class ActionDataset(Dataset):
    """Detection dataset backed by a COCO-format annotation file.

    Every item is an ``(image, target)`` pair. ``target`` is a dict with
    ``"boxes"`` (float32 rows of ``[x_min, y_min, x_max, y_max]``) and
    ``"labels"`` (int64 ``category_id`` values taken from the annotations).
    """

    def __init__(self, annotations_file, img_dir, transform=None):
        """Load the annotation index.

        Args:
            annotations_file: Path to the COCO JSON annotation file.
            img_dir: Directory that the images' ``file_name`` entries are
                resolved against.
            transform: Optional callable applied to the loaded RGB image.
        """
        self.coco = COCO(annotations_file)
        self.img_dir = img_dir
        self.transform = transform
        # Image ids in ascending order give a stable index -> image mapping.
        self.ids = sorted(self.coco.imgs.keys())

    def __getitem__(self, idx):
        """Return the image at position ``idx`` together with its target dict."""
        image_id = self.ids[idx]
        record = self.coco.loadImgs(image_id)[0]
        img = Image.open(os.path.join(self.img_dir, record['file_name'])).convert("RGB")

        # Annotations attached to this image.
        annotations = self.coco.loadAnns(self.coco.getAnnIds(imgIds=image_id))

        img_width, img_height = img.size
        boxes = []
        labels = []
        for ann in annotations:
            # COCO stores [x, y, width, height]; convert to corner form and
            # clamp to the image bounds.
            bbox = ann['bbox']
            x_min = max(0, bbox[0])
            y_min = max(0, bbox[1])
            x_max = min(img_width, bbox[0] + bbox[2])
            y_max = min(img_height, bbox[1] + bbox[3])

            # Skip boxes that have no area after clamping.
            if not (x_max > x_min and y_max > y_min):
                continue
            boxes.append([x_min, y_min, x_max, y_max])
            labels.append(ann['category_id'])

        if self.transform:
            img = self.transform(img)

        if boxes:
            target = {
                "boxes": torch.as_tensor(boxes, dtype=torch.float32),
                "labels": torch.as_tensor(labels, dtype=torch.int64),
            }
        else:
            # No valid boxes: return empty tensors of the expected shapes.
            target = {
                "boxes": torch.zeros((0, 4), dtype=torch.float32),
                "labels": torch.zeros((0,), dtype=torch.int64),
            }

        return img, target

    def __len__(self):
        """Return the number of images in the dataset."""
        return len(self.ids)


In [None]:
# 데이터 전처리
def get_transform():
    """Build the image preprocessing pipeline.

    Returns:
        A ``transforms.Compose`` that converts the image to a tensor and then
        applies per-channel mean/std normalisation.

    NOTE(review): torchvision detection models appear to normalise their inputs
    internally as well - confirm that this extra ``Normalize`` is intended.
    """
    normalize = transforms.Normalize(
        mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]
    )
    return transforms.Compose([transforms.ToTensor(), normalize])

# 모델 준비
def get_model(num_classes):
    """Create a pretrained SSDlite320-MobileNetV3 model with a new classifier head.

    Args:
        num_classes: Number of foreground classes; one extra output is added
            for the background class.

    Returns:
        The model with ``head.classification_head`` replaced by a freshly
        initialised ``SSDClassificationHead`` for ``num_classes + 1`` outputs.
    """
    # Local import: private torchvision helper used only here.
    from torchvision.models.detection import _utils as det_utils

    # Load MobileNet + SSD with pretrained weights.
    # NOTE(review): newer torchvision releases prefer ``weights=...`` over
    # ``pretrained=True``; kept as-is to match the version this file was written for.
    model = ssdlite320_mobilenet_v3_large(pretrained=True)

    # The stock classification head has no ``in_channels`` / ``num_anchors``
    # attributes, so read the per-feature-map channel counts from the backbone
    # and the anchors per location from the anchor generator.
    in_channels = det_utils.retrieve_out_channels(model.backbone, (320, 320))
    num_anchors = model.anchor_generator.num_anchors_per_location()

    # New classification head sized for our classes plus background.
    model.head.classification_head = SSDClassificationHead(
        in_channels, num_anchors, num_classes + 1
    )

    return model

In [None]:
# 학습 함수
def train_one_epoch(model, optimizer, data_loader, device):
    """Run one training pass over ``data_loader`` and return the mean batch loss.

    Args:
        model: Model that, in training mode, returns a dict of loss tensors when
            called as ``model(images, targets)``.
        optimizer: Optimizer stepping the model's parameters.
        data_loader: Iterable yielding ``(images, targets)`` batches, where
            ``targets`` is a sequence of dicts of tensors.
        device: Device the images and target tensors are moved to.

    Returns:
        The summed loss averaged over the batches seen, or ``0.0`` when the
        loader yields no batches.
    """
    model.train()

    total_loss = 0.0
    num_batches = 0
    for images, targets in data_loader:
        images = [image.to(device) for image in images]
        targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

        loss_dict = model(images, targets)
        losses = sum(loss_dict.values())

        optimizer.zero_grad()
        losses.backward()
        optimizer.step()

        total_loss += losses.item()
        num_batches += 1

    # Count batches ourselves: avoids dividing by zero on an empty loader and
    # does not require the loader to support len().
    return total_loss / num_batches if num_batches else 0.0


In [None]:
# 평가 함수
def evaluate(model, data_loader, device):
    """Collect the model's predictions and the matching ground truth.

    Puts the model in eval mode and runs it without gradients over every batch.
    No metric is computed here; the raw results are returned so a metric (for
    example via ``pycocotools.cocoeval``) can be computed by the caller.

    Args:
        model: Model that returns, in eval mode, one dict per image with
            ``'boxes'``, ``'scores'`` and ``'labels'`` tensors.
        data_loader: Iterable yielding ``(images, targets)`` batches.
        device: Device the images are moved to before inference.

    Returns:
        ``(predictions, ground_truths)``: two lists with one dict of NumPy
        arrays per image.
    """

    def _as_numpy(tensor):
        # Move to host memory before converting.
        return tensor.cpu().numpy()

    model.eval()

    predictions, ground_truths = [], []

    with torch.no_grad():
        for images, targets in data_loader:
            outputs = model([img.to(device) for img in images])

            for idx, output in enumerate(outputs):
                predicted = {
                    'boxes': _as_numpy(output['boxes']),
                    'scores': _as_numpy(output['scores']),
                    'labels': _as_numpy(output['labels']),
                }
                target = targets[idx]
                expected = {
                    'boxes': _as_numpy(target['boxes']),
                    'labels': _as_numpy(target['labels']),
                }

                predictions.append(predicted)
                ground_truths.append(expected)

    return predictions, ground_truths

In [None]:
# 시각화 함수
def visualize_prediction(image, prediction, threshold=0.5):
    """Draw the predicted boxes and labels onto a copy of the input image.

    Args:
        image: Normalised image tensor in channel-first layout; it is permuted
            to channel-last and de-normalised with the same mean/std constants
            used by ``get_transform``.
        prediction: Dict with ``'boxes'``, ``'scores'`` and ``'labels'`` tensors.
        threshold: Minimum score for a prediction to be drawn.

    Returns:
        A PIL image with red boxes and ``"<category>: <score>"`` captions.
    """
    # Undo the normalisation and convert to an 8-bit PIL image.
    pixels = image.permute(1, 2, 0).cpu().numpy()
    pixels = (pixels * [0.229, 0.224, 0.225] + [0.485, 0.456, 0.406]) * 255
    # Clip before the cast: out-of-range values would otherwise wrap around
    # when converted to uint8.
    pixels = np.clip(pixels, 0, 255).astype(np.uint8)
    image = Image.fromarray(pixels)
    draw = ImageDraw.Draw(image)

    boxes = prediction['boxes'].cpu().numpy()
    scores = prediction['scores'].cpu().numpy()
    labels = prediction['labels'].cpu().numpy()

    # Only draw predictions whose confidence reaches the threshold.
    for box, score, label in zip(boxes, scores, labels):
        if score < threshold:
            continue

        x_min, y_min, x_max, y_max = box
        draw.rectangle([(x_min, y_min), (x_max, y_max)], outline="red", width=2)

        # Caption with the category name and confidence. The y position is
        # kept on-canvas so captions of boxes touching the top edge stay visible.
        label_text = f"{ACTION_CATEGORIES.get(int(label), 'unknown')}: {score:.2f}"
        draw.text((x_min, max(0, y_min - 10)), label_text, fill="red")

    return image

In [None]:
# 메인 함수
def main():
    """Train, evaluate and demo the MobileNet-SSD action detector end to end.

    Builds the dataset from ``ANNOTATIONS_FILE`` / ``IMAGES_DIR``, splits it
    80:20, trains for 10 epochs with SGD and a step learning-rate schedule,
    saves checkpoints every 5 epochs plus a final one, renders the prediction
    for the first validation sample to ``prediction_result.png`` and prints a
    static model comparison. Any exception raised after device selection is
    caught and reported with a single printed line.
    """

    def _collate(batch):
        # Keep images and targets as tuples; detection samples vary in size
        # and cannot be stacked into one tensor.
        return tuple(zip(*batch))

    # Pick the GPU when one is available.
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    print(f"Using device: {device}")

    try:
        dataset = ActionDataset(ANNOTATIONS_FILE, IMAGES_DIR, transform=get_transform())

        # Train/validation split (8:2).
        dataset_size = len(dataset)
        train_size = int(0.8 * dataset_size)
        val_size = dataset_size - train_size
        train_dataset, val_dataset = torch.utils.data.random_split(
            dataset, [train_size, val_size]
        )

        train_loader = DataLoader(
            train_dataset, batch_size=4, shuffle=True, collate_fn=_collate
        )
        val_loader = DataLoader(
            val_dataset, batch_size=4, shuffle=False, collate_fn=_collate
        )

        print(f"Training samples: {len(train_dataset)}")
        print(f"Validation samples: {len(val_dataset)}")

        # Model.
        model = get_model(len(ACTION_CATEGORIES))
        model.to(device)

        # Optimizer and learning-rate schedule over the trainable parameters.
        trainable = [p for p in model.parameters() if p.requires_grad]
        optimizer = optim.SGD(trainable, lr=0.005, momentum=0.9, weight_decay=0.0005)
        lr_scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=3, gamma=0.1)

        num_epochs = 10
        for epoch in range(num_epochs):
            print(f"Epoch {epoch+1}/{num_epochs}")

            # Train and time one epoch.
            start_time = time.time()
            train_loss = train_one_epoch(model, optimizer, train_loader, device)
            end_time = time.time()
            print(f"Train Loss: {train_loss:.4f}, Time: {end_time - start_time:.2f}s")

            # Evaluate; the collected results are not used further here.
            _predictions, _ground_truths = evaluate(model, val_loader, device)

            lr_scheduler.step()

            # Periodic checkpoint.
            if (epoch + 1) % 5 == 0:
                torch.save(model.state_dict(), f"mobilenet_ssd_action_epoch{epoch+1}.pth")
                print(f"Model saved at epoch {epoch+1}")

        # Final checkpoint.
        torch.save(model.state_dict(), "mobilenet_ssd_action_final.pth")
        print("Final model saved")

        # Visualise the prediction for one validation sample.
        if len(val_dataset) > 0:
            image, target = val_dataset[0]
            image_tensor = image.unsqueeze(0).to(device)

            model.eval()
            with torch.no_grad():
                prediction = model(image_tensor)[0]

            result_image = visualize_prediction(image, prediction)
            plt.figure(figsize=(10, 10))
            plt.imshow(result_image)
            plt.axis('off')
            plt.savefig('prediction_result.png')
            plt.show()

            print("Prediction visualization saved as 'prediction_result.png'")

        # Static comparison against other model families (YOLO, ResNet50).
        comparison_lines = (
            "\n=== Model Comparison ===",
            "1. MobileNet-SSD:",
            "   - 정확도: 적절한 수준",
            "   - 처리속도: 빠름 (모바일 디바이스에 최적화)",
            "   - 모델 크기: 작음 (~20MB)",
            "   - 실용성: 실시간 처리에 적합, 리소스 제약 환경에서 효율적",
            "\n2. YOLO (참고):",
            "   - 정확도: 높음",
            "   - 처리속도: 매우 빠름",
            "   - 모델 크기: 중간 (~50-250MB, 버전에 따라 다름)",
            "   - 실용성: 실시간 처리에 매우 적합, 고성능 GPU에서 최적 성능",
            "\n3. ResNet50-FPN (참고):",
            "   - 정확도: 매우 높음",
            "   - 처리속도: 느림",
            "   - 모델 크기: 큼 (~100MB 이상)",
            "   - 실용성: 높은 정확도가 필요하고 속도가 중요하지 않은 경우 적합",
        )
        for line in comparison_lines:
            print(line)

    except Exception as e:
        print(f"Error occurred: {e}")


# Entry point: run the PyTorch training pipeline defined in main().
if __name__ == "__main__":
    main()
