
Advance distinguish_since further #196

Merged: 5 commits merged into master from distinguish_less on Aug 6, 2019

Conversation

@frankmcsherry (Member) commented Aug 3, 2019

This PR improves the ability of operators to advance the distinguish_since() capability for held traces, by paying more careful attention to the timely dataflow frontiers on their inputs. These frontiers may reveal the absence of further batches which, if empty, will never arrive (as empty batches cannot currently be sent in timely dataflow). By advancing the capability, these traces are now able to start merging empty batches, which .. I mean, I guess we have to do that, don't we.
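To illustrate the idea, here is a minimal sketch, not the differential dataflow API: timestamps are plain u64s, a "frontier" is a single u64 lower bound, and `ToyTraceHandle` and its fields are hypothetical names. The point is that the input frontier can justify advancing the capability past batches that will never be transmitted because they are empty.

```rust
/// Toy stand-in for an operator's handle on a trace (hypothetical,
/// simplified: real frontiers are antichains, not single u64s).
struct ToyTraceHandle {
    /// Upper bound of the last batch the operator actually received.
    accepted_upper: u64,
    /// How far the operator has promised not to distinguish batches,
    /// i.e. the capability restricting physical merging.
    distinguish_since: u64,
}

impl ToyTraceHandle {
    /// Advance the capability using both accepted batches and the
    /// timely input frontier. Empty batches are never transmitted, so
    /// `accepted_upper` alone can lag; the input frontier tells us the
    /// missing batches must be empty and will never arrive.
    fn advance_capability(&mut self, input_frontier: u64) {
        let advanced = self.accepted_upper.max(input_frontier);
        if advanced > self.distinguish_since {
            self.distinguish_since = advanced;
        }
    }
}

fn main() {
    let mut handle = ToyTraceHandle { accepted_upper: 3, distinguish_since: 3 };
    // The input frontier has moved to 7 with no batches arriving: the
    // batches covering [3, 7) must be empty, so the trace may merge them.
    handle.advance_capability(7);
    println!("distinguish_since = {}", handle.distinguish_since); // prints 7
}
```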

There is a similar change required in reduce.rs, and it is possible that other operators (e.g. count.rs and distinct.rs) might want a hand as well. Pretty much all uses of distinguish_since() should be audited.

Also, distinguish_since() should probably be deleted, but we aren't there yet.

frankmcsherry added some commits Aug 3, 2019

@frankmcsherry (Member Author) commented Aug 4, 2019

The most recent commit puts the corresponding improvements into reduce.rs. It involved a bit of a change to the program logic, in an especially finicky file. It would be great to get it exercised a bit on multiple workers by the likes of @comnik and @ryzhyk, if you folks end up with free cycles.

@frankmcsherry (Member Author) commented Aug 4, 2019

Also, all of this makes me very much want to shift timely dataflow over to using capability sets for each message, rather than single capabilities...

@comnik (Member) commented Aug 5, 2019

> Also, all of this makes me very much want to shift timely dataflow over to using capability sets for each message, rather than single capabilities...

Can you explain a bit how that would affect issues such as this one?

I'll run the 3DF tests on this branch, can't quite promise when I'll get to it though.

@frankmcsherry (Member Author) commented Aug 5, 2019

The problem here (and likely in a few other spots) is that information reaches the operator through two different paths. Both of them are conservative approximations to the truth (batch frontiers don't reflect empty batches, and timely frontiers merge frontiers across all workers).

It would be great if we were able to just push the truth at each worker, and remove a bit of wonky logic that works around this sort of issue. Nothing is obviously wrong at the moment, but the logic is a bit tortured (and, based on past experience, probably has some subtle detail overlooked).

@frankmcsherry (Member Author) commented Aug 5, 2019

I think I might go ahead and merge this, perhaps with a bit more testing and the count.rs and threshold.rs variants coded. It blocks a few other improvements that I'm trying to get done as well. If master goes sideways for you, let me know about that too!

@frankmcsherry (Member Author) commented Aug 5, 2019

This now appears to work (at least, it passes tests and such), but it has an annoying design defect that I should either rewrite to work around, or at least own up to publicly.

Operators that manage trace handles have two paths by which information about batches can reach them: through their input streams, and from the trace handle itself. The trace handle is the source of truth, but it is not as directly accessible as the stream of batches itself; some operators don't maintain the trace, for example. The input streams are what supply the batches to the operator, but they could plausibly linger in their delivery of batches, making them a somewhat stale view of what is held by the trace handle.

In principle, operators with access to the trace handle can pull out all batches using the map_batches operation, which gives them an up-to-date view of what the arrangement is sitting on. We could work with the theory that arrangements create batches that accurately reflect their input frontiers, so that this supply of batches does not linger behind the information timely dataflow would present. Operators that do not hold on to a trace handle would need to work differently, but they are fewer (just join?) and do not have the same issues about reflecting their progress back to the trace handle.
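The shape of that map_batches scan can be sketched with a toy trace (again, these are hypothetical simplified types, not the crate's `TraceReader`): an operator walks every batch the trace holds, empty ones included, and so recovers an upper bound that the input stream alone could not show it.

```rust
/// Hypothetical, simplified batch description: half-open time
/// interval [lower, upper) and a count of contained updates.
struct ToyBatch { lower: u64, upper: u64, len: usize }

/// Hypothetical, simplified trace: just an ordered list of batches.
struct ToyTrace { batches: Vec<ToyBatch> }

impl ToyTrace {
    /// Apply `logic` to each held batch, mirroring the shape of a
    /// map_batches-style scan over an arrangement's contents.
    fn map_batches<F: FnMut(&ToyBatch)>(&self, mut logic: F) {
        for batch in &self.batches {
            logic(batch);
        }
    }
}

/// Compute the trace's true upper bound. Every batch counts here,
/// empty or not; the input stream would only show the non-empty ones.
fn true_upper(trace: &ToyTrace) -> u64 {
    let mut upper = 0;
    trace.map_batches(|batch| {
        upper = upper.max(batch.upper);
    });
    upper
}

fn main() {
    let trace = ToyTrace { batches: vec![
        ToyBatch { lower: 0, upper: 4, len: 10 },
        ToyBatch { lower: 4, upper: 9, len: 0 }, // empty, never transmitted
    ]};
    println!("true upper: {}", true_upper(&trace)); // prints 9, not 4
}
```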

@comnik (Member) commented Aug 5, 2019

> Both of them are conservative approximations to the truth (batch frontiers don't reflect empty batches, timely frontiers merge frontiers across all workers).

This was very helpful, thanks. And so with capability sets you could send an empty set with an empty batch?

I don't have much to contribute to this, but to me it seems desirable that empty batches are accurately reflected in the trace, because at least for 3DF I would expect that only a small subset of traces receives updates for any given round of input.

@frankmcsherry (Member Author) commented Aug 5, 2019

Yes, that is correct. At the moment empty batches should be reflected in the trace, but may not make it to custom differential operators you might write. If you swing through a trace directly, using something like map_batches, they should be visible there.

@frankmcsherry (Member Author) commented Aug 5, 2019

Thinking out loud: operators need to await the arrival of input batches before processing them, as they may need capabilities that arrive only with the batches. The only exception to this is empty batches, which can be "virtually received" without awaiting any actual transmission (which is good, because we do not transmit them). However, operators probably should not shoot forward and attempt to process all batches available through their trace handle, as in the event of latency the batch capabilities may not be at hand, and a big issue would ensue.

So, subject to the constraints, I think at the moment the most sane thing to do is for operators to continue to be largely driven by accepting input batches, but to then forward their "accept frontier" up through any subsequent empty batches, starting from the upper frontier of their most recently received batch. We stop at the upper frontier of the last empty batch (either before a non-empty batch, or at the end of the trace).

@frankmcsherry (Member Author) commented Aug 5, 2019

The most recent commit adds a method that I think resolves much of the ambiguity. TraceReader has a new helper method advance_upper which accepts a mutable reference to an antichain, and will advance that antichain through any empty batches immediately after the batch for which it is the upper bound.

This method then gets used in each of join.rs, reduce.rs, count.rs, and threshold.rs, as a modification to the tracked upper frontier of accepted batches. This should result in properly advancing trace handles so that physical merging of empty batches can take place, as well as other knock-on benefits (advance_by now lets us do compaction in a few corner cases).
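The behavior described for advance_upper can be rendered as a toy sketch (hypothetical simplified types again: real timestamps form antichains, and the real method lives on TraceReader): starting from the upper bound of the most recently accepted batch, the frontier is pushed through any immediately following empty batches and stops at the first non-empty one.

```rust
/// Hypothetical, simplified batch description: half-open time
/// interval [lower, upper) and a count of contained updates.
struct ToyBatch { lower: u64, upper: u64, len: usize }

/// Advance `upper` through consecutive empty batches whose lower
/// bound matches it, stopping at the first non-empty batch (or the
/// end of the trace). Mirrors the advance_upper idea with a single
/// u64 standing in for an antichain.
fn advance_upper(batches: &[ToyBatch], upper: &mut u64) {
    for batch in batches {
        if batch.lower == *upper && batch.len == 0 {
            *upper = batch.upper;
        }
    }
}

fn main() {
    let batches = vec![
        ToyBatch { lower: 0, upper: 2, len: 5 },
        ToyBatch { lower: 2, upper: 4, len: 0 }, // empty
        ToyBatch { lower: 4, upper: 6, len: 0 }, // empty
        ToyBatch { lower: 6, upper: 8, len: 3 }, // non-empty: stop here
    ];
    let mut upper = 2; // upper bound of the last accepted batch
    advance_upper(&batches, &mut upper);
    println!("advanced upper: {}", upper); // prints 6
}
```

Once the tracked upper frontier has been advanced this way, the operator can legitimately downgrade the capabilities it holds against the trace, which is what allows the physical merging of the empty batches to proceed.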

@frankmcsherry frankmcsherry merged commit 3dce300 into master Aug 6, 2019

2 checks passed:

continuous-integration/travis-ci/pr: The Travis CI build passed
continuous-integration/travis-ci/push: The Travis CI build passed

@frankmcsherry frankmcsherry deleted the distinguish_less branch Aug 6, 2019
