[BEAM-562] Add DoFn.setup and DoFn.teardown to Python SDK #7994
Conversation
@charlesccychen PTAL
Thanks!
Only reviewed to the point of my last comment, not further.
@@ -619,6 +619,7 @@ def start_bundle(self):
        step_name=self._applied_ptransform.full_label,
        state=DoFnState(self._counter_factory),
        user_state_context=self.user_state_context)
    self.runner.setup()
This should be called once per instance, not in start_bundle.
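For reference, the intended contract can be sketched in plain Python (no Beam dependency; `run_bundles` is a hypothetical mini-runner, not Beam code): setup and teardown bracket the life of the instance, while start_bundle/finish_bundle repeat per bundle.

```python
class LifecycleDoFn:
    """Plain-Python stand-in for a Beam DoFn, recording lifecycle calls."""

    def __init__(self):
        self.calls = []

    def setup(self):           # once per DoFn instance
        self.calls.append('setup')

    def start_bundle(self):    # once per bundle
        self.calls.append('start_bundle')

    def process(self, element):
        self.calls.append('process')

    def finish_bundle(self):   # once per bundle
        self.calls.append('finish_bundle')

    def teardown(self):        # once per DoFn instance
        self.calls.append('teardown')


def run_bundles(dofn, bundles):
    # Hypothetical mini-runner: invokes the hooks in the order a
    # long-lived worker would for several bundles on one instance.
    dofn.setup()
    for bundle in bundles:
        dofn.start_bundle()
        for element in bundle:
            dofn.process(element)
        dofn.finish_bundle()
    dofn.teardown()
```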
In the DirectRunner, we currently have one instance per bundle. If we want these instances to be long-living, we need a larger refactoring here.
Yes, unfortunately the DirectRunner does not cache instances between bundles. Fortunately this code path is only used by DirectRunner and not the other runners.
If self._perform_dofn_pickle_test is false, it looks like the DoFn is not cloned but comes directly from self._applied_ptransform.transform.dofn. Is the whole applied_ptransform cloned per bundle?
I added some pdb breakpoints and confirmed that the DoFn is cloned during the test and that the lifecycle methods are called on the clones. See this thread for where the cloning occurs.
Hi @kennknowles, would you be willing to be the primary reviewer on this since @charlesccychen is unavailable for a while?
Sure, I would be happy to.
Thank you! Could you also let me know which of your previous review comments are still actionable?
with TestPipeline() as p:
  (p
   | 'Start' >> beam.Create([1, 2, 3])
   | 'Do' >> beam.ParDo(CallSequenceEnforcingDoFn()))
How about adding an assert after the end of the with block, to check that teardown was actually called?
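A sketch of the suggested test shape, in plain Python: FakePipeline is a hypothetical in-process stand-in for TestPipeline that tears the DoFn down on exit. As discussed later in the thread, this only works when the asserted DoFn is the very instance the runner executes, not an unpickled copy.

```python
class TeardownTrackingDoFn:
    """DoFn-like class that records whether teardown was called."""

    def __init__(self):
        self.teardown_called = False

    def process(self, element):
        yield element

    def teardown(self):
        self.teardown_called = True


class FakePipeline:
    """Hypothetical in-process pipeline: calls teardown on exit,
    the way an in-process DirectRunner could."""

    def __init__(self, dofn):
        self.dofn = dofn

    def __enter__(self):
        return self

    def __exit__(self, *exc_info):
        self.dofn.teardown()
        return False


dofn = TeardownTrackingDoFn()
with FakePipeline(dofn):
    list(dofn.process(1))
# The suggested assert, placed after the end of the with block:
assert dofn.teardown_called
```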
Would that actually work? My understanding is that with remote runners, the DoFn instance here and the one on the worker are different physical instances, and there isn't a way to check for state changes on the DoFn on the worker to figure out if teardown was called or not.
Also, when I was doing print debugging, it looks like teardown is not being called on the direct runner.
It would not work with remote runners, but it would work with DirectRunner. Isn't DirectRunner the default for TestPipeline?
Also, when I was doing print debugging, it looks like teardown is not being called on the direct runner.
That looks like a bug in this PR. (Or maybe not since it is not guaranteed to be called but I think we should exercise these paths in DirectRunner.)
I see. If we only run this test for DirectRunner, then I can add the extra assert.
Yes, I will investigate and see if DirectRunner can be made to call teardown.
Print debugging shows that teardown is being correctly called. However, the assert in this location fails because the worker is using a different unpickled copy of the DoFn.
- A pickle round trip happens in ptransform.py during pipeline construction time.
- The default direct runner FnApiRunner unpickles the DoFn in operations.py and bundle_processor.py.
- The alternative direct runner BundleBasedDirectRunner performs a pickle round trip in transform_evaluator.py if _perform_dofn_pickle_test is true.
This means that it is difficult to get a handle to the DoFn instance that the direct runner worker is actually using.
Added a test that checks that a global flag is set. This assumes that the worker is running in the same process as the test.
@attr('ValidatesRunner')
class DoFnLifecycleTest(unittest.TestCase):
Does Python have TestStream implemented in the direct runner? Or something equivalent for testing directly against the SDK harness? It would be good to have a test that explicitly crosses multiple bundles, since the purpose of setup / teardown is for operations that cross bundles.
TestStream is implemented in the direct runner, but it would not use the SDK harness.
Also, Yifan's implementation here adds the APIs but does not actually change the lifecycle to reuse DoFns.
Nevertheless, it would be good to run a test with the SDK harness.
Yes, the test already checks that setup is called exactly once, so all that is needed is a way to generate a test such that the runner runs multiple bundles with one DoFn. I'm not sure if that capability exists currently.
Unfortunately the test only checks that teardown is called at most once, and not exactly once. I don't have a good way of checking that.
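A sketch of the kind of checks such a call-sequence-enforcing DoFn can make (illustrative names and assertions, not the PR's exact code): setup exactly once, teardown at most once, and process only between the two.

```python
class CallSequenceEnforcingDoFn:
    """Asserts that lifecycle methods are invoked in a legal order."""

    def __init__(self):
        self.setup_calls = 0
        self.teardown_calls = 0

    def setup(self):
        assert self.setup_calls == 0, 'setup called more than once'
        assert self.teardown_calls == 0, 'setup called after teardown'
        self.setup_calls += 1

    def process(self, element):
        assert self.setup_calls == 1, 'process called before setup'
        assert self.teardown_calls == 0, 'process called after teardown'
        yield element

    def teardown(self):
        assert self.setup_calls == 1, 'teardown called before setup'
        assert self.teardown_calls == 0, 'teardown called more than once'
        self.teardown_calls += 1
```

Note this can only check "at most once" for teardown from inside the DoFn; proving "exactly once" requires observing state after the pipeline finishes, which is the difficulty discussed above.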
I believe this will be possible if we set _perform_dofn_pickle_test to false in a test. However, that needs to be somehow exposed as an option or monkey-patched.
I cannot reply to this comment in place
applied transform will be a new instance for each bundle (for direct runner). (See: https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/direct/executor.py#L366)
That line does not show a cloning of applied_ptransform...
…On Fri, Apr 19, 2019 at 3:44 PM Ahmet Altay wrote:
> I cannot reply to this comment in place
> > If self._perform_dofn_pickle_test is false, it looks like it is not cloned but comes directly from self._applied_ptransform.transform.dofn. Is the whole applied_ptransform cloned per bundle?
> applied transform will be a new instance for each bundle (for direct runner). (See: https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/direct/executor.py#L366)
You are right, I was confused by the evaluator. _perform_dofn_pickle_test defaults to True, and there is only one path that sets it to false, related to SDF evaluation. I think we should clone by default, probably by getting rid of that flag. I am not sure why we had the flag in the first place, but I am guessing there was an underlying issue.
Added a test to ensure that teardown is called on DirectRunner. PTAL.

I discovered that metrics counter increments during setup and teardown are not registered in the pipeline result metrics. I'm not sure if this should be considered a bug, since setup and teardown could happen outside the pipeline run (they happen within the pipeline run for DirectRunner, but not necessarily for other runners). I looked at the metrics API but could not figure out how to set up the metrics environment correctly for those stages. @aaltay, do you have any suggestions on how to handle this?
Hmm, my initial gut reaction is that metrics are computed/committed as part of a bundle, and setup/teardown are not part of bundle execution. On the other hand, I can imagine users wanting access to metrics from setup/teardown... So perhaps I'd say to file a bug so we can figure out what's necessary to make metrics available there.
@pabloem You're correct that setup/teardown are not part of bundle execution, but they are part of the DoFn lifecycle. Metrics outside of a bundle do make sense, since you may want to capture something like memory usage, CPU utilization, or caching stats that cross bundle boundaries or are global. A lot of this falls under worker "health", though.
@yifanmai In addition to filing a bug related to metrics, could this PR move forward?
Yes, I think so. I will open a bug to track the metrics-in-teardown issue. (I will file it after this PR; otherwise the bug will not make sense.) I have addressed the earlier feedback regarding adding tests to make sure that teardown is called; @aaltay, could you and @kennknowles take another look?
The change LGTM, but there are test errors. Please take a look at those. (If you need help with py3 compatibility, cc: @tvalentyn could help.) One error I found in the logs is the following; there might be other errors as well:

6:57:54 FAILED (SKIP=448, errors=24, failures=5)
16:58:01 File "/usr/lib/python2.7/threading.py", line 801, in __bootstrap_inner
Force-pushed from 1f0078f to 830228a.
@kennknowles @aaltay tests are passing now. PTAL?
Run Python PostCommit
Fails with a py3 error, not sure how it is related:
Run Python PostCommit
The teardown assert is breaking in Dataflow as well (because it only works for in-process execution). Can I delete that assert, or disable it for Dataflow?
Let's delete it. I do not think it will be easy to assert just for the Dataflow runner.
Run Python PostCommit
Failing at
Run Python PostCommit
Thank you @yifanmai!
* [BEAM-562] Add setup and teardown to Python DoFn
Thanks @aaltay, @kennknowles and @charlesccychen for your help! I added https://issues.apache.org/jira/browse/BEAM-7340 to track the issue related to metrics in DoFn.teardown, as discussed earlier.
@yifanmai is this also expected to work for unbounded sources? I can't get it to work when reading from Pub/Sub...
@NikeNano it should work with all DoFns regardless of the types of sources. If it is not working, please file an issue.
OK, thanks @aaltay. Will investigate further and file an issue if I don't get it to work.
Created an issue for DoFn.setup: https://issues.apache.org/jira/browse/BEAM-7885
DoFn.setup and DoFn.teardown are currently supported in the Java SDK but not the Python SDK. These methods are useful for performing expensive per-thread initialization. This change adds those methods to make the Python SDK more consistent with the Java SDK. It also modifies the direct runner to invoke these methods.
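A typical use of the new hooks might look like the sketch below (illustrative only; Client is a hypothetical stand-in for an expensive resource such as a database connection, and the class is written without the beam.DoFn base so it stays self-contained):

```python
class Client:
    """Hypothetical expensive resource, e.g. a database connection."""

    def __init__(self):
        self.open = True

    def lookup(self, key):
        return key * 2  # stand-in for a real remote lookup

    def close(self):
        self.open = False


class EnrichDoFn:  # in real code: class EnrichDoFn(beam.DoFn)
    def setup(self):
        # Runs once per DoFn instance, before any bundle: the right
        # place for expensive initialization.
        self.client = Client()

    def process(self, element):
        yield self.client.lookup(element)

    def teardown(self):
        # Best-effort cleanup, once per instance.
        self.client.close()
```

Putting the Client construction in setup rather than __init__ avoids re-creating it when the DoFn is pickled and shipped to workers, and avoids per-bundle cost that start_bundle would incur.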
Post-Commit Tests Status (on master branch)
See .test-infra/jenkins/README for trigger phrase, status and link of all Jenkins jobs.