Skip to content

Commit 8e06d33

Browse files
songhuaixudfed
andauthored
queue task add name (#61)
Co-authored-by: Dan Federman <dfed@me.com>
1 parent 79533e0 commit 8e06d33

File tree

3 files changed

+41
-21
lines changed

3 files changed

+41
-21
lines changed

Sources/AsyncQueue/ActorQueue.swift

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -55,11 +55,12 @@ public final class ActorQueue<ActorType: Actor>: @unchecked Sendable {
5555
// MARK: Initialization
5656

5757
/// Instantiates an actor queue.
58-
public init() {
58+
/// - Parameter name: Human readable name of the queue.
59+
public init(name: String? = nil) {
5960
let (taskStream, taskStreamContinuation) = AsyncStream<ActorTask>.makeStream()
6061
self.taskStreamContinuation = taskStreamContinuation
6162

62-
Task {
63+
Task(name: name) {
6364
// In an ideal world, we would isolate this `for await` loop to the `ActorType`.
6465
// However, there's no good way to do that without retaining the actor and creating a cycle.
6566
for await actorTask in taskStream {
@@ -146,10 +147,13 @@ extension Task {
146147
/// it only makes it impossible for you to explicitly cancel the task.
147148
///
148149
/// - Parameters:
150+
/// - name: Human readable name of the task.
151+
/// - priority: The priority of the operation task.
149152
/// - actorQueue: The queue on which to enqueue the task.
150153
/// - operation: The operation to perform.
151154
@discardableResult
152155
public init<ActorType: Actor>(
156+
name: String? = nil,
153157
priority: TaskPriority? = nil,
154158
on actorQueue: ActorQueue<ActorType>,
155159
operation: @Sendable @escaping (isolated ActorType) async -> Success,
@@ -162,11 +166,11 @@ extension Task {
162166
await semaphore.wait()
163167
delivery.execute({ @Sendable executionContext in
164168
await delivery.sendValue(operation(executionContext))
165-
}, in: executionContext, priority: priority)
169+
}, in: executionContext, name: name, priority: priority)
166170
},
167171
)
168172
actorQueue.taskStreamContinuation.yield(task)
169-
self.init(priority: priority) {
173+
self.init(name: name, priority: priority) {
170174
await withTaskCancellationHandler(
171175
operation: {
172176
await semaphore.signal()
@@ -199,12 +203,14 @@ extension Task {
199203
/// it only makes it impossible for you to explicitly cancel the task.
200204
///
201205
/// - Parameters:
206+
/// - name: Human readable name of the task.
202207
/// - priority: The priority of the task.
203208
/// Pass `nil` to use the priority from `Task.currentPriority`.
204209
/// - actorQueue: The queue on which to enqueue the task.
205210
/// - operation: The operation to perform.
206211
@discardableResult
207212
public init<ActorType: Actor>(
213+
name: String? = nil,
208214
priority: TaskPriority? = nil,
209215
on actorQueue: ActorQueue<ActorType>,
210216
operation: @escaping @Sendable (isolated ActorType) async throws -> Success,
@@ -221,11 +227,11 @@ extension Task {
221227
} catch {
222228
await delivery.sendFailure(error)
223229
}
224-
}, in: executionContext, priority: priority)
230+
}, in: executionContext, name: name, priority: priority)
225231
},
226232
)
227233
actorQueue.taskStreamContinuation.yield(task)
228-
self.init(priority: priority) {
234+
self.init(name: name, priority: priority) {
229235
try await withTaskCancellationHandler(
230236
operation: {
231237
await semaphore.signal()
@@ -258,12 +264,14 @@ extension Task {
258264
/// it only makes it impossible for you to explicitly cancel the task.
259265
///
260266
/// - Parameters:
267+
/// - name: Human readable name of the task.
261268
/// - priority: The priority of the task.
262269
/// Pass `nil` to use the priority from `Task.currentPriority`.
263270
/// - actorQueue: The queue on which to enqueue the task.
264271
/// - operation: The operation to perform.
265272
@discardableResult
266273
public init(
274+
name: String? = nil,
267275
priority: TaskPriority? = nil,
268276
on actorQueue: ActorQueue<MainActor>,
269277
operation: @MainActor @escaping () async -> Success,
@@ -276,11 +284,11 @@ extension Task {
276284
await semaphore.wait()
277285
delivery.execute({ @Sendable executionContext in
278286
await delivery.sendValue(operation())
279-
}, in: executionContext, priority: priority)
287+
}, in: executionContext, name: name, priority: priority)
280288
},
281289
)
282290
actorQueue.taskStreamContinuation.yield(task)
283-
self.init(priority: priority) {
291+
self.init(name: name, priority: priority) {
284292
await withTaskCancellationHandler(
285293
operation: {
286294
await semaphore.signal()
@@ -313,12 +321,14 @@ extension Task {
313321
/// it only makes it impossible for you to explicitly cancel the task.
314322
///
315323
/// - Parameters:
324+
/// - name: Human readable name of the task.
316325
/// - priority: The priority of the task.
317326
/// Pass `nil` to use the priority from `Task.currentPriority`.
318327
/// - actorQueue: The queue on which to enqueue the task.
319328
/// - operation: The operation to perform.
320329
@discardableResult
321330
public init(
331+
name: String? = nil,
322332
priority: TaskPriority? = nil,
323333
on actorQueue: ActorQueue<MainActor>,
324334
operation: @escaping @MainActor () async throws -> Success,
@@ -335,11 +345,11 @@ extension Task {
335345
} catch {
336346
await delivery.sendFailure(error)
337347
}
338-
}, in: executionContext, priority: priority)
348+
}, in: executionContext, name: name, priority: priority)
339349
},
340350
)
341351
actorQueue.taskStreamContinuation.yield(task)
342-
self.init(priority: priority) {
352+
self.init(name: name, priority: priority) {
343353
try await withTaskCancellationHandler(
344354
operation: {
345355
await semaphore.signal()

Sources/AsyncQueue/FIFOQueue.swift

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,13 @@ public final class FIFOQueue: Sendable {
2727
// MARK: Initialization
2828

2929
/// Instantiates a FIFO queue.
30+
/// - Parameter name: Human readable name of the queue.
3031
/// - Parameter priority: The baseline priority of the tasks added to the asynchronous queue.
31-
public init(priority: TaskPriority? = nil) {
32+
public init(name: String? = nil, priority: TaskPriority? = nil) {
3233
let (taskStream, taskStreamContinuation) = AsyncStream<FIFOTask>.makeStream()
3334
self.taskStreamContinuation = taskStreamContinuation
3435

35-
Task.detached(priority: priority) {
36+
Task.detached(name: name, priority: priority) {
3637
for await fifoTask in taskStream {
3738
await fifoTask.task()
3839
}
@@ -79,10 +80,12 @@ extension Task {
7980
/// it only makes it impossible for you to explicitly cancel the task.
8081
///
8182
/// - Parameters:
83+
/// - name: Human readable name of the task.
8284
/// - fifoQueue: The queue on which to enqueue the task.
8385
/// - operation: The operation to perform.
8486
@discardableResult
8587
public init(
88+
name: String? = nil,
8689
on fifoQueue: FIFOQueue,
8790
@_inheritActorContext @_implicitSelfCapture operation: sending @escaping @isolated(any) () async -> Success,
8891
) where Failure == Never {
@@ -93,10 +96,10 @@ extension Task {
9396
await semaphore.wait()
9497
await delivery.execute({ @Sendable delivery in
9598
await delivery.sendValue(executeOnce.operation())
96-
}, in: delivery).value
99+
}, in: delivery, name: name).value
97100
}
98101
fifoQueue.taskStreamContinuation.yield(task)
99-
self.init {
102+
self.init(name: name) {
100103
await withTaskCancellationHandler(
101104
operation: {
102105
await semaphore.signal()
@@ -129,10 +132,12 @@ extension Task {
129132
/// it only makes it impossible for you to explicitly cancel the task.
130133
///
131134
/// - Parameters:
135+
/// - name: Human readable name of the task.
132136
/// - fifoQueue: The queue on which to enqueue the task.
133137
/// - operation: The operation to perform.
134138
@discardableResult
135139
public init(
140+
name: String? = nil,
136141
on fifoQueue: FIFOQueue,
137142
@_inheritActorContext @_implicitSelfCapture operation: sending @escaping @isolated(any) () async throws -> Success,
138143
) where Failure == any Error {
@@ -147,10 +152,10 @@ extension Task {
147152
} catch {
148153
delivery.sendFailure(error)
149154
}
150-
}, in: delivery).value
155+
}, in: delivery, name: name).value
151156
}
152157
fifoQueue.taskStreamContinuation.yield(task)
153-
self.init {
158+
self.init(name: name) {
154159
try await withTaskCancellationHandler(
155160
operation: {
156161
await semaphore.signal()
@@ -183,13 +188,15 @@ extension Task {
183188
/// it only makes it impossible for you to explicitly cancel the task.
184189
///
185190
/// - Parameters:
191+
/// - name: Human readable name of the task.
186192
/// - priority: The priority of the task.
187193
/// Pass `nil` to use the priority from `Task.currentPriority`.
188194
/// - fifoQueue: The queue on which to enqueue the task.
189195
/// - isolatedActor: The actor to which the operation is isolated.
190196
/// - operation: The operation to perform.
191197
@discardableResult
192198
public init<ActorType: Actor>(
199+
name: String? = nil,
193200
priority: TaskPriority? = nil,
194201
on fifoQueue: FIFOQueue,
195202
isolatedTo isolatedActor: ActorType,
@@ -201,10 +208,10 @@ extension Task {
201208
await semaphore.wait()
202209
await delivery.execute({ @Sendable isolatedActor in
203210
await delivery.sendValue(operation(isolatedActor))
204-
}, in: isolatedActor, priority: priority).value
211+
}, in: isolatedActor, name: name, priority: priority).value
205212
}
206213
fifoQueue.taskStreamContinuation.yield(task)
207-
self.init {
214+
self.init(name: name) {
208215
await withTaskCancellationHandler(
209216
operation: {
210217
await semaphore.signal()
@@ -237,13 +244,15 @@ extension Task {
237244
/// it only makes it impossible for you to explicitly cancel the task.
238245
///
239246
/// - Parameters:
247+
/// - name: Human readable name of the task.
240248
/// - priority: The priority of the task.
241249
/// Pass `nil` to use the priority from `Task.currentPriority`.
242250
/// - fifoQueue: The queue on which to enqueue the task.
243251
/// - isolatedActor: The actor to which the operation is isolated.
244252
/// - operation: The operation to perform.
245253
@discardableResult
246254
public init<ActorType: Actor>(
255+
name: String? = nil,
247256
priority: TaskPriority? = nil,
248257
on fifoQueue: FIFOQueue,
249258
isolatedTo isolatedActor: ActorType,
@@ -259,10 +268,10 @@ extension Task {
259268
} catch {
260269
await delivery.sendFailure(error)
261270
}
262-
}, in: isolatedActor, priority: priority).value
271+
}, in: isolatedActor, name: name, priority: priority).value
263272
}
264273
fifoQueue.taskStreamContinuation.yield(task)
265-
self.init(priority: priority) {
274+
self.init(name: name, priority: priority) {
266275
try await withTaskCancellationHandler(
267276
operation: {
268277
await semaphore.signal()

Sources/AsyncQueue/Utilities/Delivery.swift

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,11 +42,12 @@ actor Delivery<Success: Sendable, Failure: Error> {
4242
func execute<ActorType: Actor>(
4343
_ operation: sending @escaping (isolated ActorType) async -> Void,
4444
in context: isolated ActorType,
45+
name: String? = nil,
4546
priority: TaskPriority? = nil,
4647
) -> Task<Void, Never> {
4748
// In Swift 6, a `Task` enqueued from an actor begins executing immediately on that actor.
4849
// Since we're running on our actor's context already, we can just dispatch a Task to get first-enqueued-first-start task execution.
49-
let task = Task(priority: priority) {
50+
let task = Task(name: name, priority: priority) {
5051
await operation(context)
5152
}
5253
taskContainer.withLock {

0 commit comments

Comments
 (0)