Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions AsyncObjects.xcodeproj/project.pbxproj
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
/* End PBXAggregateTarget section */

/* Begin PBXBuildFile section */
64DDE914E91EBCF451EEB28E /* AsyncObjects.docc in Sources */ = {isa = PBXBuildFile; fileRef = 2F994B835A88B84F6C8AE38B /* AsyncObjects.docc */; };
15423F4ABF75D8A400B6C633 /* AsyncObjects.docc in Sources */ = {isa = PBXBuildFile; fileRef = 92104D54C38BC7BB7060DA49 /* AsyncObjects.docc */; };
OBJ_106 /* AsyncCountdownEvent.swift in Sources */ = {isa = PBXBuildFile; fileRef = OBJ_11 /* AsyncCountdownEvent.swift */; };
OBJ_107 /* AsyncEvent.swift in Sources */ = {isa = PBXBuildFile; fileRef = OBJ_12 /* AsyncEvent.swift */; };
OBJ_108 /* AsyncObject.swift in Sources */ = {isa = PBXBuildFile; fileRef = OBJ_13 /* AsyncObject.swift */; };
Expand All @@ -30,7 +30,7 @@
OBJ_111 /* Continuable.swift in Sources */ = {isa = PBXBuildFile; fileRef = OBJ_16 /* Continuable.swift */; };
OBJ_112 /* Future.swift in Sources */ = {isa = PBXBuildFile; fileRef = OBJ_17 /* Future.swift */; };
OBJ_113 /* Locker.swift in Sources */ = {isa = PBXBuildFile; fileRef = OBJ_18 /* Locker.swift */; };
OBJ_114 /* TaskGroup.swift in Sources */ = {isa = PBXBuildFile; fileRef = OBJ_19 /* TaskGroup.swift */; };
OBJ_114 /* Task.swift in Sources */ = {isa = PBXBuildFile; fileRef = OBJ_19 /* Task.swift */; };
OBJ_115 /* TaskOperation.swift in Sources */ = {isa = PBXBuildFile; fileRef = OBJ_20 /* TaskOperation.swift */; };
OBJ_116 /* TaskQueue.swift in Sources */ = {isa = PBXBuildFile; fileRef = OBJ_21 /* TaskQueue.swift */; };
OBJ_117 /* TaskTracker.swift in Sources */ = {isa = PBXBuildFile; fileRef = OBJ_22 /* TaskTracker.swift */; };
Expand Down Expand Up @@ -104,7 +104,7 @@
/* End PBXBuildFile section */

/* Begin PBXFileReference section */
2F994B835A88B84F6C8AE38B /* AsyncObjects.docc */ = {isa = PBXFileReference; includeInIndex = 1; path = AsyncObjects.docc; sourceTree = "<group>"; };
92104D54C38BC7BB7060DA49 /* AsyncObjects.docc */ = {isa = PBXFileReference; includeInIndex = 1; path = AsyncObjects.docc; sourceTree = "<group>"; };
OBJ_11 /* AsyncCountdownEvent.swift */ = {isa = PBXFileReference; path = AsyncCountdownEvent.swift; sourceTree = "<group>"; };
OBJ_12 /* AsyncEvent.swift */ = {isa = PBXFileReference; path = AsyncEvent.swift; sourceTree = "<group>"; };
OBJ_13 /* AsyncObject.swift */ = {isa = PBXFileReference; path = AsyncObject.swift; sourceTree = "<group>"; };
Expand All @@ -113,7 +113,7 @@
OBJ_16 /* Continuable.swift */ = {isa = PBXFileReference; path = Continuable.swift; sourceTree = "<group>"; };
OBJ_17 /* Future.swift */ = {isa = PBXFileReference; path = Future.swift; sourceTree = "<group>"; };
OBJ_18 /* Locker.swift */ = {isa = PBXFileReference; path = Locker.swift; sourceTree = "<group>"; };
OBJ_19 /* TaskGroup.swift */ = {isa = PBXFileReference; path = TaskGroup.swift; sourceTree = "<group>"; };
OBJ_19 /* Task.swift */ = {isa = PBXFileReference; path = Task.swift; sourceTree = "<group>"; };
OBJ_20 /* TaskOperation.swift */ = {isa = PBXFileReference; path = TaskOperation.swift; sourceTree = "<group>"; };
OBJ_21 /* TaskQueue.swift */ = {isa = PBXFileReference; path = TaskQueue.swift; sourceTree = "<group>"; };
OBJ_22 /* TaskTracker.swift */ = {isa = PBXFileReference; path = TaskTracker.swift; sourceTree = "<group>"; };
Expand Down Expand Up @@ -220,11 +220,11 @@
OBJ_16 /* Continuable.swift */,
OBJ_17 /* Future.swift */,
OBJ_18 /* Locker.swift */,
OBJ_19 /* TaskGroup.swift */,
OBJ_19 /* Task.swift */,
OBJ_20 /* TaskOperation.swift */,
OBJ_21 /* TaskQueue.swift */,
OBJ_22 /* TaskTracker.swift */,
2F994B835A88B84F6C8AE38B /* AsyncObjects.docc */,
92104D54C38BC7BB7060DA49 /* AsyncObjects.docc */,
);
name = AsyncObjects;
path = Sources/AsyncObjects;
Expand Down Expand Up @@ -556,11 +556,11 @@
OBJ_111 /* Continuable.swift in Sources */,
OBJ_112 /* Future.swift in Sources */,
OBJ_113 /* Locker.swift in Sources */,
OBJ_114 /* TaskGroup.swift in Sources */,
OBJ_114 /* Task.swift in Sources */,
OBJ_115 /* TaskOperation.swift in Sources */,
OBJ_116 /* TaskQueue.swift in Sources */,
OBJ_117 /* TaskTracker.swift in Sources */,
64DDE914E91EBCF451EEB28E /* AsyncObjects.docc in Sources */,
15423F4ABF75D8A400B6C633 /* AsyncObjects.docc in Sources */,
);
};
OBJ_126 /* Sources */ = {
Expand Down
15 changes: 11 additions & 4 deletions Sources/AsyncObjects/AsyncCountdownEvent.swift
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,15 @@ public actor AsyncCountdownEvent: AsyncObject {

// MARK: Internal

/// Resume provided continuation with additional changes based on the associated flags.
///
/// - Parameter continuation: The queued continuation to resume.
@inlinable
func _resumeContinuation(_ continuation: Continuation) {
currentCount += 1
continuation.resume()
}

/// Add continuation with the provided key in `continuations` map.
///
/// - Parameters:
Expand All @@ -58,8 +67,7 @@ public actor AsyncCountdownEvent: AsyncObject {
withKey key: UUID
) {
guard !isSet, continuations.isEmpty else {
currentCount += 1
continuation.resume()
_resumeContinuation(continuation)
return
}
continuations[key] = continuation
Expand Down Expand Up @@ -90,8 +98,7 @@ public actor AsyncCountdownEvent: AsyncObject {
func _resumeContinuations() {
while !continuations.isEmpty && isSet {
let (_, continuation) = continuations.removeFirst()
continuation.resume()
self.currentCount += 1
_resumeContinuation(continuation)
}
}

Expand Down
128 changes: 102 additions & 26 deletions Sources/AsyncObjects/TaskOperation.swift
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ import Dispatch
public final class TaskOperation<R: Sendable>: Operation, AsyncObject,
@unchecked Sendable
{
/// The type used to track completion of provided operation and unstructured tasks created in it.
private typealias Tracker = TaskTracker
/// The asynchronous action to perform as part of the operation..
private let underlyingAction: @Sendable () async throws -> R
/// The top-level task that executes asynchronous action provided
Expand All @@ -29,18 +27,23 @@ public final class TaskOperation<R: Sendable>: Operation, AsyncObject,
/// synchronize data access and modifications.
@usableFromInline
let locker: Locker

/// A type representing a set of behaviors for the executed
/// task type and task completion behavior.
///
/// ``TaskOperation`` determines the execution behavior of
/// provided action as task based on the provided flags.
public typealias Flags = TaskOperationFlags
/// The priority of top-level task executed.
///
/// In case of `nil` priority from `Task.currentPriority`
/// of task that starts the operation used.
public let priority: TaskPriority?
/// If completion of unstructured tasks created as part of provided task
/// should be tracked.
/// A set of behaviors for the executed task type and task completion behavior.
///
/// If true, operation only completes if the provided asynchronous action
/// and all of its created unstructured task completes.
/// Otherwise, operation completes if the provided action itself completes.
public let shouldTrackUnstructuredTasks: Bool
/// Provided flags determine the execution behavior of
/// the action as task.
public let flags: Flags

/// A Boolean value indicating whether the operation executes its task asynchronously.
///
Expand Down Expand Up @@ -119,14 +122,14 @@ public final class TaskOperation<R: Sendable>: Operation, AsyncObject,
///
/// - Returns: The newly created asynchronous operation.
public init(
trackUnstructuredTasks shouldTrackUnstructuredTasks: Bool = false,
synchronizedWith locker: Locker = .init(),
priority: TaskPriority? = nil,
flags: Flags = [],
operation: @escaping @Sendable () async throws -> R
) {
self.shouldTrackUnstructuredTasks = shouldTrackUnstructuredTasks
self.locker = locker
self.priority = priority
self.flags = flags
self.underlyingAction = operation
super.init()
}
Expand Down Expand Up @@ -154,22 +157,12 @@ public final class TaskOperation<R: Sendable>: Operation, AsyncObject,
/// as part of a new top-level task on behalf of the current actor.
public override func main() {
guard isExecuting, execTask == nil else { return }
execTask = Task(priority: priority) { [weak self] in
guard
let action = self?.underlyingAction,
let trackUnstructuredTasks = self?.shouldTrackUnstructuredTasks
else { throw CancellationError() }
let final = { @Sendable[weak self] in self?._finish(); return }
return trackUnstructuredTasks
? try await Tracker.$current.withValue(
.init(onComplete: final),
operation: action
)
: try await {
defer { final() }
return try await action()
}()
}
let final = { @Sendable[weak self] in self?._finish(); return }
execTask = flags.createTask(
priority: priority,
operation: underlyingAction,
onComplete: final
)
}

/// Advises the operation object that it should stop executing its task.
Expand Down Expand Up @@ -275,3 +268,86 @@ public final class TaskOperation<R: Sendable>: Operation, AsyncObject,
/// if the operation hasn't been started yet with either
/// ``TaskOperation/start()`` or ``TaskOperation/signal()``.
public struct EarlyInvokeError: Error, Sendable {}

/// A set of behaviors for ``TaskOperation``s,
/// such as the task type and task completion behavior.
///
/// ``TaskOperation`` determines the execution behavior of
/// provided action as task based on the provided flags.
public struct TaskOperationFlags: OptionSet, Sendable {
/// Indicates to ``TaskOperation``, completion of unstructured tasks
/// created as part of provided operation should be tracked.
///
/// If provided, GCD operation only completes if the provided asynchronous action
/// and all of its created unstructured task completes.
/// Otherwise, operation completes if the provided action itself completes.
public static let trackUnstructuredTasks = Self.init(rawValue: 1 << 0)
/// Indicates to ``TaskOperation`` to disassociate action from the current execution context
/// by running as a new detached task.
///
/// Provided action is executed asynchronously as part of a new top-level task,
/// with the provided task priority and without inheriting actor context that started
/// the GCD operation.
public static let detached = Self.init(rawValue: 1 << 1)

/// The type used to track completion of provided operation and unstructured tasks created in it.
private typealias Tracker = TaskTracker

/// Runs the given throwing operation asynchronously as part of a new top-level task
/// based on the current flags indicating whether to on behalf of the current actor
/// and whether to track unstructured tasks created in provided operation.
///
/// - Parameters:
/// - priority: The priority of the task that operation executes.
/// Pass `nil` to use the priority from `Task.currentPriority`
/// of task that starts the operation.
/// - operation: The asynchronous operation to execute.
/// - completion: The action to invoke when task completes.
///
/// - Returns: A reference to the task.
fileprivate func createTask<R: Sendable>(
priority: TaskPriority? = nil,
operation: @escaping @Sendable () async throws -> R,
onComplete completion: @escaping @Sendable () -> Void
) -> Task<R, Error> {
typealias LocalTask = Task<R, Error>
typealias ThrowingAction = @Sendable () async throws -> R
typealias TaskInitializer = (TaskPriority?, ThrowingAction) -> LocalTask

let initializer =
self.contains(.detached)
? LocalTask.detached
: LocalTask.init
return initializer(priority) {
return self.contains(.trackUnstructuredTasks)
? try await Tracker.$current.withValue(
.init(onComplete: completion),
operation: operation
)
: try await {
defer { completion() }
return try await operation()
}()
}
}

/// The corresponding value of the raw type.
///
/// A new instance initialized with rawValue will be equivalent to this instance.
/// For example:
/// ```swift
/// print(TaskOperationFlags(rawValue: 1 << 1) == TaskOperationFlags.detached)
/// // Prints "true"
/// ```
public let rawValue: UInt8
/// Creates a new flag from the given raw value.
///
/// - Parameter rawValue: The raw value of the flag set to create.
/// - Returns: The newly created flag set.
///
/// - Note: Do not use this method to create flag,
/// use the default flags provided instead.
public init(rawValue: UInt8) {
self.rawValue = rawValue
}
}
42 changes: 25 additions & 17 deletions Sources/AsyncObjects/TaskQueue.swift
Original file line number Diff line number Diff line change
Expand Up @@ -162,24 +162,37 @@ public actor TaskQueue: AsyncObject {
|| flags.wait(forCurrent: currentRunning)
}

/// Resume provided continuation with additional changes based on the associated flags.
///
/// - Parameter continuation: The queued continuation to resume.
/// - Returns: Whether queue is free to proceed scheduling other tasks.
@inlinable
@discardableResult
func _resumeQueuedContinuation(
_ continuation: QueuedContinuation
) -> Bool {
currentRunning += 1
continuation.value.resume()
guard continuation.flags.isBlockEnabled else { return true }
blocked = true
return false
}

/// Add continuation with the provided key and associated flags to queue.
///
/// - Parameters:
/// - flags: The flags associated with continuation operation.
/// - key: The key in the continuation queue.
/// - continuation: The continuation to add to queue.
/// - continuation: The continuation and flags to add to queue.
@inlinable
func _queueContinuation(
withFlags flags: Flags = [],
atKey key: UUID = .init(),
_ continuation: Continuation
_ continuation: QueuedContinuation
) {
guard _wait(whenFlags: flags) else {
currentRunning += 1
continuation.resume()
guard _wait(whenFlags: continuation.flags) else {
_resumeQueuedContinuation(continuation)
return
}
queue[key] = (value: continuation, flags: flags)
queue[key] = continuation
}

/// Remove continuation associated with provided key from queue.
Expand Down Expand Up @@ -218,16 +231,12 @@ public actor TaskQueue: AsyncObject {
/// and operation flags preconditions satisfied.
@inlinable
func _resumeQueuedTasks() {
while let (_, (continuation, flags)) = queue.elements.first,
while let (_, continuation) = queue.elements.first,
!blocked,
!flags.wait(forCurrent: currentRunning)
!continuation.flags.wait(forCurrent: currentRunning)
{
queue.removeFirst()
currentRunning += 1
continuation.resume()
guard flags.isBlockEnabled else { continue }
blocked = true
break
guard _resumeQueuedContinuation(continuation) else { break }
}
}

Expand All @@ -252,9 +261,8 @@ public actor TaskQueue: AsyncObject {
try await Continuation.with { continuation in
Task { [weak self] in
await self?._queueContinuation(
withFlags: flags,
atKey: key,
continuation
(value: continuation, flags: flags)
)
}
}
Expand Down
1 change: 0 additions & 1 deletion Tests/AsyncObjectsTests/StandardLibraryTests.swift
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import XCTest

/// Tests inner workings of structured concurrency
@MainActor
class StandardLibraryTests: XCTestCase {

func testTaskValueFetchingCancelation() async throws {
Expand Down
2 changes: 1 addition & 1 deletion Tests/AsyncObjectsTests/TaskOperationTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ class TaskOperationTests: XCTestCase {
func createOperationWithChildTasks(
track: Bool = false
) -> TaskOperation<Void> {
return TaskOperation(trackUnstructuredTasks: track) {
return TaskOperation(flags: track ? .trackUnstructuredTasks : []) {
Task {
try await Self.sleep(seconds: 1)
}
Expand Down
Loading