In [35]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
%matplotlib inline
import pickle

In [36]:
# Activation functions and their derivatives
def relu(x):
    """Rectified linear unit: element-wise maximum of 0 and ``x``."""
    lower_bound = 0
    return np.maximum(lower_bound, x)


def relu_derivative(x):
    """Element-wise slope of ReLU: 1 where ``x > 0``, otherwise 0.

    The slope at exactly 0 is taken to be 0.
    """
    is_positive = x > 0
    return np.where(is_positive, 1, 0)


# Multi-class output activation
def softmax(x):
    """Row-wise softmax over axis 1 of ``x``.

    Each row's maximum is subtracted before exponentiating so that large
    inputs do not overflow; this shift does not change the result.
    """
    row_max = np.max(x, axis=1, keepdims=True)
    exps = np.exp(x - row_max)
    row_totals = np.sum(exps, axis=1, keepdims=True)
    return exps / row_totals

# Loss function and its derivative
def cross_entropy_loss(y_true, y_pred):
    """Mean cross-entropy between ``y_true`` and ``y_pred``.

    Both arguments are combined element-wise and summed over axis 1 (one
    value per row), then averaged over rows. A small constant is added to
    ``y_pred`` before the logarithm so that a probability of 0 does not
    produce ``-inf``.
    """
    epsilon = 1e-15
    log_probs = np.log(y_pred + epsilon)
    per_sample = np.sum(y_true * log_probs, axis=1)
    return -np.mean(per_sample)


def cross_entropy_loss_derivative(y_true, y_pred):
    """Return the prediction error ``y_pred - y_true``.

    When ``y_pred`` is a softmax output trained with cross-entropy, this
    difference is the gradient of the per-sample loss with respect to the
    pre-softmax inputs. No averaging over the batch is done here.
    """
    error = y_pred - y_true
    return error


# MLP class definition
class MLPClassifier:
    """Fully connected classifier with two ReLU hidden layers and a softmax output.

    Training is plain mini-batch gradient descent with hand-derived gradients.
    The module-level helpers ``relu``, ``relu_derivative``, ``softmax``,
    ``cross_entropy_loss`` and ``cross_entropy_loss_derivative`` must be in
    scope when the methods are called.
    """

    def __init__(self, input_size, hidden_size1, hidden_size2, output_size, learning_rate=0.01, batch_size=32):
        """Store the hyper-parameters and initialise the three layers.

        Args:
            input_size: Number of input features per sample.
            hidden_size1: Width of the first hidden layer.
            hidden_size2: Width of the second hidden layer.
            output_size: Number of output classes.
            learning_rate: Step size used by ``backward`` for every update.
            batch_size: Number of samples per mini-batch in ``train``.
        """
        self.input_size = input_size
        self.hidden_size1 = hidden_size1
        self.hidden_size2 = hidden_size2
        self.output_size = output_size
        self.batch_size = batch_size
        self.learning_rate = learning_rate

        # Weights: small Gaussian noise (drawn from the global NumPy RNG);
        # biases: zeros shaped (1, width) so they broadcast over the batch.
        self.W1 = np.random.randn(input_size, hidden_size1) * 0.01
        self.b1 = np.zeros((1, hidden_size1))
        self.W2 = np.random.randn(hidden_size1, hidden_size2) * 0.01
        self.b2 = np.zeros((1, hidden_size2))
        self.W3 = np.random.randn(hidden_size2, output_size) * 0.01
        self.b3 = np.zeros((1, output_size))

    def forward(self, X):
        """Run a forward pass and return the softmax output for ``X``.

        The pre-activations (``z1``..``z3``) and activations (``a1``..``a3``)
        are cached on the instance because ``backward`` reads them.
        """
        self.z1 = np.dot(X, self.W1) + self.b1
        self.a1 = relu(self.z1)
        self.z2 = np.dot(self.a1, self.W2) + self.b2
        self.a2 = relu(self.z2)
        self.z3 = np.dot(self.a2, self.W3) + self.b3
        self.a3 = softmax(self.z3)  # softmax on the output layer
        return self.a3

    def backward(self, X, y):
        """Back-propagate through the cached forward pass and update all parameters.

        Must be called right after ``forward(X)`` on the same batch, since it
        relies on the activations cached there. Gradients are averaged over
        the batch and applied in place with ``self.learning_rate``.

        Args:
            X: The batch that was fed to ``forward``.
            y: Targets for that batch, same shape as the network output.
        """
        m = X.shape[0]  # batch size used to average the gradients

        # Output layer: ``a3 - y`` is used directly as the gradient at z3
        # (softmax and cross-entropy combined).
        d_loss_z3 = cross_entropy_loss_derivative(y, self.a3)
        d_loss_W3 = np.dot(self.a2.T, d_loss_z3) / m
        d_loss_b3 = np.sum(d_loss_z3, axis=0, keepdims=True) / m

        # Second hidden layer.
        d_loss_a2 = np.dot(d_loss_z3, self.W3.T)
        d_loss_z2 = d_loss_a2 * relu_derivative(self.z2)
        d_loss_W2 = np.dot(self.a1.T, d_loss_z2) / m
        d_loss_b2 = np.sum(d_loss_z2, axis=0, keepdims=True) / m

        # First hidden layer.
        d_loss_a1 = np.dot(d_loss_z2, self.W2.T)
        d_loss_z1 = d_loss_a1 * relu_derivative(self.z1)
        d_loss_W1 = np.dot(X.T, d_loss_z1) / m
        d_loss_b1 = np.sum(d_loss_z1, axis=0, keepdims=True) / m

        # Gradient-descent step; all gradients were computed before any
        # weight is modified.
        self.W3 -= self.learning_rate * d_loss_W3
        self.b3 -= self.learning_rate * d_loss_b3
        self.W2 -= self.learning_rate * d_loss_W2
        self.b2 -= self.learning_rate * d_loss_b2
        self.W1 -= self.learning_rate * d_loss_W1
        self.b1 -= self.learning_rate * d_loss_b1

    def train(self, X, y, epochs):
        """Train with mini-batch gradient descent and print the mean loss per epoch.

        Each epoch reshuffles the data (global NumPy RNG) and walks through it
        in chunks of ``self.batch_size``; the final chunk may be smaller when
        the sample count is not a multiple of the batch size.

        Args:
            X: Training inputs, one sample per row.
            y: Training targets, one row per sample.
            epochs: Number of passes over the data.

        Raises:
            ValueError: If ``X`` contains no samples.
        """
        n_samples = X.shape[0]
        if n_samples == 0:
            raise ValueError("Cannot train on an empty dataset.")

        for epoch in range(epochs):
            # Shuffle once per epoch so batches differ between epochs.
            shuffled_indices = np.random.permutation(n_samples)
            X_shuffled = X[shuffled_indices]
            y_shuffled = y[shuffled_indices]

            batch_losses = []  # loss of every mini-batch in this epoch
            # Stepping by batch_size covers the full batches and the trailing
            # partial batch in one loop (slicing past the end is clamped).
            for start in range(0, n_samples, self.batch_size):
                end = start + self.batch_size
                X_batch = X_shuffled[start:end]
                y_batch = y_shuffled[start:end]
                output = self.forward(X_batch)
                batch_losses.append(cross_entropy_loss(y_batch, output))
                self.backward(X_batch, y_batch)

            epoch_loss = sum(batch_losses) / len(batch_losses)
            print(f'Epoch {epoch+1}/{epochs}, Average Loss: {epoch_loss:.4f}')

    def predict(self, X):
        """Return the index of the highest-scoring class for every row of ``X``."""
        output = self.forward(X)
        return np.argmax(output, axis=1)

    def accuracy(self, X, y):
        """Return the fraction of rows whose predicted class matches ``argmax(y, axis=1)``."""
        predictions = self.predict(X)
        labels = np.argmax(y, axis=1)
        return np.mean(predictions == labels)

    def save_weights(self, file_path):
        """Pickle all weights and biases to ``file_path`` as a dict keyed 'W1'..'b3'."""
        weights = {
            'W1': self.W1,
            'b1': self.b1,
            'W2': self.W2,
            'b2': self.b2,
            'W3': self.W3,
            'b3': self.b3
        }
        with open(file_path, 'wb') as file:
            pickle.dump(weights, file)

    def load_weights(self, file_path):
        """Replace all weights and biases with the dict pickled at ``file_path``.

        SECURITY: ``pickle.load`` can execute arbitrary code embedded in the
        file. Only load weight files that come from a trusted source.
        """
        with open(file_path, 'rb') as file:
            weights = pickle.load(file)
        self.W1 = weights['W1']
        self.b1 = weights['b1']
        self.W2 = weights['W2']
        self.b2 = weights['b2']
        self.W3 = weights['W3']
        self.b3 = weights['b3']

In [37]:
def load_mnist_data(train_path, num_classes=10):
    """Load a labelled image CSV and return scaled features and one-hot labels.

    The file is read with ``pd.read_csv`` defaults, so its first line is
    treated as a header. Column 0 holds the integer class label and every
    remaining column holds a pixel value.

    Args:
        train_path: Path of the CSV file to read.
        num_classes: Width of the one-hot encoding (10 by default).

    Returns:
        A tuple ``(X, y)`` where ``X`` is the pixel columns divided by 255.0
        and ``y`` is the one-hot encoding of the labels with ``num_classes``
        columns.

    Raises:
        ValueError: If any label lies outside ``[0, num_classes)``. Without
            this check a negative label would silently wrap around to a class
            at the other end of the range.
    """
    data = pd.read_csv(train_path).values

    labels = data[:, 0].astype(int)
    if labels.size and (labels.min() < 0 or labels.max() >= num_classes):
        raise ValueError(
            f"labels must be in [0, {num_classes}); "
            f"found range [{labels.min()}, {labels.max()}]"
        )

    # Scale raw pixel values by 1/255 (maps 0..255 onto [0, 1]).
    X_train = data[:, 1:] / 255.0

    # Row i of the identity matrix is the one-hot vector for class i.
    y_train = np.eye(num_classes)[labels]

    return X_train, y_train

In [38]:
# Load the training split: scaled pixel features and one-hot labels.
X_train, y_train = load_mnist_data('./data/mnist_train.csv')

In [40]:
# Seed the global NumPy RNG so that weight initialisation and the per-epoch
# shuffling inside MLPClassifier are reproducible across kernel restarts.
SEED = 42
np.random.seed(SEED)

# Create the MLP model.
mlp = MLPClassifier(input_size=784, hidden_size1=128, hidden_size2=64, output_size=10, learning_rate=0.005)

# Train the MLP model.
mlp.train(X_train, y_train, epochs=100)

# Save the trained weights.
mlp.save_weights('./weight/mlpclassifier_weights.pkl')

# Evaluate the model on the data it was trained on.
train_accuracy = mlp.accuracy(X_train, y_train)

print(f'Train Accuracy: {train_accuracy * 100:.2f}%')


Epoch 1/100, Average Loss: 2.3011
Epoch 2/100, Average Loss: 2.2592
Epoch 3/100, Average Loss: 1.0944
Epoch 4/100, Average Loss: 0.6186
Epoch 5/100, Average Loss: 0.5011
Epoch 6/100, Average Loss: 0.3955
Epoch 7/100, Average Loss: 0.3346
Epoch 8/100, Average Loss: 0.2920
Epoch 9/100, Average Loss: 0.2576
Epoch 10/100, Average Loss: 0.2296
Epoch 11/100, Average Loss: 0.2060
Epoch 12/100, Average Loss: 0.1873
Epoch 13/100, Average Loss: 0.1712
Epoch 14/100, Average Loss: 0.1586
Epoch 15/100, Average Loss: 0.1477
Epoch 16/100, Average Loss: 0.1380
Epoch 17/100, Average Loss: 0.1298
Epoch 18/100, Average Loss: 0.1219
Epoch 19/100, Average Loss: 0.1149
Epoch 20/100, Average Loss: 0.1085
Epoch 21/100, Average Loss: 0.1025
Epoch 22/100, Average Loss: 0.0971
Epoch 23/100, Average Loss: 0.0919
Epoch 24/100, Average Loss: 0.0877
Epoch 25/100, Average Loss: 0.0829
Epoch 26/100, Average Loss: 0.0785
Epoch 27/100, Average Loss: 0.0746
Epoch 28/100, Average Loss: 0.0712
Epoch 29/100, Average Loss: 0

In [41]:
# Load the test split and report accuracy on it.
X_test, y_test = load_mnist_data('./data/mnist_test.csv')
test_accuracy = mlp.accuracy(X_test, y_test)
print(f'Test Accuracy: {test_accuracy * 100:.2f}%')

Test Accuracy: 97.52%
