### 라이브러리 가져오기

In [None]:
import os
import random
import timm
import time
import pickle

import torch
import torch.nn.functional as F
from torch.optim.lr_scheduler import CosineAnnealingWarmRestarts

import albumentations as A
from albumentations.pytorch import ToTensorV2

import torch.nn as nn

from torch.cuda.amp import GradScaler, autocast
from torch.optim import Adam
from torch.utils.data import Dataset, DataLoader

from sklearn.model_selection import StratifiedKFold
from sklearn.utils.class_weight import compute_class_weight
from sklearn.metrics import accuracy_score, f1_score

from PIL import Image
from tqdm import tqdm

import cv2
import pandas as pd
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

import logging

import warnings
warnings.filterwarnings(action='ignore')

In [None]:
# Configure the root logger: INFO level, timestamped records, written both to a
# log file and to a StreamHandler.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', 
                    handlers=[logging.FileHandler('swin_transformer_384_class_3_notebook.log'), logging.StreamHandler()])

### CONFIG

In [None]:
class CONFIG:
    """Namespace for notebook-wide setup helpers (seeding and data paths)."""

    @staticmethod
    def set_seed(SEED):
        """Seed the Python, NumPy and PyTorch RNGs with ``SEED``.

        Also sets ``PYTHONHASHSEED`` in the environment and configures cuDNN
        for repeatable runs.

        Args:
            SEED: Integer seed applied to every generator.
        """
        os.environ['PYTHONHASHSEED'] = str(SEED)
        random.seed(SEED)
        np.random.seed(SEED)
        torch.manual_seed(SEED)
        torch.cuda.manual_seed(SEED)
        torch.cuda.manual_seed_all(SEED)
        # cuDNN autotuning (benchmark=True) may pick different kernels from run
        # to run, which undermines seeding; prefer deterministic kernels.
        torch.backends.cudnn.deterministic = True
        torch.backends.cudnn.benchmark = False

    @staticmethod
    def set_path(root_path):
        """Return ``(root_path, train_path, test_path)`` for a data root.

        Args:
            root_path: Directory containing the ``train/`` and ``test/`` folders.

        Returns:
            The root path unchanged, followed by the train and test directory
            paths (each ending with a slash).
        """
        train_path = f'{root_path}/train/'
        test_path = f'{root_path}/test/'

        return root_path, train_path, test_path

In [None]:
# Seed the RNGs, then derive the train/test image directories from the data root.
CONFIG.set_seed(0xC0FFE)
root_path, train_path, test_path = CONFIG.set_path('/root/Project/new_data')

logging.info('1. Set Seed')

### CustomDataset

In [None]:
# Dataset class definition.
class ImageDataset(Dataset):
    """Dataset yielding ``(image, target)`` pairs described by a DataFrame.

    Args:
        df: DataFrame whose rows unpack into exactly two values,
            ``(file name, target)``.
        path: Directory that the file names are joined onto.
        transform: Optional callable invoked as ``transform(image=img)`` and
            returning a mapping with an ``'image'`` entry.
    """

    def __init__(self, df, path, transform=None):
        self.df = df
        self.path = path
        self.transform = transform

    def __len__(self):
        """Return the number of rows in the backing DataFrame."""
        return len(self.df)

    def __getitem__(self, idx):
        """Load the image for row ``idx`` and return ``(image, target)``."""
        name, target = self.df.iloc[idx]
        # Close the file handle deterministically, and force 3-channel RGB so
        # grayscale / RGBA / palette files match the 3-value mean/std used by
        # the normalisation transforms.
        with Image.open(os.path.join(self.path, name)) as image_file:
            img = np.array(image_file.convert('RGB'))
        if self.transform:
            img = self.transform(image=img)['image']
        return img, target

logging.info('2. Define Dataset Class')

In [None]:
# Hyperparameters and settings
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model_name = 'swin_large_patch4_window12_384'  # timm model name
img_size = 384
learning_rate = 5e-4
num_epochs = 10
batch_size = 16
num_workers = 4
patience = 3  # epochs without validation-F1 improvement tolerated before early stopping
# T_0 / T_mult / eta_min are passed to CosineAnnealingWarmRestarts in train_model.
T_0 = 5
T_mult = 2
eta_min = 1e-6
accumulation_steps = 4  # number of gradient-accumulation steps

# Training preprocessing: resize straight to a square (aspect ratio is not
# preserved), normalise, and convert to a tensor.
train_transform = A.Compose([
    A.Resize(height=img_size, width=img_size),
    A.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ToTensorV2(),
])

# Test preprocessing: scale the longest side to img_size, pad up to a square
# with value (255, 255, 255), then normalise and convert to a tensor.
# NOTE(review): this differs from train_transform (plain Resize) — confirm the
# train/test preprocessing mismatch is intentional.
# NOTE(review): border_mode=0 is presumably cv2.BORDER_CONSTANT — confirm.
test_transform = A.Compose([
    A.LongestMaxSize(max_size=img_size, always_apply=True), 
    A.PadIfNeeded(min_height=img_size, min_width=img_size, border_mode=0, value=(255, 255, 255)), 
    A.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ToTensorV2(),
])

logging.info('3. Set Hyperparameter')

In [None]:
# Load the training labels and assign each row to one of three coarse classes.
train_file = pd.read_csv(f'{root_path}/combined_data.csv')
train_file['class'] = None

train_file.loc[train_file['target'].isin([0, 5, 8, 9]), 'class'] = 0
train_file.loc[train_file['target'].isin([2, 16]), 'class'] = 1
train_file.loc[train_file['target'].isin([1, 3, 4, 6, 7, 10, 11, 12, 13, 14, 15]), 'class'] = 2


# Build one dataset and one loader per coarse class.
# ImageDataset.__getitem__ unpacks each row into exactly (name, target), so only
# the ID/target columns are handed over; passing the frame with the extra
# 'class' column would raise "too many values to unpack" on first access.
train_datasets = {cls: ImageDataset(df=train_file.loc[train_file['class'] == cls, ['ID', 'target']],
                                    path=train_path,
                                    transform=train_transform)
                  for cls in range(3)}

train_loaders = {cls: DataLoader(dataset, batch_size=batch_size, shuffle=True)
                 for cls, dataset in train_datasets.items()}

logging.info('4. Load Data')

### class가 2인 paper class에 대해서만 학습

In [None]:
# Rows assigned to coarse class 2, keeping only the ID and target columns.
class_file = train_file.loc[train_file['class'] == 2, ['ID', 'target']]
# Convenience handles to the class-2 dataset and loader built earlier.
class_dataset = train_datasets[2]
class_loader = train_loaders[2]

In [None]:
# Original labels : 1, 3, 4, 6, 7, 10, 11, 12, 13, 14, 15
# Compact labels  : 0, 1, 2, 3, 4,  5,  6,  7,  8,  9, 10
# Each original label is rewritten to its position in the list below, in
# ascending order.
for compact_label, original_label in enumerate([1, 3, 4, 6, 7, 10, 11, 12, 13, 14, 15]):
    class_file.loc[class_file['target'] == original_label, 'target'] = compact_label
class_file

### Stratified K-Fold

In [None]:
# Helper setup for visualising, fold by fold, how the data is split.
# The plotting code is taken from the scikit-learn example at
# https://scikit-learn.org/stable/auto_examples/model_selection/plot_cv_indices.html
cmap_data = plt.cm.Paired
cmap_cv = plt.cm.coolwarm  # colormap used by plot_cv_indices for the train/validation markers

def plot_cv_indices(x, y, cv, ax, split_strategy='KFold', group=None, lw=10):
    """Create a sample plot for indices of a cross-validation object.

    Draws one horizontal row per split produced by ``cv``; within a row each
    sample is coloured by whether it falls in the training part (0) or the
    held-out part (1) of that split.

    Args:
        x: Samples passed to ``cv.split`` as ``X``; its length sets the x-axis.
        y: Labels passed to ``cv.split``.
        cv: Object exposing ``split(X=..., y=..., groups=...)``.
        ax: Matplotlib axes to draw on.
        split_strategy: Title placed above the plot.
        group: Optional groups forwarded to ``cv.split``.
        lw: Line width of the markers.

    Returns:
        The axes object that was drawn on.
    """
    n_folds = 0
    for ii, (tr, tt) in enumerate(cv.split(X=x, y=y, groups=group)):
        # Fill in indices with the training/test groups
        indices = np.array([np.nan] * len(x))
        indices[tt] = 1
        indices[tr] = 0
        # Visualize the results
        ax.scatter(
            range(len(indices)),
            [ii + 0.5] * len(indices),
            c=indices,
            marker="_",
            lw=lw,
            cmap=cmap_cv,
            vmin=-0.2,
            vmax=0.2,
        )
        n_folds = ii + 1

    # Formatting: one tick per split actually drawn, rather than a fixed five.
    yticklabels = list(range(n_folds))

    ax.set(
        yticks=np.arange(len(yticklabels)) + 0.5,
        yticklabels=yticklabels,
        xlabel="Sample index",
        ylabel="CV iteration",
        ylim=[len(yticklabels) + 0.2, -0.2],
        xlim=[0, len(x)],
    )
    ax.set_title(split_strategy, fontsize=15)
    return ax

In [None]:
# Stratified K-Fold
skf = StratifiedKFold(n_splits=5)
fig, ax = plt.subplots()
plot_cv_indices(x=class_file['ID'],
                y=class_file['target'],
                cv=skf,
                ax=ax,
                split_strategy='Stratified K-Fold')

# skf.split returns a one-shot generator; materialise it so the folds can be
# iterated more than once (e.g. when the training cell is re-run).
train_folds = list(skf.split(class_file['ID'], class_file['target']))

### 모델 학습

In [None]:
# Train the model for one epoch.
def _train_one_epoch(loader, model, optimizer, loss_fn, scheduler, scaler, device):
    """Run one training epoch with mixed precision and gradient accumulation.

    Gradients are accumulated over ``accumulation_steps`` batches (module-level
    setting) before each optimizer step; a trailing partial group is stepped
    at the end of the loader. The scheduler is advanced once per epoch.

    Args:
        loader: Iterable of ``(images, targets)`` batches; must support ``len``.
        model: Network being trained.
        optimizer: Optimizer updating ``model``'s parameters.
        loss_fn: Callable ``loss_fn(preds, targets)`` returning a scalar loss.
        scheduler: Learning-rate scheduler, stepped once after the epoch.
        scaler: ``GradScaler`` used to scale the loss for mixed precision.
        device: Device the batches are moved to.

    Returns:
        Dict with ``train_loss`` (mean unscaled batch loss), ``train_acc`` and
        ``train_f1`` (macro F1) computed over every sample seen in the epoch.
    """
    model.train()
    train_loss = 0
    preds_list = []
    targets_list = []

    # Clear gradients once up front. They must NOT be cleared on every batch,
    # otherwise nothing is ever accumulated across micro-batches.
    optimizer.zero_grad(set_to_none=True)

    pbar = tqdm(loader)
    for index, (images, targets) in enumerate(pbar):

        if (index + 1) % 50 == 0:
            logging.info(f'Batch count: {index + 1} / {len(pbar)}')

        images = images.to(device)
        targets = targets.to(device)

        with autocast():
            preds = model(images)
            loss = loss_fn(preds, targets)

        # Divide by the accumulation length so the summed gradient is the
        # average over the accumulated micro-batches.
        scaler.scale(loss / accumulation_steps).backward()

        # Step the optimizer every `accumulation_steps` batches and on the last batch.
        if (index + 1) % accumulation_steps == 0 or (index + 1) == len(loader):
            scaler.step(optimizer)
            scaler.update()
            optimizer.zero_grad(set_to_none=True)

        # Report the unscaled loss so the logged value is comparable across settings.
        train_loss += loss.item()
        preds_list.extend(preds.argmax(dim=1).detach().cpu().numpy())
        targets_list.extend(targets.detach().cpu().numpy())

        pbar.set_description(f"Loss: {loss.item():.4f}")

    scheduler.step()

    train_loss /= len(loader)
    train_acc = accuracy_score(targets_list, preds_list)
    train_f1 = f1_score(targets_list, preds_list, average='macro')

    ret = {
        "train_loss": train_loss,
        "train_acc": train_acc,
        "train_f1": train_f1,
    }

    return ret

In [None]:
# Evaluate the model over one pass of the validation loader.
def _val_one_epoch(loader, model, device):
    """Score ``model`` on ``loader`` without computing gradients.

    Args:
        loader: Iterable of ``(images, targets)`` batches.
        model: Network to evaluate; switched to eval mode.
        device: Device the batches are moved to.

    Returns:
        ``(metrics, f1)`` where ``metrics`` holds accuracy and macro F1 under
        the keys ``"train_acc"`` and ``"train_f1"`` (key names that callers
        read, even though the values are validation metrics), and ``f1`` is
        the same macro-F1 value.
    """
    model.eval()

    predicted, actual = [], []

    for images, targets in tqdm(loader):
        images, targets = images.to(device), targets.to(device)

        model.zero_grad(set_to_none=True)
        with torch.no_grad():
            logits = model(images)

        predicted.extend(logits.argmax(dim=1).detach().cpu().numpy())
        actual.extend(targets.detach().cpu().numpy())

    accuracy = accuracy_score(actual, predicted)
    macro_f1 = f1_score(actual, predicted, average='macro')

    return {"train_acc": accuracy, "train_f1": macro_f1}, macro_f1

In [None]:
# Compute 'balanced' class weights over the remapped targets in class_file.
class_weights = compute_class_weight(class_weight='balanced', classes=np.unique(class_file['target']), y=class_file['target'])
print(class_weights)

# Per-class weight tensor (float64) consumed by FocalLoss as `alpha`.
alpha = torch.tensor(class_weights, dtype=torch.float64).to(device)

class FocalLoss(nn.Module):
    """Class-weighted focal loss built on per-sample cross entropy.

    Args:
        alpha: 1-D tensor of per-class weights, indexed by target label.
        gamma: Exponent applied to ``(1 - pt)``.
        reduction: ``'mean'`` or ``'sum'``; any other value returns the
            unreduced per-sample losses.
    """

    def __init__(self, alpha, gamma=2, reduction='mean'):
        super().__init__()
        self.alpha, self.gamma, self.reduction = alpha, gamma, reduction

    def forward(self, inputs, targets):
        """Return the focal loss of logits ``inputs`` against ``targets``."""
        per_sample_ce = F.cross_entropy(inputs, targets, reduction='none')
        # pt is recovered from the cross entropy as exp(-CE).
        prob_true = torch.exp(-per_sample_ce)

        # Pick each sample's class weight according to its target label.
        sample_weight = self.alpha.gather(0, targets.long())

        modulating_factor = (1 - prob_true) ** self.gamma
        loss = sample_weight * modulating_factor * per_sample_ce

        if self.reduction == 'mean':
            return loss.mean()
        if self.reduction == 'sum':
            return loss.sum()
        return loss

# The loss itself is instantiated inside train_model, once per fold.
logging.info('5. Define Focal Loss')

In [None]:
def tta(model, loader, device):
    """Predict class probabilities with test-time augmentation.

    For every batch the softmax outputs of the original images, a flip along
    tensor dimension 3, and eleven rotated copies (30 to 330 degrees in
    30-degree steps) are averaged.

    Args:
        model: Network to run; switched to eval mode.
        loader: Iterable of ``(images, _)`` batches; the second element is ignored.
        device: Device the batches and rotated images are moved to.

    Returns:
        A list with one NumPy array of averaged probabilities per input image,
        in loader order.
    """
    model.eval()
    final_preds = []
    # A single Rotate transform is reused; its `limit` is overwritten below to
    # pin the rotation to one exact angle at a time.
    # NOTE(review): this relies on albumentations honouring a `limit` attribute
    # changed after construction — confirm for the installed version.
    rot_transform = A.Compose([
        A.Rotate(limit=(0, 0), p=1.0, border_mode=cv2.BORDER_CONSTANT, value=(1, 1, 1))
    ])

    with torch.no_grad():
        for images, _ in tqdm(loader):
            images = images.to(device)

            # Prediction on the images as delivered by the loader.
            outputs = model(images)
            probabilities = torch.nn.functional.softmax(outputs, dim=1)

            # Prediction on the batch flipped along dimension 3.
            # assumes batches are laid out (N, C, H, W), making this a horizontal flip — TODO confirm
            flipped_images = torch.flip(images, dims=[3])
            outputs_flipped = model(flipped_images)
            probabilities_flipped = torch.nn.functional.softmax(outputs_flipped, dim=1)

            rotations = [30, 60, 90, 120, 150, 180, 210, 240, 270, 300, 330]
            rotated_probabilities = []
            for angle in rotations:
                rot_transform.transforms[0].limit = (angle, angle)
                rotated_images = []
                # Each image makes a round trip through NumPy (channels-last)
                # because the rotation is applied with albumentations.
                # NOTE(review): the images arrive already transformed by the
                # loader's dataset, so the constant border value (1, 1, 1) is
                # applied to those transformed values — confirm it is intended.
                for img in images:
                    img_np = img.permute(1, 2, 0).cpu().numpy()
                    rotated_img = rot_transform(image=img_np)['image']
                    rotated_img_tensor = torch.from_numpy(rotated_img).permute(2, 0, 1).to(device)
                    rotated_images.append(rotated_img_tensor)
                rotated_images = torch.stack(rotated_images)

                outputs_rotated = model(rotated_images)
                probabilities_rotated = torch.nn.functional.softmax(outputs_rotated, dim=1)
                rotated_probabilities.append(probabilities_rotated)

            # Average over all views: original + flipped + one per rotation angle.
            averaged_probabilities = (probabilities + probabilities_flipped + sum(rotated_probabilities)) / (len(rotations) + 2)
            # preds = averaged_probabilities.argmax(dim=1)
            final_preds.extend(averaged_probabilities.cpu().numpy())
    return final_preds

logging.info('6. Define Test Time Augmentation')

In [None]:
def train_model(patience, num_epochs, device):
    """Train one model per cross-validation fold and return the fold results.

    For every ``(train_index, validation_index)`` pair in the module-level
    ``train_folds`` a fresh pretrained model is created, trained for up to
    ``num_epochs`` epochs with early stopping on validation macro F1, restored
    to its best-scoring weights, pickled to
    ``{root_path}/model/{model_name}_class_3/fold_{fold_index}.pkl`` and
    collected.

    Args:
        patience: Number of consecutive epochs without validation-F1
            improvement after which training of the fold stops.
        num_epochs: Maximum number of epochs per fold.
        device: Device the models are created on and batches are moved to.

    Returns:
        A list with one dict per fold containing ``'model'``, ``'weights'``
        (snapshot of the best state dict, or ``None`` if no epoch improved on
        the initial score of 0) and ``'f1_score'``.
    """

    # Start time (taken once, so the elapsed time printed per fold is cumulative)
    since = time.time()

    models = []

    for fold_index, (train_index, validation_index) in enumerate(train_folds):

        # Initialise the gradient scaler for this fold
        scaler = GradScaler()

        print()
        print(f'Stratified K-Fold: {fold_index}')
        logging.info(f'Stratified K-Fold: {fold_index + 1} / 5')
        print('-' * 10)

        model = timm.create_model(
            model_name=model_name,
            pretrained=True,
            # 11 classes
            num_classes=11
        ).to(device)

        loss_fn = FocalLoss(alpha=alpha, gamma=2)
        optimizer = Adam(model.parameters(), lr=learning_rate)
        scheduler = CosineAnnealingWarmRestarts(optimizer, T_0=T_0, T_mult=T_mult, eta_min=eta_min)

        # train
        train_data = class_file.iloc[train_index, :]
        train_dataset = ImageDataset(
            df=train_data,
            path=train_path,
            transform=train_transform
        )
        train_loader = DataLoader(
            train_dataset,
            batch_size=batch_size,
            shuffle=True,
            num_workers=num_workers,
            pin_memory=True,
            drop_last=False
        )

        # validation
        validation_data = class_file.iloc[validation_index, :]
        validation_dataset = ImageDataset(
            df=validation_data,
            path=train_path,
            transform=train_transform
        )
        validation_loader = DataLoader(
            validation_dataset,
            batch_size=batch_size,
            # Metrics do not depend on sample order, so shuffling is unnecessary here.
            shuffle=False,
            num_workers=num_workers,
            pin_memory=True,
            drop_last=False
        )

        best_epoch = 0
        best_f1_score = 0
        early_stop_counter = 0
        best_model_weights = None

        for epoch in range(num_epochs):
            print(f'Epoch {epoch}/{num_epochs - 1}')
            print('-' * 10)
            logging.info(f"Epoch {epoch + 1}/{num_epochs}")

            if early_stop_counter >= patience:
                logging.info(f"Early Stopping... epoch {epoch + 1}")
                print("Early Stopping....\n")
                break

            # train
            ret = _train_one_epoch(train_loader, model, optimizer, loss_fn, scheduler, scaler, device)
            # validation
            ret2, val_f1 = _val_one_epoch(validation_loader, model, device)

            print(f"Loss: {ret['train_loss']:.4f}, train Accuracy: {ret['train_acc']:.4f}, train F1-Score: {ret['train_f1']:.4f}")
            print(f"validation Accuracy: {ret2['train_acc']:.4f}, validation F1-Score: {ret2['train_f1']:.4f}")
            print('-' * 10)

            # Compare the validation F1 score against the best so far
            if val_f1 > best_f1_score:
                early_stop_counter = 0

                best_epoch = epoch
                best_f1_score = val_f1
                # state_dict() returns references to the live tensors, which
                # later epochs keep updating; clone them to freeze a snapshot.
                best_model_weights = {
                    key: value.detach().clone()
                    for key, value in model.state_dict().items()
                }

            else:
                early_stop_counter += 1

        print(f'best epoch: {best_epoch}, best f1 score: {best_f1_score}')

        time_elapsed = time.time() - since
        print(f'Training complete in {time_elapsed // 60:.0f}m {time_elapsed % 60:.0f}s')

        # Restore the best weights; if no epoch ever improved on the initial
        # score there is no snapshot, so keep the current weights.
        if best_model_weights is not None:
            model.load_state_dict(best_model_weights)

        model_path = f'{root_path}/model/{model_name}_class_3'
        if not os.path.exists(model_path):
            os.makedirs(model_path)

        # NOTE: the whole model object is pickled; such files must only ever
        # be unpickled from trusted locations.
        with open(f'{model_path}/fold_{fold_index}' + '.pkl', 'wb') as f:
            pickle.dump(model, f)

        models.append({
            'model': model,
            'weights': best_model_weights,
            'f1_score': best_f1_score,
        })

    return models

In [None]:
# Mark the start of training in the log.
logging.info('7. Start Training')
logging.info('----------------------------------------------')

In [None]:
# Run training; `models` receives the list of per-fold result dicts returned by train_model.
models = train_model(patience, num_epochs=num_epochs, device=device)

In [None]:

# Mark the end of training in the log.
logging.info('----------------------------------------------')
logging.info('8. Finish Training')

In [None]:
# Load the per-fold models from disk.
# The directory matches the one train_model writes to
# ('{model_name}_class_3'); the previous '{model_name}_all' location is not
# produced anywhere in this notebook.
# NOTE: pickle.load can execute arbitrary code contained in the file — only
# load checkpoints that were created by this pipeline.
models = []

for i in range(5):
    with open(f'{root_path}/model/{model_name}_class_3/fold_{i}.pkl', 'rb') as f:
        model = pickle.load(f)
    models.append(model)



### classifier로 나눈 3개의 클래스 중 class 2에 해당하는 row들만 가져오기

In [None]:
# Load the CSV holding the coarse classifier's results.
classfier_result = pd.read_csv(f'{root_path}/efficientnet_b5.sw_in12k_ft_in1k class.csv')


# Load the sample submission, used as the list of test rows.
test_file = pd.read_csv(f'{root_path}/sample_submission.csv')

In [None]:
# Keep only the test rows whose coarse classifier result is class 2.
# NOTE(review): the boolean mask comes from a different DataFrame and is aligned
# by index — assumes both CSVs list the same rows in the same order; confirm.
test_file_class_2 = test_file.loc[classfier_result['class'] == 2]
test_file_class_2

In [None]:
# Dataset over the class-2 test rows, preprocessed with test_transform.
# NOTE(review): ImageDataset unpacks each row into exactly two values, so this
# assumes sample_submission.csv has exactly two columns — confirm.
test_dataset = ImageDataset(
    df=test_file_class_2,
    path=test_path,
    transform=test_transform,
)

# shuffle=False keeps the batch order equal to the row order of the DataFrame,
# which the later positional assignment of predictions relies on.
test_loader = DataLoader(
    test_dataset,
    batch_size=batch_size,
    shuffle=False,
    num_workers=num_workers,
    pin_memory=True
)

In [None]:
# Make sure the output directory exists.
result_path = f"{root_path}/result"
if not os.path.exists(result_path):
    os.makedirs(result_path)

# Soft Voting: average the TTA probabilities of every model, then take the argmax.
ensemble_preds_soft = []
for model in models:
    # shape (number of models, number of test images, number of classes)
    ensemble_preds_soft.append(tta(model, test_loader, device))
    
# shape (number of test images, number of classes)
ensemble_preds_soft = np.mean(ensemble_preds_soft, axis=0)
# shape (number of test images)
final_preds_soft = np.argmax(ensemble_preds_soft, axis=1)

In [None]:
# Build the prediction frame from the test rows and overwrite `target` with the
# ensemble's predicted labels, assigned by position.
pred_df = pd.DataFrame(test_dataset.df, columns=['ID', 'target'])
pred_df['target'] = final_preds_soft
logging.info('9. Predict')

In [None]:
# Compact labels  : 0, 1, 2, 3, 4,  5,  6,  7,  8,  9, 10
# Original labels : 1, 3, 4, 6, 7, 10, 11, 12, 13, 14, 15  (restore these)
#
# The restore must happen in ONE simultaneous lookup. Rewriting the column
# label by label chains the replacements (0 -> 1 -> 3 -> 6 -> 11,
# 2 -> 4 -> 7 -> 12, 5 -> 10 -> 15), which collapses every prediction into
# the labels 11-15.
compact_to_original = np.array([1, 3, 4, 6, 7, 10, 11, 12, 13, 14, 15])
# Positional lookup: a compact label above 10 raises IndexError instead of
# silently producing a wrong or missing label.
pred_df['target'] = compact_to_original[pred_df['target'].to_numpy()]

In [None]:
# Sanity check: the prediction rows carry the same IDs as the selected class-2 test rows.
assert (test_file_class_2['ID'] == pred_df['ID']).all()

In [None]:
# Make sure the output directory exists, then write the class-2 predictions as CSV.
result_path = f"{root_path}/result"
if not os.path.exists(result_path):
    os.makedirs(result_path)
pred_df.to_csv(f"{result_path}/{model_name}_class_3.csv", index=False)
pred_df.head()
logging.info('Finish!')