SVM for Feature fusion

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch.optim.lr_scheduler as lr_scheduler
from torch.optim.lr_scheduler import _LRScheduler
import torch.utils.data as data
import torchvision.transforms as transforms
import torchvision.datasets as datasets
import torchvision.models as models
from torch.utils.data import Dataset, DataLoader
from torchvision import  utils

from sklearn import decomposition
from sklearn import manifold
from sklearn import metrics
from sklearn import model_selection
from sklearn.metrics import confusion_matrix

import matplotlib.pyplot as plt
%matplotlib inline

import copy
from collections import namedtuple
import os, re, time
import random
import shutil
import math

import pandas as pd
import numpy as np
from sklearn import svm

# Seed every random number generator used in this notebook so that repeated
# runs produce the same results.
SEED = 123
for _seed_fn in (random.seed, np.random.seed,
                 torch.manual_seed, torch.cuda.manual_seed):
    _seed_fn(SEED)
torch.backends.cudnn.deterministic = True

# Location of the data. CV_ID is kept as a string because it is spliced into
# both the data directory name and the checkpoint file name.
CV_ID = str(1)
datapath = os.path.join(".data", "hcb", f"cv{CV_ID}")
load_name = f"basic-model-cv{CV_ID}.pt"
train_dir, val_dir, test_dir = (
    os.path.join(datapath, split_name)
    for split_name in ("train", "val", "test")
)

# Input size and per-channel normalisation constants expected by the image
# model (presumably the usual ImageNet statistics -- TODO confirm).
pretrained_size = (224, 224)
pretrained_means = [0.485, 0.456, 0.406]
pretrained_stds = [0.229, 0.224, 0.225]

# Deterministic preprocessing: resize, convert to tensor, normalise.
test_transforms = transforms.Compose([
    transforms.Resize(pretrained_size),
    transforms.ToTensor(),
    transforms.Normalize(mean=pretrained_means, std=pretrained_stds),
])
# ImageFolder variant that hands back each sample together with its file path
class MyImageFolder(datasets.ImageFolder):
    """ImageFolder whose items also carry the path of the source file.

    Indexing returns ``(sample, path)``: ``sample`` is whatever
    ``datasets.ImageFolder.__getitem__`` returns for that index, and ``path``
    is the first element of ``self.imgs[index]``.
    """

    def __getitem__(self, index):
        file_path = self.imgs[index][0]
        sample = super().__getitem__(index)
        return sample, file_path
    

# Only referenced by the disabled iterators below (the trailing 4 is
# presumably an earlier setting).
BATCH_SIZE = 1000 #4

# One dataset per split, all using the same deterministic preprocessing.
train_data, test_data, valid_data = (
    MyImageFolder(root=split_dir, transform=test_transforms)
    for split_dir in (train_dir, test_dir, val_dir)
)
# train_iterator = data.DataLoader(train_data, 
#                                 shuffle = True,
#                                 drop_last = True,
#                                 batch_size = BATCH_SIZE) 
# valid_iterator = data.DataLoader(valid_data, drop_last = True,
#                                 batch_size = BATCH_SIZE)
# test_iterator = data.DataLoader(test_data, drop_last = True,
#                                 batch_size = BATCH_SIZE)     


# Text features: the first CSV row is the header and the first column is the
# row index used for look-ups.
path_to_text = '.data/text.csv'
text_table = pd.read_csv(path_to_text, header=0, index_col=0)
# Number of text feature columns.
Dimension_Text = text_table.shape[1]

def imgid2index(id):
    """Convert an image id such as ``'1_152'`` into an integer index.

    The first character is read as a digit and multiplied by 1000; everything
    from the third character onwards is read as an integer and added to it.
    The second character (the separator) is skipped, so ``'1_152'`` -> 1152.
    """
    leading_digit = int(id[0])
    serial_number = int(id[2:])
    return leading_digit * 1000 + serial_number

def imgid2textinfo(imgid):
    """Look up the text features belonging to a batch of image paths.

    Example:
        input:  ['.data\\train\\zy\\1_152.bmp', '2_8.bmp']
        output: tensor([[ 1,  4,  7, 10],
                        [ 3,  6,  9, 12]])

    Args:
        imgid: iterable of image file paths. Each file name must contain an
            id of the form ``<digit>_<digits>`` (e.g. ``1_152.bmp``).

    Returns:
        A float32 tensor on ``device`` holding one row of ``text_table`` per
        path, in the order of ``imgid``.

    Raises:
        ValueError: if a file name contains no ``<digit>_<digits>`` id.
    """
    row_labels = []
    for path in imgid:
        # Accept both Windows and POSIX separators so that the id is always
        # taken from the file name rather than from a directory component.
        file_name = re.split(r'[\\/]', path)[-1]
        match = re.search(r'\d_\d+', file_name)
        if match is None:
            raise ValueError(
                'no "<digit>_<digits>" image id in file name: %r' % (path,))
        row_labels.append(imgid2index(match.group()))
    return torch.tensor(text_table.loc[row_labels, :].values,
                        dtype=torch.float32, device=device)

# Image branch: a ResNet-50 whose classifier head is replaced by two linear
# layers (IN_FEATURES -> 32 -> OUTPUT_DIM) so that it matches the checkpoint.
basic_model = torch.hub.load('pytorch/vision:v0.6.0', 'resnet50')
# change output dimension to what we need
IN_FEATURES = basic_model.fc.in_features 
OUTPUT_DIM = 2
# NOTE: this head is replaced two statements below. It is retained only so
# that the seeded RNG stream consumed by layer initialisation (and therefore
# the later DataLoader shuffles) stays the same as before.
basic_model.fc = nn.Linear(IN_FEATURES, OUTPUT_DIM)
fc1 = nn.Linear(IN_FEATURES, 32)
fc2 = nn.Linear(32, OUTPUT_DIM)
basic_model.fc = nn.Sequential(fc1, fc2)
# The model is never moved off the CPU in this notebook, so map the
# checkpoint to the CPU; this also lets a checkpoint saved on a GPU load on a
# CPU-only machine.
basic_model.load_state_dict(torch.load(load_name, map_location='cpu'))
# Drop the final classification layer: the model now outputs the 32-d
# activations of fc1, which are used as image features.
basic_model.fc[1] = nn.Identity()
# The model is used purely as a feature extractor, so put layers such as
# batch normalisation into inference mode.
basic_model.eval()
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# text_table.head()

In [None]:
# Build the feature matrices for the SVMs:
#   train_x / test_x : image features from basic_model fused with text features
#   x_train / x_test : text features only
_IMG_ID_RE = re.compile(r'\d_\d+')


def _paths_to_indices(paths):
    """Return the text_table row label encoded in each path's file name."""
    return [imgid2index(_IMG_ID_RE.search(re.split(r'[\\/]', p)[-1]).group())
            for p in paths]


def _fused_features(dataset, shuffle=False):
    """Load ``dataset`` as a single batch and fuse image and text features.

    Returns:
        ``(features, labels, paths)``: ``features`` is a numpy array with the
        image-model output and the text features concatenated along the last
        axis, ``labels`` is a numpy array of the dataset labels, and
        ``paths`` holds the image paths in the same row order.
    """
    loader = data.DataLoader(dataset, shuffle=shuffle,
                             batch_size=len(dataset))
    (images, labels), paths = next(iter(loader))
    # Inference only: without no_grad() autograd would keep the activations
    # of the whole dataset alive for a backward pass that never happens.
    with torch.no_grad():
        img_output = basic_model(images)
    fused = torch.cat((img_output, imgid2textinfo(paths).cpu()), dim=-1)
    return fused.detach().numpy(), labels.numpy(), paths


# Feature extraction must run in inference mode (idempotent if already set).
basic_model.eval()

# load train data
train_x, train_y, train_id = _fused_features(train_data, shuffle=True)
# load val data and combine with train
val_x, val_y, val_id = _fused_features(valid_data, shuffle=True)
train_x = np.concatenate((train_x, val_x), axis=0)
train_y = np.concatenate((train_y, val_y), axis=0)
# load test data
test_x, test_y, test_id = _fused_features(test_data)

# Row labels into text_table, in the same row order as train_x / test_x.
train_ID = _paths_to_indices(train_id) + _paths_to_indices(val_id)
test_ID = _paths_to_indices(test_id)

# Text-only features. The label is the thousands part of the index minus one,
# i.e. an id prefix of 1 gives class 0 and a prefix of 2 gives class 1
# (presumably matching the ImageFolder labels -- TODO confirm).
x_train = text_table.loc[train_ID, :].values
y_train = [i // 1000 - 1 for i in train_ID]
# print(x_train,y_train)

x_test = text_table.loc[test_ID, :].values
y_test = [i // 1000 - 1 for i in test_ID]
# print(x_test,y_test)

# SVM TEXT+IMG

In [None]:
# Grid search (with GridSearchCV's default cross-validation) over the SVM
# hyper-parameters, fitted on train_x / train_y.
param_grid = {
    'C': range(5, 30),
    'gamma': [1e-2, 7e-3, 5e-3, 3e-3, 1e-3, 7e-4, 5e-4, 3e-4, 1e-4, 7e-5],
    'kernel': ['rbf', 'linear', 'poly', 'sigmoid'],
}
svm_para = svm.SVC()
grid_search = model_selection.GridSearchCV(svm_para, param_grid)
grid_search.fit(train_x, train_y)

# Full parameter set of the best estimator; the following cell reads
# 'C', 'gamma' and 'kernel' from it.
best_parameters = grid_search.best_estimator_.get_params()
for param_item in best_parameters.items():
    print(*param_item)

In [None]:
# Combine text info and the output of the image model with a Support Vector
# Machine, then evaluate it on the data it was fitted on.
# NOTE: despite the name, rbf_svc uses whichever kernel is stored in
# best_parameters.
rbf_svc = svm.SVC(C=best_parameters['C'],
                  gamma=best_parameters['gamma'],
                  kernel=best_parameters['kernel'])
# rbf_svc = svm.SVC(kernel='rbf',C = 18, gamma = 0.001)
rbf_svc.fit(train_x, train_y)

svm_pred = rbf_svc.predict(train_x)
acc = np.equal(svm_pred, train_y).sum() / len(train_y)
# Report the accuracy; it was previously computed but never shown.
print('Train accuracy (text + image): %0.4f' % acc)

# ROC curve from the signed decision values, with label 1 as the positive class.
fpr, tpr, thresholds = metrics.roc_curve(
    train_y, rbf_svc.decision_function(train_x), pos_label=1)
roc_auc = metrics.auc(fpr, tpr)
plt.plot(fpr, tpr, 'b', label='AUC = %0.2f' % roc_auc)
plt.legend(loc='lower right')
# plt.plot([0, 1], [0, 1], 'r--')
plt.xlim([-0.1, 1.1])
plt.ylim([-0.1, 1.1])
plt.xlabel('False Positive Rate')  # x axis is fpr
plt.ylabel('True Positive Rate')  # y axis is tpr
plt.title('Receiver operating characteristic Curve')
plt.show()

In [None]:
# Evaluate the fitted SVM on test_x / test_y (fused image + text features).
svm_pred = rbf_svc.predict(test_x)
acc = np.equal(svm_pred, test_y).sum() / len(test_y)
# Report the accuracy; it was previously computed but never shown.
print('Test accuracy (text + image): %0.4f' % acc)

# ROC curve from the signed decision values, with label 1 as the positive class.
fpr, tpr, thresholds = metrics.roc_curve(
    test_y, rbf_svc.decision_function(test_x), pos_label=1)
roc_auc = metrics.auc(fpr, tpr)
plt.plot(fpr, tpr, 'b', label='AUC = %0.2f' % roc_auc)
plt.legend(loc='lower right')
# plt.plot([0, 1], [0, 1], 'r--')
plt.xlim([-0.1, 1.1])
plt.ylim([-0.1, 1.1])
plt.xlabel('False Positive Rate')  # x axis is fpr
plt.ylabel('True Positive Rate')  # y axis is tpr
plt.title('Receiver operating characteristic Curve')
plt.show()

# SVM TEXT

In [None]:
# Grid search (with GridSearchCV's default cross-validation) over the SVM
# hyper-parameters, fitted on the text-only features x_train / y_train.
param_grid = {
    'C': range(5, 20),
    'gamma': [1e-2, 7e-3, 5e-3, 3e-3, 1e-3, 7e-4],
    'kernel': ['rbf', 'linear', 'poly', 'sigmoid'],
}
svm_para = svm.SVC()
grid_search = model_selection.GridSearchCV(svm_para, param_grid)
grid_search.fit(x_train, y_train)

# Full parameter set of the best estimator; the following cell reads
# 'C', 'gamma' and 'kernel' from it.
best_parameters = grid_search.best_estimator_.get_params()
for param_item in best_parameters.items():
    print(*param_item)

In [None]:
# Fit a Support Vector Machine on the text-only features (x_train / y_train)
# and evaluate it on the data it was fitted on.
# NOTE: despite the name, rbf_svc uses whichever kernel is stored in
# best_parameters.
rbf_svc = svm.SVC(C=best_parameters['C'],
                  gamma=best_parameters['gamma'],
                  kernel=best_parameters['kernel'])
# rbf_svc = svm.SVC(kernel='rbf',C = 18, gamma = 0.001)
rbf_svc.fit(x_train, y_train)

svm_pred = rbf_svc.predict(x_train)
acc = np.equal(svm_pred, y_train).sum() / len(y_train)
# Report the accuracy; it was previously computed but never shown.
print('Train accuracy (text only): %0.4f' % acc)

# ROC curve from the signed decision values, with label 1 as the positive class.
fpr, tpr, thresholds = metrics.roc_curve(
    y_train, rbf_svc.decision_function(x_train), pos_label=1)
roc_auc = metrics.auc(fpr, tpr)
plt.plot(fpr, tpr, 'b', label='AUC = %0.2f' % roc_auc)
plt.legend(loc='lower right')
# plt.plot([0, 1], [0, 1], 'r--')
plt.xlim([-0.1, 1.1])
plt.ylim([-0.1, 1.1])
plt.xlabel('False Positive Rate')  # x axis is fpr
plt.ylabel('True Positive Rate')  # y axis is tpr
plt.title('Receiver operating characteristic Curve')
plt.show()

In [None]:
# Evaluate the fitted SVM on the text-only test features x_test / y_test.
svm_pred = rbf_svc.predict(x_test)
acc = np.equal(svm_pred, y_test).sum() / len(y_test)
# Report the accuracy; it was previously computed but never shown.
print('Test accuracy (text only): %0.4f' % acc)

# ROC curve from the signed decision values, with label 1 as the positive class.
fpr, tpr, thresholds = metrics.roc_curve(
    y_test, rbf_svc.decision_function(x_test), pos_label=1)
roc_auc = metrics.auc(fpr, tpr)
plt.plot(fpr, tpr, 'b', label='AUC = %0.2f' % roc_auc)
plt.legend(loc='lower right')
# plt.plot([0, 1], [0, 1], 'r--')
plt.xlim([-0.1, 1.1])
plt.ylim([-0.1, 1.1])
plt.xlabel('False Positive Rate')  # x axis is fpr
plt.ylabel('True Positive Rate')  # y axis is tpr
plt.title('Receiver operating characteristic Curve')
plt.show()