design doc: Faster Dataflow Shutdown #18760
> Concerns have been raised that the overhead of performing a check for every update might slow down joins prohibitively.
> We have not been able to corroborate these concerns with tests so far.
> At least for `SELECT` queries computing simple cross joins, adding a token check to the join closure does not appear to increase the query time noticeably.
I think this is to be expected. Any time a Rust function accepts a closure via a generic parameter, it is considered for inlining by the compiler, since monomorphization needs to be deferred until the crate calling the generic function is compiled. In the case of `join_core` in particular, I'm pretty sure the closure we provide gets inlined, and then a loop-invariant pass lifts the token check out of the loop, so we only run it once.
Ah, I didn't consider a smart compiler!
For reference, this is the test that led me to the above statement:
```sql
-- common setup
CREATE TABLE t (a int);
INSERT INTO t SELECT * FROM generate_series(1, 10000);
```

```
-- join shutdown poc
materialize=> SELECT b.a * 10, b.a * 10, c.a * 10 FROM t a, t b, t c limit 1;
 ?column? | ?column? | ?column?
----------+----------+----------
    66560 |    66560 |    66560
(1 row)

Time: 66956.747 ms (01:06.957)

-- main
materialize=> SELECT b.a * 10, b.a * 10, c.a * 10 FROM t a, t b, t c limit 1;
 ?column? | ?column? | ?column?
----------+----------+----------
    66560 |    66560 |    66560
(1 row)

Time: 66909.803 ms (01:06.910)
```
Thanks for the write-up, which looks good to me overall. I am not 100% sure about the parts where performing an early downgrade to the empty frontier is presented as a settled part of the fuse operator's design. Also, perhaps we should consider more complex mutually recursive structures for testing.
> In the scope of this document, the main use case for the fuse operator is forcing divergent WMR dataflows to shut down.
> WMR dataflows circulate updates until they reach a fixpoint, and a WMR dataflow becomes divergent when no fixpoint can ever be reached.
> By introducing a fuse operator in the loop, the circulation of updates is stopped when the dataflow is dropped, allowing the operators in the loop to advance to the empty frontier and shut down.
This fragment might be conflating two separate ideas: (a) the fuse operator can help a WMR dataflow reach a fixpoint and thus not be "stuck" in divergent recursive structures; (b) whether fuse should or should not downgrade to the empty frontier when it does so.
I think that point (a) is clearly a design objective for the fuse operator. I am not sure about point (b), even though it's been added to the current implementation. An alternative for fuse is to only provide (a), then normal source shutdown mediated by tokens could trigger the eventual downgrading of downstream operators to the empty frontier and their shutdown. Could we articulate these design choices somehow?
I will flesh out this part a bit more, but the gist is:
- Only stopping data updates would be enough to solve the WMR fixpoint issue.
- Also stopping progress updates (by advancing to the empty frontier) potentially speeds up the dataflow shutdown because downstream operators can start shutting down immediately when the dataflow is dropped, instead of having to wait for upstream operators to finish processing their remaining updates.
- As such, advancing to the empty frontier is not a necessity, but we should still do it because it speeds things up and is low-risk.
It sounds like you disagree about the low-risk part. Is this just because of the uncertainty about whether the fuse operator continues to consume upstream data after it has advanced to the empty frontier, or do you worry about something else?
I've pulled immediately advancing to the empty frontier into a separate paragraph and added more discussion pointing out that this is just a performance optimization, not a requirement for solving divergent WMR shutdown.
I think that my worry is a bit broader, but perhaps motivated only by ignorance of all the details of the written contract for the Timely scheduler. :-) So trying to articulate it a bit more:
- I agree that stopping data updates indeed is enough to solve the WMR issue. It could be done with a fuse operator or, come to think of it, simply by rendering a `branch_when` that performs a test of the token. So if we do not have the bit of downgrading to the empty frontier, perhaps a specialized fuse operator is just not necessary?
- Let's say that we want to force the downgrade to the empty frontier in the fuse operator. Now, to ensure the design is sound, we need a correctness argument that this downgrading is safe under the Timely scheduling constraints for any arbitrarily shaped dataflow graph, with as many loops as you want it to have. Perhaps I am just missing something, but I could not grok this from the design doc. I was hoping to get some pointers to (a) the guarantees that we are using regarding Timely scheduling, backed by the Timely documentation; (b) the reasoning that gets me the implication from these guarantees to the safety of deploying the empty-frontier-forcing fuse.
- I did not look at the details, but it seems that after the empty-frontier-forcing change to fuse, we still have some pending problems in the fast `sqllogictest` run where we are getting stuck in a WMR query: https://buildkite.com/materialize/tests/builds/53595#01877af5-2a11-43b0-846d-8952679c4386. Not sure if it is a simple bug or if something bigger is lurking behind it. It could be that we need to drop the capability there, not only downgrade it to the empty frontier, but I am not 100% sure.
- I tried to think a bit about how the empty-frontier-forcing fuse behaves in terms of a path summary, but I am still a bit uncertain about the semantics. Do you have a handy argument for why it works out? I will keep on thinking about a justification along these lines.
Thanks for writing your thoughts down! I don't have answers for all your points yet, but here are my thoughts on some of them:

> So if we do not have the bit of downgrading to the empty frontier, perhaps a specialized fuse operator is just not necessary?

For me, adding a token check to the WMR rendering is the specialization, and having a fuse operator would be the generic implementation. The fuse operator would be useful in other contexts as well, and it wouldn't be less useful in the WMR context, which is why I'd prefer that solution even if it doesn't actively downgrade its frontier on token drop.

> Now, to ensure the design is sound, we need a correctness argument that this downgrading is safe under the Timely scheduling constraints for any arbitrarily shaped dataflow graph, with as many loops as you want it to have.

Can we define what "safe" means here? Is it just that all operators in the dataflow will shut down eventually and free all their resources, or is it more?

> I did not look at the details, but it seems that after the empty-frontier-forcing change to fuse, we still have some pending problems in the fast `sqllogictest` run where we are getting stuck in a WMR query.

Good catch, I'll need to look at that.

> I tried to think a bit about how the empty-frontier-forcing fuse behaves in terms of a path summary, but I am still a bit uncertain about the semantics. Do you have a handy argument for why it works out?

I will admit that I don't fully grok that either. My understanding is that setting the output path summary to the empty antichain makes it look to the reachability checker as if there is no connection between the fuse operator and its downstream node. I guess this only applies to reachability propagation from other nodes, though, because the downstream node is still able to see the frontier of the fuse operator in its input frontier. When the fuse operator drops its capability, that advances its output frontier to the empty frontier, even though upstream nodes still hold capabilities that would hold back the frontier if the path summary weren't the empty antichain.

I'd be fine with moving forward with a fuse operator that leaves its output frontier alone. It would still be great if we could answer your questions, but I don't think we need to block on them.
Regarding the safety question: I think that eventual shutdown and no resource leakage are a fine start, but we should also consider other aspects, such as things not getting stuck, or exchanged data not violating the communicated frontiers. I think there is a bigger discussion in this paper. This might be a good candidate for a reading group discussion if Frank is available and we'd like to go into the details.
I decided that it's fine to have a fuse operator that doesn't do special things to its output frontier, especially since #18718 now merged without this feature. I adjusted the design doc to mark this as a possible future optimization we can do.
That's correct. The output frontier will be the empty antichain, but Timely will continue scheduling it, and it will drain its input handle.
On Fri, Apr 14, 2023, 13:48, Jan Teske wrote, commenting on doc/developer/design/20230411_faster_dataflow_shutdown.md in this pull request:
> ## Tokenizing Join Operators
>
> We propose that join operators (both linear and delta joins) receive references to shutdown tokens and use these to stop update processing when the dataflow is getting shut down.
> The benefits of this change are that join operators shut down more quickly and that they are prevented from introducing more data into the dataflow.
>
> Exactly how the join implementations should perform the token checking is still an open question.
> Initial testing in the scope of [#7577] suggests that the main bottleneck in joins does not come from the work of producing matching updates, but from applying the join closure to the result of the matching.
> This implies that the join closure should perform a token check before processing each join result.
>
> Concerns have been raised that the overhead of performing a check for every update might slow down joins prohibitively.
> We have not been able to corroborate these concerns with tests so far.
> At least for `SELECT` queries computing simple cross joins, adding a token check to the join closure does not appear to increase the query time noticeably.
>
> There might be other places in the join implementations where adding a token check improves shutdown performance.
> For example, adding a fuse operator after the join output would allow downstream operators to observe an empty input frontier and begin shutting down before the join operator has finished iterating over all matches.
My assumption until now was that Timely would continue to schedule the fuse operator as long as it still receives new inputs, regardless of whether or not it has advanced to the empty frontier. So the fuse operator would be consuming the join outputs.

That might be wrong of course! @antiguru and @petrosagg should be able to deny or confirm.
@vmarcos I believe I addressed all your concerns, can you take another look?
Yes, the latest changes addressed my core concerns. Thanks for bearing with me!
Thanks for reviewing!
This PR adds a design doc proposing ways to make cancelled COMPUTE dataflows shut down faster.
Rendered version.
Motivation
Advances #7577 and #16800.
Proposes alternatives to #2392.
Checklist

- This PR modifies the `$T ⇔ Proto$T` mapping (possibly in a backwards-incompatible way) and therefore is tagged with a `T-proto` label.