In [60]:
import numpy as np
import time as t

# generate data

In [61]:
# In the real world, you cannot learn how the data was generated. So do not rely on this function when coding your lab.
def generate_data(dim, num):
    x = np.random.normal(0, 10, [num, dim])
    coef = np.random.uniform(-1, 1, [dim, 1])
    pred = np.dot(x, coef)
    pred_n = (pred - np.mean(pred)) / np.sqrt(np.var(pred))
    label = np.sign(pred_n)
    mislabel_value = np.random.uniform(0, 1, num)
    mislabel = 0
    for i in range(num):
        if np.abs(pred_n[i]) < 1 and mislabel_value[i] > 0.9 + 0.1 * np.abs(pred_n[i]):
            label[i] *= -1
            mislabel += 1
    return x, label, mislabel/num

In [62]:
# example
x, y, mr = generate_data(5, 100)

In [63]:
x[:5], y[:5]

(array([[  2.13760666,   1.87454384,  -6.95997602,  13.97529007,
          -0.51487694],
        [-10.91257963,  -1.36220876,  -6.61320399,  -5.82544723,
          15.90106451],
        [ -4.00000957,  -5.27710355,   1.26506736,   2.39644677,
          11.46462714],
        [  6.66319978, -11.50620263, -11.80816857,  -5.85170807,
           4.65032036],
        [ 16.73316604,   5.39077602,   2.87937728,  10.96184704,
           8.17381161]]),
 array([[ 1.],
        [-1.],
        [ 1.],
        [-1.],
        [ 1.]]))

# write your model class

In [64]:
# Modifying the overall structure is acceptable but not recommended
class SVM1:
    def __init__(self, dim, C=1.0):
        """
        parameter:
        - dim: 数据维度
        - C: lagrange乘子的上界
        """
        self.dim = dim
        self.C = C
        self.alpha = None # lagrange乘子
        self.w = None # 分类超平面的法向量
        self.b = 0
    
    def p(self, x):
        """
        计算预测值
        """
        return np.sign(self.w.dot(x) + self.b)

    def fit(self, X, y, max_iter = 1000):
        """
        Train SVM by SMO algorithm. 

        parameter:
        - X: 训练数据
        - y: 训练标签
        - max_iter: 最大迭代次数
        """
        m, n = X.shape
        self.alpha = np.zeros(m)
        self.w = np.zeros(n)
        self.b = 0

        for _ in range(max_iter):
            for i in range(m):
                # 计算预测值和误差
                g_i = self.p(X[i])
                E_i = g_i - y[i]

                # KKT条件
                if (y[i] * g_i - 1 < -1e-3 and self.alpha[i] < self.C) or \
                   (y[i] * g_i - 1 > 1e-3 and self.alpha[i] > 0):

                    # 选择第二个变量 j
                    j = self.select_second_variable(i, m)
                    g_j = self.p(X[j])
                    E_j = g_j - y[j]

                    # 保存旧的alpha
                    alpha_i_old, alpha_j_old = self.alpha[i], self.alpha[j]

                    # 计算上下界L和H
                    L, H = self.compute_L_H(y[i], y[j], alpha_i_old, alpha_j_old)

                    # 如果L和H相等，则不进行更新
                    if L == H:
                        continue

                    # 计算未经剪辑的新alpha_j
                    eta = 2.0 * X[i].dot(X[j]) - X[i].dot(X[i]) - X[j].dot(X[j])
                    if eta >= 0:
                        continue

                    # 计算新的未经剪辑的alpha_j
                    self.alpha[j] = alpha_j_old - (y[j] * (E_i - E_j)) / eta

                    # 剪辑alpha_j
                    self.alpha[j] = np.clip(self.alpha[j], L, H)

                    # 更新alpha_i
                    self.alpha[i] = alpha_i_old + y[i] * y[j] * (alpha_j_old - self.alpha[j])

                    # 更新阈值b
                    b_i = self.b - E_i - y[i] * (self.alpha[i] - alpha_i_old) * X[i].dot(X[i]) \
                          - y[j] * (self.alpha[j] - alpha_j_old) * X[i].dot(X[j])
                    b_j = self.b - E_j - y[i] * (self.alpha[i] - alpha_i_old) * X[i].dot(X[j]) \
                          - y[j] * (self.alpha[j] - alpha_j_old) * X[j].dot(X[j])
                    if 0 < self.alpha[i] < self.C:
                        self.b = b_i
                    elif 0 < self.alpha[j] < self.C:
                        self.b = b_j
                    else:
                        self.b = (b_i + b_j) / 2.0
            # 更新w
            self.w = np.zeros(n)
            for i in range(m):
                self.w += self.alpha[i] * y[i] * X[i]

        
    def predict(self, X):
        """
        Generate prediction probabilities on a new
        collection of data points by your model.
        """
        # 计算预测值
        pred = []
        for x in X:
            pred.append(self.p(x))
        return np.array(pred)

    
    def kernel(self, X1, X2):
        """
        Compute the kernel between two matrices.
        """
        return np.dot(X1, X2.T)

    def select_second_variable(self, i, m):
        """
        Select the second variable alpha_j.
        """
        j = i
        while j == i:
            j = np.random.randint(0, m)
        return j

    def compute_L_H(self, y_i, y_j, alpha_i_old, alpha_j_old):
        """
        Compute the lower and upper bounds for alpha_j.
        """
        if y_i != y_j:
            L = max(0, alpha_j_old - alpha_i_old)
            H = min(self.C, self.C + alpha_j_old - alpha_i_old)
        else:
            L = max(0, alpha_i_old + alpha_j_old - self.C)
            H = min(self.C, alpha_i_old + alpha_j_old)
        return L, H


In [69]:
from tqdm import tqdm
class SVM2:
    def __init__(self, dim, lr=0.01, gamma=0.1, max_iter=1000, tol=1e-3, batch_size=16, epochs=10):
        """
        parameter:
        - dim: 数据维度
        - lr: 学习率
        - gamma: 正则化系数
        - max_iter: 最大迭代次数
        - tol: 容忍度
        - batch_size: 每个batch的大小
        - epochs: 训练的总轮数
        """
        self.lr = lr
        self.gamma = gamma
        self.max_iter = max_iter
        self.tol = tol
        self.w = None # 分类超平面的法向量
        self.b = 0
        self.batch_size = batch_size
        self.epochs = epochs

    def p(self, x):
        """
        计算预测值
        """
        return np.sign(self.w.dot(x) + self.b)

    def fit(self, X, y):
        """
        Train the SVM by gradient descent
        """
        m, n = X.shape
        self.w = np.zeros(n)
        loss = []

        for epoch in range(self.epochs):
            for batch_start in tqdm(range(0, m, self.batch_size)):
                batch_end = min(batch_start + self.batch_size, m)
                X_batch, y_batch = X[batch_start:batch_end], y[batch_start:batch_end]

                for _ in range(self.max_iter):
                    gradient_w, gradient_b, l = self.grad(X_batch, y_batch)

                    # 更新参数
                    self.w -= self.lr * gradient_w
                    self.b -= self.lr * gradient_b

                    loss.append(l)

                    # 检查是否收敛
                    if np.linalg.norm(gradient_w) < self.tol:
                        break

        return loss

    def grad(self, X, y):
        loss = 0
        grad_w = 0
        grad_b = 0
        for i in range(X.shape[0]):
            z = y[i] * (self.p(X[i])) - 1
            if 1 - z > 0:
                loss += 1 - z
                grad_w += -y[i] * X[i]
                grad_b += -y[i]
            
        loss += self.gamma * np.linalg.norm(self.w, ord=2) * np.linalg.norm(self.w, ord=2)
        grad_w += self.gamma * 2 * self.w / X.shape[0]

        return grad_w, grad_b, loss

    def predict(self, X):
        """
        A same predict function with SVM1 is acceptable.
        """
        # 计算预测值
        pred = []
        for x in X:
            pred.append(self.p(x))
        return np.array(pred)

# construct and train your models

In [79]:
# generate data
X_data, y_data, mislabel = generate_data(20, 10000) 

# split data
X_train = X_data[:8000]
y_train = y_data[:8000]
X_test = X_data[8000:]
y_test = y_data[8000:]

print("Mislabel rate: ", mislabel)


Mislabel rate:  0.0351


In [73]:
# constrcut model and train (remember to record your time consumption)
model1 = SVM1(20,10)
start = t.time()
model1.fit(X_train, y_train)
end = t.time()
print("SVM1 time:", end - start)

SVM1 time: 120.04548382759094


In [74]:
model2 = SVM2(20,0.001,0.01,200,1e-3,16,10)
start = t.time()
loss = model2.fit(X_train, y_train)
end = t.time()
print("SVM2 time:", end - start)

100%|██████████| 500/500 [00:23<00:00, 21.62it/s]
100%|██████████| 500/500 [00:23<00:00, 21.52it/s]
100%|██████████| 500/500 [00:23<00:00, 21.69it/s]
100%|██████████| 500/500 [00:23<00:00, 21.33it/s]
100%|██████████| 500/500 [00:23<00:00, 21.44it/s]
100%|██████████| 500/500 [00:23<00:00, 21.24it/s]
100%|██████████| 500/500 [00:23<00:00, 21.31it/s]
100%|██████████| 500/500 [00:23<00:00, 21.18it/s]
100%|██████████| 500/500 [00:23<00:00, 21.38it/s]
100%|██████████| 500/500 [00:23<00:00, 21.28it/s]

SVM2 time: 233.70003581047058





# predict and compare your results

In [76]:
import sklearn.svm as svm
model3 = svm.SVC(kernel='linear')
start = t.time()
model3.fit(X_train, y_train)
end = t.time()
print("SVM3 time:", end - start)

  y = column_or_1d(y, warn=True)


SVM3 time: 19.82708239555359


In [78]:
# make prediction
pred_1 = model1.predict(X_test)
pred_2 = model2.predict(X_test)
pred_3 = model3.predict(X_test)
acc_1 = 0
acc_2 = 0
acc_3 = 0

# compare with generated label
for i in range(y_test.shape[0]):
    if pred_1[i] == y_test[i]:
        acc_1 += 1
    if pred_2[i] == y_test[i]:
        acc_2 += 1
    if pred_3[i] == y_test[i]:
        acc_3 += 1
        
# # compare each method
print("Acc_1:", acc_1 / y_test.shape[0])
print("Acc_2:", acc_2 / y_test.shape[0])
print("Acc_3:", acc_3 / y_test.shape[0])


# (Optional) compare with sklearn

Acc_1: 0.914
Acc_2: 0.941
Acc_3: 0.952
