## Using CNN and NCM for MNIST feature extraction and classification
### Summary
MNIST 데이터셋을 분류하기 위해 먼저 CNN을 Pretraining 시킨 후, FC layer만 제거하여 Feature Extractor로 활용.

CNN으로 추출된 feature를 NCM의 데이터로 사용.

기존 CNN Test Accuracy : 약 98.5% 

CNN + NCM : 약 93.7% 

In [37]:
import time
import random
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.backends.cudnn as cudnn
from utils.utils import Info
from sklearn.neighbors import NearestCentroid
import torchvision
from torchvision import transforms
from torch.utils.data import DataLoader
from utils.NCM_Classifier import train, validate
from models.resnet_feature import resnet18_feature
from sklearn.metrics import accuracy_score

from utils.Data_Classifier import accuracy

In [38]:
class Config(Info):
    def __init__(self):
        super(Info, self).__init__()
        self.device = 'PC'
        self.dataset = 'MNIST'
        self.test_size = 0.2
        self.feature_size = 3072
        self.method = 'NCM'
        self.distance = 'Euclidean'
        self.reduction_method = [None, None] # method, n_components
        self.iter = 10

In [39]:
cig = Config()
cig.info()
cig.print_rutin()

Device ── PC
│
├──Dataset
│    └────MNIST
│    └────Train size 80%
│    └────Feature size: 3072
│
├──Method
│    └────NCM
│    └────Euclidean
│
├──Dimension reduction
│    └────Method: None
│    └────Component size: None
│    └────Feature Reduction Ratio: None%
│
└──Iteration
    └────10
PC - MNIST(80%) - NCM - 10 iteration


In [40]:
seed = 0
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)
# torch.cuda.manual_seed_all(seed) # if use multi-GPU
cudnn.deterministic = True  # 연산 처리 속도 감소 -> 모델과 코드를 배포해야 하는 연구 후반 단계에 사용
cudnn.benchmark = False

## Load MNIST Dataset

In [41]:
transform = transforms.Compose([transforms.ToTensor(),
                                transforms.Normalize((0.5,), (1.0,))
                                ])

batch_size = 512

trainset = torchvision.datasets.MNIST(root='../../datasets', train=True, download=True, transform=transform)
train_loader = DataLoader(trainset, batch_size=batch_size, shuffle=True, num_workers=2)

validationset = torchvision.datasets.MNIST(root='../../datasets', train=False, download=True, transform=transform)
val_loader = DataLoader(validationset, batch_size=batch_size, shuffle=False, num_workers=2)

print(trainset.data.shape)
print(validationset.data.shape)

torch.Size([60000, 28, 28])
torch.Size([10000, 28, 28])


## Pretraining CNN

In [47]:
class CNN(nn.Module):
    def __init__(self):
        super(CNN, self).__init__()
        self.conv1 = nn.Conv2d(1, 16, 3)
        self.conv2 = nn.Conv2d(16, 32, 3)
        self.conv3 = nn.Conv2d(32, 64, 3)

        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
        self.relu = nn.ReLU(inplace=True)

        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(64, 10)

    def forward(self, x): # 28 x 28
        x = self.conv1(x)
        x = self.relu(x)
        x = self.maxpool(x) # 14 x 14

        x = self.conv2(x)
        x = self.relu(x)
        x = self.maxpool(x) # 7 x 7

        x = self.conv3(x)
        x = self.relu(x)
        x = self.maxpool(x) # 4 x 4

        x = self.avgpool(x) # 1 x 1
        x = torch.flatten(x, 1)
        x = self.fc(x)
        
        return x

In [64]:
learning_rate = 0.001
training_epochs = 15
batch_size = 100

net = CNN().to(device)

criterion = torch.nn.CrossEntropyLoss()  # 비용 함수에 소프트맥스 함수 포함되어져 있음.
optimizer = torch.optim.Adam(net.parameters(), lr=learning_rate)

total_train_batch = len(train_loader)
total_test_batch = len(val_loader)
print('총 배치의 수 : {}, {}'.format(total_train_batch, total_test_batch))
net

총 배치의 수 : 118, 20


CNN(
  (conv1): Conv2d(1, 16, kernel_size=(3, 3), stride=(1, 1))
  (conv2): Conv2d(16, 32, kernel_size=(3, 3), stride=(1, 1))
  (conv3): Conv2d(32, 64, kernel_size=(3, 3), stride=(1, 1))
  (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  (relu): ReLU(inplace=True)
  (avgpool): AdaptiveAvgPool2d(output_size=(1, 1))
  (fc): Linear(in_features=64, out_features=10, bias=True)
)

In [51]:


for epoch in range(training_epochs):
    avg_cost = 0
    avg_train_acc = 0
    avg_test_acc = 0

    net.train()
    for X, Y in train_loader: # 미니 배치 단위로 꺼내온다. X는 미니 배치, Y느 ㄴ레이블.
        # image is already size of (28x28), no reshape
        # label is not one-hot encoded
        X = X.to(device)
        Y = Y.to(device)

        optimizer.zero_grad()
        hypothesis = net(X)
        cost = criterion(hypothesis, Y)
        acc = accuracy(hypothesis, Y)

        cost.backward()
        optimizer.step()

        avg_cost += cost / total_train_batch
        avg_train_acc += acc / total_train_batch

    print('[Epoch: {:>4}] Acc : {:>4} cost : {:>.9}'.format(epoch + 1, avg_train_acc.item(), avg_cost.item()))

    net.eval()
    with torch.no_grad():
        for X_test, Y_test in val_loader:
            X_test = X_test.to(device)
            Y_test = Y_test.to(device)

            prediction = net(X_test)

            acc = accuracy(prediction, Y_test)
            avg_test_acc += acc / total_test_batch

    print('Test Acc: {:>4}'.format(avg_test_acc.item()) )

[Epoch:    1] Acc : 61.57198715209961 cost : 1.34405637
Test Acc: 88.96599578857422
[Epoch:    2] Acc : 91.71138000488281 cost : 0.29286927
Test Acc: 93.60466766357422
[Epoch:    3] Acc : 94.4644775390625 cost : 0.185504705
Test Acc: 95.91107177734375
[Epoch:    4] Acc : 95.72738647460938 cost : 0.142004624
Test Acc: 96.11845397949219
[Epoch:    5] Acc : 96.37627410888672 cost : 0.11929287
Test Acc: 96.97552490234375
[Epoch:    6] Acc : 96.79668426513672 cost : 0.104789771
Test Acc: 97.41153717041016
[Epoch:    7] Acc : 97.20166015625 cost : 0.0904851854
Test Acc: 97.34432220458984
[Epoch:    8] Acc : 97.50676727294922 cost : 0.0803703144
Test Acc: 97.90613555908203
[Epoch:    9] Acc : 97.74124145507812 cost : 0.0732725561
Test Acc: 97.97449493408203
[Epoch:   10] Acc : 97.86259460449219 cost : 0.0690031052
Test Acc: 98.26861572265625
[Epoch:   11] Acc : 98.06503295898438 cost : 0.0630972534
Test Acc: 98.29905700683594
[Epoch:   12] Acc : 98.2349853515625 cost : 0.0581942722
Test Acc: 

## 64 Feature Extrator (CNN)

In [54]:
class Identity(nn.Module):
    def __init__(self):
        super(Identity, self).__init__()
        
    def forward(self, x):
        return x

net.fc = Identity()
net

CNN(
  (conv1): Conv2d(1, 16, kernel_size=(3, 3), stride=(1, 1))
  (conv2): Conv2d(16, 32, kernel_size=(3, 3), stride=(1, 1))
  (conv3): Conv2d(32, 64, kernel_size=(3, 3), stride=(1, 1))
  (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  (relu): ReLU(inplace=True)
  (avgpool): AdaptiveAvgPool2d(output_size=(1, 1))
  (fc): Identity()
)

In [63]:
# NCM
classifier = NearestCentroid()

net.eval()

features = []
targets = []

with torch.no_grad():
    start_time = time.time()
    for i, (images, target) in enumerate(train_loader):

        if torch.cuda.is_available():
            images = images.to(device)
            target = target.to(device)

        # compute output
        feature = net(images)
        features.extend(feature.cpu().numpy())
        targets.extend(target.cpu().numpy())
    feature_extract_time = time.time() - start_time

    features = np.array(features)
    targets = np.array(targets)

    c_start_time = time.time()
    classifier.fit(features, targets)
    fit_time = time.time() - c_start_time
    output = classifier.predict(features)

    # measure accuracy and record loss
    acc = accuracy_score(output, targets)

print('\nFinished Fit\n')
print("Train Data Feature Extract Time : %.4f" % feature_extract_time, "sec")
print("Train Data Fitting NCM Time : %.4f" % fit_time, "sec")
print("\nTrain Accuracy : %.2f" % (acc*100), "%")


Finished Fit

Train Data Feature Extract Time : 4.2274 sec
Train Data Fitting NCM Time : 0.0140 sec

Train Accuracy : 93.16 %


In [62]:
net.eval()

test_features = []
test_targets = []

with torch.no_grad():
    start_time = time.time()
    for i, (images, target) in enumerate(val_loader):

        if torch.cuda.is_available():
            images = images.to(device)
            target = target.to(device)

        # compute output
        feature = net(images)
        test_features.extend(feature.cpu().numpy())
        test_targets.extend(target.cpu().numpy())
    feature_extract_time = time.time() - start_time

    test_features = np.array(test_features)
    test_targets = np.array(test_targets)

    c_start_time = time.time()
    output = classifier.predict(test_features)
    predict_time = time.time() - c_start_time

    # measure accuracy and record loss
    acc = accuracy_score(output, test_targets)


print('\nFinished Predicting\n')
print("Test Data Feature Extract Time : %.4f" % feature_extract_time, "sec")
print("Test Data Prediction Time : %.4f" % predict_time, "sec")
print("\nTest Accuracy : %.2f" % (acc*100), "%")


Finished Predicting

Test Data Feature Extract Time : 1.4511 sec
Test Data Prediction Time : 0.0030 sec

Test Accuracy : 93.74 %
