In [1]:
import tensorflow
from tensorflow.python.client import device_lib
device_lib.list_local_devices()

[name: "/device:CPU:0"
 device_type: "CPU"
 memory_limit: 268435456
 locality {
 }
 incarnation: 14135599746533502644
 xla_global_id: -1,
 name: "/device:GPU:0"
 device_type: "GPU"
 memory_limit: 14626652160
 locality {
   bus_id: 1
   links {
   }
 }
 incarnation: 9543946156236132401
 physical_device_desc: "device: 0, name: Tesla T4, pci bus id: 0000:00:04.0, compute capability: 7.5"
 xla_global_id: 416903419]

# 9.01~9.09 Faster R-CNN 모델 실습

1. 데이터셋 클래스 정의

In [None]:
import os
import torch
from PIL import Image
from pycocotools.coco import COCO
from torch.utils.data import Dataset


class COCODataset(Dataset):
    def __init__(self, root, train, transform=None):
        super().__init__()
        # 훈련 데이터인지 검증 데이터인지 설정함
        directory = "train" if train else "val"
        # 주어진 경로에서 어노테이션 파일을 불러옴
        annotations = os.path.join(root, "annotations", f"{directory}_annotations.json")

        self.coco = COCO(annotations)  # COCO API로 어노테이션 로드함
        self.iamge_path = os.path.join(root, directory)  # 이미지 경로 설정함
        self.transform = transform  # 이미지 변환 설정

        self.categories = self._get_categories()  # 카테고리 이름 로드함
        self.data = self._load_data()  # 데이터셋 로드함

    def _get_categories(self):
        # COCO 데이터셋의 카테고리 정보를 딕셔너리로 저장함
        categories = {0: "background"}
        for category in self.coco.cats.values():
            categories[category["id"]] = category["name"]
        return categories

    def _load_data(self):
        # 이미지와 라벨 데이터를 리스트 형태로 로드함
        data = []
        for _id in self.coco.imgs:
            file_name = self.coco.loadImgs(_id)[0]["file_name"]  # 파일 이름 가져옴
            image_path = os.path.join(self.iamge_path, file_name)  # 파일 경로 생성
            image = Image.open(image_path).convert("RGB")  # 이미지를 RGB로 변환

            boxes = []  # 박스 좌표 저장할 리스트
            labels = []  # 라벨 저장할 리스트
            anns = self.coco.loadAnns(self.coco.getAnnIds(_id))  # 이미지에 연결된 어노테이션 가져옴
            for ann in anns:
                x, y, w, h = ann["bbox"]  # 바운딩 박스 정보 추출

                boxes.append([x, y, x + w, y + h])  # 좌표 저장함
                labels.append(ann["category_id"])  # 라벨 저장함

            # PyTorch 모델에 입력하기 위한 타겟 포맷 생성함
            target = {
                "image_id": torch.LongTensor([_id]),
                "boxes": torch.FloatTensor(boxes),
                "labels": torch.LongTensor(labels)
            }
            data.append([image, target])  # 데이터 리스트에 추가
        return data

    def __getitem__(self, index):
        # 인덱스에 해당하는 데이터 반환
        image, target = self.data[index]
        if self.transform:
            image = self.transform(image)  # 변환 적용
        return image, target

    def __len__(self):
        return len(self.data)  # 데이터셋 길이 반환

2. 데이터 변환 및 데이터로더 설정

In [None]:
from torchvision import transforms
from torch.utils.data import DataLoader


def collator(batch):
    # 배치 데이터 병합 함수 정의함
    return tuple(zip(*batch))

# 이미지 변환 적용
transform = transforms.Compose(
    [
        transforms.PILToTensor(),  # 이미지를 텐서로 변환
        transforms.ConvertImageDtype(dtype=torch.float)  # dtype을 float으로 변경
    ]
)

# 데이터셋 정의
train_dataset = COCODataset("../datasets/coco", train=True, transform=transform)
test_dataset = COCODataset("../datasets/coco", train=False, transform=transform)

# 데이터로더 정의
train_dataloader = DataLoader(
    train_dataset, batch_size=4, shuffle=True, drop_last=True, collate_fn=collator
)
test_dataloader = DataLoader(
    test_dataset, batch_size=1, shuffle=True, drop_last=True, collate_fn=collator
)

3. Faster R-CNN 모델 정의

In [None]:
from torchvision import models
from torchvision import ops
from torchvision.models.detection import rpn
from torchvision.models.detection import FasterRCNN


# VGG16 백본 모델을 가져와 특징 추출기로 활용함
backbone = models.vgg16(weights="VGG16_Weights.IMAGENET1K_V1").features
backbone.out_channels = 512  # 백본 출력 채널 수 설정

# 앵커 생성기 설정
anchor_generator = rpn.AnchorGenerator(
    sizes=((32, 64, 128, 256, 512),),  # 다양한 크기의 앵커 생성
    aspect_ratios=((0.5, 1.0, 2.0),)  # 가로세로 비율 설정
)

# RoI Pooler 설정 (특징 맵의 크기를 고정함)
roi_pooler = ops.MultiScaleRoIAlign(
    featmap_names=["0"],  # 사용할 특징 맵 레이어 이름
    output_size=(7, 7),  # 출력 크기
    sampling_ratio=2  # 샘플링 비율
)

# 모델 정의
device = "cuda" if torch.cuda.is_available() else "cpu"
model = FasterRCNN(
    backbone=backbone,
    num_classes=3,  # 클래스 수 정의 (예: 배경 포함 3개)
    rpn_anchor_generator=anchor_generator,
    box_roi_pool=roi_pooler
).to(device)

4. 모델 학습 설정 및 훈련 루프



In [None]:
from torch import optim


# 학습 가능한 파라미터를 옵티마이저에 전달함
params = [p for p in model.parameters() if p.requires_grad]
optimizer = optim.SGD(params, lr=0.001, momentum=0.9, weight_decay=0.0005)  # SGD 설정
lr_scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.1)  # 학습률 감소 스케줄러

# 모델 훈련 루프
for epoch in range(5):
    cost = 0.0
    for idx, (images, targets) in enumerate(train_dataloader):
        images = list(image.to(device) for image in images)  # 이미지를 장치로 이동
        targets = [{k: v.to(device) for k, v in t.items()} for t in targets]  # 타겟도 이동

        loss_dict = model(images, targets)  # 손실 계산
        losses = sum(loss for loss in loss_dict.values())  # 모든 손실 합산

        optimizer.zero_grad()  # 그래디언트 초기화
        losses.backward()  # 역전파 수행
        optimizer.step()  # 파라미터 업데이트

        cost += losses

    lr_scheduler.step()  # 학습률 스케줄러 업데이트
    cost = cost / len(train_dataloader)  # 평균 손실 계산
    print(f"Epoch : {epoch+1:4d}, Cost : {cost:.3f}")  # 손실 출력

5. 결과 시각화

In [None]:
import numpy as np
from PIL import Image
from matplotlib import pyplot as plt
from torchvision.transforms.functional import to_pil_image


# 바운딩 박스를 이미지에 그리는 함수 정의
def draw_bbox(ax, box, text, color):
    ax.add_patch(
        plt.Rectangle(
            xy=(box[0], box[1]),
            width=box[2] - box[0],
            height=box[3] - box[1],
            fill=False,
            edgecolor=color,
            linewidth=2,
        )
    )
    ax.annotate(
        text=text,
        xy=(box[0] - 5, box[1] - 5),
        color=color,
        weight="bold",
        fontsize=13,
    )

# 모델 평가 및 결과 시각화
threshold = 0.5  # 예측 임계값 설정
categories = test_dataset.categories
with torch.no_grad():
    model.eval()  # 평가 모드 전환
    for images, targets in test_dataloader:
        images = [image.to(device) for image in images]  # 이미지 장치로 이동
        outputs = model(images)  # 모델 예측 수행

        boxes = outputs[0]["boxes"].to("cpu").numpy()  # 예측된 박스
        labels = outputs[0]["labels"].to("cpu").numpy()  # 예측된 라벨
        scores = outputs[0]["scores"].to("cpu").numpy()  # 예측된 점수

        boxes = boxes[scores >= threshold].astype(np.int32)  # 임계값 기준 필터링
        labels = labels[scores >= threshold]
        scores = scores[scores >= threshold]

        fig = plt.figure(figsize=(8, 8))
        ax = fig.add_subplot(1, 1, 1)
        plt.imshow(to_pil_image(images[0]))  # 이미지를 표시

        for box, label, score in zip(boxes, labels, scores):
            draw_bbox(ax, box, f"{categories[label]} - {score:.4f}", "red")  # 예측 박스 표시

        tboxes = targets[0]["boxes"].numpy()  # 실제 박스
        tlabels = targets[0]["labels"].numpy()  # 실제 라벨
        for box, label in zip(tboxes, tlabels):
            draw_bbox(ax, box, f"{categories[label]}", "blue")  # 실제 박스 표시

        plt.show()

# 예제 9.10~9.12 SSD 모델 실습.

1. SSD Backbone 정의


In [None]:
from torch import nn
from collections import OrderedDict

class SSDBackbone(nn.Module):
    def __init__(self, backbone):
        super().__init__()
        # ResNet의 각 레이어를 SSD에 적합하게 분리함
        layer0 = nn.Sequential(backbone.conv1, backbone.bn1, backbone.relu)
        layer1 = backbone.layer1
        layer2 = backbone.layer2
        layer3 = backbone.layer3
        layer4 = backbone.layer4

        self.features = nn.Sequential(layer0, layer1, layer2, layer3)  # 기본 특징 추출 레이어
        self.upsampling = nn.Sequential(  # 업샘플링 레이어 정의
            nn.Conv2d(in_channels=256, out_channels=512, kernel_size=1),
            nn.ReLU(inplace=True),
        )
        # Extra layers를 정의하여 추가적인 특징을 추출함
        self.extra = nn.ModuleList(
            [
                nn.Sequential(
                    layer4,  # 기본 특징 레이어에서 시작
                    nn.Conv2d(in_channels=512, out_channels=1024, kernel_size=1),
                    nn.ReLU(inplace=True),
                ),
                nn.Sequential(
                    nn.Conv2d(1024, 256, kernel_size=1),
                    nn.ReLU(inplace=True),
                    nn.Conv2d(256, 512, kernel_size=3, padding=1, stride=2),
                    nn.ReLU(inplace=True),
                ),
                nn.Sequential(
                    nn.Conv2d(512, 128, kernel_size=1),
                    nn.ReLU(inplace=True),
                    nn.Conv2d(128, 256, kernel_size=3, padding=1, stride=2),
                    nn.ReLU(inplace=True),
                ),
                nn.Sequential(
                    nn.Conv2d(256, 128, kernel_size=1),
                    nn.ReLU(inplace=True),
                    nn.Conv2d(128, 256, kernel_size=3),
                    nn.ReLU(inplace=True),
                ),
                nn.Sequential(
                    nn.Conv2d(256, 128, kernel_size=1),
                    nn.ReLU(inplace=True),
                    nn.Conv2d(128, 256, kernel_size=3),
                    nn.ReLU(inplace=True),
                ),
                nn.Sequential(
                    nn.Conv2d(256, 128, kernel_size=1),
                    nn.ReLU(inplace=True),
                    nn.Conv2d(128, 256, kernel_size=4),
                    nn.ReLU(inplace=True),
                )
            ]
        )

    def forward(self, x):
        x = self.features(x)  # 기본 특징 추출
        output = [self.upsampling(x)]  # 업샘플링 레이어 출력 저장

        for block in self.extra:
            x = block(x)  # Extra layers 통과
            output.append(x)  # 각 결과를 저장

        return OrderedDict([(str(i), v) for i, v in enumerate(output)])  # OrderedDict로 반환

2. SDD 모델 구성

In [None]:
import torch
from torchvision.models import resnet34
from torchvision.models.detection import ssd
from torchvision.models.detection.anchor_utils import DefaultBoxGenerator

# ResNet34를 기반으로 SSD Backbone 구성
backbone_base = resnet34(weights="ResNet34_Weights.IMAGENET1K_V1")
backbone = SSDBackbone(backbone_base)

# 앵커 박스 생성기 정의
anchor_generator = DefaultBoxGenerator(
    aspect_ratios=[[2], [2, 3], [2, 3], [2, 3], [2, 3], [2], [2]],  # 각 레이어에 대해 다른 비율 설정
    scales=[0.07, 0.15, 0.33, 0.51, 0.69, 0.87, 1.05, 1.20],  # 스케일 정의
    steps=[8, 16, 32, 64, 100, 300, 512],  # 각 레이어의 격자 크기 설정
)

# SSD 모델 정의
device = "cuda" if torch.cuda.is_available() else "cpu"
model = ssd.SSD(
    backbone=backbone,
    anchor_generator=anchor_generator,
    size=(512, 512),  # 입력 이미지 크기 설정
    num_classes=3  # 클래스 수 설정 (배경 포함)
).to(device)

3. COCO 데이터셋 클래스 정의

In [None]:
import os
import torch
from PIL import Image
from pycocotools.coco import COCO
from torch.utils.data import Dataset

class COCODataset(Dataset):
    def __init__(self, root, train, transform=None):
        super().__init__()
        directory = "train" if train else "val"  # 훈련 또는 검증 경로 설정
        annotations = os.path.join(root, "annotations", f"{directory}_annotations.json")

        self.coco = COCO(annotations)  # COCO 어노테이션 로드
        self.iamge_path = os.path.join(root, directory)  # 이미지 경로 설정
        self.transform = transform  # 변환 설정

        self.categories = self._get_categories()  # 카테고리 로드
        self.data = self._load_data()  # 데이터 로드

    def _get_categories(self):
        categories = {0: "background"}  # 배경 추가
        for category in self.coco.cats.values():
            categories[category["id"]] = category["name"]  # 카테고리 매핑
        return categories

    def _load_data(self):
        data = []
        for _id in self.coco.imgs:
            file_name = self.coco.loadImgs(_id)[0]["file_name"]  # 이미지 파일 이름 가져오기
            image_path = os.path.join(self.iamge_path, file_name)  # 경로 생성
            image = Image.open(image_path).convert("RGB")  # RGB 이미지로 변환

            boxes = []
            labels = []
            anns = self.coco.loadAnns(self.coco.getAnnIds(_id))  # 어노테이션 로드
            for ann in anns:
                x, y, w, h = ann["bbox"]  # 바운딩 박스 추출

                boxes.append([x, y, x + w, y + h])  # 좌표 저장
                labels.append(ann["category_id"])  # 라벨 저장

            target = {  # 타겟 구성
                "image_id": torch.LongTensor([_id]),
                "boxes": torch.FloatTensor(boxes),
                "labels": torch.LongTensor(labels)
            }
            data.append([image, target])  # 데이터 추가
        return data

    def __getitem__(self, index):
        image, target = self.data[index]  # 이미지와 타겟 반환
        if self.transform:
            image = self.transform(image)  # 변환 적용
        return image, target

    def __len__(self):
        return len(self.data)  # 데이터 길이 반환

4. 데이터로더 구성

In [None]:
from torchvision import transforms
from torch.utils.data import DataLoader

def collator(batch):
    return tuple(zip(*batch))  # 배치 데이터를 병합

transform = transforms.Compose(
    [
        transforms.PILToTensor(),  # 이미지를 텐서로 변환
        transforms.ConvertImageDtype(dtype=torch.float)  # 데이터 타입 변경
    ]
)

# 훈련 및 검증 데이터셋 정의
train_dataset = COCODataset("../datasets/coco", train=True, transform=transform)
test_dataset = COCODataset("../datasets/coco", train=False, transform=transform)

# 데이터로더 정의
train_dataloader = DataLoader(
    train_dataset, batch_size=4, shuffle=True, drop_last=True, collate_fn=collator
)
test_dataloader = DataLoader(
    test_dataset, batch_size=1, shuffle=True, drop_last=True, collate_fn=collator
)

5. 모델 학습

In [None]:
from torch import optim

# 학습 가능한 파라미터 설정
params = [p for p in model.parameters() if p.requires_grad]
optimizer = optim.SGD(params, lr=0.001, momentum=0.9, weight_decay=0.0005)  # 옵티마이저 정의
lr_scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.1)  # 학습률 스케줄러 정의

# 학습 루프
for epoch in range(10):
    cost = 0.0
    for idx, (images, targets) in enumerate(train_dataloader):
        images = list(image.to(device) for image in images)  # 이미지를 장치로 이동
        targets = [{k: v.to(device) for k, v in t.items()} for t in targets]  # 타겟도 장치로 이동

        loss_dict = model(images, targets)  # 손실 계산
        losses = sum(loss for loss in loss_dict.values())  # 모든 손실 합산

        optimizer.zero_grad()  # 그래디언트 초기화
        losses.backward()  # 역전파 수행
        optimizer.step()  # 파라미터 업데이트

        cost += losses  # 손실 누적

    lr_scheduler.step()  # 학습률 업데이트
    cost = cost / len(train_dataloader)  # 평균 손실 계산
    print(f"Epoch : {epoch+1:4d}, Cost : {cost:.3f}")  # 학습 상태 출력

6. 결과 시각화 및 평ㅇ가

In [None]:
import numpy as np
from PIL import Image
from matplotlib import pyplot as plt
from torchvision.transforms.functional import to_pil_image

# 바운딩 박스를 그리는 함수 정의
def draw_bbox(ax, box, text, color):
    ax.add_patch(
        plt.Rectangle(
            xy=(box[0], box[1]),
            width=box[2] - box[0],
            height=box[3] - box[1],
            fill=False,
            edgecolor=color,
            linewidth=2,
        )
    )
    ax.annotate(
        text=text,
        xy=(box[0] - 5, box[1] - 5),
        color=color,
        weight="bold",
        fontsize=13,
    )

# 예측 결과를 시각화
threshold = 0.5  # 점수 임계값
categories = test_dataset.categories
with torch.no_grad():
    model.eval()  # 모델을 평가 모드로 설정
    for images, targets in test_dataloader:
        images = [image.to(device) for image in images]  # 이미지를 장치로 이동
        outputs = model(images)  # 모델 예측 수행

        boxes = outputs[0]["boxes"].to("cpu").numpy()  # 예측된 바운딩 박스
        labels = outputs[0]["labels"].to("cpu").numpy()  # 예측된 라벨
        scores = outputs[0]["scores"].to("cpu").numpy()  # 예측된 점수

        boxes = boxes[scores >= threshold].astype(np.int32)  # 점수 기준 필터링
        labels = labels[scores >= threshold]
        scores = scores[scores >= threshold]

        fig = plt.figure(figsize=(8, 8))
        ax = fig.add_subplot(1, 1, 1)
        plt.imshow(to_pil_image(images[0]))  # 이미지를 출력

        for box, label, score in zip(boxes, labels, scores):
            draw_bbox(ax, box, f"{categories[label]} - {score:.4f}", "red")  # 예측 박스 출력

        tboxes = targets[0]["boxes"].numpy()  # 실제 바운딩 박스
        tlabels = targets[0]["labels"].numpy()  # 실제 라벨
        for box, label in zip(tboxes, tlabels):
            draw_bbox(ax, box, f"{categories[label]}", "blue")  # 실제 박스 출력

        plt.show()

# 예제 9.13~9.19 FCN 모델 실습

In [None]:
# ================================ 데이터셋 정의 =================================
import os
import json
import torch
import numpy as np
from PIL import Image
from torch.utils.data import Dataset

# SegmentationDataset 클래스 정의
class SegmentationDataset(Dataset):
    def __init__(self, root, train, transform=None, target_transform=None):
        super().__init__()
        self.root = os.path.join(root, "VOCdevkit", "VOC2012")  # VOC 데이터 경로 설정
        file_type = "train" if train else "val"  # 훈련 데이터인지 검증 데이터인지 선택
        file_path = os.path.join(
            self.root, "ImageSets", "Segmentation", f"{file_type}.txt"
        )
        with open(os.path.join(self.root, "classes.json"), "r") as file:
            self.categories = json.load(file)  # 클래스 정보 로드
        self.files = open(file_path).read().splitlines()  # 데이터 파일 이름 로드
        self.transform = transform  # 이미지 변환 설정
        self.target_transform = target_transform  # 타겟 변환 설정
        self.data = self._load_data()  # 데이터 로드

    # 데이터 로드 함수
    def _load_data(self):
        data = []
        for file in self.files:
            image_path = os.path.join(self.root, "JPEGImages", f"{file}.jpg")
            mask_path = os.path.join(self.root, "SegmentationClass", f"{file}.png")
            image = Image.open(image_path).convert("RGB")  # 이미지 로드
            mask = np.array(Image.open(mask_path))  # 마스크 로드
            mask = np.where(mask == 255, 0, mask)  # 배경 클래스 조정
            target = torch.LongTensor(mask).unsqueeze(0)  # 마스크 텐서로 변환
            data.append([image, target])  # 데이터 추가
        return data

    def __getitem__(self, index):
        image, mask = self.data[index]
        if self.transform is not None:
            image = self.transform(image)  # 이미지 변환
        if self.target_transform is not None:
            mask = self.target_transform(mask)  # 타겟 변환
        return image, mask

    def __len__(self):
        return len(self.data)  # 데이터셋 크기 반환

# ============================= 데이터셋 로더 설정 ================================
from torchvision import transforms
from torch.utils.data import DataLoader

# 데이터 변환 정의
transform = transforms.Compose(
    [
        transforms.PILToTensor(),  # 이미지를 텐서로 변환
        transforms.ConvertImageDtype(dtype=torch.float),  # 데이터 타입 변경
        transforms.Resize(size=(224, 224)),  # 이미지 크기 조정
    ]
)
target_transform = transforms.Compose(
    [
        transforms.Resize(
            size=(224, 224),
            interpolation=transforms.InterpolationMode.NEAREST,  # 최근접 이웃 보간법
        )
    ]
)

# 데이터셋 및 데이터로더 생성
train_dataset = SegmentationDataset(
    "../datasets", train=True, transform=transform, target_transform=target_transform
)
test_dataset = SegmentationDataset(
    "../datasets", train=False, transform=transform, target_transform=target_transform
)
train_dataloader = DataLoader(
    train_dataset, batch_size=8, shuffle=True, drop_last=True
)
test_dataloader = DataLoader(
    test_dataset, batch_size=4, shuffle=True, drop_last=True
)

# ============================ 시각화 함수 정의 =================================
import matplotlib.pyplot as plt

# 마스크를 시각화하는 함수 정의
def draw_mask(images, masks, outputs=None, plot_size=4):
    def color_mask(image, target):
        m = target.squeeze().numpy().astype(np.uint8)
        cm = np.zeros_like(image, dtype=np.uint8)

        for i in range(1, 21):  # 각 클래스 색상 적용
            cm[m == i] = train_dataset.categories[str(i)]["color"]

        classes = [train_dataset.categories[str(idx)]["class"] for idx in np.unique(m)]
        return cm, classes

    col = 3 if outputs is not None else 2
    figsize = 20 if outputs is not None else 28
    fig, ax = plt.subplots(plot_size, col, figsize=(14, figsize), constrained_layout=True)

    for batch in range(plot_size):
        im = images[batch].numpy().transpose(1, 2, 0)  # 이미지 차원 변환
        ax[batch][0].imshow(im)  # 원본 이미지 표시
        ax[batch][0].axis("off")

        cm, classes = color_mask(im, masks[batch])  # 마스크 생성
        ax[batch][1].set_title(classes)
        ax[batch][1].imshow(cm)
        ax[batch][1].axis("off")

        if outputs is not None:  # 모델 출력이 있을 경우
            cm, classes = color_mask(im, outputs[batch])
            ax[batch][2].set_title(classes)
            ax[batch][2].imshow(cm)
            ax[batch][2].axis("off")

# 훈련 데이터의 이미지와 마스크 시각화
images, masks = next(iter(train_dataloader))
draw_mask(images, masks, plot_size=4)

# ============================ 모델 학습 및 검증 =================================
from torch import nn
from torch import optim
from torchvision.models import segmentation

# FCN 모델 정의
num_classes = 21  # 클래스 수
device = "cuda" if torch.cuda.is_available() else "cpu"
model = segmentation.fcn_resnet50(
    weight="FCN_ResNet50_Weights.COCO_WITH_VOC_LABELS_V1",  # 사전 학습 가중치
    num_classes=21,
).to(device)

# 손실 함수와 옵티마이저 정의
params = [p for p in model.parameters() if p.requires_grad]
optimizer = optim.SGD(params, lr=0.001, momentum=0.9, weight_decay=0.0005)
criterion = nn.CrossEntropyLoss()

# 모델 학습 루프
for epoch in range(30):
    model.train()
    cost = 0.0

    for images, targets in train_dataloader:
        images = images.to(device)  # 이미지 장치로 이동
        targets = targets.to(device)  # 타겟 장치로 이동

        output = model(images)  # 모델 예측
        output = output["out"].permute(0, 2, 3, 1).contiguous().view(-1, num_classes)
        targets = targets.permute(0, 2, 3, 1).contiguous().view(-1)

        loss = criterion(output, targets)  # 손실 계산

        optimizer.zero_grad()  # 그래디언트 초기화
        loss.backward()  # 역전파
        optimizer.step()  # 가중치 업데이트

        cost += loss
    cost = cost / len(train_dataloader)  # 평균 손실
    print(f"Epoch : {epoch+1:4d}, Cost : {cost:.3f}")

# ============================ 모델 평가 및 시각화 ================================
with torch.no_grad():
    model.eval()
    images, masks = next(iter(test_dataloader))
    outputs = model(images.to(device))["out"]  # 모델 출력
    outputs = outputs.argmax(axis=1).to("cpu")  # 클래스 예측
    draw_mask(images, masks, outputs, 4)  # 시각화

# ============================ mIoU 계산 =======================================
from collections import defaultdict

# IoU 계산 함수 정의
def calculate_iou(targets, outputs, ious, class_count, num_classes=21):
    for i in range(num_classes):
        intersection = np.float32(np.sum((outputs == targets) * (targets == i)))  # 교집합
        union = np.sum(targets == i) + np.sum(outputs == i) - intersection  # 합집합
        if union > 0:
            ious[i] += intersection / union  # IoU 계산
            class_count[i] += 1
    return ious, class_count

# mIoU 계산
ious = np.zeros(21)
class_count = defaultdict(int)
with torch.no_grad():
    model.eval()
    for images, targets in test_dataloader:
        images = images.to(device)
        outputs = model(images)["out"].permute(0, 2, 3, 1).detach().to("cpu").numpy()
        targets = targets.permute(0, 2, 3, 1).squeeze().detach().to("cpu").numpy()
        outputs = outputs.argmax(-1)

        ious, class_count = calculate_iou(targets, outputs, ious, class_count, 21)

# 평균 IoU 계산 및 출력
miou = 0.0
for idx in range(1, 21):  # 배경 제외 클래스
    miou += ious[idx] / class_count[idx]
miou /= 20
print(f"mIoU 계산 결과 : {miou}")

# 예제 9.20~9.24 Mask R-CNN 모델 실습

In [None]:
# =================================== COCO 데이터셋 정의 ===================================
import os
import torch
import numpy as np
from PIL import Image
from pycocotools.coco import COCO
from torch.utils.data import Dataset
from pycocotools import mask as maskUtils


class COCODataset(Dataset):
    def __init__(self, root, train, transform=None):
        super().__init__()
        directory = "train" if train else "val"  # 훈련 또는 검증 데이터 경로 선택
        annotations = os.path.join(root, "annotations", f"{directory}_annotations.json")  # 어노테이션 파일 경로

        self.coco = COCO(annotations)  # COCO 어노테이션 로드
        self.iamge_path = os.path.join(root, directory)  # 이미지 경로 설정
        self.transform = transform  # 이미지 변환 설정

        self.categories = self._get_categories()  # 카테고리 로드
        self.data = self._load_data()  # 데이터 로드

    def _get_categories(self):
        # 카테고리 정보를 로드하여 딕셔너리로 반환
        categories = {0: "background"}
        for category in self.coco.cats.values():
            categories[category["id"]] = category["name"]
        return categories

    def _load_data(self):
        # 이미지와 관련된 바운딩 박스, 라벨, 마스크 데이터를 로드
        data = []
        for _id in self.coco.imgs:
            file_name = self.coco.loadImgs(_id)[0]["file_name"]
            image_path = os.path.join(self.iamge_path, file_name)
            image = Image.open(image_path).convert("RGB")
            width, height = image.size  # 이미지 크기

            boxes = []
            labels = []
            masks = []
            anns = self.coco.loadAnns(self.coco.getAnnIds(_id))  # 어노테이션 로드
            for ann in anns:
                x, y, w, h = ann["bbox"]  # 바운딩 박스
                segmentations = ann["segmentation"]  # 분할 정보
                try:
                    mask = self._polygon_to_mask(segmentations, width, height)  # 마스크 생성
                except Exception as e:
                    pass

                boxes.append([x, y, x + w, y + h])  # 바운딩 박스 좌표
                labels.append(ann["category_id"])  # 라벨
                masks.append(mask)  # 마스크 추가

            target = {  # 타겟 데이터 생성
                "image_id": torch.LongTensor([_id]),
                "boxes": torch.FloatTensor(boxes),
                "labels": torch.LongTensor(labels),
                "masks": torch.FloatTensor(masks)
            }
            data.append([image, target])  # 데이터 추가
        return data

    def _polygon_to_mask(self, segmentations, width, height):
        # 폴리곤을 마스크로 변환
        binary_mask = []
        for seg in segmentations:
            rles = maskUtils.frPyObjects([seg], height, width)
            binary_mask.append(maskUtils.decode(rles))

        combined_mask = np.sum(binary_mask, axis=0).squeeze()  # 여러 마스크 결합
        return combined_mask

    def __getitem__(self, index):
        # 인덱스에 해당하는 데이터 반환
        image, target = self.data[index]
        if self.transform:
            image = self.transform(image)  # 변환 적용
        return image, target

    def __len__(self):
        return len(self.data)  # 데이터셋 크기 반환


# ============================= 데이터셋 및 데이터로더 설정 ================================
from torchvision import transforms
from torch.utils.data import DataLoader

def collator(batch):
    return tuple(zip(*batch))  # 배치 데이터를 병합

# 이미지 변환 설정
transform = transforms.Compose(
    [
        transforms.PILToTensor(),
        transforms.ConvertImageDtype(dtype=torch.float)
    ]
)

# 데이터셋 정의
train_dataset = COCODataset("../datasets/coco", train=True, transform=transform)
test_dataset = COCODataset("../datasets/coco", train=False, transform=transform)

# 데이터로더 생성
train_dataloader = DataLoader(
    train_dataset, batch_size=4, shuffle=True, drop_last=True, collate_fn=collator
)
test_dataloader = DataLoader(
    test_dataset, batch_size=1, shuffle=True, drop_last=True, collate_fn=collator
)


# =================================== Mask R-CNN 모델 정의 ==================================
from torchvision.models.detection import maskrcnn_resnet50_fpn
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
from torchvision.models.detection.mask_rcnn import MaskRCNNPredictor

num_classes = 3  # 클래스 수 (배경 포함)
hidden_layer = 256  # 마스크 예측 레이어 크기
device = "cuda" if torch.cuda.is_available() else "cpu"

# Mask R-CNN 모델 불러오기
model = maskrcnn_resnet50_fpn(weights="DEFAULT")

# 바운딩 박스 예측기 수정
model.roi_heads.box_predictor = FastRCNNPredictor(
    in_channels=model.roi_heads.box_predictor.cls_score.in_features,
    num_classes=num_classes
)

# 마스크 예측기 수정
model.roi_heads.mask_predictor = MaskRCNNPredictor(
    in_channels=model.roi_heads.mask_predictor.conv5_mask.in_channels,
    dim_reduced=hidden_layer,
    num_classes=num_classes
)
model.to(device)


# ================================= 모델 학습 설정 ====================================
from torch import optim

# 옵티마이저 및 학습률 스케줄러 설정
params = [p for p in model.parameters() if p.requires_grad]
optimizer = optim.SGD(params, lr=0.005, momentum=0.9, weight_decay=0.0005)
lr_scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.1)

# 모델 학습 루프
for epoch in range(5):
    cost = 0.0
    for idx, (images, targets) in enumerate(train_dataloader):
        images = list(image.to(device) for image in images)  # 이미지 장치로 이동
        targets = [{k: v.to(device) for k, v in t.items()} for t in targets]  # 타겟 이동

        loss_dict = model(images, targets)  # 손실 계산
        losses = sum(loss for loss in loss_dict.values())  # 총 손실 계산

        optimizer.zero_grad()  # 그래디언트 초기화
        losses.backward()  # 역전파
        optimizer.step()  # 가중치 업데이트

        cost += losses  # 손실 누적

    lr_scheduler.step()  # 학습률 감소
    cost = cost / len(train_dataloader)  # 평균 손실
    print(f"Epoch : {epoch+1:4d}, Cost : {cost:.3f}")


# ========================== 시각화 함수 및 모델 예측 시각화 ============================
import numpy as np
from matplotlib import pyplot as plt
from torchvision.transforms.functional import to_pil_image

# 바운딩 박스 및 마스크를 시각화하는 함수
def draw_bbox(ax, box, text, color, mask):
    ax.add_patch(
        plt.Rectangle(
            xy=(box[0], box[1]),
            width=box[2] - box[0],
            height=box[3] - box[1],
            fill=False,
            edgecolor=color,
            linewidth=2,
        )
    )
    ax.annotate(
        text=text,
        xy=(box[0] - 5, box[1] - 5),
        color=color,
        weight="bold",
        fontsize=13,
    )

    mask = np.ma.masked_where(mask == 0, mask)
    mask_color = {"blue": "Blues", "red" : "Reds"}

    cmap = plt.cm.get_cmap(mask_color.get(color, "Greens"))
    norm = plt.Normalize(vmin=0, vmax=1)
    rgba = cmap(norm(mask))
    ax.imshow(rgba, interpolation="nearest", alpha=0.3)


# 모델 예측 시각화
threshold = 0.5  # 임계값
categories = test_dataset.categories

with torch.no_grad():
    model.eval()
    for images, targets in test_dataloader:
        images = [image.to(device) for image in images]  # 이미지 장치로 이동
        outputs = model(images)  # 모델 예측

        # 예측 결과 추출
        boxes = outputs[0]["boxes"].to("cpu").numpy()
        labels = outputs[0]["labels"].to("cpu").numpy()
        scores = outputs[0]["scores"].to("cpu").numpy()
        masks = outputs[0]["masks"].squeeze(1).to("cpu").numpy()

        # 임계값 기준 필터링
        boxes = boxes[scores >= threshold].astype(np.int32)
        labels = labels[scores >= threshold]
        scores = scores[scores >= threshold]
        masks[masks >= threshold] = 1.0
        masks[masks < threshold] = 0.0

        fig = plt.figure(figsize=(8, 8))
        ax = fig.add_subplot(1, 1, 1)
        plt.imshow(to_pil_image(images[0]))  # 원본 이미지 표시

        # 예측 결과 시각화
        for box, mask, label, score in zip(boxes, masks, labels, scores):
            draw_bbox(ax, box, f"{categories[label]} - {score:.4f}", "red", mask)

        # 실제 결과 시각화
        tboxes = targets[0]["boxes"].numpy()
        tmask = targets[0]["masks"].numpy()
        tlabels = targets[0]["labels"].numpy()

        for box, mask, label in zip(tboxes, tmask, tlabels):
            draw_bbox(ax, box, f"{categories[label]}", "blue", mask)

        plt.show()


# ============================= COCO 평가 지표 계산 ================================
import numpy as np
from pycocotools.cocoeval import COCOeval

# COCO 평가
with torch.no_grad():
    model.eval()
    coco_detections = []
    for images, targets in test_dataloader:
        images = [img.to(device) for img in images]
        outputs = model(images)  # 모델 예측

        for i in range(len(targets)):
            image_id = targets[i]["image_id"].data.cpu().numpy().tolist()[0]
            boxes = outputs[i]["boxes"].data.cpu().numpy()
            boxes[:, 2] = boxes[:, 2] - boxes[:, 0]
            boxes[:, 3] = boxes[:, 3] - boxes[:, 1]
            scores = outputs[i]["scores"].data.cpu().numpy()
            labels = outputs[i]["labels"].data.cpu().numpy()
            masks = outputs[i]["masks"].squeeze(1).data.cpu().numpy()

            for instance_id in range(len(boxes)):
                segmentation_mask = masks[instance_id]
                binary_mask = segmentation_mask > 0.5
                binary_mask = binary_mask.astype(np.uint8)
                binary_mask_encoded = maskUtils.encode(
                    np.asfortranarray(binary_mask)
                )

                prediction = {
                    "image_id": int(image_id),
                    "category_id": int(labels[instance_id]),
                    "bbox": [round(coord, 2) for coord in boxes[instance_id]],
                    "score": float(scores[instance_id]),
                    "segmentation": binary_mask_encoded
                }
                coco_detections.append(prediction)

    # COCO 평가 실행
    coco_gt = test_dataloader.dataset.coco
    coco_dt = coco_gt.loadRes(coco_detections)
    coco_evaluator = COCOeval(coco_gt, coco_dt, iouType="segm")
    coco_evaluator.evaluate()
    coco_evaluator.accumulate()
    coco_evaluator.summarize()

# 예제 9.25~9.30 YOLOv8 모델 실습

In [None]:
# ============================= 영상 출력 및 YOLOv8 모델 설정 ==============================
from google.colab.patches import cv2_imshow
from ultralytics import YOLO
import cv2
import torch

# YOLO 모델 로드
model = YOLO("../models/yolov8m-pose.pt")

# ============================= 동영상 예제 기본 출력 =====================================
# 동영상 파일 로드
capture = cv2.VideoCapture("../datasets/woman.mp4")

while cv2.waitKey(10) < 0:
    if capture.get(cv2.CAP_PROP_POS_FRAMES) == capture.get(cv2.CAP_PROP_FRAME_COUNT):  # 마지막 프레임일 경우
        capture.set(cv2.CAP_PROP_POS_FRAMES, 0)  # 프레임을 처음으로 리셋

    ret, frame = capture.read()  # 프레임 읽기
    cv2.imshow("VideoFrame", frame)  # 프레임 출력

capture.release()  # 리소스 해제
cv2.destroyAllWindows()

# ============================ YOLO 예측 함수 정의 =====================================
def predict(frame, iou=0.7, conf=0.25):
    # YOLO를 사용하여 예측 수행
    results = model(
        source=frame,
        device="0" if torch.cuda.is_available() else "cpu",  # GPU/CPU 설정
        iou=iou,  # IoU 임계값
        conf=conf,  # Confidence 임계값
        verbose=False,  # 출력 생략
    )
    result = results[0]  # 첫 번째 결과 가져오기
    return result

# ============================= 바운딩 박스 그리기 함수 정의 ==============================
def draw_boxes(result, frame):
    for boxes in result.boxes:  # 예측된 바운딩 박스 순회
        x1, y1, x2, y2, score, classes = boxes.data.squeeze().cpu().numpy()  # 좌표와 클래스 정보 추출
        cv2.rectangle(frame, (int(x1), int(y1)), (int(x2), int(y2)), (0, 0, 255), 1)  # 바운딩 박스 그림
    return frame

# ============================ 바운딩 박스 시각화 =====================================
capture = cv2.VideoCapture("../datasets/woman.mp4")

while cv2.waitKey(10) < 0:
    if capture.get(cv2.CAP_PROP_POS_FRAMES) == capture.get(cv2.CAP_PROP_FRAME_COUNT):  # 마지막 프레임일 경우
        capture.set(cv2.CAP_PROP_POS_FRAMES, 0)  # 프레임 리셋

    ret, frame = capture.read()  # 프레임 읽기
    result = predict(frame)  # YOLO 예측
    frame = draw_boxes(result, frame)  # 바운딩 박스 그리기
    cv2.imshow("VideoFrame", frame)  # 결과 출력

capture.release()  # 리소스 해제
cv2.destroyAllWindows()

# ========================== 키포인트 그리기 함수 정의 ============================
from ultralytics.yolo.utils.plotting import Annotator

def draw_keypoints(result, frame):
    annotator = Annotator(frame, line_width=1)  # 이미지 주석 생성기
    for kps in result.keypoints:  # 예측된 키포인트 순회
        kps = kps.data.squeeze()  # 키포인트 데이터 추출
        annotator.kpts(kps)  # 키포인트 시각화

        nkps = kps.cpu().numpy()  # NumPy로 변환
        for idx, (x, y, score) in enumerate(nkps):  # 키포인트 데이터 순회
            if score > 0.5:  # Confidence 임계값 이상일 경우
                cv2.circle(frame, (int(x), int(y)), 3, (0, 0, 255), cv2.FILLED)  # 키포인트 원 표시
                cv2.putText(frame, str(idx), (int(x), int(y)), cv2.FONT_HERSHEY_COMPLEX, 1, (0, 0, 255), 1)  # 인덱스 추가

    return frame

# ============================= 바운딩 박스 + 키포인트 시각화 ==============================
capture = cv2.VideoCapture("../datasets/woman.mp4")

while cv2.waitKey(10) < 0:
    if capture.get(cv2.CAP_PROP_POS_FRAMES) == capture.get(cv2.CAP_PROP_FRAME_COUNT):  # 마지막 프레임일 경우
        capture.set(cv2.CAP_PROP_POS_FRAMES, 0)  # 프레임 리셋

    ret, frame = capture.read()  # 프레임 읽기
    result = predict(frame)  # YOLO 예측
    frame = draw_boxes(result, frame)  # 바운딩 박스 그리기
    frame = draw_keypoints(result, frame)  # 키포인트 그리기
    cv2.imshow("VideoFrame", frame)  # 결과 출력

capture.release()  # 리소스 해제
cv2.destroyAllWindows()

# 예제 10.01~10.07 ViT 모델 실습 (1)

In [None]:
# =================================== 데이터셋 로드 및 서브셋 생성 ===================================
from itertools import chain
from collections import defaultdict
from torch.utils.data import Subset
from torchvision import datasets

# 클래스별 서브셋 생성 함수 정의
def subset_sampler(dataset, classes, max_len):
    target_idx = defaultdict(list)
    for idx, label in enumerate(dataset.train_labels):  # 각 라벨별 인덱스 수집
        target_idx[int(label)].append(idx)

    indices = list(
        chain.from_iterable(
            [target_idx[idx][:max_len] for idx in range(len(classes))]  # 최대 max_len만큼 인덱스 선택
        )
    )
    return Subset(dataset, indices)  # Subset 객체 반환

# FashionMNIST 데이터셋 로드
train_dataset = datasets.FashionMNIST(root="../datasets", download=True, train=True)
test_dataset = datasets.FashionMNIST(root="../datasets", download=True, train=False)

# 클래스 정보 확인
classes = train_dataset.classes
class_to_idx = train_dataset.class_to_idx
print(classes)
print(class_to_idx)

# 서브셋 생성
subset_train_dataset = subset_sampler(
    dataset=train_dataset, classes=train_dataset.classes, max_len=1000
)
subset_test_dataset = subset_sampler(
    dataset=test_dataset, classes=test_dataset.classes, max_len=100
)

# 서브셋 크기 출력
print(f"Training Data Size : {len(subset_train_dataset)}")
print(f"Testing Data Size : {len(subset_test_dataset)}")
print(train_dataset[0])  # 샘플 데이터 확인

# =================================== 이미지 변환 정의 ===================================
import torch
from torchvision import transforms
from transformers import AutoImageProcessor

# AutoImageProcessor를 사용하여 이미지 처리 정보 로드
image_processor = AutoImageProcessor.from_pretrained(
    pretrained_model_name_or_path="google/vit-base-patch16-224-in21k"
)

# 이미지 변환 파이프라인 정의
transform = transforms.Compose(
    [
        transforms.ToTensor(),  # 이미지를 텐서로 변환
        transforms.Resize(  # 이미지 크기 조정
            size=(
                image_processor.size["height"],
                image_processor.size["width"]
            )
        ),
        transforms.Lambda(
            lambda x: torch.cat([x, x, x], 0)  # 흑백 이미지를 3채널로 확장
        ),
        transforms.Normalize(  # 정규화
            mean=image_processor.image_mean,
            std=image_processor.image_std
        )
    ]
)

# 이미지 프로세서 정보 출력
print(f"size : {image_processor.size}")
print(f"mean : {image_processor.image_mean}")
print(f"std : {image_processor.image_std}")

# =================================== 데이터로더 정의 ===================================
from torch.utils.data import DataLoader

# 배치 데이터를 변환 및 병합하는 함수 정의
def collator(data, transform):
    images, labels = zip(*data)  # 이미지와 라벨 분리
    pixel_values = torch.stack([transform(image) for image in images])  # 이미지 변환
    labels = torch.tensor([label for label in labels])  # 라벨 텐서화
    return {"pixel_values": pixel_values, "labels": labels}

# 데이터로더 정의
train_dataloader = DataLoader(
    subset_train_dataset,
    batch_size=32,
    shuffle=True,
    collate_fn=lambda x: collator(x, transform),
    drop_last=True
)
valid_dataloader = DataLoader(
    subset_test_dataset,
    batch_size=4,
    shuffle=True,
    collate_fn=lambda x: collator(x, transform),
    drop_last=True
)

# 배치 데이터 확인
batch = next(iter(train_dataloader))
for key, value in batch.items():
    print(f"{key} : {value.shape}")

# =================================== ViT 모델 정의 ===================================
from transformers import ViTForImageClassification

# 사전학습된 ViT 모델 로드 및 재학습 설정
model = ViTForImageClassification.from_pretrained(
    pretrained_model_name_or_path="google/vit-base-patch16-224-in21k",
    num_labels=len(classes),  # 클래스 수 설정
    id2label={idx: label for label, idx in class_to_idx.items()},  # id-라벨 매핑
    label2id=class_to_idx,  # 라벨-id 매핑
    ignore_mismatched_sizes=True  # 크기 불일치 무시
)

# 모델 구성 정보 출력
print(model.classifier)
print(model.vit.embeddings)

# 배치 데이터 형태 확인
batch = next(iter(train_dataloader))
print("image shape :", batch["pixel_values"].shape)  # 이미지 크기
print("patch embeddings shape :",
    model.vit.embeddings.patch_embeddings(batch["pixel_values"]).shape
)  # 패치 임베딩 크기
print("[CLS] + patch embeddings shape :",
    model.vit.embeddings(batch["pixel_values"]).shape
)  # [CLS] 토큰 포함 임베딩 크기

# =================================== 훈련 설정 ===================================
from transformers import TrainingArguments

# TrainingArguments 정의
args = TrainingArguments(
    output_dir="../models/ViT-FashionMNIST",  # 출력 디렉토리
    save_strategy="epoch",  # 에포크마다 저장
    evaluation_strategy="epoch",  # 에포크마다 평가
    learning_rate=1e-5,  # 학습률
    per_device_train_batch_size=16,  # 훈련 배치 크기
    per_device_eval_batch_size=16,  # 평가 배치 크기
    num_train_epochs=3,  # 에포크 수
    weight_decay=0.001,  # 가중치 감소
    load_best_model_at_end=True,  # 최상의 모델 로드
    metric_for_best_model="f1",  # 최상의 모델 기준 메트릭
    logging_dir="logs",  # 로그 디렉토리
    logging_steps=125,  # 로그 기록 간격
    remove_unused_columns=False,  # 불필요한 열 제거
    seed=7  # 랜덤 시드 설정
)

# =================================== 평가 메트릭 정의 ===================================
import evaluate
import numpy as np

# F1 스코어 계산 함수 정의
def compute_metrics(eval_pred):
    metric = evaluate.load("f1")  # F1 스코어 메트릭 로드
    predictions, labels = eval_pred  # 예측값과 라벨 분리
    predictions = np.argmax(predictions, axis=1)  # 예측값에서 가장 높은 클래스 선택
    macro_f1 = metric.compute(
        predictions=predictions, references=labels, average="macro"
    )  # 매크로 F1 계산
    return macro_f1

# 예제 10.08~10.09 ViT 모델 실습 (2)

In [None]:
# ============================= 필요한 라이브러리 임포트 =============================
import torch
import evaluate
import numpy as np
from itertools import chain
from collections import defaultdict
from torch.utils.data import Subset
from torchvision import datasets, transforms
from transformers import (
    AutoImageProcessor,
    ViTForImageClassification,
    TrainingArguments,
    Trainer,
)
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay

# =========================== 서브셋 샘플링 함수 정의 ============================
def subset_sampler(dataset, classes, max_len):
    # 각 클래스별 인덱스를 수집하고 최대 max_len만큼 샘플링
    target_idx = defaultdict(list)
    for idx, label in enumerate(dataset.train_labels):  # 클래스별 인덱스 저장
        target_idx[int(label)].append(idx)

    indices = list(
        chain.from_iterable(
            [target_idx[idx][:max_len] for idx in range(len(classes))]  # 클래스별 최대 max_len 샘플링
        )
    )
    return Subset(dataset, indices)  # 서브셋 반환

# ========================= 모델 초기화 함수 정의 ===========================
def model_init(classes, class_to_idx):
    # ViT 모델 초기화
    model = ViTForImageClassification.from_pretrained(
        pretrained_model_name_or_path="google/vit-base-patch16-224-in21k",  # 사전학습 모델 로드
        num_labels=len(classes),  # 클래스 수 설정
        id2label={idx: label for label, idx in class_to_idx.items()},  # ID-라벨 매핑
        label2id=class_to_idx,  # 라벨-ID 매핑
    )
    return model

# =========================== 데이터 병합 함수 정의 ===========================
def collator(data, transform):
    # 배치 데이터를 변환 및 병합
    images, labels = zip(*data)  # 이미지와 라벨 분리
    pixel_values = torch.stack([transform(image) for image in images])  # 이미지 변환
    labels = torch.tensor([label for label in labels])  # 라벨 텐서화
    return {"pixel_values": pixel_values, "labels": labels}

# ========================== 평가 메트릭 계산 함수 정의 ==========================
def compute_metrics(eval_pred):
    # F1 스코어 계산
    metric = evaluate.load("f1")  # F1 스코어 메트릭 로드
    predictions, labels = eval_pred  # 예측값과 라벨 분리
    predictions = np.argmax(predictions, axis=1)  # 예측값에서 최대값 인덱스 선택
    macro_f1 = metric.compute(
        predictions=predictions, references=labels, average="macro"
    )  # 매크로 F1 계산
    return macro_f1

# ============================= 데이터셋 로드 ==============================
train_dataset = datasets.FashionMNIST(root="../datasets", download=True, train=True)
test_dataset = datasets.FashionMNIST(root="../datasets", download=True, train=False)

classes = train_dataset.classes  # 클래스 이름
class_to_idx = train_dataset.class_to_idx  # 클래스-인덱스 매핑

# 서브셋 생성
subset_train_dataset = subset_sampler(
    dataset=train_dataset, classes=train_dataset.classes, max_len=1000
)
subset_test_dataset = subset_sampler(
    dataset=test_dataset, classes=test_dataset.classes, max_len=100
)

# ========================== 이미지 변환 정의 ===========================
# AutoImageProcessor를 사용하여 이미지 처리 정보 로드
image_processor = AutoImageProcessor.from_pretrained(
    pretrained_model_name_or_path="google/vit-base-patch16-224-in21k"
)

# 이미지 변환 파이프라인 정의
transform = transforms.Compose(
    [
        transforms.ToTensor(),  # 이미지를 텐서로 변환
        transforms.Resize(  # 이미지 크기 조정
            size=(
                image_processor.size["height"],
                image_processor.size["width"],
            )
        ),
        transforms.Lambda(
            lambda x: torch.cat([x, x, x], 0)  # 흑백 이미지를 3채널로 확장
        ),
        transforms.Normalize(  # 정규화
            mean=image_processor.image_mean,
            std=image_processor.image_std,
        ),
    ]
)

# ============================ 훈련 인자 정의 ============================
args = TrainingArguments(
    output_dir="../models/ViT-FashionMNIST",  # 모델 출력 경로
    save_strategy="epoch",  # 에포크마다 저장
    evaluation_strategy="epoch",  # 에포크마다 평가
    learning_rate=1e-5,  # 학습률
    per_device_train_batch_size=16,  # 훈련 배치 크기
    per_device_eval_batch_size=16,  # 평가 배치 크기
    num_train_epochs=3,  # 에포크 수
    weight_decay=0.001,  # 가중치 감소
    load_best_model_at_end=True,  # 최상의 모델 로드
    metric_for_best_model="f1",  # 평가 메트릭
    logging_dir="logs",  # 로그 디렉토리
    logging_steps=125,  # 로그 출력 간격
    remove_unused_columns=False,  # 불필요한 열 제거
    seed=7,  # 랜덤 시드
)

# ============================= Trainer 초기화 ==============================
trainer = Trainer(
    model_init=lambda x: model_init(classes, class_to_idx),  # 모델 초기화
    args=args,  # 훈련 인자
    train_dataset=subset_train_dataset,  # 훈련 데이터
    eval_dataset=subset_test_dataset,  # 평가 데이터
    data_collator=lambda x: collator(x, transform),  # 데이터 병합 함수
    compute_metrics=compute_metrics,  # 메트릭 계산 함수
    tokenizer=image_processor,  # 이미지 프로세서
)

# ============================= 모델 훈련 ==============================
trainer.train()

# ============================= 예측 및 평가 ==============================
outputs = trainer.predict(subset_test_dataset)
print(outputs)

# =========================== 혼동 행렬 시각화 ===========================
y_true = outputs.label_ids  # 실제 라벨
y_pred = outputs.predictions.argmax(1)  # 예측된 라벨

# 혼동 행렬 계산 및 시각화
labels = list(classes)
matrix = confusion_matrix(y_true, y_pred)  # 혼동 행렬 계산
display = ConfusionMatrixDisplay(confusion_matrix=matrix, display_labels=labels)  # 시각화 객체 생성
_, ax = plt.subplots(figsize=(10, 10))  # 플롯 크기 설정
display.plot(xticks_rotation=45, ax=ax)  # 혼동 행렬 시각화
plt.show()

# 예제 10.10~10.12 스윈 트랜스포머 모델 구조

In [None]:
# ============================= 상대 위치 좌표 계산 =============================
import torch

# 윈도우 크기 설정
window_size = 2

# 좌표 생성
coords_h = torch.arange(window_size)  # 높이 좌표
coords_w = torch.arange(window_size)  # 너비 좌표
coords = torch.stack(torch.meshgrid([coords_h, coords_w], indexing="ij"))  # 격자 좌표 생성
coords_flatten = torch.flatten(coords, 1)  # 1차원으로 펼침

# 상대 좌표 계산
relative_coords = coords_flatten[:, :, None] - coords_flatten[:, None, :]  # 상대 좌표 계산

# 결과 출력
print(relative_coords)
print(relative_coords.shape)

# ============================= X, Y 축 상대 좌표 계산 =============================
x_coords = relative_coords[0, :, :]  # X축 상대 좌표
y_coords = relative_coords[1, :, :]  # Y축 상대 좌표

# 상대 좌표 보정 (③번 연산 과정)
x_coords += window_size - 1
y_coords += window_size - 1

# X축 좌표 조정 (④번 연산 과정)
x_coords *= 2 * window_size - 1

# X, Y축 상대 좌표 행렬 출력
print(f"X축에 대한 행렬:\n{x_coords}\n")
print(f"Y축에 대한 행렬:\n{y_coords}\n")

# X축과 Y축 좌표를 합산하여 위치 행렬 계산 (⑤번 연산 과정)
relative_position_index = x_coords + y_coords
print(f"X, Y축에 대한 위치 행렬:\n{relative_position_index}")

# ============================= 상대 위치 바이어스 테이블 생성 =============================
num_heads = 1  # 헤드 수
relative_position_bias_table = torch.Tensor(
    torch.zeros((2 * window_size - 1) * (2 * window_size - 1), num_heads)  # 바이어스 테이블 초기화
)

# 상대 위치 바이어스 추출 및 재구성
relative_position_bias = relative_position_bias_table[relative_position_index.view(-1)]
relative_position_bias = relative_position_bias.view(
    window_size * window_size, window_size * window_size, -1
)

# 결과 출력
print(relative_position_bias.shape)

# 예제 10.13~10.18 스윈 트랜스포머 모델 실습 (1)

In [None]:
# =========================== 데이터셋 로드 및 서브셋 생성 ============================
from itertools import chain
from collections import defaultdict
from torch.utils.data import Subset
from torchvision import datasets

# 서브셋 샘플링 함수 정의
def subset_sampler(dataset, classes, max_len):
    target_idx = defaultdict(list)  # 클래스별 인덱스를 저장할 딕셔너리
    for idx, label in enumerate(dataset.train_labels):  # 데이터셋의 라벨을 순회하며 저장
        target_idx[int(label)].append(idx)

    indices = list(
        chain.from_iterable(
            [target_idx[idx][:max_len] for idx in range(len(classes))]  # 클래스별로 max_len만큼 샘플링
        )
    )
    return Subset(dataset, indices)  # Subset 객체 반환

# FashionMNIST 데이터셋 로드
train_dataset = datasets.FashionMNIST(root="../datasets", download=True, train=True)
test_dataset = datasets.FashionMNIST(root="../datasets", download=True, train=False)

# 클래스 정보 추출
classes = train_dataset.classes
class_to_idx = train_dataset.class_to_idx

# 서브셋 생성
subset_train_dataset = subset_sampler(
    dataset=train_dataset, classes=train_dataset.classes, max_len=1000
)
subset_test_dataset = subset_sampler(
    dataset=test_dataset, classes=test_dataset.classes, max_len=100
)

# =========================== 이미지 변환 정의 ============================
import torch
from torchvision import transforms
from transformers import AutoImageProcessor

# 이미지 프로세서 초기화
image_processor = AutoImageProcessor.from_pretrained(
    pretrained_model_name_or_path="google/vit-base-patch16-224-in21k"
)

# 이미지 변환 파이프라인 정의
transform = transforms.Compose(
    [
        transforms.ToTensor(),  # 이미지를 텐서로 변환
        transforms.Resize(  # 이미지 크기 조정
            size=(
                image_processor.size["height"],
                image_processor.size["width"]
            )
        ),
        transforms.Lambda(
            lambda x: torch.cat([x, x, x], 0)  # 흑백 이미지를 3채널로 확장
        ),
        transforms.Normalize(  # 정규화
            mean=image_processor.image_mean,
            std=image_processor.image_std
        )
    ]
)

# ========================== 데이터로더 정의 ==========================
from torch.utils.data import DataLoader

# 데이터 배치 병합 함수 정의
def collator(data, transform):
    images, labels = zip(*data)  # 이미지와 라벨 분리
    pixel_values = torch.stack([transform(image) for image in images])  # 이미지 변환
    labels = torch.tensor([label for label in labels])  # 라벨 텐서화
    return {"pixel_values": pixel_values, "labels": labels}

# 데이터로더 정의
train_dataloader = DataLoader(
    subset_train_dataset,
    batch_size=32,
    shuffle=True,
    collate_fn=lambda x: collator(x, transform),
    drop_last=True
)
valid_dataloader = DataLoader(
    subset_test_dataset,
    batch_size=4,
    shuffle=True,
    collate_fn=lambda x: collator(x, transform),
    drop_last=True
)

# ========================== Swin Transformer 모델 정의 ==========================
from transformers import SwinForImageClassification

# 사전학습된 Swin Transformer 모델 로드
model = SwinForImageClassification.from_pretrained(
    pretrained_model_name_or_path="microsoft/swin-tiny-patch4-window7-224",
    num_labels=len(train_dataset.classes),  # 클래스 수 설정
    id2label={idx: label for label, idx in train_dataset.class_to_idx.items()},  # id-라벨 매핑
    label2id=train_dataset.class_to_idx,  # 라벨-id 매핑
    ignore_mismatched_sizes=True  # 크기 불일치 무시
)

# 모델 구조 출력
for main_name, main_module in model.named_children():
    print(main_name)
    for sub_name, sub_module in main_module.named_children():
        print("└", sub_name)
        for ssub_name, ssub_module in sub_module.named_children():
            print("│  └", ssub_name)
            for sssub_name, sssub_module in ssub_module.named_children():
                if sssub_name == "projection":
                    print("│  │  └", sssub_name, sssub_module)
                else:
                    print("│  │  └", sssub_name)

# ========================== 패치 임베딩 차원 확인 ==========================
batch = next(iter(train_dataloader))
print("이미지 차원 :", batch["pixel_values"].shape)

# 패치 임베딩 모듈 적용
patch_emb_output, shape = model.swin.embeddings.patch_embeddings(batch["pixel_values"])
print("모듈:", model.swin.embeddings.patch_embeddings)
print("패치 임베딩 차원 :", patch_emb_output.shape)

# =========================== Swin Layer 내부 모듈 구조 ============================
for main_name, main_module in model.swin.encoder.layers[0].named_children():
    print(main_name)
    for sub_name, sub_module in main_module.named_children():
        print("└", sub_name)
        for ssub_name, ssub_module in sub_module.named_children():
            print("│ └", ssub_name)

print(model.swin.encoder.layers[0].blocks[0])  # 첫 번째 Swin Layer의 블록 출력

# ========================== W-MSA 및 SW-MSA 적용 ==========================
W_MSA = model.swin.encoder.layers[0].blocks[0]  # Window Multi-head Self-Attention
SW_MSA = model.swin.encoder.layers[0].blocks[1]  # Shifted Window Multi-head Self-Attention

W_MSA_output = W_MSA(patch_emb_output, W_MSA.input_resolution)[0]
SW_MSA_output = SW_MSA(W_MSA_output, SW_MSA.input_resolution)[0]

print("W-MSA 결과 차원 :", W_MSA_output.shape)
print("SW-MSA 결과 차원 :", SW_MSA_output.shape)

# ========================== 패치 병합 모듈 확인 ==========================
patch_merge = model.swin.encoder.layers[0].downsample
print("patch_merge 모듈 :", patch_merge)

# 패치 병합 적용
output = patch_merge(SW_MSA_output, patch_merge.input_resolution)
print("patch_merge 결과 차원 :", output.shape)

# 예제 10.19 스윈 트랜스포머 모델 실습 (2)

In [None]:
# ============================= 환경 설정 =============================
# 애플 실리콘 사용자를 위한 PyTorch MPS 설정 (필요시 활성화)
# import os
# os.environ["PYTORCH_ENABLE_MPS_FALLBACK"] = "1"

# 필요한 라이브러리 임포트
import torch
import evaluate
import numpy as np
from itertools import chain
from collections import defaultdict
from torch.utils.data import Subset
from torchvision import datasets, transforms
from transformers import AutoImageProcessor, SwinForImageClassification
from transformers import TrainingArguments, Trainer
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay

# =========================== 서브셋 샘플링 함수 정의 ============================
def subset_sampler(dataset, classes, max_len):
    target_idx = defaultdict(list)  # 클래스별 인덱스 저장
    for idx, label in enumerate(dataset.train_labels):  # 데이터셋 라벨 확인
        target_idx[int(label)].append(idx)

    indices = list(
        chain.from_iterable(
            [target_idx[idx][:max_len] for idx in range(len(classes))]  # 클래스별 최대 max_len 샘플링
        )
    )
    return Subset(dataset, indices)  # Subset 반환

# =========================== 모델 초기화 함수 정의 ============================
def model_init(classes, class_to_idx):
    model = SwinForImageClassification.from_pretrained(
        pretrained_model_name_or_path="microsoft/swin-tiny-patch4-window7-224",
        num_labels=len(classes),  # 클래스 수
        id2label={idx: label for label, idx in class_to_idx.items()},  # ID-라벨 매핑
        label2id=class_to_idx,  # 라벨-ID 매핑
        ignore_mismatched_sizes=True  # 크기 불일치 무시
    )
    return model

# ========================= 데이터 병합 함수 정의 ===========================
def collator(data, transform):
    images, labels = zip(*data)  # 이미지와 라벨 분리
    pixel_values = torch.stack([transform(image) for image in images])  # 이미지 변환
    labels = torch.tensor([label for label in labels])  # 라벨 텐서화
    return {"pixel_values": pixel_values, "labels": labels}

# =========================== 평가 메트릭 함수 정의 ============================
def compute_metrics(eval_pred):
    metric = evaluate.load("f1")  # F1 스코어 메트릭 로드
    predictions, labels = eval_pred  # 예측값과 라벨 분리
    predictions = np.argmax(predictions, axis=1)  # 가장 높은 값의 인덱스 선택
    macro_f1 = metric.compute(
        predictions=predictions, references=labels, average="macro"  # 매크로 F1 스코어 계산
    )
    return macro_f1

# ============================= 데이터셋 로드 ==============================
train_dataset = datasets.FashionMNIST(root="../datasets", download=True, train=True)
test_dataset = datasets.FashionMNIST(root="../datasets", download=True, train=False)

# 클래스 정보 추출
classes = train_dataset.classes
class_to_idx = train_dataset.class_to_idx

# 서브셋 생성
subset_train_dataset = subset_sampler(
    dataset=train_dataset, classes=train_dataset.classes, max_len=1000
)
subset_test_dataset = subset_sampler(
    dataset=test_dataset, classes=test_dataset.classes, max_len=100
)

# ========================= 이미지 프로세서 및 변환 정의 ===========================
image_processor = AutoImageProcessor.from_pretrained(
    pretrained_model_name_or_path="microsoft/swin-tiny-patch4-window7-224"
)

# 이미지 변환 파이프라인 정의
transform = transforms.Compose(
    [
        transforms.ToTensor(),  # 이미지를 텐서로 변환
        transforms.Resize(  # 이미지 크기 조정
            size=(
                image_processor.size["height"],
                image_processor.size["width"]
            )
        ),
        transforms.Lambda(
            lambda x: torch.cat([x, x, x], 0)  # 흑백 이미지를 3채널로 확장
        ),
        transforms.Normalize(  # 정규화
            mean=image_processor.image_mean,
            std=image_processor.image_std
        )
    ]
)

# ============================ 훈련 인자 정의 =============================
args = TrainingArguments(
    output_dir="../models/Swin-FashionMNIST",  # 모델 출력 경로
    save_strategy="epoch",  # 에포크마다 저장
    evaluation_strategy="epoch",  # 에포크마다 평가
    learning_rate=1e-5,  # 학습률
    per_device_train_batch_size=16,  # 훈련 배치 크기
    per_device_eval_batch_size=16,  # 평가 배치 크기
    num_train_epochs=3,  # 에포크 수
    weight_decay=0.001,  # 가중치 감소
    load_best_model_at_end=True,  # 최상의 모델 로드
    metric_for_best_model="f1",  # 평가 메트릭
    logging_dir="logs",  # 로그 경로
    logging_steps=125,  # 로그 출력 간격
    remove_unused_columns=False,  # 불필요한 열 제거
    seed=7  # 랜덤 시드
)

# ============================= Trainer 초기화 ==============================
trainer = Trainer(
    model_init=lambda x: model_init(classes, class_to_idx),  # 모델 초기화
    args=args,  # 훈련 인자
    train_dataset=subset_train_dataset,  # 훈련 데이터
    eval_dataset=subset_test_dataset,  # 평가 데이터
    data_collator=lambda x: collator(x, transform),  # 데이터 병합 함수
    compute_metrics=compute_metrics,  # 메트릭 계산 함수
    tokenizer=image_processor,  # 이미지 프로세서
)

# ============================= 모델 훈련 ==============================
trainer.train()

# ============================= 예측 및 평가 ==============================
outputs = trainer.predict(subset_test_dataset)
print(outputs)

# =========================== 혼동 행렬 시각화 ===========================
y_true = outputs.label_ids  # 실제 라벨
y_pred = outputs.predictions.argmax(1)  # 예측된 라벨

# 혼동 행렬 계산 및 시각화
labels = list(classes)
matrix = confusion_matrix(y_true, y_pred)  # 혼동 행렬 계산
display = ConfusionMatrixDisplay(confusion_matrix=matrix, display_labels=labels)  # 시각화 객체 생성
_, ax = plt.subplots(figsize=(10, 10))  # 플롯 크기 설정
display.plot(xticks_rotation=45, ax=ax)  # 혼동 행렬 시각화
plt.show()

# 예제 10.20~10.23 CvT 모델 실습 (1)

In [None]:
# =========================== 데이터셋 로드 및 서브셋 생성 ============================
from itertools import chain
from collections import defaultdict
from torch.utils.data import Subset
from torchvision import datasets

# 서브셋 샘플링 함수 정의
def subset_sampler(dataset, classes, max_len):
    target_idx = defaultdict(list)  # 클래스별 인덱스를 저장할 딕셔너리
    for idx, label in enumerate(dataset.train_labels):  # 데이터셋 라벨 확인
        target_idx[int(label)].append(idx)

    indices = list(
        chain.from_iterable(
            [target_idx[idx][:max_len] for idx in range(len(classes))]  # 클래스별 최대 max_len 샘플링
        )
    )
    return Subset(dataset, indices)  # Subset 반환

# FashionMNIST 데이터셋 로드
train_dataset = datasets.FashionMNIST(root="../datasets", download=True, train=True)
test_dataset = datasets.FashionMNIST(root="../datasets", download=True, train=False)

# 클래스 정보 추출
classes = train_dataset.classes
class_to_idx = train_dataset.class_to_idx

# 서브셋 생성
subset_train_dataset = subset_sampler(
    dataset=train_dataset, classes=train_dataset.classes, max_len=1000
)
subset_test_dataset = subset_sampler(
    dataset=test_dataset, classes=test_dataset.classes, max_len=100
)

# =========================== 이미지 변환 정의 ============================
import torch
from torchvision import transforms
from transformers import AutoImageProcessor

# 이미지 프로세서 초기화
image_processor = AutoImageProcessor.from_pretrained(
    pretrained_model_name_or_path="microsoft/cvt-21"
)

# 이미지 변환 파이프라인 정의
transform = transforms.Compose(
    [
        transforms.ToTensor(),  # 이미지를 텐서로 변환
        transforms.Resize(  # 이미지 크기 조정
            size=(
                image_processor.size["shortest_edge"],  # 짧은 가장자리 크기 사용
                image_processor.size["shortest_edge"]
            )
        ),
        transforms.Lambda(lambda x: torch.cat([x, x, x], 0)),  # 흑백 이미지를 3채널로 확장
        transforms.Normalize(  # 정규화
            mean=image_processor.image_mean,
            std=image_processor.image_std
        )
    ]
)

# ========================== 데이터로더 정의 ==========================
from torch.utils.data import DataLoader

# 데이터 배치 병합 함수 정의
def collator(data, transform):
    images, labels = zip(*data)  # 이미지와 라벨 분리
    pixel_values = torch.stack([transform(image) for image in images])  # 이미지 변환
    labels = torch.tensor([label for label in labels])  # 라벨 텐서화
    return {"pixel_values": pixel_values, "labels": labels}

# 데이터로더 정의
train_dataloader = DataLoader(
    subset_train_dataset,
    batch_size=32,
    shuffle=True,
    collate_fn=lambda x: collator(x, transform),
    drop_last=True
)
valid_dataloader = DataLoader(
    subset_test_dataset,
    batch_size=4,
    shuffle=True,
    collate_fn=lambda x: collator(x, transform),
    drop_last=True
)

# ========================== CvT 모델 정의 및 구조 확인 ==========================
from transformers import CvtForImageClassification

# 사전학습된 CvT 모델 로드
model = CvtForImageClassification.from_pretrained(
    pretrained_model_name_or_path="microsoft/cvt-21",
    num_labels=len(train_dataset.classes),  # 클래스 수 설정
    id2label={idx: label for label, idx in train_dataset.class_to_idx.items()},  # ID-라벨 매핑
    label2id=train_dataset.class_to_idx,  # 라벨-ID 매핑
    ignore_mismatched_sizes=True  # 크기 불일치 무시
)

# 모델 구조 출력
for main_name, main_module in model.named_children():
    print(main_name)
    for sub_name, sub_module in main_module.named_children():
        print("└", sub_name)
        for ssub_name, ssub_module in sub_module.named_children():
            print("   └", ssub_name)
            for sssub_name, sssub_module in ssub_module.named_children():
                print("     └", sssub_name)

# ========================== CvT 모델 스테이지 확인 ==========================
stages = model.cvt.encoder.stages
print(stages[0])  # 첫 번째 스테이지 확인

# ========================== 패치 임베딩 적용 및 출력 ==========================
batch = next(iter(train_dataloader))
print("이미지 차원 :", batch["pixel_values"].shape)

# 패치 임베딩 모듈 적용
patch_emb_output = stages[0].embedding(batch["pixel_values"])
print("패치 임베딩 차원 :", patch_emb_output.shape)

# ========================== 셀프 어텐션 입력 확인 ==========================
batch_size, num_channels, height, width = patch_emb_output.shape
hidden_state = patch_emb_output.view(batch_size, num_channels, height * width).permute(0, 2, 1)
print("셀프 어텐션 입력 차원 :", hidden_state.shape)

# ========================== 셀프 어텐션 출력 확인 ==========================
attention_output = stages[0].layers[0].attention.attention(hidden_state, height, width)
print("셀프 어텐션 출력 차원 :", attention_output.shape)

# 예제 10.24 CvT 모델 실습 (2)

In [None]:
# ============================= 필요한 라이브러리 임포트 =============================
import torch
import evaluate
import numpy as np
from itertools import chain
from collections import defaultdict
from torch.utils.data import Subset
from torchvision import datasets, transforms
from transformers import AutoImageProcessor, CvtForImageClassification
from transformers import TrainingArguments, Trainer
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay

# =========================== 서브셋 샘플링 함수 정의 ============================
def subset_sampler(dataset, classes, max_len):
    # 클래스별로 max_len만큼 데이터 샘플링
    target_idx = defaultdict(list)  # 클래스별 인덱스를 저장할 딕셔너리
    for idx, label in enumerate(dataset.train_labels):  # 데이터셋 라벨 순회
        target_idx[int(label)].append(idx)

    indices = list(
        chain.from_iterable(
            [target_idx[idx][:max_len] for idx in range(len(classes))]  # 클래스별 최대 max_len 샘플링
        )
    )
    return Subset(dataset, indices)  # Subset 반환

# =========================== 모델 초기화 함수 정의 ============================
def model_init(classes, class_to_idx):
    # CvT 모델 초기화
    model = CvtForImageClassification.from_pretrained(
        pretrained_model_name_or_path="microsoft/cvt-21",
        num_labels=len(classes),  # 클래스 수
        id2label={idx: label for label, idx in class_to_idx.items()},  # ID-라벨 매핑
        label2id=class_to_idx,  # 라벨-ID 매핑
        ignore_mismatched_sizes=True  # 크기 불일치 무시
    )
    return model

# ========================= 데이터 병합 함수 정의 ===========================
def collator(data, transform):
    # 배치 데이터를 변환 및 병합
    images, labels = zip(*data)  # 이미지와 라벨 분리
    pixel_values = torch.stack([transform(image) for image in images])  # 이미지 변환
    labels = torch.tensor([label for label in labels])  # 라벨 텐서화
    return {"pixel_values": pixel_values, "labels": labels}

# =========================== 평가 메트릭 함수 정의 ============================
def compute_metrics(eval_pred):
    metric = evaluate.load("f1")  # F1 스코어 메트릭 로드
    predictions, labels = eval_pred  # 예측값과 라벨 분리
    predictions = np.argmax(predictions, axis=1)  # 가장 높은 값의 인덱스 선택
    macro_f1 = metric.compute(
        predictions=predictions, references=labels, average="macro"  # 매크로 F1 스코어 계산
    )
    return macro_f1

# ============================= 데이터셋 로드 ==============================
train_dataset = datasets.FashionMNIST(root="../datasets", download=True, train=True)
test_dataset = datasets.FashionMNIST(root="../datasets", download=True, train=False)

# 클래스 정보 추출
classes = train_dataset.classes
class_to_idx = train_dataset.class_to_idx

# 서브셋 생성
subset_train_dataset = subset_sampler(
    dataset=train_dataset, classes=train_dataset.classes, max_len=1000
)
subset_test_dataset = subset_sampler(
    dataset=test_dataset, classes=test_dataset.classes, max_len=100
)

# ========================= 이미지 프로세서 및 변환 정의 ===========================
image_processor = AutoImageProcessor.from_pretrained(
    pretrained_model_name_or_path="microsoft/cvt-21"
)

# 이미지 변환 파이프라인 정의
transform = transforms.Compose(
    [
        transforms.ToTensor(),  # 이미지를 텐서로 변환
        transforms.Resize(  # 이미지 크기 조정
            size=(
                image_processor.size["shortest_edge"],  # 짧은 가장자리 크기 사용
                image_processor.size["shortest_edge"]
            )
        ),
        transforms.Lambda(
            lambda x: torch.cat([x, x, x], 0)  # 흑백 이미지를 3채널로 확장
        ),
        transforms.Normalize(  # 정규화
            mean=image_processor.image_mean,
            std=image_processor.image_std
        )
    ]
)

# ============================ 훈련 인자 정의 =============================
args = TrainingArguments(
    output_dir="../models/CvT-FashionMNIST",  # 모델 출력 경로
    save_strategy="epoch",  # 에포크마다 저장
    evaluation_strategy="epoch",  # 에포크마다 평가
    learning_rate=1e-5,  # 학습률
    per_device_train_batch_size=16,  # 훈련 배치 크기
    per_device_eval_batch_size=16,  # 평가 배치 크기
    num_train_epochs=3,  # 에포크 수
    weight_decay=0.001,  # 가중치 감소
    load_best_model_at_end=True,  # 최상의 모델 로드
    metric_for_best_model="f1",  # 평가 메트릭
    logging_dir="logs",  # 로그 경로
    logging_steps=125,  # 로그 출력 간격
    remove_unused_columns=False,  # 불필요한 열 제거
    seed=7  # 랜덤 시드
)

# ============================= Trainer 초기화 ==============================
trainer = Trainer(
    model_init=lambda x: model_init(classes, class_to_idx),  # 모델 초기화
    args=args,  # 훈련 인자
    train_dataset=subset_train_dataset,  # 훈련 데이터
    eval_dataset=subset_test_dataset,  # 평가 데이터
    data_collator=lambda x: collator(x, transform),  # 데이터 병합 함수
    compute_metrics=compute_metrics,  # 메트릭 계산 함수
    tokenizer=image_processor,  # 이미지 프로세서
)

# ============================= 모델 훈련 ==============================
trainer.train()

# ============================= 예측 및 평가 ==============================
outputs = trainer.predict(subset_test_dataset)
print(outputs)

# =========================== 혼동 행렬 시각화 ===========================
y_true = outputs.label_ids  # 실제 라벨
y_pred = outputs.predictions.argmax(1)  # 예측된 라벨

# 혼동 행렬 계산 및 시각화
labels = list(classes)
matrix = confusion_matrix(y_true, y_pred)  # 혼동 행렬 계산
display = ConfusionMatrixDisplay(confusion_matrix=matrix, display_labels=labels)  # 시각화 객체 생성
_, ax = plt.subplots(figsize=(10, 10))  # 플롯 크기 설정
display.plot(xticks_rotation=45, ax=ax)  # 혼동 행렬 시각화
plt.show()