v2.mixup_test_251126.ipynb

목표: v2.mixup의 기능 확인
과제: v2.mixup이 이미지를 섞는 연산 확인
- v2.mixup이 이미지와 라벨을 섞는 기능 확인
- v2.mixup을 통과한 결과가 손실함수에 반영되는 기능 확인
- v2.mixup을 통과한 결과가 optimizer에 반영되는 기능 확인


In [None]:
import torch
import torch.nn as nn
from torch.utils.data import Dataset
import torchvision.transforms as transforms
from torchvision.transforms import v2
from pathlib import Path

from PIL import Image
from tqdm import tqdm
import pandas as pd

In [None]:
from glob import glob

# 커스텀 데이터셋 클래스
class MyDataset(Dataset):
    def __init__(self, data_path, transform=None, train=True):
        self.train = train
        train_df = pd.read_csv(cvs_path)

        self.name2label = dict(zip(train_df["name"], train_df["label"]))

        if self.train:
            self.img_path = glob(f"{data_path}/train_data/*.png")
            self.labels =  [self.name2label[d.split("/")[-1]] for d in self.img_path]
        else:
            self.img_path = glob(f"{data_path}/test_data/*.png")

        self.transform = transform

    def __len__(self):
        return len(self.img_path)   

    def __getitem__(self, index):
        img = Image.open(self.img_path[index])
        if img.mode != 'RGB':
            img = img.convert('RGB')

        if self.transform:
            img = self.transform(img)

        if self.train:
            return img, self.labels[index]
        else:
            return img, self.img_path[index].split("/")[-1]

# 데이터셋 디렉토리 위치 지정
data_f_path = Path.cwd()
data_path = data_f_path.joinpath("v2.mixup_test_251126_png")
cvs_path = data_path.joinpath("train_data.csv")

transform =  v2.Compose([
    v2.ToImage(),
    v2.ToDtype(dtype=torch.float32, scale=True),
    v2.Normalize(mean=mean, std=std),
])

test_transform =  v2.Compose([
    v2.ToImage(),
    v2.ToDtype(dtype=torch.long)
])

train_data = MyDataset(data_path, train=True, transform=transform)
test_data = MyDataset(data_path, train=False, transform=test_transform)

train_size = int(len(train_data) * 0.9)
train_data, val_data = torch.utils.data.random_split(train_data, [train_size, len(train_data) - train_size])
train_data, train_vl_data = torch.utils.data.random_split(train_data, [train_size, len(train_data) - train_size])

train_loader = torch.utils.data.DataLoader(train_data, batch_size=128, shuffle=True)
train_vl_loader = torch.utils.data.DataLoader(train_vl_data, batch_size=128, shuffle=False)
val_loader = torch.utils.data.DataLoader(val_data, batch_size=128, shuffle=False)
test_loader = torch.utils.data.DataLoader(test_data, batch_size=128, shuffle=False)

In [None]:
from torchvision.models import resnet18
from torch.optim.lr_scheduler import CosineAnnealingLR
from torch.cuda.amp import GradScaler
model = resnet18(pretrained=False)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.fc = nn.Linear(512, 10, bias=True)
model = model.to(device)

scaler = GradScaler()
criterion = nn.CrossEntropyLoss(label_smoothing=0.1)
optimizer = torch.optim.AdamW(model.parameters(), lr=LR, weight_decay=WEIGHT_DECAY)
scheduler = CosineAnnealingLR(optimizer, T_max=100, eta_min=0.001)

In [None]:
# train 함수
def train_one_epoch(model, loader, optimizer, criterion):
    model.train()                 
    tot_loss, tot_acc, tot_cnt = 0.0, 0.0, 0

    pbar = tqdm(loader, desc='훈련')
    for i, (x, y) in enumerate(pbar):
        x, y = x.to(device), y.to(device)
        x, y = mixup(x, y)
        optimizer.zero_grad()              
        with autocast(device_type=device): 
            out = model(x)         
            loss = criterion(out, y)    
        scaler.scale(loss).backward()           
        scaler.step(optimizer)          
        scaler.update()

        current_lr = optimizer.param_groups[0]['lr']
        scheduler.step()                  

        tot_loss += loss.item() * y.size(0)
        pred = torch.sigmoid(out) >= 0.5
        y_true = y >= 0.5
        tot_acc += (pred == y_true).all(dim=1).sum().item()  
        tot_cnt  += y.size(0)

        pbar.set_postfix({'LR': current_lr})

    return tot_loss/tot_cnt, tot_acc/tot_cnt, current_lr

def evaluate(model, loader, criterion):
    model.eval()
    tot_loss, tot_acc, tot_cnt = 0.0, 0.0, 0
    with torch.no_grad():
        for x, y in loader:
            x, y = x.to(device), y.to(device)
            y = F.one_hot(y, num_classes=21).float()
            
            with autocast(device_type=device):
                out = model(x)
                loss = criterion(out, y)

            tot_loss += loss.item() * y.size(0)
            pred = torch.sigmoid(out) >= 0.5
            y_true = y >= 0.5
            tot_acc += (pred == y_true).all(dim=1).sum().item()  
            tot_cnt  += y.size(0)

    return tot_loss/tot_cnt, tot_acc/tot_cnt

In [None]:
dummy_X = np.zeros(len(y_tensor))

for fold, (train_i, val_i) in enumerate(rskf.split(dummy_X, y_tensor)):
    print('')


    tr_dataset = MySubset(train_ten, train_i, transform=aug)
    vl_dataset = MySubset(train_ten, val_i)

    tr = DataLoader(tr_dataset, batch_size=BATCH_SIZE, shuffle=True)
    vl = DataLoader(vl_dataset, batch_size=BATCH_SIZE, shuffle=False)

    tr_hist, te_hist, lr_hist = [], [], []
    te_loss, best_te_loss, patience_counter = 0, 0, 0
    for ep in range(1, 5):
        tr_loss, tr_acc, current_lr = train_one_epoch(model, tr, optimizer, criterion) # tr
        te_loss, te_acc = evaluate(model, vl, criterion) # va

        tr_hist.append((tr_loss, tr_acc))
        te_hist.append((te_loss, te_acc))
        lr_hist.append(current_lr)

            if te_loss < best_te_loss:
                best_te_loss = te_loss
                patience_counter = 0
            else:
                patience_counter += 1

            if patience_counter >= 3:
                print(f"Early stopping at epoch {ep}")
                break

    print('학습완료')