Merge pull request #1711 from davidmarin/faster-log-parsing
mrjob diagnose tool, fix _cat_log() (fixes #1707, fixes #1708)
David Marin committed Nov 27, 2017
2 parents 1841a83 + fbffa06 commit 89f06b5
Showing 15 changed files with 447 additions and 58 deletions.
6 changes: 6 additions & 0 deletions docs/guides/cmd.rst
@@ -33,6 +33,12 @@ create-cluster

.. automodule:: mrjob.tools.emr.create_cluster


diagnose
^^^^^^^^

.. automodule:: mrjob.tools.diagnose

.. _report-long-jobs:

report-long-jobs
6 changes: 6 additions & 0 deletions mrjob/cmd.py
@@ -86,6 +86,12 @@ def _create_cluster(args):
main(args)


@_command('diagnose', 'Diagnose cause of job failure')
def _diagnose(args):
from mrjob.tools.diagnose import main
main(args)


@_command('boss', 'Run a command on every node of a cluster.')
def _mrboss(args):
from mrjob.tools.emr.mrboss import main
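
The _command decorator used above registers each tool as a subcommand of the
mrjob script, importing the tool's main() lazily so that listing subcommands
stays cheap. A minimal sketch of how such a decorator-based registry might
work; the registry and dispatch() below are illustrative, not mrjob's actual
implementation:

    import sys

    # hypothetical registry: subcommand name -> (description, handler)
    _COMMANDS = {}

    def _command(name, description):
        """Register the decorated function as the handler for a subcommand."""
        def decorator(func):
            _COMMANDS[name] = (description, func)
            return func
        return decorator

    @_command('diagnose', 'Diagnose cause of job failure')
    def _diagnose(args):
        # lazy import keeps subcommand registration fast
        from mrjob.tools.diagnose import main
        main(args)

    def dispatch(argv):
        """Run the handler registered for argv[0] on the remaining args."""
        if not argv or argv[0] not in _COMMANDS:
            sys.exit('usage: mrjob {%s} [args]' % '|'.join(sorted(_COMMANDS)))
        _COMMANDS[argv[0]][1](argv[1:])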
2 changes: 1 addition & 1 deletion mrjob/logs/__init__.py
@@ -118,7 +118,7 @@
Pull important information from a log file. This generally follows the same
format as _interpret_<type>_logs(), above.
-Log lines are always strings (see mrjob.logs.wrap._cat_log()).
+Log lines are always strings (see mrjob.logs.wrap._cat_log_lines()).
_parse_*_log() methods generally return a part of the _interpret_*_logs()
format, but are *not* responsible for including implied job/task IDs.
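
To make the contract concrete, a hedged sketch of the division of labor the
docstring describes (field names follow mrjob's log interpretation format as
far as this diff shows it; treat the details as illustrative):

    # a _parse_*_log() function pulls raw error info from one log's lines
    error = {'hadoop_error': {'message': 'java.lang.OutOfMemoryError: ...'}}

    # the _interpret_*_logs() layer wraps parsed results and patches in the
    # path and implied job/task IDs -- the parser is *not* responsible
    # for these
    interpretation = {
        'errors': [
            dict(error,
                 attempt_id='attempt_201711272106_0001_m_000000_0',
                 task_id='task_201711272106_0001_m_000000'),
        ],
    }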
4 changes: 2 additions & 2 deletions mrjob/logs/bootstrap.py
@@ -16,7 +16,7 @@
import re

from .task import _parse_task_stderr
-from .wrap import _cat_log
+from .wrap import _cat_log_lines
from .wrap import _ls_logs

# match cause of failure when there's a problem with bootstrap script. Example:
@@ -121,7 +121,7 @@ def _interpret_emr_bootstrap_stderr(fs, matches, partial=True):
for match in matches:
stderr_path = match['path']

-        task_error = _parse_task_stderr(_cat_log(fs, stderr_path))
+        task_error = _parse_task_stderr(_cat_log_lines(fs, stderr_path))
if task_error:
task_error = dict(task_error) # make a copy
task_error['path'] = stderr_path
6 changes: 3 additions & 3 deletions mrjob/logs/history.py
@@ -23,7 +23,7 @@
from .counters import _sum_counters
from .ids import _add_implied_task_id
from .wrap import _ls_logs
-from .wrap import _cat_log
+from .wrap import _cat_log_lines


log = getLogger(__name__)
@@ -124,9 +124,9 @@ def _interpret_history_log(fs, matches):

if match['yarn']:
# not yet implemented
-            result = _parse_yarn_history_log(_cat_log(fs, path))
+            result = _parse_yarn_history_log(_cat_log_lines(fs, path))
else:
-            result = _parse_pre_yarn_history_log(_cat_log(fs, path))
+            result = _parse_pre_yarn_history_log(_cat_log_lines(fs, path))

# patch path, task_id, etc. into errors
for error in result.get('errors') or ():
6 changes: 3 additions & 3 deletions mrjob/logs/step.py
@@ -25,7 +25,7 @@
from .ids import _add_implied_task_id
from .log4j import _parse_hadoop_log4j_records
from .task import _parse_task_stderr
-from .wrap import _cat_log
+from .wrap import _cat_log_lines
from .wrap import _ls_logs


@@ -154,7 +154,7 @@ def _interpret_emr_step_syslog(fs, matches):
for match in matches:
path = match['path']

-        interpretation = _parse_step_syslog(_cat_log(fs, path))
+        interpretation = _parse_step_syslog(_cat_log_lines(fs, path))

result.update(interpretation)
for error in result.get('errors') or ():
@@ -177,7 +177,7 @@ def _interpret_emr_step_stderr(fs, matches):
for match in matches:
path = match['path']

-        error = _parse_task_stderr(_cat_log(fs, path))
+        error = _parse_task_stderr(_cat_log_lines(fs, path))

if error:
error['path'] = path
10 changes: 5 additions & 5 deletions mrjob/logs/task.py
@@ -20,7 +20,7 @@
from .ids import _add_implied_task_id
from .ids import _to_job_id
from .log4j import _parse_hadoop_log4j_records
-from .wrap import _cat_log
+from .wrap import _cat_log_lines
from .wrap import _ls_logs
from mrjob import parse

@@ -235,7 +235,7 @@ def _interpret_task_logs(fs, matches, partial=True, log_callback=None):
if stderr_path:
if log_callback:
log_callback(stderr_path)
-            task_error = _parse_task_stderr(_cat_log(fs, stderr_path))
+            task_error = _parse_task_stderr(_cat_log_lines(fs, stderr_path))

if task_error:
task_error['path'] = stderr_path
@@ -249,7 +249,7 @@ def _interpret_task_logs(fs, matches, partial=True, log_callback=None):

if log_callback:
log_callback(syslog_path)
-        syslog_error = _parse_task_syslog(_cat_log(fs, syslog_path))
+        syslog_error = _parse_task_syslog(_cat_log_lines(fs, syslog_path))
syslogs_parsed.add(syslog_path)

if not syslog_error.get('hadoop_error'):
@@ -319,7 +319,7 @@ def _interpret_spark_task_logs(fs, matches, partial=True, log_callback=None):
if log_callback:
log_callback(stderr_path)
# stderr is Spark's syslog
-        stderr_error = _parse_task_syslog(_cat_log(fs, stderr_path))
+        stderr_error = _parse_task_syslog(_cat_log_lines(fs, stderr_path))

if stderr_error.get('hadoop_error'):
stderr_error['hadoop_error']['path'] = stderr_path
@@ -334,7 +334,7 @@ def _interpret_spark_task_logs(fs, matches, partial=True, log_callback=None):
if log_callback:
log_callback(stdout_path)
# the stderr of the application master ends up in "stdout"
-        task_error = _parse_task_stderr(_cat_log(fs, stdout_path))
+        task_error = _parse_task_stderr(_cat_log_lines(fs, stdout_path))

if task_error:
task_error['path'] = stdout_path
11 changes: 7 additions & 4 deletions mrjob/logs/wrap.py
@@ -17,19 +17,22 @@
from logging import getLogger

from mrjob.py2 import to_unicode
+from mrjob.util import to_lines

from .ids import _sort_by_recency

log = getLogger(__name__)


-def _cat_log(fs, path):
-    """fs.cat() the given log, converting lines to strings, and logging
-    errors."""
+def _cat_log_lines(fs, path):
+    """Yield lines from the given log.
+
+    Log errors rather than raising them.
+    """
try:
if not fs.exists(path):
return
-        for line in fs.cat(path):
+        for line in to_lines(fs.cat(path)):
yield to_unicode(line)
except (IOError, OSError) as e:
log.warning("couldn't cat() %s: %r" % (path, e))
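
The substantive fix in this file is the to_lines() wrapper: fs.cat() yields
raw data chunks whose boundaries need not fall on line breaks, so treating
each chunk as a line mis-parses logs. A small sketch of the regrouping
behavior (to_lines() is the real helper from mrjob.util; the sample chunks
are made up):

    from mrjob.util import to_lines

    # chunks as a filesystem might return them; a line break falls mid-chunk
    chunks = [b'21:31:14 INFO Cl', b'ient: ...\n21:31:15 WARN ...\n']

    # to_lines() buffers partial lines and yields whole ones
    print(list(to_lines(chunks)))
    # [b'21:31:14 INFO Client: ...\n', b'21:31:15 WARN ...\n']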
166 changes: 166 additions & 0 deletions mrjob/tools/diagnose.py
@@ -0,0 +1,166 @@
# Copyright 2017 Yelp
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Print probable cause of error for a failed step.
Currently this only works on EMR.
Usage::
mrjob diagnose [opts] j-CLUSTERID
Options::
-c CONF_PATHS, --conf-path CONF_PATHS
Path to alternate mrjob.conf file to read from
--no-conf Don't load mrjob.conf even if it's available
--emr-endpoint EMR_ENDPOINT
Force mrjob to connect to EMR on this endpoint (e.g.
us-west-1.elasticmapreduce.amazonaws.com). Default is
to infer this from region.
-h, --help show this help message and exit
-q, --quiet Don't print anything to stderr
--region REGION GCE/AWS region to run Dataproc/EMR jobs in.
--s3-endpoint S3_ENDPOINT
Force mrjob to connect to S3 on this endpoint (e.g. s3
-us-west-1.amazonaws.com). You usually shouldn't set
this; by default mrjob will choose the correct
endpoint for each S3 bucket based on its location.
--step-id STEP_ID ID of a particular failed step to diagnose
-v, --verbose print more messages to stderr
"""
from argparse import ArgumentParser
from logging import getLogger

from mrjob.aws import _boto3_paginate
from mrjob.emr import _EMR_SPARK_ARGS
from mrjob.emr import EMRJobRunner
from mrjob.job import MRJob
from mrjob.logs.errors import _format_error
from mrjob.options import _add_basic_args
from mrjob.options import _add_runner_args
from mrjob.options import _alphabetize_actions
from mrjob.options import _filter_by_role

log = getLogger(__name__)


def main(cl_args=None):
arg_parser = _make_arg_parser()
options = arg_parser.parse_args(cl_args)

MRJob.set_up_logging(quiet=options.quiet, verbose=options.verbose)

    runner_kwargs = {k: v for k, v in options.__dict__.items()
                     if k not in ('quiet', 'verbose', 'step_id')}

runner = EMRJobRunner(**runner_kwargs)
emr_client = runner.make_emr_client()

# pick step
step = _get_step(emr_client, options.cluster_id, options.step_id)

if not step:
raise SystemExit(1)

if step['Status']['State'] != 'FAILED':
log.warning('step %s has state %s, not FAILED' %
(step['Id'], step['Status']['State']))

# interpret logs
log.info('Diagnosing step %s (%s)' % (step['Id'], step['Name']))

log_interpretation = dict(step_id=step['Id'])

step_type = _infer_step_type(step)

error = runner._pick_error(log_interpretation, step_type)

# print error
if error:
log.error('Probable cause of failure:\n\n%s\n\n' %
_format_error(error))
else:
log.warning('No error detected')


def _get_step(emr_client, cluster_id, step_id=None):

# just iterate backwards through steps, rather than filtering
# by step ID or status. usually it'll be the last step anyhow

for step in _boto3_paginate('Steps', emr_client, 'list_steps',
ClusterId=cluster_id):

if _step_matches(step, step_id=step_id):
return step
else:
if step_id:
log.error('step %s not found on cluster %s' %
(step_id, cluster_id))
else:
log.error('cluster %s has no failed steps' % cluster_id)


def _step_matches(step, step_id=None):
if not step_id:
return step['Status']['State'] == 'FAILED'
else:
return step['Id'] == step_id


def _infer_step_type(step):
args = step['Config']['Args']

# all that matters for log parsing is picking out Spark steps
# (doesn't matter if it's spark or spark_jar or spark_script)
#
# and of course we don't know the logging habits of jar steps,
# so we might as well use streaming's logic
    # every Spark step on EMR must include these args, so scan the step's
    # args for them as a contiguous subsequence
    for i in range(len(args) - len(_EMR_SPARK_ARGS) + 1):
        if list(args[i:i + len(_EMR_SPARK_ARGS)]) == _EMR_SPARK_ARGS:
            return 'spark'

    return 'streaming'


def _make_arg_parser():
usage = '%(prog)s [opts] [--step-id STEP_ID] CLUSTER_ID'
description = (
'Get probable cause of failure for step on CLUSTER_ID.'
        ' By default we look at the last failed step.')
arg_parser = ArgumentParser(usage=usage, description=description)

_add_basic_args(arg_parser)
_add_runner_args(
arg_parser,
_filter_by_role(EMRJobRunner.OPT_NAMES, 'connect'))

arg_parser.add_argument(
dest='cluster_id',
help='ID of cluster with failed step')
arg_parser.add_argument(
'--step-id', dest='step_id',
help='ID of a particular failed step to diagnose')

_alphabetize_actions(arg_parser)

return arg_parser


if __name__ == '__main__':
main()
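
Besides the command line shown in the docstring, main() takes an optional
argument list, so the tool can be driven programmatically. A hedged usage
sketch; the IDs below are placeholders, and running this makes live EMR API
calls:

    from mrjob.tools.diagnose import main

    # equivalent to: mrjob diagnose --step-id s-XXXXXXXXXXXX j-CLUSTERID
    main(['--step-id', 's-XXXXXXXXXXXX', 'j-CLUSTERID'])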
6 changes: 3 additions & 3 deletions tests/logs/test_bootstrap.py
@@ -132,7 +132,7 @@ def setUp(self):
patch('mrjob.logs.bootstrap._parse_task_stderr',
return_value=dict(message='BOOM!\n')))

-        self.mock_cat_log = self.start(patch('mrjob.logs.bootstrap._cat_log'))
+        self.mock_cat_log_lines = self.start(
+            patch('mrjob.logs.bootstrap._cat_log_lines'))

def interpret_bootstrap_stderr(self, matches, **kwargs):
"""Wrap _interpret_emr_bootstrap_stderr(), since fs doesn't matter"""
@@ -198,7 +198,7 @@ def test_ignore_multiple_matches(self):
)
)

-        self.mock_cat_log.called_once_with(
+        self.mock_cat_log_lines.assert_called_once_with(
self.mock_fs, ('s3://bucket/tmp/logs/j-1EE0CL1O7FDXU/node/'
'i-b659f519/bootstrap-actions/1/stderr.gz'))

@@ -285,4 +285,4 @@ def test_skip_blank_log(self):
)
)

-        self.assertEqual(self.mock_cat_log.call_count, 2)
+        self.assertEqual(self.mock_cat_log_lines.call_count, 2)
8 changes: 4 additions & 4 deletions tests/logs/test_history.py
@@ -122,7 +122,7 @@ def setUp(self):
patch('mrjob.logs.history._parse_pre_yarn_history_log',
return_value=mock_return_value))

-        self.mock_cat_log = self.start(patch('mrjob.logs.history._cat_log'))
+        self.mock_cat_log_lines = self.start(
+            patch('mrjob.logs.history._cat_log_lines'))

def interpret_history_log(self, matches):
"""Wrap _interpret_history_log(), since fs doesn't matter."""
@@ -137,7 +137,7 @@ def test_pre_yarn(self):
[dict(path='/path/to/pre-yarn-history.jar', yarn=False)]),
self.mock_parse_pre_yarn_history_log.return_value)

-        self.mock_cat_log.called_once_with(
+        self.mock_cat_log_lines.assert_called_once_with(
self.mock_fs, '/path/to/pre-yarn-history.jar')

self.assertEqual(self.mock_parse_pre_yarn_history_log.call_count, 1)
@@ -148,7 +148,7 @@ def test_yarn(self):
[dict(path='/path/to/yarn-history.jhist', yarn=True)]),
self.mock_parse_yarn_history_log.return_value)

-        self.mock_cat_log.called_once_with(
+        self.mock_cat_log_lines.assert_called_once_with(
self.mock_fs, '/path/to/yarn-history.jhist')

self.assertEqual(self.mock_parse_yarn_history_log.call_count, 1)
@@ -163,7 +163,7 @@ def test_ignore_multiple_matches(self):
dict(path='/path/to/yarn-history-2.jhist', yarn=True)]),
self.mock_parse_yarn_history_log.return_value)

-        self.mock_cat_log.called_once_with(
+        self.mock_cat_log_lines.assert_called_once_with(
self.mock_fs, '/path/to/yarn-history-1.jhist')

self.assertEqual(self.mock_parse_yarn_history_log.call_count, 1)
