In [2]:
%pip install timm
%pip install albumentations augraphy torch opencv-python
%pip install --upgrade numpy pandas scikit-learn

Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.


# 1. 라이브러리 로드 및 클래스와 함수 정의

In [10]:
import os
import cv2
import timm
import random
import numpy as np
import pandas as pd
from tqdm import tqdm
import augraphy as aug
import albumentations as A
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
from albumentations.pytorch import ToTensorV2
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import accuracy_score, f1_score

import torch
import torch.nn as nn
import torch.optim as optim

# ============================
# 시드 고정
# ============================
def set_seed(seed=42):
    random.seed(seed)
    os.environ["PYTHONHASHSEED"] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False


In [11]:
# ============================
# 데이터셋
# ============================
class DocumentDataset(Dataset):
    def __init__(self, df, img_dir, 
                 hard_paper_transform=None,
                 easy_paper_transform=None, 
                 non_paper_transform=None):
        self.img_dir = img_dir
        # self.transform = transform

        # 1. 종이/비종이/어려운종이 이미지에 따라 다른 증강을 적용할 수 있도록 분리
        self.hard_paper_transform = hard_paper_transform
        self.easy_paper_transform = easy_paper_transform
        self.non_paper_transform = non_paper_transform
        
        # 2. 타겟 그룹을 set으로 정의 (빠른 조회를 위해)
        # 2.1 '어려운 종이'
        self.hard_paper_targets = {3, 4, 6, 7, 14, 10}
        
        # 2.2 '쉬운 종이'
        # (기존 '종이' 그룹 [1,3,4,6,7,11,12,14,10,13] 에서 '어려운' 그룹을 제외)
        self.easy_paper_targets = {1, 11, 12, 13}

        # 3. K-Fold로 잘린 df를 받자마자 인덱스를 0부터 리셋합니다.
        df_reset = df.reset_index(drop=True)
        self.image_ids = df_reset['ID'].values
        self.labels = df_reset['target'].values
        self.length = len(df_reset)

    def __len__(self):
        # __init__에서 저장한 데이터셋의 길이를 반환합니다.
        return self.length

    def __getitem__(self, idx):
        # DataLoader는 0부터 (self.length - 1)까지의 idx를 전달합니다.
        
        try:
            # 3. .iloc가 아닌, 안전한 numpy 배열에서 idx로 데이터를 가져옵니다.
            img_id = self.image_ids[idx]
            label = self.labels[idx]
        except IndexError:
            print(f"!!! DATASET INDEX ERROR: idx={idx}, len={self.length} !!!")
            raise

        img_path = os.path.join(self.img_dir, img_id)
        image = cv2.imread(img_path)
        
        # 이미지 파일을 읽지 못한 경우 에러 처리
        if image is None:
            raise FileNotFoundError(f"이미지 파일을 찾을 수 없거나 읽을 수 없습니다: {img_path}")

        # BGR -> RGB
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
            
        # --- 레이블에 따라 다른 Transform 적용 ---
        transform_to_apply = None
        
        if label in self.hard_paper_targets:
            # 1. '어려운 종이'
            transform_to_apply = self.hard_paper_transform
        elif label in self.easy_paper_targets:
            # 2. '쉬운 종이'
            transform_to_apply = self.easy_paper_transform
        else:
            # 3. '비종이'
            transform_to_apply = self.non_paper_transform
        
        # 5. 선택된 Transform 적용
        if transform_to_apply:
            augmented = transform_to_apply(image=image)
            image = augmented['image']
            
        return image, label

In [12]:
# ============================
# Mixup
# ============================
def mixup_data(x, y, alpha=1.0, device='cuda'):
    """
    배치 내에서 MixUp을 수행합니다.
    x: 이미지 배치 (images)
    y: 레이블 배치 (labels)
    alpha: Beta 분포의 하이퍼파라미터 (1.0일 때 0~1 사이의 균일한 분포와 유사)
    """
    if alpha > 0:
        # 람다(lambda) 값을 Beta 분포에서 샘플링
        lam = np.random.beta(alpha, alpha)
    else:
        # alpha=0이면 MixUp을 적용하지 않음
        lam = 1

    batch_size = x.size(0)
    
    # 섞을 대상 인덱스를 랜덤하게 섞음
    # (예: [0, 1, 2, 3] -> [2, 0, 3, 1])
    index = torch.randperm(batch_size).to(device)

    # 이미지를 섞음 (x_a * lam + x_b * (1-lam))
    mixed_x = lam * x + (1 - lam) * x[index, :]
    
    # 레이블도 섞을 준비 (y_a, y_b)
    y_a, y_b = y, y[index]
    
    return mixed_x, y_a, y_b, lam

def mixup_criterion(criterion, preds, y_a, y_b, lam):
    """
    MixUp된 예측값(preds)에 대한 손실(loss)을 계산합니다.
    criterion: 기본 손실 함수 (예: nn.CrossEntropyLoss)
    preds: 모델의 예측값 (logits)
    y_a, y_b: 섞인 두 원본 레이블
    lam: MixUp 람다 값
    """
    # 손실 계산: loss = (loss_a * lam) + (loss_b * (1-lam))
    return lam * criterion(preds, y_a) + (1 - lam) * criterion(preds, y_b)

In [13]:
# ============================
# Epoch 학습 (MixUp 포함)
# ============================
def train_one_epoch(model, loader, criterion, optimizer, device, mixup_alpha):
    model.train()
    total_loss = 0
    
    for images, labels in tqdm(loader, desc="Train"):
        images = images.to(device)
        labels = labels.to(device)

        # MixUp 적용
        mixed_images, labels_a, labels_b, lam = mixup_data(images, labels, mixup_alpha, device)

        optimizer.zero_grad()
        
        outputs = model(mixed_images)
        loss = mixup_criterion(criterion, outputs, labels_a, labels_b, lam)
        
        loss.backward()
        optimizer.step()
        
        total_loss += loss.item()
        
    return total_loss / len(loader)

In [14]:
# ============================
# Epoch 검증 함수 (F1 스코어)
# ============================
def validate(model, loader, criterion, device):
    model.eval()
    total_loss = 0
    all_preds = []
    all_labels = []
    
    with torch.no_grad():
        for images, labels in tqdm(loader, desc="Valid"):
            images = images.to(device)
            labels = labels.to(device)
            
            outputs = model(images)
            loss = criterion(outputs, labels)
            
            total_loss += loss.item()
            
            preds = outputs.argmax(dim=1)
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())
            
    # F1 스코어 계산 (Macro F1)
    f1 = f1_score(all_labels, all_preds, average='macro')
    return total_loss / len(loader), f1

In [15]:
# ============================
# 학습 조기 종료
# ============================
class EarlyStopping:
    """
    Validation F1 스코어를 모니터링하여 학습을 조기 종료시킵니다.
    F1 스코어는 높을수록 좋습니다.
    """
    def __init__(self, patience=5, verbose=False, delta=0, path='best_model.pth'):
        """
        Args:
            patience (int): F1 스코어가 개선되지 않아도 기다릴 에포크 수
            verbose (bool): 조기 종료 또는 개선 시 메시지 출력 여부
            delta (float): F1 스코어가 최소 이 값 이상 개선되어야 함
            path (str): 최고 성능 모델을 저장할 경로
        """
        self.patience = patience
        self.verbose = verbose
        self.delta = delta
        self.path = path
        self.counter = 0
        self.best_score = None
        self.early_stop = False

    def __call__(self, val_f1, model):
        # 처음 호출 시 best_score 초기화
        if self.best_score is None:
            self.best_score = val_f1
            self.save_checkpoint(val_f1, model)
        
        # F1 스코어가 개선되지 않았을 때
        elif val_f1 < self.best_score + self.delta:
            self.counter += 1
            if self.verbose:
                print(f'EarlyStopping counter: {self.counter} out of {self.patience}')
            if self.counter >= self.patience:
                self.early_stop = True
        
        # F1 스코어가 개선되었을 때
        else:
            self.best_score = val_f1
            self.save_checkpoint(val_f1, model)
            self.counter = 0

    def save_checkpoint(self, val_f1, model):
        # 모델 저장
        if self.verbose:
            print(f'Validation F1 improved ({self.best_score:.6f} -> {val_f1:.6f}). Saving model ...')
        torch.save(model.state_dict(), self.path)

# 2. Augraphy 파이프라인 생성 (온라인 증강)

In [17]:

IMG_SIZE = 384
TRAIN_IMG_DIR = "datasets_fin/train/"
TRAIN_CSV_PATH = "datasets_fin/train.csv"
MODEL_SAVE_DIR = "model/"

# ============================
# Augraphy 파이프라인
# ============================
# 표준 종이 문서 ([1,11,12,13])
augraphy_pipeline = aug.AugraphyPipeline(
    [
        # 1. 팩스/복사본처럼 품질 저하 (Test 데이터의 흐릿함/대비 문제)
        aug.BadPhotoCopy(
            noise_size=(3, 3),        # 노이즈 크기
            noise_type=-1,            # 랜덤 노이즈 타입
            noise_value=(50, 100), # 노이즈 강도
            p=0.5                     # 50% 확률로 적용
        ),
        
        # 2. 잉크/토너 부족 효과
        aug.LowInkRandomLines(
            use_consistent_lines=False, # 일관된 줄무늬 사용 안 함
            count_range=(3, 10), # 잉크 빠진 줄 개수
            p=0.4                       # 40% 확률로 적용
        ),

        # 3. 스캐너 롤러 자국 또는 얼룩
        aug.DirtyRollers(
            line_width_range=(2, 6),
            p=0.3                       # 30% 확률로 적용
        ),
    ]
)

# 어려운 종이 문서 ([3,4,6,7,10,14])
augraphy_pipeline_hard = aug.AugraphyPipeline(
    [
        aug.BadPhotoCopy(
            noise_size=(2, 4), # 노이즈 크기 증가
            noise_value=(40, 80), # 강도 증가
            p=0.8 # 확률 50% -> 80%
        ),
        aug.LowInkRandomLines(
            count_range=(5, 15), # 잉크 빠짐 증가
            use_consistent_lines=False, 
            p=0.7 # 확률 40% -> 70%
        ),
        aug.DirtyRollers(
            line_width_range=(3, 8), # 롤러 자국 증가
            p=0.6 # 확률 30% -> 60%
        ),
    ]
)

# --- Albumentations의 Lambda로 Augraphy 래핑 ---
# Albumentations 파이프라인 내에서 Augraphy를 호출하기 위한 함수
# 표준 종이 문서 ([1,11,12,13])
def apply_augraphy(image, **kwargs):
    """
    Albumentations가 넘겨준 이미지를 Augraphy 파이프라인에 적용
    """
    # Augraphy는 RGB (H, W, 3) 형태의 uint8 numpy 배열을 기대합니다.
    # cv2.imread로 읽었다면 BGR -> RGB 변환이 필요할 수 있지만,
    # Pytorch Dataset에서 보통 RGB로 로드하므로 바로 사용 가능
    return augraphy_pipeline.augment(image)["output"]

# 어려운 종이 문서 ([3,4,6,7,10,14])
def apply_augraphy_hard(image, **kwargs):
    return augraphy_pipeline_hard.augment(image)["output"]


# --- 최종 Albumentations 파이프라인 ---
# 표준 종이 문서 ([1,11,12,13])
def get_train_transform(img_size=IMG_SIZE):
    return A.Compose([
        # --- 1. Augraphy 적용 (Lambda 사용) ---
        # 50% 확률로 Augraphy의 '더러운' 효과 적용
        A.Lambda(image=apply_augraphy, p=0.5),

        # --- 2. 기하학적 변형 (Test 데이터의 심한 왜곡 재현) ---
        # (가장 중요) 원근 왜곡: 비스듬히 찍힌 효과
        A.Perspective(
            scale=(0.05, 0.1),  # 왜곡 강도
            pad_mode=cv2.BORDER_CONSTANT, # 빈 공간 채우기
            pad_val=0,          # 검은색으로 채우기
            p=0.7               # 70% 확률로 적용
        ),

        # 회전: 각도를 크게 줌
        A.Rotate(
            limit=(-40, 40),      # -40 ~ +40도 사이로 회전
            border_mode=cv2.BORDER_CONSTANT,
            value=0,
            p=0.8                 # 80% 확률로 적용
        ),

        # --- 3. 품질 저하 (Test 데이터의 흐릿함 재현) ---
        # 모션 블러 (손 떨림 효과)
        A.MotionBlur(blur_limit=(3, 11), p=0.4),

        # 가우시안 블러
        A.GaussianBlur(blur_limit=(3, 7), p=0.4),

        # --- 4. 색상/조명 변형 ---
        # 밝기, 대비 조절 (다양한 조명 환경)
        A.RandomBrightnessContrast(
            brightness_limit=0.2, # 밝기 20%
            contrast_limit=0.2,   # 대비 20%
            p=0.5
        ),
        
        # 가우시안 노이즈
        A.GaussNoise(var_limit=(10.0, 50.0), p=0.3),

        # --- 5. 최종 처리 ---
        # 이미지 크기 통일 (모델 입력 크기)
        A.Resize(img_size, img_size),

        # 정규화 (ImageNet 평균/표준편차)
        A.Normalize(
            mean=[0.485, 0.456, 0.406], 
            std=[0.229, 0.224, 0.225]
        ),
        
        # Pytorch 텐서로 변환
        ToTensorV2()
    ])

# 어려운 종이 문서 ([3,4,6,7,10,14])
def get_train_transform_hard_paper(img_size):
    """
    (신규) '어려운' 종이 문서를 위한 강력한 Transform
    """
    return A.Compose([
        # 1. '강력한' Augraphy 적용 (확률 80%로 상시 적용에 가깝게)
        A.Lambda(image=apply_augraphy_hard, p=0.8),

        # 2. 기하학적 변형 (Albumentations) - 강도 유지
        A.Perspective(scale=(0.05, 0.1), pad_mode=cv2.BORDER_CONSTANT, p=0.7),
        A.Rotate(limit=(-40, 40), border_mode=cv2.BORDER_CONSTANT, value=0, p=0.8),

        # 3. 품질 저하 (흐릿함)
        A.MotionBlur(blur_limit=(3, 11), p=0.5), # 확률 살짝 높임
        A.GaussianBlur(blur_limit=(3, 7), p=0.5), # 확률 살짝 높임

        # 4. 색상/조명
        A.RandomBrightnessContrast(brightness_limit=0.2, contrast_limit=0.2, p=0.5),
        A.GaussNoise(var_limit=(10.0, 50.0), p=0.3),

        # 5. 최종 처리
        A.Resize(img_size, img_size),
        A.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
        ToTensorV2()
    ])

# 비종이 문서 ([0,2,5,8,9,15,16])
def get_train_transform_non_paper(img_size):
    """
    (신규) 비종이 문서(대시보드, 신분증 등)를 위한 Transform
    Augraphy (팩스/잉크 훼손)가 제외됩니다.
    """
    return A.Compose([
        # --- 1. Augraphy (제외됨) ---

        # --- 2. 기하학적 변형 (Albumentations) ---
        # (카메라 왜곡 등은 여전히 유효함)
        A.Perspective(scale=(0.05, 0.1), pad_mode=cv2.BORDER_CONSTANT, p=0.7),
        A.Rotate(limit=(-40, 40), border_mode=cv2.BORDER_CONSTANT, value=0, p=0.8),

        # 3. 품질 저하 (흐릿함)
        A.MotionBlur(blur_limit=(3, 11), p=0.4),
        A.GaussianBlur(blur_limit=(3, 7), p=0.4),

        # 4. 색상/조명
        A.RandomBrightnessContrast(brightness_limit=0.2, contrast_limit=0.2, p=0.5),
        A.GaussNoise(var_limit=(10.0, 50.0), p=0.3),

        # 5. 최종 처리
        A.Resize(img_size, img_size),
        A.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
        ToTensorV2()
    ])

# --- 검증(Validation) 데이터용 Transform ---
def get_valid_transform(img_size=IMG_SIZE):
    return A.Compose([
        A.Resize(img_size, img_size),
        A.Normalize(
            mean=[0.485, 0.456, 0.406], 
            std=[0.229, 0.224, 0.225]
        ),
        ToTensorV2()
    ])

# 3. 하이퍼파라미터 세팅
### 1. ConvNeXt

In [18]:
N_SPLITS = 5
BATCH_SIZE = 8
N_CLASSES = 17
EPOCHS = 20       # (Fold당 Epoch)
LR = 1e-4
MIXUP_ALPHA = 1.0
SEED = 42
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
PATIENCE = 5
WEIGHT_DECAY = 0.05
LABEL_SMOOTHING = 0.1

model_name = "convnext_base"

# model name 축약
model_short_name = "conv"

### 2. Efficientnet_b4

In [20]:
N_SPLITS = 5
BATCH_SIZE = 8
N_CLASSES = 17
EPOCHS = 20       # (Fold당 Epoch)
LR = 2e-4
MIXUP_ALPHA = 1.0
SEED = 42
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
PATIENCE = 5
WEIGHT_DECAY = 1e-2
LABEL_SMOOTHING = 0.1

model_name = "tf_efficientnet_b4_ns"

# model name 축약
model_short_name = "tf_eff_b4"

### 3. ViT

In [22]:
N_SPLITS = 5
BATCH_SIZE = 8
N_CLASSES = 17
EPOCHS = 20       # (Fold당 Epoch)
LR = 2e-5
MIXUP_ALPHA = 1.0
SEED = 42
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
PATIENCE = 5
WEIGHT_DECAY = 0.05
LABEL_SMOOTHING = 0.1

model_name = "vit_base_patch16_384"

# model name 축약
model_short_name = "vit"

# 4. 학습 수행

In [None]:
# 6.2 시드 고정
set_seed(SEED)

# 6.3 전체 데이터 로드
df = pd.read_csv(TRAIN_CSV_PATH)

# 6.4 K-Fold 준비
# .values를 사용해 numpy 배열로 변환
X = df['ID'].values 
y = df['target'].values

skf = StratifiedKFold(n_splits=N_SPLITS, shuffle=True, random_state=SEED)

# 6.5 K-Fold 학습 시작
for fold, (train_idx, valid_idx) in enumerate(skf.split(X, y)):
    print(f"--- FOLD {fold+1}/{N_SPLITS} ---")

    # 1. Fold별 데이터 분리
    train_df = df.iloc[train_idx]
    valid_df = df.iloc[valid_idx]

    # 2. Transform 정의
    # 2.1 '어려운 종이' 학습용 (가장 강함)
    hard_paper_transform = get_train_transform_hard_paper(IMG_SIZE)
    
    # 2.2 '쉬운 종이' 학습용 (중간 강도 - Augraphy 포함)
    easy_paper_transform = get_train_transform(IMG_SIZE)
    
    # 2.3 '비종이' 학습용 (Augraphy 제외)
    non_paper_transform = get_train_transform_non_paper(IMG_SIZE)
    
    # 2.4 검증용 (증강 없음 - 공통 사용)
    valid_transform = get_valid_transform(IMG_SIZE)

    # 3. Dataset 생성
    train_dataset = DocumentDataset(train_df, 
                                    TRAIN_IMG_DIR, 
                                    hard_paper_transform=hard_paper_transform,
                                    easy_paper_transform=easy_paper_transform,
                                    non_paper_transform=non_paper_transform
                                    )
    
    valid_dataset = DocumentDataset(valid_df, 
                                    TRAIN_IMG_DIR, 
                                    hard_paper_transform=hard_paper_transform,
                                    easy_paper_transform=easy_paper_transform,
                                    non_paper_transform=non_paper_transform
                                    )
    
    # 4. DataLoader 생성
    train_loader = DataLoader(
        train_dataset, 
        batch_size=BATCH_SIZE, 
        shuffle=True, 
        num_workers=4 
    )
    valid_loader = DataLoader(
        valid_dataset, 
        batch_size=BATCH_SIZE, 
        shuffle=False, 
        num_workers=4
    )

    print(f"Train: {len(train_dataset)} | Valid: {len(valid_dataset)}")

    model = timm.create_model(
        model_name,
        pretrained=True,
        num_classes=17
    ).to(DEVICE)

    criterion = nn.CrossEntropyLoss(label_smoothing=LABEL_SMOOTHING)
    optimizer = optim.AdamW(model.parameters(), lr=LR, weight_decay=WEIGHT_DECAY)
    
    # --- EarlyStopping 객체 초기화 ---
    fold_model_path = f"{MODEL_SAVE_DIR}best_{model_short_name}_model_fold_{fold+1}.pth"
    early_stopping = EarlyStopping(
        patience=PATIENCE, 
        verbose=True, 
        path=fold_model_path
    )

    best_f1 = 0.0

    # 6. Epoch 학습
    for epoch in range(1, EPOCHS + 1):
        print(f"Epoch {epoch}/{EPOCHS}")
        train_loss = train_one_epoch(model, train_loader, criterion, optimizer, DEVICE, MIXUP_ALPHA)
        valid_loss, valid_f1 = validate(model, valid_loader, criterion, DEVICE)
        
        print(f"Train Loss: {train_loss:.4f} | Valid Loss: {valid_loss:.4f} | Valid F1: {valid_f1:.4f}")
        
        # EarlyStopping 호출
        early_stopping(valid_f1, model)
        if early_stopping.early_stop:
            print("Early stopping triggered.")
            break

    print(f"{model_short_name} : Fold {fold+1} Best F1 Score: {early_stopping.best_score::.4f}")

print("--- K-Fold Training Finished ---")