[BEAM-2732] Alternative implementation of metrics relying on statesampler for context #4504
9c109f1 to eaacf08
Please add reviewers if it's ready for review.
r: @robertwb In PR 4387, MetricsContainers are attached to the current ScopedState.
OK, I've reviewed this, but it seems putting a dict on the StateSampler is not an improvement over putting an attribute on ScopedState. (I was actually imagining the dict living somewhere else entirely, but it's not clear where.) Let's go with pr/4387 instead (which I left some similar comments on).
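For readers comparing the two approaches, here is a rough sketch of the difference, using stand-in classes rather than the real Beam ones. Only `metrics_container_registry` appears in this PR's diff; the `metrics_container` attribute, the `current` field, and both lookup methods are assumptions made for illustration.

```python
class ScopedState(object):
  """Stand-in for a sampler state; carries the pr/4387-style attribute."""

  def __init__(self, step_name, metrics_container=None):
    self.step_name = step_name
    self.metrics_container = metrics_container


class StateSampler(object):
  """Stand-in sampler holding the dict proposed in this PR."""

  def __init__(self):
    self.metrics_container_registry = {}
    self.current = None  # hypothetical handle to the active ScopedState

  def container_via_dict(self):
    # This PR: an extra dict lookup keyed by the current step name.
    return self.metrics_container_registry[self.current.step_name]

  def container_via_state(self):
    # pr/4387: the active state already carries its container.
    return self.current.metrics_container


container = object()  # placeholder for a MetricsContainer
sampler = StateSampler()
sampler.current = ScopedState('step1', metrics_container=container)
sampler.metrics_container_registry['step1'] = container
```

Both paths resolve to the same container; the dict version simply adds a second place where the mapping has to be kept in sync.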
self.set_container_stack()
index = len(self.PER_THREAD.container) - 1
if index < 0:
  METRICS_SUPPORTED = False
As an aside, why is this needed? It's possible to run pipelines simultaneously with different runners, which would seem to break this.
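To illustrate the concern, a minimal sketch with hypothetical names (`start_pipeline_with_global` and `PipelineContext` are not part of Beam): a module-level flag is shared by every pipeline in the process, so the last runner to write it wins, whereas a flag stored on per-pipeline state stays independent.

```python
METRICS_SUPPORTED = True  # module-level flag, shared by every pipeline


def start_pipeline_with_global(supported):
  """Hypothetical runner entry point that records support in the global."""
  global METRICS_SUPPORTED
  METRICS_SUPPORTED = supported


class PipelineContext(object):
  """Hypothetical per-pipeline state that keeps the flag local."""

  def __init__(self, metrics_supported):
    self.metrics_supported = metrics_supported


# Two pipelines started with different runners in the same process.
start_pipeline_with_global(True)   # runner A supports metrics
start_pipeline_with_global(False)  # runner B does not
# Runner A now observes runner B's value, because the flag is shared.
flag_seen_by_a = METRICS_SUPPORTED

# With per-pipeline state, each pipeline keeps its own answer.
ctx_a = PipelineContext(True)
ctx_b = PipelineContext(False)
```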
self._current_tracker = tracker


EXECUTION_STATE_TRACKERS = ExecutionStateTrackers()
Rather than have this global be part of the public API, provide a get/set_current_tracker module-level method.
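Something along these lines would work, as a sketch only. It assumes the thread-local holder becomes a private detail of the module; the names `_ExecutionStateTrackers`, `_TRACKERS`, `get_current_tracker`, and `set_current_tracker` are illustrative and not taken from the PR.

```python
import threading


class _ExecutionStateTrackers(threading.local):
  """Private per-thread holder for the active tracker (hypothetical)."""

  def __init__(self):
    # threading.local re-runs __init__ in each thread on first access,
    # so every thread starts out with no tracker.
    self.current = None


_TRACKERS = _ExecutionStateTrackers()


def get_current_tracker():
  """Return the tracker set on the calling thread, or None."""
  return _TRACKERS.current


def set_current_tracker(tracker):
  """Set the tracker for the calling thread only."""
  _TRACKERS.current = tracker
```

Callers then depend only on the two functions, so the storage mechanism can change later without touching the public API.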
@@ -29,7 +31,23 @@
FAST_SAMPLER = False


StateSamplerInfo = namedtuple(
class ExecutionStateTrackers(threading.local):
If this class is even needed, make it private with an underscore.
# This keeps track of the MetricsContainer instances of the work item
# tracked by this sampler.
self.metrics_container_registry = {}
self.register()
Do not call this from the constructor.
@@ -46,13 +64,24 @@ def __init__(self, prefix, counter_factory,
self._prefix = prefix
self._counter_factory = counter_factory
self._states_by_name = {}
self._registered = False
This doesn't seem to be used anywhere.
@@ -79,3 +109,18 @@ def commit_counters(self):
for state in self._states_by_name.values():
  state_msecs = int(1e-6 * state.nsecs)
  state.counter.update(state_msecs - state.counter.value())


def simple_tracker():
If this is just used for tests, put it in a test file. I would rather force the callers to provide a prefix and counter factory manually.
def simple_tracker():
  """Create, register and return a simple StateSampler."""
  sampler = StateSampler('', CounterFactory())
Can you remind me the difference between counters and metric cells?
@classmethod
def register_container(cls, step_name, container):
  """Register container to a step name."""
  current_tracker = statesampler.EXECUTION_STATE_TRACKERS.current_tracker()
Where would we be registering containers at a point where we don't have a handle to the statesampler itself?
MetricsEnvironment = _MetricsEnvironment()
current_step_name = current_tracker.current_state().name.step_name
This is called for every increment, right?
MetricsEnvironment = _MetricsEnvironment()
current_step_name = current_tracker.current_state().name.step_name
Why name.step_name?