Skip to content

Commit

Permalink
Merge pull request #1745 from davidmarin/output-dir
Browse files Browse the repository at this point in the history
--output-dir implies --no-cat-output (fixes #1739)
  • Loading branch information
David Marin committed Mar 30, 2018
2 parents ba40166 + 0550f79 commit 1a25e2d
Show file tree
Hide file tree
Showing 7 changed files with 105 additions and 54 deletions.
13 changes: 8 additions & 5 deletions docs/guides/configs-all-runners.rst
Original file line number Diff line number Diff line change
Expand Up @@ -209,14 +209,17 @@ Temp files and cleanup
because it's understood that it has to be on HDFS.

.. mrjob-opt::
:config: no_output
:switch: --no-output
:config: cat_output
:switch: --cat-output, --no-cat-output
:type: boolean
:set: no_mrjob_conf
:default: ``False``
:default: stream output to STDOUT unless :mrjob-opt:`output_dir` is set

Don't stream output to STDOUT after job completion. This is often used in
conjunction with ``--output-dir`` to store output only in HDFS or S3.
Should we stream job output to STDOUT after completion?

.. versionchanged:: 0.6.3

used to be ``--no-output``.

.. mrjob-opt::
:config: step_output_dir
Expand Down
15 changes: 2 additions & 13 deletions docs/guides/dataproc-quickstart.rst
Original file line number Diff line number Diff line change
Expand Up @@ -68,23 +68,12 @@ The output of this command should be identical to the output shown in
Sending Output to a Specific Place
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

If you'd rather have your output go to somewhere deterministic on GCS, which you
probably do, use :option:`--output-dir`::
If you'd rather have your output go to somewhere deterministic on GCS,
use :option:`--output-dir`::

> python word_count.py -r dataproc README.rst \
> --output-dir=gs://my-bucket/wc_out/

It's also likely that since you know where your output is on GCS, you don't want
output streamed back to your local machine. For that, use
:option:`--no-output`::

> python word_count.py -r dataproc README.rst \
> --output-dir=gs://my-bucket/wc_out/ \
> --no-output

There are many other ins and outs of effectively using mrjob with Dataproc.
This is a strictly no-outs body of documentation!

.. _picking-dataproc-cluster-config:

Choosing Type and Number of GCE Instances
Expand Down
12 changes: 2 additions & 10 deletions docs/guides/emr-quickstart.rst
Original file line number Diff line number Diff line change
Expand Up @@ -76,20 +76,12 @@ The output of this command should be identical to the output shown in
Sending Output to a Specific Place
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

If you'd rather have your output go to somewhere deterministic on S3, which you
probably do, use :option:`--output-dir`::
If you'd rather have your output go to somewhere deterministic on S3, use
:option:`--output-dir`::

> python word_count.py -r emr README.rst \
> --output-dir=s3://my-bucket/wc_out/

It's also likely that since you know where your output is on S3, you don't want
output streamed back to your local machine. For that, use
:option:`--no-output`::

> python word_count.py -r emr README.rst \
> --output-dir=s3://my-bucket/wc_out/ \
> --no-output

There are many other ins and outs of effectively using mrjob with EMR. See
:doc:`emr-advanced` for some of the ins, but the outs are left as an exercise
for the reader. This is a strictly no-outs body of documentation!
Expand Down
16 changes: 7 additions & 9 deletions mrjob/launch.py
Original file line number Diff line number Diff line change
Expand Up @@ -237,11 +237,17 @@ def run_job(self):
log.error(str(e))
sys.exit(1)

if not self.options.no_output:
if self._should_cat_output():
for chunk in runner.cat_output():
self.stdout.write(chunk)
self.stdout.flush()

def _should_cat_output(self):
if self.options.cat_output is None:
return not self.options.output_dir
else:
return self.options.cat_output

### Command-line arguments ###

def configure_args(self):
Expand All @@ -258,14 +264,6 @@ def configure_args(self):
self.pass_arg_through(...)
...
"""
self.arg_parser.add_argument(
'-h', '--help', dest='help', action='store_true',
help='show this message and exit')

self.arg_parser.add_argument(
'--deprecated', dest='deprecated', action='store_true',
help='include help for deprecated options')

# if script path isn't set, expect it on the command line
if self._FIRST_ARG_IS_SCRIPT_PATH:
self.arg_parser.add_argument(
Expand Down
56 changes: 45 additions & 11 deletions mrjob/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -266,10 +266,7 @@ def __call__(self, parser, namespace, value, option_string=None):
)

# don't show these unless someone types --help --deprecated
_DEPRECATED_NON_RUNNER_OPTS = set([
'deprecated',
])

_DEPRECATED_NON_RUNNER_OPTS = {'deprecated'}

### runner opts ###

Expand Down Expand Up @@ -1284,11 +1281,23 @@ def _add_basic_args(parser):
action='store_true', help='print more messages to stderr')


def _add_job_args(parser):
def _add_job_args(parser, include_deprecated=True):

parser.add_argument(
'--no-output', dest='no_output',
'--cat-output', dest='cat_output',
default=None, action='store_true',
help="Don't stream output after job completion")
help="Stream job output to stdout")

parser.add_argument(
'--no-cat-output', dest='cat_output',
default=None, action='store_false',
help="Don't stream job output to stdout")

if include_deprecated:
parser.add_argument(
'--no-output', dest='cat_output',
default=None, action='store_false',
help='Deprecated alias for --no-cat-output')

parser.add_argument(
'-o', '--output-dir', dest='output_dir', default=None,
Expand All @@ -1308,6 +1317,15 @@ def _add_job_args(parser):
' the last one. Useful for debugging. Currently'
' ignored by local runners.'))

if include_deprecated:
parser.add_argument(
'--deprecated', dest='deprecated', action='store_true',
help='include help for deprecated options')

parser.add_argument(
'-h', '--help', dest='help', action='store_true',
help='show this message and exit')


def _add_step_args(parser):
"""Add switches that determine what part of the job a MRJob runs."""
Expand Down Expand Up @@ -1342,20 +1360,36 @@ def _print_basic_help(option_parser, usage, include_deprecated=False):
"""
help_parser = ArgumentParser(usage=usage, add_help=False)

_add_basic_args(help_parser)
_add_job_args(help_parser, include_deprecated=include_deprecated)

basic_dests = {action.dest for action in help_parser._actions}

# add other custom args added by the user
for action in option_parser._actions:
if action.dest in _RUNNER_OPTS:
# option_parser already includes deprecated option dests

# this excludes deprecated switch aliases (e.g. --no-output)
if action.dest in basic_dests:
continue

if action.dest in _STEP_OPTS:
# this excludes the --deprecated switch (which is explained below)
if action.dest in _DEPRECATED_NON_RUNNER_OPTS:
continue

if (action.dest in _DEPRECATED_NON_RUNNER_OPTS and
not include_deprecated):
# this excludes options that are shown with --help -r <runner>
if action.dest in _RUNNER_OPTS:
continue

# this excludes options that are shown with --help --steps
if action.dest in _STEP_OPTS:
continue

# this excludes the ARGS option, which is already covered by usage
if not action.option_strings:
continue

# found a custom option. thanks, library user!
help_parser._add_action(action)

help_parser.print_help()
Expand Down
24 changes: 21 additions & 3 deletions tests/test_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -941,11 +941,11 @@ def test_verbose(self):
self.assertEqual(returncode, 0)
self.assertGreater(len(stderr), len(normal_stderr))

def test_no_output(self):
def test_output_dir(self):
self.assertEqual(os.listdir(self.tmp_dir), []) # sanity check

args = ['--no-output', '--output-dir', self.tmp_dir]
stdout, stderr, returncode = self.run_job(args)
stdout, stderr, returncode = self.run_job(
['--output-dir', self.tmp_dir])
self.assertEqual(stdout, b'')
self.assertNotEqual(stderr, b'')
self.assertEqual(returncode, 0)
Expand All @@ -961,6 +961,24 @@ def test_no_output(self):
self.assertEqual(sorted(output_lines),
[b'1\t"foo"\n', b'2\t"bar"\n', b'3\tnull\n'])

def test_no_cat_output(self):
stdout, stderr, returncode = self.run_job(['--no-cat-output'])
self.assertEqual(stdout, b'')
self.assertNotEqual(stderr, b'')
self.assertEqual(returncode, 0)

def test_output_dir_and_cat_output(self):
self.assertEqual(os.listdir(self.tmp_dir), []) # sanity check

stdout, stderr, returncode = self.run_job(
['--output-dir', self.tmp_dir, '--cat-output'])
self.assertNotEqual(stdout, b'')
self.assertNotEqual(stderr, b'')
self.assertEqual(returncode, 0)

# make sure output was also written to the temp dir
self.assertNotEqual(os.listdir(self.tmp_dir), [])


class BadMainTestCase(TestCase):
"""Ensure that the user cannot do anything but just call MRYourJob.run()
Expand Down
23 changes: 20 additions & 3 deletions tests/test_launch.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ def load_options(self, args):
### Test cases ###


class RunJobTestCase(TestCase):
class RunJobTestCase(SandboxedTestCase):

def _make_launcher(self, *args):
"""Make a launcher, add a mock runner (``launcher.mock_runner``), and
Expand All @@ -128,14 +128,31 @@ def test_output(self):
self.assertEqual(launcher.stdout.getvalue(), b'a line\n')
self.assertEqual(launcher.stderr.getvalue(), b'')

def test_no_output(self):
launcher = self._make_launcher('--no-output')
def test_no_cat_output(self):
launcher = self._make_launcher('--no-cat-output')

launcher.run_job()

self.assertEqual(launcher.stdout.getvalue(), b'')
self.assertEqual(launcher.stderr.getvalue(), b'')

def test_output_dir_implies_no_cat_output(self):
launcher = self._make_launcher('--output-dir', self.tmp_dir)

launcher.run_job()

self.assertEqual(launcher.stdout.getvalue(), b'')
self.assertEqual(launcher.stderr.getvalue(), b'')

def test_output_dir_with_explicit_cat_output(self):
launcher = self._make_launcher(
'--output-dir', self.tmp_dir, '--cat-output')

launcher.run_job()

self.assertEqual(launcher.stdout.getvalue(), b'a line\n')
self.assertEqual(launcher.stderr.getvalue(), b'')

def test_exit_on_step_failure(self):
launcher = self._make_launcher()
launcher.mock_runner.run.side_effect = StepFailedException
Expand Down

0 comments on commit 1a25e2d

Please sign in to comment.