Skip to content

Commit

Permalink
feat(TaskOperation): allow executing as detached task (#7)
Browse files Browse the repository at this point in the history
  • Loading branch information
soumyamahunt committed Aug 31, 2022
1 parent 63fed91 commit e3dcfeb
Show file tree
Hide file tree
Showing 7 changed files with 171 additions and 89 deletions.
16 changes: 8 additions & 8 deletions AsyncObjects.xcodeproj/project.pbxproj
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
/* End PBXAggregateTarget section */

/* Begin PBXBuildFile section */
64DDE914E91EBCF451EEB28E /* AsyncObjects.docc in Sources */ = {isa = PBXBuildFile; fileRef = 2F994B835A88B84F6C8AE38B /* AsyncObjects.docc */; };
15423F4ABF75D8A400B6C633 /* AsyncObjects.docc in Sources */ = {isa = PBXBuildFile; fileRef = 92104D54C38BC7BB7060DA49 /* AsyncObjects.docc */; };
OBJ_106 /* AsyncCountdownEvent.swift in Sources */ = {isa = PBXBuildFile; fileRef = OBJ_11 /* AsyncCountdownEvent.swift */; };
OBJ_107 /* AsyncEvent.swift in Sources */ = {isa = PBXBuildFile; fileRef = OBJ_12 /* AsyncEvent.swift */; };
OBJ_108 /* AsyncObject.swift in Sources */ = {isa = PBXBuildFile; fileRef = OBJ_13 /* AsyncObject.swift */; };
Expand All @@ -30,7 +30,7 @@
OBJ_111 /* Continuable.swift in Sources */ = {isa = PBXBuildFile; fileRef = OBJ_16 /* Continuable.swift */; };
OBJ_112 /* Future.swift in Sources */ = {isa = PBXBuildFile; fileRef = OBJ_17 /* Future.swift */; };
OBJ_113 /* Locker.swift in Sources */ = {isa = PBXBuildFile; fileRef = OBJ_18 /* Locker.swift */; };
OBJ_114 /* TaskGroup.swift in Sources */ = {isa = PBXBuildFile; fileRef = OBJ_19 /* TaskGroup.swift */; };
OBJ_114 /* Task.swift in Sources */ = {isa = PBXBuildFile; fileRef = OBJ_19 /* Task.swift */; };
OBJ_115 /* TaskOperation.swift in Sources */ = {isa = PBXBuildFile; fileRef = OBJ_20 /* TaskOperation.swift */; };
OBJ_116 /* TaskQueue.swift in Sources */ = {isa = PBXBuildFile; fileRef = OBJ_21 /* TaskQueue.swift */; };
OBJ_117 /* TaskTracker.swift in Sources */ = {isa = PBXBuildFile; fileRef = OBJ_22 /* TaskTracker.swift */; };
Expand Down Expand Up @@ -104,7 +104,7 @@
/* End PBXBuildFile section */

/* Begin PBXFileReference section */
2F994B835A88B84F6C8AE38B /* AsyncObjects.docc */ = {isa = PBXFileReference; includeInIndex = 1; path = AsyncObjects.docc; sourceTree = "<group>"; };
92104D54C38BC7BB7060DA49 /* AsyncObjects.docc */ = {isa = PBXFileReference; includeInIndex = 1; path = AsyncObjects.docc; sourceTree = "<group>"; };
OBJ_11 /* AsyncCountdownEvent.swift */ = {isa = PBXFileReference; path = AsyncCountdownEvent.swift; sourceTree = "<group>"; };
OBJ_12 /* AsyncEvent.swift */ = {isa = PBXFileReference; path = AsyncEvent.swift; sourceTree = "<group>"; };
OBJ_13 /* AsyncObject.swift */ = {isa = PBXFileReference; path = AsyncObject.swift; sourceTree = "<group>"; };
Expand All @@ -113,7 +113,7 @@
OBJ_16 /* Continuable.swift */ = {isa = PBXFileReference; path = Continuable.swift; sourceTree = "<group>"; };
OBJ_17 /* Future.swift */ = {isa = PBXFileReference; path = Future.swift; sourceTree = "<group>"; };
OBJ_18 /* Locker.swift */ = {isa = PBXFileReference; path = Locker.swift; sourceTree = "<group>"; };
OBJ_19 /* TaskGroup.swift */ = {isa = PBXFileReference; path = TaskGroup.swift; sourceTree = "<group>"; };
OBJ_19 /* Task.swift */ = {isa = PBXFileReference; path = Task.swift; sourceTree = "<group>"; };
OBJ_20 /* TaskOperation.swift */ = {isa = PBXFileReference; path = TaskOperation.swift; sourceTree = "<group>"; };
OBJ_21 /* TaskQueue.swift */ = {isa = PBXFileReference; path = TaskQueue.swift; sourceTree = "<group>"; };
OBJ_22 /* TaskTracker.swift */ = {isa = PBXFileReference; path = TaskTracker.swift; sourceTree = "<group>"; };
Expand Down Expand Up @@ -220,11 +220,11 @@
OBJ_16 /* Continuable.swift */,
OBJ_17 /* Future.swift */,
OBJ_18 /* Locker.swift */,
OBJ_19 /* TaskGroup.swift */,
OBJ_19 /* Task.swift */,
OBJ_20 /* TaskOperation.swift */,
OBJ_21 /* TaskQueue.swift */,
OBJ_22 /* TaskTracker.swift */,
2F994B835A88B84F6C8AE38B /* AsyncObjects.docc */,
92104D54C38BC7BB7060DA49 /* AsyncObjects.docc */,
);
name = AsyncObjects;
path = Sources/AsyncObjects;
Expand Down Expand Up @@ -556,11 +556,11 @@
OBJ_111 /* Continuable.swift in Sources */,
OBJ_112 /* Future.swift in Sources */,
OBJ_113 /* Locker.swift in Sources */,
OBJ_114 /* TaskGroup.swift in Sources */,
OBJ_114 /* Task.swift in Sources */,
OBJ_115 /* TaskOperation.swift in Sources */,
OBJ_116 /* TaskQueue.swift in Sources */,
OBJ_117 /* TaskTracker.swift in Sources */,
64DDE914E91EBCF451EEB28E /* AsyncObjects.docc in Sources */,
15423F4ABF75D8A400B6C633 /* AsyncObjects.docc in Sources */,
);
};
OBJ_126 /* Sources */ = {
Expand Down
15 changes: 11 additions & 4 deletions Sources/AsyncObjects/AsyncCountdownEvent.swift
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,15 @@ public actor AsyncCountdownEvent: AsyncObject {

// MARK: Internal

/// Resume provided continuation with additional changes based on the associated flags.
///
/// - Parameter continuation: The queued continuation to resume.
@inlinable
func _resumeContinuation(_ continuation: Continuation) {
currentCount += 1
continuation.resume()
}

/// Add continuation with the provided key in `continuations` map.
///
/// - Parameters:
Expand All @@ -58,8 +67,7 @@ public actor AsyncCountdownEvent: AsyncObject {
withKey key: UUID
) {
guard !isSet, continuations.isEmpty else {
currentCount += 1
continuation.resume()
_resumeContinuation(continuation)
return
}
continuations[key] = continuation
Expand Down Expand Up @@ -90,8 +98,7 @@ public actor AsyncCountdownEvent: AsyncObject {
func _resumeContinuations() {
while !continuations.isEmpty && isSet {
let (_, continuation) = continuations.removeFirst()
continuation.resume()
self.currentCount += 1
_resumeContinuation(continuation)
}
}

Expand Down
128 changes: 102 additions & 26 deletions Sources/AsyncObjects/TaskOperation.swift
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ import Dispatch
public final class TaskOperation<R: Sendable>: Operation, AsyncObject,
@unchecked Sendable
{
/// The type used to track completion of provided operation and unstructured tasks created in it.
private typealias Tracker = TaskTracker
/// The asynchronous action to perform as part of the operation..
private let underlyingAction: @Sendable () async throws -> R
/// The top-level task that executes asynchronous action provided
Expand All @@ -29,18 +27,23 @@ public final class TaskOperation<R: Sendable>: Operation, AsyncObject,
/// synchronize data access and modifications.
@usableFromInline
let locker: Locker

/// A type representing a set of behaviors for the executed
/// task type and task completion behavior.
///
/// ``TaskOperation`` determines the execution behavior of
/// provided action as task based on the provided flags.
public typealias Flags = TaskOperationFlags
/// The priority of top-level task executed.
///
/// In case of `nil` priority from `Task.currentPriority`
/// of task that starts the operation used.
public let priority: TaskPriority?
/// If completion of unstructured tasks created as part of provided task
/// should be tracked.
/// A set of behaviors for the executed task type and task completion behavior.
///
/// If true, operation only completes if the provided asynchronous action
/// and all of its created unstructured task completes.
/// Otherwise, operation completes if the provided action itself completes.
public let shouldTrackUnstructuredTasks: Bool
/// Provided flags determine the execution behavior of
/// the action as task.
public let flags: Flags

/// A Boolean value indicating whether the operation executes its task asynchronously.
///
Expand Down Expand Up @@ -119,14 +122,14 @@ public final class TaskOperation<R: Sendable>: Operation, AsyncObject,
///
/// - Returns: The newly created asynchronous operation.
public init(
trackUnstructuredTasks shouldTrackUnstructuredTasks: Bool = false,
synchronizedWith locker: Locker = .init(),
priority: TaskPriority? = nil,
flags: Flags = [],
operation: @escaping @Sendable () async throws -> R
) {
self.shouldTrackUnstructuredTasks = shouldTrackUnstructuredTasks
self.locker = locker
self.priority = priority
self.flags = flags
self.underlyingAction = operation
super.init()
}
Expand Down Expand Up @@ -154,22 +157,12 @@ public final class TaskOperation<R: Sendable>: Operation, AsyncObject,
/// as part of a new top-level task on behalf of the current actor.
public override func main() {
guard isExecuting, execTask == nil else { return }
execTask = Task(priority: priority) { [weak self] in
guard
let action = self?.underlyingAction,
let trackUnstructuredTasks = self?.shouldTrackUnstructuredTasks
else { throw CancellationError() }
let final = { @Sendable[weak self] in self?._finish(); return }
return trackUnstructuredTasks
? try await Tracker.$current.withValue(
.init(onComplete: final),
operation: action
)
: try await {
defer { final() }
return try await action()
}()
}
let final = { @Sendable[weak self] in self?._finish(); return }
execTask = flags.createTask(
priority: priority,
operation: underlyingAction,
onComplete: final
)
}

/// Advises the operation object that it should stop executing its task.
Expand Down Expand Up @@ -275,3 +268,86 @@ public final class TaskOperation<R: Sendable>: Operation, AsyncObject,
/// if the operation hasn't been started yet with either
/// ``TaskOperation/start()`` or ``TaskOperation/signal()``.
public struct EarlyInvokeError: Error, Sendable {}

/// A set of behaviors for ``TaskOperation``s,
/// such as the task type and task completion behavior.
///
/// ``TaskOperation`` determines the execution behavior of
/// provided action as task based on the provided flags.
public struct TaskOperationFlags: OptionSet, Sendable {
/// Indicates to ``TaskOperation``, completion of unstructured tasks
/// created as part of provided operation should be tracked.
///
/// If provided, GCD operation only completes if the provided asynchronous action
/// and all of its created unstructured task completes.
/// Otherwise, operation completes if the provided action itself completes.
public static let trackUnstructuredTasks = Self.init(rawValue: 1 << 0)
/// Indicates to ``TaskOperation`` to disassociate action from the current execution context
/// by running as a new detached task.
///
/// Provided action is executed asynchronously as part of a new top-level task,
/// with the provided task priority and without inheriting actor context that started
/// the GCD operation.
public static let detached = Self.init(rawValue: 1 << 1)

/// The type used to track completion of provided operation and unstructured tasks created in it.
private typealias Tracker = TaskTracker

/// Runs the given throwing operation asynchronously as part of a new top-level task
/// based on the current flags indicating whether to on behalf of the current actor
/// and whether to track unstructured tasks created in provided operation.
///
/// - Parameters:
/// - priority: The priority of the task that operation executes.
/// Pass `nil` to use the priority from `Task.currentPriority`
/// of task that starts the operation.
/// - operation: The asynchronous operation to execute.
/// - completion: The action to invoke when task completes.
///
/// - Returns: A reference to the task.
fileprivate func createTask<R: Sendable>(
priority: TaskPriority? = nil,
operation: @escaping @Sendable () async throws -> R,
onComplete completion: @escaping @Sendable () -> Void
) -> Task<R, Error> {
typealias LocalTask = Task<R, Error>
typealias ThrowingAction = @Sendable () async throws -> R
typealias TaskInitializer = (TaskPriority?, ThrowingAction) -> LocalTask

let initializer =
self.contains(.detached)
? LocalTask.detached
: LocalTask.init
return initializer(priority) {
return self.contains(.trackUnstructuredTasks)
? try await Tracker.$current.withValue(
.init(onComplete: completion),
operation: operation
)
: try await {
defer { completion() }
return try await operation()
}()
}
}

/// The corresponding value of the raw type.
///
/// A new instance initialized with rawValue will be equivalent to this instance.
/// For example:
/// ```swift
/// print(TaskOperationFlags(rawValue: 1 << 1) == TaskOperationFlags.detached)
/// // Prints "true"
/// ```
public let rawValue: UInt8
/// Creates a new flag from the given raw value.
///
/// - Parameter rawValue: The raw value of the flag set to create.
/// - Returns: The newly created flag set.
///
/// - Note: Do not use this method to create flag,
/// use the default flags provided instead.
public init(rawValue: UInt8) {
self.rawValue = rawValue
}
}
42 changes: 25 additions & 17 deletions Sources/AsyncObjects/TaskQueue.swift
Original file line number Diff line number Diff line change
Expand Up @@ -162,24 +162,37 @@ public actor TaskQueue: AsyncObject {
|| flags.wait(forCurrent: currentRunning)
}

/// Resume provided continuation with additional changes based on the associated flags.
///
/// - Parameter continuation: The queued continuation to resume.
/// - Returns: Whether queue is free to proceed scheduling other tasks.
@inlinable
@discardableResult
func _resumeQueuedContinuation(
_ continuation: QueuedContinuation
) -> Bool {
currentRunning += 1
continuation.value.resume()
guard continuation.flags.isBlockEnabled else { return true }
blocked = true
return false
}

/// Add continuation with the provided key and associated flags to queue.
///
/// - Parameters:
/// - flags: The flags associated with continuation operation.
/// - key: The key in the continuation queue.
/// - continuation: The continuation to add to queue.
/// - continuation: The continuation and flags to add to queue.
@inlinable
func _queueContinuation(
withFlags flags: Flags = [],
atKey key: UUID = .init(),
_ continuation: Continuation
_ continuation: QueuedContinuation
) {
guard _wait(whenFlags: flags) else {
currentRunning += 1
continuation.resume()
guard _wait(whenFlags: continuation.flags) else {
_resumeQueuedContinuation(continuation)
return
}
queue[key] = (value: continuation, flags: flags)
queue[key] = continuation
}

/// Remove continuation associated with provided key from queue.
Expand Down Expand Up @@ -218,16 +231,12 @@ public actor TaskQueue: AsyncObject {
/// and operation flags preconditions satisfied.
@inlinable
func _resumeQueuedTasks() {
while let (_, (continuation, flags)) = queue.elements.first,
while let (_, continuation) = queue.elements.first,
!blocked,
!flags.wait(forCurrent: currentRunning)
!continuation.flags.wait(forCurrent: currentRunning)
{
queue.removeFirst()
currentRunning += 1
continuation.resume()
guard flags.isBlockEnabled else { continue }
blocked = true
break
guard _resumeQueuedContinuation(continuation) else { break }
}
}

Expand All @@ -252,9 +261,8 @@ public actor TaskQueue: AsyncObject {
try await Continuation.with { continuation in
Task { [weak self] in
await self?._queueContinuation(
withFlags: flags,
atKey: key,
continuation
(value: continuation, flags: flags)
)
}
}
Expand Down
1 change: 0 additions & 1 deletion Tests/AsyncObjectsTests/StandardLibraryTests.swift
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import XCTest

/// Tests inner workings of structured concurrency
@MainActor
class StandardLibraryTests: XCTestCase {

func testTaskValueFetchingCancelation() async throws {
Expand Down
2 changes: 1 addition & 1 deletion Tests/AsyncObjectsTests/TaskOperationTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ class TaskOperationTests: XCTestCase {
func createOperationWithChildTasks(
track: Bool = false
) -> TaskOperation<Void> {
return TaskOperation(trackUnstructuredTasks: track) {
return TaskOperation(flags: track ? .trackUnstructuredTasks : []) {
Task {
try await Self.sleep(seconds: 1)
}
Expand Down

0 comments on commit e3dcfeb

Please sign in to comment.