为了避免额外的学习成本，Paddle的一些api跟PyTorch很相似，Paddle的中文教程也很清楚，易于上手。现在我们直接用Paddle搭建一个LeNet来进行手写数字识别的实践。

In [None]:
import warnings 
warnings.filterwarnings("ignore")

Paddle也内置了一些数据集。

In [None]:
# !python -m pip install paddlepaddle

In [None]:
# !python -c "import paddle; print(paddle.__version__)"

In [None]:
# !python -c "import paddle; print(paddle.utils.run_check())"

In [None]:
import paddle
print("计算机视觉（CV）相关数据集：", paddle.vision.datasets.__all__)
print("自然语言处理（NLP）相关数据集：", paddle.text.__all__)

In [None]:
import os
from paddle.vision.transforms import Normalize
# 定义图像归一化处理方法，这里的CHW指图像格式需为 [C通道数，H图像高度，W图像宽度]
transform = Normalize(mean=[0.5], std=[0.5], data_format="CHW")

# (下载)数据集并初始化 DataSet, 我们已经下载过了 "./datasets_/mnist_dataset"  "MNIST/raw/x x x x"
data_dir = r"./datasets_/mnist_dataset/MNIST/raw"
data_dir = os.path.normpath(os.path.join(os.getcwd(), data_dir))
test_img = os.path.join(data_dir, "t10k-images-idx3-ubyte.gz")
test_label = os.path.join(data_dir, "t10k-labels-idx1-ubyte.gz")
train_img = os.path.join(data_dir, "train-images-idx3-ubyte.gz")
train_label = os.path.join(data_dir, "train-labels-idx1-ubyte.gz")

train_dataset = paddle.vision.datasets.MNIST(image_path=train_img, label_path=train_label,mode="train", download=False,transform=transform)
test_dataset = paddle.vision.datasets.MNIST(image_path=test_img, label_path=test_label,mode="test", download=False,transform=transform)
print(
    "train images: ", len(train_dataset), ", test images: ", len(test_dataset),)


In [None]:
train_loader = paddle.io.DataLoader(train_dataset, batch_size=64, shuffle=True, num_workers=2, drop_last=True)
test_loader = paddle.io.DataLoader(test_dataset, batch_size=64, shuffle=False, num_workers=2, drop_last=True)

In [None]:
for _ in range(2):
    for batch_id, data in enumerate(train_loader()):
        x_data = data[0]  # 图像数据
        y_data = data[1]  # 标签数据
        # print(f'Batch {batch_id}: x_data shape: {x_data.shape}, y_data: {y_data}')
        # # normal label: 0 1 2 ..
        # 可能是解析二进制文件有问题？会有一些奇怪的标签值
        if (y_data < 0).any() or (y_data > 9).any():
            print(f'Invalid labels detected in batch {batch_id}: {y_data}')
            break

num_workers：同步/异步读取数据，通过 num_workers 来设置加载数据的子进程个数，num_workers的值设为大于0时，即开启多进程方式异步加载数据，可提升数据读取速度。

In [None]:
print(f'训练数据集大小: {len(train_loader.dataset)}')
print(f'测试数据集大小: {len(test_loader.dataset)}')

In [None]:
# 查看训练数据集的一个样本的维度
print(f'一个训练图像的维度: {train_dataset[0][0].shape}')  # 图像的维度是 [1, 28, 28]
# print(f'一个训练图像的ndarray: {train_dataset[0][0]}')  # [...]
print(f'一个标签的类别: {train_dataset[0][-1]}')


In [None]:
from matplotlib import pyplot as plt
for data in train_dataset:
    # train_dataset[0]
    image, label = data
    print("shape of image: ", image.shape)
    plt.title(str(label))
    # plt.imshow(image[0])  # 原图
    plt.imshow(image[0].squeeze(), cmap='gray')  # 灰度图，输入模型
    break


接下来我们用Paddle构建LeNet

使用 paddle.nn.Sequential 构建模型，构建顺序的线性网络结构时，可以选择该方式，只需要按模型的结构顺序，一层一层加到 paddle.nn.Sequential 子类中即可。

In [None]:
from paddle import nn

# 使用 paddle.nn.Sequential 构建 LeNet 模型
lenet_Sequential = nn.Sequential(
    nn.Conv2D(1, 6, 3, stride=1, padding=2),
    nn.ReLU(),
    nn.MaxPool2D(2, 2),
    nn.Conv2D(6, 16, 5, stride=1, padding=0),
    nn.ReLU(),
    nn.MaxPool2D(2, 2),
    nn.Flatten(),
    nn.Linear(400, 120),
    nn.Linear(120, 84),
    nn.Linear(84, 10),
)
# 可视化模型组网结构和参数
paddle.summary(lenet_Sequential, (1, 1, 28, 28))


In [None]:
lenet_Sequential.state_dict

使用 paddle.nn.Layer 构建模型。构建一些比较复杂的网络结构时，可以选择该方式，组网包括三个步骤：

1. 创建一个继承自 paddle.nn.Layer 的类；

2. 在类的构造函数 __init__ 中定义组网用到的神经网络层（layer）；

3. 在类的前向计算函数 forward 中使用定义好的 layer 执行前向计算。

In [None]:
# 使用 Subclass 方式构建 LeNet 模型
class LeNet(nn.Layer):
    def __init__(self, num_classes=10):
        super().__init__()
        self.num_classes = num_classes
        # 构建 features 子网，用于对输入图像进行特征提取
        self.features = nn.Sequential(
            nn.Conv2D(1, 6, 3, stride=1, padding=2),
            nn.ReLU(),
            nn.MaxPool2D(2, 2),
            nn.Conv2D(6, 16, 5, stride=1, padding=0),
            nn.ReLU(),
            nn.MaxPool2D(2, 2),
        )
        # 构建 linear 子网，用于分类
        if num_classes > 0:
            self.linear = nn.Sequential(
                nn.Linear(400, 120),
                nn.Linear(120, 84),
                nn.Linear(84, num_classes),
            )

    # 执行前向计算
    def forward(self, inputs):
        x = self.features(inputs)

        if self.num_classes > 0:
            x = paddle.flatten(x, 1)
            x = self.linear(x)
        return x

lenet_SubClass = LeNet()

# 可视化模型组网结构和参数
params_info = paddle.summary(lenet_SubClass, (1, 1, 28, 28))
print(params_info)

In [None]:
lenet_SubClass.state_dict()

## 训练、评估、推理

我们先介绍利用paddle API快速训练测试，然后再来完整的自定义训练（类似与PyTorch手写数字识别那样）

指定训练的机器

In [None]:
# 指定在 CPU 上训练
paddle.device.set_device("cpu")

# 指定在 GPU 第 0 号卡上训练
# paddle.device.set_device('gpu:0')

### 使用paddle.Model 高层 API 训练、评估与推理

使用 paddle.Model 封装模型<br>使用高层 API 训练模型前，可使用 paddle.Model 将模型封装为一个实例，方便后续进行训练、评估与推理。代码如下：

In [None]:
# 封装模型为一个 Model 实例，便于进行后续的训练、评估和推理
model_api = paddle.Model(LeNet())

使用 Model.prepare 配置训练准备参数

用 paddle.Model 完成模型的封装后，需通过 Model.prepare 进行训练前的配置准备工作，包括设置优化算法、Loss 计算方法、评价指标计算方法：

- 优化器（optimizer）：即寻找最优解的方法，可计算和更新梯度，并根据梯度更新模型参数。飞桨框架在 paddle.optimizer 下提供了优化器相关 API。并且需要为优化器设置合适的学习率，或者指定合适的学习率策略，飞桨框架在 paddle.optimizer.lr 下提供了学习率策略相关的 API。

- 损失函数（loss）：用于评估模型的预测值和真实值的差距，模型训练过程即取得尽可能小的 loss 的过程。飞桨框架在 paddle.nn Loss层 提供了适用不同深度学习任务的损失函数相关 API。

- 评价指标（metrics）：用于评估模型的好坏，不同的任务通常有不同的评价指标。飞桨框架在 paddle.metric 下提供了评价指标相关 API。


In [None]:
# 为模型训练做准备，设置优化器及其学习率，并将网络的参数传入优化器，设置损失函数和精度计算方式
model_api.prepare(
    optimizer=paddle.optimizer.Adam(
        learning_rate=0.001, parameters=model_api.parameters()
    ),
    loss=paddle.nn.CrossEntropyLoss(),
    metrics=paddle.metric.Accuracy(),
)


使用 Model.fit 训练模型<br>
做好模型训练的前期准备工作后，调用 Model.fit 接口来启动训练。 训练过程采用二层循环嵌套方式：内层循环完成整个数据集的一次遍历，采用分批次方式；外层循环根据设置的训练轮次完成数据集的多次遍历。因此需要指定至少三个关键参数：训练数据集，训练轮次和每批次大小:

- 训练数据集：传入之前定义好的训练数据集。

- 训练轮次（epoch）：训练时遍历数据集的次数，即外循环轮次。

- 批次大小（batch_size）：内循环中每个批次的训练样本数。

除此之外，还可以设置样本乱序（shuffle）、丢弃不完整的批次样本（drop_last）、同步/异步读取数据（num_workers） 等参数，另外可通过 Callback 参数传入回调函数，在模型训练的各个阶段进行一些自定义操作，比如收集训练过程中的一些数据和参数，详细介绍可参见 自定义 Callback 章节。

In [None]:
# # 启动模型训练，指定训练数据集，设置训练轮次，设置每次数据集计算的批次大小，设置日志格式
model_api.fit(train_loader, epochs=2, batch_size=64, verbose=1)

训练好模型后，可在事先定义好的测试数据集上，使用 Model.evaluate 接口完成模型评估操作，结束后根据在 Model.prepare 中定义的 loss 和 metric 计算并返回相关评估结果。

返回格式是一个字典:

只包含loss， {'loss': xxx}

包含loss和一个评估指标， {'loss': xxx, 'metric name': xxx}

包含loss和多个评估指标， {'loss': xxx, 'metric name1': xxx, 'metric name2': xxx}

In [None]:
# 用 evaluate 在测试集上对模型进行验证
eval_result = model_api.evaluate(test_dataset, verbose=1)
print(eval_result)

使用 Model.predict 执行推理
高层 API 中提供了 Model.predict 接口，可对训练好的模型进行推理验证。只需传入待执行推理验证的样本数据，即可计算并返回推理结果。

返回格式是一个列表：

模型是单一输出：[(numpy_ndarray_1, numpy_ndarray_2, …, numpy_ndarray_n)]

模型是多输出：[(numpy_ndarray_1, numpy_ndarray_2, …, numpy_ndarray_n), (numpy_ndarray_1, numpy_ndarray_2, …, numpy_ndarray_n), …]

如果模型是单一输出，则输出的形状为 [1, n]，n 表示数据集的样本数。其中每个 numpy_ndarray_n 是对应原始数据经过模型计算后得到的预测结果，类型为 numpy 数组，例如 mnist 分类任务中，每个 numpy_ndarray_n 是长度为 10 的 numpy 数组。

如果模型是多输出，则输出的形状为[m, n]，m 表示标签的种类数，在多标签分类任务中，m 会根据标签的数目而定。

In [None]:
# 用 predict 在测试集上对模型进行推理
test_result = model_api.predict(test_dataset)
# 由于模型是单一输出，test_result的形状为[1, 10000]，10000是测试数据集的数据量。这里打印第一个数据的结果，这个数组表示每个数字的预测概率
print(len(test_result))
print(test_result[0][0])

# 从测试集中取出一张图片
img, label = test_dataset[0]

# 打印推理结果，这里的argmax函数用于取出预测值中概率最高的一个的下标，作为预测标签
pred_label = test_result[0][0].argmax()
print("true label: {}, pred label: {}".format(label[0], pred_label))
# 使用matplotlib库，可视化图片
from matplotlib import pyplot as plt

plt.imshow(img[0])


### paddle自定义训练

记录并可视化练过程中的精度、损失变化，测试模型指标等（类似于前面章节的PyTorch手写数字识别）。

训练

In [None]:
import os 
os.getcwd()

In [None]:
import os
import paddle

# transform = Normalize(mean=[0.5], std=[0.5], data_format="CHW")
# train_dataset = paddle.vision.datasets.MNIST(image_path=train_img, label_path=train_label,mode="train", download=False,transform=transform)
# test_dataset = paddle.vision.datasets.MNIST(image_path=test_img, label_path=test_label,mode="test", download=False,transform=transform)
# # 用 DataLoader 实现数据加载（前面已经做过了）
# train_loader = paddle.io.DataLoader(train_dataset, batch_size=64, shuffle=True, num_workers=2, drop_last=True)
# test_loader = paddle.io.DataLoader(test_dataset, batch_size=64, shuffle=False, num_workers=2, drop_last=True)

# 实例化模型
model_paddle = LeNet()

# 设置迭代次数
epochs = 3

# 设置优化器
# optim = paddle.optimizer.SGD(learning_rate=0.01, parameters=model_paddle.parameters(), weight_decay=0.0001)
optim = paddle.optimizer.Adam(parameters=model_paddle.parameters(),learning_rate=1e-3)

# 设置损失函数
loss_fn = paddle.nn.CrossEntropyLoss()

train_acc = []
train_loss = []
test_acc = []
test_loss = []

model_save = r"./logs_dir/lenet_paddle"
os.makedirs(model_save, exist_ok=True)
model_root = os.path.normpath(os.path.join(os.getcwd(), model_save))

best_test_acc = 0.0

# 保存断点，用于恢复训练
checkpoint = dict()

for epoch in range(epochs):
    
    epoch_train_loss = 0
    epoch_train_correct = 0
    epoch_train_samples = 0
    
    # 训练模式。这只会影响某些模块，如Dropout和BatchNorm。
    model_paddle.train()
    for batch_id, data in enumerate(train_loader()):
        x_data = data[0]  # 训练数据
        y_data = data[1]  # 训练标签  [64, 1]
        predicts = model_paddle(x_data) # 预测

        loss = loss_fn(predicts, y_data)  # 计算损失
        loss.backward()  # 反向传播
        optim.step()  # 更新参数
        optim.clear_grad()  # 清空梯度

        epoch_train_loss += loss.numpy()  # 记录训练损失
        predicted = paddle.argmax(predicts, axis=1)  # 计算准确率  [64]
        epoch_train_correct += paddle.sum(predicted == y_data.reshape([-1])).numpy()
        epoch_train_samples += y_data.shape[0]  # += 64

    # 记录最后一个train batch的loss到断点
    checkpoint['loss'] = loss

    # 记录训练精度和损失
    train_acc_epoch = epoch_train_correct / epoch_train_samples
    train_loss_epoch = epoch_train_loss / len(train_loader) 
    train_acc.append(train_acc_epoch)
    train_loss.append(train_loss_epoch)

    # 在测试集上评估模型
    model_paddle.eval()  # 切换到评估模式
    epoch_test_loss = 0  
    epoch_test_correct = 0
    epoch_test_samples = 0

    # 设置为评估模式
    model_paddle.eval()  
    for batch_id, data in enumerate(test_loader):
        x_data = data[0]  # 测试数据
        y_data = data[1]  # 测试标签  y_data.shape=[64, 1]
        predicts = model_paddle(x_data)  # 预测 predicts.shape=[64, 10]
        loss = loss_fn(predicts, y_data)   # 计算损失
        epoch_test_loss += loss.numpy()
        predicted = paddle.argmax(predicts, axis=1)  # 计算准确率 [64]  
        epoch_test_correct += paddle.sum(predicted == y_data.reshape([-1])).numpy()
        epoch_test_samples += y_data.shape[0]
            
    # 记录测试精度和损失
    test_acc_epoch = epoch_test_correct / epoch_test_samples
    test_loss_epoch = epoch_test_loss / len(test_loader)
    test_acc.append(test_acc_epoch)
    test_loss.append(test_loss_epoch)

    # 保存最佳模型
    if test_acc_epoch > best_test_acc:
        best_test_acc = test_acc_epoch
        # 保存Layer参数
        paddle.save(model_paddle.state_dict(), os.path.join(model_root, "model.params"))
        # 保存优化器参数
        paddle.save(optim.state_dict(), os.path.join(model_root, "optim.pdopt"))
        checkpoint['epoch'] = epoch+1
        # 保存检查点checkpoint信息
        paddle.save(checkpoint, os.path.join(model_root, "checkpoint.pkl"))
        
        print(f"Saved best model with test accuracy: {test_acc_epoch*100:.2f}%")
        
    # 打印训练和测试结果
    print(f"Epoch {epoch+1}/{epochs}, "
          f"Train Loss: {train_loss_epoch:.4f}, Train Acc: {train_acc_epoch*100:.2f}%, "
          f"Test Loss: {test_loss_epoch:.4f}, Test Acc: {test_acc_epoch*100:.2f}%")

# 输出最终训练和测试结果
print("Training Finished!")
        

可视化

In [None]:
import matplotlib.pyplot as plt
# 绘制损失和精度变化曲线
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 6))

# 损失曲线
ax1.plot(range(1, epochs+1), train_loss, label='Train Loss', color='blue')
ax1.plot(range(1, epochs+1), test_loss, label='Test Loss', color='red')
ax1.set_title('Loss vs Epochs')
ax1.set_xlabel('Epochs')
ax1.set_ylabel('Loss')
ax1.legend()

# 精度曲线
ax2.plot(range(1, epochs+1), train_acc, label='Train Accuracy', color='blue')
ax2.plot(range(1, epochs+1), test_acc, label='Test Accuracy', color='red')
ax2.set_title('Accuracy vs Epochs')
ax2.set_xlabel('Epochs')
ax2.set_ylabel('Accuracy (%)')
ax2.legend()

plt.tight_layout()
plt.show()

评估

In [None]:
from sklearn.metrics import accuracy_score, precision_score, recall_score,\
f1_score, classification_report
from PIL import Image

# 实例化模型
infer_model = LeNet()

# 加载模型
infer_model_sd = paddle.load(os.path.join(model_root, "model.params"))
opt_state_dict = paddle.load(os.path.join(model_root, "optim.pdopt"))
# （可选）加载断点
checkpoint_sd = paddle.load(os.path.join(model_root, "checkpoint.pkl"))

infer_model.set_state_dict(infer_model_sd)
optim.set_state_dict(opt_state_dict)

In [None]:
# 测试阶段
infer_model.eval()
correct_test = 0
total_test = 0
running_test_loss = 0.0
test_accuracy_best = 0
all_labels = []
all_preds = []
test_losses = []
test_accuracies = []

with paddle.no_grad():
    for datas in test_loader:
        x_data = datas[0]
        y_data = datas[1]
        preds = infer_model(x_data)
        loss = loss_fn(preds, y_data)
        running_test_loss += loss.numpy()
        # 计算测试准确率
        predicted = paddle.argmax(preds, 1)
        # print(f"{y_data=}")
        total_test += y_data.shape[0]
        correct_test += (predicted == y_data.reshape([-1])).sum().item()
        # 收集所有标签和预测值，用于计算精确率、召回率和F1分数
        all_labels.extend(y_data.cpu().numpy())
        all_preds.extend(predicted.cpu().numpy())

avg_test_loss = running_test_loss / len(test_loader)
test_accuracy = 100 * correct_test / total_test

# 打印分类报告（精确率、召回率、F1分数）
print(f"Test Accuracy: {test_accuracy:.2f}%")
print("Classification Report:")
print(classification_report(all_labels, all_preds))

test_losses.append(avg_test_loss)
test_accuracies.append(test_accuracy)

# 打印当前epoch的训练和测试结果
print(f"Epoch {epoch+1}/{epochs}")
print(f"Test Loss: {avg_test_loss:.4f}, Test Accuracy: {test_accuracy:.2f}%")

部署推理

In [None]:
from PIL import Image
import numpy as np
import paddle 
img_path = r"./datasets_/write_mnist/9.png"
color_image = Image.open(img_path).convert('RGB').resize((28,28))
# color_image.show()
image = Image.open(img_path).convert('L').resize((28,28))  # hw
image_np = np.array(image, dtype=float)  # float64
image_tensor = paddle.to_tensor(image_np, dtype='float32')  # float32
# image_tensor = paddle.cast(image_tensor, dtype='float32')
image_tensor = paddle.unsqueeze(image_tensor, 0)  # chw
transform = Normalize(mean=[127.5], std=[127.5], data_format="CHW")
image_transform = transform(image_tensor)  #  hwc
image_tensor = paddle.unsqueeze(image_transform, 0) # nchw

infer_model.eval()
pred_cls = infer_model(image_tensor)[0].argmax()
pred_cls = pred_cls.item()

# show()
def show_results():
    # 创建2个子图：左边是原图，右边是变换后的图
    fig, ax = plt.subplots(1, 2, figsize=(10, 5))

    # 左边展示原图
    ax[0].imshow(np.array(color_image))
    ax[0].set_title('Original rgb Image')
    ax[0].axis('off')  # 关闭坐标轴

    # 右边展示变换后的图
    ax[1].imshow(paddle.squeeze(image_transform), cmap='gray')
    ax[1].set_title(f'Predicted: {pred_cls}')
    ax[1].axis('off')  # 关闭坐标轴

    plt.show()
    
    
show_results()