Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable executing FIFO tasks on a desired actor's execution context #6

Merged
merged 5 commits into from
Feb 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions Sources/AsyncQueue/FIFOQueue.swift
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,15 @@ public final class FIFOQueue: Sendable {
taskStreamContinuation.yield(task)
}

/// Schedules an asynchronous task for execution and immediately returns.
/// The scheduled task will not execute until all prior tasks have completed.
/// - Parameters:
/// - isolatedActor: The actor within which the task is isolated.
/// - task: The task to enqueue.
public func async<ActorType: Actor>(on isolatedActor: ActorType, _ task: @escaping @Sendable (isolated ActorType) async -> Void) {
taskStreamContinuation.yield { await task(isolatedActor) }
}

/// Schedules an asynchronous task and returns after the task is complete.
/// The scheduled task will not execute until all prior tasks have completed.
/// - Parameter task: The task to enqueue.
Expand All @@ -70,6 +79,20 @@ public final class FIFOQueue: Sendable {
}
}

/// Schedules an asynchronous task and returns after the task is complete.
/// The scheduled task will not execute until all prior tasks have completed.
/// - Parameters:
/// - isolatedActor: The actor within which the task is isolated.
/// - task: The task to enqueue.
/// - Returns: The value returned from the enqueued task.
public func await<ActorType: Actor, T>(on isolatedActor: isolated ActorType, _ task: @escaping @Sendable (isolated ActorType) async -> T) async -> T {
await withUnsafeContinuation { continuation in
taskStreamContinuation.yield {
continuation.resume(returning: await task(isolatedActor))
}
}
}

/// Schedules an asynchronous throwing task and returns after the task is complete.
/// The scheduled task will not execute until all prior tasks have completed.
/// - Parameter task: The task to enqueue.
Expand All @@ -86,6 +109,24 @@ public final class FIFOQueue: Sendable {
}
}

/// Schedules an asynchronous throwing task and returns after the task is complete.
/// The scheduled task will not execute until all prior tasks have completed.
/// - Parameters:
/// - isolatedActor: The actor within which the task is isolated.
/// - task: The task to enqueue.
/// - Returns: The value returned from the enqueued task.
public func await<ActorType: Actor, T>(on isolatedActor: isolated ActorType, _ task: @escaping @Sendable (isolated ActorType) async throws -> T) async throws -> T {
try await withUnsafeThrowingContinuation { continuation in
taskStreamContinuation.yield {
do {
continuation.resume(returning: try await task(isolatedActor))
} catch {
continuation.resume(throwing: error)
}
}
}
}

// MARK: Private

private let taskStreamContinuation: AsyncStream<@Sendable () async -> Void>.Continuation
Expand Down
206 changes: 206 additions & 0 deletions Tests/AsyncQueueTests/FIFOQueueTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,32 @@ final class FIFOQueueTests: XCTestCase {
await systemUnderTest.await { /* Drain the queue */ }
}

func test_asyncOn_sendsEventsInOrder() async {
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Most of these new tests mirror the tests directly above them that tested the API without the on isolatedActor parameter.

let counter = Counter()
for iteration in 1...1_000 {
systemUnderTest.async(on: counter) { counter in
counter.incrementAndExpectCount(equals: iteration)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very nice that you don't need to await.

}
}
await systemUnderTest.await { /* Drain the queue */ }
}

func test_async_asyncOn_sendEventsInOrder() async {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

let counter = Counter()
for iteration in 1...1_000 {
if iteration % 2 == 0 {
systemUnderTest.async {
await counter.incrementAndExpectCount(equals: iteration)
}
} else {
systemUnderTest.async(on: counter) { counter in
counter.incrementAndExpectCount(equals: iteration)
}
}
}
await systemUnderTest.await { /* Drain the queue */ }
}

func test_async_executesAsyncBlocksAtomically() async {
let semaphore = Semaphore()
for _ in 1...1_000 {
Expand All @@ -65,7 +91,54 @@ final class FIFOQueueTests: XCTestCase {
await systemUnderTest.await { /* Drain the queue */ }
}

func test_asyncOn_executesAsyncBlocksAtomically() async {
let semaphore = Semaphore()
for _ in 1...1_000 {
systemUnderTest.async(on: semaphore) { semaphore in
let isWaiting = semaphore.isWaiting
// This test will fail occasionally if we aren't executing atomically.
// You can prove this to yourself by replacing `systemUnderTest.async` above with `Task`.
XCTAssertFalse(isWaiting)
// Signal the semaphore before or after we wait – let the scheduler decide.
Task {
semaphore.signal()
}
// Wait for the concurrent task to complete.
await semaphore.wait()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here we still need an await since wait() may suspend. Is that right?

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct! wait is an async (i.e. possibly suspending) method on the semaphore, whereas signal is not. So we need to await the async method.

}
}
await systemUnderTest.await { /* Drain the queue */ }
}

func test_async_isNotReentrant() async {
let counter = Counter()
systemUnderTest.async { [systemUnderTest] in
systemUnderTest.async {
await counter.incrementAndExpectCount(equals: 2)
}
await counter.incrementAndExpectCount(equals: 1)
systemUnderTest.async {
await counter.incrementAndExpectCount(equals: 3)
}
}
await systemUnderTest.await { /* Drain the queue */ }
}

func test_asyncOn_isNotReentrant() async {
let counter = Counter()
systemUnderTest.async(on: counter) { [systemUnderTest] counter in
systemUnderTest.async(on: counter) { counter in
counter.incrementAndExpectCount(equals: 2)
}
counter.incrementAndExpectCount(equals: 1)
systemUnderTest.async(on: counter) { counter in
counter.incrementAndExpectCount(equals: 3)
}
}
await systemUnderTest.await { /* Drain the queue */ }
}

func test_await_async_areNotReentrant() async {
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this test was previously called test_async_isNotReentrant, but technically it was testing that await and async aren't reentrant rather than just testing that async is not reentrant.

let counter = Counter()
await systemUnderTest.await { [systemUnderTest] in
systemUnderTest.async {
Expand All @@ -79,6 +152,48 @@ final class FIFOQueueTests: XCTestCase {
await systemUnderTest.await { /* Drain the queue */ }
}

func test_awaitOn_asyncOn_areNotReentrant() async {
let counter = Counter()
await systemUnderTest.await(on: counter) { [systemUnderTest] counter in
systemUnderTest.async(on: counter) { counter in
counter.incrementAndExpectCount(equals: 2)
}
counter.incrementAndExpectCount(equals: 1)
systemUnderTest.async(on: counter) { counter in
counter.incrementAndExpectCount(equals: 3)
}
}
await systemUnderTest.await { /* Drain the queue */ }
}

func test_await_asyncOn_areNotReentrant() async {
let counter = Counter()
await systemUnderTest.await { [systemUnderTest] in
systemUnderTest.async(on: counter) { counter in
counter.incrementAndExpectCount(equals: 2)
}
await counter.incrementAndExpectCount(equals: 1)
systemUnderTest.async(on: counter) { counter in
counter.incrementAndExpectCount(equals: 3)
}
}
await systemUnderTest.await { /* Drain the queue */ }
}

func test_awaitOn_async_areNotReentrant() async {
let counter = Counter()
await systemUnderTest.await(on: counter) { [systemUnderTest] counter in
systemUnderTest.async {
await counter.incrementAndExpectCount(equals: 2)
}
counter.incrementAndExpectCount(equals: 1)
systemUnderTest.async {
await counter.incrementAndExpectCount(equals: 3)
}
}
await systemUnderTest.await { /* Drain the queue */ }
}

func test_async_executesAfterReceiverIsDeallocated() async {
var systemUnderTest: FIFOQueue? = FIFOQueue()
let counter = Counter()
Expand All @@ -104,6 +219,31 @@ final class FIFOQueueTests: XCTestCase {
await waitForExpectations(timeout: 1.0)
}

func test_asyncOn_executesAfterReceiverIsDeallocated() async {
var systemUnderTest: FIFOQueue? = FIFOQueue()
let counter = Counter()
let expectation = self.expectation(description: #function)
let semaphore = Semaphore()
systemUnderTest?.async(on: counter) { counter in
// Make the queue wait.
await semaphore.wait()
counter.incrementAndExpectCount(equals: 1)
}
systemUnderTest?.async(on: counter) { counter in
// This async task should not execute until the semaphore is released.
counter.incrementAndExpectCount(equals: 2)
expectation.fulfill()
}
weak var queue = systemUnderTest
// Nil out our reference to the queue to show that the enqueued tasks will still complete
systemUnderTest = nil
XCTAssertNil(queue)
// Signal the semaphore to unlock the remaining enqueued tasks.
await semaphore.signal()

await waitForExpectations(timeout: 1.0)
}

func test_async_doesNotRetainTaskAfterExecution() async {
final class Reference: Sendable {}
final class ReferenceHolder: @unchecked Sendable {
Expand Down Expand Up @@ -132,6 +272,34 @@ final class FIFOQueueTests: XCTestCase {
XCTAssertNil(weakReference)
}

func test_asyncOn_doesNotRetainTaskAfterExecution() async {
final class Reference: Sendable {}
final class ReferenceHolder: @unchecked Sendable {
var reference: Reference? = Reference()
}
let referenceHolder = ReferenceHolder()
weak var weakReference = referenceHolder.reference
let asyncSemaphore = Semaphore()
let syncSemaphore = Semaphore()
systemUnderTest.async(on: syncSemaphore) { [reference = referenceHolder.reference] syncSemaphore in
// Now that we've started the task and captured the reference, release the synchronous code.
syncSemaphore.signal()
// Wait for the synchronous setup to complete and the reference to be nil'd out.
await asyncSemaphore.wait()
// Retain the unsafe counter until the task is completed.
_ = reference
}
// Wait for the asynchronous task to start.
await syncSemaphore.wait()
referenceHolder.reference = nil
XCTAssertNotNil(weakReference)
// Allow the enqueued task to complete.
await asyncSemaphore.signal()
// Make sure the task has completed.
await systemUnderTest.await { /* Drain the queue */ }
XCTAssertNil(weakReference)
}

func test_await_sendsEventsInOrder() async {
let counter = Counter()
for iteration in 1...1_000 {
Expand All @@ -152,12 +320,38 @@ final class FIFOQueueTests: XCTestCase {
await systemUnderTest.await { /* Drain the queue */ }
}

func test_awaitOn_sendsEventsInOrder() async {
let counter = Counter()
for iteration in 1...1_000 {
systemUnderTest.async {
await counter.incrementAndExpectCount(equals: iteration)
}

guard iteration % 25 == 0 else {
// Keep sending async events to the queue.
continue
}

await systemUnderTest.await(on: counter) { counter in
let count = counter.count
XCTAssertEqual(count, iteration)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

}
}
await systemUnderTest.await { /* Drain the queue */ }
}

func test_await_canReturn() async {
let expectedValue = UUID()
let returnedValue = await systemUnderTest.await { expectedValue }
XCTAssertEqual(expectedValue, returnedValue)
}

func test_awaitOn_canReturn() async {
let expectedValue = UUID()
let returnedValue = await systemUnderTest.await(on: Counter()) { _ in expectedValue }
XCTAssertEqual(expectedValue, returnedValue)
}

func test_await_canThrow() async {
struct TestError: Error, Equatable {
private let identifier = UUID()
Expand All @@ -170,6 +364,18 @@ final class FIFOQueueTests: XCTestCase {
}
}

func test_awaitOn_canThrow() async {
struct TestError: Error, Equatable {
private let identifier = UUID()
}
let expectedError = TestError()
do {
try await systemUnderTest.await(on: Counter()) { _ in throw expectedError }
} catch {
XCTAssertEqual(error as? TestError, expectedError)
}
}

// MARK: Private

private var systemUnderTest = FIFOQueue()
Expand Down