# Library import

In [1]:
# 필요 library들을 import합니다.
import os
from typing import Tuple, Any, Callable, List, Optional, Union

import cv2
import timm
import torch
import numpy as np
import pandas as pd
import albumentations as A
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import models, datasets, transforms
from tqdm.auto import tqdm
from torch.utils.data import DataLoader, Dataset, Subset
from sklearn.model_selection import train_test_split
from sklearn.model_selection import KFold
from albumentations.pytorch import ToTensorV2
from PIL import Image

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
# 데이터 캐싱을 위한 전처리 및 저장 스크립트
def cache_dataset_images(info_df, root_dir, cache_dir, image_size=(448, 448)):
    os.makedirs(cache_dir, exist_ok=True)
    transform = transforms.Compose([
        transforms.Resize(image_size),
        transforms.ToTensor(),
    ])
    
    for idx, row in tqdm(info_df.iterrows(), total=len(info_df), desc="Caching images"):
        img_path = os.path.join(root_dir, row['image_path'])
        image = cv2.imread(img_path, cv2.IMREAD_COLOR)
        if image is None:
            print(f"Error loading image: {img_path}")
            continue
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        image = Image.fromarray(image)
        image = transform(image)  # 이미지 전처리
        
        # 캐시 파일 저장
        cache_path = os.path.join(cache_dir, f"{row['image_path']}.pt")
        os.makedirs(os.path.dirname(cache_path), exist_ok=True)
        torch.save(image, cache_path)

# Dataset Class

In [3]:
# CustomDataset 클래스 수정
class CustomDataset(Dataset):
    def __init__(
        self, 
        cache_dir: str, 
        info_df: pd.DataFrame, 
        transform: Callable = None,
        is_inference: bool = False
    ):
        self.cache_dir = cache_dir  # 캐시된 이미지 경로
        self.transform = transform  # 추가적인 데이터 증강
        self.is_inference = is_inference
        self.image_paths = info_df['image_path'].tolist()
        
        if not self.is_inference:
            self.targets = info_df['target'].tolist()

    def __len__(self) -> int:
        return len(self.image_paths)

    def __getitem__(self, index: int):
        cache_path = os.path.join(self.cache_dir, f"{self.image_paths[index]}.pt")
        image = torch.load(cache_path)  # 캐시된 이미지 로드

        if self.transform:
            image = self.transform(image)  # 추가적인 데이터 증강 적용

        if self.is_inference:
            return image
        else:
            target = self.targets[index]
            return image, target

# Transform Class

In [4]:
# 이미지 변환 설정 (텐서에 적용되는 변환)
class TensorTransform:
    def __init__(self, is_train: bool = True):
        if is_train:
            self.transform = transforms.Compose([
                transforms.RandomHorizontalFlip(p=0.5),
                transforms.RandomRotation(15),
                transforms.ColorJitter(brightness=0.2, contrast=0.2),
                # 필요에 따라 추가 변환
            ])
        else:
            self.transform = transforms.Compose([
                # 검증용 변환 (필요한 경우)
            ])

    def __call__(self, image: torch.Tensor) -> torch.Tensor:
        return self.transform(image)

In [5]:
class AlbumentationsTransform:
    def __init__(self, is_train: bool = True):
        # 공통 변환 설정: 이미지 리사이즈, 정규화, 텐서 변환
        common_transforms = [
            A.Resize(448, 448),  # 이미지를 224x224 크기로 리사이즈
            A.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),  # 정규화
            ToTensorV2()  # albumentations에서 제공하는 PyTorch 텐서 변환
        ]
        
        if is_train:
            # 훈련용 변환: 랜덤 수평 뒤집기, 랜덤 회전, 랜덤 밝기 및 대비 조정 추가
            self.transform = A.Compose(
                [
                    A.HorizontalFlip(p=0.5),  # 50% 확률로 이미지를 수평 뒤집기
                    A.Rotate(limit=15),  # 최대 15도 회전
                    A.RandomBrightnessContrast(p=0.2),  # 밝기 및 대비 무작위 조정
                ] + common_transforms
            )
        else:
            # 검증/테스트용 변환: 공통 변환만 적용
            self.transform = A.Compose(common_transforms)

    def __call__(self, image) -> torch.Tensor:
        # 이미지가 NumPy 배열인지 확인
        if not isinstance(image, np.ndarray):
            raise TypeError("Image should be a NumPy array (OpenCV format).")
        
        # 이미지에 변환 적용 및 결과 반환
        transformed = self.transform(image=image)  # 이미지에 설정된 변환을 적용
        
        return transformed['image']  # 변환된 이미지의 텐서를 반환

In [6]:
class TransformSelector:
    """
    이미지 변환 라이브러리를 선택하기 위한 클래스.
    """
    def __init__(self, transform_type: str):

        # 지원하는 변환 라이브러리인지 확인
        if transform_type in ["torchvision", "albumentations"]:
            self.transform_type = transform_type
        
        else:
            raise ValueError("Unknown transformation library specified.")

    def get_transform(self, is_train: bool):
        
        # 선택된 라이브러리에 따라 적절한 변환 객체를 생성
        if self.transform_type == 'torchvision':
            transform = TorchvisionTransform(is_train=is_train)
        
        elif self.transform_type == 'albumentations':
            transform = AlbumentationsTransform(is_train=is_train)
        
        return transform

# Model Class

In [7]:
class SimpleCNN(nn.Module):
    """
    간단한 CNN 아키텍처를 정의하는 클래스.
    """
    def __init__(self, num_classes: int):
        super(SimpleCNN, self).__init__()
        self.conv1 = nn.Conv2d(3, 32, kernel_size=3, padding=1)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
        self.conv3 = nn.Conv2d(64, 128, kernel_size=3, padding=1)
        self.pool = nn.MaxPool2d(2, 2)
        self.fc1 = nn.Linear(128 * 4 * 4, 512)
        self.fc2 = nn.Linear(512, num_classes)
        self.relu = nn.ReLU()

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        
        # 순전파 함수 정의
        x = self.pool(self.relu(self.conv1(x)))
        x = self.pool(self.relu(self.conv2(x)))
        x = self.pool(self.relu(self.conv3(x)))
        x = torch.flatten(x, 1)
        x = self.relu(self.fc1(x))
        x = self.fc2(x)
        
        return x

In [8]:
class TorchvisionModel(nn.Module):
    """
    Torchvision에서 제공하는 사전 훈련된 모델을 사용하는 클래스.
    """
    def __init__(
        self, 
        model_name: str, 
        num_classes: int, 
        pretrained: bool
    ):
        super(TorchvisionModel, self).__init__()
        self.model = models.__dict__[model_name](pretrained=pretrained)
        
        # 모델의 최종 분류기 부분을 사용자 정의 클래스 수에 맞게 조정
        if 'fc' in dir(self.model):
            num_ftrs = self.model.fc.in_features
            self.model.fc = nn.Linear(num_ftrs, num_classes)
        
        elif 'classifier' in dir(self.model):
            num_ftrs = self.model.classifier[-1].in_features
            self.model.classifier[-1] = nn.Linear(num_ftrs, num_classes)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        
        return self.model(x)

In [9]:
class TimmModel(nn.Module):
    """
    Timm 라이브러리를 사용하여 다양한 사전 훈련된 모델을 제공하는 클래스.
    """
    def __init__(
        self, 
        model_name: str, 
        num_classes: int, 
        pretrained: bool
    ):
        super(TimmModel, self).__init__()
        self.model = timm.create_model(
            model_name, 
            pretrained=pretrained, 
            num_classes=num_classes
        )

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        
        return self.model(x)

In [10]:
class ModelSelector:
    """
    사용할 모델 유형을 선택하는 클래스.
    """
    def __init__(
        self, 
        model_type: str, 
        num_classes: int, 
        **kwargs
    ):
        
        # 모델 유형에 따라 적절한 모델 객체를 생성
        if model_type == 'simple':
            self.model = SimpleCNN(num_classes=num_classes)
        
        elif model_type == 'torchvision':
            self.model = TorchvisionModel(num_classes=num_classes, **kwargs)
        
        elif model_type == 'timm':
            self.model = TimmModel(num_classes=num_classes, **kwargs)
        
        else:
            raise ValueError("Unknown model type specified.")

    def get_model(self) -> nn.Module:

        # 생성된 모델 객체 반환
        return self.model

# Loss Class

In [11]:
class Loss(nn.Module):
    """
    모델의 손실함수를 계산하는 클래스.
    """
    def __init__(self):
        super(Loss, self).__init__()
        self.loss_fn = nn.CrossEntropyLoss()

    def forward(
        self, 
        outputs: torch.Tensor, 
        targets: torch.Tensor
    ) -> torch.Tensor:
    
        return self.loss_fn(outputs, targets)

# Trainer Class

In [12]:
class EarlyStopping:
    def __init__(self, patience=3, min_delta=0):
        """
        Args:
            patience (int): 개선이 없을 때 몇 에포크를 기다릴지
            min_delta (float): 성능이 개선되었다고 판단하는 최소 변화량
        """
        self.patience = patience  # 개선되지 않아도 기다리는 최대 에포크 수
        self.min_delta = min_delta  # 성능 개선이 없다고 판단하는 최소 변화량
        self.counter = 0  # 개선되지 않은 에포크 수 카운트
        self.best_loss = None  # 검증 손실의 최저값
        self.early_stop = False  # 중지 플래그
    
    def __call__(self, val_loss):
        if self.best_loss is None:
            self.best_loss = val_loss  # 첫 번째 에포크의 손실 저장
        elif val_loss < self.best_loss - self.min_delta:
            self.best_loss = val_loss  # 손실이 개선되면 갱신
            self.counter = 0  # 카운터 초기화
        else:
            self.counter += 1  # 손실이 개선되지 않으면 카운터 증가
            if self.counter >= self.patience:
                self.early_stop = True  # patience를 초과하면 학습 중지

In [13]:
from torch.cuda.amp import autocast, GradScaler  # Mixed precision
import os

class Trainer:
    def __init__(
        self, 
        model: nn.Module, 
        device: torch.device, 
        train_loader: DataLoader, 
        val_loader: DataLoader, 
        optimizer: optim.Optimizer,
        scheduler: optim.lr_scheduler,
        loss_fn: torch.nn.modules.loss._Loss, 
        epochs: int,
        result_path: str,
        patience: int = 5,  # Early Stopping patience 추가
        min_delta: float = 0.01  # Early Stopping min_delta 추가
    ):
        # 클래스 초기화: 모델, 디바이스, 데이터 로더 등 설정
        self.model = model  # 훈련할 모델
        self.device = device  # 연산을 수행할 디바이스 (CPU or GPU)
        self.train_loader = train_loader  # 훈련 데이터 로더
        self.val_loader = val_loader  # 검증 데이터 로더
        self.optimizer = optimizer  # 최적화 알고리즘
        self.scheduler = scheduler  # 학습률 스케줄러
        self.loss_fn = loss_fn  # 손실 함수
        self.epochs = epochs  # 총 훈련 에폭 수
        self.result_path = result_path  # 모델 저장 경로
        self.best_models = []  # 가장 좋은 상위 3개 모델의 정보를 저장할 리스트
        self.lowest_loss = float('inf')  # 가장 낮은 Loss를 저장할 변수
        self.early_stopping = EarlyStopping(patience=patience, min_delta=min_delta)  # EarlyStopping 초기화
         
        # 가장 높은 validation accuracy와 그에 해당하는 epoch을 기록
        self.best_epoch = 0
        self.best_val_accuracy = 0.0

    def save_model(self, epoch, loss):
        # 모델 저장 경로 설정
        os.makedirs(self.result_path, exist_ok=True)

        # 현재 에폭 모델 저장
        current_model_path = os.path.join(self.result_path, f'model_epoch_{epoch}_loss_{loss:.4f}.pt')
        torch.save(self.model.state_dict(), current_model_path)

        # 최상위 3개 모델 관리
        self.best_models.append((loss, epoch, current_model_path))
        self.best_models.sort()
        if len(self.best_models) > 3:
            _, _, path_to_remove = self.best_models.pop(-1)  # 가장 높은 손실 모델 삭제
            if os.path.exists(path_to_remove):
                os.remove(path_to_remove)

        # 가장 낮은 손실의 모델 저장
        if loss < self.lowest_loss:
            self.lowest_loss = loss
            best_model_path = os.path.join(self.result_path, 'best_model.pt')
            torch.save(self.model.state_dict(), best_model_path)
            print(f"Save {epoch}epoch result. Loss = {loss:.4f}")

    def train_epoch(self) -> tuple:
        # 한 에폭 동안의 훈련을 진행
        self.model.train()
        
        total_loss = 0.0
        correct = 0
        total = 0
        progress_bar = tqdm(self.train_loader, desc="Training", leave=False)
        scaler = GradScaler()  # AMP를 위한 GradScaler 객체 생성

        for images, targets in progress_bar:
            images, targets = images.to(self.device), targets.to(self.device)
            self.optimizer.zero_grad()

            # autocast 컨텍스트 내에서 모델을 실행하여 정밀도를 관리
            with autocast():
                outputs = self.model(images)
                loss = self.loss_fn(outputs, targets)

            # 스케일링된 손실을 사용하여 역전파 실행
            scaler.scale(loss).backward()

            # 스케일러를 사용해 가중치를 업데이트
            scaler.step(self.optimizer)
            scaler.update()

            # 학습률 스케줄러 업데이트
            self.scheduler.step()

            total_loss += loss.item()

            # 정확도 계산
            _, predicted = outputs.max(1)
            correct += predicted.eq(targets).sum().item()
            total += targets.size(0)

            progress_bar.set_postfix(loss=loss.item())

        # 전체 정확도 계산
        train_accuracy = 100.0 * correct / total
        return total_loss / len(self.train_loader), train_accuracy

    def validate(self) -> tuple:
        # 모델의 검증을 진행
        self.model.eval()
        
        total_loss = 0.0
        correct = 0
        total = 0
        progress_bar = tqdm(self.val_loader, desc="Validating", leave=False)
        
        with torch.no_grad():
            for images, targets in progress_bar:
                images, targets = images.to(self.device), targets.to(self.device)
                outputs = self.model(images)
                loss = self.loss_fn(outputs, targets)
                total_loss += loss.item()

                # 정확도 계산
                _, predicted = outputs.max(1)
                correct += predicted.eq(targets).sum().item()
                total += targets.size(0)

                progress_bar.set_postfix(loss=loss.item())

        # 전체 정확도 계산
        val_accuracy = 100.0 * correct / total

        # validation accuracy가 더 높으면 best_epoch 업데이트
        if val_accuracy > self.best_val_accuracy:
            self.best_val_accuracy = val_accuracy
            self.best_epoch = self.current_epoch  # best epoch 저장
        
        return total_loss / len(self.val_loader), val_accuracy

    def train(self) -> None:
        # 전체 훈련 과정을 관리
        for epoch in range(self.epochs):
            self.current_epoch = epoch + 1  # 현재 epoch을 기록
            print(f"Epoch {epoch+1}/{self.epochs}")

            train_loss, train_accuracy = self.train_epoch()
            val_loss, val_accuracy = self.validate()

            print(f"Epoch {epoch+1}, Train Loss: {train_loss:.4f}, Train Accuracy: {train_accuracy:.2f}%")
            print(f"Epoch {epoch+1}, Validation Loss: {val_loss:.4f}, Validation Accuracy: {val_accuracy:.2f}%\n")

            self.save_model(epoch, val_loss)

            # Early Stopping 조건 확인
            self.early_stopping(val_loss)
            if self.early_stopping.early_stop:
                print("Early stopping triggered. Stopping training...")
                break

            self.scheduler.step()
    
    def get_best_epoch(self):
        # 가장 높은 validation accuracy를 기록한 epoch 반환
        return self.best_epoch


# Model Training

In [14]:
# 학습에 사용할 장비를 선택.
# torch라이브러리에서 gpu를 인식할 경우, cuda로 설정.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [15]:
# 데이터 경로 설정
traindata_info_file = "/data/ephemeral/home/data/train.csv"
cache_dir = "/data/ephemeral/home/data/cache"  # 캐시된 데이터 저장 경로
save_result_path = "/data/ephemeral/home/youngtae/model2/total_data_training_result"

In [16]:
# 학습 데이터의 class, image path, target에 대한 정보가 들어있는 csv파일을 읽기.
train_info = pd.read_csv(traindata_info_file)

# 총 class의 수를 측정.
num_classes = len(train_info['target'].unique())

# 데이터 캐싱 수행 (한 번만 실행)
# cache_dataset_images(train_info, traindata_dir, cache_dir, image_size=(448, 448))

In [17]:
# K-Fold 설정
k_folds = 5
kf = KFold(n_splits=k_folds, shuffle=True, random_state=42)

In [18]:
# 모델, 손실 함수, 옵티마이저 등 설정
model_selector = ModelSelector(
    model_type='timm', 
    num_classes=num_classes,
    model_name='eva02_large_patch14_448.mim_m38m_ft_in22k_in1k', 
    pretrained=True
)
model = model_selector.get_model()
model.to(device)

loss_fn = Loss()

In [19]:
# Fold별로 가장 높은 Validation Accuracy를 기록한 epoch들을 저장할 리스트
best_epochs = []

# 전체 학습 루프 (K-Fold Cross Validation 적용)
for fold, (train_idx, val_idx) in enumerate(kf.split(train_info)):
    print(f'Fold {fold+1}/{k_folds}')
    
    train_df = train_info.iloc[train_idx].reset_index(drop=True)
    val_df = train_info.iloc[val_idx].reset_index(drop=True)
    
    # Transform 설정
    train_transform = TensorTransform(is_train=True)
    val_transform = TensorTransform(is_train=False)
    
    # Dataset 및 DataLoader 생성
    train_dataset = CustomDataset(
        cache_dir=cache_dir,
        info_df=train_df,
        transform=train_transform,
        is_inference=False
    )
    val_dataset = CustomDataset(
        cache_dir=cache_dir,
        info_df=val_df,
        transform=val_transform,
        is_inference=False
    )

    train_loader = DataLoader(
        train_dataset, 
        batch_size=64, 
        shuffle=True,
        num_workers=4,
        pin_memory=True
    )
    val_loader = DataLoader(
        val_dataset, 
        batch_size=64, 
        shuffle=False,
        num_workers=4,
        pin_memory=True
    )

    # 모델 초기화 (각 폴드마다 새로운 모델 생성)
    model_selector = ModelSelector(
        model_type='timm', 
        num_classes=num_classes,
        model_name='eva02_large_patch14_448.mim_m38m_ft_in22k_in1k', 
        pretrained=True  # 사전 학습된 모델을 다시 로드
    )
    model = model_selector.get_model()
    model.to(device)  # 모델을 디바이스에 할당 (GPU/CPU)

    # 옵티마이저 및 스케줄러 설정 (폴드마다 새로 설정)
    optimizer = optim.AdamW(
        model.parameters(), 
        lr=0.001
    )

    scheduler_step_size = 30
    scheduler_gamma = 0.1
    steps_per_epoch = len(train_loader)
    epochs_per_lr_decay = 2
    scheduler_step_size = steps_per_epoch * epochs_per_lr_decay

    scheduler = optim.lr_scheduler.StepLR(
        optimizer, 
        step_size=scheduler_step_size, 
        gamma=scheduler_gamma
    )

    # 모델의 모든 파라미터를 동결
    for name, param in model.named_parameters():
        param.requires_grad = False

    # 특정 레이어만 학습되도록 설정
    for name, param in model.named_parameters():
        if 'blocks.23' in name or 'head' in name:
            param.requires_grad = True

    # Trainer 설정
    trainer = Trainer(
        model=model, 
        device=device, 
        train_loader=train_loader,
        val_loader=val_loader, 
        optimizer=optimizer,
        scheduler=scheduler,
        loss_fn=loss_fn, 
        epochs=50,
        result_path= None # 기록을 남기지 않도록 경로를 None으로 설정
    )

    # 학습 실행
    trainer.train()

    # 각 Fold에서 Validation Accuracy가 가장 높았던 epoch 기록
    best_epoch = trainer.get_best_epoch()  # 가정: trainer가 가장 성능 좋은 epoch을 반환하는 메서드가 있다고 가정
    best_epochs.append(best_epoch)

    print(f'Fold {fold+1} completed. Best Epoch: {best_epoch}\n')

# Fold별 Best Epoch들의 평균 계산
average_best_epoch = sum(best_epochs) // len(best_epochs)
print(f"Average Best Epoch across all folds: {average_best_epoch}")

# 전체 Train + Validation 데이터를 결합하여 새로운 데이터셋 생성
full_train_df = train_info  # Train + Validation 합친 데이터

# Transform 설정
full_train_transform = TensorTransform(is_train=True)

# 전체 Dataset 및 DataLoader 생성
full_train_dataset = CustomDataset(
    cache_dir=cache_dir,
    info_df=full_train_df,
    transform=full_train_transform,
    is_inference=False
)

full_train_loader = DataLoader(
    full_train_dataset, 
    batch_size=64, 
    shuffle=True,
    num_workers=4,
    pin_memory=True
)

# 모델 다시 초기화
model_selector = ModelSelector(
    model_type='timm', 
    num_classes=num_classes,
    model_name='eva02_large_patch14_448.mim_m38m_ft_in22k_in1k', 
    pretrained=True  # 사전 학습된 모델을 다시 로드
)
model = model_selector.get_model()
model.to(device)  # 모델을 디바이스에 할당 (GPU/CPU)

# 옵티마이저 및 스케줄러 재설정
optimizer = optim.AdamW(
    model.parameters(), 
    lr=0.001
)

scheduler_step_size = 30
scheduler_gamma = 0.1
steps_per_epoch = len(full_train_loader)
epochs_per_lr_decay = 2
scheduler_step_size = steps_per_epoch * epochs_per_lr_decay

scheduler = optim.lr_scheduler.StepLR(
    optimizer, 
    step_size=scheduler_step_size, 
    gamma=scheduler_gamma
)

# 모델의 모든 파라미터를 동결
for name, param in model.named_parameters():
    param.requires_grad = False

# 특정 레이어만 학습되도록 설정
for name, param in model.named_parameters():
    if 'blocks.23' in name or 'head' in name:
        param.requires_grad = True

# Trainer 설정 (전체 데이터로 학습)
final_trainer = Trainer(
    model=model, 
    device=device, 
    train_loader=full_train_loader,
    val_loader=None,  # 전체 데이터를 학습에 사용하므로 검증은 하지 않음
    optimizer=optimizer,
    scheduler=scheduler,
    loss_fn=loss_fn, 
    epochs=average_best_epoch,  # 평균 Best Epoch 만큼 학습
    result_path=os.path.join(save_result_path, "final_model")
)

# 전체 데이터로 학습 실행
final_trainer.train()

print(f"Training completed for {average_best_epoch} epochs on full dataset.")


Fold 1/5
Epoch 1/50


                                                                       

Epoch 1, Train Loss: 1.1096, Train Accuracy: 78.95%
Epoch 1, Validation Loss: 0.4999, Validation Accuracy: 86.92%

Save 0epoch result. Loss = 0.4999
Epoch 2/50


                                                                        

KeyboardInterrupt: 

# Inference

In [101]:
# 추론 코드 (필요한 경우 캐싱된 테스트 데이터 사용)
testdata_dir = "/data/ephemeral/home/data/cache"
testdata_info_file = "/data/ephemeral/home/data/test.csv"

test_info = pd.read_csv(testdata_info_file)

# 테스트 데이터 캐싱 (한 번만 실행)
# cache_dataset_images(test_info, testdata_dir, cache_dir, image_size=(448, 448))

# 테스트 데이터셋 및 데이터 로더 생성
test_dataset = CustomDataset(
    cache_dir=cache_dir,
    info_df=test_info,
    transform=None,  # 추론 시 추가적인 변환이 필요 없다면 None
    is_inference=True
)

test_loader = DataLoader(
    test_dataset, 
    batch_size=64, 
    shuffle=False,
    num_workers=4,
    pin_memory=True
)

Caching images: 100%|██████████| 10014/10014 [02:39<00:00, 62.71it/s]


In [102]:
# 모델 로드 (각 폴드의 최적 모델을 앙상블하거나 선택)
# 여기서는 예시로 마지막 폴드의 모델을 로드합니다.
model.load_state_dict(
    torch.load(
        os.path.join(save_result_path, f"fold_{k_folds}", "best_model.pt"),
        map_location=device
    )
)

<All keys matched successfully>

In [103]:
# 추론 함수 정의
def inference(
    model: nn.Module, 
    device: torch.device, 
    test_loader: DataLoader
):
    model.to(device)
    model.eval()
    
    predictions = []
    with torch.no_grad():
        for images in tqdm(test_loader, desc="Inference"):
            images = images.to(device)
            outputs = model(images)
            logits = F.softmax(outputs, dim=1)
            preds = logits.argmax(dim=1)
            predictions.extend(preds.cpu().numpy())
    
    return predictions

# 추론 실행
predictions = inference(
    model=model, 
    device=device, 
    test_loader=test_loader
)


Inference: 100%|██████████| 157/157 [10:30<00:00,  4.02s/it]


In [104]:
# 결과 저장
test_info['target'] = predictions
test_info = test_info.reset_index().rename(columns={"index": "ID"})
test_info.to_csv("output.csv", index=False)

: 