# **📄 Document type classification baseline code**


In [None]:
import os
import time
import pandas as pd
import numpy as np
import copy
import wandb
from sklearn.metrics import accuracy_score, f1_score, precision_recall_fscore_support, confusion_matrix

import timm
import torch
import torch.nn as nn
from torch.optim import Adam
from torchvision import transforms
from torch.utils.data import Dataset, DataLoader
from torchvision.datasets import ImageFolder
from torch.optim.lr_scheduler import CosineAnnealingLR, ReduceLROnPlateau
import random
import torch.backends.cudnn as cudnn
from focal_loss.focal_loss import FocalLoss # https://github.com/mathiaszinnen/focal_loss_torch

import cv2
from PIL import Image
from tqdm import tqdm
import albumentations as A
from albumentations.pytorch import ToTensorV2
from matplotlib import pyplot as plt
import seaborn as sns
# pip uninstall charset-normalizer
# pip install charset-normalizer

import warnings
warnings.filterwarnings("ignore")

import os
os.environ['CUDA_LAUNCH_BLOCKING'] = "1"
os.environ["CUDA_VISIBLE_DEVICES"] = "0"

In [None]:
# Fix the random seeds
def random_seed(seed_num):
    """Seed the Python, NumPy and PyTorch generators and set the cuDNN flags.

    Args:
        seed_num: Seed value passed to every generator.
    """
    # Python and NumPy generators
    random.seed(seed_num)
    np.random.seed(seed_num)

    # PyTorch generators: CPU, current CUDA device, all CUDA devices
    torch.manual_seed(seed_num)
    torch.cuda.manual_seed(seed_num)
    torch.cuda.manual_seed_all(seed_num)

    # cuDNN flags: deterministic on, benchmark off
    cudnn.deterministic = True
    cudnn.benchmark = False


random_seed(624)

In [None]:
# File that maps class names to integer labels (read once)
meta_df = pd.read_csv('data/meta.csv')
label2id = dict(zip(meta_df['class_name'], meta_df['target']))
id2label = dict(zip(meta_df['target'], meta_df['class_name']))

In [None]:
# Custom dataset
class ImageDataset(Dataset):
    """Dataset backed by a CSV whose rows are unpacked as (file name, target).

    Each item is the tuple ``(image, target, name)``.
    """

    def __init__(self, csv, path, transform=None):
        """Read the CSV and remember where the images live.

        Args:
            csv: Path of the CSV file; every row must unpack into
                (file name, target).
            path: Directory the file names are joined onto.
            transform: Optional callable invoked as ``transform(image=img)``;
                the ``'image'`` entry of its result is returned.
        """
        self.df = pd.read_csv(csv).values
        self.path = path
        self.transform = transform

    def __len__(self):
        """Return the number of CSV rows."""
        return len(self.df)

    def __getitem__(self, idx):
        """Load the image of row ``idx`` as a 3-channel RGB array.

        Returns:
            (image, target, name) where image is the transformed image when a
            transform was given, otherwise the RGB NumPy array.
        """
        name, target = self.df[idx]
        # convert("RGB") covers grayscale, palette and RGBA inputs alike;
        # stacking the array three times only worked for 2-D grayscale data.
        # The context manager closes the underlying file once it is decoded.
        with Image.open(os.path.join(self.path, name)) as pil_img:
            img = np.array(pil_img.convert("RGB"))
        if self.transform:
            img = self.transform(image=img)['image']
        return img, target, name

In [None]:
# https://geunuk.tistory.com/456
def MixUp(input, target, alpha=1.0):
    """Blend every sample of a batch with a randomly chosen sample of the same batch.

    Args:
        input: Batch tensor; it is permuted along dimension 0.
        target: Labels, indexed with the same permutation.
        alpha: Parameter of the Beta(alpha, alpha) draw for the mixing
            coefficient. When it is not positive the coefficient is 1.

    Returns:
        (mixed_input, labels_a, labels_b, lambda_): the blended batch, the
        original labels, the permuted labels and the mixing coefficient.
    """
    lambda_ = np.random.beta(alpha, alpha) if alpha > 0 else 1

    # Random partner for each position in the batch
    shuffled = torch.randperm(input.size(0))
    partner = input[shuffled, :]

    mixed_input = lambda_ * input + (1 - lambda_) * partner

    return mixed_input, target, target[shuffled], lambda_

def MixUpLoss(criterion, pred, labels_a, labels_b, lambda_):
    """Return the criterion on both label sets, weighted by lambda_ and 1 - lambda_."""
    loss_a = criterion(pred, labels_a)
    loss_b = criterion(pred, labels_b)
    return lambda_ * loss_a + (1 - lambda_) * loss_b

In [None]:
# Dataset definitions
def dataset(AUG_BOOL,trn_transform,tst_transform):
    """Build the five ImageDataset objects used by the notebook.

    Args:
        AUG_BOOL: When truthy the training dataset reads "data/aug_train.csv",
            otherwise "data/train.csv".
        trn_transform: Transform applied to the two training datasets.
        tst_transform: Transform applied to the validation dataset and to
            both test datasets.

    Returns:
        (origin_train_dataset, trn_dataset, val_dataset, tst_dataset,
        origin_tst_dataset)
    """
    # Original images or augmented images
    train_img_file = "data/aug_train.csv" if AUG_BOOL else "data/train.csv"

    origin_train_dataset = ImageDataset(
        "data/train.csv",
        "data/train/",
        transform=trn_transform
    )
    # NOTE(review): the image directory stays "data/aug_train/" even when
    # AUG_BOOL is falsy and "data/train.csv" is read — confirm that directory
    # also holds the original files.
    trn_dataset = ImageDataset(
        train_img_file,
        "data/aug_train/",
        transform=trn_transform
    )
    # Evaluation data goes through the test transform so that anything added
    # to the training transform later never leaks into validation/test.
    val_dataset = ImageDataset(
        "data/aug_valid.csv",
        "data/aug_valid/",
        transform=tst_transform
    )
    tst_dataset = ImageDataset(
        "data/aug_test.csv",
        "data/aug_test/",
        transform=tst_transform
    )
    origin_tst_dataset = ImageDataset(
        "data/sample_submission.csv",
        "data/test/",
        transform=tst_transform
    )

    return origin_train_dataset,trn_dataset,val_dataset,tst_dataset,origin_tst_dataset

# Data loaders for the given parameters
def loader(batch_size,origin_train_dataset,trn_dataset,val_dataset,tst_dataset,origin_tst_dataset):
    """Wrap the five datasets in DataLoaders that share one batch size.

    The two training loaders shuffle; the validation and test loaders keep
    the dataset order, since evaluation and inference gain nothing from a
    random order.

    Args:
        batch_size: Batch size used by every loader.
        origin_train_dataset: Dataset for the first training loader.
        trn_dataset: Dataset for the second training loader.
        val_dataset: Dataset for the validation loader.
        tst_dataset: Dataset for the held-out test loader.
        origin_tst_dataset: Dataset for the submission test loader.

    Returns:
        (origin_train_loader, train_loader, valid_loader, test_loader,
        origin_test_loader)
    """
    def _make_loader(data, shuffle):
        # Settings shared by all loaders; only the shuffle flag differs.
        return DataLoader(
            data,
            batch_size=batch_size,
            shuffle=shuffle,
            num_workers=0,
            pin_memory=True,
            drop_last=False
        )

    origin_train_loader = _make_loader(origin_train_dataset, shuffle=True)
    train_loader = _make_loader(trn_dataset, shuffle=True)
    valid_loader = _make_loader(val_dataset, shuffle=False)
    test_loader = _make_loader(tst_dataset, shuffle=False)
    origin_test_loader = _make_loader(origin_tst_dataset, shuffle=False)

    return origin_train_loader,train_loader,valid_loader,test_loader,origin_test_loader

# Transforms for a given image size (per-model input size and normalisation)
def image_trasform(image_size,model_mean,model_std):
    """Build the training and test pipelines, which currently hold the same steps.

    Args:
        image_size: Height and width passed to A.Resize.
        model_mean: Per-channel mean passed to A.Normalize.
        model_std: Per-channel std passed to A.Normalize.

    Returns:
        (trn_transform, tst_transform): two separate A.Compose objects.
    """
    def _pipeline():
        # A.Resize, then A.Normalize, then ToTensorV2
        steps = [
            A.Resize(height=image_size, width=image_size),
            A.Normalize(mean=list(model_mean), std=list(model_std)),
            ToTensorV2(),
        ]
        return A.Compose(steps)

    trn_transform = _pipeline()
    tst_transform = _pipeline()
    return trn_transform,tst_transform

In [None]:
# Code for training, evaluation and training_loop
def training(model, dataloader, train_dataset, criterion, optimizer, device, epoch, EPOCH):
    """Train the model for one epoch, applying MixUp to every third batch.

    Reads the module-level ``LOSS_F`` setting: any value other than ``'CE'``
    is treated as focal loss, in which case the criterion receives softmax
    probabilities instead of raw logits (in MixUp batches too).

    Args:
        model: Model to train; its output is a tensor or an object with a
            ``logits`` attribute.
        dataloader: Iterable of ``(images, labels, names)`` batches.
        train_dataset: Kept for interface compatibility. Accuracy is
            normalised by the number of samples actually iterated, so it
            stays correct when the loader was built from another dataset.
        criterion: Loss callable ``criterion(pred, labels)``.
        optimizer: Optimizer stepped once per batch.
        device: Device the batches are moved to.
        epoch: Zero-based epoch index (progress bar text only).
        EPOCH: Total number of epochs (progress bar text only).

    Returns:
        (model, mean loss per batch, accuracy, macro F1)
    """
    model.train()  # switch the model to training mode
    train_loss = 0.0
    correct = 0
    seen = 0
    all_labels = []
    all_predicted = []
    use_focal = LOSS_F != 'CE'

    tbar = tqdm(dataloader)
    for idx, (images, labels, names) in enumerate(tbar):
        images = images.to(device)
        labels = labels.to(device)

        # MixUp is applied whenever the 1-based batch number is divisible by 3
        use_mixup = (idx + 1) % 3 == 0
        if use_mixup:
            images, labels_a, labels_b, lambda_ = MixUp(images, labels)

        outputs = model(images)
        # Some models return an object carrying the logits, not a tensor
        if not isinstance(outputs, torch.Tensor):
            outputs = outputs.logits

        # Softmax over the last dimension for the focal-loss path
        loss_input = torch.softmax(outputs, dim=-1) if use_focal else outputs
        if use_mixup:
            loss = MixUpLoss(criterion, pred=loss_input, labels_a=labels_a, labels_b=labels_b, lambda_=lambda_)
        else:
            loss = criterion(loss_input, labels)

        # Back-propagation and weight update
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        train_loss += loss.item()

        # torch.max over dim 1 returns (max values, indices); the indices
        # are the predicted classes
        _, predicted = torch.max(outputs, 1)
        correct += (predicted == labels).sum().item()
        seen += labels.size(0)

        all_labels.extend(labels.cpu().numpy())
        all_predicted.extend(predicted.cpu().numpy())

        tbar.set_description(f"Epoch [{epoch+1}/{EPOCH}], Train Loss: {loss.item():.4f}")

    # Epoch-level metrics
    train_loss = train_loss / len(dataloader)
    train_accuracy = correct / seen
    precision, recall, f1, _ = precision_recall_fscore_support(all_labels, all_predicted, average='macro')

    return model, train_loss, train_accuracy, f1
 
def evaluation(model, dataloader, val_dataset, criterion, device, epoch, EPOCH):
    """Evaluate the model on one pass over the loader without gradient tracking.

    Reads the module-level ``LOSS_F``; for any value other than ``'CE'`` the
    criterion receives ``m(outputs)``, where ``m`` is also module-level.

    Args:
        model: Model to evaluate; its output is a tensor or an object with a
            ``logits`` attribute.
        dataloader: Iterable of ``(images, labels, names)`` batches.
        val_dataset: Dataset whose length normalises the accuracy.
        criterion: Loss callable ``criterion(pred, labels)``.
        device: Device the batches are moved to.
        epoch: Zero-based epoch index (progress bar text only).
        EPOCH: Total number of epochs (progress bar text only).

    Returns:
        (model, mean loss per batch, accuracy, macro F1)
    """
    model.eval()  # evaluation mode
    loss_sum = 0.0
    n_correct = 0
    y_true = []
    y_pred = []

    with torch.no_grad():  # no gradients during evaluation
        progress = tqdm(dataloader)
        for images, labels, names in progress:
            images = images.to(device)
            labels = labels.to(device)

            # Forward pass; timm models return a tensor, AutoModel-style
            # models return an object that carries the logits
            logits = model(images)
            if not isinstance(logits, torch.Tensor):
                logits = logits.logits

            if LOSS_F == 'CE':
                loss = criterion(logits, labels)
            else:
                loss = criterion(m(logits), labels)  # focal loss

            loss_sum += loss.item()

            # Index of the largest score along dim 1 is the predicted class
            predicted = torch.max(logits, 1)[1]
            n_correct += (predicted == labels).sum().item()

            y_true.extend(labels.cpu().numpy())
            y_pred.extend(predicted.cpu().numpy())

            progress.set_description(f"Epoch [{epoch+1}/{EPOCH}], Valid Loss: {loss.item():.4f}")

    # Metrics
    valid_loss = loss_sum / len(dataloader)
    valid_accuracy = n_correct / len(val_dataset)
    precision, recall, f1, _ = precision_recall_fscore_support(y_true, y_pred, average='macro')

    return model, valid_loss, valid_accuracy, f1
 
 
def training_loop(model, train_loader,valid_loader,train_dataset, val_dataset, criterion, optimizer, scheduler, device, patience, EPOCH, lr_scheduler_name):
    """Train with early stopping and checkpoint the lowest validation loss.

    The state dict is saved to ``{MODEL_PATH}/{EXP_NAME}.pt`` (module-level
    settings) whenever the validation loss improves; the directory is created
    if it is missing.

    Args:
        model: Model to train.
        train_loader: Loader passed to ``training``.
        valid_loader: Loader passed to ``evaluation``.
        train_dataset: Dataset passed to ``training``.
        val_dataset: Dataset passed to ``evaluation``.
        criterion: Loss callable.
        optimizer: Optimizer used for training.
        scheduler: Scheduler object; only stepped when ``lr_scheduler_name``
            is 'CosineAnnealingLR' or 'ReduceLROnPlateau'.
        device: Device used for training and evaluation.
        patience: Number of consecutive epochs without a lower validation
            loss after which training stops.
        EPOCH: Maximum number of epochs.
        lr_scheduler_name: 'CosineAnnealingLR', 'ReduceLROnPlateau', or any
            other value for no scheduling.

    Returns:
        The model as it is after the last executed epoch (not necessarily
        the checkpointed weights).
    """
    # torch.save does not create missing directories
    os.makedirs(MODEL_PATH, exist_ok=True)
    checkpoint_path = f"{MODEL_PATH}/{EXP_NAME}.pt"

    best_valid_loss = float('inf')  # lowest validation loss seen so far
    early_stop_counter = 0  # epochs since the last improvement

    for epoch in range(EPOCH):
        model, train_loss, train_accuracy, train_f1 = training(model, train_loader, train_dataset, criterion, optimizer, device, epoch, EPOCH)
        model, valid_loss, valid_accuracy, valid_f1 = evaluation(model, valid_loader, val_dataset, criterion, device, epoch, EPOCH)

        if lr_scheduler_name == 'CosineAnnealingLR':
            scheduler.step()
            now_lr = scheduler.get_last_lr()[0]
        elif lr_scheduler_name == 'ReduceLROnPlateau':
            # ReduceLROnPlateau needs the monitored value
            scheduler.step(valid_loss)
            now_lr = optimizer.param_groups[0]['lr']
        else:
            # No scheduler: report the optimizer's current learning rate
            now_lr = optimizer.param_groups[0]['lr']

        if valid_loss < best_valid_loss:
            # Improvement: save the weights and reset the counter
            best_valid_loss = valid_loss
            torch.save(model.state_dict(), checkpoint_path)
            early_stop_counter = 0
        else:
            # Validation loss equal or higher: count towards early stopping
            early_stop_counter += 1

        print(f"Epoch [{epoch + 1}/{EPOCH}], Train Accuracy: {train_accuracy:.4f}, Train Loss: {train_loss:.4f},  Train macro F1: {train_f1:.4f} ")
        print(f"Epoch [{epoch + 1}/{EPOCH}], Valid Accuracy: {valid_accuracy:.4f}, Valid Loss: {valid_loss:.4f},  Valid macro F1: {valid_f1:.4f} ")
        print(f"Epoch [{epoch + 1}/{EPOCH}], Learning Rate: {now_lr}")
        print("#"*70)

        # Early stopping
        if early_stop_counter >= patience:
            print("Early stopping")
            break

    return model
          
def inference(model,model_path,device,test_loader,tst_dataset,testmode=0):
    """Load saved weights, predict every batch and return the results as a DataFrame.

    Args:
        model: Model whose architecture matches the saved state dict.
        model_path: Path of the state dict file.
        device: Device used for loading the weights and for inference.
        test_loader: Iterable of ``(images, labels, names)`` batches.
        tst_dataset: Unused; kept for interface compatibility.
        testmode: Falsy: print accuracy and macro F1 and keep the 'target'
            column. Truthy: drop the 'target' column.

    Returns:
        DataFrame with columns 'ID', 'target' (only when testmode is falsy)
        and 'pred'; label ids are mapped to class names through
        data/meta.csv.
    """
    # map_location lets a checkpoint saved on another device load here
    model.load_state_dict(torch.load(model_path, map_location=device))
    model = model.to(device)
    model.eval()

    total_labels = []
    total_preds = []
    image_names = []
    with torch.no_grad():
        for images, labels, names in tqdm(test_loader):
            images = images.to(device)
            labels = labels.to(device)

            outputs = model(images)
            if not isinstance(outputs, torch.Tensor):
                outputs = outputs.logits
            _, predicted = torch.max(outputs, 1)

            total_preds.extend(predicted.detach().cpu().tolist())
            total_labels.extend(labels.tolist())
            image_names.extend(names)

    total_preds = np.array(total_preds)
    total_labels = np.array(total_labels)
    image_names = np.array(image_names)
    total_acc = accuracy_score(total_labels, total_preds)

    precision, recall, f1, _ = precision_recall_fscore_support(total_labels, total_preds, average='macro')

    # Held-out sample test: report the metrics
    if not testmode:
        print("Test model accuracy : ",total_acc)
        print("Test model macro f1 : ",f1)

    # Turn label ids into class names for the result table
    meta_df = pd.read_csv('data/meta.csv')
    id2label = dict(zip(meta_df['target'], meta_df['class_name']))

    result_df = pd.DataFrame({'ID': image_names,'target': total_labels,'pred': total_preds})
    result_df['target'] = result_df['target'].map(id2label)
    result_df['pred'] = result_df['pred'].map(id2label)

    if testmode:
        result_df.drop(['target'],axis=1,inplace=True)

    return result_df

In [None]:
## Use this when loading a model from somewhere other than timm
# MODEL_NAME= 'microsoft/dit-base-finetuned-rvlcdip'
# image_processor  = AutoImageProcessor.from_pretrained(MODEL_NAME)
# model = AutoModelForImageClassification.from_pretrained(MODEL_NAME, 
#     label2id=label2id,
#     id2label=id2label,
#     ignore_mismatched_sizes = True, 
#     num_labels=17
# ).to(device)
# if "height" in image_processor.size:
#     IMG_SIZE = (image_processor.size["height"], image_processor.size["width"])
#     crop_size = size
#     max_size = None
# elif "shortest_edge" in image_processor.size:
#     IMG_SIZE = image_processor.size["shortest_edge"]
#     crop_size = (size, size)
#     max_size = image_processor.size.get("longest_edge")

# Setting

In [None]:
# --- Experiment name and output locations ---
EXP_NAME = "test"
MODEL_PATH = "test_model"
SUB_PATH = "test_submission"

# --- Model ---
# resnet50, resnet101.a1_in1k, resnet34, vgg16, beitv2_base_patch16_224.in1k_ft_in22k_in1k, swin_small_patch4_window7_224.ms_in22k_ft_in1k, convnext_small.fb_in22k
model_name = "resnet50"
image_size = 32

# --- Data ---
aug_train = True
batch_size = 512
num_workers = 0

# --- Optimisation ---
LOSS_F = 'CE'  # CE = Cross Entropy / Focal = Focal loss
init_learning_rate = 1e-4
weight_decay = False  # 1e-3
lr_scheduler_name = False  # 'ReduceLROnPlateau' / 'CosineAnnealingLR'
epoch = 1
patience = 5

# --- Hardware ---
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [None]:
# NOTE(review): a bare `break` outside a loop is a SyntaxError; presumably it is
# placed here to stop "Run All" before the cells below — confirm.
break

# Train & Inference

In [None]:
# Load the selected model from timm
model = timm.create_model(model_name,pretrained=True,num_classes=17).to(device)
model_config = timm.data.resolve_model_data_config(model)

# Build transforms, datasets and loaders for the chosen image and batch size
trn_transform,tst_transform = image_trasform(image_size,model_config['mean'],model_config['std'])
origin_train_dataset,trn_dataset,val_dataset,tst_dataset,origin_tst_dataset = dataset(aug_train,trn_transform,tst_transform)
origin_train_loader,train_loader,valid_loader,test_loader,origin_test_loader = loader(batch_size,origin_train_dataset,trn_dataset,val_dataset,tst_dataset,origin_tst_dataset)

# Cross entropy or focal loss (MixUp batches are handled inside training)
if LOSS_F == 'CE':
    loss_fn = nn.CrossEntropyLoss()
else:
    loss_fn = FocalLoss(gamma=1.5)
    m = torch.nn.Softmax(dim=-1)

# Weight decay
if weight_decay:
    optimizer = Adam(model.parameters(), lr=init_learning_rate, weight_decay=weight_decay)
else:
    optimizer = Adam(model.parameters(), lr=init_learning_rate)

# Learning-rate scheduler: Cosine Annealing / ReduceLROnPlateau / none
if lr_scheduler_name == 'CosineAnnealingLR':
    scheduler = CosineAnnealingLR(optimizer, T_max=10, eta_min=0)
elif lr_scheduler_name == 'ReduceLROnPlateau':
    scheduler = ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=2, verbose=True)
else:
    scheduler = init_learning_rate

# Neither torch.save nor to_csv creates missing directories
os.makedirs(MODEL_PATH, exist_ok=True)
os.makedirs(SUB_PATH, exist_ok=True)

# Train on the loader built from trn_dataset, so the loader and the dataset
# handed to training_loop describe the same data and aug_train takes effect
model = training_loop(model, train_loader, valid_loader, trn_dataset, val_dataset, loss_fn, optimizer, scheduler, device, patience, epoch, lr_scheduler_name)

# Held-out sample test that was not used for training
result_df = inference(model,f"{MODEL_PATH}/{EXP_NAME}.pt",device,test_loader,tst_dataset,testmode=False)

# Actual test set
submission = inference(model,f"{MODEL_PATH}/{EXP_NAME}.pt",device,origin_test_loader,origin_tst_dataset,testmode=True)

# Put the predictions in sample_submission order and convert names back to ids
sample_submission = pd.read_csv('data/sample_submission.csv')
submission = sample_submission.merge(submission, on='ID', how='left')
submission.drop(['target'],axis=1,inplace=True)
submission.columns = ['ID','target']
submission['target'] = submission['target'].map(label2id)
submission.to_csv(f"{SUB_PATH}/{EXP_NAME}.csv",index=False)
del model
torch.cuda.empty_cache()

In [None]:
def display_random_samples(n, compare_df, is_correct, image_dir="data/aug_train/"):
    """Plot up to ``n`` randomly chosen images with their real and predicted labels.

    Args:
        n: Number of images requested; capped at the number of candidate rows.
        compare_df: DataFrame with 'ID', 'target' and 'pred' columns.
        is_correct: 1 shows rows where target equals pred, 2 shows rows where
            they differ, any other value shows all rows with 'pred' blanked.
        image_dir: Directory the 'ID' file names are read from.

    Raises:
        FileNotFoundError: If an image cannot be read.
    """
    total_len = len(compare_df)
    if is_correct == 1:
        # Correct predictions
        df = compare_df[compare_df['target'] == compare_df['pred']]
        title = "Correct predict"
    elif is_correct == 2:
        # Wrong predictions
        df = compare_df[compare_df['target'] != compare_df['pred']]
        title = "Wrong predict"
    else:
        # Test-set predictions
        df = compare_df.copy()
        df['pred'] = ''
        title = 'Final result'

    # Sampling without replacement cannot return more rows than exist
    n = min(n, len(df))
    if n == 0:
        print(f"{title}:{len(df)}/{total_len}")
        return

    random_indices = np.random.choice(len(df), n, replace=False)
    selected_rows = df.iloc[random_indices]

    cols = min(n, 3)
    rows = (n + cols - 1) // cols
    # squeeze=False keeps axes two-dimensional even for a single row or cell
    fig, axes = plt.subplots(rows, cols, figsize=(15, 5 * rows), squeeze=False)

    for i, (_, row) in enumerate(selected_rows.iterrows()):
        ax = axes[i // cols, i % cols]
        image_path = os.path.join(image_dir, row['ID'])
        image = cv2.imread(image_path)
        if image is None:
            raise FileNotFoundError(f"Could not read image: {image_path}")
        ax.imshow(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))
        ax.set_title(f"ID: {row['ID']} \n Real: {row['target']} \n Pred: {row['pred']}")
        ax.axis('off')
    # Hide the unused cells of the grid
    for i in range(n, rows * cols):
        axes[i // cols, i % cols].axis('off')
    plt.suptitle(f"{title}:{len(df)}/{total_len}")
    plt.show()
display_random_samples(9, result_df,1);

# Ensemble

In [None]:
## 이미지 사이즈에 따른 Transform --> 추후에 정규화도 각 모델별로 정의
## 영천님 제목 잘라 붙이는 경우
# def tta_trasform(image_size,model_mean,model_std):

#     tst_transform = A.Compose([
#         A.LongestMaxSize(max_size=640, interpolation=cv2.INTER_CUBIC),
#         A.PadIfNeeded(min_height=640, min_width=640, border_mode=cv2.BORDER_CONSTANT, value=[255, 255, 255]),
#         A.ShiftScaleRotate(shift_limit_x=0.2, shift_limit_y=(0.0, 0.1), scale_limit=0.2, rotate_limit=0, p=0.7, border_mode=cv2.BORDER_CONSTANT, value=[255, 255, 255]),
#         A.Crop (x_min=128, y_min=0, x_max=128+384, y_max=384, p=1.0),
#         A.Normalize(mean=list(model_mean), std=list(model_std)),
#         ToTensorV2(),
#     ])
    
#     return tst_transform

def tta_trasform(image_size,model_mean,model_std):
    """Return the test pipeline: A.Resize, A.Normalize, then ToTensorV2.

    Args:
        image_size: Height and width passed to A.Resize.
        model_mean: Per-channel mean passed to A.Normalize.
        model_std: Per-channel std passed to A.Normalize.
    """
    steps = [
        A.Resize(height=image_size, width=image_size),
        A.Normalize(mean=list(model_mean), std=list(model_std)),
        ToTensorV2(),
    ]
    return A.Compose(steps)

In [None]:
def tta_dataset(AUG_BOOL,tst_transform):
    """Build the submission test dataset from data/sample_submission.csv.

    Args:
        AUG_BOOL: Unused; kept for interface compatibility.
        tst_transform: Transform applied to every image.

    Returns:
        ImageDataset reading images from "data/test/".
    """
    origin_tst_dataset = ImageDataset(
        "data/sample_submission.csv",
        #"data/test_rot_catformer01/",
        "data/test/",
        transform=tst_transform
    )

    return origin_tst_dataset

def tta_loader(batch_size,origin_tst_dataset):
    """Wrap the test dataset in a DataLoader that keeps the dataset order.

    Args:
        batch_size: Number of samples per batch.
        origin_tst_dataset: Dataset to iterate.

    Returns:
        DataLoader over ``origin_tst_dataset``.
    """
    # Inference gains nothing from shuffling; a fixed order keeps repeated
    # passes over the loader aligned.
    origin_test_loader = DataLoader(
        origin_tst_dataset,
        batch_size=batch_size,
        shuffle=False,
        num_workers=0,
        pin_memory=True,
        drop_last=False
    )

    return origin_test_loader

In [None]:
# Settings for the TTA / ensemble stage
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
image_size = 384
batch_size = 64

# Korean class names (read with cp949 encoding): id <-> label mappings
meta_df = pd.read_csv('data/meta_kr.csv', encoding='cp949')
label2id = dict(zip(meta_df['kr'], meta_df['target']))
id2label = dict(zip(meta_df['target'], meta_df['kr']))

# Submission template without its 'target' column
sample_submission = pd.read_csv('data/sample_submission.csv').drop(columns=['target'])

# Test pipeline, dataset and loader
tst_transform = tta_trasform(image_size, (0.485, 0.456, 0.406), (0.229, 0.224, 0.225))
origin_tst_dataset = tta_dataset(True, tst_transform)
origin_test_loader = tta_loader(batch_size, origin_tst_dataset)

## TTA: Test Time Augmentation

In [None]:
# Augmenting with ttach applies every combination of the listed transforms.
# https://github.com/qubvel/ttach?tab=readme-ov-file

import ttach as tta

# NOTE(review): this rebinds the module-level name `transforms`, which the
# import block bound to `torchvision.transforms`; every later cell that uses
# `transforms` gets this ttach Compose object instead.
transforms = tta.Compose(
    [
        tta.HorizontalFlip(),
        tta.VerticalFlip(),
        #tta.Rotate90(angles = [0, 90, 180]),
        #tta.FiveCrops(224, 224),  # produces five different crops
        #tta.Multiply(factors=[0.8, 1, 1.1]),        
    ]
)

## tta의 augmentation을 했을 때 예시 이미지
# image = np.array(Image.open('data/test/0b8426f6b3d9d4a3.jpg')) / 255
# image = torch.from_numpy(image).permute(2, 0, 1).unsqueeze(0).to(torch.float32)

# fig = plt.figure(figsize=(20, 20))
# columns = 2
# rows = 5

# for i, transform in enumerate(transforms):
#     image_transformed = transform.augment_image(image)
#     image_transformed = np.array(image_transformed.squeeze()).transpose(1, 2, 0)
#     fig.add_subplot(rows, columns, i+1)
#     plt.imshow(image_transformed)

# plt.show()

In [None]:
def only_inference(model, test_loader, device):
    """Predict every batch of a loader and return names, classes and confidences.

    Args:
        model: Callable returning class scores, either a tensor or an object
            with a ``logits`` attribute.
        test_loader: Iterable of ``(images, labels, names)`` batches; the
            labels are ignored.
        device: Device the model and the images are moved to.

    Returns:
        (image_names, total_preds, probabilities): three parallel lists;
        ``probabilities`` holds the highest softmax probability per image.
    """
    model.to(device)
    model.eval()
    probabilities = []
    total_preds = []
    image_names = []
    with torch.no_grad():
        for images, labels, names in tqdm(test_loader):
            images = images.to(device)

            outputs = model(images)
            if not isinstance(outputs, torch.Tensor):
                outputs = outputs.logits
            # Softmax turns the scores into probabilities, so confidences
            # from different models can be compared with each other
            probs, predicted = torch.max(torch.softmax(outputs, dim=1), 1)

            total_preds.extend(predicted.detach().cpu().tolist())
            probabilities.extend(probs.detach().cpu().tolist())
            image_names.extend(names)

    return image_names,total_preds,probabilities

In [None]:
#model_name='resnet50'
model_name = "caformer_s18.sail_in22k_ft_in1k_384"

model = timm.create_model(model_name,pretrained=True,num_classes=17)
model_config = timm.data.resolve_model_data_config(model)
trn_transform,tst_transform = image_trasform(model_config['input_size'][1],model_config['mean'],model_config['std'])

#model_path = 'model/Best_resnet50(is=384,bs=64,LS=Red_f1,Shuffle).pt'
model_path = 'model/caformer_s18_sail_in22k_ft_in1k_384_loss_titlecrop02.pth'

# The model has not been moved to a device yet, so load the weights onto the
# CPU; this also works for a checkpoint saved from a GPU on a CPU-only host.
model.load_state_dict(torch.load(model_path, map_location='cpu'))

# Wrapping the model in the ttach class returns, for all augmentation
# combinations, the value merged according to merge_mode
# https://github.com/qubvel/ttach/blob/master/ttach/wrappers.py#L52

tta_model = tta.ClassificationTTAWrapper(model, transforms, merge_mode='mean')

In [None]:
# Predictions of the selected model with TTA
image_names, total_preds, probabilities = only_inference(tta_model, origin_test_loader, device)
result_tta = pd.DataFrame(dict(ID=image_names, target=total_preds, probs=probabilities))

# Predictions of the selected model without TTA
image_names, total_preds, probabilities = only_inference(model, origin_test_loader, device)
result_best = pd.DataFrame(dict(ID=image_names, target=total_preds, probs=probabilities))

In [None]:
# Attach the TTA predictions to the submission IDs and name the columns
tta_result = sample_submission.merge(result_tta, on='ID', how='left').set_axis(
    ['ID', 'tta_target', 'tta_probs'], axis=1
)

# Attach the plain predictions and name all five columns
tta_result = tta_result.merge(result_best, on='ID', how='left').set_axis(
    ['ID', 'tta_target', 'tta_probs', 'bestscore_target', 'bestscore_probs'], axis=1
)

# Replace the class ids with their labels
tta_result = tta_result.assign(
    tta_target=tta_result['tta_target'].map(id2label),
    bestscore_target=tta_result['bestscore_target'].map(id2label),
)
tta_result.head()

In [None]:
# Compare the confidence of the two results (with and without TTA) and keep
# the class of the TTA result only where its confidence is strictly higher
tta_result['final_target'] = tta_result['tta_target'].where(
    tta_result['tta_probs'] > tta_result['bestscore_probs'],
    tta_result['bestscore_target'],
)
tta_result.loc[lambda frame: frame['tta_target'] != frame['bestscore_target']]

In [None]:
# rename() returns a new frame, so the column assignment below does not write
# into a slice of tta_result
tta_submission = tta_result[['ID','final_target']].rename(columns={'final_target': 'target'})
# Map this frame's own labels back to ids (not those of `submission`)
tta_submission['target'] = tta_submission['target'].map(label2id)
#tta_submission.to_csv('tta_영천_crop제목.csv',index=False)

In [None]:
# NOTE(review): a bare `break` outside a loop is a SyntaxError; presumably it is
# placed here to stop "Run All" before the cells below — confirm.
break

## Weighted soft voting

In [None]:
def load_model(model_path):
    """Create the architecture that belongs to a checkpoint and load its weights.

    Args:
        model_path: File name inside "best_model/". The two listed resnet50
            checkpoints get a resnet50; every other name gets
            caformer_s18.sail_in22k_ft_in1k_384.

    Returns:
        The model on the module-level ``device``, in eval mode.
    """
    print(model_path)
    model_path = 'best_model/' + model_path
    resnet_checkpoints = (
        "best_model/Final_resnet50(is=224,bs=128,LS=Red_f1,Shuffle).pt",
        "best_model/Re_Final_resnet50(is=224,bs=128,LS=Red_f1,Shuffle).pt",
    )
    if model_path in resnet_checkpoints:
        model = timm.create_model('resnet50',pretrained=True,num_classes=17).to(device)
    else:
        model = timm.create_model('caformer_s18.sail_in22k_ft_in1k_384',num_classes=17).to(device)

    # map_location lets a checkpoint saved on another device load here
    model.load_state_dict(torch.load(model_path, map_location=device))
    model.eval()
    return model

In [None]:
def ensemble_inference(models, weights, test_loader):
    """Weighted soft voting: sum each model's softmax output times its weight.

    Args:
        models: Sequence of models, already on the module-level ``device``.
        weights: One weight per model.
        test_loader: Iterable of ``(images, labels, names)`` batches; the
            labels are ignored.

    Returns:
        (predictions, image_names, probabilities): a NumPy array of predicted
        class indices, the list of image names, and the list of the highest
        weighted score per image.

    Raises:
        ValueError: If no model is given or the numbers of models and
            weights differ.
    """
    # Imported locally because the module-level name `transforms` is rebound
    # to a ttach Compose object in the TTA section of this file.
    from torchvision import transforms as tv_transforms

    if not models or len(models) != len(weights):
        raise ValueError("models and weights must be non-empty and of equal length")

    resize_384 = tv_transforms.Resize((384, 384))
    # Start with an empty tensor so an empty loader still yields an empty array
    batch_preds = [torch.zeros(0, dtype=torch.long).to(device)]
    image_names = []
    probabilities = []
    with torch.no_grad():
        for images, labels, names in tqdm(test_loader):
            images = images.to(device)
            avg_output = None

            for model, weight in zip(models, weights):
                # Models whose class is named MetaFormer take 384x384 input.
                # The resized batch is kept in its own variable so the next
                # model still receives the original images.
                if model.__class__.__name__ in ['MetaFormer']:
                    model_input = resize_384(images)
                else:
                    model_input = images
                outputs = model(model_input)
                weighted = weight * torch.softmax(outputs, dim=1)
                avg_output = weighted if avg_output is None else avg_output + weighted
            probs, preds = torch.max(avg_output, 1)

            batch_preds.append(preds)
            probabilities.extend(probs.detach().cpu().tolist())
            image_names.extend(names)

    predictions = torch.cat(batch_preds, dim=0).cpu().numpy()

    return predictions,image_names,probabilities

In [None]:
model_paths = ["Final_resnet50(is=224,bs=128,LS=Red_f1,Shuffle).pt",'caformer_s18_sail_in22k_ft_in1k_384_loss_titlecrop.pth','caformer_s18_sail_in22k_ft_in1k_384.pth']
models_ = [load_model(path) for path in model_paths]
weights = [0.1,0.4,0.4] # weight of each model

## To add the tta model as well
#tta_model.eval()
#tta_model.to(device)
#models_.append(tta_model)
#weights.append(0.1)

# Pass the list that was actually built above (models_)
ensemble_predictions,image_names,probabilities = ensemble_inference(models_, weights, origin_test_loader)

result_weighted = pd.DataFrame({'ID': image_names,'target': ensemble_predictions,'probs':probabilities})

In [None]:
# Merge the weighted-voting predictions (result_weighted); the columns below
# are named after them
weight_result = sample_submission.merge(result_weighted, on='ID', how='left')
weight_result.columns = ['ID','weight_target','weight_probs']

weight_result = weight_result.merge(result_best, on='ID', how='left')
weight_result.columns = ['ID','weight_target','weight_probs','bestscore_target','bestscore_probs']
weight_result['weight_target'] = weight_result['weight_target'].map(id2label)
weight_result['bestscore_target'] = weight_result['bestscore_target'].map(id2label)
weight_result.head()

In [None]:
# Rows where the weighted result has the higher confidence and a different class
weight_result.loc[
    lambda frame: (frame['weight_probs'] > frame['bestscore_probs'])
    & (frame['weight_target'] != frame['bestscore_target'])
]

In [None]:
# Keep the weighted class only where its confidence is strictly higher
weight_result['final_target'] = weight_result['weight_target'].where(
    weight_result['weight_probs'] > weight_result['bestscore_probs'],
    weight_result['bestscore_target'],
)
weight_result.loc[lambda frame: frame['weight_target'] != frame['bestscore_target']]

In [None]:
# rename() returns a new frame, so the assignment below does not write into a
# slice of the previous weight_result
weight_result = weight_result[['ID','final_target']].rename(columns={'final_target': 'target'})
weight_result['target'] = weight_result['target'].map(label2id)

In [None]:
#weight_result.to_csv("submission/영천_창현_weigthed+probs.csv",index=False)

## Hard Voting

In [None]:
folder_path = "best_submission/"

file_list = [f for f in os.listdir(folder_path) if f.endswith('.csv')]

# Number every file by its rank in sorted-name order; the mapping keeps the
# directory-listing order of file_list
rank_of = {name: rank for rank, name in enumerate(sorted(file_list))}
label_mapping = {rank_of[name]: name for name in file_list}

# One frame per submission, with its 'target' column renamed to target_<rank>
dfs = [
    pd.read_csv(os.path.join(folder_path, file)).rename(columns={'target': f'target_{label}'})
    for label, file in label_mapping.items()
]

# Side-by-side concatenation on the row index; repeated column names
# (everything except the first occurrence) are dropped
voting_result = pd.concat(dfs, axis=1, join='inner')
voting_result = voting_result.loc[:,~voting_result.columns.duplicated()]
voting_result.head()

In [None]:
# Row-wise mode over all columns of voting_result; the first mode value is kept.
# NOTE(review): the mode is taken over every column, 'ID' included, and the
# later cells expect rows without a majority to end up as NaN — confirm this
# still holds before changing the column selection.
voting_result['mode_TARGET'] = voting_result.mode(axis=1)[0]
# Map every column after the first one through id2label
voting_result.iloc[:, 1:] = voting_result.iloc[:, 1:].apply(lambda col: col.map(id2label))
voting_result.head()

In [None]:
# Show which file each target_<n> column number refers to
label_mapping

In [None]:
#diff = voting_result[(voting_result[['target_0', 'target_2', 'target_1','target_3']].values != voting_result['mode_TARGET'].values.reshape(-1, 1)).any(axis=1)]
# Rows where target_3 differs from the voted result
diff = voting_result.loc[
    (
        voting_result[['target_3']].to_numpy()
        != voting_result['mode_TARGET'].to_numpy().reshape(-1, 1)
    ).any(axis=1)
]
len(diff)

In [None]:
# Rows where every value appeared once, so no mode could be applied
nan_rows = voting_result.loc[voting_result['mode_TARGET'].isna()]
nan_rows

In [None]:
# Fill those rows with the result that has the best leaderboard score
# (target_3 is the column chosen for that in this cell)
voting_result.loc[nan_rows.index, 'mode_TARGET'] = nan_rows['target_3']

In [None]:
# rename() returns a new frame, so the assignment below does not write into a
# slice of voting_result
submission = voting_result[['ID','mode_TARGET']].rename(columns={'mode_TARGET': 'target'})
submission['target'] = submission['target'].map(label2id)
submission.head()

In [None]:
#submission.to_csv("영천096+영천crop_tta창현+예람pred21+09532.csv",index=False)