**注意：**该notebook参考了 *"Machine Learning in Action"* 一书中的代码，书中的代码虽然变量命名极其混乱，不过逻辑非常清晰，非常适合用作参考代码。

In [1]:
import os
os.sys.path.append(os.path.dirname(os.path.abspath('.')))

## 数据准备

In [2]:
import numpy as np
from datasets.dataset import load_breast_cancer
data = load_breast_cancer()
X, Y = data.data, data.target
Y[Y == 0] = -1
del data

from model_selection.train_test_split import train_test_split
X_train, X_test, Y_train, Y_test = train_test_split(X, Y, test_size=0.2)

## 模型基础


In [3]:
# def select_lambda(a,n_samples):
#     '''
#     选取第二个lambda
#     '''
#     b=a
#     while b==a:
#         b=np.random.randint(n_samples)
#     return b

def clip_lambda(lambda_b,L,H):
    '''
    对lambda的截断
    '''
    if lambda_b<L:
        lambda_b=L
    if lambda_b>H:
        lambda_b=H
    return lambda_b

以下代码逻辑请参阅博客[SMO]()。

In [4]:
# def SMO_simp(X_train, Y_train, C, tol, max_iter):
#     '''
#     简化版SMO，使用遍历的方式来选取第一个lambda，再随机选取第二个lambda
#     '''
#     n_samples, n_feature = X_train.shape

#     lambdas = np.zeros(n_samples)    # 每一样本都对应一个lambda
#     b = 0
#     iter_cnt = 0

#     while iter_cnt < max_iter:
#         lambda_changed_cnt = 0

#         # 以遍历的方式来选取第一个lambda
#         for i in range(n_samples):  
#             # 第一个lambda对应的预测值与误差
#             y_i = np.sum((lambdas*Y_train)*np.dot(X_train, X_train[i, :].T))+b
#             err_i = y_i-Y_train[i]

#             if (Y_train[i]*err_i < -tol and lambdas[i] < C) or (Y_train[i]*err_i > tol and lambdas[i] > 0):
#                 j = select_lambda(i, n_samples)    # 选择第二个lambda
#                 # 第二个lambda对应的预测值与误差
#                 y_j = np.dot((lambdas*Y_train).T,
#                              np.dot(X_train, X_train[j, :].T))+b
#                 err_j = y_j-Y_train[j]

#                 lambda_i_pre, lambda_j_pre = lambdas[i].copy(
#                 ), lambdas[j].copy()    # 保存两lambda的旧值

#                 # 计算上下界
#                 if Y_train[i] != Y_train[j]:
#                     L = max(0, lambdas[j]-lambdas[i])
#                     H = min(C, C+lambdas[j]-lambdas[i])
#                 else:
#                     L = max(0, lambdas[j]+lambdas[i]-C)
#                     H = min(C, lambdas[j]+lambdas[i])
#                 if L == H:
#                     continue

#                 eta = 2*np.dot(X_train[i, :], X_train[j, :].T)-np.dot(
#                     X_train[i, :], X_train[i, :].T)-np.dot(X_train[j, :], X_train[j, :].T)
#                 if eta >= 0:
#                     continue

#                 # 优化lambda_j
#                 lambdas[j] -= Y_train[j]*(err_i-err_j)/eta
#                 lambdas[j] = clip_lambda(lambdas[j], L, H)
#                 delta_lambda_j = lambdas[j]-lambda_j_pre
#                 if abs(delta_lambda_j) < 1e-5:    # 更新量太小则放弃
#                     continue

#                 # 优化lambda_i
#                 lambdas[i] -= Y_train[i]*Y_train[j]*delta_lambda_j
#                 delta_lambda_i = lambdas[i]-lambda_i_pre

#                 # 偏移量
#                 b_i = b-err_i-Y_train[i]*delta_lambda_i*np.dot(
#                     X_train[i, :], X_train[i, :].T)-Y_train[j]*delta_lambda_j*np.dot(X_train[i, :], X_train[j, :].T)
#                 b_j = b-err_j-Y_train[i]*delta_lambda_i*np.dot(
#                     X_train[i, :], X_train[j, :].T)-Y_train[j]*delta_lambda_j*np.dot(X_train[j, :], X_train[j, :].T)
#                 if 0 < lambdas[i] < C:
#                     b = b_i
#                 elif 0 < lambdas[j] < C:
#                     b = b_j
#                 else:
#                     b = (b_i+b_j)/2

#                 lambda_changed_cnt += 1

#         if lambda_changed_cnt == 0:
#             iter_cnt += 1
#         else:
#             iter_cnt = 0

#     return lambdas, b


# lambdas, b=SMO_simp(X_train, Y_train, 0.6, 0.001, 5)

## 优化

以上代码就是 *'Machine Learning in Action'* 一书中简化版SMO的代码，相信参照博客内容应该很容易读懂。虽然上述实现符合朴素的SMO思想，但是作为代码实现，有很多可以优化的空间。

首先是$\lambda$的选取，上述简化版本的SMO算法是始终采用遍历方式来选取第一个$\lambda_{a}$，然后再随机选取第二个$\lambda_{b}$。实际上，有一种启发式的方法可以每次选出比较好的一对$\lambda$来做优化，大致思想如下：
1. 初始化所有$\lambda$为0，并且同简化版SMO一样，遍历所有$\lambda_{a}$，再随机选取$\lambda_{b}$，对所有可以优化的参数对做一轮优化
2. 选取满足$0<\lambda_{a}<C$的参数作为第一个$\lambda_{a}$，满足此条件$\lambda$对应的样本称为非边界样本；而对第二个参数，选取满足$max(E_{a}-E_{b})$的参数作为$\lambda_{b}$
3. 不断重复第$2$步，直到不存在非边界样本，然后再对所有$\lambda$遍历优化，再进入第$2$步

首先，定义一个RBF核函数，注意到优化后SVM的预测函数为：
$$
\begin{aligned}
\hat{y}&=
\left\{
\begin{aligned}
&+1,  &\sum_{i=1}^{m}\lambda_{i}^{*}y^{i}\langle\hat{x},x^{i}\rangle\ge+1\\
&-1,  &\sum_{i=1}^{m}\lambda_{i}^{*}y^{i}\langle\hat{x},x^{i}\rangle\le+1\\
\end{aligned}
\right.
\end{aligned}
$$
其中$\hat{x}$为测试样本，一般我们会以矩阵形式输入，而$x^{i}$为训练样本，这里的累加不知道可不可以使用numpy加速，我暂时没有想到怎么去实现。所以需要定义一个接受矩阵与向量为参数，返回RBF核矩阵的函数。

In [5]:
def rbf(mat, array, sigma=1):
    '''
    RBF核变换函数，返回mat与array的RBF核矩阵
    '''
    n_samples, n_feature = mat.shape
    K_mat = np.zeros(n_samples)

    for i in range(n_samples):
        K_mat[i] = np.dot((mat[i, :]-array), (mat[i, :]-array).T)
    K_mat = np.exp(-K_mat/2/(sigma**2))
    return K_mat

# rbf(X_test,X_train[0])

为了编程便利，同时也是书上的建议，定义一个数据结构类，该类包含了SVM所需要的参数以及数据缓存。

In [6]:
class svm_data:
    def __init__(self, X_train, Y_train, C, tol):
        self.X_train = X_train
        self.Y_train = Y_train
        self.C = C
        self.tol = tol
        self.lambdas = np.zeros(len(self.X_train))
        self.b = 0
        self.err = np.zeros((len(self.X_train), 2))    # 误差缓存，首列为有效位
        self.K_mat = np.zeros((len(X_train), len(X_train)))    # 预计算的核矩阵
        for i in range(len(X_train)):
            self.K_mat[:, i] = rbf(self.X_train, self.X_train[i, :])    # X_train自身对自身的核矩阵

接下来定义几个辅助函数。首先是误差计算函数，在上述简化代码中可以看到，第$i$个样本的预测值并没有什么实质意义，只是用于误差```err_i```的计算；然后就是第二个参数$\lambda_{b}$的选取函数，需要计算与```err_a```差距最大的那一个。

In [7]:
def compute_kth_err(DS, k):
    '''
    计算err_k
    DS: costumize data_structure
    k: k_th lambda
    '''
    y_pred = np.dot((DS.lambdas*DS.Y_train).T, DS.K_mat[k, :])+DS.b
    return y_pred-DS.Y_train[k]


def update_kth_err(DS, k):
    err_k = compute_kth_err(DS, k)
    DS.err[k] = [1, err_k]


def select_2nd_lambda(i, DS, err_i):
    '''
    i: 第一个lambda的下标
    DS: 
    err_i: 第一个lambda对应的误差
    '''
    max_delte_err = 0
    err_j = 0
    j = -1    # 默认j为最后一个

    DS.err[i] = [1, err_i]
    valid_err_idxs = np.nonzero(DS.err[:, 0])[0]    # 标记位非零的idxs

    if len(valid_err_idxs) > 1:    # 如果已有误差缓存
        for idx in valid_err_idxs:
            if idx == i:
                continue
            else:
                err_k = compute_kth_err(DS, idx)
                delta_err = abs(err_i-err_k)
                if delta_err > max_delte_err:
                    j = idx
                    err_j = err_k
                    max_delte_err = delta_err
        return j, err_j

    else:    # 如果没有误差缓存，则只能随机选第二个lambda
        j = i
        n_samples = len(DS.X_train)
        while j == i:
            j = np.random.randint(n_samples)
        return j, compute_kth_err(DS, j)

下面是一个成对优化$\lambda$的函数，给定第一个参数$\lambda_{a}$，它会自动选取第二个参数$\lambda_{b}$并同时优化它们。

In [8]:
def optimize_lambda(i, DS):
    '''
    成对优化lambda的函数，返回值为表示是否优化的布尔值
    '''
    err_i = compute_kth_err(DS, i)
    if (DS.Y_train[i]*err_i < -DS.tol and DS.lambdas[i] < DS.C) or (DS.Y_train[i]*err_i > DS.tol and DS.lambdas[i] > 0):
        j, err_j = select_2nd_lambda(i, DS, err_i)    # 以优化方式选取第二个lambda
        lambda_i_pre, lambda_j_pre = DS.lambdas[i].copy(
        ), DS.lambdas[j].copy()    # 保存两lambda的旧值

        # 计算上下界
        if DS.Y_train[i] != DS.Y_train[j]:
            L = max(0, DS.lambdas[j]-DS.lambdas[i])
            H = min(DS.C, DS.C+DS.lambdas[j]-DS.lambdas[i])
        else:
            L = max(0, DS.lambdas[j]+DS.lambdas[i]-DS.C)
            H = min(DS.C, DS.lambdas[j]+DS.lambdas[i])
        if L == H:
            return 0

        eta = 2*DS.K_mat[i,j]-DS.K_mat[i,i]-DS.K_mat[j,j]
        if eta >= 0:
            return 0

        # 优化lambda_j
        DS.lambdas[j] -= DS.Y_train[j]*(err_i-err_j)/eta
        DS.lambdas[j] = clip_lambda(DS.lambdas[j], L, H)
        update_kth_err(DS, j)    # 更新缓存
        delta_lambda_j = DS.lambdas[j]-lambda_j_pre
        if abs(delta_lambda_j) < 1e-5:    # 更新量太小则放弃
            return 0

        # 优化lambda_i
        DS.lambdas[i] -= DS.Y_train[i]*DS.Y_train[j]*delta_lambda_j
        update_kth_err(DS, i)    # 更新缓存
        delta_lambda_i = DS.lambdas[i]-lambda_i_pre

        # 偏移量
        b_i = DS.b-err_i-DS.Y_train[i]*delta_lambda_i*DS.K_mat[i,i]-DS.Y_train[j]*delta_lambda_j*DS.K_mat[i,j]
        b_j = DS.b-err_j-DS.Y_train[i]*delta_lambda_i*DS.K_mat[i,j]-DS.Y_train[j]*delta_lambda_j*DS.K_mat[j,j]
        if 0 < DS.lambdas[i] < DS.C:
            DS.b = b_i
        elif 0 < DS.lambdas[j] < DS.C:
            DS.b = b_j
        else:
            DS.b = (b_i+b_j)/2
        
        return 1
    return 0

In [9]:
def SMO(X_train,Y_train,C,tol,max_iter):
    n_samples,n_feature=X_train.shape
    DS=svm_data(X_train,Y_train,C,tol)
    iter_cnt=0
    full_set_flag=True    # 遍历所有lambda的标记位，第一次循环或找不到非边界样本时设为True
    lambda_changed_cnt=0
    
    while iter_cnt<max_iter and (lambda_changed_cnt>0 or full_set_flag):
        lambda_changed_cnt=0
        if full_set_flag:
            for i in range(n_samples):    # 遍历选取第一个lambda
                tmp=optimize_lambda(i,DS)
                lambda_changed_cnt+=tmp
            iter_cnt+=1
        else:    # 存在非边界样本时
            non_bound_idxs=np.nonzero((DS.lambdas>0)*(DS.lambdas<C))[0]    # 非边界样本，lambda大于0小于C
            for idx in non_bound_idxs:    # 在非边界样本中选取第一个lambda
                tmp=optimize_lambda(idx,DS)
                lambda_changed_cnt+=tmp
            iter_cnt+=1
        
        if full_set_flag:    # 如果上轮以遍历方式选取第一个lambda，则下次不使用遍历方式
            full_set_flag=False    
        elif lambda_changed_cnt==0:    # 如果一轮迭代没有做出优化，则下次尝试使用遍历方式
            full_set_flag=True
            
    return DS.lambdas,DS.b

lambdas,b=SMO(X_train,Y_train, 0.6, 0.001, 50)

训练完毕的SVM预测函数为：
$$
\begin{aligned}
\hat{y}&=
\left\{
\begin{aligned}
&+1,  &\sum_{i=1}^{m}\lambda_{i}^{*}y^{i}\langle\hat{x},x^{i}\rangle\ge+1\\
&-1,  &\sum_{i=1}^{m}\lambda_{i}^{*}y^{i}\langle\hat{x},x^{i}\rangle\le+1\\
\end{aligned}
\right.
\end{aligned}
$$

In [10]:
def predict(X_test,lambdas,X_train,Y_train):
    Y_pred=0
    for i in range(len(X_train)):
        Y_pred+=lambdas[i]*Y_train[i]*rbf(X_test,X_train[i,:])
    Y_pred+=b
    Y_pred[Y_pred<0]=-1
    Y_pred[Y_pred>0]=1
    return Y_pred

Y_pred=predict(X_test,lambdas,X_train,Y_train)
print('acc:{}'.format(np.sum(Y_pred==Y_test)/len(Y_test)))

acc:0.7631578947368421


In [11]:
from sklearn.svm import SVC
svc=SVC()
svc.fit(X_train,Y_train)
Y_pred=svc.predict(X_test)
print('sklearn acc:{}'.format(np.sum(Y_pred==Y_test)/len(Y_test)))

sklearn acc:0.7017543859649122


