Skip to content

Commit

Permalink
refactor(AsyncObject)!: propagate cancellation error instead of swa…
Browse files Browse the repository at this point in the history
…llowing (#8)

* refactor(`AsyncObject`)!: propagate cancellation error instead of swallowing

* wip: add delay to deinit tests

* wip: add delay for tests where needed

* wip: make future initializer synchrnous

* refactor: remove logic duplication for continuation management

* refactor: remove unnecessary class from test

* refactor: refactor test helper functions

* fix: manage cancelling operation before starting properly

* deps: update swift-format with Swift 5.7 support

* wip: refactor safe continuation

* style: add logging parameters to public interfaces

* style: add logging parameters to public interfaces

* wip: remove `@preconcurrency` from `Foundation` for Swift >=5.7

* wip: standardize method names
  • Loading branch information
soumyamahunt committed Nov 12, 2022
1 parent bca8299 commit 9f7f243
Show file tree
Hide file tree
Showing 32 changed files with 3,066 additions and 1,824 deletions.
895 changes: 471 additions & 424 deletions AsyncObjects.xcodeproj/project.pbxproj

Large diffs are not rendered by default.

10 changes: 5 additions & 5 deletions Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ let package = Package(
dependencies: [
.package(url: "\(appleGitHub)/swift-collections.git", from: "1.0.0"),
.package(url: "\(appleGitHub)/swift-docc-plugin", from: "1.0.0"),
.package(url: "\(appleGitHub)/swift-format", from: "0.50600.1"),
.package(url: "\(appleGitHub)/swift-format", from: "0.50700.0"),
],
targets: [
.target(
Expand All @@ -42,7 +42,7 @@ let package = Package(
]
)

var swiftSettings: [SwiftSetting] {
var swiftSettings: [SwiftSetting] = {
var swiftSettings: [SwiftSetting] = []

if ProcessInfo.processInfo.environment[
Expand Down Expand Up @@ -77,9 +77,9 @@ var swiftSettings: [SwiftSetting] {
}

return swiftSettings
}
}()

var testingSwiftSettings: [SwiftSetting] {
var testingSwiftSettings: [SwiftSetting] = {
var swiftSettings: [SwiftSetting] = []

if ProcessInfo.processInfo.environment[
Expand All @@ -96,4 +96,4 @@ var testingSwiftSettings: [SwiftSetting] {
}

return swiftSettings
}
}()
190 changes: 140 additions & 50 deletions Sources/AsyncObjects/AsyncCountdownEvent.swift
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import Foundation
#else
@preconcurrency import Foundation
#endif

import OrderedCollections

/// An event object that controls access to a resource between high and low priority tasks
Expand All @@ -15,20 +16,42 @@ import OrderedCollections
/// You can indicate high priority usage of resource by using ``increment(by:)`` method,
/// and indicate free of resource by calling ``signal(repeat:)`` or ``signal()`` methods.
/// For low priority resource usage or detect resource idling use ``wait()`` method
/// or its timeout variation ``wait(forNanoseconds:)``.
/// or its timeout variation ``wait(forNanoseconds:)``:
///
/// ```swift
/// // create event with initial count and count down limit
/// let event = AsyncCountdownEvent()
/// // increment countdown count from high priority tasks
/// event.increment(by: 1)
///
/// // wait for countdown signal from low priority tasks,
/// // fails only if task cancelled
/// try await event.wait()
/// // or wait with some timeout
/// try await event.wait(forNanoseconds: 1_000_000_000)
///
/// // signal countdown after completing high priority tasks
/// event.signal()
/// ```
///
/// Use the ``limit`` parameter to indicate concurrent low priority usage, i.e. if limit set to zero,
/// only one low priority usage allowed at one time.
public actor AsyncCountdownEvent: AsyncObject {
public actor AsyncCountdownEvent: AsyncObject, ContinuableCollection {
/// The suspended tasks continuation type.
@usableFromInline
typealias Continuation = SafeContinuation<GlobalContinuation<Void, Error>>
internal typealias Continuation = SafeContinuation<
GlobalContinuation<Void, Error>
>
/// The platform dependent lock used to synchronize continuations tracking.
@usableFromInline
let locker: Locker = .init()
internal let locker: Locker = .init()
/// The continuations stored with an associated key for all the suspended task that are waiting to be resumed.
@usableFromInline
private(set) var continuations: OrderedDictionary<UUID, Continuation> = [:]
internal private(set) var continuations:
OrderedDictionary<
UUID,
Continuation
> = [:]
/// The limit up to which the countdown counts and triggers event.
///
/// By default this is set to zero and can be changed during initialization.
Expand All @@ -42,7 +65,7 @@ public actor AsyncCountdownEvent: AsyncObject {
///
/// Can be changed after initialization
/// by using ``reset(to:)`` method.
public private(set) var initialCount: UInt
public var initialCount: UInt
/// Indicates whether countdown event current count is within ``limit``.
///
/// Queued tasks are resumed from suspension when event is set and until current count exceeds limit.
Expand All @@ -54,13 +77,13 @@ public actor AsyncCountdownEvent: AsyncObject {
///
/// - Returns: Whether to wait to be resumed later.
@inlinable
func _wait() -> Bool { !isSet || !continuations.isEmpty }
internal func shouldWait() -> Bool { !isSet || !continuations.isEmpty }

/// Resume provided continuation with additional changes based on the associated flags.
///
/// - Parameter continuation: The queued continuation to resume.
@inlinable
func _resumeContinuation(_ continuation: Continuation) {
internal func resumeContinuation(_ continuation: Continuation) {
currentCount += 1
continuation.resume()
}
Expand All @@ -71,12 +94,12 @@ public actor AsyncCountdownEvent: AsyncObject {
/// - continuation: The `continuation` to add.
/// - key: The key in the map.
@inlinable
func _addContinuation(
internal func addContinuation(
_ continuation: Continuation,
withKey key: UUID
) {
guard !continuation.resumed else { return }
guard _wait() else { _resumeContinuation(continuation); return }
guard shouldWait() else { resumeContinuation(continuation); return }
continuations[key] = continuation
}

Expand All @@ -85,49 +108,52 @@ public actor AsyncCountdownEvent: AsyncObject {
///
/// - Parameter key: The key in the map.
@inlinable
func _removeContinuation(withKey key: UUID) {
internal func removeContinuation(withKey key: UUID) {
continuations.removeValue(forKey: key)
}

/// Decrements countdown count by the provided number.
///
/// - Parameter number: The number to decrement count by.
@inlinable
func _decrementCount(by number: UInt = 1) {
defer { _resumeContinuations() }
internal func decrementCount(by number: UInt = 1) {
defer { resumeContinuations() }
guard currentCount > 0 else { return }
currentCount -= number
}

/// Resume previously waiting continuations for countdown event.
@inlinable
func _resumeContinuations() {
internal func resumeContinuations() {
while !continuations.isEmpty && isSet {
let (_, continuation) = continuations.removeFirst()
_resumeContinuation(continuation)
resumeContinuation(continuation)
}
}

/// Suspends the current task, then calls the given closure with a throwing continuation for the current task.
/// Continuation can be cancelled with error if current task is cancelled, by invoking `_removeContinuation`.
/// Increments the countdown event current count by the specified value.
///
/// Spins up a new continuation and requests to track it with key by invoking `_addContinuation`.
/// This operation cooperatively checks for cancellation and reacting to it by invoking `_removeContinuation`.
/// Continuation can be resumed with error and some cleanup code can be run here.
/// - Parameter count: The value by which to increase ``currentCount``.
@inlinable
internal func incrementCount(by count: UInt = 1) {
self.currentCount += count
}

/// Resets current count to initial count.
@inlinable
internal func resetCount() {
self.currentCount = initialCount
resumeContinuations()
}

/// Resets initial count and current count to specified value.
///
/// - Throws: If `resume(throwing:)` is called on the continuation, this function throws that error.
/// - Parameter count: The new initial count.
@inlinable
nonisolated func _withPromisedContinuation() async throws {
let key = UUID()
try await Continuation.withCancellation(synchronizedWith: locker) {
Task { [weak self] in
await self?._removeContinuation(withKey: key)
}
} operation: { continuation in
Task { [weak self] in
await self?._addContinuation(continuation, withKey: key)
}
}
internal func resetCount(to count: UInt) {
initialCount = count
self.currentCount = count
resumeContinuations()
}

// MARK: Public
Expand Down Expand Up @@ -158,47 +184,97 @@ public actor AsyncCountdownEvent: AsyncObject {
/// Use this to indicate usage of resource from high priority tasks.
///
/// - Parameter count: The value by which to increase ``currentCount``.
public func increment(by count: UInt = 1) {
self.currentCount += count
public nonisolated func increment(
by count: UInt = 1,
file: String = #fileID,
function: String = #function,
line: UInt = #line
) {
Task { await incrementCount(by: count) }
}

/// Resets current count to initial count.
///
/// If the current count becomes less or equal to limit, multiple queued tasks
/// are resumed from suspension until current count exceeds limit.
public func reset() {
self.currentCount = initialCount
_resumeContinuations()
///
/// - Parameters:
/// - file: The file reset originates from (there's usually no need to pass it
/// explicitly as it defaults to `#fileID`).
/// - function: The function reset originates from (there's usually no need to
/// pass it explicitly as it defaults to `#function`).
/// - line: The line reset originates from (there's usually no need to pass it
/// explicitly as it defaults to `#line`).
public nonisolated func reset(
file: String = #fileID,
function: String = #function,
line: UInt = #line
) {
Task { await resetCount() }
}

/// Resets initial count and current count to specified value.
///
/// If the current count becomes less or equal to limit, multiple queued tasks
/// are resumed from suspension until current count exceeds limit.
///
/// - Parameter count: The new initial count.
public func reset(to count: UInt) {
initialCount = count
self.currentCount = count
_resumeContinuations()
/// - Parameters:
/// - count: The new initial count.
/// - file: The file reset originates from (there's usually no need to pass it
/// explicitly as it defaults to `#fileID`).
/// - function: The function reset originates from (there's usually no need to
/// pass it explicitly as it defaults to `#function`).
/// - line: The line reset originates from (there's usually no need to pass it
/// explicitly as it defaults to `#line`).
public nonisolated func reset(
to count: UInt,
file: String = #fileID,
function: String = #function,
line: UInt = #line
) {
Task { await resetCount(to: count) }
}

/// Registers a signal (decrements) with the countdown event.
///
/// Decrement the countdown. If the current count becomes less or equal to limit,
/// one queued task is resumed from suspension.
public func signal() {
signal(repeat: 1)
///
/// - Parameters:
/// - file: The file signal originates from (there's usually no need to pass it
/// explicitly as it defaults to `#fileID`).
/// - function: The function signal originates from (there's usually no need to
/// pass it explicitly as it defaults to `#function`).
/// - line: The line signal originates from (there's usually no need to pass it
/// explicitly as it defaults to `#line`).
public nonisolated func signal(
file: String = #fileID,
function: String = #function,
line: UInt = #line
) {
Task { await decrementCount(by: 1) }
}

/// Registers multiple signals (decrements by provided count) with the countdown event.
///
/// Decrement the countdown by the provided count. If the current count becomes less or equal to limit,
/// multiple queued tasks are resumed from suspension until current count exceeds limit.
///
/// - Parameter count: The number of signals to register.
public func signal(repeat count: UInt) {
_decrementCount(by: count)
/// - Parameters:
/// - count: The number of signals to register.
/// - file: The file signal originates from (there's usually no need to pass it
/// explicitly as it defaults to `#fileID`).
/// - function: The function signal originates from (there's usually no need to
/// pass it explicitly as it defaults to `#function`).
/// - line: The line signal originates from (there's usually no need to pass it
/// explicitly as it defaults to `#line`).
public nonisolated func signal(
repeat count: UInt,
file: String = #fileID,
function: String = #function,
line: UInt = #line
) {
Task { await decrementCount(by: count) }
}

/// Waits for, or increments, a countdown event.
Expand All @@ -207,9 +283,23 @@ public actor AsyncCountdownEvent: AsyncObject {
/// Otherwise, current task is suspended until either a signal occurs or event is reset.
///
/// Use this to wait for high priority tasks completion to start low priority ones.
///
/// - Parameters:
/// - file: The file wait request originates from (there's usually no need to pass it
/// explicitly as it defaults to `#fileID`).
/// - function: The function wait request originates from (there's usually no need to
/// pass it explicitly as it defaults to `#function`).
/// - line: The line wait request originates from (there's usually no need to pass it
/// explicitly as it defaults to `#line`).
///
/// - Throws: `CancellationError` if cancelled.
@Sendable
public func wait() async {
guard _wait() else { currentCount += 1; return }
try? await _withPromisedContinuation()
public func wait(
file: String = #fileID,
function: String = #function,
line: UInt = #line
) async throws {
guard shouldWait() else { currentCount += 1; return }
try await withPromisedContinuation()
}
}

0 comments on commit 9f7f243

Please sign in to comment.