## Training in 'Parallel'
Here we train multiple models with varying hyperparameters. We design the client to be able to distribute runs onto various servers/devices. First we setup a cluster running on three different processes -- in a production situation these would ideally be on different computers to speed up the training process. Next, we create three NNs each one pinned on a seperate task. Finally we train those three NNs -- again in a production environment those NNs would be trained in parallel!


### Tips:
* Pin sessions to tasks on the cluster
* Restart a graph -> restart all active sessions

In [2]:
import tensorflow as tf
import os

#### Load the data

In [49]:
mnist = tf.keras.datasets.mnist
(x_train, y_train),(x_test, y_test) = mnist.load_data()

x_train, y_train = x_train, y_train
x_test, y_test = x_test, y_test

x_train, x_test = x_train / 255.0, x_test / 255.0
x_train, x_test = x_train.reshape(-1,28*28), x_test.reshape(-1,28*28)

data = (x_train,y_train,x_test,y_test)

In [4]:
# Create method for getting batches for training

class mini_batches:
    
    def __init__(self, x, y, size):
        self.x = x
        self.y = y
        self.size = size
        self.index = 0
    
    def next_batch(self):
        if self.index + self.size >= len(self.x):            
            batch_x = self.x[self.index:]
            batch_y = self.y[self.index:]
            self.index = 0
            return batch_x, batch_y
        
        batch_x = self.x[self.index:self.index + self.size]
        batch_y = self.y[self.index:self.index + self.size]
        self.index = self.index + self.size
        return batch_x, batch_y

#### Cluster
Only run once!!

In [5]:
tasks = ["localhost:2222", "localhost:2223","localhost:2224"]
jobs = {"local": tasks}
cluster = tf.train.ClusterSpec(jobs)

In [6]:
server1 = tf.train.Server(cluster, job_name="local", task_index=0)
server2 = tf.train.Server(cluster, job_name="local", task_index=1)
server3 = tf.train.Server(cluster, job_name="local", task_index=2)

In [12]:
def run_with_location_trace(sess, op):
    # From https://stackoverflow.com/a/41525764/7832197
    run_options = tf.RunOptions(trace_level=tf.RunOptions.FULL_TRACE)
    run_metadata = tf.RunMetadata()
    sess.run(op, options=run_options, run_metadata=run_metadata)
    for device in run_metadata.step_stats.dev_stats:
      print(device.device)
      for node in device.node_stats:
        print("  ", node.node_name)

#### Graphs

In [51]:
from tensorflow.contrib.layers import fully_connected 
from tensorflow.contrib.layers import batch_norm
from tensorflow.contrib.layers import dropout

def make_network(task,name):
    with tf.device("/job:local/task:{0}".format(task)):
        
        is_training = tf.placeholder(tf.bool, shape=(), name='is_training_{0}'.format(name))
    
        # Inputs for training
        X = tf.placeholder(tf.float32, shape=(None,28*28), name='X_{0}'.format(name))
        y = tf.placeholder(tf.int32, shape=(None), name='y_{0}'.format(name))
        X_drop = dropout(X,.5, is_training=is_training)
    
        # Nueral Network layers
        he_init = tf.contrib.layers.variance_scaling_initializer()
        bn_params = {'is_training':is_training, 'decay':0.99, 'updates_collections':None}
    
        with tf.contrib.framework.arg_scope([fully_connected], weights_initializer=he_init, activation_fn=tf.nn.elu, 
                                                    normalizer_fn=batch_norm, normalizer_params=bn_params):
            h1 = dropout(fully_connected(X_drop, 40, scope='h1_{0}'.format(name)))
            h2 = dropout(fully_connected(h1, 30, scope='h2_{0}'.format(name)))
            h3 = dropout(fully_connected(h2, 20, scope='h3_{0}'.format(name)))
            output = fully_connected(h3, 10, scope='output_{0}'.format(name), activation_fn=None)
        
        # Loss from Network
        x_entropy = tf.nn.sparse_softmax_cross_entropy_with_logits(labels=y, logits=output)
        loss = tf.reduce_mean(x_entropy, name='loss_{0}'.format(name))
    
        # SGD
        optimizer = tf.train.AdamOptimizer()
        train = optimizer.minimize(loss)
    
        # Evaluation of performance
        correct = tf.nn.in_top_k(output, y, 1)
        accuracy = tf.reduce_mean(tf.cast(correct, tf.float32))
        
    return train, accuracy, X, y, is_training

In [52]:
tf.reset_default_graph()

In [53]:
t0, a0, X0, y0, b0 = make_network("0","0")
t1, a1, X1, y1, b1 = make_network("1","1")
t2, a2, X2, y2, b2 = make_network("2","2")

#### Sessions

In [55]:
sess1 = tf.Session(server1.target)
sess2 = tf.Session(server2.target)
sess3 = tf.Session(server3.target)

In [56]:
sess1.run(tf.global_variables_initializer())

In [60]:
# Training function

def train_model(sess, train, accuracy, X, y, is_training, data):

    # Mini batches
    batches = mini_batches(data[0], data[1], 500)
    max_acc = 0
    epochs = 0

    # Save model
    saver = tf.train.Saver()

    # SGD Updates
    for index, batch in enumerate(range(50000)):
        batch_x, batch_y = batches.next_batch()
        sess.run(train, feed_dict={X: batch_x, y:batch_y, is_training:True})
        
        # Early stopping and Checkpoint logging
        if index % 500 == 0:
            saver.save(sess, os.path.join(os.getcwd(), 'tensorflow/models/12_deep_learning_parallel.ckpt'))
            
            cur_acc = sess.run(accuracy,feed_dict={X:data[2], y:data[3], is_training:False})
            print(cur_acc)
            if cur_acc > max_acc:
                max_acc = cur_acc
                epochs = 0
            else:
                epochs = epochs + 1
                if epochs > 3:
                    saver.save(sess, os.path.join(os.getcwd(), 'tensorflow/models/12_deep_learning_parallel.ckpt'))
                    break

    # Save final model
    saver.save(sess, os.path.join(os.getcwd(), 'tensorflow/models/12_deep_learning_parallel.ckpt'))

In [62]:
train_model(sess1, t0, a0, X0, y0, b0, data)

0.0973
0.6499
0.7187
0.7352
0.7529
0.755
0.7636
0.7722
0.7752
0.7721
0.7656
0.772
0.7675


In [63]:
train_model(sess2, t1, a1, X1, y1, b1, data)

0.1039
0.657
0.7145
0.7438
0.7468
0.7634
0.7575
0.7639
0.7662
0.7771
0.7742
0.771
0.7748
0.7769


In [64]:
train_model(sess3, t2, a2, X2, y2, b2, data)

0.1074
0.6504
0.7105
0.7296
0.7513
0.757
0.7635
0.762
0.771
0.7713
0.7768
0.7736
0.7763
0.7757
0.7813
0.7804
0.7752
0.777
0.7767


In [54]:
sess1.close()
sess2.close()
sess3.close()