In [7]:
import random
import numpy as np
import pandas as pd
import os
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from timm import create_model  # 최신 모델 로드
from sklearn.metrics import f1_score
from sklearn.model_selection import train_test_split
from sklearn import preprocessing
from tqdm import tqdm
import albumentations as A
from albumentations.pytorch.transforms import ToTensorV2
from albumentations import CoarseDropout
from PIL import Image

In [2]:
CFG = {
    'IMG_SIZE': 224,
    'EPOCHS': 10,
    'LEARNING_RATE': 3e-4,
    'BATCH_SIZE': 32,
    'SEED': 42
}

In [3]:
def seed_everything(seed):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.backends.cudnn.deterministic = True

seed_everything(CFG['SEED'])

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [4]:
df = pd.read_csv('./train.csv')
train, val, _, _ = train_test_split(df, df['label'], test_size=0.3, stratify=df['label'], random_state=CFG['SEED'])

le = preprocessing.LabelEncoder()
train['label'] = le.fit_transform(train['label'])
val['label'] = le.transform(val['label'])

In [8]:
class CustomDataset(Dataset):
    def __init__(self, img_paths, labels=None, transform=None):
        self.img_paths = img_paths
        self.labels = labels
        self.transform = transform

    def __getitem__(self, index):
        img = Image.open(self.img_paths[index]).convert("RGB")
        if self.transform:
            img = np.array(img)
            img = self.transform(image=img)['image']
        if self.labels is not None:
            return img, self.labels[index]
        return img

    def __len__(self):
        return len(self.img_paths)

# Data Augmentation
train_transform = A.Compose([
    A.Resize(CFG['IMG_SIZE'], CFG['IMG_SIZE']),
    A.HorizontalFlip(p=0.5),
    A.RandomBrightnessContrast(p=0.2),
    A.ShiftScaleRotate(shift_limit=0.05, scale_limit=0.05, rotate_limit=15, p=0.5),
    A.OneOf([
        A.CoarseDropout(max_holes=8, max_height=16, max_width=16, p=0.5),
    ], p=0.5),
    A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)),
    ToTensorV2()
])

val_transform = A.Compose([
    A.Resize(CFG['IMG_SIZE'], CFG['IMG_SIZE']),
    A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)),
    ToTensorV2()
])

train_dataset = CustomDataset(train['img_path'].values, train['label'].values, train_transform)
val_dataset = CustomDataset(val['img_path'].values, val['label'].values, val_transform)
train_loader = DataLoader(train_dataset, batch_size=CFG['BATCH_SIZE'], shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=CFG['BATCH_SIZE'], shuffle=False)

In [12]:
def build_model(model_name, num_classes):
    model = create_model(model_name, pretrained=True, num_classes=num_classes)
    return model.to(device)

model_names = ['convnext_large', 'swin_large_patch4_window7_224']
models_to_train = [build_model(name, num_classes=len(le.classes_)) for name in model_names]

model.safetensors:   0%|          | 0.00/791M [00:00<?, ?B/s]

To support symlinks on Windows, you either need to activate Developer Mode or to run Python as an administrator. In order to activate developer mode, see this article: https://docs.microsoft.com/en-us/windows/apps/get-started/enable-your-device-for-development


model.safetensors:   0%|          | 0.00/788M [00:00<?, ?B/s]

To support symlinks on Windows, you either need to activate Developer Mode or to run Python as an administrator. In order to activate developer mode, see this article: https://docs.microsoft.com/en-us/windows/apps/get-started/enable-your-device-for-development


In [13]:
class FocalLoss(nn.Module):
    def __init__(self, alpha=1, gamma=2, reduction='mean'):
        super(FocalLoss, self).__init__()
        self.alpha = alpha
        self.gamma = gamma
        self.reduction = reduction

    def forward(self, inputs, targets):
        CE_loss = nn.CrossEntropyLoss()(inputs, targets)
        pt = torch.exp(-CE_loss)
        focal_loss = self.alpha * (1 - pt) ** self.gamma * CE_loss
        return focal_loss

In [14]:
def train_model(model, optimizer, scheduler, train_loader, val_loader, device):
    criterion = FocalLoss()
    best_model = None
    best_score = 0

    for epoch in range(CFG['EPOCHS']):
        model.train()
        for imgs, labels in tqdm(train_loader):
            imgs, labels = imgs.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(imgs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

        val_loss, val_score = validate_model(model, val_loader, device)
        print(f"Epoch {epoch+1}: Val Loss = {val_loss:.4f}, Val F1 Score = {val_score:.4f}")
        if val_score > best_score:
            best_score = val_score
            best_model = model

        scheduler.step()

    return best_model

def validate_model(model, loader, device):
    model.eval()
    val_loss = 0
    preds, true_labels = [], []

    criterion = nn.CrossEntropyLoss()
    with torch.no_grad():
        for imgs, labels in loader:
            imgs, labels = imgs.to(device), labels.to(device)
            outputs = model(imgs)
            loss = criterion(outputs, labels)
            val_loss += loss.item()
            preds.extend(outputs.argmax(dim=1).cpu().numpy())
            true_labels.extend(labels.cpu().numpy())

    f1 = f1_score(true_labels, preds, average='macro')
    return val_loss / len(loader), f1

In [None]:
trained_models = []
for model in models_to_train:
    optimizer = torch.optim.AdamW(model.parameters(), lr=CFG['LEARNING_RATE'])
    scheduler = torch.optim.lr_scheduler.CosineAnnealingWarmRestarts(optimizer, T_0=5, T_mult=2)
    best_model = train_model(model, optimizer, scheduler, train_loader, val_loader, device)
    trained_models.append(best_model)

# Ensemble Prediction (Weighted Voting)
def weighted_voting(models, weights, loader, device):
    ensemble_preds = []
    for model, weight in zip(models, weights):
        model.eval()
        preds = []
        with torch.no_grad():
            for imgs in tqdm(loader):
                imgs = imgs.to(device)
                outputs = model(imgs).softmax(dim=1) * weight
                preds.append(outputs.cpu().numpy())
        ensemble_preds.append(np.vstack(preds))
    return np.sum(ensemble_preds, axis=0)

100%|██████████| 347/347 [4:05:07<00:00, 42.38s/it]  


Epoch 1: Val Loss = 0.2371, Val F1 Score = 0.9319


 88%|████████▊ | 306/347 [4:09:41<31:18, 45.81s/it]  

In [None]:
test = pd.read_csv('./test.csv')
test_dataset = CustomDataset(test['img_path'].values, None, val_transform)
test_loader = DataLoader(test_dataset, batch_size=CFG['BATCH_SIZE'], shuffle=False)


In [None]:
weights = [0.4, 0.3, 0.3]  # Assign weights to models based on validation performance
predictions = weighted_voting(trained_models, weights, test_loader, device)
final_preds = predictions.argmax(axis=1)
final_preds = le.inverse_transform(final_preds)


In [None]:
# Submission
submission = pd.read_csv('./sample_submission.csv')
submission['label'] = final_preds
submission.to_csv('./ensemble_submission.csv', index=False)