Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor of setting up our transformers #412

Merged
merged 46 commits into from
Mar 10, 2020
Merged

Conversation

frankpepermans
Copy link
Member

Ok, this turned out to be bigger than expected (as per usual haha).

Some issues made me take a closer look at how we are setting up our Stream transformations,
TL;DR : we were doing it wrong, slightly!

The way we did things before, is by always creating a new StreamController in our transformers, and then immediately subscribing to them. Afterwards, in the onListen handler, we would then actually subscribe to the underlying Stream.

This was causing some side effects, I'll elaborate more on those later on.

Ideally, we want to have the same effects that default transformers have, like for example the built-in map.
At first, I took a look at the stream_transform library and copied how they do it, which was a better way but still not giving the exact same behaviour as the default transformers.
Finally, I went with using Stream.eventTransformed, which as a result also cleans up a lot of things in a better way.

This PR should fix that up.

As an effect of doing things differently, our do transformer has different behaviour in 2 cases, but the new behaviour is actually in line with how it is supposed to work for Dart streams:

  • onListen will now only fire on the first subscriber, if there are more, then all current subscribers need to cancel first. If then another new subscriber registers, onListen will be called a second time.
  • onCancel will now only fire when all active subscribers have cancelled.

This is a big change, and will sure spark a conversation, please do test and ask away :)

@codecov-io
Copy link

codecov-io commented Mar 7, 2020

Codecov Report

Merging #412 into master will decrease coverage by 0.86%.
The diff coverage is 93.09%.

@@            Coverage Diff             @@
##           master     #412      +/-   ##
==========================================
- Coverage   93.55%   92.68%   -0.87%     
==========================================
  Files          60       61       +1     
  Lines        2047     2037      -10     
==========================================
- Hits         1915     1888      -27     
- Misses        132      149      +17

Copy link
Collaborator

@brianegan brianegan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I took a few hours to review this one, and overall I think the approach makes sense.

Should we release this as a -dev release to gather early feedback and see if the change hits folks in a negative way overall? I still think it will be worth fixing, but hard to get a sense of the impact this change has since it fixes more subtle bugs.

}

@override
FutureOr onCancel(EventSink<S> sink) => _mapperSubscription?.cancel();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we close the output sink at this point as well?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not at cancel time, onCancel will be called if all subscribers have cancelled for example, but the Stream could still be active and be subscribed to again at a later stage.

// this method is ran before any other events might be added.
// Once the first event(s) is/are successfully added, this method
// will not trigger again.
void _safeAddFirstEvent() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this a good idea? I've actually run into exactly the bug you're talking about, but this wold effectively turn any sync Stream into an async Stream. I think the bug is sort of "intended" as there are special considerations to take when using Sync StreamControllers.

// forwarded events.
if (stream is Subject<T>) {
controller = stream.createForwardingController(
onListen: onListen, onCancel: onCancel, sync: true);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should these sync property always be true here? Very hard to know without a stream.isSync or something :/

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Heh no idea :) I've checked stream_transform from the google devs, they transform using sync true always.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think, since we use transform sinks here, if the root Stream is async, then the transformed Streams will be too. It's like awaiting a Future and mapping the result in sync,
You still need to await the initial time.


// Create a new Controller, which will serve as a trampoline for
// forwarded events.
if (stream is Subject<T>) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need this special case? It looks like we listen to the source Stream in onListen and add all events from the source Stream to this new sink. Therefore, it seems like the events should be properly replayed in that manner?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I looked at this a bit more, and it might point to a general problem with our Subject implementations that we're fixing here: Our subjects currently replay events after the close method is called. Do you think that's actually the correct behavior? It kinda seems like they should just emitDone like a normal StreamController at that point.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah we do need this behavior, if for example the source is a ReplaySubject, then using an intermediate broadcast one will not replay events. It's different from our previous impl. where binding would spawn a new Controller always.

If we omit the Subject duplication, our tests will fail accordingly.

I'll add a better explanation later, in short, the very root Stream will not always be resubscribed to when new subscripers register.

@frankpepermans frankpepermans merged commit d737338 into master Mar 10, 2020
@frankpepermans frankpepermans deleted the rework/bind_then_do branch March 10, 2020 20:14
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants