diff --git a/sdks/python/apache_beam/runners/common.pxd b/sdks/python/apache_beam/runners/common.pxd index 5c5eba227320b..4bb226492ba5b 100644 --- a/sdks/python/apache_beam/runners/common.pxd +++ b/sdks/python/apache_beam/runners/common.pxd @@ -81,9 +81,7 @@ cdef class PerWindowInvoker(DoFnInvoker): cdef class DoFnRunner(Receiver): cdef DoFnContext context - cdef LoggingContext logging_context cdef object step_name - cdef ScopedMetricsContainer scoped_metrics_container cdef list side_inputs cdef DoFnInvoker do_fn_invoker @@ -112,15 +110,5 @@ cdef class DoFnContext(object): cpdef set_element(self, WindowedValue windowed_value) -cdef class LoggingContext(object): - # TODO(robertwb): Optimize "with [cdef class]" - cpdef enter(self) - cpdef exit(self) - - -cdef class _LoggingContextAdapter(LoggingContext): - cdef object underlying - - cdef class _ReceiverAdapter(Receiver): cdef object underlying diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py index d5f35de988fb1..88745c778e352 100644 --- a/sdks/python/apache_beam/runners/common.py +++ b/sdks/python/apache_beam/runners/common.py @@ -119,16 +119,6 @@ def logging_name(self): return self.user_name -class LoggingContext(object): - """For internal use only; no backwards-compatibility guarantees.""" - - def enter(self): - pass - - def exit(self): - pass - - class Receiver(object): """For internal use only; no backwards-compatibility guarantees. @@ -551,20 +541,15 @@ def __init__(self, windowing: windowing properties of the output PCollection(s) tagged_receivers: a dict of tag name to Receiver objects step_name: the name of this step - logging_context: a LoggingContext object + logging_context: DEPRECATED [BEAM-4728] state: handle for accessing DoFn state - scoped_metrics_container: Context switcher for metrics container + scoped_metrics_container: DEPRECATED operation_name: The system name assigned by the runner for this operation. """ # Need to support multiple iterations. side_inputs = list(side_inputs) - from apache_beam.metrics.execution import ScopedMetricsContainer - - self.scoped_metrics_container = ( - scoped_metrics_container or ScopedMetricsContainer()) self.step_name = step_name - self.logging_context = logging_context or LoggingContext() self.context = DoFnContext(step_name, state=state) do_fn_signature = DoFnSignature(fn) @@ -595,26 +580,16 @@ def receive(self, windowed_value): def process(self, windowed_value): try: - self.logging_context.enter() - self.scoped_metrics_container.enter() self.do_fn_invoker.invoke_process(windowed_value) except BaseException as exn: self._reraise_augmented(exn) - finally: - self.scoped_metrics_container.exit() - self.logging_context.exit() def _invoke_bundle_method(self, bundle_method): try: - self.logging_context.enter() - self.scoped_metrics_container.enter() self.context.set_element(None) bundle_method() except BaseException as exn: self._reraise_augmented(exn) - finally: - self.scoped_metrics_container.exit() - self.logging_context.exit() def start(self): self._invoke_bundle_method(self.do_fn_invoker.invoke_start_bundle) diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor.py b/sdks/python/apache_beam/runners/worker/bundle_processor.py index 4193ea2debb5f..958731d0ce4d8 100644 --- a/sdks/python/apache_beam/runners/worker/bundle_processor.py +++ b/sdks/python/apache_beam/runners/worker/bundle_processor.py @@ -63,13 +63,12 @@ class RunnerIOOperation(operations.Operation): """Common baseclass for runner harness IO operations.""" - def __init__(self, operation_name, step_name, consumers, counter_factory, + def __init__(self, name_context, step_name, consumers, counter_factory, state_sampler, windowed_coder, target, data_channel): super(RunnerIOOperation, self).__init__( - operation_name, None, counter_factory, state_sampler) + name_context, None, counter_factory, state_sampler) self.windowed_coder = windowed_coder self.windowed_coder_impl = windowed_coder.get_impl() - self.step_name = step_name # target represents the consumer for the bytes in the data plane for a # DataInputOperation or a producer of these bytes for a DataOutputOperation. self.target = target @@ -106,9 +105,9 @@ def __init__(self, operation_name, step_name, consumers, counter_factory, windowed_coder, target=input_target, data_channel=data_channel) # We must do this manually as we don't have a spec or spec.output_coders. self.receivers = [ - operations.ConsumerSet(self.counter_factory, self.step_name, 0, - next(itervalues(consumers)), - self.windowed_coder)] + operations.ConsumerSet( + self.counter_factory, self.name_context.step_name, 0, + next(itervalues(consumers)), self.windowed_coder)] def process(self, windowed_value): self.output(windowed_value) diff --git a/sdks/python/apache_beam/runners/worker/logger.pxd b/sdks/python/apache_beam/runners/worker/logger.pxd deleted file mode 100644 index 201daf4e29aea..0000000000000 --- a/sdks/python/apache_beam/runners/worker/logger.pxd +++ /dev/null @@ -1,25 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -cimport cython - -from apache_beam.runners.common cimport LoggingContext - - -cdef class PerThreadLoggingContext(LoggingContext): - cdef kwargs - cdef list stack diff --git a/sdks/python/apache_beam/runners/worker/logger.py b/sdks/python/apache_beam/runners/worker/logger.py index 07cd320ff8221..ae9cdd3ac751d 100644 --- a/sdks/python/apache_beam/runners/worker/logger.py +++ b/sdks/python/apache_beam/runners/worker/logger.py @@ -26,7 +26,7 @@ import threading import traceback -from apache_beam.runners.common import LoggingContext +from apache_beam.runners.worker import statesampler # This module is experimental. No backwards-compatibility guarantees. @@ -38,7 +38,6 @@ class _PerThreadWorkerData(threading.local): def __init__(self): super(_PerThreadWorkerData, self).__init__() - # TODO(robertwb): Consider starting with an initial (ignored) ~20 elements # in the list, as going up and down all the way to zero incurs several # reallocations. self.stack = [] @@ -53,7 +52,7 @@ def get_data(self): per_thread_worker_data = _PerThreadWorkerData() -class PerThreadLoggingContext(LoggingContext): +class PerThreadLoggingContext(object): """A context manager to add per thread attributes.""" def __init__(self, **kwargs): @@ -150,10 +149,14 @@ def format(self, record): data = per_thread_worker_data.get_data() if 'work_item_id' in data: output['work'] = data['work_item_id'] - if 'stage_name' in data: - output['stage'] = data['stage_name'] - if 'step_name' in data: - output['step'] = data['step_name'] + + tracker = statesampler.get_current_tracker() + if tracker: + output['stage'] = tracker.stage_name + + if tracker.current_state() and tracker.current_state().name_context: + output['step'] = tracker.current_state().name_context.logging_name() + # All logging happens using the root logger. We will add the basename of the # file and the function name where the logging happened to make it easier # to identify who generated the record. diff --git a/sdks/python/apache_beam/runners/worker/logger_test.py b/sdks/python/apache_beam/runners/worker/logger_test.py index 73ec1aa3ad9cb..c131775b8e098 100644 --- a/sdks/python/apache_beam/runners/worker/logger_test.py +++ b/sdks/python/apache_beam/runners/worker/logger_test.py @@ -18,6 +18,7 @@ """Tests for worker logging utilities.""" from __future__ import absolute_import +from __future__ import unicode_literals import json import logging @@ -27,6 +28,8 @@ from builtins import object from apache_beam.runners.worker import logger +from apache_beam.runners.worker import statesampler +from apache_beam.utils.counters import CounterFactory class PerThreadLoggingContextTest(unittest.TestCase): @@ -129,30 +132,38 @@ def test_record_with_arbitrary_messages(self): self.execute_multiple_cases(test_cases) def test_record_with_per_thread_info(self): - with logger.PerThreadLoggingContext( - work_item_id='workitem', stage_name='stage', step_name='step'): - formatter = logger.JsonLogFormatter(job_id='jobid', worker_id='workerid') - record = self.create_log_record(**self.SAMPLE_RECORD) - log_output = json.loads(formatter.format(record)) + self.maxDiff = None + tracker = statesampler.StateSampler('stage', CounterFactory()) + statesampler.set_current_tracker(tracker) + formatter = logger.JsonLogFormatter(job_id='jobid', worker_id='workerid') + with logger.PerThreadLoggingContext(work_item_id='workitem'): + with tracker.scoped_state('step', 'process'): + record = self.create_log_record(**self.SAMPLE_RECORD) + log_output = json.loads(formatter.format(record)) expected_output = dict(self.SAMPLE_OUTPUT) expected_output.update( {'work': 'workitem', 'stage': 'stage', 'step': 'step'}) self.assertEqual(log_output, expected_output) + statesampler.set_current_tracker(None) def test_nested_with_per_thread_info(self): + self.maxDiff = None + tracker = statesampler.StateSampler('stage', CounterFactory()) + statesampler.set_current_tracker(tracker) formatter = logger.JsonLogFormatter(job_id='jobid', worker_id='workerid') - with logger.PerThreadLoggingContext( - work_item_id='workitem', stage_name='stage', step_name='step1'): - record = self.create_log_record(**self.SAMPLE_RECORD) - log_output1 = json.loads(formatter.format(record)) - - with logger.PerThreadLoggingContext(step_name='step2'): + with logger.PerThreadLoggingContext(work_item_id='workitem'): + with tracker.scoped_state('step1', 'process'): record = self.create_log_record(**self.SAMPLE_RECORD) - log_output2 = json.loads(formatter.format(record)) + log_output1 = json.loads(formatter.format(record)) - record = self.create_log_record(**self.SAMPLE_RECORD) - log_output3 = json.loads(formatter.format(record)) + with tracker.scoped_state('step2', 'process'): + record = self.create_log_record(**self.SAMPLE_RECORD) + log_output2 = json.loads(formatter.format(record)) + + record = self.create_log_record(**self.SAMPLE_RECORD) + log_output3 = json.loads(formatter.format(record)) + statesampler.set_current_tracker(None) record = self.create_log_record(**self.SAMPLE_RECORD) log_output4 = json.loads(formatter.format(record)) diff --git a/sdks/python/apache_beam/runners/worker/operation_specs.py b/sdks/python/apache_beam/runners/worker/operation_specs.py index 58ba5716c0ea0..d64920f18fe52 100644 --- a/sdks/python/apache_beam/runners/worker/operation_specs.py +++ b/sdks/python/apache_beam/runners/worker/operation_specs.py @@ -378,9 +378,9 @@ def __init__(self, operations, stage_name, step_names=None, original_names=None, name_contexts=None): + # TODO(BEAM-4028): Remove arguments other than name_contexts. self.operations = operations self.stage_name = stage_name - # TODO(BEAM-4028): Remove arguments other than name_contexts. self.name_contexts = name_contexts or self._make_name_contexts( original_names, step_names, system_names) diff --git a/sdks/python/apache_beam/runners/worker/operations.py b/sdks/python/apache_beam/runners/worker/operations.py index 143974eecf642..78a67bcd45711 100644 --- a/sdks/python/apache_beam/runners/worker/operations.py +++ b/sdks/python/apache_beam/runners/worker/operations.py @@ -38,7 +38,6 @@ from apache_beam.runners import common from apache_beam.runners.common import Receiver from apache_beam.runners.dataflow.internal.names import PropertyNames -from apache_beam.runners.worker import logger from apache_beam.runners.worker import opcounters from apache_beam.runners.worker import operation_specs from apache_beam.runners.worker import sideinputs @@ -127,10 +126,6 @@ def __init__(self, name_context, spec, counter_factory, state_sampler): else: self.name_context = common.NameContext(name_context) - # TODO(BEAM-4028): Remove following two lines. Rely on name context. - self.operation_name = self.name_context.step_name - self.step_name = self.name_context.logging_name() - self.spec = spec self.counter_factory = counter_factory self.consumers = collections.defaultdict(list) @@ -143,14 +138,11 @@ def __init__(self, name_context, spec, counter_factory, state_sampler): self.state_sampler = state_sampler self.scoped_start_state = self.state_sampler.scoped_state( - self.name_context.metrics_name(), 'start', - metrics_container=self.metrics_container) + self.name_context, 'start', metrics_container=self.metrics_container) self.scoped_process_state = self.state_sampler.scoped_state( - self.name_context.metrics_name(), 'process', - metrics_container=self.metrics_container) + self.name_context, 'process', metrics_container=self.metrics_container) self.scoped_finish_state = self.state_sampler.scoped_state( - self.name_context.metrics_name(), 'finish', - metrics_container=self.metrics_container) + self.name_context, 'finish', metrics_container=self.metrics_container) # TODO(ccy): the '-abort' state can be added when the abort is supported in # Operations. self.receivers = [] @@ -390,11 +382,9 @@ def start(self): fn, args, kwargs, self.side_input_maps, window_fn, tagged_receivers=self.tagged_receivers, step_name=self.name_context.logging_name(), - logging_context=logger.PerThreadLoggingContext( - step_name=self.name_context.logging_name()), state=state, - scoped_metrics_container=None, operation_name=self.name_context.metrics_name()) + self.dofn_receiver = (self.dofn_runner if isinstance(self.dofn_runner, Receiver) else DoFnRunnerReceiver(self.dofn_runner)) diff --git a/sdks/python/apache_beam/runners/worker/statesampler.py b/sdks/python/apache_beam/runners/worker/statesampler.py index b0c2b67f9ff75..b73029cf29cd8 100644 --- a/sdks/python/apache_beam/runners/worker/statesampler.py +++ b/sdks/python/apache_beam/runners/worker/statesampler.py @@ -22,6 +22,7 @@ import threading from collections import namedtuple +from apache_beam.runners import common from apache_beam.utils.counters import Counter from apache_beam.utils.counters import CounterName @@ -69,8 +70,18 @@ def __init__(self, prefix, counter_factory, self._states_by_name = {} self.sampling_period_ms = sampling_period_ms self.tracked_thread = None + self.finished = False + self.started = False super(StateSampler, self).__init__(sampling_period_ms) + @property + def stage_name(self): + return self._prefix + + def stop(self): + set_current_tracker(None) + super(StateSampler, self).stop() + def stop_if_still_running(self): if self.started and not self.finished: self.stop() @@ -90,13 +101,28 @@ def get_info(self): self.tracked_thread) def scoped_state(self, - step_name, + name_context, state_name, io_target=None, metrics_container=None): + """Returns a ScopedState object associated to a Step and a State. + + Args: + name_context: common.NameContext. It is the step name information. + state_name: str. It is the state name (e.g. process / start / finish). + io_target: + metrics_container: MetricsContainer. The step's metrics container. + + Returns: + A ScopedState that keeps the execution context and is able to switch it + for the execution thread. + """ + if not isinstance(name_context, common.NameContext): + name_context = common.NameContext(name_context) + counter_name = CounterName(state_name + '-msecs', stage_name=self._prefix, - step_name=step_name, + step_name=name_context.metrics_name(), io_target=io_target) if counter_name in self._states_by_name: return self._states_by_name[counter_name] @@ -105,6 +131,7 @@ def scoped_state(self, Counter.SUM) self._states_by_name[counter_name] = super( StateSampler, self)._scoped_state(counter_name, + name_context, output_counter, metrics_container) return self._states_by_name[counter_name] diff --git a/sdks/python/apache_beam/runners/worker/statesampler_fast.pxd b/sdks/python/apache_beam/runners/worker/statesampler_fast.pxd index 76b379b7a115b..799bd0d4dbf4b 100644 --- a/sdks/python/apache_beam/runners/worker/statesampler_fast.pxd +++ b/sdks/python/apache_beam/runners/worker/statesampler_fast.pxd @@ -43,7 +43,8 @@ cdef class StateSampler(object): cdef int32_t current_state_index - cpdef _scoped_state(self, counter_name, output_counter, metrics_container) + cpdef _scoped_state( + self, counter_name, name_context, output_counter, metrics_container) cdef class ScopedState(object): """Context manager class managing transitions for a given sampler state.""" @@ -52,6 +53,7 @@ cdef class ScopedState(object): cdef readonly int32_t state_index cdef readonly object counter cdef readonly object name + cdef readonly object name_context cdef readonly int64_t _nsecs cdef int32_t old_state_index cdef readonly MetricsContainer _metrics_container diff --git a/sdks/python/apache_beam/runners/worker/statesampler_fast.pyx b/sdks/python/apache_beam/runners/worker/statesampler_fast.pyx index fdf496979f991..8aa5217d8d12d 100644 --- a/sdks/python/apache_beam/runners/worker/statesampler_fast.pyx +++ b/sdks/python/apache_beam/runners/worker/statesampler_fast.pyx @@ -90,8 +90,12 @@ cdef class StateSampler(object): self.current_state_index = 0 self.time_since_transition = 0 self.state_transition_count = 0 - unknown_state = ScopedState( - self, CounterName('unknown'), self.current_state_index) + unknown_state = ScopedState(self, + CounterName('unknown'), + None, + self.current_state_index, + None, + None) pythread.PyThread_acquire_lock(self.lock, pythread.WAIT_LOCK) self.scoped_states_by_index = [unknown_state] pythread.PyThread_release_lock(self.lock) @@ -153,7 +157,7 @@ cdef class StateSampler(object): def current_state(self): return self.scoped_states_by_index[self.current_state_index] - cpdef _scoped_state(self, counter_name, output_counter, + cpdef _scoped_state(self, counter_name, name_context, output_counter, metrics_container): """Returns a context manager managing transitions for a given state. Args: @@ -168,6 +172,7 @@ cdef class StateSampler(object): new_state_index = len(self.scoped_states_by_index) scoped_state = ScopedState(self, counter_name, + name_context, new_state_index, output_counter, metrics_container) @@ -183,10 +188,16 @@ cdef class StateSampler(object): cdef class ScopedState(object): """Context manager class managing transitions for a given sampler state.""" - def __init__( - self, sampler, name, state_index, counter=None, metrics_container=None): + def __init__(self, + sampler, + name, + step_name_context, + state_index, + counter, + metrics_container): self.sampler = sampler self.name = name + self.name_context = step_name_context self.state_index = state_index self.counter = counter self._metrics_container = metrics_container diff --git a/sdks/python/apache_beam/runners/worker/statesampler_slow.py b/sdks/python/apache_beam/runners/worker/statesampler_slow.py index 2f09d0e8bb256..4b1bf830073a3 100644 --- a/sdks/python/apache_beam/runners/worker/statesampler_slow.py +++ b/sdks/python/apache_beam/runners/worker/statesampler_slow.py @@ -21,15 +21,18 @@ from builtins import object +from apache_beam.runners import common +from apache_beam.utils import counters + class StateSampler(object): def __init__(self, sampling_period_ms): - self._state_stack = [ScopedState(None, self, None)] + self._state_stack = [ScopedState(self, + counters.CounterName('unknown'), + None)] self.state_transition_count = 0 self.time_since_transition = 0 - self.started = False - self.finished = False def current_state(self): """Returns the current execution state. @@ -40,9 +43,12 @@ def current_state(self): def _scoped_state(self, counter_name, + name_context, output_counter, metrics_container=None): - return ScopedState(self, counter_name, output_counter, metrics_container) + assert isinstance(name_context, common.NameContext) + return ScopedState( + self, counter_name, name_context, output_counter, metrics_container) def _enter_state(self, state): self.state_transition_count += 1 @@ -57,14 +63,16 @@ def start(self): pass def stop(self): - self.finished = True + pass class ScopedState(object): - def __init__(self, sampler, name, counter=None, metrics_container=None): + def __init__(self, sampler, name, step_name_context, + counter=None, metrics_container=None): self.state_sampler = sampler self.name = name + self.name_context = step_name_context self.counter = counter self.nsecs = 0 self.metrics_container = metrics_container