Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
895 changes: 471 additions & 424 deletions AsyncObjects.xcodeproj/project.pbxproj

Large diffs are not rendered by default.

10 changes: 5 additions & 5 deletions Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ let package = Package(
dependencies: [
.package(url: "\(appleGitHub)/swift-collections.git", from: "1.0.0"),
.package(url: "\(appleGitHub)/swift-docc-plugin", from: "1.0.0"),
.package(url: "\(appleGitHub)/swift-format", from: "0.50600.1"),
.package(url: "\(appleGitHub)/swift-format", from: "0.50700.0"),
],
targets: [
.target(
Expand All @@ -42,7 +42,7 @@ let package = Package(
]
)

var swiftSettings: [SwiftSetting] {
var swiftSettings: [SwiftSetting] = {
var swiftSettings: [SwiftSetting] = []

if ProcessInfo.processInfo.environment[
Expand Down Expand Up @@ -77,9 +77,9 @@ var swiftSettings: [SwiftSetting] {
}

return swiftSettings
}
}()

var testingSwiftSettings: [SwiftSetting] {
var testingSwiftSettings: [SwiftSetting] = {
var swiftSettings: [SwiftSetting] = []

if ProcessInfo.processInfo.environment[
Expand All @@ -96,4 +96,4 @@ var testingSwiftSettings: [SwiftSetting] {
}

return swiftSettings
}
}()
190 changes: 140 additions & 50 deletions Sources/AsyncObjects/AsyncCountdownEvent.swift
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import Foundation
#else
@preconcurrency import Foundation
#endif

import OrderedCollections

/// An event object that controls access to a resource between high and low priority tasks
Expand All @@ -15,20 +16,42 @@ import OrderedCollections
/// You can indicate high priority usage of resource by using ``increment(by:)`` method,
/// and indicate free of resource by calling ``signal(repeat:)`` or ``signal()`` methods.
/// For low priority resource usage or detect resource idling use ``wait()`` method
/// or its timeout variation ``wait(forNanoseconds:)``.
/// or its timeout variation ``wait(forNanoseconds:)``:
///
/// ```swift
/// // create event with initial count and count down limit
/// let event = AsyncCountdownEvent()
/// // increment countdown count from high priority tasks
/// event.increment(by: 1)
///
/// // wait for countdown signal from low priority tasks,
/// // fails only if task cancelled
/// try await event.wait()
/// // or wait with some timeout
/// try await event.wait(forNanoseconds: 1_000_000_000)
///
/// // signal countdown after completing high priority tasks
/// event.signal()
/// ```
///
/// Use the ``limit`` parameter to indicate concurrent low priority usage, i.e. if limit set to zero,
/// only one low priority usage allowed at one time.
public actor AsyncCountdownEvent: AsyncObject {
public actor AsyncCountdownEvent: AsyncObject, ContinuableCollection {
/// The suspended tasks continuation type.
@usableFromInline
typealias Continuation = SafeContinuation<GlobalContinuation<Void, Error>>
internal typealias Continuation = SafeContinuation<
GlobalContinuation<Void, Error>
>
/// The platform dependent lock used to synchronize continuations tracking.
@usableFromInline
let locker: Locker = .init()
internal let locker: Locker = .init()
/// The continuations stored with an associated key for all the suspended task that are waiting to be resumed.
@usableFromInline
private(set) var continuations: OrderedDictionary<UUID, Continuation> = [:]
internal private(set) var continuations:
OrderedDictionary<
UUID,
Continuation
> = [:]
/// The limit up to which the countdown counts and triggers event.
///
/// By default this is set to zero and can be changed during initialization.
Expand All @@ -42,7 +65,7 @@ public actor AsyncCountdownEvent: AsyncObject {
///
/// Can be changed after initialization
/// by using ``reset(to:)`` method.
public private(set) var initialCount: UInt
public var initialCount: UInt
/// Indicates whether countdown event current count is within ``limit``.
///
/// Queued tasks are resumed from suspension when event is set and until current count exceeds limit.
Expand All @@ -54,13 +77,13 @@ public actor AsyncCountdownEvent: AsyncObject {
///
/// - Returns: Whether to wait to be resumed later.
@inlinable
func _wait() -> Bool { !isSet || !continuations.isEmpty }
internal func shouldWait() -> Bool { !isSet || !continuations.isEmpty }

/// Resume provided continuation with additional changes based on the associated flags.
///
/// - Parameter continuation: The queued continuation to resume.
@inlinable
func _resumeContinuation(_ continuation: Continuation) {
internal func resumeContinuation(_ continuation: Continuation) {
currentCount += 1
continuation.resume()
}
Expand All @@ -71,12 +94,12 @@ public actor AsyncCountdownEvent: AsyncObject {
/// - continuation: The `continuation` to add.
/// - key: The key in the map.
@inlinable
func _addContinuation(
internal func addContinuation(
_ continuation: Continuation,
withKey key: UUID
) {
guard !continuation.resumed else { return }
guard _wait() else { _resumeContinuation(continuation); return }
guard shouldWait() else { resumeContinuation(continuation); return }
continuations[key] = continuation
}

Expand All @@ -85,49 +108,52 @@ public actor AsyncCountdownEvent: AsyncObject {
///
/// - Parameter key: The key in the map.
@inlinable
func _removeContinuation(withKey key: UUID) {
internal func removeContinuation(withKey key: UUID) {
continuations.removeValue(forKey: key)
}

/// Decrements countdown count by the provided number.
///
/// - Parameter number: The number to decrement count by.
@inlinable
func _decrementCount(by number: UInt = 1) {
defer { _resumeContinuations() }
internal func decrementCount(by number: UInt = 1) {
defer { resumeContinuations() }
guard currentCount > 0 else { return }
currentCount -= number
}

/// Resume previously waiting continuations for countdown event.
@inlinable
func _resumeContinuations() {
internal func resumeContinuations() {
while !continuations.isEmpty && isSet {
let (_, continuation) = continuations.removeFirst()
_resumeContinuation(continuation)
resumeContinuation(continuation)
}
}

/// Suspends the current task, then calls the given closure with a throwing continuation for the current task.
/// Continuation can be cancelled with error if current task is cancelled, by invoking `_removeContinuation`.
/// Increments the countdown event current count by the specified value.
///
/// Spins up a new continuation and requests to track it with key by invoking `_addContinuation`.
/// This operation cooperatively checks for cancellation and reacting to it by invoking `_removeContinuation`.
/// Continuation can be resumed with error and some cleanup code can be run here.
/// - Parameter count: The value by which to increase ``currentCount``.
@inlinable
internal func incrementCount(by count: UInt = 1) {
self.currentCount += count
}

/// Resets current count to initial count.
@inlinable
internal func resetCount() {
self.currentCount = initialCount
resumeContinuations()
}

/// Resets initial count and current count to specified value.
///
/// - Throws: If `resume(throwing:)` is called on the continuation, this function throws that error.
/// - Parameter count: The new initial count.
@inlinable
nonisolated func _withPromisedContinuation() async throws {
let key = UUID()
try await Continuation.withCancellation(synchronizedWith: locker) {
Task { [weak self] in
await self?._removeContinuation(withKey: key)
}
} operation: { continuation in
Task { [weak self] in
await self?._addContinuation(continuation, withKey: key)
}
}
internal func resetCount(to count: UInt) {
initialCount = count
self.currentCount = count
resumeContinuations()
}

// MARK: Public
Expand Down Expand Up @@ -158,47 +184,97 @@ public actor AsyncCountdownEvent: AsyncObject {
/// Use this to indicate usage of resource from high priority tasks.
///
/// - Parameter count: The value by which to increase ``currentCount``.
public func increment(by count: UInt = 1) {
self.currentCount += count
public nonisolated func increment(
by count: UInt = 1,
file: String = #fileID,
function: String = #function,
line: UInt = #line
) {
Task { await incrementCount(by: count) }
}

/// Resets current count to initial count.
///
/// If the current count becomes less or equal to limit, multiple queued tasks
/// are resumed from suspension until current count exceeds limit.
public func reset() {
self.currentCount = initialCount
_resumeContinuations()
///
/// - Parameters:
/// - file: The file reset originates from (there's usually no need to pass it
/// explicitly as it defaults to `#fileID`).
/// - function: The function reset originates from (there's usually no need to
/// pass it explicitly as it defaults to `#function`).
/// - line: The line reset originates from (there's usually no need to pass it
/// explicitly as it defaults to `#line`).
public nonisolated func reset(
file: String = #fileID,
function: String = #function,
line: UInt = #line
) {
Task { await resetCount() }
}

/// Resets initial count and current count to specified value.
///
/// If the current count becomes less or equal to limit, multiple queued tasks
/// are resumed from suspension until current count exceeds limit.
///
/// - Parameter count: The new initial count.
public func reset(to count: UInt) {
initialCount = count
self.currentCount = count
_resumeContinuations()
/// - Parameters:
/// - count: The new initial count.
/// - file: The file reset originates from (there's usually no need to pass it
/// explicitly as it defaults to `#fileID`).
/// - function: The function reset originates from (there's usually no need to
/// pass it explicitly as it defaults to `#function`).
/// - line: The line reset originates from (there's usually no need to pass it
/// explicitly as it defaults to `#line`).
public nonisolated func reset(
to count: UInt,
file: String = #fileID,
function: String = #function,
line: UInt = #line
) {
Task { await resetCount(to: count) }
}

/// Registers a signal (decrements) with the countdown event.
///
/// Decrement the countdown. If the current count becomes less or equal to limit,
/// one queued task is resumed from suspension.
public func signal() {
signal(repeat: 1)
///
/// - Parameters:
/// - file: The file signal originates from (there's usually no need to pass it
/// explicitly as it defaults to `#fileID`).
/// - function: The function signal originates from (there's usually no need to
/// pass it explicitly as it defaults to `#function`).
/// - line: The line signal originates from (there's usually no need to pass it
/// explicitly as it defaults to `#line`).
public nonisolated func signal(
file: String = #fileID,
function: String = #function,
line: UInt = #line
) {
Task { await decrementCount(by: 1) }
}

/// Registers multiple signals (decrements by provided count) with the countdown event.
///
/// Decrement the countdown by the provided count. If the current count becomes less or equal to limit,
/// multiple queued tasks are resumed from suspension until current count exceeds limit.
///
/// - Parameter count: The number of signals to register.
public func signal(repeat count: UInt) {
_decrementCount(by: count)
/// - Parameters:
/// - count: The number of signals to register.
/// - file: The file signal originates from (there's usually no need to pass it
/// explicitly as it defaults to `#fileID`).
/// - function: The function signal originates from (there's usually no need to
/// pass it explicitly as it defaults to `#function`).
/// - line: The line signal originates from (there's usually no need to pass it
/// explicitly as it defaults to `#line`).
public nonisolated func signal(
repeat count: UInt,
file: String = #fileID,
function: String = #function,
line: UInt = #line
) {
Task { await decrementCount(by: count) }
}

/// Waits for, or increments, a countdown event.
Expand All @@ -207,9 +283,23 @@ public actor AsyncCountdownEvent: AsyncObject {
/// Otherwise, current task is suspended until either a signal occurs or event is reset.
///
/// Use this to wait for high priority tasks completion to start low priority ones.
///
/// - Parameters:
/// - file: The file wait request originates from (there's usually no need to pass it
/// explicitly as it defaults to `#fileID`).
/// - function: The function wait request originates from (there's usually no need to
/// pass it explicitly as it defaults to `#function`).
/// - line: The line wait request originates from (there's usually no need to pass it
/// explicitly as it defaults to `#line`).
///
/// - Throws: `CancellationError` if cancelled.
@Sendable
public func wait() async {
guard _wait() else { currentCount += 1; return }
try? await _withPromisedContinuation()
public func wait(
file: String = #fileID,
function: String = #function,
line: UInt = #line
) async throws {
guard shouldWait() else { currentCount += 1; return }
try await withPromisedContinuation()
}
}
Loading