Skip to content

Commit

Permalink
[Xcode 13] Implement async/await support for publishers (no tests yet)
Browse files Browse the repository at this point in the history
  • Loading branch information
broadwaylamb committed Sep 24, 2021
1 parent 226b7b0 commit 32b2240
Show file tree
Hide file tree
Showing 7 changed files with 660 additions and 98 deletions.
1 change: 1 addition & 0 deletions Package.swift
Expand Up @@ -32,6 +32,7 @@ let package = Package(
condition: .when(platforms: supportedPlatforms.except([.wasi])))
],
exclude: [
"Concurrency/Publisher+Concurrency.swift.gyb",
"Publishers/Publishers.Encode.swift.gyb",
"Publishers/Publishers.MapKeyPath.swift.gyb",
"Publishers/Publishers.Catch.swift.gyb"
Expand Down
98 changes: 0 additions & 98 deletions RemainingCombineInterface.swift
Expand Up @@ -2,104 +2,6 @@
// Please remove the corresponding piece from this file if you implement something,
// and complement this file as features are added in Apple's Combine

@available(macOS 12.0, iOS 15.0, tvOS 15.0, watchOS 8.0, *)
public struct AsyncPublisher<P> : AsyncSequence where P : Publisher, P.Failure == Never {

/// The type of element produced by this asynchronous sequence.
public typealias Element = P.Output

public struct Iterator : AsyncIteratorProtocol {

/// Asynchronously advances to the next element and returns it, or ends the
/// sequence if there is no next element.
///
/// - Returns: The next element, if it exists, or `nil` to signal the end of
/// the sequence.
public mutating func next() async -> P.Output?

public typealias Element = P.Output
}

public init(_ publisher: P)

/// Creates the asynchronous iterator that produces elements of this
/// asynchronous sequence.
///
/// - Returns: An instance of the `AsyncIterator` type used to produce
/// elements of the asynchronous sequence.
public func makeAsyncIterator() -> AsyncPublisher<P>.Iterator

/// The type of asynchronous iterator that produces elements of this
/// asynchronous sequence.
public typealias AsyncIterator = AsyncPublisher<P>.Iterator
}

@available(macOS 12.0, iOS 15.0, tvOS 15.0, watchOS 8.0, *)
public struct AsyncThrowingPublisher<P> : AsyncSequence where P : Publisher {

/// The type of element produced by this asynchronous sequence.
public typealias Element = P.Output

public struct Iterator : AsyncIteratorProtocol {

/// Asynchronously advances to the next element and returns it, or ends the
/// sequence if there is no next element.
///
/// - Returns: The next element, if it exists, or `nil` to signal the end of
/// the sequence.
public mutating func next() async throws -> P.Output?

public typealias Element = P.Output
}

public init(_ publisher: P)

/// Creates the asynchronous iterator that produces elements of this
/// asynchronous sequence.
///
/// - Returns: An instance of the `AsyncIterator` type used to produce
/// elements of the asynchronous sequence.
public func makeAsyncIterator() -> AsyncThrowingPublisher<P>.Iterator

/// The type of asynchronous iterator that produces elements of this
/// asynchronous sequence.
public typealias AsyncIterator = AsyncThrowingPublisher<P>.Iterator
}


extension Future where Failure == Never {

@available(macOS 12.0, iOS 15.0, tvOS 15.0, watchOS 8.0, *)
final public var value: Output { get async }
}

extension Future {

@available(macOS 12.0, iOS 15.0, tvOS 15.0, watchOS 8.0, *)
final public var value: Output { get async throws }
}


extension Publisher where Self.Failure == Never {

@available(macOS 12.0, iOS 15.0, tvOS 15.0, watchOS 8.0, *)
public var values: AsyncPublisher<Self> { get }
}

extension Publisher {

@available(macOS 12.0, iOS 15.0, tvOS 15.0, watchOS 8.0, *)
public var values: AsyncThrowingPublisher<Self> { get }
}

@available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
extension Subscribers.Completion : Sendable {
}

@available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
extension Subscribers.Demand : Sendable {
}

extension Publishers {

/// A publisher that receives and combines the latest elements from two publishers.
Expand Down
119 changes: 119 additions & 0 deletions Sources/OpenCombine/Concurrency/Future+Concurrency.swift
@@ -0,0 +1,119 @@
//
// Future+Concurrency.swift
//
//
// Created by Sergej Jaskiewicz on 28.08.2021.
//

// async/await is only available since Swift 5.5
#if compiler(>=5.5)
extension Future where Failure == Never {

@available(macOS 12.0, iOS 15.0, tvOS 15.0, watchOS 8.0, *)
public var value: Output {
get async { // swiftlint:disable:this implicit_getter
await ContinuationSubscriber.withUnsafeSubscription(self)
}
}
}

extension Future {

@available(macOS 12.0, iOS 15.0, tvOS 15.0, watchOS 8.0, *)
public var value: Output {
get async throws { // swiftlint:disable:this implicit_getter
try await ContinuationSubscriber.withUnsafeThrowingSubscription(self)
}
}
}

@available(macOS 12.0, iOS 15.0, tvOS 15.0, watchOS 8.0, *)
private final class ContinuationSubscriber<Input,
UpstreamFailure: Error,
ErrorOrNever: Error>
: Subscriber
{
typealias Failure = UpstreamFailure

private var continuation: UnsafeContinuation<Input, ErrorOrNever>?
private var subscription: Subscription?
private let lock = UnfairLock.allocate()

private init(_ continuation: UnsafeContinuation<Input, ErrorOrNever>) {
self.continuation = continuation
}

deinit {
lock.deallocate()
}

func receive(subscription: Subscription) {
lock.lock()
guard self.subscription == nil else {
lock.unlock()
subscription.cancel()
return
}
self.subscription = subscription
lock.unlock()
subscription.request(.max(1))
}

func receive(_ input: Input) -> Subscribers.Demand {
lock.lock()
if let continuation = self.continuation.take() {
lock.unlock()
continuation.resume(returning: input)
} else {
lock.unlock()
}
return .none
}

func receive(completion: Subscribers.Completion<Failure>) {
lock.lock()
subscription = nil
lock.unlock()
completion.failure.map(handleFailure)
}

private func handleFailure(_ error: Failure) {
lock.lock()
if let continuation = self.continuation.take() {
lock.unlock()
continuation.resume(throwing: error as! ErrorOrNever)
} else {
lock.unlock()
}
}
}

@available(macOS 12.0, iOS 15.0, tvOS 15.0, watchOS 8.0, *)
extension ContinuationSubscriber where ErrorOrNever == Error {
fileprivate static func withUnsafeThrowingSubscription<Upstream: Publisher>(
_ upstream: Upstream
) async throws -> Input
where Upstream.Output == Input,
Upstream.Failure == UpstreamFailure
{
try await withUnsafeThrowingContinuation { continuation in
upstream.subscribe(ContinuationSubscriber(continuation))
}
}
}

@available(macOS 12.0, iOS 15.0, tvOS 15.0, watchOS 8.0, *)
extension ContinuationSubscriber where UpstreamFailure == Never, ErrorOrNever == Never {
fileprivate static func withUnsafeSubscription<Upstream: Publisher>(
_ upstream: Upstream
) async -> Input
where Upstream.Output == Input,
Upstream.Failure == Never
{
await withUnsafeContinuation { continuation in
upstream.subscribe(ContinuationSubscriber(continuation))
}
}
}

#endif // compiler(>=5.5)

0 comments on commit 32b2240

Please sign in to comment.