Skip to content

[BEAM-2732] Starting refactor of state tracking in Python#4375

Merged
robertwb merged 5 commits intoapache:masterfrom
pabloem:global-sampler-var
Jan 18, 2018
Merged

[BEAM-2732] Starting refactor of state tracking in Python#4375
robertwb merged 5 commits intoapache:masterfrom
pabloem:global-sampler-var

Conversation

@pabloem
Copy link
Member

@pabloem pabloem commented Jan 9, 2018

The goal for BEAM-2732 is to refactor the context trackers in the Python SDK so that they will all use the same mechanism.
Currently, Metrics, Logging and StateSampler keep their own contexts. BEAM-2732 aims to have all of them rely on the logic in StateSampler to keep their context (this is already the case in Java).

This PR does the following:

  • Clean up legacy code in the StateSampler.
  • Adds a global per-thread variable for state sampler, so each execution thread will be able to access the current state sampler from this global variable (this will then be used for LoggingContext, and MetricsEnvironment.current_container).
  • Gives the Python-only state sampler functionality to track context, so that non-Cythonized runners can rely on the StateSampler to track context.

…e Python-only state sampler full functionality.
@pabloem
Copy link
Member Author

pabloem commented Jan 9, 2018

r: @robertwb

@pabloem
Copy link
Member Author

pabloem commented Jan 10, 2018

Python tests passing.

@pabloem
Copy link
Member Author

pabloem commented Jan 10, 2018

Run Python PostCommit

Copy link
Contributor

@robertwb robertwb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for the delay, here's some initial comments.

from apache_beam.utils.counters import CounterName


class StateSamplerSlowTest(unittest.TestCase):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be better to run the same test on both implementations rather than have a copy. (Possibly conditionally test those parts that aren't implemented of course.)

DEFAULT_SAMPLING_PERIOD_MS = 200


class StateSampler(object):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we're going to provide a full implementation here, let's make a common base class and put anything that can be shared there. (Possibly the default sampling period as well.)

# the worker.
# We stop using prefixes with included dash.
self.prefix = prefix[:-1] if prefix[-1] == '-' else prefix
EXECUTION_STATE_SAMPLERS.set_sampler(self)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That the act of creating a StateSampler should not modify global state.

self._current_sampler = sampler


EXECUTION_STATE_SAMPLERS = ExecutionStateSamplers()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd really like to avoid more proliferation of global variables. This is somewhat needed for counters (as the user invokes counter operations without access to the underlying state) but we should be able to pass this state around explicitly.

@pabloem
Copy link
Member Author

pabloem commented Jan 12, 2018

Run Python PostCommit

Copy link
Member Author

@pabloem pabloem left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Refactored the change. Let me know what you think.

from apache_beam.utils.counters import CounterName


class StateSamplerSlowTest(unittest.TestCase):
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

robertwb wrote:
It would be better to run the same test on both implementations rather than have a copy. (Possibly conditionally test those parts that aren't implemented of course.)

Acknowledged.

DEFAULT_SAMPLING_PERIOD_MS = 200


class StateSampler(object):
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

robertwb wrote:
If we're going to provide a full implementation here, let's make a common base class and put anything that can be shared there. (Possibly the default sampling period as well.)

I've restructured it into a class hierarchy. Let me know what you think.

# the worker.
# We stop using prefixes with included dash.
self.prefix = prefix[:-1] if prefix[-1] == '-' else prefix
EXECUTION_STATE_SAMPLERS.set_sampler(self)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

robertwb wrote:
That the act of creating a StateSampler should not modify global state.

Added a register function.

self._current_sampler = sampler


EXECUTION_STATE_SAMPLERS = ExecutionStateSamplers()
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

robertwb wrote:
I'd really like to avoid more proliferation of global variables. This is somewhat needed for counters (as the user invokes counter operations without access to the underlying state) but we should be able to pass this state around explicitly.

I'll document that we want to pass a tracker around as much as possible - but the aim of this global variable is to be able remove global state within MetricsEnvironment, and PerThreadLoggingContext, and have this be the canonical global state.

How does that sound?

@pabloem
Copy link
Member Author

pabloem commented Jan 12, 2018

Run Python PostCommit

@pabloem
Copy link
Member Author

pabloem commented Jan 12, 2018

@robertwb - I've created a class hierarchy. Passes Python PostCommit, and PreCommit.

from apache_beam.runners.worker.statesampler import DEFAULT_SAMPLING_PERIOD_MS
except ImportError:
DEFAULT_SAMPLING_PERIOD_MS = 0
DEFAULT_SAMPLING_PERIOD_MS = 0
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: single assignment is easier to reason about. Put this into an else clause.

self.counter_factory = counter_factory
self.sampling_period_ms = sampling_period_ms
def __init__(self, *args):
#TODO(pabloem): Figure out how to pass arguments without errors.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What errors were you getting?


cdef public int64_t state_transition_count
cdef int64_t time_since_transition
cdef int64_t _time_since_transition
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why this change? Or should we be changing all the others as well for consistency?

@@ -190,60 +158,28 @@ cdef class StateSampler(object):
if self.started and not self.finished:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this method also common to both?


def __str__(self):
return '%s' % self._str_internal()
return '<CounterName<%s> at %s>' % (self._str_internal(), hex(id(self)))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

__str__ defaults to __repr__, if you want them to be the same remove this one.

# pylint: disable=global-variable-not-assigned
global statesampler
from apache_beam.runners.worker import statesampler
# pylint: disable=unused-variable
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could also put this method in a

@classmethod
def setUpClass(cls):
    ...

self._current_sampler = sampler


EXECUTION_STATE_SAMPLERS = ExecutionStateSamplers()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please get rid of this unused global state.

@robertwb
Copy link
Contributor

Let's not add the global variable until it's actually used, in which case we can weigh the pros and cons.

Copy link
Member Author

@pabloem pabloem left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

robertwb wrote:
Let's not add the global variable until it's actually used, in which case we can weigh the pros and cons.

Removed the local variable. * I mean global.

from apache_beam.runners.worker.statesampler import DEFAULT_SAMPLING_PERIOD_MS
except ImportError:
DEFAULT_SAMPLING_PERIOD_MS = 0
DEFAULT_SAMPLING_PERIOD_MS = 0
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

robertwb wrote:
Nit: single assignment is easier to reason about. Put this into an else clause.

Done.

@@ -190,60 +158,28 @@ cdef class StateSampler(object):
if self.started and not self.finished:
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

robertwb wrote:
Is this method also common to both?

You're right. Done.


def __str__(self):
return '%s' % self._str_internal()
return '<CounterName<%s> at %s>' % (self._str_internal(), hex(id(self)))
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

robertwb wrote:
__str__ defaults to __repr__, if you want them to be the same remove this one.

Done.


cdef public int64_t state_transition_count
cdef int64_t time_since_transition
cdef int64_t _time_since_transition
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

robertwb wrote:
Why this change? Or should we be changing all the others as well for consistency?

I had thought this was only c-accessible. I've kept it consistent.

self._current_sampler = sampler


EXECUTION_STATE_SAMPLERS = ExecutionStateSamplers()
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

robertwb wrote:
Please get rid of this unused global state.

I plan to use the global state to track metrics and logging (and eventually remove the old style metrics context). I can add this now, or on the next PR. What do you think?
Next PR: https://github.com/apache/beam/pull/4387/files#diff-89072ff532dd15a0af899957de4c26f3R152

self.counter_factory = counter_factory
self.sampling_period_ms = sampling_period_ms
def __init__(self, *args):
#TODO(pabloem): Figure out how to pass arguments without errors.
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

robertwb wrote:
What errors were you getting?

hmmm this is fixed now.

@pabloem pabloem force-pushed the global-sampler-var branch from 7323851 to 5c30712 Compare January 18, 2018 02:03
@pabloem
Copy link
Member Author

pabloem commented Jan 18, 2018

Run Python PostCommit

Copy link
Contributor

@robertwb robertwb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. LGTM.

from apache_beam.runners.worker import statesampler
# pylint: disable=unused-variable
from apache_beam.runners.worker import statesampler_fast
cls.slow_sampler = False
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, instead of duplicating the logic, perhaps just query state_sampler.FAST_SAMPLER directly below.

@pabloem pabloem force-pushed the global-sampler-var branch from 17b329e to d3d0cf6 Compare January 18, 2018 21:19
@pabloem
Copy link
Member Author

pabloem commented Jan 18, 2018

Fixed, and done.

@robertwb
Copy link
Contributor

Thanks. Non-Java runs are green. Merging.

@robertwb robertwb merged commit 24e6bf8 into apache:master Jan 18, 2018
@pabloem pabloem deleted the global-sampler-var branch January 18, 2018 21:40
boyuanzz pushed a commit to boyuanzz/beam that referenced this pull request Feb 6, 2018
Also giving the Python-only state sampler full functionality.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants