In [26]:
import sys
sys.path.append("../../")

import torch
import torchvision.utils
import torch.nn.functional as F
import torchvision.transforms as transforms
from torch.autograd import Variable
from torch import optim
from torch.utils.data import DataLoader
from torchvision import models
from torchsummary import summary

import numpy as np
import cv2
import pandas as pd
from sklearn.metrics import accuracy_score

import src.dataPreparation.CreatePartial as create_partial
import src.neuralNetworksArch.BstCnn as bst
import src.utils.Visual as vis
import src.utils.Checkpoint as ckp
import src.utils.Metrics as metrics

from src.config.Param import *

In [27]:
MODEL_PATH = [
    '../../models/PARTIAL_1 #1 - CUHK03.pth',
    '../../models/PARTIAL_1 #2 - CUHK03.pth',
    '../../models/PARTIAL_1 #3 - CUHK03.pth'
]
DATATEST_PATH = '../../dataset/testing/same_cam/testing.csv'
IMAGES_PATH = '../../dataset/testing/same_cam/images/full/'
IMAGES_OCCL_PATH = '../../dataset/testing/same_cam/images/occl_20/'

In [28]:
def cv_image2tensor(img, convert=True):
    if convert:
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    img = img[:, :, ::-1].transpose((2, 0, 1)).copy()
    img = torch.from_numpy(img).float() / 255.0
    img = [img]
    img_tensors = torch.stack(img)

    return img_tensors

In [29]:
import torch.nn as nn

class BstCnn2(nn.Module):
    def __init__(self):
        super(BstCnn2, self).__init__()

        self.conv5x5_1 = nn.Sequential(
            nn.ReflectionPad2d(2),
            nn.Conv2d(3, 16, kernel_size=5),
            nn.ReLU(inplace=True),
            nn.BatchNorm2d(16)
        )

        self.conv3x3_1 = nn.Sequential(
            nn.ReflectionPad2d(1),
            nn.Conv2d(3, 16, kernel_size=3),
            nn.ReLU(inplace=True),
            nn.BatchNorm2d(16)
        )

        self.conv3x3_2 = nn.Sequential(
            nn.ReflectionPad2d(1),
            nn.Conv2d(35, 16, kernel_size=3),
            nn.ReLU(inplace=True),
            nn.BatchNorm2d(16)
        )

        self.conv1x1_2 = nn.Sequential(
            nn.Conv2d(35, 16, kernel_size=1),
            nn.ReLU(inplace=True),
            nn.BatchNorm2d(16)
        )

        self.conv1x1_3 = nn.Sequential(
            nn.Conv2d(32, 16, kernel_size=1),
            nn.ReLU(inplace=True),
            nn.BatchNorm2d(16)
        )

        self.maxpool = nn.MaxPool2d(kernel_size=2, stride=2, padding=0)

        self.avgpool = nn.AdaptiveAvgPool2d((8, 8)) # partial images

        self.fc = nn.Sequential(
            nn.Linear(16*8*8, 4096),
            nn.ReLU(inplace=True),
            nn.Dropout(p=0.2),
            
            nn.Linear(4096, 2048),
            nn.ReLU(inplace=True),
            nn.Dropout(p=0.2),
            
            nn.Linear(2048, 1024),
            nn.ReLU(inplace=True),
            nn.Dropout(p=0.2),
            
            nn.Linear(1024, 512),
        )

        self._initialize_weights()

    def forward_once(self, x):
        out_5x5 = self.conv5x5_1(x)
        out_3x3 = self.conv3x3_1(x)
        output = torch.cat([out_5x5, out_3x3, x], 1)
        output = self.maxpool(output)
        
        out_3x3 = self.conv3x3_2(output)
        out_1x1 = self.conv1x1_2(output)
        output = torch.cat([out_3x3, out_1x1], 1)
        output = self.maxpool(output)

        out_1x1 = self.conv1x1_3(output)
        output = self.avgpool(out_1x1)

        output = output.reshape(output.size(0), -1)
        output = self.fc(output)

        return output

    def forward(self, input1, input2, input3=None):
        output1 = self.forward_once(input1)
        output2 = self.forward_once(input2)
        if input3 == None:
            return output1, output2
        else:
            output3 = self.forward_once(input3)
            return output1, output2, output3

    def _initialize_weights(self):
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
                if m.bias is not None:
                    nn.init.constant_(m.bias, 0)
            elif isinstance(m, nn.BatchNorm2d):
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)
            elif isinstance(m, nn.Linear):
                nn.init.normal_(m.weight, 0, 0.01)
                nn.init.constant_(m.bias, 0)


In [30]:
models = []
for index, path in enumerate(MODEL_PATH):
    data = {}
    if index != 2:
        model  = bst.BstCnn()
    else:
        model = BstCnn2()
    checkpoint = ckp.load_checkpoint(load_dir=path)
    model.load_state_dict(checkpoint['state_dict'])
    model.eval()
    
    data['id'] = 'PART-' + str(index+1)
    data['model'] = model
    data['min_dist'] = checkpoint['dist'][0]
    data['max_dist'] = checkpoint['dist'][1]
#     data['threshold'] = checkpoint['threshold']
    data['threshold'] = 0.3
    data['threshold_list'] = checkpoint['threshold_list']
    models.append(data)

In [31]:
a = 0
b = 0
for i in models[1]['threshold_list']:
    if i == 0.15000000000000002:
        a +=1
    elif i == 0.20000000000000004:
        b += 1
        
print('a : {}'.format(a))
print('b : {}'.format(b))

a : 151805
b : 99420


In [49]:
df = pd.read_csv(DATATEST_PATH)
y_true = []
y_pred = []
verbose = False
with torch.no_grad():
    for index, data in df.iterrows():
#         if index != 47:
#             continue
        if verbose == True:
            print('No-{}'.format(index+1))
            print('-'*50)
        img1 = cv2.imread(IMAGES_PATH + data['image_1'])
        img2 = cv2.imread(IMAGES_OCCL_PATH + data['image_2'])
        label = data['label']

        img1_part = list(create_partial.partial_image_1(img1))
        img2_part = list(create_partial.partial_image_1(img2))

        thresholds = []
        dists = []
        for i, (input1, input2, model) in enumerate(zip(img1_part, img2_part, models)):
            input1 = cv_image2tensor(input1)
            input2 = cv_image2tensor(input2)
            out1, out2 = model['model'](input1, input2)
            euclidean_distance = F.pairwise_distance(out1, out2)
            
            dist = metrics.normalize_data(euclidean_distance.item(), model['max_dist'])
            dists.append(dist)
            thresholds.append(model['threshold'])
            
            if verbose == True:
                print('PART-{} DISTANCE => {}'.format((i+1), dist))
                print('Threshold => {}'.format(model['threshold']))
                print('-'*50)
            
        cat_dist, cat_thresh = metrics.concatenate(dists, thresholds, [0.2, 0.6, 0.2])
        
        if verbose == True:
            print('actual distance => {}'.format(cat_dist))
            print('actual thresh => {}'.format(cat_thresh))
            print('actual label => {}'.format(label))
        
        y_true.append(float(label))
        pred = 0.0 if cat_dist <= cat_thresh else 1.0
        y_pred.append(pred)
        concatenated = torch.cat((cv_image2tensor(img1, False), cv_image2tensor(img2, False)),0)
        
        if verbose == True:
            vis.imshow(torchvision.utils.make_grid(concatenated))
            print('='*50, end='\n\n')

print(len([i==0.0 for i in y_pred]))
print(len([i==1.0 for i in y_pred]))
acc = accuracy_score(np.array(y_true), np.array(y_pred))
print('Accuracy : {}'.format(acc))

100
100
Accuracy : 0.64


In [50]:
# for t, p in zip(y_true, y_pred):
#     print('True : {}'.format(t))
#     print('Predict : {}'.format(p))
#     print('-'*30)