Skip to content

Commit

Permalink
Fix some lock acquiring in Publishers.FlatMap (#194)
Browse files Browse the repository at this point in the history
  • Loading branch information
broadwaylamb committed Nov 8, 2020
1 parent 4977ca1 commit 5436868
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 13 deletions.
33 changes: 20 additions & 13 deletions Sources/OpenCombine/Publishers/Publishers.FlatMap.swift
Expand Up @@ -157,17 +157,17 @@ extension Publishers {
public func receive<Downstream: Subscriber>(subscriber: Downstream)
where Child.Output == Downstream.Input, Upstream.Failure == Downstream.Failure
{
let inner = Inner(downstream: subscriber,
let outer = Outer(downstream: subscriber,
maxPublishers: maxPublishers,
map: transform)
subscriber.receive(subscription: inner)
upstream.subscribe(inner)
subscriber.receive(subscription: outer)
upstream.subscribe(outer)
}
}
}

extension Publishers.FlatMap {
private final class Inner<Downstream: Subscriber>
private final class Outer<Downstream: Subscriber>
: Subscriber,
Subscription,
CustomStringConvertible,
Expand Down Expand Up @@ -243,7 +243,7 @@ extension Publishers.FlatMap {
subscription.request(maxPublishers)
}

fileprivate func receive(_ input: Upstream.Output) -> Subscribers.Demand {
fileprivate func receive(_ input: Input) -> Subscribers.Demand {
lock.lock()
let cancelledOrCompleted = self.cancelledOrCompleted
lock.unlock()
Expand All @@ -260,9 +260,9 @@ extension Publishers.FlatMap {
return .none
}

fileprivate func receive(completion: Subscribers.Completion<Child.Failure>) {
outerSubscription = nil
fileprivate func receive(completion: Subscribers.Completion<Failure>) {
lock.lock()
outerSubscription = nil
outerFinished = true
switch completion {
case .finished:
Expand All @@ -272,6 +272,8 @@ extension Publishers.FlatMap {
let wasAlreadyCancelledOrCompleted = cancelledOrCompleted
cancelledOrCompleted = true
for (_, subscription) in subscriptions {
// Cancelling subscriptions with the lock acquired. Not good,
// but that's what Combine does. This code path is tested.
subscription.cancel()
}
subscriptions = [:]
Expand Down Expand Up @@ -354,16 +356,21 @@ extension Publishers.FlatMap {

fileprivate func cancel() {
lock.lock()
if cancelledOrCompleted {
lock.unlock()
return
}
cancelledOrCompleted = true
let subscriptions = self.subscriptions
self.subscriptions = [:]
let outerSubscription = self.outerSubscription
self.outerSubscription = nil
lock.unlock()
for (_, subscription) in subscriptions {
subscription.cancel()
}
// Combine doesn't acquire the lock here. Weird.
// Combine doesn't acquire outerLock here. Weird.
outerSubscription?.cancel()
outerSubscription = nil
}

// MARK: - Reflection
Expand Down Expand Up @@ -471,9 +478,9 @@ extension Publishers.FlatMap {
private func releaseLockThenSendCompletionDownstreamIfNeeded(
outerFinished: Bool
) -> Bool {
#if DEBUG
#if DEBUG
lock.assertOwner() // Sanity check
#endif
#endif
if !cancelledOrCompleted && outerFinished && buffer.isEmpty &&
subscriptions.count + pendingSubscriptions == 0 {
cancelledOrCompleted = true
Expand All @@ -495,10 +502,10 @@ extension Publishers.FlatMap {
CustomReflectable,
CustomPlaygroundDisplayConvertible {
private let index: SubscriptionIndex
private let inner: Inner
private let inner: Outer
fileprivate let combineIdentifier = CombineIdentifier()

fileprivate init(index: SubscriptionIndex, inner: Inner) {
fileprivate init(index: SubscriptionIndex, inner: Outer) {
self.index = index
self.inner = inner
}
Expand Down
25 changes: 25 additions & 0 deletions Tests/OpenCombineTests/PublisherTests/FlatMapTests.swift
Expand Up @@ -643,6 +643,31 @@ final class FlatMapTests: XCTestCase {
XCTAssertEqual(childSubscription2.history, [.requested(.unlimited)])
}

func testCrashesWhenUpstreamFailsDuringChildCancellation() {
let helper = OperatorTestHelper(
publisherType: CustomPublisherBase<CustomPublisher, TestingError>.self,
initialDemand: .unlimited,
receiveValueDemand: .none,
createSut: { $0.flatMap { $0 } }
)

let childSubscription = CustomSubscription()
let child = CustomPublisher(subscription: childSubscription)

var counter = 0
childSubscription.onCancel = {
if counter >= 5 { return }
counter += 1
helper.publisher.send(completion: .failure(.oops))
}

XCTAssertEqual(helper.publisher.send(child), .none)

assertCrashes {
helper.publisher.send(completion: .failure(.oops))
}
}

func testDoesNotCompleteWithBufferedValues() {
let upstreamPublisher = PassthroughSubject<Void, Never>()

Expand Down

0 comments on commit 5436868

Please sign in to comment.