diff --git a/Sources/AsyncQueue/ActorQueue.swift b/Sources/AsyncQueue/ActorQueue.swift index 82c3d40..9d882ef 100644 --- a/Sources/AsyncQueue/ActorQueue.swift +++ b/Sources/AsyncQueue/ActorQueue.swift @@ -55,11 +55,12 @@ public final class ActorQueue: @unchecked Sendable { // MARK: Initialization /// Instantiates an actor queue. - public init() { + /// - Parameter name: Human readable name of the queue. + public init(name: String? = nil) { let (taskStream, taskStreamContinuation) = AsyncStream.makeStream() self.taskStreamContinuation = taskStreamContinuation - Task { + Task(name: name) { // In an ideal world, we would isolate this `for await` loop to the `ActorType`. // However, there's no good way to do that without retaining the actor and creating a cycle. for await actorTask in taskStream { @@ -146,10 +147,13 @@ extension Task { /// it only makes it impossible for you to explicitly cancel the task. /// /// - Parameters: + /// - name: Human readable name of the task. + /// - priority: The priority of the operation task. /// - actorQueue: The queue on which to enqueue the task. /// - operation: The operation to perform. @discardableResult public init( + name: String? = nil, priority: TaskPriority? = nil, on actorQueue: ActorQueue, operation: @Sendable @escaping (isolated ActorType) async -> Success, @@ -162,11 +166,11 @@ extension Task { await semaphore.wait() delivery.execute({ @Sendable executionContext in await delivery.sendValue(operation(executionContext)) - }, in: executionContext, priority: priority) + }, in: executionContext, name: name, priority: priority) }, ) actorQueue.taskStreamContinuation.yield(task) - self.init(priority: priority) { + self.init(name: name, priority: priority) { await withTaskCancellationHandler( operation: { await semaphore.signal() @@ -199,12 +203,14 @@ extension Task { /// it only makes it impossible for you to explicitly cancel the task. /// /// - Parameters: + /// - name: Human readable name of the task. /// - priority: The priority of the task. /// Pass `nil` to use the priority from `Task.currentPriority`. /// - actorQueue: The queue on which to enqueue the task. /// - operation: The operation to perform. @discardableResult public init( + name: String? = nil, priority: TaskPriority? = nil, on actorQueue: ActorQueue, operation: @escaping @Sendable (isolated ActorType) async throws -> Success, @@ -221,11 +227,11 @@ extension Task { } catch { await delivery.sendFailure(error) } - }, in: executionContext, priority: priority) + }, in: executionContext, name: name, priority: priority) }, ) actorQueue.taskStreamContinuation.yield(task) - self.init(priority: priority) { + self.init(name: name, priority: priority) { try await withTaskCancellationHandler( operation: { await semaphore.signal() @@ -258,12 +264,14 @@ extension Task { /// it only makes it impossible for you to explicitly cancel the task. /// /// - Parameters: + /// - name: Human readable name of the task. /// - priority: The priority of the task. /// Pass `nil` to use the priority from `Task.currentPriority`. /// - actorQueue: The queue on which to enqueue the task. /// - operation: The operation to perform. @discardableResult public init( + name: String? = nil, priority: TaskPriority? = nil, on actorQueue: ActorQueue, operation: @MainActor @escaping () async -> Success, @@ -276,11 +284,11 @@ extension Task { await semaphore.wait() delivery.execute({ @Sendable executionContext in await delivery.sendValue(operation()) - }, in: executionContext, priority: priority) + }, in: executionContext, name: name, priority: priority) }, ) actorQueue.taskStreamContinuation.yield(task) - self.init(priority: priority) { + self.init(name: name, priority: priority) { await withTaskCancellationHandler( operation: { await semaphore.signal() @@ -313,12 +321,14 @@ extension Task { /// it only makes it impossible for you to explicitly cancel the task. /// /// - Parameters: + /// - name: Human readable name of the task. /// - priority: The priority of the task. /// Pass `nil` to use the priority from `Task.currentPriority`. /// - actorQueue: The queue on which to enqueue the task. /// - operation: The operation to perform. @discardableResult public init( + name: String? = nil, priority: TaskPriority? = nil, on actorQueue: ActorQueue, operation: @escaping @MainActor () async throws -> Success, @@ -335,11 +345,11 @@ extension Task { } catch { await delivery.sendFailure(error) } - }, in: executionContext, priority: priority) + }, in: executionContext, name: name, priority: priority) }, ) actorQueue.taskStreamContinuation.yield(task) - self.init(priority: priority) { + self.init(name: name, priority: priority) { try await withTaskCancellationHandler( operation: { await semaphore.signal() diff --git a/Sources/AsyncQueue/FIFOQueue.swift b/Sources/AsyncQueue/FIFOQueue.swift index 194fc37..1775b29 100644 --- a/Sources/AsyncQueue/FIFOQueue.swift +++ b/Sources/AsyncQueue/FIFOQueue.swift @@ -27,12 +27,13 @@ public final class FIFOQueue: Sendable { // MARK: Initialization /// Instantiates a FIFO queue. + /// - Parameter name: Human readable name of the queue. /// - Parameter priority: The baseline priority of the tasks added to the asynchronous queue. - public init(priority: TaskPriority? = nil) { + public init(name: String? = nil, priority: TaskPriority? = nil) { let (taskStream, taskStreamContinuation) = AsyncStream.makeStream() self.taskStreamContinuation = taskStreamContinuation - Task.detached(priority: priority) { + Task.detached(name: name, priority: priority) { for await fifoTask in taskStream { await fifoTask.task() } @@ -79,10 +80,12 @@ extension Task { /// it only makes it impossible for you to explicitly cancel the task. /// /// - Parameters: + /// - name: Human readable name of the task. /// - fifoQueue: The queue on which to enqueue the task. /// - operation: The operation to perform. @discardableResult public init( + name: String? = nil, on fifoQueue: FIFOQueue, @_inheritActorContext @_implicitSelfCapture operation: sending @escaping @isolated(any) () async -> Success, ) where Failure == Never { @@ -93,10 +96,10 @@ extension Task { await semaphore.wait() await delivery.execute({ @Sendable delivery in await delivery.sendValue(executeOnce.operation()) - }, in: delivery).value + }, in: delivery, name: name).value } fifoQueue.taskStreamContinuation.yield(task) - self.init { + self.init(name: name) { await withTaskCancellationHandler( operation: { await semaphore.signal() @@ -129,10 +132,12 @@ extension Task { /// it only makes it impossible for you to explicitly cancel the task. /// /// - Parameters: + /// - name: Human readable name of the task. /// - fifoQueue: The queue on which to enqueue the task. /// - operation: The operation to perform. @discardableResult public init( + name: String? = nil, on fifoQueue: FIFOQueue, @_inheritActorContext @_implicitSelfCapture operation: sending @escaping @isolated(any) () async throws -> Success, ) where Failure == any Error { @@ -147,10 +152,10 @@ extension Task { } catch { delivery.sendFailure(error) } - }, in: delivery).value + }, in: delivery, name: name).value } fifoQueue.taskStreamContinuation.yield(task) - self.init { + self.init(name: name) { try await withTaskCancellationHandler( operation: { await semaphore.signal() @@ -183,6 +188,7 @@ extension Task { /// it only makes it impossible for you to explicitly cancel the task. /// /// - Parameters: + /// - name: Human readable name of the task. /// - priority: The priority of the task. /// Pass `nil` to use the priority from `Task.currentPriority`. /// - fifoQueue: The queue on which to enqueue the task. @@ -190,6 +196,7 @@ extension Task { /// - operation: The operation to perform. @discardableResult public init( + name: String? = nil, priority: TaskPriority? = nil, on fifoQueue: FIFOQueue, isolatedTo isolatedActor: ActorType, @@ -201,10 +208,10 @@ extension Task { await semaphore.wait() await delivery.execute({ @Sendable isolatedActor in await delivery.sendValue(operation(isolatedActor)) - }, in: isolatedActor, priority: priority).value + }, in: isolatedActor, name: name, priority: priority).value } fifoQueue.taskStreamContinuation.yield(task) - self.init { + self.init(name: name) { await withTaskCancellationHandler( operation: { await semaphore.signal() @@ -237,6 +244,7 @@ extension Task { /// it only makes it impossible for you to explicitly cancel the task. /// /// - Parameters: + /// - name: Human readable name of the task. /// - priority: The priority of the task. /// Pass `nil` to use the priority from `Task.currentPriority`. /// - fifoQueue: The queue on which to enqueue the task. @@ -244,6 +252,7 @@ extension Task { /// - operation: The operation to perform. @discardableResult public init( + name: String? = nil, priority: TaskPriority? = nil, on fifoQueue: FIFOQueue, isolatedTo isolatedActor: ActorType, @@ -259,10 +268,10 @@ extension Task { } catch { await delivery.sendFailure(error) } - }, in: isolatedActor, priority: priority).value + }, in: isolatedActor, name: name, priority: priority).value } fifoQueue.taskStreamContinuation.yield(task) - self.init(priority: priority) { + self.init(name: name, priority: priority) { try await withTaskCancellationHandler( operation: { await semaphore.signal() diff --git a/Sources/AsyncQueue/Utilities/Delivery.swift b/Sources/AsyncQueue/Utilities/Delivery.swift index bae0655..dd6f680 100644 --- a/Sources/AsyncQueue/Utilities/Delivery.swift +++ b/Sources/AsyncQueue/Utilities/Delivery.swift @@ -42,11 +42,12 @@ actor Delivery { func execute( _ operation: sending @escaping (isolated ActorType) async -> Void, in context: isolated ActorType, + name: String? = nil, priority: TaskPriority? = nil, ) -> Task { // In Swift 6, a `Task` enqueued from an actor begins executing immediately on that actor. // Since we're running on our actor's context already, we can just dispatch a Task to get first-enqueued-first-start task execution. - let task = Task(priority: priority) { + let task = Task(name: name, priority: priority) { await operation(context) } taskContainer.withLock {