# Overview

How to use TensorFlow `tf.data` to build a data pipeline.

In [1]:
import tensorflow as tf
import pandas as pd
import os
import airathon.paths as paths

# Short alias so later cells can write `Dataset.from_tensor_slices(...)`
# and `Dataset.zip(...)` instead of the fully qualified name.
Dataset = tf.data.Dataset

# Last expression of the cell: displays the installed TensorFlow version.
tf.__version__

'2.7.0'

In [2]:
# The cells below iterate over datasets with plain Python `for` loops, which
# only works under eager execution; fail early with a readable message.
assert tf.executing_eagerly(), "This notebook requires TensorFlow eager execution"

# Basics

In [3]:
# Slicing the nested list along its first axis yields three elements, each a
# tensor of shape (1,).
id_dataset = Dataset.from_tensor_slices([[1], [2], [3]])

# Named `sample_id` rather than `id`: a top-level loop variable stays in the
# kernel namespace, so `id` would shadow the builtin for every later cell.
for sample_id in id_dataset:
    print(f"id = {sample_id}, type = {type(sample_id)}")

# `element_spec` describes the shape and dtype of a single dataset element.
print(id_dataset.element_spec)

2022-03-19 19:16:53.122815: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:939] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2022-03-19 19:16:53.170763: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:939] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2022-03-19 19:16:53.171650: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:939] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero


id = [1], type = <class 'tensorflow.python.framework.ops.EagerTensor'>
id = [2], type = <class 'tensorflow.python.framework.ops.EagerTensor'>
id = [3], type = <class 'tensorflow.python.framework.ops.EagerTensor'>
TensorSpec(shape=(1,), dtype=tf.int32, name=None)


2022-03-19 19:16:53.175916: I tensorflow/core/platform/cpu_feature_guard.cc:151] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  SSE4.1 SSE4.2 AVX AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2022-03-19 19:16:53.176351: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:939] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2022-03-19 19:16:53.177539: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:939] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2022-03-19 19:16:53.178615: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:939] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so retur

# Transformation

In [4]:
# `map` applies the function to every element; here each id is squared.
value_dataset = id_dataset.map(lambda row: row ** 2)

# `zip` pairs the two datasets element by element into (id, value) tuples.
id_value_dataset = Dataset.zip((id_dataset, value_dataset))

# Named `sample_id` rather than `id`: a top-level loop variable stays in the
# kernel namespace, so `id` would shadow the builtin for every later cell.
for sample_id, value in id_value_dataset:
    print(f"id = {sample_id}, value = {value}")

print(id_value_dataset)
print(id_value_dataset.element_spec)

id = [1], value = [1]
id = [2], value = [4]
id = [3], value = [9]
<ZipDataset shapes: ((1,), (1,)), types: (tf.int32, tf.int32)>
(TensorSpec(shape=(1,), dtype=tf.int32, name=None), TensorSpec(shape=(1,), dtype=tf.int32, name=None))


## Dict Transformation

In [5]:
# A dict of equal-length lists becomes a dataset whose elements are dicts with
# the same keys ("a" and "b").
dict_dataset = Dataset.from_tensor_slices({"a": [1, 2, 3], "b": [2, 2, 2]})

# Flatten each dict element into a single tensor [a, b].
flat_dict_dataset = dict_dataset.map(
    lambda features: tf.stack([features["a"], features["b"]])
)

for stacked in flat_dict_dataset:
    print(stacked)

tf.Tensor([1 2], shape=(2,), dtype=int32)
tf.Tensor([2 2], shape=(2,), dtype=int32)
tf.Tensor([3 2], shape=(2,), dtype=int32)


# CSV

In [6]:
# Build the CSV path from the directory returned by `paths.dataset_metadata()`.
grid_metadata_csv = os.path.join(paths.dataset_metadata(), "grid_metadata.csv")
df = pd.read_csv(grid_metadata_csv)

# Preview the first rows of the table.
df.head()

Unnamed: 0,grid_id,location,tz,wkt
0,1X116,Taipei,Asia/Taipei,"POLYGON ((121.5257644471362 24.97766123020391,..."
1,1Z2W7,Delhi,Asia/Calcutta,"POLYGON ((77.30453178416276 28.54664454217707,..."
2,3S31A,Los Angeles (SoCAB),Etc/GMT+8,POLYGON ((-117.9338248256995 33.79558357488509...
3,6EIL6,Delhi,Asia/Calcutta,"POLYGON ((77.07995296313287 28.54664454217707,..."
4,7334C,Delhi,Asia/Calcutta,"POLYGON ((77.12486872733885 28.54664454217707,..."


In [7]:
# Map every column name to its Series; slicing that dict gives one dataset
# element per row, keyed by column name.
columns_by_name = {column: df[column] for column in df}
csv_dataset = Dataset.from_tensor_slices(columns_by_name)
csv_dataset.element_spec

{'grid_id': TensorSpec(shape=(), dtype=tf.string, name=None),
 'location': TensorSpec(shape=(), dtype=tf.string, name=None),
 'tz': TensorSpec(shape=(), dtype=tf.string, name=None),
 'wkt': TensorSpec(shape=(), dtype=tf.string, name=None)}

In [8]:
# Each element is a dict keyed by column name; show two fields of the first
# five rows.
for row in csv_dataset.take(5):
    grid_id, location = row["grid_id"], row["location"]
    print(f"grid id = {grid_id}, location = {location}")

grid id = b'1X116', location = b'Taipei'
grid id = b'1Z2W7', location = b'Delhi'
grid id = b'3S31A', location = b'Los Angeles (SoCAB)'
grid id = b'6EIL6', location = b'Delhi'
grid id = b'7334C', location = b'Delhi'


# Training

In [9]:
# Fix TensorFlow's global seed so the randomly initialised layer weights, and
# with them the training and prediction outputs, are reproducible on a fresh
# Restart & Run All.
# NOTE(review): GPU kernels may still introduce small run-to-run differences.
SEED = 42
tf.random.set_seed(SEED)

# Small fully connected regressor: one scalar feature in, one scalar out.
model = tf.keras.models.Sequential([
    tf.keras.Input(shape=(1,)),  # per-sample shape, batch dimension excluded
    tf.keras.layers.Dense(32, activation="relu"),
    tf.keras.layers.Dense(32, activation="relu"),
    tf.keras.layers.Dense(16, activation="relu"),
    tf.keras.layers.Dense(1),
])

# "rmsprop" is the optimizer `compile` falls back to when none is given;
# spelling it out keeps the training setup visible in the notebook.
model.compile(optimizer="rmsprop", loss=tf.keras.losses.MeanSquaredError())
model.summary()

Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 dense (Dense)               (None, 32)                64        
                                                                 
 dense_1 (Dense)             (None, 32)                1056      
                                                                 
 dense_2 (Dense)             (None, 16)                528       
                                                                 
 dense_3 (Dense)             (None, 1)                 17        
                                                                 
Total params: 1,665
Trainable params: 1,665
Non-trainable params: 0
_________________________________________________________________


When using a dataset to train a model:

- The dataset needs to be organized as a tuple of `(x, y)`.
- `x` and `y` must not have shape `()`. In other words, each element in `x` and `y`
  cannot be a single number.

In [10]:
# Keras reads every dataset element as one batch, so add the batch axis
# explicitly instead of relying on the bare shape-(1,) elements being
# interpreted as batches. Batch size 1 gives one optimizer step per sample.
EPOCHS = 75
BATCH_SIZE = 1
model.fit(id_value_dataset.batch(BATCH_SIZE), epochs=EPOCHS)

Epoch 1/75
Epoch 2/75
Epoch 3/75
Epoch 4/75
Epoch 5/75
Epoch 6/75
Epoch 7/75
Epoch 8/75
Epoch 9/75
Epoch 10/75
Epoch 11/75
Epoch 12/75
Epoch 13/75
Epoch 14/75
Epoch 15/75
Epoch 16/75
Epoch 17/75
Epoch 18/75
Epoch 19/75
Epoch 20/75
Epoch 21/75
Epoch 22/75
Epoch 23/75
Epoch 24/75
Epoch 25/75
Epoch 26/75
Epoch 27/75
Epoch 28/75
Epoch 29/75
Epoch 30/75
Epoch 31/75
Epoch 32/75
Epoch 33/75
Epoch 34/75
Epoch 35/75
Epoch 36/75
Epoch 37/75
Epoch 38/75
Epoch 39/75
Epoch 40/75
Epoch 41/75
Epoch 42/75
Epoch 43/75
Epoch 44/75
Epoch 45/75
Epoch 46/75
Epoch 47/75
Epoch 48/75
Epoch 49/75
Epoch 50/75
Epoch 51/75
Epoch 52/75
Epoch 53/75
Epoch 54/75
Epoch 55/75
Epoch 56/75
Epoch 57/75
Epoch 58/75
Epoch 59/75
Epoch 60/75
Epoch 61/75
Epoch 62/75
Epoch 63/75
Epoch 64/75
Epoch 65/75
Epoch 66/75
Epoch 67/75
Epoch 68/75
Epoch 69/75
Epoch 70/75
Epoch 71/75
Epoch 72/75
Epoch 73/75
Epoch 74/75
Epoch 75/75


<keras.callbacks.History at 0x7f726b31fc10>

In [11]:
# Dataset inputs are consumed one batch per element, so batch explicitly
# (size 1) rather than passing the raw shape-(1,) elements. The result holds
# one prediction row per id.
model.predict(id_dataset.batch(1))

array([[2.914656 ],
       [5.2017374],
       [7.4888196]], dtype=float32)

In [12]:
# Print the true squares so they can be compared with the predictions above.
for squared in value_dataset:
    print(squared)

tf.Tensor([1], shape=(1,), dtype=int32)
tf.Tensor([4], shape=(1,), dtype=int32)
tf.Tensor([9], shape=(1,), dtype=int32)
