[SYSTEMML-1451][Phase 2] Decouple Scripts and HDFS support #575
Closed
krishnakalyan3 wants to merge 14 commits into apache:master from krishnakalyan3:SYSTEMML-1451-phase2
Changes from 11 of 14 commits.

Commits:
7367005  [SYSTEMML-1451] phase 2 work (krishnakalyan3)
1334d37  fix missing refs (krishnakalyan3)
bfaa025  add docstring (krishnakalyan3)
d3921cb  pylint and docstring (krishnakalyan3)
377c0c7  fix error handling (krishnakalyan3)
e611851  update comments (krishnakalyan3)
dd7fbc7  update doc (krishnakalyan3)
31900dc  updates to docs (krishnakalyan3)
fd68d73  update doc (krishnakalyan3)
87ecbed  gspread (krishnakalyan3)
b7022e5  windows fix (krishnakalyan3)
ac8178c  update todos and fix comments (krishnakalyan3)
16ed00e  minor test changes (krishnakalyan3)
f668444  debug off (krishnakalyan3)
@@ -21,167 +21,124 @@
# -------------------------------------------------------------

import os
import sys
from os.path import join, exists, abspath
from os import environ
import glob
import argparse
import shutil
from os.path import join
import platform
import argparse
from utils import get_env, find_script_file, log4j_path, config_path

if environ.get('SPARK_HOME') is None:
    print('SPARK_HOME not set')
    sys.exit(1)
else:
    spark_home = environ.get('SPARK_HOME')
    spark_path = join(spark_home, 'bin', 'spark-submit')


# error help print
def print_usage_and_exit():
    print('Usage: ./systemml-spark-submit.py -f <dml-filename> [arguments]')
    sys.exit(1)

cparser = argparse.ArgumentParser(description='System-ML Spark Submit Script')

# SPARK-SUBMIT Options
cparser.add_argument('--master', default='local[*]', help='local, yarn-client, yarn-cluster', metavar='')
cparser.add_argument('--driver-memory', default='5G', help='Memory for driver (e.g. 512M)', metavar='')
cparser.add_argument('--num-executors', default='2', help='Number of executors to launch', metavar='')
cparser.add_argument('--executor-memory', default='2G', help='Memory per executor', metavar='')
cparser.add_argument('--executor-cores', default='1', help='Number of cores', metavar='')
cparser.add_argument('--conf', help='Spark configuration file', nargs='+', metavar='')

# SYSTEM-ML Options
cparser.add_argument('-nvargs', help='List of attributeName-attributeValue pairs', nargs='+', metavar='')
cparser.add_argument('-args', help='List of positional argument values', metavar='', nargs='+')
cparser.add_argument('-config', help='System-ML configuration file (e.g SystemML-config.xml)', metavar='')
cparser.add_argument('-exec', default='hybrid_spark', help='System-ML backend (e.g spark, spark-hybrid)', metavar='')
cparser.add_argument('-explain', help='explains plan levels can be hops, runtime, '
                     'recompile_hops, recompile_runtime', nargs='?', const='runtime', metavar='')
cparser.add_argument('-debug', help='runs in debug mode', action='store_true')
cparser.add_argument('-stats', help='Monitor and report caching/recompilation statistics, '
                     'heavy hitter <count> is 10 unless overridden', nargs='?', const='10', metavar='')
cparser.add_argument('-gpu', help='uses CUDA instructions when reasonable, '
                     'set <force> option to skip conservative memory estimates '
                     'and use GPU wherever possible', nargs='?')
cparser.add_argument('-f', required=True, help='specifies dml/pydml file to execute; '
                     'path can be local/hdfs/gpfs', metavar='')

args = cparser.parse_args()

# Optional arguments
ml_options = []
if args.nvargs is not None:
    ml_options.append('-nvargs')
    ml_options.append(' '.join(args.nvargs))
if args.args is not None:
    ml_options.append('-args')
    ml_options.append(' '.join(args.args))
if args.debug is not False:
    ml_options.append('-debug')
if args.explain is not None:
    ml_options.append('-explain')
    ml_options.append(args.explain)
if args.gpu is not None:
    ml_options.append('-gpu')
    ml_options.append(args.gpu)
if args.stats is not None:
    ml_options.append('-stats')
    ml_options.append(args.stats)

# Assign script file to name received from argparse module
script_file = args.f

# find the systemML root path which contains the bin folder, the script folder and the target folder
# tolerate path with spaces
script_dir = os.path.dirname(os.path.realpath(__file__))
project_root_dir = os.path.dirname(script_dir)
user_dir = os.getcwd()

scripts_dir = join(project_root_dir, 'scripts')
build_dir = join(project_root_dir, 'target')
lib_dir = join(build_dir, 'lib')

systemml_jar = build_dir + os.sep + "SystemML.jar"
jcuda_jars = glob.glob(lib_dir + os.sep + "jcu*.jar")
target_jars = ','.join(jcuda_jars) # Include all JCuda Jars

log4j_properties_path = join(project_root_dir, 'conf', 'log4j.properties.template')

build_err_msg = 'You must build the project before running this script.'
build_dir_err_msg = 'Could not find target directory ' + build_dir + '. ' + build_err_msg

# check if the project had been built and the jar files exist
if not (exists(build_dir)):
    print(build_dir_err_msg)
    sys.exit(1)

print('================================================================================')

# if the present working directory is the project root or bin folder, then use the temp folder as user.dir
if user_dir == project_root_dir or user_dir == join(project_root_dir, 'bin'):
    user_dir = join(project_root_dir, 'temp')
    print('Output dir: ' + user_dir)

# if the SystemML-config.xml does not exist, create it from the template
systemml_config_path = join(project_root_dir, 'conf', 'SystemML-config.xml')
systemml_template_config_path = join(project_root_dir, 'conf', 'SystemML-config.xml.template')
if not (exists(systemml_config_path)):
    shutil.copyfile(systemml_template_config_path, systemml_config_path)
    print('... created ' + systemml_config_path)

# if SystemML-config.xml is provided as arguments
if args.config is None:
    systemml_config_path_arg = systemml_config_path
else:
    systemml_config_path_arg = args.config


# from http://stackoverflow.com/questions/1724693/find-a-file-in-python
def find_file(name, path):
    for root, dirs, files in os.walk(path):
        if name in files:
            return join(root, name)
    return None

# if the script file path was omitted, try to complete the script path
if not (exists(script_file)):
    script_file_name = abspath(script_file)
    script_file_found = find_file(script_file, scripts_dir)
    if script_file_found is None:
        print('Could not find DML script: ' + script_file)
        print_usage_and_exit()
    else:
        script_file = script_file_found
        print('DML Script:' + script_file)

default_conf = 'spark.driver.extraJavaOptions=-Dlog4j.configuration=file:{}'.format(log4j_properties_path)

# Backslash problem in windows.
if platform.system() == 'Windows':
    default_conf = default_conf.replace('\\', '//')
def default_jars(systemml_home):
    """
    return: String
    Location of systemml and jcuda jars
    """
    build_dir = join(systemml_home, 'target')
    lib_dir = join(build_dir, 'lib')
    systemml_jar = build_dir + os.sep + "SystemML.jar"
    jcuda_jars = glob.glob(lib_dir + os.sep + "jcu*.jar")
    target_jars = ','.join(jcuda_jars)
    return target_jars, systemml_jar

if args.conf is not None:
    conf = ' --conf '.join(args.conf + [default_conf])
else:
    conf = default_conf

cmd_spark = [spark_path, '--class', 'org.apache.sysml.api.DMLScript',
             '--master', args.master, '--driver-memory', args.driver_memory,
             '--num-executors', args.num_executors, '--executor-memory', args.executor_memory,
             '--executor-cores', args.executor_cores, '--conf', conf, '--jars', target_jars,
             systemml_jar]
def spark_submit_entry(master, driver_memory, num_executors, executor_memory,
                       executor_cores, conf,
                       nvargs, args, config, explain, debug, stats, gpu, f):

cmd_system_ml = ['-config', systemml_config_path_arg,
                 '-exec', vars(args)['exec'], '-f', script_file, ' '.join(ml_options)]
    spark_home, systemml_home = get_env()
    spark_path = join(spark_home, 'bin', 'spark-submit')
    script_file = find_script_file(systemml_home, f)

cmd = cmd_spark + cmd_system_ml
    # Jars
    cuda_jars, systemml_jars = default_jars(systemml_home)

return_code = os.system(' '.join(cmd))
# For debugging
# print(' '.join(cmd))
    # Log4j
    log4j = log4j_path(systemml_home)
    log4j_properties_path = 'spark.driver.extraJavaOptions=-Dlog4j.configuration=file:{}'.format(log4j)
    if conf is None:
        default_conf = log4j_properties_path
    else:
        default_conf = ' --conf '.join(conf + [log4j_properties_path])

if return_code != 0:
    print('Failed to run SystemML. Exit code :' + str(return_code))
    print(' '.join(cmd))
    # Config XML
    if config is None:
        default_config = config_path(systemml_home)
    else:
        default_config = ' -config '.join(conf + [config_path(systemml_home)])

    if platform.system() == 'Windows':
        default_conf = default_conf.replace('\\', '//')

    # optional arguments
    ml_options = []
    if nvargs is not None:
        ml_options.append('-nvargs')
        ml_options.append(' '.join(nvargs))
    if args is not None:
        ml_options.append('-args')
        ml_options.append(' '.join(args))
    if explain is not None:
        ml_options.append('-explain')
        ml_options.append(explain)
    if debug is not False:
        ml_options.append('-debug')
    if stats is not None:
        ml_options.append('-stats')
        ml_options.append(stats)
    if gpu is not None:
        ml_options.append('-gpu')
        ml_options.append(gpu)

    if len(ml_options) < 1:
        ml_options = ''

    # stats, explain, target_jars
    cmd_spark = [spark_path, '--class', 'org.apache.sysml.api.DMLScript',
                 '--master', master, '--driver-memory', driver_memory,
                 '--num-executors', num_executors, '--executor-memory', executor_memory,
                 '--executor-cores', executor_cores, '--conf', default_conf,
                 '--jars', cuda_jars, systemml_jars]

    cmd_system_ml = ['-config', default_config,
                     '-exec', 'hybrid_spark', '-f', script_file, ' '.join(ml_options)]

    cmd = cmd_spark + cmd_system_ml
    return_code = os.system(' '.join(cmd))
    return return_code


if __name__ == '__main__':
    spark_home, systemml_home = get_env()

    cparser = argparse.ArgumentParser(description='System-ML Spark Submit Script')
    # SPARK-SUBMIT Options
    cparser.add_argument('--master', default='local[*]', help='local, yarn-client, yarn-cluster', metavar='')

[Reviewer comment: Can you please also print out the defaults for each of the options in the help message? One possible way to do this is sketched after the diff.]

    cparser.add_argument('--driver-memory', default='5G', help='Memory for driver (e.g. 512M)', metavar='')
    cparser.add_argument('--num-executors', default='2', help='Number of executors to launch', metavar='')
    cparser.add_argument('--executor-memory', default='2G', help='Memory per executor', metavar='')
    cparser.add_argument('--executor-cores', default='1', help='Number of cores', metavar='')
    cparser.add_argument('--conf', help='Spark configuration file', nargs='+', metavar='')

    # SYSTEM-ML Options
    cparser.add_argument('-nvargs', help='List of attributeName-attributeValue pairs', nargs='+', metavar='')
    cparser.add_argument('-args', help='List of positional argument values', metavar='', nargs='+')
    cparser.add_argument('-config', help='System-ML configuration file (e.g SystemML-config.xml)', metavar='')
    cparser.add_argument('-explain', help='explains plan levels can be hops, runtime, '
                         'recompile_hops, recompile_runtime', nargs='?', const='runtime', metavar='')
    cparser.add_argument('-debug', help='runs in debug mode', action='store_true')
    cparser.add_argument('-stats', help='Monitor and report caching/recompilation statistics, '
                         'heavy hitter <count> is 10 unless overridden', nargs='?', const='10',
                         metavar='')
    cparser.add_argument('-gpu', help='uses CUDA instructions when reasonable, '
                         'set <force> option to skip conservative memory estimates '
                         'and use GPU wherever possible', nargs='?')
    cparser.add_argument('-f', required=True, help='specifies dml/pydml file to execute; '
                         'path can be local/hdfs/gpfs', metavar='')

    args = cparser.parse_args()
    arg_dict = vars(args)

    return_code = spark_submit_entry(**arg_dict)

    if return_code != 0:
        print('Failed to run SystemML. Exit code :' + str(return_code))
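
Regarding the inline reviewer request to print the defaults in the help message: a minimal sketch of one way to do it (not part of this PR) is to pass argparse's built-in ArgumentDefaultsHelpFormatter when constructing the parser, so the configured default is appended to each option's help text.

import argparse

# Sketch only: with ArgumentDefaultsHelpFormatter, argparse appends
# "(default: ...)" to the help text of each option.
cparser = argparse.ArgumentParser(
    description='System-ML Spark Submit Script',
    formatter_class=argparse.ArgumentDefaultsHelpFormatter)
cparser.add_argument('--master', default='local[*]',
                     help='local, yarn-client, yarn-cluster')
cparser.add_argument('--driver-memory', default='5G',
                     help='Memory for driver (e.g. 512M)')
cparser.print_help()
# The --master entry would then render roughly as:
#   --master MASTER  local, yarn-client, yarn-cluster (default: local[*])

An alternative is to embed '(default: %(default)s)' directly in the individual help strings, which argparse interpolates when rendering help.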
[Reviewer comment: Could you please add a little comment about this function.]
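
Assuming the comment refers to the new spark_submit_entry function (which has no docstring in this revision), a minimal sketch of the kind of comment being requested could look like the following; the parameter descriptions are inferred from the argparse help strings and from how the arguments are used in the diff above, so the exact wording is illustrative only.

def spark_submit_entry(master, driver_memory, num_executors, executor_memory,
                       executor_cores, conf,
                       nvargs, args, config, explain, debug, stats, gpu, f):
    """
    Build and run the spark-submit command for a DML/PyDML script.

    The Spark options (master, driver_memory, num_executors, executor_memory,
    executor_cores, conf) are passed through to spark-submit; the remaining
    options (nvargs, args, config, explain, debug, stats, gpu, f) are turned
    into arguments for org.apache.sysml.api.DMLScript.

    Returns the exit code of the spark-submit process.
    """
    ...  # body as in the diff above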