

在“手写数字识别”案例的快速入门中，我们调用飞桨提供的API（[paddle.dataset.mnist](https://www.paddlepaddle.org.cn/documentation/docs/zh/api_cn/data/dataset_cn.html)）加载MNIST数据集。但在工业实践中，我们面临的任务和数据环境千差万别，需要编写适合当前任务的数据处理程序。

但是编写自定义的数据加载函数，一般会涉及以下四个部分：

- 数据读取与数据集划分
-  定义数据读取器
-  校验数据的有效性
-  异步数据读取



在数据读取与处理前，首先要加载飞桨平台和数据处理库，可能使用的库都需要加载进来：


In [1]:
#数据处理部分之前的代码，加入部分数据处理的库
import paddle
import paddle.fluid as fluid
from paddle.fluid.dygraph.nn import FC
import numpy as np
import os
import gzip
import json
import random

# 1. 数据读取与数据集划分


实际保存到的数据存储格式多种多样，本节使用的mnist数据集以json格式存储在本地。

在'./work/'目录下读取文件名称为'mnist.json.gz'的MINST手写数字识别数据，文件格式是压缩后的json文件。文件内容包括：训练数据、验证数据、测试数据三部分，分别包含50000、10000、10000条手写数字数据和两个元素列表。

以训练集数据为例，它为两个元素的列表为[traim_imgs, train_labels]。 
* train_imgs：一个维度为[5000, 784]的二维列表，包含5000张图片。每张图片用一个长度为784的向量表示,内容是28*28尺寸的像素灰度值（黑白图片）。
* train_labels：一个维度为[5000, ]的列表，表示这些图片对应的分类标签，即0-9之间的一个数字。接下来我们将数据读取出来。


In [2]:
# 声明数据集文件位置
datafile = './work/mnist.json.gz'
print('loading mnist dataset from {} ......'.format(datafile))
# 加载json数据文件
data = json.load(gzip.open(datafile))
print('mnist dataset load done')
# 读取到的数据区分训练集，验证集，测试集
train_set, val_set, eval_set = data

# 数据集相关参数，图片高度IMG_ROWS, 图片宽度IMG_COLS
IMG_ROWS = 28
IMG_COLS = 28

# 打印数据信息
imgs, label = train_set[0], train_set[1]
print("训练数据集数量: ", len(imgs))

# 观察验证集数量
imgs, label = val_set[0], val_set[1]
print("验证数据集数量: ", len(imgs))

# 观察测试集数量
imgs, label = val= eval_set[0], eval_set[1]
print("测试数据集数量: ", len(imgs))

loading mnist dataset from ./work/mnist.json.gz ......
mnist dataset load done
训练数据集数量:  50000
验证数据集数量:  10000
测试数据集数量:  10000




# 2. 定义数据读取函数

飞桨提供分批次读取数据函数paddle.batch，该接口是一个reader的装饰器，返回的reader将输入的reader的数据打包成指定的batch_size大小的批处理数据（batched.data）

在定义数据读取函数中，我们需要做很多事情，包括但不限于：
- 打乱数据，保证每轮训练读取的数据顺序不同。
- 数据类型转换。


In [3]:
# 获得训练数据集
imgs, label = train_set[0], train_set[1]
print("训练数据集数量: ", len(imgs))
# 获得训练数据集长度
imgs_length = len(imgs)
# 定义数据集每个数据的序号，根据序号读取数据
index_list = list(range(imgs_length))
# 读入数据时用到的批次大小
BATCHSIZE = 100

# 定义数据生成器
def data_generator():
    if mode == 'train':
        # 训练模式下打乱数据
        random.shuffle(index_list)
    imgs_list = []
    labels_list = []
    for i in index_list:
        # 将数据处理成希望的格式，比如类型为float32，shape为[1, 28, 28]
        img = np.reshape(imgs[i], [1, IMG_ROWS, IMG_COLS]).astype('float32')
        label = np.reshape(labels[i], [1]).astype('float32')
        imgs_list.append(img) 
        labels_list.append(label)
        if len(imgs_list) == BATCHSIZE:
            # 获得一个batchsize的数据，并返回
            yield np.array(imgs_list), np.array(labels_list)
            # 清空数据读取列表
            imgs_list = []
            labels_list = []

    # 如果剩余数据的数目小于BATCHSIZE，
    # 则剩余数据一起构成一个大小为len(imgs_list)的mini-batch
    if len(imgs_list) > 0:
        yield np.array(imgs_list), np.array(labels_list)

训练数据集数量:  50000


# 3. 数据校验

实际任务原始的数据可能存在数据很“脏”的情况，这里的“脏”多指数据标注不准确，或者是数据杂乱，格式不统一等等。

因此，在完成数据处理函数时，我们需要执行数据校验和清理的操作。


数据校验一般有两种方式：
- 机器校验：加入一些校验和清理数据的操作。
- 人工校验：先打印数据输出结果，观察是否是设置的格式。再从训练的结果验证数据处理和读取的有效性。

##  机器校验

如下代码所示，如果数据集中的图片数量和标签数量不等，说明数据逻辑存在问题，可使用assert语句校验图像数量和标签数据是否一致。

In [None]:
    imgs_length = len(imgs)

    assert len(imgs) == len(labels), \
          "length of train_imgs({}) should be the same as train_labels({})".format(len(imgs), len(labels))

## 人工校验

人工校验分两步，首先打印数据输出结果，观察是否是设置的格式。再从训练的结果验证数据处理和读取的有效性。


实现数据处理和加载函数后，我们可以调用它读取一次数据，观察数据的shape和类型是否与函数中设置的一致。

In [6]:
# 声明数据读取函数，从训练集中读取数据
train_loader = load_data('train')
# 以迭代的形式读取数据
for batch_id, data in enumerate(train_loader()):
    image_data, label_data = data
    if batch_id == 0:
        # 打印数据shape和类型
        print(image_data.shape, label_data.shape, type(image_data), type(label_data))
    break

loading mnist dataset from ./work/mnist.json.gz ......
(100, 1, 28, 28) (100, 1) <class 'numpy.ndarray'> <class 'numpy.ndarray'>


### 观察训练结果

数据处理部分后的代码多数保持不变，仅在读取数据时候调用新编写的load_data函数。由于数据格式的转换工作在load_data函数中做了一部分，所以向模型输入数据的代码变得更加简洁。下面我们使用自己实现的数据加载函数重新训练我们的神经网络。

In [8]:
#数据处理部分之后的代码，数据读取的部分调用Load_data函数
# 定义网络结构，同上一节所使用的网络结构
class MNIST(fluid.dygraph.Layer):
    def __init__(self, name_scope):
        super(MNIST, self).__init__(name_scope)
        name_scope = self.full_name()
        self.fc = FC(name_scope, size=1, act=None)

    def forward(self, inputs):
        outputs = self.fc(inputs)
        return outputs

# 训练配置，并启动训练过程
with fluid.dygraph.guard():
    model = MNIST("mnist")
    model.train()
    #调用加载数据的函数
    train_loader = load_data('train')
    optimizer = fluid.optimizer.SGDOptimizer(learning_rate=0.001)
    EPOCH_NUM = 10
    for epoch_id in range(EPOCH_NUM):
        for batch_id, data in enumerate(train_loader()):
            #准备数据，变得更加简洁
            image_data, label_data = data
            image = fluid.dygraph.to_variable(image_data)
            label = fluid.dygraph.to_variable(label_data)
            
            #前向计算的过程
            predict = model(image)
            
            #计算损失，取一个批次样本损失的平均值
            loss = fluid.layers.square_error_cost(predict, label)
            avg_loss = fluid.layers.mean(loss)
            
            #每训练了200批次的数据，打印下当前Loss的情况
            if batch_id % 200 == 0:
                print("epoch: {}, batch: {}, loss is: {}".format(epoch_id, batch_id, avg_loss.numpy()))
            
            #后向传播，更新参数的过程
            avg_loss.backward()
            optimizer.minimize(avg_loss)
            model.clear_gradients()

    #保存模型参数
    fluid.save_dygraph(model.state_dict(), 'mnist')


loading mnist dataset from ./work/mnist.json.gz ......
epoch: 0, batch: 0, loss is: [36.861202]
epoch: 0, batch: 200, loss is: [4.169391]
epoch: 0, batch: 400, loss is: [4.018353]
epoch: 1, batch: 0, loss is: [4.7625484]
epoch: 1, batch: 200, loss is: [3.3847408]
epoch: 1, batch: 400, loss is: [3.0911984]
epoch: 2, batch: 0, loss is: [3.6650655]
epoch: 2, batch: 200, loss is: [3.4636102]
epoch: 2, batch: 400, loss is: [3.4108307]
epoch: 3, batch: 0, loss is: [4.1071653]
epoch: 3, batch: 200, loss is: [3.3901167]
epoch: 3, batch: 400, loss is: [3.8391578]
epoch: 4, batch: 0, loss is: [3.2438037]
epoch: 4, batch: 200, loss is: [2.573626]
epoch: 4, batch: 400, loss is: [3.184749]
epoch: 5, batch: 0, loss is: [4.2807035]
epoch: 5, batch: 200, loss is: [4.082668]
epoch: 5, batch: 400, loss is: [3.5914075]
epoch: 6, batch: 0, loss is: [3.1064441]
epoch: 6, batch: 200, loss is: [4.414713]
epoch: 6, batch: 400, loss is: [3.244702]
epoch: 7, batch: 0, loss is: [3.872587]
epoch: 7, batch: 200, l

相信读到这里，您已经对如何处理数据集并实现自己的数据读取函数有了一定的了解。不妨自己尝试写一个新的数据读取函数，验证一下自己的学习效果吧。

最后，将上述几部分操作合并到load_data函数，方便后续调用。下面代码为完整的数据读取函数，可以通过数据加载函数load_data的输入参数mode为'train', 'valid', 'eval'选择返回的数据是训练集，验证集，测试集。


In [3]:
#数据处理部分的展开代码
# 定义数据集读取器
def load_data(mode='train'):

    # 数据文件
    datafile = './work/mnist.json.gz'
    print('loading mnist dataset from {} ......'.format(datafile))
    data = json.load(gzip.open(datafile))
    # 读取到的数据可以直接区分训练集，验证集，测试集
    train_set, val_set, eval_set = data

    # 数据集相关参数，图片高度IMG_ROWS, 图片宽度IMG_COLS
    IMG_ROWS = 28
    IMG_COLS = 28
    # 获得数据
    if mode == 'train':
        imgs = train_set[0]
        labels = train_set[1]
    elif mode == 'valid':
        imgs = val_set[0]
        labels = val_set[1]
    elif mode == 'eval':
        imgs = eval_set[0]
        labels = eval_set[1]
    else:
        raise Exception("mode can only be one of ['train', 'valid', 'eval']")

    imgs_length = len(imgs)

    assert len(imgs) == len(labels), \
          "length of train_imgs({}) should be the same as train_labels({})".format(
                  len(imgs), len(labels))

    index_list = list(range(imgs_length))

    # 读入数据时用到的batchsize
    BATCHSIZE = 100

    # 定义数据生成器
    def data_generator():
        if mode == 'train':
            # 训练模式下，将训练数据打乱
            random.shuffle(index_list)
        imgs_list = []
        labels_list = []
        
        for i in index_list:
            img = np.reshape(imgs[i], [1, IMG_ROWS, IMG_COLS]).astype('float32')
            label = np.reshape(labels[i], [1]).astype('float32')
            imgs_list.append(img) 
            labels_list.append(label)
            if len(imgs_list) == BATCHSIZE:
                # 产生一个batch的数据并返回
                yield np.array(imgs_list), np.array(labels_list)
                # 清空数据读取列表
                imgs_list = []
                labels_list = []

        # 如果剩余数据的数目小于BATCHSIZE，
        # 则剩余数据一起构成一个大小为len(imgs_list)的mini-batch
        if len(imgs_list) > 0:
            yield np.array(imgs_list), np.array(labels_list)
    return data_generator


# 4. 异步数据读取


上面提到的数据读取是同步数据读取方式，针对于样本量较大、数据读取较慢的场景，建议采用异步数据读取方式，可以让数据读取和模型训练并行化，加快数据读取速度，牺牲一小部分内存换取数据读取效率的提升。


------
**说明：**

- 同步数据读取：每当模型需要数据的时候，运行数据读取函数获得当前批次的数据。在读取数据期间，模型一直在等待数据读取结束，获得数据后才会进行计算。
- 异步数据读取：数据读取和模型训练过程异步进行，读取到的数据先放入缓存区。模型训练完一个批次后，不用等待数据读取过程，直接从缓存区获得下一批次数据进行训练。

------

使用飞桨实现异步数据读取非常简单，代码如下所示。

In [4]:
# 定义数据读取后存放的位置，CPU或者GPU，这里使用CPU
# place = fluid.CUDAPlace(0) 时，数据读到GPU上
place = fluid.CPUPlace()
with fluid.dygraph.guard(place):
    # 声明数据加载函数，使用训练模式
    train_loader = load_data(mode='train')
    # 定义DataLoader对象用于加载Python生成器产生的数据
    data_loader = fluid.io.DataLoader.from_generator(capacity=5, return_list=True)
    # 设置数据生成器
    data_loader.set_batch_generator(train_loader, places=place)
    # 迭代的读取数据并打印数据的形状
    for i, data in enumerate(data_loader):
        image_data, label_data = data
        print(i, image_data.shape, label_data.shape)
        if i>=5:
            break

loading mnist dataset from ./work/mnist.json.gz ......
0 [100, 1, 28, 28] [100, 1]
1 [100, 1, 28, 28] [100, 1]
2 [100, 1, 28, 28] [100, 1]
3 [100, 1, 28, 28] [100, 1]
4 [100, 1, 28, 28] [100, 1]
5 [100, 1, 28, 28] [100, 1]


与同步数据读取相比，异步数据读取仅增加了三行代码，如下所示。

```python
place = fluid.CPUPlace() 
data_loader = fluid.io.DataLoader.from_generator(capacity=5, return_list=True)
data_loader.set_batch_generator(train_loader, place)
```
我们展开解读一下：

* **第一行代码：** 设置读取的数据是放在CPU还是GPU上。
* **第二行代码：** 创建一个DataLoader对象用于加载Python生成器产生的数据。数据会由Python线程预先读取，并异步送入一个队列中。fluid.io.DataLoader.from_generator参数名称、参数含义、默认值如下：

参数名和默认值如下：
* feed_list=None, 
* capacity=None, 
* use_double_buffer=True,
* iterable=True,
* return_list=False

参数含义如下：

- feed_list        仅在paddle静态图中使用，动态图中设置为None，本教程默认使用动态图的建模方式。
- capacity        表示在DataLoader中维护的队列容量，如果读取数据的速度很快，建议设置为更大的值。
- use_double_buffer   是一个布尔型的参数，设置为True时Dataloader会预先异步读取下一个batch的数据放到缓存区。
- iterable          表示创建的Dataloader对象是否是可迭代的，一般设置为True。
- return_list        在动态图下需要设置为True。

* **第三行代码：** 用创建的DataLoader对象设置一个数据生成器set_batch_generator，输入的参数是一个Python数据生成器train_loader和服务器资源类型place（标明CPU还是GPU）。







异步数据读取并训练的完整案例代码如下：

In [9]:
with fluid.dygraph.guard():
    model = MNIST("mnist")
    model.train()
    #调用加载数据的函数
    train_loader = load_data('train')
    # 创建异步数据读取器
    place = fluid.CPUPlace()
    data_loader = fluid.io.DataLoader.from_generator(capacity=5, return_list=True)
    data_loader.set_batch_generator(train_loader, places=place)
    
    optimizer = fluid.optimizer.SGDOptimizer(learning_rate=0.001)
    EPOCH_NUM = 3
    for epoch_id in range(EPOCH_NUM):
        for batch_id, data in enumerate(data_loader):
            image_data, label_data = data
            image = fluid.dygraph.to_variable(image_data)
            label = fluid.dygraph.to_variable(label_data)
            
            predict = model(image)
            
            loss = fluid.layers.square_error_cost(predict, label)
            avg_loss = fluid.layers.mean(loss)
            
            if batch_id % 200 == 0:
                print("epoch: {}, batch: {}, loss is: {}".format(epoch_id, batch_id, avg_loss.numpy()))
            
            avg_loss.backward()
            optimizer.minimize(avg_loss)
            model.clear_gradients()

    fluid.save_dygraph(model.state_dict(), 'mnist')

loading mnist dataset from ./work/mnist.json.gz ......
epoch: 0, batch: 0, loss is: [47.428585]
epoch: 0, batch: 200, loss is: [4.3324847]
epoch: 0, batch: 400, loss is: [4.013606]
epoch: 1, batch: 0, loss is: [4.2797394]
epoch: 1, batch: 200, loss is: [3.1092007]
epoch: 1, batch: 400, loss is: [3.254475]
epoch: 2, batch: 0, loss is: [3.7360413]
epoch: 2, batch: 200, loss is: [3.185952]
epoch: 2, batch: 400, loss is: [3.7419617]


从异步数据读取的训练结果来看，损失函数下降与同步数据读取训练结果基本一致。