Skip to content

Commit

Permalink
build: resolve concurrency check warnings
Browse files Browse the repository at this point in the history
  • Loading branch information
soumyamahunt committed Aug 22, 2022
1 parent dd1b776 commit c66bb81
Show file tree
Hide file tree
Showing 20 changed files with 494 additions and 271 deletions.
53 changes: 48 additions & 5 deletions Package.swift
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
// swift-tools-version: 5.6

import PackageDescription
import class Foundation.ProcessInfo

let appleGitHub = "https://github.com/apple"
let package = Package(
name: "AsyncObjects",
platforms: [
Expand All @@ -17,20 +19,61 @@ let package = Package(
),
],
dependencies: [
.package(url: "https://github.com/apple/swift-collections.git", from: "1.0.0"),
.package(url: "https://github.com/apple/swift-docc-plugin", from: "1.0.0"),
.package(url: "https://github.com/apple/swift-format", from: "0.50600.1"),
.package(url: "\(appleGitHub)/swift-collections.git", from: "1.0.0"),
.package(url: "\(appleGitHub)/swift-docc-plugin", from: "1.0.0"),
.package(url: "\(appleGitHub)/swift-format", from: "0.50600.1"),
],
targets: [
.target(
name: "AsyncObjects",
dependencies: [
.product(name: "OrderedCollections", package: "swift-collections"),
]
.product(
name: "OrderedCollections",
package: "swift-collections"
),
],
swiftSettings: swiftSettings
),
.testTarget(
name: "AsyncObjectsTests",
dependencies: ["AsyncObjects"]
),
]
)

var swiftSettings: [SwiftSetting] {
var swiftSettings: [SwiftSetting] = []

if ProcessInfo.processInfo.environment[
"SWIFTCI_CONCURRENCY_CHECKS"
] != nil {
swiftSettings.append(
.unsafeFlags([
"-Xfrontend",
"-warn-concurrency",
"-enable-actor-data-race-checks",
"-require-explicit-sendable",
])
)
}

if ProcessInfo.processInfo.environment[
"SWIFTCI_WARNINGS_AS_ERRORS"
] != nil {
swiftSettings.append(
.unsafeFlags([
"-warnings-as-errors"
])
)
}

if ProcessInfo.processInfo.environment[
"ASYNCOBJECTS_USE_CHECKEDCONTINUATION"
] != nil {
swiftSettings.append(
.define("ASYNCOBJECTS_USE_CHECKEDCONTINUATION")
)
}

return swiftSettings
}
36 changes: 20 additions & 16 deletions Sources/AsyncObjects/AsyncCountdownEvent.swift
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import Foundation
@preconcurrency import Foundation
import OrderedCollections

/// An event object that controls access to a resource between high and low priority tasks
Expand Down Expand Up @@ -41,13 +41,15 @@ public actor AsyncCountdownEvent: AsyncObject {
/// Queued tasks are resumed from suspension when event is set and until current count exceeds limit.
public var isSet: Bool { currentCount >= 0 && currentCount <= limit }

// MARK: Internal

/// Add continuation with the provided key in `continuations` map.
///
/// - Parameters:
/// - continuation: The `continuation` to add.
/// - key: The key in the map.
@inlinable
func addContinuation(
func _addContinuation(
_ continuation: Continuation,
withKey key: UUID
) {
Expand All @@ -59,7 +61,7 @@ public actor AsyncCountdownEvent: AsyncObject {
///
/// - Parameter key: The key in the map.
@inlinable
func removeContinuation(withKey key: UUID) {
func _removeContinuation(withKey key: UUID) {
let continuation = continuations.removeValue(forKey: key)
continuation?.cancel()
}
Expand All @@ -68,14 +70,14 @@ public actor AsyncCountdownEvent: AsyncObject {
///
/// - Parameter number: The number to decrement count by.
@inlinable
func decrementCount(by number: UInt = 1) {
func _decrementCount(by number: UInt = 1) {
guard currentCount > 0 else { return }
currentCount -= number
}

/// Resume previously waiting continuations for countdown event.
@inlinable
func resumeContinuations() {
func _resumeContinuations() {
while !continuations.isEmpty && isSet {
let (_, continuation) = continuations.removeFirst()
continuation.resume()
Expand All @@ -84,27 +86,29 @@ public actor AsyncCountdownEvent: AsyncObject {
}

/// Suspends the current task, then calls the given closure with a throwing continuation for the current task.
/// Continuation can be cancelled with error if current task is cancelled, by invoking `removeContinuation`.
/// Continuation can be cancelled with error if current task is cancelled, by invoking `_removeContinuation`.
///
/// Spins up a new continuation and requests to track it with key by invoking `addContinuation`.
/// This operation cooperatively checks for cancellation and reacting to it by invoking `removeContinuation`.
/// Spins up a new continuation and requests to track it with key by invoking `_addContinuation`.
/// This operation cooperatively checks for cancellation and reacting to it by invoking `_removeContinuation`.
/// Continuation can be resumed with error and some cleanup code can be run here.
///
/// - Throws: If `resume(throwing:)` is called on the continuation, this function throws that error.
@inlinable
func withPromisedContinuation() async throws {
func _withPromisedContinuation() async throws {
let key = UUID()
try await withTaskCancellationHandler { [weak self] in
Task { [weak self] in
await self?.removeContinuation(withKey: key)
await self?._removeContinuation(withKey: key)
}
} operation: { () -> Continuation.Success in
try await Continuation.with { continuation in
self.addContinuation(continuation, withKey: key)
self._addContinuation(continuation, withKey: key)
}
}
}

// MARK: Public

/// Creates new countdown event with the limit count down up to and an initial count.
/// By default, both limit and initial count are zero.
///
Expand Down Expand Up @@ -141,7 +145,7 @@ public actor AsyncCountdownEvent: AsyncObject {
/// are resumed from suspension until current count exceeds limit.
public func reset() {
self.currentCount = initialCount
resumeContinuations()
_resumeContinuations()
}

/// Resets initial count and current count to specified value.
Expand All @@ -153,7 +157,7 @@ public actor AsyncCountdownEvent: AsyncObject {
public func reset(to count: UInt) {
initialCount = count
self.currentCount = count
resumeContinuations()
_resumeContinuations()
}

/// Registers a signal (decrements) with the countdown event.
Expand All @@ -171,8 +175,8 @@ public actor AsyncCountdownEvent: AsyncObject {
///
/// - Parameter count: The number of signals to register.
public func signal(repeat count: UInt) {
decrementCount(by: count)
resumeContinuations()
_decrementCount(by: count)
_resumeContinuations()
}

/// Waits for, or increments, a countdown event.
Expand All @@ -184,6 +188,6 @@ public actor AsyncCountdownEvent: AsyncObject {
@Sendable
public func wait() async {
if isSet { currentCount += 1; return }
try? await withPromisedContinuation()
try? await _withPromisedContinuation()
}
}
24 changes: 14 additions & 10 deletions Sources/AsyncObjects/AsyncEvent.swift
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import Foundation
@preconcurrency import Foundation

/// An object that controls execution of tasks depending on the signal state.
///
Expand All @@ -16,13 +16,15 @@ public actor AsyncEvent: AsyncObject {
/// Indicates whether current state of event is signalled.
private var signalled: Bool

// MARK: Internal

/// Add continuation with the provided key in `continuations` map.
///
/// - Parameters:
/// - continuation: The `continuation` to add.
/// - key: The key in the map.
@inlinable
func addContinuation(
func _addContinuation(
_ continuation: Continuation,
withKey key: UUID
) {
Expand All @@ -34,33 +36,35 @@ public actor AsyncEvent: AsyncObject {
///
/// - Parameter key: The key in the map.
@inlinable
func removeContinuation(withKey key: UUID) {
func _removeContinuation(withKey key: UUID) {
let continuation = continuations.removeValue(forKey: key)
continuation?.cancel()
}

/// Suspends the current task, then calls the given closure with a throwing continuation for the current task.
/// Continuation can be cancelled with error if current task is cancelled, by invoking `removeContinuation`.
/// Continuation can be cancelled with error if current task is cancelled, by invoking `_removeContinuation`.
///
/// Spins up a new continuation and requests to track it with key by invoking `addContinuation`.
/// This operation cooperatively checks for cancellation and reacting to it by invoking `removeContinuation`.
/// Spins up a new continuation and requests to track it with key by invoking `_addContinuation`.
/// This operation cooperatively checks for cancellation and reacting to it by invoking `_removeContinuation`.
/// Continuation can be resumed with error and some cleanup code can be run here.
///
/// - Throws: If `resume(throwing:)` is called on the continuation, this function throws that error.
@inlinable
func withPromisedContinuation() async throws {
func _withPromisedContinuation() async throws {
let key = UUID()
try await withTaskCancellationHandler { [weak self] in
Task { [weak self] in
await self?.removeContinuation(withKey: key)
await self?._removeContinuation(withKey: key)
}
} operation: { () -> Continuation.Success in
try await Continuation.with { continuation in
self.addContinuation(continuation, withKey: key)
self._addContinuation(continuation, withKey: key)
}
}
}

// MARK: Public

/// Creates a new event with signal state provided.
/// By default, event is initially in signalled state.
///
Expand Down Expand Up @@ -96,6 +100,6 @@ public actor AsyncEvent: AsyncObject {
@Sendable
public func wait() async {
guard !signalled else { return }
try? await withPromisedContinuation()
try? await _withPromisedContinuation()
}
}
5 changes: 3 additions & 2 deletions Sources/AsyncObjects/AsyncObject.swift
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import Foundation

/// A result value indicating whether a task finished before a specified time.
@frozen
public enum TaskTimeoutResult: Hashable {
public enum TaskTimeoutResult: Hashable, Sendable {
/// Indicates that a task successfully finished
/// before the specified time elapsed.
case success
Expand Down Expand Up @@ -240,7 +240,8 @@ public func waitForTaskCompletion(
}
}
group.addTask {
(try? await Task.sleep(nanoseconds: timeout + 1_000)) == nil
await Task.yield()
return (try? await Task.sleep(nanoseconds: timeout + 1_000)) == nil
}
if let result = await group.next() {
timedOut = !result
Expand Down
30 changes: 17 additions & 13 deletions Sources/AsyncObjects/AsyncSemaphore.swift
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import Foundation
@preconcurrency import Foundation
import OrderedCollections

/// An object that controls access to a resource across multiple task contexts through use of a traditional counting semaphore.
Expand All @@ -25,13 +25,15 @@ public actor AsyncSemaphore: AsyncObject {
@usableFromInline
private(set) var count: Int

// MARK: Internal

/// Add continuation with the provided key in `continuations` map.
///
/// - Parameters:
/// - continuation: The `continuation` to add.
/// - key: The key in the map.
@inlinable
func addContinuation(
func _addContinuation(
_ continuation: Continuation,
withKey key: UUID
) {
Expand All @@ -43,41 +45,43 @@ public actor AsyncSemaphore: AsyncObject {
///
/// - Parameter key: The key in the map.
@inlinable
func removeContinuation(withKey key: UUID) {
func _removeContinuation(withKey key: UUID) {
let continuation = continuations.removeValue(forKey: key)
continuation?.cancel()
incrementCount()
_incrementCount()
}

/// Increments semaphore count within limit provided.
@inlinable
func incrementCount() {
func _incrementCount() {
guard count < limit else { return }
count += 1
}

/// Suspends the current task, then calls the given closure with a throwing continuation for the current task.
/// Continuation can be cancelled with error if current task is cancelled, by invoking `removeContinuation`.
/// Continuation can be cancelled with error if current task is cancelled, by invoking `_removeContinuation`.
///
/// Spins up a new continuation and requests to track it with key by invoking `addContinuation`.
/// This operation cooperatively checks for cancellation and reacting to it by invoking `removeContinuation`.
/// Spins up a new continuation and requests to track it with key by invoking `_addContinuation`.
/// This operation cooperatively checks for cancellation and reacting to it by invoking `_removeContinuation`.
/// Continuation can be resumed with error and some cleanup code can be run here.
///
/// - Throws: If `resume(throwing:)` is called on the continuation, this function throws that error.
@inlinable
func withPromisedContinuation() async throws {
func _withPromisedContinuation() async throws {
let key = UUID()
try await withTaskCancellationHandler { [weak self] in
Task { [weak self] in
await self?.removeContinuation(withKey: key)
await self?._removeContinuation(withKey: key)
}
} operation: { () -> Continuation.Success in
try await Continuation.with { continuation in
self.addContinuation(continuation, withKey: key)
self._addContinuation(continuation, withKey: key)
}
}
}

// MARK: Public

/// Creates new counting semaphore with an initial value.
/// By default, initial value is zero.
///
Expand All @@ -100,7 +104,7 @@ public actor AsyncSemaphore: AsyncObject {
/// If the previous value was less than zero,
/// current task is resumed from suspension.
public func signal() {
incrementCount()
_incrementCount()
guard !continuations.isEmpty else { return }
let (_, continuation) = continuations.removeFirst()
continuation.resume()
Expand All @@ -114,6 +118,6 @@ public actor AsyncSemaphore: AsyncObject {
public func wait() async {
count -= 1
if count > 0 { return }
try? await withPromisedContinuation()
try? await _withPromisedContinuation()
}
}

0 comments on commit c66bb81

Please sign in to comment.