<a href="https://colab.research.google.com/github/KarlYang2013/tf_study/blob/master/tf_data_generate_tfrecord.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import matplotlib as mpl
import matplotlib.pyplot as plt
%matplotlib inline
import numpy as np
import sklearn
import pandas as pd
import os
import sys
import time
import tensorflow as tf

from tensorflow import keras

print(tf.__version__)
print(sys.version_info)
for module in mpl, np, pd, sklearn, tf, keras:
    print(module.__name__, module.__version__)

2.0.0
sys.version_info(major=3, minor=7, micro=3, releaselevel='final', serial=0)
matplotlib 3.1.1
numpy 1.17.3
pandas 0.25.3
sklearn 0.21.3
tensorflow 2.0.0
tensorflow_core.keras 2.2.4-tf


In [None]:
source_dir = "./generate_csv/"

def get_filenames_by_prefix(source_dir, prefix_name):
    all_files = os.listdir(source_dir)
    results = []
    for filename in all_files:
        if filename.startswith(prefix_name):
            results.append(os.path.join(source_dir, filename))
    return results

train_filenames = get_filenames_by_prefix(source_dir, "train")
valid_filenames = get_filenames_by_prefix(source_dir, "valid")
test_filenames = get_filenames_by_prefix(source_dir, "test")

import pprint
pprint.pprint(train_filenames)
pprint.pprint(valid_filenames)
pprint.pprint(test_filenames)


['./generate_csv/train_13.csv',
 './generate_csv/train_03.csv',
 './generate_csv/train_09.csv',
 './generate_csv/train_19.csv',
 './generate_csv/train_15.csv',
 './generate_csv/train_11.csv',
 './generate_csv/train_08.csv',
 './generate_csv/train_02.csv',
 './generate_csv/train_17.csv',
 './generate_csv/train_01.csv',
 './generate_csv/train_07.csv',
 './generate_csv/train_12.csv',
 './generate_csv/train_16.csv',
 './generate_csv/train_14.csv',
 './generate_csv/train_00.csv',
 './generate_csv/train_10.csv',
 './generate_csv/train_05.csv',
 './generate_csv/train_06.csv',
 './generate_csv/train_04.csv',
 './generate_csv/train_18.csv']
['./generate_csv/valid_08.csv',
 './generate_csv/valid_00.csv',
 './generate_csv/valid_07.csv',
 './generate_csv/valid_09.csv',
 './generate_csv/valid_03.csv',
 './generate_csv/valid_02.csv',
 './generate_csv/valid_06.csv',
 './generate_csv/valid_01.csv',
 './generate_csv/valid_04.csv',
 './generate_csv/valid_05.csv']
['./generate_csv/test_01.csv',
 './gener

In [None]:
def parse_csv_line(line, n_fields = 9):
    defs = [tf.constant(np.nan)] * n_fields
    parsed_fields = tf.io.decode_csv(line, record_defaults=defs)
    x = tf.stack(parsed_fields[0:-1])
    y = tf.stack(parsed_fields[-1:])
    return x, y

def csv_reader_dataset(filenames, n_readers=5,
                       batch_size=32, n_parse_threads=5,
                       shuffle_buffer_size=10000):
    dataset = tf.data.Dataset.list_files(filenames)
    dataset = dataset.repeat()
    dataset = dataset.interleave(
        lambda filename: tf.data.TextLineDataset(filename).skip(1),
        cycle_length = n_readers
    )
    dataset.shuffle(shuffle_buffer_size)
    dataset = dataset.map(parse_csv_line,
                          num_parallel_calls=n_parse_threads)
    dataset = dataset.batch(batch_size)
    return dataset

batch_size = 32
train_set = csv_reader_dataset(train_filenames,
                               batch_size = batch_size)
valid_set = csv_reader_dataset(valid_filenames,
                               batch_size = batch_size)
test_set = csv_reader_dataset(test_filenames,
                              batch_size = batch_size)


In [None]:
def serialize_example(x, y):
    """Converts x, y to tf.train.Example and serialize"""
    input_feautres = tf.train.FloatList(value = x)
    label = tf.train.FloatList(value = y)
    features = tf.train.Features(
        feature = {
            "input_features": tf.train.Feature(
                float_list = input_feautres),
            "label": tf.train.Feature(float_list = label)
        }
    )
    example = tf.train.Example(features = features)
    return example.SerializeToString()

def csv_dataset_to_tfrecords(base_filename, dataset,
                             n_shards, steps_per_shard,
                             compression_type = None):
    options = tf.io.TFRecordOptions(
        compression_type = compression_type)
    all_filenames = []
    
    for shard_id in range(n_shards):
        filename_fullpath = '{}_{:05d}-of-{:05d}'.format(
            base_filename, shard_id, n_shards)
        with tf.io.TFRecordWriter(filename_fullpath, options) as writer:
            for x_batch, y_batch in dataset.skip(shard_id * steps_per_shard).take(steps_per_shard):
                for x_example, y_example in zip(x_batch, y_batch):
                    writer.write(
                        serialize_example(x_example, y_example))
        all_filenames.append(filename_fullpath)
    return all_filenames

In [None]:
n_shards = 20
train_steps_per_shard = 11610 // batch_size // n_shards
valid_steps_per_shard = 3880 // batch_size // n_shards
test_steps_per_shard = 5170 // batch_size // n_shards

output_dir = "generate_tfrecords"
if not os.path.exists(output_dir):
    os.mkdir(output_dir)

train_basename = os.path.join(output_dir, "train")
valid_basename = os.path.join(output_dir, "valid")
test_basename = os.path.join(output_dir, "test")

train_tfrecord_filenames = csv_dataset_to_tfrecords(
    train_basename, train_set, n_shards, train_steps_per_shard, None)
valid_tfrecord_filenames = csv_dataset_to_tfrecords(
    valid_basename, valid_set, n_shards, valid_steps_per_shard, None)
test_tfrecord_fielnames = csv_dataset_to_tfrecords(
    test_basename, test_set, n_shards, test_steps_per_shard, None)


In [None]:
n_shards = 20
train_steps_per_shard = 11610 // batch_size // n_shards
valid_steps_per_shard = 3880 // batch_size // n_shards
test_steps_per_shard = 5170 // batch_size // n_shards

output_dir = "generate_tfrecords_zip"
if not os.path.exists(output_dir):
    os.mkdir(output_dir)

train_basename = os.path.join(output_dir, "train")
valid_basename = os.path.join(output_dir, "valid")
test_basename = os.path.join(output_dir, "test")

train_tfrecord_filenames = csv_dataset_to_tfrecords(
    train_basename, train_set, n_shards, train_steps_per_shard,
    compression_type = "GZIP")
valid_tfrecord_filenames = csv_dataset_to_tfrecords(
    valid_basename, valid_set, n_shards, valid_steps_per_shard,
    compression_type = "GZIP")
test_tfrecord_fielnames = csv_dataset_to_tfrecords(
    test_basename, test_set, n_shards, test_steps_per_shard,
    compression_type = "GZIP")

In [None]:
pprint.pprint(train_tfrecord_filenames)
pprint.pprint(valid_tfrecord_filenames)
pprint.pprint(test_tfrecord_fielnames)

['generate_tfrecords_zip/train_00000-of-00020',
 'generate_tfrecords_zip/train_00001-of-00020',
 'generate_tfrecords_zip/train_00002-of-00020',
 'generate_tfrecords_zip/train_00003-of-00020',
 'generate_tfrecords_zip/train_00004-of-00020',
 'generate_tfrecords_zip/train_00005-of-00020',
 'generate_tfrecords_zip/train_00006-of-00020',
 'generate_tfrecords_zip/train_00007-of-00020',
 'generate_tfrecords_zip/train_00008-of-00020',
 'generate_tfrecords_zip/train_00009-of-00020',
 'generate_tfrecords_zip/train_00010-of-00020',
 'generate_tfrecords_zip/train_00011-of-00020',
 'generate_tfrecords_zip/train_00012-of-00020',
 'generate_tfrecords_zip/train_00013-of-00020',
 'generate_tfrecords_zip/train_00014-of-00020',
 'generate_tfrecords_zip/train_00015-of-00020',
 'generate_tfrecords_zip/train_00016-of-00020',
 'generate_tfrecords_zip/train_00017-of-00020',
 'generate_tfrecords_zip/train_00018-of-00020',
 'generate_tfrecords_zip/train_00019-of-00020']
['generate_tfrecords_zip/valid_00000-of-

In [None]:
expected_features = {
    "input_features": tf.io.FixedLenFeature([8], dtype=tf.float32),
    "label": tf.io.FixedLenFeature([1], dtype=tf.float32)
}

def parse_example(serialized_example):
    example = tf.io.parse_single_example(serialized_example,
                                         expected_features)
    return example["input_features"], example["label"]

def tfrecords_reader_dataset(filenames, n_readers=5,
                             batch_size=32, n_parse_threads=5,
                             shuffle_buffer_size=10000):
    dataset = tf.data.Dataset.list_files(filenames)
    dataset = dataset.repeat()
    dataset = dataset.interleave(
        lambda filename: tf.data.TFRecordDataset(
            filename, compression_type = "GZIP"),
        cycle_length = n_readers
    )
    dataset.shuffle(shuffle_buffer_size)
    dataset = dataset.map(parse_example,
                          num_parallel_calls=n_parse_threads)
    dataset = dataset.batch(batch_size)
    return dataset

tfrecords_train = tfrecords_reader_dataset(train_tfrecord_filenames,
                                           batch_size = 3)
for x_batch, y_batch in tfrecords_train.take(10):
    print(x_batch)
    print(y_batch)

tf.Tensor(
[[-0.902364   -0.7691417  -0.20091914  0.10880736  0.7507385  -0.08213767
   1.3978218  -0.9013472 ]
 [-0.04001952 -1.7303445   0.3000681   0.09866919  1.183514   -0.05759396
   0.567329   -0.11237926]
 [-0.7809836   0.27216142 -0.8818754  -0.35522854  0.33610874  0.2504401
  -0.86503774  0.6616083 ]], shape=(3, 8), dtype=float32)
tf.Tensor(
[[1.434]
 [1.45 ]
 [1.28 ]], shape=(3, 1), dtype=float32)
tf.Tensor(
[[-6.2379646e-01  3.5226166e-01  2.0165265e-01  5.5866838e-03
  -4.4687924e-01 -4.2026456e-02  2.5175872e+00 -9.3630147e-01]
 [ 5.9549600e-01  5.1246214e-01  3.1088710e-02 -2.1964614e-01
  -4.0877321e-01 -4.8546847e-03 -8.4170932e-01  6.0168666e-01]
 [ 8.4598297e-01 -1.8905450e+00  3.0823600e-01 -5.6496419e-02
   5.1982555e+00  8.4930323e-03 -5.1977670e-01  4.3690220e-01]], shape=(3, 8), dtype=float32)
tf.Tensor(
[[0.775]
 [3.428]
 [2.363]], shape=(3, 1), dtype=float32)
tf.Tensor(
[[ 0.44454306  0.35226166  0.3828868  -0.07468192 -0.62380004 -0.02691609
   1.0945519  -1

In [None]:
batch_size = 32
tfrecords_train_set = tfrecords_reader_dataset(
    train_tfrecord_filenames, batch_size = batch_size)
tfrecords_valid_set = tfrecords_reader_dataset(
    valid_tfrecord_filenames, batch_size = batch_size)
tfrecords_test_set = tfrecords_reader_dataset(
    test_tfrecord_fielnames, batch_size = batch_size)


In [None]:
model = keras.models.Sequential([
    keras.layers.Dense(30, activation='relu',
                       input_shape=[8]),
    keras.layers.Dense(1),
])
model.compile(loss="mean_squared_error", optimizer="sgd")
callbacks = [keras.callbacks.EarlyStopping(
    patience=5, min_delta=1e-2)]

history = model.fit(tfrecords_train_set,
                    validation_data = tfrecords_valid_set,
                    steps_per_epoch = 11160 // batch_size,
                    validation_steps = 3870 // batch_size,
                    epochs = 100,
                    callbacks = callbacks)

Train for 348 steps, validate for 120 steps
Epoch 1/100
Epoch 2/100
Epoch 3/100
Epoch 4/100
Epoch 5/100
Epoch 6/100
Epoch 7/100
Epoch 8/100
Epoch 9/100
Epoch 10/100
Epoch 11/100
Epoch 12/100
Epoch 13/100
Epoch 14/100
Epoch 15/100
Epoch 16/100
Epoch 17/100
Epoch 18/100
Epoch 19/100
Epoch 20/100
Epoch 21/100
Epoch 22/100


In [None]:
model.evaluate(tfrecords_test_set, steps = 5160 // batch_size)



0.38986776496127523