In [1]:
# Python ≥3.5 is required
import sys
assert sys.version_info >= (3, 5)

# Is this notebook running on Colab or Kaggle?
IS_COLAB = "google.colab" in sys.modules
IS_KAGGLE = "kaggle_secrets" in sys.modules

if IS_COLAB or IS_KAGGLE:
    %pip install -q -U tfx
    print("You can safely ignore the package incompatibility errors.")

# Scikit-Learn ≥0.20 is required
import sklearn
assert sklearn.__version__ >= "0.20"

# TensorFlow ≥2.0 is required
import tensorflow as tf
from tensorflow import keras
assert tf.__version__ >= "2.0"

# Common imports
import numpy as np
import os

# to make this notebook's output stable across runs
np.random.seed(42)

# To plot pretty figures
%matplotlib inline
import matplotlib as mpl
import matplotlib.pyplot as plt
mpl.rc('axes', labelsize=14)
mpl.rc('xtick', labelsize=12)
mpl.rc('ytick', labelsize=12)

# Where to save the figures
PROJECT_ROOT_DIR = "."
CHAPTER_ID = "data"
IMAGES_PATH = os.path.join(PROJECT_ROOT_DIR, "images", CHAPTER_ID)
os.makedirs(IMAGES_PATH, exist_ok=True)

def save_fig(fig_id, tight_layout=True, fig_extension="png", resolution=300):
    path = os.path.join(IMAGES_PATH, fig_id + "." + fig_extension)
    print("Saving figure", fig_id)
    if tight_layout:
        plt.tight_layout()
    plt.savefig(path, format=fig_extension, dpi=resolution)

## 什么是数据API

In [13]:
# 首先来创建一个小数据集来做示例
X = tf.range(10)
# from_tensor_slices可以将X的切片（沿第一维度去切）组合成一个tf.data.Dataset对象
dataset = tf.data.Dataset.from_tensor_slices(X)
dataset

<TensorSliceDataset shapes: (), types: tf.int32>

就相当于这么做:

In [3]:
dataset = tf.data.Dataset.range(10)
for item in dataset:
    print(item)

tf.Tensor(0, shape=(), dtype=int64)
tf.Tensor(1, shape=(), dtype=int64)
tf.Tensor(2, shape=(), dtype=int64)
tf.Tensor(3, shape=(), dtype=int64)
tf.Tensor(4, shape=(), dtype=int64)
tf.Tensor(5, shape=(), dtype=int64)
tf.Tensor(6, shape=(), dtype=int64)
tf.Tensor(7, shape=(), dtype=int64)
tf.Tensor(8, shape=(), dtype=int64)
tf.Tensor(9, shape=(), dtype=int64)


### 链式转换

将数据集放入到Dataset对象之后就可以使用链式表达式来对其进行操作了

In [14]:
# 比方说将原数据集复制三次，然后每批7个元素分批输出
# 这里要注意，使用repeat方法复制并不会真的在内存中复制3次，并且如果不给repeat()方法传参，他就会复制无数遍数据集
# 虽然不会真的复制，链式转换返回的数据集是真是存在于内存中的
# 对于batch方法，向其中传递的参数是每批次中元素的个数，可看到最后的一批中只有两个元素
# 如果在batch()方法中设置rop_remainder = True，那么就可以删除不满足元素数量要求的批
dataset = dataset.repeat(3).batch(7, drop_remainder = False)
for item in dataset:
    print(item)

tf.Tensor([0 1 2 3 4 5 6], shape=(7,), dtype=int32)
tf.Tensor([7 8 9 0 1 2 3], shape=(7,), dtype=int32)
tf.Tensor([4 5 6 7 8 9 0], shape=(7,), dtype=int32)
tf.Tensor([1 2 3 4 5 6 7], shape=(7,), dtype=int32)
tf.Tensor([8 9], shape=(2,), dtype=int32)


In [15]:
# 使用map方法可以操作数据集中所有元素
# 比如说要将数据集中的元素都乘2
dataset = dataset.map(lambda x: x * 2)

In [16]:
for item in dataset:
    print(item)

tf.Tensor([ 0  2  4  6  8 10 12], shape=(7,), dtype=int32)
tf.Tensor([14 16 18  0  2  4  6], shape=(7,), dtype=int32)
tf.Tensor([ 8 10 12 14 16 18  0], shape=(7,), dtype=int32)
tf.Tensor([ 2  4  6  8 10 12 14], shape=(7,), dtype=int32)
tf.Tensor([16 18], shape=(2,), dtype=int32)


In [17]:
# 使用unbatch方法可以将所有批次中的元素都变成一个张量
dataset = dataset.unbatch()

In [20]:
# 使用filter方法可以简单对每个元素进行过滤
# 下面语句的含义是：值保留小于10的元素
dataset = dataset.filter(lambda x: x < 10)  # keep only items < 10

In [21]:
for item in dataset.take(3):
    print(item)

tf.Tensor(0, shape=(), dtype=int32)
tf.Tensor(2, shape=(), dtype=int32)
tf.Tensor(4, shape=(), dtype=int32)


### 乱序数据

已知，当训练集中各实例都相互独立且分布均匀的时候，梯度下降最快  
所以使用数据API的shuffle()方法来对数据集进行混洗就很有必要了  
实现混洗的原理是，在一开始把数据集的前几个元素放进缓冲区中，每次从缓冲区中随机抽出一个实例，并从剩余实例中抽一个实例放进缓冲区中，直到数据集中所有的实例都遍历完成  
  
这就像是洗牌，左边有一堆牌，最开始只拿上面的三张牌，随机抽出一张牌放到右边，然后再从左边拿一张牌，再从三张牌中随机抽出一张牌放到右边，...直到所有的牌都在右边  
这里的三张牌指的就是缓冲区的容量，很明显，缓冲区得大一点效果才能好。但是实践证明并不能太大

In [22]:
tf.random.set_seed(42)

# 使用range方法创建一个Dataset对象，重复3次
dataset = tf.data.Dataset.range(10).repeat(3)
# 将数据集混洗，缓冲区容量为3，每个批次元素数量为7
dataset = dataset.shuffle(buffer_size=3, seed=42).batch(7)
for item in dataset:
    print(item)

tf.Tensor([1 3 0 4 2 5 6], shape=(7,), dtype=int64)
tf.Tensor([8 7 1 0 3 2 5], shape=(7,), dtype=int64)
tf.Tensor([4 6 9 8 9 7 0], shape=(7,), dtype=int64)
tf.Tensor([3 1 4 5 2 8 7], shape=(7,), dtype=int64)
tf.Tensor([6 9], shape=(2,), dtype=int64)


In [24]:
# 如果在原本已经混洗过的数据集上再复制一次，也是一个非常好的点子
# 并且此时复制出的不仅仅是元素，还有混洗的操作
# 可以看到复制出来的批也是混洗之后的
for item in dataset.repeat(3):
    print(item)

tf.Tensor([1 3 0 2 6 7 5], shape=(7,), dtype=int64)
tf.Tensor([8 9 4 0 2 1 3], shape=(7,), dtype=int64)
tf.Tensor([4 6 7 8 9 1 2], shape=(7,), dtype=int64)
tf.Tensor([0 5 4 6 3 5 9], shape=(7,), dtype=int64)
tf.Tensor([7 8], shape=(2,), dtype=int64)
tf.Tensor([0 1 3 4 5 6 7], shape=(7,), dtype=int64)
tf.Tensor([2 8 1 2 0 9 3], shape=(7,), dtype=int64)
tf.Tensor([6 5 7 4 0 9 2], shape=(7,), dtype=int64)
tf.Tensor([1 3 5 8 4 7 8], shape=(7,), dtype=int64)
tf.Tensor([9 6], shape=(2,), dtype=int64)
tf.Tensor([2 3 1 0 5 6 8], shape=(7,), dtype=int64)
tf.Tensor([7 9 0 4 3 4 5], shape=(7,), dtype=int64)
tf.Tensor([1 2 8 7 6 9 0], shape=(7,), dtype=int64)
tf.Tensor([2 1 4 5 3 7 6], shape=(7,), dtype=int64)
tf.Tensor([9 8], shape=(2,), dtype=int64)


In [32]:
# 如果想在复制的同时不打乱原有混洗的顺序，可以在shuffle()方法中设置reshuffle_each_iteration = False
dataset = tf.data.Dataset.range(10).repeat(3)
dataset = dataset.shuffle(buffer_size=3, seed=42, reshuffle_each_iteration = False).batch(7)

# 此时再对混洗之后的批数据集进行复制，就会保持原有的混洗顺序进行复制了
for item in dataset.repeat(3):
    print(item)

tf.Tensor([0 2 3 5 6 4 8], shape=(7,), dtype=int64)
tf.Tensor([9 0 1 1 7 3 2], shape=(7,), dtype=int64)
tf.Tensor([4 5 7 8 9 0 6], shape=(7,), dtype=int64)
tf.Tensor([1 2 4 6 7 3 9], shape=(7,), dtype=int64)
tf.Tensor([8 5], shape=(2,), dtype=int64)
tf.Tensor([0 2 3 5 6 4 8], shape=(7,), dtype=int64)
tf.Tensor([9 0 1 1 7 3 2], shape=(7,), dtype=int64)
tf.Tensor([4 5 7 8 9 0 6], shape=(7,), dtype=int64)
tf.Tensor([1 2 4 6 7 3 9], shape=(7,), dtype=int64)
tf.Tensor([8 5], shape=(2,), dtype=int64)
tf.Tensor([0 2 3 5 6 4 8], shape=(7,), dtype=int64)
tf.Tensor([9 0 1 1 7 3 2], shape=(7,), dtype=int64)
tf.Tensor([4 5 7 8 9 0 6], shape=(7,), dtype=int64)
tf.Tensor([1 2 4 6 7 3 9], shape=(7,), dtype=int64)
tf.Tensor([8 5], shape=(2,), dtype=int64)


### 同时对多个CSV文件进行混洗

当面临的数据集非常非常大的时候，使用shuffle对数据集进行混洗可能就不太够用了，毕竟缓冲区相对于整个数据集的大小来说可能有点小马拉大车了  
但是可以在使用shuffle之前使用别的方法来对原始完整数据集进行一个预混洗，然后再使用shuffle  
  
  
或者将原数据集分成很多个子文件，然后使用shuffle分别对各个子文件进行混洗后再组合到一起，但是这么做有可能会导致同一个文件中的数据仍然十分接近。  
为了避免这种情况可以同时随机读取多个文件的数据，并且使用shuffle方法建立一个公用的缓冲区。虽然这听起来很麻烦，其中还得会用到多线程，Data API只需要几行代码就能实现

In [41]:
# 这次拿加利福尼亚的housing数据来做示例
from sklearn.datasets import fetch_california_housing
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

housing = fetch_california_housing()
X_train_full, X_test, y_train_full, y_test = train_test_split(
    housing.data, housing.target.reshape(-1, 1), random_state=42)

# 准备好训练集的数据，等会儿就混洗训练集的数据
X_train, X_valid, y_train, y_valid = train_test_split(
    X_train_full, y_train_full, random_state=42)

scaler = StandardScaler()
scaler.fit(X_train)
X_mean = scaler.mean_
X_std = scaler.scale_

#### 将数据集分成多个csv文件

In [44]:
def save_to_multiple_csv_files(data, name_prefix, header=None, n_parts=10):
    housing_dir = os.path.join("datasets", "housing")
    os.makedirs(housing_dir, exist_ok=True)
    path_format = os.path.join(housing_dir, "my_{}_{:02d}.csv")

    filepaths = []
    m = len(data)
    for file_idx, row_indices in enumerate(np.array_split(np.arange(m), n_parts)):
        part_csv = path_format.format(name_prefix, file_idx)
        filepaths.append(part_csv)
        with open(part_csv, "wt", encoding="utf-8") as f:
            if header is not None:
                f.write(header)
                f.write("\n")
            for row_idx in row_indices:
                f.write(",".join([repr(col) for col in data[row_idx]]))
                f.write("\n")
                
    # 在文件都保存好之后返回所有文件的地址
    return filepaths

In [45]:
train_data = np.c_[X_train, y_train]
valid_data = np.c_[X_valid, y_valid]
test_data = np.c_[X_test, y_test]
header_cols = housing.feature_names + ["MedianHouseValue"]
header = ",".join(header_cols)

train_filepaths = save_to_multiple_csv_files(train_data, "train", header, n_parts=20)
valid_filepaths = save_to_multiple_csv_files(valid_data, "valid", header, n_parts=10)
test_filepaths = save_to_multiple_csv_files(test_data, "test", header, n_parts=10)

In [46]:
import pandas as pd

# 来测试一下看看能不能正常读取
pd.read_csv(train_filepaths[0]).head()

Unnamed: 0,MedInc,HouseAge,AveRooms,AveBedrms,Population,AveOccup,Latitude,Longitude,MedianHouseValue
0,3.5214,15.0,3.049945,1.106548,1447.0,1.605993,37.63,-122.43,1.442
1,5.3275,5.0,6.49006,0.991054,3464.0,3.44334,33.69,-117.39,1.687
2,3.1,29.0,7.542373,1.591525,1328.0,2.250847,38.44,-122.98,1.621
3,7.1736,12.0,6.289003,0.997442,1054.0,2.695652,33.55,-117.7,2.621
4,2.0549,13.0,5.312457,1.085092,3297.0,2.244384,33.93,-116.93,0.956


In [47]:
with open(train_filepaths[0]) as f:
    for i in range(5):
        print(f.readline(), end="")

MedInc,HouseAge,AveRooms,AveBedrms,Population,AveOccup,Latitude,Longitude,MedianHouseValue
3.5214,15.0,3.0499445061043287,1.106548279689234,1447.0,1.6059933407325193,37.63,-122.43,1.442
5.3275,5.0,6.490059642147117,0.9910536779324056,3464.0,3.4433399602385686,33.69,-117.39,1.687
3.1,29.0,7.5423728813559325,1.5915254237288134,1328.0,2.2508474576271187,38.44,-122.98,1.621
7.1736,12.0,6.289002557544757,0.9974424552429667,1054.0,2.6956521739130435,33.55,-117.7,2.621


In [52]:
# 这个就是包含所有训练集数据的文件列表
train_filepaths

['datasets\\housing\\my_train_00.csv',
 'datasets\\housing\\my_train_01.csv',
 'datasets\\housing\\my_train_02.csv',
 'datasets\\housing\\my_train_03.csv',
 'datasets\\housing\\my_train_04.csv',
 'datasets\\housing\\my_train_05.csv',
 'datasets\\housing\\my_train_06.csv',
 'datasets\\housing\\my_train_07.csv',
 'datasets\\housing\\my_train_08.csv',
 'datasets\\housing\\my_train_09.csv',
 'datasets\\housing\\my_train_10.csv',
 'datasets\\housing\\my_train_11.csv',
 'datasets\\housing\\my_train_12.csv',
 'datasets\\housing\\my_train_13.csv',
 'datasets\\housing\\my_train_14.csv',
 'datasets\\housing\\my_train_15.csv',
 'datasets\\housing\\my_train_16.csv',
 'datasets\\housing\\my_train_17.csv',
 'datasets\\housing\\my_train_18.csv',
 'datasets\\housing\\my_train_19.csv']

#### 创建一个Input Pipline

In [53]:
# 接下来将使用tf.data.Dataset.list_files函数创建一个文件路径数据集
filepath_dataset = tf.data.Dataset.list_files(train_filepaths, seed=42)

In [60]:
# 文件路径数据集是包含所有文件路径的张量，以二进制的形式保存
# 并且文件名是以乱序来保存的，所以可以看到filepath_dataset是ShuffleDataset类型的数据集
# 将文件名打乱后保存是一个很好的做法，毕竟数据越乱序越好嘛，但是也可以使用shuffle = False取消乱序
print(filepath_dataset)
for filepath in filepath_dataset:
    print(filepath)

<ShuffleDataset shapes: (), types: tf.string>
tf.Tensor(b'datasets\\housing\\my_train_15.csv', shape=(), dtype=string)
tf.Tensor(b'datasets\\housing\\my_train_09.csv', shape=(), dtype=string)
tf.Tensor(b'datasets\\housing\\my_train_01.csv', shape=(), dtype=string)
tf.Tensor(b'datasets\\housing\\my_train_03.csv', shape=(), dtype=string)
tf.Tensor(b'datasets\\housing\\my_train_13.csv', shape=(), dtype=string)
tf.Tensor(b'datasets\\housing\\my_train_08.csv', shape=(), dtype=string)
tf.Tensor(b'datasets\\housing\\my_train_11.csv', shape=(), dtype=string)
tf.Tensor(b'datasets\\housing\\my_train_18.csv', shape=(), dtype=string)
tf.Tensor(b'datasets\\housing\\my_train_19.csv', shape=(), dtype=string)
tf.Tensor(b'datasets\\housing\\my_train_00.csv', shape=(), dtype=string)
tf.Tensor(b'datasets\\housing\\my_train_14.csv', shape=(), dtype=string)
tf.Tensor(b'datasets\\housing\\my_train_04.csv', shape=(), dtype=string)
tf.Tensor(b'datasets\\housing\\my_train_05.csv', shape=(), dtype=string)
tf.Te

接下来就将使用数据集对象的`interleave`方法将各个文件路径对应的数据交织起来  
`interleave`方法的第一个参数就是可以映射每个元素（这里的元素时文件地址）的函数（要求放回一个数据子集），然后再使用函数返回的数据子集进行交织  
交织的方法就是每次都从多个数据子集中取一行元素，直到数都遍历完了为止。其中`cycle_length`就是设置一次从多少个数据子集中取数  
  
这里给`interleave`传递的函数含义就是使用`TextLineDataset`类来读取每个文件的数据，然后返回数据子集（去掉第一行）  
向`TextLineDataset`类传递一个或多个文本数据的文件地址可以返回一整个数据集，传递的不一定要字符串张量，Python字符串也行

In [72]:
n_readers = 5
#tf.data.experimental.AUTOTUNE = 1
dataset = filepath_dataset.interleave(
    lambda filepath: tf.data.TextLineDataset(filepath).skip(1),
    cycle_length=n_readers)

为了交织的效果能好，各个数据文件的长度最好相等，不然最长文件的结尾将不会被交织  
默认情况下数据集对象的`interleave`方法不会使用并行，但是可以通过设置`num_parallel_calls`参数来设置来并行的线程数  
再或者设置`tf.data.experimental.AUTOTUNE = 1`，这样TensorFlow就会根据可用的CPU动态选择合适的线程

In [73]:
# 看看数据集前5行
for line in dataset.take(5):
    print(line.numpy())

b'3.3456,37.0,4.514084507042254,0.9084507042253521,458.0,3.2253521126760565,36.67,-121.7,2.526'
b'2.4792,24.0,3.4547038327526134,1.1341463414634145,2251.0,3.921602787456446,34.18,-118.38,2.0'
b'4.5909,16.0,5.475877192982456,1.0964912280701755,1357.0,2.9758771929824563,33.63,-117.71,2.418'
b'4.2083,44.0,5.323204419889502,0.9171270718232044,846.0,2.3370165745856353,37.47,-122.2,2.782'
b'3.6875,44.0,4.524475524475524,0.993006993006993,457.0,3.195804195804196,34.04,-118.15,1.625'


在继续进行下去之前，需要首先介绍一下tf中参与数据类型转换的`tf.io.decode_csv`函数，它可以将原本CSV格式的目标序列的每个元素转换成参考序列中各元素的格式，它接受两个非关键字参数是：`records`和`record_defaults`， `records`就是目标序列，`record_defaults`是参考序列

In [74]:
# 注意第四个元素被转换成了字符串
record_defaults=[0, np.nan, tf.constant(np.nan, dtype=tf.float64), "Hello", tf.constant([])]
parsed_fields = tf.io.decode_csv('1,2,3,4,5', record_defaults)
parsed_fields

[<tf.Tensor: shape=(), dtype=int32, numpy=1>,
 <tf.Tensor: shape=(), dtype=float32, numpy=2.0>,
 <tf.Tensor: shape=(), dtype=float64, numpy=3.0>,
 <tf.Tensor: shape=(), dtype=string, numpy=b'4'>,
 <tf.Tensor: shape=(), dtype=float32, numpy=5.0>]

In [75]:
# 如果目标序列的元素有缺失，就会直接使用参考序列相应位置的元素来代替
parsed_fields = tf.io.decode_csv(',,,,5', record_defaults)
parsed_fields

[<tf.Tensor: shape=(), dtype=int32, numpy=0>,
 <tf.Tensor: shape=(), dtype=float32, numpy=nan>,
 <tf.Tensor: shape=(), dtype=float64, numpy=nan>,
 <tf.Tensor: shape=(), dtype=string, numpy=b'Hello'>,
 <tf.Tensor: shape=(), dtype=float32, numpy=5.0>]

In [77]:
# 但若在某位置上的目标和参考序列都缺失，那就会报错
# 就比如下面这个例子，目标序列的5缺失了，并且参考序列对应位置上只有个定义：tf.constant([])，并没有值，
# 所以也没办法用参考序列的值进行插补，这时就会这样报错
try:
    parsed_fields = tf.io.decode_csv('1,2,3,4,', record_defaults)
except tf.errors.InvalidArgumentError as ex:
    print(ex)

Field 4 is required but missing in record 0! [Op:DecodeCSV]


In [78]:
# 并且目标序列和参考序列的长度一定要一致，不然也会报错
try:
    parsed_fields = tf.io.decode_csv('1,2,3,4,5,6,7', record_defaults)
except tf.errors.InvalidArgumentError as ex:
    print(ex)

Expect 5 fields but have 7 in record 0 [Op:DecodeCSV]
