<a href="https://colab.research.google.com/github/LinCheungS/Deep_Learning_TF2/blob/master/tf_dataset.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# tf.Dataset

如果需要训练的数据不大，不到1G，可以全部读入内存中训练，效率最高。

但如果训练的数据很大，超过10G，无法一次载入内存，需要分批逐渐训练

![](https://raw.githubusercontent.com/LinCheungS/PicGo_Image_Storage/master/2020-1/20200505050006.png)

In [0]:
import matplotlib as mpl
import matplotlib.pyplot as plt
import numpy as np
import sklearn
import pandas as pd
import os
import sys
import time
import tensorflow as tf
from tensorflow import keras

## tf.data API

### 生成dataset

In [0]:
# 生成dataset,tf.Tensor(0到2, shape=(), dtype=int64)
dataset = tf.data.Dataset.from_tensor_slices(np.arange(3))
for item in dataset:
    print(item)

tf.Tensor(0, shape=(), dtype=int64)
tf.Tensor(1, shape=(), dtype=int64)
tf.Tensor(2, shape=(), dtype=int64)


In [9]:
# 利用两个列表构建dataset
x = np.array([[1, 2], [3, 4], [5, 6]])
y = np.array(['cat', 'dog', 'fox'])
dataset3 = tf.data.Dataset.from_tensor_slices((x, y))
for item_x, item_y in dataset3:
    print(item_x.numpy(), item_y.numpy())

[1 2] b'cat'
[3 4] b'dog'
[5 6] b'fox'


In [0]:
# 利用字典构建dataset
dataset4 = tf.data.Dataset.from_tensor_slices({"feature": x,"label": y})
for item in dataset4:
    print(item["feature"].numpy(), item["label"].numpy())

[1 2] b'cat'
[3 4] b'dog'
[5 6] b'fox'


### dataset.repeat

In [0]:
# dataset重复多少次
dataset = tf.data.Dataset.from_tensor_slices(np.arange(2))
dataset = dataset.repeat(3)
for item in dataset:
    print(item)

tf.Tensor(0, shape=(), dtype=int64)
tf.Tensor(1, shape=(), dtype=int64)
tf.Tensor(0, shape=(), dtype=int64)
tf.Tensor(1, shape=(), dtype=int64)
tf.Tensor(0, shape=(), dtype=int64)
tf.Tensor(1, shape=(), dtype=int64)


### dataset.batch

In [0]:
# 分成多少份
dataset = tf.data.Dataset.from_tensor_slices(np.arange(1,10))
dataset = dataset.batch(3)
for item in dataset:
    print(item)

tf.Tensor([1 2 3], shape=(3,), dtype=int64)
tf.Tensor([4 5 6], shape=(3,), dtype=int64)
tf.Tensor([7 8 9], shape=(3,), dtype=int64)


### dataset.interleave

In [0]:
""" 一对多,类似flaten
1. 从Dataset中取出cycle_length个element，
2. 得到cycle_length个新的Dataset对象。,对element应用map_func
3. 从新生成的Dataset中取数据，每个Dataset对象一次取block_length个数据。
4. 当新生成的某个Dataset的对象取尽时，从原Dataset中再取cycle_length个element，
5. 然后apply map_func，以此类推。
"""
dataset2 = dataset.interleave(
    lambda v: tf.data.Dataset.from_tensor_slices(v), # map_fn
    cycle_length = 2, # cycle_length
    block_length = 2, # block_length
)
for item in dataset2:
    print(item)

tf.Tensor(1, shape=(), dtype=int64)
tf.Tensor(2, shape=(), dtype=int64)
tf.Tensor(4, shape=(), dtype=int64)
tf.Tensor(5, shape=(), dtype=int64)
tf.Tensor(3, shape=(), dtype=int64)
tf.Tensor(6, shape=(), dtype=int64)
tf.Tensor(7, shape=(), dtype=int64)
tf.Tensor(8, shape=(), dtype=int64)
tf.Tensor(9, shape=(), dtype=int64)


In [8]:
ds = tf.data.Dataset.from_tensor_slices([["hello world","hello China"],["hello world","hello China"]])
ds_interleave = ds.interleave(lambda x:tf.data.Dataset.from_tensor_slices(x))
for x in ds_interleave:
    print(x)

tf.Tensor(b'hello world', shape=(), dtype=string)
tf.Tensor(b'hello world', shape=(), dtype=string)
tf.Tensor(b'hello China', shape=(), dtype=string)
tf.Tensor(b'hello China', shape=(), dtype=string)


## Pandas

In [82]:
# 详细操作https://tensorflow.google.cn/tutorials/load_data/pandas_dataframe
import tensorflow as tf
from sklearn import datasets 
import pandas as pd
iris = datasets.load_iris()
dfiris = pd.DataFrame(iris["data"],columns = iris.feature_names)
dfiris.head()

Unnamed: 0,sepal length (cm),sepal width (cm),petal length (cm),petal width (cm)
0,5.1,3.5,1.4,0.2
1,4.9,3.0,1.4,0.2
2,4.7,3.2,1.3,0.2
3,4.6,3.1,1.5,0.2
4,5.0,3.6,1.4,0.2


In [86]:
ds2 = tf.data.Dataset.from_tensor_slices((dfiris.values,iris["target"]))

for features,label in ds2.take(3):
    print(features,label)

tf.Tensor(0, shape=(), dtype=int64)
tf.Tensor(0, shape=(), dtype=int64)
tf.Tensor(0, shape=(), dtype=int64)


## CSV操作

### 转成csv

In [5]:
# 加载california_housing数据
from sklearn.datasets import fetch_california_housing
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
housing = fetch_california_housing()
x_train_all, x_test, y_train_all, y_test = train_test_split(housing.data, housing.target, random_state = 7)
x_train, x_valid, y_train, y_valid = train_test_split(x_train_all, y_train_all, random_state = 11)
scaler = StandardScaler()
x_train_scaled = scaler.fit_transform(x_train)
x_valid_scaled = scaler.transform(x_valid)
x_test_scaled = scaler.transform(x_test)
print(x_train_scaled.shape, y_train.shape,x_valid_scaled.shape, y_valid.shape,x_test_scaled.shape, y_test.shape)

(11610, 8) (11610,) (3870, 8) (3870,) (5160, 8) (5160,)


In [0]:
"""
   output_dir = 输出路径,data =原始数据,name_prefix =前缀名字,
   header = feature的label, n_parts = 分成几份
"""
def save_to_csv(output_dir, data, name_prefix, header=None, n_parts=10):
    # 拼接输出路径和输出文件名字
    path_format = os.path.join(output_dir, "{}_{:02d}.csv")
    # 存储路径名字
    file_dir_names = []
    # 生成data长度个index,并分成n_parts份
    index_n_parts = np.array_split(np.arange(len(data)), n_parts)
    """枚举index_n_parts, 得到序号和dataset的index"""
    for file_idx, row_indices in enumerate(index_n_parts):
        # 对每一份n_parts命名,例如train_01,valid_01
        part_csv = path_format.format(name_prefix, file_idx)
        # 将文件名追加到file_dir_names
        file_dir_names.append(part_csv)
        """将数据写入每一份n_parts"""
        with open(part_csv, "wt", encoding="utf-8") as f:
            # 如果header不是空的,写入header到第一行
            if header is not None:
                f.write(header + "\n")
            # 将这一份的n_parts写入train/valid/test_0x.csv中
            for row_index in row_indices:
                # 做字符串化处理,转成字符串
                f.write(",".join([repr(col) for col in data[row_index]]))
                f.write('\n')
    # 返回路径名
    return file_dir_names

In [7]:
# 不存在,则创建数据文件夹
output_dir = "generate_csv"
if not os.path.exists(output_dir):
    os.mkdir(output_dir)

# 拼接x和y,np.row_stack((a,b)),np.column_stack((a,b))
train_data = np.column_stack([x_train_scaled, y_train])
valid_data = np.column_stack([x_valid_scaled, y_valid])
test_data = np.column_stack([x_test_scaled, y_test])

# 把名字转成csv格式
header_cols = housing.feature_names + ["price_y"]
header_str = ",".join(header_cols)

# 生成csv文件
train_filenames = save_to_csv(output_dir, train_data, "train",header_str, n_parts=20)
valid_filenames = save_to_csv(output_dir, valid_data, "valid",header_str, n_parts=10)
test_filenames = save_to_csv(output_dir, test_data, "test",header_str, n_parts=10)

print(train_filenames)
print(valid_filenames)
print(test_filenames)

['generate_csv/train_00.csv', 'generate_csv/train_01.csv', 'generate_csv/train_02.csv', 'generate_csv/train_03.csv', 'generate_csv/train_04.csv', 'generate_csv/train_05.csv', 'generate_csv/train_06.csv', 'generate_csv/train_07.csv', 'generate_csv/train_08.csv', 'generate_csv/train_09.csv', 'generate_csv/train_10.csv', 'generate_csv/train_11.csv', 'generate_csv/train_12.csv', 'generate_csv/train_13.csv', 'generate_csv/train_14.csv', 'generate_csv/train_15.csv', 'generate_csv/train_16.csv', 'generate_csv/train_17.csv', 'generate_csv/train_18.csv', 'generate_csv/train_19.csv']
['generate_csv/valid_00.csv', 'generate_csv/valid_01.csv', 'generate_csv/valid_02.csv', 'generate_csv/valid_03.csv', 'generate_csv/valid_04.csv', 'generate_csv/valid_05.csv', 'generate_csv/valid_06.csv', 'generate_csv/valid_07.csv', 'generate_csv/valid_08.csv', 'generate_csv/valid_09.csv']
['generate_csv/test_00.csv', 'generate_csv/test_01.csv', 'generate_csv/test_02.csv', 'generate_csv/test_03.csv', 'generate_csv/t

### 读取csv

In [8]:
"""# https://tensorflow.google.cn/tutorials/load_data/csv
ds4 = tf.data.experimental.make_csv_dataset(
      file_pattern = ["/content/generate_csv/train_00.csv"],
      batch_size=300, 
      label_name="MidianHouseValue",
      na_value="",
      num_epochs=1,
      ignore_errors=True)"""

'# https://tensorflow.google.cn/tutorials/load_data/csv\nds4 = tf.data.experimental.make_csv_dataset(\n      file_pattern = ["/content/generate_csv/train_00.csv"],\n      batch_size=300, \n      label_name="MidianHouseValue",\n      na_value="",\n      num_epochs=1,\n      ignore_errors=True)'

In [9]:
"""核心方法: tf.io.decode_csv(str, record_defaults)"""
# 定义了数据
sample_str = '1,2,3'
# 定义了数据类型
record_defaults = [
    tf.constant(0,dtype=tf.int32),
    tf.constant(1.0,dtype=tf.float32),
    tf.constant("0",dtype=tf.string),]
parsed_fields = tf.io.decode_csv(sample_str, record_defaults)
print(parsed_fields)

[<tf.Tensor: shape=(), dtype=int32, numpy=1>, <tf.Tensor: shape=(), dtype=float32, numpy=2.0>, <tf.Tensor: shape=(), dtype=string, numpy=b'3'>]


In [10]:
# 1. filename -> dataset
# 2. read file -> dataset -> datasets -> merge
# 3. parse csv

def parse_csv_line(line, n_fields = 9):
    defs = [tf.constant(np.nan)] * n_fields
    parsed_fields = tf.io.decode_csv(line, record_defaults=defs)
    x = tf.stack(parsed_fields[0:-1])
    y = tf.stack(parsed_fields[-1:])
    return x, y

def csv_reader_dataset(filenames, n_readers=5,batch_size=32, n_parse_threads=5,
                       shuffle_buffer_size=10000):
    # 将文件路径封装在tf.datset
    dataset = tf.data.Dataset.list_files(filenames)
    # dataset = dataset.repeat()
    # 根据文件名, 一行一行读取数据, 可视化用take
    dataset = dataset.interleave(
        lambda filename: tf.data.TextLineDataset(filename).skip(1),# 跳过header
        cycle_length = n_readers
    )
    # 打乱数据
    dataset.shuffle(shuffle_buffer_size)
    # 调用parse_csv_line函数, 将每一行的每一个数据分开,分成x,y
    dataset = dataset.map(parse_csv_line,num_parallel_calls=n_parse_threads)
    # 规定多少数据为一批
    dataset = dataset.batch(batch_size)
    return dataset



train_set = csv_reader_dataset(train_filenames, batch_size=3)
for x_batch, y_batch in train_set.take(1):
    print("x:\n",x_batch)
    print("y:\n",y_batch)

x:
 tf.Tensor(
[[-1.0591781e+00  1.3935647e+00 -2.6331969e-02 -1.1006760e-01
  -6.1381990e-01 -9.6959352e-02  3.2471311e-01 -3.7477244e-02]
 [ 4.0127665e-01 -9.2934215e-01 -5.3330503e-02 -1.8659453e-01
   6.5456617e-01  2.6434466e-02  9.3125278e-01 -1.4406418e+00]
 [ 4.9710345e-02 -8.4924191e-01 -6.2146995e-02  1.7878747e-01
  -8.0253541e-01  5.0660671e-04  6.4664572e-01 -1.1060793e+00]], shape=(3, 8), dtype=float32)
y:
 tf.Tensor(
[[0.672]
 [2.512]
 [2.286]], shape=(3, 1), dtype=float32)


In [11]:
"""例子"""
# 生成完整数据
batch_size = 32
train_set = csv_reader_dataset(train_filenames,batch_size = batch_size)
valid_set = csv_reader_dataset(valid_filenames,batch_size = batch_size)
test_set = csv_reader_dataset(test_filenames,batch_size = batch_size)
# 训练
model = keras.models.Sequential([
    keras.layers.Dense(30, activation='relu',input_shape=[8]),#修改输入维度
    keras.layers.Dense(1),])
model.compile(loss="mean_squared_error", optimizer="sgd")
callbacks = [keras.callbacks.EarlyStopping(patience=2, min_delta=1e-2)]
history = model.fit(train_set, # train_set 包含x和y
                    validation_data = valid_set, # valid_set 包含x和y
                    epochs = 100,
                    callbacks = callbacks)
# 衡量
model.evaluate(test_set)

Epoch 1/100
Epoch 2/100
Epoch 3/100
Epoch 4/100
Epoch 5/100
Epoch 6/100
Epoch 7/100
Epoch 8/100
Epoch 9/100
Epoch 10/100


0.3884371519088745

## tfrecord

### 基础API

In [13]:
"""
tfrecord 文件格式
-> tf.train.Example
   -> tf.train.Features -> {"key": tf.train.Feature}
      -> tf.train.Feature -> tf.train.ByteList/FloatList/Int64List
"""
# 包装成 tf.train.ByteList/FloatList/Int64List
favorite_books = [name.encode('utf-8') for name in ["machine learning", "cc150"]]
favorite_books_bytelist = tf.train.BytesList(value = favorite_books)
hours_floatlist = tf.train.FloatList(value = [15.5, 9.5, 7.0, 8.0])
age_int64list = tf.train.Int64List(value = [42])

# 包装成 train.Features 由 train.Feature
features = tf.train.Features(
    feature = {
        "favorite_books": tf.train.Feature(
            bytes_list = favorite_books_bytelist),
        "hours": tf.train.Feature(
            float_list = hours_floatlist),
        "age": tf.train.Feature(int64_list = age_int64list),
    }
)

# 包装成tf.train.Example
example = tf.train.Example(features=features)
# 压缩成16进制
serialized_example = example.SerializeToString()

print(type(favorite_books_bytelist))
print(type(features))
print(type(example))
print(serialized_example)

<class 'tensorflow.core.example.feature_pb2.BytesList'>
<class 'tensorflow.core.example.feature_pb2.Features'>
<class 'tensorflow.core.example.example_pb2.Example'>
b'\n\\\n-\n\x0efavorite_books\x12\x1b\n\x19\n\x10machine learning\n\x05cc150\n\x0c\n\x03age\x12\x05\x1a\x03\n\x01*\n\x1d\n\x05hours\x12\x14\x12\x12\n\x10\x00\x00xA\x00\x00\x18A\x00\x00\xe0@\x00\x00\x00A'


In [0]:
output_dir = 'tfrecord_basic'
if not os.path.exists(output_dir):
    os.mkdir(output_dir)

"""保存成tfrecord文件,读取tfrecord文件,解析tfrecord文件"""
# 保存成tfrecord文件
filename_fullpath = os.path.join("tfrecord_basic", "test.tfrecords")
with tf.io.TFRecordWriter(filename_fullpath) as writer:
        writer.write(serialized_example)

# 读取读取tfrecord文件,16进制
dataset = tf.data.TFRecordDataset([filename_fullpath])
# 解析的规则, 字典的形式
expected_features = {
    "favorite_books": tf.io.VarLenFeature(dtype = tf.string),
    "hours": tf.io.VarLenFeature(dtype = tf.float32),
    "age": tf.io.FixedLenFeature([], dtype = tf.int64),}

for serialized_example_tensor in dataset:
    # 将规则和数据传入,解析tfrecord文件
    example = tf.io.parse_single_example(serialized_example_tensor,expected_features)
    books = tf.sparse.to_dense(example["favorite_books"])
    for book in books:
        print(book.numpy().decode("UTF-8"))

machine learning
cc150


In [0]:
"""保存压缩tfrecord,读取压缩tfrecord,解析tfrecord"""
filename_fullpath_zip = filename_fullpath + '.zip'
options = tf.io.TFRecordOptions(compression_type = "GZIP")
# 传入TFRecordOptions,比如compression_type = "GZIP"
with tf.io.TFRecordWriter(filename_fullpath_zip, options) as writer:
        writer.write(serialized_example)

# 读取tfrecord的压缩文件
dataset_zip = tf.data.TFRecordDataset([filename_fullpath_zip],compression_type= "GZIP")
for serialized_example_tensor in dataset_zip:
    example = tf.io.parse_single_example(serialized_example_tensor,expected_features)
    books = tf.sparse.to_dense(example["favorite_books"])
    for book in books:
        print(book.numpy().decode("UTF-8"))

machine learning
cc150


### 转成tfrecord

In [0]:
# 读取train,validation,test数据的文件路径
source_dir = "./generate_csv/"
def get_filenames_by_prefix(source_dir, prefix_name):
    all_files = os.listdir(source_dir)
    results = []
    for filename in all_files:
        if filename.startswith(prefix_name):
            results.append(os.path.join(source_dir, filename))
    return results
train_filenames = get_filenames_by_prefix(source_dir, "train")
valid_filenames = get_filenames_by_prefix(source_dir, "valid")
test_filenames = get_filenames_by_prefix(source_dir, "test")
print(train_filenames,"\n",valid_filenames,"\n",test_filenames)

# 由文件路径读取数据,返回x[,8],y[,1]
def parse_csv_line(line, n_fields = 9): # n_fields需要修改
    defs = [tf.constant(np.nan)] * n_fields
    parsed_fields = tf.io.decode_csv(line, record_defaults=defs)
    x = tf.stack(parsed_fields[0:-1])
    y = tf.stack(parsed_fields[-1:])
    return x, y
def csv_reader_dataset(filenames, n_readers=5,batch_size=32, n_parse_threads=5,
                       shuffle_buffer_size=10000):
    dataset = tf.data.Dataset.list_files(filenames)
    dataset = dataset.interleave(
        lambda filename: tf.data.TextLineDataset(filename).skip(1),
        cycle_length = n_readers)
    dataset.shuffle(shuffle_buffer_size)
    dataset = dataset.map(parse_csv_line,num_parallel_calls=n_parse_threads)
    dataset = dataset.batch(batch_size)
    return dataset
batch_size = 32
train_set = csv_reader_dataset(train_filenames,batch_size = batch_size)
valid_set = csv_reader_dataset(valid_filenames,batch_size = batch_size)
test_set = csv_reader_dataset(test_filenames,batch_size = batch_size)

['./generate_csv/train_17.csv', './generate_csv/train_14.csv', './generate_csv/train_01.csv', './generate_csv/train_16.csv', './generate_csv/train_19.csv', './generate_csv/train_09.csv', './generate_csv/train_18.csv', './generate_csv/train_05.csv', './generate_csv/train_04.csv', './generate_csv/train_07.csv', './generate_csv/train_08.csv', './generate_csv/train_10.csv', './generate_csv/train_06.csv', './generate_csv/train_13.csv', './generate_csv/train_12.csv', './generate_csv/train_02.csv', './generate_csv/train_00.csv', './generate_csv/train_03.csv', './generate_csv/train_15.csv', './generate_csv/train_11.csv'] 
 ['./generate_csv/valid_00.csv', './generate_csv/valid_02.csv', './generate_csv/valid_06.csv', './generate_csv/valid_09.csv', './generate_csv/valid_08.csv', './generate_csv/valid_04.csv', './generate_csv/valid_01.csv', './generate_csv/valid_03.csv', './generate_csv/valid_07.csv', './generate_csv/valid_05.csv'] 
 ['./generate_csv/test_06.csv', './generate_csv/test_09.csv', './

In [0]:
# FloatList->features->example->SerializeToString
def serialize_example(x, y):
    """Converts x, y to tf.train.Example and serialize"""
    input_feautres = tf.train.FloatList(value = x)
    label = tf.train.FloatList(value = y)
    features = tf.train.Features(
        feature = {
            "input_features": tf.train.Feature(
                float_list = input_feautres),
            "label": tf.train.Feature(float_list = label)
        }
    )
    example = tf.train.Example(features = features)
    return example.SerializeToString()

# n_shards=要生成几个tfrecord文件, steps_per_shard=每个n_shards要读取几个batch
def csv_dataset_to_tfrecords(base_filename, dataset,n_shards,steps_per_shard,
                             compression_type = None):
    options = tf.io.TFRecordOptions(compression_type = compression_type)
    all_filenames = []
    for shard_id in range(n_shards):
        # 生成的tfrecord的名字
        filename_fullpath = '{}_{:05d}-of-{:05d}'.format(base_filename, shard_id, n_shards)
        with tf.io.TFRecordWriter(filename_fullpath, options) as writer:
            # 每个n_shards要读取几个batch,SerializeToString写入
            for x_batch, y_batch in dataset.take(steps_per_shard):
                for x_example, y_example in zip(x_batch, y_batch):
                    writer.write(serialize_example(x_example, y_example))
        all_filenames.append(filename_fullpath)
    return all_filenames

In [0]:
n_shards = 10
train_steps_per_shard = int(np.ceil(np.ceil(11610/batch_size)/n_shards))
valid_steps_per_shard = int(np.ceil(np.ceil(3880/batch_size)/n_shards))
test_steps_per_shard = int(np.ceil(np.ceil(5170/batch_size)/n_shards))
# 生成tfrecord文件
output_dir = "generate_tfrecords"
if not os.path.exists(output_dir):
    os.mkdir(output_dir)
train_basename = os.path.join(output_dir, "train")
valid_basename = os.path.join(output_dir, "valid")
test_basename = os.path.join(output_dir, "test")

train_tfrecord_filenames = csv_dataset_to_tfrecords(
    train_basename, train_set, n_shards, train_steps_per_shard, None)
valid_tfrecord_filenames = csv_dataset_to_tfrecords(
    valid_basename, valid_set, n_shards, valid_steps_per_shard, None)
test_tfrecord_fielnames = csv_dataset_to_tfrecords(
    test_basename, test_set, n_shards, test_steps_per_shard, None)

In [0]:
# 生成tfrecord压缩文件
output_dir = "generate_tfrecords_zip"
if not os.path.exists(output_dir):
    os.mkdir(output_dir)
train_basename = os.path.join(output_dir, "train")
valid_basename = os.path.join(output_dir, "valid")
test_basename = os.path.join(output_dir, "test")

train_tfrecord_filenames = csv_dataset_to_tfrecords(
    train_basename, train_set, n_shards, train_steps_per_shard,
    compression_type = "GZIP")
valid_tfrecord_filenames = csv_dataset_to_tfrecords(
    valid_basename, valid_set, n_shards, valid_steps_per_shard,
    compression_type = "GZIP")
test_tfrecord_fielnames = csv_dataset_to_tfrecords(
    test_basename, test_set, n_shards, test_steps_per_shard,
    compression_type = "GZIP")

### 读取rfrecord

In [0]:
expected_features = {
    "input_features": tf.io.FixedLenFeature([8], dtype=tf.float32),
    "label": tf.io.FixedLenFeature([1], dtype=tf.float32)
}
# tf.io.parse_single_example解析16进制的tfrecord数据
def parse_example(serialized_example):
    example = tf.io.parse_single_example(serialized_example,expected_features)
    return example["input_features"], example["label"]

def tfrecords_reader_dataset(filenames, n_readers=5,batch_size=32,
                             n_parse_threads=5, shuffle_buffer_size=10000,
                             compression_type = None):
    # 1.路径文件名写入dataset
    dataset = tf.data.Dataset.list_files(filenames)
    # 2.dataset.interleave()读取所有的数据
    dataset = dataset.interleave(
        lambda filename: tf.data.TFRecordDataset(filename, compression_type = compression_type),
        cycle_length = n_readers
    )
    dataset.shuffle(shuffle_buffer_size)
    # 用设置好的字典,解析16进制的tfrecord数据
    dataset = dataset.map(parse_example, num_parallel_calls=n_parse_threads)
    dataset = dataset.batch(batch_size)
    return dataset

tfrecords_train = tfrecords_reader_dataset(train_tfrecord_filenames,batch_size = 4,
                                           compression_type = "GZIP")
for x_batch, y_batch in tfrecords_train.take(1):
    print(x_batch)
    print(y_batch)

tf.Tensor(
[[ 0.15782312  0.4323619   0.3379948  -0.01588031 -0.37338907 -0.05305246
   0.80061346 -1.2359096 ]
 [ 0.63034356  1.8741661  -0.06713215 -0.12543367 -0.19737554 -0.02272263
  -0.69240725  0.72652334]
 [-0.66722274 -0.04823952  0.34529406  0.53826684  1.8521839  -0.06112538
  -0.8417093   1.5204847 ]
 [ 0.15782312  0.4323619   0.3379948  -0.01588031 -0.37338907 -0.05305246
   0.80061346 -1.2359096 ]], shape=(4, 8), dtype=float32)
tf.Tensor(
[[3.169]
 [2.419]
 [1.59 ]
 [3.169]], shape=(4, 1), dtype=float32)


In [0]:
batch_size = 32
tfrecords_train_set = tfrecords_reader_dataset(train_tfrecord_filenames, batch_size = batch_size,compression_type = "GZIP" )
tfrecords_valid_set = tfrecords_reader_dataset(valid_tfrecord_filenames, batch_size = batch_size,compression_type = "GZIP")
tfrecords_test_set = tfrecords_reader_dataset(test_tfrecord_fielnames, batch_size = batch_size,compression_type = "GZIP")
tf.keras.backend.clear_session()
model = keras.models.Sequential([
    keras.layers.Dense(30, activation='relu',
                       input_shape=[8]),
    keras.layers.Dense(1),
])
model.compile(loss="mean_squared_error", optimizer="sgd")
callbacks = [keras.callbacks.EarlyStopping(
    patience=5, min_delta=1e-2)]

history = model.fit(tfrecords_train_set,
                    validation_data = tfrecords_valid_set,
                    epochs = 100,
                    callbacks = callbacks)
model.evaluate(tfrecords_test_set)

Epoch 1/100
Epoch 2/100
Epoch 3/100
Epoch 4/100
Epoch 5/100
Epoch 6/100
Epoch 7/100
Epoch 8/100
Epoch 9/100
Epoch 10/100
Epoch 11/100
Epoch 12/100
Epoch 13/100
Epoch 14/100
Epoch 15/100
Epoch 16/100
Epoch 17/100
Epoch 18/100
Epoch 19/100


0.36626240611076355