## Contents
- Import Library & Define Functions
- Hyper-parameters
- Load Data
- Train Model
- 평가
- Inference & Save File


## Import Library & Define Functions
* 학습 및 추론에 필요한 라이브러리를 로드합니다.
* 학습 및 추론에 필요한 함수와 클래스를 정의합니다.

In [None]:
import os
import random

import timm
import torch
import albumentations as A
import pandas as pd
import numpy as np
import torch.nn as nn
from albumentations.pytorch import ToTensorV2
from torch.optim import Adam
from torchvision import transforms
from torch.utils.data import Dataset, DataLoader
from PIL import Image
from tqdm import tqdm
from sklearn.metrics import accuracy_score, f1_score
from sklearn.model_selection import train_test_split
from copy import deepcopy

from datetime import datetime
import time
from zoneinfo import ZoneInfo
import wandb

In [None]:
# Seoul-local timestamp of this run; it tags the W&B run name below.
seoul_now = datetime.now(tz=ZoneInfo("Asia/Seoul"))
train_time = seoul_now.strftime("%Y%m%d-%H%M%S")

wandb.init(project="document-classification", name=f"run-{train_time}")

In [None]:
# Fix the random seeds.
SEED = 42
os.environ['PYTHONHASHSEED'] = str(SEED)
# Seed Python, NumPy and PyTorch (CPU and CUDA) with the same value.
for _seed_fn in (
    random.seed,
    np.random.seed,
    torch.manual_seed,
    torch.cuda.manual_seed,
    torch.cuda.manual_seed_all,
):
    _seed_fn(SEED)
# NOTE(review): cudnn.benchmark=True lets cuDNN pick kernels by timing them,
# which can make GPU results differ between runs despite the fixed seed --
# confirm that this speed/reproducibility trade-off is intended.
torch.backends.cudnn.benchmark = True

In [None]:
# Dataset class definition.
class ImageDataset(Dataset):
    """Dataset of (image, label) pairs described by a CSV file.

    The CSV is read once at construction time. For each row, the first
    column is joined to ``img_dir`` to locate the image file and the second
    column is returned as the label.

    Args:
        csv_file: Path (or file-like object) passed to ``pd.read_csv``.
        img_dir: Directory that contains the image files.
        transform: Optional callable invoked as ``transform(image=array)``;
            its ``'image'`` entry replaces the image.
        augraphy_pipeline: Optional callable applied to the RGB array before
            ``transform``.
    """

    def __init__(self, csv_file, img_dir, transform=None, augraphy_pipeline=None):
        self.augraphy_pipeline = augraphy_pipeline
        self.transform = transform
        self.img_dir = img_dir
        self.data = pd.read_csv(csv_file)

    def __len__(self):
        """Return the number of rows in the CSV."""
        return len(self.data)

    def __getitem__(self, idx):
        """Load row ``idx`` and return ``(image, label)``."""
        file_name = self.data.iloc[idx, 0]
        label = self.data.iloc[idx, 1]

        # Always decode to 3-channel RGB so downstream transforms see a fixed layout.
        pixels = np.array(Image.open(os.path.join(self.img_dir, file_name)).convert('RGB'))

        if self.augraphy_pipeline:
            pixels = self.augraphy_pipeline(pixels)

        if self.transform:
            pixels = self.transform(image=pixels)['image']

        return pixels, label

In [None]:
class EarlyStopping:
    """Track validation loss and signal when training should stop.

    Each call compares ``-val_loss`` with the best score so far. When the
    score is not lower than ``best_score + delta`` the model's ``state_dict``
    is written to ``path``; otherwise a counter is incremented and
    ``early_stop`` becomes True once it reaches ``patience``.

    Args:
        patience: Number of consecutive non-improving calls tolerated.
        verbose: When True, print a message every time a checkpoint is saved.
        delta: Margin added to the best score in the comparison.
        path: File the checkpoint is saved to.
    """

    def __init__(self, patience=7, verbose=False, delta=0, path='checkpoint.pt'):
        # Configuration
        self.patience = patience
        self.verbose = verbose
        self.delta = delta
        self.path = path
        # Running state
        self.counter = 0
        self.best_score = None
        self.early_stop = False
        self.val_loss_min = float('inf')

    def __call__(self, val_loss, model):
        """Update the stopping state with the latest validation loss."""
        score = -val_loss  # higher is better

        if self.best_score is not None and score < self.best_score + self.delta:
            # No improvement this time.
            self.counter += 1
            print(f'EarlyStopping counter: {self.counter} out of {self.patience}')
            if self.counter >= self.patience:
                self.early_stop = True
            return

        is_first_call = self.best_score is None
        self.best_score = score
        self.save_checkpoint(val_loss, model)
        if not is_first_call:
            self.counter = 0

    def save_checkpoint(self, val_loss, model):
        """Save the model's state_dict to ``self.path`` and record the loss."""
        if self.verbose:
            print(f'Validation loss decreased ({self.val_loss_min:.6f} --> {val_loss:.6f}). Saving model ...')
        torch.save(model.state_dict(), self.path)
        self.val_loss_min = val_loss

In [None]:
def validate(val_loader, model, loss_fn, device):
    """Evaluate ``model`` on ``val_loader`` without computing gradients.

    Args:
        val_loader: Iterable of ``(inputs, labels)`` batches supporting ``len``.
        model: Module to evaluate; it is switched to eval mode.
        loss_fn: Callable ``loss_fn(outputs, labels)`` returning a scalar tensor.
        device: Device the batches are moved to.

    Returns:
        ``(mean_batch_loss, accuracy)``: the per-batch losses averaged over
        the number of batches, and correct predictions over total samples.
    """
    model.eval()
    loss_sum, n_correct, n_seen = 0, 0, 0

    with torch.no_grad():
        for inputs, labels in val_loader:
            inputs = inputs.to(device)
            labels = labels.to(device)

            logits = model(inputs)
            loss_sum += loss_fn(logits, labels).item()

            # The predicted class is the index of the largest logit.
            n_correct += (logits.argmax(dim=1) == labels).sum().item()
            n_seen += labels.size(0)

    return loss_sum / len(val_loader), n_correct / n_seen

In [None]:
# Runs a single training epoch.
def train_one_epoch(loader, model, optimizer, loss_fn, device):
    """Train ``model`` for one pass over ``loader`` and log the metrics to W&B.

    Args:
        loader: Iterable of ``(image, targets)`` batches supporting ``len``.
        model: Module to train; it is switched to train mode.
        optimizer: Optimizer stepped once per batch.
        loss_fn: Callable ``loss_fn(preds, targets)`` returning a scalar tensor.
        device: Device the batches are moved to.

    Returns:
        Dict with ``train_loss`` (mean of the per-batch losses), ``train_acc``
        and ``train_f1`` (macro-averaged) for the epoch.
    """
    model.train()
    running_loss = 0
    epoch_preds, epoch_targets = [], []

    progress = tqdm(loader)
    for image, targets in progress:
        image, targets = image.to(device), targets.to(device)

        # Gradients are cleared through the model rather than the optimizer.
        model.zero_grad(set_to_none=True)

        logits = model(image)
        loss = loss_fn(logits, targets)
        loss.backward()
        optimizer.step()

        batch_loss = loss.item()
        running_loss += batch_loss
        epoch_preds.extend(logits.argmax(dim=1).detach().cpu().numpy())
        epoch_targets.extend(targets.detach().cpu().numpy())

        progress.set_description(f"Loss: {batch_loss:.4f}")

    ret = {
        "train_loss": running_loss / len(loader),
        "train_acc": accuracy_score(epoch_targets, epoch_preds),
        "train_f1": f1_score(epoch_targets, epoch_preds, average='macro'),
    }

    # Log the training metrics to wandb.
    wandb.log(ret)

    return ret

## Hyper-parameters
* 학습 및 추론에 필요한 하이퍼파라미터들을 정의합니다.

In [None]:
# Device: use the GPU when one is available, otherwise the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Data config
data_path = 'data/'

# Model config
model_name = 'efficientnet_b0'

# Training config
img_size = 224
LR = 1e-3
EPOCHS = 100
BATCH_SIZE = 32
num_workers = 16
PATIENCE = 5

retrain_full_dataset = False  # whether to retrain on the full train data for the final prediction
reinitialize_model = False  # whether to re-create the model for that final retraining

# Record the configuration in the W&B run.
wandb.config.update(dict(
    model=model_name,
    img_size=img_size,
    learning_rate=LR,
    epochs=EPOCHS,
    batch_size=BATCH_SIZE,
    num_workers=num_workers,
    patience=PATIENCE,
    retrain_full_dataset=retrain_full_dataset,
    reinitialize_model=reinitialize_model,
))

## Load Data
* 학습, 테스트 데이터셋과 로더를 정의합니다.

In [None]:
import augraphy
from augraphy import *
import albumentations as A
from albumentations.pytorch import ToTensorV2
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader
from PIL import Image

# Transform code used for augmentation.
def get_augraphy_pipeline():
    """Return an ``AugraphyPipeline`` built from document-degradation augmentations.

    Every augmentation is given its own probability ``p``; the whole list is
    passed as the pipeline's first positional argument.
    """
    augmentations = [
        BleedThrough(p=0.5),
        DirtyRollers(p=0.5),
        InkBleed(p=0.5),
        Faxify(p=0.3),
        NoiseTexturize(p=0.5),
        Letterpress(p=0.5),
        LowInkPeriodicLines(p=0.5),
        LowInkRandomLines(p=0.5),
        Folding(p=0.5),
        Markup(p=0.3),  # used in place of PencilScribbles
        Stains(p=0.3),  # used in place of Watermark
    ]
    return AugraphyPipeline(augmentations)

def get_train_transforms(height, width):
    """Return the albumentations pipeline used for training images.

    Args:
        height: Output height of the random resized crop.
        width: Output width of the random resized crop.

    Returns:
        An ``A.Compose`` that crops, rotates/flips, perturbs brightness,
        noise, blur, geometry and JPEG quality, then normalizes and converts
        the image to a tensor.
    """
    # One of the two rotation variants is applied with probability 0.7.
    rotation = A.OneOf([
        A.RandomRotate90(p=0.5),
        A.Rotate(limit=180, p=0.5),
    ], p=0.7)

    # One of the two geometric distortions is applied with probability 0.5.
    distortion = A.OneOf([
        A.OpticalDistortion(distort_limit=0.1, shift_limit=0.1, p=1.0),
        A.GridDistortion(num_steps=5, distort_limit=0.1, p=1.0),
    ], p=0.5)

    steps = [
        A.RandomResizedCrop(height=height, width=width, scale=(0.8, 1.0), ratio=(0.75, 1.3333333333333333)),
        rotation,
        A.Flip(p=0.5),
        A.RandomBrightnessContrast(brightness_limit=(-0.2, 0.2), contrast_limit=(-0.2, 0.2), p=0.7),
        A.GaussNoise(var_limit=(10.0, 150.0), p=0.5),
        A.GaussianBlur(blur_limit=(3, 15), p=0.5),
        distortion,
        A.ImageCompression(quality_lower=50, quality_upper=100, p=0.5),
        A.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
        ToTensorV2(),
    ]
    return A.Compose(steps)

def get_pred_transforms(height, width):
    """Return the augmentation-free pipeline: resize, normalize, to tensor.

    Args:
        height: Target image height.
        width: Target image width.
    """
    channel_mean = [0.485, 0.456, 0.406]
    channel_std = [0.229, 0.224, 0.225]
    steps = [
        A.Resize(height=height, width=width),
        A.Normalize(mean=channel_mean, std=channel_std),
        ToTensorV2(),
    ]
    return A.Compose(steps)

In [None]:
def stratified_split_dataset(train_csv_path, img_dir, trn_transform, tst_transform, augraphy_pipeline=None, train_size=0.7, val_size=0.15, test_size=0.15, random_state=42):
    """Split the labelled CSV into stratified train/validation/test datasets.

    The CSV must contain a ``target`` column, which is used for
    stratification in both splits.

    Args:
        train_csv_path: Path of the labelled CSV.
        img_dir: Directory containing the images referenced by the CSV.
        trn_transform: Transform given to the training dataset.
        tst_transform: Transform given to the validation and test datasets.
        augraphy_pipeline: Optional pipeline given to the training dataset only.
        train_size: Fraction of rows assigned to the training split.
        val_size: Relative weight of the validation split within the remainder.
        test_size: Relative weight of the test split within the remainder.
        random_state: Seed forwarded to both ``train_test_split`` calls.

    Returns:
        ``(train_dataset, val_dataset, test_dataset)`` as ``ImageDataset`` objects.
    """
    # Imported here so the function does not depend on a file-level import.
    import tempfile

    # Read the CSV file.
    full_df = pd.read_csv(train_csv_path)

    # First split: training set vs. the remainder (validation + test).
    train_df, holdout_df = train_test_split(
        full_df,
        train_size=train_size,
        stratify=full_df['target'],
        random_state=random_state
    )

    # Second split: divide the remainder into validation and test sets.
    val_fraction = val_size / (val_size + test_size)
    val_df, test_df = train_test_split(
        holdout_df,
        train_size=val_fraction,
        stratify=holdout_df['target'],
        random_state=random_state
    )

    print(f"훈련 세트: {len(train_df)} 샘플")
    print(f"검증 세트: {len(val_df)} 샘플")
    print(f"테스트 세트: {len(test_df)} 샘플")

    # ImageDataset only accepts a CSV path, so each split goes through a file.
    # A private temporary directory avoids overwriting files in the working
    # directory and is removed even if dataset construction raises.
    # ImageDataset reads the CSV in its constructor, so deleting the files
    # right afterwards is safe.
    with tempfile.TemporaryDirectory() as tmp_dir:
        train_csv = os.path.join(tmp_dir, 'train.csv')
        val_csv = os.path.join(tmp_dir, 'val.csv')
        test_csv = os.path.join(tmp_dir, 'test.csv')

        train_df.to_csv(train_csv, index=False)
        val_df.to_csv(val_csv, index=False)
        test_df.to_csv(test_csv, index=False)

        train_dataset = ImageDataset(train_csv, img_dir, transform=trn_transform, augraphy_pipeline=augraphy_pipeline)
        val_dataset = ImageDataset(val_csv, img_dir, transform=tst_transform)
        test_dataset = ImageDataset(test_csv, img_dir, transform=tst_transform)

    return train_dataset, val_dataset, test_dataset

In [None]:
# Dataset definitions: stratified train/val/test splits of the labelled data.
train_dataset, val_dataset, test_dataset = stratified_split_dataset(
    train_csv_path='data/train.csv',
    img_dir='data/train/',
    trn_transform=get_train_transforms(img_size, img_size),
    tst_transform=get_pred_transforms(img_size, img_size),
    augraphy_pipeline=get_augraphy_pipeline(),
)

# Images to predict for the submission, listed in the sample submission file.
pred_dataset = ImageDataset(
    csv_file="data/sample_submission.csv",
    img_dir="data/test/",
    transform=get_pred_transforms(img_size, img_size),
)

print(*(len(ds) for ds in (train_dataset, val_dataset, test_dataset, pred_dataset)))

In [None]:
# DataLoader definitions.

# Settings shared by every loader that must keep the sample order.
_eval_loader_kwargs = dict(
    batch_size=BATCH_SIZE,
    shuffle=False,
    num_workers=num_workers,
    pin_memory=True,
)

# Only the training loader shuffles; the last partial batch is kept.
train_loader = DataLoader(
    train_dataset,
    batch_size=BATCH_SIZE,
    shuffle=True,
    num_workers=num_workers,
    pin_memory=True,
    drop_last=False,
)

val_loader = DataLoader(val_dataset, **_eval_loader_kwargs)

test_loader = DataLoader(test_dataset, **_eval_loader_kwargs)

# The prediction loader loads in the main process (num_workers=0).
pred_loader = DataLoader(pred_dataset, **{**_eval_loader_kwargs, "num_workers": 0})

## Train Model
* 모델을 로드하고, 학습을 진행합니다.

In [None]:
# Load a pretrained timm model with a 17-class head and move it to the device.
model = timm.create_model(model_name, pretrained=True, num_classes=17)
model = model.to(device)

# Cross-entropy loss, optimized with Adam at the configured learning rate.
loss_fn = nn.CrossEntropyLoss()
optimizer = Adam(model.parameters(), lr=LR)

In [None]:
# Early stopping setup: the best checkpoint is written to model_<train_time>.pt.
early_stopping = EarlyStopping(PATIENCE, verbose=True, path=f'model_{train_time}.pt')

for epoch in range(EPOCHS):
    ret = train_one_epoch(train_loader, model, optimizer, loss_fn, device=device)
    val_loss, val_acc = validate(val_loader, model, loss_fn, device)

    ret['epoch'] = epoch
    ret['val_loss'] = val_loss
    ret['val_acc'] = val_acc

    # Log the epoch to wandb.
    # NOTE(review): train_one_epoch has already logged the training metrics,
    # so they are logged a second time here together with the validation ones.
    wandb.log(ret)

    print("".join(f"{k}: {v:.4f}\n" for k, v in ret.items()))

    # Early stopping check
    early_stopping(val_loss, model)
    if early_stopping.early_stop:
        print("Early stopping")
        break

# When training stops, the in-memory weights come from the last epoch, which
# may be up to PATIENCE epochs past the best one. Restore the best checkpoint
# so the saved "final" model and all later evaluation/inference use it.
# best_score stays None only when no epoch ran, i.e. no checkpoint exists.
if early_stopping.best_score is not None:
    model.load_state_dict(torch.load(early_stopping.path, map_location=device))

# Save the final model; create the target directory first so torch.save
# does not fail on a fresh checkout.
os.makedirs('model', exist_ok=True)
model_file_path = os.path.join('model', f'final_model_{train_time}.pt')
torch.save(model.state_dict(), model_file_path)

## 평가

In [None]:
import torch
from tqdm import tqdm
from sklearn.metrics import accuracy_score, f1_score

def evaluate(loader, model, loss_fn, device):
    """Evaluate ``model`` on ``loader`` and return loss, accuracy and macro F1.

    Args:
        loader: Iterable of ``(image, targets)`` batches supporting ``len``.
        model: Module to evaluate; it is switched to eval mode.
        loss_fn: Callable ``loss_fn(preds, targets)`` returning a scalar tensor.
        device: Device the batches are moved to.

    Returns:
        ``(avg_loss, accuracy, f1)`` where ``avg_loss`` is the mean of the
        per-batch losses and ``f1`` is macro-averaged.
    """
    model.eval()
    loss_sum = 0
    collected_preds, collected_targets = [], []

    with torch.no_grad():
        for image, targets in tqdm(loader, desc="Evaluating"):
            image, targets = image.to(device), targets.to(device)

            logits = model(image)
            loss_sum += loss_fn(logits, targets).item()

            collected_preds.extend(logits.argmax(dim=1).cpu().numpy())
            collected_targets.extend(targets.cpu().numpy())

    return (
        loss_sum / len(loader),
        accuracy_score(collected_targets, collected_preds),
        f1_score(collected_targets, collected_preds, average='macro'),
    )

# Evaluate every split once training has finished.
# NOTE(review): train_loader serves shuffled, randomly augmented samples, so
# the "final_train_*" numbers are measured on augmented images.
model.to(device)
train_results = evaluate(train_loader, model, loss_fn, device)
valid_results = evaluate(val_loader, model, loss_fn, device)
test_results = evaluate(test_loader, model, loss_fn, device)

# Log the evaluation results; each result tuple is (loss, accuracy, f1).
wandb.log({
    f"final_{split}_{metric}": value
    for split, results in (
        ("train", train_results),
        ("valid", valid_results),
        ("test", test_results),
    )
    for metric, value in zip(("loss", "accuracy", "f1"), results)
})

In [None]:
def interpret_results(train_results, valid_results, test_results):
    """
    Build a human-readable (Korean) interpretation of the three result sets.

    :param train_results: (train_loss, train_acc, train_f1)
    :param valid_results: (valid_loss, valid_acc, valid_f1)
    :param test_results: (test_loss, test_acc, test_f1)
    :return: interpretation string
    """
    train_loss, train_acc, train_f1 = train_results
    valid_loss, valid_acc, valid_f1 = valid_results
    test_loss, test_acc, test_f1 = test_results

    # Quantities shared by the diagnosis and suggestion sections.
    train_valid_gap = train_acc - valid_acc
    avg_acc = (train_acc + valid_acc + test_acc) / 3
    low_f1 = min(train_f1, valid_f1, test_f1) < 0.3

    # Header and per-split metrics.
    parts = [
        "모델 성능 해석:\n\n",
        f"훈련 세트 - Loss: {train_loss:.4f}, Accuracy: {train_acc:.4f}, F1: {train_f1:.4f}\n",
        f"검증 세트 - Loss: {valid_loss:.4f}, Accuracy: {valid_acc:.4f}, F1: {valid_f1:.4f}\n",
        f"테스트 세트 - Loss: {test_loss:.4f}, Accuracy: {test_acc:.4f}, F1: {test_f1:.4f}\n\n",
    ]

    # Overfitting check (accuracy gaps above 0.05).
    if train_valid_gap > 0.05 and train_acc - test_acc > 0.05:
        parts.append("과적합 징후가 있습니다. 훈련 세트의 성능이 검증 및 테스트 세트보다 현저히 높습니다.\n")
    elif valid_acc - test_acc > 0.05:
        parts.append("검증 세트에 과적합되었을 가능성이 있습니다. 테스트 세트의 성능이 상대적으로 낮습니다.\n")
    else:
        parts.append("과적합의 징후가 크지 않습니다. 세 세트의 성능이 비교적 일관적입니다.\n")

    # Overall performance, judged on the mean accuracy of the three splits.
    if avg_acc < 0.3:
        parts.append("전반적인 성능이 낮습니다. 모델 개선이 필요합니다.\n")
    elif avg_acc < 0.6:
        parts.append("모델이 어느 정도의 학습을 보이지만, 상당한 개선의 여지가 있습니다.\n")
    else:
        parts.append("모델이 비교적 좋은 성능을 보이고 있습니다. 미세 조정을 통해 더 개선할 수 있습니다.\n")

    # F1 score remark.
    if low_f1:
        parts.append("F1 점수가 낮습니다. 클래스 불균형 문제를 고려해야 할 수 있습니다.\n")

    # Improvement suggestions.
    parts.append("\n개선을 위한 제안:\n")
    if train_valid_gap > 0.05:
        parts.extend([
            "- 정규화 기법 (예: dropout, L2 정규화)을 적용해 보세요.\n",
            "- 데이터 증강 기법을 강화해 보세요.\n",
        ])
    if avg_acc < 0.5:
        parts.extend([
            "- 더 복잡한 모델 아키텍처를 시도해 보세요.\n",
            "- 학습률과 배치 크기를 조정해 보세요.\n",
            "- 전이 학습을 고려해 보세요.\n",
        ])
    if low_f1:
        parts.extend([
            "- 클래스 가중치 조정을 통해 불균형 문제를 해결해 보세요.\n",
            "- 앙상블 기법을 시도해 보세요.\n",
        ])

    return "".join(parts)

# Build the interpretation text, print it, and also send it to W&B.
interpret = interpret_results(train_results, valid_results, test_results)
print(interpret)
wandb.log({"interpretation": interpret})

In [None]:
from sklearn.metrics import confusion_matrix
import seaborn as sns
import matplotlib.pyplot as plt

def evaluate_and_visualize_errors(model, dataloader, device, num_samples=10,
                                  mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)):
    """Evaluate ``model``, show misclassified images, and plot the confusion matrix.

    Args:
        model: Module to evaluate; it is switched to eval mode.
        dataloader: Iterable of ``(images, labels)`` batches.
        device: Device the batches are moved to.
        num_samples: Maximum number of misclassified images to display.
        mean: Per-channel mean used to undo normalization before display.
            The default matches the ``A.Normalize`` values used by this
            file's transforms. Pass ``None`` to show the tensors as they are.
        std: Per-channel standard deviation used the same way as ``mean``.

    Returns:
        ``(error_images, error_preds, error_labels)`` for every misclassified
        sample. The images are returned as produced by the dataloader (still
        normalized), moved to the CPU.
    """
    model.eval()
    all_preds = []
    all_labels = []
    error_images = []
    error_preds = []
    error_labels = []

    with torch.no_grad():
        for images, labels in tqdm(dataloader, desc="Evaluating"):
            images = images.to(device)
            labels = labels.to(device)
            preds = model(images).argmax(dim=1)

            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

            # Identify the misclassified samples of this batch.
            errors = preds != labels
            error_images.extend(images[errors].cpu())
            error_preds.extend(preds[errors].cpu().numpy())
            error_labels.extend(labels[errors].cpu().numpy())

    # Accuracy
    accuracy = np.mean(np.array(all_preds) == np.array(all_labels))
    print(f"Accuracy: {accuracy:.4f}")

    # Error visualization
    num_samples = min(num_samples, len(error_images))
    if num_samples > 0:
        # squeeze=False keeps `axes` 2-D, so a single error is handled like
        # any other count instead of yielding a bare, non-indexable Axes.
        fig, axes = plt.subplots(1, num_samples, figsize=(20, 4), squeeze=False)
        for i, ax in enumerate(axes[0]):
            img = error_images[i].permute(1, 2, 0).numpy()
            if mean is not None and std is not None:
                # Undo the normalization so imshow receives values in [0, 1].
                img = img * np.asarray(std, dtype=img.dtype) + np.asarray(mean, dtype=img.dtype)
                img = np.clip(img, 0.0, 1.0)
            ax.imshow(img)
            ax.set_title(f"Pred: {error_preds[i]}, True: {error_labels[i]}")
            ax.axis('off')
        plt.tight_layout()
        plt.show()
    else:
        # plt.subplots cannot create a grid with zero columns.
        print("No misclassified samples to display.")

    # Confusion matrix
    cm = confusion_matrix(all_labels, all_preds)
    plt.figure(figsize=(10, 8))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues')
    plt.title('Confusion Matrix')
    plt.xlabel('Predicted')
    plt.ylabel('True')
    plt.show()

    return error_images, error_preds, error_labels

# Evaluate the validation set and visualize its errors.
print("Validation Set Errors:")
val_errors = evaluate_and_visualize_errors(model, val_loader, device)

# Evaluate the test set and visualize its errors.
print("Test Set Errors:")
test_errors = evaluate_and_visualize_errors(model, test_loader, device)

# Error analysis
def analyze_errors(error_preds, error_labels):
    """Print the five most frequent (predicted, true) confusion pairs.

    Args:
        error_preds: Predicted classes of the misclassified samples.
        error_labels: True classes of the same samples, in the same order.

    Pairs with equal counts are listed in the order they were first seen.
    """
    from collections import Counter

    error_counts = Counter(
        f"Pred: {pred}, True: {true}"
        for pred, true in zip(error_preds, error_labels)
    )

    print("Most common errors:")
    for error, count in error_counts.most_common(5):
        print(f"{error}: {count} times")

# val_errors / test_errors are (error_images, error_preds, error_labels);
# only the predictions and labels are needed for the analysis.
print("Validation Set Error Analysis:")
analyze_errors(val_errors[1], val_errors[2])

print("Test Set Error Analysis:")
analyze_errors(test_errors[1], test_errors[2])

## Inference & Save File
* 테스트 이미지에 대한 추론을 진행하고, 결과 파일을 저장합니다.

In [None]:
if retrain_full_dataset:
    print("Starting final training on entire dataset for submission...")

    # Build a dataset over the whole labelled training set.
    # The original referenced `train_transform`, a name that is never
    # defined; the training pipeline is built by get_train_transforms.
    # NOTE(review): unlike the split training dataset, no augraphy pipeline
    # is applied here -- confirm whether that difference is intended.
    full_dataset = ImageDataset(
        "data/train.csv",
        "data/train/",
        transform=get_train_transforms(img_size, img_size)
    )

    full_loader = DataLoader(
        full_dataset,
        batch_size=BATCH_SIZE,
        shuffle=True,
        num_workers=num_workers,
        pin_memory=True,
        drop_last=False
    )

    # Optionally start again from pretrained weights with a fresh optimizer.
    # NOTE(review): this model is created with drop_rate=0.3, which the
    # initially trained model does not use -- confirm this is intended.
    if reinitialize_model:
        model = timm.create_model(model_name, pretrained=True, num_classes=17, drop_rate=0.3).to(device)
        optimizer = Adam(model.parameters(), lr=LR)

    # Retrain on the full dataset. There is no validation split left, so the
    # loop always runs for the full EPOCHS without early stopping.
    for epoch in range(EPOCHS):
        ret = train_one_epoch(full_loader, model, optimizer, loss_fn, device=device)
        print(f"Epoch {epoch+1}/{EPOCHS}")
        print(f"Loss: {ret['train_loss']:.4f}, Accuracy: {ret['train_acc']:.4f}, F1: {ret['train_f1']:.4f}")

    print("Final training completed.")

In [None]:
print("Generating predictions for submission...")
preds_list = []

# Inference only: eval mode, and no gradient tracking for the whole loop.
model.eval()
with torch.no_grad():
    for image, _ in tqdm(pred_loader):
        image = image.to(device)
        preds = model(image)
        # Keep the predicted class index of every sample, in loader order.
        preds_list.extend(preds.argmax(dim=1).detach().cpu().numpy())

In [None]:
# Start from the rows held by pred_dataset (read from the sample submission
# CSV) and overwrite the 'target' column with the model's predictions.
pred_df = pd.DataFrame(pred_dataset.data, columns=['ID', 'target'])
pred_df['target'] = preds_list 

In [None]:
# Sanity check: the prediction rows must be in the same ID order as the
# sample submission file.
sample_submission_df = pd.read_csv("data/sample_submission.csv")
assert (sample_submission_df['ID'] == pred_df['ID']).all()

In [None]:
# Write the submission CSV, named after the run timestamp. The directory is
# created first because to_csv does not create missing parent directories.
os.makedirs('output', exist_ok=True)
submission_file_path = os.path.join('output', f'{train_time}.csv')
pred_df.to_csv(submission_file_path, index=False)

In [None]:
# Show the first rows of the submission as a quick visual check.
pred_df.head()

In [None]:
# Close the W&B run started at the top of the notebook.
wandb.finish()