New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
FlatMap #45
FlatMap #45
Conversation
I'm now reasoning through how to do demand management for the child subscriptions. (demand for upstream was trivial since it's basically set to Experimenting with the Apple code, it seems to request 1 from all children, and then make some decisions about what to do next when it receives a value from any child. I think it will be necessarily complex to deal with buffering of one extra value per child subscription. Imagine the following flow:
At this point, flatMap has to buffer that value so that it can be passed to downstream if a later demand request is made. continuing to experiment with Apple code, but I think it's unfortunately non-trivial |
@epatey hmm, that seems… unexpected. Yeah, FlatMap is probably the most nontrivial part of Combine. |
So, here's a unit test (that passes in apple combine mode), that helps explain more precisely the behavior. It's essentially that FlatMap puts func testChildDemand() {
let upstreamPublisher = PassthroughSubject<Void, Never>()
var childDemand: Subscribers.Demand?
let childSubscription = CustomSubscription(onRequest: { childDemand = $0 })
let childPublisher = CustomPublisherBase<Int, Never>(
subscription: childSubscription)
let flatMap = upstreamPublisher.flatMap { _ in childPublisher }
var downstreamSubscription: Subscription?
let downstreamSubscriber = TrackingSubscriberBase<Int, Never>(
receiveSubscription: {
downstreamSubscription = $0
$0.request(.max(2))
})
flatMap.subscribe(downstreamSubscriber)
upstreamPublisher.send(())
// It seems that Apple creates enough demand on each child such that it can
// have one extra/buffered value after the downstream demand is satisfied.
// A demand of 1 on each child is created
XCTAssertEqual(childDemand, .max(1))
// Downstream demand is 2, so:
// - this value gets sent
// - downstream demand goes down to 1
// - child is asked for 1 more
XCTAssertEqual(childPublisher.send(666), .max(1))
XCTAssertEqual(downstreamSubscriber.history, [.subscription("FlatMap"),
.value(666)])
// Downstream demand is 1, so:
// - this value gets sent
// - downstream demand goes down to 0, but still need a buffered value
// - child is asked for 1 more
XCTAssertEqual(childPublisher.send(777), .max(1))
XCTAssertEqual(downstreamSubscriber.history, [.subscription("FlatMap"),
.value(666),
.value(777)])
// Downstream demand is 0, so:
// - this value is buffered and NOT sent
// - downstream demand is 0 and there's a buffered value
// - child is asked for 0 more
XCTAssertEqual(childPublisher.send(888), .max(0))
XCTAssertEqual(downstreamSubscriber.history, [.subscription("FlatMap"),
.value(666),
.value(777)])
childDemand = nil
downstreamSubscription!.request(.max(2))
XCTAssertEqual(childDemand, .max(1))
XCTAssertEqual(downstreamSubscriber.history, [.subscription("FlatMap"),
.value(666),
.value(777),
.value(888)])
} |
Codecov Report
@@ Coverage Diff @@
## master #45 +/- ##
==========================================
+ Coverage 97.74% 97.91% +0.17%
==========================================
Files 43 44 +1
Lines 1727 1918 +191
==========================================
+ Hits 1688 1878 +190
- Misses 39 40 +1
Continue to review full report at Codecov.
|
Back pressure/demand makes things WAY WAY more complex. I really wonder if it was worth it vs Rx's decision to not have the feature. I've honestly never felt the need for it, but maybe I'm missing something. Getting close to complete support, though. |
0c5c515
to
18e8d29
Compare
@broadwaylamb , I think I'm finally ready for a review. Here are a couple of thoughts:
|
I'm thinking about iterating on The challenge is that, in the case of |
Ok, so my thinking has evolved a little. I think what I built with If this proves to be correct, I'll undo the factoring I did since that base class adds complexity but no value. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for your hard work, Eric!
You've done a really amazing job. I like that you've put a lot of comments, since the implementation is nontrivial indeed. I also like the tests. I have some suggestions and questions though.
I'd like to ask you to not force-push for now as it makes reviewing a little bit harder. After it's complete, you can squash the commits to keep the history clear.
LGTM Generated by 🚫 Danger Swift against 305b420 |
@broadwaylamb, I've gotten rid of the base class in my last commit. As I feared, there really are no other operators that have to manage a dynamic set of child subscriptions. Unfortunately, that commit moved a whole lot of code, so you may want to review the previous commits individually before looking at that big one to make understanding my reaction to your feedback easier. |
@broadwaylamb, let me know if/when you'd like me to rebase, squash and merge this - or if you'd like additional changes. |
|
@broadwaylamb, it looks like the swift 5.0 build for this pr is failing, but the only thing I see in the logs here is a problem with the swiftlint configuration. I'm guessing that's the reason it's failing, but I'm not sure. Any thoughts? |
@epatey oh. My bad, I removed |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you so much for this PR!
I'm taking a stab at
FlatMap
. In support of operators that need serialization of sending values from multiple upstream/child subscriptions, I've implemented a simple/fastSerializedWorkQueue
that adheres to theCombine
threading rules and avoids unnecessary context switches.Remaining work:
.failed
if either upstream or any children fail..finished
only after upstream and all children have completed (i.e. add refcounting of active subscriptions) AND all buffer values have been sent.maxPublishers
.unlimited
to.max(x)
.max(x)
to.unlimited
Inner
class to be shared with all merge like operators.