In [1]:
# 使用tf.keras搭建回归模型，数据集使用加利福尼亚的房价预测
#导入必要的库即版本

import matplotlib as mpl
import matplotlib.pyplot as plt
%matplotlib inline
import numpy as np
import sklearn
import pandas as pd
import os
import sys
import time
import tensorflow as tf

from tensorflow import keras
#import keras

print(tf.__version__)
print(sys.version_info)
for module in mpl, np, pd,sklearn,tf,keras:
    print(module.__name__, module.__version__)

2.0.0-beta1
sys.version_info(major=3, minor=7, micro=0, releaselevel='final', serial=0)
matplotlib 2.2.3
numpy 1.16.4
pandas 0.23.4
sklearn 0.19.2
tensorflow 2.0.0-beta1
tensorflow.python.keras.api._v2.keras 2.2.4-tf


In [14]:
# 以csv为数据源
# 数据文件夹
source_dir = './generate_csv/'
print(os.listdir(source_dir))

# 区分文件
def get_filename_by_prefix(source_dir, prefix_name):
    all_files = os.listdir(source_dir)
    results = []
    for filename in all_files:
        if filename.startswith(prefix_name):
            results.append(os.path.join(source_dir,filename))
    return results

train_filenames = get_filename_by_prefix(source_dir, 'train')
valid_filenames = get_filename_by_prefix(source_dir, 'valid')
test_filenames = get_filename_by_prefix(source_dir, 'test')

import pprint
pprint.pprint(train_filenames)
pprint.pprint(valid_filenames)
pprint.pprint(test_filenames)

['test_00.csv', 'test_01.csv', 'test_02.csv', 'test_03.csv', 'test_04.csv', 'test_05.csv', 'test_06.csv', 'test_07.csv', 'test_08.csv', 'test_09.csv', 'train_00.csv', 'train_01.csv', 'train_02.csv', 'train_03.csv', 'train_04.csv', 'train_05.csv', 'train_06.csv', 'train_07.csv', 'train_08.csv', 'train_09.csv', 'train_10.csv', 'train_11.csv', 'train_12.csv', 'train_13.csv', 'train_14.csv', 'train_15.csv', 'train_16.csv', 'train_17.csv', 'train_18.csv', 'train_19.csv', 'valid_00.csv', 'valid_01.csv', 'valid_02.csv', 'valid_03.csv', 'valid_04.csv', 'valid_05.csv', 'valid_06.csv', 'valid_07.csv', 'valid_08.csv', 'valid_09.csv']
['./generate_csv/train_00.csv',
 './generate_csv/train_01.csv',
 './generate_csv/train_02.csv',
 './generate_csv/train_03.csv',
 './generate_csv/train_04.csv',
 './generate_csv/train_05.csv',
 './generate_csv/train_06.csv',
 './generate_csv/train_07.csv',
 './generate_csv/train_08.csv',
 './generate_csv/train_09.csv',
 './generate_csv/train_10.csv',
 './generate_csv/

In [15]:
# 解析dataset中的一行，line参数为具体的字符串，n_fields需要解析为多少个field
def parse_csv_line(line, n_fields=9):
    defs = [tf.constant(np.nan)] * n_fields
    parsed_fields = tf.io.decode_csv(line, record_defaults=defs)
    # 每一行中有9个元素，前8个为x，最后一个位y
    # 前八个变为一个向量
    x = tf.stack(parsed_fields[0:-1])
    y = tf.stack(parsed_fields[-1:])
    return x,y

In [16]:
# 完整读取csv文件流程
def csv_reader_dataset(filenames, n_readers=5,
                      batch_size=32, n_parse_threads=5,
                      shuffle_buffer_size=10000):
    # n_parse_threads：解析并行度
    # shuffle_buffer_size：
    dataset = tf.data.Dataset.list_files(filenames)
    dataset = dataset.repeat()
    dataset = dataset.interleave(
        lambda filename: tf.data.TextLineDataset(filename).skip(1),
        cycle_length = n_readers)
    # shuffle对数据进行混排,shuffle_buffer_size所需内存
    dataset.shuffle(shuffle_buffer_size)
    # 解析,map与interleave很类似
    dataset = dataset.map(parse_csv_line,
                         num_parallel_calls=n_parse_threads)
    dataset = dataset.batch(batch_size)
    return dataset



In [17]:
batch_size = 32
train_set = csv_reader_dataset(train_filenames, batch_size=batch_size)
valid_set = csv_reader_dataset(valid_filenames, batch_size=batch_size)
test_set = csv_reader_dataset(test_filenames, batch_size=batch_size)

In [26]:
# 遍历数据，并将数据写进tfrecord
def serialize_example(x, y):
    '''将x,y转为tf.train.Example,并序列化'''
    input_features = tf.train.FloatList(value = x)
    label = tf.train.FloatList(value = y)
    features = tf.train.Features(
        feature = {
            'input_features':tf.train.Feature(
            float_list = input_features),
            'label': tf.train.Feature(float_list=label)
        }
    )
    example = tf.train.Example(features = features)
    return example.SerializeToString()


In [27]:

def csv_dataset_to_tfrecords(base_filename,dataset,
                            n_shards,steps_per_shard,
                            compression_type=None):
    options = tf.io.TFRecordOptions(
        compression_type=compression_type)
    all_filenames = []
    for shard_id in range(n_shards):
        filename_fullpath = '{}_{:05d}-of-{:05d}'.format(
            base_filename,shard_id,n_shards)
        with tf.io.TFRecordWriter(filename_fullpath,options) as writer:
            for x_batch, y_batch in dataset.take(steps_per_shard):
                for x_example, y_example in zip(x_batch, y_batch):
                    writer.write(
                    serialize_example(x_example,y_example))
        all_filenames.append(filename_fullpath)
        
    return all_filenames

In [28]:
n_shards = 20
train_steps_per_shard = 11160 // batch_size // n_shards
valid_steps_per_shard = 3380 //batch_size // n_shards
test_steps_per_shard = 5170 //batch_size // n_shards

output_dir = 'generate_tfrecords'
if not os.path.exists(output_dir):
    os.mkdir(output_dir)
    
train_basename = os.path.join(output_dir,'train')
valid_basename = os.path.join(output_dir,'valid')
test_basename = os.path.join(output_dir,'test')

train_tfrecord_filenames = csv_dataset_to_tfrecords(
    train_basename,train_set,n_shards,train_steps_per_shard,None)

valid_tfrecord_filenames = csv_dataset_to_tfrecords(
    valid_basename,valid_set,n_shards,valid_steps_per_shard,None)

test_tfrecord_filenames = csv_dataset_to_tfrecords(
    test_basename,test_set,n_shards,test_steps_per_shard,None)


In [29]:
# 压缩后的tfrecoord文件
n_shards = 20
train_steps_per_shard = 11160 // batch_size // n_shards
valid_steps_per_shard = 3380 //batch_size // n_shards
test_steps_per_shard = 5170 //batch_size // n_shards

output_dir = 'generate_tfrecords_zip'
if not os.path.exists(output_dir):
    os.mkdir(output_dir)
    
train_basename = os.path.join(output_dir,'train')
valid_basename = os.path.join(output_dir,'valid')
test_basename = os.path.join(output_dir,'test')

train_tfrecord_filenames = csv_dataset_to_tfrecords(
    train_basename,train_set,n_shards,train_steps_per_shard,
    compression_type='GZIP')

valid_tfrecord_filenames = csv_dataset_to_tfrecords(
    valid_basename,valid_set,n_shards,valid_steps_per_shard,
    compression_type='GZIP')

test_tfrecord_filenames = csv_dataset_to_tfrecords(
    test_basename,test_set,n_shards,test_steps_per_shard,
    compression_type='GZIP')


In [30]:

pprint.pprint(train_tfrecord_filenames)

['generate_tfrecords_zip\\train_00000-of-00020',
 'generate_tfrecords_zip\\train_00001-of-00020',
 'generate_tfrecords_zip\\train_00002-of-00020',
 'generate_tfrecords_zip\\train_00003-of-00020',
 'generate_tfrecords_zip\\train_00004-of-00020',
 'generate_tfrecords_zip\\train_00005-of-00020',
 'generate_tfrecords_zip\\train_00006-of-00020',
 'generate_tfrecords_zip\\train_00007-of-00020',
 'generate_tfrecords_zip\\train_00008-of-00020',
 'generate_tfrecords_zip\\train_00009-of-00020',
 'generate_tfrecords_zip\\train_00010-of-00020',
 'generate_tfrecords_zip\\train_00011-of-00020',
 'generate_tfrecords_zip\\train_00012-of-00020',
 'generate_tfrecords_zip\\train_00013-of-00020',
 'generate_tfrecords_zip\\train_00014-of-00020',
 'generate_tfrecords_zip\\train_00015-of-00020',
 'generate_tfrecords_zip\\train_00016-of-00020',
 'generate_tfrecords_zip\\train_00017-of-00020',
 'generate_tfrecords_zip\\train_00018-of-00020',
 'generate_tfrecords_zip\\train_00019-of-00020']


In [31]:
# 读取tfrecord文件
expect_features = {
    'input_features':tf.io.FixedLenFeature([8],dtype=tf.float32),
    'label':tf.io.FixedLenFeature([1],dtype=tf.float32)
}

def parse_example(serialize_example):
    example = tf.io.parse_single_example(serialize_example,
                                        expect_features)
    return example['input_features'], example['label']


# 完整读取csv文件流程
def tfrecord_reader_dataset(filenames, n_readers=5,
                      batch_size=32, n_parse_threads=5,
                      shuffle_buffer_size=10000):
    # n_parse_threads：解析并行度
    # shuffle_buffer_size：
    dataset = tf.data.Dataset.list_files(filenames)
    dataset = dataset.repeat()
    dataset = dataset.interleave(
        lambda filename: tf.data.TFRecordDataset(filename,compression_type='GZIP'),
        cycle_length = n_readers)
    # shuffle对数据进行混排,shuffle_buffer_size所需内存
    dataset.shuffle(shuffle_buffer_size)
    # 解析,map与interleave很类似
    dataset = dataset.map(parse_example,
                         num_parallel_calls=n_parse_threads)
    dataset = dataset.batch(batch_size)
    return dataset

tfrecords_train = tfrecord_reader_dataset(train_tfrecord_filenames,
                                         batch_size=3)
for x_batch,y_batch in tfrecords_train.take(2):
    print(x_batch)
    print(y_batch)
    

tf.Tensor(
[[-0.66722274 -0.04823952  0.34529406  0.53826684  1.8521839  -0.06112538
  -0.8417093   1.5204847 ]
 [-0.66722274 -0.04823952  0.34529406  0.53826684  1.8521839  -0.06112538
  -0.8417093   1.5204847 ]
 [-0.66722274 -0.04823952  0.34529406  0.53826684  1.8521839  -0.06112538
  -0.8417093   1.5204847 ]], shape=(3, 8), dtype=float32)
tf.Tensor(
[[1.59]
 [1.59]
 [1.59]], shape=(3, 1), dtype=float32)
tf.Tensor(
[[-0.66722274 -0.04823952  0.34529406  0.53826684  1.8521839  -0.06112538
  -0.8417093   1.5204847 ]
 [-0.66722274 -0.04823952  0.34529406  0.53826684  1.8521839  -0.06112538
  -0.8417093   1.5204847 ]
 [-0.32652634  0.4323619  -0.09345459 -0.08402992  0.8460036  -0.02663165
  -0.56176794  0.1422876 ]], shape=(3, 8), dtype=float32)
tf.Tensor(
[[1.59 ]
 [1.59 ]
 [2.431]], shape=(3, 1), dtype=float32)


In [32]:
batch_size = 32
tfrecords_train_set = tfrecord_reader_dataset(train_tfrecord_filenames,
                                         batch_size=batch_size)

tfrecords_valid_set = tfrecord_reader_dataset(valid_tfrecord_filenames,
                                         batch_size=batch_size)

tfrecords_test_set = tfrecord_reader_dataset(test_tfrecord_filenames,
                                         batch_size=batch_size)



model = keras.models.Sequential([
    keras.layers.Dense(30, activation='relu', input_shape=[8]),
    #最后一层一个神经元
    keras.layers.Dense(1),
])
# 编译，目标函数使用均方差，优化方法使用随机梯度下降，也可以自己定义
model.compile(loss='mean_squared_error', optimizer='sgd')
# 使用earlystopping
callbacks = [keras.callbacks.EarlyStopping(patience=5, min_delta=1e-3)]


history = model.fit(tfrecords_train_set,
                   validation_data = tfrecords_valid_set,
                    # dataset不停的产生数据，fit函数需要知道跑多少次才是一个周期（epoch）
                    # 即遍历一次需要多少次迭代
                    steps_per_epoch = 11160 // batch_size,
                    validation_steps = 3870 // batch_size,
                   epochs=100, callbacks=callbacks)

Epoch 1/100
Epoch 2/100
Epoch 3/100
Epoch 4/100
Epoch 5/100
Epoch 6/100
Epoch 7/100
Epoch 8/100
Epoch 9/100
Epoch 10/100
Epoch 11/100
Epoch 12/100
Epoch 13/100
Epoch 14/100
Epoch 15/100
Epoch 16/100
Epoch 17/100
Epoch 18/100


In [34]:
model.evaluate(tfrecords_test_set,steps= 5160 // batch_size)



0.4673103103356332