Skip to content

Commit

Permalink
Merge pull request #976 from davidmarin/emr_action_on_failure
Browse files Browse the repository at this point in the history
--emr-action-on-failure
  • Loading branch information
Zach Musgrave committed Mar 12, 2015
2 parents 98cf82c + 8d4caeb commit dc05321
Show file tree
Hide file tree
Showing 6 changed files with 68 additions and 6 deletions.
21 changes: 21 additions & 0 deletions docs/guides/emr-opts.rst
Original file line number Diff line number Diff line change
Expand Up @@ -465,6 +465,26 @@ Number and type of instances
Choosing/creating a job flow to join
------------------------------------

.. mrjob-opt::
:config: emr_action_on_failure
:switch: --emr-action-on-failure
:type: :ref:`string <data-type-string>`
:set: emr
:default: (automatic)

What happens if a step of your job fails.

* ``'CANCEL_AND_WAIT'`` cancels all steps on the job flow
* ``'CONTINUE'`` continues to the next step (useful when submitting several
jobs to the same job flow)
* ``'TERMINATE_CLUSTER'`` shuts down the job flow entirely

The default is ``'CANCEL_AND_WAIT'`` when using pooling (see
:mrjob-opt:`pool_emr_job_flows`) or an existing job flow (see
:mrjob-opt:`emr_job_flow_id`), and ``'TERMINATE_CLUSTER'`` otherwise.

.. versionadded:: 0.4.3

.. mrjob-opt::
:config: emr_job_flow_id
:switch: --emr-job-flow-id
Expand Down Expand Up @@ -513,6 +533,7 @@ Choosing/creating a job flow to join
flow every 30 seconds until this many minutes have passed, then start a new
job flow instead of joining one.


S3 paths and options
--------------------
MRJob uses boto to manipulate/access S3. Older versions of boto prior to 2.25.0
Expand Down
7 changes: 5 additions & 2 deletions mrjob/emr.py
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,7 @@ class EMRRunnerOptionStore(RunnerOptionStore):
'emr_endpoint',
'emr_job_flow_id',
'emr_job_flow_pool_name',
'emr_action_on_failure',
'enable_emr_debugging',
'hadoop_streaming_jar_on_emr',
'hadoop_version',
Expand Down Expand Up @@ -1296,11 +1297,13 @@ def _job_flow_args(self, persistent=False, steps=None):
@property
def _action_on_failure(self):
    """Return the EMR ActionOnFailure value to attach to submitted steps.

    Precedence:

    1. The explicit :mrjob-opt:`emr_action_on_failure` option, if set.
    2. ``'CANCEL_AND_WAIT'`` when joining a job flow we don't own
       (an explicit :mrjob-opt:`emr_job_flow_id` or job-flow pooling) —
       we must not shut down other people's job flows.
    3. ``'TERMINATE_CLUSTER'`` for a job flow we created ourselves.
    """
    # an explicitly configured action always wins
    if self._opts['emr_action_on_failure']:
        return self._opts['emr_action_on_failure']
    # don't terminate other people's job flows
    elif (self._opts['emr_job_flow_id'] or
          self._opts['pool_emr_job_flows']):
        return 'CANCEL_AND_WAIT'
    else:
        return 'TERMINATE_CLUSTER'

def _build_steps(self):
"""Return a list of boto Step objects corresponding to the
Expand Down
5 changes: 5 additions & 0 deletions mrjob/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,11 @@ def add_emr_opts(opt_group):
'--emr-job-flow-id', dest='emr_job_flow_id', default=None,
help='ID of an existing EMR job flow to use'),

opt_group.add_option(
'--emr-action-on-failure', dest='emr_action_on_failure', default=None,
help=('Action to take when a step fails'
' (e.g. TERMINATE_CLUSTER | CANCEL_AND_WAIT | CONTINUE)')),

opt_group.add_option(
'--enable-emr-debugging', dest='enable_emr_debugging',
default=None, action='store_true',
Expand Down
8 changes: 5 additions & 3 deletions tests/mockboto.py
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ def run_jobflow(self,
name, log_uri, ec2_keyname=None, availability_zone=None,
master_instance_type='m1.small',
slave_instance_type='m1.small', num_instances=1,
action_on_failure='TERMINATE_JOB_FLOW', keep_alive=False,
action_on_failure='TERMINATE_CLUSTER', keep_alive=False,
enable_debugging=False,
hadoop_version=None,
steps=None,
Expand Down Expand Up @@ -558,7 +558,7 @@ def make_fake_action(real_action):

if enable_debugging:
debugging_step = JarStep(name='Setup Hadoop Debugging',
action_on_failure='TERMINATE_JOB_FLOW',
action_on_failure='TERMINATE_CLUSTER',
main_class=None,
jar=EmrConnection.DebuggingJar,
step_args=EmrConnection.DebuggingArgs)
Expand Down Expand Up @@ -746,7 +746,9 @@ def simulate_progress(self, jobflow_id, now=None):
reason = self.mock_emr_failures[(jobflow_id, step_num)]
if reason:
job_flow.reason = reason
if step.actiononfailure == 'TERMINATE_JOB_FLOW':
# TERMINATE_JOB_FLOW is the old name for TERMINATE_CLUSTER
if step.actiononfailure in (
'TERMINATE_CLUSTER','TERMINATE_JOB_FLOW'):
job_flow.state = 'SHUTTING_DOWN'
if not reason:
job_flow.reason = 'Shut down as step failed'
Expand Down
31 changes: 31 additions & 0 deletions tests/test_emr.py
Original file line number Diff line number Diff line change
Expand Up @@ -3453,3 +3453,34 @@ def test_input_output_interpolation(self):
streaming_input_arg = streaming_args[
streaming_args.index('-input') + 1]
self.assertEqual(jar_output_arg, streaming_input_arg)


class ActionOnFailureTestCase(MockEMRAndS3TestCase):
    """Verify defaults and overrides for the runner's _action_on_failure."""

    def test_default(self):
        # a job flow we create ourselves may be shut down on failure
        self.assertEqual(
            EMRJobRunner()._action_on_failure, 'TERMINATE_CLUSTER')

    def test_default_with_job_flow_id(self):
        # never terminate a job flow we merely joined
        self.assertEqual(
            EMRJobRunner(emr_job_flow_id='j-JOBFLOW')._action_on_failure,
            'CANCEL_AND_WAIT')

    def test_default_with_pooling(self):
        # pooled job flows are shared, so cancel and wait instead
        self.assertEqual(
            EMRJobRunner(pool_emr_job_flows=True)._action_on_failure,
            'CANCEL_AND_WAIT')

    def test_option(self):
        # an explicit emr_action_on_failure option overrides the default
        self.assertEqual(
            EMRJobRunner(emr_action_on_failure='CONTINUE')._action_on_failure,
            'CONTINUE')

    def test_switch(self):
        # the --emr-action-on-failure command-line switch reaches the runner
        mr_job = MRWordCount(
            ['-r', 'emr', '--emr-action-on-failure', 'CONTINUE'])
        mr_job.sandbox()

        with mr_job.make_runner() as runner:
            self.assertEqual(runner._action_on_failure, 'CONTINUE')
2 changes: 1 addition & 1 deletion tests/tools/emr/test_terminate_idle_job_flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ def step(jar='/home/hadoop/contrib/streaming/hadoop-streaming.jar',
start_hours_ago=None,
end_hours_ago=None,
name='Streaming Step',
action_on_failure='TERMINATE_JOB_FLOW',
action_on_failure='TERMINATE_CLUSTER',
**kwargs):
if create_hours_ago:
kwargs['creationdatetime'] = to_iso8601(
Expand Down

0 comments on commit dc05321

Please sign in to comment.