In [1]:
import matplotlib as mpl
import matplotlib.pyplot as plt
%matplotlib inline
import numpy as np
import sklearn
import pandas as pd
import os
import sys
import time
import tensorflow as tf

from tensorflow import keras

# Record the interpreter and library versions this notebook was run with,
# so the results below can be reproduced.
print(tf.__version__)
print(sys.version_info)
for lib in (mpl, np, pd, sklearn, tf, keras):
    print(lib.__name__, lib.__version__)

2.9.1
sys.version_info(major=3, minor=9, micro=7, releaselevel='final', serial=0)
matplotlib 3.5.2
numpy 1.23.0
pandas 1.4.3
sklearn 1.1.1
tensorflow 2.9.1
keras.api._v2.keras 2.9.0


In [2]:
from sklearn.datasets import fetch_california_housing

# Fetch the California housing dataset through scikit-learn. `housing.data`
# and `housing.target` are split in the next cell; `housing.feature_names`
# is used later to build the CSV header.
housing = fetch_california_housing()

In [3]:
from sklearn.model_selection import train_test_split

# Two-stage split: first hold out the test set, then divide the remainder
# into training and validation sets. Fixed seeds keep the split repeatable.
x_train_all, x_test, y_train_all, y_test = train_test_split(
    housing.data, housing.target, random_state=7)
x_train, x_valid, y_train, y_valid = train_test_split(
    x_train_all, y_train_all, random_state=11)
for features, targets in ((x_train, y_train),
                          (x_valid, y_valid),
                          (x_test, y_test)):
    print(features.shape, targets.shape)

(11610, 8) (11610,)
(3870, 8) (3870,)
(5160, 8) (5160,)


In [4]:
from sklearn.preprocessing import StandardScaler

# Fit the scaler on the training features only, then apply that same fitted
# scaler to the validation and test features.
scaler = StandardScaler()
x_train_scaled = scaler.fit_transform(x_train)
x_valid_scaled, x_test_scaled = (
    scaler.transform(split) for split in (x_valid, x_test)
)

In [5]:
# Start from a clean slate: remove CSV shards left over from a previous run.
# shutil.rmtree replaces `!rm -rf` so the cell does not depend on Unix shell
# tools being installed (the recorded outputs below show Windows paths).
# ignore_errors=True mirrors `rm -f`: a missing directory is not an error.
import shutil

shutil.rmtree("generate_csv", ignore_errors=True)

In [6]:
# Check what kind of object the scaler returned (a NumPy array, per the output).
type(x_train_scaled)

numpy.ndarray

In [7]:
os.listdir()  # list the names of all entries in the current directory

['.ipynb_checkpoints',
 'generate_tfrecords',
 'temp.csv',
 'tf01-dataset_basic_api.ipynb',
 'tf02_data_generate_csv.ipynb',
 'tf03-tfrecord_basic_api.ipynb',
 'tf04_data_generate_tfrecord.ipynb',
 'tfrecord_basic']

In [29]:
# Demo: np.array_split divides the 20 indices into 4 consecutive chunks.
# The same call is used below to shard dataset rows across CSV files.
np.array_split(np.arange(20), 4)

[array([0, 1, 2, 3, 4]),
 array([5, 6, 7, 8, 9]),
 array([10, 11, 12, 13, 14]),
 array([15, 16, 17, 18, 19])]

In [8]:
# enumerate numbers each chunk of indices produced by array_split.
index_chunks = np.array_split(np.arange(20), 4)
for file_idx, row_indices in enumerate(index_chunks):
    print(file_idx, row_indices)

0 [0 1 2 3 4]
1 [5 6 7 8 9]
2 [10 11 12 13 14]
3 [15 16 17 18 19]


In [10]:
# The feature-engineered data is written out as CSV files under this directory.
output_dir = "generate_csv"
# exist_ok=True makes the cell safe to re-run and avoids the check-then-create
# race of os.path.exists() followed by os.mkdir().
os.makedirs(output_dir, exist_ok=True)

# save_to_csv is generic and can be reused as-is for other datasets.
def _csv_field(value):
    """Render a single cell value as CSV text."""
    # NumPy >= 2 reprs scalars as e.g. "np.float64(1.5)", which would corrupt
    # the file. str() yields the bare number on every NumPy version.
    if isinstance(value, np.generic):
        return str(value)
    return repr(value)


def save_to_csv(output_dir,
                data,
                name_prefix,
                header=None,
                n_parts=10):
    """Write the rows of ``data`` into ``n_parts`` CSV shard files.

    Args:
        output_dir: Directory that receives the shards; created if missing.
        data: Indexable collection of rows (e.g. a 2-D NumPy array);
            ``data[i]`` must yield the column values of row ``i``.
        name_prefix: File-name prefix; shards are named
            ``{name_prefix}_{index:02d}.csv`` inside ``output_dir``.
        header: Optional header line (without trailing newline). When given it
            is written as the first line of every shard, so readers must skip
            one line per file.
        n_parts: Number of shard files the rows are split into.

    Returns:
        List of the shard paths, in shard order.
    """
    # Make the function self-contained: do not rely on the caller having
    # created the directory beforehand.
    os.makedirs(output_dir, exist_ok=True)
    path_format = os.path.join(output_dir, "{}_{:02d}.csv")
    filenames = []
    # array_split tolerates len(data) not being divisible by n_parts.
    for file_idx, row_indices in enumerate(
            np.array_split(np.arange(len(data)), n_parts)):
        part_csv = path_format.format(name_prefix, file_idx)
        filenames.append(part_csv)
        with open(part_csv, "w", encoding="utf-8") as f:
            if header is not None:
                f.write(header + "\n")
            for row_index in row_indices:
                f.write(",".join(_csv_field(col) for col in data[row_index]))
                f.write("\n")
    return filenames
# Append the target as a final column so every CSV row holds features + label.
train_data = np.column_stack((x_train_scaled, y_train))
valid_data = np.column_stack((x_valid_scaled, y_valid))
test_data = np.column_stack((x_test_scaled, y_test))

# Header line: the feature names followed by the target column name.
# NOTE(review): "MidianHouseValue" looks like a typo for "Median..."; it is
# left unchanged here because the string is written into the generated files.
header_cols = [*housing.feature_names, "MidianHouseValue"]
header_str = ",".join(header_cols)
print(header_str)

train_filenames = save_to_csv(output_dir, train_data, name_prefix="train",
                              header=header_str, n_parts=20)
valid_filenames = save_to_csv(output_dir, valid_data, name_prefix="valid",
                              header=header_str, n_parts=10)
test_filenames = save_to_csv(output_dir, test_data, name_prefix="test",
                             header=header_str, n_parts=10)

MedInc,HouseAge,AveRooms,AveBedrms,Population,AveOccup,Latitude,Longitude,MidianHouseValue


In [11]:
# Side demo: NumPy's own text writer, for comparison with save_to_csv.
temp_array=np.array([[1,2,3],[4,5,6]])
np.savetxt("temp.csv",temp_array)  # savetxt converts the integer/float values to text when writing

In [12]:
# Show the file that np.savetxt wrote in the previous cell.
!cat temp.csv

1.000000000000000000e+00 2.000000000000000000e+00 3.000000000000000000e+00
4.000000000000000000e+00 5.000000000000000000e+00 6.000000000000000000e+00


In [13]:
# Inspect the names of the generated shard files.
print(train_filenames)
import pprint  # pretty-printer, for more readable list output
for title, names in (("train filenames:", train_filenames),
                     ("valid filenames:", valid_filenames),
                     ("test filenames:", test_filenames)):
    print(title)
    pprint.pprint(names)

['generate_csv\\train_00.csv', 'generate_csv\\train_01.csv', 'generate_csv\\train_02.csv', 'generate_csv\\train_03.csv', 'generate_csv\\train_04.csv', 'generate_csv\\train_05.csv', 'generate_csv\\train_06.csv', 'generate_csv\\train_07.csv', 'generate_csv\\train_08.csv', 'generate_csv\\train_09.csv', 'generate_csv\\train_10.csv', 'generate_csv\\train_11.csv', 'generate_csv\\train_12.csv', 'generate_csv\\train_13.csv', 'generate_csv\\train_14.csv', 'generate_csv\\train_15.csv', 'generate_csv\\train_16.csv', 'generate_csv\\train_17.csv', 'generate_csv\\train_18.csv', 'generate_csv\\train_19.csv']
train filenames:
['generate_csv\\train_00.csv',
 'generate_csv\\train_01.csv',
 'generate_csv\\train_02.csv',
 'generate_csv\\train_03.csv',
 'generate_csv\\train_04.csv',
 'generate_csv\\train_05.csv',
 'generate_csv\\train_06.csv',
 'generate_csv\\train_07.csv',
 'generate_csv\\train_08.csv',
 'generate_csv\\train_09.csv',
 'generate_csv\\train_10.csv',
 'generate_csv\\train_11.csv',
 'generate

In [14]:
# Pipeline outline:
# 1. filename -> dataset
# 2. read file -> dataset -> datasets -> merge
# 3. parse csv
# list_files turns the list of file names into a dataset.
# By default it yields the names in a non-deterministic shuffled order
# (visible in the output below).
filename_dataset = tf.data.Dataset.list_files(train_filenames)
for filename in filename_dataset:
    print(filename)

tf.Tensor(b'generate_csv\\train_00.csv', shape=(), dtype=string)
tf.Tensor(b'generate_csv\\train_17.csv', shape=(), dtype=string)
tf.Tensor(b'generate_csv\\train_01.csv', shape=(), dtype=string)
tf.Tensor(b'generate_csv\\train_18.csv', shape=(), dtype=string)
tf.Tensor(b'generate_csv\\train_16.csv', shape=(), dtype=string)
tf.Tensor(b'generate_csv\\train_14.csv', shape=(), dtype=string)
tf.Tensor(b'generate_csv\\train_12.csv', shape=(), dtype=string)
tf.Tensor(b'generate_csv\\train_15.csv', shape=(), dtype=string)
tf.Tensor(b'generate_csv\\train_03.csv', shape=(), dtype=string)
tf.Tensor(b'generate_csv\\train_11.csv', shape=(), dtype=string)
tf.Tensor(b'generate_csv\\train_06.csv', shape=(), dtype=string)
tf.Tensor(b'generate_csv\\train_13.csv', shape=(), dtype=string)
tf.Tensor(b'generate_csv\\train_02.csv', shape=(), dtype=string)
tf.Tensor(b'generate_csv\\train_19.csv', shape=(), dtype=string)
tf.Tensor(b'generate_csv\\train_04.csv', shape=(), dtype=string)
tf.Tensor(b'generate_csv\

In [15]:
# from_tensor_slices keeps the file names in list order (compare the output
# with the list_files cell above); repeat(1) yields the names exactly once.
filename_mydataset = tf.data.Dataset.from_tensor_slices(train_filenames).repeat(1)
for name_tensor in filename_mydataset:
    print(name_tensor)

tf.Tensor(b'generate_csv\\train_00.csv', shape=(), dtype=string)
tf.Tensor(b'generate_csv\\train_01.csv', shape=(), dtype=string)
tf.Tensor(b'generate_csv\\train_02.csv', shape=(), dtype=string)
tf.Tensor(b'generate_csv\\train_03.csv', shape=(), dtype=string)
tf.Tensor(b'generate_csv\\train_04.csv', shape=(), dtype=string)
tf.Tensor(b'generate_csv\\train_05.csv', shape=(), dtype=string)
tf.Tensor(b'generate_csv\\train_06.csv', shape=(), dtype=string)
tf.Tensor(b'generate_csv\\train_07.csv', shape=(), dtype=string)
tf.Tensor(b'generate_csv\\train_08.csv', shape=(), dtype=string)
tf.Tensor(b'generate_csv\\train_09.csv', shape=(), dtype=string)
tf.Tensor(b'generate_csv\\train_10.csv', shape=(), dtype=string)
tf.Tensor(b'generate_csv\\train_11.csv', shape=(), dtype=string)
tf.Tensor(b'generate_csv\\train_12.csv', shape=(), dtype=string)
tf.Tensor(b'generate_csv\\train_13.csv', shape=(), dtype=string)
tf.Tensor(b'generate_csv\\train_14.csv', shape=(), dtype=string)
tf.Tensor(b'generate_csv\

In [None]:
# Read the data back out of the files

In [16]:
# A list_files dataset reshuffles the file order as soon as it is iterated,
# so the ordered filename_mydataset is used for this demo instead.
n_readers = 5
dataset = filename_mydataset.interleave(
    # The first line of every file is the header, so skip it; only data rows
    # are yielded.
    lambda path: tf.data.TextLineDataset(path).skip(1),
    # Rotate over `cycle_length` files, taking `block_length` consecutive
    # lines from each in turn; this mixes rows from different files.
    cycle_length=n_readers,
    block_length=2,
)
for line in dataset.take(15):
    print(line)

tf.Tensor(b'0.801544314532886,0.27216142415910205,-0.11624392696666119,-0.2023115137272354,-0.5430515742518128,-0.021039615516440048,-0.5897620622908205,-0.08241845654707416,3.226', shape=(), dtype=string)
tf.Tensor(b'-0.2980728090942217,0.3522616607867429,-0.10920507530549702,-0.25055520947444,-0.034064024638222286,-0.006034004264459185,1.080554840130013,-1.0611381656679573,1.514', shape=(), dtype=string)
tf.Tensor(b'0.8115083791797953,-0.04823952235146133,0.5187339067174729,-0.029386394873127775,-0.034064024638222286,-0.05081594842905086,-0.7157356834231196,0.9162751241885168,2.147', shape=(), dtype=string)
tf.Tensor(b'-0.6906143291679195,-0.1283397589791022,7.0201810347470595,5.624287386169439,-0.2663292879200034,-0.03662080416157129,-0.6457503383496215,1.2058962626018372,1.352', shape=(), dtype=string)
tf.Tensor(b'0.401276648075221,-0.9293421252555106,-0.05333050451405854,-0.1865945262276826,0.6545661895448709,0.026434465728210874,0.9312527706398824,-1.4406417263474771,2.512', shap

In [None]:
# Split each line into fields of the appropriate type

In [17]:
# Parse CSV text with tf.io.decode_csv
# tf.io.decode_csv(str, record_defaults)

sample_str = '1,2,3,4,5'
record_defaults = [
    tf.constant(0, dtype=tf.int32),
    0,
    np.nan,
    "hello1",
    tf.constant([])  # empty default with no explicit dtype; the field is parsed as float32 (see output)
]
# Each field of sample_str is parsed according to the matching entry in record_defaults
parsed_fields = tf.io.decode_csv(sample_str, record_defaults)
print(parsed_fields)

[<tf.Tensor: shape=(), dtype=int32, numpy=1>, <tf.Tensor: shape=(), dtype=int32, numpy=2>, <tf.Tensor: shape=(), dtype=float32, numpy=3.0>, <tf.Tensor: shape=(), dtype=string, numpy=b'4'>, <tf.Tensor: shape=(), dtype=float32, numpy=5.0>]


In [20]:
# Test with empty fields: the output shows each empty field replaced by its
# default from record_defaults.
# Note: if decode_csv did raise here, the final line would display the
# parsed_fields value left over from the previous cell.
try:
    parsed_fields = tf.io.decode_csv(',,,,1', record_defaults)
except tf.errors.InvalidArgumentError as ex:
    print(ex)
parsed_fields

[<tf.Tensor: shape=(), dtype=int32, numpy=0>,
 <tf.Tensor: shape=(), dtype=int32, numpy=0>,
 <tf.Tensor: shape=(), dtype=float32, numpy=nan>,
 <tf.Tensor: shape=(), dtype=string, numpy=b'hello1'>,
 <tf.Tensor: shape=(), dtype=float32, numpy=1.0>]

In [21]:
# Test with too many fields: decode_csv raises InvalidArgumentError because
# the record has more fields than record_defaults; the message is printed.
try:
    parsed_fields = tf.io.decode_csv('1,2,3,4,5,6,', record_defaults)
except tf.errors.InvalidArgumentError as ex:
    print(ex)

Expect 5 fields but have 7 in record 0 [Op:DecodeCSV]


In [22]:
def parse_csv_line(line, n_fields = 9):
    """Parse one CSV text line into a (features, target) pair of tensors.

    Args:
        line: A single CSV record (string/bytes or scalar string tensor).
        n_fields: Total number of comma-separated fields in the record.

    Returns:
        Tuple ``(x, y)``: ``x`` stacks every field except the last,
        ``y`` stacks only the last field (so it has one element).
    """
    # A NaN default per column: every field is parsed as float32 and an
    # empty field becomes NaN.
    nan_defaults = [tf.constant(np.nan) for _ in range(n_fields)]
    fields = tf.io.decode_csv(line, record_defaults=nan_defaults)
    # All but the last column are features; the last column is the target.
    features = tf.stack(fields[:-1])
    target = tf.stack(fields[-1:])
    return features, target

# Sanity check on one raw line: 8 feature values followed by the target.
parse_csv_line(b'-0.9868720801669367,0.832863080552588,-0.18684708416901633,-0.14888949288707784,-0.4532302419670616,-0.11504995754593579,1.6730974284189664,-0.7465496877362412,1.138',
               n_fields=9)

(<tf.Tensor: shape=(8,), dtype=float32, numpy=
 array([-0.9868721 ,  0.8328631 , -0.18684709, -0.1488895 , -0.45323023,
        -0.11504996,  1.6730974 , -0.74654967], dtype=float32)>,
 <tf.Tensor: shape=(1,), dtype=float32, numpy=array([1.138], dtype=float32)>)

In [36]:
# End-to-end input pipeline:
# 1. filename -> dataset
# 2. read file -> dataset -> datasets -> merge
# 3. parse csv
def csv_reader_dataset(filenames, n_readers=5,
                       batch_size=32, n_parse_threads=5,
                       shuffle_buffer_size=10000):
    """Build a repeating, shuffled, batched dataset from CSV shard files.

    Args:
        filenames: Paths of the CSV files to read; each file is expected to
            start with one header line, which is skipped.
        n_readers: Number of files interleaved at a time (``cycle_length``).
        batch_size: Number of parsed rows combined into one batch.
        n_parse_threads: Parallelism used when parsing lines.
        shuffle_buffer_size: Size of the buffer used to shuffle lines.

    Returns:
        A ``tf.data.Dataset`` that yields batches of the ``(x, y)`` pairs
        produced by ``parse_csv_line``. It repeats indefinitely, so consumers
        must bound iteration (e.g. ``take`` or ``steps_per_epoch``).
    """
    # Turn the list of file names into a dataset of file-name tensors.
    dataset = tf.data.Dataset.list_files(filenames)
    # Repeat forever: after the last file has been read, start over.
    dataset = dataset.repeat()
    dataset = dataset.interleave(
        # skip(1): the first line of every file holds the column names.
        lambda filename: tf.data.TextLineDataset(filename).skip(1),
        cycle_length=n_readers
    )
    # Dataset transformations return a NEW dataset. The result has to be
    # reassigned, otherwise the shuffle is silently dropped.
    dataset = dataset.shuffle(shuffle_buffer_size)
    # map passes each element (one text line) to parse_csv_line.
    dataset = dataset.map(parse_csv_line,
                          num_parallel_calls=n_parse_threads)
    # Combine batch_size parsed rows into one batch tensor.
    dataset = dataset.batch(batch_size)
    return dataset

# Smoke test with a tiny batch size (4) so the printed batches stay readable.
train_set = csv_reader_dataset(train_filenames, batch_size=4)
print(train_set)
print('-'*50)
# Show the first two (x, y) batches produced by csv_reader_dataset.
for x_batch, y_batch in train_set.take(2):
    for label, batch in (("x:", x_batch), ("y:", y_batch)):
        print(label)
        pprint.pprint(batch)
        print('-'*50)

<BatchDataset element_spec=(TensorSpec(shape=(None, 8), dtype=tf.float32, name=None), TensorSpec(shape=(None, 1), dtype=tf.float32, name=None))>
--------------------------------------------------
x:
<tf.Tensor: shape=(4, 8), dtype=float32, numpy=
array([[-0.82195884,  1.8741661 ,  0.1821235 , -0.03170019, -0.6011179 ,
        -0.14337493,  1.0852206 , -0.8613995 ],
       [-0.66722274, -0.04823952,  0.34529406,  0.53826684,  1.8521839 ,
        -0.06112538, -0.8417093 ,  1.5204847 ],
       [-1.1157656 ,  0.99306357, -0.334192  , -0.06535219, -0.32893205,
         0.04343066, -0.12785879,  0.30707204],
       [-0.32652634,  0.4323619 , -0.09345459, -0.08402992,  0.8460036 ,
        -0.02663165, -0.56176794,  0.1422876 ]], dtype=float32)>
--------------------------------------------------
y:
<tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.054],
       [1.59 ],
       [0.524],
       [2.431]], dtype=float32)>
--------------------------------------------------
x:
<tf.Tensor: sha

In [25]:
%%time
batch_size = 32
train_set = csv_reader_dataset(train_filenames,
                               batch_size = batch_size)
valid_set = csv_reader_dataset(valid_filenames,
                               batch_size = batch_size)
test_set = csv_reader_dataset(test_filenames,
                              batch_size = batch_size)

# print(train_set)
# print(valid_set)
# print(test_set)

CPU times: total: 62.5 ms
Wall time: 60 ms


In [26]:
# Each sample has 8 input features.
model = keras.models.Sequential([
    keras.layers.Dense(30, activation='relu',
                       input_shape=[8]),
    keras.layers.Dense(1),
])
model.compile(loss="mean_squared_error", optimizer="sgd")
callbacks = [keras.callbacks.EarlyStopping(patience=5, min_delta=1e-2)]

# The datasets repeat forever, so Keras has to be told how many batches make
# up one epoch and one validation pass. The counts are derived from the split
# sizes instead of hard-coded literals: the previous literal 11160 mis-typed
# the 11610 training rows and silently shortened every epoch.
history = model.fit(train_set,
                    validation_data = valid_set,
                    steps_per_epoch = len(x_train) // batch_size,  # batches per epoch
                    validation_steps = len(x_valid) // batch_size,
                    epochs = 100,
                    callbacks = callbacks)

Epoch 1/100
Epoch 2/100
Epoch 3/100
Epoch 4/100
Epoch 5/100
Epoch 6/100
Epoch 7/100
Epoch 8/100
Epoch 9/100
Epoch 10/100
Epoch 11/100
Epoch 12/100
Epoch 13/100
Epoch 14/100
Epoch 15/100
Epoch 16/100
Epoch 17/100
Epoch 18/100
Epoch 19/100
Epoch 20/100
Epoch 21/100
Epoch 22/100


In [27]:
# test_set repeats forever, so bound the evaluation to one pass over the
# test rows (len(x_test) is 5160, as printed by the split cell).
model.evaluate(test_set, steps=len(x_test) // batch_size)



0.38053369522094727

In [37]:
# Small demo of batch() on a dataset of the integers 0..7.
dataset = tf.data.Dataset.range(8)
print(dataset)
dataset = dataset.batch(4)  # group every 4 consecutive elements into one tensor, i.e. one batch
list(dataset)

<RangeDataset element_spec=TensorSpec(shape=(), dtype=tf.int64, name=None)>


[<tf.Tensor: shape=(4,), dtype=int64, numpy=array([0, 1, 2, 3], dtype=int64)>,
 <tf.Tensor: shape=(4,), dtype=int64, numpy=array([4, 5, 6, 7], dtype=int64)>]