Deadlocks in tests after upgrading rxdart #587

Closed
Mike278 opened this issue May 24, 2021 · 33 comments · Fixed by #605

Comments

@Mike278

Mike278 commented May 24, 2021

Took another stab at upgrading off rxdart 0.23.1, but unfortunately still getting some failing tests. Here's a repro/comparison (could probably be minimized further but not sure which parts are relevant).

import 'package:meta/meta.dart';
import 'package:moor/ffi.dart';
import 'package:moor/moor.dart';
import 'package:rxdart/rxdart.dart';
import 'package:test/test.dart';

class TestDb extends GeneratedDatabase {
  TestDb() : super(SqlTypeSystem.withDefaults(), VmDatabase.memory());
  @override final List<TableInfo> allTables = const [];
  @override final int schemaVersion = 1;
}

Future<void> testCase({
  @required Stream<int> Function() createOuter,
  @required Stream<int> Function() createInner,
}) async {
  final log = <int>[];
  final timeout = Duration(milliseconds: 100);

  final a = createOuter();
  final b = a.switchMap((_) => createInner());

  b.listen(log.add);
  await b.first.then(log.add)
    .timeout(timeout, onTimeout: () => fail('1st should complete'));
  expect(log, [2, 2]);

  b.listen(log.add);
  await b.first.then(log.add)
    .timeout(timeout, onTimeout: () => fail('2nd should complete'));
  expect(log, [2, 2, 2, 2]);
}

void main() {
  group('rxdart upgrade', () {
  
    test("moor", () async {
      final db = TestDb();
      Stream<int> selectInt(int i) => db
        .customSelect('select $i a')
        .map((row) => row.read<int>('a'))
        .watchSingle();
      await testCase(
        createOuter: () => selectInt(1),
        createInner: () => selectInt(2),
      );
    });

    test("rxdart", () async {
      final outer = BehaviorSubject<int>();
      final tc = testCase(
        createOuter: () => outer,
        createInner: () {
          final inner = BehaviorSubject<int>();
          Future.delayed(Duration(milliseconds: 10)).then((_) => inner.add(2));
          return inner;
        }
      );
      await Future.delayed(Duration(milliseconds: 10));
      outer.add(1);
      await tc;
    });
  });
}
  • rxdart 0.23.1: Both tests pass
  • rxdart 0.24.0 and 0.24.1: Both tests fail with "2nd should complete"
  • rxdart 0.25.0, 0.26.0, and 0.27.0: Only the moor test fails
  • All the tests were done with moor 4.3.1, but I also spot checked moor 3.4.0 + rxdart 0.27.0 and I get the same result.

Since this began after upgrading rxdart I've started by creating an issue here, but @simolus3 might be able to help determine if this is a moor bug.

#477 and #500 might have some context from the last time we tried to upgrade off rxdart 0.23.1.

I looked around for similar issues since last time and came across #511, which looks similar. The 1st repro in that issue still times out with rxdart 0.27.0, even though the 2nd repro looks fixed.

@simolus3

I'm taking a look at this now. Interestingly both tests pass if you cancel the first subscription before the second run:

  final sub = b.listen(log.add);
  await b.first
      .then(log.add)
      .timeout(timeout, onTimeout: () => fail('1st should complete'));
  expect(log, [2, 2]);
  await sub.cancel();

@simolus3

simolus3 commented May 24, 2021

My take is that RxDart is doing some understandable but questionable things here. First, it looks like the root of the failure is from forwardingStream:

  } else if (stream.isBroadcast) {
    controller = StreamController<R>.broadcast(
      onListen: onListen,
      onCancel: onCancel,
      sync: true,
    );
  } else {

This function is used by most transformers from RxDart, including switchMap. It maps broadcast streams to broadcast streams (which is reasonable), but the broadcast stream controller will only invoke the onListen callback for the first listener. This means that the "inner" stream will only get listened to once! Since moor streams emit their cached value in the onListen callback (where else would they do it), you don't get a value on the second subscription.
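
The mechanism can be reproduced with dart:async alone. The following is a minimal sketch, not the actual moor or RxDart code: `cachedSource`, `forwardViaBroadcastController` and `collectFromLateListener` are hypothetical names invented for this illustration. `cachedSource` stands in for a moor-like broadcast stream that hands its cached value to every new listener, and `forwardViaBroadcastController` mimics the forwarding pattern quoted above.

```dart
import 'dart:async';

/// Hypothetical moor-like source: a broadcast stream that emits a cached
/// value (42) to each new listener as soon as it subscribes.
Stream<int> cachedSource() =>
    Stream<int>.multi((c) => c.add(42), isBroadcast: true);

/// Hypothetical forwarding helper modelled on the snippet above: the source
/// is only listened to inside the broadcast controller's onListen callback,
/// which runs for the first downstream listener only.
Stream<T> forwardViaBroadcastController<T>(Stream<T> source) {
  late final StreamController<T> controller;
  StreamSubscription<T>? subscription;
  controller = StreamController<T>.broadcast(
    sync: true,
    onListen: () {
      subscription = source.listen(controller.add,
          onError: controller.addError, onDone: controller.close);
    },
    onCancel: () => subscription?.cancel(),
  );
  return controller.stream;
}

/// Attaches one listener, lets it receive its events, then attaches a second
/// listener and returns whatever that second listener received.
Future<List<int>> collectFromLateListener(Stream<int> stream) async {
  final early = stream.listen((_) {});
  await Future<void>.delayed(Duration.zero);

  final lateEvents = <int>[];
  final lateSub = stream.listen(lateEvents.add);
  await Future<void>.delayed(Duration.zero);

  await early.cancel();
  await lateSub.cancel();
  return lateEvents;
}

Future<void> main() async {
  // Listening to the source directly: the late listener gets the cached value.
  print(await collectFromLateListener(cachedSource())); // [42]

  // Through the forwarding controller: the second listen never reaches the
  // source, so the late listener receives nothing.
  print(await collectFromLateListener(
      forwardViaBroadcastController(cachedSource()))); // []
}
```

The second case is the shape of the timeout in the repro: a `first` call on the late subscription would wait forever for a value that is never re-emitted.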

The reason RxDart subjects don't suffer from the same behavior is that forwardingStream treats them specially. For example, we can use the following class to obtain a Stream that should behave exactly like a Subject with respect to the Stream API in dart:async (extracted to a gist due to length).

When one only uses the stream api from dart:async, a subject should be indistinguishable from _SyncProxyStream(subject). However, this change causes the rxdart test to fail as well:

      final outer = BehaviorSubject<int>();
      final tc = testCase(
          createOuter: () => _SyncProxyStream(outer), // Test fails because of this
          createInner: () {
            final inner = BehaviorSubject<int>();
            Future.delayed(Duration(milliseconds: 10))
                .then((_) => inner.add(2));
            return inner;
          });
      await Future.delayed(Duration(milliseconds: 10));
      outer.add(1);
      await tc;

This means that RxDart's transformers don't reliably work on every Stream, which is really unfortunate. At the same time, I understand the need to keep "broadcastability" through stream transformers which probably wouldn't work otherwise.

My suggestion would be to make forwardingStream return a single-subscription stream when the input is not a Subject, but I'm not sure if there are negative consequences of that as well. It's certainly a breaking change.

@frankpepermans
Member

To understand the issue correctly,
do you expect that onListen fires on every new Subscription?

Because that is not the case. For example:

import 'dart:async';

void main() {
  var count = 0;
  final controller = StreamController<int>.broadcast(onListen: () => print('listen to me! ${++count}'));
  
  controller.stream.listen((_) {});
  controller.stream.listen((_) {});
  controller.stream.listen((_) {});
  controller.stream.listen((_) {});
}

Here, print is invoked only once: "listen to me! 1".

As I recall, this was actually wrong in earlier rxdart versions, where onListen would trigger on every subscription.

If you cancel them all, and then do a new subscription, then onListen invokes again.
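
A small sketch of that last point, using only dart:async. `countOnListenCalls` is a name made up for this example; it records how often the controller's onListen callback has run at two points in time.

```dart
import 'dart:async';

/// Returns the onListen call count after two concurrent listens, and again
/// after cancelling both and listening once more.
Future<List<int>> countOnListenCalls() async {
  var listenCalls = 0;
  final controller = StreamController<int>.broadcast(
    onListen: () => listenCalls++,
  );

  final a = controller.stream.listen((_) {});
  final b = controller.stream.listen((_) {});
  final afterTwoListens = listenCalls; // only the first listener counted

  // Once every listener is gone, the next listen triggers onListen again.
  await a.cancel();
  await b.cancel();
  final c = controller.stream.listen((_) {});
  final afterRelisten = listenCalls;

  await c.cancel();
  await controller.close();
  return [afterTwoListens, afterRelisten];
}

Future<void> main() async {
  print(await countOnListenCalls()); // [1, 2]
}
```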

@simolus3

do you expect that onListen fires on every new Subscription?

In general, yes! I know that the onListen callback from StreamController.broadcast is not necessarily invoked for every subscription, but I don't think that RxDart should make that assumption for every broadcast stream.

For instance, consider this example:

import 'dart:async';

import 'package:rxdart/rxdart.dart';

class DoOnSubscribeStream<T> extends Stream<T> {
  final Stream<T> inner;
  final void Function() onSubscribe;

  DoOnSubscribeStream(this.inner, this.onSubscribe);

  @override
  bool get isBroadcast => inner.isBroadcast;

  @override
  StreamSubscription<T> listen(void Function(T event)? onData,
      {Function? onError, void Function()? onDone, bool? cancelOnError}) {
    onSubscribe();
    return inner.listen(onData,
        onError: onError, onDone: onDone, cancelOnError: cancelOnError);
  }
}

void main() {
  final controller = StreamController.broadcast();
  final stream = DoOnSubscribeStream(controller.stream, () {
    print('new subscription');
  });

  stream.listen(null); // prints!
  stream.listen(null); // prints!
  stream.listen(null); // prints!

  final switched = stream.switchMap((_) => Stream.empty());
  switched.listen(null); // prints!
  switched.listen(null); // does not print -- why?
}

I think it's perfectly reasonable to expect the last line to print 'new subscription' as well.

In fact, there are less-contrived examples of this as well. For instance, let's say someone writes their own broadcast stream that emits a cached value for each new listener if one is available (moor is doing pretty much that). Those streams depend on new listens not being dropped by some transformer down the line.

In my opinion, the special case for subjects in forwardingStream shows that new subscriptions should propagate even for broadcast streams. If those weren't special-cased, switchMap wouldn't work properly for behavior subjects, for instance. I strongly feel that RxDart shouldn't give its own stream implementations any special treatment; it's fragile and breaks for transformations that may be implemented by third-party libraries.

@frankpepermans
Member

We only use forwardingStream for transformations that need to do something on special events, like onListen, onPause, onResume or onCancel.
Not all our transformers need it, some use a normal Stream sink.

But these events fire just like they would on "normal" Dart streams, and we follow the normal Stream behavior with rxdart.

@frankpepermans
Member

OK, I now get what you mean.

You hook into the actual listen method, not the onListen event.
In that case, yes, rxdart does indeed behave differently...

I'll see how breaking a change that would be.

@frankpepermans
Member

See here for a PR which would fix your issue: #588

@Mike278
Author

Mike278 commented May 25, 2021

Thank you both for the quick responses! Glad to see it's a simple fix. I'll try to give that branch a try tomorrow.

@frankpepermans
Member

@Mike278 Sorry I was a bit too fast there, the fix would be a little more complex unfortunately, I'll keep you posted :/

@frankpepermans
Member

So that PR branch probably works for your listen issue, but we need a better solution of course, it's a bit hacky atm.

Do feel free to try it out in the meantime of course.

@hoc081098
Collaborator

do you expect that onListen fires on every new Subscription?

In general, yes! I know that the onListen callback from StreamController.broadcast is not necessarily invoked for every subscription, but I don't think that RxDart should make that assumption for every broadcast stream.

RxDart's forwarding stream behavior is the same as fromHandlers (stream_transform) https://github.com/dart-lang/stream_transform/blob/533e1af154629959545e3a3728312af6cb4f5619/lib/src/from_handlers.dart#L10.
I think that is expected behavior.

@frankpepermans
Member

Yes, but the issue here is that they expect the listen handler to be invoked on each listen:

import 'dart:async';

import 'package:rxdart/rxdart.dart';

class DoOnSubscribeStream<T> extends Stream<T> {
  final Stream<T> inner;
  final void Function() onSubscribe;

  DoOnSubscribeStream(this.inner, this.onSubscribe);

  @override
  bool get isBroadcast => inner.isBroadcast;

  @override
  StreamSubscription<T> listen(void Function(T event)? onData,
      {Function? onError, void Function()? onDone, bool? cancelOnError}) {
    onSubscribe();
    return inner.listen(onData,
        onError: onError, onDone: onDone, cancelOnError: cancelOnError);
  }
}

void main() {
  final controller =
      StreamController<void>.broadcast(onListen: () => print('I start!'));
  final stream = DoOnSubscribeStream(controller.stream, () {
    print('new subscription');
  });

  var switched = stream.switchMap((event) => Stream.value(event));
  switched.listen(null); // prints!
  switched.listen(null); // does not print!

  var mapped = stream.map((event) => event);
  mapped.listen(null); // prints!
  mapped.listen(null); // prints!
}

It prints "I start!" once, as expected, but if you override the listen method and expect it to run on each actual listen, then the behavior is inconsistent.

@hoc081098
Collaborator

IMO DoOnSubscribeStream means calling the callback when controller.stream is listened to; because the Stream is broadcast, it should be listened to once :))

@hoc081098
Collaborator

Only the built-in operators cause the inconsistency; the stream_transform package (from the dart-lang org) does not follow that behavior 😕

@simolus3

IMO DoOnSubscribeStream means calling the callback when controller.stream is listened to

I agree.

because the Stream is broadcast, it should be listened to once

Isn't the whole point of broadcast streams that they can be listened to multiple times? :D

I wonder if package:stream_transform should change their behavior as well. I'll open an issue there; we may then get more insight on whether the behavior is intentional or not.

@Mike278
Author

Mike278 commented May 28, 2021

Hmm the repro still fails for me on that branch:

dependencies:
  # ...
  rxdart:

# ...
dependency_overrides:
  rxdart:
    git:
      url: https://github.com/ReactiveX/rxdart.git
      ref: d6a7761cb74761f4f8b6e3663c445f28a145a75b

  rxdart:
    dependency: "direct main"
    description:
      path: "."
      ref: d6a7761cb74761f4f8b6e3663c445f28a145a75b
      resolved-ref: d6a7761cb74761f4f8b6e3663c445f28a145a75b
      url: "https://github.com/ReactiveX/rxdart.git"
    source: git
    version: "0.27.0"

@Mike278
Author

Mike278 commented Jun 1, 2021

It looks like #588 fixes at least some of the problem. I tried to combine all the different examples from this issue into one runnable test: https://gist.github.com/Mike278/f21c92e562428af26af58128d0209b00

@frankpepermans
Member

Is it an option to move the code in the listen override into an onListen handler?

@Mike278
Author

Mike278 commented Jun 1, 2021

My understanding is that would mean the code is only invoked each time the listener count goes from 0 to 1, but the goal is to invoke the code each time the listener count is incremented.

I think I can reduce the remaining failures to this case:

import 'dart:async';

import 'package:rxdart/rxdart.dart';
import 'package:test/test.dart';

class WrappedStream<T> extends Stream<T> {
  final Stream<T> inner;

  WrappedStream(this.inner);

  @override
  bool get isBroadcast => inner.isBroadcast;

  @override
  StreamSubscription<T> listen(void Function(T event) onData,
      {Function onError, void Function() onDone, bool cancelOnError}) {
    return inner.listen(onData,
        onError: onError, onDone: onDone, cancelOnError: cancelOnError);
  }
}

void main() {
  test('rxdart upgrade', () async {
    final controller = BehaviorSubject.seeded('controller');
    final stream = WrappedStream(controller.stream);

    final switched = stream.switchMap((_) {
      return BehaviorSubject.seeded('switched');
    });

    final timeout = Duration(milliseconds: 100);

    switched.listen(null); // note: commenting this out makes the test pass on 0.27

    final value1 = await switched.first
        .timeout(timeout, onTimeout: () => fail('1st should complete'));
    expect(value1, 'switched');

    final value2 = await switched.first
        .timeout(timeout, onTimeout: () => fail('2nd should complete')); // timeout here with rxdart 0.27 and PR#588
    expect(value2, 'switched');
  });
}

@Mike278
Author

Mike278 commented Jun 5, 2021

@frankpepermans any idea what's up with that test failure above? Other than that it looks like the PR is really close!

@frankpepermans
Member

It's not really a good PR though, and will never be merged in the current state, but it was an effort to see if it could resolve your problem (which it apparently almost does then).

Can you maybe explain in more detail why you'd need every new subscription to invoke listen on all upstream targets?

...maybe we can also think of a different solution then?

@simolus3

simolus3 commented Jun 5, 2021

Can you maybe explain in more detail why you'd need every new subscription to invoke listen on all upstream targets?

In case of moor, it comes from three requirements basically:

  • We expose broadcast streams (because our streams can be listened to multiple times)
  • Those streams may emit cached information when a new listener attaches
  • The streams are not backed by an RxDart Subject (they behave similarly to a BehaviorSubject, but AFAIK the caching behavior is slightly different; I can work out the exact differences if it's necessary to fix this issue)

Regarding the third point, there's also a philosophical argument to be made that RxDart should extend Dart's streams so I think it's unfortunate if moor has to use RxDart's subjects to be compatible with it.

@frankpepermans
Member

Ok I think I understand the issue now, so we use ForwardStream to add some hooks that we need for some transformers, and indeed, we have special cases for our own Subjects in there.

We do that indeed to maintain the behavior, i.e. to not suddenly switch from, say a BehaviorSubject, to a plain StreamController, because that would lose the "emit last event on subscribe" behavior.

Correct?

@simolus3

simolus3 commented Jun 5, 2021

We do that indeed to maintain the behavior, i.e. to not suddenly switch from, say a BehaviorSubject, to a plain StreamController, because that would lose the "emit last event on subscribe" behavior.

Yes exactly. I don't think it's bad to keep that behavior (it's essentially an optimization when the source stream is known to avoid duplicate computations of transformers). I would prefer new subscriptions to go through for non-subject broadcast streams though.

Lasse from the Dart team suggested using Stream.multi as a suitable implementation for the forwarding stream here. It doesn't have the same problem because it treats each downstream subscription individually. But I think that's essentially the ForwardingStream you've introduced in your PR. He also added some notes on how switchMap may be a special transformer (but the general problem applies to other forwarding transformations too).
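
As a rough sketch of what a Stream.multi-based forwarder could look like (an illustration of the idea only, not the code from the PR): `forwardViaMulti` and `lateListenerResult` are hypothetical names, and pause/resume forwarding is left out for brevity.

```dart
import 'dart:async';

/// Hypothetical forwarder: every downstream listen gets its own controller
/// and therefore its own subscription to [source].
Stream<T> forwardViaMulti<T>(Stream<T> source) {
  return Stream<T>.multi((controller) {
    final subscription = source.listen(
      controller.addSync,
      onError: controller.addErrorSync,
      onDone: controller.closeSync,
    );
    // Cancelling the downstream subscription cancels the upstream one.
    controller.onCancel = subscription.cancel;
  }, isBroadcast: source.isBroadcast);
}

/// Returns [number of listens seen by the source, events seen by a listener
/// that attaches after an earlier listener already got its value].
Future<List<Object>> lateListenerResult() async {
  var sourceListens = 0;
  // Stand-in for a stream that emits a cached value to each new listener.
  final source = Stream<int>.multi((c) {
    sourceListens++;
    c.add(42);
  }, isBroadcast: true);
  final forwarded = forwardViaMulti(source);

  final early = forwarded.listen((_) {});
  await Future<void>.delayed(Duration.zero);

  final lateEvents = <int>[];
  final lateSub = forwarded.listen(lateEvents.add);
  await Future<void>.delayed(Duration.zero);

  await early.cancel();
  await lateSub.cancel();
  return [sourceListens, lateEvents];
}

Future<void> main() async {
  print(await lateListenerResult()); // [2, [42]]
}
```

The trade-off is the one mentioned above: because each listener drives its own upstream subscription, any transformation work is repeated per listener instead of being shared.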

@Mike278
Author

Mike278 commented Jun 8, 2021

Thank you both again for your collaboration here! @frankpepermans Do you think an approach based on Stream.multi would be a good way to move forward?

@frankpepermans
Member

@Mike278 not sure, but I'll try to make some time to investigate

@frankpepermans
Member

I made a few attempts to get Stream.multi working, but unfortunately it always breaks on the many use cases we have, most notably the doOnX transformers.

Also, a bit of an annoyance is that subscribing to a Stream.multi always yields a StreamSubscription, even if the underlying Stream is not a broadcast Stream for example, since the subscription is deferred internally and the StateError that we expect is thrown at a different point in time.

@frankpepermans
Member

...actually, bit more tinkering, might be able to get it up without breaking too much,
@Mike278 could you try: https://github.com/ReactiveX/rxdart/tree/stream_multi ?

@Mike278
Author

Mike278 commented Jun 14, 2021

I haven't tried it out yet, but from a quick look at the code I think there might be a few issues. For example, controller.onListen = maybeListen; doesn't do anything according to the Stream.multi docs: "Setting its StreamController.onListen has no effect since the onListen callback is called instead, and the StreamController.onListen won't be called later."

Also, a bit of an annoyance is that subscribing to a Stream.multi always yields a StreamSubscription, even if the underlying Stream is not a broadcast Stream

I think the idea for Stream.multi/MultiStreamController is to almost introduce a 3rd type of stream that can act like either/both/neither (in fact the PR that added it mentioned that it's "more like a StreamController2"). As the docs put it:

A multi-subscription stream can behave like any other stream. If the onListen callback throws on every call after the first, the stream behaves like a single-subscription stream. If the stream emits the same events to all current listeners, it behaves like a broadcast stream.

@frankpepermans
Member

maybeListen is being called; if setting onListen does nothing, then we can remove it.

Ignore that comment I made before; all 700+ tests now pass using Stream.multi.

@Mike278
Author

Mike278 commented Jun 15, 2021

I had a chance to try this out. The moor test from the repro in the OP no longer times out (woo! 🥳), but now both tests fail because an extra onData event is delivered. Here's a smaller repro:

test('duplicate events', () async {
  final source = BehaviorSubject.seeded('source');
  final switched = source.switchMap((value) => BehaviorSubject.seeded('switched'));
  int i = 0;
  switched.listen((_) => i++);
  expect(await switched.first, 'switched');
  expect(i, 1);
  expect(await switched.first, 'switched');
  expect(i, 1); // fails because i==2
});

rxdart:
  dependency: "direct main"
  description:
    path: "."
    ref: stream_multi
    resolved-ref: "2f465a4c5f86dc8a95359150efdda6aa2bc41c09"
    url: "https://github.com/ReactiveX/rxdart.git"
  source: git
  version: "0.27.1"

@hoc081098
Collaborator

@Mike278 Could you try #605

@Mike278
Author

Mike278 commented Jul 17, 2021

Looks like that works, all our tests pass with #605 - no more deadlocks!


4 participants