This example shows how to train a model in a distributed manner, using the well-known MNIST problem as the classification task. It adopts the between-graph replication mode. To do so, the computation is declared inside a loop: each iteration declares the same computation for a different worker, using the function abyss_replica_device_setter(). In addition, one must keep a separate handle to each worker's inputs and outputs (i.e. its feeds and fetches). When session.run() is called, every worker's inputs must be fed, and all of the workers' outputs are passed as the fetches.

In [1]:
from abyss import *

In [2]:
import tensorflow as tf
import math

In [3]:
from tensorflow.examples.tutorials.mnist import mnist, input_data

In [4]:
# Load the MNIST data sets with one-hot encoded labels. The recorded output
# below shows the archives being downloaded and extracted into this directory.
mnist_data = input_data.read_data_sets(
    '/tmp/tensorflow/mnist/input_data',
    one_hot=True)

Successfully downloaded train-images-idx3-ubyte.gz 9912422 bytes.
Extracting /tmp/tensorflow/mnist/input_data/train-images-idx3-ubyte.gz
Successfully downloaded train-labels-idx1-ubyte.gz 28881 bytes.
Extracting /tmp/tensorflow/mnist/input_data/train-labels-idx1-ubyte.gz
Successfully downloaded t10k-images-idx3-ubyte.gz 1648877 bytes.
Extracting /tmp/tensorflow/mnist/input_data/t10k-images-idx3-ubyte.gz
Successfully downloaded t10k-labels-idx1-ubyte.gz 4542 bytes.
Extracting /tmp/tensorflow/mnist/input_data/t10k-labels-idx1-ubyte.gz


In [5]:
# Cluster layout: number of parameter-server tasks and number of worker tasks.
# num_ps is passed to the device setter and the session; num_worker also
# drives how many model replicas are declared.
num_ps, num_worker = 2, 2

In [6]:
# Open the Abyss distributed session for the 'ps' and 'worker' jobs.
# NOTE(review): the second list presumably gives the task count per job, in
# the same order as the job names - confirm against the Abyss API.
sess = AbyssDistributedSession(
    ['ps', 'worker'],
    [num_ps, num_worker])

In [7]:
# Model dimensions. The input layer has IMAGE_PIXELS * IMAGE_PIXELS features
# and the hidden layer has hidden_units units.
IMAGE_PIXELS, hidden_units = 28, 1000

In [8]:
# Per-worker handles of the feeds (input/label placeholders) and fetches
# (train ops). One entry is appended for each worker replica declared below.
global_x = []
global_y_ = []
global_train_step = []

# Declare the same model once per worker. After the loop, the plain names
# (x, y_, y, global_step, ...) refer to the LAST worker's replica; later cells
# rely on that when fetching global_step and evaluating accuracy.
# NOTE(review): every iteration creates its own tf.Variable objects in this
# graph. Whether Abyss maps them onto shared parameters is not visible here -
# confirm against the Abyss documentation.
for worker_index in range(num_worker):
    worker_device = '/job:worker/task:%d' % (worker_index)
    with tf.device(abyss_replica_device_setter(ps_tasks=num_ps, worker_device=worker_device, ps_device='/job:ps/cpu:0')):
        global_step = tf.Variable(0, name='global_step', trainable=False)

        # Variables of the hidden layer
        with tf.name_scope('hidden'):
            hid_w = tf.Variable(
                tf.truncated_normal(
                    [IMAGE_PIXELS * IMAGE_PIXELS, hidden_units],
                    stddev=1.0/IMAGE_PIXELS),
                name='hid_w')
            hid_b = tf.Variable(tf.zeros([hidden_units]), name='hid_b')

        # Variables of the softmax layer
        sm_w = tf.Variable(
            tf.truncated_normal(
                [hidden_units, 10],
                stddev=1.0/math.sqrt(hidden_units)),
            name='sm_w')
        sm_b = tf.Variable(tf.zeros([10]), name='sm_b')

        # Input: flattened images and one-hot labels for this worker
        x = tf.placeholder(tf.float32, [None, IMAGE_PIXELS * IMAGE_PIXELS])
        y_ = tf.placeholder(tf.float32, [None, 10])
        global_x.append(x)
        global_y_.append(y_)

        # Hidden layer: affine transform followed by ReLU
        hid_lin = tf.nn.xw_plus_b(x, hid_w, hid_b)
        hid = tf.nn.relu(hid_lin)

        # Softmax output; probabilities are clipped to [1e-10, 1.0] before the
        # log so that a zero probability cannot produce log(0).
        y = tf.nn.softmax(tf.nn.xw_plus_b(hid, sm_w, sm_b))
        cross_entropy = -tf.reduce_sum(y_ * tf.log(tf.clip_by_value(y, 1e-10, 1.0)))

        # Adam with learning rate 0.01; each minimize step also increments
        # this replica's global_step.
        opt = tf.train.AdamOptimizer(0.01)
        train_step = opt.minimize(cross_entropy, global_step=global_step)
        global_train_step.append(train_step)

# Build the initializer once, after every replica has been declared, so it
# covers all variables and no throwaway initializer op is added per worker.
# It is created outside the device scope, like the evaluation ops later on.
init_op = tf.global_variables_initializer()

In [9]:
# Initialize the variables by running the initializer op built in the
# graph-construction cell above.
sess.run(init_op)

In [10]:
# Train for 100 iterations. On each iteration every worker's placeholders are
# fed their own batch of 100 training examples, and all workers' train ops are
# fetched together. global_step here is the last worker's step variable.
for _ in range(100):
    train_feed = {}
    for x_ph, y_ph in zip(global_x, global_y_):
        batch_images, batch_labels = mnist_data.train.next_batch(100)
        train_feed[x_ph] = batch_images
        train_feed[y_ph] = batch_labels
    _, step = sess.run([global_train_step, global_step], feed_dict=train_feed)

In [11]:
# Evaluate the trained model on the MNIST test set.
# x, y_ and y are the tensors left over from the last iteration of the
# graph-construction loop, i.e. the last worker's replica.
predicted_digit = tf.argmax(y, 1)
true_digit = tf.argmax(y_, 1)
correct_prediction = tf.equal(predicted_digit, true_digit)
accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))

test_feed = {x: mnist_data.test.images, y_: mnist_data.test.labels}
print(sess.run(accuracy, feed_dict=test_feed))

0.9334


In [12]:
# Close the distributed session now that training and evaluation are done.
sess.close()