In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
sns.set()
%matplotlib inline
plt.rcParams['figure.figsize'] = [8, 4]
plt.rcParams['figure.dpi'] = 144
sns.set_style("darkgrid")

In [2]:
class SVM:
    def __init__(self, C, toler, kernel_option):
        '''
        C : 松弛变量的代价
        toler : 迭代的中止条件之一.当参数更新步长小于toler时，迭代中止
        kernel_option : 核函数选项
            1. linear : 线性核
            2. rbf : 高斯核
            3. poly : 多项式核
            4. laplace : 拉普拉斯核
            5. sigmoid : Sigmoid核
        '''
        self.C = C
        self.toler = toler
        self.kernel_option = kernel_option
    
    def train(self, data, label, max_iter):
        self.data = np.array(data)
        self.label = np.array(label)
        self.max_iter = max_iter
        self.b = 0
        self.m, self.n = data.shape
        self.kernel = self.calc_kernel()
        self.alpha = np.zeros(self.m)
        self.error = np.zeros((self.m, 2))
        self.SMO()
        self.evaluate(self.data, self.label)
        # self.CV_plot()
        # self.plot()
        
    def calc_kernel(self):
        _kernel = np.zeros((self.m, self.m))
        for i in range(self.m):
            _kernel[:, i] = self.calc_kernel_value(self.data[i, :])
        return _kernel
    
    def calc_kernel_value(self, data_i):
        kernel_type = self.kernel_option['type']
        kernel_value = np.zeros(self.m)
        
        if kernel_type == 'rbf':
            sigma = self.kernel_option['sigma']
            if sigma == 'auto':
                sigma = 1.0 / self.m
            for i in range(self.m):
                _diff = self.data[i, :] - data_i
                kernel_value[i] = np.exp(-np.dot(_diff, _diff.T) / (2 * sigma ** 2))        
        elif kernel_type == 'linear':
            kernel_value = np.dot(self.data, data_i.T)
        elif kernel_type == 'poly':
            d = self.kernel_option['d']
            kernel_value = np.dot(self.data, data_i.T) ** d
        elif kernel_type == 'laplace':
            sigma = self.kernel_option['sigma']
            for i in range(self.m):
                _diff = self.data[i, :] - data_i
                kernel_value[i] = np.exp(-np.dot(_diff, _diff.T) / sigma)
        elif kernel_type == 'sigmoid':
            beta = self.kernel_option['beta']
            theta = self.kernel_option['theta']
            kernel_value = np.tanh(beta * np.dot(self.data, data_i.T) + theta)
        return kernel_value
    
    def SMO(self):
        loop_all = True # 控制当前轮是否针对整个样本集循环，否则只针对非边界样本循环
        changed = False
        iteration = 0
        
        keep_going = True
        while (iteration < self.max_iter) and ((changed >0) or loop_all):
#             if iteration / (self.max_iter / 10) == 0:
#                 print("%d / %d iterations passed..." % (iteration, self.max_iter))
            print("\t iteration : ", iteration)
            changed = 0
            
            if loop_all: # 针对所有样本
                for i in range(self.m):
                    changed += self.once_iterate(i)
                iteration += 1
            else: # 针对非边界样本
                inner = []
                for i in range(self.m):
                    if 0 < self.alpha[i] < self.C:
                        inner.append(i)
                for i in inner:
                    changed += self.once_iterate(i)
                iteration += 1
            
            if loop_all:
                loop_all = False
            elif changed == 0:
                loop_all = True
            
#             if not ((iteration < self.max_iter) and (changed or loop_all)):
#                 print("STOP @ %d" % iteration)
#                 keep_going = False
                
    def once_iterate(self, alpha_i):
        error_i = self.calc_error(alpha_i)
        
        if (self.label[alpha_i] * error_i < -self.toler) and (self.alpha[alpha_i] < self.C) \
            or (self.label[alpha_i] * error_i > self.toler) and (self.alpha[alpha_i] > 0):
            
            alpha_j, error_j = self.get_alpha_j(alpha_i, error_i)
            alpha_i_old = self.alpha[alpha_i].copy()
            alpha_j_old = self.alpha[alpha_j].copy()
            
            i = alpha_i
            j = alpha_j
            Ei = error_i
            Ej = error_j
            
            L, H = self.get_L_H(i, j)
            if L == H:
                # print("Bad L==H")
                return 0
            
            eta = 2 * self.kernel[i, j] - self.kernel[i, i] - self.kernel[j, j]
            if eta >= 0: 
                # print("Bad eta>=0")
                return 0
            
            alpha_j_new = self.alpha[j] - self.label[j] * (Ei - Ej) / eta
            self.alpha[j] = np.clip(alpha_j_new, L, H)
            
            self.update_error(j)
            if (abs(self.alpha[j] - alpha_j_old) < 0.00001):  # a_j 变化过小，认为没有变化
                self.update_error(j)
                # print("Bad new - old < 10**-5")
                return 0
            
            self.alpha[i] += self.label[i] * self.label[j] * (alpha_j_old - self.alpha[j])
            
            self.update_error(i)
            diff_1 = self.alpha[i] - alpha_i_old
            diff_2 = self.alpha[j] - alpha_j_old
            b1 = self.b - Ei - self.label[i] * diff_1 * self.kernel[i, i] - self.label[j] * diff_2 * self.kernel[i, j]
            b2 = self.b - Ej - self.label[i] * diff_1 * self.kernel[i, j] - self.label[j] * diff_2 * self.kernel[j, j]
            
            if 0 < self.alpha[i] < self.C:
                self.b = b1
            elif 0 < self.alpha[j] < self.C:
                self.b = b2
            else:
                self.b = (b1 + b2) / 2.0
            
            
            
            
            return 1
        else:
            return 0
        
    def calc_error(self, ix):
        pred = np.dot((self.alpha * self.label).T, self.kernel[ix]) + self.b
        Eix = pred - float(self.label[ix])
        return Eix
    
    def get_L_H(self, i, j):
        if self.label[i] != self.label[j]:
            _diff = self.alpha[j] - self.alpha[i]
            L = max(0, _diff)
            H = min(self.C, self.C + _diff)
        else:
            _sum = self.alpha[i] + self.alpha[j]
            L = max(0, _sum - self.C)
            H = min(self.C, _sum)
        return L, H
    
    def get_alpha_j(self, alpha_i, error_i):
        self.error[alpha_i] = [1, error_i] # 标记为已优化并更新误差
        optmize_list = self.error[:, 0].nonzero()[0]
        
        max_diff = 0
        alpha_j = 0
        error_j = 0
        
        if len(optmize_list) > 1:
            for k in optmize_list:
                if k != alpha_i:
                    error_k = self.calc_error(k)
                    if abs(error_k - error_i) > max_diff:
                        max_diff = abs(error_k - error_i)
                        alpha_j = k
                        error_j = error_k
        else:
            alpha_j = alpha_i
            while alpha_j == alpha_i:
                alpha_j = np.random.randint(self.m)
            error_j = self.calc_error(alpha_j)
        
        return alpha_j, error_j
    
    def update_error(self, ix):
        error_ix = self.calc_error(ix)
        self.error[ix] = [1, error_ix]
    
    def evaluate(self, test_data, test_label):
        m = test_data.shape[0]
        err_cnt = 0
        for i in range(m):
            kernel_value = self.calc_kernel_value(test_data[i])
            pred = np.dot(kernel_value.T, self.label * self.alpha) + self.b
            if np.sign(pred) != np.sign(test_label[i]):
                err_cnt += 1
        print("test acc : %.4lf" % (1 - err_cnt / m))
        
    def plot(self):
        SV_1 = [[], []]
        SV_2 = [[], []]
        mid_1 = [[], []]
        mid_2 = [[], []]
        other_1 = [[], []]
        other_2 = [[], []]
        for i in range(self.m):
            plt.text(self.data[i][0], self.data[i][1], "%.2f" % self.alpha[i], color='white', fontsize=12)
            
            if self.alpha[i] != 0 and self.alpha[i] < self.C:
                if self.label[i] == 1:
                    SV_1[0].append(self.data[i][0])
                    SV_1[1].append(self.data[i][1])
                else:  # -1
                    SV_2[0].append(self.data[i][0])
                    SV_2[1].append(self.data[i][1])
            elif self.alpha[i] == self.C:
                if self.label[i] == 1:
                    mid_1[0].append(self.data[i][0])
                    mid_1[1].append(self.data[i][1])
                else:  # -1
                    mid_2[0].append(self.data[i][0])
                    mid_2[1].append(self.data[i][1])
            elif self.label[i] == 1:
                other_1[0].append(self.data[i][0])
                other_1[1].append(self.data[i][1])
            else:  # -1
                other_2[0].append(self.data[i][0])
                other_2[1].append(self.data[i][1])
        plt.scatter(SV_1[0], SV_1[1], s=60, c='red', marker=r'$\bigodot$')
        plt.scatter(SV_2[0], SV_2[1], s=60, c='blue', marker=r'$\bigodot$')        
        plt.scatter(mid_1[0], mid_1[1], s=30, c='red', marker='s')
        plt.scatter(mid_2[0], mid_2[1], s=30, c='blue', marker='s')
        plt.scatter(other_1[0], other_1[1], s=30, c='red', marker='o')
        plt.scatter(other_2[0], other_2[1], s=30, c='blue', marker='o')
    
    def CV_plot(self):
        x_min = self.data[:, 0].min()
        x_max = self.data[:, 0].max()
        y_min = self.data[:, 1].min()
        y_max = self.data[:, 1].max()
        
        # for i in np.arange()
        for i in np.arange(x_min,x_max, (x_max-x_min)/60):
            for j in np.arange(y_min,y_max, (y_max-y_min)/30):
                w = self.calc_kernel_value(np.array([i, j]))
                pred = np.dot(w.T, self.label * self.alpha) + self.b
                if pred > 0:
                    plt.scatter(i, j, color='red',s=20, marker='*')
                else:
                    plt.scatter(i, j, color='blue', s=20, marker='*')

In [3]:
def load_data(path):
    train_data_ori = pd.read_excel(path, sheet_name='traindata', header=None)
    train_label = np.array(train_data_ori.iloc[:, -1])
    train_label[train_label == 0] = -1
    train_data = np.array(train_data_ori)[:, :-1]
    test_data_ori = pd.read_excel(path, sheet_name='testdata', header=None)
    test_label = np.array(test_data_ori.iloc[:, -1])
    test_label[test_label == 0] = -1
    test_data = np.array(test_data_ori)[:, :-1]
    
#     train_mean = train_data.mean(axis=0)
#     test_mean = test_data.mean(axis=0)
#     train_var = train_data.var(axis=0)
#     test_var = test_data.var(axis=0)
#     train_data = (train_data - train_mean) / train_var
#     test_data = (test_data - test_mean) / test_var
    
#     train_min = train_data.min(axis=0)
#     test_min = test_data.min(axis=0)
#     train_max = train_data.max(axis=0)
#     test_max = test_data.max(axis=0)
#     train_data = (train_data + train_min) / (train_max - train_min)
#     test_data = (test_data + test_min) / (test_max - test_min)
    
    return train_data, train_label, test_data, test_label

In [4]:
train_data, train_label, test_data, test_label = load_data('PIDD.xlsx')

In [5]:
# data_ori = pd.read_csv('h_s.csv', header=None)
# train_data, train_label = data_ori.iloc[:, :-1], data_ori.iloc[:, -1]

In [7]:
kernel_option = {
    'type' : 'rbf',
    'sigma' : 10,
    'd' : 12
}
svm = SVM(C=10, toler=0.001, kernel_option=kernel_option)
svm.train(train_data, train_label, max_iter=500)
svm.evaluate(test_data, test_label)

	 iteration :  0
	 iteration :  1
	 iteration :  2
	 iteration :  3
	 iteration :  4
	 iteration :  5
test acc : 0.9953
test acc : 0.6406
