# Building a TF Data Loader
This notebook shows how we go from the serialized TFRecords saved in the [previous notebook](./0_save-training-data.ipynb) to training batches. A TF data loader is built by chaining together a series of operations that transform a stream of data, using [TensorFlow's Data module](https://www.tensorflow.org/guide/data).

In [1]:
import tensorflow as tf
import numpy as np
import os

In [2]:
data_file = 'datasets/train_data.proto'

## Part 1: Loading Records from Disk
Our records are stored as serialized protobuf messages, so the data stream starts by reading records from this file.
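For intuition about what `TFRecordDataset` reads: per the TFRecord format documentation, a TFRecord file is a plain sequence of framed records, each made of an 8-byte little-endian length, a 4-byte masked CRC-32C of that length, the payload bytes, and a 4-byte masked CRC-32C of the payload. The standard-library sketch below writes and reads that framing. It is an illustration only, with hypothetical helper names (`write_records`, `read_records`) and a slow bit-by-bit checksum; in practice you would use `tf.io.TFRecordWriter` and `tf.data.TFRecordDataset`.

```python
import io
import struct

_MASK_DELTA = 0xA282EAD8


def crc32c(data: bytes) -> int:
    """Bitwise CRC-32C (Castagnoli), the checksum used by the TFRecord framing."""
    crc = 0xFFFFFFFF
    for byte in data:
        crc ^= byte
        for _ in range(8):
            # Reflected polynomial 0x82F63B78, applied only when the low bit is set
            crc = (crc >> 1) ^ (0x82F63B78 if crc & 1 else 0)
    return crc ^ 0xFFFFFFFF


def masked_crc(data: bytes) -> int:
    """Rotate the CRC right by 15 bits and add a constant, as the format specifies."""
    crc = crc32c(data)
    return (((crc >> 15) | (crc << 17)) + _MASK_DELTA) & 0xFFFFFFFF


def write_records(stream, payloads):
    """Hypothetical helper: frame each payload as length, CRC(length), payload, CRC(payload)."""
    for payload in payloads:
        header = struct.pack("<Q", len(payload))
        stream.write(header)
        stream.write(struct.pack("<I", masked_crc(header)))
        stream.write(payload)
        stream.write(struct.pack("<I", masked_crc(payload)))


def read_records(stream):
    """Hypothetical helper: yield each payload, checking both CRCs."""
    while True:
        header = stream.read(8)
        if not header:
            return  # clean end of file
        (length,) = struct.unpack("<Q", header)
        (length_crc,) = struct.unpack("<I", stream.read(4))
        if length_crc != masked_crc(header):
            raise ValueError("corrupt length header")
        payload = stream.read(length)
        (payload_crc,) = struct.unpack("<I", stream.read(4))
        if payload_crc != masked_crc(payload):
            raise ValueError("corrupt payload")
        yield payload


buf = io.BytesIO()
write_records(buf, [b"first", b"second record"])
buf.seek(0)
print(list(read_records(buf)))  # [b'first', b'second record']
```

The payloads here are arbitrary byte strings; in our files, each payload is one serialized protobuf record.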

In [3]:
loader = tf.data.TFRecordDataset(data_file)
loader

<TFRecordDatasetV2 shapes: (), types: tf.string>

Note that this loader produces a dataset of strings: each element is one serialized record.
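Judging by the feature names visible in the bytes below (such as `n_atom`, `bond`, `connectivity` and `bandgap`), each string is a serialized `tf.train.Example` message, which is also the format `tf.io.parse_example` expects later on. A minimal sketch of that round trip, on a made-up record that borrows field names from our data, is shown here. To inspect a real record you could decode it the same way, e.g. `tf.train.Example.FromString(next(iter(loader)).numpy())`.

```python
import tensorflow as tf

# A made-up record using some of the field names from our data (values are illustrative)
example = tf.train.Example(features=tf.train.Features(feature={
    'n_atom': tf.train.Feature(int64_list=tf.train.Int64List(value=[3])),
    'atom': tf.train.Feature(int64_list=tf.train.Int64List(value=[1, 0, 0])),
    'u0_atom': tf.train.Feature(float_list=tf.train.FloatList(value=[-1.5])),
}))
serialized = example.SerializeToString()  # the kind of bytes TFRecordDataset yields

# Decode the bytes back into a message to see which features it holds
decoded = tf.train.Example.FromString(serialized)
print(sorted(decoded.features.feature.keys()))  # ['atom', 'n_atom', 'u0_atom']
print(list(decoded.features.feature['atom'].int64_list.value))  # [1, 0, 0]
```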

In [4]:
next(iter(loader))

<tf.Tensor: shape=(), dtype=string, numpy=b'\n\x89\x02\n\x0f\n\x06n_atom\x12\x05\x1a\x03\n\x01\x13\n4\n\x04bond\x12,\x1a*\n(\x01\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\nd\n\x0cconnectivity\x12T\x1aR\nP\x00\x01\x01\x00\x01\x02\x01\t\x02\x01\x02\x03\x02\x04\x02\n\x03\x02\x03\x04\x03\x0b\x03\x0c\x04\x02\x04\x03\x04\x05\x04\x08\x05\x04\x05\x06\x05\r\x05\x0e\x06\x05\x06\x07\x06\x0f\x06\x10\x07\x06\x07\x08\x07\x11\x07\x12\x08\x04\x08\x07\t\x01\n\x02\x0b\x03\x0c\x03\r\x05\x0e\x05\x0f\x06\x10\x06\x11\x07\x12\x07\n\x13\n\x07bandgap\x12\x08\x12\x06\n\x04RI]>\n\x1f\n\x04atom\x12\x17\x1a\x15\n\x13\x03\x01\x01\x01\x01\x01\x01\x01\x03\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\n\x0f\n\x06n_bond\x12\x05\x1a\x03\n\x01(\n\x13\n\x07u0_atom\x12\x08\x12\x06\n\x04\xd9%>\xc0'>

For performance, we group these records into batches before parsing them, so that each parsing call handles a whole batch at once.

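- `shuffle` fills a buffer with entries (128 in our case), randomly picks one entry from that buffer, and replaces it with the next record from the file
- `batch` combines consecutive entries (2 in our case) into a single tensor with a new leading batch dimension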
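To make the semantics of these two steps concrete, here is a plain-Python sketch of a buffer-based shuffle followed by batching. It mimics the documented behavior of `Dataset.shuffle` and `Dataset.batch` (with the default `drop_remainder=False`) rather than reproducing TensorFlow's implementation, and the function names are hypothetical. One consequence worth noticing: an element can only be emitted after it has entered the buffer, so a buffer much smaller than the dataset only shuffles locally.

```python
import random
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def shuffle(stream: Iterable[T], buffer_size: int, seed: int = 0) -> Iterator[T]:
    """Buffer-based shuffle: emit a random buffered element, then refill its slot."""
    rng = random.Random(seed)
    buffer: List[T] = []
    for item in stream:
        if len(buffer) < buffer_size:
            buffer.append(item)  # still filling the buffer
            continue
        idx = rng.randrange(buffer_size)
        yield buffer[idx]
        buffer[idx] = item  # replace the emitted element with the next record
    rng.shuffle(buffer)  # stream exhausted: drain what is left in random order
    yield from buffer


def batch(stream: Iterable[T], batch_size: int) -> Iterator[List[T]]:
    """Group consecutive elements; keep the final short batch (like drop_remainder=False)."""
    current: List[T] = []
    for item in stream:
        current.append(item)
        if len(current) == batch_size:
            yield current
            current = []
    if current:
        yield current


batches = list(batch(shuffle(range(10), buffer_size=4), batch_size=3))
print([len(b) for b in batches])  # [3, 3, 3, 1]
```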

In [5]:
loader = tf.data.TFRecordDataset(data_file).shuffle(128).batch(2)
loader

<BatchDataset shapes: (None,), types: tf.string>

In [6]:
next(iter(loader))

<tf.Tensor: shape=(2,), dtype=string, numpy=
array([b'\n\xf3\x01\n\x0f\n\x06n_bond\x12\x05\x1a\x03\n\x01"\nX\n\x0cconnectivity\x12H\x1aF\nD\x00\x01\x00\t\x01\x00\x01\x02\x01\x08\x02\x01\x02\x03\x03\x02\x03\x04\x03\x07\x03\n\x04\x03\x04\x05\x04\x06\x04\x0b\x05\x04\x05\x06\x05\x0c\x06\x04\x06\x05\x06\x07\x06\r\x07\x03\x07\x06\x07\x08\x08\x01\x08\x07\x08\x0e\t\x00\n\x03\x0b\x04\x0c\x05\r\x06\x0e\x08\n\x1b\n\x04atom\x12\x13\x1a\x11\n\x0f\x02\x01\x03\x01\x01\x02\x01\x01\x01\x00\x00\x00\x00\x00\x00\n\x13\n\x07bandgap\x12\x08\x12\x06\n\x04q\xacK>\n.\n\x04bond\x12&\x1a$\n"\x01\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\n\x13\n\x07u0_atom\x12\x08\x12\x06\n\x04e4\x1b\xc0\n\x0f\n\x06n_atom\x12\x05\x1a\x03\n\x01\x0f',
       b'\n\xa5\x02\n\x0f\n\x06n_atom\x12\x05\x1a\x03\n\x01\x17\nt\n\x0cconnectivity\x12d\x1ab\n`\x00\x01\x00\t\x00\n\x00\x0b\x01\x00\x01\x02\x01\x0c\x01\r\x02\x01\x02\x03\x02\x08\x03\x02\x03\x04

We now have a batch of two serialized records (a string tensor of shape `(2,)`).

Our next step is to convert the batches of records into a set of tensors.

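To do so, we define which features of the protobuf message to read and what type each one has. Fields with exactly one value per record use `FixedLenFeature`; fields whose length varies from molecule to molecule use `VarLenFeature`.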

In [7]:
def parse_records(example_proto):
    """Parse a batch of serialized records into a dictionary of tensors"""
    features = {
        # One value per record -> dense tensor of shape (batch_size,)
        'u0_atom': tf.io.FixedLenFeature([], tf.float32, default_value=np.nan),
        'n_atom': tf.io.FixedLenFeature([], tf.int64),
        'n_bond': tf.io.FixedLenFeature([], tf.int64),
        # Variable number of values per record -> SparseTensor
        'connectivity': tf.io.VarLenFeature(tf.int64),
        'atom': tf.io.VarLenFeature(tf.int64),
        'bond': tf.io.VarLenFeature(tf.int64),
    }
    return tf.io.parse_example(example_proto, features)
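To see what such a feature specification does, the sketch below parses a batch of two toy records (the helper `make_example` and all values are made up for illustration). It shows that a `FixedLenFeature` becomes a dense tensor with one entry per record, a `VarLenFeature` becomes a `SparseTensor` whose `values` concatenate every record's entries, and a missing feature falls back to its `default_value`. A `FixedLenFeature` without a default raises an error when the feature is missing instead.

```python
import numpy as np
import tensorflow as tf


def make_example(atoms):
    """Hypothetical helper: serialize a toy record with a fixed- and a variable-length field."""
    return tf.train.Example(features=tf.train.Features(feature={
        'n_atom': tf.train.Feature(int64_list=tf.train.Int64List(value=[len(atoms)])),
        'atom': tf.train.Feature(int64_list=tf.train.Int64List(value=atoms)),
    })).SerializeToString()


# A batch of two serialized records with different numbers of atoms
serialized = tf.constant([make_example([1, 0, 0]), make_example([2, 1, 0, 0, 0])])

features = {
    'n_atom': tf.io.FixedLenFeature([], tf.int64),   # one value per record -> dense
    'atom': tf.io.VarLenFeature(tf.int64),           # any number of values -> SparseTensor
    'u0_atom': tf.io.FixedLenFeature([], tf.float32, default_value=np.nan),  # absent -> default
}
parsed = tf.io.parse_example(serialized, features)

print(parsed['n_atom'].numpy())            # [3 5]
print(parsed['atom'].values.numpy())       # [1 0 0 2 1 0 0 0]
print(parsed['atom'].dense_shape.numpy())  # [2 5]
print(parsed['u0_atom'].numpy())           # [nan nan]
```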

We add this function to the pipeline with `map`:

In [8]:
loader = tf.data.TFRecordDataset(data_file).shuffle(128).batch(2).map(parse_records)
loader

<MapDataset shapes: {atom: (None, None), bond: (None, None), connectivity: (None, None), n_atom: (None,), n_bond: (None,), u0_atom: (None,)}, types: {atom: tf.int64, bond: tf.int64, connectivity: tf.int64, n_atom: tf.int64, n_bond: tf.int64, u0_atom: tf.float32}>

We now have Tensor objects in a dictionary!

## Preprocessing to Make MPNN-compatible batches
TensorFlow can now understand the data types, but the data are not yet in a form our MPNN (message-passing neural network) can use.

In [9]:
next(iter(loader))

{'atom': <tensorflow.python.framework.sparse_tensor.SparseTensor at 0x7f31783c9460>,
 'bond': <tensorflow.python.framework.sparse_tensor.SparseTensor at 0x7f31783c94c0>,
 'connectivity': <tensorflow.python.framework.sparse_tensor.SparseTensor at 0x7f31783c9ac0>,
 'n_atom': <tf.Tensor: shape=(2,), dtype=int64, numpy=array([16, 17])>,
 'n_bond': <tf.Tensor: shape=(2,), dtype=int64, numpy=array([32, 32])>,
 'u0_atom': <tf.Tensor: shape=(2,), dtype=float32, numpy=array([-2.579495, -2.631878], dtype=float32)>}

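Our main issue is that the variable-length fields (`atom`, `bond`, `connectivity`) come back as `SparseTensor` objects, which many TensorFlow operations do not accept. We convert them into `RaggedTensor` objects instead, keeping only their `values`: the entries of every record in the batch, concatenated in order.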
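A `SparseTensor` stores the batch in coordinate (COO) form: `indices` holds (row, position) pairs, `values` holds the entries, and `dense_shape` holds the padded size. The numpy sketch below (toy values, not taken from our dataset) illustrates that `values` is already the per-record entries concatenated in order, and that the record boundaries can be rebuilt from `n_atom`. That appears to be why it is safe to keep only `.values` here: the counts needed to separate molecules again are still available in `n_atom` and `n_bond`.

```python
import numpy as np

# COO layout of a batched VarLenFeature (toy values): one row per record
indices = np.array([[0, 0], [0, 1], [0, 2],
                    [1, 0], [1, 1], [1, 2], [1, 3], [1, 4]])
values = np.array([1, 0, 0, 2, 1, 0, 0, 0])  # row-major order: record 0, then record 1
dense_shape = np.array([2, 5])
n_atom = np.array([3, 5])  # per-record counts stored alongside in the same records

# The row ids in `indices` give each record's length, matching n_atom
row_lengths = np.bincount(indices[:, 0], minlength=dense_shape[0])
print(row_lengths)  # [3 5]

# So the flat `values` can be split back into per-record lists using n_atom alone
rows = np.split(values, np.cumsum(n_atom)[:-1])
print([r.tolist() for r in rows])  # [[1, 0, 0], [2, 1, 0, 0, 0]]
```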

In [10]:
def prepare_for_batching(dataset):
    """Make the variable-length arrays into RaggedTensors.

    Allows them to be merged together in batches"""
    for c in ['atom', 'bond', 'connectivity']:
        # `.values` holds the entries of every record in the batch, concatenated in order
        expanded = tf.expand_dims(dataset[c].values, axis=0, name=f'expand_{c}')
        # Wrap the (1, total_length) tensor as a RaggedTensor with a single row
        dataset[c] = tf.RaggedTensor.from_tensor(expanded)
    return dataset

In [11]:
loader = tf.data.TFRecordDataset(data_file).shuffle(128).batch(2).map(parse_records).map(prepare_for_batching)
loader

<MapDataset shapes: {atom: (1, None), bond: (1, None), connectivity: (1, None), n_atom: (None,), n_bond: (None,), u0_atom: (None,)}, types: {atom: tf.int64, bond: tf.int64, connectivity: tf.int64, n_atom: tf.int64, n_bond: tf.int64, u0_atom: tf.float32}>

In [12]:
next(iter(loader))

{'atom': <tf.RaggedTensor [[1, 1, 2, 1, 1, 3, 1, 1, 3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 2, 1, 1, 1, 1, 1, 1, 3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]]>,
 'bond': <tf.RaggedTensor [[0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]]>,
 'connectivity': <tf.RaggedTensor [[0, 1, 0, 9, 0, 10, 0, 11, 1, 0, 1, 2, 1, 6, 1, 12, 2, 1, 2, 3, 2, 4, 3, 2, 3, 13, 3, 14, 3, 15, 4, 2, 4, 5, 4, 6, 5, 4, 6, 1, 6, 4, 6, 7, 6, 8, 7, 6, 7, 16, 7, 17, 7, 18, 8, 6, 8, 19, 9, 0, 10, 0, 11, 0, 12, 1, 13, 3, 14, 3, 15, 3, 16, 7, 17, 7, 18, 7, 19, 8, 0, 1, 0, 9, 0, 10, 0, 11, 1, 0, 1, 2, 1, 5, 2, 1, 2, 3, 2, 4, 2, 7, 3, 2, 3, 12, 3, 13, 3, 14, 4, 2, 4, 5, 4, 15, 4, 16, 5, 1, 5, 4, 5, 6, 5, 7, 6, 5, 6, 17, 6, 18, 6, 19, 7, 2, 7, 5, 7, 8, 7, 20, 8, 7, 8, 21, 9, 0, 10, 0, 11, 0, 12, 3, 13, 3, 14, 3, 15, 4, 

The data are now close to the form we need, but a few problems remain:

- We can't easily tell which "node" belongs to which training entry, because we have put multiple graphs into the same batch
- The `connectivity` array has the wrong shape: it is a 1D array instead of an Nx2 array of (source, target) pairs
- The node ids in the connectivity arrays are wrong. Because we merged multiple graphs, node 0 of the second graph is no longer at position 0 in the `atom` array; it now sits at the index equal to the first graph's `n_atom`
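All three fixes come down to a little index arithmetic. The numpy sketch below works through it on a made-up batch of two molecules (a 3-atom chain and a 2-atom pair; values are illustrative). It mirrors the TensorFlow operations used in `combine_graphs`: `repeat` for graph membership, a reshape into pairs, and an exclusive cumulative sum for the offsets. numpy's `cumsum` has no `exclusive` flag, so the sketch prepends a zero instead of using `tf.cumsum(..., exclusive=True)`.

```python
import numpy as np

n_atom = np.array([3, 2])
n_bond = np.array([4, 2])  # directed bonds: each chemical bond appears once per direction
# Flattened connectivity as parsed: (source, target) pairs using per-molecule atom ids
connectivity = np.array([0, 1, 1, 0, 1, 2, 2, 1,   # molecule 0: chain 0-1-2
                         0, 1, 1, 0])              # molecule 1: pair 0-1

mol_id = np.arange(len(n_atom))
node_graph_indices = np.repeat(mol_id, n_atom)  # [0 0 0 1 1]
bond_graph_indices = np.repeat(mol_id, n_bond)  # [0 0 0 0 1 1]

pairs = connectivity.reshape(-1, 2)  # one row per directed bond

# Exclusive cumsum: molecule k's atoms start after all atoms of molecules before it
offset_values = np.concatenate([[0], np.cumsum(n_atom)[:-1]])  # [0 3]
offsets = np.repeat(offset_values, n_bond)                      # [0 0 0 0 3 3]
pairs = pairs + offsets[:, None]
print(pairs.tolist())  # [[0, 1], [1, 0], [1, 2], [2, 1], [3, 4], [4, 3]]
```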

In [13]:
def combine_graphs(batch):
    """Combine multiple graphs into a single network"""

    # Flatten the single-row RaggedTensors into plain 1-D vectors
    for c in ['atom', 'bond', 'connectivity']:
        batch[c] = batch[c].flat_values

    # Map each atom and each bond to the index of the graph (molecule) it belongs to
    batch_size = tf.size(batch['n_atom'], name='batch_size')
    mol_id = tf.range(batch_size, name='mol_inds')
    batch['node_graph_indices'] = tf.repeat(mol_id, batch['n_atom'], axis=0)
    batch['bond_graph_indices'] = tf.repeat(mol_id, batch['n_bond'], axis=0)

    # Reshape the connectivity array to (None, 2): one (source, target) pair per bond
    batch['connectivity'] = tf.reshape(batch['connectivity'], (-1, 2))

    # Shift each graph's node ids by the number of atoms in the graphs before it
    offset_values = tf.cumsum(batch['n_atom'], exclusive=True)
    offsets = tf.repeat(offset_values, batch['n_bond'], name='offsets', axis=0)
    batch['connectivity'] += tf.expand_dims(offsets, 1)

    return batch

In [14]:
loader = tf.data.TFRecordDataset(data_file).shuffle(128).batch(2).map(parse_records).map(prepare_for_batching).map(combine_graphs)
loader

<MapDataset shapes: {atom: (None,), bond: (None,), connectivity: (None, 2), n_atom: (None,), n_bond: (None,), u0_atom: (None,), node_graph_indices: (None,), bond_graph_indices: (None,)}, types: {atom: tf.int64, bond: tf.int64, connectivity: tf.int64, n_atom: tf.int64, n_bond: tf.int64, u0_atom: tf.float32, node_graph_indices: tf.int32, bond_graph_indices: tf.int32}>

If you look closely at the output, you will notice two new values:
- `node_graph_indices`, which stores which molecule ("graph") each atom ("node") belongs to.
- `bond_graph_indices`, which stores which molecule each bond belongs to.
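These index arrays are what let a model treat the merged batch as separate molecules again. For example, a readout step could sum per-atom features into one vector per molecule with a segment sum; in TensorFlow that would be `tf.math.unsorted_segment_sum(data, segment_ids, num_segments)` with `node_graph_indices` as the segment ids. The numpy sketch below performs the same reduction with `np.add.at` on made-up features. It is only an illustration; how this project's MPNN actually pools atoms is not shown in this notebook.

```python
import numpy as np

node_graph_indices = np.array([0, 0, 0, 1, 1])  # atoms 0-2 -> molecule 0, atoms 3-4 -> molecule 1
# Made-up 2-dimensional feature vector for each of the 5 atoms in the merged batch
atom_features = np.array([[1.0, 0.0],
                          [2.0, 1.0],
                          [3.0, 0.0],
                          [0.5, 0.5],
                          [0.5, 1.5]])

n_graphs = node_graph_indices.max() + 1
molecule_features = np.zeros((n_graphs, atom_features.shape[1]))
# Unbuffered scatter-add: row i of atom_features is added into row node_graph_indices[i]
np.add.at(molecule_features, node_graph_indices, atom_features)
print(molecule_features.tolist())  # [[6.0, 1.0], [1.0, 2.0]]
```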

`combine_graphs` puts our tensors into the shapes the MPNN needs. Because every step is written with TensorFlow operations, the whole pipeline runs inside TensorFlow's runtime instead of the Python interpreter, so `tf.data` can run the preprocessing in parallel and overlap it with model training.
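A minimal sketch of how that parallelism is usually requested: `num_parallel_calls` lets `map` process several batches at once, and `prefetch` prepares upcoming batches while the current one is being consumed. It uses a toy `Dataset.range` pipeline and a stand-in function `double` (hypothetical) instead of our records; applying the same arguments to `parse_records`, `prepare_for_batching` and `combine_graphs` is an assumption about how you might tune this loader, not something measured here.

```python
import tensorflow as tf

# Let tf.data pick the degree of parallelism and buffer sizes
# (older TF 2.x releases expose this as tf.data.experimental.AUTOTUNE)
AUTOTUNE = tf.data.AUTOTUNE


def double(x):
    """Hypothetical stand-in for a preprocessing step such as parse_records."""
    return x * 2


dataset = (
    tf.data.Dataset.range(8)
    .batch(2)
    .map(double, num_parallel_calls=AUTOTUNE)  # parallel map calls; output order kept by default
    .prefetch(AUTOTUNE)                        # overlap preprocessing with consumption
)
print([b.tolist() for b in dataset.as_numpy_iterator()])  # [[0, 2], [4, 6], [8, 10], [12, 14]]
```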

In [15]:
next(iter(loader))

{'atom': <tf.Tensor: shape=(32,), dtype=int64, numpy=
 array([3, 1, 1, 1, 2, 1, 3, 1, 1, 0, 0, 0, 0, 0, 0, 0, 2, 1, 1, 1, 2, 2,
        1, 1, 1, 0, 0, 0, 0, 0, 0, 0])>,
 'bond': <tf.Tensor: shape=(64,), dtype=int64, numpy=
 array([1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0, 0, 1, 1, 0, 0, 0, 0, 0, 0,
        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2, 2, 0, 0, 0, 0, 2, 2, 0, 0, 0, 0,
        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0])>,
 'connectivity': <tf.Tensor: shape=(64, 2), dtype=int64, numpy=
 array([[ 0,  1],
        [ 1,  0],
        [ 1,  2],
        [ 1,  9],
        [ 2,  1],
        [ 2,  3],
        [ 2, 10],
        [ 2, 11],
        [ 3,  2],
        [ 3,  4],
        [ 3,  8],
        [ 3, 12],
        [ 4,  3],
        [ 4,  5],
        [ 4, 13],
        [ 5,  4],
        [ 5,  6],
        [ 5,  7],
        [ 6,  5],
        [ 7,  5],
        [ 7,  8],
        [ 7, 14],
        [ 8,  3],
        [ 8,  7],
        [ 8, 15],
        [ 9,  1],
        [10,  2],
        