# Foreword

When a dataset is too large to fit in RAM, TensorFlow's Data API makes it easy to work with. You create a dataset, transform it, and TensorFlow takes care of all the implementation details, such as multithreading, queuing, batching, and prefetching.

The Data API can read from:
- CSV files and other text files
- binary files
- SQL databases
- many other data sources, through open source extensions

The Data API can also perform:
- data preprocessing
- transformations

In this chapter, we will cover the Data API, the TFRecord format, and the Feature API.


## The Data API

The Data API revolves around the concept of a dataset, which represents a sequence of data items. Generally, we use datasets that gradually read data from disk, but for simplicity let's create a dataset entirely in RAM using ```tf.data.Dataset.from_tensor_slices(X)```. This function takes a tensor and creates a ```tf.data.Dataset``` whose elements are all the slices of X (along the first dimension).

In [1]:
import tensorflow as tf
import numpy as np
from tensorflow import keras
import os

In [2]:
X = tf.range(10) #any data tensor
dataset = tf.data.Dataset.from_tensor_slices(X)

dataset

<TensorSliceDataset shapes: (), types: tf.int32>

In [3]:
for item in dataset:
    print(item)

tf.Tensor(0, shape=(), dtype=int32)
tf.Tensor(1, shape=(), dtype=int32)
tf.Tensor(2, shape=(), dtype=int32)
tf.Tensor(3, shape=(), dtype=int32)
tf.Tensor(4, shape=(), dtype=int32)
tf.Tensor(5, shape=(), dtype=int32)
tf.Tensor(6, shape=(), dtype=int32)
tf.Tensor(7, shape=(), dtype=int32)
tf.Tensor(8, shape=(), dtype=int32)
tf.Tensor(9, shape=(), dtype=int32)
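To make "slices along the first dimension" more concrete, here is a minimal sketch using a two-dimensional tensor. The `matrix` values and variable names are made up for illustration; the point is only that each row becomes one dataset element.

```python
import tensorflow as tf

# A hypothetical 3x2 matrix: three rows, two columns.
matrix = tf.constant([[1, 2], [3, 4], [5, 6]])
row_dataset = tf.data.Dataset.from_tensor_slices(matrix)

# Each element is one row of the matrix, i.e. a tensor of shape (2,).
rows = [row.numpy().tolist() for row in row_dataset]
print(rows)  # [[1, 2], [3, 4], [5, 6]]
```

So a tensor of shape `(3, 2)` yields a dataset of 3 elements, each of shape `(2,)`.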


## Chaining Transformations
Once you have a dataset, you can apply all sorts of transformations to it by calling its transformation methods. Each method returns a new dataset, so you can chain transformations.
- The ```repeat()``` method repeats the items of the dataset, so ```repeat(3)``` turns our 10-item dataset into one with 30 items. Called with no argument, it repeats the dataset indefinitely.
- The ```batch(num)``` method groups the items into batches of ```num``` items. The last batch may be smaller.

In [4]:
dataset = dataset.repeat(3).batch(7)
for item in dataset:
    print(item)
    
# with drop_remainder=True in batch(), the final, smaller batch (here [8 9]) would be dropped

tf.Tensor([0 1 2 3 4 5 6], shape=(7,), dtype=int32)
tf.Tensor([7 8 9 0 1 2 3], shape=(7,), dtype=int32)
tf.Tensor([4 5 6 7 8 9 0], shape=(7,), dtype=int32)
tf.Tensor([1 2 3 4 5 6 7], shape=(7,), dtype=int32)
tf.Tensor([8 9], shape=(2,), dtype=int32)
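The effect of ```drop_remainder``` can be checked with a short sketch like the one below. It rebuilds the same 30-item dataset with `tf.data.Dataset.range()` (so the dtype is int64 rather than int32) and compares batch sizes with and without the option. The variable names are only illustrative.

```python
import tensorflow as tf

full = tf.data.Dataset.range(10).repeat(3)      # 30 items
kept = full.batch(7)                            # last batch is smaller
dropped = full.batch(7, drop_remainder=True)    # last, incomplete batch is discarded

kept_sizes = [int(batch.shape[0]) for batch in kept]
dropped_sizes = [int(batch.shape[0]) for batch in dropped]
print(kept_sizes)     # [7, 7, 7, 7, 2]
print(dropped_sizes)  # [7, 7, 7, 7]
```

Dropping the remainder is mainly useful when downstream code needs every batch to have exactly the same shape.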


In [5]:
# This creates a new dataset with all the items doubled
dataset = dataset.map(lambda x: x * 2)

In [6]:
for item in dataset.take(3):
    print(item)

tf.Tensor([ 0  2  4  6  8 10 12], shape=(7,), dtype=int32)
tf.Tensor([14 16 18  0  2  4  6], shape=(7,), dtype=int32)
tf.Tensor([ 8 10 12 14 16 18  0], shape=(7,), dtype=int32)


In [7]:
# shuffling the data
tf.random.set_seed(42)
dataset = tf.data.Dataset.range(10).repeat(3)
dataset = dataset.shuffle(buffer_size = 3,seed = 42).batch(7)
for item in dataset:
    print(item)



tf.Tensor([1 3 0 4 2 5 6], shape=(7,), dtype=int64)
tf.Tensor([8 7 1 0 3 2 5], shape=(7,), dtype=int64)
tf.Tensor([4 6 9 8 9 7 0], shape=(7,), dtype=int64)
tf.Tensor([3 1 4 5 2 8 7], shape=(7,), dtype=int64)
tf.Tensor([6 9], shape=(2,), dtype=int64)
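The output above is only lightly shuffled because the buffer is small. The following pure-Python sketch shows the general idea of buffer-based shuffling; it is an illustration under simplifying assumptions, not TensorFlow's actual implementation, and `buffered_shuffle` is a hypothetical helper name.

```python
import random

def buffered_shuffle(items, buffer_size, seed=None):
    """Yield items in a locally shuffled order using a fixed-size buffer.

    Assumes buffer_size >= 1.
    """
    rng = random.Random(seed)
    buffer = []
    for item in items:
        if len(buffer) < buffer_size:
            # Fill the buffer before emitting anything.
            buffer.append(item)
            continue
        # Emit a random buffered item and put the new item in its place.
        idx = rng.randrange(len(buffer))
        yield buffer[idx]
        buffer[idx] = item
    # The source is exhausted: drain the buffer in random order.
    while buffer:
        yield buffer.pop(rng.randrange(len(buffer)))

shuffled = list(buffered_shuffle(range(10), buffer_size=3, seed=42))
print(shuffled)
```

With this scheme, the item emitted at position `k` always comes from the first `k + buffer_size` source items, which is why a small buffer cannot move items far from their original position. A buffer of size 1 leaves the order unchanged.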


### Working with the California Housing Dataset (Splitting the Dataset)

In [8]:
from sklearn.datasets import fetch_california_housing
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

housing = fetch_california_housing()

X_train_full,X_test,y_train_full,y_test = train_test_split(
    housing.data,housing.target.reshape(-1,1),random_state = 42
)

X_train, X_valid, y_train, y_valid = train_test_split(
    X_train_full,y_train_full,random_state = 42
)

scaler = StandardScaler()
scaler.fit(X_train)
X_mean = scaler.mean_
X_std = scaler.scale_

For a very large dataset that does not fit in memory, you will typically want to split it into many files first, then have TensorFlow read these files in parallel. To demonstrate this, let's split the housing dataset and save the training set to 20 CSV files (and the validation and test sets to 10 files each):

In [9]:
def save_to_multiple_csv_files(data,name_prefix,header = None,n_parts = 10):
    housing_dir = os.path.join("datasets","housing")
    os.makedirs(housing_dir, exist_ok = True)
    path_format = os.path.join(housing_dir,"my_{}_{:02d}.csv")
    
    filepaths = []
    m = len(data)
    for file_idx,row_indices in enumerate(np.array_split(np.arange(m),n_parts)):
        part_csv = path_format.format(name_prefix,file_idx)
        filepaths.append(part_csv)
        with open(part_csv,"wt",encoding = "utf-8") as f:
            if header is not None:
                f.write(header)
                f.write("\n")
            for row_idx in row_indices:
                f.write(",".join([repr(col) for col in data[row_idx]]))
                f.write("\n")
    return filepaths    
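The helper above relies on `np.array_split()` to divide the row indices into `n_parts` chunks. Unlike `np.split()`, it accepts a row count that is not a multiple of `n_parts`. A quick sketch with a made-up row count of 10:

```python
import numpy as np

row_indices = np.arange(10)                 # a hypothetical dataset of 10 rows
parts = np.array_split(row_indices, 3)      # 10 is not divisible by 3

part_sizes = [len(part) for part in parts]
print(part_sizes)  # [4, 3, 3]
```

The chunks differ in size by at most one row, so the resulting CSV files end up with nearly equal lengths.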

In [10]:
train_data = np.c_[X_train,y_train]
valid_data = np.c_[X_valid,y_valid]
test_data = np.c_[X_test,y_test]

header_cols = housing.feature_names + ["MedianHouseValue"]
header = ",".join(header_cols)

train_filepaths = save_to_multiple_csv_files(
    train_data,"train",header,n_parts = 20
)
valid_filepaths = save_to_multiple_csv_files(
    valid_data,"valid",header,n_parts = 10
)
test_filepaths = save_to_multiple_csv_files(
    test_data,"test",header,n_parts = 10
)

Okay, now let's take a peek at the first few lines of one of these CSV files.

In [11]:
import pandas as pd
pd.read_csv(train_filepaths[0]).head()

|   | MedInc | HouseAge | AveRooms | AveBedrms | Population | AveOccup | Latitude | Longitude | MedianHouseValue |
|---|---|---|---|---|---|---|---|---|---|
| 0 | 3.5214 | 15.0 | 3.049945 | 1.106548 | 1447.0 | 1.605993 | 37.63 | -122.43 | 1.442 |
| 1 | 5.3275 | 5.0 | 6.49006 | 0.991054 | 3464.0 | 3.44334 | 33.69 | -117.39 | 1.687 |
| 2 | 3.1 | 29.0 | 7.542373 | 1.591525 | 1328.0 | 2.250847 | 38.44 | -122.98 | 1.621 |
| 3 | 7.1736 | 12.0 | 6.289003 | 0.997442 | 1054.0 | 2.695652 | 33.55 | -117.7 | 2.621 |
| 4 | 2.0549 | 13.0 | 5.312457 | 1.085092 | 3297.0 | 2.244384 | 33.93 | -116.93 | 0.956 |


## Building an Input Pipeline
Suppose *train_filepaths* contains the list of training file paths, as shown below:

In [12]:
train_filepaths

['datasets/housing/my_train_00.csv',
 'datasets/housing/my_train_01.csv',
 'datasets/housing/my_train_02.csv',
 'datasets/housing/my_train_03.csv',
 'datasets/housing/my_train_04.csv',
 'datasets/housing/my_train_05.csv',
 'datasets/housing/my_train_06.csv',
 'datasets/housing/my_train_07.csv',
 'datasets/housing/my_train_08.csv',
 'datasets/housing/my_train_09.csv',
 'datasets/housing/my_train_10.csv',
 'datasets/housing/my_train_11.csv',
 'datasets/housing/my_train_12.csv',
 'datasets/housing/my_train_13.csv',
 'datasets/housing/my_train_14.csv',
 'datasets/housing/my_train_15.csv',
 'datasets/housing/my_train_16.csv',
 'datasets/housing/my_train_17.csv',
 'datasets/housing/my_train_18.csv',
 'datasets/housing/my_train_19.csv']

In [13]:
filepath_dataset = tf.data.Dataset.list_files(train_filepaths,seed = 42)

In [14]:
for filepath in filepath_dataset:
    print(filepath)

tf.Tensor(b'datasets/housing/my_train_15.csv', shape=(), dtype=string)
tf.Tensor(b'datasets/housing/my_train_08.csv', shape=(), dtype=string)
tf.Tensor(b'datasets/housing/my_train_03.csv', shape=(), dtype=string)
tf.Tensor(b'datasets/housing/my_train_01.csv', shape=(), dtype=string)
tf.Tensor(b'datasets/housing/my_train_10.csv', shape=(), dtype=string)
tf.Tensor(b'datasets/housing/my_train_05.csv', shape=(), dtype=string)
tf.Tensor(b'datasets/housing/my_train_19.csv', shape=(), dtype=string)
tf.Tensor(b'datasets/housing/my_train_16.csv', shape=(), dtype=string)
tf.Tensor(b'datasets/housing/my_train_02.csv', shape=(), dtype=string)
tf.Tensor(b'datasets/housing/my_train_09.csv', shape=(), dtype=string)
tf.Tensor(b'datasets/housing/my_train_00.csv', shape=(), dtype=string)
tf.Tensor(b'datasets/housing/my_train_07.csv', shape=(), dtype=string)
tf.Tensor(b'datasets/housing/my_train_12.csv', shape=(), dtype=string)
tf.Tensor(b'datasets/housing/my_train_04.csv', shape=(), dtype=string)
...

Next, we can call the ```interleave()``` method to read from 5 files at a time and interleave their lines (skipping the first line of each file, which is the header row, using the ```skip()``` method):

In [15]:
n_readers = 5
dataset = filepath_dataset.interleave(
    lambda filepath: tf.data.TextLineDataset(filepath).skip(1),
    cycle_length = n_readers
)

The ```interleave()``` method creates a dataset that pulls 5 file paths from *filepath_dataset*, and for each one it calls the function we gave it (a lambda in this example) to create a new dataset, in this case a *TextLineDataset*. It then cycles through these 5 datasets, reading one line at a time from each until all datasets are out of items. Then it gets the next 5 file paths from *filepath_dataset* and interleaves them the same way, and so on until it runs out of file paths.
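The cycling behaviour is easier to see on a tiny in-memory example. In this sketch, each source element `x` is mapped to a small dataset that repeats `x` three times, standing in for a file with three lines, and `cycle_length=2` plays the role of `n_readers`. The names `sources` and `interleaved` are illustrative only.

```python
import tensorflow as tf

sources = tf.data.Dataset.range(1, 4)   # elements 1, 2, 3 (think: three files)
interleaved = sources.interleave(
    # Each "file" yields its own id three times (think: three lines).
    lambda x: tf.data.Dataset.from_tensors(x).repeat(3),
    cycle_length=2,                      # read from two "files" at a time
)

order = [int(value.numpy()) for value in interleaved]
print(order)  # [1, 2, 1, 2, 1, 2, 3, 3, 3]
```

The first two sources alternate item by item, and the third source is only read once they are exhausted.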

In [16]:
for line in dataset.take(5):
    print(line.numpy())

b'4.6477,38.0,5.03728813559322,0.911864406779661,745.0,2.5254237288135593,32.64,-117.07,1.504'
b'8.72,44.0,6.163179916317992,1.0460251046025104,668.0,2.794979079497908,34.2,-118.18,4.159'
b'3.8456,35.0,5.461346633416459,0.9576059850374065,1154.0,2.8778054862842892,37.96,-122.05,1.598'
b'3.3456,37.0,4.514084507042254,0.9084507042253521,458.0,3.2253521126760565,36.67,-121.7,2.526'
b'3.6875,44.0,4.524475524475524,0.993006993006993,457.0,3.195804195804196,34.04,-118.15,1.625'


These are the first rows (ignoring the header row) of 5 CSV files, chosen randomly. Notice, however, that these are just byte strings: we still need to parse them and scale the data.

In [17]:
n_inputs = 8  #X_train.shape[-1]

@tf.function
def preprocess(line):
    defs = [0.] * n_inputs + [tf.constant([],dtype = tf.float32)]
    fields = tf.io.decode_csv(line,record_defaults = defs)
    x = tf.stack(fields[:-1])
    y = tf.stack(fields[-1:])
    return (x - X_mean) / X_std, y

In [18]:
preprocess(b'4.6477,38.0,5.03728813559322,0.911864406779661,745.0,2.5254237288135593,32.64,-117.07,1.504')

(<tf.Tensor: shape=(8,), dtype=float32, numpy=
 array([ 0.39593136,  0.74167496, -0.16415128, -0.40340805, -0.61991787,
        -0.18355484, -1.4084505 ,  1.2565969 ], dtype=float32)>,
 <tf.Tensor: shape=(1,), dtype=float32, numpy=array([1.504], dtype=float32)>)

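- First, the mean and standard deviation of each feature were precomputed on the training set (```X_mean``` and ```X_std```).
- The ```preprocess()``` function takes one CSV line and starts by parsing it. For this it uses the ```tf.io.decode_csv()``` function, which takes two arguments: the line to parse, and an array containing the default value for each column in the CSV file. This array also tells TensorFlow the number of columns and their types. Here the eight features default to 0., while the target column gets an empty float32 tensor as its default, which means it has no default value and is therefore required.
- The parsed features are then stacked into a 1D tensor and scaled by subtracting the means and dividing by the standard deviations. The function returns the scaled features together with the target.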
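The role of the default values passed to ```tf.io.decode_csv()``` can be illustrated with a two-column sketch. The records and the `defaults` list below are made up: the first column has a default of 0., while the second uses an empty float32 tensor as its default, which marks it as required.

```python
import tensorflow as tf

# Column 1: optional float (default 0.0). Column 2: required float.
defaults = [0., tf.constant([], dtype=tf.float32)]

# Both fields present.
fields = tf.io.decode_csv("1.5,2.5", record_defaults=defaults)

# First field missing: it is replaced by its default value.
fields_missing = tf.io.decode_csv(",2.5", record_defaults=defaults)
print(float(fields_missing[0].numpy()))  # 0.0

# Required field missing: parsing fails.
try:
    tf.io.decode_csv("1.5,", record_defaults=defaults)
    required_error = False
except tf.errors.InvalidArgumentError:
    required_error = True
print(required_error)  # True
```

Making the target column required is a reasonable safeguard here, since a training row without a label should not silently receive a made-up value.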


### Putting everything together

In [20]:
def csv_reader_dataset(filepaths,repeat = None,n_readers = 5,
                      n_read_threads = None, shuffle_buffer_size = 10000,
                      n_parse_threads = 5, batch_size = 32):
    
    dataset = tf.data.Dataset.list_files(filepaths).repeat(repeat)
    dataset = dataset.interleave(
        lambda filepath: tf.data.TextLineDataset(filepath).skip(1),
        cycle_length = n_readers, num_parallel_calls = n_read_threads
    )
    dataset = dataset.shuffle(shuffle_buffer_size)
    dataset = dataset.map(preprocess, num_parallel_calls = n_parse_threads)
    dataset = dataset.batch(batch_size)
    
    return dataset.prefetch(1)

The ```csv_reader_dataset()``` function above simply combines the steps we went through into a single function. The one new element is the final ```dataset.prefetch(1)``` call, which makes the dataset do its best to always stay one batch ahead. In other words, while the training algorithm is working on one batch, the dataset is already preparing the next one.

Let's call ```csv_reader_dataset()``` to see how it works.
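The idea behind prefetching can be sketched in plain Python with a background thread and a bounded queue. This is only a conceptual illustration, not how TensorFlow implements ```prefetch()```; the `prefetch` generator below is a hypothetical helper.

```python
import queue
import threading

def prefetch(iterable, buffer_size=1):
    """Yield items from `iterable` while a background thread prepares the next ones."""
    ready = queue.Queue(maxsize=buffer_size)  # holds at most `buffer_size` prepared items
    sentinel = object()                       # marks the end of the source

    def producer():
        for item in iterable:
            ready.put(item)                   # blocks while the queue is full
        ready.put(sentinel)

    threading.Thread(target=producer, daemon=True).start()
    while True:
        item = ready.get()                    # blocks until an item is ready
        if item is sentinel:
            return
        yield item

batches = list(prefetch(range(5), buffer_size=1))
print(batches)  # [0, 1, 2, 3, 4]
```

The consumer sees the same items in the same order. The only difference is that the producer works on the next item while the consumer is busy with the current one.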

In [22]:
train_set = csv_reader_dataset(train_filepaths,batch_size = 3)

for X_batch, y_batch in train_set.take(2):
    print("\r\nX = ",X_batch)
    print("\r\ny = ",y_batch)


X =  tf.Tensor(
[[ 1.1832466  -0.2867314   0.256955   -0.0914653   0.6741611   0.05366582
  -0.7432092   0.71184903]
 [-0.44522637  1.8491895  -0.32066625 -0.14044929 -0.10611927 -0.06691425
  -0.691678    0.7318402 ]
 [ 0.3091969   0.5043504   0.20859428 -0.2770272   0.6084533   0.27369827
  -0.84627515  0.7818199 ]], shape=(3, 8), dtype=float32)

y =  tf.Tensor(
[[3.151]
 [2.226]
 [2.141]], shape=(3, 1), dtype=float32)

X =  tf.Tensor(
[[-1.2879554   1.4536486  -0.5052248   0.20396037 -0.49580315  0.43515173
  -0.7666345   0.6568782 ]
 [-0.64608806 -1.0778131  -0.35905546  0.09489206  1.0309911  -0.22977838
  -0.72447133  0.9767287 ]
 [ 1.7620009  -0.6822723   0.7482188  -0.23329605 -0.6326944  -0.32895038
  -1.3241241   1.1716374 ]], shape=(3, 8), dtype=float32)

y =  tf.Tensor(
[[1.141]
 [1.228]
 [3.923]], shape=(3, 1), dtype=float32)


In [23]:
# Using the dataset with keras.

train_set = csv_reader_dataset(train_filepaths,repeat = None)
valid_set = csv_reader_dataset(valid_filepaths)
test_set = csv_reader_dataset(test_filepaths)

In [36]:
keras.backend.clear_session()
np.random.seed(42)
tf.random.set_seed(42)

model = keras.models.Sequential([
    keras.layers.Dense(30,activation = "relu",input_shape = X_train.shape[1:]),
    keras.layers.Dense(1)
])
model.compile(loss = "mse", optimizer = keras.optimizers.SGD(learning_rate = 1e-3))

In [37]:
batch_size = 32
model.fit(train_set,steps_per_epoch = len(X_train) // batch_size,epochs = 10,
         validation_data = valid_set,
         validation_steps = len(X_valid) // batch_size)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


<tensorflow.python.keras.callbacks.History at 0x7f6040096ac0>

In [38]:
model.evaluate(test_set,steps = len(X_test)//batch_size)



0.4803062081336975

## The TFRecord Format
The TFRecord format is TensorFlow's preferred format for storing large amounts of data, such as images or audio, and reading it efficiently. It is a very simple binary format that just contains a sequence of binary records of varying sizes.

Let's take a look at a simple example. 
- You can create a TFRecord file using ```tf.io.TFRecordWriter``` class
- You can use ```tf.data.TFRecordDataset``` to read one or more TFRecord files

In [39]:
# First Write
with tf.io.TFRecordWriter("my_data.tfrecord") as f:
    f.write(b"This is the first record")
    f.write(b"This is the second record")

# Next read
filepaths = ["my_data.tfrecord"]
dataset = tf.data.TFRecordDataset(filepaths)
for item in dataset:
    print(item)

tf.Tensor(b'This is the first record', shape=(), dtype=string)
tf.Tensor(b'This is the second record', shape=(), dtype=string)
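Since ```tf.data.TFRecordDataset``` accepts a list of file paths, the same pattern extends to several files. The sketch below writes two small TFRecord files to a temporary directory (the file names and record contents are made up) and reads them back as a single dataset.

```python
import os
import tempfile
import tensorflow as tf

tmp_dir = tempfile.mkdtemp()
paths = []
for prefix in ("a", "b"):
    path = os.path.join(tmp_dir, f"{prefix}.tfrecord")
    with tf.io.TFRecordWriter(path) as writer:
        for i in range(2):
            writer.write(f"{prefix}{i}".encode("utf-8"))  # records are raw bytes
    paths.append(path)

# By default the files are read one after the other, in the order given.
records = [record.numpy() for record in tf.data.TFRecordDataset(paths)]
print(records)  # [b'a0', b'a1', b'b0', b'b1']
```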
