In [None]:
import numpy as np
import matplotlib.pyplot as plt
from random import shuffle

In [None]:
def load_data(path):
    data = np.loadtxt(path)
    labels = data[:, 0].astype(int)
    features = data[:, 1:]
    # Normalize features
    # features = (features - np.mean(features, axis=0)) / np.std(features, axis=0)
    return features, labels


In [None]:
def visualize_digit(X, y):
    img_indices = [np.argwhere(y == i)[0, 0] for i in range(10)]  
    plt.figure(figsize=(10, 3))
    for i in range(10):
        plt.subplot(2, 5, i + 1)
        img_index = img_indices[i]
        plt.imshow(X[img_index, :].reshape(16, 16), cmap='plasma') 
        plt.axis('off')
    plt.tight_layout()
    plt.show()

In [None]:
def train_test_split(X, y, test_size=0.2, random_state=None):
    n_samples = len(X)
    if random_state is not None:
        rng = np.random.default_rng(random_state)
        indices = rng.permutation(n_samples)
    else:
        indices = np.random.permutation(n_samples)
    
    split_idx = int(n_samples * (1 - test_size))
    train_indices = indices[:split_idx]
    test_indices  = indices[split_idx:]

    X_train, X_test = X[train_indices], X[test_indices]
    y_train, y_test = y[train_indices], y[test_indices]
    return X_train, X_test, y_train, y_test


In [None]:
class KernelPerceptron:
    def __init__(self, K_train, epochs=3):
        self.K_train = K_train  
        self.epochs  = epochs
        self.alpha   = None
        self.y_train = None       

    def fit(self, y_binary):
        n_samples = len(y_binary)
        self.alpha = np.zeros(n_samples)
        self.y_train = y_binary

        for _ in range(self.epochs):
            indices = np.arange(n_samples)
            np.random.shuffle(indices)
            for t in indices:
                score = np.sum(self.alpha * self.K_train[:, t])
                prediction = np.sign(score)
                # print(f"Sample {t}: score={score}, prediction={prediction}, true_label={self.y_train[t]}")
                if prediction != self.y_train[t]:
                    self.alpha[t] += self.y_train[t]
                    # print(f"Updated alpha[{t}]: {self.alpha[t]}")

    def decision_function(self, K_test):
        return K_test.dot(self.alpha * self.y_train)

    def predict(self, K_test):
        return np.sign(self.decision_function(K_test))



In [None]:
def kernel_factory(type, d):
    def polynomial_kernel(x, y):
        return (x@y.T) ** d
    
    if type == "polynomial":
        return polynomial_kernel

In [None]:
def train_ovr(K_train, y, epochs):
    """
    Trains one Kernel Perceptron per unique class.
    Returns a dict: { class_label: trained_perceptron, ... }
    """
    classes = np.unique(y)
    models = {}
    for c in classes:
        # Binary labels: +1 if y == c, else -1
        y_binary = np.where(y == c, 1, -1)
        kp = KernelPerceptron(K_train, epochs=epochs)
        kp.fit(y_binary)
        models[c] = kp

    
    return models

def predict_ovr(K_test, models):
    """
    Returns an array of predictions (class labels) for each sample in X.
    """
    classes = list(models.keys())
    all_scores = []
    for c in classes:
        scores_c = models[c].decision_function(K_test)
        all_scores.append(scores_c)

    all_scores = np.vstack(all_scores) 
    max_indices = np.argmax(all_scores, axis=0)
    predicted_labels = [classes[idx] for idx in max_indices]
    return np.array(predicted_labels)


In [None]:
def classification_error(y_true, y_pred):
    return np.mean(y_true != y_pred)

In [None]:
def run_experiment(data_path, degrees=range(1,8), epochs=30, runs=20, test_size=0.2):
    print("[INFO] Starting experiment...")
    
    X, y = load_data(data_path)
    visualize_digit(X, y)
    
    n_degrees = len(degrees)
    train_errors = np.zeros((n_degrees, runs))
    test_errors  = np.zeros((n_degrees, runs))
    
    for run_id in range(runs):
        
        print(f"[INFO] ---------- Run {run_id+1}/{runs} ----------")
        X_train, X_test, y_train, y_test = train_test_split(X, y, 
                                                            test_size=test_size,
                                                            random_state=run_id)  
        print(f"      Train set size: {len(y_train)}, Test set size: {len(y_test)}")

        for i, d in enumerate(degrees):
            # -------------- Train --------------
            kernel = kernel_factory("polynomial",d)
            print(f"      polynomial kernel degree: {d}")
            K_train = kernel(X_train, X_train)
            
            models = train_ovr(K_train, y_train, epochs=epochs)
            
            y_train_pred = predict_ovr(K_train, models)
            train_err = classification_error(y_train, y_train_pred)

            # -------------- Test --------------
            K_test = kernel(X_test, X_train)
            y_test_pred = predict_ovr(K_test, models)
            test_err = classification_error(y_test, y_test_pred)

            train_errors[i, run_id] = train_err
            test_errors[i, run_id]  = test_err

            print(f"      [d={d}] train_err={train_err:.4f}, test_err={test_err:.4f}")
    
    # 4. Compute mean and std
    mean_train = np.mean(train_errors, axis=1)  # shape: (n_degrees,)
    std_train  = np.std(train_errors, axis=1)
    mean_test  = np.mean(test_errors, axis=1)
    std_test   = np.std(test_errors, axis=1)
    
    # 5. Print results in a 2x7 table format:
    #    Each cell: mean ± std
    print("Degree |   Train Error (mean±std)   |   Test Error (mean±std)")
    print("-------------------------------------------------------------")
    for i, d in enumerate(degrees):
        print(f"  {d}    {mean_train[i]*100:.2f}±{std_train[i]*100:.2f}%    "
              f"{mean_test[i]*100:.2f}±{std_test[i]*100:.2f}%")

    return (mean_train, std_train, mean_test, std_test)


In [None]:
if __name__ == "__main__":
    data_path = "data/zipcombo.dat.txt"  # Adjust if necessary
    run_experiment(data_path, degrees=range(1,8), epochs=6, runs=20, test_size=0.2)


In [None]:
import numpy as np

def polynomial_kernel(xi, xj, degree=2):
    return (xi @ xj.T) ** degree

class KernelPerceptron:
    """
    支持 OVR (One-Versus-Rest) 和 OVO (One-Versus-One) 两种多分类策略的核感知器。
    """
    def __init__(self, 
                 max_iter=30, 
                 min_iter=5, 
                 mode='ovr',
                 kernel_function=polynomial_kernel, 
                 degree=2):
        """
        初始化 KernelPerceptron
        Args:
            max_iter (int): 最大迭代轮数
            min_iter (int): 最少迭代轮数，少于这个轮数时不进行早停判断
            mode (str): 'ovr' 或 'ovo'
            kernel_function (callable): 核函数
            degree (int): 核函数参数（如多项式核的阶数等）
        """
        self.max_iter = max_iter
        self.min_iter = min_iter
        self.mode = mode.lower()
        self.kernel_function = kernel_function
        self.degree = degree

        # 下面这些成员变量在训练过程中赋值
        self.weight_matrix = None       # 用于 OVR
        self.pairwise_weights = dict()  # 用于 OVO, 存储 (i, j)->weights
        self.accuracy_history = []
        self.X_train = None
        self.y_train = None
        self.num_classes = None

    def fit(self, X_data, y_data):
        """
        训练核感知器
        Args:
            X_data (np.ndarray): 训练特征, shape = (num_samples, num_features)
            y_data (np.ndarray): 训练标签, shape = (num_samples,)
        Returns:
            self
        """
        self.X_train = X_data
        self.y_train = y_data
        self.num_classes = len(np.unique(y_data))

        # 预先计算完整核矩阵 K
        kernel_matrix = self.kernel_function(X_data, X_data, self.degree)

        # 根据 mode 调用不同的训练方法
        if self.mode == 'ovr':
            self._fit_ovr(kernel_matrix)
        elif self.mode == 'ovo':
            self._fit_ovo(kernel_matrix)
        else:
            raise ValueError("Mode must be either 'ovr' or 'ovo'.")

        return self

    def _fit_ovr(self, kernel_matrix):
        """
        OVR (One-Versus-Rest) 训练。
        """
        num_samples = self.X_train.shape[0]
        # 初始化权重矩阵： (num_classes, num_samples)
        self.weight_matrix = np.zeros((self.num_classes, num_samples))
        self.accuracy_history = []

        # 开始迭代训练
        for iteration in range(self.max_iter):
            misclassify = 0
            indices = np.arange(num_samples)
            np.random.shuffle(indices)

            for i in indices:
                # 预测时，对每个类别都计算 (weight_matrix[class] @ kernel_matrix[i]) 并取 argmax
                predicted_label = np.argmax(self.weight_matrix @ kernel_matrix[i, :])
                if self.y_train[i] != predicted_label:
                    misclassify += 1
                    # 正确类 +1, 错误类 -1
                    self.weight_matrix[self.y_train[i], i] += 1
                    self.weight_matrix[predicted_label, i] -= 1

            accuracy_iter = (num_samples - misclassify) / num_samples
            self.accuracy_history.append(accuracy_iter)

            # 早停判断
            if iteration >= self.min_iter:
                if (self.accuracy_history[-1] - self.accuracy_history[-2]) < 0.01:
                    break

    def _fit_ovo(self, kernel_matrix):
        """
        OVO (One-Versus-One) 训练。
        对每对类别 (i, j) 各自训练一个二分类感知器并储存在 self.pairwise_weights[(i,j)] 中。
        """
        num_samples = self.X_train.shape[0]
        classes = np.unique(self.y_train)
        
        # 字典 (i, j) -> 权重向量
        # 注意 i < j，保证唯一性
        for i in range(self.num_classes):
            for j in range(i + 1, self.num_classes):
                self.pairwise_weights[(i, j)] = np.zeros(num_samples)

        self.accuracy_history = []

        # 开始迭代
        for iteration in range(self.max_iter):
            misclassify = 0
            indices = np.arange(num_samples)
            np.random.shuffle(indices)

            # 对所有样本做在线更新
            for idx in indices:
                true_label = self.y_train[idx]
                # 遍历可能包含 true_label 的所有 (i, j) 对
                for i in range(self.num_classes):
                    for j in range(i+1, self.num_classes):
                        # 如果该样本不属于这两个类别, 跳过
                        if true_label not in (i, j):
                            continue

                        # binary_label: i -> +1, j -> -1
                        binary_label = 1 if (true_label == i) else -1
                        # 预测结果
                        prediction_value = self.pairwise_weights[(i, j)] @ kernel_matrix[idx, :]

                        predicted_binary_label = 1 if prediction_value >= 0 else -1
                        if predicted_binary_label != binary_label:
                            misclassify += 1
                            # 感知器更新
                            self.pairwise_weights[(i, j)][idx] += binary_label

            # 计算训练集上的准确率(对全体样本进行多数投票)
            y_pred_train = self._predict_full_ovo(kernel_matrix)
            accuracy_iter = np.mean(y_pred_train == self.y_train)
            self.accuracy_history.append(accuracy_iter)

            # 早停判断
            if iteration >= self.min_iter:
                if (self.accuracy_history[-1] - self.accuracy_history[-2]) < 0.01:
                    break

    def _predict_full_ovo(self, kernel_matrix):
        """
        在 OVO 模式下，对训练集中所有样本做一次完整预测，用于计算训练精度。
        Args:
            kernel_matrix (np.ndarray): 训练数据本身的核矩阵 (num_samples, num_samples)
        Returns:
            (np.ndarray): shape = (num_samples, ) 的预测标签
        """
        num_samples = self.X_train.shape[0]
        votes = np.zeros((num_samples, self.num_classes), dtype=int)

        # 对每对 (i, j) 做二分类预测，并投票
        for i in range(self.num_classes):
            for j in range(i + 1, self.num_classes):
                # 计算所有样本的预测分数
                decision_values = self.pairwise_weights[(i, j)] @ kernel_matrix
                # 正数 => i 类获票, 负数 => j 类获票
                for idx in range(num_samples):
                    if decision_values[idx] >= 0:
                        votes[idx, i] += 1
                    else:
                        votes[idx, j] += 1

        # 多数投票
        y_pred = np.argmax(votes, axis=1)
        return y_pred

    def predict(self, X_data):
        """
        对新的样本进行预测。需要用到训练时的 X_train 和对应的 kernel_function 来构建测试的核矩阵。

        Args:
            X_data (np.ndarray): 测试特征, shape = (num_test_samples, num_features)

        Returns:
            np.ndarray: 预测标签, shape = (num_test_samples, )
        """
        if self.X_train is None:
            raise ValueError("请先 fit 后再进行预测。")

        # 计算训练集与测试集之间的核矩阵: (num_samples, num_test_samples)
        test_kernel_mtx = self.kernel_function(self.X_train, X_data, self.degree)

        if self.mode == 'ovr':
            # OVR 预测: 计算 weight_matrix @ test_kernel_mtx 的 argmax
            # shape: (num_classes, num_test_samples) -> 取每列的 argmax
            scores = self.weight_matrix @ test_kernel_mtx
            return np.argmax(scores, axis=0)

        elif self.mode == 'ovo':
            # OVO 预测：对每对 (i, j) 做二分类投票
            num_test = X_data.shape[0]
            votes = np.zeros((num_test, self.num_classes), dtype=int)

            for i in range(self.num_classes):
                for j in range(i + 1, self.num_classes):
                    # 当前 (i, j) 分类器的决策分数
                    decision_values = self.pairwise_weights[(i, j)] @ test_kernel_mtx
                    # 大于等于 0 -> 判为 i, 否则 j
                    for idx in range(num_test):
                        if decision_values[idx] >= 0:
                            votes[idx, i] += 1
                        else:
                            votes[idx, j] += 1

            return np.argmax(votes, axis=1)
        else:
            raise ValueError("Mode must be either 'ovr' or 'ovo'.")


# --------------------- 以下为辅助函数，可根据需要自行修改或拆分 --------------------- #
def split_data_into_train_test(feature_matrix, target_vector, train_ratio=0.8):
    """
    划分数据集为训练集和测试集
    """
    total_samples = feature_matrix.shape[0]
    train_set_size = int(train_ratio * total_samples)
    shuffled_indices = np.random.permutation(total_samples)
    training_indices = shuffled_indices[:train_set_size]
    testing_indices = shuffled_indices[train_set_size:]
    training_features = feature_matrix[training_indices]
    testing_features = feature_matrix[testing_indices]
    training_targets = target_vector[training_indices]
    testing_targets = target_vector[testing_indices]
    return training_features, training_targets, testing_features, testing_targets

def test_error_comp(X_train, X_test, y_test, model):
    """
    计算测试集错误率和(可选)混淆矩阵。此处使用封装好的 model.predict。
    """
    y_pred = model.predict(X_test)
    test_error = np.mean(y_pred != y_test)

    # 构造混淆矩阵
    num_classes = len(np.unique(y_test))
    confusion_mtx = np.zeros((num_classes, num_classes))
    for actual, predicted in zip(y_test, y_pred):
        confusion_mtx[actual, predicted] += 1

    # 行归一化
    row_sums = confusion_mtx.sum(axis=1, keepdims=True)
    confusion_mtx = np.nan_to_num(confusion_mtx / row_sums)

    return test_error, confusion_mtx


# --------------------- 使用示例 --------------------- #
if __name__ == "__main__":
    import matplotlib.pyplot as plt

    # 随机生成数据进行演示
    np.random.seed(0)
    num_samples = 200
    num_features = 5
    num_classes = 3  # 0,1,2

    X = np.random.randn(num_samples, num_features)
    y = np.random.randint(0, num_classes, size=num_samples)

    # 划分训练/测试集
    X_train, y_train, X_test, y_test = split_data_into_train_test(X, y, train_ratio=0.8)

    # ---------- 演示 OVR ----------
    print("=== OVR Training ===")
    model_ovr = KernelPerceptron(mode='ovr', max_iter=30, min_iter=5, degree=2)
    model_ovr.fit(X_train, y_train)
    print("Final training accuracy (OVR):", model_ovr.accuracy_history[-1])

    # 测试集表现
    test_err_ovr, conf_mtx_ovr = test_error_comp(X_train, X_test, y_test, model_ovr)
    print(f"OVR Test Error: {test_err_ovr:.3f}")
    print("OVR Confusion Matrix:\n", conf_mtx_ovr)

    # ---------- 演示 OVO ----------
    print("\n=== OVO Training ===")
    model_ovo = KernelPerceptron(mode='ovo', max_iter=30, min_iter=5, degree=2)
    model_ovo.fit(X_train, y_train)
    print("Final training accuracy (OVO):", model_ovo.accuracy_history[-1])

    # 测试集表现
    test_err_ovo, conf_mtx_ovo = test_error_comp(X_train, X_test, y_test, model_ovo)
    print(f"OVO Test Error: {test_err_ovo:.3f}")
    print("OVO Confusion Matrix:\n", conf_mtx_ovo)
