# Datasets 使用之 CSV

In [1]:
import tensorflow as tf
from tensorflow import keras
import numpy as np
import os
import sys
import matplotlib.pyplot as plt

### 1. 准备数据

In [2]:
from sklearn.datasets import fetch_california_housing

housing = fetch_california_housing()

In [3]:
from sklearn.model_selection import train_test_split

x_train_all, x_test, y_train_all, y_test = train_test_split(
    housing.data, housing.target, random_state = 7)
x_train, x_valid, y_train, y_valid = train_test_split(
    x_train_all, y_train_all, random_state = 11)
print(x_train.shape, y_train.shape)
print(x_valid.shape, y_valid.shape)
print(x_test.shape, y_test.shape)

(11610, 8) (11610,)
(3870, 8) (3870,)
(5160, 8) (5160,)


In [4]:
from sklearn.preprocessing import StandardScaler

scaler = StandardScaler()
x_train_scaled = scaler.fit_transform(x_train)
x_valid_scaled = scaler.transform(x_valid)
x_test_scaled = scaler.transform(x_test)

**把数据保存为CSV格式**

In [5]:
output_dir = "../generate_csv"
if not os.path.exists(output_dir):
    os.mkdir(output_dir)

def save_to_csv(output_dir, data, name_prefix, header=None, n_parts=10):
    path_format = os.path.join(output_dir, "{}_{:02d}.csv")
    filenames = []
    
    for file_idx, row_indies in enumerate(
        np.array_split(np.arange(len(data)), n_parts)):
        """data 是元组的形式"""
        path_csv = path_format.format(name_prefix, file_idx)
        filenames.append(path_csv)
        # 把数据写入 CSV 文件
        with open(path_csv, "wt", encoding="utf-8") as f:
            if header is not None:
                f.write(header + "\n")
            for row_indie in row_indies:
                f.write(",".join([
                    repr(col) for col in data[row_indie]
                ]))
                f.write("\n")
    return filenames

# merge data
train_data = np.hstack([x_train_scaled, y_train.reshape(-1, 1)])
valid_data = np.hstack([x_valid_scaled, y_valid.reshape(-1, 1)])
test_data = np.hstack([x_test_scaled, y_test.reshape(-1, 1)])
header_cols = housing.feature_names + ["MidianHouseValue"]
header_str = ",".join(header_cols)

**写入数据到文件**

In [6]:
train_filenames = save_to_csv(output_dir, train_data, "train", 
                            header_str, n_parts=20)
valid_filenames = save_to_csv(output_dir, valid_data, "valid",
                             header_str, n_parts=10)
test_filenames = save_to_csv(output_dir, test_data, "test",
                             header_str, n_parts=10)

In [7]:
import pprint
print("train filenames:")
pprint.pprint(train_filenames)
print("valid filenames:")
pprint.pprint(valid_filenames)
print("test filenames:")
pprint.pprint(test_filenames)

train filenames:
['../generate_csv/train_00.csv',
 '../generate_csv/train_01.csv',
 '../generate_csv/train_02.csv',
 '../generate_csv/train_03.csv',
 '../generate_csv/train_04.csv',
 '../generate_csv/train_05.csv',
 '../generate_csv/train_06.csv',
 '../generate_csv/train_07.csv',
 '../generate_csv/train_08.csv',
 '../generate_csv/train_09.csv',
 '../generate_csv/train_10.csv',
 '../generate_csv/train_11.csv',
 '../generate_csv/train_12.csv',
 '../generate_csv/train_13.csv',
 '../generate_csv/train_14.csv',
 '../generate_csv/train_15.csv',
 '../generate_csv/train_16.csv',
 '../generate_csv/train_17.csv',
 '../generate_csv/train_18.csv',
 '../generate_csv/train_19.csv']
valid filenames:
['../generate_csv/valid_00.csv',
 '../generate_csv/valid_01.csv',
 '../generate_csv/valid_02.csv',
 '../generate_csv/valid_03.csv',
 '../generate_csv/valid_04.csv',
 '../generate_csv/valid_05.csv',
 '../generate_csv/valid_06.csv',
 '../generate_csv/valid_07.csv',
 '../generate_csv/valid_08.csv',
 '../gene

**读取CSV文件数据**

In [8]:
# 1. 读取所有文件名为数据集对象 filename -> dataset
filename_dataset = tf.data.Dataset.list_files(train_filenames)
for filename in filename_dataset:
    print(filename)

tf.Tensor(b'../generate_csv/train_00.csv', shape=(), dtype=string)
tf.Tensor(b'../generate_csv/train_05.csv', shape=(), dtype=string)
tf.Tensor(b'../generate_csv/train_09.csv', shape=(), dtype=string)
tf.Tensor(b'../generate_csv/train_10.csv', shape=(), dtype=string)
tf.Tensor(b'../generate_csv/train_19.csv', shape=(), dtype=string)
tf.Tensor(b'../generate_csv/train_02.csv', shape=(), dtype=string)
tf.Tensor(b'../generate_csv/train_01.csv', shape=(), dtype=string)
tf.Tensor(b'../generate_csv/train_04.csv', shape=(), dtype=string)
tf.Tensor(b'../generate_csv/train_13.csv', shape=(), dtype=string)
tf.Tensor(b'../generate_csv/train_17.csv', shape=(), dtype=string)
tf.Tensor(b'../generate_csv/train_07.csv', shape=(), dtype=string)
tf.Tensor(b'../generate_csv/train_11.csv', shape=(), dtype=string)
tf.Tensor(b'../generate_csv/train_14.csv', shape=(), dtype=string)
tf.Tensor(b'../generate_csv/train_12.csv', shape=(), dtype=string)
tf.Tensor(b'../generate_csv/train_03.csv', shape=(), dtype=str

In [9]:
# 2. 根据文件名读取相应的数据 read file -> dataset -> datasets -> merge
n_readers = 5
dataset = filename_dataset.interleave(
    # 按行读取文本文件 TextLineDataset, 并忽略第一行（头部信息）
    lambda filename: tf.data.TextLineDataset(filename).skip(1),
    cycle_length=n_readers
)
for line in dataset.take(10):
    print(line.numpy())

b'0.6303435674178064,1.874166156711919,-0.06713214279531016,-0.12543366804152128,-0.19737553788322462,-0.022722631725889016,-0.692407235065288,0.7265233438487496,2.419'
b'0.15782311132800697,0.43236189741438374,0.3379948076652917,-0.015880306122244434,-0.3733890577139493,-0.05305245634489608,0.8006134598360177,-1.2359095422966828,3.169'
b'-1.0591781535672364,1.393564736946074,-0.026331968874673636,-0.11006759528831847,-0.6138198966579805,-0.09695934953589447,0.3247131133362288,-0.037477245413977976,0.672'
b'0.801544314532886,0.27216142415910205,-0.11624392696666119,-0.2023115137272354,-0.5430515742518128,-0.021039615516440048,-0.5897620622908205,-0.08241845654707416,3.226'
b'0.04326300977263167,-1.0895425985107923,-0.38878716774583305,-0.10789864528874438,-0.6818663605100649,-0.0723871014747467,-0.8883662012710817,0.8213992340186296,1.426'
b'1.6312258686346301,0.3522616607867429,0.04080576110152256,-0.1408895163348976,-0.4632103899987006,-0.06751623819156843,-0.8277122355407183,0.59669

In [10]:
# 3. 解析 CSV 文本数据 parse csv
# tf.io.decode_csv(str, record_defaults)
sample_str = '1,2,3,4,5'
record_defaults = [
    tf.constant(0, dtype=tf.int32),
    0,
    np.nan,
    "hello",
    tf.constant([])
]
parsed_fields = tf.io.decode_csv(sample_str, record_defaults)
print(parsed_fields)

[<tf.Tensor: id=95, shape=(), dtype=int32, numpy=1>, <tf.Tensor: id=96, shape=(), dtype=int32, numpy=2>, <tf.Tensor: id=97, shape=(), dtype=float32, numpy=3.0>, <tf.Tensor: id=98, shape=(), dtype=string, numpy=b'4'>, <tf.Tensor: id=99, shape=(), dtype=float32, numpy=5.0>]


In [11]:
try:
    parsed_fields = tf.io.decode_csv(',,,,', record_defaults)
except tf.errors.InvalidArgumentError as ex:
    print(ex)

Field 4 is required but missing in record 0! [Op:DecodeCSV]


In [12]:
try:
    parsed_fields = tf.io.decode_csv('1,2,3,4,5,6,7', record_defaults)
except tf.errors.InvalidArgumentError as ex:
    print(ex)

Expect 5 fields but have 7 in record 0 [Op:DecodeCSV]


In [13]:
# 按行解析数据
def parse_csv_line(line, n_fields = 9):
    defs = [tf.constant(np.nan)] * n_fields
    parsed_fields = tf.io.decode_csv(line, record_defaults=defs)
    x = tf.stack(parsed_fields[0:-1])
    y = tf.stack(parsed_fields[-1:])
    return x, y

parse_csv_line(b'-0.32652634129448693,0.43236189741438374,-0.09345459539684739,-0.08402991822890092,0.8460035745154013,-0.0266316482653991,-0.5617679242614233,0.1422875991184281,2.431',
              n_fields=9)

(<tf.Tensor: id=119, shape=(8,), dtype=float32, numpy=
 array([-0.32652634,  0.4323619 , -0.09345459, -0.08402992,  0.8460036 ,
        -0.02663165, -0.56176794,  0.1422876 ], dtype=float32)>,
 <tf.Tensor: id=120, shape=(1,), dtype=float32, numpy=array([2.431], dtype=float32)>)

In [14]:
# 1. filename -> dataset
# 2. read file -> dataset -> datasets -> merge
# 3. parse csv
def csv_reader_dataset(filenames, n_readers=5,
                       batch_size=32, n_parse_threads=5,
                       shuffle_buffer_size=10000):
    dataset = tf.data.Dataset.list_files(filenames)
    dataset = dataset.repeat()
    dataset = dataset.interleave(
        lambda filename: tf.data.TextLineDataset(filename).skip(1),
        cycle_length = n_readers
    )
    dataset.shuffle(shuffle_buffer_size)
    dataset = dataset.map(parse_csv_line,
                          num_parallel_calls=n_parse_threads)
    dataset = dataset.batch(batch_size)
    return dataset

train_set = csv_reader_dataset(train_filenames, batch_size=3)
for x_batch, y_batch in train_set.take(2):
    print("x:")
    pprint.pprint(x_batch)
    print("y:")
    pprint.pprint(y_batch)

x:
<tf.Tensor: id=204, shape=(3, 8), dtype=float32, numpy=
array([[ 0.15782312,  0.4323619 ,  0.3379948 , -0.01588031, -0.37338907,
        -0.05305246,  0.80061346, -1.2359096 ],
       [ 0.81150836, -0.04823952,  0.5187339 , -0.0293864 , -0.03406402,
        -0.05081595, -0.7157357 ,  0.91627514],
       [ 2.5150437 ,  1.0731637 ,  0.5574401 , -0.17273512, -0.6129126 ,
        -0.01909157, -0.5710993 , -0.02749031]], dtype=float32)>
y:
<tf.Tensor: id=205, shape=(3, 1), dtype=float32, numpy=
array([[3.169  ],
       [2.147  ],
       [5.00001]], dtype=float32)>
x:
<tf.Tensor: id=206, shape=(3, 8), dtype=float32, numpy=
array([[ 0.09734604,  0.75276285, -0.20218964, -0.19547   , -0.40605137,
         0.00678553, -0.81371516,  0.6566148 ],
       [-1.0591781 ,  1.3935647 , -0.02633197, -0.1100676 , -0.6138199 ,
        -0.09695935,  0.3247131 , -0.03747724],
       [ 2.2878418 , -1.890545  ,  0.66071063, -0.14964779, -0.06672633,
         0.44788057, -0.5337738 ,  0.56673235]], dtype=fl

In [15]:
batch_size = 32
train_set = csv_reader_dataset(train_filenames,
                               batch_size = batch_size)
valid_set = csv_reader_dataset(valid_filenames,
                               batch_size = batch_size)
test_set = csv_reader_dataset(test_filenames,
                              batch_size = batch_size)

In [17]:
model = keras.models.Sequential([
    keras.layers.Dense(30, activation='relu', input_shape=[8]),
    keras.layers.Dense(1)
])
model.compile(loss="mean_squared_error", optimizer='sgd')
callbacks = [keras.callbacks.EarlyStopping(
    patience=3, min_delta=1e-2
)]

In [21]:
history = model.fit(train_set, validation_data=valid_set,
                   steps_per_epoch = 11160 // batch_size,
                   validation_steps = 3870 // batch_size,
                   epochs = 100,
                   callbacks = callbacks)

Train for 348 steps, validate for 120 steps
Epoch 1/100
Epoch 2/100
Epoch 3/100
Epoch 4/100
Epoch 5/100
Epoch 6/100
Epoch 7/100
Epoch 8/100
Epoch 9/100
Epoch 10/100
Epoch 11/100


In [22]:
model.evaluate(test_set, steps = 5160 //batch_size, verbose=2)

161/161 - 0s - loss: 0.3899


0.3899131120176789