# **📄 Document type classification baseline code**
> 문서 타입 분류 대회에 오신 여러분 환영합니다! 🎉     
> 아래 baseline에서는 ResNet 모델을 로드하여, 모델을 학습 및 예측 파일 생성하는 프로세스에 대해 알아보겠습니다.

## Contents
- Prepare Environments
- Import Library & Define Functions
- Hyper-parameters
- Load Data
- Train Model
- Inference & Save File


## 1. Prepare Environments

* 데이터 로드를 위한 구글 드라이브를 마운트합니다.
* 필요한 라이브러리를 설치합니다.

## 2. Import Library & Define Functions
* 학습 및 추론에 필요한 라이브러리를 로드합니다.
* 학습 및 추론에 필요한 함수와 클래스를 정의합니다.

In [1]:
import os
import time
import random

import timm
import torch
import albumentations as A
import pandas as pd
import numpy as np
import torch.nn as nn
from albumentations.pytorch import ToTensorV2
from torch.optim import Adam
from torchvision import transforms
from torch.utils.data import Dataset, DataLoader, Subset, WeightedRandomSampler
from PIL import Image
from tqdm import tqdm
from sklearn.metrics import accuracy_score, f1_score
from sklearn.model_selection import train_test_split

from torch.utils.data import random_split
from sklearn.model_selection import KFold
from torch import nn, optim

In [2]:
# 시드를 고정합니다.
SEED = 42
os.environ['PYTHONHASHSEED'] = str(SEED)
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)
torch.cuda.manual_seed(SEED)
torch.cuda.manual_seed_all(SEED)
torch.backends.cudnn.benchmark = True

In [3]:
# 데이터셋 클래스를 정의합니다.
class ImageDataset(Dataset):
    def __init__(self, csv, path, transform=None):
        self.df = pd.read_csv(csv).values
        self.path = path
        self.transform = transform

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        name, target = self.df[idx]
        img = np.array(Image.open(os.path.join(self.path, name)))
        if self.transform:
            img = self.transform(image=img)['image']
        return img, target

In [4]:
# one epoch 학습을 위한 함수입니다.
def train_one_epoch(loader, model, optimizer, loss_fn, device):
    model.train()
    train_loss = 0
    preds_list = []
    targets_list = []

    pbar = tqdm(loader)
    for image, targets in pbar:
        image = image.to(device)
        targets = targets.to(device)

        model.zero_grad(set_to_none=True)

        preds = model(image)
        loss = loss_fn(preds, targets)
        loss.backward()
        optimizer.step()

        train_loss += loss.item()
        preds_list.extend(preds.argmax(dim=1).detach().cpu().numpy())
        targets_list.extend(targets.detach().cpu().numpy())

        pbar.set_description(f"Loss: {loss.item():.4f}")

    train_loss /= len(loader)
    train_acc = accuracy_score(targets_list, preds_list)
    train_f1 = f1_score(targets_list, preds_list, average='macro')

    ret = {
        "train_loss": train_loss,
        "train_acc": train_acc,
        "train_f1": train_f1,
    }

    return ret

def validate(loader, model, loss_fn, device):
    model.eval()
    val_loss = 0
    preds_list = []
    targets_list = []
    
    with torch.no_grad():
        for image, targets in loader:
            image = image.to(device)
            targets = targets.to(device)
            
            preds = model(image)
            loss = loss_fn(preds, targets)
            
            val_loss += loss.item()
            preds_list.extend(preds.argmax(dim=1).detach().cpu().numpy())
            targets_list.extend(targets.detach().cpu().numpy())
    
    val_loss /= len(loader)
    val_acc = accuracy_score(targets_list, preds_list)
    val_f1 = f1_score(targets_list, preds_list, average='macro')
    
    ret = {
        "val_loss": val_loss,
        "val_acc": val_acc,
        "val_f1": val_f1,
    }
    
    return ret

In [5]:
def mixup_data(x, y, alpha=1.0):
    if alpha > 0:
        lam = np.random.beta(alpha, alpha)
    else:
        lam = 1

    batch_size = x.size()[0]
    index = torch.randperm(batch_size)
    mixed_x = lam * x + (1 - lam) * x[index, :]
    y_a, y_b = y, y[index]
    return mixed_x, y_a, y_b, lam

def mixup_loss(loss_fn, pred, labels_a, labels_b, lam):
    return lam * loss_fn(pred, labels_a) + (1 - lam) * loss_fn(pred, labels_b)

In [None]:
# Import Libraries
import torch
import torch.nn as nn
from torchvision import transforms
from sklearn.utils.class_weight import compute_class_weight

# Define Data Augmentation (for specific classes)
augment_transform = transforms.Compose([
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(10),
    transforms.RandomResizedCrop(224),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.2),
    transforms.ToTensor(),
])

default_transform = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
])

## 3. Hyper-parameters
* 학습 및 추론에 필요한 하이퍼파라미터들을 정의합니다.

In [7]:
# device
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# data config
train_path = '../data/train.csv' # train.csv 이미지 폴더 로드
train_img_path = '../data/train' # train 이미지 폴더 로드
test_img_path = '../data/test' # test 이미지 폴더 로드
sub_path = '../data/sample_submission.csv'

# model config
model_name = 'efficientnet_b3'

# training config
img_size = 224
LR = 1e-3
EPOCHS = 10
BATCH_SIZE = 32
num_workers = 0
log_interval = 100

## 3-1. (Offline) Data augmentation

In [8]:
import os
import numpy as np
from PIL import Image, ImageOps
import pandas as pd
from tqdm import tqdm

import albumentations as A
import matplotlib.pyplot as plt 
from glob import glob 

import cv2

In [None]:
df = pd.read_csv('../data/train.csv')

In [None]:
ids = []
targets = []
data_path = train_img_path

def save_rotated_image(image_path, angle, prefix, ID, target):
    rotated_id = f'{prefix}_{ID}'
    ids.append(rotated_id)
    targets.append(target)
    rotated_image = np.array(ImageOps.exif_transpose(Image.open(image_path).rotate(angle, expand=True)))
    Image.fromarray(rotated_image).save(os.path.join(data_path, rotated_id))

for index, ID, target in tqdm(df.itertuples(), desc='Image rotation', mininterval=0.1):
    image_path = os.path.join(data_path, ID)
    
    # Rotate 90, 180, and 270 degrees
    save_rotated_image(image_path, 90, 'r90', ID, target)
    save_rotated_image(image_path, 180, 'r180', ID, target)
    save_rotated_image(image_path, 270, 'r270', ID, target)
 
rotate_data = {
    'ID': ids,
    'target': targets
}
rotate_df = pd.DataFrame(rotate_data)    
df = pd.concat([df, rotate_df])

In [None]:
h_flip = A.HorizontalFlip(always_apply=True, p=1)
v_flip = A.VerticalFlip(always_apply=True, p=1)

ids = []
targets = []
data_path = train_img_path

def save_flipped_image(image, flip_transform, prefix, ID, target):
    flipped_id = f'{prefix}_{ID}'
    ids.append(flipped_id)
    targets.append(target)
    flipped_image = flip_transform(image=image)['image']
    Image.fromarray(flipped_image).save(os.path.join(data_path, flipped_id))

for index, ID, target in tqdm(df.itertuples(), desc='Image flip', mininterval=0.1):
    image_path = os.path.join(data_path, ID)
    image = np.array(Image.open(image_path))

    # Vertical flip for r90 or r270, Horizontal flip otherwise
    if ID.startswith('r90') or ID.startswith('r270'):
        save_flipped_image(image, v_flip, 'Vflip', ID, target)
    else:
        save_flipped_image(image, h_flip, 'Hflip', ID, target)

# Dataframe for flipped images
flip_data = {
    'ID': ids,
    'target': targets
}
flip_df = pd.DataFrame(flip_data)    
df = pd.concat([df, flip_df])

In [None]:
transforms = A.Compose([
    A.OneOf([
        A.GridDistortion(num_steps=5, distort_limit=0.3, interpolation=1, border_mode=4, value=None, mask_value=None, always_apply=True, p=1),
        A.ElasticTransform(always_apply=True, p=1, alpha=1.0, sigma=50.0, alpha_affine=50.0, interpolation=0, border_mode=1, value=(0, 0, 0), mask_value=None, approximate=False),
        A.OpticalDistortion(always_apply=True, p=1, distort_limit=(-0.3, -0.1)),
        A.OpticalDistortion(always_apply=True, p=1, distort_limit=(0.1, 0.3)),
    ], p=0.85),
    A.SomeOf([
        A.RandomBrightnessContrast(brightness_limit=(-0.2, 0.2), contrast_limit=(-0.2, 0.2), p=1),
        A.HueSaturationValue(hue_shift_limit=15, sat_shift_limit=25, val_shift_limit=20, p=1),
        A.MultiplicativeNoise(p=1, multiplier=(1, 1.5), per_channel=True),
        A.Equalize(p=1, mode='cv', by_channels=True),
    ], n=2, p=0.85),
    A.OneOf([
        A.Rotate(limit=(10, 30), border_mode=cv2.BORDER_CONSTANT, p=1),
        A.Rotate(limit=(150, 170), border_mode=cv2.BORDER_CONSTANT, p=1),
        A.Rotate(limit=(190, 210), border_mode=cv2.BORDER_CONSTANT, p=1),
        A.Rotate(limit=(330, 350), border_mode=cv2.BORDER_CONSTANT, p=1),
    ], p=1),
    A.CoarseDropout(p=0.5, max_holes=40, max_height=15, max_width=15, min_holes=8, min_height=8, min_width=8),
    A.Equalize(p=0.5, mode='cv', by_channels=True),
    A.OneOf([
        A.Blur(blur_limit=(3, 4), p=1),
        A.MotionBlur(blur_limit=(3, 5), p=1),
        A.Downscale(scale_min=0.455, scale_max=0.5, interpolation=2, p=1),
    ], p=0.5),
    A.GaussNoise(var_limit=(100, 800), per_channel=True, p=0.5),
])

In [None]:
# Define augmentation count based on target
def get_augmentation_count(target):
    if target == 13:
        return 10
    elif target == 14:
        return 10
    elif target == 1:
        return 10
    else:
        return 3

ids = []
targets = []
data_path = train_img_path

for index, ID, target in tqdm(df.itertuples(), desc='Image augmentation', mininterval=0.1):
    image_path = os.path.join(data_path, ID)
    image = np.array(Image.open(image_path))
    n = get_augmentation_count(target)
    
    for i in range(n):
        transformed_image = transforms(image=image)['image']
        image_ID = f'tf{i}_{ID}' 
        ids.append(image_ID)
        targets.append(target)
        Image.fromarray(transformed_image).save(os.path.join(data_path, image_ID))
    
# Create augmented DataFrame and concatenate with original
aug_data = {
    'ID': ids,
    'target': targets
}
aug_df = pd.DataFrame(aug_data)
df = pd.concat([df, aug_df])

In [None]:
df.to_csv('../data/train2.csv', index=False)

In [None]:
train2_path = '../data/train2.csv'

In [None]:
# df = pd.read_csv('/root/CV_PJT/CV_PJT/code/train2.csv')
sample_submission_df = pd.read_csv(sub_path)

## 4. Load Data
* 학습, 테스트 데이터셋과 로더를 정의합니다.

In [12]:
# augmentation을 위한 transform 코드
trn_transform = A.Compose([
    # 이미지 크기 조정
    A.Resize(height=img_size, width=img_size),
    # images normalization
    A.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    # numpy 이미지나 PIL 이미지를 PyTorch 텐서로 변환
    ToTensorV2(),
])

# test image 변환을 위한 transform 코드
tst_transform = A.Compose([
    A.Resize(height=img_size, width=img_size),
    A.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ToTensorV2(),
])

In [None]:
# Dataset definition
full_dataset = ImageDataset(
    train2_path,
    train_img_path,
    transform=trn_transform
)

# Calculate the total number of samples in the dataset
dataset_size = len(full_dataset)

# Define the ratios for training and validation
train_ratio = 0.8  # Use 80% of the data for training
val_ratio = 1 - train_ratio  # Remaining 20% for validation

# Calculate the number of samples for training and validation
train_size = int(train_ratio * dataset_size)
val_size = dataset_size - train_size  # Ensure all samples are accounted for

# Split the dataset into training and validation sets
trn_dataset, val_dataset = random_split(full_dataset, [train_size, val_size])

# Define the test dataset
tst_dataset = ImageDataset(
    sub_path,
    test_img_path,
    transform=tst_transform
)

# Print the sizes of the datasets
print("Training dataset size:", len(trn_dataset))
print("Validation dataset size:", len(val_dataset))
print("Test dataset size:", len(tst_dataset))

In [15]:
# DataLoader 정의
trn_loader = DataLoader(
    trn_dataset,
    batch_size=BATCH_SIZE,
    shuffle=True,
    num_workers=num_workers,
    pin_memory=True,
    drop_last=False
)
tst_loader = DataLoader(
    tst_dataset,
    batch_size=BATCH_SIZE,
    shuffle=False,
    num_workers=0,
    pin_memory=True
)

## 5. Train Model
* 모델을 로드하고, 학습을 진행합니다.

In [17]:
# load model
model = timm.create_model(
    model_name,
    pretrained=True,
    num_classes=17
).to(device)
loss_fn = nn.CrossEntropyLoss()
optimizer = Adam(model.parameters(), lr=LR)

In [None]:
for epoch in range(EPOCHS):
    ret = train_one_epoch(trn_loader, model, optimizer, loss_fn, device=device)
    ret['epoch'] = epoch

    log = ""
    for k, v in ret.items():
      log += f"{k}: {v:.4f}\n"
    print(log)

# 6. Inference & Save File
* 테스트 이미지에 대한 추론을 진행하고, 결과 파일을 저장합니다.

In [19]:
preds_list = []

model.eval()
for image, _ in tqdm(tst_loader):
    image = image.to(device)

    with torch.no_grad():
        preds = model(image)
    preds_list.extend(preds.argmax(dim=1).detach().cpu().numpy())

100%|██████████| 99/99 [00:14<00:00,  6.91it/s]


In [20]:
pred_df = pd.DataFrame(tst_dataset.df, columns=['ID', 'target'])
pred_df['target'] = preds_list

In [None]:
sample_submission_df = pd.read_csv(sub_path)
assert (sample_submission_df['ID'] == pred_df['ID']).all()

In [None]:
pred_df.to_csv("pred.csv", index=False)

In [23]:
import matplotlib.pyplot as plt
import seaborn as sns

In [None]:
# Set the figure size
plt.figure(figsize=(8, 6))

# Count the number of predictions per class
sns.countplot(x='target', data=pred_df, palette='Set2')
plt.title('Distribution of Predicted Classes')
plt.xlabel('Class Labels')
plt.ylabel('Count')
plt.xticks(rotation=45)
plt.show()