
Cancellation of divergent recursive dataflows #16800

Closed
Tracked by #17012
vmarcos opened this issue Dec 22, 2022 · 26 comments

@vmarcos commented Dec 22, 2022

In PR #16787, it was observed that if we render a divergent dataflow, it keeps executing even after the corresponding view and the indexes associated with it are dropped. We should think of a way to handle cancellation of non-terminating dataflows.

Here is an example of how to reproduce the issue:

CREATE VIEW v AS
WITH MUTUALLY RECURSIVE
  cnt (cnt int) AS (
    SELECT 1 AS cnt
    UNION
    SELECT cnt + 1 AS cnt FROM cnt)
SELECT * FROM cnt WHERE cnt < 10;
CREATE DEFAULT INDEX ON v;
DROP VIEW v;

Now, in the memory visualizer, it is possible to see that the dataflow stays alive and keeps producing an ever-increasing number of records.

@petrosagg

That would be a great incentive to build confidence in Worker::drop_dataflow and to manage dataflows using dataflow-wide tokens that stop any dataflow regardless of where it is in its execution!

I put up a PR for this recently, but we didn't want to risk introducing further instability into the system, because drop_dataflow isn't widely tested at the moment: #15779

@vmarcos vmarcos added the A-COMPUTE Topics related to the Compute layer label Dec 23, 2022
@frankmcsherry commented Dec 23, 2022

A different, additional guard rail we could use, and probably want anyhow, is to allow the user to bound the number of iterations. I'm guessing that debugging recursive queries is annoying, and a reasonable pattern is to look at the results after one iteration, two iterations, etc. You could change the query to time itself out (with some counter), but it's plenty easy for DD to handle this for you if you have an upper bound on the number of iterations.
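
For example, a manual version of that counter-based timeout might look roughly like the following sketch (the round column and the cap of 5 iterations are made up for illustration):

-- Sketch: bound the recursion by hand with an explicit iteration counter.
-- The recursion reaches a fixed point once no remaining row has round < 5.
WITH MUTUALLY RECURSIVE
  cnt (cnt int, round int) AS (
    SELECT 1 AS cnt, 0 AS round
    UNION
    SELECT cnt + 1, round + 1 FROM cnt WHERE round < 5)
SELECT cnt FROM cnt;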

More generally ... though I'm sure also more frighteningly ... this could just be a temporal filter on the round of iteration, which DD also supports (well, manipulation of the timestamp component, which is what temporal filters end up being).

@aalexandrov commented Mar 6, 2023

Summarizing some offline discussion with @antiguru.

It seems that we have three options for the frontend.

  1. Extend the current WMR syntax with an optional LIMIT x clause (WITH MUTUALLY RECURSIVE LIMIT $x).
  2. Introduce a query hint for the entire SELECT query: OPTIONS (MAXRECURSION $x). This is similar to what SQL Server does.
  3. Introduce a session variable for all queries (SET MAXRECURSION = $x).

I'm leaning towards option (2): the OPTIONS machinery already exists, and we can just forward the parameter when rendering the plan (that is, we don't need to extend LetRec to carry an additional parameter).

@frankmcsherry I'm interested in your opinion here.

@frankmcsherry

The first option seems best to me (although LIMIT x seems not great; it's unclear what it limits: records? iterations? minutes?). Not allowing different settings of the option for different WMR blocks will just bite you when you want to test a nested WMR.

I can't tell whether the second option allows an OPTIONS for each SELECT command or for each SELECT clause. If the former, it has the above problem; if the latter, there should be only one WMR possible in any case, and it seems easier to localize the argument to the clause it modifies.

@aalexandrov commented Mar 13, 2023

To summarize:

  • We're leaning towards option (1).
  • The proposed syntax is WITH MUTUALLY RECURSIVE ITERATE $x TIMES.

Related discussion.
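
Applied to the reproduction query from the issue description, the proposed clause would read roughly as follows (illustrative only; the exact clause name was still being discussed at this point and may not match what ultimately ships):

-- Illustrative sketch of the proposed syntax, not final.
CREATE VIEW v AS
WITH MUTUALLY RECURSIVE ITERATE 100 TIMES
  cnt (cnt int) AS (
    SELECT 1 AS cnt
    UNION
    SELECT cnt + 1 AS cnt FROM cnt)
SELECT * FROM cnt WHERE cnt < 10;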

@ggevay ggevay assigned ggevay and unassigned aalexandrov Mar 22, 2023
@ggevay commented Mar 22, 2023

I'll now work on option 1 from above, i.e., adding an optional clause to WITH MUTUALLY RECURSIVE to limit the number of iterations. As Frank noted above, this is also useful for query debugging, when one wants to examine the output after a certain number of iterations.

This will reduce the problem of divergent dataflows, but it won't go away completely: maybe the user doesn't specify this extra clause, and then she still ends up with a problematic dataflow that continues to be alive indefinitely. So we'll also want to work on properly cancelling dataflows when the associated index and/or view is dropped. Edit: This seems to be #2392.

@antiguru

Maybe the user doesn't specify this extra clause, and then she still ends up with a problematic dataflow that continues to be alive indefinitely.

We could work around this by having a reasonable default value, either hardcoded or set via a session variable.

@ggevay commented Mar 22, 2023

a reasonable default value

Well, I'm not sure what value we would set. The problem is that a good default value might be wildly different for different queries. A query that works with only small datasets might do thousands of iterations in a minute, but a query working with large datasets could run essentially forever even with tens of iterations. (Also, there are queries where, even though the initial dataset is large and the first few iterations do a lot of work, there are still many iterations with small deltas.) So I would be worried that a non-infinite default value would bite some users, who don't realize that the default is not infinite, while at the same time not helping other users, whose queries would still run for a long time.

Btw, I'm not sure I fully understand why this problem is specific to WMR. Any query might take a very long time to complete, e.g., if a cross join unexpectedly appears in a plan because a user forgot a join condition. Is the idea that in many of those situations the query will OOM fairly quickly anyway? Is this somehow less the case for WMR?

Another question: Does the problem come up only for VIEW dropping, or does it also happen with a SELECT? So, for a SELECT, does the dataflow go away if I hit Ctrl+C? Edit: It seems to me SELECT also has the same problem.
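
For reference, the one-off SELECT variant of the query from the issue description would simply be the same divergent recursion issued directly, without the view and index:

WITH MUTUALLY RECURSIVE
  cnt (cnt int) AS (
    SELECT 1 AS cnt
    UNION
    SELECT cnt + 1 AS cnt FROM cnt)
SELECT * FROM cnt WHERE cnt < 10;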

@ggevay commented Mar 22, 2023

Btw, it would be great to print a notice to the user when the limit is reached. Then we could set a relatively low default. But I'm not sure whether we have the infrastructure for this.

@aalexandrov commented Mar 22, 2023

It seems to me SELECT also has the same problem.

Yes, but it seems we'll be prioritizing that work (#2392). @antiguru would you consider this an alternative mechanism to handle divergent dataflows, in case we allow rendering dataflows without a default limit?

@antiguru

I'm not sure that #2392 is the right mechanism to prevent divergent dataflows. Recursive queries have memory requirements proportional to the trace, which means that we might OOM faster than the user has a chance to react. Limiting the number of recursions might help here.

Another question: Does the problem come up only for VIEW dropping, or does it also happen with a SELECT? So, for a SELECT, does the dataflow go away if I hit Ctrl+C? Edit: It seems to me SELECT also has the same problem.

Currently, we let the computation finish, however long this might take. In the future, we might be able to uninstall the dataflows, but I'd prefer not to introduce this as a hard dependency.

I agree we have to think a bit more about this, because we're mixing a product question and an engineering question. I think it makes sense to have the infrastructure in place to set a limit on iterations and to have an optional global limit (or set the default to u64::MAX). We can then think about setting a default value and how to communicate it to users together with Product.

Btw, it would be great to print a notice to the user when the limit is reached. Then we could set a relatively low default. But I'm not sure whether we have the infrastructure for this.

Yep, this would be nice, but I don't think we have a way to communicate notices from the dataflow layer, only errors, and that's not what we'd like to have here.

@ggevay commented Mar 22, 2023

Btw, one more complication is that if we have a per-WMR limit, then we will need to change NormalizeLets. For example, in this test, we wouldn't be able to merge the two WMRs if they have different limits. This would mean that the invariant currently produced by NormalizeLets, that "if there is a WMR in a plan fragment then it is at the top", would be violated, because the Union would be at the top. I'm not sure how big a problem this would be. (Maybe the rendering relies on it?) I'll look into this tomorrow. Edit: Maybe each binding will need to have its own limit in MIR's LetRec.

@aalexandrov commented Mar 23, 2023

@ggevay one nit: it's important to prevent users from saying:

WITH MUTUALLY RECURSIVE LIMIT 0

The current semantics of WMR blocks are of a

do { /* sequential updates */ } while /* no change*/

and I can imagine that the code of some optimizations will get unnecessarily complicated if we want to handle the 0-iterations case. I think you can use one of the std::num::NonZero~ types to ensure this statically in the new LetRec field.

@ggevay commented Mar 23, 2023

I'm not sure that #2392 is the right mechanism to prevent divergent dataflows. Recursive queries have memory requirements proportional to the trace, which means that we might OOM faster than the user has a chance to react. Limiting the number of recursions might help here.

Well, I think OOMing would often be a better situation than either silently stopping after a default limit or running forever:

  • An unexpected and silent stop at a default limit can be considered almost as bad as incorrect results. Correctness is the top item in our Compute team mission, whereas stability comes only second. (One could even argue that the definition of "Stability" given there doesn't even include preventing OOMs for a bad query, because the definition says "available under all situations that do not exceed the available compute and storage resources".)
  • In the running-forever case, the user would be left wondering what she should do: let it run in the hope that it may still finish, or kill it. OOMing gives a clear signal that something is very wrong.

Also note that when a user is developing a query, she should always run her experimental queries on a cluster that is ok to suddenly crash, because queries can suddenly OOM due to a wide range of user mistakes (also unrelated to WMR).

Nevertheless, I think we need to have both
A. a mechanism to limit the number of iterations, and
B. proper dataflow cancellation (#2392).

Both of these are good for mitigating the effect of divergent WMRs at least in some cases (e.g., A. is good if the user explicitly sets a sensible limit, and B. is good when the WMR is running forever but somehow not OOMing, e.g., Marcos' query in the issue description), and both of these are also good for certain things other than divergent WMRs. (E.g., A. is good for output debugging after each WMR iteration, and B. is good for terminating bad queries that don't even have WMR.)

So I'll go ahead with implementing the limit.

I'm still not sure whether we should have a default limit, but we can decide this later (with Product in the loop (@heeringa)). Maybe some large number, e.g., 1000? Or should we wait until we can show notices to the user?

but I don't think we have a way to communicate notices from the dataflow layer

Do we have an issue for it at least?

@aalexandrov

It seems that we are proposing various solutions with two overlapping goals in mind:

  1. Coming up with a mechanism and system behavior that protects customers from running divergent dataflows, especially on production clusters.
  2. Coming up with mechanisms to inspect intermediate states of the dataflow.

We might decide to use the same solution for both, or use two separate solutions. Let's use this issue to track (1) and #18362 to track (2).

@ggevay commented Mar 23, 2023

Yes, we might want to have two limits: a soft limit for 2. and a hard limit that errors out for 1.

Note that 1. is easier, because it's ok if the limit setting is per-query (and/or a session variable), whereas for 2. it would be useful to have it per-WMR.

I'll implement 1. first, because that seems to be the most pressing matter. Then we can do 2. soon afterwards, possibly in two steps:

  • First, we can have a version of 2. that is also per-query. This way, there is only a slight rendering change between 1. and 2., which should be easy. This already gives us the debugging benefit of 2. for at least non-nested WMR.
  • The hard part for 2. is having it per-WMR: For this we need to somehow solve the problem noted above that NormalizeLets sometimes merges two LetRec nodes into one, which is problematic if they have differing limits. I have ideas for this, but this will probably add at least a day of extra work.

@aalexandrov

I added a comment in #18362 with a straightforward way to emulate (2) in SQL using mechanical changes to an existing WMR query. It could be that this solution is more robust w.r.t. the NormalizeLets concerns that you raised above. In any case, it should unblock people who want to debug #18362 until we support the more elegant direct syntax.

@ggevay commented Mar 23, 2023

This is great, @aalexandrov, thank you very much! Then I'll go for 1., and put aside my NormalizeLets worries.

@aalexandrov

@ggevay can you please fix the TODO(#16800) in column_knowledge.rs that was added in #18323 when you open a PR for the hard limit?

@ggevay commented Mar 30, 2023

Yes, will do, thanks for the reminder!

@ggevay ggevay mentioned this issue Mar 31, 2023
@philip-stoev

@teskje's #18442 is effective in cancelling divergent WMR dataflows.

@ggevay commented Apr 11, 2023

@teskje's #18442 is effective in cancelling divergent WMR dataflows.

Indeed, if that lands, it would mostly address this issue. But unfortunately we don't know when that will land. It might turn out to be a very long time, because it might require tricky fixes in Timely and/or Differential, and Frank is very busy nowadays. Maybe if very thorough testing finds that it works well, it might go in without too much involvement from Frank. In the meantime, I'm working on iteration limits as an alternative fix, because there are also other use cases for limits besides stopping divergent dataflows anyway, as explained in the design doc.

@teskje commented Apr 11, 2023

I plan to put up a design doc in the next few days to propose some alternative ways of shutting down dataflows for as long as we don't yet have drop_dataflow. One of those will be shutting down WMR dataflows by inserting an operator in each loop that stops producing output when it sees a shutdown token being dropped, producing a fixpoint that way.

@petrosagg

One of those will be shutting down WMR dataflows by inserting an operator in each loop that stops producing output when it sees a shutdown token being dropped

I wanted to propose the same thing; it sounds like the easiest thing to do with our existing token infrastructure. The implementation was only a few lines of code, so I pushed it here: #18718

@teskje commented Apr 18, 2023

#18718 is merged now, so divergent WMR dataflows are cancelled as one would expect. @ggevay @aalexandrov I suggest we close this issue as complete. If we still want a hard limit (we might not?), we should open a separate ticket to track that specifically.

@ggevay commented Apr 19, 2023

Thanks a lot @teskje and @petrosagg!

We'll discuss the hard limit later when @antiguru comes back. (I think he mentioned to me once that a default hard limit might be desirable even if we have dataflow cancellation.) I opened an issue: #18832.
