# 2. Data Loaders for ML Pipelines

In part 1 of this blog, we looked at Python methods, generators, and iterators for lazy data reading.

In this blog post, we will combine these tools and build something a little more involved: efficient data objects for data preparation and ingestion into ML models!

Let's look at the holodec data (as of Oct 12, 2020), which contains large hologram images and numerical data accompanying each image.

```python
import time

import numpy as np
from holodecml.data import *  # open_dataset is imported from here
```

```python
def load_raw_datasets(path_data, num_particles, split, output_cols, subset):
    # Eagerly load all images and particle attributes into memory
    ds = open_dataset(path_data, num_particles, split)
    if subset:
        # Keep only the first `subset` fraction of the holograms
        ix = int(subset * ds['image'].shape[0])
        inputs = ds['image'][:ix].values
        outputs = ds[output_cols].to_dataframe()
        # Hologram ids ("hid") are 1-indexed, so keep hid <= ix
        outputs = outputs[outputs["hid"] < (ix + 1)]
    else:
        inputs = ds["image"].values
        outputs = ds[output_cols].to_dataframe()
    ds.close()
    return inputs, outputs
```

```python
path_data = "/glade/p/cisl/aiml/ai4ess_hackathon/holodec/"
num_particles = "multi"
split = 'train'
subset = False
output_cols = ["x", "y", "z", "d", "hid"]
```

```python
t0 = time.time()
inputs, outputs = load_raw_datasets(path_data, num_particles, split, output_cols, subset)
t1 = time.time()
print(t1 - t0)
```

This file is pretty large: not large enough to cause a memory overflow, but it's a big one!

We will first write a lazy reader that returns the same data as the method above, but one hologram (row) at a time. Then we will use special iterable objects from the torch, tensorflow, and keras libraries that enable batching, multiprocessing with multiple workers, and other transformations.

### The Dataset Reader

```python
import numpy as np
import tensorflow as tf
from tensorflow.keras.utils import Sequence
from torch.utils.data import Dataset, DataLoader
```

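The torch.utils.data.Dataset and tensorflow.keras.utils.Sequence objects are nearly identical in that both require a custom Reader to implement, at minimum, the __len__ and __getitem__ "dunder" (double-underscore) methods. One practical difference: a torch Dataset's __getitem__ returns a single sample, whereas Keras expects a Sequence's __getitem__ to return a complete batch.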
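To see that nothing more than these two methods is needed, here is a minimal sketch of a map-style reader over synthetic in-memory arrays. The class name ToyHologramReader and the array shapes are hypothetical stand-ins for the holodec data; Python's built-in len() and square-bracket indexing dispatch to the two methods, which is also how the torch and keras machinery talk to a Reader.

```python
import numpy as np


class ToyHologramReader:
    """Map-style reader: random access by index plus a known length.

    Hypothetical stand-in for HologramReader, backed by in-memory arrays.
    """

    def __init__(self, num_holograms=10, height=6, width=4, seed=0):
        rng = np.random.default_rng(seed)
        # Fake "images", one per hologram
        self.images = rng.random((num_holograms, height, width))
        # Fake per-hologram targets
        self.targets = np.arange(num_holograms)

    def __len__(self):
        # Number of samples; called by len(reader)
        return len(self.images)

    def __getitem__(self, idx):
        # One sample; called by reader[idx]
        return self.images[idx], self.targets[idx]


toy = ToyHologramReader()
toy_x, toy_y = toy[3]
print(len(toy), toy_x.shape, toy_y)
```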

```python
class HologramReader(Dataset): # Sequence

    'Generates data for Keras/Tensorflow/Torch environments'

    def __init__(self,
                 path_data,
                 num_particles,
                 split,
                 output_cols,
                 subset,  # not used by this reader
                 maxnum_particles = 100):

        'Initialization'
        self.ds = open_dataset(path_data, num_particles, split)
        # "hid" only links particles to holograms; it is not a target
        self.output_cols = [x for x in output_cols if x != 'hid']
        self.hologram_numbers = list(range(len(self.ds.hologram_number.values)))
        self.maxnum_particles = maxnum_particles
        # Read the per-particle columns once instead of on every call
        self.hid = self.ds["hid"].values
        self.particle_data = {col: self.ds[col].values for col in self.output_cols}

    def __len__(self):
        'Denotes the number of samples (holograms)'
        return len(self.hologram_numbers)

    def __getitem__(self, idx):
        'Return one row of data'

        # Select one "row" (hologram) from the dataset
        hologram = self.hologram_numbers[idx]
        x_out = self.ds["image"][hologram].values

        # Hologram ids in "hid" are 1-indexed
        particles = np.where(self.hid == hologram + 1)[0]
        # Pad (or truncate) to a fixed particle count so rows can be batched;
        # without maxnum_particles, the shape varies from hologram to hologram
        n_rows = self.maxnum_particles if self.maxnum_particles else len(particles)
        y_out = np.zeros((n_rows, len(self.output_cols)))
        for m, col in enumerate(self.output_cols):
            values = self.particle_data[col][particles][:n_rows]
            y_out[:len(values), m] = values

        return x_out, y_out
```

```python
data_reader = HologramReader(path_data, num_particles, split, output_cols, subset)
```

The __getitem__ method selects whichever hologram we want by index; here we take the first one (index 0). Square-bracket indexing on the reader calls the same method:

```python
x, y = data_reader[0]  # same as data_reader.__getitem__(0)
```

```python
print(x.shape, y.shape)
```
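The label array y has a fixed shape of (maxnum_particles, number of output columns) no matter how many particles a hologram contains, because __getitem__ writes the particles into a zero-padded array. Fixed shapes are what allow rows to be stacked into a batch later on. A standalone sketch of that padding step, using a hypothetical helper pad_particles and made-up particle values:

```python
import numpy as np


def pad_particles(hid, columns, hologram, maxnum_particles):
    """Hypothetical helper: gather one hologram's particles into a padded array.

    hid:      1-indexed hologram id of every particle, shape (n_particles,)
    columns:  list of per-particle attribute arrays (e.g. x, d)
    hologram: 0-indexed hologram number
    """
    # Particles belonging to this hologram (ids are 1-indexed), truncated if needed
    particles = np.where(hid == hologram + 1)[0][:maxnum_particles]
    y_out = np.zeros((maxnum_particles, len(columns)))
    for m, col in enumerate(columns):
        y_out[:len(particles), m] = col[particles]
    return y_out


# Made-up data: 5 particles spread over 2 holograms
hid = np.array([1, 1, 2, 2, 2])
x_pos = np.array([0.1, 0.2, 0.3, 0.4, 0.5])
diam = np.array([10.0, 20.0, 30.0, 40.0, 50.0])

y0 = pad_particles(hid, [x_pos, diam], hologram=0, maxnum_particles=4)
y1 = pad_particles(hid, [x_pos, diam], hologram=1, maxnum_particles=4)
print(y0.shape, y1.shape)  # same shape, so they can be stacked into one batch
```

Unused rows stay zero, so a model (or loss function) needs some way to tell padding apart from real particles.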

Make a quick plot:

```python
import matplotlib.pyplot as plt

plt.imshow(x)
```

### The Iterator

##### Torch

For the torch Reader, we wrap a torch DataLoader around it to enable batching, shuffling, and multiprocessing (among other options). As noted, the DataLoader requires the Reader class to implement the __len__ and __getitem__ dunder methods:

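```python
data_iterator = DataLoader(
    data_reader,
    num_workers = 8,   # worker processes loading rows in parallel
    batch_size = 32,   # rows stacked into each batch
    shuffle = True     # new random order every epoch
)
```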

```python
for (x, y) in data_iterator:
    print(x.shape, y.shape)
    break
```
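Conceptually, for a map-style Reader with shuffle=True, the DataLoader draws a random permutation of the indices, slices it into groups of batch_size, calls __getitem__ for each index, and stacks ("collates") the rows. The single-process sketch below reproduces that logic with NumPy; simple_batches is a hypothetical name, and the real DataLoader additionally converts to torch tensors and can drop the final partial batch via drop_last.

```python
import numpy as np


def simple_batches(reader, batch_size, shuffle=True, seed=None):
    """Hypothetical single-process stand-in for iterating a DataLoader."""
    rng = np.random.default_rng(seed)
    # A fresh order of sample indices for this epoch
    order = rng.permutation(len(reader)) if shuffle else np.arange(len(reader))
    for start in range(0, len(order), batch_size):
        rows = [reader[i] for i in order[start:start + batch_size]]
        # "Collate": stack each field of the rows along a new batch axis
        xs, ys = zip(*rows)
        yield np.stack(xs), np.stack(ys)


# Any object with __len__ and __getitem__ works; here, a list of (x, y) pairs
toy_reader = [(np.full((6, 4), i), np.array([i, 2 * i])) for i in range(10)]
for xb, yb in simple_batches(toy_reader, batch_size=4, seed=0):
    print(xb.shape, yb.shape)
```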

The DataLoader creates independent worker processes, each holding its own copy of the Reader. Each worker calls __getitem__ for the indices of a batch, collates the rows, and puts the finished batch onto a queue that the main process reads from. The first few batches will be slow, since the workers have to start up before the queue fills.
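The producer/consumer idea can be sketched with Python threads: a pool of workers builds whole batches ahead of time while the main loop consumes them in order. This is only an analogy under stated assumptions: prefetch_batches and the sleep-based slow_read are hypothetical, and torch uses separate processes rather than threads.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_read(idx):
    """Hypothetical stand-in for an expensive __getitem__ (e.g. disk I/O)."""
    time.sleep(0.05)
    return idx * idx


def load_batch(indices):
    # Each worker builds one whole batch, as DataLoader workers do
    return [slow_read(i) for i in indices]


def prefetch_batches(num_samples, batch_size, num_workers):
    """Yield batches in order while up to num_workers batches load concurrently."""
    batch_indices = [range(s, min(s + batch_size, num_samples))
                     for s in range(0, num_samples, batch_size)]
    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        # Executor.map submits every batch up front and returns results in order
        yield from pool.map(load_batch, batch_indices)


start = time.time()
prefetched = list(prefetch_batches(num_samples=16, batch_size=4, num_workers=4))
print(prefetched[0], f"{time.time() - start:.2f} s")
```

With four workers, the four batches load concurrently, so the total time is roughly that of one batch rather than four.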

##### Keras

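When using Keras, the same class can inherit from Sequence instead, i.e. HologramReader(Sequence), with one caveat: since Keras treats each __getitem__ call as a full batch, __len__ and __getitem__ should then index batches rather than single holograms. Alternatively, we could skip the inheritance altogether and have the Reader return whichever output type the framework expects (a torch tensor or a NumPy array).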

In older Keras versions, the Reader would be passed to the Model.fit_generator method, which exposed a workers argument for setting the number of workers. The important property of the Sequence object is that it is safe to use with multiple workers: it guarantees that each sample is seen only once per epoch, which is not the case for a plain Python generator (though duplicated chunks are less of a problem when the data is shuffled).

The torch DataLoader comes with shuffling built in (it draws a random permutation of the indices 0 to __len__() - 1 every epoch), whereas with a Sequence, shuffling (and whatever else you want) is typically implemented in a method called on_epoch_end, which Keras calls at the end of every epoch.
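A minimal sketch of these Sequence conventions, written as a plain class over synthetic arrays so it runs without Keras. In practice it would inherit from tensorflow.keras.utils.Sequence; the class name BatchReader is hypothetical. Here __len__ counts batches, __getitem__ returns a whole batch, and on_epoch_end reshuffles the index order.

```python
import math

import numpy as np


class BatchReader:  # in practice: class BatchReader(tensorflow.keras.utils.Sequence)
    """Hypothetical Sequence-style reader that returns whole batches."""

    def __init__(self, inputs, targets, batch_size=32, shuffle=True, seed=None):
        self.inputs, self.targets = inputs, targets
        self.batch_size = batch_size
        self.shuffle = shuffle
        self.rng = np.random.default_rng(seed)
        self.indices = np.arange(len(inputs))
        self.on_epoch_end()  # set up the order for the first epoch

    def __len__(self):
        # Number of batches per epoch (the last one may be partial)
        return math.ceil(len(self.indices) / self.batch_size)

    def __getitem__(self, batch_idx):
        # Sample indices belonging to batch number batch_idx
        lo = batch_idx * self.batch_size
        rows = self.indices[lo:lo + self.batch_size]
        return self.inputs[rows], self.targets[rows]

    def on_epoch_end(self):
        # Keras calls this after every epoch; draw a new order
        if self.shuffle:
            self.rng.shuffle(self.indices)


toy_inputs = np.arange(10).reshape(10, 1)
toy_targets = np.arange(10)
seq = BatchReader(toy_inputs, toy_targets, batch_size=4, seed=0)
print(len(seq), [seq[b][1].tolist() for b in range(len(seq))])
```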


##### Tensorflow

But since Model.fit_generator has been deprecated and Tensorflow now steers users toward its own tf.data objects, we will proceed by setting one of those up. Only a few adjustments to the above Reader are needed:

```python
class TFHologramReader:

    'Generates data for Keras/Tensorflow/Torch environments'

    def __init__(self,
                 path_data,
                 num_particles,
                 split,
                 output_cols,
                 subset,  # not used by this reader
                 maxnum_particles = 100):

        'Initialization'
        self.ds = open_dataset(path_data, num_particles, split)
        self.output_cols = [x for x in output_cols if x != 'hid']
        self.hologram_numbers = list(range(len(self.ds.hologram_number.values)))
        self.maxnum_particles = maxnum_particles
        # Read the per-particle columns once instead of on every row
        self.hid = self.ds["hid"].values
        self.particle_data = {col: self.ds[col].values for col in self.output_cols}

    def __call__(self):
        'Yield one row of data at a time'
        for hologram in self.hologram_numbers:
            # Select one "row" (hologram) from the dataset
            x_out = self.ds["image"][hologram].values
            # Hologram ids in "hid" are 1-indexed
            particles = np.where(self.hid == hologram + 1)[0]
            # Pad (or truncate) to a fixed particle count so rows can be batched
            n_rows = self.maxnum_particles if self.maxnum_particles else len(particles)
            y_out = np.zeros((n_rows, len(self.output_cols)), dtype=np.float32)
            for m, col in enumerate(self.output_cols):
                values = self.particle_data[col][particles][:n_rows]
                y_out[:len(values), m] = values

            # from_generator converts these NumPy arrays into tensors
            yield x_out.astype(np.float32), y_out
```

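Note that this Reader's __call__ is a generator function that yields rows one at a time, whereas the torch Reader's __getitem__ returns a single row on request. Because calling the instance returns a fresh generator, the instance itself can be handed to Tensorflow.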

Calling and using the reader is the same as before. First, initialize an instance by setting the input variables:

```python
tf_data_generator = TFHologramReader(
    path_data, num_particles, split, output_cols, subset
)
```

Then we wrap Tensorflow's Dataset object around the generator with the from_generator method, setting the tensor types explicitly:

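```python
tf_data_iterator = tf.data.Dataset.from_generator(
    tf_data_generator,  # a callable; Tensorflow calls it to get a fresh generator
    output_types=(tf.dtypes.float32, tf.dtypes.float32)
    # newer TF versions prefer output_signature=(tf.TensorSpec(...), tf.TensorSpec(...))
)
```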

Shuffling is a little more complex. Since the rows come from a generator, Tensorflow creates a memory buffer holding buffer_size rows, draws rows from it at random, and refills the buffer with the next row from the generator. Note that there will be a delay at the beginning while the buffer fills up:

```python
tf_data_iterator = tf_data_iterator.shuffle(buffer_size=100)
```

The batch size is then set with:

```python
tf_data_iterator = tf_data_iterator.batch(32)
```

The order matters: calling shuffle after batch would shuffle whole batches of 32 rows rather than individual rows.

To get a truly uniform shuffle, the buffer size needs to be at least the size of the data set.
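The buffered shuffle can be sketched in plain Python. buffer_shuffle is a hypothetical helper illustrating the idea, not Tensorflow's implementation: fill a buffer, emit a random element, and refill from the stream. With buffer_size=1 the order never changes, and in general a row can appear at most buffer_size - 1 positions earlier than its original position, which is why a small buffer only mixes the data locally.

```python
import random


def buffer_shuffle(stream, buffer_size, seed=None):
    """Hypothetical helper: approximate shuffle of a stream using a fixed buffer."""
    rng = random.Random(seed)
    buffer = []
    for item in stream:
        buffer.append(item)
        if len(buffer) == buffer_size:
            # Emit a random element; the next item from the stream refills the buffer
            i = rng.randrange(buffer_size)
            buffer[i], buffer[-1] = buffer[-1], buffer[i]
            yield buffer.pop()
    # Stream exhausted: drain what is left in random order
    rng.shuffle(buffer)
    yield from buffer


print(list(buffer_shuffle(range(10), buffer_size=3, seed=0)))
print(list(buffer_shuffle(range(10), buffer_size=1, seed=0)))  # unchanged order
```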

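In practice, since the hologram numbers are available up front, one should shuffle that list inside the TF Reader, similar to what needs to be done with the keras Sequence object. Doing it at the start of __call__, rather than once in __init__, gives a new order every epoch, because Tensorflow calls the generator function again each time the Dataset is iterated. For the sake of illustration, we use Tensorflow's shuffle method here.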
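A sketch of that suggestion with synthetic rows. ShuffledRowGenerator is a hypothetical name, and the NumPy arrays stand in for the holodec images and labels. The shuffle happens at the top of __call__, so every call, and hence every pass over a Dataset built with from_generator, produces a new order.

```python
import numpy as np


class ShuffledRowGenerator:
    """Hypothetical callable reader that yields rows in a new order on each call."""

    def __init__(self, images, labels, seed=None):
        self.images, self.labels = images, labels
        self.hologram_numbers = np.arange(len(images))
        self.rng = np.random.default_rng(seed)

    def __call__(self):
        # Reshuffle per call, i.e. once per epoch when used with from_generator
        self.rng.shuffle(self.hologram_numbers)
        for hologram in self.hologram_numbers:
            yield self.images[hologram], self.labels[hologram]


toy_images = np.arange(5 * 2 * 2, dtype=np.float32).reshape(5, 2, 2)
toy_labels = np.arange(5, dtype=np.float32)
gen = ShuffledRowGenerator(toy_images, toy_labels, seed=0)
first_pass = [int(label) for _, label in gen()]
second_pass = [int(label) for _, label in gen()]
print(first_pass, second_pass)
```

With Tensorflow installed, gen would be passed to tf.data.Dataset.from_generator in the same way as tf_data_generator, and the tf shuffle step could then be dropped.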

```python
for (x, y) in tf_data_iterator:
    print(x.shape, y.shape)
    break
```

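In TF 2.x Keras, the number of workers for a Sequence or generator input is set through the workers and use_multiprocessing arguments of Model.fit(), which contains the machinery for running the Reader and feeding its batches into the model. These arguments are ignored for tf.data inputs; there, parallelism is configured on the Dataset itself (for example with prefetch, or with num_parallel_calls in map).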

Note that we could use any of these approaches with either ML framework, though the torch and tensorflow versions are preferable to the Sequence-based workflow. This holds when running with a single worker; enabling multiprocessing yourself with Tensorflow is trickier. You need to "shard" the data into chunks first, then ship each chunk off to a worker. The torch DataLoader does all of this under the hood.
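Sharding itself is simple to sketch: with num_workers workers, worker w takes every num_workers-th row starting from row w, so together the workers cover the data set exactly once. This is the same rule that tf.data.Dataset.shard(num_shards, index) applies; the helper shard_indices below is a hypothetical plain-Python illustration.

```python
def shard_indices(num_rows, num_workers, worker_id):
    """Hypothetical helper: the rows (hologram numbers) one worker is responsible for."""
    # Keep row i when i % num_workers == worker_id, as tf.data.Dataset.shard does
    return list(range(worker_id, num_rows, num_workers))


shards = [shard_indices(10, num_workers=3, worker_id=w) for w in range(3)]
print(shards)  # [[0, 3, 6, 9], [1, 4, 7], [2, 5, 8]]
```

Each worker would then build its own reader over only its shard, for example by restricting hologram_numbers to the indices returned for its worker_id.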

These special iterator objects contain many other features not covered here, so be sure to check out the documentation!

Tensorflow Dataset: https://www.tensorflow.org/api_docs/python/tf/data/Dataset

Torch Dataset: https://pytorch.org/docs/stable/data.html