Skip to content

Commit

Permalink
feat(TaskQueue): allow adding task to queue wihout waiting for comp…
Browse files Browse the repository at this point in the history
…letion
  • Loading branch information
soumyamahunt committed Aug 23, 2022
1 parent c66bb81 commit d8ee18a
Show file tree
Hide file tree
Showing 5 changed files with 173 additions and 6 deletions.
52 changes: 52 additions & 0 deletions Sources/AsyncObjects/TaskQueue.swift
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,58 @@ public actor TaskQueue: AsyncObject {
}
}

/// Adds the given throwing operation to queue to be executed asynchronously
/// based on the priority and flags.
///
/// Immediately runs the provided operation if queue isn't blocked by any task,
/// otherwise adds operation to queue to be executed later.
///
/// - Parameters:
/// - priority: The priority with which operation executed. Pass `nil` to use the priority
/// from execution context(`Task.currentPriority`).
/// - flags: Additional attributes to apply when executing the operation.
/// For a list of possible values, see ``Flags``.
/// - operation: The throwing operation to perform.
public nonisolated func addTask<T: Sendable>(
priority: TaskPriority? = nil,
flags: Flags = [],
operation: @Sendable @escaping () async throws -> T
) {
Task {
try await exec(
priority: priority,
flags: flags,
operation: operation
)
}
}

/// Adds the given non-throwing operation to queue to be executed asynchronously
/// based on the priority and flags.
///
/// Immediately runs the provided operation if queue isn't blocked by any task,
/// otherwise adds operation to queue to be executed later.
///
/// - Parameters:
/// - priority: The priority with which operation executed. Pass `nil` to use the priority
/// from execution context(`Task.currentPriority`).
/// - flags: Additional attributes to apply when executing the operation.
/// For a list of possible values, see ``Flags``.
/// - operation: The non-throwing operation to perform.
public nonisolated func addTask<T: Sendable>(
priority: TaskPriority? = nil,
flags: Flags = [],
operation: @Sendable @escaping () async -> T
) {
Task {
await exec(
priority: priority,
flags: flags,
operation: operation
)
}
}

/// Signalling on queue does nothing.
/// Only added to satisfy ``AsyncObject`` requirements.
public func signal() {
Expand Down
6 changes: 3 additions & 3 deletions Tests/AsyncObjectsTests/AsyncCountdownEventTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class AsyncCountdownEventTests: XCTestCase {
let event = AsyncCountdownEvent(until: 3)
await event.increment(by: 10)
Self.signalCountdownEvent(event, times: 10)
await checkExecInterval(durationInRange: 3.5..<4) {
await Self.checkExecInterval(durationInRange: 3.5..<4) {
await event.wait()
}
}
Expand All @@ -78,7 +78,7 @@ class AsyncCountdownEventTests: XCTestCase {
let event = AsyncCountdownEvent(until: 3, initial: 2)
await event.increment(by: 10)
Self.signalCountdownEvent(event, times: 10)
await checkExecInterval(durationInRange: 4.5..<5) {
await Self.checkExecInterval(durationInRange: 4.5..<5) {
await event.wait()
}
}
Expand Down Expand Up @@ -154,7 +154,7 @@ class AsyncCountdownEventTests: XCTestCase {
await event.reset(to: 2)
}
Self.signalCountdownEvent(event, times: 10)
await checkExecInterval(durationInRange: 2.5...3.1) {
await Self.checkExecInterval(durationInRange: 2.5...3.1) {
await event.wait()
}
}
Expand Down
5 changes: 4 additions & 1 deletion Tests/AsyncObjectsTests/TaskOperationTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,10 @@ class TaskOperationTests: XCTestCase {
(try? await Self.sleep(seconds: 3)) != nil
}
operation.signal()
await checkExecInterval(durationInRange: ...3, for: operation.wait)
await Self.checkExecInterval(
durationInRange: ...3,
for: operation.wait
)
}

func testTaskOperationAsyncWaitTimeout() async throws {
Expand Down
112 changes: 112 additions & 0 deletions Tests/AsyncObjectsTests/TaskQueueTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -602,4 +602,116 @@ class TaskQueueTests: XCTestCase {
}
}
}

func testCancellableAndNonCancellableTasksOnSingleQueue() async throws {
let queue = TaskQueue()
await Self.checkExecInterval(durationInSeconds: 0) {
await withThrowingTaskGroup(of: Void.self) { group in
group.addTask {
try await queue.exec {
try await Self.sleep(seconds: 2)
}
}
group.addTask {
try await queue.exec {
try await Self.sleep(seconds: 3)
}
}
group.addTask {
await queue.exec {
do {
try await Self.sleep(seconds: 4)
XCTFail("Unexpected task progression")
} catch {
XCTAssertTrue(
type(of: error) == CancellationError.self
)
}
}
}
group.cancelAll()
}
}
}

func testCancellableAndNonCancellableTasksOnSingleQueueWithBarrier()
async throws
{
let queue = TaskQueue()
try await Self.checkExecInterval(durationInSeconds: 3) {
try await withThrowingTaskGroup(of: Void.self) { group in
group.addTask {
try await queue.exec {
try await Self.sleep(seconds: 1)
}
}
group.addTask {
try await queue.exec {
try await Self.sleep(seconds: 2)
}
}
group.addTask {
try await queue.exec {
try await Self.sleep(seconds: 3)
}
}
// Make sure previous tasks started
try await Self.sleep(forSeconds: 0.01)
await group.addTaskAndStart {
try await queue.exec(flags: .barrier) {
try await Self.sleep(seconds: 2)
}
}
// Make sure previous tasks started
try await Self.sleep(forSeconds: 0.01)
group.addTask {
try await queue.exec {
try await Self.sleep(seconds: 2)
}
}
group.addTask {
await queue.exec {
do {
try await Self.sleep(seconds: 3)
XCTFail("Unexpected task progression")
} catch {
XCTAssertTrue(
type(of: error) == CancellationError.self
)
}
}
}
group.addTask {
await queue.exec {
do {
try await Self.sleep(seconds: 4)
XCTFail("Unexpected task progression")
} catch {
XCTAssertTrue(
type(of: error) == CancellationError.self
)
}
}
}

for _ in 0..<3 { try await group.next() }
group.cancelAll()
}
}
}

func testTaskExecutionWithJustAddingTasks() async throws {
let queue = TaskQueue()
queue.addTask(flags: .barrier) {
try await Self.sleep(seconds: 2)
}
// Make sure previous tasks started
try await Self.sleep(forSeconds: 0.01)
await Self.checkExecInterval(durationInSeconds: 2) {
queue.addTask {
try! await Self.sleep(seconds: 2)
}
await queue.wait()
}
}
}
4 changes: 2 additions & 2 deletions Tests/AsyncObjectsTests/XCTestCase.swift
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ extension XCTestCase {
)
}

func checkExecInterval<R: RangeExpression>(
static func checkExecInterval<R: RangeExpression>(
durationInRange range: R,
for task: () async throws -> Void
) async rethrows where R.Bound == Int {
Expand All @@ -51,7 +51,7 @@ extension XCTestCase {
)
}

func checkExecInterval<R: RangeExpression>(
static func checkExecInterval<R: RangeExpression>(
durationInRange range: R,
for task: () async throws -> Void
) async rethrows where R.Bound == Double {
Expand Down

0 comments on commit d8ee18a

Please sign in to comment.