In [1]:
import json
import torch
from datasets.custom_dataset import CustomDataset
from datasets.transform import TransformSelector
from models.model_selector import ModelSelector
from utils.train_utils import Trainer
from torch.utils.data import DataLoader
import torch.optim as optim
from torch.optim.lr_scheduler import StepLR
import torch.nn as nn
import pandas as pd
from sklearn.model_selection import train_test_split
import os
from tqdm import tqdm
import cv2

import matplotlib
import matplotlib.pyplot as plt

matplotlib.use('Agg')


  from .autonotebook import tqdm as notebook_tqdm


In [2]:
# 추론(inference) 함수
def inference(model, device, test_loader):
    model.eval()
    predictions = []
    with torch.no_grad():
        for images in tqdm(test_loader, desc="Inference"):
            images = images.to(device)
            outputs = model(images)
            preds = torch.argmax(outputs, dim=1)
            predictions.extend(preds.cpu().numpy())
    return predictions

# 'best_model' 파일을 찾는 함수 (가장 최근에 저장된 파일 선택)
def get_best_model_path(directory):
    files = [f for f in os.listdir(directory) if f.startswith('best_model') and f.endswith('.pt')]
    if not files:
        raise FileNotFoundError(f"No best model files found in directory: {directory}")
    
    # 파일의 수정 시간을 기준으로 가장 최근에 저장된 파일 선택
    best_file = max(files, key=lambda f: os.path.getmtime(os.path.join(directory, f)))
    return os.path.join(directory, best_file)

def validate(model, device, val_loader):
    model.eval()
    total_loss = 0.0
    correct = 0
    total = 0
    
    loss_fn = nn.CrossEntropyLoss(label_smoothing=0.08)
    total_batches = len(val_loader)
    
    progress_bar = tqdm(val_loader, desc="Validating", leave=False)
    
    with torch.no_grad():
        for batch_idx, (images, targets, indices) in enumerate(progress_bar):
            images, targets = images.to(device), targets.to(device)
            
            # 모델 예측
            outputs = model(images)
            outputs_softmax = torch.nn.functional.softmax(outputs)
            
            # 손실 계산
            loss = loss_fn(outputs, targets)
            total_loss += loss.item()
            
            # 정확도 계산
            _, predicted = torch.max(outputs, 1)
            correct += (predicted == targets).sum().item()
            total += targets.size(0)
            
            progress_bar.set_postfix({'val_batch_loss': loss.item()})

            # 이미지 그리기
            top_fives, top_fives_indices = torch.topk(outputs_softmax, 5)

            images = images.cpu().numpy()
            targets = targets.cpu().numpy()
            indices = indices.cpu().numpy()
            outputs_softmax = outputs_softmax.cpu().numpy()
            top_fives = top_fives.cpu().numpy()
            top_fives_indices = top_fives_indices.cpu().numpy()                    
    
    # 검증의 평균 손실과 정확도 계산
    avg_loss = total_loss / total_batches
    accuracy = correct / total
    
    print(f"Validation Epoch Average Loss: {avg_loss:.4f}, Accuracy: {accuracy:.4f}")
    
    return avg_loss, accuracy

def convert_image_for_display(image):
    # neural network를 위한 이미지에서 plt.imshow()를 위한 이미지 변환
    # plt.imshow()는 float 데이터에 대해 [0, 1] 구간을 truncate해서 display 하기 때문에 normalization이 필요함
    image = image - image.min()
    image = image / image.max()
    image = image.transpose(1, 2, 0)
    
    return image


def create_target_to_category(df):
    df = df.copy()
    df = df.sort_values(by = ['target'])
    groups = df.groupby('target')

    target_to_category = {}

    for _, group in groups:
        target = group.target.values[0]
        category = group.category.values[0]
        target_to_category[target] = category

    return target_to_category


In [3]:
setup = {
  "coatnet": r"/data/ephemeral/home/Dongjin/git/level1-imageclassification-cv-07/LV1/config_dj1.json",
  "ViT": r"/data/ephemeral/home/Dongjin/git/level1-imageclassification-cv-07/LV1/config_dj6.json",
  "resnet": r"/data/ephemeral/home/Dongjin/git/level1-imageclassification-cv-07/LV1/config_dj8.json",
  }

config_path = setup["coatnet"]

with open(config_path, 'r') as f:
  config = json.load(f)

config['batch_size'] = 2 # batch size 줄여서 vram 줄이기

py_dir_path = os.getcwd()
class_to_name_file = os.path.join(py_dir_path, "result/class/map_clsloc.txt")

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
train_info = pd.read_csv(config['data_info_file'])

# class_name으로 category 불러오기
class_to_name = pd.read_csv(class_to_name_file, header = None, sep = " ")
class_to_name.columns = ["class_name", "index", "category"]
class_to_name = class_to_name.drop(["index"], axis = 1)

# train_info에 category merge하고 target_to_category 만들기
train_info = pd.merge(train_info, class_to_name, on = 'class_name', how = 'left')
target_to_category = create_target_to_category(train_info)

rel_train_index_path = r"datasets/train_index.csv"
rel_val_index_path = r"datasets/val_index.csv"

train_index_path = os.path.join(py_dir_path, rel_train_index_path)
val_index_path = os.path.join(py_dir_path, rel_val_index_path)

# train_index.csv와 val_index.csv를 이용하여 train_df와 val_df를 로드       
train_index = pd.read_csv(train_index_path, header = None).squeeze()
val_index = pd.read_csv(val_index_path, header = None).squeeze()

train_df = train_info.loc[train_index]
val_df = train_info.loc[val_index]

transform_selector = TransformSelector(transform_type="albumentations")
val_transform = transform_selector.get_transform(is_train=False)

val_dataset = CustomDataset(root_dir=config['train_data_dir'], info_df=val_df, transform=val_transform)
val_loader = DataLoader(val_dataset, batch_size=config['batch_size'], shuffle=False)

# 모델 설정
model_selector = ModelSelector(model_type="timm", num_classes=config['num_classes'], model_name=config['model_name'], pretrained=False)
model = model_selector.get_model()

# 베스트 모델 경로 설정
model_path = get_best_model_path(config['result_path'])
print(f"Loading best model from {model_path}")

# 저장된 모델 로드
model.load_state_dict(torch.load(model_path, map_location=device))
model.to(device)

Loading best model from /data/ephemeral/home/Dongjin/git/level1-imageclassification-cv-07/LV1/results/coatnet_2_rw_224_1/best_model_1.2901.pt


TimmModel(
  (model): MaxxVit(
    (stem): Stem(
      (conv1): Conv2d(3, 64, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
      (norm1): BatchNormAct2d(
        64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True
        (drop): Identity()
        (act): SiLU(inplace=True)
      )
      (conv2): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
    )
    (stages): Sequential(
      (0): MaxxVitStage(
        (blocks): Sequential(
          (0): MbConvBlock(
            (shortcut): Downsample2d(
              (pool): AvgPool2d(kernel_size=2, stride=2, padding=0)
              (expand): Identity()
            )
            (pre_norm): BatchNormAct2d(
              128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True
              (drop): Identity()
              (act): SiLU(inplace=True)
            )
            (down): Identity()
            (conv1_1x1): Conv2d(128, 512, kernel_size=(1, 1), stride=(1, 1), bia

In [4]:

def plot(model_name, images, targets, dataset_indices, outputs_softmax, top_fives, top_fives_indices, predicts):
    correct_save_fold = f'result/{model_name}/correct'
    wrong_save_fold = f'result/{model_name}/wrong'
    class_image_dir = os.path.join(py_dir_path, "result/class")

    for i in range(images.shape[0]):
        image = images[i]
        target = targets[i]
        dataset_index = dataset_indices[i] 
        output_softmax = outputs_softmax[i]
        top_five = top_fives[i]
        top_fives_index = top_fives_indices[i]
        predict = predicts[i]
        iscorrect = target == predict
        
        img_rel_path = train_info['image_path'].loc[dataset_index]
        img_path = os.path.join(config['train_data_dir'], img_rel_path)
        class_name = train_info['class_name'].loc[dataset_index]

        imag_target_path = os.path.join(class_image_dir, f'{target}.png')
        imag_pred_path = os.path.join(class_image_dir, f'{predict}.png')

        if iscorrect:
            fig, axes = plt.subplots(1, 3, figsize = (15, 6))
        else:
            fig, axes = plt.subplots(1, 4, figsize = (20, 6))

        image_origin = cv2.imread(img_path, cv2.IMREAD_COLOR)
        image_origin = cv2.cvtColor(image_origin, cv2.COLOR_BGR2RGB)
        image_input = convert_image_for_display(image) # network input image
        
        image_target = cv2.cvtColor(cv2.imread(imag_target_path, cv2.IMREAD_COLOR), cv2.COLOR_BGR2RGB)
        image_predict = cv2.cvtColor(cv2.imread(imag_pred_path, cv2.IMREAD_COLOR), cv2.COLOR_BGR2RGB)

        axes[0].imshow(image_origin)
        axes[1].imshow(image_input)
        axes[2].imshow(image_target)

        axes[0].set_title("Origin")
        axes[1].set_title("Common transformation")
        axes[2].set_title(f"Target: {target} ({target_to_category[target]})")

        if not iscorrect:
            axes[3].imshow(image_predict)
            axes[3].set_title(f"Predict: {predict} ({target_to_category[predict]})")

        for i in range(len(axes)):
            axes[i].axis('off')

        # title 만들기
        if iscorrect == True:
            title0 = f"{model_name} - Correct\ntarget: {target} ({target_to_category[target]}), predict: {predict} ({target_to_category[predict]})"
        else:
            title0 = f"{model_name} - Wrong\ntarget: {target} ({target_to_category[target]}), predict: {predict} ({target_to_category[predict]})"

        title1 = ""
        for index, prob in zip(top_fives_index, top_five * 100):
            if (title1 == ""):
                title1 = f"Top 5 - {index}: {prob:.1f}%"
            else:
                title1 = title1 + ", " + f"{index}: {prob:.1f}%"

        title = f"{title0}\n{title1}\n{img_rel_path}"
        plt.suptitle(title)

        # 저장하기

        if iscorrect:
            save_path = os.path.join(correct_save_fold, f"{target}/{dataset_index}.png")
        else:
            save_path = os.path.join(wrong_save_fold, f"{target}/{dataset_index}.png")

        if not os.path.isdir(os.path.split(save_path)[0]):
            os.makedirs(os.path.split(save_path)[0])

        plt.savefig(save_path)
        plt.tight_layout()
        


In [5]:
model.eval()
total_loss = 0.0
correct = 0
total = 0

loss_fn = nn.CrossEntropyLoss(label_smoothing=0.08)
total_batches = len(val_loader)

progress_bar = tqdm(val_loader, desc="Validating", leave=False)

with torch.no_grad():
    for batch_idx, (images, targets, dataset_indices) in enumerate(progress_bar):
        images, targets = images.to(device), targets.to(device)
        
        # 모델 예측
        outputs = model(images)
        outputs_softmax = torch.nn.functional.softmax(outputs)
        
        # 손실 계산
        loss = loss_fn(outputs, targets)
        total_loss += loss.item()
        
        # 정확도 계산
        _, predicts = torch.max(outputs, 1)
        correct += (predicts == targets).sum().item()
        total += targets.size(0)
        
        progress_bar.set_postfix({'val_batch_loss': loss.item()})

        # 이미지 그리기
        top_fives, top_fives_indices = torch.topk(outputs_softmax, 5)

        images = images.cpu().numpy()
        targets = targets.cpu().numpy()
        dataset_indices = dataset_indices.cpu().numpy()
        outputs_softmax = outputs_softmax.cpu().numpy()
        top_fives = top_fives.cpu().numpy()
        top_fives_indices = top_fives_indices.cpu().numpy()
        predicts = predicts.cpu().numpy()

        plot(config['model_name'], images, targets, dataset_indices,outputs_softmax, top_fives, top_fives_indices, predicts)


  outputs_softmax = torch.nn.functional.softmax(outputs)
  fig, axes = plt.subplots(1, 3, figsize = (15, 6))
                                                                                 

KeyboardInterrupt: 