In [2]:
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all" 

In [3]:
import numpy as np
import os
import gzip

import tensorflow as tf
from tensorflow import keras
import pandas as pd
import matplotlib as mpl
import matplotlib.pyplot as plt

In [4]:
from sklearn.datasets import fetch_california_housing
housing = fetch_california_housing()
# print(housing.DESCR)
print(housing.data.shape)
print(housing.target.shape)
from sklearn.model_selection import train_test_split
x_train_all, x_test, y_train_all, y_test = train_test_split(housing.data, housing.target, random_state=7)
x_train, x_valid, y_train, y_valid = train_test_split(x_train_all, y_train_all, random_state=11)
from sklearn.preprocessing import StandardScaler
scaler = StandardScaler()
x_train_scaled = scaler.fit_transform(x_train)
x_valid_scaled = scaler.transform(x_valid)
x_test_scaled = scaler.transform(x_test)
print(x_train_scaled.shape)
print(x_valid_scaled.shape)
print(housing.feature_names)

(20640, 8)
(20640,)
(11610, 8)
(3870, 8)
['MedInc', 'HouseAge', 'AveRooms', 'AveBedrms', 'Population', 'AveOccup', 'Latitude', 'Longitude']


In [7]:
output_dir = 'generate_csv'
if not os.path.exists(output_dir):
    os.mkdir(output_dir)
    
def save_to_csv(output_dir, data, name_prefix, header=None, n_parts=10):
    # name_prefix：因为train、valid、test分开放
    # n_parts：决定数据集切割成多个文件存储
    path_format = os.path.join(output_dir, "{}_{:02d}.csv")   # 生成文件名
    filenames = [] 

    # 生成 0 到 len(train)-1 长度的数组，再分成n_parts个
    # https://blog.csdn.net/m0_37393514/article/details/79537639
    for file_idx, row_indices in enumerate(np.array_split(np.arange(len(data)), n_parts)):
        part_csv = path_format.format(name_prefix, file_idx)
        filenames.append(part_csv)
        with open (part_csv, 'wt', encoding='utf-8') as f:
            if header is not None:
                f.write(header + "\n")
            for row_index in row_indices:
                # 1）字符串化   2）","间隔  3）拼接  
                # repr(object)  返回一个对象的 string 格式。
                f.write(",".join([repr(col) for col in data[row_index]]))
                f.write("\n")
    return filenames


train_data = np.c_[x_train_scaled, y_train]   # 按行拼接 注意[]不能用()
valid_data = np.c_[x_valid_scaled, y_valid]
test_data = np.c_[x_test_scaled, y_test]
header_cols = housing.feature_names + ["MidianHousingValue"]
header_str = ",".join(header_cols)

train_filenames = save_to_csv(output_dir, train_data, "train", header_str, n_parts=20)
valid_filenames = save_to_csv(output_dir, valid_data, "valid", header_str, n_parts=10)
test_filenames = save_to_csv(output_dir, test_data, "test", header_str, n_parts=10)

In [17]:
# 使用csv文件
import pprint
print("train_filenames:")
pprint.pprint(train_filenames)
print("valid_filenames:")
print(valid_filenames)
print("test_filenames:")
pprint.pprint(test_filenames)

train_filenames:
['generate_csv\\train_00.csv',
 'generate_csv\\train_01.csv',
 'generate_csv\\train_02.csv',
 'generate_csv\\train_03.csv',
 'generate_csv\\train_04.csv',
 'generate_csv\\train_05.csv',
 'generate_csv\\train_06.csv',
 'generate_csv\\train_07.csv',
 'generate_csv\\train_08.csv',
 'generate_csv\\train_09.csv',
 'generate_csv\\train_10.csv',
 'generate_csv\\train_11.csv',
 'generate_csv\\train_12.csv',
 'generate_csv\\train_13.csv',
 'generate_csv\\train_14.csv',
 'generate_csv\\train_15.csv',
 'generate_csv\\train_16.csv',
 'generate_csv\\train_17.csv',
 'generate_csv\\train_18.csv',
 'generate_csv\\train_19.csv']
valid_filenames:
['generate_csv\\valid_00.csv', 'generate_csv\\valid_01.csv', 'generate_csv\\valid_02.csv', 'generate_csv\\valid_03.csv', 'generate_csv\\valid_04.csv', 'generate_csv\\valid_05.csv', 'generate_csv\\valid_06.csv', 'generate_csv\\valid_07.csv', 'generate_csv\\valid_08.csv', 'generate_csv\\valid_09.csv']
test_filenames:
['generate_csv\\test_00.csv',

In [18]:
# 读取csv，生成dataset
# 1、filenames -> dataset(files)
# 2、read file -> dataset -> datasets -> merge
# 3、parse csv
filename_dataset = tf.data.Dataset.list_files(train_filenames)
for filename in filename_dataset:
    print(filename)  # 20个tensor，每个tensor就是一个文件名

tf.Tensor(b'generate_csv\\train_11.csv', shape=(), dtype=string)
tf.Tensor(b'generate_csv\\train_10.csv', shape=(), dtype=string)
tf.Tensor(b'generate_csv\\train_18.csv', shape=(), dtype=string)
tf.Tensor(b'generate_csv\\train_15.csv', shape=(), dtype=string)
tf.Tensor(b'generate_csv\\train_01.csv', shape=(), dtype=string)
tf.Tensor(b'generate_csv\\train_13.csv', shape=(), dtype=string)
tf.Tensor(b'generate_csv\\train_16.csv', shape=(), dtype=string)
tf.Tensor(b'generate_csv\\train_14.csv', shape=(), dtype=string)
tf.Tensor(b'generate_csv\\train_12.csv', shape=(), dtype=string)
tf.Tensor(b'generate_csv\\train_05.csv', shape=(), dtype=string)
tf.Tensor(b'generate_csv\\train_02.csv', shape=(), dtype=string)
tf.Tensor(b'generate_csv\\train_03.csv', shape=(), dtype=string)
tf.Tensor(b'generate_csv\\train_06.csv', shape=(), dtype=string)
tf.Tensor(b'generate_csv\\train_19.csv', shape=(), dtype=string)
tf.Tensor(b'generate_csv\\train_04.csv', shape=(), dtype=string)
tf.Tensor(b'generate_csv\

In [19]:
# object.interleave() 遍历object中每一个元素，对其进行操作，再合并结果
# TextLineDataset 读取文本文件，生成一个dataset。.skip() 跳过多少行(避免header混入数据)
# cycle_length    控制读取文件的并行度
n_readers = 5
dataset = filename_dataset.interleave(lambda filename: tf.data.TextLineDataset(filename).skip(1),
                                 cycle_length = n_readers)  
# .take() 只读取前15个
for line in dataset.take(15):
    print(line.numpy()) # 结果是一个字符串，带有9个field

b'0.09734603446040174,0.7527628439249472,-0.20218964416999152,-0.1954700015215477,-0.4060513603629498,0.006785531677655949,-0.813715166526018,0.656614793197258,1.119'
b'-1.0591781535672364,1.393564736946074,-0.026331968874673636,-0.11006759528831847,-0.6138198966579805,-0.09695934953589447,0.3247131133362288,-0.037477245413977976,0.672'
b'-0.32652634129448693,0.43236189741438374,-0.09345459539684739,-0.08402991822890092,0.8460035745154013,-0.0266316482653991,-0.5617679242614233,0.1422875991184281,2.431'
b'-0.09719300311107498,-1.249743071766074,0.36232962250170797,0.026906080250728295,1.033811814747154,0.045881586971778555,1.3418334617377423,-1.6353869745909178,1.832'
b'0.401276648075221,-0.9293421252555106,-0.05333050451405854,-0.1865945262276826,0.6545661895448709,0.026434465728210874,0.9312527706398824,-1.4406417263474771,2.512'
b'-1.4803330571456954,-0.6890414153725881,-0.35624704887282904,-0.1725588908792445,-0.8215884329530113,-0.1382309124854157,1.9157132913404298,-1.02119042243

In [20]:
# tf.io.decode_csv(str, record_defaults(定义类型))

sample_str = '1, 2, 3, 4, 5'
record_defaults = [tf.constant(0, dtype=tf.int32)] * 5  # 定义默认值和类型，维度相同
parse_fields = tf.io.decode_csv(sample_str, record_defaults)
print(parse_fields)  # 将字符串转为tensor的列表

[<tf.Tensor: id=367, shape=(), dtype=int32, numpy=1>, <tf.Tensor: id=368, shape=(), dtype=int32, numpy=2>, <tf.Tensor: id=369, shape=(), dtype=int32, numpy=3>, <tf.Tensor: id=370, shape=(), dtype=int32, numpy=4>, <tf.Tensor: id=371, shape=(), dtype=int32, numpy=5>]


In [21]:
# 多个类型， 默认float32
sample_str = '1, 2, 3, 4, 5'
record_defaults = [tf.constant(0, dtype=tf.int32),
                   0,
                   np.nan,
                   "hello",
                   tf.constant([])] 
parse_fields = tf.io.decode_csv(sample_str, record_defaults)
print(parse_fields) 

[<tf.Tensor: id=378, shape=(), dtype=int32, numpy=1>, <tf.Tensor: id=379, shape=(), dtype=int32, numpy=2>, <tf.Tensor: id=380, shape=(), dtype=float32, numpy=3.0>, <tf.Tensor: id=381, shape=(), dtype=string, numpy=b' 4'>, <tf.Tensor: id=382, shape=(), dtype=float32, numpy=5.0>]


In [22]:
# 字符串错误
try:
    parse_fields = tf.io.decode_csv(',,,,', record_defaults)
except tf.errors.InvalidArgumentError as ex:
    print(ex)
    
# 给的值过多，溢出
try:
    parse_fields = tf.io.decode_csv('1,2,3,4,5,6,7', record_defaults)
except tf.errors.InvalidArgumentError as ex:
    print(ex)

Field 4 is required but missing in record 0! [Op:DecodeCSV]
Expect 5 fields but have 7 in record 0 [Op:DecodeCSV]


In [23]:
# 转行具体一行
def parse_csv_line(line, n_fields = 9):
    # record_defaults
    defs = [tf.constant(np.nan)] * n_fields
    parsed_fields = tf.io.decode_csv(line, record_defaults=defs)
    x = tf.stack(parsed_fields[0:-1])
    y = tf.stack(parsed_fields[-1:])
    return x, y
line = b'0.199384450496934,1.0731637904355105,\
       -0.19840853933562783,-0.29328906965393414,-0.07852104768825069,0.018804888420646343,0.8006134598360177,-1.1510205879341566,1.99'
parse_csv_line(line, n_fields=9)

(<tf.Tensor: id=402, shape=(8,), dtype=float32, numpy=
 array([ 0.19938445,  1.0731637 , -0.19840854, -0.29328907, -0.07852105,
         0.01880489,  0.80061346, -1.1510206 ], dtype=float32)>,
 <tf.Tensor: id=403, shape=(1,), dtype=float32, numpy=array([1.99], dtype=float32)>)

In [27]:
# 完整函数
def csv_reader_dataset(filenames, n_readers=5, batch_size=32,
                       n_parse_threads=5, shuffle_buffer_size=10000):
    dataset = tf.data.Dataset.list_files(filenames)
    dataset = dataset.repeat()
    dataset = dataset.interleave(lambda filename: tf.data.TextLineDataset(filename).skip(1),
                                          cycle_length=n_readers)
    # shuffle_buffer_size作用：https://blog.csdn.net/qq_16234613/article/details/81703228#commentsedit
    # shuffle,batch,repeat作用：https://blog.csdn.net/angel_hben/article/details/84341421
    #                           https://blog.csdn.net/YQMind/article/details/82901442
    dataset.shuffle(shuffle_buffer_size)
    # 解析 有点像interleave(一对多)，map(一对一)
    # map中传入的是函数名，默认传第一个参数line，这里就先将n_fields设一个默认值
    dataset = dataset.map(parse_csv_line, num_parallel_calls=n_parse_threads)
    # batch级别操作
    dataset = dataset.batch(batch_size)
    return dataset

train_set = csv_reader_dataset(train_filenames, batch_size=3)
for x_batch, y_batch in train_set.take(2):
    print("x:")
    pprint.pprint(x_batch)
    print("y:")
    pprint.pprint(y_batch)

x:
<tf.Tensor: id=2233, shape=(3, 8), dtype=float32, numpy=
array([[ 0.40127665, -0.92934215, -0.0533305 , -0.18659453,  0.65456617,
         0.02643447,  0.9312528 , -1.4406418 ],
       [-1.119975  , -1.3298433 ,  0.14190045,  0.4658137 , -0.10301778,
        -0.10744184, -0.7950524 ,  1.5304717 ],
       [ 0.09734604,  0.75276285, -0.20218964, -0.19547   , -0.40605137,
         0.00678553, -0.81371516,  0.6566148 ]], dtype=float32)>
y:
<tf.Tensor: id=2234, shape=(3, 1), dtype=float32, numpy=
array([[2.512],
       [0.66 ],
       [1.119]], dtype=float32)>
x:
<tf.Tensor: id=2235, shape=(3, 8), dtype=float32, numpy=
array([[ 0.81150836, -0.04823952,  0.5187339 , -0.0293864 , -0.03406402,
        -0.05081595, -0.7157357 ,  0.91627514],
       [ 0.48530516, -0.8492419 , -0.06530126, -0.02337966,  1.4974351 ,
        -0.07790658, -0.90236324,  0.78145146],
       [-0.8757754 ,  1.8741661 , -0.94874996, -0.09657185, -0.7163432 ,
        -0.07790191,  0.98257536, -1.4206679 ]], dtype=float

In [28]:
batch_size = 32
train_set = csv_reader_dataset(train_filenames, batch_size=batch_size)
valid_set = csv_reader_dataset(valid_filenames, batch_size=batch_size)
test_set = csv_reader_dataset(test_filenames, batch_size=batch_size)

In [29]:
# 将dataset与keras集成
model = keras.Sequential([
    keras.layers.Dense(30, activation='relu', input_shape=[8]),
    keras.layers.Dense(1)
])
model.compile(loss='mse', optimizer='sgd')
callbacks = [keras.callbacks.EarlyStopping(patience=5, min_delta=1e-2)]

history = model.fit(train_set, 
                    epochs = 100,
                    steps_per_epoch = 11160 // batch_size, 
                    validation_data = valid_set, 
                    validation_steps = 3870 // batch_size,
                    callbacks=callbacks)

Train for 348 steps, validate for 120 steps
Epoch 1/100
Epoch 2/100
Epoch 3/100
Epoch 4/100
Epoch 5/100
Epoch 6/100
Epoch 7/100
Epoch 8/100
Epoch 9/100
Epoch 10/100
Epoch 11/100
Epoch 12/100
Epoch 13/100
Epoch 14/100
Epoch 15/100
Epoch 16/100
Epoch 17/100
Epoch 18/100
Epoch 19/100
Epoch 20/100


In [30]:
model.evaluate(test_set, steps = 5160 // batch_size)



0.3762262584815114