# tests of speed of some `tf.data` pipelines on MNIST

In [1]:
import tensorflow as tf
import numpy as np

  return f(*args, **kwds)


In [2]:
tf.enable_eager_execution()

## Load some data
We start with Numpy arrays and ask how steps later in the pipeline affect speed, neglecting question of whether we need `tf.records`.

In [7]:
def load_mnist_train():
    images = np.load('/home/art/Documents/data/mnist/raw/train_images.npy')
    labels = np.load('/home/art/Documents/data/mnist/raw/train_labels.npy')
    sample_inds = np.load('/home/art/Documents/data/mnist/raw/train_indices.npy')
    return images, labels, sample_inds

# # small dataset so we don't have to sit through all 50k images of MNIST
# images = images[:1000]
# labels = labels[:1000]
# sample_inds = sample_inds[:1000]

## Make a model to test with
We create a very simple Keras model and logic for training.

In [8]:
target_shape = np.prod(images.shape[1:])

model = tf.keras.Sequential([
    tf.keras.layers.Reshape(
        target_shape=(target_shape,)
    ),
    tf.keras.layers.Dense(units=256, activation=tf.nn.relu, input_shape=(4,)),
    tf.keras.layers.Dense(128, activation=tf.nn.relu),
    tf.keras.layers.Dense(10)
])

def loss(model, x, y):
  y_ = model(x)
  return tf.losses.softmax_cross_entropy(onehot_labels=y, logits=y_)

def grad(model, inputs, targets):
  with tf.GradientTape() as tape:
    loss_value = loss(model, inputs, targets)
  return loss_value, tape.gradient(loss_value, model.trainable_variables)

optimizer = tf.train.GradientDescentOptimizer(learning_rate=0.01)

## pipeline 1
What I've been using for RAM model, adapted from tf.contrib

In [16]:
def normalize(images):
    """
    Normalize images to [0.0,1.0]
    """
    images = tf.cast(images, tf.float32)
    images /= 255.
    return images

def pipeline_one(images, labels, sample_inds):
    images = normalize(images)

    def gen():
        for image, label, batch_sample_inds in zip(images, labels, sample_inds):
            yield image, label, batch_sample_inds

    data = tf.data.Dataset.from_generator(gen, (tf.float32, tf.int32, tf.int32),
                                          ((28, 28, 1), [], []))
    data = data.shuffle(buffer_size=sample_inds.shape[-1], reshuffle_each_iteration=True)
    
    return data

images, labels, sample_inds = load_mnist_train()
data = pipeline_one(images, labels, sample_inds)

In [14]:
%%timeit
global_step = tf.Variable(0)

# in eager mode, no need to create iterator from dataset or call get_next()
for img, lbl, batch_train_inds in data.batch(batch_size=32):
    # Optimize the model
    lbl = tf.one_hot(indices=lbl, depth=10)
    loss_value, grads = grad(model, img, lbl)
    optimizer.apply_gradients(zip(grads, model.trainable_variables),
                              global_step)

28.6 s ± 444 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [19]:
def pipeline_two(images, labels, sample_inds):
    data = tf.data.Dataset.from_tensor_slices((images, labels, sample_inds))
    data = data.map(map_func=lambda img, lbl, inds: tuple([normalize(img), lbl, inds]), 
                    num_parallel_calls = 4)
    data.apply(tf.data.experimental.shuffle_and_repeat(buffer_size=sample_inds.shape[-1]))
    return data

images, labels, sample_inds = load_mnist_train()
data = pipeline_two(images, labels, sample_inds)

In [21]:
%%timeit
global_step = tf.Variable(0)

for img, lbl, batch_train_inds in data.batch(batch_size=32):
    # Optimize the model
    lbl = tf.one_hot(indices=lbl, depth=10)
    loss_value, grads = grad(model, img, lbl)
    optimizer.apply_gradients(zip(grads, model.trainable_variables),
                              global_step)

12.1 s ± 110 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
