2022-04-15 이주형

라이브러리 import 부분

In [None]:
import argparse
import torch
import os
# from model.MobileNet_V3 import *

import torchvision.transforms as transforms
import numpy as np
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
from PIL import Image
import torchvision.datasets as datasets
import matplotlib.pyplot as plt
from utils.transforms import transforms_test, NormalizePerImage
import glob
import shutil

## argument 파싱 부분

args.data: infer 할 데이터셋의 위치입니다. 기본적으로 data/test로 설정되어있습니다.\
args.workers: data I/O처리시 multi-processing 의 병렬 갯수 부분.\
args.batch_size: 미니배치 갯수.\
args.model: 모델파일 path입니다. infer.py 의 위치를 기준으로 상대위치를 찾습니다. train.py코드는 기본적으로 infer.py와 동일한 path에 모델파일을 저장하기 때문에 따로 옮기지 않는 이상 폴더를 지정해주지 않아도 됩니다.\
args.results: 결과***폴더***의 이름입니다. 본 폴더와 동일한 이름으로 폴더가 있다면 해당 폴더안의 모든것을 지우고 다시 생성시킵니다 (line 32~34, 원하시면 수정하시면 됩니다).\
args.num_categories: class 갯수입니다. ai-hub 감정인식 데이터 기준으로 하여 디폴트 값은 7 입니다.\
args.image: ***본 코드에서 가장 중요한 부분입니다.***\
args.image는 1) None (기본), 2) ```'fault_finder'```, 3) 얼굴영상 path 의 3가지 값을 넣을 수 있습니다.
1) None의 경우:
 args.data 아래아래의 모든 영상들을 추론하여 정확도를 계산합니다.\
 PyTorch의 convention대로, args.data아래에는 클래스명의 폴더들이 있고 각 폴더 아래에 영상들이 있게됩니다.
2) ```'fault_finder'```의 경우:
 args.data 아래(라인 37 또는 40에서 명시된 ```EMOTION```의 이름으로 된 폴더)아래(```*.jpg```)의 모든 영상들에서 feed-forward를 수행한 뒤, 정답지와 비교해 틀린 영상들에 대해 ```os.path.join(args.results, f"fault_finder.csv")```에 명시합니다.

3) 얼굴영상 path:
 해당 영상 한장에 대한 feed-forward만을 수행하여 결과(확률)과 모델이 판단한 감정명을 console에 띄워줍니다.

args.gpu: gpu 번호\
args.if_display_cm: ```args.image```를 ```None```을 선택했을 경우, confusion matrix를 계산하여 ```os.path.join(args.results, f"{os.path.basename(args.model.replace('.pth.tar',''))}.csv")``` 에 저장합니다.

In [None]:
parser = argparse.ArgumentParser(description='Emotion inference from cropped face image')
parser.add_argument('--data', type=str, default='data/test')
# parser.add_argument('--data', type=str, default='/home/keti/FER_AR/codes/FER/data/faces_extracted')
parser.add_argument('--workers', default=16, type=int, help='number of data loading workers (default: 16)')
parser.add_argument('--batch-size', default=256, type=int, help='number of mini batch size (default: 256)')
parser.add_argument('--model', type=str, default='mobilenet_v3_small_model_best.pth.tar')
parser.add_argument('--results', type=str, default='results')
parser.add_argument('--num-categories', default=7, type=int, help='number of categories (default: 7)')
# parser.add_argument('--image', type=str, default='data/test/중립/10.jpg')
parser.add_argument('--image', type=str, default=None)
parser.add_argument('--gpu', type=int, default=0)
parser.add_argument('--if-display-cm', action='store_true', help='If display confusion matrix heatmap')

args.results를 지우고 다시 생성합니다.\
EMOTION은 ```args.image```가 ```'fault_finder'``` 일 때, 사용됩니다 (바로 윗 블록 참조)

In [None]:
def main():
    args = parser.parse_args()

    if os.path.isdir(args.results):
        shutil.rmtree(args.results)
    os.mkdir(args.results)

    if args.num_categories == 3:
        EMOTION = ['기쁨', '중립', '슬픔']
        EMOTION_eng = ['HAPPY', 'NEUTRAL', 'SAD']
    elif args.num_categories == 7:
        EMOTION = ['기쁨', '당황', '분노', '불안', '상처', '슬픔', '중립']
        EMOTION_eng = ['HAPPY', 'EMBARRASED', 'ANGRY', 'ANXIOUS', 'HURT', 'SAD', 'NEUTRAL']

모델 instance 생성 부분입니다.\
num_classes가 클래스 갯수를 의미합니다.\
5가지 모델 이외의 모델은 현재 사용하고 있지 않습니다 (예, ResNet)

In [None]:
    top1 = AverageMeter('Acc@1', ':6.2f')
    if 'effnetv2_s' in os.path.basename(args.model):
        import model.efficientNetV2 as models
        model = models.__dict__['effnetv2_s'](num_classes = args.num_categories)
    elif 'effnetv2_m' in os.path.basename(args.model):
        import model.efficientNetV2 as models
        model = models.__dict__['effnetv2_m'](num_classes = args.num_categories)
    elif 'effnetv2_l' in os.path.basename(args.model):
        import model.efficientNetV2 as models
        model = models.__dict__['effnetv2_l'](num_classes = args.num_categories)
    elif 'mobilenet_v3_small' in os.path.basename(args.model):
        import torchvision.models as models
        model = models.__dict__['mobilenet_v3_small'](num_classes=args.num_categories)
    elif 'mobilenet_v3_large' in os.path.basename(args.model):
        import torchvision.models as models
        model = models.__dict__['mobilenet_v3_large'](num_classes=args.num_categories)
    else:
        raise ValueError('Invalid model !!!')

In [None]:
    torch.cuda.set_device(args.gpu)
    
    # 명시된 gpu로의 모델 카피
    model = model.cuda(args.gpu)
    # args.model 에서 모델weight 을 불러와서 args.gpu에 카피.
    checkpoint = torch.load(args.model, map_location=f'cuda:{args.gpu}')
    # 위에서 카피된 모델weight를 gpu로 카피된 모델에 로딩시킴
    model.load_state_dict(checkpoint['state_dict'])
    # 훈련이 아니므로 eval 모드로 전환
    model.eval()

## args.image == None

args.data 아래아래의 모든 영상들을 추론하여 정확도를 계산합니다.\
PyTorch의 convention대로, args.data아래에는 클래스명의 폴더들이 있고 각 폴더 아래에 영상들이 있게됩니다.\
추론은 fp-16으로 이루어 집니다.\
그라디언트를 disable시키고 dataloader에서 영상과 정답지를 tuple로 읽어와 gpu에 복사하고 모델에 feed-forward를 수행한뒤 y_pred에 추론된 감정 index, y_true에는 정답지를 extend 시킵니다.\

args.if_display_cm 이 만일 켜져있다면 confucion matrix를 1) ```matplotlib.pyplot```을 통해 보여주고, 2) numpy로 저장하고(파일명이 좀 이상?), 3) ```os.path.join(args.results, f"{os.path.basename(args.model.replace('.pth.tar',''))}.csv")```에 csv파일로 저장합니다.\

마지막으로 console에 test 정확도를 보여줍니다.

In [None]:
    if args.image == None:
        dataset = datasets.ImageFolder(args.data, transform=transforms_test())
        dataloader = torch.utils.data.DataLoader(dataset, batch_size=args.batch_size, shuffle=False, num_workers=args.workers, pin_memory=True)

        # model = mobilenet_v3_large(pretrained=True, num_classes=7)
        # checkpoint = torch.load(args.model)
        # model.load_state_dict(checkpoint['state_dict'])
        y_pred = []
        y_true = []

        with torch.no_grad():
            with torch.cuda.amp.autocast():
                for i, (images, target) in enumerate(dataloader):
                    if args.gpu is not None:
                        images = images.cuda(args.gpu, non_blocking=True)
                    if torch.cuda.is_available():
                        target = target.cuda(args.gpu, non_blocking=True)

                    output = model(images)
                    y_pred.extend(torch.argmax(output, dim=1).tolist())
                    y_true.extend(target.tolist())
                    acc1, _ = accuracy(output, target, topk=(1, 2))
                    top1.update(acc1[0], images.size(0))

        cm = confusion_matrix(y_true, y_pred, labels=list(range(args.num_categories)))
        if args.if_display_cm:
            disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels = EMOTION_eng)
            disp.plot()
            plt.show()
        np.savetxt(os.path.join(args.results, f"{os.path.basename(args.model.replace('.pth.tar',''))}.csv"), cm, delimiter=",",fmt ='%u')
        with open(os.path.join(args.results, f"{os.path.basename(args.model.replace('.pth.tar',''))}.csv"), 'a') as f:
            f.write(f'\n\ntest acc (%): {round(top1.avg.item(), 2)}\n\n\n')

## args.image == 'fault_finder'

```os.path.join(args.results, f"fault_finder.csv")```를 새로 만듭니다\
args.data 아래(라인 37 또는 40에서 명시된 ```EMOTION```의 이름으로 된 폴더)아래(```*.jpg```)의 모든 영상들에서 feed-forward를 수행한 뒤, 정답지와 비교해 틀린 영상들에 대해 ```os.path.join(args.results, f"fault_finder.csv")```에 명시합니다.\
주의할 점은 영상의 확장자가 ```jpg```여야 한다는 점입니다.

In [None]:
    elif args.image == 'fault_finder':
        if os.path.isdir(os.path.join(args.results, f"fault_finder.csv")):
            shutil.rmtree(os.path.join(args.results, f"fault_finder.csv"))

        transform = transforms_test()

        for i, emo in enumerate(EMOTION):
            with open(os.path.join(args.results, f"fault_finder.csv"), 'a') as f:
                f.write(f'\n{emo}\n')
            for path in glob.glob(args.data+f'/{emo}/*.jpg'):
                img = Image.open(path)
                img = torch.unsqueeze(transform(img).cuda(args.gpu, non_blocking=True), 0)

                with torch.no_grad():
                    output = model(img)
                    idx_pred = torch.argmax(output)
                    if i != idx_pred:
                        with open(os.path.join(args.results, f"fault_finder.csv"), 'a') as f:
                            f.write(f'{os.path.basename(path)}\n')


## args.image == 얼굴영상 path

영상을 명시된 gpu에 복사하고 feed-forward를 거쳐 logit을 output에 저장하고, softmax를 거쳐 확률값을 prob에 저장해줍니다.\
최종적으로 결과(확률)와 모델이 판단한 감정명을 console창에 띄워줍니다.

In [None]:
    else:
        transform = transforms_test()
        # img = io.imread(args.image)
        img = Image.open(args.image)
        img = torch.unsqueeze(transform(img).cuda(args.gpu, non_blocking=True), 0)

        with torch.no_grad():
            output = model(img)
            prob = torch.nn.functional.softmax(output)
            emotion_dict = {}
            for i, em in enumerate(EMOTION):
                emotion_dict[em] = round(prob.tolist()[0][i]*100,2)
            # emotion_dict['']
            print(f'결과(확률): {emotion_dict}')
            print(f'감정: {EMOTION[torch.argmax(output)]}')

건들일 필요가 없는 부분입니다.

In [None]:
class AverageMeter(object):
    """Computes and stores the average and current value"""
    def __init__(self, name, fmt=':f'):
        self.name = name
        self.fmt = fmt
        self.reset()

    def reset(self):
        self.val = 0
        self.avg = 0
        self.sum = 0
        self.count = 0

    def update(self, val, n=1):
        self.val = val
        self.sum += val * n
        self.count += n
        self.avg = self.sum / self.count

    def __str__(self):
        fmtstr = '{name} {val' + self.fmt + '} ({avg' + self.fmt + '})'
        return fmtstr.format(**self.__dict__)


class ProgressMeter(object):
    def __init__(self, num_batches, meters, prefix=""):
        self.batch_fmtstr = self._get_batch_fmtstr(num_batches)
        self.meters = meters
        self.prefix = prefix

    def display(self, batch):
        entries = [self.prefix + self.batch_fmtstr.format(batch)]
        entries += [str(meter) for meter in self.meters]
        print('\t'.join(entries))

    def _get_batch_fmtstr(self, num_batches):
        num_digits = len(str(num_batches // 1))
        fmt = '{:' + str(num_digits) + 'd}'
        return '[' + fmt + '/' + fmt.format(num_batches) + ']'



def accuracy(output, target, topk=(1,)):
    """Computes the accuracy over the k top predictions for the specified values of k"""
    with torch.no_grad():
        maxk = max(topk)
        batch_size = target.size(0)

        _, pred = output.topk(maxk, 1, True, True)
        # if maxk == 1:                           웃음 두가지 하나로 할때
        #     for i, p in enumerate(pred):
        #         if p == 2:
        #             pred[i]=1
        pred = pred.t()
        correct = pred.eq(target.view(1, -1).expand_as(pred))

        res = []
        for k in topk:
            correct_k = correct[:k].reshape(-1).float().sum(0, keepdim=True)
            res.append(correct_k.mul_(100.0 / batch_size))
        return res

본 파일을 메인 함수로 계산합니다.

In [None]:
if __name__ == '__main__':
    main()
