In [21]:
import torch
import os
from importlib import import_module

from sklearn.metrics import confusion_matrix
import seaborn as sns
from tqdm import tqdm
import matplotlib.pyplot as plt
import random
import numpy as np

from einops import rearrange

from train import parse_model_param

from dataset import MaskSplitByProfileDataset, BaseAugmentation
from PIL import Image
import multiprocessing
from torch.utils.data import DataLoader

In [1]:
class UnNormalize():
    # 정규화 되었던 이미지를 복원하는 Transforms Function

    def __init__(self, mean=(0.548, 0.504, 0.479), std=(0.237, 0.247, 0.246)):
        self.mean = mean
        self.std = std

    def __call__(self, tensor):
        """
        Args:
            tensor (Tensor): Tensor image of size (C, H, W) to be normalized.
        Returns:
            Tensor: Normalized image.
        """
        for t, m, s in zip(tensor, self.mean, self.std):
            t.mul_(s).add_(m)
            # The normalize code -> t.sub_(m).div_(s)
        return tensor

In [25]:
def load_model(saved_model, model:str, num_classes, device, **kwargs):
    model_cls = getattr(import_module("model"), model)
    model = model_cls(
        num_classes=num_classes,
        # 아래는 model에 추가적인 파라미터가 필요할 때 사용하시면 됩니다.
        # **kwargs
    )

    model_path = os.path.join(saved_model, 'best.pth')
    model.load_state_dict(torch.load(model_path, map_location=device))

    return model

In [6]:
def inference_model(loader, model_dir, model:str, num_classes, break_idx=300, **kwargs) -> list:
    """Confusion Matrix 출력 함수.
    Args:
        loader (dataloader)): dataloader
        model_dir (str): 저장된 모델 경로.
        num_classes (int): 클래스 갯수
        model (str): 모델 클래스명. 단, 문자열로 줘야 함.
        break_idx (int, optional): 모두 다 inference하는 것은 너무 오래 걸려서 끊어주기 위한 장치를 마련. Defaults to 300.
        **kwargs : 모델에 들어가게 되는 추가적인 파라미터들.
    Returns:
        list: 튜플 (wrong_imgs, wrong_answers, wrong_labels)을 담아낸 리스트. 이미지에 종속된 라벨을 한꺼번에 묶음.
    """
    use_cuda = torch.cuda.is_available()
    device = torch.device("cuda" if use_cuda else "cpu")
    
    model = load_model(model_dir, model, num_classes, device, **kwargs).to(device)
    model.eval()

    print(f"Calculating inference results for {model_dir}..")
    
    answers = []
    preds = []
    
    # 오답 처리된 이미지들에 대해 (이미지, 정답, 오답)을 출력하기 위한 과정들.
    wrong_imgs, wrong_answers, wrong_labels = [], [], []
    with torch.no_grad():
        for idx, (images, labels) in enumerate(tqdm(loader)):
            # <계산 과정>
            images = images.to(device)
            pred = model(images)
            pred = pred.argmax(dim=-1).cpu()
            
            # </계산 과정>
            indices = np.array(range(len(answers), len(answers)+len(labels)+1))
            answers.extend(labels)
            preds.extend(pred.numpy()) 
            
            # 오답 판별 과정.
            wrong_ans = (pred != labels)
            
            # 오답 판별 결과 누적.
            wrong_imgs.extend(images[wrong_ans].cpu())
            wrong_answers.extend(labels[wrong_ans].cpu())
            wrong_labels.extend(pred[wrong_ans].cpu())
            
            if idx >= break_idx:
                break

    return answers, preds, list(zip(wrong_imgs, wrong_answers, wrong_labels))

In [7]:
def display_cfmatrix(model_label, num_classes, answers, preds):
    ax = plt.subplot(1,1,1)
    
    map_label = {
        'gender':{0:'male', 1:'female'}, 
        'age':{0:'<30', 1:'>=30 and <60', 2:'>60'}, 
        'mask':{0:'mask', 1:'incorrect', 2:'normal'}
    }
    
    # https://scikit-learn.org/stable/modules/generated/sklearn.metrics.confusion_matrix.html
    # sklearn.metrics에서의 confusion_matrix 메서드 사용.
    cf_matrix = confusion_matrix(answers, preds, labels=list(range(num_classes)))
    sns.heatmap(cf_matrix, annot=True)
    
    # confusion_matrix의 첫 파라미터는 y축으로 두 번째 파라미터는 x축으로 감.
    ax.xaxis.set_ticklabels(map_label[model_label].values())
    ax.yaxis.set_ticklabels(map_label[model_label].values())
    plt.xlabel('pred', fontsize=14)
    plt.ylabel('answer', fontsize=14)

In [8]:
def show(pic, label, num=20, mean_std=None):
    """Confusion Matrix 출력을 위한 공간.

    Args:
        pic (List:[tuple(wrong_imgs, wrong_answers, wrong_labels)]): inference_model의 결과값.
        num (int, optional): 출력하고 싶은 이미지 갯수. Defaults to 20.
        mean_std (tuple, optional): ((Mean), (Std))). Defaults to None.
    """
    # 출력할 때마다 다른 이미지 보여지도록 추가한 라인.
    pic = random.choices(list(pic), k=num)
    
    for idx, (img, ans, preds) in enumerate(pic):
        # 위에서 random.choices로 갯수 제한해서 필요없긴 한데 갯수 제한없이 출력했을 때,
        # 컴퓨터 터질 뻔해서 이중으로 대비했습니다.
        if idx >= num:
            break
        
        # plt.imshow는 (H, W, C)만 출력 가능해서 추가한 라인.
        img = rearrange(img, 'c h w -> h w c')
        
        # UnNormalize를 위해 추가한 코드.
        if mean_std:
            trfm = UnNormalize(*mean_std)
            img = trfm(img)
        
        map_label = {
        'gender':{0:'male', 1:'female'}, 
        'age':{0:'<30', 1:'>=30 and <60', 2:'>60'}, 
        'mask':{0:'mask', 1:'incorrect', 2:'normal'}
        }
        
        answer = map_label[label][ans.item()]
        prediction = map_label[label][preds.item()]
        
        # 이미지 출력 코드.
        plt.imshow(img)
        plt.title(f'num: {idx+1} answer: {answer}, pred: {prediction}')
        plt.axis('off')
        plt.show()

In [23]:
def seed_everything(seed):
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)  # if use multi-GPU
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False
    np.random.seed(seed)
    random.seed(seed)

In [22]:
def get_dataloader(label: str='multi'):
    seed_everything(42)

    # -- settings
    use_cuda = torch.cuda.is_available()
    device = torch.device("cuda" if use_cuda else "cpu")

    # -- dataset
    data_dir = '/opt/ml/input/data/train/images'
    dataset = MaskSplitByProfileDataset(
        data_dir=data_dir,
        label=label,
    )
    
    # -- augmentation
    resize = (224, 224)
    transform = BaseAugmentation(
        resize=resize,
        mean=dataset.mean,
        std=dataset.std,
    )
    dataset.set_transform(transform)

    # -- data_loader
    dataloader = DataLoader(
        dataset,
        # batch_size=64, #args.batch_size,
        num_workers=multiprocessing.cpu_count()//2,
        shuffle=True,
        pin_memory=use_cuda,
        drop_last=True,
        # sampler=sampler,
    )
    
    return dataloader

In [11]:
class ModelEvaluator:
    def __init__(self, script: str, inference: bool=False):
        self.script = script
        self.model_param = self.parse_script()
        self.num_classes_label = {
            'multi': 18,
            'age': 3,
            'gender': 2,
            'mask': 3
        }
        self.dataloader = None
        if inference:
            self.inference
        

    def parse_script(self, script: str=None):
        """
        takes as input a script to generate a model
        returns a prediction made using the model
        """
        # i.e. python train.py --epochs 120 --dataset MaskSplitByProfileDataset --augmentation CustomAugmentation --model PretrainedModels --model_param resnet false --optimizer SGD --name ResNet_Ep120_SplitProf_Downsample_CustAugv1_WeightedCEnSamplev2_SGD_MASK --label mask
        if script is None:
            script = self.script
        
        model_param = {}
        lst_param = script.split(sep=' ')
        # default params
        model_param['label'] = 'multi'
        for idx, param in enumerate(lst_param):
            if param == '--model':
                model_param['model'] = lst_param[idx+1]
            if param == '--label':
                model_param['label'] = lst_param[idx+1]
            if param == '--name':
                model_param['name'] = lst_param[idx+1]
            if param == '--model_param':
                model_param['model_param'] = lst_param[idx+1]
        model_param['model_dir'] = os.path.join('./model', model_param['label'], model_param['name'])
        # print(model_param)
        return model_param

    def get_dataloader(self):
        self.dataloader = get_dataloader(self.model_param['label'])

    def inference(self, n_samples: int=5000, model_param: dict=None):
        if model_param is None:
            model_param = self.model_param

        model_dir = self.model_param['model_dir']
        label = self.model_param['label']
        model = self.model_param['model']
        
        self.model_param['num_classes'] = self.num_classes_label[label]
        params = parse_model_param(self.model_param['model_param'], pretrained=True)

        if self.dataloader is None:
            self.get_dataloader()
        self.ans, self.preds, self.wrong = inference_model(self.dataloader, model_dir, model, self.num_classes_label[label], 10000, **params)

    def display_cfmatrix(self):
        display_cfmatrix(
            self.model_param['label'], 
            self.model_param['num_classes'], 
            self.ans,
            self.preds
        )
    
    def display_incorrect(self, n_display: int=10):
        show(self.wrong, self.model_param['label'], n_display)

In [9]:
torch.cuda.is_available()

True

In [19]:
script_mask = 'python train.py --epochs 60 --model PretrainedModels --model_param resnet false --criterion focal --label mask --name mask_focal'
mask_eval = ModelEvaluator(script_mask)

In [26]:
mask_eval.inference()
mask_eval.display_cfmatrix()

  0%|          | 0/7733 [00:00<?, ?it/s]

Calculating inference results for ./model/mask/mask_focal..


  0%|          | 0/7733 [00:00<?, ?it/s]


RuntimeError: Given groups=1, weight of size [64, 3, 7, 7], expected input[1, 512, 384, 3] to have 3 channels, but got 512 channels instead