[BEAM-2732][BEAM-4028] Logging relies on StateSampler for context #5356

Merged: 1 commit, merged on Jul 9, 2018

12 changes: 0 additions & 12 deletions sdks/python/apache_beam/runners/common.pxd
@@ -81,9 +81,7 @@ cdef class PerWindowInvoker(DoFnInvoker):

cdef class DoFnRunner(Receiver):
cdef DoFnContext context
cdef LoggingContext logging_context
cdef object step_name
cdef ScopedMetricsContainer scoped_metrics_container
cdef list side_inputs
cdef DoFnInvoker do_fn_invoker

@@ -112,15 +110,5 @@ cdef class DoFnContext(object):
cpdef set_element(self, WindowedValue windowed_value)


cdef class LoggingContext(object):
# TODO(robertwb): Optimize "with [cdef class]"
cpdef enter(self)
cpdef exit(self)


cdef class _LoggingContextAdapter(LoggingContext):
cdef object underlying


cdef class _ReceiverAdapter(Receiver):
cdef object underlying
29 changes: 2 additions & 27 deletions sdks/python/apache_beam/runners/common.py
@@ -119,16 +119,6 @@ def logging_name(self):
return self.user_name


class LoggingContext(object):
"""For internal use only; no backwards-compatibility guarantees."""

def enter(self):
pass

def exit(self):
pass


class Receiver(object):
"""For internal use only; no backwards-compatibility guarantees.

@@ -551,20 +541,15 @@ def __init__(self,
windowing: windowing properties of the output PCollection(s)
tagged_receivers: a dict of tag name to Receiver objects
step_name: the name of this step
logging_context: a LoggingContext object
logging_context: DEPRECATED [BEAM-4728]
state: handle for accessing DoFn state
scoped_metrics_container: Context switcher for metrics container
scoped_metrics_container: DEPRECATED
operation_name: The system name assigned by the runner for this operation.
"""
# Need to support multiple iterations.
side_inputs = list(side_inputs)

from apache_beam.metrics.execution import ScopedMetricsContainer

self.scoped_metrics_container = (
scoped_metrics_container or ScopedMetricsContainer())
self.step_name = step_name
self.logging_context = logging_context or LoggingContext()
self.context = DoFnContext(step_name, state=state)

do_fn_signature = DoFnSignature(fn)
@@ -595,26 +580,16 @@ def receive(self, windowed_value):

def process(self, windowed_value):
try:
self.logging_context.enter()
self.scoped_metrics_container.enter()
self.do_fn_invoker.invoke_process(windowed_value)
except BaseException as exn:
self._reraise_augmented(exn)
finally:
self.scoped_metrics_container.exit()
self.logging_context.exit()

def _invoke_bundle_method(self, bundle_method):
try:
self.logging_context.enter()
self.scoped_metrics_container.enter()
self.context.set_element(None)
bundle_method()
except BaseException as exn:
self._reraise_augmented(exn)
finally:
self.scoped_metrics_container.exit()
self.logging_context.exit()

def start(self):
self._invoke_bundle_method(self.do_fn_invoker.invoke_start_bundle)
11 changes: 5 additions & 6 deletions sdks/python/apache_beam/runners/worker/bundle_processor.py
@@ -63,13 +63,12 @@
class RunnerIOOperation(operations.Operation):
"""Common baseclass for runner harness IO operations."""

def __init__(self, operation_name, step_name, consumers, counter_factory,
def __init__(self, name_context, step_name, consumers, counter_factory,
state_sampler, windowed_coder, target, data_channel):
super(RunnerIOOperation, self).__init__(
operation_name, None, counter_factory, state_sampler)
name_context, None, counter_factory, state_sampler)
self.windowed_coder = windowed_coder
self.windowed_coder_impl = windowed_coder.get_impl()
self.step_name = step_name
Contributor:
Is this deletion intentional? If so, can you add a comment / JIRA reference to clean up step_name in the arguments?

Member (author):
This is part of my goal with BEAM-4028. The step name is meant to be retrievable only through the name context.
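
For illustration, a minimal sketch (not part of this diff) of reading step names through a NameContext. It assumes the single-argument NameContext constructor and the step_name / logging_name() / metrics_name() accessors that appear elsewhere in this PR:

    from apache_beam.runners import common

    # Wrap a system step name in a NameContext, the same promotion that
    # Operation.__init__ performs for plain strings.
    name_context = common.NameContext('s2')

    # Callers read names through the context, not a step_name attribute.
    system_name = name_context.step_name        # e.g. 's2'
    log_name = name_context.logging_name()      # name attached to log records
    metric_name = name_context.metrics_name()   # name attached to counters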

# target represents the consumer for the bytes in the data plane for a
# DataInputOperation or a producer of these bytes for a DataOutputOperation.
self.target = target
@@ -106,9 +105,9 @@ def __init__(self, operation_name, step_name, consumers, counter_factory,
windowed_coder, target=input_target, data_channel=data_channel)
# We must do this manually as we don't have a spec or spec.output_coders.
self.receivers = [
operations.ConsumerSet(self.counter_factory, self.step_name, 0,
next(itervalues(consumers)),
self.windowed_coder)]
operations.ConsumerSet(
self.counter_factory, self.name_context.step_name, 0,
next(itervalues(consumers)), self.windowed_coder)]

def process(self, windowed_value):
self.output(windowed_value)
25 changes: 0 additions & 25 deletions sdks/python/apache_beam/runners/worker/logger.pxd

This file was deleted.

17 changes: 10 additions & 7 deletions sdks/python/apache_beam/runners/worker/logger.py
@@ -26,7 +26,7 @@
import threading
import traceback

from apache_beam.runners.common import LoggingContext
from apache_beam.runners.worker import statesampler

# This module is experimental. No backwards-compatibility guarantees.

@@ -38,7 +38,6 @@ class _PerThreadWorkerData(threading.local):

def __init__(self):
super(_PerThreadWorkerData, self).__init__()
# TODO(robertwb): Consider starting with an initial (ignored) ~20 elements
Contributor:
Accidental deletion?

Member (author):
The logging context will be removed once it is no longer useful (after it is removed from Google code), so optimizations should not be considered anymore. I'll remove it ASAP as part of BEAM-4728.

# in the list, as going up and down all the way to zero incurs several
# reallocations.
self.stack = []
@@ -53,7 +52,7 @@ def get_data(self):
per_thread_worker_data = _PerThreadWorkerData()


class PerThreadLoggingContext(LoggingContext):
class PerThreadLoggingContext(object):
Contributor:
Are we able to get rid of this class entirely? It looks like you removed the only usage in operations.py.

Member (author):
This class is used internally at Google, so we need to remove it from there first.

"""A context manager to add per thread attributes."""

def __init__(self, **kwargs):
@@ -150,10 +149,14 @@ def format(self, record):
data = per_thread_worker_data.get_data()
if 'work_item_id' in data:
output['work'] = data['work_item_id']
if 'stage_name' in data:
output['stage'] = data['stage_name']
if 'step_name' in data:
output['step'] = data['step_name']

tracker = statesampler.get_current_tracker()
if tracker:
output['stage'] = tracker.stage_name

if tracker.current_state() and tracker.current_state().name_context:
output['step'] = tracker.current_state().name_context.logging_name()

# All logging happens using the root logger. We will add the basename of the
# file and the function name where the logging happened to make it easier
# to identify who generated the record.
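
As a usage sketch (mirroring the updated tests below), the formatter now derives 'stage' and 'step' from the registered StateSampler tracker rather than from per-thread logging attributes. The plain logging.LogRecord here is a stand-in for the tests' create_log_record helper:

    import json
    import logging

    from apache_beam.runners.worker import logger
    from apache_beam.runners.worker import statesampler
    from apache_beam.utils.counters import CounterFactory

    # Register a tracker so the formatter finds it via get_current_tracker().
    tracker = statesampler.StateSampler('stage', CounterFactory())
    statesampler.set_current_tracker(tracker)

    formatter = logger.JsonLogFormatter(job_id='jobid', worker_id='workerid')
    record = logging.LogRecord('root', logging.INFO, '/path/to/file.py', 42,
                               'log message', None, None)

    with tracker.scoped_state('step', 'process'):
      # While the scoped state is active, the formatted output carries
      # 'stage' from the tracker and 'step' from the state's name context.
      output = json.loads(formatter.format(record))

    statesampler.set_current_tracker(None)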
39 changes: 25 additions & 14 deletions sdks/python/apache_beam/runners/worker/logger_test.py
@@ -18,6 +18,7 @@
"""Tests for worker logging utilities."""

from __future__ import absolute_import
from __future__ import unicode_literals

import json
import logging
@@ -27,6 +28,8 @@
from builtins import object

from apache_beam.runners.worker import logger
from apache_beam.runners.worker import statesampler
from apache_beam.utils.counters import CounterFactory


class PerThreadLoggingContextTest(unittest.TestCase):
@@ -129,30 +132,38 @@ def test_record_with_arbitrary_messages(self):
self.execute_multiple_cases(test_cases)

def test_record_with_per_thread_info(self):
with logger.PerThreadLoggingContext(
work_item_id='workitem', stage_name='stage', step_name='step'):
formatter = logger.JsonLogFormatter(job_id='jobid', worker_id='workerid')
record = self.create_log_record(**self.SAMPLE_RECORD)
log_output = json.loads(formatter.format(record))
self.maxDiff = None
tracker = statesampler.StateSampler('stage', CounterFactory())
statesampler.set_current_tracker(tracker)
formatter = logger.JsonLogFormatter(job_id='jobid', worker_id='workerid')
with logger.PerThreadLoggingContext(work_item_id='workitem'):
with tracker.scoped_state('step', 'process'):
record = self.create_log_record(**self.SAMPLE_RECORD)
log_output = json.loads(formatter.format(record))
expected_output = dict(self.SAMPLE_OUTPUT)
expected_output.update(
{'work': 'workitem', 'stage': 'stage', 'step': 'step'})
self.assertEqual(log_output, expected_output)
statesampler.set_current_tracker(None)

def test_nested_with_per_thread_info(self):
self.maxDiff = None
tracker = statesampler.StateSampler('stage', CounterFactory())
statesampler.set_current_tracker(tracker)
formatter = logger.JsonLogFormatter(job_id='jobid', worker_id='workerid')
with logger.PerThreadLoggingContext(
work_item_id='workitem', stage_name='stage', step_name='step1'):
record = self.create_log_record(**self.SAMPLE_RECORD)
log_output1 = json.loads(formatter.format(record))

with logger.PerThreadLoggingContext(step_name='step2'):
with logger.PerThreadLoggingContext(work_item_id='workitem'):
with tracker.scoped_state('step1', 'process'):
record = self.create_log_record(**self.SAMPLE_RECORD)
log_output2 = json.loads(formatter.format(record))
log_output1 = json.loads(formatter.format(record))

record = self.create_log_record(**self.SAMPLE_RECORD)
log_output3 = json.loads(formatter.format(record))
with tracker.scoped_state('step2', 'process'):
record = self.create_log_record(**self.SAMPLE_RECORD)
log_output2 = json.loads(formatter.format(record))

record = self.create_log_record(**self.SAMPLE_RECORD)
log_output3 = json.loads(formatter.format(record))

statesampler.set_current_tracker(None)
record = self.create_log_record(**self.SAMPLE_RECORD)
log_output4 = json.loads(formatter.format(record))

2 changes: 1 addition & 1 deletion sdks/python/apache_beam/runners/worker/operation_specs.py
@@ -378,9 +378,9 @@ def __init__(self, operations, stage_name,
step_names=None,
original_names=None,
name_contexts=None):
# TODO(BEAM-4028): Remove arguments other than name_contexts.
self.operations = operations
self.stage_name = stage_name
# TODO(BEAM-4028): Remove arguments other than name_contexts.
self.name_contexts = name_contexts or self._make_name_contexts(
original_names, step_names, system_names)

18 changes: 4 additions & 14 deletions sdks/python/apache_beam/runners/worker/operations.py
@@ -38,7 +38,6 @@
from apache_beam.runners import common
from apache_beam.runners.common import Receiver
from apache_beam.runners.dataflow.internal.names import PropertyNames
from apache_beam.runners.worker import logger
from apache_beam.runners.worker import opcounters
from apache_beam.runners.worker import operation_specs
from apache_beam.runners.worker import sideinputs
@@ -127,10 +126,6 @@ def __init__(self, name_context, spec, counter_factory, state_sampler):
else:
self.name_context = common.NameContext(name_context)

# TODO(BEAM-4028): Remove following two lines. Rely on name context.
self.operation_name = self.name_context.step_name
self.step_name = self.name_context.logging_name()

self.spec = spec
self.counter_factory = counter_factory
self.consumers = collections.defaultdict(list)
@@ -143,14 +138,11 @@ def __init__(self, name_context, spec, counter_factory, state_sampler):

self.state_sampler = state_sampler
self.scoped_start_state = self.state_sampler.scoped_state(
self.name_context.metrics_name(), 'start',
metrics_container=self.metrics_container)
self.name_context, 'start', metrics_container=self.metrics_container)
self.scoped_process_state = self.state_sampler.scoped_state(
self.name_context.metrics_name(), 'process',
metrics_container=self.metrics_container)
self.name_context, 'process', metrics_container=self.metrics_container)
self.scoped_finish_state = self.state_sampler.scoped_state(
self.name_context.metrics_name(), 'finish',
metrics_container=self.metrics_container)
self.name_context, 'finish', metrics_container=self.metrics_container)
# TODO(ccy): the '-abort' state can be added when the abort is supported in
# Operations.
self.receivers = []
@@ -390,11 +382,9 @@ def start(self):
fn, args, kwargs, self.side_input_maps, window_fn,
tagged_receivers=self.tagged_receivers,
step_name=self.name_context.logging_name(),
logging_context=logger.PerThreadLoggingContext(
step_name=self.name_context.logging_name()),
state=state,
scoped_metrics_container=None,
operation_name=self.name_context.metrics_name())

self.dofn_receiver = (self.dofn_runner
if isinstance(self.dofn_runner, Receiver)
else DoFnRunnerReceiver(self.dofn_runner))
31 changes: 29 additions & 2 deletions sdks/python/apache_beam/runners/worker/statesampler.py
@@ -22,6 +22,7 @@
import threading
from collections import namedtuple

from apache_beam.runners import common
from apache_beam.utils.counters import Counter
from apache_beam.utils.counters import CounterName

@@ -69,8 +70,18 @@ def __init__(self, prefix, counter_factory,
self._states_by_name = {}
self.sampling_period_ms = sampling_period_ms
self.tracked_thread = None
self.finished = False
self.started = False
super(StateSampler, self).__init__(sampling_period_ms)

@property
def stage_name(self):
return self._prefix

def stop(self):
set_current_tracker(None)
super(StateSampler, self).stop()

def stop_if_still_running(self):
if self.started and not self.finished:
self.stop()
@@ -90,13 +101,28 @@ def get_info(self):
self.tracked_thread)

def scoped_state(self,
step_name,
name_context,
state_name,
io_target=None,
metrics_container=None):
"""Returns a ScopedState object associated to a Step and a State.

Args:
name_context: common.NameContext. The step name information for this step.
state_name: str. The state name (e.g. process / start / finish).
io_target: Optional. Further qualifies the counter name (e.g. for side input reads).
metrics_container: MetricsContainer. The step's metrics container.

Returns:
A ScopedState that keeps the execution context and is able to switch it
for the execution thread.
"""
if not isinstance(name_context, common.NameContext):
name_context = common.NameContext(name_context)

counter_name = CounterName(state_name + '-msecs',
stage_name=self._prefix,
step_name=step_name,
step_name=name_context.metrics_name(),
io_target=io_target)
if counter_name in self._states_by_name:
return self._states_by_name[counter_name]
@@ -105,6 +131,7 @@ def scoped_state(self,
Counter.SUM)
self._states_by_name[counter_name] = super(
StateSampler, self)._scoped_state(counter_name,
name_context,
output_counter,
metrics_container)
return self._states_by_name[counter_name]
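
A short sketch of the new scoped_state signature: a string step name is promoted to a common.NameContext as shown above, and the resulting ScopedState is cached by its CounterName, so repeated lookups return the same object:

    from apache_beam.runners.worker import statesampler
    from apache_beam.utils.counters import CounterFactory

    sampler = statesampler.StateSampler('stage1', CounterFactory())

    # A plain string is wrapped in a common.NameContext internally.
    process_state = sampler.scoped_state('step1', 'process')

    # The ScopedState is cached under its CounterName, so a second lookup
    # returns the same object.
    assert process_state is sampler.scoped_state('step1', 'process')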