In [1]:
import tensorflow as tf
from tensorflow import keras
import numpy as np
import os
import matplotlib as mpl
import matplotlib.pyplot as plt

In [2]:
# Slice a 1-D int32 tensor (0..9) into a dataset of scalar elements.
X = tf.range(0, 10)
dataset = tf.data.Dataset.from_tensor_slices(X)
dataset

<TensorSliceDataset shapes: (), types: tf.int32>

In [3]:
# Iterating a Dataset eagerly yields one tensor per element.
for element in dataset:
    print(element)

tf.Tensor(0, shape=(), dtype=int32)
tf.Tensor(1, shape=(), dtype=int32)
tf.Tensor(2, shape=(), dtype=int32)
tf.Tensor(3, shape=(), dtype=int32)
tf.Tensor(4, shape=(), dtype=int32)
tf.Tensor(5, shape=(), dtype=int32)
tf.Tensor(6, shape=(), dtype=int32)
tf.Tensor(7, shape=(), dtype=int32)
tf.Tensor(8, shape=(), dtype=int32)
tf.Tensor(9, shape=(), dtype=int32)


In [4]:
# Dataset.range yields int64 scalars (compare tf.range above, which gave int32).
dataset = tf.data.Dataset.range(0, 10)

In [5]:
# Show each int64 element produced by Dataset.range.
for item in dataset:
    print(item)

tf.Tensor(0, shape=(), dtype=int64)
tf.Tensor(1, shape=(), dtype=int64)
tf.Tensor(2, shape=(), dtype=int64)
tf.Tensor(3, shape=(), dtype=int64)
tf.Tensor(4, shape=(), dtype=int64)
tf.Tensor(5, shape=(), dtype=int64)
tf.Tensor(6, shape=(), dtype=int64)
tf.Tensor(7, shape=(), dtype=int64)
tf.Tensor(8, shape=(), dtype=int64)
tf.Tensor(9, shape=(), dtype=int64)


In [6]:
# Repeat the 10 elements twice, then group them into batches of 6;
# the final batch is short because drop_remainder is left at its default.
dataset = dataset.repeat(count=2).batch(batch_size=6)
for batch in dataset:
    print(batch)

tf.Tensor([0 1 2 3 4 5], shape=(6,), dtype=int64)
tf.Tensor([6 7 8 9 0 1], shape=(6,), dtype=int64)
tf.Tensor([2 3 4 5 6 7], shape=(6,), dtype=int64)
tf.Tensor([8 9], shape=(2,), dtype=int64)


In [7]:
# map() applies the function to each element, here a whole batch at a time.
dataset = dataset.map(lambda batch: batch * 2)
for batch in dataset:
    print(batch)

tf.Tensor([ 0  2  4  6  8 10], shape=(6,), dtype=int64)
tf.Tensor([12 14 16 18  0  2], shape=(6,), dtype=int64)
tf.Tensor([ 4  6  8 10 12 14], shape=(6,), dtype=int64)
tf.Tensor([16 18], shape=(2,), dtype=int64)


In [8]:
# unbatch() splits every batch back into individual scalar elements.
dataset = dataset.unbatch()
for value in dataset:
    print(value)

tf.Tensor(0, shape=(), dtype=int64)
tf.Tensor(2, shape=(), dtype=int64)
tf.Tensor(4, shape=(), dtype=int64)
tf.Tensor(6, shape=(), dtype=int64)
tf.Tensor(8, shape=(), dtype=int64)
tf.Tensor(10, shape=(), dtype=int64)
tf.Tensor(12, shape=(), dtype=int64)
tf.Tensor(14, shape=(), dtype=int64)
tf.Tensor(16, shape=(), dtype=int64)
tf.Tensor(18, shape=(), dtype=int64)
tf.Tensor(0, shape=(), dtype=int64)
tf.Tensor(2, shape=(), dtype=int64)
tf.Tensor(4, shape=(), dtype=int64)
tf.Tensor(6, shape=(), dtype=int64)
tf.Tensor(8, shape=(), dtype=int64)
tf.Tensor(10, shape=(), dtype=int64)
tf.Tensor(12, shape=(), dtype=int64)
tf.Tensor(14, shape=(), dtype=int64)
tf.Tensor(16, shape=(), dtype=int64)
tf.Tensor(18, shape=(), dtype=int64)


In [9]:
# Keep only the elements strictly below 10.
dataset = dataset.filter(lambda value: value < 10)
for value in dataset:
    print(value)

tf.Tensor(0, shape=(), dtype=int64)
tf.Tensor(2, shape=(), dtype=int64)
tf.Tensor(4, shape=(), dtype=int64)
tf.Tensor(6, shape=(), dtype=int64)
tf.Tensor(8, shape=(), dtype=int64)
tf.Tensor(0, shape=(), dtype=int64)
tf.Tensor(2, shape=(), dtype=int64)
tf.Tensor(4, shape=(), dtype=int64)
tf.Tensor(6, shape=(), dtype=int64)
tf.Tensor(8, shape=(), dtype=int64)


In [10]:
# take() returns a new dataset limited to the first 5 elements; `dataset` is not modified.
for value in dataset.take(count=5):
    print(value)

tf.Tensor(0, shape=(), dtype=int64)
tf.Tensor(2, shape=(), dtype=int64)
tf.Tensor(4, shape=(), dtype=int64)
tf.Tensor(6, shape=(), dtype=int64)
tf.Tensor(8, shape=(), dtype=int64)


In [11]:
# Shuffle with a small buffer (6) and a fixed seed so the order is reproducible,
# then batch by 3; 20 elements leave a final batch of 2.
dataset = (
    tf.data.Dataset.range(10)
    .repeat(2)
    .shuffle(buffer_size=6, seed=123)
    .batch(3)
)
for batch in dataset:
    print(batch)

tf.Tensor([4 0 3], shape=(3,), dtype=int64)
tf.Tensor([8 2 9], shape=(3,), dtype=int64)
tf.Tensor([7 2 0], shape=(3,), dtype=int64)
tf.Tensor([3 5 6], shape=(3,), dtype=int64)
tf.Tensor([1 4 6], shape=(3,), dtype=int64)
tf.Tensor([9 7 5], shape=(3,), dtype=int64)
tf.Tensor([1 8], shape=(2,), dtype=int64)


In [12]:
from sklearn.datasets import fetch_california_housing
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

# Load California housing, hold out a test split, then carve a validation split
# out of the remaining training data (both splits seeded for reproducibility).
housing = fetch_california_housing()
X_train_full, X_test, y_train_full, y_test = train_test_split(
    housing.data, housing.target.reshape(-1, 1), random_state=42)
X_train, X_valid, y_train, y_valid = train_test_split(
    X_train_full, y_train_full, random_state=42)

# Only the per-feature mean and scale are kept; standardisation is applied manually later.
scaler = StandardScaler().fit(X_train)
X_mean, X_std = scaler.mean_, scaler.scale_

In [13]:
def _format_csv_value(value):
    """Return the CSV text for a single cell value.

    NumPy scalars are first converted to the equivalent Python scalar. Under
    NumPy >= 2, ``repr(np.float64(1.5))`` is ``'np.float64(1.5)'`` instead of
    ``'1.5'``, which would corrupt the CSV. ``repr`` of a Python float is the
    shortest string that round-trips to the same value.
    """
    if isinstance(value, np.generic):
        value = value.item()
    return repr(value)


def save_to_multiple_csv_files(data, name_prefix, header=None, n_parts=10,
                               output_dir=os.path.join('Datasets', 'housing')):
    """Split the rows of ``data`` across ``n_parts`` CSV files and return their paths.

    Parameters
    ----------
    data : 2-D array-like
        Rows to write. Each ``data[row_idx]`` is iterated column by column and
        written as one comma-separated line.
    name_prefix : str
        Inserted into the file names ``my_<name_prefix>_<NN>.csv``.
    header : str, optional
        Line written at the top of every part. A newline is appended automatically.
    n_parts : int
        Number of files. Rows are split into contiguous, near-equal chunks by
        ``np.array_split``, so some parts may be empty if ``n_parts > len(data)``.
    output_dir : str
        Directory for the files; created if missing. Defaults to
        ``os.path.join('Datasets', 'housing')``.

    Returns
    -------
    list of str
        The written file paths, in part order.
    """
    os.makedirs(output_dir, exist_ok=True)
    path_format = os.path.join(output_dir, 'my_{}_{:02d}.csv')
    filepaths = []
    row_chunks = np.array_split(np.arange(len(data)), n_parts)
    for file_idx, row_indices in enumerate(row_chunks):
        part_csv = path_format.format(name_prefix, file_idx)
        filepaths.append(part_csv)
        with open(part_csv, 'wt', encoding='utf-8') as f:
            if header is not None:
                f.write(header)
                f.write('\n')
            for row_idx in row_indices:
                f.write(','.join(_format_csv_value(col) for col in data[row_idx]))
                f.write('\n')
    return filepaths

In [14]:
# Append each split's target as the last column so every CSV row is a complete record.
train_data = np.column_stack((X_train, y_train))
valid_data = np.column_stack((X_valid, y_valid))
test_data = np.column_stack((X_test, y_test))
header_cols = housing.feature_names + ["MedianHouseValue"]
header = ",".join(header_cols)

# 20 shards for training, 10 each for validation and test.
train_filepaths = save_to_multiple_csv_files(train_data, "train", header=header, n_parts=20)
valid_filepaths = save_to_multiple_csv_files(valid_data, "valid", header=header, n_parts=10)
test_filepaths = save_to_multiple_csv_files(test_data, "test", header=header, n_parts=10)

In [15]:
# Display the generated training shard paths (relative to the current working directory).
train_filepaths

['Datasets\\housing\\my_train_00.csv',
 'Datasets\\housing\\my_train_01.csv',
 'Datasets\\housing\\my_train_02.csv',
 'Datasets\\housing\\my_train_03.csv',
 'Datasets\\housing\\my_train_04.csv',
 'Datasets\\housing\\my_train_05.csv',
 'Datasets\\housing\\my_train_06.csv',
 'Datasets\\housing\\my_train_07.csv',
 'Datasets\\housing\\my_train_08.csv',
 'Datasets\\housing\\my_train_09.csv',
 'Datasets\\housing\\my_train_10.csv',
 'Datasets\\housing\\my_train_11.csv',
 'Datasets\\housing\\my_train_12.csv',
 'Datasets\\housing\\my_train_13.csv',
 'Datasets\\housing\\my_train_14.csv',
 'Datasets\\housing\\my_train_15.csv',
 'Datasets\\housing\\my_train_16.csv',
 'Datasets\\housing\\my_train_17.csv',
 'Datasets\\housing\\my_train_18.csv',
 'Datasets\\housing\\my_train_19.csv']

In [16]:
import pandas as pd

# Sanity check: load the first training shard with pandas and show its first 5 rows.
first_shard = train_filepaths[0]
pd.read_csv(first_shard).head(5)

Unnamed: 0,MedInc,HouseAge,AveRooms,AveBedrms,Population,AveOccup,Latitude,Longitude,MedianHouseValue
0,3.5214,15.0,3.049945,1.106548,1447.0,1.605993,37.63,-122.43,1.442
1,5.3275,5.0,6.49006,0.991054,3464.0,3.44334,33.69,-117.39,1.687
2,3.1,29.0,7.542373,1.591525,1328.0,2.250847,38.44,-122.98,1.621
3,7.1736,12.0,6.289003,0.997442,1054.0,2.695652,33.55,-117.7,2.621
4,2.0549,13.0,5.312457,1.085092,3297.0,2.244384,33.93,-116.93,0.956


In [17]:
# Print the first 5 raw lines (header + 4 rows) of the first training shard.
# The shards were written as UTF-8, so read them back with the same encoding
# rather than the platform default.
with open(train_filepaths[0], encoding='utf-8') as f:
    for line_no, line in zip(range(5), f):
        print(line, end='')

MedInc,HouseAge,AveRooms,AveBedrms,Population,AveOccup,Latitude,Longitude,MedianHouseValue
3.5214,15.0,3.0499445061043287,1.106548279689234,1447.0,1.6059933407325193,37.63,-122.43,1.442
5.3275,5.0,6.490059642147117,0.9910536779324056,3464.0,3.4433399602385686,33.69,-117.39,1.687
3.1,29.0,7.5423728813559325,1.5915254237288134,1328.0,2.2508474576271187,38.44,-122.98,1.621
7.1736,12.0,6.289002557544757,0.9974424552429667,1054.0,2.6956521739130435,33.55,-117.7,2.621


In [18]:
# list_files shuffles the file names; the fixed seed makes that order reproducible.
filepath_dataset = tf.data.Dataset.list_files(file_pattern=train_filepaths, seed=123)

for filepath in filepath_dataset:
    print(filepath)

tf.Tensor(b'Datasets\\housing\\my_train_00.csv', shape=(), dtype=string)
tf.Tensor(b'Datasets\\housing\\my_train_03.csv', shape=(), dtype=string)
tf.Tensor(b'Datasets\\housing\\my_train_01.csv', shape=(), dtype=string)
tf.Tensor(b'Datasets\\housing\\my_train_16.csv', shape=(), dtype=string)
tf.Tensor(b'Datasets\\housing\\my_train_06.csv', shape=(), dtype=string)
tf.Tensor(b'Datasets\\housing\\my_train_18.csv', shape=(), dtype=string)
tf.Tensor(b'Datasets\\housing\\my_train_15.csv', shape=(), dtype=string)
tf.Tensor(b'Datasets\\housing\\my_train_07.csv', shape=(), dtype=string)
tf.Tensor(b'Datasets\\housing\\my_train_10.csv', shape=(), dtype=string)
tf.Tensor(b'Datasets\\housing\\my_train_17.csv', shape=(), dtype=string)
tf.Tensor(b'Datasets\\housing\\my_train_13.csv', shape=(), dtype=string)
tf.Tensor(b'Datasets\\housing\\my_train_08.csv', shape=(), dtype=string)
tf.Tensor(b'Datasets\\housing\\my_train_12.csv', shape=(), dtype=string)
tf.Tensor(b'Datasets\\housing\\my_train_09.csv', sh

In [19]:
n_readers = 5
# Read n_readers files at a time, alternating lines between them;
# skip(1) drops each file's CSV header line.
dataset = filepath_dataset.interleave(
    lambda path: tf.data.TextLineDataset(path).skip(1),
    cycle_length=n_readers,
)

In [20]:
# Preview the first 5 interleaved raw CSV lines (byte strings).
for line in dataset.take(5):
    print(line)

tf.Tensor(b'4.2083,44.0,5.323204419889502,0.9171270718232044,846.0,2.3370165745856353,37.47,-122.2,2.782', shape=(), dtype=string)
tf.Tensor(b'3.8456,35.0,5.461346633416459,0.9576059850374065,1154.0,2.8778054862842892,37.96,-122.05,1.598', shape=(), dtype=string)
tf.Tensor(b'2.1856,41.0,3.7189873417721517,1.0658227848101265,803.0,2.0329113924050635,32.76,-117.12,1.205', shape=(), dtype=string)
tf.Tensor(b'5.9522,26.0,6.196521739130435,1.0069565217391305,1479.0,2.5721739130434784,34.5,-119.75,4.384', shape=(), dtype=string)
tf.Tensor(b'3.6875,44.0,4.524475524475524,0.993006993006993,457.0,3.195804195804196,34.04,-118.15,1.625', shape=(), dtype=string)


In [21]:
# Count the header-free lines across all training shards with one full pass.
c = sum(1 for _ in dataset)
print(c)

11610


In [22]:
# One default per CSV column. Each default's type fixes that column's parsed dtype
# (int32, float32, float64, string, float32 below). Per the tf.io.decode_csv docs,
# an empty tensor means "no default", so that field is required.
record_defaults = [
    0,
    np.nan,
    tf.constant(np.nan, dtype=tf.float64),
    "Hello",
    tf.constant([]),
]
parsed_fields = tf.io.decode_csv('1,2,3,4,5', record_defaults=record_defaults)
parsed_fields

[<tf.Tensor: id=11880, shape=(), dtype=int32, numpy=1>,
 <tf.Tensor: id=11881, shape=(), dtype=float32, numpy=2.0>,
 <tf.Tensor: id=11882, shape=(), dtype=float64, numpy=3.0>,
 <tf.Tensor: id=11883, shape=(), dtype=string, numpy=b'4'>,
 <tf.Tensor: id=11884, shape=(), dtype=float32, numpy=5.0>]

In [23]:
# Empty fields fall back to their record_defaults; only the required last field is given.
parsed_fields = tf.io.decode_csv(records=',,,,5', record_defaults=record_defaults)
parsed_fields

[<tf.Tensor: id=11889, shape=(), dtype=int32, numpy=0>,
 <tf.Tensor: id=11890, shape=(), dtype=float32, numpy=nan>,
 <tf.Tensor: id=11891, shape=(), dtype=float64, numpy=nan>,
 <tf.Tensor: id=11892, shape=(), dtype=string, numpy=b'Hello'>,
 <tf.Tensor: id=11893, shape=(), dtype=float32, numpy=5.0>]

In [34]:
n_inputs = 8

@tf.function
def preprocess(line):
    """Parse one CSV line into (standardised features, target).

    The line must hold ``n_inputs`` float feature columns followed by the target.
    Empty feature fields default to 0.0 before standardisation. The target has an
    empty default, so it is required.

    Returns a float32 feature vector of shape (n_inputs,), standardised with the
    module-level ``X_mean`` / ``X_std``, and a float32 target of shape (1,).
    """
    record_defaults = [0.] * n_inputs + [tf.constant([], tf.float32)]
    columns = tf.io.decode_csv(line, record_defaults=record_defaults)
    features = tf.stack(columns[:-1])
    target = tf.stack([columns[-1]])
    return (features - X_mean) / X_std, target

In [35]:
# Smoke-test preprocess on one raw CSV line: expect 8 standardised features and a 1-element target.
preprocess(b'4.2083,44.0,5.3232,0.9171,846.0,2.3370,37.47,-122.2,2.782')

(<tf.Tensor: id=11957, shape=(8,), dtype=float32, numpy=
 array([ 0.16579157,  1.216324  , -0.05204565, -0.39215982, -0.5277444 ,
        -0.2633488 ,  0.8543046 , -1.3072058 ], dtype=float32)>,
 <tf.Tensor: id=11958, shape=(1,), dtype=float32, numpy=array([2.782], dtype=float32)>)

In [54]:
def csv_reader_dataset(filepaths, n_readers=5, repeat=1, n_read_threads=None,
                       n_parse_threads=5, shuffle_buffer_size=10000, batch_size=32,
                       seed=None):
    """Build a shuffled, batched tf.data pipeline of (features, target) from CSV shards.

    Parameters
    ----------
    filepaths : list of str or glob pattern
        CSV files, each starting with one header line (skipped).
    n_readers : int
        Number of files read concurrently by ``interleave`` (its ``cycle_length``).
    repeat : int or None
        How many times to repeat the file list; ``None`` repeats forever.
    n_read_threads : int or None
        ``num_parallel_calls`` for ``interleave``; ``None`` reads sequentially.
    n_parse_threads : int
        ``num_parallel_calls`` for mapping ``preprocess`` over the lines.
    shuffle_buffer_size : int
        Size of the line-level shuffle buffer.
    batch_size : int
        Examples per batch; the last batch may be smaller.
    seed : int or None
        Seed for both the file-order shuffle and the line shuffle. The default
        ``None`` keeps the previous unseeded, non-reproducible behaviour.

    Returns
    -------
    tf.data.Dataset
        Batches of ``preprocess`` outputs, with one batch prefetched.
    """
    dataset = tf.data.Dataset.list_files(filepaths, seed=seed).repeat(repeat)
    dataset = dataset.interleave(
        lambda filepath: tf.data.TextLineDataset(filepath).skip(1),
        cycle_length=n_readers, num_parallel_calls=n_read_threads)
    dataset = dataset.shuffle(buffer_size=shuffle_buffer_size, seed=seed)
    dataset = dataset.map(preprocess, num_parallel_calls=n_parse_threads)
    dataset = dataset.batch(batch_size)
    # Overlap preparation of the next batch with consumption of the current one.
    return dataset.prefetch(1)

In [55]:
train_set = csv_reader_dataset(train_filepaths, batch_size=3)
# Show the first two (features, target) batches of 3 examples each.
for features, targets in train_set.take(2):
    print("X =", features)
    print("y =", targets)
    print()

X = tf.Tensor(
[[-1.4117197   0.34613404 -0.42621252  0.20311084 -1.0844357  -0.28271785
   1.0932282  -0.82743007]
 [-1.2320702   0.8998913  -0.7743248  -0.17561583  0.46973675  0.3794408
  -0.8931223   0.65187943]
 [ 0.09262241 -0.8404887  -0.79305923 -0.3465419  -0.25578707 -0.44558677
   0.83556503 -1.277221  ]], shape=(3, 8), dtype=float32)
y = tf.Tensor(
[[0.804]
 [2.   ]
 [3.786]], shape=(3, 1), dtype=float32)

X = tf.Tensor(
[[-1.388203   -1.3942459  -0.5835349   0.16621785  1.2664441   0.2855145
  -1.3850268   1.2416046 ]
 [-0.00280667  1.0581076   0.3021292  -0.14668481 -0.4592988   0.10595185
   0.9198914  -0.75246423]
 [ 0.44951203  1.3745404   0.24453813 -0.24077806 -0.43100792 -0.25015336
   0.887098   -1.3621805 ]], shape=(3, 8), dtype=float32)
y = tf.Tensor(
[[0.831]
 [1.714]
 [3.882]], shape=(3, 1), dtype=float32)



In [56]:
# repeat=None makes the training stream infinite, so fit() must be given steps_per_epoch.
train_set = csv_reader_dataset(train_filepaths, repeat=None)
valid_set, test_set = (csv_reader_dataset(paths) for paths in (valid_filepaths, test_filepaths))

In [57]:
# A small regression MLP: one hidden ReLU layer followed by a single linear output.
model = keras.models.Sequential()
model.add(keras.layers.Dense(30, activation="relu", input_shape=X_train.shape[1:]))
model.add(keras.layers.Dense(1))

In [51]:
# `learning_rate` is the supported argument name; the old `lr` alias is deprecated
# in tf.keras and rejected by Keras 3.
model.compile(loss="mse", optimizer=keras.optimizers.SGD(learning_rate=1e-3))

# Must match the batch size train_set was built with (csv_reader_dataset's default, 32).
batch_size = 32
# train_set repeats forever, so one "epoch" is defined as roughly one pass over X_train.
model.fit(train_set, steps_per_epoch=len(X_train) // batch_size, epochs=10,
          validation_data=valid_set)

Train for 362 steps
Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


<tensorflow.python.keras.callbacks.History at 0x1128f6f7888>

In [52]:
# Drop the labels (Keras would also ignore them if test_set were passed directly).
new_set = test_set.map(lambda X, y: X)
X_new = X_test
# test_set is finite (built with repeat=1), so predict() stops when it is exhausted.
# Passing steps=len(X_new) // batch_size would silently drop the final partial batch.
# NOTE: csv_reader_dataset shuffles, so prediction rows are NOT aligned with X_new's rows.
model.predict(new_set)

array([[1.108017 ],
       [1.0587025],
       [2.1327744],
       ...,
       [0.9123148],
       [1.7360092],
       [2.2882626]], dtype=float32)

In [58]:
# `learning_rate` replaces the deprecated `lr` alias (rejected by Keras 3).
# The optimizer is created outside the tf.function so its variables are not
# re-created on every trace.
optimizer = keras.optimizers.Nadam(learning_rate=0.01)
loss_fn = keras.losses.mean_squared_error

@tf.function
def train(model, n_epochs, batch_size=32,
          n_readers=5, n_read_threads=5, shuffle_buffer_size=10000, n_parse_threads=5):
    """Train ``model`` with a hand-written loop over the CSV training shards.

    The whole loop, including iteration over the tf.data pipeline, is compiled into
    a single graph by ``tf.function``. It uses the module-level ``optimizer``,
    ``loss_fn`` and ``train_filepaths``.

    Parameters
    ----------
    model : keras.Model
        Model to update in place.
    n_epochs : int
        Passes over the training files (passed as ``repeat`` to csv_reader_dataset).
    batch_size, n_readers, n_read_threads, shuffle_buffer_size, n_parse_threads
        Forwarded to csv_reader_dataset.
    """
    train_set = csv_reader_dataset(train_filepaths, repeat=n_epochs, n_readers=n_readers,
                                   n_read_threads=n_read_threads,
                                   shuffle_buffer_size=shuffle_buffer_size,
                                   n_parse_threads=n_parse_threads, batch_size=batch_size)
    for X_batch, y_batch in train_set:
        with tf.GradientTape() as tape:
            y_pred = model(X_batch)
            main_loss = tf.reduce_mean(loss_fn(y_batch, y_pred))
            # Add any regularisation losses the model's layers registered (may be none).
            loss = tf.add_n([main_loss] + model.losses)
        gradients = tape.gradient(loss, model.trainable_variables)
        optimizer.apply_gradients(zip(gradients, model.trainable_variables))

train(model, 5)

In [59]:
# List the public tf.data.Dataset API with the first line of each docstring.
for m in dir(tf.data.Dataset):
    if m.startswith('_') or m.endswith('_'):
        continue
    func = getattr(tf.data.Dataset, m)
    # Every object has a __doc__ attribute (so a hasattr() check is always True), but
    # it may be None or start with a blank line. Skip members with no usable summary
    # instead of crashing on None.split().
    summary = (func.__doc__ or '').strip().split('\n')[0]
    if summary:
        print("> {:21s} {}".format(m + '()', summary))

> apply()               Applies a transformation function to this dataset.
> batch()               Combines consecutive elements of this dataset into batches.
> cache()               Caches the elements in this dataset.
> concatenate()         Creates a `Dataset` by concatenating the given dataset with this dataset.
> element_spec()        The type specification of an element of this dataset.
> enumerate()           Enumerates the elements of this dataset.
> filter()              Filters this dataset according to `predicate`.
> flat_map()            Maps `map_func` across this dataset and flattens the result.
> from_generator()      Creates a `Dataset` whose elements are generated by `generator`.
> from_tensor_slices()  Creates a `Dataset` whose elements are slices of the given tensors.
> from_tensors()        Creates a `Dataset` with a single element, comprising the given tensors.
> interleave()          Maps `map_func` across this dataset, and interleaves the results.
> list_files() 

# TFRecord Binary Format

In [60]:
# Each write() stores one opaque bytes record in the TFRecord file.
with tf.io.TFRecordWriter('my_data.tfrecord') as writer:
    for record in (b"This is the first record", b"And this is the second record"):
        writer.write(record)

In [61]:
# Read the records back in the order they were written.
filepaths = ["my_data.tfrecord"]
dataset = tf.data.TFRecordDataset(filenames=filepaths)
for record in dataset:
    print(record)

tf.Tensor(b'This is the first record', shape=(), dtype=string)
tf.Tensor(b'And this is the second record', shape=(), dtype=string)


In [62]:
# Write 5 small TFRecord files of 3 records each.
filepaths = ["my_test_{}.tfrecord".format(i) for i in range(5)]
for file_idx, filepath in enumerate(filepaths):
    with tf.io.TFRecordWriter(filepath) as writer:
        for record_idx in range(3):
            writer.write("File {} record {}".format(file_idx, record_idx).encode("utf-8"))

# Read 3 files at a time; their records come out interleaved.
dataset = tf.data.TFRecordDataset(filepaths, num_parallel_reads=3)
for item in dataset:
    print(item)

tf.Tensor(b'File 0 record 0', shape=(), dtype=string)
tf.Tensor(b'File 1 record 0', shape=(), dtype=string)
tf.Tensor(b'File 2 record 0', shape=(), dtype=string)
tf.Tensor(b'File 0 record 1', shape=(), dtype=string)
tf.Tensor(b'File 1 record 1', shape=(), dtype=string)
tf.Tensor(b'File 2 record 1', shape=(), dtype=string)
tf.Tensor(b'File 0 record 2', shape=(), dtype=string)
tf.Tensor(b'File 1 record 2', shape=(), dtype=string)
tf.Tensor(b'File 2 record 2', shape=(), dtype=string)
tf.Tensor(b'File 3 record 0', shape=(), dtype=string)
tf.Tensor(b'File 4 record 0', shape=(), dtype=string)
tf.Tensor(b'File 3 record 1', shape=(), dtype=string)
tf.Tensor(b'File 4 record 1', shape=(), dtype=string)
tf.Tensor(b'File 3 record 2', shape=(), dtype=string)
tf.Tensor(b'File 4 record 2', shape=(), dtype=string)


In [63]:
# GZIP-compress the file; the same compression_type must be given when reading it back.
options = tf.io.TFRecordOptions(compression_type="GZIP")
with tf.io.TFRecordWriter("my_compressed.tfrecord", options=options) as writer:
    for record in (b"This is the first record", b"And this is the second record"):
        writer.write(record)

In [64]:
# Decompress on read by declaring the same GZIP compression used when writing.
dataset = tf.data.TFRecordDataset(filenames=["my_compressed.tfrecord"],
                                  compression_type="GZIP")
for record in dataset:
    print(record)

tf.Tensor(b'This is the first record', shape=(), dtype=string)
tf.Tensor(b'And this is the second record', shape=(), dtype=string)
