In [1]:
import matplotlib as mpl
import matplotlib.pyplot as plt
%matplotlib inline
import numpy as np
import sklearn
import pandas as pd
import os
import sys
import time
import tensorflow as tf

from tensorflow import keras

print(tf.__version__)
print(sys.version_info)
for module in mpl, np, pd, sklearn, tf, keras:
    print(module.__name__, module.__version__)

2.0.0
sys.version_info(major=3, minor=7, micro=4, releaselevel='final', serial=0)
matplotlib 3.1.1
numpy 1.17.2
pandas 0.25.1
sklearn 0.21.3
tensorflow 2.0.0
tensorflow_core.keras 2.2.4-tf


In [2]:
from sklearn.datasets import fetch_california_housing

housing = fetch_california_housing()

In [3]:
from sklearn.model_selection import train_test_split

x_train_all, x_test, y_train_all, y_test = train_test_split(
    housing.data, housing.target, random_state = 7)
x_train, x_valid, y_train, y_valid = train_test_split(
    x_train_all, y_train_all, random_state = 11)
print(x_train.shape, y_train.shape)
print(x_valid.shape, y_valid.shape)
print(x_test.shape, y_test.shape)

(11610, 8) (11610,)
(3870, 8) (3870,)
(5160, 8) (5160,)


In [4]:
from sklearn.preprocessing import StandardScaler

scaler = StandardScaler()
x_train_scaled = scaler.fit_transform(x_train)
x_valid_scaled = scaler.transform(x_valid)
x_test_scaled = scaler.transform(x_test)

In [5]:
output_dir = "generated_csv"
if not os.path.exists(output_dir):
    os.mkdir(output_dir)

def save_to_csv(output_dir, data, name_prefix,
                header=None, n_parts=10):
    path_format = os.path.join(output_dir, "{}_{:02d}.csv")
    filenames = []
    
    for file_idx, row_indices in enumerate(
        np.array_split(np.arange(len(data)), n_parts)):
        part_csv = path_format.format(name_prefix, file_idx)
        filenames.append(part_csv)
        with open(part_csv, "wt", encoding="utf-8") as f:
            if header is not None:
                f.write(header + "\n")
            for row_index in row_indices:
                f.write(",".join(
                    [repr(col) for col in data[row_index]]))
                f.write('\n')
    return filenames

train_data = np.c_[x_train_scaled, y_train]
valid_data = np.c_[x_valid_scaled, y_valid]
test_data = np.c_[x_test_scaled, y_test]
header_cols = housing.feature_names + ["MidianHouseValue"]
header_str = ",".join(header_cols)

train_filenames = save_to_csv(output_dir, train_data, "train",
                              header_str, n_parts=20)
valid_filenames = save_to_csv(output_dir, valid_data, "valid",
                              header_str, n_parts=10)
test_filenames = save_to_csv(output_dir, test_data, "test",
                             header_str, n_parts=10)

In [6]:
train_filenames

['generated_csv/train_00.csv',
 'generated_csv/train_01.csv',
 'generated_csv/train_02.csv',
 'generated_csv/train_03.csv',
 'generated_csv/train_04.csv',
 'generated_csv/train_05.csv',
 'generated_csv/train_06.csv',
 'generated_csv/train_07.csv',
 'generated_csv/train_08.csv',
 'generated_csv/train_09.csv',
 'generated_csv/train_10.csv',
 'generated_csv/train_11.csv',
 'generated_csv/train_12.csv',
 'generated_csv/train_13.csv',
 'generated_csv/train_14.csv',
 'generated_csv/train_15.csv',
 'generated_csv/train_16.csv',
 'generated_csv/train_17.csv',
 'generated_csv/train_18.csv',
 'generated_csv/train_19.csv']

In [7]:
filename_dataset = tf.data.Dataset.list_files(train_filenames)
print(filename_dataset)

<DatasetV1Adapter shapes: (), types: tf.string>


In [8]:
for file_name in filename_dataset:
    print(file_name)

tf.Tensor(b'generated_csv/train_06.csv', shape=(), dtype=string)
tf.Tensor(b'generated_csv/train_11.csv', shape=(), dtype=string)
tf.Tensor(b'generated_csv/train_14.csv', shape=(), dtype=string)
tf.Tensor(b'generated_csv/train_01.csv', shape=(), dtype=string)
tf.Tensor(b'generated_csv/train_17.csv', shape=(), dtype=string)
tf.Tensor(b'generated_csv/train_18.csv', shape=(), dtype=string)
tf.Tensor(b'generated_csv/train_07.csv', shape=(), dtype=string)
tf.Tensor(b'generated_csv/train_13.csv', shape=(), dtype=string)
tf.Tensor(b'generated_csv/train_19.csv', shape=(), dtype=string)
tf.Tensor(b'generated_csv/train_05.csv', shape=(), dtype=string)
tf.Tensor(b'generated_csv/train_08.csv', shape=(), dtype=string)
tf.Tensor(b'generated_csv/train_10.csv', shape=(), dtype=string)
tf.Tensor(b'generated_csv/train_12.csv', shape=(), dtype=string)
tf.Tensor(b'generated_csv/train_04.csv', shape=(), dtype=string)
tf.Tensor(b'generated_csv/train_16.csv', shape=(), dtype=string)
tf.Tensor(b'generated_csv

In [9]:
n_readers = 5
dataset = filename_dataset.interleave(
    lambda filename :tf.data.TextLineDataset(filename).skip(1) # 第一行是列名
    , cycle_length = n_readers
)
for line in dataset.take(15):
    print(line.numpy())

b'2.51504373119231,1.0731637904355105,0.5574401201546321,-0.17273513019187772,-0.612912610473286,-0.01909156503651574,-0.5710993036045546,-0.027490309606616956,5.00001'
b'0.04971034572063198,-0.8492418886278699,-0.06214699417830008,0.17878747064657746,-0.8025354230744277,0.0005066066922077538,0.6466457006743215,-1.1060793768010604,2.286'
b'0.401276648075221,-0.9293421252555106,-0.05333050451405854,-0.1865945262276826,0.6545661895448709,0.026434465728210874,0.9312527706398824,-1.4406417263474771,2.512'
b'-0.6672227549433569,-0.04823952235146133,0.34529405473316743,0.5382668657200925,1.8521839533415545,-0.0611253832474835,-0.8417093045554153,1.520484740533546,1.59'
b'0.15782311132800697,0.43236189741438374,0.3379948076652917,-0.015880306122244434,-0.3733890577139493,-0.05305245634489608,0.8006134598360177,-1.2359095422966828,3.169'
b'1.8444675088321243,0.5124621340420246,0.505783649224786,-0.20645711406004988,-0.021362018052499883,-0.05811312281214649,0.8332732875369839,-1.26587034971875

In [10]:
# tf.io.decode_csv(str, record_defaults)
sample_str = '1,2,3,4,5'
record_defaults=[
    tf.constant(0, dtype=tf.int32),
    0,
    np.nan,
    "hello",
    tf.constant([])
]
parsed_fileds = tf.io.decode_csv(sample_str,record_defaults)
for it in parsed_fileds:
    print(it)

tf.Tensor(1, shape=(), dtype=int32)
tf.Tensor(2, shape=(), dtype=int32)
tf.Tensor(3.0, shape=(), dtype=float32)
tf.Tensor(b'4', shape=(), dtype=string)
tf.Tensor(5.0, shape=(), dtype=float32)


record_defaults:
```
A list of `Tensor` objects with specific types.
 Acceptable types are `float32`, `float64`, `int32`, `int64`, `string`.
 One tensor per column of the input record, with either a scalar default value for that column or an empty vector if the column is required.
```

```
具有特定类型的“张量”对象的列表。
可接受的类型有“float32”、“float64”、“int32”、“int64”、“string”。
输入记录的每列一个张量，该列可以是标量默认值，也可以是空向量(如果需要该列)。
```

In [11]:
# 
try:
    parsed_fileds = tf.io.decode_csv(',,,,' , record_defaults)
except tf.errors.InvalidArgumentError as ex:
    print(ex)

Field 4 is required but missing in record 0! [Op:DecodeCSV]


In [12]:
try:
    parsed_fields = tf.io.decode_csv('1,2,3,4,5,6,7', record_defaults)
except tf.errors.InvalidArgumentError as ex:
    print(ex)

Expect 5 fields but have 7 in record 0 [Op:DecodeCSV]


In [13]:
['a','b']* 5

['a', 'b', 'a', 'b', 'a', 'b', 'a', 'b', 'a', 'b']

In [14]:
[tf.constant(np.nan)] * 5

[<tf.Tensor: id=113, shape=(), dtype=float32, numpy=nan>,
 <tf.Tensor: id=113, shape=(), dtype=float32, numpy=nan>,
 <tf.Tensor: id=113, shape=(), dtype=float32, numpy=nan>,
 <tf.Tensor: id=113, shape=(), dtype=float32, numpy=nan>,
 <tf.Tensor: id=113, shape=(), dtype=float32, numpy=nan>]

In [15]:
def parse_csv_line(line , n_fields = 9):
    defs = [tf.constant(np.nan)] * n_fields
    parsed_fields = tf.io.decode_csv(line , record_defaults = defs)
    x = tf.stack(parsed_fields[0:-1])
    y = tf.stack(parsed_fileds[-1])
    print(x)
    print(y)
    return x , y

In [16]:
parse_csv_line(
    b'-0.9868720801669367,0.832863080552588,-0.18684708416901633,-0.14888949288707784,-0.4532302419670616,-0.11504995754593579,1.6730974284189664,-0.7465496877362412,1.138'
    , n_fields=9)

tf.Tensor(
[-0.9868721   0.8328631  -0.18684709 -0.1488895  -0.45323023 -0.11504996
  1.6730974  -0.74654967], shape=(8,), dtype=float32)
tf.Tensor(5.0, shape=(), dtype=float32)


(<tf.Tensor: id=125, shape=(8,), dtype=float32, numpy=
 array([-0.9868721 ,  0.8328631 , -0.18684709, -0.1488895 , -0.45323023,
        -0.11504996,  1.6730974 , -0.74654967], dtype=float32)>,
 <tf.Tensor: id=104, shape=(), dtype=float32, numpy=5.0>)

In [17]:
# 1. filename -> dataset
# 2. read file -> dataset -> datasets -> merge
# 3. parse csv
def csv_reader_dataset(
    filenames
    , n_readers = 5
    , batch_size = 32
    , n_parse_threads = 5
    , shuffle_buffer_size = int(1e5)
):
    dataset = tf.data.Dataset.list_files(filenames)
    dataset = dataset.repeat()
    dataset = dataset.interleave( # map reduce
        lambda filename : tf.data.TextLineDataset(filename).skip(1)
        , cycle_length = n_readers
    ) # map 的并行处理 读取数据
    # Randomly shuffles the elements of this dataset.
    dataset.shuffle(shuffle_buffer_size) # 
    dataset = dataset.map( # map
        parse_csv_line
        , num_parallel_calls = n_parse_threads
    )
    dataset = dataset.batch(batch_size)
    return dataset

train_set = csv_reader_dataset(train_filenames,batch_size = 3)

Tensor("stack:0", shape=(8,), dtype=float32)
Tensor("stack_1:0", shape=(), dtype=float32, device=/job:localhost/replica:0/task:0/device:CPU:0)


In [18]:
for item in train_set.take(5):
    print(item)

(<tf.Tensor: id=209, shape=(3, 8), dtype=float32, numpy=
array([[ 2.5150437 ,  1.0731637 ,  0.5574401 , -0.17273512, -0.6129126 ,
        -0.01909157, -0.5710993 , -0.02749031],
       [-1.1157656 ,  0.99306357, -0.334192  , -0.06535219, -0.32893205,
         0.04343066, -0.12785879,  0.30707204],
       [-0.82195884,  1.8741661 ,  0.1821235 , -0.03170019, -0.6011179 ,
        -0.14337493,  1.0852206 , -0.8613995 ]], dtype=float32)>, <tf.Tensor: id=210, shape=(3,), dtype=float32, numpy=array([5., 5., 5.], dtype=float32)>)
(<tf.Tensor: id=211, shape=(3, 8), dtype=float32, numpy=
array([[-1.0775077 , -0.4487407 , -0.5680568 , -0.14269263, -0.09666677,
         0.12326469, -0.31448638, -0.4818959 ],
       [ 0.15782312,  0.4323619 ,  0.3379948 , -0.01588031, -0.37338907,
        -0.05305246,  0.80061346, -1.2359096 ],
       [ 1.8444675 ,  0.51246214,  0.5057837 , -0.20645711, -0.02136202,
        -0.05811312,  0.8332733 , -1.2658703 ]], dtype=float32)>, <tf.Tensor: id=212, shape=(3,), dt

In [19]:
for x_batch,y_batch in train_set.take(5):
    print('x : ', x_batch)
    print('y : ', y_batch)

x :  tf.Tensor(
[[ 0.09734604  0.75276285 -0.20218964 -0.19547    -0.40605137  0.00678553
  -0.81371516  0.6566148 ]
 [ 0.4240821   0.91296333 -0.04437482 -0.15297213 -0.24727628 -0.10539167
   0.86126745 -1.335779  ]
 [ 0.63034356  1.8741661  -0.06713215 -0.12543367 -0.19737554 -0.02272263
  -0.69240725  0.72652334]], shape=(3, 8), dtype=float32)
y :  tf.Tensor([5. 5. 5.], shape=(3,), dtype=float32)
x :  tf.Tensor(
[[ 0.48530516 -0.8492419  -0.06530126 -0.02337966  1.4974351  -0.07790658
  -0.90236324  0.78145146]
 [ 2.5150437   1.0731637   0.5574401  -0.17273512 -0.6129126  -0.01909157
  -0.5710993  -0.02749031]
 [-1.4803331  -0.68904144 -0.35624704 -0.17255889 -0.82158846 -0.13823092
   1.9157133  -1.0211904 ]], shape=(3, 8), dtype=float32)
y :  tf.Tensor([5. 5. 5.], shape=(3,), dtype=float32)
x :  tf.Tensor(
[[ 0.4369235  -1.9706452  -0.16642106  0.05486205 -0.8379196  -0.1323988
  -0.99567705  0.94124246]
 [ 1.6312258   0.35226166  0.04080576 -0.14088951 -0.4632104  -0.06751624
  

In [20]:
batch_size = 32
train_set = csv_reader_dataset(train_filenames,
                               batch_size = batch_size)
valid_set = csv_reader_dataset(valid_filenames,
                               batch_size = batch_size)
test_set = csv_reader_dataset(test_filenames,
                              batch_size = batch_size)

Tensor("stack:0", shape=(8,), dtype=float32)
Tensor("stack_1:0", shape=(), dtype=float32, device=/job:localhost/replica:0/task:0/device:CPU:0)
Tensor("stack:0", shape=(8,), dtype=float32)
Tensor("stack_1:0", shape=(), dtype=float32, device=/job:localhost/replica:0/task:0/device:CPU:0)
Tensor("stack:0", shape=(8,), dtype=float32)
Tensor("stack_1:0", shape=(), dtype=float32, device=/job:localhost/replica:0/task:0/device:CPU:0)


In [21]:
model = keras.models.Sequential(
    [
        keras.layers.Dense(30,activation='relu',input_shape=[8])
        ,keras.layers.Dense(1)
    ]
)
model.compile(loss='mean_squared_error',optimizer='sgd')


In [None]:
history = model.fit(
    train_set
    ,validation_data=valid_set
    ,steps_per_epoch=11160 // batch_size
    ,validation_steps= 3870 // batch_size
    ,epochs = 100
)

Train for 348 steps, validate for 120 steps
Epoch 1/100
Epoch 2/100
Epoch 3/100
Epoch 4/100
Epoch 5/100
Epoch 6/100
Epoch 7/100
Epoch 8/100
Epoch 9/100
Epoch 10/100
Epoch 11/100
Epoch 12/100
Epoch 13/100
Epoch 14/100
Epoch 15/100
Epoch 16/100
Epoch 17/100
Epoch 18/100
Epoch 19/100
Epoch 20/100
Epoch 21/100
Epoch 22/100
Epoch 23/100
Epoch 24/100
Epoch 25/100
Epoch 26/100
Epoch 27/100
Epoch 28/100
Epoch 29/100
Epoch 30/100
Epoch 31/100
Epoch 32/100
Epoch 33/100
Epoch 34/100
Epoch 35/100
Epoch 36/100
Epoch 37/100
Epoch 38/100
Epoch 39/100
Epoch 40/100
Epoch 41/100
Epoch 42/100
Epoch 43/100
Epoch 44/100
Epoch 45/100
Epoch 46/100
Epoch 47/100
Epoch 48/100
Epoch 49/100
Epoch 50/100
Epoch 51/100
Epoch 52/100
Epoch 53/100
Epoch 54/100
Epoch 55/100
Epoch 56/100
Epoch 57/100
Epoch 58/100
Epoch 59/100
Epoch 60/100
Epoch 61/100
Epoch 62/100
Epoch 63/100
Epoch 64/100
Epoch 65/100
Epoch 66/100
Epoch 67/100
Epoch 68/100
Epoch 69/100
Epoch 70/100
Epoch 71/100
Epoch 72/100
Epoch 73/100
Epoch 74/100
Epo

Epoch 80/100
Epoch 81/100
Epoch 82/100
Epoch 83/100
Epoch 84/100
Epoch 85/100

In [None]:
model.evaluate(test_set,steps = 5160 // batch_size)