Skip to content

Commit

Permalink
took step # and total # of steps out of report_long_jobs (looks silly…
Browse files Browse the repository at this point in the history
… alongside bootstrapping)
  • Loading branch information
David Marin committed Feb 14, 2012
1 parent c7e07ce commit f85feb3
Show file tree
Hide file tree
Showing 2 changed files with 122 additions and 56 deletions.
63 changes: 39 additions & 24 deletions mrjob/tools/emr/report_long_jobs.py
Expand Up @@ -51,7 +51,10 @@
log = logging.getLogger('mrjob.tools.emr.report_long_jobs')


def main(args):
def main(args, now=None):
if now is None:
now = datetime.utcnow()

option_parser = make_option_parser()
options, args = option_parser.parse_args(args)

Expand All @@ -62,11 +65,12 @@ def main(args):

log.info('getting information about running jobs')
emr_conn = EMRJobRunner(conf_path=options.conf_path).make_emr_conn()
job_flows = describe_all_job_flows(emr_conn, states=['RUNNING'])
job_flows = describe_all_job_flows(
emr_conn, states=['BOOTSTRAPPING', 'RUNNING'])

min_time = timedelta(hours=options.min_hours)

job_info = find_long_running_jobs(job_flows, min_time)
job_info = find_long_running_jobs(job_flows, min_time, now=now)

print_report(job_info)

Expand All @@ -86,31 +90,40 @@ def find_long_running_jobs(job_flows, min_time, now=None):
* *job_flow_id*: the job flow's unique ID (e.g. ``j-SOMEJOBFLOW``)
* *step_name*: name of the step
* *step_num*: which step is currently running or pending. zero-indexed.
* *step_state*: state of the step, either ``'RUNNING'`` or ``'PENDING'``
* *total_steps*: total number of steps in the job flow
* *time*: amount of time step was running or pending, as a
:py:class:`datetime.timedelta`
"""
if now is None:
now = datetime.utcnow()

for jf in job_flows:

# special case for jobs that are taking a long time to bootstrap
if jf.state == 'BOOTSTRAPPING':
start_timestamp = jf.startdatetime
start = datetime.strptime(start_timestamp, boto.utils.ISO8601)

time_running = now - start

if time_running >= min_time:
# we tell bootstrapping info by step_state being empty,
# and only use job_flow_id and time in the report
yield({'job_flow_id': jf.jobflowid,
'step_name': '',
'step_state': '',
'time': time_running})

# the default case: running job flows
if jf.state != 'RUNNING':
continue

total_steps = len(jf.steps)

num_and_running_steps = [(step_num, step)
for (step_num, step) in enumerate(jf.steps)
if step.state == 'RUNNING']
num_and_pending_steps = [(step_num, step)
for (step_num, step) in enumerate(jf.steps)
if step.state == 'PENDING']
running_steps = [step for step in jf.steps if step.state == 'RUNNING']
pending_steps = [step for step in jf.steps if step.state == 'PENDING']

if num_and_running_steps:
if running_steps:
# should be only one, but if not, we should know
for step_num, step in num_and_running_steps:
for step in running_steps:

start_timestamp = step.startdatetime
start = datetime.strptime(start_timestamp, boto.utils.ISO8601)
Expand All @@ -119,15 +132,13 @@ def find_long_running_jobs(job_flows, min_time, now=None):

if time_running >= min_time:
yield({'job_flow_id': jf.jobflowid,
'step_num': step_num,
'total_steps': total_steps,
'step_name': step.name,
'step_state': step.state,
'time': time_running})

# sometimes EMR says it's "RUNNING" but doesn't actually run steps!
elif num_and_pending_steps:
step_num, step = num_and_pending_steps[0]
elif pending_steps:
step = pending_steps[0]

# PENDING job should have run starting when the job flow
# became ready, or the previous step completed
Expand All @@ -141,8 +152,6 @@ def find_long_running_jobs(job_flows, min_time, now=None):

if time_pending >= min_time:
yield({'job_flow_id': jf.jobflowid,
'step_num': step_num,
'total_steps': total_steps,
'step_name': step.name,
'step_state': step.state,
'time': time_pending})
Expand All @@ -154,9 +163,15 @@ def print_report(job_info):
on a single (long) line.
"""
for ji in job_info:
print '%-15s step %3d of %3d: %7s for %17s (%s)' % (
ji['job_flow_id'], ji['step_num'] + 1, ji['total_steps'],
ji['step_state'], format_timedelta(ji['time']), ji['step_name'])
# BOOTSTRAPPING case
if ji['time'] < timedelta(7) or not ji['step_state']:
print '%-15s BOOTSTRAPPING for %17s' % (
ji['job_flow_id'], format_timedelta(ji['time']))
else:
print '%-15s %7s for %17s (%s)' % (
ji['job_flow_id'],
ji['step_state'], format_timedelta(ji['time']),
ji['step_name'])


def format_timedelta(time):
Expand Down
115 changes: 83 additions & 32 deletions tests/tools/emr/test_report_long_jobs.py
Expand Up @@ -29,30 +29,16 @@
import unittest


class ReportLongJobsTestCase(MockEMRAndS3TestCase):

def setUp(self):
super(ReportLongJobsTestCase, self).setUp()
# redirect print statements to self.stdout
self._real_stdout = sys.stdout
self.stdout = StringIO()
sys.stdout = self.stdout

def tearDown(self):
sys.stdout = self._real_stdout
super(ReportLongJobsTestCase, self).tearDown()

def test_with_no_job_flows(self):
main(['-q', '--no-conf']) # just make sure it doesn't crash

def test_with_one_job_flow(self):
emr_conn = EMRJobRunner(conf_path=False).make_emr_conn()
emr_conn.run_jobflow('no name', log_uri=None)
main(['-q', '--no-conf']) # just make sure it doesn't crash


JOB_FLOWS = [
MockEmrObject(
creationdatetime='2010-06-06T00:00:00Z',
jobflowid='j-BOOTSTRAPPING',
startdatetime='2010-06-06T00:05:00Z',
state='BOOTSTRAPPING',
steps=[],
),
MockEmrObject(
creationdatetime='2010-06-06T00:00:00Z',
jobflowid='j-RUNNING1STEP',
readydatetime='2010-06-06T00:15:00Z',
state='RUNNING',
Expand All @@ -65,6 +51,7 @@ def test_with_one_job_flow(self):
]
),
MockEmrObject(
creationdatetime='2010-06-06T00:00:00Z',
jobflowid='j-RUNNING2STEPS',
readydatetime='2010-06-06T00:15:00Z',
state='RUNNING',
Expand All @@ -83,6 +70,7 @@ def test_with_one_job_flow(self):
]
),
MockEmrObject(
creationdatetime='2010-06-06T00:00:00Z',
jobflowid='j-RUNNINGANDPENDING',
readydatetime='2010-06-06T00:15:00Z',
state='RUNNING',
Expand All @@ -105,6 +93,7 @@ def test_with_one_job_flow(self):
]
),
MockEmrObject(
creationdatetime='2010-06-06T00:00:00Z',
jobflowid='j-PENDING1STEP',
readydatetime='2010-06-06T00:15:00Z',
state='RUNNING',
Expand All @@ -116,6 +105,7 @@ def test_with_one_job_flow(self):
]
),
MockEmrObject(
creationdatetime='2010-06-06T00:00:00Z',
jobflowid='j-PENDING2STEPS',
readydatetime='2010-06-06T00:15:00Z',
state='RUNNING',
Expand All @@ -134,6 +124,7 @@ def test_with_one_job_flow(self):
),

MockEmrObject(
creationdatetime='2010-06-06T00:00:00Z',
jobflowid='j-COMPLETED',
readydatetime='2010-06-06T00:15:00Z',
state='COMPLETED',
Expand All @@ -150,10 +141,48 @@ def test_with_one_job_flow(self):

JOB_FLOWS_BY_ID = dict((jf.jobflowid, jf) for jf in JOB_FLOWS)


class ReportLongJobsTestCase(MockEMRAndS3TestCase):

def setUp(self):
super(ReportLongJobsTestCase, self).setUp()
# redirect print statements to self.stdout
self._real_stdout = sys.stdout
self.stdout = StringIO()
sys.stdout = self.stdout

def tearDown(self):
sys.stdout = self._real_stdout
super(ReportLongJobsTestCase, self).tearDown()

def test_with_no_job_flows(self):
main(['-q', '--no-conf']) # just make sure it doesn't crash

def test_with_all_job_flows(self):
self.mock_emr_job_flows.update(JOB_FLOWS_BY_ID)
emr_conn = EMRJobRunner(conf_path=False).make_emr_conn()
emr_conn.run_jobflow('no name', log_uri=None)
main(['-q', '--no-conf'])
lines = [line for line in StringIO(self.stdout.getvalue())]
self.assertEqual(len(lines), len(JOB_FLOWS_BY_ID) - 1)


class FindLongRunningJobsTestCase(unittest.TestCase):

maxDiff = None # show whole diff when tests fail

def test_bootstrapping(self):
self.assertEqual(
list(find_long_running_jobs(
[JOB_FLOWS_BY_ID['j-BOOTSTRAPPING']],
min_time=timedelta(hours=1),
now=datetime(2010, 6, 6, 4)
)),
[{'job_flow_id': 'j-BOOTSTRAPPING',
'step_name': '',
'step_state': '',
'time': timedelta(hours=3, minutes=55)}])

def test_running_one_step(self):
self.assertEqual(
list(find_long_running_jobs(
Expand All @@ -163,9 +192,7 @@ def test_running_one_step(self):
)),
[{'job_flow_id': 'j-RUNNING1STEP',
'step_name': 'mr_denial: Step 1 of 5',
'step_num': 0,
'step_state': 'RUNNING',
'total_steps': 1,
'time': timedelta(hours=3, minutes=40)}])

# job hasn't been running for 1 day
Expand All @@ -186,9 +213,7 @@ def test_running_two_steps(self):
)),
[{'job_flow_id': 'j-RUNNING2STEPS',
'step_name': 'mr_anger: Step 2 of 5',
'step_num': 1,
'step_state': 'RUNNING',
'total_steps': 2,
'time': timedelta(hours=3, minutes=30)}])

# job hasn't been running for 1 day
Expand All @@ -209,9 +234,7 @@ def test_running_and_pending(self):
)),
[{'job_flow_id': 'j-RUNNINGANDPENDING',
'step_name': 'mr_anger: Step 2 of 5',
'step_num': 1,
'step_state': 'RUNNING',
'total_steps': 3,
'time': timedelta(hours=3, minutes=30)}])

# job hasn't been running for 1 day
Expand All @@ -232,9 +255,7 @@ def test_pending_one_step(self):
)),
[{'job_flow_id': 'j-PENDING1STEP',
'step_name': 'mr_bargaining: Step 3 of 5',
'step_num': 0,
'step_state': 'PENDING',
'total_steps': 1,
'time': timedelta(hours=3, minutes=45)}])

# job hasn't been running for 1 day
Expand All @@ -255,9 +276,7 @@ def test_pending_two_steps(self):
)),
[{'job_flow_id': 'j-PENDING2STEPS',
'step_name': 'mr_depression: Step 4 of 5',
'step_num': 1,
'step_state': 'PENDING',
'total_steps': 2,
'time': timedelta(hours=3, minutes=25)}])

# job hasn't been running for 1 day
Expand All @@ -278,3 +297,35 @@ def test_completed(self):
)),
[]
)

def test_all_together(self):
self.assertEqual(
list(find_long_running_jobs(
JOB_FLOWS,
min_time=timedelta(hours=1),
now=datetime(2010, 6, 6, 4)
)),
[{'job_flow_id': 'j-BOOTSTRAPPING',
'step_name': '',
'step_state': '',
'time': timedelta(hours=3, minutes=55)},
{'job_flow_id': 'j-RUNNING1STEP',
'step_name': 'mr_denial: Step 1 of 5',
'step_state': 'RUNNING',
'time': timedelta(hours=3, minutes=40)},
{'job_flow_id': 'j-RUNNING2STEPS',
'step_name': 'mr_anger: Step 2 of 5',
'step_state': 'RUNNING',
'time': timedelta(hours=3, minutes=30)},
{'job_flow_id': 'j-RUNNINGANDPENDING',
'step_name': 'mr_anger: Step 2 of 5',
'step_state': 'RUNNING',
'time': timedelta(hours=3, minutes=30)},
{'job_flow_id': 'j-PENDING1STEP',
'step_name': 'mr_bargaining: Step 3 of 5',
'step_state': 'PENDING',
'time': timedelta(hours=3, minutes=45)},
{'job_flow_id': 'j-PENDING2STEPS',
'step_name': 'mr_depression: Step 4 of 5',
'step_state': 'PENDING',
'time': timedelta(hours=3, minutes=25)}])

0 comments on commit f85feb3

Please sign in to comment.