Merge pull request #1717 from davidmarin/pick-task-logs
quicker task log picking (fix #1706)
David Marin committed Dec 20, 2017
2 parents a0d1880 + d4e25d9 commit 69f3fb8
Showing 15 changed files with 473 additions and 109 deletions.
14 changes: 7 additions & 7 deletions mrjob/emr.py
@@ -1995,14 +1995,14 @@ def _stream_history_log_dirs(self, output_dir=None):
# Unlike on 3.x, the history logs *are* available on S3, but they're
# not useful enough to justify the wait when SSH is set up

# if version_gte(self.get_image_version(), '4'):
# # denied access on some 4.x AMIs by the yarn user, see #1244
# dir_name = 'hadoop-mapreduce/history'
# s3_dir_name = 'hadoop-mapreduce/history'
if version_gte(self.get_image_version(), '3'):
if version_gte(self.get_image_version(), '4'):
# on 4.0.0 (and possibly other versions before 4.3.0)
# history logs aren't on the filesystem. See #1253
dir_name = 'hadoop-mapreduce/history'
s3_dir_name = 'hadoop-mapreduce/history'
elif version_gte(self.get_image_version(), '3'):
# on the 3.x AMIs, the history log lives inside HDFS and isn't
# copied to S3. We don't need it anyway; everything relevant
# is in the step log
# copied to S3.
return iter([])
else:
dir_name = 'hadoop/history'
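The gist of this hunk, as a standalone sketch (`pick_history_dir` and the inline `version_gte` below are simplified stand-ins, not mrjob's actual helpers):

```python
def pick_history_dir(image_version):
    """Illustrative only: where to look for history logs on a given EMR AMI."""
    def version_gte(version, threshold):
        # simplified dotted-version comparison, good enough for this sketch
        return ([int(p) for p in version.split('.')] >=
                [int(p) for p in threshold.split('.')])

    if version_gte(image_version, '4'):
        # on 4.0.0 (and possibly other pre-4.3.0 versions) history logs
        # aren't on the local filesystem; look under hadoop-mapreduce/history
        return 'hadoop-mapreduce/history'
    elif version_gte(image_version, '3'):
        # 3.x keeps the history log in HDFS and never copies it to S3;
        # the step log already contains everything relevant
        return None
    else:
        return 'hadoop/history'


print(pick_history_dir('4.7.2'))    # hadoop-mapreduce/history
print(pick_history_dir('3.11.0'))   # None
print(pick_history_dir('2.4.11'))   # hadoop/history
```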
10 changes: 3 additions & 7 deletions mrjob/fs/s3.py
@@ -347,17 +347,13 @@ def _get_s3_key(self, uri):
return self.get_bucket(bucket_name).Object(key_name)

def get_all_bucket_names(self):
"""Get a stream of the names of all buckets owned by this user
"""Get a list of the names of all buckets owned by this user
on S3.
.. versionadded:: 0.6.0
"""
# we don't actually want to return these Bucket objects to
# the user because their client might connect to the wrong region
# endpoint
r = self.make_s3_resource()
for b in r.buckets.all():
yield b.name
c = self.make_s3_client()
return [b['Name'] for b in c.list_buckets()['Buckets']]

def create_bucket(self, bucket_name, region=None):
"""Create a bucket on S3 with a location constraint
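Roughly what the new `get_all_bucket_names()` does, assuming a configured boto3 client (a sketch, not the filesystem class itself):

```python
import boto3


def get_all_bucket_names(s3_client):
    # one list_buckets() call through the low-level client, rather than
    # iterating Bucket resources whose clients might point at the wrong
    # region endpoint
    return [b['Name'] for b in s3_client.list_buckets()['Buckets']]


if __name__ == '__main__':
    client = boto3.client('s3')  # assumes AWS credentials are configured
    print(get_all_bucket_names(client))
```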
53 changes: 46 additions & 7 deletions mrjob/logs/errors.py
@@ -15,38 +15,77 @@
"""Merging errors, picking the best one, and displaying it."""
import json

from mrjob.util import unique
from .ids import _time_sort_key



def _pick_error(log_interpretation):
"""Pick most recent error from a dictionary possibly containing
step, history, and task interpretations. Returns None if there
are no errors.
"""
errors = _pick_errors(log_interpretation)
if errors:
return errors[0]
else:
return None


def _pick_errors(log_interpretation):
"""Yield all errors from the given log interpretation, sorted
by recency."""
def yield_errors():
for log_type in ('step', 'history', 'task'):
errors = log_interpretation.get(log_type, {}).get('errors')
for error in errors or ():
yield error

errors = _merge_and_sort_errors(yield_errors())
if errors:
return errors[0]
else:
return None
attempt_to_container_id = log_interpretation.get('history', {}).get(
'attempt_to_container_id', {})

return _merge_and_sort_errors(yield_errors(), attempt_to_container_id)

def _merge_and_sort_errors(errors):

def _pick_error_attempt_ids(log_interpretation):
"""Pick error attempt IDs, so we know which task logs to look at."""
errors = _pick_errors(log_interpretation)

errors.sort(key=_is_probably_task_error, reverse=True)

return list(unique(
error['attempt_id'] for error in errors
if error.get('attempt_id')))


def _is_probably_task_error(error):
"""Used to identify task errors."""
return ('subprocess failed' in
error.get('hadoop_error', {}).get('message', ''))


def _merge_and_sort_errors(errors, attempt_to_container_id=None):
"""Merge errors from one or more lists of errors and then return
them, sorted by recency.
We allow None in place of an error list.
"""
attempt_to_container_id = attempt_to_container_id or {}

key_to_error = {}

for error in errors:
key = _time_sort_key(error)
# merge by container_id if we know it
container_id = error.get('container_id') or (
attempt_to_container_id.get(error.get('attempt_id')))

if container_id:
key = ('container_id', container_id)
else:
key = ('time_key', _time_sort_key(error))

key_to_error.setdefault(key, {})
# assume redundant fields will match
key_to_error[key].update(error)

# wrap sort key to prioritize task errors. See #1429
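To see why keying on `container_id` matters, here is a toy version of the merge step (sorting omitted, and `_time_sort_key` replaced by a trivial stand-in): a partial error parsed from the history log and a richer one parsed from a task log now collapse into a single record.

```python
def merge_errors(errors, attempt_to_container_id=None):
    """Toy merge: group error dicts by container where possible."""
    attempt_to_container_id = attempt_to_container_id or {}
    key_to_error = {}

    for error in errors:
        container_id = error.get('container_id') or (
            attempt_to_container_id.get(error.get('attempt_id')))
        # fall back to a time-ish key when the container is unknown
        key = (('container_id', container_id) if container_id
               else ('time_key', error.get('attempt_id') or error.get('task_id')))
        key_to_error.setdefault(key, {})
        key_to_error[key].update(error)  # assume redundant fields match

    return list(key_to_error.values())


history_error = dict(attempt_id='attempt_1512345678901_0001_m_000000_3',
                     hadoop_error=dict(message='... subprocess failed ...'))
task_error = dict(container_id='container_1512345678901_0001_01_000004',
                  task_error=dict(message='Traceback (most recent call last): ...'))

merged = merge_errors(
    [history_error, task_error],
    attempt_to_container_id={
        'attempt_1512345678901_0001_m_000000_3':
            'container_1512345678901_0001_01_000004'})
print(len(merged))  # 1 -- both dicts describe the same failed container
```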
9 changes: 9 additions & 0 deletions mrjob/logs/history.py
@@ -146,6 +146,8 @@ def _parse_yarn_history_log(lines):
This returns a dictionary which may contain the following keys:
attempt_to_container_id: map from attempt_id to container_id (used
to find task logs corresponding to failed attempts)
counters: map from group to counter to amount. If job failed, we sum
counters for successful tasks
errors: a list of dictionaries with the keys:
@@ -180,6 +182,13 @@ def _parse_yarn_history_log(lines):
events = [e for e in record['event'].values()
if isinstance(e, dict)]

# update container_id -> attempt_id mapping
for event in events:
if 'attemptId' in event and 'containerId' in event:
result.setdefault('attempt_to_container_id', {})
result['attempt_to_container_id'][
event['attemptId']] = event['containerId']

if record_type.endswith('_ATTEMPT_FAILED'):
for event in events:
err_msg = event.get('error')
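The new `attempt_to_container_id` bookkeeping, sketched against a hand-rolled `.jhist`-style record (the event-class name in the sample is illustrative):

```python
import json


def attempt_to_container_id(lines):
    """Sketch: map each task attempt to the YARN container it ran in."""
    mapping = {}
    for line in lines:
        try:
            record = json.loads(line)
        except ValueError:
            continue
        events = [e for e in record.get('event', {}).values()
                  if isinstance(e, dict)]
        for event in events:
            if 'attemptId' in event and 'containerId' in event:
                mapping[event['attemptId']] = event['containerId']
    return mapping


sample_record = json.dumps(dict(
    type='MAP_ATTEMPT_STARTED',
    event={'org.apache.hadoop.mapreduce.jobhistory.TaskAttemptStarted': dict(
        attemptId='attempt_1512345678901_0001_m_000000_0',
        containerId='container_1512345678901_0001_01_000002')}))

print(attempt_to_container_id([sample_record]))
```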
15 changes: 5 additions & 10 deletions mrjob/logs/ids.py
@@ -26,7 +26,7 @@ def _sort_by_recency(ds):


def _time_sort_key(d):
"""Sort key to sort the dictionaries containing IDs roughly by time
"""Sort key to sort the given dictionaries containing IDs roughly by time
(earliest first).
We consider higher attempt_nums "later" than higher task_nums (of the
@@ -61,17 +61,10 @@ def _time_sort_key(d):
d.get('application_id') or
_to_job_id(container_id) or '').split('_')

# a container ID like container_1450486922681_0005_01_00000 implies:
# timestamp and step: 1450486922681_0005
# attempt num: 01
# task num: 00000
container_parts = container_id.split('_')

timestamp_and_step = '_'.join(attempt_parts[1:3])
task_type = '_'.join(attempt_parts[3:4])
task_num = '_'.join(attempt_parts[4:5]) or '_'.join(container_parts[-1:])
attempt_num = (
'_'.join(attempt_parts[5:6]) or '_'.join(container_parts[-2:-1]))
task_num = '_'.join(attempt_parts[4:5])
attempt_num = '_'.join(attempt_parts[5:6])

# numbers are 0-padded, so no need to convert anything to int
# also, 'm' (task type in attempt ID) sorts before 'r', which is
@@ -83,6 +76,8 @@
attempt_num,
task_num)

return sort_key


def _add_implied_task_id(d):
"""If *d* (a dictionary) has *attempt_id* but not *task_id*, add it.
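A simplified stand-in for the trimmed-down sort key (the real `_time_sort_key` also handles job, application, and container IDs; this sketch only parses attempt IDs):

```python
def time_sort_key(attempt_id):
    # attempt_1512345678901_0001_m_000003_2 ->
    #   timestamp and step: 1512345678901_0001
    #   task type: m, task num: 000003, attempt num: 2
    parts = attempt_id.split('_')
    timestamp_and_step = '_'.join(parts[1:3])
    task_type = '_'.join(parts[3:4])
    task_num = '_'.join(parts[4:5])
    attempt_num = '_'.join(parts[5:6])
    # numbers are 0-padded, so string comparison sorts chronologically;
    # a higher attempt_num counts as "later" than a higher task_num
    return (timestamp_and_step, attempt_num, task_type, task_num)


ids = ['attempt_1512345678901_0001_m_000003_0',
       'attempt_1512345678901_0001_r_000000_1',
       'attempt_1512345678901_0001_m_000003_2']
print(sorted(ids, key=time_sort_key)[-1])
# attempt_1512345678901_0001_m_000003_2 -- the latest attempt
```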
26 changes: 21 additions & 5 deletions mrjob/logs/mixin.py
@@ -30,6 +30,7 @@
from mrjob.compat import uses_yarn
from mrjob.logs.counters import _pick_counters
from mrjob.logs.errors import _pick_error
from mrjob.logs.errors import _pick_error_attempt_ids
from mrjob.logs.history import _interpret_history_log
from mrjob.logs.history import _ls_history_logs
from mrjob.logs.task import _interpret_task_logs
@@ -118,7 +119,11 @@ def _pick_error(self, log_interpretation, step_type):
log.info('Scanning logs for probable cause of failure...')
self._interpret_step_logs(log_interpretation, step_type)
self._interpret_history_log(log_interpretation)
self._interpret_task_logs(log_interpretation, step_type)

error_attempt_ids = _pick_error_attempt_ids(log_interpretation)

self._interpret_task_logs(
log_interpretation, step_type, error_attempt_ids)

return _pick_error(log_interpretation)

@@ -163,7 +168,8 @@ def _interpret_step_logs(self, log_interpretation, step_type):
log_interpretation['step'] = step_interpretation

def _interpret_task_logs(
self, log_interpretation, step_type, partial=True):
self, log_interpretation, step_type, error_attempt_ids=(),
partial=True):
"""Fetch task syslogs and stderr, and add 'task' to interpretation."""
if 'task' in log_interpretation and (
partial or not log_interpretation['task'].get('partial')):
@@ -177,6 +183,9 @@ def _interpret_task_logs(

yarn = uses_yarn(self.get_hadoop_version())

attempt_to_container_id = log_interpretation.get('history', {}).get(
'attempt_to_container_id', {})

if yarn:
if not application_id:
if not log_interpretation.get('no_job'):
@@ -200,12 +209,16 @@
step_type,
application_id=application_id,
job_id=job_id,
output_dir=output_dir),
output_dir=output_dir,
error_attempt_ids=error_attempt_ids,
attempt_to_container_id=attempt_to_container_id,
),
partial=partial,
log_callback=_log_parsing_task_log)

def _ls_task_logs(self, step_type,
application_id=None, job_id=None, output_dir=None):
application_id=None, job_id=None, output_dir=None,
error_attempt_ids=None, attempt_to_container_id=None):
"""Yield task log matches."""
if _is_spark_step_type(step_type):
ls_func = _ls_spark_task_logs
@@ -218,5 +231,8 @@ def _ls_task_logs(self, step_type,
self._stream_task_log_dirs(
application_id=application_id, output_dir=output_dir),
application_id=application_id,
job_id=job_id):
job_id=job_id,
error_attempt_ids=error_attempt_ids,
attempt_to_container_id=attempt_to_container_id,
):
yield match
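
Putting the mixin changes together, the speed-up works roughly like this (illustrative helper, not the mixin's real methods): errors found in the step and history logs nominate specific attempt IDs, and only the matching containers' task logs are fetched and parsed.

```python
def plan_task_log_fetch(log_interpretation):
    """Sketch: decide which containers' task logs are worth downloading."""
    history = log_interpretation.get('history', {})
    attempt_to_container_id = history.get('attempt_to_container_id', {})

    error_attempt_ids = []
    for log_type in ('step', 'history'):
        for error in log_interpretation.get(log_type, {}).get('errors', []):
            attempt_id = error.get('attempt_id')
            if attempt_id and attempt_id not in error_attempt_ids:
                error_attempt_ids.append(attempt_id)

    # map attempts to containers where we can, so task log paths can be
    # matched by container ID instead of scanning everything
    return [attempt_to_container_id.get(a, a) for a in error_attempt_ids]


interp = dict(
    step=dict(errors=[dict(
        attempt_id='attempt_1512345678901_0001_m_000000_3')]),
    history=dict(attempt_to_container_id={
        'attempt_1512345678901_0001_m_000000_3':
            'container_1512345678901_0001_01_000004'}))

print(plan_task_log_fetch(interp))
# ['container_1512345678901_0001_01_000004']
```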
