# Introduction and Purpose

Our framework is primarily intended for testing OpenCL implementations of common machine learning operations and, potentially, of hand-written networks. For now, we treat them as _pure functions_, whose return value depends solely on the arguments. To test them, we gather a set of input-output pairs, feed in the inputs, and compare the results with the corresponding expected outputs.
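
A minimal sketch of this scheme, assuming NumPy arrays as the tensor representation. The names `check_against_pairs` and `reference_relu` are hypothetical and used only for illustration; in practice the candidate would wrap an OpenCL kernel launch, and the tolerances would likely need tuning per operation.

```python
import numpy as np

def check_against_pairs(candidate, pairs, rtol=1e-5, atol=1e-6):
    """Run `candidate` on every input and compare with the expected output.

    `candidate` is treated as a pure function: array in, array out.
    Raises AssertionError on the first mismatch; returns the number of pairs checked.
    """
    checked = 0
    for inputs, expected in pairs:
        actual = candidate(inputs)
        np.testing.assert_allclose(actual, expected, rtol=rtol, atol=atol)
        checked += 1
    return checked

# Stand-in for an implementation under test (hypothetical).
def reference_relu(x):
    return np.maximum(x, 0.0)

# Hand-written input-output pairs, for illustration only.
pairs = [
    (np.array([-1.0, 0.0, 2.5]), np.array([0.0, 0.0, 2.5])),
    (np.array([[3.0, -4.0]]), np.array([[3.0, 0.0]])),
]
checked = check_against_pairs(reference_relu, pairs)
```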

Gathering data presents the biggest challenge: the data needs to be both correct and varied, in dimensions as well as in content. Accumulating it by hand is error-prone and time-consuming; so is writing generators (essentially, host-based implementations of the target operations).

Instead, we choose to extract inputs and outputs from a TensorFlow _computational graph_. Its primitive operations are well tested, so they can serve as a reference for verifying new implementations. Moreover, data can be dumped for a subgraph instead of a single node, allowing fused operations (e.g. batch normalization and ReLU activation combined) to be tested as well.

# Importing a Graph

We expect that users will want to work with existing models, such as the [officially supported ones](https://github.com/tensorflow/models). This is achieved by loading a computational graph from a saved [checkpoint](https://www.tensorflow.org/guide/checkpoints):

In [1]:
import tensorflow as tf

def load_graph_from_checkpoint(sess, chkpt_dir):
  latest_chkpt = tf.train.latest_checkpoint(chkpt_dir)
  
  saver = tf.train.import_meta_graph(f'{latest_chkpt}.meta')
  saver.restore(sess, latest_chkpt)

# Dumping Inputs and Outputs

In [2]:
def run_model_and_extract(sess, inputs, output_node_names):
  outputs = [tf.get_default_graph().get_tensor_by_name(n) for n in output_node_names]
  return sess.run(outputs, inputs)

Note that `inputs` is a dictionary mapping tensor names to values, which provides input values for the _whole model_. For the purposes of testing, the values can be randomly generated as long as we know the tensor names, which can be extracted using `tf.report_uninitialized_variables()`.

In [3]:
def random_inputs(sess):
  input_tensor_names = [f'{str(v, "utf8")}:0' for v in sess.run(tf.report_uninitialized_variables())]
  return {n: sess.run(tf.random_uniform(tf.get_default_graph().get_tensor_by_name(n).shape))
          for n in input_tensor_names}

# Caching Values

If you relaunch the notebook, you may notice that it takes a few seconds for TensorFlow to restore the graph, run it, and dump the data we're interested in. Such a delay is unacceptable for frequently run tests; to avoid it, we cache tensors on disk (serialized in the binary NumPy `.npy` format) and call TensorFlow routines only on a cache miss.

A unique storage name is needed for each cached value. We use the SHA-256 hash of the checkpoint directory path and the full node name, joined by `//`.


In [4]:
from hashlib import sha256
import os

import numpy as np  # used by np.load / np.save below

def cache_filename(checkpoint_dir, node_name):
  return sha256(f'{checkpoint_dir}//{node_name}'.encode('utf8')).hexdigest() + '.npy'

def restore_cached(checkpoint_dir, node_names):
  os.makedirs('__testdumps__', exist_ok=True)
  cached_filenames = os.listdir('__testdumps__')
  
  nodes = [(node_name, cache_filename(checkpoint_dir, node_name)) for node_name in node_names]
  return {node_name: np.load('__testdumps__/' + cache_name)
          if cache_name in cached_filenames else None
          for (node_name, cache_name) in nodes}

def cache_outputs(checkpoint_dir, node_names_to_tensors):
  for (node_name, tensor) in node_names_to_tensors.items():
    np.save('__testdumps__/' + cache_filename(checkpoint_dir, node_name), tensor)

def compute_outputs_with_cache(checkpoint_dir, node_names):
  restored = restore_cached(checkpoint_dir, node_names)
  cache_misses = [node_name for (node_name, tensor) in restored.items() if tensor is None]
  
  if len(cache_misses) == 0:
    return restored
  
  tf.reset_default_graph()
  # Close the session on exit so that repeated test runs do not leak resources.
  with tf.Session() as sess:
    load_graph_from_checkpoint(sess, checkpoint_dir)
    inputs = random_inputs(sess)
    output_values = run_model_and_extract(sess, inputs, cache_misses)
  
  computed = dict(zip(cache_misses, output_values))
  cache_outputs(checkpoint_dir, computed)
  
  return {**restored, **computed}
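
The naming scheme can be checked in isolation. The sketch below restates `cache_filename` so that it runs without TensorFlow; the directory and node names are made-up placeholders, not paths from a real checkpoint.

```python
from hashlib import sha256

def cache_filename(checkpoint_dir, node_name):
    # Same scheme as above: hash "<checkpoint_dir>//<node_name>" and append ".npy".
    return sha256(f'{checkpoint_dir}//{node_name}'.encode('utf8')).hexdigest() + '.npy'

# Placeholder names, for illustration only.
a = cache_filename('some/train_dir', 'scope/add:0')
b = cache_filename('some/train_dir', 'scope/Relu:0')
c = cache_filename('other/train_dir', 'scope/add:0')

# Deterministic: the same (directory, node) pair always maps to the same file.
same_again = cache_filename('some/train_dir', 'scope/add:0')
```

Because the name depends only on the path string and the node name, a checkpoint that is retrained in place keeps its old cache entries; deleting `__testdumps__` forces recomputation.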

# Declaring Desired Outputs

To reduce the amount of boilerplate code required to run the tests, we use _function decorators_ to specify target outputs in a declarative fashion. The decorated test receives the requested tensors through a `test_values` keyword argument.

In [5]:
import functools
import numpy as np

class FeedTensors(object):
  def __init__(self, checkpoint_dir, test_values):
    output_node_names = list(test_values.values())    
    outputs = compute_outputs_with_cache(checkpoint_dir, output_node_names)
    
    self.test_values = {output_name: outputs[output_node_name]
                        for (output_name, output_node_name) in test_values.items()}
  
  def __call__(self, fn):
    @functools.wraps(fn)
    def with_inputs(*args, **kwargs):
      # Inject the dumped tensors and propagate the test's return value.
      kwargs['test_values'] = self.test_values
      return fn(*args, **kwargs)
    return with_inputs

# Example

The checkpoint used as an example was created using [TensorFlow benchmarking scripts](https://github.com/tensorflow/benchmarks/tree/master/scripts/tf_cnn_benchmarks#tf_cnn_benchmarks-high-performance-benchmarks):

```
CHKPT_DIR='../../Documents/resnet50v1_traindir'

python3 tf_cnn_benchmarks.py --model=resnet50 --data_format=NHWC --batch_size=8 --num_batches=1 \
  --train_dir=${CHKPT_DIR} --trace_file=${CHKPT_DIR}/trace --tfprof_file=${CHKPT_DIR}/profile \
  --summary_verbosity=3 --save_summaries_steps=30 \
  --device=cpu --local_parameter_device=cpu --all_reduce_spec=pscpu
```

In [6]:
@FeedTensors(checkpoint_dir='../../Documents/resnet50v1_traindir',
             test_values={'add': 'v/tower_0/cg/resnet_v10/add:0', 'relu': 'v/tower_0/cg/resnet_v10/Relu:0'})
def test(test_values):
  print({k: v.shape for (k, v) in test_values.items()})

test()

{'add': (8, 56, 56, 256), 'relu': (8, 56, 56, 256)}
