如果需要训练的数据大小不大，例如不到1G，那么可以直接全部读入内存中进行训练，这样一般效率最高。

但如果需要训练的数据很大，例如超过10G，无法一次载入内存，那么通常需要在训练的过程中分批逐渐读入。

使用 tf.data API 可以构建数据输入管道，轻松处理大量的数据，不同的数据格式，以及不同的数据转换。

## 1 从numpy array构建数据管道

In [1]:
import tensorflow as tf
import numpy as np 


  _np_qint8 = np.dtype([("qint8", np.int8, 1)])
  _np_quint8 = np.dtype([("quint8", np.uint8, 1)])
  _np_qint16 = np.dtype([("qint16", np.int16, 1)])
  _np_quint16 = np.dtype([("quint16", np.uint16, 1)])
  _np_qint32 = np.dtype([("qint32", np.int32, 1)])
  np_resource = np.dtype([("resource", np.ubyte, 1)])
  _np_qint8 = np.dtype([("qint8", np.int8, 1)])
  _np_quint8 = np.dtype([("quint8", np.uint8, 1)])
  _np_qint16 = np.dtype([("qint16", np.int16, 1)])
  _np_quint16 = np.dtype([("quint16", np.uint16, 1)])
  _np_qint32 = np.dtype([("qint32", np.int32, 1)])
  np_resource = np.dtype([("resource", np.ubyte, 1)])


In [2]:
from sklearn import datasets 

In [3]:
# 使用iris 数据集
iris = datasets.load_iris()

In [4]:
ds1 = tf.data.Dataset.from_tensor_slices((iris["data"], iris["target"]))
for features, label in ds1.take(5):
#     print(features)
#     print(label, "label")
    print(features,'\n',label)

tf.Tensor([5.1 3.5 1.4 0.2], shape=(4,), dtype=float64) 
 tf.Tensor(0, shape=(), dtype=int32)
tf.Tensor([4.9 3.  1.4 0.2], shape=(4,), dtype=float64) 
 tf.Tensor(0, shape=(), dtype=int32)
tf.Tensor([4.7 3.2 1.3 0.2], shape=(4,), dtype=float64) 
 tf.Tensor(0, shape=(), dtype=int32)
tf.Tensor([4.6 3.1 1.5 0.2], shape=(4,), dtype=float64) 
 tf.Tensor(0, shape=(), dtype=int32)
tf.Tensor([5.  3.6 1.4 0.2], shape=(4,), dtype=float64) 
 tf.Tensor(0, shape=(), dtype=int32)


## 2从 Pandas DataFrame构建数据管道

In [5]:
import pandas as pd
iris = datasets.load_iris()
dfiris = pd.DataFrame(iris["data"],columns = iris.feature_names)
ds2 = tf.data.Dataset.from_tensor_slices((dfiris.to_dict("list"),iris["target"]))

for features,label in ds2.take(3):
    print(features,'\n',label)

{'sepal length (cm)': <tf.Tensor: id=46, shape=(), dtype=float32, numpy=5.1>, 'sepal width (cm)': <tf.Tensor: id=47, shape=(), dtype=float32, numpy=3.5>, 'petal length (cm)': <tf.Tensor: id=44, shape=(), dtype=float32, numpy=1.4>, 'petal width (cm)': <tf.Tensor: id=45, shape=(), dtype=float32, numpy=0.2>} 
 tf.Tensor(0, shape=(), dtype=int32)
{'sepal length (cm)': <tf.Tensor: id=56, shape=(), dtype=float32, numpy=4.9>, 'sepal width (cm)': <tf.Tensor: id=57, shape=(), dtype=float32, numpy=3.0>, 'petal length (cm)': <tf.Tensor: id=54, shape=(), dtype=float32, numpy=1.4>, 'petal width (cm)': <tf.Tensor: id=55, shape=(), dtype=float32, numpy=0.2>} 
 tf.Tensor(0, shape=(), dtype=int32)
{'sepal length (cm)': <tf.Tensor: id=66, shape=(), dtype=float32, numpy=4.7>, 'sepal width (cm)': <tf.Tensor: id=67, shape=(), dtype=float32, numpy=3.2>, 'petal length (cm)': <tf.Tensor: id=64, shape=(), dtype=float32, numpy=1.3>, 'petal width (cm)': <tf.Tensor: id=65, shape=(), dtype=float32, numpy=0.2>} 
 t

## 3 从Python generator构建数据管道

In [6]:
from matplotlib import pyplot as plt 
from tensorflow.keras.preprocessing.image import ImageDataGenerator

In [7]:
# # 定义一个从文件中读取图片的generator
# image_generator = ImageDataGenerator(rescale=1.0/255).flow_from_directory(
#                     "./data/cifar2/test/",
#                     target_size=(32, 32),
#                     batch_size=20,
#                     class_mode='binary')

# classdict = image_generator.class_indices
# print(classdict)

# def generator():
#     for features,label in image_generator:
#         yield (features,label)

# ds3 = tf.data.Dataset.from_generator(generator,output_types=(tf.float32,tf.int32))

In [8]:
# %matplotlib inline
# %config InlineBackend.figure_format = 'svg'
# plt.figure(figsize=(6,6)) 
# for i,(img,label) in enumerate(ds3.unbatch().take(9)):
#     ax=plt.subplot(3,3,i+1)
#     ax.imshow(img.numpy())
#     ax.set_title("label = %d"%label)
#     ax.set_xticks([])
#     ax.set_yticks([]) 
# plt.show()

# 应用数据转换


In [16]:
# map 将转换函数映射到数据集的每一个元素
ds = tf.data.Dataset.from_tensor_slices(["H, h", "S, s", "Z, z"])
ds_map = ds.map(lambda x:tf.strings.split(x, " "))
ds_map1 = ds.map(lambda x:tf.strings.split(x, ", "))
print(ds_map,'\n', ds)
for x in ds_map:
    print(x)
for y in ds_map1:
    print(y)

<MapDataset shapes: (None,), types: tf.string> 
 <TensorSliceDataset shapes: (), types: tf.string>
tf.Tensor([b'H,' b'h'], shape=(2,), dtype=string)
tf.Tensor([b'S,' b's'], shape=(2,), dtype=string)
tf.Tensor([b'Z,' b'z'], shape=(2,), dtype=string)
tf.Tensor([b'H' b'h'], shape=(2,), dtype=string)
tf.Tensor([b'S' b's'], shape=(2,), dtype=string)
tf.Tensor([b'Z' b'z'], shape=(2,), dtype=string)


In [17]:
#flat_map:将转换函数映射到数据集的每一个元素，并将嵌套的Dataset压平。

ds = tf.data.Dataset.from_tensor_slices(["hello world","hello China","hello Beijing"])
ds_flatmap = ds.flat_map(lambda x:tf.data.Dataset.from_tensor_slices(tf.strings.split(x," ")))
for x in ds_flatmap:
    print(x)

tf.Tensor(b'hello', shape=(), dtype=string)
tf.Tensor(b'world', shape=(), dtype=string)
tf.Tensor(b'hello', shape=(), dtype=string)
tf.Tensor(b'China', shape=(), dtype=string)
tf.Tensor(b'hello', shape=(), dtype=string)
tf.Tensor(b'Beijing', shape=(), dtype=string)


In [18]:
# interleave: 效果类似flat_map,但可以将不同来源的数据夹在一起。

ds = tf.data.Dataset.from_tensor_slices(["hello world","hello China","hello Beijing"])
ds_interleave = ds.interleave(lambda x:tf.data.Dataset.from_tensor_slices(tf.strings.split(x," ")))
for x in ds_interleave:
    print(x)

tf.Tensor(b'hello', shape=(), dtype=string)
tf.Tensor(b'hello', shape=(), dtype=string)
tf.Tensor(b'hello', shape=(), dtype=string)
tf.Tensor(b'world', shape=(), dtype=string)
tf.Tensor(b'China', shape=(), dtype=string)
tf.Tensor(b'Beijing', shape=(), dtype=string)


In [19]:
#filter:过滤掉某些元素。

ds = tf.data.Dataset.from_tensor_slices(["hello world","hello China","hello Beijing"])
#找出含有字母a或B的元素
ds_filter = ds.filter(lambda x: tf.strings.regex_full_match(x, ".*[a|B].*"))
for x in ds_filter:
    print(x)

tf.Tensor(b'hello China', shape=(), dtype=string)
tf.Tensor(b'hello Beijing', shape=(), dtype=string)


In [26]:
#zip:将两个长度相同的Dataset横向铰合。
ds1 = tf.data.Dataset.range(0,3)
ds2 = tf.data.Dataset.range(3,6)
ds3 = tf.data.Dataset.range(6,9)
print(ds1)
ds_zip = tf.data.Dataset.zip((ds1,ds2,ds3))
for x,y,z in ds_zip:
    print(x.numpy(),y.numpy(),z.numpy())

<RangeDataset shapes: (), types: tf.int64>
0 3 6
1 4 7
2 5 8


In [36]:
#condatenate:将两个Dataset纵向连接。

ds1 = tf.data.Dataset.range(0,3)
ds2 = tf.data.Dataset.range(3,6)
ds_concat = tf.data.Dataset.concatenate(ds1,ds2)
# print(tf.rank(ds_concat))
for x in ds_concat:
    print(x)

tf.Tensor(0, shape=(), dtype=int64)
tf.Tensor(1, shape=(), dtype=int64)
tf.Tensor(2, shape=(), dtype=int64)
tf.Tensor(3, shape=(), dtype=int64)
tf.Tensor(4, shape=(), dtype=int64)
tf.Tensor(5, shape=(), dtype=int64)


In [37]:
#reduce:执行归并操作。

ds = tf.data.Dataset.from_tensor_slices([1,2,3,4,5.0])
result = ds.reduce(0.0,lambda x,y:tf.add(x,y))
result

<tf.Tensor: id=1238, shape=(), dtype=float32, numpy=15.0>

In [38]:
#batch:构建批次，每次放一个批次。比原始数据增加一个维度。 其逆操作为unbatch。 

ds = tf.data.Dataset.range(12)
ds_batch = ds.batch(4)
for x in ds_batch:
    print(x)

tf.Tensor([0 1 2 3], shape=(4,), dtype=int64)
tf.Tensor([4 5 6 7], shape=(4,), dtype=int64)
tf.Tensor([ 8  9 10 11], shape=(4,), dtype=int64)


In [39]:
#padded_batch:构建批次，类似batch, 但可以填充到相同的形状。

elements = [[1, 2],[3, 4, 5],[6, 7],[8]]
ds = tf.data.Dataset.from_generator(lambda: iter(elements), tf.int32)

ds_padded_batch = ds.padded_batch(2,padded_shapes = [4,])
for x in ds_padded_batch:
    print(x)

Instructions for updating:
tf.py_func is deprecated in TF V2. Instead, there are two
    options available in V2.
    - tf.py_function takes a python function which manipulates tf eager
    tensors instead of numpy arrays. It's easy to convert a tf eager tensor to
    an ndarray (just call tensor.numpy()) but having access to eager tensors
    means `tf.py_function`s can use accelerators such as GPUs as well as
    being differentiable using a gradient tape.
    - tf.numpy_function maintains the semantics of the deprecated tf.py_func
    (it is not differentiable, and manipulates numpy arrays). It drops the
    stateful argument making all functions stateful.
    
tf.Tensor(
[[1 2 0 0]
 [3 4 5 0]], shape=(2, 4), dtype=int32)
tf.Tensor(
[[6 7 0 0]
 [8 0 0 0]], shape=(2, 4), dtype=int32)


In [46]:
#window:构建滑动窗口，返回Dataset of Dataset.

ds = tf.data.Dataset.range(12)
#window返回的是Dataset of Dataset,可以用flat_map压平
ds_window = ds.window(3, shift=1).flat_map(lambda x: x.batch(3,drop_remainder=True)) 
ds_window1 = ds.window(3, shift=1)
for x in ds_window:
    print(x)
for x in ds_window1:
    print(x)

tf.Tensor([0 1 2], shape=(3,), dtype=int64)
tf.Tensor([1 2 3], shape=(3,), dtype=int64)
tf.Tensor([2 3 4], shape=(3,), dtype=int64)
tf.Tensor([3 4 5], shape=(3,), dtype=int64)
tf.Tensor([4 5 6], shape=(3,), dtype=int64)
tf.Tensor([5 6 7], shape=(3,), dtype=int64)
tf.Tensor([6 7 8], shape=(3,), dtype=int64)
tf.Tensor([7 8 9], shape=(3,), dtype=int64)
tf.Tensor([ 8  9 10], shape=(3,), dtype=int64)
tf.Tensor([ 9 10 11], shape=(3,), dtype=int64)
<_VariantDataset shapes: (), types: tf.int64>
<_VariantDataset shapes: (), types: tf.int64>
<_VariantDataset shapes: (), types: tf.int64>
<_VariantDataset shapes: (), types: tf.int64>
<_VariantDataset shapes: (), types: tf.int64>
<_VariantDataset shapes: (), types: tf.int64>
<_VariantDataset shapes: (), types: tf.int64>
<_VariantDataset shapes: (), types: tf.int64>
<_VariantDataset shapes: (), types: tf.int64>
<_VariantDataset shapes: (), types: tf.int64>
<_VariantDataset shapes: (), types: tf.int64>
<_VariantDataset shapes: (), types: tf.int64>


In [51]:
#shuffle:数据顺序洗牌。

ds = tf.data.Dataset.range(12)
ds_shuffle = ds.shuffle(buffer_size = 5)
for x in ds_shuffle:
    print(x)

tf.Tensor(1, shape=(), dtype=int64)
tf.Tensor(3, shape=(), dtype=int64)
tf.Tensor(5, shape=(), dtype=int64)
tf.Tensor(7, shape=(), dtype=int64)
tf.Tensor(4, shape=(), dtype=int64)
tf.Tensor(2, shape=(), dtype=int64)
tf.Tensor(9, shape=(), dtype=int64)
tf.Tensor(10, shape=(), dtype=int64)
tf.Tensor(11, shape=(), dtype=int64)
tf.Tensor(8, shape=(), dtype=int64)
tf.Tensor(6, shape=(), dtype=int64)
tf.Tensor(0, shape=(), dtype=int64)


In [52]:
#repeat:重复数据若干次，不带参数时，重复无数次。

ds = tf.data.Dataset.range(3)
ds_repeat = ds.repeat(3)
for x in ds_repeat:
    print(x)

tf.Tensor(0, shape=(), dtype=int64)
tf.Tensor(1, shape=(), dtype=int64)
tf.Tensor(2, shape=(), dtype=int64)
tf.Tensor(0, shape=(), dtype=int64)
tf.Tensor(1, shape=(), dtype=int64)
tf.Tensor(2, shape=(), dtype=int64)
tf.Tensor(0, shape=(), dtype=int64)
tf.Tensor(1, shape=(), dtype=int64)
tf.Tensor(2, shape=(), dtype=int64)


In [53]:
#shard:采样，从某个位置开始隔固定距离采样一个元素。

ds = tf.data.Dataset.range(12)
ds_shard = ds.shard(3,index = 1)

for x in ds_shard:
    print(x)

tf.Tensor(1, shape=(), dtype=int64)
tf.Tensor(4, shape=(), dtype=int64)
tf.Tensor(7, shape=(), dtype=int64)
tf.Tensor(10, shape=(), dtype=int64)


# 三，提升管道性能
训练深度学习模型常常会非常耗时。模型训练的耗时主要来自于两个部分，一部分来自数据准备，另一部分来自参数迭代,而数据准备过程的耗时则可以通过构建高效的数据管道进行提升。

In [54]:
#打印时间分割线
@tf.function
def printbar():
    ts = tf.timestamp()
    today_ts = ts%(24*60*60)

    hour = tf.cast(today_ts//3600+8,tf.int32)%tf.constant(24)
    minite = tf.cast((today_ts%3600)//60,tf.int32)
    second = tf.cast(tf.floor(today_ts%60),tf.int32)

    def timeformat(m):
        if tf.strings.length(tf.strings.format("{}",m))==1:
            return(tf.strings.format("0{}",m))
        else:
            return(tf.strings.format("{}",m))

    timestring = tf.strings.join([timeformat(hour),timeformat(minite),
                timeformat(second)],separator = ":")
    tf.print("=========="*8,end = "")
    tf.print(timestring)

In [55]:
import time

# 数据准备和参数迭代两个过程默认情况下是串行的。

# 模拟数据准备
def generator():
    for i in range(10):
        #假设每次准备数据需要2s
        time.sleep(2) 
        yield i 
ds = tf.data.Dataset.from_generator(generator,output_types = (tf.int32))

# 模拟参数迭代
def train_step():
    #假设每一步训练需要1s
    time.sleep(1)

In [56]:
# 训练过程预计耗时 10*2+10*1+ = 30s
printbar()
tf.print(tf.constant("start training..."))
for x in ds:
    train_step()  
printbar()
tf.print(tf.constant("end training..."))

start training...
end training...


In [57]:
# 使用 prefetch 方法让数据准备和参数迭代两个过程相互并行。

# 训练过程预计耗时 max(10*2,10*1) = 20s
printbar()
tf.print(tf.constant("start training with prefetch..."))

# tf.data.experimental.AUTOTUNE 可以让程序自动选择合适的参数
for x in ds.prefetch(buffer_size = tf.data.experimental.AUTOTUNE):
    train_step()  

printbar()
tf.print(tf.constant("end training..."))

start training with prefetch...
end training...
