<a href="https://colab.research.google.com/github/dpressel/mead-tutorials/blob/master/mead_tf_api_tpu.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

**Running classification on a TPU cluster**

This example code uses the API from Baseline to read a dataset, load word-embedding features, and create a basic convolutional neural network (CNN) to train on a dataset.

The TPU has 8 cores available, and the batch size is set to 16 times that amount (128).
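
As a quick check of that arithmetic, here is a minimal sketch. The helper name `global_batch_size` is purely illustrative and is not part of Baseline or TensorFlow.

```python
# Hypothetical helper for illustration only.
def global_batch_size(per_replica_batch: int, num_replicas: int) -> int:
    """Global batch size when every replica receives the same number of examples."""
    return per_replica_batch * num_replicas


print(global_batch_size(16, 8))  # 128
```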

To make this work, we are using the `tf.distribute` API, which makes this very simple!

To get started, we need TensorFlow and `mead-baseline` installed.

In [1]:
!pip install wheel
!pip install tensorflow
!pip install mead-baseline[tf2]




To access the embeddings and models for classification we need to import them.  That adds them to the internal MEAD registry of available classes. This allows us to refer to them by name.  The `default` name corresponds to pre-trained word embeddings for the embeddings portion, and a CNN for the classifier.
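
To picture how importing a module can make a class available by name, here is a minimal sketch of a decorator-based registry. This is an assumption about the general pattern only: `MODEL_REGISTRY`, `register`, `create` and `ToyConvClassifier` are hypothetical names, and MEAD's real registry may be organized differently.

```python
from typing import Callable, Dict

# Illustrative registry; not MEAD's actual internals.
MODEL_REGISTRY: Dict[str, Callable] = {}


def register(name: str) -> Callable:
    """Decorator that stores a class in the registry under `name`."""
    def _wrap(cls):
        MODEL_REGISTRY[name] = cls
        return cls
    return _wrap


# Merely defining (or importing) this class triggers the registration.
@register('default')
class ToyConvClassifier:
    def __init__(self, **kwargs):
        self.params = kwargs


def create(name: str, **kwargs):
    """Look up a registered class by name and instantiate it."""
    return MODEL_REGISTRY[name](**kwargs)


toy_model = create('default', filtsz=[3, 4, 5])
print(type(toy_model).__name__)  # ToyConvClassifier
```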


In [0]:
import tensorflow as tf
import os
import baseline
from baseline.tf.embeddings import *
from baseline.tf.classify import *

We need to set up our TPUs before any work is done. This also clears out any caches.

In [3]:

resolver = tf.distribute.cluster_resolver.TPUClusterResolver(tpu='grpc://' + os.environ['COLAB_TPU_ADDR'])
tf.config.experimental_connect_to_cluster(resolver)
tf.tpu.experimental.initialize_tpu_system(resolver)
strategy = tf.distribute.experimental.TPUStrategy(resolver)

INFO:tensorflow:Initializing the TPU system: grpc://10.27.113.194:8470
INFO:tensorflow:Clearing out eager caches
INFO:tensorflow:Finished initializing TPU system.
INFO:tensorflow:Found TPU system:
INFO:tensorflow:*** Num TPU Cores: 8
INFO:tensorflow:*** Num TPU Workers: 1
INFO:tensorflow:*** Num TPU Cores Per Worker: 8
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:localhost/replica:0/task:0/device:CPU:0, CPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:localhost/replica:0/task:0/device:XLA_CPU:0, XLA_CPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:CPU:0, CPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:0, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:1, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:2, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:3, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:4, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:5, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:6, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:7, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU_SYSTEM:0, TPU_SYSTEM, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:XLA_CPU:0, XLA_CPU, 0, 0)


Next, we will download a dataset and some word embeddings to get us going.

In [4]:

!wget https://www.dropbox.com/s/7jyi4pi894bh2qh/sst2.tar.gz?dl=1
!tar -xzf 'sst2.tar.gz?dl=1'
!wget https://www.dropbox.com/s/699kgut7hdb5tg9/GoogleNews-vectors-negative300.bin.gz?dl=1
!mv 'GoogleNews-vectors-negative300.bin.gz?dl=1' GoogleNews-vectors-negative300.bin.gz
!gunzip GoogleNews-vectors-negative300.bin.gz

--2020-05-13 15:22:31--  https://www.dropbox.com/s/7jyi4pi894bh2qh/sst2.tar.gz?dl=1
Resolving www.dropbox.com (www.dropbox.com)... 162.125.3.1, 2620:100:6018:1::a27d:301
Connecting to www.dropbox.com (www.dropbox.com)|162.125.3.1|:443... connected.
HTTP request sent, awaiting response... 301 Moved Permanently
Location: /s/dl/7jyi4pi894bh2qh/sst2.tar.gz [following]
--2020-05-13 15:22:31--  https://www.dropbox.com/s/dl/7jyi4pi894bh2qh/sst2.tar.gz
Reusing existing connection to www.dropbox.com:443.
HTTP request sent, awaiting response... 302 Found
Location: https://ucd9fc286a487bb28d7eb018d4ef.dl.dropboxusercontent.com/cd/0/get/A3pLLjAhWHh8F_ktEGMsKNFYk0oFoB5nXDEyu5m8QMVn6yC88F_L7XTwSsaydv_BK4NCrLk_ByxPggMcA7PHBICZ_U0_2Vfx4GR6_sLdl_io8g/file?dl=1# [following]
--2020-05-13 15:22:32--  https://ucd9fc286a487bb28d7eb018d4ef.dl.dropboxusercontent.com/cd/0/get/A3pLLjAhWHh8F_ktEGMsKNFYk0oFoB5nXDEyu5m8QMVn6yC88F_L7XTwSsaydv_BK4NCrLk_ByxPggMcA7PHBICZ_U0_2Vfx4GR6_sLdl_io8g/file?dl=1
Resolving ucd9f

Here we define some constants associated with the training set, the maximum feature temporal length, and some fields that will help us set up our `tf.data.Dataset`.  The `word_lengths` key is required by the Baseline classifier so that it can recognize which dataset field contains the length of the tensor.  This isn't really important for our CNN, but it would be very important if we were using a BiLSTM, for example.  In general, the vectorizers generate a length for each tensor that they read in, based on the unpadded tensor length, and these lengths are placed in the reader's batch dictionary for each batch.
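
To illustrate what "a length based on the unpadded tensor" means, here is a toy vectorizer. It is only a sketch: `toy_vectorize` and the padding and unknown-word indices (0 and 1) are assumptions made for this example, not the behavior of `baseline.Token1DVectorizer`.

```python
from typing import Dict, List, Tuple


def toy_vectorize(tokens: List[str], vocab: Dict[str, int], mxlen: int) -> Tuple[List[int], int]:
    """Map tokens to indices, pad with 0 up to mxlen, and report the unpadded length."""
    # 1 is a hypothetical unknown-word index for this sketch
    ids = [vocab.get(t.lower(), 1) for t in tokens[:mxlen]]
    length = len(ids)
    # 0 is a hypothetical padding index for this sketch
    ids += [0] * (mxlen - length)
    return ids, length


toy_vocab = {'<PAD>': 0, '<UNK>': 1, 'a': 2, 'great': 3, 'movie': 4}
ids, length = toy_vectorize(['A', 'great', 'movie'], toy_vocab, mxlen=6)
print(ids, length)  # [2, 3, 4, 0, 0, 0] 3
```

A recurrent model would use `length` to ignore the padded positions, which is why the lengths field matters more for a BiLSTM than for our CNN.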


In [5]:
BASE = 'sst2'
TRAIN = os.path.join(BASE, 'stsa.binary.phrases.train')
VALID = os.path.join(BASE, 'stsa.binary.dev')
TEST = os.path.join(BASE, 'stsa.binary.test')
PRETRAINED_EMBEDDINGS = 'GoogleNews-vectors-negative300.bin'
MAX_FEAT_LEN = 100
# Number of batches to prefetch if using tf.datasets
NUM_PREFETCH = 2
# The shuffle buffer
SHUF_BUF_SZ = 5000
LENGTHS_KEY = 'word_lengths'
BATCH_SIZE = 16 * strategy.num_replicas_in_sync
print(f'Using batch size {BATCH_SIZE}')

Using batch size 128


Set up a dictionary of the features, with corresponding embeddings and a vectorizer that converts words to indices.

Then make a reader that can use these vectorizers.

In [0]:

feature_desc = {
    'word': {
        'vectorizer': baseline.Token1DVectorizer(mxlen=MAX_FEAT_LEN, transform_fn=baseline.lowercase),
        'embed': {'file': PRETRAINED_EMBEDDINGS, 'type': 'default', 'unif': 0.25}
    }
}
vectorizers = {k: v['vectorizer'] for k, v in feature_desc.items()}
reader = baseline.TSVSeqLabelReader(vectorizers,
                              clean_fn=baseline.TSVSeqLabelReader.do_clean)


Set up our vocab, load our embeddings and create a word-to-index vocabulary for each embedding.  In this code example, there is only one feature, `word`, but MEAD supports many.
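
As a rough picture of the two stages (count words, then assign each word an index), here is a small sketch. The function names and the reserved tokens are hypothetical; in the real code the final word-to-index vocabulary comes back from `load_embeddings`.

```python
from collections import Counter
from typing import Dict, Iterable


def count_words(sentences: Iterable[str]) -> Counter:
    """Stage 1: count how often each lowercased word occurs."""
    counts = Counter()
    for sentence in sentences:
        counts.update(word.lower() for word in sentence.split())
    return counts


def index_vocab(counts: Counter, reserved=('<PAD>', '<UNK>')) -> Dict[str, int]:
    """Stage 2: give reserved tokens the first indices, then words in sorted order."""
    vocab = {tok: i for i, tok in enumerate(reserved)}
    for word in sorted(counts):
        vocab[word] = len(vocab)
    return vocab


word_counts = count_words(['a great movie', 'a dull movie'])
word_index = index_vocab(word_counts)
print(word_counts['movie'], word_index['movie'])  # 2 5
```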

In [0]:

# This builds a set of counters
vocabs, labels = reader.build_vocab([TRAIN, VALID, TEST])

# This builds a set of embeddings objects, these are typically not DL-specific
# but if they happen to be addons, they can be
embeddings = dict()
for k, v in feature_desc.items():
    embed_config = v['embed']
    embeddings_for_k = baseline.load_embeddings('word',
                                                embed_file=embed_config['file'],
                                                known_vocab=vocabs[k],
                                                embed_type=embed_config.get('type', 'default'),
                                                unif=embed_config.get('unif', 0.))

    embeddings[k] = embeddings_for_k['embeddings']
    # Reset the vocab to the embeddings one
    vocabs[k] = embeddings_for_k['vocab']

Set up our neural network using the `create_model` call.  This finds the appropriate classifier in our registry by the name given in `model_type`, and passes along the other parameters to the factory method that is registered to create this model, finally producing our CNN:

In [23]:
model_params = {
    'cmotsz': 300,
    'filtsz': [3, 4, 5],
    'model_type': 'default'
}
model = baseline.model.create_model(embeddings, labels, **model_params)


Calling model <function register_model.<locals>.create at 0x7fe9b66dc0d0>


We could have written a reader to load the dataset ourselves, but the `baseline.reader` module provides one for us.  However, it's not exactly what we want: we want the power of `tf.data.Dataset`, so we need to convert our reader's output to that.  To do this, we will use the utility function `to_tensors`, which converts the per-batch dictionaries produced by `reader.load` into tensor slices for use in the `tf.data.Dataset`.

The `tf.distribute` API provides a function to distribute our dataset over each replica.  The batches end up getting carved up and sent off to the appropriate replica under the hood.

Our `create_dataset` code below encapsulates all of the logic we need to read from a file into a distributed dataset.
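
The way a global batch gets carved up can be pictured with plain Python lists. This is only a conceptual sketch with a hypothetical `split_batch` helper. The actual mechanics inside `tf.distribute` may differ.

```python
from typing import List, Sequence


def split_batch(batch: Sequence[int], num_replicas: int) -> List[Sequence[int]]:
    """Split one global batch into equally sized contiguous per-replica shards."""
    per_replica = len(batch) // num_replicas
    return [batch[i * per_replica:(i + 1) * per_replica] for i in range(num_replicas)]


global_batch = list(range(128))        # stand-in for 128 examples
shards = split_batch(global_batch, 8)  # one shard per TPU core
print(len(shards), len(shards[0]))     # 8 16
```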

In [0]:
from baseline.tf.classify.training.utils import to_tensors

def create_dataset(strategy, reader, filename, vocabs, batchsz, lengths_key, shuffle=False):
    ts = reader.load(filename, vocabs=vocabs, batchsz=1)
    dataset = tf.data.Dataset.from_tensor_slices(to_tensors(ts, lengths_key))
    if shuffle:
        dataset = dataset.shuffle(buffer_size=SHUF_BUF_SZ)
    dataset = dataset.batch(batchsz, drop_remainder=True)
    dataset = dataset.prefetch(NUM_PREFETCH)
    dataset = strategy.experimental_distribute_dataset(dataset)
    return dataset

train_ds = create_dataset(strategy, reader, TRAIN, vocabs, BATCH_SIZE, LENGTHS_KEY, True)
valid_ds = create_dataset(strategy, reader, VALID, vocabs, BATCH_SIZE, LENGTHS_KEY)
test_ds = create_dataset(strategy, reader, TEST, vocabs, BATCH_SIZE, LENGTHS_KEY)
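
One consequence of `drop_remainder=True` in `create_dataset` is that any examples left over after the last full batch are skipped, and this applies to the validation and test datasets as well. A minimal sketch of the arithmetic, using a hypothetical dataset size of 1000 (not the actual SST-2 sizes) and an illustrative helper name:

```python
from typing import Tuple


def full_batch_coverage(num_examples: int, batch_size: int) -> Tuple[int, int, int]:
    """Return (number of full batches, examples used, examples dropped)."""
    num_batches = num_examples // batch_size
    used = num_batches * batch_size
    return num_batches, used, num_examples - used


print(full_batch_coverage(1000, 128))  # (7, 896, 104)
```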

We are finally ready to train our model.  We are going to use `tf.eager` to train, and to do this we can use the `EagerOptimizer`.  `8 mile` has the same class defined in PyTorch and TensorFlow, and for most cases, the API can be used identically.  Here we are going to tell `EagerOptimizer` to use `adam` with a learning rate of `1e-3`.  We need a loss function (defined by `loss` below) for the optimizer as well.

The next thing we do is define a function to train a whole epoch of data on the TPU using `tf.distribute`.

TPUs require that any eager code is compiled with `tf.function`.  The `_replicated_train_step` internal function is basically what each TPU core will run.  The results from each core need to be aggregated, and this is done with `_distributed_train_step`, which is compiled via the `@tf.function` decorator, including its underlying replica train function.  The results from each replica include a per-replica loss (multiplied by the number of datapoints, so that it can be summed) and a per-replica number of datapoints that were processed (that replica's share of the batch).  We use the `strategy.reduce` call to sum these across replicas, and we use a `tf.Variable` for each to accumulate the totals over every step in the epoch.
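
The bookkeeping described above amounts to a weighted average. Here is a pure-Python sketch of it, with no TensorFlow involved. The function name and the toy numbers are made up for illustration.

```python
from typing import List, Tuple


def aggregate_epoch_loss(steps: List[List[Tuple[float, int]]]) -> float:
    """Each step holds one (mean_loss, num_examples) pair per replica."""
    epoch_loss = 0.0
    epoch_div = 0.0
    for replicas in steps:
        # Per replica: scale the mean loss back up to a sum over its examples,
        # then add across replicas (the role strategy.reduce with SUM plays).
        step_loss = sum(loss * n for loss, n in replicas)
        step_batchsz = sum(n for _, n in replicas)
        epoch_loss += step_loss
        epoch_div += step_batchsz
    return epoch_loss / epoch_div


toy_steps = [
    [(0.5, 16), (0.25, 16)],  # step 1, two replicas
    [(1.0, 16), (0.25, 16)],  # step 2, two replicas
]
print(aggregate_epoch_loss(toy_steps))  # 0.5
```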



In [0]:
from eight_mile.tf.optz import EagerOptimizer
from eight_mile.tf.layers import get_shape_as_list
from baseline.tf.tfy import SET_TRAIN_FLAG

def loss(model, x, y):
    y_ = model(x)
    return tf.compat.v1.losses.sparse_softmax_cross_entropy(labels=y, logits=y_)

optimizer = EagerOptimizer(loss, optim='adam', lr=0.001)

def train_epoch(optimizer, model, train_ds, strategy):
    def _replicated_train_step(inputs):
        """Replicated training step."""
        features, y = inputs
        per_replica_loss = optimizer.update(model, features, y, strategy.num_replicas_in_sync)
        per_replica_batchsz = tf.cast(get_shape_as_list(y)[0], tf.float32)
        per_replica_report_loss = per_replica_loss * per_replica_batchsz
        return per_replica_report_loss, per_replica_batchsz

    @tf.function
    def _distributed_train_step(inputs):
        per_replica_loss, per_replica_batchsz = strategy.experimental_run_v2(_replicated_train_step, args=(inputs,))
        return strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_loss, axis=None), strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_batchsz, axis=None)

    epoch_loss = tf.Variable(0.0)
    epoch_div = tf.Variable(0.0)
    with strategy.scope():
        SET_TRAIN_FLAG(True)
        train_iter = iter(train_ds)
        for next_x in train_iter:
            step_loss, step_batchsz = _distributed_train_step(next_x)
            epoch_loss.assign_add(step_loss)
            epoch_div.assign_add(step_batchsz)
        epoch_loss = epoch_loss.numpy()
        epoch_div = epoch_div.numpy()
        return epoch_loss / epoch_div


We finally have all the pieces we need to train on TPUs!  Now let's train an epoch and see our average loss and elapsed wall clock time in seconds!

In [29]:
import time
start_time = time.time()
avg_loss = train_epoch(optimizer, model, train_ds, strategy=strategy)
elapsed = time.time() - start_time
print(avg_loss, elapsed)

0.028293055 16.16217279434204


We would like to know how we are doing, so we need to define a routine to validate our model on some data. Like in training, we are going to replicate the validation across multiple TPUs, and aggregate the results.  In this case we would also like to know the accuracy of our classifier, so we will count up in each replica how many datapoints were correct and divide by the total number of datapoints.

Otherwise the code is quite similar to our `train_epoch` function.
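
The accuracy bookkeeping can be sketched without TensorFlow: each replica counts its own correct predictions, and the counts are summed before dividing by the total number of datapoints. The helper names and toy logits here are hypothetical.

```python
from typing import List, Sequence


def argmax(row: Sequence[float]) -> int:
    """Index of the largest value in a row of logits."""
    return max(range(len(row)), key=row.__getitem__)


def replica_correct(logits: List[Sequence[float]], labels: List[int]) -> int:
    """Number of examples on one replica whose predicted class matches the label."""
    return sum(1 for row, y in zip(logits, labels) if argmax(row) == y)


replica_a = ([[0.1, 0.9], [0.8, 0.2]], [1, 0])  # both predictions correct
replica_b = ([[0.3, 0.7], [0.6, 0.4]], [0, 0])  # only the second is correct

total_correct = sum(replica_correct(lg, y) for lg, y in (replica_a, replica_b))
total_seen = sum(len(y) for _, y in (replica_a, replica_b))
print(total_correct / total_seen)  # 0.75
```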

You might be wondering about the `SET_TRAIN_FLAG()` function that we call in both `train_epoch` and `test_epoch`.  This sets a global variable inside MEAD that determines if things like dropout should be applied.  It is equivalent to `model.train()` and `model.eval()` in PyTorch.
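
To see why a global flag like this matters, here is a toy version of the idea. Everything in it (`TRAIN_FLAG`, `set_train_flag`, `toy_dropout`) is a hypothetical stand-in and not MEAD's implementation.

```python
import random
from typing import List

TRAIN_FLAG = False  # hypothetical module-level flag


def set_train_flag(value: bool) -> None:
    """Toggle training mode for every layer that consults the flag."""
    global TRAIN_FLAG
    TRAIN_FLAG = value


def toy_dropout(values: List[float], pdrop: float, rng: random.Random) -> List[float]:
    """Inverted dropout that is active only while TRAIN_FLAG is set."""
    if not TRAIN_FLAG:
        return list(values)  # evaluation: pass values through unchanged
    keep = 1.0 - pdrop
    return [v / keep if rng.random() < keep else 0.0 for v in values]


set_train_flag(False)
print(toy_dropout([1.0, 2.0, 3.0], 0.5, random.Random(0)))  # [1.0, 2.0, 3.0]

set_train_flag(True)
# Each value is now either zeroed or scaled by 1 / (1 - pdrop)
print(toy_dropout([1.0, 2.0, 3.0], 0.5, random.Random(0)))
```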

In [0]:
def test_epoch(model, test_ds, strategy):

    def _replica_test_step(inputs):
        features, y = inputs
        y = tf.cast(y, tf.int64)
        logits = model(features)
        y_ = tf.argmax(logits, axis=1, output_type=tf.int64)
        per_replica_loss = tf.compat.v1.losses.sparse_softmax_cross_entropy(labels=y, logits=logits)
        per_replica_batchsz = tf.cast(get_shape_as_list(y)[0], tf.float32)
        per_replica_report_loss = per_replica_loss * per_replica_batchsz
        return per_replica_report_loss, per_replica_batchsz, tf.cast(tf.reduce_sum(tf.cast(y == y_, tf.int64)), tf.float32)

    @tf.function
    def _distributed_test_step(inputs):
        per_replica_loss, per_replica_batchsz, per_replica_correct = strategy.experimental_run_v2(_replica_test_step, args=(inputs,))
        step_loss = strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_loss, axis=None)
        step_batchsz = strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_batchsz, axis=None)
        step_corr = strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_correct, axis=None)
        return step_loss, step_batchsz, step_corr
    
    with strategy.scope():
        total_loss = tf.Variable(0.0)
        total_norm = tf.Variable(0.0)
        total_corr = tf.Variable(0.0)
        SET_TRAIN_FLAG(False)
        test_iter = iter(test_ds)

        for next_x in test_iter: 
            step_loss, step_batchsz, step_corr = _distributed_test_step(next_x)
            total_loss.assign_add(step_loss)
            total_norm.assign_add(step_batchsz)
            total_corr.assign_add(step_corr)

        total_loss = total_loss.numpy()
        total_corr = total_corr.numpy()
        total_norm = total_norm.numpy()
        acc = total_corr / float(total_norm)
        avg_loss = total_loss / float(total_norm)
        return acc, avg_loss


Okay, let's run a test iteration:

In [31]:
start_time = time.time()
acc, avg_loss = test_epoch(model, valid_ds, strategy=strategy)
elapsed = time.time() - start_time
print(acc, avg_loss, elapsed)

0.84375 0.3882710138956706 2.0577547550201416


Let's run one more epoch, and then we will evaluate our model on the test data.

In [32]:
start_time = time.time()
avg_loss = train_epoch(optimizer, model, train_ds, strategy=strategy)
elapsed = time.time() - start_time
print('Train', avg_loss, elapsed)
start_time = time.time()
acc, avg_loss = test_epoch(model, valid_ds, strategy=strategy)
elapsed = time.time() - start_time
print('Valid', acc, avg_loss, elapsed)

Train 0.022798203 12.71997618675232
Valid 0.84765625 0.3976578712463379 0.9341402053833008


OK, we have trained the model for 2 epochs (complete passes over the dataset), with the work divided over 8 TPU cores. Finally, we will run the same model, but on the test data this time, and the final accuracy is the one that we would want to report.

In [33]:
start_time = time.time()
acc, avg_loss = test_epoch(model, test_ds, strategy=strategy)
elapsed = time.time() - start_time
print('Test', acc, avg_loss, elapsed)


Test 0.8638392857142857 0.35195088386535645 1.262049674987793


**Conclusion**

In this example, we dug into the internals of MEAD to make our own training loop and evaluation to make the Baseline CNN classifier run on Google TPUs!