Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 20 additions & 10 deletions Sources/AsyncQueue/ActorQueue.swift
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,12 @@ public final class ActorQueue<ActorType: Actor>: @unchecked Sendable {
// MARK: Initialization

/// Instantiates an actor queue.
public init() {
/// - Parameter name: Human readable name of the queue.
public init(name: String? = nil) {
let (taskStream, taskStreamContinuation) = AsyncStream<ActorTask>.makeStream()
self.taskStreamContinuation = taskStreamContinuation

Task {
Task(name: name) {
// In an ideal world, we would isolate this `for await` loop to the `ActorType`.
// However, there's no good way to do that without retaining the actor and creating a cycle.
for await actorTask in taskStream {
Expand Down Expand Up @@ -146,10 +147,13 @@ extension Task {
/// it only makes it impossible for you to explicitly cancel the task.
///
/// - Parameters:
/// - name: Human readable name of the task.
/// - priority: The priority of the operation task.
/// - actorQueue: The queue on which to enqueue the task.
/// - operation: The operation to perform.
@discardableResult
public init<ActorType: Actor>(
name: String? = nil,
priority: TaskPriority? = nil,
on actorQueue: ActorQueue<ActorType>,
operation: @Sendable @escaping (isolated ActorType) async -> Success,
Expand All @@ -162,11 +166,11 @@ extension Task {
await semaphore.wait()
delivery.execute({ @Sendable executionContext in
await delivery.sendValue(operation(executionContext))
}, in: executionContext, priority: priority)
}, in: executionContext, name: name, priority: priority)
},
)
actorQueue.taskStreamContinuation.yield(task)
self.init(priority: priority) {
self.init(name: name, priority: priority) {
await withTaskCancellationHandler(
operation: {
await semaphore.signal()
Expand Down Expand Up @@ -199,12 +203,14 @@ extension Task {
/// it only makes it impossible for you to explicitly cancel the task.
///
/// - Parameters:
/// - name: Human readable name of the task.
/// - priority: The priority of the task.
/// Pass `nil` to use the priority from `Task.currentPriority`.
/// - actorQueue: The queue on which to enqueue the task.
/// - operation: The operation to perform.
@discardableResult
public init<ActorType: Actor>(
name: String? = nil,
priority: TaskPriority? = nil,
on actorQueue: ActorQueue<ActorType>,
operation: @escaping @Sendable (isolated ActorType) async throws -> Success,
Expand All @@ -221,11 +227,11 @@ extension Task {
} catch {
await delivery.sendFailure(error)
}
}, in: executionContext, priority: priority)
}, in: executionContext, name: name, priority: priority)
},
)
actorQueue.taskStreamContinuation.yield(task)
self.init(priority: priority) {
self.init(name: name, priority: priority) {
try await withTaskCancellationHandler(
operation: {
await semaphore.signal()
Expand Down Expand Up @@ -258,12 +264,14 @@ extension Task {
/// it only makes it impossible for you to explicitly cancel the task.
///
/// - Parameters:
/// - name: Human readable name of the task.
/// - priority: The priority of the task.
/// Pass `nil` to use the priority from `Task.currentPriority`.
/// - actorQueue: The queue on which to enqueue the task.
/// - operation: The operation to perform.
@discardableResult
public init(
name: String? = nil,
priority: TaskPriority? = nil,
on actorQueue: ActorQueue<MainActor>,
operation: @MainActor @escaping () async -> Success,
Expand All @@ -276,11 +284,11 @@ extension Task {
await semaphore.wait()
delivery.execute({ @Sendable executionContext in
await delivery.sendValue(operation())
}, in: executionContext, priority: priority)
}, in: executionContext, name: name, priority: priority)
},
)
actorQueue.taskStreamContinuation.yield(task)
self.init(priority: priority) {
self.init(name: name, priority: priority) {
await withTaskCancellationHandler(
operation: {
await semaphore.signal()
Expand Down Expand Up @@ -313,12 +321,14 @@ extension Task {
/// it only makes it impossible for you to explicitly cancel the task.
///
/// - Parameters:
/// - name: Human readable name of the task.
/// - priority: The priority of the task.
/// Pass `nil` to use the priority from `Task.currentPriority`.
/// - actorQueue: The queue on which to enqueue the task.
/// - operation: The operation to perform.
@discardableResult
public init(
name: String? = nil,
priority: TaskPriority? = nil,
on actorQueue: ActorQueue<MainActor>,
operation: @escaping @MainActor () async throws -> Success,
Expand All @@ -335,11 +345,11 @@ extension Task {
} catch {
await delivery.sendFailure(error)
}
}, in: executionContext, priority: priority)
}, in: executionContext, name: name, priority: priority)
},
)
actorQueue.taskStreamContinuation.yield(task)
self.init(priority: priority) {
self.init(name: name, priority: priority) {
try await withTaskCancellationHandler(
operation: {
await semaphore.signal()
Expand Down
29 changes: 19 additions & 10 deletions Sources/AsyncQueue/FIFOQueue.swift
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,13 @@ public final class FIFOQueue: Sendable {
// MARK: Initialization

/// Instantiates a FIFO queue.
/// - Parameter name: Human readable name of the queue.
/// - Parameter priority: The baseline priority of the tasks added to the asynchronous queue.
public init(priority: TaskPriority? = nil) {
public init(name: String? = nil, priority: TaskPriority? = nil) {
let (taskStream, taskStreamContinuation) = AsyncStream<FIFOTask>.makeStream()
self.taskStreamContinuation = taskStreamContinuation

Task.detached(priority: priority) {
Task.detached(name: name, priority: priority) {
for await fifoTask in taskStream {
await fifoTask.task()
}
Expand Down Expand Up @@ -79,10 +80,12 @@ extension Task {
/// it only makes it impossible for you to explicitly cancel the task.
///
/// - Parameters:
/// - name: Human readable name of the task.
/// - fifoQueue: The queue on which to enqueue the task.
/// - operation: The operation to perform.
@discardableResult
public init(
name: String? = nil,
on fifoQueue: FIFOQueue,
@_inheritActorContext @_implicitSelfCapture operation: sending @escaping @isolated(any) () async -> Success,
) where Failure == Never {
Expand All @@ -93,10 +96,10 @@ extension Task {
await semaphore.wait()
await delivery.execute({ @Sendable delivery in
await delivery.sendValue(executeOnce.operation())
}, in: delivery).value
}, in: delivery, name: name).value
}
fifoQueue.taskStreamContinuation.yield(task)
self.init {
self.init(name: name) {
await withTaskCancellationHandler(
operation: {
await semaphore.signal()
Expand Down Expand Up @@ -129,10 +132,12 @@ extension Task {
/// it only makes it impossible for you to explicitly cancel the task.
///
/// - Parameters:
/// - name: Human readable name of the task.
/// - fifoQueue: The queue on which to enqueue the task.
/// - operation: The operation to perform.
@discardableResult
public init(
name: String? = nil,
on fifoQueue: FIFOQueue,
@_inheritActorContext @_implicitSelfCapture operation: sending @escaping @isolated(any) () async throws -> Success,
) where Failure == any Error {
Expand All @@ -147,10 +152,10 @@ extension Task {
} catch {
delivery.sendFailure(error)
}
}, in: delivery).value
}, in: delivery, name: name).value
}
fifoQueue.taskStreamContinuation.yield(task)
self.init {
self.init(name: name) {
try await withTaskCancellationHandler(
operation: {
await semaphore.signal()
Expand Down Expand Up @@ -183,13 +188,15 @@ extension Task {
/// it only makes it impossible for you to explicitly cancel the task.
///
/// - Parameters:
/// - name: Human readable name of the task.
/// - priority: The priority of the task.
/// Pass `nil` to use the priority from `Task.currentPriority`.
/// - fifoQueue: The queue on which to enqueue the task.
/// - isolatedActor: The actor to which the operation is isolated.
/// - operation: The operation to perform.
@discardableResult
public init<ActorType: Actor>(
name: String? = nil,
priority: TaskPriority? = nil,
on fifoQueue: FIFOQueue,
isolatedTo isolatedActor: ActorType,
Expand All @@ -201,10 +208,10 @@ extension Task {
await semaphore.wait()
await delivery.execute({ @Sendable isolatedActor in
await delivery.sendValue(operation(isolatedActor))
}, in: isolatedActor, priority: priority).value
}, in: isolatedActor, name: name, priority: priority).value
}
fifoQueue.taskStreamContinuation.yield(task)
self.init {
self.init(name: name) {
await withTaskCancellationHandler(
operation: {
await semaphore.signal()
Expand Down Expand Up @@ -237,13 +244,15 @@ extension Task {
/// it only makes it impossible for you to explicitly cancel the task.
///
/// - Parameters:
/// - name: Human readable name of the task.
/// - priority: The priority of the task.
/// Pass `nil` to use the priority from `Task.currentPriority`.
/// - fifoQueue: The queue on which to enqueue the task.
/// - isolatedActor: The actor to which the operation is isolated.
/// - operation: The operation to perform.
@discardableResult
public init<ActorType: Actor>(
name: String? = nil,
priority: TaskPriority? = nil,
on fifoQueue: FIFOQueue,
isolatedTo isolatedActor: ActorType,
Expand All @@ -259,10 +268,10 @@ extension Task {
} catch {
await delivery.sendFailure(error)
}
}, in: isolatedActor, priority: priority).value
}, in: isolatedActor, name: name, priority: priority).value
}
fifoQueue.taskStreamContinuation.yield(task)
self.init(priority: priority) {
self.init(name: name, priority: priority) {
try await withTaskCancellationHandler(
operation: {
await semaphore.signal()
Expand Down
3 changes: 2 additions & 1 deletion Sources/AsyncQueue/Utilities/Delivery.swift
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,12 @@ actor Delivery<Success: Sendable, Failure: Error> {
func execute<ActorType: Actor>(
_ operation: sending @escaping (isolated ActorType) async -> Void,
in context: isolated ActorType,
name: String? = nil,
priority: TaskPriority? = nil,
) -> Task<Void, Never> {
// In Swift 6, a `Task` enqueued from an actor begins executing immediately on that actor.
// Since we're running on our actor's context already, we can just dispatch a Task to get first-enqueued-first-start task execution.
let task = Task(priority: priority) {
let task = Task(name: name, priority: priority) {
await operation(context)
}
taskContainer.withLock {
Expand Down