## TensorFlow Model

In [1]:
def map_fun(args, ctx):
    """Training function executed by parameter servers and workers in distributed TFoS.

    Parameter-server tasks start their TF server and block in ``server.join()``.
    Worker tasks build a 2-layer LSTM classifier whose inputs are read from CSV
    files on HDFS, then train it asynchronously under a ``tf.train.Supervisor``.
    The chief worker (``task_index == 0``) periodically evaluates one test batch,
    writes TensorBoard summaries and, at the end, dumps the collected test
    accuracy / loss / log lines and timing information to ``args.output``.
    Distributed inference is not implemented: in any mode other than ``"train"``
    a message is logged once and no steps are run.

    Args:
        args: parsed command-line namespace; this function reads ``learningrate``,
            ``rdma``, ``mode``, ``epochs``, ``features``, ``labels``,
            ``testfeatures``, ``testlabels``, ``steps`` and ``output``.
        ctx: TensorFlowOnSpark node context (``worker_num``, ``job_name``,
            ``task_index``, ``cluster_spec``).

    Raises:
        ValueError: if ``ctx.job_name`` is neither ``"ps"`` nor ``"worker"``.
    """
    from tensorflowonspark import TFNode
    from datetime import datetime
    import os
    import tensorflow as tf
    import time
    from hops import tensorboard
    import pydoop.hdfs as pyhdfs
    startTime = datetime.now()

    # Constants
    NUM_FEATURES = 3
    NUM_CLASSES = 7
    SEQUENCE_SIZE = 200
    NUM_HIDDEN_UNITS = 64
    # Presumably the number of examples in each split; informational only, not used below.
    TEST_SIZE = 130622
    TRAIN_SIZE = 522490
    LEARNING_RATE = args.learningrate

    def print_log(worker_num, arg):
        print("Worker {0}: {1}".format(worker_num, arg))

    # Cluster parameters
    worker_num = ctx.worker_num
    job_name = ctx.job_name
    task_index = ctx.task_index
    cluster_spec = ctx.cluster_spec
    print_log(worker_num, "task_index: {0}, job_name {1}, cluster_spec: {2}".format(task_index, job_name, cluster_spec))
    num_workers = len(cluster_spec['worker'])

    # Delay PS nodes a bit, since workers seem to reserve GPUs more quickly/reliably (w/o conflict)
    if job_name == "ps":
        time.sleep((worker_num + 1) * 5)

    batch_size = 1024
    print_log(worker_num, "batch_size: {0}".format(batch_size))

    # Get TF cluster and server instances
    cluster, server = TFNode.start_cluster_server(ctx, 1, args.rdma)

    def read_csv_features(feature_dir, batch_size=100, num_epochs=None, task_index=None, num_workers=None):
        """Return batches of feature rows read from ``feature_dir/part-*`` CSV files.

        Each CSV line is decoded into ``SEQUENCE_SIZE * NUM_FEATURES`` float columns
        (missing values default to 1.0) and stacked into one vector. Files are read
        in order (``shuffle=False``). ``task_index`` and ``num_workers`` are
        accepted but currently unused.
        """
        print_log(worker_num, "num_epochs: {0}".format(num_epochs))

        # Setup queue of csv feature filenames
        tf_record_pattern = os.path.join(feature_dir, 'part-*')
        features = tf.gfile.Glob(tf_record_pattern)
        print_log(worker_num, "features: {0}".format(features))
        feature_queue = tf.train.string_input_producer(features, shuffle=False, capacity=1000, num_epochs=num_epochs,
                                                       name="feature_queue")
        # Setup reader for feature queue
        feature_reader = tf.TextLineReader(name="feature_reader")
        _, feat_csv = feature_reader.read(feature_queue)
        feature_defaults = [[1.0] for col in range(SEQUENCE_SIZE * NUM_FEATURES)]
        feature = tf.stack(tf.decode_csv(feat_csv, feature_defaults), name="input_features")
        print_log(worker_num, "features: {0}, shape: {1}".format(feature, feature.shape))

        # Return a batch of examples
        return tf.train.batch([feature], batch_size, num_threads=1, name="batch_csv")

    def read_csv_labels(label_dir, batch_size=10, num_epochs=None, task_index=None, num_workers=None):
        """Return batches of int64 labels read from ``label_dir/part-*`` CSV files.

        Each CSV line is decoded as a single required int64 column. Files are read
        in order (``shuffle=False``) so batches line up with ``read_csv_features``
        only if the feature and label files are ordered identically (TODO confirm).
        ``task_index`` and ``num_workers`` are accepted but currently unused.
        """
        print_log(worker_num, "num_epochs: {0}".format(num_epochs))

        # Setup queue of csv label filenames
        tf_record_pattern = os.path.join(label_dir, 'part-*')
        labels = tf.gfile.Glob(tf_record_pattern)
        print_log(worker_num, "labels: {0}".format(labels))
        label_queue = tf.train.string_input_producer(labels, shuffle=False, capacity=1000, num_epochs=num_epochs,
                                                     name="label_queue")
        # Setup reader for label queue
        label_reader = tf.TextLineReader(name="label_reader")
        _, label_csv = label_reader.read(label_queue)
        label_defaults = [tf.constant([], dtype=tf.int64)]
        label = tf.stack(tf.decode_csv(label_csv, label_defaults), name="input_labels")
        print_log(worker_num, tf.shape(label))
        print_log(worker_num, "label: {0}".format(label))

        # Return a batch of examples
        return tf.train.batch([label], batch_size, num_threads=1, name="label_batch_csv")

    if job_name == "ps":
        print_log(worker_num, "Parameter Server Joining")
        server.join()

    elif job_name == "worker":
        print_log(worker_num, "worker {0} starting".format(task_index))

        with tf.device(tf.train.replica_device_setter(
                worker_device="/job:worker/task:%d" % task_index,
                cluster=cluster)):

            ################  The computational graph ########################

            # Queue parameters
            num_epochs = 1 if args.mode == "inference" else None if args.epochs == 0 else args.epochs
            index = task_index if args.mode == "inference" else None
            workers = num_workers if args.mode == "inference" else None

            # Placeholders or QueueRunner/Readers for input data
            features = TFNode.hdfs_path(ctx, args.features)  # input csv files
            labels = TFNode.hdfs_path(ctx, args.labels)  # input csv files
            test_features = TFNode.hdfs_path(ctx, args.testfeatures)  # input csv files
            test_labels = TFNode.hdfs_path(ctx, args.testlabels)  # input csv files

            # Read input from queues; reshape flat rows into (batch, SEQUENCE_SIZE, NUM_FEATURES)
            x = read_csv_features(features, batch_size, num_epochs, index, workers)
            x = tf.reshape(x, [x.shape[0].value, SEQUENCE_SIZE, NUM_FEATURES])
            y = read_csv_labels(labels, batch_size, num_epochs, index, workers)
            x_test = read_csv_features(test_features, batch_size, num_epochs, index, workers)
            x_test = tf.reshape(x_test, [x_test.shape[0].value, SEQUENCE_SIZE, NUM_FEATURES])
            y_test = read_csv_labels(test_labels, batch_size, num_epochs, index, workers)

            # Weights and biases of the input (hidden) and output fully connected layers
            W = {
                'hidden': tf.Variable(tf.random_normal([NUM_FEATURES, NUM_HIDDEN_UNITS])),
                'output': tf.Variable(tf.random_normal([NUM_HIDDEN_UNITS, NUM_CLASSES]))
            }
            biases = {
                'hidden': tf.Variable(tf.random_normal([NUM_HIDDEN_UNITS], mean=1.0)),
                'output': tf.Variable(tf.random_normal([NUM_CLASSES]))
            }

            # Time-major layout, then flatten to (SEQUENCE_SIZE * batch, NUM_FEATURES)
            X = tf.transpose(x, [1, 0, 2])
            X = tf.reshape(X, [-1, NUM_FEATURES])
            X_test = tf.transpose(x_test, [1, 0, 2])
            X_test = tf.reshape(X_test, [-1, NUM_FEATURES])

            # Output from the first fully connected layer, split into one tensor per time step
            hidden = tf.nn.relu(tf.matmul(X, W['hidden']) + biases['hidden'])
            hidden = tf.split(hidden, SEQUENCE_SIZE, 0)
            hidden_test = tf.nn.relu(tf.matmul(X_test, W['hidden']) + biases['hidden'])
            hidden_test = tf.split(hidden_test, SEQUENCE_SIZE, 0)

            # Stack 2 LSTM layers
            lstm_layers = [tf.contrib.rnn.BasicLSTMCell(NUM_HIDDEN_UNITS, forget_bias=1.0) for _ in range(2)]
            lstm_layers = tf.contrib.rnn.MultiRNNCell(lstm_layers)

            # Get output from LSTM layers (the same cell object is applied to train and test inputs)
            outputs, _ = tf.contrib.rnn.static_rnn(lstm_layers, hidden, dtype=tf.float32)
            outputs_test, _ = tf.contrib.rnn.static_rnn(lstm_layers, hidden_test, dtype=tf.float32)

            # Get output for the last time step
            lstm_last_output = outputs[-1]
            lstm_last_output_test = outputs_test[-1]

            # Output from the second fully connected layer, aka logits
            logits = tf.matmul(lstm_last_output, W['output']) + biases['output']
            logits_test = tf.matmul(lstm_last_output_test, W['output']) + biases['output']

            # Global step to keep track of how long training has proceeded
            global_step = tf.Variable(0, name="global_step", trainable=False)

            # Make predictions with softmax + argmax
            softmax_prediction = tf.nn.softmax(logits, name="prediction")
            prediction = tf.argmax(softmax_prediction, 1)
            softmax_prediction_test = tf.nn.softmax(logits_test, name="prediction_test")
            prediction_test = tf.argmax(softmax_prediction_test, 1)

            # L2 Regularization
            L2_LOSS = 0.0015
            l2 = L2_LOSS * sum(tf.nn.l2_loss(tf_var) for tf_var in tf.trainable_variables())

            # Cross entropy loss + L2 regularization
            loss = tf.reduce_mean(
                tf.nn.softmax_cross_entropy_with_logits(
                    labels=tf.one_hot(tf.reshape(y, [-1]), NUM_CLASSES),
                    logits=logits))
            loss_test = tf.reduce_mean(
                tf.nn.softmax_cross_entropy_with_logits(
                    labels=tf.one_hot(tf.reshape(y_test, [-1]), NUM_CLASSES),
                    logits=logits_test))
            loss_reg = loss + l2
            tf.summary.scalar("loss_test", loss_test)  # for tensorboard

            # Define optimizer
            train_step = tf.train.AdamOptimizer(learning_rate=LEARNING_RATE).minimize(
                loss_reg,
                global_step=global_step)

            # Test trained model
            correct_prediction = tf.equal(prediction, tf.argmax(tf.one_hot(tf.reshape(y, [-1]), NUM_CLASSES), 1))
            correct_prediction_test = tf.equal(prediction_test, tf.argmax(tf.one_hot(tf.reshape(y_test, [-1]), NUM_CLASSES), 1))
            accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32), name="accuracy")
            accuracy_test = tf.reduce_mean(tf.cast(correct_prediction_test, tf.float32), name="accuracy_test")
            tf.summary.scalar("acc_test", accuracy_test)  # for tensorboard

            # Utility stuff tensorboard and logging
            saver = tf.train.Saver()
            summary_op = tf.summary.merge_all()
            init_op = tf.global_variables_initializer()
            logdir = tensorboard.logdir()
            display_step = 200
            # The chief evaluates every `display_every` of its own steps (about every
            # `display_step` global steps). Clamp to 1 so more than `display_step`
            # workers cannot produce a modulo-by-zero below.
            display_every = max(1, display_step // num_workers)
            history = dict(test_loss=[], test_acc=[], log=[])
            print_log(worker_num, "tensorflow model path: {0}".format(logdir))

            # Only the chief (task 0) writes TensorBoard summaries
            summary_writer = None
            if task_index == 0:
                summary_writer = tf.summary.FileWriter(logdir, graph=tf.get_default_graph())

            # Setup Supervisor
            if args.mode == "train":
                sv = tf.train.Supervisor(is_chief=(task_index == 0),
                                         logdir=logdir,
                                         init_op=init_op,
                                         summary_op=None,
                                         summary_writer=None,
                                         saver=saver,
                                         global_step=global_step,
                                         stop_grace_secs=300,
                                         save_model_secs=10)
            else:
                sv = tf.train.Supervisor(is_chief=(task_index == 0),
                                         logdir=logdir,
                                         summary_op=None,
                                         saver=saver,
                                         global_step=global_step,
                                         stop_grace_secs=300,
                                         save_model_secs=0)

        # Run the graph for up to args.steps global steps, evaluating on a test batch
        # periodically. The supervisor takes care of session initialization, restoring
        # from a checkpoint, and closing when done or an error occurs.
        with sv.managed_session(server.target) as sess:
            # Asynchronous SGD training with supervisor
            print_log(worker_num, "session ready, starting training")
            if args.mode == "train":
                step = 0
                chief_count = 0
                while not sv.should_stop() and step < args.steps:
                    # Test ops are not fetched here: evaluating the test batch on every
                    # step of every worker would double the compute per step.
                    _, step = sess.run([train_step, global_step])
                    if step % 50 == 0:
                        print("step: {0}".format(step))
                    if sv.is_chief:
                        if chief_count % display_every == 0:
                            # One run, so the TensorBoard summary and the logged metrics
                            # describe the same test batch.
                            summary, test_a, test_l = sess.run([summary_op, accuracy_test, loss_test])
                            result = "step: {0}, test acc: {1}, test loss: {2}".format(step, test_a, test_l)
                            history['test_loss'].append(test_l)
                            history['test_acc'].append(test_a)
                            history['log'].append(result)
                            print_log(worker_num, result)
                            summary_writer.add_summary(summary, step)
                        chief_count += 1
            else:
                # Nothing would ever advance `step` or trip should_stop() here, so log
                # once instead of looping.
                print_log(worker_num, "inference not supported, do it locally with saved model instead")

            if sv.is_chief:
                print_log(worker_num, "Saving session stats")
                endTime = datetime.now()
                timeElapsed = endTime - startTime
                accs = "\n".join(map(str, history["test_acc"]))
                losses = "\n".join(map(str, history["test_loss"]))
                logs = "\n".join(map(str, history["log"]))
                pyhdfs.dump(accs, args.output + "/accuracy")
                pyhdfs.dump(losses, args.output + "/loss")
                pyhdfs.dump(logs, args.output + "/log")
                # Named `timing` so the `time` module is not shadowed.
                timing = "start: " + str(startTime) + "\nend: " + str(endTime) + "\nduration: " + str(timeElapsed)
                pyhdfs.dump(timing, args.output + "/time")
                # Persist pending summaries from the writer that actually received them.
                if summary_writer is not None:
                    summary_writer.flush()
                    summary_writer.close()

            # Ask for all the services to stop.
            print("{0} stopping supervisor".format(datetime.now().isoformat()))
            sv.stop()

    else:
        raise ValueError("unsupported job_name: {0}".format(job_name))

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
3070,application_1513605045578_0305,pyspark,idle,Link,Link,✔


SparkSession available as 'spark'.


## Spark Cluster Setup For Training

In [None]:
from pyspark.context import SparkContext
from pyspark.conf import SparkConf

import argparse
import os
import numpy
import sys
import tensorflow as tf
import threading
from datetime import datetime
from hops import util
from hops import hdfs

from tensorflowonspark import TFCluster

project_path = "/Projects/" + hdfs.project_name()

TRAIN_FEATURES_PATH = project_path + "/HAR_Dataset/cleaned_data_parallel/train/features"
TRAIN_LABELS_PATH = project_path + "/HAR_Dataset/cleaned_data_parallel/train/labels"
TEST_FEATURES_PATH = project_path + "/HAR_Dataset/cleaned_data_parallel/test/features"
TEST_LABELS_PATH = project_path + "/HAR_Dataset/cleaned_data_parallel/test/labels"
OUTPUT_PATH = project_path + "/HAR_Dataset/output/dist_async"

# `spark` is the SparkSession provided by the notebook session.
sc = spark.sparkContext
num_executors = util.num_executors(spark)
num_ps = util.num_param_servers(spark)


def _str2bool(value):
    """Parse a command-line boolean ("true"/"false", "yes"/"no", "1"/"0").

    Without a converter, argparse would keep the raw string, and any non-empty
    value (including "False") would be truthy.
    """
    if isinstance(value, bool):
        return value
    lowered = str(value).strip().lower()
    if lowered in ("1", "true", "t", "yes", "y"):
        return True
    if lowered in ("0", "false", "f", "no", "n", ""):
        return False
    raise argparse.ArgumentTypeError("expected a boolean, got {0!r}".format(value))


parser = argparse.ArgumentParser()
parser.add_argument("-e", "--epochs", help="number of epochs", type=int, default=1000000)
parser.add_argument("-f", "--format", help="example format: (csv|pickle|tfr)", choices=["csv","pickle","tfr"], default="csv")
parser.add_argument("-fe", "--features", help="HDFS path to training features in parallelized format", default=TRAIN_FEATURES_PATH)
parser.add_argument("-l", "--labels", help="HDFS path to training labels in parallelized format", default=TRAIN_LABELS_PATH)
parser.add_argument("-tf", "--testfeatures", help="HDFS path to test features in parallelized format", default=TEST_FEATURES_PATH)
parser.add_argument("-tl", "--testlabels", help="HDFS path to test labels in parallelized format", default=TEST_LABELS_PATH)
parser.add_argument("-m", "--model", help="HDFS path to save/load model during train/inference", default=OUTPUT_PATH)
parser.add_argument("-n", "--cluster_size", help="number of nodes in the cluster (for Spark Standalone)", type=int, default=num_executors)
parser.add_argument("-o", "--output", help="HDFS path to save test/inference output", default=OUTPUT_PATH)
parser.add_argument("-r", "--readers", help="number of reader/enqueue threads", type=int, default=1)
parser.add_argument("-s", "--steps", help="maximum number of steps", type=int, default=10001)
parser.add_argument("-tb", "--tensorboard", help="launch tensorboard process", action="store_true")
parser.add_argument("-X", "--mode", help="train|inference", default="train")
parser.add_argument("-c", "--rdma", help="use rdma connection (true/false)", type=_str2bool, default=False)
parser.add_argument("-lr", "--learningrate", help="learning rate", type=float, default=0.00025)
# Inside a notebook kernel, sys.argv may carry flags meant for the kernel itself;
# ignore (but report) them instead of aborting the cell.
args, unknown_args = parser.parse_known_args()
if unknown_args:
    print("Ignoring unrecognized arguments: {0}".format(unknown_args))

print("{0} ===== Start".format(datetime.now().isoformat()))

cluster = TFCluster.run(sc, map_fun, args, args.cluster_size, num_ps, args.tensorboard, TFCluster.InputMode.TENSORFLOW)
cluster.shutdown()

print("{0} ===== Stop".format(datetime.now().isoformat()))