In [None]:
from typing import Tuple, Any, Callable, List, Optioanl, Union
#Any랑 List랑 Optional은 필요없는건가?
import cv2
from torch.utils.data import Dataset
import pandas as pd
import torch
import os

class CustomDataset(Dataset):

    def __init__(
        self,
        root_dir: str,
        info_df: pd.DataFrame,
        transform: Callable, 
        is_inference: bool = False
    ):
        #데이터셋의 기본 경로, 이미지 변환 방법, 이미지 경로 및 레이블을 초기화한다.
        self.root_dir = root_dir # 이미지 파일들이 저장된 기본 디렉토리
        self.transform = transform # 이미지에 적용할 변환 처리
        self.is_inference = is_inference # 추론을 할 것인지 확인한다.
        self.image_paths = info_df['image_path'].tolist() # 이미지 파일 경로 목록

        if not self.is_inference:
            self.targets = info_df['target'].tolist() # 각 이미지에 대한 레이블 목록

    def __len__(self) -> int:
        # 데이터셋의 총 이미지 수를 반환
        return len(self.image_paths)
    
    def __getitem__(self, index: int) -> Union[Tuple[torch.Tensor, int], torch.Tensor]:
        '''
        특별메서드로, 객체에 대해서 []를 사용할 때 자동으로 호출된다.
        index에는 int 값이 들어간다.
        Union에는 여러 가능한 반환 타입 중 하나를 나타낸다.
        Tuple[torch.Tensor, int]: torch.Tensor나 int 값을 반환할 수 있다.
        torch.Tensor: 단일로 Tensor만 반환할 수 있다.
        
        정리하면, Union은 Tuple[torch.Tensor, int]를 반환하거나, torch.Tensor을 반환한다.
        '''
        img_path = os.path.join(self.root_dir, self.image_paths[index]) # 인덱스별 파일의 이미지 경로 위치를 구한다.
        image = cv2.imread(img_path, cv2.IMREAD_COLOR) # 이미지를 BGR 컬러 포맷의 numpy array로 읽어온다.
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) # BGE 포맷을 RGB 포맷으로 변환
        image = self.transform(image) # 설정한 transform으로 이미지 변환

        if self.is_inference:
            return image #torch.Tensor 형태
        else:
            target = self.targets[index] # 해당 이미지의 레이블
            return image, target # 변환된 이미지와 레이블을 Tuple[torch.Tensor, int]이런 형태로 반환


transform.py

In [None]:
import albumentations as A
from albumentations.pytorch import ToTensorV2
import numpy as np
import torch

class AlbumentationsTransform:
    def __init__(self, is_train: bool=True, transform_config: str=None):
        # 공통 변환 설정: 리사이즈, 정규화, 텐서 변환 (train이랑 test 둘 다)
        common_transforms = [
            A.resize(224,224),
            A.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.224]),
            ToTensorV2()
        ]

        if is_train:
            #훈련용 변환
            self.transform = A.Compose(self._create_augmentations(transform_config) + common_transforms)
        else:
            # 검증, 테스트 용 변환
            self.transform = A.Compose(common_transforms)

    def _create_augmentations(self, augmentations):
        # 입력 받은 augmentations로 인자를 list 형태로 변환
        aug_list = []
        for aug in augmentations:
            aug_class = getattr(A, aug["type"]) # A.HorizontalFlip이나, A.Rotate로 만들어준다.
            
            # 객체를 생성해서 list에 추가.
            aug_list.append(aug_class(**aug["params"])) # A.HorizontalFlip()매개 변수 적용
        return aug_list

    def __call__(self, image) -> torch.Tensor:
        #이미지가 numpy 배열인지 확인
        if not isinstance(image, np.ndarray):
            raise TypeError("Image should be a Numpy array (OpenCV format).")
        
        transformed = self.transform(image=image)

        # 결과는 딕셔너리로 반환 됨.
        return transformed['image'] # 이미지 외에 마스크나 바운딩 박스도 넣을 수 있다.

class TransformSelector:
    '''
    이미지 변환 라이브러리를 선택하기 위한 클래스
    '''
    def __init__(self, transform_type: str, transform_config: str=None):

        # 지원하는 라이브버리인지 확인
        if transform_type in ['albumentations']:
            self.transform_type = transform_type
            self.tranform_config = transform_config
        else:
            raise ValueError("Unknown transformation library specified")
        
    def get_transform(self, is_train: bool):
        
        #선택한 라이브러리에 따라 적절한 변환 객체를 생성
        if self.transform_type == 'albumentations':
            transform = AlbumentationsTransform(is_train=is_train, transform_config=self.transform_config)

        return transform



loss.py

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class CELoss(nn.Module):

    # super().__init__()와 같지만, 좀 더 명시적인 표현
    def __init__(self):
        super(CELoss, self).__init__()
        self.loss_fn = nn.CrossEntropyLoss()

    def forward(
            self,
            outputs: torch.Tensor,
            targets: torch.Tensor,
    ) -> torch.Tensor:
        
        return self.loss_fn(outputs, targets)

class SmoothingLoss(nn.Module):
    
    def __init__(self):
        super(SmoothingLoss, self).__init__()
        self.loss_fn = nn.CrossEntropyLoss(label_smoothing=0.1)

    def forward(
            self,
            outputs: torch.Tensor,
            targets: torch.Tensor,
    ) -> torch.Tensor:
        
        return self.loss_fn(outputs, targets)
    
class FocalLoss(nn.Module):
    
    def __init__(self, alpha=1, gamma=2, reduction='mean'):
        '''
        alpha: 클래스 불균형 조정을 위한 가중치
        gamma: 
        reduction: 손실 값을 처리하는 방식
        '''
        super(FocalLoss, self).__init__()
        self.alpha = alpha
        self.gamma = gamma
        self.reduction = reduction

    def forward(self, outputs, targets):

        # Cross entropy loss 계산
        BCE_loss = F.cross_entropy(outputs, targets, reduction='none', label_smoothing=0.1)

        # 예측 확률을 계산
        pt = torch.exp(-BCE_loss)

        # Focal Loss 계산
        F_loss = self.alpha * (1 - pt) ** self.gamma * BCE_loss

        if self.reduction == 'mean':
            return F_loss.mean()
        elif self.reduction == 'sum':
            return F_loss.sum()
        else:
            # none, 샘블별 손실을 유지하면서 반환
            # 샘플별로, 상위 10% 손실값만 사용하는 등의 활용이 가능하다.
            return F_loss



model.py

In [None]:
import timm
import torch.nn as nn
import torch
from torchvision import models

class TorchvisionModel(nn.Module):
    '''
    Torchvision에서 제공하는 사전 훈련된 모델을 사용하는 클래스
    '''
    def __init__(
            self,
            model_name: str,
            num_classes: int,
            pretrained: bool
    ):
        super(TorchvisionModel, self).__init__()
        #model.(원하는 모델)(파라미터)를 가지고 옴.
        self.model = models.__dict__[model_name](pretrained=pretrained)
        

        # 모델의 최종 분류기 부분을 사용자 정의 클래스 수에 맞게 조정
        #'fc' (Fully Connected): ResNet, VGG 등의 모델에서 주로 사용
        # dir: 객체의 모든 속성과 메서드 이름을 리스트로 반환
        if 'fc' in dir(self.model):
            # self.model.fc는 모델의 마지막 입력 특성 수
            # self.model.fc.in_features는 이 계층의 입력 특성 수
            num_ftrs = self.model.fc.in_features
            # 마지막 계층의 outputs을 클래스 수에 맞게 조정
            self.model.fc = nn.Linear(num_ftrs, num_classes)

        #'classifier': DenseNet, MobileNet 등의 일부 모델에서 사용
        elif 'classifier' in dir(self.model):
            # [-1]로 마지막 계층으로 이동
            num_ftrs = self.model.classifier[-1].infeatures
            self.model.classifier[-1] = nn.Linear(num_ftrs, num_classes)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        return self.model(x)


class SimpleCNN(nn.Module):
    """
    간단한 CNN 아키텍처를 정의하는 클래스.
    """
    def __init__(self, num_classes: int):
        super(SimpleCNN, self).__init__()
        self.conv1 = nn.Conv2d(3, 32, kernel_size=3, padding=1)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
        self.conv3 = nn.Conv2d(64, 128, kernel_size=3, padding=1)
        self.pool = nn.MaxPool2d(2, 2)
        self.fc1 = nn.Linear(128 * 4 * 4, 512)
        self.fc2 = nn.Linear(512, num_classes)
        self.relu = nn.ReLU()

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        
        # 순전파 함수 정의
        x = self.pool(self.relu(self.conv1(x)))
        x = self.pool(self.relu(self.conv2(x)))
        x = self.pool(self.relu(self.conv3(x)))
        x = torch.flatten(x, 1)
        x = self.relu(self.fc1(x))
        x = self.fc2(x)
        
        return x

class TimmModel(nn.Module):
    
    def __init__(
            self,
            model_name: str,
            num_classes: int,
            pretrained: bool
    ):
        super(TimmModel, self).__init__()
        self.model = timm.create_model(
            model_name,
            pretrained=pretrained,
            num_claess=num_classes
        )

    def forward(self, x: torch.Tensor) -> torch.Tensor:

        return self.model(x)
    
class ModelSelector:

    def __init__(
            self,
            model_type: str,
            num_classes: int,
            **kwargs
    ):
        # 모델 유형에 따라 적절한 모델 객체를 생성
        if model_type == 'simple':
            self.model = SimpleCNN(num_classes=num_classes)

        elif model_type == 'torchvision':
            self.model = TorchvisionModel(num_classes=num_classes, **kwargs)
        elif model_type == 'timm':
            self.model = TimmModel(num_classes=num_classes, **kwargs)
        else:
            raise ValueError("Unknown model type specified")
                 
    def get_model(self) -> nn.Module:

        return self.model



trainer.py

In [None]:
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
import torch.optim as optim
import os
import shutil
from tqdm.auto import tqdm
from torch.utils.tensorboard import SummaryWriter
from torch.cuda.amp import autocast, GradScaler

class Trainer:
    def __init__(
        self,       
        model: nn.Module,
        device: torch.device,
        train_loader: DataLoader,
        val_loader: DataLoader,
        optimizer: optim.optimizer,
        scheduler: optim.lr_scheduler,
        loss_fn: torch.nn.modules.loss._Loss,
        epochs: int,
        result_path: str,
        exp_name: str,
        patience: int,
        min_delta: str,
        mine_delta: float,
        config_path: str
    ):
        # 클래스 초기화: 모델, 디바이스, 데이터 로더 등 설정
        self.model = model  # 훈련할 모델
        self.device = device  # 연산을 수행할 디바이스 (CPU or GPU)
        self.train_loader = train_loader  # 훈련 데이터 로더
        self.val_loader = val_loader  # 검증 데이터 로더
        self.optimizer = optimizer  # 최적화 알고리즘
        self.scheduler = scheduler # 학습률 스케줄러
        self.loss_fn = loss_fn  # 손실 함수
        self.epochs = epochs  # 총 훈련 에폭 수
        # ES 조기종료를 위한 추가
        self.patience = patience
        self.min_delta = min_delta
        self.counter = 0
        self.best_loss = float('inf')
    
        # 저장경로 생성
        self.result_path = os.path.join(result_path, exp_name)
        os.makedir(self.result_path, exist_ok=False)

        shutil.copyfile(config_path, os.path.join(self.result_path, 'config.yaml'))

        # 텐서보드의 summaryWriter 설정
        self.weriter = SummaryWriter(log_dir=self.result_path)

        self.best_models = [] # 상위 3개의 모델 정보를 저장하는 리스트
        self.lowest_loss = float('inf') # 가장 낮은 Loss를 저장할 변수


    def save_model(self, epoch, loss):
        # 모델 저장 경로 설정
        current_model_path = os.path.join(self.result_path, f'model_epoch_{epoch}_loss_{loss:.4f}.pt')
        torch.save(self.model.state_dict(), current_model_path)
        
        # 최상위 3개 모델 관리
        self.best_models.append((loss, epoch, current_model_path))
        self.best_models.sort()
        if len(self.best_models) > 3:
            _, _, path_to_remove = self.best_models.pop(-1)  # 가장 loss가 높은 모델 삭제
            if os.path.exists(path_to_remove):
                os.remove(path_to_remove)
        
        # 가장 낮은 손실의 모델 저장
        if loss < self.lowest_loss:
            self.lowest_loss = loss
            best_model_path = os.path.join(self.result_path, 'best_model.pt')
            torch.save(self.model.state_dict(), best_model_path)
            print(f'save {epoch} epoch result. Loss = {loss:.4f}')

    def train_epoch(self) -> float:
        
        # 한 에폭 동안의 훈련을 관리
        self.model.train()
        
        total_loss = 0.0
        #tqdm으로 훈련과정을 시각화 
        # self.train_loader: 각 배치의 진행 상황
        # desc: Traning이라는 설명 추가
        # 훈련과정이 프로그레스바 삭제
        progress_bar = tqdm(self.train_loader, desc="Traning", leave=False)
        # AMP를 위한 Gradscaler 객체 생성
        scaler = torch.amp.Gradscaler('cuda')

        for batch_idx, (images, targets) in enumerate(progress_bar):
            images, targets = images.to(self.device), targets.to(self.device)
            # optimizer에 등록된 모든 파라미터의 gradient를 초기화
            self.optimizer.zero_grad()

            # AMP로, 여기서만 float16으로 계산
            with torch.amp.autocast('cuda'):
                outputs = self.model(images)
                loss = self.loss_fn(outputs, targets)

            scaler.scale(loss).backward()
            scaler.step(self.optimizer)
            scaler.update()
            self.scheduler.step()
            total_loss += loss.item()
            progress_bar.set_postfix(loss=loss.item())

        return total_loss / len(self.trainloader)

