## TensorFlow Data API

In [None]:
import tensorflow as tf
import numpy as np
import os

## tf.data API loads by default:
* ### text data
* ### binary data of fixed size
* ### binary data of varying size in TFRecord format

### TFRecord: a binary file format whose records usually contain serialized protocol buffers (protobufs, an open-source binary format); separately, tf.data can also read from SQL databases

### Crucially, tf.data can be extended (e.g. via open-source extensions such as TensorFlow I/O) to read from other sources such as BigQuery (BQ)

### Keras has preprocessing layers that help avoid training/serving skew: the model ingests raw data and the preprocessing is embedded in the model itself

In [None]:
X = tf.range(10)

### Creating a dataset from raw data

### tf.data.Dataset.from_tensor_slices(<tf.Tensor>)

Data is read by slicing the input tensor along its first dimension, so in this case the dataset will contain 10 elements

In [6]:
dataset = tf.data.Dataset.from_tensor_slices(X)

In [7]:
dataset

<_TensorSliceDataset element_spec=TensorSpec(shape=(), dtype=tf.int32, name=None)>

## Iterating over the dataset is easy, but the API is streaming, so slicing and indexing are not supported


In [8]:
for item in dataset:
    print(item)

tf.Tensor(0, shape=(), dtype=int32)
tf.Tensor(1, shape=(), dtype=int32)
tf.Tensor(2, shape=(), dtype=int32)
tf.Tensor(3, shape=(), dtype=int32)
tf.Tensor(4, shape=(), dtype=int32)
tf.Tensor(5, shape=(), dtype=int32)
tf.Tensor(6, shape=(), dtype=int32)
tf.Tensor(7, shape=(), dtype=int32)
tf.Tensor(8, shape=(), dtype=int32)
tf.Tensor(9, shape=(), dtype=int32)


In [9]:
X_nested = {"a": ([1, 2, 3], [4, 5, 6]), "b": [7, 8, 9]}

In [10]:
X_nested

{'a': ([1, 2, 3], [4, 5, 6]), 'b': [7, 8, 9]}

In [11]:
# Also available:
# from_generator()
# from_tensors()

dataset = tf.data.Dataset.from_tensor_slices(X_nested)

In [12]:
for item in dataset:
    print(item)

{'a': (<tf.Tensor: shape=(), dtype=int32, numpy=1>, <tf.Tensor: shape=(), dtype=int32, numpy=4>), 'b': <tf.Tensor: shape=(), dtype=int32, numpy=7>}
{'a': (<tf.Tensor: shape=(), dtype=int32, numpy=2>, <tf.Tensor: shape=(), dtype=int32, numpy=5>), 'b': <tf.Tensor: shape=(), dtype=int32, numpy=8>}
{'a': (<tf.Tensor: shape=(), dtype=int32, numpy=3>, <tf.Tensor: shape=(), dtype=int32, numpy=6>), 'b': <tf.Tensor: shape=(), dtype=int32, numpy=9>}


In [13]:
X = tf.data.Dataset.range(10)

In [14]:
dataset = X.repeat(3).batch(7, drop_remainder=True)

In [15]:
for item in dataset:
    print(item)

tf.Tensor([0 1 2 3 4 5 6], shape=(7,), dtype=int64)
tf.Tensor([7 8 9 0 1 2 3], shape=(7,), dtype=int64)
tf.Tensor([4 5 6 7 8 9 0], shape=(7,), dtype=int64)
tf.Tensor([1 2 3 4 5 6 7], shape=(7,), dtype=int64)


### map() applies a transformation (e.g. preprocessing) to every element
### It can run on several threads via num_parallel_calls, or let TensorFlow choose the level of parallelism with tf.data.AUTOTUNE
### The function passed to map() must be convertible to a tf.function (it is traced into a graph)


In [16]:
dataset = dataset.map(lambda x: x * 2, num_parallel_calls=2)

In [17]:
for item in dataset:
    print(item)

tf.Tensor([ 0  2  4  6  8 10 12], shape=(7,), dtype=int64)
tf.Tensor([14 16 18  0  2  4  6], shape=(7,), dtype=int64)
tf.Tensor([ 8 10 12 14 16 18  0], shape=(7,), dtype=int64)
tf.Tensor([ 2  4  6  8 10 12 14], shape=(7,), dtype=int64)


### Conditional filtering with a function computed on samples

In [18]:
dataset = dataset.filter(lambda x: tf.reduce_sum(x) > 50)

In [19]:
def dsprint(ds):
    """Print each element yielded by ``ds`` (e.g. a tf.data.Dataset) on its own line."""
    element_iter = iter(ds)
    while True:
        try:
            element = next(element_iter)
        except StopIteration:
            break
        print(element)

In [20]:
dsprint(dataset)

tf.Tensor([14 16 18  0  2  4  6], shape=(7,), dtype=int64)
tf.Tensor([ 8 10 12 14 16 18  0], shape=(7,), dtype=int64)
tf.Tensor([ 2  4  6  8 10 12 14], shape=(7,), dtype=int64)


### To inspect a limited number of batches, use take(), which returns a new dataset containing only the specified number of batches

In [21]:
dsprint(dataset.take(2))

tf.Tensor([14 16 18  0  2  4  6], shape=(7,), dtype=int64)
tf.Tensor([ 8 10 12 14 16 18  0], shape=(7,), dtype=int64)


## Dataset API:
 * ### repeat()
 * ### batch()
 * ### shuffle()
 * ### map(<tf.function>, num_parallel_calls=tf.data.AUTOTUNE)
 * ### filter(<tf.function>)
 * ### interleave(<tf.function>, cycle_length=3, num_parallel_calls=tf.data.AUTOTUNE) 
 

### shuffle() randomizes large datasets using an auxiliary buffer
### It fills a buffer of buffer_size elements from the source dataset, returns elements drawn at random from that buffer, and refills each freed slot with the next element from the source

In [22]:
dataset = tf.data.Dataset.range(10).repeat(2)
dataset = dataset.shuffle(buffer_size=4, seed=42).batch(7)

In [23]:
dsprint(dataset)

tf.Tensor([1 4 2 3 5 0 6], shape=(7,), dtype=int64)
tf.Tensor([9 8 2 0 3 1 4], shape=(7,), dtype=int64)
tf.Tensor([5 7 9 6 7 8], shape=(6,), dtype=int64)


## Loading dataset from CSV files and interleaving them with random shuffle

## Dataset type: tf.data.TextLineDataset(<files_paths>)

In [25]:
filepaths = './datasets/housing/'

In [26]:
train_filepaths = os.listdir(filepaths)

In [27]:
# Build full paths with os.path.join (robust to a missing trailing separator)
# and keep only the CSV files, so stray directory entries such as Jupyter's
# `.ipynb_checkpoints` folder never reach TextLineDataset.
# Sorting makes the list reproducible; os.listdir order is arbitrary.
train_filepaths = sorted(
    os.path.join(filepaths, name)
    for name in train_filepaths
    if name.endswith('.csv')
)

## list_files() returns the matching file paths in shuffled order, so each element of this dataset is a file path

In [None]:
filepath_dataset = tf.data.Dataset.list_files(train_filepaths,
                                              seed=48)

In [29]:
# This dataset contains scalar tf.string tensors (byte strings), one file path per element
dsprint(filepath_dataset)

tf.Tensor(b'./datasets/housing/my_train_01.csv', shape=(), dtype=string)
tf.Tensor(b'./datasets/housing/my_train_02.csv', shape=(), dtype=string)
tf.Tensor(b'./datasets/housing/my_train_03.csv', shape=(), dtype=string)


In [30]:
# Read several CSV files at once and interleave their lines into one dataset,
# skipping the first line of every file (the header).
n_reads = 3


def read_csv_lines_without_header(filepath):
    # Every remaining line of the file becomes one string element.
    return tf.data.TextLineDataset(filepath).skip(1)


# interleave() pulls cycle_length file paths from filepath_dataset and takes
# lines from those files in round-robin order. The output below shows line 1
# of each file, then line 2 of each. The result is a single dataset whose
# elements are individual lines; interleave itself does not shuffle.
dataset = filepath_dataset.interleave(
    read_csv_lines_without_header,
    cycle_length=n_reads,
    num_parallel_calls=tf.data.AUTOTUNE)

## Each line of the text files is returned as a scalar string tensor
## Strings are atomic in TensorFlow, so their length is not part of the tensor shape (shape=())

In [31]:
dsprint(dataset)

tf.Tensor(b'10,2,30,40,50', shape=(), dtype=string)
tf.Tensor(b'1,2,3,4,5', shape=(), dtype=string)
tf.Tensor(b'100,200,300,400,500', shape=(), dtype=string)
tf.Tensor(b'60,70,80,90,100', shape=(), dtype=string)
tf.Tensor(b'6,7,8,9,10', shape=(), dtype=string)
tf.Tensor(b'600,700,800,900,1000', shape=(), dtype=string)


### Data preprocessing

In [32]:
# Pretend these per-feature statistics were computed beforehand on the
# training data (scikit-learn's StandardScaler exposes them as mean_ and scale_).

# Number of feature columns in each CSV line; every line has one extra,
# final column holding the target.
n_inputs = 4
X_mean = [1.] * n_inputs
X_std = [1.5] * n_inputs

In [33]:
[0.] * n_inputs + [tf.constant([], dtype=tf.float32)]

[0.0,
 0.0,
 0.0,
 0.0,
 <tf.Tensor: shape=(0,), dtype=float32, numpy=array([], dtype=float32)>]

In [34]:
def parse_csv_line(line):
    """Decode one CSV record into a ``(features, target)`` pair.

    The first ``n_inputs`` columns are features and the last column is the
    target. Features are stacked into a rank-1 float tensor of length
    ``n_inputs``; the target is returned as a length-1 tensor.
    """
    # Feature columns may be empty and then fall back to 0.0. The target
    # column's default is an empty float32 tensor, which tells decode_csv the
    # column is required: an empty target raises an error instead of being filled.
    record_defaults = [0.] * n_inputs
    record_defaults.append(tf.constant([], dtype=tf.float32))
    *feature_fields, target_field = tf.io.decode_csv(line, record_defaults=record_defaults)
    features = tf.stack(feature_fields)
    target = tf.stack([target_field])
    return features, target

In [35]:
def preprocess(line):
    """Parse a CSV line and standardize its features using X_mean / X_std.

    The target is returned unchanged.
    """
    features, target = parse_csv_line(line)
    scaled = (features - X_mean) / X_std
    return scaled, target

In [36]:
preprocess(b'100,200,300,400,500')

(<tf.Tensor: shape=(4,), dtype=float32, numpy=array([ 66.     , 132.66667, 199.33333, 266.     ], dtype=float32)>,
 <tf.Tensor: shape=(1,), dtype=float32, numpy=array([500.], dtype=float32)>)

In [37]:
# This tensor has rank 1
# rank is the len of shape
len((4,))

1

In [38]:
# Scalar is rank 0
len(())

0

In [39]:
def csv_reader_dataset(filepaths, n_reads=5, n_read_threads=None,
                       n_parse_threads=5, shuffle_buffer_size=10,
                       seed=42, batch_size=32, cache=False):
    """Build a shuffled, batched ``(features, target)`` pipeline from CSV files.

    Args:
        filepaths: File path(s) or glob pattern(s) passed to
            ``tf.data.Dataset.list_files``, which shuffles the file order.
        n_reads: Number of files read concurrently by ``interleave``
            (its ``cycle_length``).
        n_read_threads: ``num_parallel_calls`` for ``interleave``. ``None``
            reads sequentially; ``tf.data.AUTOTUNE`` lets TensorFlow choose.
        n_parse_threads: ``num_parallel_calls`` for the ``preprocess`` map.
        shuffle_buffer_size: Size of the shuffle buffer applied to single lines.
        seed: Seed for both the file-order shuffle and the line shuffle.
        batch_size: Number of lines per batch. The last batch may be smaller.
        cache: If True, cache the parsed and standardized records in host
            memory (RAM, not GPU memory) once they have been read in full.
            Later epochs then skip reading and parsing the files. Shuffling,
            batching and prefetching come after the cache, so every epoch
            still gets a fresh random order. Only use this when the whole
            dataset fits in RAM.

    Returns:
        A ``tf.data.Dataset`` of ``(features, target)`` batches that prefetches
        one batch ahead.
    """
    dataset = tf.data.Dataset.list_files(filepaths, seed=seed)

    # Read n_reads files at a time, dropping each file's header line.
    dataset = dataset.interleave(
        lambda filepath: tf.data.TextLineDataset(filepath).skip(1),
        cycle_length=n_reads,
        num_parallel_calls=n_read_threads)
    dataset = dataset.map(preprocess, num_parallel_calls=n_parse_threads)
    if cache:
        # Cache after the expensive load + parse steps, before shuffling.
        dataset = dataset.cache()
    dataset = dataset.shuffle(shuffle_buffer_size, seed=seed)
    return dataset.batch(batch_size).prefetch(1)


In [40]:
filepaths

'./datasets/housing/'

In [41]:
ds = csv_reader_dataset(train_filepaths, batch_size=2)

In [42]:
dsprint(ds)

(<tf.Tensor: shape=(2, 4), dtype=float32, numpy=
array([[ 3.3333333,  4.       ,  4.6666665,  5.3333335],
       [ 6.       ,  0.6666667, 19.333334 , 26.       ]], dtype=float32)>, <tf.Tensor: shape=(2, 1), dtype=float32, numpy=
array([[10.],
       [50.]], dtype=float32)>)
(<tf.Tensor: shape=(2, 4), dtype=float32, numpy=
array([[ 66.     , 132.66667, 199.33333, 266.     ],
       [399.33334, 466.     , 532.6667 , 599.3333 ]], dtype=float32)>, <tf.Tensor: shape=(2, 1), dtype=float32, numpy=
array([[ 500.],
       [1000.]], dtype=float32)>)
(<tf.Tensor: shape=(2, 4), dtype=float32, numpy=
array([[39.333332 , 46.       , 52.666668 , 59.333332 ],
       [ 0.       ,  0.6666667,  1.3333334,  2.       ]], dtype=float32)>, <tf.Tensor: shape=(2, 1), dtype=float32, numpy=
array([[100.],
       [  5.]], dtype=float32)>)


In [43]:
dataset = tf.data.Dataset.range(10)
# shard(num_shards, index) keeps every num_shards-th element starting at
# position `index`, so these three shards split the range without overlap.
A, B, C = [dataset.shard(num_shards=3, index=shard_index) for shard_index in range(3)]

In [44]:
dsprint(A)

tf.Tensor(0, shape=(), dtype=int64)
tf.Tensor(3, shape=(), dtype=int64)
tf.Tensor(6, shape=(), dtype=int64)
tf.Tensor(9, shape=(), dtype=int64)


In [45]:
list(A.as_numpy_iterator())

[0, 3, 6, 9]

In [46]:
list(B.as_numpy_iterator())

[1, 4, 7]

In [47]:
list(C.as_numpy_iterator())

[2, 5, 8]

### Windowing sequential data

In [49]:
# Window here chops the whole sequence into smaller datasets containing 
# individual original sequence's elements as samples
ds = tf.data.Dataset.range(30).window(5, shift=1, drop_remainder=True)

In [50]:
for sample in ds.take(3):
    print('\n')
    for x in sample:
        print(x)



tf.Tensor(0, shape=(), dtype=int64)
tf.Tensor(1, shape=(), dtype=int64)
tf.Tensor(2, shape=(), dtype=int64)
tf.Tensor(3, shape=(), dtype=int64)
tf.Tensor(4, shape=(), dtype=int64)


tf.Tensor(1, shape=(), dtype=int64)
tf.Tensor(2, shape=(), dtype=int64)
tf.Tensor(3, shape=(), dtype=int64)
tf.Tensor(4, shape=(), dtype=int64)
tf.Tensor(5, shape=(), dtype=int64)


tf.Tensor(2, shape=(), dtype=int64)
tf.Tensor(3, shape=(), dtype=int64)
tf.Tensor(4, shape=(), dtype=int64)
tf.Tensor(5, shape=(), dtype=int64)
tf.Tensor(6, shape=(), dtype=int64)


2024-02-28 14:03:03.808811: W tensorflow/core/framework/dataset.cc:959] Input of Window will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.


### Here the batch size applied to each window must equal the window size, so that every window is flattened into exactly one window-sized sample

In [51]:
ds = ds.flat_map(lambda wind_ds: wind_ds.batch(5))

In [52]:
for sample in ds.take(3):
    print(sample)

tf.Tensor([0 1 2 3 4], shape=(5,), dtype=int64)
tf.Tensor([1 2 3 4 5], shape=(5,), dtype=int64)
tf.Tensor([2 3 4 5 6], shape=(5,), dtype=int64)


## Custom training loop with tf.function and dataset

In [53]:
train_set = csv_reader_dataset(train_filepaths)

In [54]:
for X, y in train_set.take(1):
    print(X, y)

tf.Tensor(
[[  3.3333333   4.          4.6666665   5.3333335]
 [  6.          0.6666667  19.333334   26.       ]
 [ 66.        132.66667   199.33333   266.       ]
 [399.33334   466.        532.6667    599.3333   ]
 [ 39.333332   46.         52.666668   59.333332 ]
 [  0.          0.6666667   1.3333334   2.       ]], shape=(6, 4), dtype=float32) tf.Tensor(
[[  10.]
 [  50.]
 [ 500.]
 [1000.]
 [ 100.]
 [   5.]], shape=(6, 1), dtype=float32)


In [55]:
# Small MLP regressor: n_inputs standardized features -> one continuous target.
# The input size comes from n_inputs (defined with the scaling constants)
# instead of a hard-coded 4. An explicit Input layer replaces the layer-level
# `input_shape` argument, which Keras 3 deprecates.
model = tf.keras.Sequential([
    tf.keras.Input(shape=(n_inputs,)),
    tf.keras.layers.Dense(32, activation='relu'),
    tf.keras.layers.Dense(1),
])

In [56]:
@tf.function
def train_one_epoch(model, optimizer, loss_fn, train_set):
    """Make one pass over ``train_set``, taking one gradient step per batch.

    The per-batch objective is the batch mean of ``loss_fn`` plus any extra
    losses the model exposes through ``model.losses`` (e.g. regularizers).
    """
    for features, targets in train_set:
        with tf.GradientTape() as tape:
            predictions = model(features)
            batch_loss = tf.reduce_mean(loss_fn(targets, predictions))
            total_loss = tf.add_n([batch_loss, *model.losses])
        grads = tape.gradient(total_loss, model.trainable_variables)
        optimizer.apply_gradients(zip(grads, model.trainable_variables))


optimizer = tf.keras.optimizers.SGD(learning_rate=0.01)
loss_fn = tf.keras.losses.mean_squared_error
n_epochs = 5

for epoch in range(n_epochs):
    epoch_banner = '\rEpoch {}/{}'.format(epoch + 1, n_epochs)
    print(epoch_banner)
    train_one_epoch(model, optimizer, loss_fn, train_set)
        

Epoch 1/5


2024-02-28 14:03:05.694878: I external/local_xla/xla/service/service.cc:168] XLA service 0x75ee7c7fc870 initialized for platform CUDA (this does not guarantee that XLA will be used). Devices:
2024-02-28 14:03:05.694899: I external/local_xla/xla/service/service.cc:176]   StreamExecutor device (0): NVIDIA GeForce RTX 3060 Laptop GPU, Compute Capability 8.6
2024-02-28 14:03:05.730013: I external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:454] Loaded cuDNN version 8906
I0000 00:00:1709125385.793854  116436 device_compiler.h:186] Compiled cluster using XLA!  This line is logged at most once for the lifetime of the process.


Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5
