In [None]:
import matplotlib as mpl
import matplotlib.pyplot as plt
%matplotlib inline
import numpy as np
import sklearn
import pandas as pd
import os
import sys
import time
import tensorflow as tf

from tensorflow import keras


In [None]:
from sklearn.datasets import fetch_california_housing

housing = fetch_california_housing()

In [None]:
from sklearn.model_selection import train_test_split

x_train_all, x_test, y_train_all, y_test = train_test_split(
    housing.data, housing.target, random_state = 7)
x_train, x_valid, y_train, y_valid = train_test_split(
    x_train_all, y_train_all, random_state = 11)
print(x_train.shape, y_train.shape)
print(x_valid.shape, y_valid.shape)
print(x_test.shape, y_test.shape)
print(type(x_train),type(y_train))

In [None]:
from sklearn.preprocessing import StandardScaler

scaler = StandardScaler()
x_train_scaled = scaler.fit_transform(x_train)
x_valid_scaled = scaler.transform(x_valid)
x_test_scaled = scaler.transform(x_test)

# 生成csv文件

In [None]:
output_dir = "generate_csv"
if not os.path.exists(output_dir):
    os.mkdir(output_dir)

def save_to_csv(output_dir, data, name_prefix,
                header=None, n_parts=10):
    '''
    output_dir:生成数据集的存放位置
    data：源数据
    name_prefix：数据前缀，用来区分训练集，验证集，测试集
    n_parts：将数据分为多少份
    '''
    path_format = os.path.join(output_dir, "{}_{:02d}.csv")
    # od.path.join()：在output_dir下生成新的文件名  output_dir/~.csv
    # {}_{:02d}.csv：第一个{}用name_prefix来填充，指定是train，validata或test;
    #               第二个{}用file_idx来填充，制定时那种数据的第几个文件
    filenames = []
    
    for file_idx, row_indices in enumerate(
        np.array_split(np.arange(len(data)), n_parts)):
        '''从内向外看：
        np.arrange(len(data)):生成一个和元素数量相同的数据，用来在data中取数据。这里的一个元素指的是feature和label构成的元组
        np.array_split(np.arange(len(data)), n_parts)):将数据分为制定组数
        enumerate（～）：获得分成制定组后的数据的索引及索引对应的元素
        
        '''
        part_csv = path_format.format(name_prefix, file_idx) #生成output_dir/~.csv
        filenames.append(part_csv) #统计所有创建的文件名
        with open(part_csv, "wt", encoding="utf-8") as f:  #
            if header is not None:
                f.write(header + "\n")  #写入第一行，也就是列名
            for row_index in row_indices:
                f.write(",".join(
                    [repr(col) for col in data[row_index]]))
                    # repr(object):将对象转化为工解释器读取的形式（也就是在数据的最外围加上一个冒号，"object"）
                    # 存数据都是要以字符串的形式的，json
                f.write('\n')
    return filenames 
    
    
    
    
# np.c_[]:将两个按行融合   
train_data = np.c_[x_train_scaled, y_train] #train_data：<class 'numpy.ndarray'>
valid_data = np.c_[x_valid_scaled, y_valid]
test_data = np.c_[x_test_scaled, y_test]
# print(type(train_data))
# 由housing.feature_names获得房子影响价格的各项指标名，然后再加上一个估价的中值。形成行名
header_cols = housing.feature_names + ["MidianHouseValue"]
header_str = ",".join(header_cols) #以逗号分隔，由列表转为字符串

train_filenames = save_to_csv(output_dir, train_data, "train",  #返回的train_filenames是一个内部元素为文件名的数组
                              header_str, n_parts=20)
valid_filenames = save_to_csv(output_dir, valid_data, "valid",
                              header_str, n_parts=10)
test_filenames = save_to_csv(output_dir, test_data, "test",
                             header_str, n_parts=10)
    

In [None]:
import pprint
print("train filenames:")
# print(train_filenames)  #print打印内容很多的数据时就会很乱
pprint.pprint(train_filenames)

print("valid filenames:")
pprint.pprint(valid_filenames)
print("test filenames:")
pprint.pprint(test_filenames)

#pprint与print功能基本一致，区别就在于print会将所有内容用一行来打印，而pprint会根据数据的格式，分行打印，更符合人类阅读习惯

# tf.data.Dataset.listfile(filenames)
    func:将文件名转化为一个数据集
    

In [None]:
# 1. 读取文件名，形成一个文件：                    filename -> dataset
# 2. 分别读取读取数据集，形成数据集，然后将其合并。   read file -> dataset -> datasets -> merge
# 3. 读取出来的record是字符串，要将其进行解析。     parse csv

filename_dataset = tf.data.Dataset.list_files(train_filenames)  #dataset.list_files():将文件名转换为一个数据集 
print(type(filename_dataset))
for filename in filename_dataset:
    print(filename.numpy())

# tf.data.Dataset.TextLineDataset()
    func:按行读取文本，形成一个dataset
    paras:map_fn:对数据的操作
    cycle_length:读取文件的并行数
    
# dataset.skip(n)：省略(跳过)数据集中的指定行
    # 跳过的元素数量要考虑并行数

In [None]:
n_readers = 5
dataset = filename_dataset.interleave(
    lambda filename: tf.data.TextLineDataset(filename).skip(1),  #因为读取并行数是5，所以一次读取5行，每次跳一行,所以就是吧第一行跳过了
    cycle_length = n_readers
)
for line in dataset.take(15):
    print(line.numpy())


# tf.io.decode_csv(record, record_defaults， na_value, select_cols)
    func:将csv文件解析成数据集
    paras:record：要解析的源文件
          record_defaults:指定csv文件解析后，每一行的默认属性及默认值（当某列数据缺失时，采用默认值，如果没指定属性，则默认float32）
          na_value:可选参数。如果在record中识别到了NA/NAN，用什么字符代替，默认时“”。
          select_cols:可选参数。以一个排序的（sorted）列表指定哪些行将会被parse并返回。
          
                                
注：csv文件中，把每一行成为一个record，record中的每个元素称为一个field



In [None]:
# 测试一下decode_csv()
sample_str = '1,2,3,4,5'
record_defaults = [tf.constant(0, dtype=tf.int32)]*5


# sample_str = '1,2,3,,5'
# record_defaults = [
#     tf.constant(0, dtype=tf.int32),
#     0,
#     np.nan,
#     "hello",
#     tf.constant([])
# ]
parsed_fields = tf.io.decode_csv(sample_str, record_defaults)
pprint.pprint(parsed_fields)

In [None]:
# 将某一行转化为一个数据集
def parse_csv_line(line, n_fields = 9):
    defs = [tf.constant(np.nan)] * n_fields
    parsed_fields = tf.io.decode_csv(line, record_defaults=defs)
    
    # stack()将数据沿指定维度进行堆叠
    x = tf.stack(parsed_fields[0:-1])
    y = tf.stack(parsed_fields[-1:])
#     x = parsed_fields[0:-1]
#     y = parsed_fields[-1:]
    return x, y

parse_csv_line(b'-0.9868720801669367,0.832863080552588,-0.18684708416901633,-0.14888949288707784,-0.4532302419670616,-0.11504995754593579,1.6730974284189664,-0.7465496877362412,1.138',
               n_fields=9) 

# tf.map(map_fn, num_parallel_calls )
    func:对数据集中的所有元素执行相同操作，并返回处理后的新数据集。
    paras：map_fn：用什么函数对数据进行处理。可以用lambda匿名函数，也可以现在外部定义一个函数，在map中使用。
          num_parallel_calls:同时处理的元素数量。
        
# tf.data.Dataset.interleave()
    https://blog.csdn.net/menghuanshen/article/details/104240189
    func:对源数据集进行处理，最后得到一个交错的型数据集
    paras：map_fn:对源数据集进行制定操作的函数
          cycle_length:从源文件同时读取的数据条数
          block_length:每次从处理后的数据中读取几个
          num_parallel_calls:   还不知道啥用
          
# tf.data.Dataset.shuffle(shuffle_buffle_size)       
    func:将一按数据即打乱
    paras：shuffle_buffer_size:缓存大小（这个缓存可以理解为c语言里通过三个标量让两个变量的值相互交换一样）

In [None]:
# 编写函数读取整个csv文件

def csv_reader_dataset(filenames, n_readers=5,
                       batch_size=32, n_parse_threads=5,
                       shuffle_buffer_size=10000):
    '''
    n_readers:从text文件中同时读取的行数
    n_parse_threads:同时解析的并行数
    shuffle_buffer_size:读取文件的缓存大小
    '''
    dataset = tf.data.Dataset.list_files(filenames)  #先由文件名构建数据集
    dataset = dataset.repeat()  #不指定count，表示dataset重复无限次
    dataset = dataset.interleave(
        lambda filename: tf.data.TextLineDataset(filename).skip(1),
        cycle_length = n_readers
    )
    print(list(dataset.numpy()))
    dataset.shuffle(shuffle_buffer_size) #随机打乱顺序
    dataset = dataset.map(parse_csv_line,
                          num_parallel_calls=n_parse_threads)
    #通过前面定义的 行解析函数 对csv进行逐行解析
    dataset = dataset.batch(batch_size) # 指定batch_size
    return dataset

#测试是否能读取
train_set = csv_reader_dataset(train_filenames, batch_size=3)
for x_batch, y_batch in train_set.take(2):
    print("x:")
    pprint.pprint(x_batch)
    print("y:")
    pprint.pprint(y_batch)
    
    
'''
通过batch_size将文件划分为许多个batch，每个batch包含3个元素，每个元素由8个值的房子指标和一个值的房价构成。
然后通过take(2)拿出两个batch。

'''

In [None]:
# 通过定义的函数读取整个csv文件
batch_size = 32
train_set = csv_reader_dataset(train_filenames,
                               batch_size = batch_size)
valid_set = csv_reader_dataset(valid_filenames,
                               batch_size = batch_size)
test_set = csv_reader_dataset(test_filenames,
                              batch_size = batch_size)
print(train_set) #DatasetV1Adapter类型， 

In [None]:
# 修改模型，将生成的数据集应用在model中
model = keras.models.Sequential([
    keras.layers.Dense(30, activation='relu',
                       input_shape=[8]),  # 直接指定input_shape
    keras.layers.Dense(1),
])

model.compile(loss="mean_squared_error", optimizer="sgd")
callbacks = [keras.callbacks.EarlyStopping(
    patience=5, min_delta=1e-2)]

history = model.fit(train_set,  # 在我们创建的数据集中，x和y本就是以元组的形式结合在一起的，所以直接用train_set，
                                # 不用分别指定x_train,y_train（用逗号分隔，其实也就是形成了一个元祖）
                    validation_data = valid_set,
                    steps_per_epoch = 11160 // batch_size,  #告诉解释器执行了多少个batch算是一个epoch
                    validation_steps = 3870 // batch_size,
                    epochs = 100,
                    callbacks = callbacks)
'''
训练集用steps_per_epoch
验证集用validation_steps
测试集用steps

'''

In [None]:
model.evaluate(test_set, steps = 5160 // batch_size)