diff --git a/Package.swift b/Package.swift index 61ad8a92..f9b33741 100644 --- a/Package.swift +++ b/Package.swift @@ -32,6 +32,7 @@ let package = Package( condition: .when(platforms: supportedPlatforms.except([.wasi]))) ], exclude: [ + "Concurrency/Publisher+Concurrency.swift.gyb", "Publishers/Publishers.Encode.swift.gyb", "Publishers/Publishers.MapKeyPath.swift.gyb", "Publishers/Publishers.Catch.swift.gyb" diff --git a/RemainingCombineInterface.swift b/RemainingCombineInterface.swift index f97f6502..f56f56c1 100644 --- a/RemainingCombineInterface.swift +++ b/RemainingCombineInterface.swift @@ -2,104 +2,6 @@ // Please remove the corresponding piece from this file if you implement something, // and complement this file as features are added in Apple's Combine -@available(macOS 12.0, iOS 15.0, tvOS 15.0, watchOS 8.0, *) -public struct AsyncPublisher

: AsyncSequence where P : Publisher, P.Failure == Never { - - /// The type of element produced by this asynchronous sequence. - public typealias Element = P.Output - - public struct Iterator : AsyncIteratorProtocol { - - /// Asynchronously advances to the next element and returns it, or ends the - /// sequence if there is no next element. - /// - /// - Returns: The next element, if it exists, or `nil` to signal the end of - /// the sequence. - public mutating func next() async -> P.Output? - - public typealias Element = P.Output - } - - public init(_ publisher: P) - - /// Creates the asynchronous iterator that produces elements of this - /// asynchronous sequence. - /// - /// - Returns: An instance of the `AsyncIterator` type used to produce - /// elements of the asynchronous sequence. - public func makeAsyncIterator() -> AsyncPublisher

.Iterator - - /// The type of asynchronous iterator that produces elements of this - /// asynchronous sequence. - public typealias AsyncIterator = AsyncPublisher

.Iterator -} - -@available(macOS 12.0, iOS 15.0, tvOS 15.0, watchOS 8.0, *) -public struct AsyncThrowingPublisher

: AsyncSequence where P : Publisher { - - /// The type of element produced by this asynchronous sequence. - public typealias Element = P.Output - - public struct Iterator : AsyncIteratorProtocol { - - /// Asynchronously advances to the next element and returns it, or ends the - /// sequence if there is no next element. - /// - /// - Returns: The next element, if it exists, or `nil` to signal the end of - /// the sequence. - public mutating func next() async throws -> P.Output? - - public typealias Element = P.Output - } - - public init(_ publisher: P) - - /// Creates the asynchronous iterator that produces elements of this - /// asynchronous sequence. - /// - /// - Returns: An instance of the `AsyncIterator` type used to produce - /// elements of the asynchronous sequence. - public func makeAsyncIterator() -> AsyncThrowingPublisher

.Iterator - - /// The type of asynchronous iterator that produces elements of this - /// asynchronous sequence. - public typealias AsyncIterator = AsyncThrowingPublisher

.Iterator -} - - -extension Future where Failure == Never { - - @available(macOS 12.0, iOS 15.0, tvOS 15.0, watchOS 8.0, *) - final public var value: Output { get async } -} - -extension Future { - - @available(macOS 12.0, iOS 15.0, tvOS 15.0, watchOS 8.0, *) - final public var value: Output { get async throws } -} - - -extension Publisher where Self.Failure == Never { - - @available(macOS 12.0, iOS 15.0, tvOS 15.0, watchOS 8.0, *) - public var values: AsyncPublisher { get } -} - -extension Publisher { - - @available(macOS 12.0, iOS 15.0, tvOS 15.0, watchOS 8.0, *) - public var values: AsyncThrowingPublisher { get } -} - -@available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) -extension Subscribers.Completion : Sendable { -} - -@available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) -extension Subscribers.Demand : Sendable { -} - extension Publishers { /// A publisher that receives and combines the latest elements from two publishers. diff --git a/Sources/OpenCombine/Concurrency/Future+Concurrency.swift b/Sources/OpenCombine/Concurrency/Future+Concurrency.swift new file mode 100644 index 00000000..1d0c4420 --- /dev/null +++ b/Sources/OpenCombine/Concurrency/Future+Concurrency.swift @@ -0,0 +1,119 @@ +// +// Future+Concurrency.swift +// +// +// Created by Sergej Jaskiewicz on 28.08.2021. +// + +// async/await is only available since Swift 5.5 +#if compiler(>=5.5) +extension Future where Failure == Never { + + @available(macOS 12.0, iOS 15.0, tvOS 15.0, watchOS 8.0, *) + public var value: Output { + get async { // swiftlint:disable:this implicit_getter + await ContinuationSubscriber.withUnsafeSubscription(self) + } + } +} + +extension Future { + + @available(macOS 12.0, iOS 15.0, tvOS 15.0, watchOS 8.0, *) + public var value: Output { + get async throws { // swiftlint:disable:this implicit_getter + try await ContinuationSubscriber.withUnsafeThrowingSubscription(self) + } + } +} + +@available(macOS 12.0, iOS 15.0, tvOS 15.0, watchOS 8.0, *) +private final class ContinuationSubscriber + : Subscriber +{ + typealias Failure = UpstreamFailure + + private var continuation: UnsafeContinuation? + private var subscription: Subscription? + private let lock = UnfairLock.allocate() + + private init(_ continuation: UnsafeContinuation) { + self.continuation = continuation + } + + deinit { + lock.deallocate() + } + + func receive(subscription: Subscription) { + lock.lock() + guard self.subscription == nil else { + lock.unlock() + subscription.cancel() + return + } + self.subscription = subscription + lock.unlock() + subscription.request(.max(1)) + } + + func receive(_ input: Input) -> Subscribers.Demand { + lock.lock() + if let continuation = self.continuation.take() { + lock.unlock() + continuation.resume(returning: input) + } else { + lock.unlock() + } + return .none + } + + func receive(completion: Subscribers.Completion) { + lock.lock() + subscription = nil + lock.unlock() + completion.failure.map(handleFailure) + } + + private func handleFailure(_ error: Failure) { + lock.lock() + if let continuation = self.continuation.take() { + lock.unlock() + continuation.resume(throwing: error as! ErrorOrNever) + } else { + lock.unlock() + } + } +} + +@available(macOS 12.0, iOS 15.0, tvOS 15.0, watchOS 8.0, *) +extension ContinuationSubscriber where ErrorOrNever == Error { + fileprivate static func withUnsafeThrowingSubscription( + _ upstream: Upstream + ) async throws -> Input + where Upstream.Output == Input, + Upstream.Failure == UpstreamFailure + { + try await withUnsafeThrowingContinuation { continuation in + upstream.subscribe(ContinuationSubscriber(continuation)) + } + } +} + +@available(macOS 12.0, iOS 15.0, tvOS 15.0, watchOS 8.0, *) +extension ContinuationSubscriber where UpstreamFailure == Never, ErrorOrNever == Never { + fileprivate static func withUnsafeSubscription( + _ upstream: Upstream + ) async -> Input + where Upstream.Output == Input, + Upstream.Failure == Never + { + await withUnsafeContinuation { continuation in + upstream.subscribe(ContinuationSubscriber(continuation)) + } + } +} + +#endif // compiler(>=5.5) diff --git a/Sources/OpenCombine/Concurrency/GENERATED-Publisher+Concurrency.swift b/Sources/OpenCombine/Concurrency/GENERATED-Publisher+Concurrency.swift new file mode 100644 index 00000000..78175a7f --- /dev/null +++ b/Sources/OpenCombine/Concurrency/GENERATED-Publisher+Concurrency.swift @@ -0,0 +1,327 @@ +// ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓ +// ┃ ┃ +// ┃ Auto-generated from GYB template. DO NOT EDIT! ┃ +// ┃ ┃ +// ┃ ┃ +// ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛ +// +// Publisher+Concurrency.swift +// +// +// Created by Sergej Jaskiewicz on 28.08.2021. +// + +// async/await is only available since Swift 5.5 +#if compiler(>=5.5) +extension Publisher where Failure == Never { + + @available(macOS 12.0, iOS 15.0, tvOS 15.0, watchOS 8.0, *) + public var values: AsyncPublisher { + return .init(self) + } +} + +@available(macOS 12.0, iOS 15.0, tvOS 15.0, watchOS 8.0, *) +public struct AsyncPublisher: AsyncSequence + where Upstream.Failure == Never +{ + + public typealias Element = Upstream.Output + + public struct Iterator: AsyncIteratorProtocol { + + public typealias Element = Upstream.Output + + fileprivate let inner: Inner + + public func next() async -> Element? { + return await withTaskCancellationHandler( + handler: { [inner] in inner.cancel() }, + operation: { [inner] in await inner.next() } + ) + } + } + + public typealias AsyncIterator = Iterator + + private let publisher: Upstream + + public init(_ publisher: Upstream) { + self.publisher = publisher + } + + public func makeAsyncIterator() -> Iterator { + let inner = Iterator.Inner() + publisher.subscribe(inner) + return Iterator(inner: inner) + } +} + +@available(macOS 12.0, iOS 15.0, tvOS 15.0, watchOS 8.0, *) +extension AsyncPublisher.Iterator { + + // TODO: Test if it's really cancellable + fileprivate final class Inner: Subscriber, Cancellable { + typealias Input = Upstream.Output + typealias Failure = Upstream.Failure + + private enum State { + case awaitingSubscription + case subscribed(Subscription) + case terminal + } + + private let lock = UnfairLock.allocate() + private var pending: [UnsafeContinuation] = [] + private var state = State.awaitingSubscription + private var pendingDemand = Subscribers.Demand.none + + deinit { + lock.deallocate() + } + + func receive(subscription: Subscription) { + lock.lock() + guard case .awaitingSubscription = state else { + lock.unlock() + subscription.cancel() + return + } + state = .subscribed(subscription) + let pendingDemand = self.pendingDemand + self.pendingDemand = .none + lock.unlock() + if pendingDemand != .none { + subscription.request(pendingDemand) + } + } + + func receive(_ input: Input) -> Subscribers.Demand { + lock.lock() + guard case .subscribed = state else { + let pending = self.pending.take() + lock.unlock() + pending.resumeAllWithNil() + return .none + } + precondition(!pending.isEmpty, "Received an output without requesting demand") + let continuation = pending.removeFirst() + lock.unlock() + continuation.resume(returning: input) + return .none + } + + func receive(completion: Subscribers.Completion) { + lock.lock() + state = .terminal + let pending = self.pending.take() + lock.unlock() + pending.resumeAllWithNil() + } + + func cancel() { + lock.lock() + let pending = self.pending.take() + guard case .subscribed(let subscription) = state else { + state = .terminal + lock.unlock() + pending.resumeAllWithNil() + return + } + state = .terminal + lock.unlock() + subscription.cancel() + pending.resumeAllWithNil() + } + + fileprivate func next() async -> Input? { + return await withUnsafeContinuation { continuation in + lock.lock() + switch state { + case .awaitingSubscription: + pending.append(continuation) + pendingDemand += 1 + lock.unlock() + case .subscribed(let subscription): + pending.append(continuation) + lock.unlock() + subscription.request(.max(1)) + case .terminal: + lock.unlock() + continuation.resume(returning: nil) + } + } + } + } +} +extension Publisher { + + @available(macOS 12.0, iOS 15.0, tvOS 15.0, watchOS 8.0, *) + public var values: AsyncThrowingPublisher { + return .init(self) + } +} + +@available(macOS 12.0, iOS 15.0, tvOS 15.0, watchOS 8.0, *) +public struct AsyncThrowingPublisher: AsyncSequence +{ + + public typealias Element = Upstream.Output + + public struct Iterator: AsyncIteratorProtocol { + + public typealias Element = Upstream.Output + + fileprivate let inner: Inner + + public func next() async throws -> Element? { + return try await withTaskCancellationHandler( + handler: { [inner] in inner.cancel() }, + operation: { [inner] in try await inner.next() } + ) + } + } + + public typealias AsyncIterator = Iterator + + private let publisher: Upstream + + public init(_ publisher: Upstream) { + self.publisher = publisher + } + + public func makeAsyncIterator() -> Iterator { + let inner = Iterator.Inner() + publisher.subscribe(inner) + return Iterator(inner: inner) + } +} + +@available(macOS 12.0, iOS 15.0, tvOS 15.0, watchOS 8.0, *) +extension AsyncThrowingPublisher.Iterator { + + // TODO: Test if it's really cancellable + fileprivate final class Inner: Subscriber, Cancellable { + typealias Input = Upstream.Output + typealias Failure = Upstream.Failure + + private enum State { + case awaitingSubscription + case subscribed(Subscription) + case terminal(Error?) + } + + private let lock = UnfairLock.allocate() + private var pending: [UnsafeContinuation] = [] + private var state = State.awaitingSubscription + private var pendingDemand = Subscribers.Demand.none + + deinit { + lock.deallocate() + } + + func receive(subscription: Subscription) { + lock.lock() + guard case .awaitingSubscription = state else { + lock.unlock() + subscription.cancel() + return + } + state = .subscribed(subscription) + let pendingDemand = self.pendingDemand + self.pendingDemand = .none + lock.unlock() + if pendingDemand != .none { + subscription.request(pendingDemand) + } + } + + func receive(_ input: Input) -> Subscribers.Demand { + lock.lock() + guard case .subscribed = state else { + let pending = self.pending.take() + lock.unlock() + pending.resumeAllWithNil() + return .none + } + precondition(!pending.isEmpty, "Received an output without requesting demand") + let continuation = pending.removeFirst() + lock.unlock() + continuation.resume(returning: input) + return .none + } + + func receive(completion: Subscribers.Completion) { + lock.lock() + switch state { + case .awaitingSubscription, .subscribed: + if let continuation = pending.first { + // TODO: Test that it's nil even if the publisher fails + state = .terminal(nil) + let remaining = pending.take().dropFirst() + lock.unlock() + switch completion { + case .finished: + continuation.resume(returning: nil) + case .failure(let error): + continuation.resume(throwing: error) + } + remaining.resumeAllWithNil() + } else { + state = .terminal(completion.failure) + lock.unlock() + } + case .terminal: + let pending = self.pending.take() + lock.unlock() + pending.resumeAllWithNil() + } + } + + func cancel() { + lock.lock() + let pending = self.pending.take() + guard case .subscribed(let subscription) = state else { + state = .terminal(nil) + lock.unlock() + pending.resumeAllWithNil() + return + } + state = .terminal(nil) + lock.unlock() + subscription.cancel() + pending.resumeAllWithNil() + } + + fileprivate func next() async throws -> Input? { + return try await withUnsafeThrowingContinuation { continuation in + lock.lock() + switch state { + case .awaitingSubscription: + pending.append(continuation) + pendingDemand += 1 + lock.unlock() + case .subscribed(let subscription): + pending.append(continuation) + lock.unlock() + subscription.request(.max(1)) + case .terminal: + lock.unlock() + continuation.resume(returning: nil) + } + } + } + } +} + +@available(macOS 12.0, iOS 15.0, tvOS 15.0, watchOS 8.0, *) +extension Sequence { + fileprivate func resumeAllWithNil() + where Element == UnsafeContinuation + { + for continuation in self { + continuation.resume(returning: nil) + } + } +} +#endif // compiler(>=5.5) diff --git a/Sources/OpenCombine/Concurrency/Publisher+Concurrency.swift.gyb b/Sources/OpenCombine/Concurrency/Publisher+Concurrency.swift.gyb new file mode 100644 index 00000000..a702c28d --- /dev/null +++ b/Sources/OpenCombine/Concurrency/Publisher+Concurrency.swift.gyb @@ -0,0 +1,196 @@ +${template_header} +// +// Publisher+Concurrency.swift +// +// +// Created by Sergej Jaskiewicz on 28.08.2021. +// + +// async/await is only available since Swift 5.5 +#if compiler(>=5.5) +%{ +instantiations = [('AsyncPublisher', False), ('AsyncThrowingPublisher', True)] +}% +% for instantiation, throwing in instantiations: +extension Publisher ${'' if throwing else 'where Failure == Never '}{ + + @available(macOS 12.0, iOS 15.0, tvOS 15.0, watchOS 8.0, *) + public var values: ${instantiation} { + return .init(self) + } +} + +@available(macOS 12.0, iOS 15.0, tvOS 15.0, watchOS 8.0, *) +public struct ${instantiation}: AsyncSequence +% if not throwing: + where Upstream.Failure == Never +% end +{ + + public typealias Element = Upstream.Output + + public struct Iterator: AsyncIteratorProtocol { + + public typealias Element = Upstream.Output + + fileprivate let inner: Inner + + public func next() async ${'throws ' if throwing else ''}-> Element? { + return ${'try ' if throwing else ''}await withTaskCancellationHandler( + handler: { [inner] in inner.cancel() }, + operation: { [inner] in ${'try ' if throwing else ''}await inner.next() } + ) + } + } + + public typealias AsyncIterator = Iterator + + private let publisher: Upstream + + public init(_ publisher: Upstream) { + self.publisher = publisher + } + + public func makeAsyncIterator() -> Iterator { + let inner = Iterator.Inner() + publisher.subscribe(inner) + return Iterator(inner: inner) + } +} + +@available(macOS 12.0, iOS 15.0, tvOS 15.0, watchOS 8.0, *) +extension ${instantiation}.Iterator { + + // TODO: Test if it's really cancellable + fileprivate final class Inner: Subscriber, Cancellable { + typealias Input = Upstream.Output + typealias Failure = Upstream.Failure + + private enum State { + case awaitingSubscription + case subscribed(Subscription) + case terminal${'(Error?)' if throwing else ''} + } + + private let lock = UnfairLock.allocate() + private var pending: [UnsafeContinuation] = [] + private var state = State.awaitingSubscription + private var pendingDemand = Subscribers.Demand.none + + deinit { + lock.deallocate() + } + + func receive(subscription: Subscription) { + lock.lock() + guard case .awaitingSubscription = state else { + lock.unlock() + subscription.cancel() + return + } + state = .subscribed(subscription) + let pendingDemand = self.pendingDemand + self.pendingDemand = .none + lock.unlock() + if pendingDemand != .none { + subscription.request(pendingDemand) + } + } + + func receive(_ input: Input) -> Subscribers.Demand { + lock.lock() + guard case .subscribed = state else { + let pending = self.pending.take() + lock.unlock() + pending.resumeAllWithNil() + return .none + } + precondition(!pending.isEmpty, "Received an output without requesting demand") + let continuation = pending.removeFirst() + lock.unlock() + continuation.resume(returning: input) + return .none + } + + func receive(completion: Subscribers.Completion) { + lock.lock() +% if throwing: + switch state { + case .awaitingSubscription, .subscribed: + if let continuation = pending.first { + // TODO: Test that it's nil even if the publisher fails + state = .terminal(nil) + let remaining = pending.take().dropFirst() + lock.unlock() + switch completion { + case .finished: + continuation.resume(returning: nil) + case .failure(let error): + continuation.resume(throwing: error) + } + remaining.resumeAllWithNil() + } else { + state = .terminal(completion.failure) + lock.unlock() + } + case .terminal: + let pending = self.pending.take() + lock.unlock() + pending.resumeAllWithNil() + } +% else: + state = .terminal + let pending = self.pending.take() + lock.unlock() + pending.resumeAllWithNil() +% end + } + + func cancel() { + lock.lock() + let pending = self.pending.take() + guard case .subscribed(let subscription) = state else { + state = .terminal${'(nil)' if throwing else ''} + lock.unlock() + pending.resumeAllWithNil() + return + } + state = .terminal${'(nil)' if throwing else ''} + lock.unlock() + subscription.cancel() + pending.resumeAllWithNil() + } + + fileprivate func next() async ${'throws ' if throwing else ''}-> Input? { + return ${'try ' if throwing else ''}await withUnsafe${'Throwing' if throwing else ''}Continuation { continuation in + lock.lock() + switch state { + case .awaitingSubscription: + pending.append(continuation) + pendingDemand += 1 + lock.unlock() + case .subscribed(let subscription): + pending.append(continuation) + lock.unlock() + subscription.request(.max(1)) + case .terminal: + lock.unlock() + continuation.resume(returning: nil) + } + } + } + } +} +% end + +@available(macOS 12.0, iOS 15.0, tvOS 15.0, watchOS 8.0, *) +extension Sequence { + fileprivate func resumeAllWithNil() + where Element == UnsafeContinuation + { + for continuation in self { + continuation.resume(returning: nil) + } + } +} +#endif // compiler(>=5.5) diff --git a/Sources/OpenCombine/Subscribers/Subscribers.Completion.swift b/Sources/OpenCombine/Subscribers/Subscribers.Completion.swift index 9bb3fca8..90c3ad18 100644 --- a/Sources/OpenCombine/Subscribers/Subscribers.Completion.swift +++ b/Sources/OpenCombine/Subscribers/Subscribers.Completion.swift @@ -23,6 +23,10 @@ extension Subscribers.Completion: Equatable where Failure: Equatable {} extension Subscribers.Completion: Hashable where Failure: Hashable {} +#if compiler(>=5.5) +extension Subscribers.Completion: Sendable {} +#endif + extension Subscribers.Completion { private enum CodingKeys: String, CodingKey { case success = "success" @@ -70,4 +74,13 @@ extension Subscribers.Completion { return .failure(error) } } + + internal var failure: Failure? { + switch self { + case .finished: + return nil + case .failure(let failure): + return failure + } + } } diff --git a/Sources/OpenCombine/Subscribers/Subscribers.Demand.swift b/Sources/OpenCombine/Subscribers/Subscribers.Demand.swift index 5eefc30f..a440a0fa 100644 --- a/Sources/OpenCombine/Subscribers/Subscribers.Demand.swift +++ b/Sources/OpenCombine/Subscribers/Subscribers.Demand.swift @@ -466,3 +466,7 @@ extension Subscribers { } } } + +#if compiler(>=5.5) +extension Subscribers.Demand: Sendable {} +#endif