# softmax回归的从零实现

In [None]:
import torch
from IPython import display
from d2l import torch as d2l

In [None]:
batch_size = 256
train_iter, test_iter = d2l.load_data_fashion_mnist(batch_size)


In [None]:
# 初始化模型参数
num_inputs = 28 * 28
num_outputs = 10

W = torch.normal(0, 0.01, size=(num_inputs, num_outputs), requires_grad=True)
b = torch.zeros(num_outputs, requires_grad=True)

In [None]:
# 定义softmax操作
def softmax(X):
    X_exp = torch.exp(X)
    partition = X_exp.sum(1, keepdim=True)
    """
    注意，虽然这在数学上看起来是正确的，但我们在代码实现中有点草率。
    矩阵中的非常大或非常小的元素可能造成数值上溢或下溢，但我们没有采取措施来防止这点。
    例如：exp(50)=?,exp(5000)=?(OverflowError: math range error)
    为了缓解这种数值稳定性问题，通常采用对输入进行平移或缩放操作来避免指数函数中出现过大或过小的值。
    常见的方法包括使用log-sum-exp技巧或将输入值减去最大值等。
    """
    return X_exp / partition  # 这里应用了广播机制

# 测试
X = torch.normal(0, 1, (2, 5))
X_prob = softmax(X)
X_prob, X_prob.sum(1)

In [None]:
# 定义模型
def net(X):
    return softmax(torch.matmul(X.reshape((-1, W.shape[0])), W) + b)

In [None]:
# 定义损失函数，交叉熵损失函数
def cross_entropy(y_hat, y):
    return - torch.log(y_hat[range(len(y_hat)), y])
"""
函数cross_entropy的实现可能存在问题，具体取决于其如何处理输入数据。
交叉熵损失函数的定义包括取对数操作，因此需要注意对数函数的定义域。
如果输入数据中包含非正值（即小于等于零的值），那么对这些值取对数将导致错误或异常。
因此，在实现交叉熵损失函数时需要特别注意对输入数据进行预处理，以确保其不包含非正值。
"""

# 计算预测正确的数量
def accuracy(y_hat, y):
    """计算预测正确的数量"""
    if len(y_hat.shape) > 1 and y_hat.shape[1] > 1:
        y_hat = y_hat.argmax(axis=1)  # 在指定维度上返回张量中最大值所在的索引
    cmp = y_hat.type(y.dtype) == y  # 类型转换为同一类别int，再进行比较是否相等
    return float(cmp.type(y.dtype).sum())


# test
y = torch.tensor([0, 2])  # 假设真是标签是0,2
y_hat = torch.tensor([[0.1, 0.3, 0.6], [0.3, 0.2, 0.5]])  # 假设预测结果
print("真实标签的预测概率：", y_hat[[0, 1], y])
print("cross_entropy=", cross_entropy(y_hat, y))
print("计算预测正确的数量：", accuracy(y_hat, y))

In [None]:
# 计算估计准确度
def evaluate_accuracy(net, data_iter):
    """计算在指定数据集上模型的精度"""
    if isinstance(net, torch.nn.Module):
        net.eval()  # 将模型设置为评估模式
    metric = d2l.Accumulator(2)  # 正确预测数、预测总数
    with torch.no_grad():
        for X, y in data_iter:
            metric.add(accuracy(net(X), y), y.numel())
    return metric[0] / metric[1]

In [None]:
# 训练
def train_epoch_ch3(net, train_iter, loss, updater):
    """训练模型一个迭代周期"""
    # 将模型设置为训练模式
    if isinstance(net, torch.nn.Module):
        net.train()  # 如果是torch.nn.Module, 将模型设置为训练模式
    metric = d2l.Accumulator(3)  # 训练损失总和、训练准确数量总和、样本数
    for X, y in train_iter:
        # 计算梯度并更新参数
        y_hat = net(X)
        l = loss(y_hat, y)
        if isinstance(updater, torch.optim.Optimizer):
            # 使用PyTorch内置的优化器和损失函数
            updater.zero_grad()
            l.mean().backward() # 使用mean()，就不需要再除以batch_size了
            updater.step()
        else:
            # 使用自定义的优化器和损失函数
            l.sum().backward() # 此处使用sum()，需要再除以batch_size
            updater(X.shape[0]) # X.shape[0] 为 batch_size
        metric.add(float(l.sum()), accuracy(y_hat, y), y.numel())
    # 返回训练损失和训练正确率【训练损失总和/样本数】，【训练准确数量总和/样本数】
    return metric[0] / metric[2], metric[1] / metric[2]

In [None]:
# 训练
def train_ch3(net, train_iter, test_iter, loss, num_epochs, updater):
    """训练模型（定义见第3章）"""
    animator = d2l.Animator(xlabel='epoch', xlim=[1, num_epochs], ylim=[0.3, 0.9],
                        legend=['train loss', 'train acc', 'test acc'])
    for epoch in range(num_epochs):
        train_metrics = train_epoch_ch3(net, train_iter, loss, updater)
        test_acc = evaluate_accuracy(net, test_iter)
        animator.add(epoch + 1, train_metrics + (test_acc,))
        # train_loss, train_acc = train_metrics
    train_loss, train_acc = train_metrics
    print("最终结果：\n"
          "train_loss={:.4f},train_acc={:.4f},test_acc={:.4f}".format(train_loss, train_acc, test_acc))
    assert train_loss < 0.5, train_loss  # 最终训练结果，train_loss 应该小于0.5
    assert train_acc <= 1 and train_acc > 0.5, train_acc  # 最终训练结果，train_acc 应该大于0.5
    assert test_acc <= 1 and test_acc > 0.5, test_acc  # 最终训练结果，test_acc 应该大于0.5


In [None]:
lr = 0.05  # 学习率


def sgd(params, lr, batch_size):
    """小批量随机梯度下降 sgd算法"""
    with torch.no_grad():
        for param in params:
            param -= lr * param.grad / batch_size
            param.grad.zero_()


def updater(batch_size):  # 更新参数
    return sgd([W, b], lr, batch_size)


In [None]:
num_epochs = 10
train_ch3(net, train_iter, test_iter, cross_entropy, num_epochs, updater)

In [None]:
# 预测
def predict_ch3(net, test_iter, n=6):
    """预测标签（定义见第3章）"""
    metric = d2l.Accumulator(2)  # 正确预测数、预测总数
    for X, y in test_iter:
        y_hat = net(X)
        metric.add(accuracy(y_hat, y), y.numel())  # numel()获取张量中元素的总数

    trues = d2l.get_fashion_mnist_labels(y)
    preds = d2l.get_fashion_mnist_labels(net(X).argmax(axis=1))
    titles = ["{}\n[{}]".format(true, pred) for true, pred in zip(trues, preds)]
    d2l.show_images(X[0:n].reshape((n, 28, 28)), 1, n, titles=titles[0:n])
    print("预测准确度：{}/{}={}".format(metric[0], metric[1], metric[0] / metric[1]))


predict_ch3(net, test_iter)