In [8]:
import scipy
import numpy as np
from scipy.io import loadmat
import math

## 查看数据

In [9]:
def load_data(data_dir="data"):
    """Load the spam train/test splits and recode their labels as -1/+1.

    Reads ``spamTrain.mat`` (keys ``X`` and ``y``) and ``spamTest.mat``
    (keys ``Xtest`` and ``ytest``) from ``data_dir``. Along the way it prints
    the keys of each file, the number of features per sample and the number
    of samples in each split.

    Parameters
    ----------
    data_dir : str or path-like, optional
        Directory containing the two ``.mat`` files. Defaults to ``"data"``,
        the location that used to be hard-coded.

    Returns
    -------
    tuple
        ``(spam_train_x, spam_train_y, spam_test_x, spam_test_y)``. The
        feature matrices are returned exactly as stored in the files; the
        label arrays are integer columns of shape ``(num_samples, 1)``.
    """

    def _signed_labels(raw):
        # Same parity rule as (-1) ** (label + 1): an even label (0) becomes
        # -1 and an odd label (1) becomes +1.  Done with np.where so that
        # unsigned label dtypes cannot wrap around, and so that no size-1
        # array has to be converted implicitly to a Python scalar.
        return np.where(np.asarray(raw) % 2 == 0, -1, 1).astype(int).reshape(-1, 1)

    # loadmat returns a dict keyed by MATLAB variable name.
    spam_train = loadmat(file_name="{}/spamTrain.mat".format(data_dir))
    print(spam_train.keys())
    spam_train_x = spam_train["X"]
    spam_train_y = _signed_labels(spam_train["y"])

    # The test split is processed the same way.
    spam_test = loadmat(file_name="{}/spamTest.mat".format(data_dir))
    print(spam_test.keys())
    spam_test_x = spam_test["Xtest"]
    spam_test_y = _signed_labels(spam_test["ytest"])

    # Length of one sample row, i.e. the number of features per email.
    print("训练集特征长度:{}".format(spam_train_x.shape[1]))
    print("测试集特征长度:{}".format(spam_test_x.shape[1]))
    print("训练集样本数量:{}".format(spam_train_y.shape[0]))
    print("测试集样本数量:{}".format(spam_test_y.shape[0]))
    return spam_train_x, spam_train_y, spam_test_x, spam_test_y
# Load both splits once at notebook level; the training and evaluation cells
# below read these four module-level names.
spam_train_x, spam_train_y, spam_test_x, spam_test_y = load_data()

dict_keys(['__header__', '__version__', '__globals__', 'X', 'y'])
dict_keys(['__header__', '__version__', '__globals__', 'Xtest', 'ytest'])
训练集特征长度:1899
测试集特征长度:1899
训练集样本数量:4000
测试集样本数量:1000


## 编写Pegasos算法

In [11]:
# Mini-batch Pegasos algorithm.
# Arguments: data, data labels, C, number of training rounds, batch size.
def batchPegasos(x, y, C, T, k, seed=None):
    """Train a linear classifier (no bias term) with mini-batch Pegasos.

    Every round ``t`` shuffles the sample indices, takes the first ``k`` of
    them as the mini-batch, sums ``y_i * x_i`` over the batch members whose
    margin ``y_i * <w, x_i>`` is below 1, and then applies

        w <- (1 - 1/t) * w + (eta_t / k) * sum,   eta_t = 1 / (lam * t)

    with ``lam = 1 / (k * C)``.

    Parameters
    ----------
    x : array-like, shape (m, n)
        Feature matrix, one sample per row.
    y : array-like, shape (m,) or (m, 1)
        One label per sample. The margin test assumes -1/+1 labels.
    C : float
        Regularisation trade-off; must be positive.
    T : int
        Number of rounds. ``T <= 0`` returns the all-zero vector.
    k : int
        Mini-batch size, ``1 <= k <= m``.
    seed : int or None, optional
        ``None`` (default) shuffles with the global ``np.random`` state, as
        before. Any other value seeds a private ``np.random.default_rng`` so
        the run is reproducible.

    Returns
    -------
    numpy.ndarray, shape (n,)
        The learned weight vector.

    Raises
    ------
    ValueError
        If ``x`` is not 2-D, the number of labels differs from the number of
        rows, ``C`` is not positive, or ``k`` is outside ``[1, m]``.
    """
    x = np.asarray(x)
    m, n = x.shape
    # Flatten so that both (m,) and (m, 1) label layouts are accepted.
    labels = np.asarray(y).reshape(-1)
    if labels.shape[0] != m:
        raise ValueError(
            "expected {} labels, got {}".format(m, labels.shape[0]))
    if C <= 0:
        raise ValueError("C must be positive, got {}".format(C))
    if not 1 <= k <= m:
        raise ValueError(
            "batch size k must be in [1, {}], got {}".format(m, k))

    lam = 1 / (k * C)
    w = np.zeros(n)
    dataIndex = np.arange(m)
    shuffle = (np.random.shuffle if seed is None
               else np.random.default_rng(seed).shuffle)
    for t in range(1, T + 1):
        eta = 1.0 / (lam * t)
        shuffle(dataIndex)
        batch = dataIndex[:k]
        # Cast only the batch to float so integer feature dtypes are safe
        # without copying the whole matrix.
        xb = x[batch].astype(float)
        yb = labels[batch]
        # Samples that violate the margin under the current weights.
        violating = yb * (xb @ w) < 1
        # Sum of y_i * x_i over the violators (zeros when there are none).
        wDelta = yb[violating] @ xb[violating]
        w = (1.0 - 1 / t) * w + (eta / k) * wDelta  # apply changes at each T
    return w

# Prediction score w.x (the code adds no bias term b).
def predict(w, x):
    """Return the matrix product of ``w.T`` with ``x``.

    For a 1-D weight vector and a 1-D sample this is their inner product.
    """
    w_transposed = w.T
    return w_transposed @ x

# Evaluate the weights on a labelled set.
def test(x, y, w):
    """Classify every row of ``x`` with ``w`` and print the number of hits.

    A row is labelled -1 when its score from ``predict`` is <= 0 and +1
    otherwise. The predictions are compared element-wise with the flattened
    ``y`` and the result is printed as ``correct/total``. Nothing is returned.
    """
    label_y = y.reshape(-1)
    # Iterating through zip() stops at the shorter of rows and labels.
    predict_y = np.asarray(
        [-1 if predict(w, sample) <= 0 else 1 for sample, _ in zip(x, label_y)]
    )
    print("正确率为{}/{}".format(np.sum(predict_y == label_y), len(predict_y)))


In [14]:
# Training: C=0.1, T=100 rounds, mini-batches of k=100 samples.
w = batchPegasos(spam_train_x, spam_train_y, 0.1, 100, 100)
# Bare expression so the notebook displays the learned weight vector.
w

array([ 0.017,  0.025, -0.001, ..., -0.026, -0.012,  0.01 ])

In [15]:
# Evaluation: score the learned weights on the test split (prints correct/total).
test(spam_test_x, spam_test_y, w)

正确率为974/1000
