In [None]:
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import logging
import argparse
import subprocess
import shlex
from imp import reload
from tensorflowonspark import TFCluster
from pyspark import SparkContext
from pyspark.conf import SparkConf

In [None]:
# Start from a clean slate: remove the HDFS artifacts left by earlier runs,
# then the local checkout and archive inside the Mesos sandbox.
for stale_hdfs_path in ('mnist', 'mnist_model', 'mnist_export', 'predictions'):
    # -f keeps the removal from failing when the path does not exist yet.
    subprocess.check_output('hdfs dfs -rm -R -f -skipTrash {}'.format(stale_hdfs_path), shell=True)

for sandbox_cleanup in ('rm -rf mnist tensorflowonspark', 'rm -f mnist.zip'):
    subprocess.check_output('cd $MESOS_SANDBOX && {}'.format(sandbox_cleanup), shell=True)

In [None]:
# Clone repo with adjusted TF 1.11 APIs in mnist_dist/mnist_spark
# The checkout lands in $MESOS_SANDBOX/tensorflowonspark; check_output raises
# CalledProcessError if the cd or the clone fails.
# NOTE(review): the clone is not pinned to a branch, tag or commit, so the
# "adjusted TF 1.11 APIs" are only present if the default branch still has them — confirm.
subprocess.check_output('cd $MESOS_SANDBOX && git clone https://github.com/yahoo/tensorflowonspark', shell=True)

In [None]:
# Download the mnist example archive into $MESOS_SANDBOX and unpack it there.
# curl flags: -f fail on HTTP errors, -sS quiet but still report errors,
# -L follow redirects, -O keep the remote file name (mnist.zip).
subprocess.check_output('cd $MESOS_SANDBOX && curl -fsSL -O https://downloads.mesosphere.com/data-science/assets/mnist.zip && unzip mnist.zip', shell=True)

In [None]:
# Create mnist data in tfr format
# Submits mnist_data_setup.py from the cloned repo with spark-submit and writes
# the result under mnist/tfr. `eval` makes the shell re-parse the expanded
# ${SPARK_OPTS} (presumably so quoted options inside it are honoured).
# NOTE(review): unlike the clone/download cells this command does not cd into
# $MESOS_SANDBOX, so the relative script path only resolves when the kernel's
# working directory is the directory the repo was cloned into — confirm.
subprocess.check_output('eval spark-submit ${SPARK_OPTS} --verbose tensorflowonspark/examples/mnist/mnist_data_setup.py --output mnist/tfr --format tfr', shell=True)

In [None]:
# Set the number of executors to the number of available GPU agents
num_ps = 0
num_executors = 5


def _str2bool(value):
    """Convert a command-line token such as "true"/"False"/"1" to a bool.

    Without this, argparse stores the raw string, and any non-empty string
    (including "False") is truthy.

    Raises:
        argparse.ArgumentTypeError: if the token is not a recognised boolean.
    """
    if isinstance(value, bool):
        return value
    token = str(value).strip().lower()
    if token in ("1", "true", "t", "yes", "y", "on"):
        return True
    if token in ("0", "false", "f", "no", "n", "off"):
        return False
    raise argparse.ArgumentTypeError("expected a boolean value, got {!r}".format(value))


# Shared parser for the training and inference runs; each run passes its own
# argument list to parser.parse_args(), everything else uses these defaults.
parser = argparse.ArgumentParser()
parser.add_argument("--batch_size", help="number of records per batch", type=int, default=100)
parser.add_argument("--cluster_size", help="number of nodes in the cluster (for Spark Standalone)", type=int, default=num_executors)
# Still takes an explicit value (--driver_ps_nodes true|false), now parsed as a real bool.
parser.add_argument("--driver_ps_nodes", help="Run tensorflow PS node on driver locally. You will need to set cluster_size = num_executors + num_ps", type=_str2bool, default=False)
parser.add_argument("--epochs", help="number of epochs", type=int, default=1)
parser.add_argument("--format", help="example format: (csv2|tfr)", choices=["csv2", "tfr"], default="tfr")
parser.add_argument("--images_labels", help="HDFS path to MNIST image_label files in parallelized format")
parser.add_argument("--mode", help="train|inference", default="train")
parser.add_argument("--model", help="HDFS path to save/load model during train/test", default="mnist_model")
parser.add_argument("--export", help="HDFS path to export saved_model", default="mnist_export")
# type=int so a value given on the command line is a number like the default, not a string.
parser.add_argument("--num_ps", help="number of ps nodes", type=int, default=num_ps)
parser.add_argument("--output", help="HDFS path to save test/inference output", default="predictions")
# Still takes an explicit value (--rdma true|false), now parsed as a real bool.
parser.add_argument("--rdma", help="use rdma connection", type=_str2bool, default=False)
parser.add_argument("--readers", help="number of reader/enqueue threads per worker", type=int, default=10)
parser.add_argument("--shuffle_size", help="size of shuffle buffer", type=int, default=1000)
parser.add_argument("--steps", help="maximum number of steps", type=int, default=500)
parser.add_argument("--tensorboard", help="launch tensorboard process", action="store_true")

In [None]:
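# CPU Config (alternative to the GPU config in the next cell: uncomment this one and skip the GPU cell to run without GPUs)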
# conf = SparkConf().setAppName('MNIST-CPU') \
#                   .set('spark.mesos.executor.docker.image', 'mesosphere/mesosphere-data-toolkit:latest')

In [None]:
# GPU Config
# Built step by step; every setter mutates and returns the same SparkConf.
conf = SparkConf()
conf.setAppName('MNIST-GPU')
conf.set('spark.mesos.executor.docker.image', 'mesosphere/mesosphere-data-toolkit:latest-gpu')
# Cap the job's GPUs at the executor count (presumably one GPU per executor, per the last setting).
conf.set('spark.mesos.gpus.max', num_executors)
conf.set('spark.mesos.executor.gpus', 1)

In [None]:
# Make sure you cloned the repo with the adjusted TF 1.11 APIs in mnist_dist/mnist_spark
# getOrCreate is a classmethod: called on the class it reuses the active
# SparkContext if there is one, so this cell can be re-run. Constructing
# SparkContext(conf=conf) first raises when a context already exists.
# Note that conf is only applied when a new context is actually created.
sc = SparkContext.getOrCreate(conf=conf)
# Register the example's training module so it is available to the job.
sc.addPyFile('tensorflowonspark/examples/mnist/tf/mnist_dist.py')

In [None]:
# Deliberately not in the top import cell: mnist_dist.py comes from the cloned
# repo and is registered through sc.addPyFile(), which presumably is what makes
# it importable on the driver — confirm.
import mnist_dist

In [None]:
# Reload logging to get a fresh root logger; basicConfig() does nothing when
# the root logger already has handlers (presumably installed by an earlier import).
reload(logging)
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s %(levelname)s:%(message)s',
    datefmt='%I:%M:%S'
)

In [None]:
# Verify training images
# Make sure you unzipped mnist.zip into mnist and ran the mnist_data_setup job via:
#   eval spark-submit ${SPARK_OPTS} --verbose $(pwd)/tensorflowonspark/examples/mnist/mnist_data_setup.py --output mnist/tfr --format tfr
train_images_files = "mnist/tfr/train"
# universal_newlines=True makes check_output return text; otherwise Python 3
# prints the bytes repr (b'...' with escaped newlines) instead of the listing.
print(subprocess.check_output(shlex.split('hdfs dfs -ls -R {}'.format(train_images_files)), universal_newlines=True))

In [None]:
# Parse arguments for training
# Options not listed here keep the parser defaults.
train_argv = [
    '--mode', 'train',
    '--epochs', '3',
    '--batch_size', '100',
    '--images_labels', train_images_files,
    '--format', 'tfr',
    '--steps', '10000',
    '--model', 'mnist_model',
]
args = parser.parse_args(train_argv)
print(args)

In [None]:
# Start the cluster for training
# mnist_dist.map_fun is handed to the cluster together with the parsed args.
# The tensorboard argument now comes from --tensorboard (default False) rather
# than a hard-coded False, so the parsed flag is no longer ignored.
cluster = TFCluster.run(
    sc,
    mnist_dist.map_fun,
    args,
    args.cluster_size,
    args.num_ps,
    args.tensorboard,
    TFCluster.InputMode.TENSORFLOW,
    driver_ps_nodes=args.driver_ps_nodes
)

In [None]:
# Shut down the cluster started for training.
# NOTE(review): presumably this blocks until the training job has finished — confirm against TFCluster.shutdown().
cluster.shutdown()

In [None]:
# See if mnist_model was successfully created
# universal_newlines=True makes check_output return text; otherwise Python 3
# prints the bytes repr (b'...' with escaped newlines) instead of the listing.
# check_output raises CalledProcessError if the listing command fails.
print(subprocess.check_output(shlex.split('hdfs dfs -ls mnist_model'), universal_newlines=True))

In [None]:
# Parse arguments for inference
# Inference reads the same TFRecords that were used for training
# (train_images_files) and loads the model saved under mnist_model.
inference_argv = [
    '--mode', 'inference',
    '--images_labels', train_images_files,
    '--output', 'predictions',
    '--model', 'mnist_model',
]
args = parser.parse_args(inference_argv)
print(args)

In [None]:
# Start the cluster for inference
# Same launch as for training; only the parsed args differ (mode=inference).
# The tensorboard argument now comes from --tensorboard (default False) rather
# than a hard-coded False, so the parsed flag is no longer ignored.
cluster = TFCluster.run(
    sc,
    mnist_dist.map_fun,
    args,
    args.cluster_size,
    args.num_ps,
    args.tensorboard,
    TFCluster.InputMode.TENSORFLOW,
    driver_ps_nodes=args.driver_ps_nodes
)

In [None]:
# Shut down the cluster started for inference.
# NOTE(review): presumably this blocks until the inference job has finished — confirm against TFCluster.shutdown().
cluster.shutdown()

In [None]:
# Load the inference output as an RDD of text lines; "predictions" is the
# path passed as --output to the inference run.
predictions = sc.textFile("predictions")

In [None]:
# Preview the first 10 prediction lines (displayed as the cell's output).
predictions.take(10)

In [None]:
# Stop the SparkContext now that training, inference and the preview are done.
sc.stop()