In [None]:
#필요한 라이브러리 임포트
 
import os
import time
import random
import timm
import torch
import albumentations as A
import pandas as pd
import numpy as np
import torch.nn as nn
from albumentations.pytorch import ToTensorV2
from torch.optim import Adam
from torchvision import transforms
from torch.utils.data import Dataset, DataLoader
from PIL import Image
from tqdm import tqdm
from sklearn.metrics import accuracy_score, f1_score
from sklearn.model_selection import train_test_split
from torch.utils.data import random_split
import torch.nn.functional as F
from torch.optim.lr_scheduler import StepLR
from sklearn.metrics import f1_score
import cv2

In [None]:
# 랜덤 시드 고정: 결과 재현성을 위해 시드를 고정

SEED = 42
os.environ['PYTHONHASHSEED'] = str(SEED) # 해시 시드 고정
random.seed(SEED) # 파이썬 랜덤 시드 고정
np.random.seed(SEED) # NumPy 랜덤 시드 고정
torch.manual_seed(SEED) # PyTorch CPU 시드 고정
torch.cuda.manual_seed(SEED) # PyTorch CUDA 시드 고정
torch.cuda.manual_seed_all(SEED) # 모든 GPU에 대한 PyTorch CUDA 시드 고정
torch.backends.cudnn.benchmark = True # CUDA의 성능 최적화

In [None]:
#라벨 스무딩

class LabelSmoothingLoss(nn.Module):
    def __init__(self, classes, smoothing=0.0, dim=-1):
        super(LabelSmoothingLoss, self).__init__()
        self.confidence = 1.0 - smoothing
        self.smoothing = smoothing
        self.cls = classes
        self.dim = dim
    def forward(self, pred, target):
        pred = pred.log_softmax(dim=self.dim)
        with torch.no_grad():
            # true_dist = pred.data.clone()
            true_dist = torch.zeros_like(pred)
            true_dist.fill_(self.smoothing / (self.cls - 1))
            true_dist.scatter_(1, target.data.unsqueeze(1), self.confidence)
        return torch.mean(torch.sum(-true_dist * pred, dim=self.dim))

In [None]:
# 데이터셋 클래스를 정의

class ImageDataset(Dataset):
    def __init__(self, csv, path, transform=None):
        self.df = pd.read_csv(csv).values # CSV 파일에서 데이터 로드
        self.path = path # 이미지 파일 경로
        self.transform = transform # 데이터 변환 함수

    def __len__(self):
        return len(self.df) # 데이터셋의 길이 반환

    def __getitem__(self, idx):
        name, target = self.df[idx] # 이미지 이름과 타겟 레이블 추출
        img = np.array(Image.open(os.path.join(self.path, name))) # 이미지 로드
        if self.transform:
            img = self.transform(image=img)['image'] # 변환 적용
        return img, target # 이미지와 레이블 반환



In [None]:
# 이미지 크기 설정
img_size = 224

# augmentation을 위한 transform 코드 (훈련 데이터용)
trn_transform = A.Compose([
    A.Resize(height=img_size, width=img_size),  # 이미지 크기 조정
    A.PadIfNeeded(min_height=600, min_width=600, border_mode=cv2.BORDER_CONSTANT, value=(0, 0, 0), p=1.0),  # 필요시 패딩 추가
    A.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),  # 정규화
    ToTensorV2(),  # PyTorch 텐서로 변환
])

# test image 변환을 위한 transform 코드 (검증 데이터용)
tst_transform = A.Compose([
    A.Resize(height=img_size, width=img_size),  # 이미지 크기 조정
    A.PadIfNeeded(min_height=600, min_width=600, border_mode=cv2.BORDER_CONSTANT, value=(0, 0, 0), p=1.0),  # 필요시 패딩 추가
    A.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),  # 정규화
    ToTensorV2(),  # PyTorch 텐서로 변환
])


In [None]:
# 데이터셋 정의
full_dataset = ImageDataset("data2/train2.csv", "data2/train", transform=trn_transform)

# 데이터셋 크기 및 분할
dataset_size = len(full_dataset)
train_ratio = 0.8
train_size = int(train_ratio * dataset_size)
val_size = dataset_size - train_size

# 전체 인덱스를 생성하고 섞기
indices = list(range(dataset_size))
random.seed(42)  # 랜덤 시드 고정
random.shuffle(indices)

# 섞인 인덱스를 사용하여 데이터셋 분할
train_indices = indices[:train_size]
val_indices = indices[train_size:]

# 훈련 및 검증 데이터셋 생성
trn_dataset = torch.utils.data.Subset(full_dataset, train_indices)
val_dataset = torch.utils.data.Subset(full_dataset, val_indices)

# DataLoader 정의
BATCH_SIZE = 32
num_workers = 0

trn_loader = DataLoader(trn_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=num_workers)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=num_workers)

# 데이터셋 크기 출력
print("Full dataset size:", dataset_size)
print("Training dataset size:", len(trn_dataset))
print("Validation dataset size:", len(val_dataset))



In [None]:
# device 설정
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# 모델 설정
model_name = 'resnet50' #모델명 입력
model = timm.create_model(model_name, pretrained=True, num_classes=17).to(device)

# 손실 함수 및 옵티마이저 설정
loss_fn = nn.CrossEntropyLoss()
optimizer = Adam(model.parameters(), lr=1e-3)
criterion = LabelSmoothingLoss(classes=17, smoothing=0.1)

# 학습률 스케줄러 설정
scheduler = StepLR(optimizer, step_size=5, gamma=0.1)

# 초기 학습률 설정
scheduler.step(0)

In [None]:
# 결과 로깅 함수
def log_results(epoch, ret):
    log = f"Epoch: {epoch}\n"
    for k, v in ret.items():
        log += f"{k}: {v:.4f}\n"
    print(log)

In [None]:
# 에폭당 학습을 위한 함수

def train_one_epoch(loader, model, optimizer, loss_fn, device):
    model.train() # 모델을 학습 모드로 설정
    train_loss = 0
    preds_list = [] # 예측 결과 리스트 초기화
    targets_list = [] # 타겟 리스트 초기화

    pbar = tqdm(loader) # 진행 상황을 표시하기 위한 tqdm
    for image, targets in pbar:
        image = image.to(device) # 이미지 텐서를 지정한 장치로 이동
        targets = targets.to(device) # 타겟 텐서를 지정한 장치로 이동

        model.zero_grad(set_to_none=True) # 그래디언트 초기화

        preds = model(image) # 모델 예측
        loss = loss_fn(preds, targets) # 손실 계산
        loss.backward() # 역전파
        optimizer.step() # 최적화 단계

        train_loss += loss.item() # 손실 누적
        preds_list.extend(preds.argmax(dim=1).detach().cpu().numpy()) # 예측 결과 추가
        targets_list.extend(targets.detach().cpu().numpy()) # 타겟 추가

        pbar.set_description(f"Loss: {loss.item():.4f}") # 진행 바에 손실 출력

    train_loss /= len(loader) # 평균 손실 계산
    train_acc = accuracy_score(targets_list, preds_list) # 정확도 계산
    train_f1 = f1_score(targets_list, preds_list, average='macro') # F1 점수 계산

    ret = {
        "train_loss": train_loss, # 평균 손실
        "train_acc": train_acc, # 정확도
        "train_f1": train_f1, # F1 점수
    }

    return ret # 결과 반환

def validation(model, val_loader, loss_fn):
    model.eval()
    val_loss = 0
    preds_list = []
    targets_list = []
    
    with torch.no_grad():
        for val_images, val_labels in val_loader:
            val_images, val_labels = val_images.to(device), val_labels.to(device)
            val_outputs = model(val_images)
            loss = loss_fn(val_outputs, val_labels)
            val_loss += loss.item()
            preds_list.extend(val_outputs.argmax(dim=1).cpu().numpy())
            targets_list.extend(val_labels.cpu().numpy())
    
    val_loss /= len(val_loader)
    val_acc = accuracy_score(targets_list, preds_list)
    val_f1 = f1_score(targets_list, preds_list, average='macro')
    
    return val_loss, val_acc, val_f1



In [None]:
# 학습 및 검증 루프

EPOCHS = 100
best_val_loss = float('inf')
patience = 5
early_stop_counter = 0

for epoch in range(EPOCHS):
    # 1. 한 epoch 동안 학습 수행
    train_results = train_one_epoch(trn_loader, model, optimizer, loss_fn, device)
    # 2. 검증 수행
    val_loss, val_accuracy, val_f1 = validation(model, val_loader, loss_fn)

    # 3. 학습 결과 출력
    log_results(epoch + 1, {
        'train_loss': train_results['train_loss'],
        'train_acc': train_results['train_acc'],
        'train_f1': train_results['train_f1'],
        'val_loss': val_loss,
        'val_acc': val_accuracy,
        'val_f1': val_f1
    })
    
    # 4. 학습률 스케줄러 업데이트
    scheduler.step()
    
    # 조기 종료 조건
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        torch.save(model.state_dict(), f"./model_resnet50fin_best.pt")
        early_stop_counter = 0
    else:
        early_stop_counter += 1

    if early_stop_counter >= patience:
        print("Early stopping")
        break


    
# 마지막 모델 저장
torch.save(model.state_dict(), f"./model_resnet50fin_last.pt")

In [None]:
# 테스트 데이터셋 로드
tst_dataset = ImageDataset("sample_submission.csv", "data/test", transform=tst_transform)
tst_loader = DataLoader(tst_dataset, batch_size=32, shuffle=False, num_workers=0)

# 베스트 모델 로드
model.load_state_dict(torch.load("./model_resnet50fin_best.pt"))  # 저장한 모델 로드
model.to(device)  # 모델을 장치로 이동

# 예측 리스트 초기화
preds_list = []

# 모델을 평가 모드로 설정
model.eval()
for image, _ in tqdm(tst_loader):
    image = image.to(device)

    with torch.no_grad():
        preds = model(image)

    preds_list.extend(preds.argmax(dim=1).detach().cpu().numpy())

# 결과 DataFrame 생성
pred_df = pd.DataFrame(tst_dataset.df, columns=['ID', 'target'])
pred_df['target'] = preds_list

# 샘플 제출 파일 로드 및 확인
sample_submission_df = pd.read_csv("sample_submission.csv")
assert (sample_submission_df['ID'] == pred_df['ID']).all()

# 예측 결과를 CSV 파일로 저장
pred_df.to_csv("pred_model_best.csv", index=False)

In [None]:
# 테스트 데이터셋 로드
tst_dataset = ImageDataset("data/sample_submission.csv", "data/test", transform=tst_transform)
tst_loader = DataLoader(tst_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=num_workers)

# 베스트 모델 로드
model.load_state_dict(torch.load("./model_name_last.pt"))  # 저장한 모델 로드
model.to(device)  # 모델을 장치로 이동

# 예측 리스트 초기화
preds_list = []

# 모델을 평가 모드로 설정
model.eval()
for image, _ in tqdm(tst_loader):
    image = image.to(device)

    with torch.no_grad():
        preds = model(image)

    preds_list.extend(preds.argmax(dim=1).detach().cpu().numpy())

# 결과 DataFrame 생성
pred_df = pd.DataFrame(tst_dataset.df, columns=['ID', 'target'])
pred_df['target'] = preds_list

# 샘플 제출 파일 로드 및 확인
sample_submission_df = pd.read_csv("data2/sample_submission.csv")
assert (sample_submission_df['ID'] == pred_df['ID']).all()

# 예측 결과를 CSV 파일로 저장
pred_df.to_csv("pred_model_last.csv", index=False)