# TensorFlow Dataset API

Lab from https://github.com/GoogleCloudPlatform/training-data-analyst/blob/master/courses/machine_learning/deepdive2/introduction_to_tensorflow/solutions/2_dataset_api.ipynb with personal notes and comments.

**Learning Objectives**
1. Learn how to use tf.data to read data from memory
1. Learn how to use tf.data in a training loop
1. Learn how to use tf.data to read data from disk
1. Learn how to write production input pipelines with feature engineering (batching, shuffling, etc.)


In this notebook, we will start by refactoring the linear regression we implemented in the previous lab so that it takes data from a `tf.data.Dataset`, and we will learn how to implement **stochastic gradient descent** with it. In this case, the original dataset is synthetic and is read by the `tf.data` API directly from memory.

In the second part, we will learn how to load a dataset with the `tf.data` API when the dataset resides on disk.

Each learning objective will correspond to a __#TODO__ in this student lab notebook; try to complete this notebook first and then review the [solution notebook](../solutions/2_dataset_api.ipynb).


In [1]:
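import os
# Silence TensorFlow's C++ logging: the variable is read when TensorFlow is loaded,
# so it must be set *before* `import tensorflow` to take effect.
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'

import warnings
from pathlib import Path
from pprint import pprint

import pandas as pd
import tensorflow as tf

print(tf.version.VERSION)

if tf.version.VERSION != '2.19.0':
    warnings.warn('Running an untested version of the TensorFlow API.')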

2.19.0


## Loading data from memory

### Creating the dataset

Let's consider the synthetic dataset of the previous section:

In [2]:
N_POINTS = 10
X = tf.constant(range(N_POINTS), dtype=tf.float32)
Y = 2 * X + 10

We begin by implementing a function that takes as input

- our $X$ and $Y$ vectors of synthetic data generated by the linear function $y = 2x + 10$
- the number of passes over the dataset we want to train on (`epochs`)
- the number of examples in each batch (`batch_size`)

and returns a `tf.data.Dataset`:

In [3]:
def create_dataset(X, Y, epochs, batch_size, is_training=False):
    """Builds a dataset with rules on how to iterate through it (during training)."""

    # Note: the input can also be passed as a dict: {'X': X, 'Y': Y}
    dataset = tf.data.Dataset.from_tensor_slices((X, Y))

    if is_training:
        # Shuffle individual examples *before* repeating and batching: shuffling after
        # batch() would only permute whole batches, never mixing examples across them.
        # Placed before repeat(), the data is reshuffled at every epoch.
        dataset = dataset.shuffle(buffer_size=10, seed=12345)

    # Note 1: epochs caps the number of passes over the data. If epochs is None,
    #   repeat() iterates through the dataset indefinitely.
    # Note 2: the last batch may contain fewer than batch_size elements;
    #   drop_remainder=True discards it so every batch has exactly batch_size elements.
    dataset = dataset.repeat(epochs).batch(batch_size, drop_remainder=True)

    # Most dataset input pipelines should end with a call to `prefetch`. This
    # allows later elements to be prepared while the current element is being
    # processed. This often improves latency and throughput, at the cost of
    # using additional memory to store prefetched elements.

    # Note: like other `Dataset` methods, prefetch operates on the
    # elements of the input dataset. It has no concept of examples vs. batches.
    # `examples.prefetch(2)` will prefetch two elements (2 examples),
    # while `examples.batch(20).prefetch(2)` will prefetch 2 elements
    # (2 batches, of 20 examples each).
    # prefetch() returns a new dataset, so the result must be reassigned.
    dataset = dataset.prefetch(buffer_size=1)

    return dataset
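The position of `shuffle()` relative to `batch()` matters. A minimal sketch on a toy `tf.data.Dataset.range(10)` (not the lab's data; seeds and buffer sizes are arbitrary choices for illustration): shuffling after batching only permutes whole batches, while shuffling before batching mixes individual examples across batches.

```python
import tensorflow as tf

ds = tf.data.Dataset.range(10)

# Shuffle *after* batch(): whole batches are permuted, but each batch still
# holds consecutive examples ([0, 1], [2, 3], ...).
shuffled_batches = ds.batch(2).shuffle(buffer_size=5, seed=1)

# Shuffle *before* batch(): individual examples are mixed, so a batch can
# contain any pair of examples.
shuffled_examples = ds.shuffle(buffer_size=10, seed=1).batch(2)

after = [b.numpy().tolist() for b in shuffled_batches]
before = [b.numpy().tolist() for b in shuffled_examples]
print("shuffle after batch: ", after)
print("shuffle before batch:", before)
```

In both cases every example appears exactly once per pass; only the granularity of the randomization differs.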

Let's test our function by iterating twice over our dataset in batches of 3 datapoints:

In [4]:
BATCH_SIZE = 3
EPOCHS = 2
dataset = create_dataset(X, Y, epochs=EPOCHS, batch_size=BATCH_SIZE)

for i, (x, y) in enumerate(dataset):
    print("x:", x.numpy(), "y:", y.numpy())
    assert len(x) == BATCH_SIZE
    assert len(y) == BATCH_SIZE

x: [0. 1. 2.] y: [10. 12. 14.]
x: [3. 4. 5.] y: [16. 18. 20.]
x: [6. 7. 8.] y: [22. 24. 26.]
x: [9. 0. 1.] y: [28. 10. 12.]
x: [2. 3. 4.] y: [14. 16. 18.]
x: [5. 6. 7.] y: [20. 22. 24.]
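The batch `[9. 0. 1.]` above spans the end of the first epoch and the start of the second: because `create_dataset` calls `repeat()` before `batch()`, the epochs are concatenated first and then cut into batches. A minimal sketch comparing the two orders on the same toy data:

```python
import tensorflow as tf

X = tf.constant(range(10), dtype=tf.float32)

# repeat() then batch(): the two epochs form one stream of 20 examples,
# so a batch can straddle an epoch boundary.
repeat_then_batch = (tf.data.Dataset.from_tensor_slices(X)
                     .repeat(2)
                     .batch(3, drop_remainder=True))

# batch() then repeat(): each epoch is batched separately; with
# drop_remainder=True the short last batch ([9.]) of every epoch is dropped.
batch_then_repeat = (tf.data.Dataset.from_tensor_slices(X)
                     .batch(3, drop_remainder=True)
                     .repeat(2))

rb = [b.numpy().tolist() for b in repeat_then_batch]
br = [b.numpy().tolist() for b in batch_then_repeat]
print(rb)
print(br)
```

With `batch()` first, the point `x = 9` is never seen in this toy setup, which is one reason the lab repeats before batching.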


### Loss function and gradients

The loss function and the function that computes the gradients are the same as before:

In [5]:
def loss_mse(X, Y, w0, w1):
    # X and Y can be scalars or 1-D tensors (a batch of examples): the operations below
    # are element-wise, and reduce_mean averages the squared errors over the batch.
    Y_hat = w0 * X + w1
    errors = (Y_hat - Y)**2
    return tf.reduce_mean(errors)

def compute_gradients(X, Y, w0, w1):
    with tf.GradientTape() as tape:
        loss = loss_mse(X, Y, w0, w1)
    return tape.gradient(loss, [w0, w1])
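For the mean squared error $L = \frac{1}{n}\sum_i (\hat y_i - y_i)^2$ with $\hat y_i = w_0 x_i + w_1$, the gradients have a closed form: $\frac{\partial L}{\partial w_0} = \frac{2}{n}\sum_i (\hat y_i - y_i)\, x_i$ and $\frac{\partial L}{\partial w_1} = \frac{2}{n}\sum_i (\hat y_i - y_i)$. As a sanity check, a minimal sketch comparing `tf.GradientTape` with these formulas on the first training batch ($x = [0, 1]$, $y = [10, 12]$) with $w_0 = w_1 = 0$, followed by one SGD step:

```python
import tensorflow as tf

def loss_mse(X, Y, w0, w1):
    Y_hat = w0 * X + w1
    return tf.reduce_mean((Y_hat - Y) ** 2)

X_batch = tf.constant([0.0, 1.0])
Y_batch = 2 * X_batch + 10            # [10., 12.]
w0 = tf.Variable(0.0)
w1 = tf.Variable(0.0)

# Automatic differentiation
with tf.GradientTape() as tape:
    loss = loss_mse(X_batch, Y_batch, w0, w1)
dw0, dw1 = tape.gradient(loss, [w0, w1])

# Closed-form gradients of the MSE
residuals = (w0 * X_batch + w1) - Y_batch     # y_hat - y = [-10., -12.]
dw0_manual = 2 * tf.reduce_mean(residuals * X_batch)
dw1_manual = 2 * tf.reduce_mean(residuals)

print(dw0.numpy(), dw0_manual.numpy())  # -12.0 -12.0
print(dw1.numpy(), dw1_manual.numpy())  # -22.0 -22.0

# One SGD step with the learning rate used in the training loop below
LEARNING_RATE = 0.02
w0.assign_sub(LEARNING_RATE * dw0)
w1.assign_sub(LEARNING_RATE * dw1)
print(w0.numpy(), w1.numpy())  # approximately 0.24 and 0.44
```

The updated values match `w0` and `w1` reported at `STEP 0` of the training loop.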

### (Manual) Training loop

In the training loop, we iterate directly over the `tf.data.Dataset` generated by our `create_dataset` function.

In [6]:
EPOCHS = 250
BATCH_SIZE = 2
LEARNING_RATE = .02

MSG = "STEP {step} - loss: {loss}, w0: {w0}, w1: {w1}\n"

w0 = tf.Variable(0.0)
w1 = tf.Variable(0.0)

dataset = create_dataset(X, Y, epochs=EPOCHS, batch_size=BATCH_SIZE)

for step, (X_batch, Y_batch) in enumerate(dataset):
    # compute the gradients on the current batch and take one SGD step
    dw0, dw1 = compute_gradients(X_batch, Y_batch, w0, w1)
    w0.assign_sub(dw0 * LEARNING_RATE)
    w1.assign_sub(dw1 * LEARNING_RATE)

    if step % 100 == 0:
        # monitor the loss on the full dataset
        loss = loss_mse(X, Y, w0, w1)
        print(MSG.format(step=step, loss=loss, w0=w0.numpy(), w1=w1.numpy()))

# Evaluate the final parameters: the last printed loss may predate the final updates
loss = loss_mse(X, Y, w0, w1)
assert loss < 0.0001
assert abs(w0 - 2) < 0.001
assert abs(w1 - 10) < 0.001

STEP 0 - loss: 331.1055908203125, w0: 0.23999999463558197, w1: 0.4399999976158142

STEP 100 - loss: 3.2297768592834473, w0: 2.55655837059021, w1: 6.674341678619385

STEP 200 - loss: 0.48055925965309143, w0: 2.2146825790405273, w1: 8.717182159423828

STEP 300 - loss: 0.07150308042764664, w0: 2.082810878753662, w1: 9.505172729492188

STEP 400 - loss: 0.010638890787959099, w0: 2.03194260597229, w1: 9.809128761291504

STEP 500 - loss: 0.001582985743880272, w0: 2.012321710586548, w1: 9.926374435424805

STEP 600 - loss: 0.00023550672631245106, w0: 2.0047526359558105, w1: 9.971602439880371

STEP 700 - loss: 3.50688278558664e-05, w0: 2.0018346309661865, w1: 9.989042282104492

STEP 800 - loss: 5.220536877459381e-06, w0: 2.000706911087036, w1: 9.995771408081055

STEP 900 - loss: 7.789132610014349e-07, w0: 2.0002737045288086, w1: 9.998367309570312

STEP 1000 - loss: 1.1540369371232373e-07, w0: 2.000105381011963, w1: 9.999371528625488

STEP 1100 - loss: 1.7078491509892046e-08, w0: 2.00004005432128
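How many steps does this loop run? `create_dataset` repeats the 10 points `EPOCHS` times and keeps only full batches, so there are $250 \times 10 / 2 = 1250$ batches: the last `step % 100 == 0` check happens at step 1200, while the loop continues until step 1249. A minimal sketch that asks `tf.data` directly through `Dataset.cardinality()` (the pipeline is rebuilt inline with the same batching as `create_dataset(..., is_training=False)`):

```python
import tensorflow as tf

N_POINTS, EPOCHS, BATCH_SIZE = 10, 250, 2
X = tf.constant(range(N_POINTS), dtype=tf.float32)
Y = 2 * X + 10

# Same repeat/batch logic as create_dataset (prefetch would not change the count)
dataset = (tf.data.Dataset.from_tensor_slices((X, Y))
           .repeat(EPOCHS)
           .batch(BATCH_SIZE, drop_remainder=True))

n_steps = int(dataset.cardinality())
print(n_steps)  # 1250 == EPOCHS * N_POINTS // BATCH_SIZE

# Without a count, repeat() never ends and cardinality reports it as infinite
endless = tf.data.Dataset.from_tensor_slices((X, Y)).repeat().batch(BATCH_SIZE)
print(int(endless.cardinality()) == tf.data.INFINITE_CARDINALITY)  # True
```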

## Loading data from disk

### Use tf.data to read the CSV files

The `tf.data` API can easily read CSV files using the helper function `tf.data.experimental.make_csv_dataset`.

If you have TFRecords (which is recommended), you may use `tf.data.experimental.make_batched_features_dataset`.
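As a hedged illustration of the TFRecord route, the sketch below writes three `tf.train.Example` records (the `fare_amount` and `passenger_count` values of the first rows of `taxi-train.csv`) to a temporary file and reads them back with `make_batched_features_dataset`. The file path and the two-feature schema are assumptions for illustration; a real pipeline would describe every column in the feature spec.

```python
import os
import tempfile

import tensorflow as tf

# Hypothetical sample: (fare_amount, passenger_count) from the first rows of taxi-train.csv
rows = [(11.3, 1.0), (7.7, 1.0), (10.5, 1.0)]
path = os.path.join(tempfile.mkdtemp(), "taxi-sample.tfrecord")

# Write each row as a serialized tf.train.Example
with tf.io.TFRecordWriter(path) as writer:
    for fare, passengers in rows:
        example = tf.train.Example(features=tf.train.Features(feature={
            "fare_amount": tf.train.Feature(float_list=tf.train.FloatList(value=[fare])),
            "passenger_count": tf.train.Feature(float_list=tf.train.FloatList(value=[passengers])),
        }))
        writer.write(example.SerializeToString())

# The feature spec plays the role of CSV_COLUMNS + DEFAULTS: names, shapes and dtypes
feature_spec = {
    "fare_amount": tf.io.FixedLenFeature([], tf.float32),
    "passenger_count": tf.io.FixedLenFeature([], tf.float32),
}

dataset = tf.data.experimental.make_batched_features_dataset(
    path,
    batch_size=3,
    features=feature_spec,
    label_key="fare_amount",  # yield (features, label) pairs directly
    num_epochs=1,             # one pass (the default None repeats forever)
    shuffle=False,            # keep file order for a predictable output
)

features, label = next(iter(dataset))
print(list(features.keys()))  # ['passenger_count']
print(label.numpy())
```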

The first step is to define 

- the feature names in a list `CSV_COLUMNS`
- their default values in a list `DEFAULTS` (the defaults also set the dtype of each column)

In [7]:
PATH_TO_DATA_FOLDER = Path('data')

# The data have no headers: define the feature names here
CSV_COLUMNS = [
    'fare_amount',
    'pickup_datetime',
    'pickup_longitude',
    'pickup_latitude',
    'dropoff_longitude',
    'dropoff_latitude',
    'passenger_count',
    'key'
]
# Target variable
LABEL_COLUMN = 'fare_amount'

# Defining the default values into a list `DEFAULTS`
DEFAULTS = [[0.0], ['na'], [0.0], [0.0], [0.0], [0.0], [0.0], ['na']]

df = pd.read_csv(PATH_TO_DATA_FOLDER / 'taxi-train.csv', names=CSV_COLUMNS, nrows=10)
df.head()

| | fare_amount | pickup_datetime | pickup_longitude | pickup_latitude | dropoff_longitude | dropoff_latitude | passenger_count | key |
|---|---|---|---|---|---|---|---|---|
| 0 | 11.3 | 2011-01-28 20:42:59 UTC | -73.999022 | 40.739146 | -73.990369 | 40.717866 | 1 | 0 |
| 1 | 7.7 | 2011-06-27 04:28:06 UTC | -73.987443 | 40.729221 | -73.979013 | 40.758641 | 1 | 1 |
| 2 | 10.5 | 2011-04-03 00:54:53 UTC | -73.982539 | 40.735725 | -73.954797 | 40.778388 | 1 | 2 |
| 3 | 16.2 | 2009-04-10 04:11:56 UTC | -74.001945 | 40.740505 | -73.91385 | 40.758559 | 1 | 3 |
| 4 | 33.5 | 2014-02-24 18:22:00 UTC | -73.993372 | 40.753382 | -73.8609 | 40.732897 | 2 | 4 |
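`DEFAULTS` does two jobs: each entry sets the dtype of its column and the value used when a field is empty. A minimal sketch with `tf.io.decode_csv`, which uses the same `[default]` convention as `column_defaults`, on a hypothetical copy of the first row above with the last two fields blanked out:

```python
import tensorflow as tf

DEFAULTS = [[0.0], ['na'], [0.0], [0.0], [0.0], [0.0], [0.0], ['na']]

# First row of taxi-train.csv, with passenger_count and key left empty (hypothetical edit)
line = "11.3,2011-01-28 20:42:59 UTC,-73.999022,40.739146,-73.990369,40.717866,,"

fields = tf.io.decode_csv(line, record_defaults=DEFAULTS)

# dtypes follow the defaults: float32 for [0.0], string for ['na']
print([f.dtype.name for f in fields])
# empty fields fall back to their defaults
print(fields[6].numpy(), fields[7].numpy())  # 0.0 b'na'
```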


In [8]:
# Create a TensorFlow dataset for exploration only (no transformations)
def create_dataset_without_transformations(pattern: str, batch_size=10):
    """Load csv data from a path (which can be a pattern, i.e. include the `*` symbol) and return a
    tf.data.Dataset.
    """

    return tf.data.experimental.make_csv_dataset(
        # Note that, by default, make_csv_dataset shuffles the rows (shuffle=True)
        # and repeats the data indefinitely (num_epochs=None).
        pattern,
        batch_size=batch_size,
        column_names=CSV_COLUMNS,
        column_defaults=DEFAULTS,
        header=False,  # the files have no header row: don't skip the first line
    )

tempds = create_dataset_without_transformations('data/taxi-*.csv', batch_size=3)
print(tempds)

<_PrefetchDataset element_spec=OrderedDict([('fare_amount', TensorSpec(shape=(3,), dtype=tf.float32, name=None)), ('pickup_datetime', TensorSpec(shape=(3,), dtype=tf.string, name=None)), ('pickup_longitude', TensorSpec(shape=(3,), dtype=tf.float32, name=None)), ('pickup_latitude', TensorSpec(shape=(3,), dtype=tf.float32, name=None)), ('dropoff_longitude', TensorSpec(shape=(3,), dtype=tf.float32, name=None)), ('dropoff_latitude', TensorSpec(shape=(3,), dtype=tf.float32, name=None)), ('passenger_count', TensorSpec(shape=(3,), dtype=tf.float32, name=None)), ('key', TensorSpec(shape=(3,), dtype=tf.string, name=None))])>


Note that this is a **prefetched dataset, where each element is an `OrderedDict` whose keys are the feature names and whose values are tensors of shape `(batch_size,)` (i.e. vectors).**

Let's iterate over the first two elements of this dataset using `tempds.take(2)` and convert each of them to an ordinary Python dictionary with NumPy arrays as values, for readability:

In [9]:
for data in tempds.take(2):
    pprint({k: v.numpy() for k, v in data.items()})
    print("\n")

{'dropoff_latitude': array([40.70576 , 40.736816, 40.77489 ], dtype=float32),
 'dropoff_longitude': array([-74.009964, -73.986725, -73.98209 ], dtype=float32),
 'fare_amount': array([13.3, 11. , 14. ], dtype=float32),
 'key': array([b'2857', b'1684', b'2561'], dtype=object),
 'passenger_count': array([2., 1., 1.], dtype=float32),
 'pickup_datetime': array([b'2012-07-05 14:18:00 UTC', b'2014-10-06 15:16:00 UTC',
       b'2015-06-16 18:59:15 UTC'], dtype=object),
 'pickup_latitude': array([40.738842, 40.755974, 40.7553  ], dtype=float32),
 'pickup_longitude': array([-73.98321, -73.9796 , -73.97314], dtype=float32)}


{'dropoff_latitude': array([40.72844 , 40.74992 , 40.784775], dtype=float32),
 'dropoff_longitude': array([-73.98173, -73.99045, -73.96779], dtype=float32),
 'fare_amount': array([11. ,  4.9,  9. ], dtype=float32),
 'key': array([b'1341', b'177', b'718'], dtype=object),
 'passenger_count': array([1., 1., 5.], dtype=float32),
 'pickup_datetime': array([b'2013-12-06 14:55:00 U
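Two defaults of `make_csv_dataset` are easy to miss with headerless files like these: `header=True` treats the first line of every file as column names and skips it, and `num_epochs=None` repeats the data forever. A minimal sketch on a temporary three-row file (rows copied from `taxi-train.csv`; the file itself is hypothetical) that counts the rows returned in one pass:

```python
import os
import tempfile

import tensorflow as tf

CSV_COLUMNS = ['fare_amount', 'pickup_datetime', 'pickup_longitude', 'pickup_latitude',
               'dropoff_longitude', 'dropoff_latitude', 'passenger_count', 'key']
DEFAULTS = [[0.0], ['na'], [0.0], [0.0], [0.0], [0.0], [0.0], ['na']]

rows = [
    "11.3,2011-01-28 20:42:59 UTC,-73.999022,40.739146,-73.990369,40.717866,1,0",
    "7.7,2011-06-27 04:28:06 UTC,-73.987443,40.729221,-73.979013,40.758641,1,1",
    "10.5,2011-04-03 00:54:53 UTC,-73.982539,40.735725,-73.954797,40.778388,1,2",
]
path = os.path.join(tempfile.mkdtemp(), "taxi-sample.csv")
with open(path, "w") as f:
    f.write("\n".join(rows) + "\n")

def count_rows(header):
    """Count the rows make_csv_dataset yields in a single pass over the file."""
    dataset = tf.data.experimental.make_csv_dataset(
        path,
        batch_size=2,
        column_names=CSV_COLUMNS,
        column_defaults=DEFAULTS,
        header=header,
        num_epochs=1,   # stop after one pass (with None, this loop would never end)
        shuffle=False,
    )
    return sum(int(batch['fare_amount'].shape[0]) for batch in dataset)

print(count_rows(header=False))  # 3: every line is data
print(count_rows(header=True))   # 2: the first data row was skipped as a "header"
```

With `num_epochs=1` the last, shorter batch is kept; with the default `num_epochs=None` every batch has exactly `batch_size` rows, which is why `tempds` above reports a static shape of `(3,)`.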

### Transforming the features

In this step we want to do two things:
1. Separate the features from the label, so that iterating through the dataset for custom training yields `(features, label)` pairs (as in the previous example).
2. Apply some transformation to the features (e.g. drop some features).

This can be achieved by writing a custom function that does both (1) and (2) and passing it to the `tf.data.Dataset` object through the `map` method, which applies the transformation to each element of the dataset as it is iterated. Combined with `prefetch`, this gives an efficient pipeline in which upcoming batches are transformed while the model trains on the current one.
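A minimal sketch of the `map` + `prefetch` pattern on a toy in-memory dataset of dicts (the column names `x` and `y` and the scaling are hypothetical): the mapped function receives one element at a time, and `num_parallel_calls=tf.data.AUTOTUNE` lets `tf.data` run it in parallel while still returning elements in their original order.

```python
import tensorflow as tf

rows = tf.data.Dataset.from_tensor_slices({
    "x": [1.0, 2.0, 3.0, 4.0],
    "y": [12.0, 14.0, 16.0, 18.0],
})

def split_and_scale(row):
    """Toy transformation: separate the label and rescale the single feature."""
    label = row["y"]
    features = {"x_scaled": row["x"] / 4.0}
    return features, label

dataset = (rows
           .map(split_and_scale, num_parallel_calls=tf.data.AUTOTUNE)
           .batch(2)
           .prefetch(tf.data.AUTOTUNE))  # prepare the next batch while the current one is used

for features, label in dataset:
    print(features["x_scaled"].numpy(), label.numpy())
```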



Let's first implement a function that takes as input an element of the dataset (a batch of rows, represented as an `OrderedDict` of column tensors, as above) and returns a tuple with two elements:

* The first element being the same `OrderedDict` with the label dropped
* The second element being the label itself (`fare_amount`)

Note that we also need to remove the `key` and `pickup_datetime` columns, which we won't use.

In [10]:
UNWANTED_COLS = ['pickup_datetime', 'key']
from collections import OrderedDict

def features_and_labels(row_data: OrderedDict):
    """This function applies a transformation to each element of the dataset. As we saw, each element
    is an OrderedDict (column name -> batch of values), so the function has to operate on it.

    Returns:
        OrderedDict: containing the features we want to use.
        label_value: the target tensor used to train the model. It is a plain tensor rather than an
            OrderedDict because there is a single label column.
    """

    # If we wanted to preserve row_data, we would need to copy this dictionary here. However,
    # in the context of batch training, losing information from row_data is irrelevant: we
    # continuously read new data (i.e. a new row_data object), drop some keys and move on.
    features = row_data

    # remove the label...
    label_value = features.pop(LABEL_COLUMN)

    # ... and the unwanted features
    for key_to_remove in UNWANTED_COLS:
        features.pop(key_to_remove)

    return features, label_value

Let's iterate over 2 elements (batches of 3 rows) from our `tempds` dataset and apply our `features_and_labels`
function to each of them to make sure it's working:

In [11]:
BATCH_SIZE = 3  # must match the batch_size used to build tempds
for row_data in tempds.take(2):
    features, label = features_and_labels(row_data)
    pprint(features)
    print(label, "\n")
    assert UNWANTED_COLS[0] not in features.keys()
    assert UNWANTED_COLS[1] not in features.keys()
    assert label.shape == [BATCH_SIZE]

OrderedDict([('pickup_longitude',
              <tf.Tensor: shape=(3,), dtype=float32, numpy=array([-73.94789 , -73.983765, -73.944084], dtype=float32)>),
             ('pickup_latitude',
              <tf.Tensor: shape=(3,), dtype=float32, numpy=array([40.779606, 40.74341 , 40.823822], dtype=float32)>),
             ('dropoff_longitude',
              <tf.Tensor: shape=(3,), dtype=float32, numpy=array([-73.98037, -74.0132 , -73.94545], dtype=float32)>),
             ('dropoff_latitude',
              <tf.Tensor: shape=(3,), dtype=float32, numpy=array([40.728622, 40.716763, 40.83443 ], dtype=float32)>),
             ('passenger_count',
              <tf.Tensor: shape=(3,), dtype=float32, numpy=array([1., 2., 1.], dtype=float32)>)])
tf.Tensor([16.5 12.9  7. ], shape=(3,), dtype=float32) 

OrderedDict([('pickup_longitude',
              <tf.Tensor: shape=(3,), dtype=float32, numpy=array([-73.9583 , -73.99123, -73.97835], dtype=float32)>),
             ('pickup_latitude',
              <t
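`make_csv_dataset` can also perform the features/label split and drop unused columns by itself, through its `label_name` and `select_columns` arguments; this is an alternative to a custom `map`. A minimal sketch on a temporary headerless file (rows copied from `taxi-train.csv`; the file is hypothetical). Note that `column_defaults` must then list one default per *selected* column:

```python
import os
import tempfile

import tensorflow as tf

CSV_COLUMNS = ['fare_amount', 'pickup_datetime', 'pickup_longitude', 'pickup_latitude',
               'dropoff_longitude', 'dropoff_latitude', 'passenger_count', 'key']
SELECTED = ['fare_amount', 'pickup_longitude', 'pickup_latitude',
            'dropoff_longitude', 'dropoff_latitude', 'passenger_count']

rows = [
    "11.3,2011-01-28 20:42:59 UTC,-73.999022,40.739146,-73.990369,40.717866,1,0",
    "7.7,2011-06-27 04:28:06 UTC,-73.987443,40.729221,-73.979013,40.758641,1,1",
    "10.5,2011-04-03 00:54:53 UTC,-73.982539,40.735725,-73.954797,40.778388,1,2",
]
path = os.path.join(tempfile.mkdtemp(), "taxi-sample.csv")
with open(path, "w") as f:
    f.write("\n".join(rows) + "\n")

dataset = tf.data.experimental.make_csv_dataset(
    path,
    batch_size=3,
    column_names=CSV_COLUMNS,                 # names of *all* columns in the file
    select_columns=SELECTED,                  # keep only these (drops pickup_datetime and key)
    column_defaults=[[0.0]] * len(SELECTED),  # one default per selected column
    label_name='fare_amount',                 # yield (features, label) pairs
    header=False,
    num_epochs=1,
    shuffle=False,
)

features, label = next(iter(dataset))
print(list(features.keys()))
print(label.numpy())
```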

### Putting things together: reading from disk and transforming while training

In [12]:
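# Create a TensorFlow dataset that also transforms the data (features/label split)
def create_dataset_with_transformations(pattern: str, batch_size=100, mode='eval'):
    """Load csv data from a path (which can be a pattern, i.e. include the `*` symbol), split each
    element into (features, label) and return a tf.data.Dataset.
    """

    dataset = tf.data.experimental.make_csv_dataset(
        pattern,
        batch_size=batch_size,
        column_names=CSV_COLUMNS,
        column_defaults=DEFAULTS,
        header=False,  # the files have no header row
        num_epochs=1,  # one pass over the files: the default (None) repeats forever,
                       # so cache() would grow without bound and a Keras epoch would never end
    )

    dataset = dataset.map(features_and_labels).cache()

    if mode == 'train':
        # repeat() makes the dataset infinite again: model.fit then needs steps_per_epoch
        dataset = dataset.shuffle(12).repeat()

    # Prefetch one batch so the next one is prepared while the current one is consumed
    # (tf.data.AUTOTUNE would let TensorFlow tune the buffer size dynamically)
    dataset = dataset.prefetch(1)

    return dataset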
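The order of `map`, `cache`, `shuffle` and `repeat` matters: `cache()` stores the mapped elements during the first full pass, so later epochs skip the expensive steps, and `shuffle()` placed before `repeat()` reshuffles at every epoch while keeping epoch boundaries (each epoch contains every element exactly once). A minimal sketch on a toy `range(6)` dataset, where the `* 10` map stands in for a costly transformation:

```python
import tensorflow as tf

dataset = (tf.data.Dataset.range(6)
           .map(lambda x: x * 10)   # stands in for an expensive transformation
           .cache()                 # filled during the first pass, reused afterwards
           .shuffle(6, seed=0)      # reshuffled at each epoch (reshuffle_each_iteration=True)
           .repeat(2))

values = [int(v) for v in dataset]
first_epoch, second_epoch = values[:6], values[6:]
print(first_epoch, second_epoch)
```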

Let's test that our batches are of the right size:

In [13]:
BATCH_SIZE = 3

tempds = create_dataset_with_transformations('data/taxi-train*.csv', batch_size=BATCH_SIZE)

for X_batch, Y_batch in tempds.take(2):
    pprint({k: v.numpy() for k, v in X_batch.items()})
    print(f'Y = {list(Y_batch.numpy())}', end="\n\n")
    assert len(Y_batch) == BATCH_SIZE

{'dropoff_latitude': array([40.746513, 40.646084, 40.774357], dtype=float32),
 'dropoff_longitude': array([-74.00809, -73.95827, -73.87265], dtype=float32),
 'passenger_count': array([1., 1., 2.], dtype=float32),
 'pickup_latitude': array([40.74543, 40.6535 , 40.73845], dtype=float32),
 'pickup_longitude': array([-73.99873, -73.95789, -74.0064 ], dtype=float32)}
Y = [np.float32(5.3), np.float32(12.1), np.float32(34.27)]

{'dropoff_latitude': array([40.643883, 40.731724, 40.72906 ], dtype=float32),
 'dropoff_longitude': array([-73.783005, -73.9918  , -73.90475 ], dtype=float32),
 'passenger_count': array([1., 1., 5.], dtype=float32),
 'pickup_latitude': array([40.732903, 40.72373 , 40.729908], dtype=float32),
 'pickup_longitude': array([-73.98619 , -74.00486 , -74.000565], dtype=float32)}
Y = [np.float32(57.33), np.float32(6.5), np.float32(17.7)]



**To see how to use this dataset for training, see: https://github.com/GoogleCloudPlatform/training-data-analyst/blob/master/courses/machine_learning/deepdive2/introduction_to_tensorflow/solutions/1_training_at_scale_vertex.ipynb**

### Integrating with Keras models

In [14]:
# Example (replace with your actual model and data)
BATCH_SIZE = 1000
ds_train = create_dataset_with_transformations('data/taxi-train*.csv', batch_size=BATCH_SIZE)

# Extract the first element of the dataset to get the names of the model features.
X_first, y_first = next(iter(ds_train))
# Equivalent alternative: X_first, y_first = list(ds_train.take(1))[0]

col_features = list(X_first.keys())

In [15]:
# Create a Keras Input layer for each feature
feature_inputs = []
for feature_name in col_features:
    feature_inputs.append(tf.keras.Input(shape=(1,), name=feature_name))

# Concatenate all the feature inputs into a single tensor
concatenated_features = tf.keras.layers.concatenate(feature_inputs)

# Define the Dense layer that will receive the concatenated features
# This is a simple model with one hidden layer and one output neuron for regression
hidden_layer = tf.keras.layers.Dense(8, activation='relu')(concatenated_features)
output_layer = tf.keras.layers.Dense(1)(hidden_layer)

# Create the Keras Model
# The inputs are the list of individual feature inputs
# The output is the final Dense layer
model = tf.keras.Model(inputs=feature_inputs, outputs=output_layer)

# Compile the model for regression
model.compile(optimizer='adam',
              loss='mean_squared_error',
              metrics=['mae']) # Mean Absolute Error

# Print a summary of the model architecture
model.summary()

In [23]:
# Define the EarlyStopping callback.
# We monitor the validation loss to prevent overfitting: with this compile() call, Keras logs
# 'loss', 'mae', 'val_loss' and 'val_mae', so a 'mean_squared_error' key would not be found.
early_stopping_callback = tf.keras.callbacks.EarlyStopping(
    monitor='val_loss',         # The metric to monitor
    patience=3,                 # Number of epochs with no improvement after which training will be stopped
    min_delta=0.1,              # Minimum change in the monitored quantity to qualify as an improvement
    restore_best_weights=True,  # Restores model weights from the epoch with the best value
    verbose=1,                  # To see the messages when training stops
)

# Get the validation data: it must be a finite dataset (one pass over the validation files)
ds_val = create_dataset_with_transformations('data/taxi-valid*.csv', batch_size=1000)

# EarlyStopping can only act across several epochs, hence epochs > patience
model.fit(ds_train, epochs=20, callbacks=[early_stopping_callback], validation_data=ds_val)

   4596/Unknown 19s 4ms/step - loss: 87.8527 - mae: 5.8755

KeyboardInterrupt: 
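An epoch that keeps counting `N/Unknown` steps until it is interrupted is consistent with an infinite input dataset (here, `make_csv_dataset` with its default `num_epochs=None`). A minimal, self-contained sketch of the intended setup on hypothetical synthetic data ($y = 2x + 10$, as in the first part): finite training and validation datasets, so that each epoch ends and `EarlyStopping` can compare `val_loss` across epochs.

```python
import tensorflow as tf

tf.random.set_seed(0)

# Hypothetical synthetic data: y = 2x + 10, shaped (n, 1)
x = tf.random.uniform((200, 1), minval=0.0, maxval=10.0)
y = 2 * x + 10

# Finite datasets: each epoch is exactly one pass over the data
ds_train = tf.data.Dataset.from_tensor_slices((x[:160], y[:160])).shuffle(160).batch(32)
ds_val = tf.data.Dataset.from_tensor_slices((x[160:], y[160:])).batch(32)

inputs = tf.keras.Input(shape=(1,))
outputs = tf.keras.layers.Dense(1)(inputs)
model = tf.keras.Model(inputs=inputs, outputs=outputs)
model.compile(optimizer='adam', loss='mean_squared_error', metrics=['mae'])

early_stopping = tf.keras.callbacks.EarlyStopping(
    monitor='val_loss',         # logged because validation_data is given
    patience=3,
    restore_best_weights=True,
)

history = model.fit(ds_train, validation_data=ds_val, epochs=20,
                    callbacks=[early_stopping], verbose=0)

print(sorted(history.history.keys()))
print(len(history.history['loss']), "epochs run")
```

The same pattern applies to the taxi data: keep the evaluation datasets finite, and if the training dataset is repeated (`mode='train'`), pass `steps_per_epoch` to `model.fit`.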