Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,4 @@ let package = Package(
name: "AsyncAlgorithmsTests",
dependencies: ["AsyncAlgorithms"]),
]
)
)
129 changes: 129 additions & 0 deletions Sources/AsyncAlgorithms/TaskFirst.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the Swift Async Algorithms open source project
//
// Copyright (c) 2022 Apple Inc. and the Swift project authors
// Licensed under Apache License v2.0 with Runtime Library Exception
//
// See https://swift.org/LICENSE.txt for license information
//
//===----------------------------------------------------------------------===//

struct TaskFirstState<Success: Sendable, Failure: Error>: Sendable {
var continuation: UnsafeContinuation<Success, Failure>?
var tasks: [Task<Success, Failure>]? = []

mutating func add(_ task: Task<Success, Failure>) -> Task<Success, Failure>? {
if var tasks = tasks {
tasks.append(task)
self.tasks = tasks
return nil
} else {
return task
}
}
Copy link
Member

@kperryua kperryua Jan 14, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thoughts on putting the withCriticalSection call in this method proper? I think you could similarly add

mutating func set(continuation: ...) { }
mutating func removeAll() -> [Task<Success, Failure>]? { }

which similarly encapsulate the withCriticalSection invocation. I think this would do a lot to make the callsites more readable.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The critical section needs to be outside of the mutation method because the mutation has the write back of the value of the structure so it needs to be external to the change.

}

extension Task {
/// Determine the first result of a sequence of tasks.
///
/// - Parameters:
/// - tasks: The running tasks to obtain a result from
/// - Returns: The first result or thrown error from the running tasks
static func first<Tasks: Sequence>(
_ tasks: Tasks
) async throws -> Success
where Tasks.Element == Task<Success, Failure>, Failure == Error {
let state = ManagedCriticalState(TaskFirstState<Success, Failure>())
return try await withTaskCancellationHandler {
let tasks = state.withCriticalRegion { state -> [Task<Success, Failure>] in
defer { state.tasks = nil }
return state.tasks ?? []
}
for task in tasks {
task.cancel()
}
} operation: {
try await withUnsafeThrowingContinuation { continuation in
state.withCriticalRegion { state in
state.continuation = continuation
}
for task in tasks {
Task<Void, Never> {
let result = await task.result
state.withCriticalRegion { state -> UnsafeContinuation<Success, Failure>? in
defer { state.continuation = nil }
return state.continuation
}?.resume(with: result)
}
state.withCriticalRegion { state in
state.add(task)
}?.cancel()
}
}
}
}

/// Determine the first result of a list of tasks.
///
/// - Parameters:
/// - tasks: The running tasks to obtain a result from
/// - Returns: The first result or thrown error from the running tasks
static func first(
_ tasks: Task<Success, Failure>...
) async throws -> Success where Failure == Error {
try await first(tasks)
}
}

extension Task where Failure == Never {
/// Determine the first result of a sequence of tasks.
///
/// - Parameters:
/// - tasks: The running tasks to obtain a result from
/// - Returns: The first result from the running tasks
static func first<Tasks: Sequence>(
_ tasks: Tasks
) async -> Success
where Tasks.Element == Task<Success, Never> {
let state = ManagedCriticalState(TaskFirstState<Success, Failure>())
return await withTaskCancellationHandler {
let tasks = state.withCriticalRegion { state -> [Task<Success, Failure>] in
defer { state.tasks = nil }
return state.tasks ?? []
}
for task in tasks {
task.cancel()
}
} operation: {
await withUnsafeContinuation { continuation in
state.withCriticalRegion { state in
state.continuation = continuation
}
for task in tasks {
Task<Void, Never> {
let result = await task.result
state.withCriticalRegion { state -> UnsafeContinuation<Success, Failure>? in
defer { state.continuation = nil }
return state.continuation
}?.resume(with: result)
}
state.withCriticalRegion { state in
state.add(task)
}?.cancel()
}
}
}
}

/// Determine the first result of a list of tasks.
///
/// - Parameters:
/// - tasks: The running tasks to obtain a result from
/// - Returns: The first result from the running tasks
static func first(
_ tasks: Task<Success, Never>...
) async -> Success {
await first(tasks)
}
}
78 changes: 78 additions & 0 deletions Tests/AsyncAlgorithmsTests/TestTaskFirst.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the Swift Async Algorithms open source project
//
// Copyright (c) 2022 Apple Inc. and the Swift project authors
// Licensed under Apache License v2.0 with Runtime Library Exception
//
// See https://swift.org/LICENSE.txt for license information
//
//===----------------------------------------------------------------------===//

import XCTest
import AsyncAlgorithms

final class TestTaskFirst: XCTestCase {
func test_first() async {
let firstValue = await Task.first(Task {
return 1
}, Task {
try! await Task.sleep(nanoseconds: NSEC_PER_SEC * 2)
return 2
})
XCTAssertEqual(firstValue, 1)
}

func test_second() async {
let firstValue = await Task.first(Task {
try! await Task.sleep(nanoseconds: NSEC_PER_SEC * 2)
return 1
}, Task {
return 2
})
XCTAssertEqual(firstValue, 2)
}

func test_throwing() async {
do {
_ = try await Task.first(Task { () async throws -> Int in
try await Task.sleep(nanoseconds: NSEC_PER_SEC * 2)
return 1
}, Task { () async throws -> Int in
throw NSError(domain: NSCocoaErrorDomain, code: -1, userInfo: nil)
})
XCTFail()
} catch {
XCTAssertEqual((error as NSError).code, -1)
}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we test task cancellation as well?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is another aspect I am considering about this - should these first methods have a cancellation handler? if so that would make the return value optional.

or should that be a second set of functions?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If a task is cancelled and "finishes first", then first will throw the CancellationError, no? Is that not enough? What did you have in mind?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

well I guess I am thinking of perhaps an initializer on Task that takes N blocks and returns an actual Task object that can be cancelled is one option. The other option is to pass a cancelHandler and make the return type optional.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK. We can discuss the shape of an API like that separately I think. In the meantime, with the current methods, is cancelling a task any different than the throwing case? Does it still deserve its own test case?


func test_cancellation() async {
let firstReady = expectation(description: "first ready")
let secondReady = expectation(description: "second ready")
let firstCancelled = expectation(description: "first cancelled")
let secondCancelled = expectation(description: "second cancelled")
let task = Task {
_ = await Task.first(Task {
await withTaskCancellationHandler {
firstCancelled.fulfill()
} operation: { () -> Int in
firstReady.fulfill()
try? await Task.sleep(nanoseconds: NSEC_PER_SEC * 2)
return 1
}
}, Task {
await withTaskCancellationHandler {
secondCancelled.fulfill()
} operation: { () -> Int in
secondReady.fulfill()
try? await Task.sleep(nanoseconds: NSEC_PER_SEC * 2)
return 1
}
})
}
wait(for: [firstReady, secondReady], timeout: 1.0)
task.cancel()
wait(for: [firstCancelled, secondCancelled], timeout: 1.0)
}
}