Open Set Recognition + Deep Networks

unknown class가 존재하는 Open Set Recognition 상황에서, 입력 데이터가 unknown 클래스에 속하는지 판단하기.

## import Package

In [None]:
import os
import numpy as np
import pandas as pd
from statistics import mean

import torch
import torchvision

from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score, confusion_matrix
from scipy import stats

from PIL import Image
import matplotlib.pyplot as plt
import seaborn as sns

import warnings
warnings.filterwarnings("ignore")

# Project layout, rooted at the current user's home directory.
# os.path.expanduser('~') is used instead of os.getenv('HOME') because the
# latter returns None when HOME is unset, which makes the concatenation fail.
PROJECT_PATH = os.path.join(os.path.expanduser('~'), 'aiffel', 'socar_open_set')
MODEL_PATH = os.path.join(PROJECT_PATH, 'weights')  # saved network weights
DATA_PATH = os.path.join(PROJECT_PATH, 'data')
TRAIN_PATH = os.path.join(DATA_PATH, 'train')
TEST_PATH = os.path.join(DATA_PATH, 'test')
REJECT_PATH = os.path.join(DATA_PATH, 'reject')

# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

cuda


## 데이터 전처리 파이프라인

In [None]:
def create_dataloader(path, batch_size, istrain):
    """Build an ``ImageFolder`` dataset over *path* and a ``DataLoader`` for it.

    Every image is resized to 320x320 (nearest-neighbour), centre-cropped to
    224x224, converted to a tensor and normalised per channel. In training
    mode random horizontal/vertical flips and colour jitter are inserted
    before the tensor conversion, and the loader shuffles the data.

    Args:
        path: root directory laid out as one sub-directory per class.
        batch_size: number of samples per batch, used in both modes.
        istrain: truthy for the augmented, shuffled training pipeline;
            falsy for the deterministic evaluation pipeline.

    Returns:
        A ``(dataloader, dataset)`` tuple.
    """
    transforms = torchvision.transforms
    nearest_mode = transforms.InterpolationMode.NEAREST
    normalize = transforms.Normalize(
        mean=[0.485, 0.456, 0.406],
        std=[0.229, 0.224, 0.225],
    )

    # Geometric preprocessing shared by both modes.
    steps = [
        transforms.Resize((320, 320), interpolation=nearest_mode),
        transforms.CenterCrop((224, 224)),
    ]
    if istrain:
        # Augmentation is applied to training data only.
        steps += [
            transforms.RandomHorizontalFlip(),
            transforms.RandomVerticalFlip(),
            transforms.ColorJitter(),
        ]
    steps += [transforms.ToTensor(), normalize]

    data = torchvision.datasets.ImageFolder(
        path, transform=transforms.Compose(steps)
    )
    # batch_size is honoured in evaluation mode as well; shuffling stays
    # restricted to training so evaluation order is reproducible.
    dataloader = torch.utils.data.DataLoader(
        data, batch_size=batch_size, shuffle=bool(istrain)
    )

    return dataloader, data

## 학습 데이터를 연결하고, 모델 불러오기
- 모델은 pretrained 된 resnet50을 불러옴
- 모델의 가중치는 클래스가 atower_b5, balsan_b5, balsan_b6, dcube_b6로 총 4개인 모델을 학습한 가중치

In [None]:
# Evaluation-mode loader (no augmentation, no shuffling, batch size 1) over
# the training images; it is used to read activation vectors from the network.
train_loader, _train_data = create_dataloader(TRAIN_PATH, 1, False)
# Take the class count from the dataset itself: os.listdir() would also count
# stray files (e.g. .DS_Store) that ImageFolder does not treat as classes.
target_class_num = len(_train_data.classes)

# Only the architecture is needed here. Every parameter is replaced by
# load_state_dict() below, so fetching the ImageNet weights would be wasted.
net = torchvision.models.resnet50(pretrained=False)
# Replace the classification head so it outputs one logit per target class.
net.fc = torch.nn.Linear(
    net.fc.in_features,
    target_class_num
)

saved_weight_path = os.path.join(MODEL_PATH, 'classifier_acc_0.96008.pth')
net.load_state_dict(torch.load(saved_weight_path, map_location=device))
print('Successfully Loaded the Network Weight!')
# Inference only: switch batch-norm/dropout layers to evaluation behaviour.
net.eval()

net.to(device)

Successfully Loaded the Network Weight!


ResNet(
  (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
  (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (relu): ReLU(inplace=True)
  (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  (layer1): Sequential(
    (0): Bottleneck(
      (conv1): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv3): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (bn3): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
      (downsample): Sequential(
        (0): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 

## 모델에서 Activation Vector를 추출

### 1. Activation Vector 추출
- OpenMax에 필요한 데이터는 분류에 성공한 데이터의 Activation Vector
- Activation Vector는 Softmax 층에 입력되는 값
    - torch.softmax()의 입력이 되는 값에서 Activation Vector를 뽑아오면 됨

In [None]:
# Per-sample results collected over the whole loader.
train_preds = []
train_actvecs = []
train_outputs_softmax = []
train_labels = []

with torch.no_grad():
    for img, label in train_loader:
        out = net(img.to(device))

        # Rows are appended one by one, so any batch size yields the same
        # (num_samples, num_classes) layout after stacking.
        # component 1: activation vector, i.e. the values fed into softmax
        train_actvecs.extend(out.cpu().numpy())
        # component 2: predicted class for each sample
        train_preds.extend(torch.argmax(out, dim=1).cpu().tolist())
        # component 3: softmax probabilities for each sample
        train_outputs_softmax.extend(torch.softmax(out, dim=1).cpu().numpy())
        # component 4: ground-truth label for each sample
        train_labels.extend(label.tolist())

train_actvecs = np.asarray(train_actvecs)
train_preds = np.asarray(train_preds)
train_outputs_softmax = np.asarray(train_outputs_softmax)
train_labels = np.asarray(train_labels)

In [None]:
# Shape of the stacked activation vectors collected above.
train_actvecs.shape

(5000, 4)

- 5,000 개의 데이터에서 4개의 클래스를 구분하는 Activation Vector

### 2. 정답에 해당하는 Activation Vector 추출
- OpenMax에서는 모델에서 나온 Activation Vector를 모두 사용하지 않음
- 모델이 정답을 맞힌 Activation Vector만 사용
- 학습된 모델의 prediction과 label이 동일한 데이터의 Activation Vector만 추출함

In [None]:
# Boolean mask selecting the samples whose prediction equals the label;
# only those activation vectors are kept.
is_correct = train_labels == train_preds
train_correct_actvecs = train_actvecs[is_correct]
train_correct_labels = train_labels[is_correct]
print('Activation vector: ', train_correct_actvecs.shape)
print('Labels: ', train_correct_labels.shape)

Activation vector:  (4790, 4)
Labels:  (4790,)


- 4,790 개의 데이터는 prediction과 label이 일치함
- 4개의 클래스를 구분하므로 Activation Vector의 2번째 차원(dimension)의 크기는 4

## Activation Vector를 베이불(Weibull) 분포에 적용
1. Activation Vector를 클래스마다 나눠 담음
2. 클래스별로 나눠진 Activation Vector별 평균으로부터 가장 먼 100개의 Vector를 이용해 베이불 분포의 모수를 추출
3. 각 클래스당 베이불 분포의 모수들을 저장
    - 베이불 분포의 모수는 shape, loc, scale로 3개이고, 클래스는 4개이므로 총 12개의 숫자가 나옴
    - 아직 unknown 클래스는 만들어지지 않음 (아직 class는 5개가 아님)

In [None]:
class_means = list()    # mean activation vector of each class
dist_to_means = list()  # sorted squared distances to the class mean
mr_models = {}          # Weibull parameters keyed by str(class index)

# Number of largest distances per class used to fit the distribution.
tail_size = 100

for class_idx in np.unique(train_labels):

    print('class_idx: ', class_idx)
    class_act_vec = train_correct_actvecs[train_correct_labels==class_idx]
    print(class_act_vec.shape)

    class_mean = class_act_vec.mean(axis=0)
    class_means.append(class_mean)

    # Squared Euclidean distance of every activation vector to the class mean.
    dist_to_mean = np.square(class_act_vec - class_mean).sum(axis=1)
    # Ascending order, so the tail of the array holds the farthest samples.
    dist_to_mean_sorted = np.sort(dist_to_mean).astype(np.float64)
    dist_to_means.append(dist_to_mean_sorted)

    # Fit on the sorted tail. Slicing the unsorted array would pick the last
    # samples in dataset order rather than the farthest ones.
    shape, loc, scale = stats.weibull_max.fit(dist_to_mean_sorted[-tail_size:])

    mr_models[str(class_idx)] = {
        'shape':shape,
        'loc':loc,
        'scale':scale
    }

class_means = np.asarray(class_means)

class_idx:  0
(1250, 4)
class_idx:  1
(1170, 4)
class_idx:  2
(1179, 4)
class_idx:  3
(1191, 4)


### 1. shape
- 베이불 분포의 모양을 결정

![image.png](attachment:image.png)

### 2. loc
- 분포의 가로축 평행 이동을 뜻함  

### 3. scale
- 분포가 얼마나 넓게 퍼져있는지를 뜻함

![image.png](attachment:image.png)

## 이미지 클래스별 확률 계산

In [None]:
def compute_openmax(actvec, class_means, mr_models):
    """Turn one activation vector into OpenMax probabilities.

    Each known-class activation is scaled by ``1 - cdf(distance)``, where the
    distance is the squared Euclidean distance of *actvec* to that class mean
    and the CDF comes from the class's fitted ``weibull_max`` model. The
    activation mass removed this way is summed into one extra "unknown" entry,
    and a softmax is taken over all entries.

    Args:
        actvec: 1-D array with one activation per known class.
        class_means: 2-D array holding one mean activation vector per row.
        mr_models: dict keyed by ``str(class_index)``; each value is a dict
            with the Weibull parameters ``'shape'``, ``'loc'`` and ``'scale'``.

    Returns:
        1-D array of length ``len(class_means) + 1`` that sums to 1; the last
        entry is the probability of the unknown class.
    """
    dist_to_mean = np.square(actvec - class_means).sum(axis=1)

    scores = np.asarray([
        stats.weibull_max.cdf(
            dist_to_mean[class_idx],
            mr_models[str(class_idx)]['shape'],
            mr_models[str(class_idx)]['loc'],
            mr_models[str(class_idx)]['scale'],
        )
        for class_idx in range(len(class_means))
    ])

    weight_on_actvec = 1 - scores  # per-class weight kept on the activation
    rev_actvec = np.concatenate([
        weight_on_actvec * actvec,  # re-weighted known-class activations
        [((1 - weight_on_actvec) * actvec).sum()],  # mass moved to "unknown"
    ])

    # Subtracting the maximum leaves the softmax unchanged mathematically but
    # keeps exp() from overflowing to inf (and the result from becoming NaN).
    exp_shifted = np.exp(rev_actvec - rev_actvec.max())
    openmax_prob = exp_shifted / exp_shifted.sum()
    return openmax_prob

## 테스트셋 로드

In [None]:
# Deterministic evaluation loaders (batch size 1) for the known-class test
# images and for the images that should be rejected.
test_loader, _test_data = create_dataloader(TEST_PATH, 1, False)
reject_loader, _reject_data = create_dataloader(REJECT_PATH, 1, False)
# Number of known classes, taken from the dataset rather than os.listdir()
# so stray files in the directory are not counted. The same value is later
# used as the label of the reject class.
target_class_num = len(_test_data.classes)

## Inference 함수
- threshold 값을 지정하여, 계산한 클래스별 확률 중 최댓값이 threshold보다 낮으면 강제로 reject 클래스로 분류하는 방법
- 단점 : threshold값을 하나하나 찾아봐야 함

In [None]:
def inference(actvec, threshold, target_class_num, class_means, mr_models, apply_softmax=True):
    """Classify one activation vector, rejecting low-confidence inputs.

    Args:
        actvec: activation vector of a single input.
        threshold: minimum top score required to keep the arg-max prediction.
        target_class_num: label returned when the input is rejected.
        class_means: forwarded unchanged to ``compute_openmax``.
        mr_models: forwarded unchanged to ``compute_openmax``.
        apply_softmax: when True (the default, and the historical behaviour)
            a softmax is applied on top of the values returned by
            ``compute_openmax``. Those values are already probabilities, so
            this squeezes the scores into a narrow band: with 5 outputs the
            top score cannot exceed e / (e + 4), about 0.405. Thresholds must
            be chosen on that scale. Pass False to compare *threshold*
            against the OpenMax probabilities directly.

    Returns:
        The index of the highest score, or *target_class_num* when that
        score is below *threshold*.
    """
    openmax_prob = compute_openmax(actvec, class_means, mr_models)

    if apply_softmax:
        exp_prob = np.exp(openmax_prob)
        scores = exp_prob / np.sum(exp_prob)
    else:
        scores = openmax_prob

    # Guard clause: a weak top score means the input is rejected.
    if np.max(scores) < threshold:
        return target_class_num
    return np.argmax(scores)

## threshold 탐색

In [None]:
def inference_dataloader(net, data_loader, threshold, target_class_num, class_means, mr_models, is_reject=False):
    """Run thresholded OpenMax inference over every sample of a loader.

    Args:
        net: network whose output is used as the activation vector.
        data_loader: iterable yielding ``(images, labels)`` batches.
        threshold: forwarded to ``inference``.
        target_class_num: label of the reject class.
        class_means: forwarded to ``inference``.
        mr_models: forwarded to ``inference``.
        is_reject: when True every sample is given the reject label
            (*target_class_num*) as ground truth and the loader's own labels
            are ignored.

    Returns:
        ``(result_preds, result_labels)``: two lists with one entry per
        sample, in loader order.
    """
    result_preds = []
    result_labels = []

    with torch.no_grad():
        for img, label in data_loader:
            out = net(img.to(device))

            # One prediction per row, so batches of any size are handled.
            for out_actvec in out.cpu().numpy():
                result_preds.append(
                    inference(out_actvec, threshold, target_class_num, class_means, mr_models)
                )

            if is_reject:
                result_labels.extend([target_class_num] * len(label))
            else:
                result_labels.extend(label.tolist())

    return result_preds, result_labels

- 데이터셋
    - 정위치에 주차된 이미지
    - 오위치에 주차된 이미지
    
- 모델의 성능 평가
    - 정위치에 주차된 이미지들을 알맞은 class로 추론했는지
    - 오위치에 주차된 이미지들을 추론하지 않고 reject 했는지

In [None]:
history = []

# Sweep thresholds 0.30, 0.31, ..., 0.40. linspace states the number of
# points explicitly; with a float step, whether arange includes the end
# value depends on rounding error.
for raw_threshold in np.linspace(0.30, 0.40, 11):
    threshold = round(float(raw_threshold), 2)

    test_preds, test_labels = inference_dataloader(net, test_loader, threshold, target_class_num, class_means, mr_models)
    reject_preds, reject_labels = inference_dataloader(net, reject_loader, threshold, target_class_num, class_means, mr_models, is_reject=True)

    # 'Test Acc': accuracy on the test loader against its own labels.
    # 'Reject Acc': share of reject-loader samples given the reject label.
    history.append({'threshold' : threshold,
                    'Test Acc' : accuracy_score(test_labels, test_preds),
                    'Reject Acc' : accuracy_score(reject_labels, reject_preds)})

In [None]:
# Tabulate the sweep results, highest reject accuracy first.
pd.DataFrame(history).sort_values(by='Reject Acc', ascending=False)

Unnamed: 0,threshold,Test Acc,Reject Acc
10,0.4,0.561,0.963
9,0.39,0.699,0.948
8,0.38,0.776,0.939
7,0.37,0.812,0.932
6,0.36,0.837,0.927
5,0.35,0.851,0.92
4,0.34,0.854,0.869
3,0.33,0.861,0.828
2,0.32,0.869,0.8
1,0.31,0.873,0.758


- Threshold가 떨어질수록 Test Accuracy는 올라감
- 반대로 Threshold가 떨어질수록 Reject Accuracy는 떨어짐


- 기준점 : Test Acc 0.85 이상
    - **Threshold 0.35일 때, Test Acc 0.851(0.85 이상), Reject Acc 0.92가 나옴**

## 회고

### 1. 새로 알아갔던 점
 - OpenMax에 대해 알아감 (학습한 class에 속하지 않는 입력을 unknown 클래스로 걸러낼 수 있다는 점)

### 2. 흥미로웠던 점
 - OpenMax가 어떤 과정으로 결과물을 산출하는지 하나하나씩 알아보는게 흥미로웠음

### 3. 아쉬웠던 점
 - 프로젝트를 앞두고 있어서 과제를 급하게 한 느낌이 들었음

### 4. 앞으로의 다짐
 - 마지막 과제 제출이 끝나서 앞으로 프로젝트를 통해 지금까지 익힌 지식을 잘 풀어내야겠다는 생각을 함