Merge pull request #1754 from davidmarin/google-job-progress
progress logging and counters for Dataproc (fixes #1671, fixes #1703)
David Marin committed Apr 25, 2018
2 parents 826392d + 3851a4b commit d153ca8
Showing 8 changed files with 341 additions and 24 deletions.
89 changes: 85 additions & 4 deletions mrjob/dataproc.py
@@ -17,6 +17,7 @@
import logging
import time
import re
from io import BytesIO
from os import environ
from os.path import dirname
from os.path import join
@@ -40,7 +41,10 @@
from mrjob.fs.gcs import parse_gcs_uri
from mrjob.fs.local import LocalFilesystem
from mrjob.logs.counters import _pick_counters
from mrjob.logs.mixin import LogInterpretationMixin
from mrjob.logs.step import _interpret_new_dataproc_step_stderr
from mrjob.py2 import PY2
from mrjob.py2 import to_unicode
from mrjob.setup import UploadDirManager
from mrjob.step import StepFailedException
from mrjob.util import random_identifier
@@ -174,7 +178,7 @@ class DataprocException(Exception):
pass


class DataprocJobRunner(HadoopInTheCloudJobRunner):
class DataprocJobRunner(HadoopInTheCloudJobRunner, LogInterpretationMixin):
"""Runs an :py:class:`~mrjob.job.MRJob` on Google Cloud Dataproc.
Invoked when you run your job with ``-r dataproc``.
@@ -271,6 +275,12 @@ def __init__(self, **kwargs):
self._image_version = None
self._hadoop_version = None

# map driver_output_uri to a dict with the keys:
# log_uri: uri of file we're reading from
# pos: position in file
# buffer: bytes read from file already
self._driver_output_state = {}

# This will be filled by _run_steps()
# NOTE - log_interpretations will be empty except job_id until we
# parse task logs
@@ -514,6 +524,7 @@ def _create_fs_tmp_bucket(self, bucket_name, location=None):
self._wait_for_fs_sync()

### Running the job ###

def cleanup(self, mode=None):
super(DataprocJobRunner, self).cleanup(mode=mode)

@@ -733,14 +744,27 @@ def _wait_for_step_to_complete(
log_interpretation = dict(job_id=job_id)
self._log_interpretations.append(log_interpretation)

step_interpretation = {}
log_interpretation['step'] = step_interpretation

while True:
# https://cloud.google.com/dataproc/reference/rest/v1/projects.regions.jobs#JobStatus # noqa
job = self._get_job(job_id)

job_state = job.status.State.Name(job.status.state)

driver_output_uri = job.driver_output_resource_uri

log.info('%s => %s' % (job_id, job_state))

# interpret driver output so far
if driver_output_uri:
self._update_step_interpretation(step_interpretation,
driver_output_uri)

if step_interpretation.get('progress'):
log.info(' ' + step_interpretation['progress']['message'])

# https://cloud.google.com/dataproc/reference/rest/v1/projects.regions.jobs#State # noqa
# these are the states covered by the ACTIVE job state matcher,
# plus SETUP_DONE
@@ -749,16 +773,73 @@ def _wait_for_step_to_complete(
self._wait_for_api('job completion')
continue

# print counters if job wasn't CANCELLED
if job_state != 'CANCELLED':
self._log_counters(log_interpretation, step_num)

# we're done, will return at the end of this
elif job_state == 'DONE':
if job_state == 'DONE':
break

raise StepFailedException(step_num=step_num, num_steps=num_steps)
else:
raise StepFailedException(
step_num=step_num, num_steps=num_steps)

def _default_step_output_dir(self):
# put intermediate data in HDFS
return 'hdfs:///tmp/mrjob/%s/step-output' % self._job_key

def _update_step_interpretation(
self, step_interpretation, driver_output_uri):
new_lines = self._get_new_driver_output_lines(driver_output_uri)
_interpret_new_dataproc_step_stderr(step_interpretation, new_lines)

def _get_new_driver_output_lines(self, driver_output_uri):
"""Get a list of complete job driver output lines that are
new since the last time we checked.
"""
state = self._driver_output_state.setdefault(
driver_output_uri,
dict(log_uri=None, pos=0, buffer=b''))

# driver output is in logs with names like driveroutput.000000000
log_uris = sorted(self.fs.ls(driver_output_uri + '*'))

for log_uri in log_uris:
# initialize log_uri with first URI we see
if state['log_uri'] is None:
state['log_uri'] = log_uri

# skip log files already parsed
if log_uri < state['log_uri']:
continue

# when parsing the next file, reset *pos*
elif log_uri > state['log_uri']:
state['pos'] = 0
state['log_uri'] = log_uri

log_blob = self.fs._get_blob(log_uri)
# TODO: use start= kwarg once google-cloud-storage 1.9 is out
new_data = log_blob.download_as_string()[state['pos']:]

state['buffer'] += new_data
state['pos'] += len(new_data)

# convert buffer into lines, saving leftovers for next time
stream = BytesIO(state['buffer'])
state['buffer'] = b''

lines = []

for line_bytes in stream:
if line_bytes.endswith(b'\n'):
lines.append(to_unicode(line_bytes))
else:
# leave final partial line (if any) in buffer
state['buffer'] = line_bytes

return lines

def counters(self):
# TODO - mtai @ davidmarin - Counters are currently always empty as we
# are not processing task logs
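
For illustration only (not part of this commit): a minimal standalone sketch of the partial-line buffering that _get_new_driver_output_lines relies on above. New bytes are appended to a per-URI buffer, complete newline-terminated lines are returned, and a trailing partial line is held back until the next poll. The helper name _split_complete_lines is hypothetical.

from io import BytesIO

def _split_complete_lines(state, new_data):
    """Append *new_data* (bytes) to state['buffer'] and return only the
    complete (newline-terminated) lines, keeping any partial line buffered."""
    state['buffer'] += new_data
    stream = BytesIO(state['buffer'])
    state['buffer'] = b''

    lines = []
    for line_bytes in stream:
        if line_bytes.endswith(b'\n'):
            lines.append(line_bytes.decode('utf_8'))
        else:
            # final chunk has no newline yet; keep it for the next poll
            state['buffer'] = line_bytes

    return lines

state = dict(log_uri=None, pos=0, buffer=b'')
print(_split_complete_lines(state, b' map 50% redu'))  # [] (partial line buffered)
print(_split_complete_lines(state, b'ce 0%\n'))        # [' map 50% reduce 0%\n']
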
11 changes: 2 additions & 9 deletions mrjob/emr.py
@@ -73,7 +73,6 @@
from mrjob.logs.bootstrap import _check_for_nonzero_return_code
from mrjob.logs.bootstrap import _interpret_emr_bootstrap_stderr
from mrjob.logs.bootstrap import _ls_emr_bootstrap_stderr_logs
from mrjob.logs.counters import _format_counters
from mrjob.logs.counters import _pick_counters
from mrjob.logs.errors import _format_error
from mrjob.logs.mixin import LogInterpretationMixin
@@ -1770,14 +1769,8 @@ def _wait_for_step_to_complete(
# step is done (either COMPLETED, FAILED, INTERRUPTED). so
# try to fetch counters. (Except for master node setup
# and Spark, which has no counters.)
if step['Status']['State'] != 'CANCELLED':
if step_num >= 0 and not _is_spark_step_type(step_type):
counters = self._pick_counters(
log_interpretation, step_type)
if counters:
log.info(_format_counters(counters))
else:
log.warning('No counters found')
if step['Status']['State'] != 'CANCELLED' and step_num >= 0:
self._log_counters(log_interpretation, step_num)

if step['Status']['State'] == 'COMPLETED':
return
10 changes: 2 additions & 8 deletions mrjob/hadoop.py
@@ -33,7 +33,6 @@
from mrjob.fs.composite import CompositeFilesystem
from mrjob.fs.hadoop import HadoopFilesystem
from mrjob.fs.local import LocalFilesystem
from mrjob.logs.counters import _format_counters
from mrjob.logs.counters import _pick_counters
from mrjob.logs.errors import _format_error
from mrjob.logs.mixin import LogInterpretationMixin
@@ -461,14 +460,9 @@ def _run_job_in_hadoop(self):

log_interpretation['step'] = step_interpretation

step_type = step['type']
self._log_counters(log_interpretation, step_num)

if not _is_spark_step_type(step_type):
counters = self._pick_counters(log_interpretation, step_type)
if counters:
log.info(_format_counters(counters))
else:
log.warning('No counters found')
step_type = step['type']

if returncode:
error = self._pick_error(log_interpretation, step_type)
13 changes: 13 additions & 0 deletions mrjob/logs/mixin.py
@@ -30,6 +30,7 @@
from logging import getLogger

from mrjob.compat import uses_yarn
from mrjob.logs.counters import _format_counters
from mrjob.logs.counters import _pick_counters
from mrjob.logs.errors import _pick_error
from mrjob.logs.errors import _pick_error_attempt_ids
@@ -240,3 +241,15 @@ def _ls_task_logs(self, step_type,

for match in matches:
yield match

def _log_counters(self, log_interpretation, step_num):
"""Utility for logging counters (if any) for a step."""
step_type = self._get_step(step_num)['type']

if not _is_spark_step_type(step_type):
counters = self._pick_counters(
log_interpretation, step_type)
if counters:
log.info(_format_counters(counters))
else:
log.warning('No counters found')
30 changes: 28 additions & 2 deletions mrjob/logs/step.py
@@ -66,6 +66,12 @@
_RUNNING_JOB_RE = re.compile(
r'^Running job: (?P<job_id>job_\d+_\d{4})\s*$')

# job progress (YARN)
# no need to make this work for pre-YARN, only Dataproc runner uses it
_JOB_PROGRESS_RE = re.compile(
r'^\s*map\s+(?P<map>\d+)%\s+reduce\s+(?P<reduce>\d+)%\s*$')


# YARN prints this (sometimes followed by a Java exception) when tasks fail
_TASK_ATTEMPT_FAILED_RE = re.compile(
r'^Task Id *:'
@@ -170,6 +176,14 @@ def _interpret_emr_step_syslog(fs, matches):
return result


def _interpret_new_dataproc_step_stderr(step_interpretation, new_lines):
"""Incrementally update *step_interpretation* (a dict) with information
from new lines read from Hadoop job driver output on Dataproc."""
return _parse_step_syslog_from_log4j_records(
_parse_hadoop_log4j_records(new_lines),
step_interpretation)


def _interpret_emr_step_stderr(fs, matches):
"""Extract information from step stderr (see
:py:func:`~mrjob.logs.task._parse_task_stderr()`),
@@ -254,14 +268,17 @@ def _parse_step_syslog(lines):
_parse_hadoop_log4j_records(lines))


def _parse_step_syslog_from_log4j_records(records):
def _parse_step_syslog_from_log4j_records(records, step_interpretation=None):
"""Pulls errors, counters, IDs, etc. from log4j records
emitted by Hadoop.
This powers :py:func:`_parse_step_syslog` and
:py:func:`_interpret_hadoop_jar_command_stderr`.
"""
result = {}
if step_interpretation is None:
result = {}
else:
result = step_interpretation

for record in records:
message = record['message']
@@ -290,6 +307,15 @@ def _parse_step_syslog_from_log4j_records(records):
result['job_id'] = m.group('job_id')
continue

# progress
m = _JOB_PROGRESS_RE.match(message)
if m:
result['progress'] = dict(
map=int(m.group('map')),
reduce=int(m.group('reduce')),
message=message,
)

# task failure
m = _TASK_ATTEMPT_FAILED_RE.match(message)
if m:
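
For illustration only (not part of this commit): a minimal sketch of how the new _JOB_PROGRESS_RE pattern turns a YARN driver-output progress line into the progress dict that the Dataproc runner logs, based on the regex and the test expectations below.

import re

# same pattern as _JOB_PROGRESS_RE above
_JOB_PROGRESS_RE = re.compile(
    r'^\s*map\s+(?P<map>\d+)%\s+reduce\s+(?P<reduce>\d+)%\s*$')

message = ' map 100% reduce 100%'
m = _JOB_PROGRESS_RE.match(message)
if m:
    progress = dict(
        map=int(m.group('map')),
        reduce=int(m.group('reduce')),
        message=message,
    )
    print(progress)
# {'map': 100, 'reduce': 100, 'message': ' map 100% reduce 100%'}
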
5 changes: 4 additions & 1 deletion tests/logs/test_step.py
@@ -68,7 +68,9 @@
},
job_id='job_1449857544442_0002',
output_dir=('hdfs:///user/root/tmp/mrjob'
'/mr_wc.root.20151211.181326.984074/output'))
'/mr_wc.root.20151211.181326.984074/output'),
progress=dict(map=100, message=' map 100% reduce 100%', reduce=100),
)


# abbreviated version of real output from Hadoop 1.0.3 on EMR AMI 2.4.9
Expand All @@ -93,6 +95,7 @@
job_id='job_201512112247_0003',
output_dir=('hdfs:///user/hadoop/tmp/mrjob'
'/mr_wc.hadoop.20151211.230352.433691/output'),
progress=dict(map=100, message=' map 100% reduce 100%', reduce=100),
)


7 changes: 7 additions & 0 deletions tests/mock_google/dataproc.py
@@ -28,6 +28,7 @@
from mrjob.dataproc import _STATE_MATCHER_ACTIVE
from mrjob.dataproc import _cluster_state_name
from mrjob.dataproc import _job_state_name
from mrjob.util import random_identifier


# convert strings (e.g. 'RUNNING') to enum values
@@ -250,6 +251,12 @@ def _simulate_progress(self, mock_job):
mock_job.status.state = _job_state_value('PENDING')
elif state == 'PENDING':
mock_job.status.state = _job_state_value('RUNNING')
# for now, we just need this to be set
mock_job.driver_output_resource_uri = (
'gs://mock-bucket-%s/google-cloud-dataproc-metainfo/'
'mock-cluster-id-%s/jobs/mock-job-%s/driveroutput' % (
random_identifier(), random_identifier(),
random_identifier()))
elif state == 'RUNNING':
if self.mock_jobs_succeed:
mock_job.status.state = _job_state_value('DONE')
