# **TensorFlow Dataset API**

**Learning objectives**

1. Learn how to use `tf.data` to read data from memory
2. Learn how to use `tf.data` in a training loop
3. Learn how to use `tf.data` to read data from disk
4. Learn how to write production input pipelines with feature engineering (batching, shuffling, etc.)

In this notebook, we will start by refactoring the linear regression implemented previously so that it reads its training data from a `tf.data.Dataset`, and we will learn how to implement **stochastic gradient descent** with it. In this case, the data set is synthetic and the `tf.data` API reads it directly from memory.

In the second part, we will learn how to load a data set with the `tf.data` API when the data set resides on disk.

In [1]:
import json
import math
import os
from pprint import pprint

import numpy as np
import tensorflow as tf

print(tf.__version__)

2.4.1


## **Loading data from memory**

### **Creating the data set**

Let's create our synthetic data set:

In [14]:
N_POINTS = 10

X = tf.constant(range(N_POINTS), dtype=tf.float32)
Y = 2 * X + 10

print("X:", X.numpy(), "\nY:", Y.numpy())

X: [0. 1. 2. 3. 4. 5. 6. 7. 8. 9.] 
Y: [10. 12. 14. 16. 18. 20. 22. 24. 26. 28.]


We begin by implementing a function that takes as input
- our $X$ and $Y$ vectors of synthetic data generated from the linear function $y = 2x + 10$
- the number of passes over the data set we want to train on (`epochs`)
- the size of the batches of the data set (`batch_size`)

and returns a `tf.data.Dataset`.

**Remark:** The last batch may contain fewer elements than `batch_size`, because the data set runs out before that batch is full. If every batch must contain exactly `batch_size` elements, we have to discard this last partial batch by setting:

`dataset = dataset.batch(batch_size, drop_remainder=True)`

We will do that here.
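To see what `drop_remainder` changes, the short toy sketch below batches the integers 0 to 9 (from `tf.data.Dataset.range`, not our synthetic data) into groups of 3, with and without the flag:

```python
import tensorflow as tf

# Ten integers 0..9, batched by 3
ds = tf.data.Dataset.range(10)

# Default: the final batch holds the single leftover element
with_remainder = [b.numpy().tolist() for b in ds.batch(3)]

# drop_remainder=True: the incomplete final batch is discarded
without_remainder = [b.numpy().tolist() for b in ds.batch(3, drop_remainder=True)]

print(with_remainder)     # [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]
print(without_remainder)  # [[0, 1, 2], [3, 4, 5], [6, 7, 8]]
```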

In [7]:
# Let's define the create_dataset() function

def create_dataset(X, Y, epochs, batch_size):
    # tf.data.Dataset.from_tensor_slices() slices X and Y along their first dimension,
    # producing one (x, y) pair per data point
    dataset = tf.data.Dataset.from_tensor_slices((X, Y))
    # Repeat the data `epochs` times, then group it into batches of exactly `batch_size` elements
    dataset = dataset.repeat(epochs).batch(batch_size, drop_remainder=True)
    return dataset

Let's test our function by iterating twice over our data set in batches of 3 data points:

In [62]:
BATCH_SIZE = 3
EPOCH = 2

dataset = create_dataset(X, Y, epochs=EPOCH, batch_size=BATCH_SIZE)

for i, (x, y) in enumerate(dataset):
    # Convert each `tf.Tensor` to a NumPy array with the `.numpy()` method and print x and y
    print("x:", x.numpy(), "y:", y.numpy())
    assert len(x) == BATCH_SIZE
    assert len(y) == BATCH_SIZE
# 2 epochs x 10 points = 20 elements -> 6 full batches of 3 (the last 2 elements are dropped)
assert i + 1 == (N_POINTS * EPOCH) // BATCH_SIZE

x: [0. 1. 2.] y: [10. 12. 14.]
x: [3. 4. 5.] y: [16. 18. 20.]
x: [6. 7. 8.] y: [22. 24. 26.]
x: [9. 0. 1.] y: [28. 10. 12.]
x: [2. 3. 4.] y: [14. 16. 18.]
x: [5. 6. 7.] y: [20. 22. 24.]
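Notice that the fourth batch, `[9. 0. 1.]`, straddles the two epochs. This happens because `create_dataset` applies `repeat` before `batch`, so the two passes form a single stream of 20 elements. The standalone toy sketch below contrasts the two orderings:

```python
import tensorflow as tf

X = tf.constant(range(10), dtype=tf.float32)
ds = tf.data.Dataset.from_tensor_slices(X)

# repeat() before batch(): the two epochs form one stream of 20 elements,
# so a batch can mix the end of epoch 1 with the start of epoch 2
repeat_then_batch = [b.numpy().tolist() for b in ds.repeat(2).batch(3, drop_remainder=True)]

# batch() before repeat(): each epoch is batched on its own (and loses its last element),
# so no batch ever crosses an epoch boundary
batch_then_repeat = [b.numpy().tolist() for b in ds.batch(3, drop_remainder=True).repeat(2)]

print(repeat_then_batch[3])  # [9.0, 0.0, 1.0]
print(batch_then_repeat)
```

Repeating first lets every data point be used, at the cost of batches that mix epochs; batching first keeps epochs separate but drops the leftover elements of every single epoch.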


### **Loss function and gradients**

In [44]:
# loss_mse() returns the mean squared error of the linear model w0 * X + w1 against the targets Y
def loss_mse(X, Y, w0, w1):
    return tf.reduce_mean(((w0 * X + w1) - Y)**2)

# compute_gradients() records the loss computation on a tf.GradientTape and returns
# the gradients of the loss with respect to w0 and w1 (automatic differentiation)
def compute_gradients(X, Y, w0, w1):
    with tf.GradientTape() as tape:
        loss = loss_mse(X, Y, w0, w1)
    return tape.gradient(loss, [w0, w1])
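For this model the gradients also have a closed form: with residuals $r_i = w_0 x_i + w_1 - y_i$ over a batch of $n$ points, $\partial L / \partial w_0 = \frac{2}{n}\sum_i r_i x_i$ and $\partial L / \partial w_1 = \frac{2}{n}\sum_i r_i$. The sketch below checks `tf.GradientTape` against these formulas on a three-point toy batch; the weight values are arbitrary and chosen only for illustration:

```python
import numpy as np
import tensorflow as tf

def loss_mse(X, Y, w0, w1):
    return tf.reduce_mean(((w0 * X + w1) - Y)**2)

# Toy batch drawn from y = 2x + 10, and arbitrary starting weights
X = tf.constant([0.0, 1.0, 2.0])
Y = 2 * X + 10
w0 = tf.Variable(0.5)
w1 = tf.Variable(1.0)

# Gradients via automatic differentiation
with tf.GradientTape() as tape:
    loss = loss_mse(X, Y, w0, w1)
dw0, dw1 = tape.gradient(loss, [w0, w1])

# Gradients via the closed-form expressions
residual = (w0 * X + w1) - Y
dw0_manual = tf.reduce_mean(2.0 * residual * X)
dw1_manual = tf.reduce_mean(2.0 * residual)

print(dw0.numpy(), dw0_manual.numpy())
print(dw1.numpy(), dw1_manual.numpy())
```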

### **Training loop**

The main difference is that now, in the training loop, we will iterate directly on the `tf.data.Dataset` generated by our `create_dataset` function.

In [58]:
# Here we'll configure the data set so that it iterates 250 times over our synthetic data set in batches of 2

# Set training environment
EPOCHS = 250
BATCH_SIZE = 2
LEARNING_RATE = 0.02

# Initialise weights
w0 = tf.Variable(0.0)
w1 = tf.Variable(0.0)

# Create data set in training environment
dataset = create_dataset(X, Y, epochs=EPOCHS, batch_size=BATCH_SIZE)

# Training loop
for step, (X_batch, Y_batch) in enumerate(dataset):
    # Compute gradients with respect to model parameters w0 and w1
    dw0, dw1 = compute_gradients(X_batch, Y_batch, w0, w1)
    # Update w0 and w1
    w0.assign_sub(dw0*LEARNING_RATE)
    w1.assign_sub(dw1*LEARNING_RATE)
    # Print metrics
    if step % 100 == 0:
        loss = loss_mse(X_batch, Y_batch, w0, w1)
        print("STEP: {} MSE: {} w0: {} w1: {}".format(step, loss, w0.numpy(), w1.numpy()))

assert loss < 0.0001
assert abs(w0 - 2) < 0.001
assert abs(w1 - 10) < 0.001

STEP: 0 MSE: 109.76800537109375 w0: 0.23999999463558197 w1: 0.4399999976158142
STEP: 100 MSE: 9.363959312438965 w0: 2.55655837059021 w1: 6.674341678619385
STEP: 200 MSE: 1.393267273902893 w0: 2.2146825790405273 w1: 8.717182159423828
STEP: 300 MSE: 0.20730558037757874 w0: 2.082810878753662 w1: 9.505172729492188
STEP: 400 MSE: 0.03084510937333107 w0: 2.03194260597229 w1: 9.809128761291504
STEP: 500 MSE: 0.004589457996189594 w0: 2.012321710586548 w1: 9.926374435424805
STEP: 600 MSE: 0.0006827632314525545 w0: 2.0047526359558105 w1: 9.971602439880371
STEP: 700 MSE: 0.00010164897685172036 w0: 2.0018346309661865 w1: 9.989042282104492
STEP: 800 MSE: 1.5142451957217418e-05 w0: 2.000706911087036 w1: 9.995771408081055
STEP: 900 MSE: 2.256260358990403e-06 w0: 2.0002737045288086 w1: 9.998367309570312
STEP: 1000 MSE: 3.3405058275093324e-07 w0: 2.000105381011963 w1: 9.999371528625488
STEP: 1100 MSE: 4.977664502803236e-08 w0: 2.000040054321289 w1: 9.999757766723633
STEP: 1200 MSE: 6.475602276623249e-0
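The log stops at step 1200 because the loop runs for exactly $10 \times 250 / 2 = 1250$ steps (numbered 0 to 1249) and we only print every 100 steps. Since every transformation in this pipeline has a known size, `tf.data` can report the number of batches itself through `Dataset.cardinality()` (available in TensorFlow 2.3 and later). A minimal sketch that rebuilds the same pipeline:

```python
import tensorflow as tf

N_POINTS, EPOCHS, BATCH_SIZE = 10, 250, 2

X = tf.constant(range(N_POINTS), dtype=tf.float32)
Y = 2 * X + 10
dataset = (
    tf.data.Dataset.from_tensor_slices((X, Y))
    .repeat(EPOCHS)
    .batch(BATCH_SIZE, drop_remainder=True)
)

# Number of batches the training loop will iterate over
n_steps = int(dataset.cardinality().numpy())
expected_steps = (N_POINTS * EPOCHS) // BATCH_SIZE

print(n_steps, expected_steps)  # 1250 1250
```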

## **Loading data from disk**

### **Locating the csv files**

We will start with the **taxifare dataset**.

In [67]:
!ls -l data/taxi*.csv

-rw-rw-r-- 1 antounes antounes 25073872 mars  12 09:56 data/taxi-fare-test.csv
-rw-rw-r-- 1 antounes antounes 25143609 mars  12 09:54 data/taxi-fare-train.csv


### **Use `tf.data` to read the CSV files**

The `tf.data` API can read CSV files with the helper function `tf.data.experimental.make_csv_dataset`. If your data is stored as `TFRecords` (which is recommended), you can use `tf.data.experimental.make_batched_features_dataset` instead.
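For reference, here is a minimal sketch of the TFRecord route. It writes a two-record toy file to a temporary directory (the feature names and values are made up for illustration) and reads it back with `make_batched_features_dataset`, passing `num_epochs=1` and `shuffle=False` so that the file is read once, in order:

```python
import os
import tempfile

import tensorflow as tf

# Write a toy TFRecord file with two serialized tf.train.Example records
path = os.path.join(tempfile.mkdtemp(), "toy.tfrecord")
with tf.io.TFRecordWriter(path) as writer:
    for count, duration in [(1.0, 358.0), (4.0, 463.0)]:
        example = tf.train.Example(features=tf.train.Features(feature={
            "passenger_count": tf.train.Feature(float_list=tf.train.FloatList(value=[count])),
            "trip_duration": tf.train.Feature(float_list=tf.train.FloatList(value=[duration])),
        }))
        writer.write(example.SerializeToString())

# Describe how each feature should be parsed
feature_spec = {
    "passenger_count": tf.io.FixedLenFeature([], tf.float32),
    "trip_duration": tf.io.FixedLenFeature([], tf.float32),
}

# Read the file once, in order, and split off the label
dataset = tf.data.experimental.make_batched_features_dataset(
    path, batch_size=2, features=feature_spec, label_key="trip_duration",
    num_epochs=1, shuffle=False)

features, label = next(iter(dataset))
print(features["passenger_count"].numpy(), label.numpy())
```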

The first step is to define:
- the column names in a list `CSV_COLUMNS`
- their default values in a list `DEFAULTS`
- the name of the label column, `LABEL_COLUMN`

In [95]:
# Defining the feature names into a list `CSV_COLUMNS`

CSV_COLUMNS = [
    "id",
    "pickup_datetime",
    "passenger_count",
    "pickup_longitude",
    "pickup_latitude",
    "dropoff_longitude",
    "dropoff_latitude",
    "trip_duration"
]
LABEL_COLUMN = "trip_duration"
# Defining the default values into a list `DEFAULTS`
DEFAULTS = [["na"], ["na"], [0.0], [0.0], [0.0], [0.0], [0.0], ["na"]]
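Each entry of `DEFAULTS` plays two roles: it fixes the dtype of its column (`["na"]` gives `tf.string`, `[0.0]` gives `tf.float32`) and it supplies the value used when a field is empty. This is why `trip_duration` comes out as a string in the outputs below. The toy sketch below illustrates both roles on a hypothetical three-column file (`a`, `b`, `c`) written to a temporary directory:

```python
import os
import tempfile

import tensorflow as tf

# Hypothetical toy CSV: the second row has an empty value in column "a"
path = os.path.join(tempfile.mkdtemp(), "toy.csv")
with open(path, "w") as f:
    f.write("a,b,c\n1.5,x,7\n,y,8\n")

dataset = tf.data.experimental.make_csv_dataset(
    path,
    batch_size=2,
    column_names=["a", "b", "c"],
    column_defaults=[[0.0], ["na"], [0]],  # float32, string, int32
    num_epochs=1,   # read the file once
    shuffle=False,  # keep the rows in file order
)

batch = next(iter(dataset))
for name, values in batch.items():
    print(name, values.dtype.name, values.numpy())
```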

Let's now wrap the call to `make_csv_dataset` in its own function that takes only the file pattern (i.e. a glob) matching the data set files:

In [96]:
def create_dataset(pattern):
    # `tf.data.experimental.make_csv_dataset()` reads the CSV files matching `pattern`
    # into a `tf.data.Dataset`; here we use a batch size of 1
    return tf.data.experimental.make_csv_dataset(pattern, 1, CSV_COLUMNS, DEFAULTS)

tempds = create_dataset("data/taxi-train.csv")
# Let's output the value of `tempds`
print(tempds)

<PrefetchDataset shapes: OrderedDict([(id, (1,)), (pickup_datetime, (1,)), (passenger_count, (1,)), (pickup_longitude, (1,)), (pickup_latitude, (1,)), (dropoff_longitude, (1,)), (dropoff_latitude, (1,)), (trip_duration, (1,))]), types: OrderedDict([(id, tf.string), (pickup_datetime, tf.string), (passenger_count, tf.float32), (pickup_longitude, tf.float32), (pickup_latitude, tf.float32), (dropoff_longitude, tf.float32), (dropoff_latitude, tf.float32), (trip_duration, tf.string)])>


Note that this is a prefetched data set (`PrefetchDataset`) in which each element is an `OrderedDict` whose keys are the column names and whose values are tensors of shape `(1,)` (i.e. vectors), because we passed a batch size of 1.
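Keep in mind that, unless told otherwise, `make_csv_dataset` shuffles the rows (`shuffle=True`) and cycles through the files forever (`num_epochs=None`), so `tempds` never runs out of elements. The toy sketch below, using a hypothetical two-row file in a temporary directory, shows the difference:

```python
import os
import tempfile

import tensorflow as tf

# Hypothetical toy CSV with two data rows
path = os.path.join(tempfile.mkdtemp(), "toy.csv")
with open(path, "w") as f:
    f.write("x,y\n1.0,2.0\n3.0,4.0\n")

# Default behaviour: rows are repeated indefinitely, so take(5) yields 5 batches
infinite = tf.data.experimental.make_csv_dataset(path, batch_size=1)
n_default = sum(1 for _ in infinite.take(5))

# num_epochs=1: the file is read exactly once, so iteration stops after 2 batches
once = tf.data.experimental.make_csv_dataset(path, batch_size=1, num_epochs=1, shuffle=False)
n_once = sum(1 for _ in once)

print(n_default, n_once)  # 5 2
```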

In [97]:
# Let's iterate over the first two elements of this dataset using `dataset.take(2)`
# and convert each one to an ordinary Python dictionary with NumPy arrays as values for readability
for data in tempds.take(2):
    pprint({k: v.numpy() for k, v in data.items()})
    print("\n")

{'dropoff_latitude': array([40.745857], dtype=float32),
 'dropoff_longitude': array([-73.97897], dtype=float32),
 'id': array([b'id1709332'], dtype=object),
 'passenger_count': array([5.], dtype=float32),
 'pickup_datetime': array([b'2016-03-21 09:15:46'], dtype=object),
 'pickup_latitude': array([40.721836], dtype=float32),
 'pickup_longitude': array([-74.00833], dtype=float32),
 'trip_duration': array([b'1355'], dtype=object)}


{'dropoff_latitude': array([40.76781], dtype=float32),
 'dropoff_longitude': array([-73.90583], dtype=float32),
 'id': array([b'id0046394'], dtype=object),
 'passenger_count': array([1.], dtype=float32),
 'pickup_datetime': array([b'2016-03-26 17:25:30'], dtype=object),
 'pickup_latitude': array([40.75548], dtype=float32),
 'pickup_longitude': array([-73.98386], dtype=float32),
 'trip_duration': array([b'1545'], dtype=object)}




### **Transforming the features**

What we really need is a dictionary of features plus a separate label. So we have to do two things to the dictionary above:
1. Remove the unwanted columns `id` and `pickup_datetime`, which we won't use
2. Keep the label separate from the features

Let's implement a function that takes as input a row (represented as an `OrderedDict` in our `tf.data.Dataset`, as above) and returns a tuple with two elements:
- The first element is the same `OrderedDict` with the label and the unwanted columns dropped
- The second element is the label itself (`trip_duration`)

In [100]:
UNWANTED_COLS = ["pickup_datetime", "id"]

# Let's define the features_and_labels() function
def features_and_labels(row_data):
    # `.pop()` removes the label column from the dictionary (in place) and returns its value
    label = row_data.pop(LABEL_COLUMN)
    features = row_data

    # Drop the columns we won't use
    for unwanted_col in UNWANTED_COLS:
        features.pop(unwanted_col)

    return features, label
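Because `DEFAULTS` declares `trip_duration` as a string column, the label comes out as a `tf.string` tensor (e.g. `b'1355'`). If you want a numeric regression target, one option (an assumption on our part, not something this notebook does) is to convert it with `tf.strings.to_number`. The hypothetical variant below also copies the dictionary first, because `features_and_labels` mutates the row it receives:

```python
import tensorflow as tf

LABEL_COLUMN = "trip_duration"
UNWANTED_COLS = ["pickup_datetime", "id"]

# Hypothetical variant of features_and_labels() that returns a float32 label
def features_and_numeric_label(row_data):
    features = dict(row_data)  # shallow copy: the caller's dictionary is left untouched
    label = tf.strings.to_number(features.pop(LABEL_COLUMN), out_type=tf.float32)
    for unwanted_col in UNWANTED_COLS:
        features.pop(unwanted_col)
    return features, label

# A single made-up row shaped like the elements of `tempds`
row = {
    "id": tf.constant([b"id0000001"]),
    "pickup_datetime": tf.constant([b"2016-03-21 09:15:46"]),
    "passenger_count": tf.constant([1.0]),
    "trip_duration": tf.constant([b"1355"]),
}

features, label = features_and_numeric_label(row)
print(sorted(features), label)
```

The same function can be passed to `dataset.map` exactly like `features_and_labels`.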

Let's iterate over 2 examples from our `tempds` dataset and apply our `features_and_labels` function to each of them to make sure it works.

In [101]:
for row_data in tempds.take(2):
    features, label = features_and_labels(row_data)
    pprint(features)
    print(label, "\n")
    assert UNWANTED_COLS[0] not in features.keys()
    assert UNWANTED_COLS[1] not in features.keys()
    assert label.shape == (1,)

OrderedDict([('passenger_count',
              <tf.Tensor: shape=(1,), dtype=float32, numpy=array([1.], dtype=float32)>),
             ('pickup_longitude',
              <tf.Tensor: shape=(1,), dtype=float32, numpy=array([-73.99823], dtype=float32)>),
             ('pickup_latitude',
              <tf.Tensor: shape=(1,), dtype=float32, numpy=array([40.745716], dtype=float32)>),
             ('dropoff_longitude',
              <tf.Tensor: shape=(1,), dtype=float32, numpy=array([-73.995], dtype=float32)>),
             ('dropoff_latitude',
              <tf.Tensor: shape=(1,), dtype=float32, numpy=array([40.74646], dtype=float32)>)])
tf.Tensor([b'358'], shape=(1,), dtype=string) 

OrderedDict([('passenger_count',
              <tf.Tensor: shape=(1,), dtype=float32, numpy=array([1.], dtype=float32)>),
             ('pickup_longitude',
              <tf.Tensor: shape=(1,), dtype=float32, numpy=array([-73.98789], dtype=float32)>),
             ('pickup_latitude',
              <tf.Tensor: s

### **Batching**

Let's now refactor our `create_dataset` function so that it takes an additional argument `batch_size` and batches the data accordingly. We will also apply the `features_and_labels` function we implemented to our `tf.data.Dataset` to produce tuples of features and labels.

In [106]:
# Let's define the create_dataset() method
def create_dataset(pattern, batch_size):
    # The tf.data.experimental.make_csv_dataset() method reads CSV files into a `tf.data.Dataset`
    dataset = tf.data.experimental.make_csv_dataset(pattern, batch_size, CSV_COLUMNS, DEFAULTS)
    return dataset.map(features_and_labels)

Let's check that our batches are of the right size.

In [107]:
BATCH_SIZE = 2

tempds = create_dataset("data/taxi-train*", batch_size=BATCH_SIZE)

for X_batch, Y_batch in tempds.take(2):
    pprint({k: v.numpy() for k, v in X_batch.items()})
    print(Y_batch.numpy(), "\n")
    assert len(Y_batch) == BATCH_SIZE

{'dropoff_latitude': array([40.76498, 40.74407], dtype=float32),
 'dropoff_longitude': array([-73.97662, -73.9879 ], dtype=float32),
 'passenger_count': array([1., 4.], dtype=float32),
 'pickup_latitude': array([40.75322, 40.75848], dtype=float32),
 'pickup_longitude': array([-73.97971, -73.98475], dtype=float32)}
[b'341' b'463'] 

{'dropoff_latitude': array([40.739273, 40.7655  ], dtype=float32),
 'dropoff_longitude': array([-73.991905, -73.97568 ], dtype=float32),
 'passenger_count': array([1., 5.], dtype=float32),
 'pickup_latitude': array([40.72218, 40.75678], dtype=float32),
 'pickup_longitude': array([-74.0106 , -73.99027], dtype=float32)}
[b'1146' b'818'] 



### **Shuffling**

When training a deep learning model in batches over multiple workers, it is helpful to shuffle the data. That way, different workers work on different, randomly mixed parts of the input file at the same time, so each worker's batches are not tied to the order of the rows in the file and **averaging gradients across workers** gives a more representative estimate of the gradient over the whole data set. Also, during training, we will need to read the data indefinitely.
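How well `Dataset.shuffle` mixes the data depends on its `buffer_size`: it fills a buffer with that many elements and samples uniformly from it. A buffer of 1 therefore leaves the order unchanged, while a buffer at least as large as the data set yields a full random permutation. A small toy sketch:

```python
import tensorflow as tf

ds = tf.data.Dataset.range(10)

# buffer_size=1: each element is emitted as soon as it enters the buffer (no shuffling)
no_shuffle = [int(x) for x in ds.shuffle(buffer_size=1)]

# buffer_size >= data set size: a uniformly random permutation of all elements
full_shuffle = [int(x) for x in ds.shuffle(buffer_size=10, seed=42)]

print(no_shuffle)    # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
print(full_shuffle)  # some permutation of 0..9
```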

Let's refactor our `create_dataset` function so that it shuffles the data, when the data set is used for training.

We will introduce an additional argument `mode` to our function to allow the function body to distinguish the case when it needs to shuffle the data (`mode == "train"`) from when it shouldn't (`mode == "eval"`).

Also, before returning, we will prefetch one batch ahead of time (`dataset.prefetch(1)`) so that the next batch is prepared while the current one is being consumed, which speeds up training.

In [108]:
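def create_dataset(pattern, batch_size=1, mode="eval"):
    # By default make_csv_dataset shuffles the rows and repeats forever
    # (shuffle=True, num_epochs=None); read the files once, in order, so that
    # cache() stores a finite data set and shuffling/repeating is decided below
    dataset = tf.data.experimental.make_csv_dataset(
        pattern, batch_size, CSV_COLUMNS, DEFAULTS, num_epochs=1, shuffle=False)

    # `Dataset.map()` applies features_and_labels to every batch;
    # `cache()` keeps the result in memory after the first pass
    dataset = dataset.map(features_and_labels).cache()

    if mode == "train":
        # Shuffle (whole batches, since the data is already batched) and repeat indefinitely
        dataset = dataset.shuffle(1000).repeat()

    # Prefetch 1 batch ahead of time (tf.data.AUTOTUNE would let TensorFlow pick the buffer size)
    dataset = dataset.prefetch(1)
    return dataset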
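The buffer passed to `prefetch` counts elements of the data set, which here are whole batches. Instead of hard-coding 1, you can pass `tf.data.AUTOTUNE` (exposed as `tf.data.experimental.AUTOTUNE` in older releases), a sentinel value that lets the `tf.data` runtime tune the buffer size dynamically. A minimal sketch on toy data:

```python
import tensorflow as tf

# Toy pipeline: 10 elements batched by 2
ds = tf.data.Dataset.range(10).batch(2)

fixed = ds.prefetch(1)                 # keep 1 batch ready in advance
tuned = ds.prefetch(tf.data.AUTOTUNE)  # let tf.data choose the buffer size

# Prefetching only overlaps data preparation with consumption; the elements are unchanged
fixed_batches = [b.numpy().tolist() for b in fixed]
tuned_batches = [b.numpy().tolist() for b in tuned]

print(tf.data.AUTOTUNE)  # -1
```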

Let's check that our function works well in both modes.

In [110]:
tempds = create_dataset("data/taxi-train*", batch_size=2, mode="train")
print(list(tempds.take(1)))

[(OrderedDict([('passenger_count', <tf.Tensor: shape=(2,), dtype=float32, numpy=array([1., 2.], dtype=float32)>), ('pickup_longitude', <tf.Tensor: shape=(2,), dtype=float32, numpy=array([-73.98663, -73.98484], dtype=float32)>), ('pickup_latitude', <tf.Tensor: shape=(2,), dtype=float32, numpy=array([40.74597 , 40.711388], dtype=float32)>), ('dropoff_longitude', <tf.Tensor: shape=(2,), dtype=float32, numpy=array([-74.002754, -73.98135 ], dtype=float32)>), ('dropoff_latitude', <tf.Tensor: shape=(2,), dtype=float32, numpy=array([40.739914, 40.722054], dtype=float32)>)]), <tf.Tensor: shape=(2,), dtype=string, numpy=array([b'720', b'288'], dtype=object)>)]


In [111]:
tempds = create_dataset("data/taxi-train*", batch_size=2, mode="eval")
print(list(tempds.take(1)))

[(OrderedDict([('passenger_count', <tf.Tensor: shape=(2,), dtype=float32, numpy=array([1., 1.], dtype=float32)>), ('pickup_longitude', <tf.Tensor: shape=(2,), dtype=float32, numpy=array([-73.980125, -73.95641 ], dtype=float32)>), ('pickup_latitude', <tf.Tensor: shape=(2,), dtype=float32, numpy=array([40.765305, 40.81344 ], dtype=float32)>), ('dropoff_longitude', <tf.Tensor: shape=(2,), dtype=float32, numpy=array([-73.800186, -73.953   ], dtype=float32)>), ('dropoff_latitude', <tf.Tensor: shape=(2,), dtype=float32, numpy=array([40.708366, 40.80246 ], dtype=float32)>)]), <tf.Tensor: shape=(2,), dtype=string, numpy=array([b'1468', b'297'], dtype=object)>)]
