Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stream transform not respecting isBroadcast parameter #36965

Open
frankpepermans opened this issue May 14, 2019 · 3 comments

Comments

Projects
None yet
3 participants
@frankpepermans
Copy link

commented May 14, 2019

(dart 2.2.0)

Below is a test which transforms a broadcast Stream using 2 different methods, one using StreamTransformer.fromHandlers, the other using an extended StreamTransformer class:

      // dummy transformer, using custom class
      final transformer = TestStreamTransformer<int>();
      // dummy transformers, using fromHandlers
      final fromHandlers = StreamTransformer<int, int>.fromHandlers();
      // base broadcast Stream
      final broadcastStream =
          Stream.fromIterable(const [1]).asBroadcastStream();

      final transformedStreamA = broadcastStream.transform(fromHandlers);
      final transformedStreamB = broadcastStream.transform(transformer);

      expect(broadcastStream.isBroadcast, isTrue); // passes
      expect(transformedStreamA.isBroadcast, isTrue); // passes
      expect(transformedStreamB.isBroadcast, isTrue); // fails



class TestStreamTransformer<T> extends StreamTransformerBase<T, T> {
  final StreamTransformer<T, T> transformer;

  TestStreamTransformer() : transformer = _buildTransformer();

  @override
  Stream<T> bind(Stream<T> stream) => transformer.bind(stream);

  static StreamTransformer<T, T> _buildTransformer<T>() {
    return StreamTransformer<T, T>((Stream<T> input, bool cancelOnError) {
      StreamController<T> controller;

      if (input.isBroadcast) {
        controller = StreamController<T>.broadcast(sync: true, onListen: () {});
      } else {
        controller = StreamController<T>(sync: true, onListen: () {});
      }

      return controller.stream.listen(null);
    });
  }
}

I would expect transform to always return a new Stream which is the same as the original Stream regarding broadcast, or not.

@frankpepermans

This comment has been minimized.

Copy link
Author

commented May 14, 2019

The problem is with the bind Function call, when using fromHandlers, bind returns a _BoundSinkStream
https://github.com/dart-lang/sdk/blob/master/sdk/lib/async/stream_transformers.dart#L177
where isBroadcast is properly handled,
however when extending StreamTransformer, bind will return a _BoundSubscriptionStream where this is not the case:
https://github.com/dart-lang/sdk/blob/master/sdk/lib/async/stream_transformers.dart#L323

@lrhn lrhn self-assigned this May 14, 2019

@lrhn

This comment has been minimized.

Copy link
Member

commented May 15, 2019

Correct, the _BoundSubscriptionStream should probably emit the same isBroadcast result as the wrapped stream. I'll fix that.

This doesn't chang whether you can listen more than once or not. In practice, the onListen callback is called for each listen attempt. If that function starts out by calling stream.listen on the original stream, then you can listen multiple times when the original stream allows it.
The boolean isBroadcast doesn't actually do anything, it's just there as a hint that the stream has a particular behavior.

@frankpepermans

This comment has been minimized.

Copy link
Author

commented May 15, 2019

Correct at it being just a hint, except at our rxdart lib, we sometimes treat incoming Streams differently based off this parameter, causing unexpected behavior.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.