In [1]:
import matplotlib as mpl
import matplotlib.pyplot as plt
%matplotlib inline
import numpy as np
import sklearn
import pandas as pd
import os
import sys
import time
import tensorflow as tf

from tensorflow import keras

print(tf.__version__)
print(sys.version_info)
for module in mpl,np,pd,sklearn,tf,keras:
    print(module.__name__,module.__version__)

tf.test.is_gpu_available()

2.0.0-dev20191002
sys.version_info(major=3, minor=7, micro=4, releaselevel='final', serial=0)
matplotlib 3.1.1
numpy 1.17.2
pandas 0.25.1
sklearn 0.21.3
tensorflow 2.0.0-dev20191002
tensorflow_core.keras 2.2.4-tf
Instructions for updating:
Use `tf.config.experimental.list_physical_devices('GPU')` instead.


False

# 我们读取之前存储好的房价预测的csv文件，然后将其转换成tfrecord文件

In [2]:
source_dir = './generate_csv'
print(os.listdir(source_dir)) # 这是40个csv文件

def get_filenames_by_prefix(source_dir,prefix_name):
    all_files = os.listdir(source_dir)
    result = []
    for file_name in all_files:
        if file_name.startswith(prefix_name):
            result.append(os.path.join(source_dir,file_name))
    return sorted(result)

import pprint
train_filenames = get_filenames_by_prefix(source_dir,'train')
pprint.pprint(train_filenames)

valid_filenames = get_filenames_by_prefix(source_dir,'valid')
pprint.pprint(valid_filenames)

test_filenames = get_filenames_by_prefix(source_dir,'test')
pprint.pprint(test_filenames)

['train_15.csv', 'train_01.csv', 'train_00.csv', 'train_14.csv', 'test_08.csv', 'train_02.csv', 'train_16.csv', 'train_17.csv', 'train_03.csv', 'test_09.csv', 'train_07.csv', 'train_13.csv', 'train_12.csv', 'train_06.csv', 'train_10.csv', 'train_04.csv', 'train_05.csv', 'train_11.csv', 'valid_01.csv', 'valid_00.csv', 'valid_02.csv', 'valid_03.csv', 'valid_07.csv', 'valid_06.csv', 'valid_04.csv', 'valid_05.csv', 'valid_08.csv', 'valid_09.csv', 'test_02.csv', 'train_08.csv', 'train_09.csv', 'test_03.csv', 'test_01.csv', 'test_00.csv', 'test_04.csv', 'test_05.csv', 'test_07.csv', 'train_19.csv', 'train_18.csv', 'test_06.csv']
['./generate_csv/train_00.csv',
 './generate_csv/train_01.csv',
 './generate_csv/train_02.csv',
 './generate_csv/train_03.csv',
 './generate_csv/train_04.csv',
 './generate_csv/train_05.csv',
 './generate_csv/train_06.csv',
 './generate_csv/train_07.csv',
 './generate_csv/train_08.csv',
 './generate_csv/train_09.csv',
 './generate_csv/train_10.csv',
 './generate_csv/

In [3]:
# tf.stack 可以吧一个数组,np_array,tf_tensor都变成一个tf_tensor
test = tf.constant([
                 [1.,2.,3.],
                 [4.,5.,6.]
                ])
print(test)
print(tf.stack(test))
print(tf.stack([
                 [1.,2.,3.],
                 [4.,5.,6.]
                ]))
print(tf.stack(np.array([
                 [1.,2.,3.],
                 [4.,5.,6.]
                ])))

tf.Tensor(
[[1. 2. 3.]
 [4. 5. 6.]], shape=(2, 3), dtype=float32)
tf.Tensor(
[[1. 2. 3.]
 [4. 5. 6.]], shape=(2, 3), dtype=float32)
tf.Tensor(
[[1. 2. 3.]
 [4. 5. 6.]], shape=(2, 3), dtype=float32)
tf.Tensor(
[[1. 2. 3.]
 [4. 5. 6.]], shape=(2, 3), dtype=float64)


In [4]:
def parse_csv_line(line_str, n_fields = 9):
    defs = [tf.constant(np.nan)] * n_fields
    parsed_fields = tf.io.decode_csv(line_str,record_defaults=defs)
    x = tf.stack(parsed_fields[0:-1])
    y = tf.stack(parsed_fields[-1:])# 最后一个值时y, 注意这里一定要写成[-1:],不然返回的tensor是一个标量
    print(y)
    return x,y

def csv_reader_dataset(filenames,n_readers=5,
                       batch_size = 32,n_parse_threads=5,
                       shuffle_buffer_size = 10000):
    dataset = tf.data.Dataset.list_files(filenames)
    dataset = dataset.repeat() # 不传参数就是一个无限循环的generate
    dataset = dataset.interleave(
        lambda filename: tf.data.TextLineDataset(filename).skip(1),
        cycle_length = n_readers
    )
    dataset.shuffle(shuffle_buffer_size)
    dataset = dataset.map(parse_csv_line,
                          num_parallel_calls=n_parse_threads)
    dataset = dataset.batch(batch_size)
    return dataset

batch_size = 32
train_set = csv_reader_dataset(train_filenames,
                               batch_size = batch_size)
valid_set = csv_reader_dataset(valid_filenames,
                               batch_size = batch_size)
test_set = csv_reader_dataset(test_filenames,
                              batch_size = batch_size)

Tensor("stack_1:0", shape=(1,), dtype=float32)
Tensor("stack_1:0", shape=(1,), dtype=float32)
Tensor("stack_1:0", shape=(1,), dtype=float32)


In [5]:
# 将读取出来的csv文件数据存储到tfrecord文件中去

# 定义一个序列化方法
def serialize_example(x,y):
    """
    Converts x,y to tf.train.Example and serialize
    """
#     print(x)
    input_features = tf.train.FloatList(value = x)
#     print(y)
    label = tf.train.FloatList(value = y)
    features = tf.train.Features(
        feature = {
            "input_features": tf.train.Feature(
                float_list = input_features),
            "label": tf.train.Feature(
                float_list = label)
        }
    )
    example = tf.train.Example(features = features)
    return example.SerializeToString()

# 将csv文件转换为tfrecords文件
def csv_dataset_to_tfrecords(base_filename, # 存放tfrecord文件的文件夹路径
                             dataset, # csv文件读取出来的数据生成的dataset
                             n_shards, # “碎片” 要把生成的tfrecord文件分成多少个
                             step_per_shard, # 要去多少个batch才能取到一个完整的epoch数据集
                             compression_type = None # 压缩类型
                            ):
    options = tf.io.TFRecordOptions(
        compression_type = compression_type )
    all_filenames = []
    for shard_id in range(n_shards):
        filename_fullpath = '{}_{:05d}-of-{:05d}'.format(
            base_filename, shard_id, n_shards)
        with tf.io.TFRecordWriter(filename_fullpath,options) as writer:
            for x_batch,y_batch in dataset.take(step_per_shard):
                for x_example , y_example in zip(x_batch,y_batch):
                    writer.write(
                        serialize_example(x_example,y_example))
        all_filenames.append(filename_fullpath)
    return all_filenames

### 生成不压缩的tfrecord文件

In [6]:
n_shards = 20
train_steps_per_shard = 11610 // batch_size // n_shards
valid_steps_per_shard = 3880 // batch_size // n_shards
test_steps_per_shard = 5170 // batch_size // n_shards

output_dir = "generate_tfrecords"
if not os.path.exists(output_dir):
    os.mkdir(output_dir)

train_basename = os.path.join(output_dir,"train")
valid_basename = os.path.join(output_dir,"valid")
test_basename = os.path.join(output_dir,"test")

train_tfrecord_filenames = csv_dataset_to_tfrecords(train_basename,
                                                    train_set,
                                                    n_shards,
                                                    train_steps_per_shard,
                                                    None)
valid_tfrecord_filenames = csv_dataset_to_tfrecords(valid_basename,
                                                    valid_set,
                                                    n_shards,
                                                    valid_steps_per_shard,
                                                    None)
test_tfrecord_filenames = csv_dataset_to_tfrecords(test_basename,
                                                   test_set,
                                                   n_shards,
                                                   test_steps_per_shard,
                                                   None)

### 生成压缩后的tfrecord文件

In [7]:
n_shards = 20
train_steps_per_shard = 11610 // batch_size // n_shards
valid_steps_per_shard = 3880 // batch_size // n_shards
test_steps_per_shard = 5170 // batch_size // n_shards

output_dir = "generate_tfrecords_zip" # 文件名称改一下用来区分
if not os.path.exists(output_dir):
    os.mkdir(output_dir)

train_basename = os.path.join(output_dir,"train")
valid_basename = os.path.join(output_dir,"valid")
test_basename = os.path.join(output_dir,"test")

train_tfrecord_filenames = csv_dataset_to_tfrecords(train_basename,
                                                    train_set,
                                                    n_shards,
                                                    train_steps_per_shard,
                                                    compression_type = "GZIP") # 只用修改这一个地方
valid_tfrecord_filenames = csv_dataset_to_tfrecords(valid_basename,
                                                    valid_set,
                                                    n_shards,
                                                    valid_steps_per_shard,
                                                    compression_type = "GZIP")
test_tfrecord_filenames = csv_dataset_to_tfrecords(test_basename,
                                                   test_set,
                                                   n_shards,
                                                   test_steps_per_shard,
                                                   compression_type = "GZIP")

# 读取tfrecord文件

In [8]:
pprint.pprint(train_tfrecord_filenames)
pprint.pprint(valid_tfrecord_filenames)
pprint.pprint(test_tfrecord_filenames)

['generate_tfrecords_zip/train_00000-of-00020',
 'generate_tfrecords_zip/train_00001-of-00020',
 'generate_tfrecords_zip/train_00002-of-00020',
 'generate_tfrecords_zip/train_00003-of-00020',
 'generate_tfrecords_zip/train_00004-of-00020',
 'generate_tfrecords_zip/train_00005-of-00020',
 'generate_tfrecords_zip/train_00006-of-00020',
 'generate_tfrecords_zip/train_00007-of-00020',
 'generate_tfrecords_zip/train_00008-of-00020',
 'generate_tfrecords_zip/train_00009-of-00020',
 'generate_tfrecords_zip/train_00010-of-00020',
 'generate_tfrecords_zip/train_00011-of-00020',
 'generate_tfrecords_zip/train_00012-of-00020',
 'generate_tfrecords_zip/train_00013-of-00020',
 'generate_tfrecords_zip/train_00014-of-00020',
 'generate_tfrecords_zip/train_00015-of-00020',
 'generate_tfrecords_zip/train_00016-of-00020',
 'generate_tfrecords_zip/train_00017-of-00020',
 'generate_tfrecords_zip/train_00018-of-00020',
 'generate_tfrecords_zip/train_00019-of-00020']
['generate_tfrecords_zip/valid_00000-of-

In [14]:
# 定义要读取的一条训练数据中各个数据的类型与长度，
expected_features = {
    "input_features": tf.io.FixedLenFeature([8],dtype=tf.float32),
    "label": tf.io.FixedLenFeature([1],dtype=tf.float32)
}

def parse_example(serialized_example):
    example = tf.io.parse_single_example(serialized_example,
                                         expected_features)
    return example["input_features"],example["label"]

def tfrecord_reader_dataset(filenames,n_readers=5,
                            batch_size=32,n_parse_threads=5,
                            shuffle_buffer_size=10000):
    dataset = tf.data.Dataset.list_files(filenames)
    dataset = dataset.repeat() # 无限循环
    dataset = dataset.interleave(
        lambda filename:tf.data.TFRecordDataset(filename,compression_type="GZIP"),
        cycle_length = n_readers
    )
    dataset.shuffle(shuffle_buffer_size)
    dataset = dataset.map(parse_example,
                          num_parallel_calls=n_parse_threads)
    dataset = dataset.batch(batch_size)
    return dataset

# 我们尝试去取数据
tfrecords_train = tfrecord_reader_dataset(train_tfrecord_filenames,batch_size=3)

print(tfrecords_train)
for x_batch,y_batch in tfrecords_train.take(2):
    print(x_batch)
    print(y_batch)

<BatchDataset shapes: ((None, 8), (None, 1)), types: (tf.float32, tf.float32)>
tf.Tensor(
[[ 0.09734604  0.75276285 -0.20218964 -0.19547    -0.40605137  0.00678553
  -0.81371516  0.6566148 ]
 [ 0.63034356  1.8741661  -0.06713215 -0.12543367 -0.19737554 -0.02272263
  -0.69240725  0.72652334]
 [ 2.5150437   1.0731637   0.5574401  -0.17273512 -0.6129126  -0.01909157
  -0.5710993  -0.02749031]], shape=(3, 8), dtype=float32)
tf.Tensor(
[[1.119  ]
 [2.419  ]
 [5.00001]], shape=(3, 1), dtype=float32)
tf.Tensor(
[[-1.0591781   1.3935647  -0.02633197 -0.1100676  -0.6138199  -0.09695935
   0.3247131  -0.03747724]
 [-1.119975   -1.3298433   0.14190045  0.4658137  -0.10301778 -0.10744184
  -0.7950524   1.5304717 ]
 [ 0.8015443   0.27216142 -0.11624393 -0.20231152 -0.5430516  -0.02103962
  -0.5897621  -0.08241846]], shape=(3, 8), dtype=float32)
tf.Tensor(
[[0.672]
 [0.66 ]
 [3.226]], shape=(3, 1), dtype=float32)


In [15]:
# 我们取出正式训练的数据集
batch_size = 32
tfrecord_train_set = tfrecord_reader_dataset(train_tfrecord_filenames,
                                             batch_size=batch_size)
tfrecord_valid_set = tfrecord_reader_dataset(valid_tfrecord_filenames,
                                             batch_size=batch_size)
tfrecord_test_set = tfrecord_reader_dataset(test_tfrecord_filenames,
                                            batch_size=batch_size)

# 我们在模型中使用读取出来的tfrecord数据

In [17]:
model = keras.models.Sequential([
    keras.layers.Dense(30,activation='relu',
                       input_shape=[8]),
    keras.layers.Dense(1)
])
model.compile(loss="mean_squared_error",optimiezer='sgd')
callbacks = [
    keras.callbacks.EarlyStopping(patience=5,min_delta=1e-2)
]

history = model.fit(tfrecord_train_set,
                    validation_data = tfrecord_valid_set,
                    steps_per_epoch = 11160//batch_size,
                    validation_steps = 3870//batch_size,
                    epochs = 100,
                    callbacks = callbacks)

Train for 348 steps, validate for 120 steps
Epoch 1/100
Epoch 2/100
Epoch 3/100
Epoch 4/100
Epoch 5/100
Epoch 6/100
Epoch 7/100
Epoch 8/100
Epoch 9/100


In [19]:
model.evaluate(tfrecord_test_set,steps=5160//batch_size)



0.40388731487234186