In [None]:
import matplotlib as mpl #画图用的库
import matplotlib.pyplot as plt
#下面这一句是为了可以在notebook中画图
%matplotlib inline
import numpy as np
import sklearn   #机器学习算法库
import pandas as pd #处理数据的库   
import os
import sys
import time
import tensorflow as tf
 
from tensorflow import keras   #使用tensorflow中的keras
#import keras #单纯的使用keras
 
# Report the interpreter and library versions so outputs can be tied to an environment.
print(tf.__version__)
print(sys.version_info)
for lib in (mpl, np, sklearn, pd, tf, keras):
    print(lib.__name__, lib.__version__)

In [None]:
# TFRecord is TensorFlow's own binary file format. Records are encoded with
# Protocol Buffers, and a large training set can be split across several
# TFRecord files to speed up reading.
#
# A TFRecord file stores serialized tf.train.Example messages (typically one per record).
# Each Example wraps a tf.train.Features message, which behaves like a dict
# {"key": tf.train.Feature}. Each tf.train.Feature carries one of
# tf.train.BytesList / tf.train.FloatList / tf.train.Int64List.

# BytesList stores raw bytes, so the strings are UTF-8 encoded first.
favorite_books = [title.encode("UTF-8") for title in ("machine learning", "cc150")]
favorite_books_bytelist = tf.train.BytesList(value=favorite_books)
print(favorite_books_bytelist)

hours_floatList = tf.train.FloatList(value=[15.5, 9.5, 7.0, 8.0])
print(hours_floatList)

age_int64list = tf.train.Int64List(value=[27])
print(age_int64list)

# Three named features: "favorite_books" (bytes), "hours" (float) and "age" (int64).
feature_map = {
    "favorite_books": tf.train.Feature(bytes_list=favorite_books_bytelist),
    "hours": tf.train.Feature(float_list=hours_floatList),
    "age": tf.train.Feature(int64_list=age_int64list),
}
features = tf.train.Features(feature=feature_map)

print(features)

In [None]:
# Wrap the Features message in an Example, the record type stored in TFRecord files,
# and encode it to its binary protobuf form (bytes). This is serialization, not
# compression: the bytes are what gets written to a TFRecord file.
example = tf.train.Example(features=features)
serialized_example = example.SerializeToString()

print(example)
print(serialized_example)

In [None]:
# Write the serialized Example into a concrete TFRecord file, then read it back.
import shutil

output_dir = "tfrecord_basic"
# Start from an empty directory so re-running the cell gives the same result.
if os.path.exists(output_dir):
    shutil.rmtree(output_dir)
os.mkdir(output_dir)

filename = "test.tfrecords"
filename_fullpath = os.path.join(output_dir, filename)
with tf.io.TFRecordWriter(filename_fullpath) as writer:
    # The same serialized record is written three times -> tfrecord_basic/test.tfrecords
    for _ in range(3):
        writer.write(serialized_example)

# Read the file back with tf.data; each element is one serialized record.
dataset = tf.data.TFRecordDataset([filename_fullpath])
for record in dataset:
    print(record)

In [None]:
# Parse the serialized records back into Example feature tensors.

# Parsing schema. The list-valued features are read with VarLenFeature and come back
# as tf.sparse.SparseTensor; "age" is read with FixedLenFeature([]) as a dense scalar.
expected_features = {
    "favorite_books": tf.io.VarLenFeature(dtype=tf.string),
    "hours": tf.io.VarLenFeature(dtype=tf.float32),
    "age": tf.io.FixedLenFeature([], dtype=tf.int64)
}

# Read the TFRecord file with tf.data and parse every record.
dataset = tf.data.TFRecordDataset([filename_fullpath])
for serialized_example_tensor in dataset:
    example = tf.io.parse_single_example(serialized_example_tensor, expected_features)
    print(example)  # "favorite_books" and "hours" are SparseTensor values

    books = tf.sparse.to_dense(example["favorite_books"], default_value=b"")
    for book in books:
        print(book.numpy().decode("UTF-8"))

    # default_value must be a value of the tensor's dtype (float32), not a dtype
    # object such as tf.float32.
    hours = tf.sparse.to_dense(example["hours"], default_value=0.0)
    for hour in hours:
        print(hour.numpy())

    print(example["age"].numpy())

In [None]:
# Write the same records to a GZIP-compressed TFRecord file and read them back.

# NOTE(review): the payload is GZIP-compressed, not a zip archive; the ".zip" suffix
# is kept unchanged so the file path stays the same.
filename_fullpath_zip = filename_fullpath + ".zip"
options = tf.io.TFRecordOptions(compression_type="GZIP")  # compression used by the writer

with tf.io.TFRecordWriter(filename_fullpath_zip, options) as writer:
    for i in range(3):  # write the serialized example three times
        writer.write(serialized_example)
# tfrecord_basic/ now also contains test.tfrecords.zip.

# The reader must be given the same compression type the writer used.
dataset_zip = tf.data.TFRecordDataset([filename_fullpath_zip], compression_type="GZIP")
for serialized_example_tensor in dataset_zip:
    example = tf.io.parse_single_example(serialized_example_tensor, expected_features)
    print(example)  # "favorite_books" and "hours" are SparseTensor values

    books = tf.sparse.to_dense(example["favorite_books"], default_value=b"")
    for book in books:
        print(book.numpy().decode("UTF-8"))

    # default_value must be a value of the tensor's dtype (float32), not a dtype object.
    hours = tf.sparse.to_dense(example["hours"], default_value=0.0)
    for hour in hours:
        print(hour.numpy())

    print(example["age"].numpy())



In [None]:
#
######## Read the CSV files generated earlier and store them in TFRecord files
#
# Directory containing the previously generated CSV files (train*/valid*/test* prefixes).
source_dir: str = "./generate_csv"

def get_filename_by_prefix(source_dir, prefix_name):
    """Return paths of the entries in ``source_dir`` whose names start with ``prefix_name``.

    Args:
        source_dir: directory to scan (not recursive).
        prefix_name: required filename prefix, e.g. "train".

    Returns:
        A list of ``os.path.join(source_dir, name)`` paths, sorted by name so the
        result does not depend on the arbitrary ordering of ``os.listdir``.
    """
    return [
        os.path.join(source_dir, name)
        for name in sorted(os.listdir(source_dir))
        if name.startswith(prefix_name)
    ]


import pprint

# Collect the CSV file paths for each data split.
train_filenames = get_filename_by_prefix(source_dir, "train")
valid_filenames = get_filename_by_prefix(source_dir, "valid")
test_filenames = get_filename_by_prefix(source_dir, "test")

for split_filenames in (train_filenames, valid_filenames, test_filenames):
    pprint.pprint(split_filenames)

In [None]:
def parse_csv_line(line, n_fields=9):
    """Parse one CSV text line into an ``(x, y)`` pair.

    Every one of the ``n_fields`` columns uses ``tf.constant(np.nan)`` as its record
    default. The first ``n_fields - 1`` columns are stacked into ``x``; the last
    column is stacked into the length-1 tensor ``y``.
    """
    column_defaults = [tf.constant(np.nan)] * n_fields
    columns = tf.io.decode_csv(line, record_defaults=column_defaults)
    inputs = tf.stack(columns[:-1])  # every column except the last
    target = tf.stack(columns[-1:])  # the last column, kept as shape [1]
    return inputs, target

### Read CSV files into a batched tf.data pipeline
# 1. filenames -> dataset of filenames
# 2. read each file and interleave the lines of several files -> one dataset
# 3. parse each CSV line into (x, y), then batch
def csv_reader_dataset(filenames, n_readers=5,
                       batch_size=32, n_parse_threads=5,
                       shuffle_buffer_size=10000):
    """Build a shuffled, batched dataset of ``(x, y)`` pairs from CSV files.

    Args:
        filenames: CSV file paths (or patterns) passed to ``Dataset.list_files``.
        n_readers: number of files read concurrently (``interleave`` cycle_length).
        batch_size: samples per batch.
        n_parse_threads: ``num_parallel_calls`` for the ``parse_csv_line`` map.
        shuffle_buffer_size: shuffle buffer size; a larger buffer mixes more thoroughly.

    Returns:
        A ``tf.data.Dataset`` that repeats indefinitely; callers must bound
        iteration (``take()``, ``steps_per_epoch``, ...).
    """
    dataset = tf.data.Dataset.list_files(filenames)
    dataset = dataset.repeat()  # no count -> iterate over the files indefinitely
    dataset = dataset.interleave(
        # skip(1) drops the first line of each file (presumably the CSV header).
        lambda filename: tf.data.TextLineDataset(filename).skip(1),
        cycle_length=n_readers
    )
    # Dataset transformations return a NEW dataset. The result must be reassigned,
    # otherwise the shuffle is silently discarded.
    dataset = dataset.shuffle(shuffle_buffer_size)
    dataset = dataset.map(parse_csv_line, num_parallel_calls=n_parse_threads)
    dataset = dataset.batch(batch_size)  # group batch_size samples per element
    return dataset

batch_size = 32
# One batched, repeating CSV pipeline per split (train, valid, test).
train_set, valid_set, test_set = (
    csv_reader_dataset(split_files, batch_size=batch_size)
    for split_files in (train_filenames, valid_filenames, test_filenames)
)

In [None]:
# Convert one sample into a tf.train.Example and serialize it.
def serialize_example(x, y):
    """Pack one ``(x, y)`` sample into a tf.train.Example and return its serialized bytes.

    ``x`` is stored under "input_features" and ``y`` under "label", both as FloatList.
    """
    feature = {
        "input_features": tf.train.Feature(float_list=tf.train.FloatList(value=x)),
        "label": tf.train.Feature(float_list=tf.train.FloatList(value=y)),
    }
    example = tf.train.Example(features=tf.train.Features(feature=feature))
    return example.SerializeToString()

# Convert the dataset returned by csv_reader_dataset into sharded TFRecord files.
def csv_dataset_to_tfrecords(base_filename, dataset, n_shards, steps_per_shard, compression_type=None):
    """Write batches from ``dataset`` into ``n_shards`` TFRecord files.

    Args:
        base_filename: path prefix; shard ``i`` is written to
            ``"{base_filename}_{i:05d}-of-{n_shards:05d}"``.
        dataset: a dataset of ``(x_batch, y_batch)`` pairs, e.g. from csv_reader_dataset.
        n_shards: number of TFRecord files to produce.
        steps_per_shard: number of batches written into each shard.
        compression_type: passed to ``tf.io.TFRecordOptions`` (None, "", "ZLIB" or "GZIP").

    Returns:
        The list of shard paths that were written, in shard order.
    """
    import itertools

    options = tf.io.TFRecordOptions(compression_type=compression_type)
    # Use ONE iterator across all shards. Iterating `dataset.take(...)` separately for
    # each shard restarts the pipeline from its beginning every time, so shards would
    # overlap instead of continuing where the previous shard stopped.
    batches = iter(dataset)
    all_filenames = []
    for shard_id in range(n_shards):
        shard_path = '{}_{:05d}-of-{:05d}'.format(base_filename, shard_id, n_shards)
        with tf.io.TFRecordWriter(shard_path, options) as writer:
            # islice stops early (leaving later shards short or empty) if a finite
            # dataset runs out of batches.
            for x_batch, y_batch in itertools.islice(batches, steps_per_shard):
                for x_example, y_example in zip(x_batch, y_batch):
                    writer.write(serialize_example(x_example, y_example))
        all_filenames.append(shard_path)
    return all_filenames

In [None]:
#1. Write the CSV data into (uncompressed) TFRecord files.
n_shards = 20

# Batches per shard for each split, from the per-split sample counts
# (11610 train / 3870 valid / 5160 test).
train_steps_per_shard = 11610 // batch_size // n_shards
valid_steps_per_shard = 3870  // batch_size // n_shards
test_steps_per_shard  = 5160  // batch_size // n_shards


import shutil
output_dir = "generate_tfrecords"
# Start from an empty output directory so re-running the cell is repeatable.
if os.path.exists(output_dir):
    shutil.rmtree(output_dir)
os.mkdir(output_dir)

train_basename = os.path.join(output_dir, "train")
valid_basename = os.path.join(output_dir, "valid")
test_basename  = os.path.join(output_dir, "test")

# Each split is written with its OWN steps-per-shard value.
train_tfrecord_filename = csv_dataset_to_tfrecords(train_basename, train_set, n_shards, train_steps_per_shard, None)
valid_tfrecord_filename = csv_dataset_to_tfrecords(valid_basename, valid_set, n_shards, valid_steps_per_shard, None)
test_tfrecord_filename  = csv_dataset_to_tfrecords(test_basename,  test_set,  n_shards, test_steps_per_shard,  None)

In [None]:
#2. Write the CSV data into GZIP-compressed TFRecord files.
n_shards = 20

# Batches per shard for each split, from the per-split sample counts
# (11610 train / 3870 valid / 5160 test).
train_steps_per_shard = 11610 // batch_size // n_shards
valid_steps_per_shard = 3870  // batch_size // n_shards
test_steps_per_shard  = 5160  // batch_size // n_shards

import shutil
output_dir = "generate_tfrecords_zip"
# Start from an empty output directory so re-running the cell is repeatable.
if os.path.exists(output_dir):
    shutil.rmtree(output_dir)
os.mkdir(output_dir)

train_basename = os.path.join(output_dir, "train")
valid_basename = os.path.join(output_dir, "valid")
test_basename  = os.path.join(output_dir, "test")

# Each split is written with its OWN steps-per-shard value.
train_tfrecord_filename = csv_dataset_to_tfrecords(train_basename, train_set, n_shards, train_steps_per_shard, compression_type="GZIP")
valid_tfrecord_filename = csv_dataset_to_tfrecords(valid_basename, valid_set, n_shards, valid_steps_per_shard, compression_type="GZIP")
test_tfrecord_filename  = csv_dataset_to_tfrecords(test_basename,  test_set,  n_shards, test_steps_per_shard,  compression_type="GZIP")

In [None]:
# Show the generated TFRecord shard paths for each split.
for shard_paths in (train_tfrecord_filename, valid_tfrecord_filename, test_tfrecord_filename):
    pprint.pprint(shard_paths)

In [None]:
# Next: read the generated TFRecord files into datasets and feed them to a model.

# Parsing schema for records written by serialize_example: 8 float input features
# and a 1-element float label, both fixed length.
# NOTE(review): this rebinds the name used for the earlier demo schema.
expected_features = dict(
    input_features=tf.io.FixedLenFeature([8], dtype=tf.float32),
    label=tf.io.FixedLenFeature([1], dtype=tf.float32),
)

def parse_example(serialize_example, feature_spec=expected_features):
    """Parse one serialized tf.train.Example into an ``(input_features, label)`` pair.

    ``feature_spec`` defaults to the ``expected_features`` dict that exists when this
    function is defined. Binding it at definition time (instead of looking up the
    global on every call) keeps parsing correct even if an earlier cell that assigns
    a different ``expected_features`` schema is re-run afterwards.

    Note: the parameter name ``serialize_example`` shadows the module-level function
    of the same name inside this body; it is kept for backward compatibility.
    """
    example = tf.io.parse_single_example(serialize_example, feature_spec)
    return example["input_features"], example["label"]

def tfrecords_reader_dataset(filenames, n_readers=5, batch_size=32, n_parse_threads=5,
                             shuffle_buffer_size=10000, compression_type="GZIP"):
    """Build a shuffled, batched dataset of ``(input_features, label)`` from TFRecord files.

    Args:
        filenames: TFRecord file paths (or patterns) passed to ``Dataset.list_files``.
        n_readers: number of files read concurrently (``interleave`` cycle_length).
        batch_size: samples per batch.
        n_parse_threads: ``num_parallel_calls`` for the ``parse_example`` map.
        shuffle_buffer_size: shuffle buffer size; a larger buffer mixes more thoroughly.
        compression_type: compression of the input files. Defaults to "GZIP" (the
            previously hard-coded value); pass None or "" for uncompressed files.

    Returns:
        A ``tf.data.Dataset`` that repeats indefinitely; bound iteration with
        ``take()`` or ``steps_per_epoch``.
    """
    dataset = tf.data.Dataset.list_files(filenames)
    dataset = dataset.repeat()  # no count -> iterate over the files indefinitely
    dataset = dataset.interleave(
        lambda filename: tf.data.TFRecordDataset(filename, compression_type=compression_type),
        cycle_length=n_readers
    )
    # Dataset.shuffle returns a NEW dataset; it must be reassigned or no shuffling happens.
    dataset = dataset.shuffle(shuffle_buffer_size)
    dataset = dataset.map(parse_example, num_parallel_calls=n_parse_threads)
    dataset = dataset.batch(batch_size)  # group batch_size samples per element
    return dataset

# Peek at two small batches (batch_size=3) to check the parsed tensors.
tfrecords_train = tfrecords_reader_dataset(train_tfrecord_filename, batch_size=3)
for features_batch, labels_batch in tfrecords_train.take(2):
    print(features_batch)
    print(labels_batch)

In [None]:
batch_size = 32
# One batched, repeating TFRecord pipeline per split (train, valid, test).
tfrecord_train_set, tfrecord_valid_set, tfrecord_test_set = [
    tfrecords_reader_dataset(shard_paths, batch_size=batch_size)
    for shard_paths in (train_tfrecord_filename, valid_tfrecord_filename, test_tfrecord_filename)
]

In [None]:
# A small regression network: 8 input features -> 30 ReLU units -> 1 output.
model = keras.models.Sequential([
    keras.layers.Dense(30, activation="relu", input_shape=[8]),
    keras.layers.Dense(1),
])
model.compile(loss="mean_squared_error", optimizer="adam")


# Stop training once the monitored loss (val_loss by default) has not improved by
# at least 1e-3 for 5 consecutive epochs.
callbacks = [
    keras.callbacks.EarlyStopping(patience=5, min_delta=1e-3)
]

# The tf.data pipelines repeat indefinitely, so the number of batches per epoch must
# be given explicitly. 11610 is the training-sample count also used to size the
# training TFRecord shards (previously mistyped here as 11160).
history = model.fit(tfrecord_train_set,
                    validation_data=tfrecord_valid_set,
                    steps_per_epoch=11610 // batch_size,
                    validation_steps=3870 // batch_size,
                    epochs=100,
                    callbacks=callbacks)

In [None]:
# The test pipeline repeats indefinitely, so bound evaluation to one pass worth of batches.
test_steps = 5160 // batch_size
model.evaluate(tfrecord_test_set, steps=test_steps)