## Loading and Preprocessing Data with TensorFlow

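The Data API can read from CSV files, binary files with fixed-size records, binary files that use the TFRecord format, SQL databases and, through extensions, even Google BigQuery. It takes care of the details that make reading efficient (multithreading, queuing, batching, prefetching), and it can also be used for preprocessing.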

### The Data API

A _dataset_ represents a sequence of data items.

In [2]:
import tensorflow as tf

In [5]:
# Dataset reading from RAM
X = tf.range(10)
# Creates a dataset that contains 10 items (slices of X)
dataset = tf.data.Dataset.from_tensor_slices(X)
dataset
# similar: tf.data.Dataset.range(10), which yields int64 items instead of int32

<TensorSliceDataset shapes: (), types: tf.int32>

In [3]:
for item in dataset:
    print(item)

tf.Tensor(0, shape=(), dtype=int32)
tf.Tensor(1, shape=(), dtype=int32)
tf.Tensor(2, shape=(), dtype=int32)
tf.Tensor(3, shape=(), dtype=int32)
tf.Tensor(4, shape=(), dtype=int32)
tf.Tensor(5, shape=(), dtype=int32)
tf.Tensor(6, shape=(), dtype=int32)
tf.Tensor(7, shape=(), dtype=int32)
tf.Tensor(8, shape=(), dtype=int32)
tf.Tensor(9, shape=(), dtype=int32)


### Chaining Transformations
Transformation methods do not modify the dataset they are called on: each one returns a new dataset, so we must keep a reference to the result.
<center><img src="img/trans.png"></img></center>

In [6]:
# calling transformation methods:
# repeat() repeats the original dataset 3 times
# batch() groups the items of the previous dataset in batches of seven items
dataset2 = dataset.repeat(3).batch(7)
for item in dataset2:
    print(item)
# with drop_remainder=True, the last (incomplete) batch would be dropped

tf.Tensor([0 1 2 3 4 5 6], shape=(7,), dtype=int32)
tf.Tensor([7 8 9 0 1 2 3], shape=(7,), dtype=int32)
tf.Tensor([4 5 6 7 8 9 0], shape=(7,), dtype=int32)
tf.Tensor([1 2 3 4 5 6 7], shape=(7,), dtype=int32)
tf.Tensor([8 9], shape=(2,), dtype=int32)


In [15]:
# map() applies the lambda to each item (here, each batch)
dataset3 = dataset2.map(lambda x: x * 2)
for item in dataset3:
    print(item)

tf.Tensor([ 0  2  4  6  8 10 12], shape=(7,), dtype=int32)
tf.Tensor([14 16 18  0  2  4  6], shape=(7,), dtype=int32)
tf.Tensor([ 8 10 12 14 16 18  0], shape=(7,), dtype=int32)
tf.Tensor([ 2  4  6  8 10 12 14], shape=(7,), dtype=int32)
tf.Tensor([16 18], shape=(2,), dtype=int32)


For very intensive computations, _map()_ can spawn multiple threads: set the _num_parallel_calls_ argument.
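
As a minimal sketch of that argument, a parallel _map()_ could look like the following. The squaring function is only a stand-in for an expensive computation, and _tf.data.AUTOTUNE_ assumes TensorFlow 2.4 or later.

```python
import tensorflow as tf

# Stand-in for an expensive per-item computation (hypothetical example).
def expensive_transform(x):
    return x * x

dataset = tf.data.Dataset.range(10)

# num_parallel_calls lets map() process several items concurrently;
# tf.data.AUTOTUNE asks TensorFlow to choose the level of parallelism.
parallel_dataset = dataset.map(expensive_transform,
                               num_parallel_calls=tf.data.AUTOTUNE)

# By default the output order still matches the input order.
squares = [int(item) for item in parallel_dataset.as_numpy_iterator()]
print(squares)
```

The function passed to _map()_ is traced into a TensorFlow graph, so it should only rely on operations TensorFlow can convert.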

_map()_ applies a transformation to each item, whereas _apply()_ applies a transformation to the dataset as a whole.

In [18]:
# with this, each item in the new dataset will be a single-integer
# tensor instead of a batch of seven integers
# (recent TF versions also offer the equivalent dataset3.unbatch()):
dataset3_1 = dataset3.apply(tf.data.experimental.unbatch())
for i in dataset3_1:
    print(i)

tf.Tensor(0, shape=(), dtype=int32)
tf.Tensor(2, shape=(), dtype=int32)
tf.Tensor(4, shape=(), dtype=int32)
tf.Tensor(6, shape=(), dtype=int32)
tf.Tensor(8, shape=(), dtype=int32)
tf.Tensor(10, shape=(), dtype=int32)
tf.Tensor(12, shape=(), dtype=int32)
tf.Tensor(14, shape=(), dtype=int32)
tf.Tensor(16, shape=(), dtype=int32)
tf.Tensor(18, shape=(), dtype=int32)
tf.Tensor(0, shape=(), dtype=int32)
tf.Tensor(2, shape=(), dtype=int32)
tf.Tensor(4, shape=(), dtype=int32)
tf.Tensor(6, shape=(), dtype=int32)
tf.Tensor(8, shape=(), dtype=int32)
tf.Tensor(10, shape=(), dtype=int32)
tf.Tensor(12, shape=(), dtype=int32)
tf.Tensor(14, shape=(), dtype=int32)
tf.Tensor(16, shape=(), dtype=int32)
tf.Tensor(18, shape=(), dtype=int32)
tf.Tensor(0, shape=(), dtype=int32)
tf.Tensor(2, shape=(), dtype=int32)
tf.Tensor(4, shape=(), dtype=int32)
tf.Tensor(6, shape=(), dtype=int32)
tf.Tensor(8, shape=(), dtype=int32)
tf.Tensor(10, shape=(), dtype=int32)
tf.Tensor(12, shape=(), dtype=int32)
tf.Tensor(14, shape=(), dtype=int32)
...

In [19]:
# Filtering
dataset3_2 = dataset3_1.filter(lambda x: x < 10)
for i in dataset3_2:
    print(i) 

tf.Tensor(0, shape=(), dtype=int32)
tf.Tensor(2, shape=(), dtype=int32)
tf.Tensor(4, shape=(), dtype=int32)
tf.Tensor(6, shape=(), dtype=int32)
tf.Tensor(8, shape=(), dtype=int32)
tf.Tensor(0, shape=(), dtype=int32)
tf.Tensor(2, shape=(), dtype=int32)
tf.Tensor(4, shape=(), dtype=int32)
tf.Tensor(6, shape=(), dtype=int32)
tf.Tensor(8, shape=(), dtype=int32)
tf.Tensor(0, shape=(), dtype=int32)
tf.Tensor(2, shape=(), dtype=int32)
tf.Tensor(4, shape=(), dtype=int32)
tf.Tensor(6, shape=(), dtype=int32)
tf.Tensor(8, shape=(), dtype=int32)


### Shuffling the Data

_shuffle()_ creates a new dataset that starts by filling up a buffer with the first items of the source dataset. Whenever it is asked for an item, it pulls one out of the buffer at random and replaces it with a fresh one from the source dataset, until it has iterated entirely through the source dataset; it then keeps pulling items at random until the buffer is empty. We must specify the buffer size: it should be large enough for the shuffling to be effective, but it must not exceed the amount of RAM available. A random seed can also be set.

In [22]:
dataset4 = tf.data.Dataset.range(10).repeat(3)
dataset4_1 = dataset4.shuffle(buffer_size=5, seed=42).batch(7)
for item in dataset4_1:
    print(item)

tf.Tensor([0 2 3 6 7 9 4], shape=(7,), dtype=int64)
tf.Tensor([5 0 1 1 8 6 5], shape=(7,), dtype=int64)
tf.Tensor([4 8 7 1 2 3 0], shape=(7,), dtype=int64)
tf.Tensor([5 4 2 7 8 9 9], shape=(7,), dtype=int64)
tf.Tensor([3 6], shape=(2,), dtype=int64)
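
The buffer mechanism described above can be sketched in pure Python. This is only an illustration of the idea, not TensorFlow's implementation; the name `buffered_shuffle` is made up for this sketch.

```python
import itertools
import random

def buffered_shuffle(source, buffer_size, seed=None):
    """Yield the items of `source` in a buffer-limited random order."""
    rng = random.Random(seed)
    source = iter(source)
    # Fill the buffer with the first items of the source.
    buffer = list(itertools.islice(source, buffer_size))
    while buffer:
        # Pick a random slot and take the item out of it.
        idx = rng.randrange(len(buffer))
        item = buffer[idx]
        try:
            # Refill the slot with the next source item...
            buffer[idx] = next(source)
        except StopIteration:
            # ...or shrink the buffer once the source is exhausted.
            buffer[idx] = buffer[-1]
            buffer.pop()
        yield item

shuffled = list(buffered_shuffle(range(30), buffer_size=5, seed=42))
print(shuffled)
```

With this scheme an item can move forward by at most `buffer_size - 1` positions, which is why a buffer that is small relative to the dataset gives weak shuffling.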


If we call _repeat()_ on a shuffled dataset, a new order is generated at every iteration by default. To reuse the same order at each iteration instead, set _reshuffle_each_iteration=False_.
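
A small sketch of that option: with _reshuffle_each_iteration=False_, both repetitions of the dataset should come out in the same shuffled order. The dataset size and seed here are arbitrary choices for illustration.

```python
import tensorflow as tf

dataset = tf.data.Dataset.range(10)

# The shuffled order is fixed once and reused for every repetition.
fixed = dataset.shuffle(buffer_size=10, seed=42,
                        reshuffle_each_iteration=False).repeat(2)
fixed_items = [int(item) for item in fixed.as_numpy_iterator()]

first_epoch, second_epoch = fixed_items[:10], fixed_items[10:]
print(first_epoch)
print(second_epoch)
```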

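For datasets that do not fit in memory, a small shuffling buffer is not enough. One option is to shuffle the source data itself (for example with the _shuf_ command on Linux). Another is to split the data into multiple files and read them in random order during training, or to read several files at the same time and interleave their records. The Data API makes all of these techniques easy to apply.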

__Interleaving lines from multiple files__

### California dataset 


In [7]:
from sklearn.datasets import fetch_california_housing
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

housing = fetch_california_housing()
X_train_full, X_test, y_train_full, y_test = train_test_split(housing.data, 
                    housing.target.reshape(-1, 1), random_state=42)
X_train, X_valid, y_train, y_valid = train_test_split(X_train_full, y_train_full, random_state=42)

scaler = StandardScaler()
scaler.fit(X_train)
X_mean = scaler.mean_
X_std = scaler.scale_

In [5]:
# helper that splits a dataset into several CSV files (20 for the training set below)
import os
import numpy as np
def save_to_multiple_csv_files(data, name_prefix, header=None, n_parts=10):
    housing_dir = os.path.join(".", "CH13_csv_files")
    os.makedirs(housing_dir, exist_ok=True)
    path_format = os.path.join(housing_dir, "my_{}_{:02d}.csv")
    filepaths = []
    m = len(data)
    for file_idx, row_indices in enumerate(np.array_split(np.arange(m), n_parts)):
        part_csv = path_format.format(name_prefix, file_idx)
        filepaths.append(part_csv)
        with open(part_csv, "wt", encoding="utf-8") as f:
            if header is not None:
                f.write(header)
                f.write("\n")
            for row_idx in row_indices:
                f.write(",".join([repr(col) for col in data[row_idx]]))
                f.write("\n")
    return filepaths

In [8]:
train_data = np.c_[X_train, y_train]
valid_data = np.c_[X_valid, y_valid]
test_data = np.c_[X_test, y_test]
header_cols = housing.feature_names + ["MedianHouseValue"]
header = ",".join(header_cols)

train_filepaths = save_to_multiple_csv_files(train_data, "train", header, n_parts=20)
valid_filepaths = save_to_multiple_csv_files(valid_data, "valid", header, n_parts=10)
test_filepaths = save_to_multiple_csv_files(test_data, "test", header, n_parts=10)

In [10]:
import pandas as pd
pd.read_csv(train_filepaths[0]).head()

Unnamed: 0,MedInc,HouseAge,AveRooms,AveBedrms,Population,AveOccup,Latitude,Longitude,MedianHouseValue
0,3.5214,15.0,3.049945,1.106548,1447.0,1.605993,37.63,-122.43,1.442
1,5.3275,5.0,6.49006,0.991054,3464.0,3.44334,33.69,-117.39,1.687
2,3.1,29.0,7.542373,1.591525,1328.0,2.250847,38.44,-122.98,1.621
3,7.1736,12.0,6.289003,0.997442,1054.0,2.695652,33.55,-117.7,2.621
4,2.0549,13.0,5.312457,1.085092,3297.0,2.244384,33.93,-116.93,0.956


In [12]:
# Input Pipeline
filepath_dataset = tf.data.Dataset.list_files(train_filepaths, seed=42)



In [14]:
for filepath in filepath_dataset:
    print(filepath)

tf.Tensor(b'./CH13_csv_files/my_train_05.csv', shape=(), dtype=string)
tf.Tensor(b'./CH13_csv_files/my_train_16.csv', shape=(), dtype=string)
tf.Tensor(b'./CH13_csv_files/my_train_01.csv', shape=(), dtype=string)
tf.Tensor(b'./CH13_csv_files/my_train_17.csv', shape=(), dtype=string)
tf.Tensor(b'./CH13_csv_files/my_train_00.csv', shape=(), dtype=string)
tf.Tensor(b'./CH13_csv_files/my_train_14.csv', shape=(), dtype=string)
tf.Tensor(b'./CH13_csv_files/my_train_10.csv', shape=(), dtype=string)
tf.Tensor(b'./CH13_csv_files/my_train_02.csv', shape=(), dtype=string)
tf.Tensor(b'./CH13_csv_files/my_train_12.csv', shape=(), dtype=string)
tf.Tensor(b'./CH13_csv_files/my_train_19.csv', shape=(), dtype=string)
tf.Tensor(b'./CH13_csv_files/my_train_07.csv', shape=(), dtype=string)
tf.Tensor(b'./CH13_csv_files/my_train_09.csv', shape=(), dtype=string)
tf.Tensor(b'./CH13_csv_files/my_train_13.csv', shape=(), dtype=string)
tf.Tensor(b'./CH13_csv_files/my_train_15.csv', shape=(), dtype=string)
...

In [15]:
n_readers = 5
dataset = filepath_dataset.interleave(
    lambda filepath: tf.data.TextLineDataset(filepath).skip(1),
    cycle_length=n_readers
)

In [16]:
# the first five lines, returned as byte strings (the header rows were skipped)
for line in dataset.take(5):
    print(line.numpy())



b'4.5909,16.0,5.475877192982456,1.0964912280701755,1357.0,2.9758771929824563,33.63,-117.71,2.418'
b'2.4792,24.0,3.4547038327526134,1.1341463414634145,2251.0,3.921602787456446,34.18,-118.38,2.0'
b'4.2708,45.0,5.121387283236994,0.953757225433526,492.0,2.8439306358381504,37.48,-122.19,2.67'
b'2.1856,41.0,3.7189873417721517,1.0658227848101265,803.0,2.0329113924050635,32.76,-117.12,1.205'
b'4.1812,52.0,5.701388888888889,0.9965277777777778,692.0,2.4027777777777777,33.73,-118.31,3.215'
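
The round-robin idea behind _interleave()_ with _cycle_length_ can be sketched in pure Python. The "files" below are hypothetical lists of lines, and the exact order TensorFlow produces may differ in its details, especially when _num_parallel_calls_ is set.

```python
import itertools

def interleave(sources, cycle_length):
    """Round-robin over `cycle_length` sources at a time, one item each."""
    pending = iter(sources)
    # Open the first `cycle_length` sources.
    slots = [iter(s) for s in itertools.islice(pending, cycle_length)]
    while slots:
        i = 0
        while i < len(slots):
            try:
                item = next(slots[i])
            except StopIteration:
                # This source is exhausted: open the next pending one
                # in the same slot, or drop the slot if none is left.
                replacement = next(pending, None)
                if replacement is None:
                    del slots[i]
                else:
                    slots[i] = iter(replacement)
                continue
            yield item
            i += 1

# Hypothetical "files", each one a list of lines.
files = [["a1", "a2", "a3"], ["b1"], ["c1", "c2"], ["d1", "d2"]]
lines = list(interleave(files, cycle_length=2))
print(lines)
```

Because consecutive output lines come from different files, the result is already partly mixed before any shuffling buffer is applied.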


In [19]:
# all missing fields are replaced with their default value
record_defaults=[0, np.nan, tf.constant(np.nan, dtype=tf.float64), "Hello", tf.constant([])]
parsed_fields = tf.io.decode_csv('1,2,3,4,5', record_defaults)
parsed_fields

[<tf.Tensor: shape=(), dtype=int32, numpy=1>,
 <tf.Tensor: shape=(), dtype=float32, numpy=2.0>,
 <tf.Tensor: shape=(), dtype=float64, numpy=3.0>,
 <tf.Tensor: shape=(), dtype=string, numpy=b'4'>,
 <tf.Tensor: shape=(), dtype=float32, numpy=5.0>]

In [20]:
parsed_fields = tf.io.decode_csv(",,,,5", record_defaults)
parsed_fields

[<tf.Tensor: shape=(), dtype=int32, numpy=0>,
 <tf.Tensor: shape=(), dtype=float32, numpy=nan>,
 <tf.Tensor: shape=(), dtype=float64, numpy=nan>,
 <tf.Tensor: shape=(), dtype=string, numpy=b'Hello'>,
 <tf.Tensor: shape=(), dtype=float32, numpy=5.0>]

The 5th field is compulsory (since we provided tf.constant([]) as the "default value"), so we get an exception if we do not provide it:

In [21]:
try: 
    parsed_fields = tf.io.decode_csv(",,,,", record_defaults)
except tf.errors.InvalidArgumentError as ex:
    print(ex)

Field 4 is required but missing in record 0! [Op:DecodeCSV]


In [23]:
# The number of fields should match exactly the number of fields in the record_defaults
try: 
    parsed_fields = tf.io.decode_csv("1,2,3,4,5,6,7", record_defaults)
except tf.errors.InvalidArgumentError as ex:
    print(ex)

Expect 5 fields but have 7 in record 0 [Op:DecodeCSV]


In [24]:
n_inputs = 8
@tf.function
def preprocess(line):
    defs = [0.] * n_inputs + [tf.constant([], dtype=tf.float32)]
    fields = tf.io.decode_csv(line, record_defaults=defs)
    x = tf.stack(fields[:-1])
    y = tf.stack(fields[-1:])
    return (x - X_mean) / X_std, y

In [25]:
preprocess(b'4.2083,44.0,5.3232,0.9171,846.0,2.3370,37.47,-122.2,2.782')

(<tf.Tensor: shape=(8,), dtype=float32, numpy=
 array([ 0.16579157,  1.216324  , -0.05204565, -0.39215982, -0.5277444 ,
        -0.2633488 ,  0.8543046 , -1.3072058 ], dtype=float32)>,
 <tf.Tensor: shape=(1,), dtype=float32, numpy=array([2.782], dtype=float32)>)

In [26]:
def csv_reader_dataset(filepaths, repeat=1, n_readers=5,
                       n_read_threads=None, shuffle_buffer_size=10000,
                       n_parse_threads=5, batch_size=32):
    dataset = tf.data.Dataset.list_files(filepaths).repeat(repeat)
    dataset = dataset.interleave(
        lambda filepath: tf.data.TextLineDataset(filepath).skip(1),
        cycle_length=n_readers, num_parallel_calls=n_read_threads)
    dataset = dataset.shuffle(shuffle_buffer_size)
    dataset = dataset.map(preprocess, num_parallel_calls=n_parse_threads)
    dataset = dataset.batch(batch_size)
    return dataset.prefetch(1)

<center><img src="img/pipe.png"></img></center>
With _prefetch(1)_, the dataset prepares the next batch while the training algorithm is still working on the current one.
<center><img src="img/prefetch.png"></img></center>
The _cache()_ method is useful for datasets small enough to fit in memory: it caches their content to RAM. Call it after loading and preprocessing the data, but before shuffling, repeating, batching and prefetching.
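
The prefetching idea can be illustrated with a background thread and a bounded queue. This is a conceptual pure-Python sketch only (the `prefetch` function below is made up for illustration); TensorFlow implements prefetching in its own runtime.

```python
import queue
import threading

def prefetch(source, buffer_size=1):
    """Yield items from `source`, preparing up to `buffer_size` items ahead."""
    buffer = queue.Queue(maxsize=buffer_size)
    end = object()  # sentinel marking the end of the source

    def producer():
        # Runs in the background while the consumer is busy.
        for item in source:
            buffer.put(item)  # blocks while the buffer is full
        buffer.put(end)

    threading.Thread(target=producer, daemon=True).start()
    while True:
        item = buffer.get()  # blocks until an item is ready
        if item is end:
            return
        yield item

batches = list(prefetch(iter(["batch 0", "batch 1", "batch 2"]), buffer_size=1))
print(batches)
```

While the consumer processes one item, the producer thread is free to prepare the next one, which is the overlap that _prefetch()_ provides between data preparation and training.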

In [27]:
tf.random.set_seed(42)

train_set = csv_reader_dataset(train_filepaths, batch_size=3)
for X_batch, y_batch in train_set.take(2):
    print("X =", X_batch)
    print("y =", y_batch)
    print()

X = tf.Tensor(
[[ 0.5804519  -0.20762321  0.05616303 -0.15191229  0.01343246  0.00604472
   1.2525111  -1.3671792 ]
 [ 5.818099    1.8491895   1.1784915   0.28173092 -1.2496178  -0.3571987
   0.7231292  -1.0023477 ]
 [-0.9253566   0.5834586  -0.7807257  -0.28213993 -0.36530012  0.27389365
  -0.76194876  0.72684526]], shape=(3, 8), dtype=float32)
y = tf.Tensor(
[[1.752]
 [1.313]
 [1.535]], shape=(3, 1), dtype=float32)

X = tf.Tensor(
[[-0.8324941   0.6625668  -0.20741376 -0.18699841 -0.14536144  0.09635526
   0.9807942  -0.67250353]
 [-0.62183803  0.5834586  -0.19862501 -0.3500319  -1.1437552  -0.3363751
   1.107282   -0.8674123 ]
 [ 0.8683102   0.02970133  0.3427381  -0.29872298  0.7124906   0.28026953
  -0.72915536  0.86178064]], shape=(3, 8), dtype=float32)
y = tf.Tensor(
[[0.919]
 [1.028]
 [2.182]], shape=(3, 1), dtype=float32)



### Using Dataset with tf.keras

In [28]:
train_set = csv_reader_dataset(train_filepaths, repeat=None)
valid_set = csv_reader_dataset(valid_filepaths)
test_set = csv_reader_dataset(test_filepaths)

In [None]:
from tensorflow import keras
keras.backend.clear_session()
np.random.seed(42)
tf.random.set_seed(42)

model = keras.models.Sequential([
    keras.layers.Dense(30, activation="relu", input_shape=X_train.shape[1:]),
    keras.layers.Dense(1),
])

In [30]:
model.compile(loss="mse", optimizer=keras.optimizers.SGD(learning_rate=1e-3))


In [31]:
batch_size = 32
model.fit(train_set, steps_per_epoch=len(X_train) // batch_size, epochs=10,
          validation_data=valid_set)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


<tensorflow.python.keras.callbacks.History at 0x7f04385ff910>

In [32]:
# we can pass a dataset to the evaluate() and predict() methods
model.evaluate(test_set, steps=len(X_test) // batch_size)



0.4787752032279968

In [33]:
# we could instead just pass test_set, Keras would ignore the labels
new_set = test_set.map(lambda X, y: X) 
X_new = X_test
model.predict(new_set, steps=len(X_new) // batch_size)

array([[2.3576405],
       [2.255291 ],
       [1.4437605],
       ...,
       [0.5654392],
       [3.9442453],
       [1.0232248]], dtype=float32)

Custom training loop iterating over the training set

In [34]:
optimizer = keras.optimizers.Nadam(learning_rate=0.01)
loss_fn = keras.losses.mean_squared_error

n_epochs = 5
batch_size = 32
n_steps_per_epoch = len(X_train) // batch_size
total_steps = n_epochs * n_steps_per_epoch
global_step = 0
for X_batch, y_batch in train_set.take(total_steps):
    global_step += 1
    print("\rGlobal step {}/{}".format(global_step, total_steps), end="")
    with tf.GradientTape() as tape:
        y_pred = model(X_batch)
        main_loss = tf.reduce_mean(loss_fn(y_batch, y_pred))
        loss = tf.add_n([main_loss] + model.losses)
    gradients = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

Global step 1810/1810

TF function that performs the whole training loop

In [35]:
optimizer = keras.optimizers.Nadam(learning_rate=0.01)
loss_fn = keras.losses.mean_squared_error

@tf.function
def train(model, n_epochs, batch_size=32,
          n_readers=5, n_read_threads=5, shuffle_buffer_size=10000, n_parse_threads=5):
    train_set = csv_reader_dataset(train_filepaths, repeat=n_epochs, n_readers=n_readers,
                       n_read_threads=n_read_threads, shuffle_buffer_size=shuffle_buffer_size,
                       n_parse_threads=n_parse_threads, batch_size=batch_size)
    for X_batch, y_batch in train_set:
        with tf.GradientTape() as tape:
            y_pred = model(X_batch)
            main_loss = tf.reduce_mean(loss_fn(y_batch, y_pred))
            loss = tf.add_n([main_loss] + model.losses)
        gradients = tape.gradient(loss, model.trainable_variables)
        optimizer.apply_gradients(zip(gradients, model.trainable_variables))

train(model, 5)

In [36]:
keras.backend.clear_session()
np.random.seed(42)
tf.random.set_seed(42)

In [37]:
optimizer = keras.optimizers.Nadam(learning_rate=0.01)
loss_fn = keras.losses.mean_squared_error

@tf.function
def train(model, n_epochs, batch_size=32,
          n_readers=5, n_read_threads=5, shuffle_buffer_size=10000, n_parse_threads=5):
    train_set = csv_reader_dataset(train_filepaths, repeat=n_epochs, n_readers=n_readers,
                       n_read_threads=n_read_threads, shuffle_buffer_size=shuffle_buffer_size,
                       n_parse_threads=n_parse_threads, batch_size=batch_size)
    n_steps_per_epoch = len(X_train) // batch_size
    total_steps = n_epochs * n_steps_per_epoch
    global_step = 0
    for X_batch, y_batch in train_set.take(total_steps):
        global_step += 1
        if tf.equal(global_step % 100, 0):
            tf.print("\rGlobal step", global_step, "/", total_steps)
        with tf.GradientTape() as tape:
            y_pred = model(X_batch)
            main_loss = tf.reduce_mean(loss_fn(y_batch, y_pred))
            loss = tf.add_n([main_loss] + model.losses)
        gradients = tape.gradient(loss, model.trainable_variables)
        optimizer.apply_gradients(zip(gradients, model.trainable_variables))

train(model, 5)

Global step 100 / 1810
Global step 200 / 1810
Global step 300 / 1810
Global step 400 / 1810
Global step 500 / 1810
Global step 600 / 1810
Global step 700 / 1810
Global step 800 / 1810
Global step 900 / 1810
Global step 1000 / 1810
Global step 1100 / 1810
Global step 1200 / 1810
Global step 1300 / 1810
Global step 1400 / 1810
Global step 1500 / 1810
Global step 1600 / 1810
Global step 1700 / 1810
Global step 1800 / 1810


In [38]:
for m in dir(tf.data.Dataset):
    if not (m.startswith("_") or m.endswith("_")):
        func = getattr(tf.data.Dataset, m)
        if hasattr(func, "__doc__"):
            print("● {:21s}{}".format(m + "()", func.__doc__.split("\n")[0]))

● apply()              Applies a transformation function to this dataset.
● as_numpy_iterator()  Returns an iterator which converts all elements of the dataset to numpy.
● batch()              Combines consecutive elements of this dataset into batches.
● cache()              Caches the elements in this dataset.
● cardinality()        Returns the cardinality of the dataset, if known.
● concatenate()        Creates a `Dataset` by concatenating the given dataset with this dataset.
● element_spec()       The type specification of an element of this dataset.
● enumerate()          Enumerates the elements of this dataset.
● filter()             Filters this dataset according to `predicate`.
● flat_map()           Maps `map_func` across this dataset and flattens the result.
● from_generator()     Creates a `Dataset` whose elements are generated by `generator`. (deprecated arguments)
● from_tensor_slices() Creates a `Dataset` whose elements are slices of the given tensors.
● from_tensors()    

### The TFRecord Format
The TFRecord format is TensorFlow's preferred format for storing large amounts of data and reading it efficiently. It is a simple binary format that contains a sequence of binary records of varying sizes.
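
To see how a file can hold records of varying sizes, here is a simplified pure-Python sketch that prefixes each record with its length. It is not compatible with real TFRecord files, which also store CRC checksums alongside the length and the data; the function names are made up for illustration.

```python
import io
import struct

def write_record(stream, data):
    # Each record is stored as an 8-byte little-endian length, then the bytes.
    stream.write(struct.pack("<Q", len(data)))
    stream.write(data)

def read_records(stream):
    while True:
        header = stream.read(8)
        if not header:
            return  # end of file
        (length,) = struct.unpack("<Q", header)
        yield stream.read(length)

stream = io.BytesIO()
write_record(stream, b"First record")
write_record(stream, b"Second, longer record")
stream.seek(0)
records = list(read_records(stream))
print(records)
```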

In [3]:
# Creating a TFRecord file
with tf.io.TFRecordWriter("other_files/my_data.tfrecord") as f:
    f.write(b"First record")
    f.write(b"Second record")

In [3]:
# Reading the file
filepaths = ["other_files/my_data.tfrecord"]
dataset = tf.data.TFRecordDataset(filepaths)
for item in dataset:
    print(item)
# Do things in parallel by setting num_parallel_reads, or
# by using list_files() and interleave()



tf.Tensor(b'First record', shape=(), dtype=string)
tf.Tensor(b'Second record', shape=(), dtype=string)


In [5]:
# Compressed TFRecord files
options = tf.io.TFRecordOptions(compression_type="GZIP")
with tf.io.TFRecordWriter("my_compressed.tfrecord", options) as f:
    f.write(b"1")
    f.write(b"2")

In [6]:
# specifying the compression type
dataset = tf.data.TFRecordDataset(["my_compressed.tfrecord"],
                                    compression_type="GZIP")
for item in dataset:
    print(item)                                

tf.Tensor(b'1', shape=(), dtype=string)
tf.Tensor(b'2', shape=(), dtype=string)


TFRecord files typically contain serialized protocol buffers (protobufs). Come back and review protocol buffers if you ever need a portable, extensible and efficient binary format.

### Preprocessing the Input Features

Preprocessing can be done ahead of time when preparing the data files, on the fly while loading the data with the Data API, or by including a preprocessing layer directly in the model.

In [None]:
# Standardization layer using Lambda layer
means = np.mean(X_train, axis=0, keepdims=True)
stds = np.std(X_train, axis=0, keepdims=True)
eps = keras.backend.epsilon()
model = keras.models.Sequential([
    keras.layers.Lambda(lambda inputs: (inputs - means) / (stds + eps)),
    # ... (other layers)
])

In [None]:
# Nice self-contained custom layer
class Standardization(keras.layers.Layer):
    def adapt(self, data_sample):
        self.means_ = np.mean(data_sample, axis=0, keepdims=True)
        self.stds_ = np.std(data_sample, axis=0, keepdims=True)
    def call(self, inputs):
        return (inputs - self.means_) / (self.stds_ + keras.backend.epsilon())

In [None]:
# Before using the class above, we need to adapt it to our dataset by
# calling adapt(data_sample); this lets it use the appropriate
# mean and standard deviation for each feature
std_layer = Standardization()
# big enough, representative, but not all the dataset
std_layer.adapt(data_sample)

In [None]:
model = keras.Sequential()
model.add(std_layer)
...
# recent Keras versions provide an equivalent built-in layer: keras.layers.Normalization

### Encoding Categorical Features Using One-Hot Vectors

In [7]:
# possible categories
vocab = ["<1H OCEAN", "INLAND", "NEAR OCEAN", "NEAR BAY", "ISLAND"]
# tensor with the corresponding indices
indices = tf.range(len(vocab), dtype=tf.int64)
# lookup table, passing the vocab with its indices
# If the categories were listed in a text file, we would use TextFileInitializer
table_init = tf.lookup.KeyValueTensorInitializer(vocab, indices)
# out of vocabulary buckets
num_oov_buckets = 2
# a category that is not in vocab is hashed into one of the OOV buckets (index 5 or 6)
table = tf.lookup.StaticVocabularyTable(table_init, num_oov_buckets)

In [8]:
categories = tf.constant(["NEAR BAY", "DESERT", "INLAND", "INLAND"])
cat_indices = table.lookup(categories)
cat_indices
# "DESERT" is not in the vocabulary, so it is mapped to an OOV bucket (index 5 here)

<tf.Tensor: shape=(4,), dtype=int64, numpy=array([3, 5, 1, 1])>

In [9]:
cat_one_hot = tf.one_hot(cat_indices, depth=len(vocab) + num_oov_buckets)
cat_one_hot

<tf.Tensor: shape=(4, 7), dtype=float32, numpy=
array([[0., 0., 0., 1., 0., 0., 0.],
       [0., 0., 0., 0., 0., 1., 0.],
       [0., 1., 0., 0., 0., 0., 0.],
       [0., 1., 0., 0., 0., 0., 0.]], dtype=float32)>

It would be nice to wrap all of this in a class. Recent Keras versions ship preprocessing layers for this purpose: _keras.layers.StringLookup_ maps strings to indices and can also output one-hot vectors, while _keras.layers.TextVectorization_ handles free text.
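
The lookup and one-hot steps above can be sketched without TensorFlow. The hash used here (`zlib.crc32`) is only a stand-in chosen for this sketch, so an unknown category may land in a different OOV bucket than it does with _StaticVocabularyTable_.

```python
import zlib

vocab = ["<1H OCEAN", "INLAND", "NEAR OCEAN", "NEAR BAY", "ISLAND"]
num_oov_buckets = 2
index_of = {category: index for index, category in enumerate(vocab)}

def lookup(category):
    if category in index_of:
        return index_of[category]
    # Unknown category: hash it into one of the OOV buckets.
    # zlib.crc32 is only a stand-in; TensorFlow uses its own hash function.
    bucket = zlib.crc32(category.encode("utf-8")) % num_oov_buckets
    return len(vocab) + bucket

def one_hot(index, depth):
    return [1.0 if i == index else 0.0 for i in range(depth)]

depth = len(vocab) + num_oov_buckets
indices = [lookup(c) for c in ["NEAR BAY", "DESERT", "INLAND"]]
vectors = [one_hot(i, depth) for i in indices]
print(indices)
```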

### Embeddings

An embedding is a trainable dense vector that represents a category. Embeddings are initialized randomly; during training, the embeddings of similar categories are pushed closer together while those of dissimilar categories move apart. The better the representation, the easier it is for the neural network to make accurate predictions. This is called _representation learning_.

Word embeddings are reused commonly for NLP tasks, they encode concepts, but also biases.

In [10]:
# Implementing embeddings manually
embedding_dim = 2
embed_init = tf.random.uniform([len(vocab) + num_oov_buckets, embedding_dim])
# one row per category, one row per OOV bucket, and one column per embedding dimension
embedding_matrix = tf.Variable(embed_init)

In this example we are using 2D embeddings, but they typically have from 10 to 300 dimensions, depending on the task and vocabulary size.

In [11]:
# random 7x2 matrix (5 categories + 2 OOV buckets), stored in a
# variable so Gradient Descent can tweak it during training
embedding_matrix

<tf.Variable 'Variable:0' shape=(7, 2) dtype=float32, numpy=
array([[0.4004737 , 0.20415425],
       [0.7912891 , 0.10039246],
       [0.5946287 , 0.47540402],
       [0.3005432 , 0.68635833],
       [0.48557544, 0.16074741],
       [0.3630824 , 0.8356621 ],
       [0.3994155 , 0.36908448]], dtype=float32)>

In [12]:
categories = tf.constant(["NEAR BAY", "DESERT", "INLAND", "INLAND"])
cat_indices = table.lookup(categories)
cat_indices

<tf.Tensor: shape=(4,), dtype=int64, numpy=array([3, 5, 1, 1])>

In [13]:
# encoding using embeddings
# it looks up the indices in the embedding matrix and returns the corresponding rows
tf.nn.embedding_lookup(embedding_matrix, cat_indices)

<tf.Tensor: shape=(4, 2), dtype=float32, numpy=
array([[0.3005432 , 0.68635833],
       [0.3630824 , 0.8356621 ],
       [0.7912891 , 0.10039246],
       [0.7912891 , 0.10039246]], dtype=float32)>

Keras already provides a layer that does all of the above. When the layer is created, it initializes the embedding matrix randomly; when it is called with some category indices, it returns the rows at those indices in the embedding matrix. The matrix is trainable.

_keras.layers.Embedding_ layer

In [17]:
from tensorflow import  keras
embedding = keras.layers.Embedding(input_dim=len(vocab) + num_oov_buckets, 
                                    output_dim=embedding_dim)
embedding(cat_indices)                                                            

<tf.Tensor: shape=(4, 2), dtype=float32, numpy=
array([[-0.03354662, -0.01979035],
       [-0.04101471,  0.02290389],
       [ 0.03426294,  0.00110865],
       [ 0.03426294,  0.00110865]], dtype=float32)>

We can create a Keras model that can process categorical features (along with numerical ones) and learn an embedding for each category.


In [None]:
# 8 numerical features
regular_inputs = keras.layers.Input(shape=[8])
# one categorical feature per instance
categories = keras.layers.Input(shape=[], dtype=tf.string)
# Using Lambda layer to look up each category's index
cat_indices = keras.layers.Lambda(lambda cats: table.lookup(cats))(categories)
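# input_dim must cover every index the table can return (5 categories + 2 OOV buckets)
cat_embed = keras.layers.Embedding(input_dim=len(vocab) + num_oov_buckets, output_dim=2)(cat_indices)
# Concatenate both inputs before feeding them to the neural network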
encoded_inputs = keras.layers.concatenate([regular_inputs, cat_embed])
# Simple dense output, could have been bigger
outputs = keras.layers.Dense(1)(encoded_inputs)
model = keras.models.Model(inputs=[regular_inputs, categories], 
                            outputs=[outputs])

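Where the _keras.layers.TextVectorization_ layer is available, its _adapt()_ method extracts the vocabulary from a data sample and takes care of creating the lookup table. Calling the layer then performs the index lookup, so it can replace the Lambda layer above. For single categorical strings such as these, _keras.layers.StringLookup_ is the closer fit in recent versions.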

One-hot encoding followed by a Dense layer (with no activation function and no biases) is equivalent to an Embedding layer: the Dense layer's weight matrix plays the role of the embedding matrix. However, the Embedding layer uses far fewer computations, and the performance difference becomes clear as the embedding matrix grows. It would be wasteful to use more embedding dimensions than the number of units in the layer that follows the Embedding layer.
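
The equivalence can be checked by hand on a small case. The 4x2 matrix below is hypothetical, and plain Python lists stand in for tensors.

```python
# Hypothetical 4-category, 2-dimensional embedding matrix.
embedding_matrix = [
    [0.1, 0.2],
    [0.3, 0.4],
    [0.5, 0.6],
    [0.7, 0.8],
]

def one_hot(index, depth):
    return [1.0 if i == index else 0.0 for i in range(depth)]

def dense_no_bias(inputs, weights):
    # Plain vector-matrix product: one multiply-add per weight.
    n_outputs = len(weights[0])
    return [sum(x * row[j] for x, row in zip(inputs, weights))
            for j in range(n_outputs)]

def embedding_lookup(index, weights):
    # Just picks a row: no arithmetic at all.
    return weights[index]

category_index = 2
via_dense = dense_no_bias(one_hot(category_index, 4), embedding_matrix)
via_lookup = embedding_lookup(category_index, embedding_matrix)
print(via_dense, via_lookup)
```

Both paths return the same row, but the dense path multiplies by every weight in the matrix while the lookup touches only one row, which is where the performance gap comes from as the vocabulary grows.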

### Keras Preprocessing Layers

- The _keras.layers.Discretization_ layer chops continuous data into different bins and can encode each bin as a one-hot vector. For example, low, medium and high would be encoded as [1,0,0], [0,1,0] and [0,0,1]. This loses a lot of information, but it can make some patterns easier to detect.
- A model's preprocessing layers are frozen during training: their parameters are not affected by Gradient Descent, so they do not need to be differentiable. For the same reason, an Embedding layer that should remain trainable must not be placed inside a custom preprocessing layer.
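
The binning step can be sketched in pure Python with the standard `bisect` module. The boundaries are hypothetical, and sending a value equal to a boundary to the upper bin is a choice made for this sketch.

```python
import bisect

# Hypothetical bin boundaries: low < 10 <= medium < 20 <= high.
boundaries = [10.0, 20.0]
n_bins = len(boundaries) + 1

def discretize(value):
    # bisect_right returns how many boundaries are <= value, i.e. the bin index.
    return bisect.bisect_right(boundaries, value)

def one_hot(index, depth):
    return [1 if i == index else 0 for i in range(depth)]

values = [3.5, 10.0, 14.2, 27.0]
bins = [discretize(v) for v in values]
encoded = [one_hot(b, n_bins) for b in bins]
print(bins)     # bin index of each value
print(encoded)  # one-hot vector of each bin
```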

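Multiple preprocessing layers can be chained using the _PreprocessingStage_ class. The cell below was not run; depending on the Keras version, this class may only be available under _keras.layers.experimental.preprocessing_, or not at all.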

In [None]:
normalization = keras.layers.Normalization()
discretization = keras.layers.Discretization([...])
pipeline = keras.layers.PreprocessingStage([normalization, discretization])
pipeline.adapt(data_sample)

The TextVectorization layer also has an option to output word-count vectors instead of word indices. This representation is called a _bag of words_, since it loses the order of the words. The counts should be normalized to reduce the importance of common words (and, to, etc.). One way is to divide each word count by the log of the total number of training instances in which the word appears; this technique is called TF-IDF (Term-Frequency × Inverse-Document-Frequency).
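
The normalization described above can be worked through on a tiny, made-up corpus. The `+ 1` inside the logarithm is an assumption of this sketch, added so that a word appearing in a single instance does not cause a division by log(1) = 0; real TF-IDF implementations use various smoothing schemes.

```python
import math
from collections import Counter

# Hypothetical tiny corpus: each training instance is a list of words.
corpus = [
    ["the", "cat", "sat"],
    ["the", "dog", "sat", "the"],
    ["the", "bird"],
]

# Number of training instances in which each word appears.
doc_freq = Counter(word for doc in corpus for word in set(doc))

def weighted_counts(doc):
    counts = Counter(doc)
    # Divide each count by log(1 + document frequency); the "+ 1" is an
    # assumption of this sketch, to avoid dividing by log(1) = 0.
    return {word: count / math.log(1 + doc_freq[word])
            for word, count in counts.items()}

weights = weighted_counts(corpus[1])
print(weights)
```

In the second instance, "the" occurs twice but appears in every instance, so it ends up weighing no more than "dog", which occurs once and is rare.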

If the standard preprocessing layers are insufficient, we can create our own by subclassing _keras.layers.PreprocessingLayer_ and implementing an _adapt()_ method, which takes a data sample, and optionally a _reset_state()_ method.

### TF Transform

TF Transform lets us write the preprocessing code once and reuse it on multiple platforms, so the same preprocessing runs during training and in production. It is really useful when putting models into production.

### TensorFlow Datasets Project

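The TensorFlow Datasets (TFDS) library is not bundled with TensorFlow, so it must be installed separately (the _tensorflow-datasets_ package). It provides datasets as standard tf.data datasets, so any transformation presented previously can be applied to them.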