Merge pull request #1901 from davidmarin/mrjob-spark-submit
add mrjob spark-submit subcommand (fixes #1382)
David Marin committed Dec 8, 2018
2 parents 6c52376 + 6325a51 commit c98d1d6
Showing 12 changed files with 1,124 additions and 41 deletions.
7 changes: 7 additions & 0 deletions docs/guides/cmd.rst
@@ -53,6 +53,13 @@ s3-tmpwatch

.. automodule:: mrjob.tools.emr.s3_tmpwatch

.. _spark-submit:

spark-submit
^^^^^^^^^^^^

.. automodule:: mrjob.tools.spark_submit

terminate-cluster
^^^^^^^^^^^^^^^^^

6 changes: 6 additions & 0 deletions mrjob/cmd.py
@@ -111,6 +111,12 @@ def _s3_tmpwatch(args):
main(args)


@_command('spark-submit', 'Submit Spark jobs')
def _spark_submit(args):
from mrjob.tools.spark_submit import main
main(args)


@_command('terminate-idle-clusters', 'Terminate idle EMR clusters')
def _terminate_idle_clusters(args):
from mrjob.tools.emr.terminate_idle_clusters import main
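The new subcommand plugs into the same lightweight registry the other tools use: _command records a handler under a name, and the handler defers the heavy import until the subcommand actually runs. A minimal sketch of that pattern (illustrative only; the real mrjob/cmd.py may differ in detail):

    _COMMANDS = {}  # subcommand name -> (handler, one-line description)

    def _command(name, description):
        """Register the decorated function as the handler for `mrjob <name>`."""
        def register(handler):
            _COMMANDS[name] = (handler, description)
            return handler
        return register

    @_command('spark-submit', 'Submit Spark jobs')
    def _spark_submit(args):
        # import lazily so plain `mrjob --help` stays fast
        from mrjob.tools.spark_submit import main
        main(args)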
1 change: 0 additions & 1 deletion mrjob/hadoop.py
@@ -121,7 +121,6 @@ class HadoopJobRunner(MRJobBinRunner, LogInterpretationMixin):
alias = 'hadoop'

OPT_NAMES = MRJobBinRunner.OPT_NAMES | {
'bootstrap_spark',
'hadoop_bin',
'hadoop_extra_args',
'hadoop_log_dirs',
24 changes: 7 additions & 17 deletions mrjob/launch.py
@@ -33,6 +33,7 @@
from mrjob.options import _parse_raw_args
from mrjob.options import _print_help_for_runner
from mrjob.options import _print_basic_help
from mrjob.runner import _runner_class
from mrjob.setup import parse_legacy_hash_path
from mrjob.step import StepFailedException
from mrjob.util import log_to_null
@@ -464,26 +465,15 @@ def _runner_class(self):
Defaults to ``'local'`` and disallows use of inline runner.
"""
if self.options.runner == 'dataproc':
from mrjob.dataproc import DataprocJobRunner
return DataprocJobRunner

elif self.options.runner == 'emr':
from mrjob.emr import EMRJobRunner
return EMRJobRunner

elif self.options.runner == 'hadoop':
from mrjob.hadoop import HadoopJobRunner
return HadoopJobRunner
if not self.options.runner:
return LocalMRJobRunner

elif self.options.runner == 'inline':
raise ValueError("inline is not supported in the multi-lingual"
" launcher.")
raise ValueError(
"inline is not supported in the multi-lingual"
" launcher.")

else:
# run locally by default
from mrjob.local import LocalMRJobRunner
return LocalMRJobRunner
return _runner_class(self.options.runner)

def _runner_kwargs(self):
# just use combine_dicts() and not combine_confs(); leave the
50 changes: 36 additions & 14 deletions mrjob/options.py
@@ -25,6 +25,7 @@
from argparse import ArgumentParser
from argparse import SUPPRESS
from logging import getLogger
from os import devnull

from mrjob.conf import combine_cmds
from mrjob.conf import combine_dicts
@@ -538,8 +539,8 @@ def __call__(self, parser, namespace, value, option_string=None):
(['--cmdenv'], dict(
action=_KeyValueAction,
help=('Set an environment variable for your job inside Hadoop '
'streaming. Must take the form KEY=VALUE. You can use'
' --cmdenv multiple times.'),
'streaming/Spark. Must take the form KEY=VALUE.'
' You can use --cmdenv multiple times.'),
)),
],
),
@@ -840,9 +841,8 @@ def __call__(self, parser, namespace, value, option_string=None):
switches=[
(['-D', '--jobconf'], dict(
action=_KeyValueAction,
help=('-D arg to pass through to hadoop streaming; should'
' take the form KEY=VALUE. You can use -D'
' multiple times.'),
help=('passed through to hadoop streaming as -D and to Spark'
' as --conf. Should take the form KEY=VALUE'),
)),
],
),
@@ -1049,9 +1049,8 @@ def __call__(self, parser, namespace, value, option_string=None):
combiner=combine_cmds,
switches=[
(['--python-bin'], dict(
help=('Alternate python command for Python mappers/reducers.'
' You can include arguments, e.g. --python-bin "python'
' -v"'),
help=('Alternate python command. You can include arguments,'
' e.g. --python-bin "python -v"'),
)),
],
),
@@ -1178,7 +1177,7 @@ def __call__(self, parser, namespace, value, option_string=None):
switches=[
(['--spark-master'], dict(
help=('--master argument to spark-submit (e.g. '
'spark://host:port, local. Default is yarn'),
'spark://host:port, local). Default is "yarn"'),
)),
],
),
@@ -1438,7 +1437,8 @@ def _filter_by_role(opt_names, *cloud_roles):
}


def _add_runner_args(parser, opt_names=None, include_deprecated=True):
def _add_runner_args(parser, opt_names=None, include_deprecated=True,
customize_switches=None, suppress_switches=None):
"""add switches for the given runner opts to the given
ArgumentParser, alphabetically by destination. If *opt_names* is
None, include all runner opts."""
@@ -1447,26 +1447,47 @@ def _add_runner_args(parser, opt_names=None, include_deprecated=True):

for opt_name in sorted(opt_names):
_add_runner_args_for_opt(
parser, opt_name, include_deprecated=include_deprecated)
parser, opt_name,
include_deprecated=include_deprecated,
customize_switches=customize_switches,
suppress_switches=suppress_switches
)


def _add_runner_args_for_opt(parser, opt_name, include_deprecated=True):
def _add_runner_args_for_opt(parser, opt_name, include_deprecated=True,
customize_switches=None, suppress_switches=None):
"""Add switches for a single option (*opt_name*) to the given parser."""
if customize_switches is None:
customize_switches = {}

if suppress_switches is None:
suppress_switches = set()

conf = _RUNNER_OPTS[opt_name]

if conf.get('deprecated') and not include_deprecated:
return

switches = conf.get('switches') or []

def suppressed(switches):
return any(sw in suppress_switches for sw in switches)

for args, kwargs in switches:
kwargs = dict(kwargs)

# allow customization
for switch in args:
if switch in customize_switches:
kwargs.update(customize_switches[switch])

deprecated_aliases = kwargs.pop('deprecated_aliases', None)
deprecated = kwargs.pop('deprecated', False)

suppress = any(sw in suppress_switches for sw in args)

# add this switch
if include_deprecated or not deprecated:
if (include_deprecated or not deprecated) and not suppressed(args):
kwargs['dest'] = opt_name

if kwargs.get('action') == 'append':
@@ -1477,7 +1498,8 @@ def _add_runner_args_for_opt(parser, opt_name, include_deprecated=True):
parser.add_argument(*args, **kwargs)

# add a switch for deprecated aliases
if deprecated_aliases and include_deprecated:
if (deprecated_aliases and include_deprecated and
not suppressed(deprecated_aliases)):
help = 'Deprecated alias%s for %s' % (
('es' if len(deprecated_aliases) > 1 else ''),
args[-1])
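The two new keyword arguments give callers (here, the spark-submit tool) control over the shared runner switches: customize_switches maps a switch string to keyword-argument overrides, and suppress_switches is a set of switch strings to hide entirely. A hedged usage sketch (the switch names below are picked for illustration and aren't taken from the spark-submit tool itself):

    from argparse import ArgumentParser

    from mrjob.options import _add_runner_args

    parser = ArgumentParser()
    _add_runner_args(
        parser,
        customize_switches={
            # override the help text shown for --python-bin
            '--python-bin': dict(help='Python interpreter to run the job with'),
        },
        # hide this switch entirely (switch name assumed for illustration)
        suppress_switches={'--hadoop-bin'},
    )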
36 changes: 34 additions & 2 deletions mrjob/runner.py
@@ -45,6 +45,7 @@
from mrjob.setup import WorkingDirManager
from mrjob.setup import name_uniquely
from mrjob.setup import parse_legacy_hash_path
from mrjob.step import OUTPUT
from mrjob.step import STEP_TYPES
from mrjob.step import _is_spark_step_type
from mrjob.util import to_lines
@@ -492,7 +493,7 @@ def run(self):
:py:class:`~mrjob.inline.InlineMRJobRunner`, where we raise the
actual exception that caused the step to fail).
"""
if not self._script_path:
if not (self._script_path or self._steps):
raise AssertionError('No script to run!')

if self._ran_job:
@@ -510,7 +511,11 @@
self._run()
self._ran_job = True

log.info('job output is in %s' % self._output_dir)
last_step = self._get_steps()[-1]

# only print this message if the last step uses our output dir
if 'args' not in last_step or OUTPUT in last_step['args']:
log.info('job output is in %s' % self._output_dir)

def cat_output(self):
"""Stream the jobs output, as a stream of ``bytes``. If there are
@@ -1220,3 +1225,30 @@ def _blank_out_conflicting_opts(opt_list, opt_names, conflicting_opts=None):
blank_out = True

return opt_list


def _runner_class(alias):
"""Get the runner subclass corresponding to the given alias
(importing code only as needed)."""
if alias == 'dataproc':
from mrjob.dataproc import DataprocJobRunner
return DataprocJobRunner

elif alias == 'emr':
from mrjob.emr import EMRJobRunner
return EMRJobRunner

elif alias == 'hadoop':
from mrjob.hadoop import HadoopJobRunner
return HadoopJobRunner

elif alias == 'inline':
from mrjob.inline import InlineMRJobRunner
return InlineMRJobRunner

elif alias == 'local':
from mrjob.local import LocalMRJobRunner
return LocalMRJobRunner

else:
raise ValueError('bad runner alias: %s' % alias)
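A quick usage sketch for the new helper (assumes mrjob is importable): it resolves an alias to its runner class, importing only that runner's module, and raises ValueError for anything unknown.

    from mrjob.runner import _runner_class

    runner_cls = _runner_class('local')   # imports mrjob.local only at this point
    print(runner_cls.alias)               # -> 'local'

    try:
        _runner_class('not-a-runner')
    except ValueError as ex:
        print(ex)                         # -> bad runner alias: not-a-runner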
4 changes: 2 additions & 2 deletions mrjob/step.py
@@ -235,7 +235,7 @@ def render_combiner(self):
def render_reducer(self):
return self._render_substep('reducer_cmd', 'reducer_pre_filter')

def description(self, step_num):
def description(self, step_num=0):
"""Returns a dictionary representation of this step:
.. code-block:: js
@@ -378,7 +378,7 @@ def _default(self, k):
else:
return None

def description(self, other):
def description(self, step_num=0):
"""Return a dictionary representation of this step. See
:ref:`steps-format` for examples."""
result = dict(
72 changes: 72 additions & 0 deletions mrjob/tools/spark-submit-notes.txt
@@ -0,0 +1,72 @@
Usage: mrjob spark-submit [options] <app jar | python file> app arguments

(no R file support)

opts that go into the job definition:

--class (main_class)

opts that are just aliases for mrjob opts:

--master (spark_master)
--deploy-mode (spark_deploy_mode)
--jars (libjars)
--conf (jobconf)

opts that can be used as-is

--py-files (py_files)
--files (upload_files)
--archives (upload_archives)

opts that can be passed through directly to Spark

--name
--packages
--exclude-packages
--repositories
--driver-memory
--driver-java-options
--driver-library-path
--driver-class-path
--executor-memory
--proxy-user
--driver-cores
--supervise
--total-executor-cores
--executor-cores
--queue
--num-executors
--principal
--keytab


opts involving files; should see if we can pass them through

--properties-file


pass through to spark?

--verbose (possibly depending on runner)


not relevant to submitting
--version
--kill
--status


all mrjob options basically make sense except check_input_paths

for now, setting input_paths to os.devnull and setting check_input_paths to
False would be enough
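
Read as a design note, the option mapping above could be sketched roughly like this (names are made up for illustration and aren't taken from mrjob/tools/spark_submit.py):

    import os

    # spark-submit switches that are really aliases for existing mrjob opts
    _SWITCH_ALIASES = {
        '--master': 'spark_master',
        '--deploy-mode': 'spark_deploy_mode',
        '--jars': 'libjars',
        '--conf': 'jobconf',
    }

    # switches whose values map straight onto runner opts of the same meaning
    _DIRECT_OPTS = {
        '--py-files': 'py_files',
        '--files': 'upload_files',
        '--archives': 'upload_archives',
    }

    # per the note above: there's no real input to check, so point input_paths
    # at os.devnull and skip input-path validation
    _DEFAULT_RUNNER_KWARGS = dict(
        input_paths=[os.devnull],
        check_input_paths=False,
    )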





Other notes:

why doesn't cluster_id have cloud_role='launch'?
same for cluster_properties?
