## Imports

In [1]:
import argparse
from pyspark.context import SparkContext
from pyspark.conf import SparkConf
from pyspark.sql import SQLContext
from hops import hdfs
from pyspark.sql.functions import udf
from pyspark.sql.types import *
from tensorflowonspark import TFCluster
from hops import util

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
2451,application_1512575073636_1010,pyspark,idle,Link,Link,✔


SparkSession available as 'spark'.


## Constants

In [2]:
# Root HDFS directory of the current Hopsworks project.
project_path = "/Projects/" + hdfs.project_name()

# HDFS locations of the pre-processed HAR dataset (features and labels are stored separately).
TRAIN_FEATURES_PATH = project_path + "/HAR_Dataset/cleaned_data/train/features"
TRAIN_LABELS_PATH = project_path + "/HAR_Dataset/cleaned_data/train/labels"
TEST_FEATURES_PATH = project_path + "/HAR_Dataset/cleaned_data/test/features"
TEST_LABELS_PATH = project_path + "/HAR_Dataset/cleaned_data/test/labels"

# `spark` is not defined in this notebook; it is the session the kernel reports as available on startup.
sc = spark.sparkContext
sql = SQLContext(sc)

## Argument Parsing

In [3]:
def parse_args(num_executors):
    """Build the command-line parser and parse the process arguments.

    Args:
        num_executors: default value for ``--cluster_size`` when the option is not given.

    Returns:
        The ``argparse.Namespace`` produced by parsing ``sys.argv``.
    """
    def _str2bool(value):
        """Convert a textual boolean such as 'true' or '0' into a real bool."""
        text = str(value).strip().lower()
        if text in ("1", "true", "t", "yes", "y"):
            return True
        if text in ("0", "false", "f", "no", "n", ""):
            return False
        raise argparse.ArgumentTypeError("expected a boolean value, got {0!r}".format(value))

    parser = argparse.ArgumentParser()
    parser.add_argument('-c', "--cluster", action='store_true', default=False)
    parser.add_argument("-n", "--cluster_size", help="number of nodes in the cluster", type=int, default=num_executors)
    parser.add_argument("-tb", "--tensorboard", help="launch tensorboard process", action="store_true")
    parser.add_argument("-X", "--mode", help="train|inference", default="train")
    parser.add_argument("-f", "--features", help="HDFS path to features in parallelized format", default=TRAIN_FEATURES_PATH)
    parser.add_argument("-l", "--labels", help="HDFS path to labels in parallelized format", default=TRAIN_LABELS_PATH)
    parser.add_argument("-m", "--model", help="HDFS path to save/load model during train/inference", default=project_path + "/HAR_Dataset/saved_model/saved_model")
    # Without a converter any supplied value (even "false") was kept as a truthy string.
    # `-r` on its own now means True, and `-r <value>` is still accepted and parsed as a boolean.
    parser.add_argument("-r", "--rdma", help="use rdma connection", nargs="?", const=True, default=False, type=_str2bool)
    parser.add_argument("-o", "--output", help="HDFS path to save test/inference output", default=project_path + "/HAR_Dataset/predictions")
    parser.add_argument("-s", "--steps", help="maximum number of steps", type=int, default=100000)
    parser.add_argument("-b", "--batch_size", help="number of records per batch", type=int, default=100)
    parser.add_argument("-e", "--epochs", help="number of epochs", type=int, default=100)
    parser.add_argument("-lr", "--learningrate", help="learning rate", type=float, default=0.00025)
    args = parser.parse_args()
    return args

def training_args(args):
    """Overwrite the run parameters on ``args`` with the training defaults and return it."""
    overrides = (
        ("features", TRAIN_FEATURES_PATH),
        ("labels", TRAIN_LABELS_PATH),
        ("mode", "train"),
        ("steps", 100000),
        ("batch_size", 100),
        ("epochs", 100),
    )
    for attribute, value in overrides:
        setattr(args, attribute, value)
    return args
    
def test_args(args):
    """Overwrite the run parameters on ``args`` with the test/inference defaults and return it."""
    overrides = (
        ("features", TEST_FEATURES_PATH),
        ("labels", TEST_LABELS_PATH),
        ("mode", "inference"),
        ("steps", 100000),
        ("batch_size", 100),
        ("epochs", 100),
    )
    for attribute, value in overrides:
        setattr(args, attribute, value)
    return args

## TensorFlow Model

In [7]:
def map_fun(args, ctx):
    """Training/Inference Function executed by parameter-servers and workers in distributed TFOS

    Args:
        args: parsed argument namespace. This function reads ``batch_size``, ``rdma``,
            ``learningrate``, ``mode``, ``epochs``, ``features``, ``labels``, ``output``,
            ``model`` and ``steps``.
        ctx: node context handed over by TensorFlowOnSpark. This function reads
            ``worker_num``, ``job_name``, ``task_index`` and ``cluster_spec``.
    """
    NUM_FEATURES = 3
    NUM_CLASSES = 7
    SEQUENCE_SIZE = 10
    NUM_HIDDEN_UNITS = 64

    def print_log(worker_num, arg):
        """Print a message prefixed with the worker number."""
        print("Worker {0}: {1}".format(worker_num, arg))

    # Imports are local because this function is shipped to and executed on the Spark executors.
    from tensorflowonspark import TFNode
    from datetime import datetime
    import os
    import tensorflow as tf
    import time
    # Used to get TensorBoard logdir for TensorBoard that show up in HopsWorks
    from hops import tensorboard

    worker_num = ctx.worker_num
    job_name = ctx.job_name
    task_index = ctx.task_index
    cluster_spec = ctx.cluster_spec
    print_log(worker_num, "task_index: {0}, job_name {1}, cluster_spec: {2}".format(task_index, job_name, cluster_spec))
    num_workers = len(cluster_spec['worker'])

    # Delay PS nodes a bit, since workers seem to reserve GPUs more quickly/reliably (w/o conflict)
    if job_name == "ps":
        time.sleep((worker_num + 1) * 5)

    batch_size = args.batch_size
    print_log(worker_num, "batch_size: {0}".format(batch_size))

    # Get TF cluster and server instances
    cluster, server = TFNode.start_cluster_server(ctx, 1, args.rdma)

    def read_csv_features(feature_dir, batch_size=100, num_epochs=None, task_index=None, num_workers=None):
        """ Reads pre-processed and parallelized CSV files from disk into TF-HDFS queues

        ``task_index`` and ``num_workers`` are accepted but not used by this reader.
        Returns a batched tensor of ``batch_size`` rows with ``NUM_FEATURES`` columns each.
        """
        print_log(worker_num, "num_epochs: {0}".format(num_epochs))

        # Setup queue of csv feature filenames
        tf_record_pattern = os.path.join(feature_dir, 'part-*')
        features = tf.gfile.Glob(tf_record_pattern)
        print_log(worker_num, "features: {0}".format(features))
        feature_queue = tf.train.string_input_producer(features, shuffle=False, capacity=1000, num_epochs=num_epochs,
                                                       name="feature_queue")
        # Setup reader for feature queue
        feature_reader = tf.TextLineReader(name="feature_reader")
        _, feat_csv = feature_reader.read(feature_queue)
        feature_defaults = [[1.0] for col in range(NUM_FEATURES)]
        feature = tf.stack(tf.decode_csv(feat_csv, feature_defaults), name="input_features")
        print_log(worker_num, "features: {0}, shape: {1}".format(feature, feature.shape))

        # Return a batch of examples
        return tf.train.batch([feature], batch_size, num_threads=10, name="batch_csv")

    def read_csv_labels(label_dir, batch_size=10, num_epochs=None, task_index=None, num_workers=None):
        """ Reads pre-processed and parallelized CSV files from disk into TF-HDFS queues

        ``task_index`` and ``num_workers`` are accepted but not used by this reader.
        Returns a batched int64 tensor with one label column per row.
        """
        print_log(worker_num, "num_epochs: {0}".format(num_epochs))

        # Setup queue of csv label filenames
        tf_record_pattern = os.path.join(label_dir, 'part-*')
        labels = tf.gfile.Glob(tf_record_pattern)
        print_log(worker_num, "labels: {0}".format(labels))
        label_queue = tf.train.string_input_producer(labels, shuffle=False, capacity=1000, num_epochs=num_epochs,
                                                     name="label_queue")

        # Setup reader for label queue
        label_reader = tf.TextLineReader(name="label_reader")
        _, label_csv = label_reader.read(label_queue)
        label_defaults = [tf.constant([], dtype=tf.int64)]
        label = tf.stack(tf.decode_csv(label_csv, label_defaults), name = "input_labels")
        print_log(worker_num, tf.shape(label))
        print_log(worker_num, "label: {0}".format(label))

        # Return a batch of examples
        return tf.train.batch([label], batch_size, num_threads=10, name="label_batch_csv")

    if job_name == "ps":
        print_log(worker_num, "Parameter Server Joining")
        server.join()

    elif job_name == "worker":
        # The message used to be printed with an unformatted "{0}" placeholder.
        print_log(worker_num, "worker {0} starting".format(task_index))

        with tf.device(tf.train.replica_device_setter(
                worker_device="/job:worker/task:%d" % task_index,
                cluster=cluster)):

            def build_graph(X):
                """Builds the computational graph of the model and returns the logits"""

                print_log(worker_num, "build graph, input shape: {0}".format(X.shape))

                W = {
                    'hidden': tf.Variable(tf.random_normal([NUM_FEATURES, NUM_HIDDEN_UNITS])),
                    'output': tf.Variable(tf.random_normal([NUM_HIDDEN_UNITS, NUM_CLASSES]))
                }
                biases = {
                        'hidden': tf.Variable(tf.random_normal([NUM_HIDDEN_UNITS], mean=1.0)),
                        'output': tf.Variable(tf.random_normal([NUM_CLASSES]))
                }

                hidden = tf.nn.relu(tf.matmul(X, W['hidden']) + biases['hidden'])
                # Split the rows along axis 0 into SEQUENCE_SIZE equal chunks, one per RNN time step.
                hidden = tf.split(hidden, SEQUENCE_SIZE, 0)

                # Stack 2 LSTM layers
                lstm_layers = [tf.contrib.rnn.BasicLSTMCell(NUM_HIDDEN_UNITS, forget_bias=1.0) for _ in range(2)]
                lstm_layers = tf.contrib.rnn.MultiRNNCell(lstm_layers)

                outputs, _ = tf.contrib.rnn.static_rnn(lstm_layers, hidden, dtype=tf.float32)

                # Get output for the last time step
                lstm_last_output = outputs[-1]
                logits = tf.matmul(lstm_last_output, W['output']) + biases['output']
                return logits

            def define_optimizer(logits, labels, LEARNING_RATE):
                """Defines the optimizer of the model and calculates step, loss, prediction, accuracy

                Returns the tuple (train_step, accuracy, loss, global_step, prediction,
                correct_prediction, sync_replicas_hook, chief_queue_runner).
                """

                print_log(worker_num, "define optimizer, labels shape: {0}, logits shape: {1}".format(labels.shape, logits.shape))

                #Global step to keep track of how long training have proceeded,
                #incremented by one for each gradient computation
                global_step = tf.Variable(0, name="global_step", trainable=False)

                softmax_prediction = tf.nn.softmax(logits, name="prediction")
                prediction = tf.argmax(softmax_prediction, 1)

                # Labels arrive with one column per row; flatten them once so that both the
                # loss and the accuracy compare rank-1 tensors.
                flat_labels = tf.reshape(labels, [-1])

                #L2 Regularization
                L2_LOSS = 0.0015
                l2 = L2_LOSS * sum(tf.nn.l2_loss(tf_var) for tf_var in tf.trainable_variables())

                # Define loss and optimizer
                cross_entropy = tf.reduce_mean(
                    tf.nn.sparse_softmax_cross_entropy_with_logits(
                        labels=flat_labels,
                        logits=logits)) + l2
                tf.summary.scalar("loss", cross_entropy) #for tensorboard

                # Honour the configured learning rate; it used to be hard-coded to 0.5,
                # silently ignoring both this parameter and --learningrate.
                opt = tf.train.GradientDescentOptimizer(LEARNING_RATE)
                opt = tf.train.SyncReplicasOptimizer(opt, replicas_to_aggregate=num_workers,
                                                     total_num_replicas=num_workers)

                train_step = opt.minimize(cross_entropy, global_step=global_step)

                # Test trained model. Comparing against the flattened labels avoids broadcasting
                # the rank-1 predictions against the rank-2 labels into a square matrix.
                correct_prediction = tf.equal(prediction, flat_labels)
                accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32), name="accuracy")
                tf.summary.scalar("acc", accuracy) #for tensorboard
                # NOTE(review): the hook is returned but never attached to a session in this
                # function - confirm that the sync token queue is initialised as intended.
                sync_replicas_hook = opt.make_session_run_hook(task_index == 0)
                chief_queue_runner = opt.get_chief_queue_runner()
                return train_step, accuracy, cross_entropy, global_step, prediction, correct_prediction, sync_replicas_hook, chief_queue_runner

            LEARNING_RATE = args.learningrate

            # Placeholders or QueueRunner/Readers for input data
            num_epochs = 1 if args.mode == "inference" else None if args.epochs == 0 else args.epochs
            index = task_index if args.mode == "inference" else None
            workers = num_workers if args.mode == "inference" else None

            features = TFNode.hdfs_path(ctx, args.features) #input csv files
            labels = TFNode.hdfs_path(ctx, args.labels) #input csv files

            x = read_csv_features(features, batch_size, num_epochs, index, workers)
            # Integer division: the batch size must be an int on Python 3 as well.
            y = read_csv_labels(labels, batch_size // SEQUENCE_SIZE, num_epochs, index, workers)
            print_log(worker_num, "shape: {0}, {1}".format(x.shape, y.shape))

            logits = build_graph(x)
            training_step, accuracy, loss, global_step, pred, correct_prediction, sync_replicas_hook, chief_queue_runner = define_optimizer(logits, y, LEARNING_RATE)

            saver = tf.train.Saver()
            summary_op = tf.summary.merge_all()
            init_op = tf.global_variables_initializer()

        logdir = tensorboard.logdir()
        print_log(worker_num, "tensorflow model path: {0}".format(logdir))

        if job_name == "worker" and task_index == 0:
            summary_writer = tf.summary.FileWriter(logdir, graph=tf.get_default_graph())

        if args.mode == "train":
            sv = tf.train.Supervisor(is_chief=(task_index == 0),
                                         logdir=logdir,
                                         init_op=init_op,
                                         summary_op=None,
                                         summary_writer=None,
                                         saver=saver,
                                         global_step=global_step,
                                         stop_grace_secs=300,
                                         save_model_secs=10)
        else:
            sv = tf.train.Supervisor(is_chief=(task_index == 0),
                                         logdir=logdir,
                                         summary_op=None,
                                         saver=saver,
                                         global_step=global_step,
                                         stop_grace_secs=300,
                                         save_model_secs=0)

        output_dir = TFNode.hdfs_path(ctx, args.output)
        # Predictions are only written during inference, so only open the file in that mode
        # (it used to be opened, and never closed, during training as well).
        output_file = None
        if args.mode == "inference":
            output_file = tf.gfile.Open("{0}/part-{1:05d}".format(output_dir, worker_num), mode='w')

        model_dir = TFNode.hdfs_path(ctx, args.model)

        # The supervisor takes care of session initialization, restoring from
        # a checkpoint, and closing when done or an error occurs.
        with sv.managed_session(server.target) as sess:
            # The chief additionally drives the queue runner of the SyncReplicasOptimizer.
            if sv.is_chief:
                sv.start_queue_runners(sess, [chief_queue_runner])

            print_log(worker_num, "session ready, starting training")
            # Loop until the supervisor shuts down or maximum steps have completed.
            step = 0
            count = 0
            try:
                while not sv.should_stop() and step < args.steps:
                    if args.mode == "train":
                        # Fetch the accuracy in the same run as the training step: a separate
                        # sess.run(accuracy) would dequeue and consume an additional batch.
                        _, summary, step, acc = sess.run([training_step, summary_op, global_step, accuracy])
                        print_log(worker_num, "step: {0}, acc: {1}".format(step, acc))

                        if sv.is_chief:
                            summary_writer.add_summary(summary, step)
                    else:  # args.mode == "inference"
                        print_log(worker_num, "doing inference")
                        # Fetch the label tensor `y` (not the `labels` HDFS path string) and
                        # write the fetched prediction values (not the `pred` tensor).
                        label, preds, acc = sess.run([y, pred, accuracy])
                        for i in range(len(label)):
                            count += 1
                            output_file.write("{0} {1}\n".format(label[i], preds[i]))
                        print_log(worker_num, "count: {0}".format(count))
            finally:
                # Close the predictions file even if the loop is left through an exception.
                if output_file is not None:
                    output_file.close()

            if args.mode == "inference":
                # Delay chief worker from shutting down supervisor during inference, since it can load model, start session,
                # run inference and request stop before the other workers even start/sync their sessions.
                if task_index == 0:
                    time.sleep(60)

            if sv.is_chief:
                save_path = saver.save(sess, model_dir)
                print_log(worker_num, "Model saved in file: {}".format(save_path))
                summ = tf.summary.FileWriter(model_dir, graph=tf.get_default_graph())
                summ.flush()

            # Ask for all the services to stop.
            print("{0} stopping supervisor".format(datetime.now().isoformat()))
            sv.stop()


## Spark Cluster Setup For Training

In [8]:
def spark_setup_cluster_training():
    """Start the cluster training with given parameters

    Reads the executor and parameter-server counts from the Spark session, builds the
    argument namespace with the training defaults, runs ``map_fun`` on the
    TensorFlowOnSpark cluster and shuts the cluster down afterwards.
    """
    num_executors = util.num_executors(spark)
    num_ps = util.num_param_servers(spark)
    args = parse_args(num_executors)
    # Use test_args(args) here instead to run the cluster with the test/inference defaults.
    args = training_args(args)
    cluster = TFCluster.run(sc, map_fun, args, args.cluster_size, num_ps, args.tensorboard, TFCluster.InputMode.TENSORFLOW)
    cluster.shutdown()
    print("Finished, cluster shutdown")

In [9]:
# Launch the distributed training run; the call returns after the cluster has been shut down.
spark_setup_cluster_training()

KeyboardInterrupt: 