
[SYSTEMML-1451][Phase 2] Decouple Scripts and HDFS support #575

Closed
bin/systemml-spark-submit.py (261 changes: 107 additions, 154 deletions)

@@ -21,167 +21,120 @@
# -------------------------------------------------------------

import os
-import sys
-from os.path import join, exists, abspath
-from os import environ
import glob
+from os.path import join, exists, abspath
import argparse
-import shutil
-import platform

-if environ.get('SPARK_HOME') is None:
-    print('SPARK_HOME not set')
-    sys.exit(1)
-else:
-    spark_home = environ.get('SPARK_HOME')
-    spark_path = join(spark_home, 'bin', 'spark-submit')
+from utils import get_env, find_script_file, log4j_path, config_path


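Note: get_env, find_script_file, log4j_path and config_path come from the new utils module this PR introduces (presumably bin/utils.py), whose diff is not shown in this file. Purely for orientation, a sketch of plausible implementations; every name resolution and body below is an assumption, not the committed code:

import os
from os.path import join, exists

def get_env():
    """Read SPARK_HOME and SYSTEMML_HOME from the environment (assumed behavior)."""
    spark_home = os.environ.get('SPARK_HOME')
    systemml_home = os.environ.get('SYSTEMML_HOME')  # variable name assumed
    if spark_home is None or systemml_home is None:
        raise EnvironmentError('SPARK_HOME and SYSTEMML_HOME must be set')
    return spark_home, systemml_home

def find_script_file(systemml_home, script_file):
    """Resolve a DML/PyDML file name against the scripts directory (assumed behavior)."""
    if exists(script_file):
        return script_file
    for root, dirs, files in os.walk(join(systemml_home, 'scripts')):
        if script_file in files:
            return join(root, script_file)
    raise ValueError('Could not find DML script: ' + script_file)

def log4j_path(systemml_home):
    """Path to the log4j properties template (assumed location)."""
    return join(systemml_home, 'conf', 'log4j.properties.template')

def config_path(systemml_home):
    """Path to SystemML-config.xml (assumed location)."""
    return join(systemml_home, 'conf', 'SystemML-config.xml')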
-# error help print
-def print_usage_and_exit():
-    print('Usage: ./systemml-spark-submit.py -f <dml-filename> [arguments]')
-    sys.exit(1)

-cparser = argparse.ArgumentParser(description='System-ML Spark Submit Script')

-# SPARK-SUBMIT Options
-cparser.add_argument('--master', default='local[*]', help='local, yarn-client, yarn-cluster', metavar='')
-cparser.add_argument('--driver-memory', default='5G', help='Memory for driver (e.g. 512M)', metavar='')
-cparser.add_argument('--num-executors', default='2', help='Number of executors to launch', metavar='')
-cparser.add_argument('--executor-memory', default='2G', help='Memory per executor', metavar='')
-cparser.add_argument('--executor-cores', default='1', help='Number of cores', metavar='')
-cparser.add_argument('--conf', help='Spark configuration file', nargs='+', metavar='')

-# SYSTEM-ML Options
-cparser.add_argument('-nvargs', help='List of attributeName-attributeValue pairs', nargs='+', metavar='')
-cparser.add_argument('-args', help='List of positional argument values', metavar='', nargs='+')
-cparser.add_argument('-config', help='System-ML configuration file (e.g SystemML-config.xml)', metavar='')
-cparser.add_argument('-exec', default='hybrid_spark', help='System-ML backend (e.g spark, spark-hybrid)', metavar='')
-cparser.add_argument('-explain', help='explains plan levels can be hops, runtime, '
-                                      'recompile_hops, recompile_runtime', nargs='?', const='runtime', metavar='')
-cparser.add_argument('-debug', help='runs in debug mode', action='store_true')
-cparser.add_argument('-stats', help='Monitor and report caching/recompilation statistics, '
-                                    'heavy hitter <count> is 10 unless overridden', nargs='?', const='10', metavar='')
-cparser.add_argument('-gpu', help='uses CUDA instructions when reasonable, '
-                                  'set <force> option to skip conservative memory estimates '
-                                  'and use GPU wherever possible', nargs='?')
-cparser.add_argument('-f', required=True, help='specifies dml/pydml file to execute; '
-                                               'path can be local/hdfs/gpfs', metavar='')

-args = cparser.parse_args()

-# Optional arguments
-ml_options = []
-if args.nvargs is not None:
-    ml_options.append('-nvargs')
-    ml_options.append(' '.join(args.nvargs))
-if args.args is not None:
-    ml_options.append('-args')
-    ml_options.append(' '.join(args.args))
-if args.debug is not False:
-    ml_options.append('-debug')
-if args.explain is not None:
-    ml_options.append('-explain')
-    ml_options.append(args.explain)
-if args.gpu is not None:
-    ml_options.append('-gpu')
-    ml_options.append(args.gpu)
-if args.stats is not None:
-    ml_options.append('-stats')
-    ml_options.append(args.stats)

-# Assign script file to name received from argparse module
-script_file = args.f

-# find the systemML root path which contains the bin folder, the script folder and the target folder
-# tolerate path with spaces
-script_dir = os.path.dirname(os.path.realpath(__file__))
-project_root_dir = os.path.dirname(script_dir)
-user_dir = os.getcwd()

-scripts_dir = join(project_root_dir, 'scripts')
-build_dir = join(project_root_dir, 'target')
-lib_dir = join(build_dir, 'lib')

-systemml_jar = build_dir + os.sep + "SystemML.jar"
-jcuda_jars = glob.glob(lib_dir + os.sep + "jcu*.jar")
-target_jars = ','.join(jcuda_jars)  # Include all JCuda Jars

-log4j_properties_path = join(project_root_dir, 'conf', 'log4j.properties.template')

-build_err_msg = 'You must build the project before running this script.'
-build_dir_err_msg = 'Could not find target directory ' + build_dir + '. ' + build_err_msg

-# check if the project had been built and the jar files exist
-if not (exists(build_dir)):
-    print(build_dir_err_msg)
-    sys.exit(1)

-print('================================================================================')

-# if the present working directory is the project root or bin folder, then use the temp folder as user.dir
-if user_dir == project_root_dir or user_dir == join(project_root_dir, 'bin'):
-    user_dir = join(project_root_dir, 'temp')
-    print('Output dir: ' + user_dir)

-# if the SystemML-config.xml does not exist, create it from the template
-systemml_config_path = join(project_root_dir, 'conf', 'SystemML-config.xml')
-systemml_template_config_path = join(project_root_dir, 'conf', 'SystemML-config.xml.template')
-if not (exists(systemml_config_path)):
-    shutil.copyfile(systemml_template_config_path, systemml_config_path)
-    print('... created ' + systemml_config_path)

-# if SystemML-config.xml is provided as arguments
-if args.config is None:
-    systemml_config_path_arg = systemml_config_path
-else:
-    systemml_config_path_arg = args.config


-# from http://stackoverflow.com/questions/1724693/find-a-file-in-python
-def find_file(name, path):
-    for root, dirs, files in os.walk(path):
-        if name in files:
-            return join(root, name)
-    return None

-# if the script file path was omitted, try to complete the script path
-if not (exists(script_file)):
-    script_file_name = abspath(script_file)
-    script_file_found = find_file(script_file, scripts_dir)
-    if script_file_found is None:
-        print('Could not find DML script: ' + script_file)
-        print_usage_and_exit()
-    else:
-        script_file = script_file_found
-print('DML Script:' + script_file)

-default_conf = 'spark.driver.extraJavaOptions=-Dlog4j.configuration=file:{}'.format(log4j_properties_path)
+def default_jars(systemml_home):
+    """
+    return: (String, String)
+    Locations of the jcuda jars and of the SystemML jar
+    """
+    build_dir = join(systemml_home, 'target')
+    lib_dir = join(build_dir, 'lib')
+    systemml_jar = build_dir + os.sep + "SystemML.jar"
+    jcuda_jars = glob.glob(lib_dir + os.sep + "jcu*.jar")
+    target_jars = ','.join(jcuda_jars)
+    return target_jars, systemml_jar

-# Backslash problem in windows.
-if platform.system() == 'Windows':
-    default_conf = default_conf.replace('\\', '//')

-if args.conf is not None:
-    conf = ' --conf '.join(args.conf + [default_conf])
-else:
-    conf = default_conf
+def spark_submit_entry(master, driver_memory, num_executors, executor_memory,
+                       executor_cores, conf,
+                       nvargs, args, config, explain, debug, stats, gpu, f):

Review comment (Member): Could you please add a little comment about this function.

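A brief docstring would address this; one possible wording, suggested here and not part of the committed diff:

    """Build and launch a spark-submit command for org.apache.sysml.api.DMLScript.

    Resolves SPARK_HOME and SYSTEMML_HOME via utils.get_env(), locates the
    DML/PyDML script f, folds the Spark options and the SystemML options into
    one command line, executes it with os.system, and returns that status code.
    """
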
-cmd_spark = [spark_path, '--class', 'org.apache.sysml.api.DMLScript',
-             '--master', args.master, '--driver-memory', args.driver_memory,
-             '--num-executors', args.num_executors, '--executor-memory', args.executor_memory,
-             '--executor-cores', args.executor_cores, '--conf', conf, '--jars', target_jars,
-             systemml_jar]

-cmd_system_ml = ['-config', systemml_config_path_arg,
-                 '-exec', vars(args)['exec'], '-f', script_file, ' '.join(ml_options)]
+    spark_home, systemml_home = get_env()
+    spark_path = join(spark_home, 'bin', 'spark-submit')
+    script_file = find_script_file(systemml_home, f)

-cmd = cmd_spark + cmd_system_ml
+    # Jars
+    cuda_jars, systemml_jars = default_jars(systemml_home)

-return_code = os.system(' '.join(cmd))
-# For debugging
-# print(' '.join(cmd))
+    # Log4j
+    log4j = log4j_path(systemml_home)
+    log4j_properties_path = 'spark.driver.extraJavaOptions=-Dlog4j.configuration=file:{}'.format(log4j)
+    if conf is None:
+        default_conf = log4j_properties_path
+    else:
+        default_conf = ' --conf '.join(conf + [log4j_properties_path])

-if return_code != 0:
-    print('Failed to run SystemML. Exit code :' + str(return_code))
-    print(' '.join(cmd))
+    # Config
+    if config is None:
+        default_config = config_path(systemml_home)
+    else:
+        default_config = config

+    # optional arguments
+    ml_options = []
+    if nvargs is not None:
+        ml_options.append('-nvargs')
+        ml_options.append(' '.join(nvargs))
+    if args is not None:
+        ml_options.append('-args')
+        ml_options.append(' '.join(args))
+    if explain is not None:
+        ml_options.append('-explain')
+        ml_options.append(explain)
+    if debug is not False:
+        ml_options.append('-debug')
+    if stats is not None:
+        ml_options.append('-stats')
+        ml_options.append(stats)
+    if gpu is not None:
+        ml_options.append('-gpu')
+        ml_options.append(gpu)

+    if len(ml_options) < 1:
+        ml_options = ''

+    # stats, explain, target_jars
+    cmd_spark = [spark_path, '--class', 'org.apache.sysml.api.DMLScript',
+                 '--master', master, '--driver-memory', driver_memory,
+                 '--num-executors', num_executors, '--executor-memory', executor_memory,
+                 '--executor-cores', executor_cores, '--conf', default_conf,
+                 '--jars', cuda_jars, systemml_jars]

+    cmd_system_ml = ['-config', default_config,
+                     '-exec', 'hybrid_spark', '-f', script_file, ' '.join(ml_options)]

+    cmd = cmd_spark + cmd_system_ml
+    return_code = os.system(' '.join(cmd))
+    return return_code

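For orientation, a direct call to spark_submit_entry that mirrors the argparse defaults defined in the __main__ block below; the script name is illustrative, not part of the diff:

rc = spark_submit_entry(master='local[*]', driver_memory='5G', num_executors='2',
                        executor_memory='2G', executor_cores='1', conf=None,
                        nvargs=None, args=None, config=None, explain=None,
                        debug=False, stats=None, gpu=None,
                        f='scripts/algorithms/Univar-Stats.dml')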

+if __name__ == '__main__':
+    spark_home, systemml_home = get_env()

+    cparser = argparse.ArgumentParser(description='System-ML Spark Submit Script')
+    # SPARK-SUBMIT Options
+    cparser.add_argument('--master', default='local[*]', help='local, yarn-client, yarn-cluster', metavar='')
Review comment (Member): Can you please also print out the defaults for each of the options in the help message?

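If wanted, argparse can append the defaults automatically instead of editing every help string; a minimal sketch using the standard library's formatter:

cparser = argparse.ArgumentParser(
    description='System-ML Spark Submit Script',
    formatter_class=argparse.ArgumentDefaultsHelpFormatter)

With that formatter, argparse adds "(default: ...)" to every option that supplies a help message.
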
+    cparser.add_argument('--driver-memory', default='5G', help='Memory for driver (e.g. 512M)', metavar='')
+    cparser.add_argument('--num-executors', default='2', help='Number of executors to launch', metavar='')
+    cparser.add_argument('--executor-memory', default='2G', help='Memory per executor', metavar='')
+    cparser.add_argument('--executor-cores', default='1', help='Number of cores', metavar='')
+    cparser.add_argument('--conf', help='Spark configuration file', nargs='+', metavar='')

+    # SYSTEM-ML Options
+    cparser.add_argument('-nvargs', help='List of attributeName-attributeValue pairs', nargs='+', metavar='')
+    cparser.add_argument('-args', help='List of positional argument values', metavar='', nargs='+')
+    cparser.add_argument('-config', help='System-ML configuration file (e.g SystemML-config.xml)', metavar='')
+    cparser.add_argument('-explain', help='explains plan levels can be hops, runtime, '
+                                          'recompile_hops, recompile_runtime', nargs='?', const='runtime', metavar='')
+    cparser.add_argument('-debug', help='runs in debug mode', action='store_true')
+    cparser.add_argument('-stats', help='Monitor and report caching/recompilation statistics, '
+                                        'heavy hitter <count> is 10 unless overridden', nargs='?', const='10',
+                         metavar='')
+    cparser.add_argument('-gpu', help='uses CUDA instructions when reasonable, '
+                                      'set <force> option to skip conservative memory estimates '
+                                      'and use GPU wherever possible', nargs='?')
+    cparser.add_argument('-f', required=True, help='specifies dml/pydml file to execute; '
+                                                   'path can be local/hdfs/gpfs', metavar='')

+    args = cparser.parse_args()
+    arg_dict = vars(args)

+    return_code = spark_submit_entry(**arg_dict)

+    if return_code != 0:
+        print('Failed to run SystemML. Exit code: ' + str(return_code))
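Finally, a sample invocation in the spirit of the old usage banner; the script name and -nvargs values are illustrative, and a bare file name works because find_script_file resolves it against the scripts directory:

./systemml-spark-submit.py --master local[*] --driver-memory 5G -f Univar-Stats.dml -nvargs X=data/X.csv TYPES=data/types.csv STATS=out/stats.csv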