In [1]:
def launch(spark_session, map_fun, args_dict):
    """Run ``map_fun`` once per hyperparameter combination, one Spark task each.

    Position ``i`` of every list in ``args_dict`` forms combination ``i``. One
    single-element partition is created per combination, and ``map_fun`` is
    called for that partition with the values found at that position.

    Args:
      :spark_session: SparkSession object
      :map_fun: The TensorFlow function to run
      :args_dict: A dictionary mapping each ``map_fun`` argument name to the
                  list of values to try; all lists must have the same length

    Raises:
      ValueError: if ``args_dict`` is empty or its lists differ in length
    """
    if not args_dict:
        raise ValueError("args_dict must contain at least one list of hyperparameter values")

    # dict.values() is not indexable on Python 3, and "the first list" is an
    # arbitrary choice on Python 2, so measure every list and insist they agree.
    lengths = set(len(values) for values in args_dict.values())
    if len(lengths) != 1:
        raise ValueError(
            "all lists in args_dict must have the same length, got lengths %s"
            % sorted(lengths))
    num_tasks = lengths.pop()

    # No combinations to evaluate: nothing to submit.
    if num_tasks == 0:
        return

    sc = spark_session.sparkContext

    # One partition per combination; the element stored in a partition is the
    # index of the combination that partition has to run.
    nodeRDD = sc.parallelize(range(num_tasks), num_tasks)

    # Execute each of the hyperparameter combinations as a task
    nodeRDD.foreachPartition(_do_search(map_fun, args_dict))


def _do_search(map_fun, args_dict):
    """Build the per-partition callable handed to ``foreachPartition``.

    Args:
      :map_fun: The function to call; each of its positional parameter names
                must be a key of ``args_dict`` (a missing key raises KeyError
                when the partition is processed)
      :args_dict: A dictionary mapping parameter names to lists of values

    Returns:
      A function taking a partition iterator. It reads the combination index
      from the partition (the last element wins if there are several), looks
      up the value at that index for every positional parameter of
      ``map_fun`` and calls ``map_fun`` with them. An empty partition is a
      no-op.
    """
    # __code__ exists on Python 2.6+ and Python 3; func_code is Python 2 only.
    code = map_fun.__code__
    # The first co_argcount entries of co_varnames are the positional
    # parameter names; the rest are ordinary local variables.
    param_names = code.co_varnames[:code.co_argcount]

    def _wrapper_fun(partition):
        executor_num = None
        for executor_num in partition:
            pass
        # 0 is a valid index, so compare against the sentinel explicitly.
        if executor_num is None:
            return

        # Get arguments for this hyperparameter combination
        args = [args_dict[name][executor_num] for name in param_names]
        map_fun(*args)
    return _wrapper_fun

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
0,application_1513605290137_0001,pyspark,idle,Link,Link,✔


SparkSession available as 'spark'.


In [6]:
def mnist(num_steps):
    """Train a softmax-regression model on MNIST and print its test accuracy.

    The model is a single linear layer (784 inputs, 10 outputs) trained with
    gradient descent (learning rate 0.5) on mini-batches of 100 examples.
    The graph and session are created per call and released before the
    function returns, so repeated calls in the same Python process do not
    share or leak TensorFlow state.

    The signature is kept to exactly one positional parameter on purpose:
    the launcher looks up every positional parameter name in its
    hyperparameter dictionary.

    Args:
      :num_steps: Number of training steps (one mini-batch per step)
    """
    # Imports stay inside the function body, as in the original cell.
    # NOTE(review): presumably so they are resolved where the task runs -- confirm.
    from tensorflow.examples.tutorials.mnist import input_data
    import tensorflow as tf

    mnist_data = input_data.read_data_sets('/tmp/tensorflow/mnist/input_data', one_hot=True)

    # Build into a private graph instead of the process-wide default graph.
    with tf.Graph().as_default():
        # Create the model
        x = tf.placeholder(tf.float32, [None, 784])
        W = tf.Variable(tf.zeros([784, 10]))
        b = tf.Variable(tf.zeros([10]))
        y = tf.matmul(x, W) + b

        # Define loss and optimizer
        y_ = tf.placeholder(tf.float32, [None, 10])
        cross_entropy = tf.reduce_mean(
            tf.nn.softmax_cross_entropy_with_logits(labels=y_, logits=y))
        train_step = tf.train.GradientDescentOptimizer(0.5).minimize(cross_entropy)

        # Evaluation ops: fraction of examples whose arg-max prediction
        # matches the one-hot label.
        correct_prediction = tf.equal(tf.argmax(y, 1), tf.argmax(y_, 1))
        accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))

        # The context manager closes the session even if training raises.
        with tf.Session() as sess:
            sess.run(tf.global_variables_initializer())
            for _ in range(num_steps):
                batch_xs, batch_ys = mnist_data.train.next_batch(100)
                sess.run(train_step, feed_dict={x: batch_xs, y_: batch_ys})

            # Test trained model
            print(sess.run(accuracy, feed_dict={x: mnist_data.test.images,
                                                y_: mnist_data.test.labels}))

In [7]:
# Hyperparameter grid: each list position is one independent trial, so this
# submits two tasks (1000 and 10000 training steps).
args_dict = dict(num_steps=[1000, 10000])
launch(spark, mnist, args_dict)