In [52]:
import numpy as np
from sklearn.datasets import fetch_openml
from sklearn.model_selection import train_test_split

# Load the MNIST dataset from OpenML.
mnist = fetch_openml('mnist_784', version=1, parser='auto')

# Labels: cast the target column to unsigned 8-bit integers.
y = mnist["target"].astype(np.uint8)

# Features: divide the pixel values by 255 to normalise them to [0, 1],
# which makes training easier.
X = mnist["data"] / 255.0

# Convert integer class labels to one-hot rows.
def one_hot(y, num_classes):
    """Return the one-hot encoding of the integer labels ``y``.

    Each label is used as a row index into a ``num_classes`` x
    ``num_classes`` identity matrix, so every label becomes a float row
    holding a single 1.0 at the label's position.
    """
    identity = np.eye(num_classes)
    return identity[y]

y_one_hot = one_hot(y, 10)

# Split into training and test sets: 80% for training, 20% for testing.
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y_one_hot,
    test_size=0.2,
    random_state=42,
)


# Parameter initialisation.
def initialize_parameters(input_size, hidden_size, output_size):
    """Create the weights and biases of the two-layer network.

    Weights use He initialisation (standard-normal samples scaled by
    ``sqrt(2 / fan_in)``), which is suited to ReLU activations; biases
    start at zero.

    Note: this reseeds NumPy's *global* random generator with 42 on every
    call, so the returned values are reproducible.

    Returns:
        dict with keys ``"W1"`` (input_size x hidden_size), ``"b1"``
        (hidden_size), ``"W2"`` (hidden_size x output_size) and ``"b2"``
        (output_size).
    """
    np.random.seed(42)  # fixed seed for reproducibility

    # W1 must be drawn before W2 so the random stream is consumed in the
    # same order on every call.
    W1 = np.random.randn(input_size, hidden_size)
    W1 = W1 * np.sqrt(2.0 / input_size)    # fan_in of layer 1 is input_size

    W2 = np.random.randn(hidden_size, output_size)
    W2 = W2 * np.sqrt(2.0 / hidden_size)   # fan_in of layer 2 is hidden_size

    return {
        "W1": W1,
        "b1": np.zeros(hidden_size),
        "W2": W2,
        "b2": np.zeros(output_size),
    }

# ReLU activation.
def relu(Z):
    """Apply ReLU element-wise: ``max(0, z)`` for every entry of ``Z``."""
    rectified = np.maximum(0, Z)
    return rectified

# Softmax: turn raw scores into a probability distribution.
def softmax(Z, axis=1):
    """Return the softmax of ``Z`` computed along ``axis``.

    Args:
        Z: array of raw scores (logits).
        axis: axis along which the probabilities are normalised. Defaults
            to 1 (one distribution per row of a 2-D batch); pass ``-1`` or
            ``0`` to handle a single 1-D score vector.

    Returns:
        Array of the same shape as ``Z`` whose entries along ``axis`` are
        non-negative and sum to 1.
    """
    # Subtract the maximum along the axis before exponentiating: the
    # largest entry becomes 0 and all others are <= 0, so exp() cannot
    # overflow even for very large scores. The result is mathematically
    # unchanged because the shift cancels in the ratio.
    shifted = Z - np.max(Z, axis=axis, keepdims=True)
    exp_Z = np.exp(shifted)
    return exp_Z / exp_Z.sum(axis=axis, keepdims=True)

# Forward propagation.
def forward(X, parameters):
    """Run the two-layer network on ``X`` and return all intermediates.

    Args:
        X: input batch, one sample per row.
        parameters: dict holding ``"W1"``, ``"b1"``, ``"W2"`` and ``"b2"``.

    Returns:
        dict with the hidden pre-activation ``"Z1"``, the hidden ReLU
        activation ``"A1"``, the output scores ``"Z2"`` and the softmax
        probabilities ``"A2"``.
    """
    W1, b1, W2, b2 = (parameters[key] for key in ("W1", "b1", "W2", "b2"))

    # Hidden layer: affine transform followed by ReLU.
    hidden_pre = np.dot(X, W1) + b1
    hidden_act = relu(hidden_pre)

    # Output layer: affine transform followed by softmax.
    scores = np.dot(hidden_act, W2) + b2
    probabilities = softmax(scores)

    return {
        "Z1": hidden_pre,
        "A1": hidden_act,
        "Z2": scores,
        "A2": probabilities,
    }

# Loss computation.
def compute_loss(Y, Y_hat):
    """Return the mean cross-entropy between ``Y`` and ``Y_hat``.

    Args:
        Y: ground-truth targets (one row per sample).
        Y_hat: predicted probabilities, same shape as ``Y``.

    Returns:
        The summed cross-entropy divided by the number of rows in ``Y``.
    """
    m = Y.shape[0]
    # 1e-8 keeps log() away from log(0) = -inf when a predicted
    # probability is exactly zero.
    log_probs = np.log(Y_hat + 1e-8)
    total = np.sum(Y * log_probs)
    return -total / m

# Backward propagation.
def backward(X, Y, parameters, forward_cache):
    """Compute the loss gradients for every parameter of the network.

    Args:
        X: input batch that was fed to the forward pass, one sample per row.
        Y: ground-truth targets for the batch, same shape as the output.
        parameters: dict of network parameters; only ``"W2"`` is read here.
        forward_cache: dict from the forward pass; ``"Z1"`` (hidden
            pre-activation), ``"A1"`` (hidden ReLU output) and ``"A2"``
            (softmax output) are read.

    Returns:
        dict with gradients ``"dW1"``, ``"db1"``, ``"dW2"`` and ``"db2"``,
        already averaged over the batch.
    """
    W2 = parameters["W2"]
    Z1 = forward_cache["Z1"]  # needed for the ReLU derivative
    A1 = forward_cache["A1"]
    A2 = forward_cache["A2"]
    m = X.shape[0]

    # Output layer. For softmax followed by cross-entropy the gradient
    # with respect to the scores simplifies to (A2 - Y); dividing by m
    # here averages every downstream gradient over the batch.
    dZ2 = (A2 - Y) / m
    dW2 = np.dot(A1.T, dZ2)
    db2 = np.sum(dZ2, axis=0)

    # Hidden layer. Multiply by the ReLU derivative: 1 where Z1 > 0,
    # otherwise 0.
    dZ1 = np.dot(dZ2, W2.T) * (Z1 > 0)
    dW1 = np.dot(X.T, dZ1)
    db1 = np.sum(dZ1, axis=0)

    return {"dW1": dW1, "db1": db1, "dW2": dW2, "db2": db2}

# Parameter update by plain gradient descent.
def update_parameters(parameters, grads, learning_rate=0.01):
    """Take one gradient-descent step on every parameter, in place.

    Each entry ``parameters[name]`` is decreased by
    ``learning_rate * grads["d" + name]``. The arrays stored in
    ``parameters`` are modified in place and the same dict is returned.
    """
    for name in ("W1", "b1", "W2", "b2"):
        parameters[name] -= learning_rate * grads["d" + name]
    return parameters


# Model training.
def train(X_train, y_train, epochs=100, batch_size=128, lr=0.01, hidden_size=128):
    """Train the two-layer network with mini-batch gradient descent.

    Args:
        X_train: 2-D training features, one sample per row. May be a
            pandas DataFrame or anything ``np.asarray`` accepts.
        y_train: 2-D one-hot training labels, one row per sample.
        epochs: number of full passes over the training data.
        batch_size: number of samples per mini-batch.
        lr: learning rate passed to ``update_parameters``.
        hidden_size: number of hidden units (default 128).

    Returns:
        The trained parameter dict (``"W1"``, ``"b1"``, ``"W2"``, ``"b2"``).

    Raises:
        ValueError: if ``X_train`` is empty or ``X_train`` and ``y_train``
            do not contain the same number of samples.
    """
    # Convert to plain NumPy arrays. A DataFrame cannot be indexed with an
    # integer permutation (it would be interpreted as column labels), and
    # np.asarray also accepts inputs that are already ndarrays, which a
    # bare ``.values`` attribute access does not.
    X_train = np.asarray(X_train)
    y_train = np.asarray(y_train)

    num_samples = X_train.shape[0]
    if num_samples == 0:
        raise ValueError("X_train must contain at least one sample")
    if y_train.shape[0] != num_samples:
        raise ValueError(
            "X_train and y_train must contain the same number of samples"
        )

    input_size = X_train.shape[1]
    output_size = y_train.shape[1]  # one output unit per one-hot column
    parameters = initialize_parameters(input_size, hidden_size, output_size)

    for epoch in range(epochs):
        # Reshuffle the samples at the start of every epoch, keeping
        # features and labels aligned through a shared permutation.
        permutation = np.random.permutation(num_samples)
        X_shuffled = X_train[permutation]
        y_shuffled = y_train[permutation]

        for i in range(0, num_samples, batch_size):
            X_batch = X_shuffled[i:i+batch_size]
            y_batch = y_shuffled[i:i+batch_size]

            # Forward pass.
            cache = forward(X_batch, parameters)

            # Loss of the current mini-batch.
            loss = compute_loss(y_batch, cache["A2"])

            # Backward pass.
            grads = backward(X_batch, y_batch, parameters, cache)

            # Parameter update.
            parameters = update_parameters(parameters, grads, lr)

        # Reports the loss of the last mini-batch of the epoch, not an
        # average over the whole epoch.
        print(f"Epoch {epoch+1}, Loss: {loss:.4f}")

    return parameters

# Accuracy computation.
def accuracy(y_true, y_pred):
    """Return the fraction of rows whose predicted class matches the truth.

    For both arguments the class of a row is the index of its largest
    entry (``argmax`` along axis 1); a prediction counts as correct when
    the two indices are equal.
    """
    true_labels = np.argmax(y_true, axis=1)
    predicted_labels = np.argmax(y_pred, axis=1)
    matches = true_labels == predicted_labels
    return np.mean(matches)

# Train the model.
parameters = train(
    X_train,
    y_train,
    epochs=20,
    batch_size=128,
    lr=0.2,
)

# Predict on the test set and report the accuracy.
test_cache = forward(X_test, parameters)
test_acc = accuracy(y_test, test_cache["A2"])
print(f"Test Accuracy: {test_acc:.4f}")


Epoch 1, Loss: 0.3238
Epoch 2, Loss: 0.2219
Epoch 3, Loss: 0.2007
Epoch 4, Loss: 0.0897
Epoch 5, Loss: 0.1276
Epoch 6, Loss: 0.0807
Epoch 7, Loss: 0.0540
Epoch 8, Loss: 0.0392
Epoch 9, Loss: 0.0189
Epoch 10, Loss: 0.0523
Epoch 11, Loss: 0.0148
Epoch 12, Loss: 0.0100
Epoch 13, Loss: 0.0606
Epoch 14, Loss: 0.0419
Epoch 15, Loss: 0.0110
Epoch 16, Loss: 0.1207
Epoch 17, Loss: 0.1000
Epoch 18, Loss: 0.0052
Epoch 19, Loss: 0.0163
Epoch 20, Loss: 0.0418
Test Accuracy: 0.9739
