Wrong behavior of yield when associated stream is cancelled #49451

Closed
sgrekhov opened this issue Jul 14, 2022 · 9 comments
Labels
area-vm Use area-vm for VM related issues, including code coverage, and the AOT and JIT backends. type-bug Incorrect behavior (everything from a crash to more subtle misbehavior)

Comments

@sgrekhov
Contributor

sgrekhov commented Jul 14, 2022

According to the yield specification

If paused, the function is blocked at the yield statement until the subscription is resumed or canceled. In this case the yield is an asynchronous operation (it does not complete synchronously, but waits for an external event, the resume, before it continues). If canceled, including if the cancel happens during a pause, the yield statement acts like a return; statement.

In fact, if the stream is paused, the function is blocked at the yield statement (as specified above). But if the stream is then cancelled, the yield doesn't act as a return; statement: the function performs one more iteration, and only the next yield acts as a return; statement.

Example

import 'dart:async';

List<int> expensiveComputations = [];
List<int> sent = [];

Stream<int> generator() async* {
  for (int i = 1; i <= 5; i++) {
    expensiveComputations.add(i); // Say there is an expensive computation here
    yield i;
    sent.add(i);
  }
}

main() async {
  Stream<int> s = generator();
  late StreamSubscription<int> ss;
  ss = s.listen((int i) async {
    if (i == 2) {
      ss.pause();
      // Give the stream some time to be actually paused
      await Future.delayed(Duration(milliseconds: 100));
      print(sent); // [1]
      print(expensiveComputations); // [1, 2]
      await ss.cancel();
    }
  });
  // Wait for some time while the stream is paused and then cancelled and then see the results
  await Future.delayed(Duration(seconds: 1));
  print(sent);  // [1, 2]
  print(expensiveComputations); // [1, 2, 3]
}

As you can see, one extra expensive computation was performed after the stream was cancelled.

Tested on Dart SDK version: 2.18.0-271.0.dev (dev) (Sat Jul 9 12:06:18 2022 -0700) on "windows_x64"

@eernstg
Member

eernstg commented Jul 14, 2022

@lrhn, WDYT?

@lrhn
Member

lrhn commented Jul 14, 2022

Sounds plausible. This was never correctly implemented originally, and we had some improvements recently, but apparently it didn't fix everything.
So, more fixing to be done!

(We have a test for this already: https://github.com/dart-lang/sdk/blob/main/tests/language/async_star/cancel_while_paused_at_yield_test.dart.)

The VM fails two tests in that directory:

  • language/async_star/async_star_cancel_test: Pass
  • language/async_star/cancel_while_paused_at_yield_test: RuntimeError
  • language/async_star/cancel_while_paused_test: RuntimeError

(The test with the "Pass" result seems to still be marked as an expected failure?)

@lrhn lrhn added area-vm Use area-vm for VM related issues, including code coverage, and the AOT and JIT backends. type-bug Incorrect behavior (everything from a crash to more subtle misbehavior) labels Jul 14, 2022
@mkustermann
Member

Reassigning to @alexmarkov who is working on async/async* atm.

@sgrekhov
Contributor Author

Please note that in the example above the following line seems important

      // Give the stream some time to be actually paused
      await Future.delayed(Duration(milliseconds: 100));

If I remove this line, the test passes on my machine. I tried it on DartPad and the results there are the opposite: the test passes as is and fails without this line. I have no explanation for this. The test doesn't look racy to me.

@lrhn
Member

lrhn commented Jul 14, 2022

The await is important because it decides which of two situations you are in. Without it, the subscription is paused and cancelled during the event delivery, which means the code can ignore the pause and recognize the cancel when delivery returns. With it, the subscription is merely paused when the event delivery returns, and is only cancelled later, before it resumes.

That is: The async* function yields a value. This delivers the value to the listener. If the listener responds during delivery by pausing or cancelling the subscription, the code recognizes that before continuing.
If the listener is cancelled, the async* function returns at the yield.
If the listener pauses, but does not cancel, the code pauses at the yield.

When the subscription is then cancelled, after control has returned to the paused async* function, it seems the code resumes instead of returning, only to return on the next yield. Which is too late.
So, it needs to do the is-canceled check after the is-paused check-and-suspend.
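
To make the check order concrete, here is a minimal sketch of a hand-written stand-in for the generator from the original example. It is not how the VM implements async*; `manualGenerator`, `wakeUp` and `runDemo` are hypothetical names, and it assumes a synchronous `StreamController` is a close enough model of event delivery at a yield. The point is only the second `cancelled` check, placed after the suspend.

```dart
import 'dart:async';

// Hypothetical stand-in for `generator()` above. NOT the VM implementation;
// it only models the order of the cancel/pause checks at a "yield".
Stream<int> manualGenerator(List<int> computed, List<int> sent) {
  late final StreamController<int> controller;
  var cancelled = false;
  Completer<void>? wake; // Non-null only while suspended at a "yield".

  // Wakes the suspended body, if any, on resume or cancel.
  void wakeUp() {
    final pending = wake;
    wake = null;
    pending?.complete();
  }

  Future<void> body() async {
    for (var i = 1; i <= 5; i++) {
      computed.add(i);
      controller.add(i); // Sync controller: the listener runs inside add().
      if (cancelled) return; // Cancelled during delivery.
      if (controller.isPaused) {
        final suspended = wake = Completer<void>();
        await suspended.future; // Suspend until resume or cancel.
        if (cancelled) return; // The check that must follow the suspend.
      }
      sent.add(i);
    }
    await controller.close();
  }

  controller = StreamController<int>(
    sync: true,
    // Start the body in a microtask so nothing is delivered inside listen().
    onListen: () => scheduleMicrotask(body),
    onResume: wakeUp,
    onCancel: () {
      cancelled = true;
      wakeUp();
    },
  );
  return controller.stream;
}

// Same scenario as the original example: pause at 2, cancel 100 ms later.
Future<Map<String, List<int>>> runDemo() async {
  final computed = <int>[];
  final sent = <int>[];
  late final StreamSubscription<int> sub;
  sub = manualGenerator(computed, sent).listen((i) async {
    if (i == 2) {
      sub.pause();
      await Future<void>.delayed(const Duration(milliseconds: 100));
      await sub.cancel();
    }
  });
  await Future<void>.delayed(const Duration(milliseconds: 300));
  return {'computed': computed, 'sent': sent};
}

Future<void> main() async {
  print(await runDemo()); // {computed: [1, 2], sent: [1]}
}
```

With the second check removed, this model would continue past the suspend, add 2 to `sent` and compute 3 before noticing the cancel, which matches the behavior reported in this issue.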

@alexmarkov
Contributor

https://dart-review.googlesource.com/c/sdk/+/251562 fixes this bug in the VM.

However, it breaks the co19/Language/Statements/Yield_and_Yield_Each/Yield/execution_async_t02 test, which expects the async* generator to continue execution if the stream subscription is paused and then canceled during yield:

https://github.com/dart-lang/co19/blob/aaa6934c9b4ddaf916b747c0402c820d9dc4c84a/Language/Statements/Yield_and_Yield_Each/Yield/execution_async_t02.dart#L34-L66

@lrhn Could you confirm that co19/Language/Statements/Yield_and_Yield_Each/Yield/execution_async_t02 test is incorrect?

@sgrekhov
Contributor Author

@alexmarkov I'm rewriting these tests now. Please see dart-lang/co19#1370

@sgrekhov
Contributor Author

@lrhn is there any difference if the function listening to a stream associated with an async* function is sync or async? For example:

Stream<int> generator() async* { ... }

main()  {
  Stream<int> s = generator();
  late StreamSubscription<int> ss;
  ss = s.listen((int i) {
...
  });

In the case above, the generator function waits at the yield until the listening function completes, and only after that resumes, pauses or returns. But what if the listening function is async? Should the generator wait at the yield until this async function finishes processing the delivered value?

@lrhn
Member

lrhn commented Jul 15, 2022

The generator function waits at the first yield until control returns to the event loop. That's what's important, because that's what we promise: That events are delivered as new events (either truly as an event directly from the event loop, or, more commonly, in a way that is sufficiently indistinguishable from that, which is what we do when forwarding events received as other events, or delivering events in a scheduled microtask.)

It means that whichever code called listen has time to do things synchronously afterwards, setting things up to prepare for the events, before the first event is delivered. We don't know how much time is needed, or how far up the call-stack that preparation needs to happen, which is why saying events are delivered "in a later microtask/event" was chosen. That's the one thing we know is definitely later.

So the async* function's yield is not waiting for the calling function to return, it has no way to know that.
But, as a synchronous function, the caller will return before control returns to the event loop.
An asynchronous caller function may not return immediately, but control will return to the event loop at the first await too.
So, you have until the next await to prepare for events being delivered.

(BTW, The late is not necessary.)
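
A small sketch of the ordering guarantee described above: code that runs synchronously after listen completes before the first event is delivered. `deliveryOrder` and the log strings are made up for illustration.

```dart
import 'dart:async';

Stream<int> generator() async* {
  yield 1;
}

// Records the order of "setup after listen" versus the first delivered event.
Future<List<String>> deliveryOrder() async {
  final log = <String>[];
  final done = Completer<void>();
  generator().listen(
    (i) => log.add('event $i'),
    onDone: done.complete,
  );
  // Still synchronous: control has not returned to the event loop yet,
  // so no event can have been delivered at this point.
  log.add('setup after listen');
  await done.future; // First await: events may be delivered from here on.
  return log;
}

Future<void> main() async {
  print(await deliveryOrder()); // [setup after listen, event 1]
}
```

The same holds whether the function calling listen is sync or async; what matters is that no event arrives before control returns to the event loop.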
