In [2]:
import os

import idx2numpy
import matplotlib.pyplot as plt
import numpy as np
from tqdm import tqdm

In [None]:

class AdamOptimizer:
    """Adam optimizer that updates a network's weights and biases in place.

    The network object passed to ``update_parameters`` must expose two lists
    of arrays, ``weights`` and ``biases``, with one entry per layer.
    """

    def __init__(self, learning_rate=0.001, beta1=0.9, beta2=0.999, epsilon=1e-8):
        """Store the hyper-parameters and reset the step counter.

        Args:
            learning_rate: Step size applied to every update.
            beta1: Decay rate of the first-moment (mean of gradients) estimate.
            beta2: Decay rate of the second-moment (mean of squared gradients) estimate.
            epsilon: Small constant added to the denominator to avoid division by zero.
        """
        self.learning_rate = learning_rate
        self.beta1 = beta1
        self.beta2 = beta2
        self.epsilon = epsilon
        self.t = 0  # number of updates performed so far; drives the bias correction

    def _moment_step(self, first_moment, second_moment, grad, correction1, correction2):
        """Advance one pair of moment buffers in place and return the parameter delta."""
        first_moment *= self.beta1
        first_moment += (1 - self.beta1) * grad
        second_moment *= self.beta2
        second_moment += (1 - self.beta2) * np.square(grad)

        # Bias-corrected estimates (the buffers themselves stay uncorrected).
        m_hat = first_moment / correction1
        v_hat = second_moment / correction2
        return self.learning_rate * m_hat / (np.sqrt(v_hat) + self.epsilon)

    def update_parameters(self, nn, gradients_w, gradients_b):
        """Apply one Adam step to ``nn.weights`` and ``nn.biases``.

        Args:
            nn: Object with ``weights`` and ``biases`` lists of arrays.
            gradients_w: Per-layer weight gradients, ordered like ``nn.weights``.
            gradients_b: Per-layer bias gradients, ordered like ``nn.biases``.
        """
        # The moment buffers are created lazily, on the first call, so that
        # their shapes follow the network that is actually being optimised.
        if not hasattr(self, 'm_w'):
            self.m_w = [np.zeros_like(w) for w in nn.weights]
            self.v_w = [np.zeros_like(w) for w in nn.weights]
            self.m_b = [np.zeros_like(b) for b in nn.biases]
            self.v_b = [np.zeros_like(b) for b in nn.biases]

        self.t += 1
        correction1 = 1 - self.beta1 ** self.t
        correction2 = 1 - self.beta2 ** self.t

        for layer in range(len(nn.weights)):
            nn.weights[layer] -= self._moment_step(
                self.m_w[layer], self.v_w[layer], gradients_w[layer], correction1, correction2
            )
            nn.biases[layer] -= self._moment_step(
                self.m_b[layer], self.v_b[layer], gradients_b[layer], correction1, correction2
            )


In [None]:
class SimpleNeuralNetwork:
    """Fully connected classifier with sigmoid hidden layers and a softmax output.

    Parameters are stored as two parallel lists, ``weights`` and ``biases``
    (one entry per layer), and are updated by an ``AdamOptimizer``.
    """

    def __init__(self, layer_sizes, learning_rate, batch_size):
        """Create the parameters and the optimizer.

        Args:
            layer_sizes: Number of units per layer, input layer first.
            learning_rate: Learning rate handed to the Adam optimizer.
            batch_size: Number of samples per mini-batch used by ``train``.
        """
        num_layers = len(layer_sizes) - 1
        # Weights come from an unscaled standard normal distribution. No seed
        # is set here, so every instance starts from different values.
        self.weights = [np.random.randn(layer_sizes[i], layer_sizes[i + 1]) for i in range(num_layers)]
        # Biases start at zero, stored as row vectors so they broadcast over a batch.
        self.biases = [np.zeros((1, layer_sizes[i + 1])) for i in range(num_layers)]
        self.optimizer = AdamOptimizer(learning_rate=learning_rate)
        self.batch_size = batch_size

    def sigmoid(self, x):
        """Element-wise logistic function."""
        return 1 / (1 + np.exp(-x))

    def sigmoid_derivative(self, x):
        """Derivative of the sigmoid expressed through its OUTPUT.

        ``x`` must already be a sigmoid activation ``a``; the result is
        ``a * (1 - a)``. Passing a pre-activation here gives a wrong value.
        """
        return x * (1 - x)

    def softmax(self, x):
        """Row-wise softmax; the row maximum is subtracted first for numerical stability."""
        exp_x = np.exp(x - np.max(x, axis=1, keepdims=True))
        return exp_x / np.sum(exp_x, axis=1, keepdims=True)

    def fp(self, input):
        """Forward pass.

        Args:
            input: Batch of samples, one per row.

        Returns:
            List of activations: the input itself, then every hidden layer's
            sigmoid output, then the softmax output as the last element.
        """
        activations = [input]
        # Every layer except the last one uses a sigmoid activation.
        for w, b in zip(self.weights[:-1], self.biases[:-1]):
            activations.append(self.sigmoid(np.dot(activations[-1], w) + b))
        # The last layer produces class probabilities.
        activations.append(self.softmax(np.dot(activations[-1], self.weights[-1]) + self.biases[-1]))
        return activations

    def bp(self, x, y):
        """Backward pass for softmax + cross-entropy.

        Gradients are SUMMED over the batch (they are not divided by the
        batch size).

        Args:
            x: Batch of inputs, one sample per row.
            y: One-hot targets with the same number of rows as ``x``.

        Returns:
            Tuple ``(gradients_w, gradients_b, samples_tf_bp)``: the per-layer
            gradients ordered like ``self.weights`` / ``self.biases`` and a
            boolean array that is True where a sample was MISCLASSIFIED.
        """
        activations = self.fp(x)
        # For softmax combined with cross-entropy the gradient with respect to
        # the output pre-activation is simply (prediction - target).
        dL_dy = activations[-1] - y
        gradients_w = []
        gradients_b = []

        last_layer = len(self.weights) - 1
        for i in range(last_layer, -1, -1):
            if i == last_layer:
                dL_dz = dL_dy
            else:
                # Chain rule through the sigmoid of a hidden layer.
                dL_dz = dL_dy * self.sigmoid_derivative(activations[i + 1])
            gradients_w.append(activations[i].T.dot(dL_dz))
            gradients_b.append(np.sum(dL_dz, axis=0, keepdims=True))
            # Propagate the error to the previous layer's output.
            if i != 0:
                dL_dy = dL_dz.dot(self.weights[i].T)

        # Gradients were collected output-first; restore input-first order.
        gradients_w.reverse()
        gradients_b.reverse()

        # The forward pass above already produced the predictions, so reuse
        # them instead of running the network a second time.
        predicted_classes = np.argmax(activations[-1], axis=1)
        true_classes = np.argmax(y, axis=1)
        samples_tf_bp = predicted_classes != true_classes

        return gradients_w, gradients_b, samples_tf_bp

    def train(self, x, y, epochs, update_weights=True):
        """Train with mini-batches for a number of epochs.

        Args:
            x: Training inputs, one sample per row.
            y: One-hot training targets.
            epochs: Number of passes over the data.
            update_weights: When False, gradients are computed but the
                parameters are left untouched.

        Returns:
            Boolean array with one entry per sample of ``x``: True where the
            sample was classified correctly during the LAST epoch, judged
            before the update of its batch. All True when ``epochs`` is 0.
        """
        num_samples = x.shape[0]
        # Ceiling division so that a trailing partial batch is counted too.
        num_batches = (num_samples + self.batch_size - 1) // self.batch_size
        # Bound before the loop so that ``epochs == 0`` still returns a value.
        samples_ft = np.ones(num_samples, dtype=bool)

        for epoch in range(epochs):
            print(f"Epoch {epoch + 1}/{epochs}")
            losses = []
            samples_ft = np.ones(num_samples, dtype=bool)

            with tqdm(total=num_batches, desc="Training", unit="batch") as pbar:
                for i in range(0, num_samples, self.batch_size):
                    x_batch = x[i:i + self.batch_size]
                    y_batch = y[i:i + self.batch_size]
                    gradients_w, gradients_b, batch_errors = self.bp(x_batch, y_batch)

                    if update_weights:
                        self.optimizer.update_parameters(self, gradients_w, gradients_b)

                    samples_ft[i:i + self.batch_size] = ~batch_errors

                    # The reported loss is measured after the update of this batch.
                    y_pred = self.predict(x_batch)
                    losses.append(self.cross_entropy_loss(y_pred, y_batch))

                    pbar.update(1)
                    pbar.set_postfix(loss=np.mean(losses))  # running mean over this epoch

        return samples_ft

    def cross_entropy_loss(self, y_pred, y_true):
        """Mean cross-entropy over the batch; 1e-9 is added inside the log to avoid log(0)."""
        m = y_true.shape[0]
        loss = -np.sum(y_true * np.log(y_pred + 1e-9)) / m
        return loss

    def predict(self, input):
        """Return the softmax output (class probabilities) for ``input``."""
        return self.fp(input)[-1]

    def evaluate_accuracy(self, x, y):
        """Fraction of rows of ``x`` whose arg-max prediction matches the one-hot ``y``."""
        predictions = self.predict(x)
        predicted_classes = np.argmax(predictions, axis=1)
        true_classes = np.argmax(y, axis=1)
        return np.mean(predicted_classes == true_classes)
    


    
    


In [None]:
def load_mnist(data_dir=".."):
    """Load the four MNIST IDX files and prepare them for training.

    Args:
        data_dir: Directory that contains the IDX files. Defaults to the
            parent directory, where the files were previously expected.

    Returns:
        Tuple ``(train_images, train_labels, test_images, test_labels)``.
        Images are flattened to one row per sample, cast to float32 and
        divided by 255; labels are one-hot encoded.
    """
    def read_idx(file_name):
        # os.path.join yields a plain string with the platform's separator,
        # so the same code works on Windows and on POSIX systems.
        return idx2numpy.convert_from_file(os.path.join(data_dir, file_name))

    train_images = read_idx("train-images.idx3-ubyte")
    train_labels = read_idx("train-labels.idx1-ubyte")
    test_images = read_idx("t10k-images.idx3-ubyte")
    test_labels = read_idx("t10k-labels.idx1-ubyte")

    # Flatten every image to a single row and scale by 1/255.
    train_images = train_images.reshape((train_images.shape[0], -1)).astype('float32') / 255
    test_images = test_images.reshape((test_images.shape[0], -1)).astype('float32') / 255

    # Convert integer class labels to one-hot rows.
    train_labels = one_hot_encode(train_labels)
    test_labels = one_hot_encode(test_labels)

    return train_images, train_labels, test_images, test_labels


def one_hot_encode(labels, num_classes=10):
    """Convert integer class labels to a one-hot matrix.

    Args:
        labels: 1-D array or sequence of integer class indices.
        num_classes: Number of columns of the result.

    Returns:
        Float array of shape ``(len(labels), num_classes)`` with a single 1
        per row at the column given by the label.

    Raises:
        IndexError: If a label lies outside ``[0, num_classes)``. Negative
            labels are rejected explicitly because NumPy indexing would
            otherwise wrap them around to the last classes silently.
    """
    labels = np.asarray(labels)
    num_labels = labels.shape[0]
    one_hot_labels = np.zeros((num_labels, num_classes))
    if num_labels == 0:
        return one_hot_labels

    if labels.min() < 0 or labels.max() >= num_classes:
        raise IndexError(
            f"labels must lie in [0, {num_classes}); "
            f"got values from {labels.min()} to {labels.max()}"
        )

    one_hot_labels[np.arange(num_labels), labels] = 1
    return one_hot_labels


def weighted_resampling(data, labels, probabilities, num_samples=None):
    """Draw a bootstrap sample (with replacement) according to per-sample probabilities.

    Args:
        data: Array of samples, indexed along the first axis.
        labels: Array of labels aligned with ``data``.
        probabilities: Selection probability of every sample.
        num_samples: Number of draws; defaults to ``len(data)``.

    Returns:
        Tuple ``(resampled_data, resampled_labels)`` built from the same
        randomly drawn indices.
    """
    population = len(data)
    draw_count = population if num_samples is None else num_samples

    # An integer population is sampled as if it were np.arange(population).
    picked = np.random.choice(population, size=draw_count, p=probabilities)

    return data[picked], labels[picked]

def train_nn(train_images, train_labels, weights):
    """Run one boosting round: resample, train, then re-weight the samples.

    Relies on the module-level names ``nn`` (the network that is trained),
    ``epoch_num``, ``test_images`` and ``test_labels``.

    Args:
        train_images: Training inputs, one sample per row.
        train_labels: One-hot training targets.
        weights: Current sample weights; they are used as sampling
            probabilities, so they must sum to 1. The array is not modified.

    Returns:
        Tuple ``(weights, train_accuracies, test_accuracies, classifiers, alpha)``.
        ``weights`` is the new, normalised weight vector; the other four are
        one-element lists holding this round's values.
    """
    resampled_train_images, resampled_train_labels = weighted_resampling(train_images, train_labels, weights)
    samples_ft = nn.train(resampled_train_images, resampled_train_labels, epochs=epoch_num)
    # NOTE: the same global network object is appended on every call, so the
    # entries collected by a caller all refer to one instance.
    classifiers = [nn]

    # Number of resampled samples misclassified during the last training epoch.
    num_incorrect = len(samples_ft) - np.count_nonzero(samples_ft)
    print('错误样本个数: ', num_incorrect)

    train_accuracies = [nn.evaluate_accuracy(resampled_train_images, resampled_train_labels)]
    test_accuracies = [nn.evaluate_accuracy(test_images, test_labels)]

    # ``samples_ft`` is indexed by position in the RESAMPLED set, while
    # ``weights`` is indexed by ORIGINAL sample. The weighted error and the
    # re-weighting therefore use the classifier's result on the original set.
    predicted_classes = np.argmax(nn.predict(train_images), axis=1)
    correct = predicted_classes == np.argmax(train_labels, axis=1)

    weights = np.asarray(weights, dtype=float)
    error_rate = 1 - np.sum(weights[correct])
    # Clamp away from zero: a perfect classifier would otherwise produce an
    # infinite alpha and NaN weights (rounding can also push it below zero).
    error_rate = max(float(error_rate), 1e-10)

    # NOTE(review): textbook AdaBoost uses (1 - error) / error here. The 2.5
    # numerator is kept unchanged from the previous version -- confirm that
    # it is intentional.
    numerator_offset = 2.5
    alpha_value = 0.5 * np.log((numerator_offset - error_rate) / error_rate)

    # Correct samples (+1) are down-weighted, misclassified ones (-1) up-weighted.
    agreement = np.where(correct, 1, -1)
    weights = weights * np.exp(-alpha_value * agreement)
    weights = weights / np.sum(weights)

    return weights, train_accuracies, test_accuracies, classifiers, [alpha_value]

In [None]:
# Load MNIST (flattened, scaled images and one-hot labels).
train_images, train_labels, test_images, test_labels = load_mnist()

# Report the size of the training set, the test set and both together.
train_size, test_size = train_images.shape[0], test_images.shape[0]
total_samples = train_size + test_size

print(f"总样本数: {total_samples}", f"训练集大小: {train_size}", f"测试集大小: {test_size}")

# Uniform initial sample weights.
num_samples = len(train_images)
weights_0 = np.ones(num_samples) / num_samples

# Hyper-parameters read by the cells below.
layer_sizes = [784,15,10]
batch_size = 60000
learning_rate = 0.05
epoch_num = 5

print(f"学习率: {learning_rate}, batch_size: {batch_size}")

# Describe the hidden layers: everything between the input and output layer.
num_hidden_layers = len(layer_sizes) - 2
print(f"隐藏层的层数: {num_hidden_layers}")
for i, size in enumerate(layer_sizes[1:-1], start=1):
    print(f"隐藏层 {i} 的神经元个数: {size}")

In [None]:

# A single network instance is created here; train_nn() reads it through the
# global name `nn`, so every boosting round continues training the same object.
nn = SimpleNeuralNetwork(layer_sizes, learning_rate=learning_rate, batch_size= batch_size)

num_classifier = 10  # number of boosting rounds
weights =  np.ones(num_samples) / num_samples

# Per-round bookkeeping.
train_accuracy, test_accuracy = [], []
final_accuracy_test_list, final_accuracy_train_list = [], []
alpha_test = []

# Running alpha-weighted sums of the class probabilities.
combined_output_test = np.zeros((len(test_images), 10))
combined_output_train = np.zeros((len(train_images), 10))

# The true classes do not change between rounds, so compute them once.
true_label_test = np.argmax(test_labels, axis=1)
true_label_train = np.argmax(train_labels, axis=1)

for i in range(num_classifier):
    print(f'-------------------第{i+1}个分类器-------------------')
    weights, train_acc, test_acc, classifiers, alpha = train_nn(train_images, train_labels, weights)
    train_accuracy.append(train_acc)
    test_accuracy.append(test_acc)
    alpha_test.append(alpha)

    # Ensemble accuracy on the test set. `alpha` is a one-element list and is
    # broadcast against the prediction matrix.
    predictions_test = classifiers[-1].predict(test_images)
    combined_output_test += alpha * predictions_test
    final_predictions_test = np.argmax(combined_output_test, axis=1)
    final_accuracy_test = np.mean(final_predictions_test == true_label_test)
    final_accuracy_test_list.append(final_accuracy_test)

    # Ensemble accuracy on the training set.
    predictions_train = classifiers[-1].predict(train_images)
    combined_output_train += alpha * predictions_train
    final_predictions_train = np.argmax(combined_output_train, axis=1)
    final_accuracy_train = np.mean(final_predictions_train == true_label_train)
    final_accuracy_train_list.append(final_accuracy_train)

    print('------------------------------------------------')
    print(f'训练集准确率: {train_accuracy[-1][-1]:.4f}')
    print(f'测试集准确率: {test_accuracy[-1][-1]:.4f}')
    print(f'Adaboost算法在训练集上的准确率: {final_accuracy_train:.4f}')
    print(f'Adaboost算法在测试集上的准确率: {final_accuracy_test:.4f}')
    print(alpha)
print(alpha_test)

# Plot single-round and ensemble accuracy against the number of rounds.
rounds = range(1, num_classifier + 1)
plt.figure(figsize=(10, 6))
for series, series_label in (
    (train_accuracy, 'Train Accuracy'),
    (test_accuracy, 'Test Accuracy'),
    (final_accuracy_test_list, 'Adaboost Accuracy in Test'),
    (final_accuracy_train_list, 'Adaboost Accuracy in Train'),
):
    plt.plot(rounds, series, label=series_label, marker='o')
plt.xlabel('Number of Classifiers')
plt.ylabel('Accuracy')
plt.title('Accuracy vs Number of Classifiers')
plt.legend()
plt.grid(False)
plt.show()



In [3]:
        # Read the four IDX files directly, without the reshaping, scaling and
        # one-hot encoding that load_mnist() applies.
        # NOTE(review): this rebinds train_images / train_labels / test_images /
        # test_labels, which an earlier cell assigns from load_mnist(); running
        # this cell before the training cells would feed them unprocessed data.
        # NOTE(review): the leading indentation looks like a leftover from
        # copying a function body -- confirm and consider removing this cell.
        train_images = idx2numpy.convert_from_file(r"..\train-images.idx3-ubyte")
        train_labels = idx2numpy.convert_from_file(r"..\train-labels.idx1-ubyte")
        test_images = idx2numpy.convert_from_file(r"..\t10k-images.idx3-ubyte")
        test_labels = idx2numpy.convert_from_file(r"..\t10k-labels.idx1-ubyte")

In [7]:
# Display the shape of whatever array is currently bound to train_labels
# (the recorded output below is for the raw, not one-hot encoded, labels).
train_labels.shape


(60000,)

In [13]:
import numpy as np
import math
# Scratch check: print e raised to log2(i - 1) for i = 2 .. 10.
for i in range(2, 11):
    log2_of_previous = math.log(i - 1, 2)
    print(i, np.exp(log2_of_previous))

2 1.0
3 2.718281828459045
4 4.879108412035346
5 7.38905609893065
6 10.195312898128853
7 13.262791735517345
8 16.56604174441005
9 20.085536923187668
10 23.805698896394073
