Skip to content

Commit

Permalink
Merge pull request #1759 from davidmarin/error-while-reading-from
Browse files Browse the repository at this point in the history
fix "error while reading from" in inline mode
  • Loading branch information
David Marin committed Apr 28, 2018
2 parents d5039d1 + f63bb72 commit e472b69
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 4 deletions.
13 changes: 13 additions & 0 deletions mrjob/examples/mr_boom.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,16 @@
# Copyright 2016 Yelp
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""A job that always fails."""
from mrjob.job import MRJob

Expand Down
9 changes: 5 additions & 4 deletions mrjob/inline.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# Copyright 2011 Matthew Tai and Yelp
# Copyright 2012-2016 Yelp and Contributors
# Copyright 2017 Yelp
# Copyright 2018 Yelp
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -94,9 +95,9 @@ def invoke_task(stdin, stdout, stderr, wd, env):
# read input path from stdin, add to args
line = stdin.readline().decode('utf_8')
input_uri = line.split('\t')[-1].rstrip()
# input_uri is an absolute path, no need to copy to cwd
input_path = input_uri
args = list(args) + [input_path, input_uri]
# input_uri is an absolute path, can serve
# as path and uri both
args = list(args) + [input_uri, input_uri]

task = self._mrjob_cls(args)
task.sandbox(stdin=stdin, stdout=stdout, stderr=stderr)
Expand All @@ -108,7 +109,7 @@ def invoke_task(stdin, stdout, stderr, wd, env):
# because then we lose the stacktrace (which is the whole
# point of the inline runner)

if input_path: # from manifest
if input_uri: # from manifest
self._error_while_reading_from = input_uri
else:
self._error_while_reading_from = self._task_input_path(
Expand Down
42 changes: 42 additions & 0 deletions tests/test_inline.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
# Copyright 2013 Yelp and Lyft
# Copyright 2014 Marc Abramowitz
# Copyright 2015-2017 Yelp
# Copyright 2018 Yelp
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -28,13 +29,15 @@

from mrjob.examples.mr_phone_to_url import MRPhoneToURL
from mrjob.inline import InlineMRJobRunner
from mrjob.job import MRJob

from tests.examples.test_mr_phone_to_url import write_conversion_record
from tests.mr_test_cmdenv import MRTestCmdenv
from tests.mr_two_step_job import MRTwoStepJob
from tests.sandbox import EmptyMrjobConfTestCase
from tests.sandbox import SandboxedTestCase
from tests.job import run_job
from tests.py2 import patch
from tests.test_sim import MRIncrementerJob


Expand Down Expand Up @@ -177,3 +180,42 @@ def test_input_manifest(self):
run_job(MRPhoneToURL(['-r', self.RUNNER, wet2_gz_path, '-']),
raw_input=wet1.getvalue()),
self.EXPECTED_OUTPUT)


class MRNope(MRJob):
def mapper_init(self):
raise NotImplementedError


class MRManifestNope(MRJob):
def mapper_raw(self, input_path, input_uri):
raise NotImplementedError


class WhileReadingFromTestCase(SandboxedTestCase):
# mostly a regression test for #1758

def _test_reading_from(self, job_class, expect_input_path):
# check that we report the actual input file and not the manifest file
input_path = self.makefile('input.txt')

job = job_class([input_path])
job.sandbox()

log = self.start(patch('mrjob.inline.log'))

with job.make_runner() as runner:
self.assertRaises(NotImplementedError, runner.run)

error_log = ''.join(a[0][0] for a in log.error.call_args_list)

if expect_input_path:
self.assertIn(input_path, error_log)
else:
self.assertNotIn(input_path, error_log)

def test_regular_job(self):
self._test_reading_from(MRNope, expect_input_path=False)

def test_input_manifest(self):
self._test_reading_from(MRManifestNope, expect_input_path=True)

0 comments on commit e472b69

Please sign in to comment.