Skip to content

Commit

Permalink
Merge pull request #1712 from davidmarin/get-step-ids
Browse files Browse the repository at this point in the history
expose get_job_steps() (fixes #1625)
  • Loading branch information
David Marin committed Nov 27, 2017
2 parents 89f06b5 + a44025a commit 7af4eeb
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 72 deletions.
1 change: 1 addition & 0 deletions docs/runners-emr.rst
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ EMR Utilities

.. automethod:: EMRJobRunner.get_cluster_id
.. automethod:: EMRJobRunner.get_image_version
.. automethod:: EMRJobRunner.get_job_steps
.. automethod:: EMRJobRunner.make_emr_client

S3 Utilities
Expand Down
1 change: 1 addition & 0 deletions docs/runners-runner.rst
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ Run Information

.. automethod:: MRJobRunner.counters
.. automethod:: MRJobRunner.get_hadoop_version
.. automethod:: MRJobRunner.get_job_key

Configuration
-------------
Expand Down
99 changes: 52 additions & 47 deletions mrjob/emr.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
from collections import defaultdict
from datetime import datetime
from datetime import timedelta
from itertools import islice
from subprocess import Popen
from subprocess import PIPE

Expand Down Expand Up @@ -275,12 +274,6 @@ def _attempt_to_acquire_lock(s3_fs, lock_uri, sync_wait_time, job_key,
return (key_value == job_key.encode('utf_8'))


def _get_reason(cluster_or_step):
"""Get state change reason message."""
# StateChangeReason is {} before the first state change
return cluster_or_step['Status']['StateChangeReason'].get('Message', '')


class EMRJobRunner(HadoopInTheCloudJobRunner, LogInterpretationMixin):
"""Runs an :py:class:`~mrjob.job.MRJob` on Amazon Elastic MapReduce.
Invoked when you run your job with ``-r emr``.
Expand Down Expand Up @@ -1616,41 +1609,26 @@ def _launch_emr_job(self):
if self._ssh_fs and version_gte(self.get_image_version(), '4.3.0'):
self._ssh_fs.use_sudo_over_ssh()

def _job_step_ids(self, max_steps=None):
"""Get the IDs of the steps we submitted for this job
in chronological order, ignoring steps from other jobs.
Generally, you want to set *max_steps*, so we can make as few API
calls as possible.
"""
# yield all steps whose name matches our job key
def yield_step_ids():
emr_client = self.make_emr_client()
for step in _boto3_paginate('Steps', emr_client, 'list_steps',
ClusterId=self._cluster_id):
if step['Name'].startswith(self._job_key):
yield step['Id']

# list_steps() returns steps in reverse chronological order.
# put them in forward chronological order, only keeping the
# last *max_steps* steps.
return list(reversed(list(islice(yield_step_ids(), max_steps))))
def get_job_steps(self):
    """Fetch this mrjob run's steps from the EMR API.

    Thin wrapper that passes this runner's EMR client, cluster ID and
    job key to :py:func:`_get_job_steps`.
    """
    emr_client = self.make_emr_client()
    cluster_id = self.get_cluster_id()
    job_key = self.get_job_key()

    return _get_job_steps(emr_client, cluster_id, job_key)

def _wait_for_steps_to_complete(self):
"""Wait for every step of the job to complete, one by one."""
# get info about expected number of steps
num_steps = len(self._get_steps())

# if there's a master node setup script, we'll treat that as
# step -1
expected_num_steps = num_steps
if self._master_node_setup_script_path:
max_steps = num_steps + 1
else:
max_steps = num_steps
expected_num_steps += 1

step_ids = self._job_step_ids(max_steps=max_steps)
# get info about steps submitted to cluster
steps = self.get_job_steps()

if len(step_ids) < max_steps:
raise AssertionError("Can't find our steps in the cluster!")
if len(steps) < expected_num_steps:
log.warning('Expected to find %d steps on cluster, found %d' %
(expected_num_steps, len(steps)))

# clear out log interpretations if they were filled somehow
self._log_interpretations = []
Expand All @@ -1663,20 +1641,17 @@ def _wait_for_steps_to_complete(self):
self._set_up_ssh_tunnel()

# treat master node setup as step -1
start = 0
if self._master_node_setup_script_path:
start = -1
else:
start = 0

for step_num, step_id in enumerate(step_ids, start=start):
# this will raise an exception if a step fails
if step_num == -1:
log.info(
'Waiting for master node setup step (%s) to complete...' %
step_id)
else:
log.info('Waiting for step %d of %d (%s) to complete...' % (
step_num + 1, num_steps, step_id))
start -= 1

for step_num, step in enumerate(steps, start=start):
step_id = step['Id']
# don't include job_key in logging messages
step_name = step['Name'].split(': ')[-1]

log.info('Waiting for %s (%s) to complete...' %
(step_name, step_id))

self._wait_for_step_to_complete(step_id, step_num, num_steps)

Expand Down Expand Up @@ -3013,6 +2988,36 @@ def _cluster_spark_support_warning(self):
return None


def _get_job_steps(emr_client, cluster_id, job_key):
    """Efficiently fetch steps for a particular mrjob run from the EMR API.

    :param emr_client: a boto3 EMR client. See
                       :py:meth:`~mrjob.emr.EMRJobRunner.make_emr_client`
    :param cluster_id: ID of EMR cluster to fetch steps from. See
                       :py:meth:`~mrjob.emr.EMRJobRunner.get_cluster_id`
    :param job_key: Unique key for a mrjob run. See
                    :py:meth:`~mrjob.runner.MRJobRunner.get_job_key`

    :return: a list of step descriptions (as returned by ``list_steps``)
             whose name starts with *job_key*, in the reverse of the order
             the API listed them; an empty list if none match.
    """
    steps = []

    for step in _boto3_paginate('Steps', emr_client, 'list_steps',
                                ClusterId=cluster_id):
        if step['Name'].startswith(job_key):
            steps.append(step)
        elif steps:
            # all steps for job will be together, so stop
            # when we find a non-job step (this is what saves us from
            # paging through the cluster's entire step history)
            break

    # *steps* was collected in the order the API listed them; flip it in
    # place rather than building two throwaway copies
    steps.reverse()
    return steps


def _get_reason(cluster_or_step):
"""Get state change reason message."""
# StateChangeReason is {} before the first state change
return cluster_or_step['Status']['StateChangeReason'].get('Message', '')


def _fix_configuration_opt(c):
"""Return a copy of *c* in which *Properties* is always set
(defaults to {}) and *Configurations* is omitted if empty.
Expand Down
37 changes: 12 additions & 25 deletions tests/test_emr.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@
import tests.mock_boto3.emr
import tests.mock_boto3.s3
from tests.mock_boto3 import MockBoto3TestCase
from tests.mock_boto3.emr import MockEMRClient
from tests.mockssh import mock_ssh_dir
from tests.mockssh import mock_ssh_file
from tests.mr_hadoop_format_job import MRHadoopFormatJob
Expand Down Expand Up @@ -4029,36 +4028,27 @@ def test_combine_command_line_with_conf(self):
HADOOP_ENV_EMR_CONFIGURATION])


class JobStepIdsTestCase(MockBoto3TestCase):

def setUp(self):
super(JobStepIdsTestCase, self).setUp()
self.start(patch.object(MockEMRClient, 'list_steps',
side_effect=MockEMRClient.list_steps,
autospec=True))
class GetJobStepsTestCase(MockBoto3TestCase):

def test_empty(self):
runner = EMRJobRunner()
runner.make_persistent_cluster()

self.assertEqual(runner._job_step_ids(max_steps=0), [])
self.assertEqual(MockEMRClient.list_steps.call_count, 0)
self.assertEqual(runner.get_job_steps(), [])

def test_own_cluster(self):
job = MRTwoStepJob(['-r', 'emr']).sandbox()

with job.make_runner() as runner:
runner._launch()

steps = _list_all_steps(runner)
all_steps = _list_all_steps(runner)

# ensure that steps appear in correct order (see #1316)
self.assertIn('Step 1', steps[0]['Name'])
self.assertIn('Step 2', steps[1]['Name'])
self.assertIn('Step 1', all_steps[0]['Name'])
self.assertIn('Step 2', all_steps[1]['Name'])

job_step_ids = runner._job_step_ids(max_steps=2)
self.assertEqual(job_step_ids,
[steps[0]['Id'], steps[1]['Id']])
self.assertEqual(runner.get_job_steps(), all_steps[:2])

def test_shared_cluster(self):
cluster_id = EMRJobRunner().make_persistent_cluster()
Expand All @@ -4078,19 +4068,16 @@ def add_other_steps(runner, n):
runner._launch()
add_other_steps(runner, 3)

steps = _list_all_steps(runner)
all_steps = _list_all_steps(runner)

# make sure these are our steps, and they are in the right order
# (see #1316)
self.assertIn(runner._job_key, steps[50]['Name'])
self.assertIn('Step 1', steps[50]['Name'])
self.assertIn(runner._job_key, steps[51]['Name'])
self.assertIn('Step 2', steps[51]['Name'])

job_step_ids = runner._job_step_ids(max_steps=2)
self.assertIn(runner._job_key, all_steps[50]['Name'])
self.assertIn('Step 1', all_steps[50]['Name'])
self.assertIn(runner._job_key, all_steps[51]['Name'])
self.assertIn('Step 2', all_steps[51]['Name'])

self.assertEqual(job_step_ids,
[steps[50]['Id'], steps[51]['Id']])
self.assertEqual(runner.get_job_steps(), all_steps[50:52])


class WaitForStepsToCompleteTestCase(MockBoto3TestCase):
Expand Down

0 comments on commit 7af4eeb

Please sign in to comment.