In this notebook we will look at TensorFlow's Data API, the TFRecord format, and how to create custom preprocessing layers. 

Some datasets will not fit in memory. Fortunately, TensorFlow's Data API takes care of this: you just need to tell it where the data is and how to transform it, and it handles the multithreading and processes the data as efficiently as possible. 

The Data API can read from text files such as CSV, from binary files, and from files in the TFRecord format, an efficient and flexible binary format. SQL databases and Google's BigQuery service can also be read, and extensions for other open source databases are available. 

**TF Transform** lets you write a single preprocessing function to transform your data. The function can then be exported and incorporated into your trained model, so that it transforms new instances on the fly once the model is deployed in production. 

**TF Datasets** can be used to download many common datasets, such as ImageNet. 

In [1]:
#regular imports
from tensorflow import keras
import tensorflow as tf
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

In [2]:
data = tf.data.Dataset.range(10) #create a dataset
data

<RangeDataset shapes: (), types: tf.int64>

In [3]:
for i in data:
  print(i)

tf.Tensor(0, shape=(), dtype=int64)
tf.Tensor(1, shape=(), dtype=int64)
tf.Tensor(2, shape=(), dtype=int64)
tf.Tensor(3, shape=(), dtype=int64)
tf.Tensor(4, shape=(), dtype=int64)
tf.Tensor(5, shape=(), dtype=int64)
tf.Tensor(6, shape=(), dtype=int64)
tf.Tensor(7, shape=(), dtype=int64)
tf.Tensor(8, shape=(), dtype=int64)
tf.Tensor(9, shape=(), dtype=int64)


In [4]:
dataset = data.repeat(3).batch(7) #repeat the dataset 3 times, then group the elements into batches of 7
for i in dataset:
  print(i)

tf.Tensor([0 1 2 3 4 5 6], shape=(7,), dtype=int64)
tf.Tensor([7 8 9 0 1 2 3], shape=(7,), dtype=int64)
tf.Tensor([4 5 6 7 8 9 0], shape=(7,), dtype=int64)
tf.Tensor([1 2 3 4 5 6 7], shape=(7,), dtype=int64)
tf.Tensor([8 9], shape=(2,), dtype=int64)


In [5]:
#you can drop the final, smaller batch so that every batch has the same size
dataset = data.repeat(3).batch(7, drop_remainder= True) #set to True
for i in dataset:
  print(i)

tf.Tensor([0 1 2 3 4 5 6], shape=(7,), dtype=int64)
tf.Tensor([7 8 9 0 1 2 3], shape=(7,), dtype=int64)
tf.Tensor([4 5 6 7 8 9 0], shape=(7,), dtype=int64)
tf.Tensor([1 2 3 4 5 6 7], shape=(7,), dtype=int64)


In [6]:
#dataset methods create new datasets rather than modifying the original, so always assign the result to a variable
#filter dataset

dataset = data.filter(lambda x: x>3).take(4) #filter and take methods
#keep values greater than 3 and take only the first 4 of them
for i in dataset:
  print(i)

#you can even apply a function to each element by using map, see the documentation for all functions
#you can unbatch your dataset (i.e. split each batch back into individual elements)


tf.Tensor(4, shape=(), dtype=int64)
tf.Tensor(5, shape=(), dtype=int64)
tf.Tensor(6, shape=(), dtype=int64)
tf.Tensor(7, shape=(), dtype=int64)
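
The map and unbatch methods mentioned in the comments above can be sketched as follows. This is a minimal illustration on the same range dataset; the variable names are chosen here for the example only.

```python
import tensorflow as tf

data = tf.data.Dataset.range(10)

# map() applies a function to every element and returns a new dataset
doubled = data.map(lambda x: x * 2)

# batch() groups elements into batches of 5;
# unbatch() flattens those batches back into single elements
batched = data.batch(5)
flattened = batched.unbatch()

doubled_values = [int(x) for x in doubled.as_numpy_iterator()]
flattened_values = [int(x) for x in flattened.as_numpy_iterator()]
print(doubled_values)
print(flattened_values)
```

Note that unbatch does not create several datasets: it returns one dataset whose elements are the individual items of each batch.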


# Shuffling your data
**Gradient descent** works best when the training instances are independent and identically distributed (IID). As we have seen, the closer a dataset is to IID, **the better the training outcome**. 

To shuffle a dataset, call shuffle() and specify a buffer size. This creates a buffer between the source dataset and the consumer. The buffer is first filled with elements from the source; each time you request an element, one is drawn at random from the buffer and replaced with the next element from the source. Imagine a buffer size of 3: you shuffle just three cards at a time and give your friend one card. The deck would be far less shuffled than with a buffer size of 52. 

For large datasets, a buffer big enough to shuffle well will most definitely exceed the RAM you have. Instead, you can split your data across multiple files, read them simultaneously in random order, and finish with a final shuffle. The Data API makes this possible!
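
To make the buffer idea concrete, here is a pure-Python sketch of a buffered shuffle. It is a simplified model written for this notebook (the function name buffered_shuffle is hypothetical), not TensorFlow's actual implementation.

```python
import random

def buffered_shuffle(items, buffer_size, seed=None):
    """Yield items in a partially shuffled order using a fixed-size buffer."""
    rng = random.Random(seed)
    buffer = []
    for item in items:
        if len(buffer) < buffer_size:
            buffer.append(item)  # fill the buffer first
            continue
        idx = rng.randrange(buffer_size)
        yield buffer[idx]        # hand out a random element from the buffer...
        buffer[idx] = item       # ...and replace it with the next source element
    rng.shuffle(buffer)          # the source is exhausted: drain what is left
    yield from buffer

small = list(buffered_shuffle(range(20), buffer_size=3, seed=42))
large = list(buffered_shuffle(range(20), buffer_size=20, seed=42))
print(small)  # elements stay close to their original positions
print(large)  # a full shuffle, since the buffer holds the whole dataset
```

With a buffer of size 3, the element emitted at position k can only come from the first k + 3 source elements, which is why a small buffer gives a weak shuffle.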


In [7]:
#use the California Dataset, split it into multiple files and then interleave
from sklearn.datasets import fetch_california_housing
from sklearn.model_selection import train_test_split

housing = fetch_california_housing()
X_train_full, X_test, y_train_full, y_test = train_test_split(
    housing.data, housing.target.reshape(-1, 1), random_state=42)
X_train, X_valid, y_train, y_valid = train_test_split(
    X_train_full, y_train_full, random_state=42)

Downloading Cal. housing from https://ndownloader.figshare.com/files/5976036 to /root/scikit_learn_data


In [8]:
#Function to split dataset
import os
def save_to_multiple_csv_files(data, name_prefix, header= None, n_parts=10):
  housing_dir = os.path.join('datasets', 'housing')
  os.makedirs(housing_dir, exist_ok= True)
  path_format = os.path.join(housing_dir, 'california_{}_{:02d}.csv')
  #make sure the files are all the same length - interleaving works best that way
  
  filepaths = []
  m= len(data)
  for file_index, row_indices in enumerate(np.array_split(np.arange(m), n_parts)):
    part_csv = path_format.format(name_prefix, file_index)
    filepaths.append(part_csv)
    with open(part_csv, 'wt', encoding= 'utf-8') as f:
      if header is not None:
        f.write(header)
        f.write('\n')
      for row_index in row_indices:
        f.write(','.join([repr(col) for col in data[row_index]])) #see below
        f.write('\n')
  return filepaths

In [9]:
','.join([repr(i) for i in range(10)])

'0,1,2,3,4,5,6,7,8,9'

In [10]:
#combine the X and y values for train, valid and test datasets
train_data = np.c_[X_train, y_train]
valid_data = np.c_[X_valid, y_valid]
test_data = np.c_[X_test, y_test]

#create header column
head_cols = housing.feature_names + ['MedianHouseValue']
header = ','.join(head_cols)
header
train_filepaths = save_to_multiple_csv_files(train_data, name_prefix='train',
                                             header= header, n_parts=20)

valid_filepaths = save_to_multiple_csv_files(valid_data, name_prefix='valid',
                                             header= header, n_parts=20)

test_filepaths = save_to_multiple_csv_files(test_data, name_prefix='test',
                                             header= header, n_parts=20)


In [11]:
for file in train_filepaths:
  print(file)

datasets/housing/california_train_00.csv
datasets/housing/california_train_01.csv
datasets/housing/california_train_02.csv
datasets/housing/california_train_03.csv
datasets/housing/california_train_04.csv
datasets/housing/california_train_05.csv
datasets/housing/california_train_06.csv
datasets/housing/california_train_07.csv
datasets/housing/california_train_08.csv
datasets/housing/california_train_09.csv
datasets/housing/california_train_10.csv
datasets/housing/california_train_11.csv
datasets/housing/california_train_12.csv
datasets/housing/california_train_13.csv
datasets/housing/california_train_14.csv
datasets/housing/california_train_15.csv
datasets/housing/california_train_16.csv
datasets/housing/california_train_17.csv
datasets/housing/california_train_18.csv
datasets/housing/california_train_19.csv


In [12]:
pd.read_csv(train_filepaths[0]).head(10)

|   | MedInc | HouseAge | AveRooms | AveBedrms | Population | AveOccup | Latitude | Longitude | MedianHouseValue |
|---|--------|----------|----------|-----------|------------|----------|----------|-----------|------------------|
| 0 | 3.5214 | 15.0 | 3.049945 | 1.106548 | 1447.0 | 1.605993 | 37.63 | -122.43 | 1.442 |
| 1 | 5.3275 | 5.0 | 6.49006 | 0.991054 | 3464.0 | 3.44334 | 33.69 | -117.39 | 1.687 |
| 2 | 3.1 | 29.0 | 7.542373 | 1.591525 | 1328.0 | 2.250847 | 38.44 | -122.98 | 1.621 |
| 3 | 7.1736 | 12.0 | 6.289003 | 0.997442 | 1054.0 | 2.695652 | 33.55 | -117.7 | 2.621 |
| 4 | 2.0549 | 13.0 | 5.312457 | 1.085092 | 3297.0 | 2.244384 | 33.93 | -116.93 | 0.956 |
| 5 | 2.9583 | 50.0 | 5.380282 | 1.117371 | 579.0 | 2.71831 | 33.98 | -118.06 | 1.726 |
| 6 | 3.52 | 23.0 | 4.698217 | 1.034294 | 2202.0 | 3.020576 | 34.14 | -118.01 | 1.873 |
| 7 | 2.7188 | 32.0 | 5.511628 | 1.067829 | 1337.0 | 2.591085 | 34.94 | -120.42 | 1.337 |
| 8 | 2.6563 | 26.0 | 4.294893 | 1.123558 | 1401.0 | 2.308072 | 37.68 | -122.08 | 1.841 |
| 9 | 1.6944 | 11.0 | 21.372093 | 4.627907 | 69.0 | 1.604651 | 40.19 | -121.08 | 1.375 |


Now that we have the separate files, let's create a dataset that contains all of their paths. By default, tf.data.Dataset.list_files() shuffles the file paths for you.

In [13]:
filepath_dataset = tf.data.Dataset.list_files(train_filepaths, seed= 42)
for file in filepath_dataset:
  print(file)

tf.Tensor(b'datasets/housing/california_train_05.csv', shape=(), dtype=string)
tf.Tensor(b'datasets/housing/california_train_16.csv', shape=(), dtype=string)
tf.Tensor(b'datasets/housing/california_train_01.csv', shape=(), dtype=string)
tf.Tensor(b'datasets/housing/california_train_17.csv', shape=(), dtype=string)
tf.Tensor(b'datasets/housing/california_train_00.csv', shape=(), dtype=string)
tf.Tensor(b'datasets/housing/california_train_14.csv', shape=(), dtype=string)
tf.Tensor(b'datasets/housing/california_train_10.csv', shape=(), dtype=string)
tf.Tensor(b'datasets/housing/california_train_02.csv', shape=(), dtype=string)
tf.Tensor(b'datasets/housing/california_train_12.csv', shape=(), dtype=string)
tf.Tensor(b'datasets/housing/california_train_19.csv', shape=(), dtype=string)
tf.Tensor(b'datasets/housing/california_train_07.csv', shape=(), dtype=string)
tf.Tensor(b'datasets/housing/california_train_09.csv', shape=(), dtype=string)
tf.Tensor(b'datasets/housing/california_train_13.csv

You can now call interleave(), which reads from cycle_length files at a time (here n_files = 5). The skip(1) call skips the header row of each file. Printing the first few lines of the new dataset shows that rows from different files are mixed together.

In [14]:
n_files = 5
dataset = filepath_dataset.interleave(lambda filepath:
                                      tf.data.TextLineDataset(filepath).skip(1),
                                      cycle_length= n_files)
for line in dataset.take(5):
  print(line.numpy()) #returns a byte string so we need to convert it

b'4.5909,16.0,5.475877192982456,1.0964912280701755,1357.0,2.9758771929824563,33.63,-117.71,2.418'
b'2.4792,24.0,3.4547038327526134,1.1341463414634145,2251.0,3.921602787456446,34.18,-118.38,2.0'
b'4.2708,45.0,5.121387283236994,0.953757225433526,492.0,2.8439306358381504,37.48,-122.19,2.67'
b'2.1856,41.0,3.7189873417721517,1.0658227848101265,803.0,2.0329113924050635,32.76,-117.12,1.205'
b'4.1812,52.0,5.701388888888889,0.9965277777777778,692.0,2.4027777777777777,33.73,-118.31,3.215'
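
The round-robin behaviour of interleave can be sketched in pure Python. This is a simplified model (one element per source per turn, with a hypothetical helper named interleave); the exact order in which TensorFlow opens replacement files may differ.

```python
from itertools import islice

def interleave(sources, cycle_length):
    """Cycle over up to cycle_length open sources, taking one item from each in turn."""
    pending = iter(sources)
    active = [iter(s) for s in islice(pending, cycle_length)]
    done = object()  # sentinel meaning "no more sources to open"
    while active:
        still_open = []
        for it in active:
            try:
                item = next(it)
            except StopIteration:
                # this source is exhausted: open the next pending one, if any
                replacement = next(pending, done)
                if replacement is not done:
                    still_open.append(iter(replacement))
                continue
            yield item
            still_open.append(it)
        active = still_open

files = [["a1", "a2", "a3"], ["b1", "b2", "b3"], ["c1", "c2", "c3"]]
mixed = list(interleave(files, cycle_length=2))
print(mixed)
```

Here the first two "files" are read alternately, and the third one is only opened once a slot frees up. A larger cycle_length mixes rows from more files at once.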


#### Preprocessing - Parse Byte String and Scaling the data

In [15]:
X_train.shape

(11610, 8)

In [16]:
from sklearn.preprocessing import StandardScaler

#standard scale inputs, part of preprocessing of the data
std_scaler = StandardScaler()
std_scaler.fit(X_train)
X_mean = std_scaler.mean_
X_std = std_scaler.scale_ #np.sqrt(var_)

n_inputs = 8 # X_train.shape[-1]

@tf.function
def preprocess(line):
  defaults = [0.] * n_inputs + [tf.constant([], dtype= tf.float32)] #one default per column, which also sets each column's type
  #missing feature values default to 0.
  #the last column (the target) also contains floats, but its empty default makes decode_csv raise an exception if there is a missing value
  fields = tf.io.decode_csv(line, record_defaults = defaults)
  
  #we need to stack the list of scalar tensors
  X = tf.stack(fields[:-1])
  y = tf.stack(fields[-1:])
  return (X- X_mean) / X_std, y


In [17]:
# using one of the lines printed above 

preprocess(b'2.1856,41.0,3.7189873417721517,1.0658227848101265,803.0,2.0329113924050635,32.76,-117.12,1.205')

(<tf.Tensor: shape=(8,), dtype=float32, numpy=
 array([-0.8936168 ,  0.9789995 , -0.6810549 , -0.07264194, -0.5669866 ,
        -0.39212453, -1.3522334 ,  1.2316071 ], dtype=float32)>,
 <tf.Tensor: shape=(1,), dtype=float32, numpy=array([1.205], dtype=float32)>)
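
The effect of the defaults list passed to tf.io.decode_csv() can be checked in isolation. The sketch below uses a made-up three-column record (two features and one required target) rather than the housing data.

```python
import tensorflow as tf

# two optional float columns (default 0.) and one required float column
defaults = [0., 0., tf.constant([], dtype=tf.float32)]

# the second field is missing, so it falls back to its default value
fields = tf.io.decode_csv("1.5,,3.0", record_defaults=defaults)
values = [float(f) for f in fields]
print(values)

# the last field is required: a missing value raises an error
try:
    tf.io.decode_csv("1.5,2.5,", record_defaults=defaults)
    missing_target_raised = False
except tf.errors.InvalidArgumentError as ex:
    missing_target_raised = True
    print(ex)
```

Using an empty tensor as the default is what marks a column as required, which is a reasonable choice for the target since a silent default would corrupt the labels.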

In [18]:
#apply this to the entire dataset across the multiple files

def csv_reader_dataset(filepath, repeat=1, n_readers=5, n_read_threads= None,
                        shuffle_buffer_size=10000, n_parse_threads=5,
                        batch_size=32):
  #list (and shuffle) the files, repeating the list `repeat` times (None repeats forever)
  dataset = tf.data.Dataset.list_files(filepath).repeat(repeat)
  dataset = dataset.interleave(lambda filepath: tf.data.TextLineDataset(filepath).skip(1),
                               cycle_length=n_readers, num_parallel_calls= n_read_threads)
  
  #parse and scale each line 
  dataset = dataset.map(preprocess, num_parallel_calls= n_parse_threads)
  dataset = dataset.shuffle(shuffle_buffer_size) #shuffle the parsed instances
  return dataset.batch(batch_size).prefetch(1) #for performance

In [19]:
tf.random.set_seed(42)

train_set = csv_reader_dataset(train_filepaths, batch_size=3)
for X_batch, y_batch in train_set.take(2):
  print(X_batch)
  print()
  print(y_batch)

tf.Tensor(
[[ 0.5804519  -0.20762321  0.05616303 -0.15191229  0.01343246  0.00604472
   1.2525111  -1.3671792 ]
 [ 5.818099    1.8491895   1.1784915   0.28173092 -1.2496178  -0.3571987
   0.7231292  -1.0023477 ]
 [-0.9253566   0.5834586  -0.7807257  -0.28213993 -0.36530012  0.27389365
  -0.76194876  0.72684526]], shape=(3, 8), dtype=float32)

tf.Tensor(
[[1.752]
 [1.313]
 [1.535]], shape=(3, 1), dtype=float32)
tf.Tensor(
[[-0.8324941   0.6625668  -0.20741376 -0.18699841 -0.14536144  0.09635526
   0.9807942  -0.67250353]
 [-0.62183803  0.5834586  -0.19862501 -0.3500319  -1.1437552  -0.3363751
   1.107282   -0.8674123 ]
 [ 0.8683102   0.02970133  0.3427381  -0.29872298  0.7124906   0.28026953
  -0.72915536  0.86178064]], shape=(3, 8), dtype=float32)

tf.Tensor(
[[0.919]
 [1.028]
 [2.182]], shape=(3, 1), dtype=float32)


# Prefetching
By calling prefetch(1), we allow TensorFlow to prepare one batch ahead while the current batch is being consumed. By setting the num_parallel_calls argument of interleave() and map(), we ensure that loading and preprocessing are multithreaded, which takes fuller advantage of the available CPU and GPU. 

There are many more details and functions available that can be utilised for datasets, even a few experimental ones: check tf.data.experimental. 
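
Instead of fixing the number of threads and the prefetch depth by hand, you can let TensorFlow tune them at runtime. The sketch below assumes a TensorFlow 2.x release that exposes tf.data.AUTOTUNE (older releases expose it as tf.data.experimental.AUTOTUNE); the square function is just a stand-in for a real preprocessing step.

```python
import tensorflow as tf

# fall back to the experimental name on older TensorFlow 2.x releases
AUTOTUNE = getattr(tf.data, "AUTOTUNE", None) or tf.data.experimental.AUTOTUNE

def square(x):
    # stand-in for a real (more expensive) preprocessing function
    return x * x

dataset = (tf.data.Dataset.range(8)
           .map(square, num_parallel_calls=AUTOTUNE)  # parallel preprocessing
           .batch(4)
           .prefetch(AUTOTUNE))                       # prepare batches ahead of time

batches = [batch.tolist() for batch in dataset.as_numpy_iterator()]
print(batches)
```

By default, map() keeps the element order even when it runs in parallel, so the batches come out in the same order as without num_parallel_calls.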

# Datasets with tf.keras
In most cases, the CSV reader function above will be sufficient, and you do not need to convert your data to the TFRecord format. For large datasets, however, TFRecord is the preferred format. 

In [20]:
train_set = csv_reader_dataset(train_filepaths, repeat=None)
valid_set = csv_reader_dataset(valid_filepaths)
test_set = csv_reader_dataset(test_filepaths)

In [21]:
train_set #batches of features with shape (None, 8) and targets with shape (None, 1)

<PrefetchDataset shapes: ((None, 8), (None, 1)), types: (tf.float32, tf.float32)>

In [22]:
keras.backend.clear_session()
np.random.seed(42)
tf.random.set_seed(42)

model = keras.models.Sequential([
  keras.layers.Dense(30, activation= 'relu', input_shape= X_train.shape[1:]),
  keras.layers.Dense(1)
])

model.compile(loss= 'mse', optimizer= 'nadam')

model.fit(train_set, steps_per_epoch= len(X_train) // 32, epochs= 10,
          validation_data= valid_set)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


<tensorflow.python.keras.callbacks.History at 0x7fdac66a64a8>

# TFRecord Format

TFRecord is TensorFlow's preferred format for large datasets. It consists of a sequence of binary records. 

The functions available allow you to read files in parallel and to compress files so that they can be loaded over a network connection. To create a compressed TFRecord file, pass options = tf.io.TFRecordOptions(compression_type= 'GZIP') to the writer. To read the file back, you need to specify the compression type (compression_type= 'GZIP').

In [23]:
#create a TFRecord file
with tf.io.TFRecordWriter('tfrecord') as f:
  f.write(b'First line')
  f.write(b'Second line') #input data that is in binary


In [24]:
#read from the TFRecord file

file = ['tfrecord']
dataset = tf.data.TFRecordDataset(file)
for line in dataset:
  print(line)

#you can pass several files; set num_parallel_reads to read them in parallel and interleave their records

tf.Tensor(b'First line', shape=(), dtype=string)
tf.Tensor(b'Second line', shape=(), dtype=string)
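
The compression options described above can be sketched as follows. The file is written to a temporary directory, and the file name is chosen here for illustration only.

```python
import os
import tempfile

import tensorflow as tf

# hypothetical file location used only for this example
path = os.path.join(tempfile.mkdtemp(), "compressed.tfrecord")

# write a GZIP-compressed TFRecord file
options = tf.io.TFRecordOptions(compression_type="GZIP")
with tf.io.TFRecordWriter(path, options) as f:
    f.write(b"First line")
    f.write(b"Second line")

# the reader must be told which compression type was used
dataset = tf.data.TFRecordDataset([path], compression_type="GZIP")
records = list(dataset.as_numpy_iterator())
print(records)
```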


#### Protocol Buffers

TFRecord files usually contain **serialized protocol buffers (protobufs), a portable, extensible and efficient binary format** developed by Google in 2001.

TensorFlow provides protobuf functions for parsing operations. You can load images and decode many formats. You can also store any tensor by serializing it with tf.io.serialize_tensor() and restore it with tf.io.parse_tensor(). 
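
A minimal round trip through tf.io.serialize_tensor() and tf.io.parse_tensor() looks like this. The tensor values are arbitrary; the point is that the dtype must be supplied again when parsing.

```python
import tensorflow as tf

original = tf.constant([[1., 2.], [3., 4.]])

# serialize the tensor to a scalar string tensor (a byte string)
serialized = tf.io.serialize_tensor(original)

# parse it back; out_type must match the dtype of the original tensor
restored = tf.io.parse_tensor(serialized, out_type=tf.float32)
print(restored)
```

The serialized byte string can be written to a TFRecord file like any other binary record.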

## Preprocessing using a Data API and preprocessing layer
Instead of preprocessing your data in Pandas, Scikit-Learn, or NumPy, you can use the Data API to preprocess the data on the fly as it is loaded (i.e. using the dataset's map() method), or you can add a preprocessing layer to your model. You can use keras.layers.Lambda or create a custom layer class: define an adapt() method, call it with a data sample, and then use the layer like any normal layer.
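
As a sketch of the custom-class approach, the layer below standardizes its inputs using statistics computed in adapt(). The class name Standardization, the random sample data, and the small constant added to the standard deviation are all choices made for this example.

```python
import numpy as np
import tensorflow as tf
from tensorflow import keras

class Standardization(keras.layers.Layer):
    def adapt(self, data_sample):
        # compute per-feature statistics from a representative data sample
        self.means_ = np.mean(data_sample, axis=0, keepdims=True)
        self.stds_ = np.std(data_sample, axis=0, keepdims=True)

    def call(self, inputs):
        # the small constant avoids division by zero for constant features
        return (inputs - self.means_) / (self.stds_ + 1e-7)

# made-up data: 1000 instances with 3 features
rng = np.random.default_rng(42)
sample = rng.normal(loc=5.0, scale=2.0, size=(1000, 3)).astype(np.float32)

layer = Standardization()
layer.adapt(sample)  # must be called before the layer is used
scaled = layer(tf.constant(sample)).numpy()
print(scaled.mean(axis=0), scaled.std(axis=0))
```

Once adapted, the layer can be placed at the start of a model, so the same scaling is applied during training and in production.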

##### Categorical Encoding


In [25]:
age_mean, age_std = X_mean[1], X_std[1]
housing_median_age = tf.feature_column.numeric_column('housing_median_age',
normalizer_fn=lambda x: (x-age_mean)/ age_std)

In [26]:
housing_median_age

NumericColumn(key='housing_median_age', shape=(1,), default_value=None, dtype=tf.float32, normalizer_fn=<function <lambda> at 0x7fdacf4e7e18>)