design doc: Faster Dataflow Cancellation
teskje committed Apr 13, 2023
1 parent aa70471 commit 5664991
doc/developer/design/20230411_faster_dataflow_shutdown.md (89 additions, 59 deletions)
The following is a list of problems we have observed so far:
The divergent loop part of the dataflow, and all operators downstream of it, continue running forever.
The only way to stop such a dataflow is currently to drop the affected replicas.

- **Persist sources always emit full batches**

Persist sources don't shut down immediately when signaled to do so.
Instead, they always finish emitting the remaining updates available at the current time.
Even if the replica doesn't run out of memory, the current behavior of persist source means that more data is introduced into the dataflow before it can shut down.
This slows down the dataflow shutdown, as downstream operators require time to process the new updates in addition to the ones already in the dataflow.

- **Join operators can amplify data** ([#7577])

Depending on the amount of input data and the join condition, joins can vastly multiply their input data.
Consequently, they can take a long time to finish emitting all their outputs even once their inputs have advanced to the empty frontier.

For these reasons, we are interested in completing dataflow shutdowns as quickly as possible.


[`Worker::drop_dataflow`]: https://docs.rs/timely/latest/timely/worker/struct.Worker.html#method.drop_dataflow
[timely-dataflow/#519]: https://github.com/TimelyDataflow/timely-dataflow/pull/519
[WMR]: https://github.com/MaterializeInc/materialize/blob/main/doc/developer/design/20221204_with_mutually_recursive.md
[#7577]: https://github.com/MaterializeInc/materialize/issues/7577
[#16800]: https://github.com/MaterializeInc/materialize/issues/16800
[#16860]: https://github.com/MaterializeInc/materialize/issues/16860


# Explanation
[explanation]: #explanation

In this design document we propose three measures to tackle the problems outlined above:
1. [Faster persist source shutdown](#faster-persist-source-shutdown)
1. [Tokenizing join operators](#tokenizing-join-operators)

Each of these measures introduces the possibility that downstream operators receive incomplete update sets for a given time slice.
We also discuss how to deal with the issues this introduces.

## Fuse Operator

We propose the introduction of a "fuse" operator.
The purpose of this operator is to interrupt the flow of updates when the dataflow is dropped.
This is useful to ensure downstream operators are not fed more updates when they should be shutting down.
It is also useful to construct an artificial fixpoint in dataflow loops.

In the scope of this document, the main use case for the fuse operator is forcing divergent WMR dataflows to shut down.
WMR dataflows circulate updates until they reach a fixpoint, and a WMR dataflow becomes divergent when no fixpoint can ever be reached.
By introducing a fuse operator in the loop, the circulation of updates is stopped when the dataflow is dropped, allowing the operators in the loop to advance to the empty frontier and shut down.

We anticipate that the fuse operator will also come in handy to resolve other shutdown bottlenecks.
For example, it can be inserted before expensive operators to stop their incoming update flow as early as possible.
It is also likely to be useful to support [tokenizing join operators](#tokenizing-join-operators).

The fuse operator is instantiated with a token reference that it uses to observe dataflow shutdowns.
As long as there are still other holders of the token, it simply forwards all updates received on its inputs to its outputs.
Once the token has been dropped by everyone else, it advances its output to the empty frontier and discards all input updates.
[#18718] provides the described fuse operator implementation, and applies it to the cancellation of WMR dataflows.
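
A minimal sketch of the token mechanics in plain Rust may help make this concrete. The `Fuse` struct, its methods, and the `Vec`-based data path below are illustrative only; the real operator is built with Timely's operator machinery and works on streams of timestamped updates.

```rust
use std::rc::{Rc, Weak};

// Illustrative sketch: the fuse holds only a weak reference to the
// shared shutdown token, so it can observe when all other holders
// have dropped theirs.
struct Fuse {
    token: Weak<()>,
}

impl Fuse {
    fn new(token: &Rc<()>) -> Self {
        Fuse { token: Rc::downgrade(token) }
    }

    /// Forward updates while the dataflow is alive; discard them once
    /// the token has been dropped by everyone else.
    fn apply<D>(&self, updates: Vec<D>) -> Vec<D> {
        if self.token.upgrade().is_some() {
            updates
        } else {
            Vec::new() // dataflow dropped: discard all input updates
        }
    }
}
```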


[#18718]: https://github.com/MaterializeInc/materialize/pull/18718

## Faster Persist Source Shutdown

We propose restoring the previous behavior of the persist source: shutting down immediately when it observes that the shutdown token was dropped, rather than ignoring the shutdown until the entire current batch has been emitted.
This change makes cancellation of queries that read large snapshots from persist much faster, by reducing both the time until persist sources shut down and the amount of data introduced into the dataflow.
Reducing the amount of introduced data can also prevent OOMs that might otherwise occur some time after the user has already cancelled a query.

The original reason for having persist sources always emit whole batches was that certain COMPUTE operators were not able to deal with only seeing an incomplete set of updates for a time.
They would see invalid retractions and occasionally react by panicking.
As part of [#17178], COMPUTE operators have been made much more robust to unexpected retractions.
They now consistently report errors gracefully instead of crashing the process.
Because of this, the workaround in the persist source is no longer necessary and we are free to revert to the previous, more efficient behavior.
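
As a rough sketch of the difference, the proposed behavior amounts to checking the token inside the batch emission loop rather than only between batches. The `Update` type and function names below are illustrative, not the actual persist source API.

```rust
use std::rc::{Rc, Weak};

// Illustrative update triple: (data, time, diff).
type Update = (String, u64, i64);

/// Emit a batch, checking the shutdown token per update so the source
/// can stop mid-batch. Returns `false` if the dataflow was dropped.
fn emit_batch(batch: Vec<Update>, token: &Weak<()>, output: &mut Vec<Update>) -> bool {
    for update in batch {
        if token.upgrade().is_none() {
            // Dataflow dropped: abandon the rest of the batch instead
            // of draining it into the dataflow.
            return false;
        }
        output.push(update);
    }
    true
}
```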

[#17178]: https://github.com/MaterializeInc/materialize/issues/17178

## Tokenizing Join Operators

We propose that join operators (both linear and delta joins) receive references to shutdown tokens and use these to stop update processing when the dataflow is getting shut down.
The benefits of this change are that join operators shut down more quickly and that they are prevented from introducing more data into the dataflow.

Exactly how the join implementations should perform the token checking is still an open question.
Initial testing in the scope of [#7577] suggests that the main bottleneck in joins does not come from the work of producing matching updates, but from applying the join closure to the result of the matching.
This implies that the join closure should perform a token check before processing each join result.

Concerns have been raised that the overhead of performing a check for every update might slow down joins prohibitively.
We have not been able to corroborate these concerns with tests so far.
At least for `SELECT` queries computing simple cross joins, adding a token check to the join closure does not appear to increase the query time noticeably.

There might be other places in the join implementations where adding a token check improves shutdown performance.
For example, adding a fuse operator after the join output would allow downstream operators to observe an empty input frontier and begin shutting down before the join operator has finished iterating over all matches.
In the interest of keeping code complexity in check, we suggest not speculatively adding more token checks to the join implementations unless we have evidence (e.g., example queries) that they significantly improve dataflow shutdown performance. The same applies to adding token checks to operators other than joins.
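
The per-result check described above can be sketched as a wrapper around the join closure. The `tokenize` helper and its signature are hypothetical; the real closures live in Materialize's join rendering code.

```rust
use std::rc::{Rc, Weak};

/// Hypothetical helper: wrap a join closure so that every matched pair
/// first checks the shutdown token. `None` tells the caller to stop
/// producing further join results.
fn tokenize<A, B, O>(
    token: Weak<()>,
    closure: impl Fn(&A, &B) -> O,
) -> impl Fn(&A, &B) -> Option<O> {
    move |a, b| token.upgrade().map(|_| closure(a, b))
}
```

Upgrading a `Weak` amounts to a reference-count check, which is consistent with the observation above that per-update checks did not measurably slow down simple cross joins.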

[#7577]: https://github.com/MaterializeInc/materialize/issues/7577

## Handling Incomplete Time Slices

Each of the measures proposed above relies on suppressing updates that would usually be emitted to downstream operators.
As a result, if an operator observes a dropped token while processing a time slice, its downstream operators might receive only an incomplete set of updates for that time slice.
Operators must thus be able to deal with seeing invalid data during a dataflow shutdown. In particular:

1. Operators must gracefully handle invalid retractions (i.e., updates with negative multiplicity that don't have associated positive updates).
2. Dataflow sinks must not emit invalid data to external systems.

Since the resolution of [#17178], operators observing invalid retractions don't panic anymore.
But they still log errors that appear in Sentry and alert engineering about potential bugs.
To avoid unnecessary noise, we need to ensure that no errors are logged when invalid retractions are observed due to dataflow shutdown.
For this, we propose giving all operators that can log errors due to invalid retractions handles to shutdown tokens.
These operators can then check if the dataflow is still alive before logging any errors.
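
A sketch of the proposed check, with hypothetical names (the real logging goes through Materialize's error-logging infrastructure rather than `eprintln!`):

```rust
use std::rc::{Rc, Weak};

/// Report an invalid retraction only if the dataflow is still alive.
/// Returns whether an error was actually logged.
fn on_invalid_retraction(token: &Weak<()>, row: &str) -> bool {
    if token.upgrade().is_some() {
        // Dataflow alive: an invalid retraction indicates a real bug.
        eprintln!("invalid retraction for row: {row}");
        true
    } else {
        // Dataflow shutting down: incomplete time slices are expected,
        // so stay silent to avoid noise in Sentry.
        false
    }
}
```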

Persist sinks already hold onto tokens that inform them about dataflow shutdowns and that ensure that no further data is emitted once the COMPUTE worker drops a sink token.
Indexes and subscribes are technically also dataflow sinks, albeit ones that don't directly write to external systems.
Instead, their output is queried and forwarded by the COMPUTE worker.
The worker is implemented in such a way that it stops reading from a dataflow's output before dropping it, so there is no risk of it reading invalid data from indexes or subscribes during dataflow shutdown.


# Rollout
[rollout]: #rollout


## Testing and observability
[testing-and-observability]: #testing-and-observability

We rely on our existing test suite to ensure that the above measures for faster dataflow shutdown don't impact correctness (e.g., by making operators shut down too early).

For testing that the proposed measures have an effect, we will write testdrive tests that drop dataflows and observe their shutdown with the help of the introspection sources.
This will be simple for the cancellation of divergent WMR dataflows (which shut down either quickly or never) but harder for the cancellation of persist sources and cross joins (which shut down either quickly or slowly).
Since the latter tests rely on timing, care must be taken to ensure they don't produce false positives or false negatives.

For [tokenizing join operators](#tokenizing-join-operators) in particular, potential performance degradation is a concern.
We can build some confidence here by performing SQL-based benchmarks to accompany the PR that introduces this feature.

## Lifecycle
[lifecycle]: #lifecycle

The changes proposed in this design doc do not seem particularly risky.
There is no need for feature flags or extended testing on staging.


# Drawbacks
[drawbacks]: #drawbacks

The proposed changes increase the complexity of the rendering code by requiring that dataflow shutdown tokens are passed to various additional operators.

Furthermore, when compared to `drop_dataflow`, the proposed measures are less effective.
They make dataflows shut down faster but cannot guarantee immediate shutdown.


# Conclusion and alternatives
[conclusion-and-alternatives]: #conclusion-and-alternatives

The proposed design is effective in solving commonly observed issues that lead to slow dataflow shutdowns and the corresponding usability problems.
It is also relatively easy to implement, as it extends the token mechanism that already exists in dataflow rendering.

As an alternative to the changes proposed here, we could instead rely on Timely's `drop_dataflow`.
Doing so would simplify the implementation in Materialize, as dataflow rendering would not have to care about shutdown concerns.
`drop_dataflow` would also guarantee that dataflow operators are always shut down immediately, rather than after some undetermined amount of time.
However, the maintainers of Timely Dataflow are not confident in the correctness of `drop_dataflow` at this time.
As such, we consider relying on it for such a fundamental part of Materialize as too risky.

An alternative to the proposed join tokenization could be to implement the token checks not inside the Materialize rendering code, but in the core DD operator logic.
Doing so would potentially provide greater control of the update stream, which would allow us to discard updates earlier.
We reject this alternative because making changes to DD is relatively more complicated and time-intensive.
Furthermore, DD is a public library, so it is not clear whether the maintainers would accept changes that are tailored to the Materialize use-case.



# Future work
[future-work]: #future-work

We expect to find more instances of dataflow operators that are slow to shut down under certain conditions, even after this design is implemented.
Insofar as these instances can be resolved by adding more token checks, we should do so.

This design does not consider issues caused by dataflow operators refusing to yield back to the Timely runtime.
These instances can only be solved by adding or improving fueling for these operators, which is something we should do but leave as future work.

Finally, we should continue the work of stabilizing `drop_dataflow`.
As pointed out above, it is a simpler and more effective approach than the one this design proposes, and therefore the strictly better solution, provided we can gain confidence in it.
