In [None]:
# Mount Google Drive (Colab only) so that the files under /content/drive
# used by the later cells become reachable.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
!pip install --quiet timm pytorch_lightning==1.7.7 torchmetrics==0.11.1

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.2/2.2 MB[0m [31m12.8 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m708.1/708.1 kB[0m [31m19.6 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m517.2/517.2 kB[0m [31m19.5 MB/s[0m eta [36m0:00:00[0m
[?25h

In [None]:
import os
import gc
import numpy as np
import pandas as pd
import random
import PIL

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

import albumentations as A
from albumentations.pytorch import ToTensorV2

from glob import glob
from tqdm.auto import tqdm
from sklearn.metrics import f1_score
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import StratifiedKFold
from torchvision.transforms import v2 as transforms
from torch.utils.data import Dataset, DataLoader
from transformers import Swinv2Config, Swinv2Model

import pytorch_lightning as pl
from pytorch_lightning.callbacks import ModelCheckpoint
from pytorch_lightning.callbacks.early_stopping import EarlyStopping

In [None]:
# Hyper-parameter settings shared by the cells below.
# NOTE(review): EPOCHS and LEARNING_RATE are not referenced by the training
# code in this notebook (the Trainer uses max_epochs=100 and the optimizer
# uses lr=1e-5) — confirm which values are intended.
CFG = dict(
    IMG_SIZE=256,
    EPOCHS=5,
    N_SPLIT=5,
    LEARNING_RATE=3e-4,
    BATCH_SIZE=10,
    SEED=41,
)

In [None]:
def seed_everything(seed):
    """Seed every random number generator used in this notebook.

    Args:
        seed: Integer seed applied to Python's ``random``, NumPy and PyTorch
            (CPU and all CUDA devices).

    Side effects:
        Sets ``PYTHONHASHSEED`` in ``os.environ`` and switches cuDNN to
        deterministic mode with auto-tuning disabled.
    """
    random.seed(seed)                             # Python's built-in RNG
    # Only affects subprocesses started afterwards; the hash seed of the
    # current interpreter was fixed at start-up.
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)                          # NumPy legacy global RNG
    torch.manual_seed(seed)                       # PyTorch RNG
    torch.cuda.manual_seed_all(seed)              # every CUDA device (no-op without CUDA)
    torch.backends.cudnn.deterministic = True     # restrict cuDNN to deterministic algorithms
    # benchmark=True lets cuDNN auto-tune and pick possibly different kernels
    # from run to run, which defeats reproducibility; it must be disabled.
    torch.backends.cudnn.benchmark = False

seed_everything(CFG['SEED'])                      # Fix the seed for the whole notebook

In [None]:
# Root folder (on the mounted Drive) holding the CSV files, images and checkpoints.
dir_path = '/content/drive/MyDrive/Colab Notebooks/bird_data/'

train_csv_path = os.path.join(dir_path, 'train.csv')
train_df = pd.read_csv(train_csv_path)

# Label encoding: replace the 'label' column with integer class ids.
# `le` is kept so predictions can be mapped back with inverse_transform later.
le = LabelEncoder()
encoded_labels = le.fit_transform(train_df['label'])
train_df['label'] = encoded_labels

In [None]:

class CustomDataset(Dataset):
    """Dataset that loads RGB images listed in a DataFrame.

    Args:
        df: DataFrame with an ``img_path`` column and, for the ``'train'`` and
            ``'val'`` modes, a ``label`` column.
        dir_path: Directory that the ``img_path`` entries are relative to.
        mode: One of ``'train'``, ``'val'`` or ``'test'``. ``'test'`` items
            carry no label.

    Raises:
        ValueError: If ``mode`` is not one of the supported values.
    """

    VALID_MODES = ('train', 'val', 'test')

    def __init__(self, df, dir_path, mode='train'):
        # Fail early: an unknown mode used to make __getitem__ return None,
        # which only surfaced later as an opaque error inside the collate fn.
        if mode not in self.VALID_MODES:
            raise ValueError(f"mode must be one of {self.VALID_MODES}, got {mode!r}")
        self.df = df
        self.dir_path = dir_path
        self.mode = mode

    def __len__(self):
        """Return the number of rows (samples) in the DataFrame."""
        return len(self.df)

    def __getitem__(self, idx):
        """Return ``{'image': ndarray}`` plus ``'label'`` for train/val modes.

        The image is an (H, W, 3) array obtained from the RGB-converted file.
        """
        data = self.df.iloc[idx]
        image_path = os.path.join(self.dir_path, data['img_path'].replace('./', ''))
        # Context manager closes the underlying file handle deterministically.
        with PIL.Image.open(image_path) as img:
            image = np.array(img.convert('RGB'))

        if self.mode == 'test':
            return {'image': image}
        # 'train' and 'val' return the same structure.
        return {'image': image, 'label': data['label']}

In [None]:
class CustomCollateFn:
    """Collate function that transforms raw images and stacks them into a batch.

    Args:
        transform: Callable invoked as ``transform(image=...)`` that returns a
            dict whose ``'image'`` entry is a tensor (e.g. an albumentations
            pipeline ending in ``ToTensorV2``).
        mode: One of ``'train'``, ``'val'`` or ``'test'``. ``'test'`` batches
            carry no labels.

    Raises:
        ValueError: If ``mode`` is not one of the supported values.
    """

    VALID_MODES = ('train', 'val', 'test')

    def __init__(self, transform, mode):
        # Fail early instead of silently returning None from __call__.
        if mode not in self.VALID_MODES:
            raise ValueError(f"mode must be one of {self.VALID_MODES}, got {mode!r}")
        self.transform = transform
        self.mode = mode

    def __call__(self, batch):
        """Build ``{'pixel_values': Tensor}`` plus ``'label'`` for train/val.

        Args:
            batch: List of sample dicts with an ``'image'`` entry and, for
                train/val, a ``'label'`` entry.
        """
        # The image handling is identical for every mode.
        pixel_values = torch.stack(
            [self.transform(image=sample['image'])['image'] for sample in batch]
        )
        collated = {'pixel_values': pixel_values}

        if self.mode != 'test':
            collated['label'] = torch.tensor(
                [sample['label'] for sample in batch], dtype=torch.long
            )
        return collated

In [None]:
class CustomModel(nn.Module):
    """Backbone plus a small classification head.

    Args:
        model: Backbone whose call result exposes a ``pooler_output`` attribute.
        num_classes: Number of output classes of the head (default 25).
    """

    def __init__(self, model, num_classes=25):
        super(CustomModel, self).__init__()
        self.model = model
        self.clf = nn.Sequential(
            nn.ReLU(),
            # LazyLinear infers its input width on the first forward pass.
            nn.LazyLinear(num_classes),
        )

    def forward(self, x, label=None):
        """Run the backbone and the head.

        Args:
            x: Input batch passed to the backbone.
            label: Optional class-index tensor; when given, the cross-entropy
                loss is computed as well.

        Returns:
            Tuple ``(log_probs, loss)``. ``log_probs`` are log-softmax outputs
            (log-probabilities, not probabilities); ``loss`` is ``None`` when
            no label is supplied.
        """
        pooled = self.model(x).pooler_output
        logits = self.clf(pooled)
        # Cross-entropy operates on the raw logits, not on the log-softmax output.
        loss = F.cross_entropy(logits, label) if label is not None else None
        log_probs = F.log_softmax(logits, dim=-1)
        return log_probs, loss

class LitCustomModel(pl.LightningModule):
    """Lightning wrapper that trains ``CustomModel`` and tracks macro-F1.

    Args:
        model: Backbone handed to ``CustomModel``.
        lr: Learning rate of the AdamW optimizer (default 1e-5).
    """

    def __init__(self, model, lr=1e-5):
        super().__init__()
        self.model = CustomModel(model)
        self.lr = lr
        # Collected [log_probs, label] pairs of the current validation run.
        self.validation_step_outputs = []

    def configure_optimizers(self):
        """Create the optimizer and the LR scheduler.

        AdamW applies decoupled weight decay. ReduceLROnPlateau halves the
        learning rate when the monitored ``val_score`` stops increasing
        (``mode='max'``: a higher validation score counts as an improvement).
        """
        optimizer = torch.optim.AdamW(self.parameters(), lr=self.lr)
        scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
            optimizer, mode='max', factor=0.5, patience=2, verbose=True
        )
        return {
            'optimizer': optimizer,
            'lr_scheduler': scheduler,
            'monitor': 'val_score',
        }

    def training_step(self, batch, batch_idx=None):
        """Compute and log the training loss for one batch."""
        x = batch['pixel_values']
        label = batch['label']
        _, loss = self.model(x, label)
        self.log("train_loss", loss, on_step=True, on_epoch=False)
        return loss

    def validation_step(self, batch, batch_idx=None):
        """Run one validation batch and stash its outputs for the epoch-end score."""
        x = batch['pixel_values']
        label = batch['label']
        log_probs, loss = self.model(x, label)
        self.validation_step_outputs.append([log_probs, label])
        return loss

    def predict_step(self, batch, batch_idx=None):
        """Return the log-probabilities for one unlabeled batch."""
        x = batch['pixel_values']
        log_probs, _ = self.model(x)
        return log_probs

    def on_validation_epoch_end(self):
        """Log the macro-F1 (``val_score``) over all collected validation batches."""
        # Nothing collected (e.g. no validation batches): torch.cat would raise
        # on an empty list, so skip scoring.
        if not self.validation_step_outputs:
            return None
        pred = torch.cat([out for out, _ in self.validation_step_outputs]).cpu().detach().numpy().argmax(1)
        label = torch.cat([lbl for _, lbl in self.validation_step_outputs]).cpu().detach().numpy()
        score = f1_score(label, pred, average='macro')
        self.log("val_score", score)
        # Reset so the next validation run starts from an empty buffer.
        self.validation_step_outputs.clear()
        return score

In [None]:
# StratifiedKFold: intended for label sets with an imbalanced distribution,
# so that every fold is split according to the labels.
skf = StratifiedKFold(
    n_splits=CFG['N_SPLIT'],
    shuffle=True,
    random_state=CFG['SEED'],
)

In [None]:
# Image size comes from the shared config instead of a duplicated literal.
# interpolation=3 is the integer value of OpenCV's INTER_AREA flag.
train_transform = A.Compose([
    A.Resize(height=CFG['IMG_SIZE'], width=CFG['IMG_SIZE'], interpolation=3),
    A.HorizontalFlip(p=0.5),  # horizontal flip with a probability of 0.5
    A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)),
    ToTensorV2(),             # numpy array -> tensor
])

# Validation uses the same preprocessing without the random flip.
val_transform = A.Compose([
    A.Resize(height=CFG['IMG_SIZE'], width=CFG['IMG_SIZE'], interpolation=3),
    A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)),
    ToTensorV2(),
])

train_collate_fn = CustomCollateFn(train_transform, 'train')
val_collate_fn = CustomCollateFn(val_transform, 'val')

In [None]:
# Train one model per stratified fold and keep the best checkpoint of each.
for fold_idx, (train_index, val_index) in enumerate(skf.split(train_df, train_df['label'])):
    # skf.split yields positional indices, so select rows by position (.iloc);
    # .loc would only be correct while the DataFrame index is 0..n-1.
    train_fold_df = train_df.iloc[train_index]
    val_fold_df = train_df.iloc[val_index]

    train_dataset = CustomDataset(train_fold_df, dir_path, mode='train')
    val_dataset = CustomDataset(val_fold_df, dir_path, mode='val')

    # Load the data in mini-batches. The training loader must shuffle:
    # without it every epoch sees the samples in the same file order.
    train_dataloader = DataLoader(
        train_dataset,
        collate_fn=train_collate_fn,
        batch_size=CFG['BATCH_SIZE'],
        shuffle=True,
    )
    val_dataloader = DataLoader(
        val_dataset,
        collate_fn=val_collate_fn,
        batch_size=CFG['BATCH_SIZE']*2,
    )

    model = Swinv2Model.from_pretrained("microsoft/swinv2-large-patch4-window12to16-192to256-22kto1k-ft")
    lit_model = LitCustomModel(model)

    # Keep only the best checkpoint (highest val_score) of this fold.
    checkpoint_callback = ModelCheckpoint(
        monitor='val_score',
        mode='max',
        dirpath=dir_path+'checkpoints/',
        filename=f'swinv2-large-resize-fold_idx={fold_idx}'+'-{epoch:02d}-{train_loss:.4f}-{val_score:.4f}',
        save_top_k=1,
        save_weights_only=True,
        verbose=True
    )

    # Stop early when val_score has not improved for 3 consecutive validation
    # checks; validation runs twice per epoch (val_check_interval=0.5).
    earlystopping_callback = EarlyStopping(monitor="val_score", mode="max", patience=3)
    # NOTE(review): max_epochs is hard-coded to 100 while CFG['EPOCHS'] is 5 —
    # confirm which one is intended.
    trainer = pl.Trainer(
        max_epochs=100,
        accelerator='auto',
        precision=32,
        callbacks=[checkpoint_callback, earlystopping_callback],
        val_check_interval=0.5,
    )
    trainer.fit(lit_model, train_dataloader, val_dataloader)

    # Release this fold's model before the next fold is built.
    model.cpu()
    lit_model.cpu()
    del model, lit_model, checkpoint_callback, earlystopping_callback, trainer
    gc.collect()              # collect objects kept alive by reference cycles
    torch.cuda.empty_cache()  # release cached GPU memory that is no longer used

In [None]:
# Load the test split listing from the data directory.
test_csv_path = os.path.join(dir_path, 'test.csv')
test_df = pd.read_csv(test_csv_path)

In [None]:
# Deterministic preprocessing for inference; the image size is taken from the
# shared config instead of a duplicated literal.
# interpolation=3 is the integer value of OpenCV's INTER_AREA flag.
test_transform = A.Compose([
    A.Resize(height=CFG['IMG_SIZE'], width=CFG['IMG_SIZE'], interpolation=3),
    A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)),
    ToTensorV2(),
])


test_collate_fn = CustomCollateFn(test_transform, 'test')
test_dataset = CustomDataset(test_df, dir_path, mode='test')
# No shuffling: predictions must stay aligned with the row order of test_df.
test_dataloader = DataLoader(test_dataset, collate_fn=test_collate_fn, batch_size=CFG['BATCH_SIZE']*2)

In [None]:
# Predict the test set with every saved fold checkpoint, then majority-vote.
# NOTE(review): the pattern matches every checkpoint with this prefix in the
# folder, including files left over from earlier runs — verify the folder
# contains exactly one checkpoint per fold.
checkpoint_paths = sorted(glob(dir_path+'checkpoints/swinv2-large-resize*.ckpt'))
if not checkpoint_paths:
    # Without this check np.stack below fails with an unrelated-looking error.
    raise FileNotFoundError(
        f"No checkpoint matching 'swinv2-large-resize*.ckpt' found in {dir_path+'checkpoints/'}"
    )

fold_preds = []
for checkpoint_path in checkpoint_paths:
    model = Swinv2Model.from_pretrained("microsoft/swinv2-large-patch4-window12to16-192to256-22kto1k-ft")
    lit_model = LitCustomModel.load_from_checkpoint(checkpoint_path, model=model)
    trainer = pl.Trainer(accelerator='auto', precision=32)
    preds = trainer.predict(lit_model, test_dataloader)
    # Concatenate the per-batch outputs and keep the predicted class id per sample.
    preds = torch.cat(preds, dim=0).detach().cpu().numpy().argmax(1)
    fold_preds.append(preds)

    # Release this fold's model before loading the next checkpoint.
    del model, lit_model, trainer
    gc.collect()
    torch.cuda.empty_cache()

# Hard voting: for each sample take the class predicted by most folds
# (np.bincount(...).argmax() resolves ties in favour of the lowest class id).
pred_ensemble = [np.bincount(votes).argmax() for votes in np.stack(fold_preds, axis=1)]

INFO:pytorch_lightning.utilities.rank_zero:GPU available: True (cuda), used: True
INFO:pytorch_lightning.utilities.rank_zero:TPU available: False, using: 0 TPU cores
INFO:pytorch_lightning.utilities.rank_zero:IPU available: False, using: 0 IPUs
INFO:pytorch_lightning.utilities.rank_zero:HPU available: False, using: 0 HPUs
INFO:pytorch_lightning.accelerators.cuda:LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]


Predicting: 0it [00:00, ?it/s]

INFO:pytorch_lightning.utilities.rank_zero:GPU available: True (cuda), used: True
INFO:pytorch_lightning.utilities.rank_zero:TPU available: False, using: 0 TPU cores
INFO:pytorch_lightning.utilities.rank_zero:IPU available: False, using: 0 IPUs
INFO:pytorch_lightning.utilities.rank_zero:HPU available: False, using: 0 HPUs
INFO:pytorch_lightning.accelerators.cuda:LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]


Predicting: 0it [00:00, ?it/s]

INFO:pytorch_lightning.utilities.rank_zero:GPU available: True (cuda), used: True
INFO:pytorch_lightning.utilities.rank_zero:TPU available: False, using: 0 TPU cores
INFO:pytorch_lightning.utilities.rank_zero:IPU available: False, using: 0 IPUs
INFO:pytorch_lightning.utilities.rank_zero:HPU available: False, using: 0 HPUs
INFO:pytorch_lightning.accelerators.cuda:LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]


Predicting: 0it [00:00, ?it/s]

INFO:pytorch_lightning.utilities.rank_zero:GPU available: True (cuda), used: True
INFO:pytorch_lightning.utilities.rank_zero:TPU available: False, using: 0 TPU cores
INFO:pytorch_lightning.utilities.rank_zero:IPU available: False, using: 0 IPUs
INFO:pytorch_lightning.utilities.rank_zero:HPU available: False, using: 0 HPUs
INFO:pytorch_lightning.accelerators.cuda:LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]


Predicting: 0it [00:00, ?it/s]

In [None]:
# Load the submission table whose 'label' column is filled in below.
# NOTE(review): this reads 'swinv2_submission.csv', the same file the next cell
# writes, so a first run requires that file to exist already — presumably a
# sample-submission template was intended; confirm the file name.
submission = pd.read_csv(dir_path+'swinv2_submission.csv')

In [None]:
# Map the ensembled class ids back to the original label values and save.
decoded_labels = le.inverse_transform(pred_ensemble)
submission['label'] = decoded_labels

submission_path = os.path.join(dir_path, 'swinv2_submission.csv')
submission.to_csv(submission_path, index=False)