Merge pull request #1766 from davidmarin/google-diagnose
probable cause of error on Dataproc (fixes #1672)
David Marin committed May 4, 2018
2 parents f141795 + 7f2a4d1 commit de79472
Showing 8 changed files with 826 additions and 28 deletions.
300 changes: 286 additions & 14 deletions mrjob/dataproc.py
@@ -26,6 +26,7 @@
import google.auth
import google.cloud.dataproc_v1
import google.cloud.dataproc_v1.types
import google.cloud.logging
import google.api_core.exceptions
import google.api_core.grpc_helpers
except:
@@ -41,7 +42,10 @@
from mrjob.fs.gcs import parse_gcs_uri
from mrjob.fs.local import LocalFilesystem
from mrjob.logs.counters import _pick_counters
from mrjob.logs.errors import _format_error
from mrjob.logs.mixin import LogInterpretationMixin
from mrjob.logs.task import _parse_task_stderr
from mrjob.logs.task import _parse_task_syslog_records
from mrjob.logs.step import _interpret_new_dataproc_step_stderr
from mrjob.py2 import PY2
from mrjob.py2 import to_unicode
@@ -121,6 +125,22 @@
)


# used to match log entries that tell us if a container exited
_CONTAINER_EXECUTOR_CLASS_NAME = (
'org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor')

# used to determine which containers exited with nonzero status
_CONTAINER_EXIT_RE = re.compile(
r'Exit code from container (?P<container_id>\w+)'
r' is ?: (?P<returncode>\d+)')

_TRACEBACK_EXCEPTION_RE = re.compile(r'\w+: .*$')

_STDERR_LOG4J_WARNING = re.compile(
r'.*(No appenders could be found for logger'
r'|Please initialize the log4j system'
r'|See http://logging.apache.org/log4j)')
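
For reference, here is roughly how _CONTAINER_EXIT_RE is meant to pick
failed containers out of node manager output; the log message below is
hypothetical, but follows the format these entries use:

# sketch: extract the container ID and exit code from a (made-up)
# node manager log message
message = ('Exit code from container'
           ' container_1514750700874_0001_01_000002 is : 1')

m = _CONTAINER_EXIT_RE.match(message)
if m and int(m.group('returncode')) != 0:
    # prints: failed container: container_1514750700874_0001_01_000002
    print('failed container: %s' % m.group('container_id'))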

# convert enum values to strings (e.g. 'RUNNING')

def _cluster_state_name(state_value):
@@ -358,6 +378,11 @@ def job_client(self):
return google.cloud.dataproc_v1.JobControllerClient(
**self._client_create_kwargs())

@property
def logging_client(self):
return google.cloud.logging.Client(credentials=self._credentials,
project=self._project_id)

def _client_create_kwargs(self):
if self._opts['region']:
endpoint = '%s-%s' % (self._opts['region'], _DEFAULT_ENDPOINT)
@@ -755,8 +780,7 @@ def _launch_step(self, step_num):

return job_id

-    def _wait_for_step_to_complete(
-            self, job_id, step_num=None, num_steps=None):
+    def _wait_for_step_to_complete(self, job_id, step_num, num_steps):
"""Helper for _wait_for_step_to_complete(). Wait for
step with the given ID to complete, and fetch counters.
If it fails, attempt to diagnose the error, and raise an
@@ -767,26 +791,25 @@ def _wait_for_step_to_complete(
log_interpretation = dict(job_id=job_id)
self._log_interpretations.append(log_interpretation)

-        step_interpretation = {}
-        log_interpretation['step'] = step_interpretation
+        log_interpretation['step'] = {}
+        step_type = self._get_step(step_num)['type']

while True:
# https://cloud.google.com/dataproc/reference/rest/v1/projects.regions.jobs#JobStatus # noqa
job = self._get_job(job_id)

            job_state = job.status.State.Name(job.status.state)

-            driver_output_uri = job.driver_output_resource_uri
-
            log.info('%s => %s' % (job_id, job_state))

-            # interpret driver output so far
-            if driver_output_uri:
-                self._update_step_interpretation(step_interpretation,
-                                                 driver_output_uri)
+            log_interpretation['step']['driver_output_uri'] = (
+                job.driver_output_resource_uri)
+
+            self._interpret_step_logs(log_interpretation, step_type)

-            if step_interpretation.get('progress'):
-                log.info(' ' + step_interpretation['progress']['message'])
+            progress = log_interpretation['step'].get('progress')
+            if progress:
+                log.info(' ' + progress['message'])

# https://cloud.google.com/dataproc/reference/rest/v1/projects.regions.jobs#State # noqa
# these are the states covered by the ACTIVE job state matcher,
@@ -800,6 +823,12 @@ def _wait_for_step_to_complete(
if job_state != 'CANCELLED':
self._log_counters(log_interpretation, step_num)

if job_state == 'ERROR':
error = self._pick_error(log_interpretation, step_type)
if error:
log.error('Probable cause of failure:\n\n%s\n\n' %
_format_error(error))

# we're done, will return at the end of this
if job_state == 'DONE':
break
@@ -811,6 +840,23 @@ def _default_step_output_dir(self):
# put intermediate data in HDFS
return 'hdfs:///tmp/mrjob/%s/step-output' % self._job_key

### log interpretation ###

# step

def _interpret_step_logs(self, log_interpretation, step_type):
"""Hook for interpreting step logs.
Unlike with most runners, you may call this multiple times and it
will continue to parse the step log incrementally, which is useful
for getting job progress."""
driver_output_uri = log_interpretation.get(
'step', {}).get('driver_output_uri')

if driver_output_uri:
self._update_step_interpretation(
log_interpretation['step'], driver_output_uri)

def _update_step_interpretation(
self, step_interpretation, driver_output_uri):
new_lines = self._get_new_driver_output_lines(driver_output_uri)
@@ -868,9 +914,164 @@ def _get_new_driver_output_lines(self, driver_output_uri):

return lines

# history

def _interpret_history_log(self, log_interpretation):
"""Does nothing. We can't get the history logs, and we don't need
them."""
log_interpretation.setdefault('history', {})

# task

def _interpret_task_logs(self, log_interpretation, step_type,
error_attempt_ids=(), partial=True):
"""Scan node manager log to find failed container IDs of failed
tasks, and then scan the corresponding stderr and syslogs."""
if 'task' in log_interpretation and (
partial or not log_interpretation['task'].get('partial')):
return # already interpreted

step_interpretation = log_interpretation.get('step') or {}

application_id = step_interpretation.get('application_id')
if not application_id:
log.warning(
"Can't parse node manager logs; missing application ID")
return

log_interpretation['task'] = self._task_log_interpretation(
application_id, step_type, partial)

def _task_log_interpretation(
self, application_id, step_type, partial=True):
"""Helper for :py:meth:`_interpret_task_logs`"""
result = {}

for container_id in self._failed_task_container_ids(application_id):
error = _parse_task_syslog_records(
self._task_syslog_records(
application_id, container_id, step_type))

if not error.get('hadoop_error'):
# not sure if this ever happens, since we already know
# which containers failed
continue

error['container_id'] = container_id

# fix weird munging of java stacktrace
error['hadoop_error']['message'] = _fix_java_stack_trace(
error['hadoop_error']['message'])

task_error = _parse_task_stderr(
self._task_stderr_lines(
application_id, container_id, step_type))

if task_error:
task_error['message'] = _fix_traceback(task_error['message'])
error['task_error'] = task_error

result.setdefault('errors', []).append(error)

# if partial is true, bail out when we find the first task error
if task_error and partial:
result['partial'] = True
return result

return result

def _failed_task_container_ids(self, application_id):
"""Stream container IDs of failed tasks, in reverse order."""
container_id_prefix = 'container' + application_id[11:]

log_filter = self._make_log_filter(
'yarn-yarn-nodemanager',
{'jsonPayload.class': _CONTAINER_EXECUTOR_CLASS_NAME})

log.info('Scanning node manager logs for IDs of failed tasks...')

# it doesn't seem to work to do self.logging_client.logger();
# there's some RPC dispute about whether the log name should
# be qualified by project name or not
entries = self.logging_client.list_entries(
filter_=log_filter, order_by=google.cloud.logging.DESCENDING)

for entry in entries:
message = entry.payload.get('message')
if not message:
continue

m = _CONTAINER_EXIT_RE.match(message)
if not m:
continue

returncode = int(m.group('returncode'))
if not returncode:
continue

container_id = m.group('container_id')
# matches some other step
if not container_id.startswith(container_id_prefix):
continue

log.debug(' %s' % container_id)
yield container_id

def _task_stderr_lines(self, application_id, container_id, step_type):
"""Yield lines from a specific stderr log."""
log_filter = self._make_log_filter(
'yarn-userlogs', {
'jsonPayload.application': application_id,
'jsonPayload.container': container_id,
# TODO: pick based on step_type
'jsonPayload.container_logname': 'stderr',
})

log.info(' reading stderr log...')
entries = self.logging_client.list_entries(filter_=log_filter)

# use log4j parsing to handle tab -> newline conversion
for record in _log_entries_to_log4j(entries):
for line in record['message'].split('\n'):
yield line

def _task_syslog_records(self, application_id, container_id, step_type):
"""Yield log4j records from a specific syslog.
"""
log_filter = self._make_log_filter(
'yarn-userlogs', {
'jsonPayload.application': application_id,
'jsonPayload.container': container_id,
# TODO: pick based on step_type
'jsonPayload.container_logname': 'syslog',
})

log.info(' reading syslog...')
entries = self.logging_client.list_entries(filter_=log_filter)

return _log_entries_to_log4j(entries)

# misc

def _make_log_filter(self, log_name=None, extra_values=None):
# we only want logs from this project, cluster, and region
d = {}

d['resource.labels.cluster_name'] = self._cluster_id
d['resource.labels.project_id'] = self._project_id
d['resource.labels.region'] = self._region()
d['resource.type'] = 'cloud_dataproc_cluster'

if log_name:
d['logName'] = 'projects/%s/logs/%s' % (
self._project_id, log_name)

if extra_values:
d.update(extra_values)

return _log_filter_str(d)

def counters(self):
# TODO - mtai @ davidmarin - Counters are currently always empty as we
# are not processing task logs
return [_pick_counters(log_interpretation)
for log_interpretation in self._log_interpretations]

@@ -1113,3 +1314,74 @@ def _ssh_tunnel_args(self, bind_port):
self._job_tracker_host(),
'--',
] + self._ssh_tunnel_opts(bind_port)


def _log_filter_str(name_to_value):
    """Convert a map from name to value into a log filter query that
    requires each name to equal the given value."""
return ' AND '.join(
'%s = %s' % (name, _quote_filter_value(value))
for name, value in sorted(name_to_value.items()))


def _quote_filter_value(s):
"""Put a string in double quotes, escaping double quote characters"""
return '"%s"' % s.replace('"', r'\"')


def _log_entries_to_log4j(entries):
"""Convert log entries from a single log file to log4j format, tracking
line number.
See :py:meth:`mrjob.logs.log4j._parse_hadoop_log4j_records`
for format.
"""
line_num = 0

for entry in entries:
message = entry.payload.get('message') or ''

        # NOTE: currently, google.cloud.logging seems to strip newlines :(
num_lines = len(message.split('\n'))

yield dict(
caller_location='',
level=(entry.severity or ''),
logger=(entry.payload.get('class') or ''),
message=message,
num_lines=num_lines,
start_line=line_num,
thread='',
timestamp=(entry.timestamp or ''),
)

line_num += num_lines
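
A rough usage sketch; FakeEntry stands in for the entry objects that
google.cloud.logging's list_entries() returns, modeling only the
attributes read above:

from collections import namedtuple

FakeEntry = namedtuple('FakeEntry', ['payload', 'severity', 'timestamp'])

entries = [FakeEntry(payload={'message': 'line one\nline two',
                              'class': 'org.example.Foo'},
                     severity='INFO',
                     timestamp='2018-05-04T00:00:00Z')]

for record in _log_entries_to_log4j(entries):
    # prints: 0 2 org.example.Foo
    print(record['start_line'], record['num_lines'], record['logger'])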


def _fix_java_stack_trace(s):
# this is what we get from `gcloud logging`
if '\n' in s:
return s
else:
return s.replace('\t', '\n\t')


def _fix_traceback(s):
lines = s.split('\n')

# strip log4j warnings (which do have proper linebreaks)
lines = [
line for line in lines
if line and not _STDERR_LOG4J_WARNING.match(line)
]

s = '\n'.join(lines)

if '\n' in s:
return s # traceback does have newlines

    s = s.replace('  File', '\n  File')
    s = s.replace('    ', '\n    ')
s = _TRACEBACK_EXCEPTION_RE.sub(lambda m: '\n' + m.group(0), s)

return s
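
Two hypothetical flattened messages of the sort described above show what
these helpers recover:

# a Java stack trace munged onto one line with tabs gets its newlines back
print(_fix_java_stack_trace(
    'java.lang.RuntimeException: PipeMapRed failed!'
    '\tat org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(...)'))

# a Python traceback flattened onto a single line is re-split on the
# '  File'/'    ' markers and before the final exception line
print(_fix_traceback(
    'Traceback (most recent call last):'
    '  File "mr_boom.py", line 10, in <module>'
    '    run()'
    'ValueError: boom'))
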
2 changes: 1 addition & 1 deletion mrjob/logs/mixin.py
@@ -118,7 +118,7 @@ def _pick_counters(self, log_interpretation, step_type):
def _pick_error(self, log_interpretation, step_type):
"""Pick probable cause of failure (only call this if job fails)."""
        if not all(log_type in log_interpretation for
-                   log_type in ('job', 'step', 'task')):
+                   log_type in ('step', 'history', 'task')):
log.info('Scanning logs for probable cause of failure...')
self._interpret_step_logs(log_interpretation, step_type)
self._interpret_history_log(log_interpretation)
