# In [None]:
def mnist_fun(args, ctx):
    """Run one TensorFlowOnSpark node of the music-genre spectrogram classifier.

    The driver cell passes this function to ``TFCluster.run``, which runs it once
    per cluster node. Parameter-server nodes (``ctx.job_name == "ps"``) call
    ``server.join()``; worker nodes build a small convnet over
    IMAGE_PIXELS x IMAGE_PIXELS single-channel images read from TFRecords and
    then either train it or write predictions, depending on ``args.mode``.

    Args:
        args: argparse namespace built by the driver cell. Reads ``rdma``,
            ``mode``, ``epochs``, ``format``, ``images``, ``readers``, ``steps``
            and ``output``.
        ctx: TensorFlowOnSpark node context. Reads ``worker_num``, ``job_name``,
            ``task_index`` and ``cluster_spec``.

    Raises:
        ValueError: on a worker node, if ``args.format`` is not ``"tfr"`` (the
            only input format this function supports).
    """
    from tensorflowonspark import TFNode
    from datetime import datetime
    import os
    import tensorflow as tf
    import time

    # Used to get TensorBoard logdir for TensorBoard that show up in HopsWorks
    from hops import tensorboard
    from hops import hdfs

    IMAGE_PIXELS = 128                        # images are IMAGE_PIXELS x IMAGE_PIXELS, one channel
    NUM_PIXELS = IMAGE_PIXELS * IMAGE_PIXELS  # 16384 values per flattened image
    NUM_CLASSES = 6                           # length of each TFRecord 'label' vector
    batch_size = 100

    worker_num = ctx.worker_num
    job_name = ctx.job_name
    task_index = ctx.task_index
    cluster_spec = ctx.cluster_spec
    num_workers = len(cluster_spec['worker'])

    def print_log(worker_num, fd, arg):
        """Write one ``"<worker_num> - <arg>"`` line to the open file ``fd``."""
        fd.write("{} - {}\n".format(str(worker_num), str(arg)))

    def read_tfr_examples(path, batch_size=100, num_epochs=None, task_index=None, num_workers=None):
        """Build a queue-based input pipeline over the ``part-*`` TFRecord files in ``path``.

        Args:
            path: directory holding the TFRecord ``part-*`` files.
            batch_size: number of examples per returned batch.
            num_epochs: passes over the file list; ``None`` means no limit.
            task_index, num_workers: when both are given, this worker reads only
                every ``num_workers``-th file, starting at ``task_index``.

        Returns:
            ``[image, label]`` batch tensors. Images are float32 vectors of
            NUM_PIXELS values divided by 255; labels are float32 vectors of
            NUM_CLASSES values.
        """
        print_log(worker_num, fd_log, "num_epochs: {0}".format(num_epochs))

        # Setup queue of TFRecord filenames
        files = tf.gfile.Glob(os.path.join(path, 'part-*'))
        queue_name = "file_queue"

        # Split input files across workers, if specified
        if task_index is not None and num_workers is not None:
            files = files[task_index::num_workers]
            queue_name = "file_queue_{0}".format(task_index)

        print_log(worker_num, fd_log, "files: {0}".format(files))
        file_queue = tf.train.string_input_producer(files, shuffle=False, capacity=1000,
                                                    num_epochs=num_epochs, name=queue_name)

        # Setup reader for examples
        reader = tf.TFRecordReader(name="reader")
        _, serialized = reader.read(file_queue)
        feature_def = {'label': tf.FixedLenFeature([NUM_CLASSES], tf.int64),
                       'image': tf.FixedLenFeature([NUM_PIXELS], tf.int64)}
        features = tf.parse_single_example(serialized, feature_def)
        norm = tf.constant(255, dtype=tf.float32, shape=(NUM_PIXELS,))
        image = tf.div(tf.to_float(features['image']), norm)
        print_log(worker_num, fd_log, "image: {0}".format(image))
        label = tf.to_float(features['label'])
        print_log(worker_num, fd_log, "label: {0}".format(label))

        # Return a batch of examples
        return tf.train.batch([image, label], batch_size, num_threads=args.readers, name="batch")

    def build_convnet(x, pkeep):
        """Build the classifier graph on flattened images ``x``.

        Three ReLU conv layers (each followed by dropout with keep probability
        ``pkeep``), a 200-unit ReLU dense layer and a NUM_CLASSES-way readout.

        Returns:
            ``(logits, softmax_probabilities)``.
        """
        # The image data input is a 1-d vector of size 128*128 = 16384
        # Reshape to a tensor of the form [batch size, height, width, channel]
        x_img = tf.reshape(x, [-1, IMAGE_PIXELS, IMAGE_PIXELS, 1])
        tf.summary.image("x_img", x_img)  # Save to tensorboard summary

        # Keep this creation order: the variables are unnamed, so TensorFlow derives
        # their names (and therefore their checkpoint keys) from creation order.
        # Weights: truncated normal with stddev 0.1 (values within +/-0.2); biases zero.
        # Conv kernels are (patch_height, patch_width, input_channels, output_channels).
        CL1W = tf.Variable(tf.truncated_normal([5, 5, 1, 4], stddev=0.1))
        CL1B = tf.Variable(tf.zeros([4]))
        CL2W = tf.Variable(tf.truncated_normal([5, 5, 4, 8], stddev=0.1))
        CL2B = tf.Variable(tf.zeros([8]))
        CL3W = tf.Variable(tf.truncated_normal([4, 4, 8, 12], stddev=0.1))
        CL3B = tf.Variable(tf.zeros([12]))
        # Fully connected layer: the two stride-2 convs shrink 128x128 to 32x32 (x12 channels)
        FCW = tf.Variable(tf.truncated_normal([32 * 32 * 12, 200], stddev=0.1))
        FCB = tf.Variable(tf.zeros([200]))
        # Read out layer
        ROW = tf.Variable(tf.truncated_normal([200, NUM_CLASSES], stddev=0.1))
        ROB = tf.Variable(tf.zeros([NUM_CLASSES]))

        # Convolutional and ReLU layers
        CL1 = tf.nn.conv2d(x_img, CL1W, strides=[1, 1, 1, 1], padding='SAME')   # -> 128x128x4
        Y1d = tf.nn.dropout(tf.nn.relu(CL1 + CL1B), pkeep)
        CL2 = tf.nn.conv2d(Y1d, CL2W, strides=[1, 2, 2, 1], padding='SAME')     # -> 64x64x8
        Y2d = tf.nn.dropout(tf.nn.relu(CL2 + CL2B), pkeep)
        CL3 = tf.nn.conv2d(Y2d, CL3W, strides=[1, 2, 2, 1], padding='SAME')     # -> 32x32x12
        Y3d = tf.nn.dropout(tf.nn.relu(CL3 + CL3B), pkeep)

        # Flatten and apply the fully connected layer
        Y3RS = tf.reshape(Y3d, [-1, 32 * 32 * 12])
        Y4 = tf.nn.relu(tf.matmul(Y3RS, FCW) + FCB)

        # Read out layer
        y_logits = tf.matmul(Y4, ROW) + ROB
        return y_logits, tf.nn.softmax(y_logits)

    def run_worker(cluster, server):
        """Build the model on this worker, then train it or run inference.

        In ``"train"`` mode the loss is minimized under a ``tf.train.Supervisor``
        that checkpoints to the TensorBoard logdir. Otherwise the model is used
        for prediction, and ``"<label> <prediction>"`` lines are written to
        ``<args.output>/part-<worker_num>``.
        """
        is_chief = task_index == 0
        summary_writer = None

        # Assigns ops to the local worker by default.
        with tf.device(tf.train.replica_device_setter(
                worker_device="/job:worker/task:%d" % task_index, cluster=cluster)):

            # Inference reads each file once and splits the files across workers.
            # Training reads all files for args.epochs passes (0 = no limit).
            num_epochs = 1 if args.mode == "inference" else None if args.epochs == 0 else args.epochs
            index = task_index if args.mode == "inference" else None
            workers = num_workers if args.mode == "inference" else None

            # Read input data from TFRecords
            if args.format != "tfr":
                raise ValueError("{0} format not supported for tf input mode".format(args.format))
            images = TFNode.hdfs_path(ctx, args.images)
            x, y_ = read_tfr_examples(images, batch_size, num_epochs, index, workers)

            # Dropout is a training-time regularizer only; outside training keep every unit.
            pkeep = 0.75 if args.mode == "train" else 1.0
            y_logits, y = build_convnet(x, pkeep)

            # Loss op and training op.
            # softmax_cross_entropy_with_logits applies softmax itself, so it must be
            # given the unscaled logits, not the softmax output y.
            cross_entropy = tf.nn.softmax_cross_entropy_with_logits(logits=y_logits, labels=y_)
            loss = tf.reduce_mean(cross_entropy)
            tf.summary.scalar("loss", loss)
            global_step = tf.Variable(0, trainable=False)  # created after the model variables, as before
            train_op = tf.train.AdamOptimizer(0.005).minimize(loss, global_step=global_step)

            # Test trained model (accuracy op)
            label = tf.argmax(y_, 1, name="label")  # Actual label
            prediction = tf.argmax(y, 1, name="prediction")
            correct_prediction = tf.equal(prediction, label)
            accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32), name="accuracy")
            tf.summary.scalar("acc", accuracy)

            # Save, merge summaries and initialize variables
            saver = tf.train.Saver()
            summary_op = tf.summary.merge_all()
            init_op = tf.global_variables_initializer()

            # Create a "supervisor", which oversees the training process and stores model state into HDFS
            logdir = tensorboard.logdir()
            print("tensorflow model path: {0}".format(logdir))

            if is_chief:
                summary_writer = tf.summary.FileWriter(logdir, graph=tf.get_default_graph())

            if args.mode == "train":
                sv = tf.train.Supervisor(is_chief=is_chief,
                                         logdir=logdir,
                                         init_op=init_op,
                                         summary_op=None,
                                         summary_writer=None,
                                         saver=saver,
                                         global_step=global_step,
                                         stop_grace_secs=300,
                                         save_model_secs=10)
            else:
                sv = tf.train.Supervisor(is_chief=is_chief,
                                         logdir=logdir,
                                         summary_op=None,
                                         saver=saver,
                                         global_step=global_step,
                                         stop_grace_secs=300,
                                         save_model_secs=0)
            output_dir = TFNode.hdfs_path(ctx, args.output)

        try:
            # The output file is opened in every mode but written only during inference.
            # The context manager closes it even when the end of the input queue
            # (OutOfRangeError) ends the managed session early.
            with tf.gfile.Open("{0}/part-{1:05d}".format(output_dir, worker_num), mode='w') as output_file:
                # The supervisor takes care of session initialization, restoring from
                # a checkpoint, and closing when done or an error occurs.
                with sv.managed_session(server.target) as sess:
                    print("{0} session ready".format(datetime.now().isoformat()))

                    # Loop until the supervisor shuts down or args.steps steps have completed.
                    # Training is asynchronous; see `tf.train.SyncReplicasOptimizer`
                    # for synchronous training.
                    step = 0
                    count = 0
                    while not sv.should_stop() and step < args.steps:
                        if args.mode == "train":
                            if step % 100 == 0:
                                # NOTE: this separate run pulls its own batch from the input queue.
                                print("{0} step: {1} accuracy: {2}".format(
                                    datetime.now().isoformat(), step, sess.run(accuracy)))
                            _, summary, step = sess.run([train_op, summary_op, global_step])
                            if sv.is_chief:
                                summary_writer.add_summary(summary, step)
                        else:  # args.mode == "inference"
                            labels, pred, acc = sess.run([label, prediction, accuracy])
                            print("acc: {0}".format(acc))
                            for true_label, predicted in zip(labels, pred):
                                count += 1
                                output_file.write("{0} {1}\n".format(true_label, predicted))
                            print("count: {0}".format(count))

                    if args.mode == "inference" and task_index == 0:
                        # Delay chief worker from shutting down supervisor during inference, since it can
                        # load model, start session, run inference and request stop before the other
                        # workers even start/sync their sessions.
                        time.sleep(60)

                    # Ask for all the services to stop.
                    print("{0} stopping supervisor".format(datetime.now().isoformat()))
                    sv.stop()
        finally:
            if summary_writer is not None:
                summary_writer.close()

    # Set up our own log file.
    # NOTE(review): every node opens this same path with flags='w'. With several
    # executors they may overwrite each other or collide on the HDFS write lease;
    # consider including worker_num in the file name.
    logfile_path = hdfs.project_path() + "Logs/mylogfile.txt"
    fd_log = hdfs.get_fs().open_file(logfile_path, flags='w')
    try:
        fd_log.write('Logfile for genre classification\n')

        # Delay PS nodes a bit, since workers seem to reserve GPUs more quickly/reliably (w/o conflict)
        if job_name == "ps":
            time.sleep((worker_num + 1) * 5)

        # Get TF cluster and server instances
        cluster, server = TFNode.start_cluster_server(ctx, 1, args.rdma)

        if job_name == "ps":
            server.join()  # blocks until the parameter server shuts down
        elif job_name == "worker":
            run_worker(cluster, server)
    finally:
        fd_log.close()

# In [None]:
from pyspark.context import SparkContext
from pyspark.conf import SparkConf

import argparse
import os
import numpy
import sys
import tensorflow as tf
import threading
from datetime import datetime
from hops import util
from hops import hdfs

from tensorflowonspark import TFCluster

# Driver cell: parse the job configuration and launch the TensorFlowOnSpark cluster.
# NOTE(review): `spark` is not defined in this cell; it is presumably the
# SparkSession that the notebook kernel provides.
sc = spark.sparkContext
num_executors = util.num_executors(spark)
num_ps = util.num_param_servers(spark)

# hdfs_project_path = "hdfs:///Projects/genre_classifier_2/"
hdfs_tfrecords_path = "/Projects/genre_classifier_2/Spectrograms/tfrecords/training"


def _str2bool(value):
    """Parse a command-line flag value such as "true", "False", "1" or "no" into a bool.

    Without a type, argparse keeps the raw string, and any non-empty string
    (including "False") is truthy.
    """
    if isinstance(value, bool):
        return value
    normalized = value.strip().lower()
    if normalized in ("1", "true", "t", "yes", "y"):
        return True
    if normalized in ("0", "false", "f", "no", "n"):
        return False
    raise argparse.ArgumentTypeError("expected a boolean value, got {0!r}".format(value))


parser = argparse.ArgumentParser()
parser.add_argument("-e", "--epochs", help="number of epochs", type=int, 
                    default=0)
parser.add_argument("-f", "--format", help="example format: (csv|pickle|tfr)", choices=["csv","pickle","tfr"], 
                    default="tfr")
parser.add_argument("-i", "--images", help="HDFS path to images in parallelized format", 
                    default= hdfs_tfrecords_path)
# NOTE: --labels is not read by mnist_fun; labels come from the TFRecords under --images.
parser.add_argument("-l", "--labels", help="HDFS path to MNIST labels in parallelized format", 
                    default = hdfs_tfrecords_path)
parser.add_argument("-m", "--model", help="HDFS path to save/load model during train/test", 
                    default="genre_classification_model")
parser.add_argument("-n", "--cluster_size", help="number of nodes in the cluster (for Spark Standalone)", type=int, 
                    default=num_executors)
parser.add_argument("-o", "--output", help="HDFS path to save test/inference output", 
                    default="predictions")
parser.add_argument("-r", "--readers", help="number of reader/enqueue threads", type=int, 
                    default=1)
parser.add_argument("-s", "--steps", help="maximum number of steps", type=int, 
                    default=1000)
parser.add_argument("-tb", "--tensorboard", help="launch tensorboard process", action="store_true")
# mnist_fun only distinguishes "train" from "inference"; reject anything else up front.
parser.add_argument("-X", "--mode", help="train|inference", choices=["train", "inference"],
                    default="train")
# Parsed as a real bool so that "-c False" does not enable RDMA.
parser.add_argument("-c", "--rdma", help="use rdma connection", type=_str2bool,
                    default=False)
args = parser.parse_args()
print("args:",args)


print("{0} ===== Start".format(datetime.now().isoformat()))

cluster = TFCluster.run(sc, mnist_fun, args, args.cluster_size, num_ps, args.tensorboard, TFCluster.InputMode.TENSORFLOW)
cluster.shutdown()

print("{0} ===== Stop".format(datetime.now().isoformat()))