Skip to content

Commit

Permalink
Merge pull request Yelp#372 from davidmarin/long_job_report_tweaks
Browse files Browse the repository at this point in the history
Long job report tweaks
  • Loading branch information
irskep committed Feb 21, 2012
2 parents 4aabb65 + b209bbe commit dc29a1b
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 29 deletions.
17 changes: 9 additions & 8 deletions mrjob/tools/emr/report_long_jobs.py
Expand Up @@ -89,7 +89,7 @@ def find_long_running_jobs(job_flows, min_time, now=None):
a dictionary with the following keys:
* *job_flow_id*: the job flow's unique ID (e.g. ``j-SOMEJOBFLOW``)
* *step_name*: name of the step
* *name*: name of the step, or the job flow when bootstrapping
* *step_state*: state of the step, either ``'RUNNING'`` or ``'PENDING'`` (empty string when the job flow is still bootstrapping)
* *time*: amount of time step was running or pending, as a
:py:class:`datetime.timedelta`
Expand All @@ -110,7 +110,7 @@ def find_long_running_jobs(job_flows, min_time, now=None):
# we tell bootstrapping info by step_state being empty;
# the report uses job_flow_id, name (the job flow's name), and time
yield({'job_flow_id': jf.jobflowid,
'step_name': '',
'name': jf.name,
'step_state': '',
'time': time_running})

Expand All @@ -132,7 +132,7 @@ def find_long_running_jobs(job_flows, min_time, now=None):

if time_running >= min_time:
yield({'job_flow_id': jf.jobflowid,
'step_name': step.name,
'name': step.name,
'step_state': step.state,
'time': time_running})

Expand All @@ -152,7 +152,7 @@ def find_long_running_jobs(job_flows, min_time, now=None):

if time_pending >= min_time:
yield({'job_flow_id': jf.jobflowid,
'step_name': step.name,
'name': step.name,
'step_state': step.state,
'time': time_pending})

Expand All @@ -164,14 +164,15 @@ def print_report(job_info):
"""
for ji in job_info:
# BOOTSTRAPPING case
if ji['time'] < timedelta(7) or not ji['step_state']:
print '%-15s BOOTSTRAPPING for %17s' % (
ji['job_flow_id'], format_timedelta(ji['time']))
if not ji['step_state']:
print '%-15s BOOTSTRAPPING for %17s (%s)' % (
ji['job_flow_id'], format_timedelta(ji['time']),
ji['name'])
else:
print '%-15s %7s for %17s (%s)' % (
ji['job_flow_id'],
ji['step_state'], format_timedelta(ji['time']),
ji['step_name'])
ji['name'])


def format_timedelta(time):
Expand Down
17 changes: 9 additions & 8 deletions mrjob/tools/emr/terminate_idle_job_flows.py
Expand Up @@ -186,8 +186,8 @@ def inspect_and_maybe_terminate_job_flows(
if (pool_name is not None and pool != pool_name):
continue

to_terminate.append(
(jf.jobflowid, jf.name, time_idle, time_to_end_of_hour))
to_terminate.append((jf.jobflowid, jf.name, pending,
time_idle, time_to_end_of_hour))

log.info(
'Job flow statuses: %d bootstrapping, %d running, %d pending, %d idle,'
Expand Down Expand Up @@ -295,13 +295,14 @@ def terminate_and_notify(emr_conn, to_terminate, dry_run=False):
if not to_terminate:
return

for job_flow_id, name, time_idle, time_to_end_of_hour in to_terminate:
for id, name, pending, time_idle, time_to_end_of_hour in to_terminate:
if not dry_run:
emr_conn.terminate_jobflow(job_flow_id)
print ('Terminated job flow %s (%s); was idle for %s,'
' %s to end of hour' %
(job_flow_id, name, strip_microseconds(time_idle),
strip_microseconds(time_to_end_of_hour)))
emr_conn.terminate_jobflow(id)
print ('Terminated job flow %s (%s); was %s for %s, %s to end of hour'
% (id, name,
'pending' if pending else 'idle',
strip_microseconds(time_idle),
strip_microseconds(time_to_end_of_hour)))


def make_option_parser():
Expand Down
32 changes: 19 additions & 13 deletions tests/tools/emr/test_report_long_jobs.py
Expand Up @@ -34,13 +34,15 @@
MockEmrObject(
creationdatetime='2010-06-06T00:00:00Z',
jobflowid='j-BOOTSTRAPPING',
name='mr_grieving',
startdatetime='2010-06-06T00:05:00Z',
state='BOOTSTRAPPING',
steps=[],
),
MockEmrObject(
creationdatetime='2010-06-06T00:00:00Z',
jobflowid='j-RUNNING1STEP',
name='mr_grieving',
readydatetime='2010-06-06T00:15:00Z',
state='RUNNING',
steps=[
Expand All @@ -54,6 +56,7 @@
MockEmrObject(
creationdatetime='2010-06-06T00:00:00Z',
jobflowid='j-RUNNING2STEPS',
name='mr_grieving',
readydatetime='2010-06-06T00:15:00Z',
state='RUNNING',
steps=[
Expand All @@ -73,6 +76,7 @@
MockEmrObject(
creationdatetime='2010-06-06T00:00:00Z',
jobflowid='j-RUNNINGANDPENDING',
name='mr_grieving',
readydatetime='2010-06-06T00:15:00Z',
state='RUNNING',
steps=[
Expand All @@ -96,6 +100,7 @@
MockEmrObject(
creationdatetime='2010-06-06T00:00:00Z',
jobflowid='j-PENDING1STEP',
name='mr_grieving',
readydatetime='2010-06-06T00:15:00Z',
state='RUNNING',
steps=[
Expand All @@ -108,6 +113,7 @@
MockEmrObject(
creationdatetime='2010-06-06T00:00:00Z',
jobflowid='j-PENDING2STEPS',
name='mr_grieving',
readydatetime='2010-06-06T00:15:00Z',
state='RUNNING',
steps=[
Expand All @@ -123,10 +129,10 @@
),
]
),

MockEmrObject(
creationdatetime='2010-06-06T00:00:00Z',
jobflowid='j-COMPLETED',
name='mr_grieving',
readydatetime='2010-06-06T00:15:00Z',
state='COMPLETED',
steps=[
Expand Down Expand Up @@ -180,7 +186,7 @@ def test_bootstrapping(self):
now=datetime(2010, 6, 6, 4)
)),
[{'job_flow_id': 'j-BOOTSTRAPPING',
'step_name': '',
'name': 'mr_grieving',
'step_state': '',
'time': timedelta(hours=3, minutes=55)}])

Expand All @@ -192,7 +198,7 @@ def test_running_one_step(self):
now=datetime(2010, 6, 6, 4)
)),
[{'job_flow_id': 'j-RUNNING1STEP',
'step_name': 'mr_denial: Step 1 of 5',
'name': 'mr_denial: Step 1 of 5',
'step_state': 'RUNNING',
'time': timedelta(hours=3, minutes=40)}])

Expand All @@ -213,7 +219,7 @@ def test_running_two_steps(self):
now=datetime(2010, 6, 6, 4)
)),
[{'job_flow_id': 'j-RUNNING2STEPS',
'step_name': 'mr_anger: Step 2 of 5',
'name': 'mr_anger: Step 2 of 5',
'step_state': 'RUNNING',
'time': timedelta(hours=3, minutes=30)}])

Expand All @@ -234,7 +240,7 @@ def test_running_and_pending(self):
now=datetime(2010, 6, 6, 4)
)),
[{'job_flow_id': 'j-RUNNINGANDPENDING',
'step_name': 'mr_anger: Step 2 of 5',
'name': 'mr_anger: Step 2 of 5',
'step_state': 'RUNNING',
'time': timedelta(hours=3, minutes=30)}])

Expand All @@ -255,7 +261,7 @@ def test_pending_one_step(self):
now=datetime(2010, 6, 6, 4)
)),
[{'job_flow_id': 'j-PENDING1STEP',
'step_name': 'mr_bargaining: Step 3 of 5',
'name': 'mr_bargaining: Step 3 of 5',
'step_state': 'PENDING',
'time': timedelta(hours=3, minutes=45)}])

Expand All @@ -276,7 +282,7 @@ def test_pending_two_steps(self):
now=datetime(2010, 6, 6, 4)
)),
[{'job_flow_id': 'j-PENDING2STEPS',
'step_name': 'mr_depression: Step 4 of 5',
'name': 'mr_depression: Step 4 of 5',
'step_state': 'PENDING',
'time': timedelta(hours=3, minutes=25)}])

Expand Down Expand Up @@ -307,26 +313,26 @@ def test_all_together(self):
now=datetime(2010, 6, 6, 4)
)),
[{'job_flow_id': 'j-BOOTSTRAPPING',
'step_name': '',
'name': 'mr_grieving',
'step_state': '',
'time': timedelta(hours=3, minutes=55)},
{'job_flow_id': 'j-RUNNING1STEP',
'step_name': 'mr_denial: Step 1 of 5',
'name': 'mr_denial: Step 1 of 5',
'step_state': 'RUNNING',
'time': timedelta(hours=3, minutes=40)},
{'job_flow_id': 'j-RUNNING2STEPS',
'step_name': 'mr_anger: Step 2 of 5',
'name': 'mr_anger: Step 2 of 5',
'step_state': 'RUNNING',
'time': timedelta(hours=3, minutes=30)},
{'job_flow_id': 'j-RUNNINGANDPENDING',
'step_name': 'mr_anger: Step 2 of 5',
'name': 'mr_anger: Step 2 of 5',
'step_state': 'RUNNING',
'time': timedelta(hours=3, minutes=30)},
{'job_flow_id': 'j-PENDING1STEP',
'step_name': 'mr_bargaining: Step 3 of 5',
'name': 'mr_bargaining: Step 3 of 5',
'step_state': 'PENDING',
'time': timedelta(hours=3, minutes=45)},
{'job_flow_id': 'j-PENDING2STEPS',
'step_name': 'mr_depression: Step 4 of 5',
'name': 'mr_depression: Step 4 of 5',
'step_state': 'PENDING',
'time': timedelta(hours=3, minutes=25)}])

0 comments on commit dc29a1b

Please sign in to comment.