Merge pull request #1916 from davidmarin/runner-step-types
catch bad steps on runner initialization (fixes #1915, fixes #1878)
David Marin committed Dec 22, 2018
2 parents 8072586 + 9545e62 commit 1fbd285
Showing 13 changed files with 181 additions and 42 deletions.
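
The gist of the change: every runner class now declares a _STEP_TYPES set, and MRJobRunner._check_steps() raises NotImplementedError at runner initialization for step types outside that set (and ValueError for steps mrjob itself doesn't recognize). A rough sketch of the resulting behavior, using the MRNullSpark test job from this diff with the inline runner, which only supports streaming steps:

    # Sketch of the new fail-fast behavior (illustration, not part of the diff):
    from tests.mr_null_spark import MRNullSpark  # Spark job used by the new tests

    job = MRNullSpark(['-r', 'inline'])  # inline runner only supports 'streaming'
    job.sandbox()

    try:
        job.make_runner()  # step check now happens at initialization, not mid-run
    except NotImplementedError as e:
        # e.g. "step 0 has type 'spark', but inline runner only supports: streaming"
        print(e)
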
3 changes: 3 additions & 0 deletions mrjob/dataproc.py
@@ -226,6 +226,9 @@ class DataprocJobRunner(HadoopInTheCloudJobRunner, LogInterpretationMixin):
'task_instance_config',
}

# no Spark support yet (see #1765)
_STEP_TYPES = {'jar', 'streaming'}

def __init__(self, **kwargs):
""":py:class:`~mrjob.dataproc.DataprocJobRunner` takes the same
arguments as
4 changes: 4 additions & 0 deletions mrjob/emr.py
@@ -340,6 +340,10 @@ class EMRJobRunner(HadoopInTheCloudJobRunner, LogInterpretationMixin):
'visible_to_all_users',
}

# supports everything (so far)
_STEP_TYPES = {
'jar', 'spark', 'spark_jar', 'spark_script', 'streaming'}

# everything that controls instances number, type, or price
_INSTANCE_OPT_NAMES = {
name for name in OPT_NAMES
4 changes: 4 additions & 0 deletions mrjob/hadoop.py
@@ -130,6 +130,10 @@ class HadoopJobRunner(MRJobBinRunner, LogInterpretationMixin):
'spark_master',
}

# supports everything (so far)
_STEP_TYPES = {
'jar', 'spark', 'spark_jar', 'spark_script', 'streaming'}

def __init__(self, **kwargs):
""":py:class:`~mrjob.hadoop.HadoopJobRunner` takes the same arguments
as :py:class:`~mrjob.runner.MRJobRunner`, plus some additional options
14 changes: 14 additions & 0 deletions mrjob/inline.py
@@ -77,6 +77,20 @@ def __init__(self, mrjob_cls=None, **kwargs):
if self._opts['setup']:
log.warning("inline runner can't run setup commands")

def _check_steps(self, steps):
"""Don't try to run steps that include commands."""
super(InlineMRJobRunner, self)._check_steps(steps)

for step_num, step in enumerate(steps):
if step['type'] == 'streaming':
for mrc in ('mapper', 'combiner', 'reducer'):
if step.get(mrc):
if 'command' in step[mrc] or 'pre_filter' in step[mrc]:
raise NotImplementedError(
"step %d's %s runs a command, but inline"
" runner does not support subprocesses (try"
" -r local)" % (step_num, mrc))

def _invoke_task_func(self, task_type, step_num, task_num):
"""Just run tasks in the same process."""
manifest = (step_num == 0 and task_type == 'mapper' and
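
For reference, a sketch of streaming step descriptions the new inline check would reject; the exact dict shapes here are assumptions based only on the keys the loop above inspects:

    # Assumed step-description shapes (illustration only):
    rejected_command_step = {
        'type': 'streaming',
        'mapper': {'type': 'command', 'command': 'grep foo'},  # runs a subprocess
    }

    rejected_pre_filter_step = {
        'type': 'streaming',
        'mapper': {'type': 'script', 'pre_filter': 'grep foo'},  # shell pre-filter
    }
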
28 changes: 19 additions & 9 deletions mrjob/runner.py
@@ -81,9 +81,7 @@ class MRJobRunner(object):
# command lines to run substeps (including Spark) are handled by
# mrjob.bin.MRJobBinRunner

#: alias for this runner; used for picking section of
#: :py:mod:`mrjob.conf` to load one of ``'local'``, ``'emr'``,
#: or ``'hadoop'``
#: alias for this runner, used on the command line with ``-r``
alias = None

# libjars is only here because the job can set it; might want to
@@ -107,6 +105,9 @@ class MRJobRunner(object):
'upload_files'
}

# re-define this as a set of step types supported by your runner
_STEP_TYPES = None

# if this is true, when bootstrap_mrjob is true, create a mrjob.zip
# and patch it into the *py_files* option
_BOOTSTRAP_MRJOB_IN_PY_FILES = True
@@ -818,13 +819,22 @@ def _load_steps(self):
raise NotImplementedError

def _check_steps(self, steps):
"""Raise an exception if there's something wrong with the step
definition."""
"""Look at the step definition (*steps*). If it is not supported by
the runner, raise :py:class:`NotImplementedError`. If it is not
supported by mrjob, raise :py:class:`ValueError`.
"""
if not self._STEP_TYPES:
# use __class__.__name__ because only MRJobRunner would
# trigger this
raise NotImplementedError(
'%s cannot run steps!' % self.__class__.__name__)

for step_num, step in enumerate(steps):
if step['type'] not in STEP_TYPES:
raise ValueError(
'step %d has unexpected step type %r' % (
step_num, step['type']))
if step.get('type') not in self._STEP_TYPES:
raise NotImplementedError(
'step %d has type %r, but %s runner only supports:'
' %s' % (step_num, step.get('type'), self.alias,
', '.join(sorted(self._STEP_TYPES))))

if step.get('input_manifest') and step_num != 0:
raise ValueError(
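
Taken together, the contract for runner subclasses looks roughly like this hypothetical example (not from the diff): declare _STEP_TYPES, then extend _check_steps() for any runner-specific restrictions, as inline.py does above.

    # Hypothetical subclass, illustrating the pattern only:
    from mrjob.runner import MRJobRunner


    class ToyStreamingRunner(MRJobRunner):

        alias = 'toy'  # made-up alias

        _STEP_TYPES = {'streaming'}  # only streaming steps supported

        def _check_steps(self, steps):
            # base class rejects any step whose type isn't in _STEP_TYPES
            super(ToyStreamingRunner, self)._check_steps(steps)

            # runner-specific restrictions go here, as in InlineMRJobRunner
            for step_num, step in enumerate(steps):
                if step.get('combiner'):
                    raise NotImplementedError(
                        "step %d has a combiner, which this toy runner"
                        " doesn't support" % step_num)
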
2 changes: 2 additions & 0 deletions mrjob/sim.py
@@ -67,6 +67,8 @@ class SimMRJobRunner(MRJobRunner):
'num_cores'
}

_STEP_TYPES = {'streaming'}

def __init__(self, **kwargs):
super(SimMRJobRunner, self).__init__(**kwargs)

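
The local and inline runners build on SimMRJobRunner (the super() call in inline.py above suggests as much), so they inherit this {'streaming'} set. A quick way to inspect what each runner advertises:

    # Illustration only: inspect the per-runner step types introduced here
    from mrjob.inline import InlineMRJobRunner
    from mrjob.sim import SimMRJobRunner

    print(SimMRJobRunner._STEP_TYPES)     # {'streaming'}
    print(InlineMRJobRunner._STEP_TYPES)  # inherited: {'streaming'}
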
28 changes: 28 additions & 0 deletions tests/mockhadoop.py
@@ -50,6 +50,34 @@
from mrjob.parse import urlparse
from mrjob.util import shlex_split

from tests.sandbox import SandboxedTestCase


class MockHadoopTestCase(SandboxedTestCase):

def setUp(self):
super(MockHadoopTestCase, self).setUp()
# setup fake hadoop home
hadoop_home = self.makedirs('mock_hadoop_home')
os.environ['HADOOP_HOME'] = hadoop_home
os.environ['MOCK_HADOOP_VERSION'] = "1.2.0"
os.environ['MOCK_HADOOP_TMP'] = self.makedirs('mock_hadoop_tmp')

# make fake hadoop binary
os.mkdir(os.path.join(hadoop_home, 'bin'))
self.hadoop_bin = os.path.join(hadoop_home, 'bin', 'hadoop')
create_mock_hadoop_script(self.hadoop_bin)

# make fake streaming jar
os.makedirs(os.path.join(hadoop_home, 'contrib', 'streaming'))
streaming_jar_path = os.path.join(
hadoop_home, 'contrib', 'streaming', 'hadoop-0.X.Y-streaming.jar')
open(streaming_jar_path, 'w').close()

# make sure the fake hadoop binaries can find mrjob
self.add_mrjob_to_pythonpath()


# layout of $MOCK_HADOOP_TMP:
# cmd.log: single file containing one line per invocation of mock hadoop
# binary, with lines containing space-separated args passed
24 changes: 18 additions & 6 deletions tests/test_bin.py
@@ -59,7 +59,19 @@
PYTHON_BIN = 'python3'


class ArgsForSparkStepTestCase(SandboxedTestCase):
class AllowSparkOnLocalRunnerTestCase(SandboxedTestCase):

# allow testing spark methods on local runner, even though
# it doesn't (currently) support them (see #1361)

def setUp(self):
super(AllowSparkOnLocalRunnerTestCase, self).setUp()

self.start(patch('mrjob.runner.MRJobRunner._check_steps'))



class ArgsForSparkStepTestCase(AllowSparkOnLocalRunnerTestCase):
# just test the structure of _args_for_spark_step()

def setUp(self):
@@ -127,7 +139,7 @@ def test_bootstrap_mrjob_overrides_interpreter(self):
self.assertEqual(runner._bootstrap_mrjob(), True)


class GetSparkSubmitBinTestCase(SandboxedTestCase):
class GetSparkSubmitBinTestCase(AllowSparkOnLocalRunnerTestCase):

def test_default(self):
job = MRNullSpark(['-r', 'local'])
@@ -819,7 +831,7 @@ def test_respects_sh_pre_commands_methd(self):
self.assertEqual(out[:2], ['set -e', 'set -v'])


class PyFilesTestCase(SandboxedTestCase):
class PyFilesTestCase(AllowSparkOnLocalRunnerTestCase):

def test_default(self):
job = MRNullSpark(['-r', 'local'])
@@ -1003,7 +1015,7 @@ def test_job_can_override_partitioner(self):
'org.apache.hadoop.mapred.lib.HashPartitioner')


class SparkScriptPathTestCase(SandboxedTestCase):
class SparkScriptPathTestCase(AllowSparkOnLocalRunnerTestCase):

def setUp(self):
super(SparkScriptPathTestCase, self).setUp()
@@ -1059,7 +1071,7 @@ def test_streaming_step_not_okay(self):
runner._spark_script_path, 0)


class SparkScriptArgsTestCase(SandboxedTestCase):
class SparkScriptArgsTestCase(AllowSparkOnLocalRunnerTestCase):

def setUp(self):
super(SparkScriptArgsTestCase, self).setUp()
@@ -1187,7 +1199,7 @@ def test_streaming_step_not_okay(self):
runner._spark_script_args, 0)


class SparkSubmitArgsTestCase(SandboxedTestCase):
class SparkSubmitArgsTestCase(AllowSparkOnLocalRunnerTestCase):

def setUp(self):
super(SparkSubmitArgsTestCase, self).setUp()
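
The patch in AllowSparkOnLocalRunnerTestCase simply disables the new check for those tests. Outside of mrjob's test helpers, the equivalent might look like this sketch (using unittest.mock directly):

    # Sketch: bypass the step check so a Spark job can be paired with the
    # local runner in a test, even though local doesn't support Spark yet.
    from unittest.mock import patch

    from tests.mr_null_spark import MRNullSpark

    with patch('mrjob.runner.MRJobRunner._check_steps'):
        job = MRNullSpark(['-r', 'local'])
        job.sandbox()
        runner = job.make_runner()  # would raise NotImplementedError unpatched
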
20 changes: 20 additions & 0 deletions tests/test_dataproc.py
@@ -56,6 +56,8 @@
from tests.mr_jar_and_streaming import MRJarAndStreaming
from tests.mr_just_a_jar import MRJustAJar
from tests.mr_no_mapper import MRNoMapper
from tests.mr_null_spark import MRNullSpark
from tests.mr_spark_script import MRSparkScript
from tests.mr_two_step_job import MRTwoStepJob
from tests.mr_word_count import MRWordCount
from tests.py2 import call
@@ -2354,3 +2356,21 @@ def test_subnet_on_cmd_line_overrides_network_in_config(self):
'https://www.googleapis.com/compute/v1/projects/%s'
'/us-west1/subnetworks/test' % project_id)
self.assertFalse(gce_config.network_uri)


class UnsupportedStepsTestCase(MockGoogleTestCase):

def test_no_spark_steps(self):
# just a sanity check; _STEP_TYPES is tested in a lot of ways
job = MRNullSpark(['-r', 'dataproc'])
job.sandbox()

self.assertRaises(NotImplementedError, job.make_runner)

def test_no_spark_script_steps(self):
spark_script_path = self.makefile('chispa.py')

job = MRSparkScript(['-r', 'dataproc', '--script', spark_script_path])
job.sandbox()

self.assertRaises(NotImplementedError, job.make_runner)
27 changes: 1 addition & 26 deletions tests/test_hadoop.py
@@ -33,9 +33,9 @@

from tests.mockhadoop import add_mock_hadoop_counters
from tests.mockhadoop import add_mock_hadoop_output
from tests.mockhadoop import create_mock_hadoop_script
from tests.mockhadoop import get_mock_hadoop_cmd_args
from tests.mockhadoop import get_mock_hdfs_root
from tests.mockhadoop import MockHadoopTestCase
from tests.mr_jar_and_streaming import MRJarAndStreaming
from tests.mr_jar_with_generic_args import MRJarWithGenericArgs
from tests.mr_just_a_jar import MRJustAJar
@@ -62,31 +62,6 @@
pty = Mock()


class MockHadoopTestCase(SandboxedTestCase):

def setUp(self):
super(MockHadoopTestCase, self).setUp()
# setup fake hadoop home
hadoop_home = self.makedirs('mock_hadoop_home')
os.environ['HADOOP_HOME'] = hadoop_home
os.environ['MOCK_HADOOP_VERSION'] = "1.2.0"
os.environ['MOCK_HADOOP_TMP'] = self.makedirs('mock_hadoop_tmp')

# make fake hadoop binary
os.mkdir(os.path.join(hadoop_home, 'bin'))
self.hadoop_bin = os.path.join(hadoop_home, 'bin', 'hadoop')
create_mock_hadoop_script(self.hadoop_bin)

# make fake streaming jar
os.makedirs(os.path.join(hadoop_home, 'contrib', 'streaming'))
streaming_jar_path = os.path.join(
hadoop_home, 'contrib', 'streaming', 'hadoop-0.X.Y-streaming.jar')
open(streaming_jar_path, 'w').close()

# make sure the fake hadoop binaries can find mrjob
self.add_mrjob_to_pythonpath()


class TestFullyQualifyHDFSPath(BasicTestCase):

def test_empty(self):
25 changes: 25 additions & 0 deletions tests/test_inline.py
@@ -32,6 +32,9 @@
from mrjob.job import MRJob

from tests.examples.test_mr_phone_to_url import write_conversion_record
from tests.mr_cmd_job import MRCmdJob
from tests.mr_filter_job import MRFilterJob
from tests.mr_null_spark import MRNullSpark
from tests.mr_test_cmdenv import MRTestCmdenv
from tests.mr_two_step_job import MRTwoStepJob
from tests.sandbox import EmptyMrjobConfTestCase
@@ -222,3 +225,25 @@ def test_regular_job(self):

def test_input_manifest(self):
self._test_reading_from(MRManifestNope, expect_input_path=True)


class UnsupportedStepsTestCase(SandboxedTestCase):

def test_no_command_steps(self):
job = MRCmdJob(['-r', 'inline', '--mapper-cmd', 'cat'])
job.sandbox()

self.assertRaises(NotImplementedError, job.make_runner)

def test_no_pre_filters(self):
job = MRFilterJob(['-r', 'inline', '--mapper-filter', 'grep foo'])
job.sandbox()

self.assertRaises(NotImplementedError, job.make_runner)

def test_no_spark_steps(self):
# just a sanity check; _STEP_TYPES is tested in a lot of ways
job = MRNullSpark(['-r', 'inline'])
job.sandbox()

self.assertRaises(NotImplementedError, job.make_runner)
20 changes: 20 additions & 0 deletions tests/test_local.py
@@ -48,6 +48,8 @@
from tests.mr_filter_job import MRFilterJob
from tests.mr_group import MRGroup
from tests.mr_job_where_are_you import MRJobWhereAreYou
from tests.mr_just_a_jar import MRJustAJar
from tests.mr_null_spark import MRNullSpark
from tests.mr_sort_and_group import MRSortAndGroup
from tests.mr_two_step_job import MRTwoStepJob
from tests.mr_word_count import MRWordCount
@@ -1024,3 +1026,21 @@ def test_setup_cmd(self):
self.EXPECTED_OUTPUT)

self.assertTrue(exists(touched_path))


class UnsupportedStepsTestCase(SandboxedTestCase):

def test_no_spark_steps(self):
# just a sanity check; _STEP_TYPES is tested in a lot of ways
job = MRNullSpark(['-r', 'local'])
job.sandbox()

self.assertRaises(NotImplementedError, job.make_runner)

def test_no_jar_steps(self):
jar_path = self.makefile('dora.jar')

job = MRJustAJar(['-r', 'local', '--jar', jar_path])
job.sandbox()

self.assertRaises(NotImplementedError, job.make_runner)
