# WRN-16-2

**목표**: 71.6%를 달성했던 모델(WRN-16-2)에, 현재 가장 강력한 데이터 증강 기법 중 하나인 `RandAugment`를 적용하여 성능을 한계까지 끌어올리는 것을 목표로 합니다.


## 1. 환경 설정

In [None]:
# Mount Google Drive at /content/drive; the dataset archive is read from there
# and the per-fold checkpoints are written there later in the notebook.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# 필요한 라이브러리 임포트
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader, Subset
import torchvision.transforms as transforms

import numpy as np
import pandas as pd
import os
import zipfile
import random

from sklearn.model_selection import StratifiedKFold

print(f"PyTorch Version: {torch.__version__}")

PyTorch Version: 2.8.0+cu126


In [None]:
# Fix every random seed for reproducibility
def set_seed(seed):
    """Seed the Python, NumPy and PyTorch RNGs and set the cuDNN flags.

    Args:
        seed: Seed value passed unchanged to every generator.
    """
    # cuDNN flags: deterministic mode on, benchmark mode off.
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

    # CPU-side generators, seeded in the order Python -> NumPy -> PyTorch.
    for seed_fn in (random.seed, np.random.seed, torch.manual_seed):
        seed_fn(seed)

    # CUDA generators are only touched when a GPU is actually present.
    if not torch.cuda.is_available():
        return
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)

# Single global seed: applied to the RNGs here and reused as the
# StratifiedKFold random_state later in the notebook.
SEED = 42
set_seed(SEED)

# Device selection: use the GPU when CUDA is available, otherwise the CPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")

Using device: cuda


## 2. 데이터 준비

In [None]:
# Dataset paths: the archive on Google Drive and the local extraction target
zip_file_path = '/content/drive/MyDrive/2025-ai-challenge.zip'
data_dir = '/content/dataset'

# Extract the archive into a fresh directory: an existing extraction is
# removed first, then the directory is recreated empty.
if os.path.exists(data_dir):
    import shutil
    shutil.rmtree(data_dir)
os.makedirs(data_dir)

with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
    zip_ref.extractall(data_dir)

print(f"Data extracted to: {data_dir}")
data_subdir = '' # no subfolder
full_data_path = os.path.join(data_dir, data_subdir)

# Load the arrays (only the training split has a label file loaded here)
train_data = np.load(os.path.join(full_data_path, 'trainset.npy'))
train_labels = np.load(os.path.join(full_data_path, 'trainlabel.npy'))
test_data = np.load(os.path.join(full_data_path, 'testset.npy'))

print('Train data shape:', train_data.shape)

Data extracted to: /content/dataset
Train data shape: (50000, 32, 32, 3)


In [None]:
# Custom dataset class for PyTorch
class CustomDataset(Dataset):
    """Index-based dataset over in-memory samples with optional labels.

    Args:
        data: Indexable collection of samples; its length is the dataset size.
        labels: Optional indexable collection of labels aligned with ``data``.
            When omitted, items are returned without a label.
        transform: Optional callable applied to each sample on access.
    """

    def __init__(self, data, labels=None, transform=None):
        self.data, self.labels, self.transform = data, labels, transform

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        """Return ``(sample, label)`` when labels exist, else just ``sample``."""
        sample = self.data[idx]
        sample = self.transform(sample) if self.transform else sample
        if self.labels is None:
            return sample
        # Labels are converted to int64 tensors at access time.
        return sample, torch.tensor(self.labels[idx], dtype=torch.long)

class TransformedDataset(Dataset):
    """Wrap a dataset of ``(sample, label)`` pairs and transform the sample.

    Args:
        subset: Indexable dataset whose items unpack into ``(sample, label)``.
        transform: Optional callable applied to the sample only; the label is
            passed through untouched.
    """

    def __init__(self, subset, transform=None):
        self.subset, self.transform = subset, transform

    def __len__(self):
        return len(self.subset)

    def __getitem__(self, index):
        sample, target = self.subset[index]
        if not self.transform:
            return sample, target
        return self.transform(sample), target

## 3. 최종 전략 설정 및 모델 정의

In [None]:
# --- Hyperparameters ---
N_SPLITS = 5
EPOCHS = 200
BATCH_SIZE = 128
LEARNING_RATE = 0.1
MOMENTUM = 0.9
WEIGHT_DECAY = 5e-4

# K-Fold setup: stratified, shuffled with the global seed. The base dataset is
# built without a transform; transforms are attached per fold via
# TransformedDataset in the training loop.
skf = StratifiedKFold(n_splits=N_SPLITS, shuffle=True, random_state=SEED)
original_train_dataset = CustomDataset(train_data, train_labels)

# --- Upgrade: apply RandAugment ---
# Per-channel normalisation constants.
# NOTE(review): named after CIFAR-100; presumably its channel statistics —
# confirm they match this challenge's data.
cifar100_mean = (0.5071, 0.4867, 0.4408)
cifar100_std = (0.2675, 0.2565, 0.2761)

# Training pipeline: PIL-space augmentations first, then tensor conversion and
# normalisation, with RandomErasing applied last on the normalised tensor.
transform_train = transforms.Compose([
    transforms.ToPILImage(),
    transforms.RandomCrop(32, padding=4),
    transforms.RandomHorizontalFlip(),
    transforms.RandAugment(), # <-- RandAugment
    transforms.ToTensor(),
    transforms.Normalize(cifar100_mean, cifar100_std),
    transforms.RandomErasing(p=0.5)
])

# Validation/test pipeline: no augmentation, only tensor conversion + normalisation.
transform_val_test = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(cifar100_mean, cifar100_std)
])

In [None]:
# WideResNet model definition
class WideBasicBlock(nn.Module):
    """Residual block computing BN-ReLU-Conv, dropout, BN-ReLU-Conv plus a shortcut.

    Args:
        in_planes: Number of input channels.
        planes: Number of output channels.
        dropout_rate: Drop probability of the dropout layer after the first conv.
        stride: Stride of the second 3x3 conv and of the projection shortcut.
    """

    def __init__(self, in_planes, planes, dropout_rate, stride=1):
        super().__init__()
        # Submodules are created in a fixed order; this order determines the
        # state_dict key order and the order in which init consumes the RNG.
        self.bn1 = nn.BatchNorm2d(in_planes)
        self.conv1 = nn.Conv2d(in_planes, planes, kernel_size=3, padding=1, bias=True)
        self.dropout = nn.Dropout(p=dropout_rate)
        self.bn2 = nn.BatchNorm2d(planes)
        self.conv2 = nn.Conv2d(planes, planes, kernel_size=3, stride=stride, padding=1, bias=True)

        if stride == 1 and in_planes == planes:
            # Shapes already match: an empty Sequential acts as the identity.
            self.shortcut = nn.Sequential()
        else:
            # 1x1 projection so the shortcut matches the main path's shape.
            projection = nn.Conv2d(in_planes, planes, kernel_size=1, stride=stride, bias=True)
            self.shortcut = nn.Sequential(projection)

    def forward(self, x):
        hidden = self.conv1(F.relu(self.bn1(x)))
        hidden = self.dropout(hidden)
        residual = self.conv2(F.relu(self.bn2(hidden)))
        residual += self.shortcut(x)
        return residual

class WideResNet(nn.Module):
    """Wide residual network: a stem conv, three residual stages, and a linear head.

    Args:
        depth: Total depth; must satisfy ``depth = 6n + 4`` where ``n`` is the
            number of blocks per stage.
        widen_factor: Channel multiplier ``k``; stage widths are 16k, 32k, 64k.
        dropout_rate: Dropout probability forwarded to every residual block.
        num_classes: Size of the output logit vector.

    Raises:
        AssertionError: If ``depth`` is not of the form ``6n + 4``.
    """

    def __init__(self, depth, widen_factor, dropout_rate, num_classes):
        super().__init__()
        self.in_planes = 16
        assert ((depth - 4) % 6 == 0), 'Wide-resnet depth should be 6n+4'
        n = (depth - 4) // 6
        k = widen_factor
        nStages = [16, 16 * k, 32 * k, 64 * k]
        self.conv1 = nn.Conv2d(3, nStages[0], kernel_size=3, stride=1, padding=1, bias=True)
        # Stage 1 keeps the resolution; stages 2 and 3 each downsample by 2.
        self.layer1 = self._make_layer(WideBasicBlock, nStages[1], n, dropout_rate, stride=1)
        self.layer2 = self._make_layer(WideBasicBlock, nStages[2], n, dropout_rate, stride=2)
        self.layer3 = self._make_layer(WideBasicBlock, nStages[3], n, dropout_rate, stride=2)
        # NOTE(review): in PyTorch, BatchNorm `momentum` is the weight given to
        # the newest batch statistic when updating the running estimates, so 0.9
        # is far above the default 0.1 — confirm this is intended. Left as-is so
        # training behaviour and existing checkpoints are unaffected.
        self.bn1 = nn.BatchNorm2d(nStages[3], momentum=0.9)
        self.linear = nn.Linear(nStages[3], num_classes)

    def _make_layer(self, block, planes, num_blocks, dropout_rate, stride):
        """Build one stage of ``num_blocks`` blocks; only the first uses ``stride``."""
        strides = [stride] + [1] * (num_blocks - 1)
        layers = []
        for stride in strides:
            layers.append(block(self.in_planes, planes, dropout_rate, stride))
            # Later blocks (and the next stage) take this stage's width as input.
            self.in_planes = planes
        return nn.Sequential(*layers)

    def forward(self, x):
        """Return class logits with one row per input sample."""
        out = self.conv1(x)
        out = self.layer1(out)
        out = self.layer2(out)
        out = self.layer3(out)
        out = F.relu(self.bn1(out))
        # Global average pooling. The adaptive form equals the former fixed
        # 8x8 window for 32x32 inputs, but also works for other input sizes
        # instead of producing a feature vector the linear layer cannot accept.
        out = F.adaptive_avg_pool2d(out, 1)
        out = out.view(out.size(0), -1)
        out = self.linear(out)
        return out

def count_parameters(model):
    """Return the total element count of the model's trainable parameters.

    Parameters with ``requires_grad`` set to False are not counted.
    """
    total = 0
    for param in model.parameters():
        if param.requires_grad:
            total += param.numel()
    return total

## 4. 학습 루프 (Label Smoothing만 적용)

In [None]:
# Label Smoothing Loss
class LabelSmoothingLoss(nn.Module):
    """Cross-entropy against a smoothed target distribution.

    The true class receives probability ``1 - smoothing``; the remaining
    ``smoothing`` mass is split evenly over the other ``classes - 1`` classes.

    Args:
        classes: Number of classes.
        smoothing: Probability mass moved away from the true class.
    """

    def __init__(self, classes, smoothing=0.1):
        super().__init__()
        self.cls = classes
        self.smoothing = smoothing
        self.confidence = 1.0 - smoothing

    def forward(self, pred, target):
        """Return the batch-mean loss for logits ``pred`` and class indices ``target``."""
        log_probs = pred.log_softmax(dim=-1)
        # The target distribution is a constant: build it without autograd.
        with torch.no_grad():
            off_value = self.smoothing / (self.cls - 1)
            smoothed = torch.full_like(log_probs, off_value)
            smoothed.scatter_(1, target.data.unsqueeze(1), self.confidence)
        per_sample = (-smoothed * log_probs).sum(dim=-1)
        return per_sample.mean()

# --- K-Fold loop start ---
# For every stratified fold: train a fresh model from scratch, validate after
# each epoch, and keep the checkpoint with the best validation accuracy.
for fold, (train_idx, val_idx) in enumerate(skf.split(original_train_dataset.data, original_train_dataset.labels)):
    print(f'=============== FOLD {fold+1}/{N_SPLITS} ===============')

    # --- Back to WRN-16-2 ---
    model = WideResNet(depth=16, widen_factor=2, dropout_rate=0.3, num_classes=100).to(device)
    optimizer = optim.SGD(model.parameters(), lr=LEARNING_RATE, momentum=MOMENTUM, weight_decay=WEIGHT_DECAY)
    criterion = LabelSmoothingLoss(classes=100, smoothing=0.1)
    # The cosine schedule spans only the epochs after the 5-epoch warmup.
    scheduler_cosine = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=EPOCHS-5)

    # Build the data loaders: both subsets share the untransformed base dataset,
    # and each gets its own transform through TransformedDataset.
    train_subset = Subset(original_train_dataset, train_idx)
    val_subset = Subset(original_train_dataset, val_idx)
    train_dataset_aug = TransformedDataset(train_subset, transform_train)
    val_dataset_aug = TransformedDataset(val_subset, transform_val_test)
    train_loader = DataLoader(train_dataset_aug, batch_size=BATCH_SIZE, shuffle=True, num_workers=2)
    val_loader = DataLoader(val_dataset_aug, batch_size=BATCH_SIZE, shuffle=False, num_workers=2)

    best_val_accuracy = 0.0
    model_save_path = f'/content/drive/MyDrive/best_model_fold_{fold}_comeback.pth' # saved under a new version name

    for epoch in range(EPOCHS):
        model.train()
        # Warmup: linearly ramp the LR up to LEARNING_RATE over the first 5 epochs
        if epoch < 5:
            for param_group in optimizer.param_groups:
                param_group['lr'] = LEARNING_RATE * (epoch + 1) / 5

        for images, labels in train_loader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            loss = criterion(outputs, labels)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        # The cosine scheduler is stepped only once the warmup epochs are over.
        if epoch >= 5: scheduler_cosine.step()

        # Validation
        model.eval()
        val_corrects = 0
        with torch.no_grad():
            for images, labels in val_loader:
                images, labels = images.to(device), labels.to(device)
                outputs = model(images)
                _, preds = torch.max(outputs, 1)
                val_corrects += torch.sum(preds == labels.data)
        epoch_val_acc = val_corrects.double() / len(val_subset)

        # Progress is logged every 10 epochs; the LR printed is the one already
        # updated for the next epoch.
        if (epoch + 1) % 10 == 0:
            print(f'Epoch {epoch+1}/{EPOCHS} | Val Acc: {epoch_val_acc:.4f} | LR: {optimizer.param_groups[0]["lr"]:.6f}')

        # Overwrite the fold's checkpoint whenever validation accuracy improves.
        if epoch_val_acc > best_val_accuracy:
            best_val_accuracy = epoch_val_acc
            torch.save(model.state_dict(), model_save_path)

    print(f'Fold {fold+1} Best Val Acc: {best_val_accuracy:.4f}')
    print(f'Best model for Fold {fold+1} saved to {model_save_path}')

print('\nComeback strategy training finished!')

Epoch 10/200 | Val Acc: 0.2346 | LR: 0.099838
Epoch 20/200 | Val Acc: 0.3651 | LR: 0.098547
Epoch 30/200 | Val Acc: 0.3542 | LR: 0.095999
Epoch 40/200 | Val Acc: 0.4485 | LR: 0.092260
Epoch 50/200 | Val Acc: 0.4571 | LR: 0.087426
Epoch 60/200 | Val Acc: 0.4183 | LR: 0.081622
Epoch 70/200 | Val Acc: 0.4666 | LR: 0.075000
Epoch 80/200 | Val Acc: 0.4756 | LR: 0.067730
Epoch 90/200 | Val Acc: 0.4883 | LR: 0.060001
Epoch 100/200 | Val Acc: 0.5191 | LR: 0.052013
Epoch 110/200 | Val Acc: 0.5475 | LR: 0.043973
Epoch 120/200 | Val Acc: 0.5355 | LR: 0.036089
Epoch 130/200 | Val Acc: 0.5636 | LR: 0.028565
Epoch 140/200 | Val Acc: 0.5576 | LR: 0.021597
Epoch 150/200 | Val Acc: 0.5929 | LR: 0.015364
Epoch 160/200 | Val Acc: 0.5943 | LR: 0.010028
Epoch 170/200 | Val Acc: 0.6397 | LR: 0.005727
Epoch 180/200 | Val Acc: 0.6679 | LR: 0.002573
Epoch 190/200 | Val Acc: 0.6602 | LR: 0.000647
Epoch 200/200 | Val Acc: 0.6640 | LR: 0.000000
Fold 1 Best Val Acc: 0.6686
Best model for Fold 1 saved to /content/drive/MyDrive/best_model_fold_0_comeback.pth

## 5. 앙상블 추론 및 최종 제출 (TTA 적용)

In [None]:
# Test dataset: no labels, same deterministic preprocessing as validation
test_dataset_final = CustomDataset(test_data, labels=None, transform=transform_val_test)
test_loader_final = DataLoader(test_dataset_final, batch_size=BATCH_SIZE, shuffle=False)

# Load the best checkpoint of every fold
models = []
for fold in range(N_SPLITS):
    model_path = f'/content/drive/MyDrive/best_model_fold_{fold}_comeback.pth'
    model = WideResNet(depth=16, widen_factor=2, dropout_rate=0.3, num_classes=100).to(device)
    # map_location remaps the stored tensors onto the current device, so
    # checkpoints written during a CUDA run also load on a CPU-only runtime.
    model.load_state_dict(torch.load(model_path, map_location=device))
    model.eval()
    models.append(model)
print(f'{len(models)} models loaded for ensemble.')

# Ensemble inference with test-time augmentation (TTA): softmax probabilities
# of every fold model are summed over the original and the mirrored batch.
tta_transform = transforms.RandomHorizontalFlip(p=1.0)  # p=1.0 -> always flips
all_predictions = []
with torch.no_grad():
    for images in test_loader_final:
        images = images.to(device)
        # Accumulator for the summed class probabilities, created on the device.
        ensemble_outputs = torch.zeros(images.size(0), 100, device=device)

        # Predictions on the original images
        for model in models:
            outputs = model(images)
            ensemble_outputs += F.softmax(outputs, dim=1)

        # Predictions on the horizontally flipped images
        flipped_images = tta_transform(images)
        for model in models:
            outputs = model(flipped_images)
            ensemble_outputs += F.softmax(outputs, dim=1)

        # The class with the highest summed probability wins.
        _, preds = torch.max(ensemble_outputs, 1)
        all_predictions.extend(preds.cpu().numpy())

print("Ensemble prediction with TTA finished!")

# Build the final submission file from the sample submission template
sample_submission_path = os.path.join(full_data_path, 'sample_submission.csv')
submission_df = pd.read_csv(sample_submission_path)
submission_df['label'] = all_predictions
submission_file_path = '/content/submission_comeback.csv'
submission_df.to_csv(submission_file_path, index=False)

print(f"Final submission file created at: {submission_file_path}")
print("\n--- Final Submission File Preview ---")
print(submission_df.head())

NameError: name 'CustomDataset' is not defined