In [1]:
import tensorflow as tf
import numpy as np
import pandas as pd
import sklearn
import os
import sys
print(tf.__version__)
print(sys.version_info)
for module in tf, np, pd, sklearn:
    print(module.__name__, module.__version__)

  _np_qint8 = np.dtype([("qint8", np.int8, 1)])
  _np_quint8 = np.dtype([("quint8", np.uint8, 1)])
  _np_qint16 = np.dtype([("qint16", np.int16, 1)])
  _np_quint16 = np.dtype([("quint16", np.uint16, 1)])
  _np_qint32 = np.dtype([("qint32", np.int32, 1)])
  np_resource = np.dtype([("resource", np.ubyte, 1)])


2.0.0-beta1
sys.version_info(major=3, minor=6, micro=5, releaselevel='final', serial=0)
tensorflow 2.0.0-beta1
numpy 1.18.1
pandas 0.24.1
sklearn 0.21.2


  _np_qint8 = np.dtype([("qint8", np.int8, 1)])
  _np_quint8 = np.dtype([("quint8", np.uint8, 1)])
  _np_qint16 = np.dtype([("qint16", np.int16, 1)])
  _np_quint16 = np.dtype([("quint16", np.uint16, 1)])
  _np_qint32 = np.dtype([("qint32", np.int32, 1)])
  np_resource = np.dtype([("resource", np.ubyte, 1)])


In [2]:
source_dir = './data/csv/'

def get_filenames_by_prefix(source_dir, prefix_name):
    all_files = os.listdir(source_dir)
    results = []
    for filename in all_files:
        if filename.startswith(prefix_name):
            results.append(os.path.join(source_dir, filename))
    return results

In [3]:
train_filenames = get_filenames_by_prefix(source_dir, 'train')
test_filenames = get_filenames_by_prefix(source_dir, 'test')
valid_filenames = get_filenames_by_prefix(source_dir, 'valid')

In [4]:
import pprint
pprint.pprint(train_filenames)

['./data/csv/train_14.csv',
 './data/csv/train_18.csv',
 './data/csv/train_ 7.csv',
 './data/csv/train_10.csv',
 './data/csv/train_ 9.csv',
 './data/csv/train_ 5.csv',
 './data/csv/train_ 8.csv',
 './data/csv/train_12.csv',
 './data/csv/train_19.csv',
 './data/csv/train_11.csv',
 './data/csv/train_ 1.csv',
 './data/csv/train_15.csv',
 './data/csv/train_ 3.csv',
 './data/csv/train_ 2.csv',
 './data/csv/train_ 0.csv',
 './data/csv/train_13.csv',
 './data/csv/train_ 4.csv',
 './data/csv/train_16.csv',
 './data/csv/train_17.csv',
 './data/csv/train_ 6.csv']


In [5]:
# 定义一个解析csv每行内容的方法
def parse_csv_line(line, n_fields=9):
    records = [tf.constant(np.nan)]*n_fields
    parsed_fields = tf.io.decode_csv(line, records)
    x = tf.stack(parsed_fields[0:-1])
    y = tf.stack(parsed_fields[-1])
    return x, y

# 定义一个多线程读取csv文件并解析的方法
def csv_read_dataset(filenames, n_readers=5, batch_size=32, n_parse_threads=5, shuffle_buffer_size=10000):
    '''
    filenames: 文件名列表
    n_readers: 并行程度
    batch_size: 批大小
    n_parse_threads: 解析的并行程度
    shuffle_buffer_size: 
    '''
    filenames_dataset = tf.data.Dataset.list_files(filenames)
    filenames_dataset = filenames_dataset.repeat()
    dataset = filenames_dataset.interleave(
        lambda filename: tf.data.TextLineDataset(filename).skip(1),
        cycle_length=n_readers)
    dataset.shuffle(shuffle_buffer_size)
    dataset = dataset.map(parse_csv_line, num_parallel_calls=n_parse_threads)
    dataset = dataset.batch(batch_size)
    return dataset

In [6]:
train_set = csv_read_dataset(train_filenames, batch_size=32)
valid_set = csv_read_dataset(valid_filenames, batch_size=32)
test_set = csv_read_dataset(test_filenames, batch_size=32)

In [7]:
def serialize_example(x, y):
    '''Convert x,y to example and serialize'''
    input_features = tf.train.FloatList(value=x)
    label = tf.train.FloatList(value=[y])
    features = tf.train.Features(
        feature = {
            'input_features': tf.train.Feature(float_list=input_features),
            'label': tf.train.Feature(float_list=label)
        }
    )
    example = tf.train.Example(features=features)
    return example.SerializeToString()

def csv_dataset_to_tfrecord(filename, dataset, n_shards, steps_per_shard, compression_type=None):
    options = tf.io.TFRecordOptions(compression_type=compression_type)
    all_filenames = []
    for shard_id in range(n_shards):
        filename_fullpath = '{}_{:05d}-of-{:05d}'.format(filename, shard_id, n_shards)
        with tf.io.TFRecordWriter(filename_fullpath, options) as writer:
            for x_batch, y_batch in dataset.take(steps_per_shard):
                for x, y in zip(x_batch, y_batch):
                    writer.write(serialize_example(x, y))
        all_filenames.append(filename_fullpath)
    return all_filenames

In [8]:
n_shards = 10
batch_size = 32
train_steps_per_shard = 11610 // batch_size // n_shards
valid_steps_per_shard = 3880 // batch_size // n_shards
test_steps_per_shard = 5170 // batch_size // n_shards

output_dir = './data/tfrecord'
if not os.path.exists(output_dir):
    os.mkdir(output_dir)

In [9]:
train_basename = os.path.join(output_dir, 'train')
test_basename = os.path.join(output_dir, 'test')
valid_basename = os.path.join(output_dir, 'valid')

print(train_basename)

./data/tfrecord/train


In [10]:
train_tfrecord_filenames = csv_dataset_to_tfrecord(
    train_basename, train_set, n_shards, train_steps_per_shard, None)
test_tfrecord_filenames = csv_dataset_to_tfrecord(
    test_basename, test_set, n_shards, test_steps_per_shard, None)
valid_tfrecord_filenames = csv_dataset_to_tfrecord(
    valid_basename, valid_set, n_shards, valid_steps_per_shard, None)

In [11]:
# 生成压缩文件

output_dir = './data/tfrecord_zip'
if not os.path.exists(output_dir):
    os.mkdir(output_dir)

train_basename = os.path.join(output_dir, 'train')
test_basename = os.path.join(output_dir, 'test')
valid_basename = os.path.join(output_dir, 'valid')

print(train_basename)

train_tfrecordZIP_filenames = csv_dataset_to_tfrecord(
    train_basename, train_set, n_shards, train_steps_per_shard, compression_type='GZIP')
test_tfrecordZIP_filenames = csv_dataset_to_tfrecord(
    test_basename, test_set, n_shards, test_steps_per_shard, compression_type='GZIP')
valid_tfrecordZIP_filenames = csv_dataset_to_tfrecord(
    valid_basename, valid_set, n_shards, valid_steps_per_shard, compression_type='GZIP')


./data/tfrecord_zip/train


In [12]:
pprint.pprint(train_tfrecordZIP_filenames)

['./data/tfrecord_zip/train_00000-of-00010',
 './data/tfrecord_zip/train_00001-of-00010',
 './data/tfrecord_zip/train_00002-of-00010',
 './data/tfrecord_zip/train_00003-of-00010',
 './data/tfrecord_zip/train_00004-of-00010',
 './data/tfrecord_zip/train_00005-of-00010',
 './data/tfrecord_zip/train_00006-of-00010',
 './data/tfrecord_zip/train_00007-of-00010',
 './data/tfrecord_zip/train_00008-of-00010',
 './data/tfrecord_zip/train_00009-of-00010']


In [13]:
expected_features = {
    'input_features': tf.io.FixedLenFeature(shape=[8], dtype=tf.float32),
    'label': tf.io.FixedLenFeature(shape=[1], dtype=tf.float32)
}

def parse(serialized_example):
    example = tf.io.parse_single_example(serialized_example, expected_features)
    return example['input_features'], example['label']


In [14]:
def tfrecords_reader_dataset(filenames, n_readers=5, batch_size=32, n_parse_threads=5, shuffle_buffer_size=10000):
    dataset = tf.data.Dataset.list_files(filenames)
    dataset = dataset.repeat()
    dataset = dataset.interleave(
        lambda filename: tf.data.TFRecordDataset(filename, compression_type='GZIP'),
        cycle_length = n_readers
    )
    dataset.shuffle(shuffle_buffer_size)
    dataset = dataset.map(parse, num_parallel_calls=n_parse_threads)
    dataset = dataset.batch(batch_size)
    return dataset

tfrecords_train_set = tfrecords_reader_dataset(train_tfrecordZIP_filenames, batch_size=32)
tfrecords_test_set = tfrecords_reader_dataset(test_tfrecordZIP_filenames, batch_size=32)
tfrecords_valid_set = tfrecords_reader_dataset(valid_tfrecordZIP_filenames, batch_size=32)

In [15]:
train_ = tfrecords_reader_dataset(train_tfrecordZIP_filenames, batch_size=3)
for x_batch, y_batch in train_.take(2):
    print(x_batch)
    print(y_batch)

tf.Tensor(
[[ 0.63034356  1.8741661  -0.06713215 -0.12543367 -0.19737554 -0.02272263
  -0.69240725  0.72652334]
 [ 0.63034356  1.8741661  -0.06713215 -0.12543367 -0.19737554 -0.02272263
  -0.69240725  0.72652334]
 [ 0.63034356  1.8741661  -0.06713215 -0.12543367 -0.19737554 -0.02272263
  -0.69240725  0.72652334]], shape=(3, 8), dtype=float32)
tf.Tensor(
[[2.419]
 [2.419]
 [2.419]], shape=(3, 1), dtype=float32)
tf.Tensor(
[[ 0.63034356  1.8741661  -0.06713215 -0.12543367 -0.19737554 -0.02272263
  -0.69240725  0.72652334]
 [ 0.63034356  1.8741661  -0.06713215 -0.12543367 -0.19737554 -0.02272263
  -0.69240725  0.72652334]
 [ 0.04326301 -1.0895426  -0.38878718 -0.10789865 -0.68186635 -0.0723871
  -0.8883662   0.8213992 ]], shape=(3, 8), dtype=float32)
tf.Tensor(
[[2.419]
 [2.419]
 [1.426]], shape=(3, 1), dtype=float32)


In [16]:
batch_size = 32
model = tf.keras.models.Sequential([
    tf.keras.layers.Dense(30, activation='relu', input_shape=[8]),
    tf.keras.layers.Dense(1)
])
model.compile(loss='mean_squared_error', optimizer='sgd')
callbacks = [tf.keras.callbacks.EarlyStopping(patience=5, min_delta=1e-3)]
history = model.fit(tfrecords_train_set,
                   validation_data = tfrecords_valid_set,
                   steps_per_epoch=11160 // batch_size,
                   validation_steps=3870 // batch_size,
                   epochs=100,
                   callbacks=callbacks)

Epoch 1/100
Epoch 2/100
Epoch 3/100
Epoch 4/100
Epoch 5/100
Epoch 6/100
Epoch 7/100
Epoch 8/100
Epoch 9/100
Epoch 10/100
Epoch 11/100
Epoch 12/100
Epoch 13/100
Epoch 14/100
Epoch 15/100
Epoch 16/100
Epoch 17/100
Epoch 18/100
Epoch 19/100
Epoch 20/100
Epoch 21/100
Epoch 22/100
Epoch 23/100
Epoch 24/100
Epoch 25/100
Epoch 26/100
Epoch 27/100
Epoch 28/100
Epoch 29/100


In [17]:
model.evaluate(tfrecords_test_set, steps=5160//batch_size)



0.3868438932836426