Skip to content

Commit

Permalink
fix: fix potential data race handling actor reentrancy
Browse files Browse the repository at this point in the history
  • Loading branch information
soumyamahunt committed Aug 30, 2022
1 parent 81c2c9c commit 63fed91
Show file tree
Hide file tree
Showing 23 changed files with 478 additions and 204 deletions.
22 changes: 21 additions & 1 deletion Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ let package = Package(
),
.testTarget(
name: "AsyncObjectsTests",
dependencies: ["AsyncObjects"]
dependencies: ["AsyncObjects"],
swiftSettings: testingSwiftSettings
),
]
)
Expand Down Expand Up @@ -77,3 +78,22 @@ var swiftSettings: [SwiftSetting] {

return swiftSettings
}

var testingSwiftSettings: [SwiftSetting] {
var swiftSettings: [SwiftSetting] = []

if ProcessInfo.processInfo.environment[
"SWIFTCI_CONCURRENCY_CHECKS"
] != nil {
swiftSettings.append(
.unsafeFlags([
"-Xfrontend",
"-warn-concurrency",
"-enable-actor-data-race-checks",
"-require-explicit-sendable",
])
)
}

return swiftSettings
}
19 changes: 15 additions & 4 deletions Sources/AsyncObjects/AsyncCountdownEvent.swift
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
#if swift(>=5.7)
import Foundation
#else
@preconcurrency import Foundation
#endif
import OrderedCollections

/// An event object that controls access to a resource between high and low priority tasks
Expand Down Expand Up @@ -53,6 +57,11 @@ public actor AsyncCountdownEvent: AsyncObject {
_ continuation: Continuation,
withKey key: UUID
) {
guard !isSet, continuations.isEmpty else {
currentCount += 1
continuation.resume()
return
}
continuations[key] = continuation
}

Expand All @@ -71,6 +80,7 @@ public actor AsyncCountdownEvent: AsyncObject {
/// - Parameter number: The number to decrement count by.
@inlinable
func _decrementCount(by number: UInt = 1) {
defer { _resumeContinuations() }
guard currentCount > 0 else { return }
currentCount -= number
}
Expand All @@ -94,15 +104,17 @@ public actor AsyncCountdownEvent: AsyncObject {
///
/// - Throws: If `resume(throwing:)` is called on the continuation, this function throws that error.
@inlinable
func _withPromisedContinuation() async throws {
nonisolated func _withPromisedContinuation() async throws {
let key = UUID()
try await withTaskCancellationHandler { [weak self] in
Task { [weak self] in
await self?._removeContinuation(withKey: key)
}
} operation: { () -> Continuation.Success in
try await Continuation.with { continuation in
self._addContinuation(continuation, withKey: key)
Task { [weak self] in
await self?._addContinuation(continuation, withKey: key)
}
}
}
}
Expand Down Expand Up @@ -176,7 +188,6 @@ public actor AsyncCountdownEvent: AsyncObject {
/// - Parameter count: The number of signals to register.
public func signal(repeat count: UInt) {
_decrementCount(by: count)
_resumeContinuations()
}

/// Waits for, or increments, a countdown event.
Expand All @@ -187,7 +198,7 @@ public actor AsyncCountdownEvent: AsyncObject {
/// Use this to wait for high priority tasks completion to start low priority ones.
@Sendable
public func wait() async {
if isSet { currentCount += 1; return }
guard !isSet else { currentCount += 1; return }
try? await _withPromisedContinuation()
}
}
14 changes: 11 additions & 3 deletions Sources/AsyncObjects/AsyncEvent.swift
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
#if swift(>=5.7)
import Foundation
#else
@preconcurrency import Foundation
#endif

/// An object that controls execution of tasks depending on the signal state.
///
Expand All @@ -14,7 +18,8 @@ public actor AsyncEvent: AsyncObject {
@usableFromInline
private(set) var continuations: [UUID: Continuation] = [:]
/// Indicates whether current state of event is signalled.
private var signalled: Bool
@usableFromInline
var signalled: Bool

// MARK: Internal

Expand All @@ -28,6 +33,7 @@ public actor AsyncEvent: AsyncObject {
_ continuation: Continuation,
withKey key: UUID
) {
guard !signalled else { continuation.resume(); return }
continuations[key] = continuation
}

Expand All @@ -50,15 +56,17 @@ public actor AsyncEvent: AsyncObject {
///
/// - Throws: If `resume(throwing:)` is called on the continuation, this function throws that error.
@inlinable
func _withPromisedContinuation() async throws {
nonisolated func _withPromisedContinuation() async throws {
let key = UUID()
try await withTaskCancellationHandler { [weak self] in
Task { [weak self] in
await self?._removeContinuation(withKey: key)
}
} operation: { () -> Continuation.Success in
try await Continuation.with { continuation in
self._addContinuation(continuation, withKey: key)
Task { [weak self] in
await self?._addContinuation(continuation, withKey: key)
}
}
}
}
Expand Down
13 changes: 10 additions & 3 deletions Sources/AsyncObjects/AsyncSemaphore.swift
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
#if swift(>=5.7)
import Foundation
#else
@preconcurrency import Foundation
#endif
import OrderedCollections

/// An object that controls access to a resource across multiple task contexts through use of a traditional counting semaphore.
Expand Down Expand Up @@ -37,6 +41,7 @@ public actor AsyncSemaphore: AsyncObject {
_ continuation: Continuation,
withKey key: UUID
) {
guard count <= 0 else { continuation.resume(); return }
continuations[key] = continuation
}

Expand Down Expand Up @@ -67,15 +72,17 @@ public actor AsyncSemaphore: AsyncObject {
///
/// - Throws: If `resume(throwing:)` is called on the continuation, this function throws that error.
@inlinable
func _withPromisedContinuation() async throws {
nonisolated func _withPromisedContinuation() async throws {
let key = UUID()
try await withTaskCancellationHandler { [weak self] in
Task { [weak self] in
await self?._removeContinuation(withKey: key)
}
} operation: { () -> Continuation.Success in
try await Continuation.with { continuation in
self._addContinuation(continuation, withKey: key)
Task { [weak self] in
await self?._addContinuation(continuation, withKey: key)
}
}
}
}
Expand Down Expand Up @@ -117,7 +124,7 @@ public actor AsyncSemaphore: AsyncObject {
@Sendable
public func wait() async {
count -= 1
if count > 0 { return }
guard count <= 0 else { return }
try? await _withPromisedContinuation()
}
}
95 changes: 75 additions & 20 deletions Sources/AsyncObjects/CancellationSource.swift
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,66 @@ public actor CancellationSource {
linkedSources.append(source)
}

/// Propagate cancellation to linked cancellation sources.
@inlinable
nonisolated func _propagateCancellation() async {
await withTaskGroup(of: Void.self) { group in
let linkedSources = await linkedSources
linkedSources.forEach { group.addTask(operation: $0.cancel) }
await group.waitForAll()
}
}

// MARK: Public

/// Creates a new cancellation source object.
///
/// - Returns: The newly created cancellation source.
public init() { }

#if swift(>=5.7)
/// Creates a new cancellation source object linking to all the provided cancellation sources.
///
/// Initiating cancellation in any of the provided cancellation sources
/// will ensure newly created cancellation source receive cancellation event.
///
/// - Parameter sources: The cancellation sources the newly created object will be linked to.
///
/// - Returns: The newly created cancellation source.
public nonisolated init(linkedWith sources: [CancellationSource]) async {
await withTaskGroup(of: Void.self) { group in
sources.forEach { source in
group.addTask { await source._addSource(self) }
}
await group.waitForAll()
}
}

/// Creates a new cancellation source object linking to all the provided cancellation sources.
///
/// Initiating cancellation in any of the provided cancellation sources
/// will ensure newly created cancellation source receive cancellation event.
///
/// - Parameter sources: The cancellation sources the newly created object will be linked to.
///
/// - Returns: The newly created cancellation source.
public init(linkedWith sources: CancellationSource...) async {
await self.init(linkedWith: sources)
}

/// Creates a new cancellation source object
/// and triggers cancellation event on this object after specified timeout.
///
/// - Parameter nanoseconds: The delay after which cancellation event triggered.
///
/// - Returns: The newly created cancellation source.
public init(cancelAfterNanoseconds nanoseconds: UInt64) {
self.init()
Task { [weak self] in
try await self?.cancel(afterNanoseconds: nanoseconds)
}
}
#else
/// Creates a new cancellation source object linking to all the provided cancellation sources.
///
/// Initiating cancellation in any of the provided cancellation sources
Expand Down Expand Up @@ -100,6 +153,7 @@ public actor CancellationSource {
try await self?.cancel(afterNanoseconds: nanoseconds)
}
}
#endif

/// Register task for cooperative cancellation when cancellation event received on cancellation source.
///
Expand All @@ -120,10 +174,7 @@ public actor CancellationSource {
public func cancel() async {
registeredTasks.forEach { $1() }
registeredTasks = [:]
await withTaskGroup(of: Void.self) { group in
linkedSources.forEach { group.addTask(operation: $0.cancel) }
await group.waitForAll()
}
await _propagateCancellation()
}

/// Trigger cancellation event after provided delay,
Expand All @@ -143,16 +194,17 @@ public extension Task {
/// Runs the given non-throwing operation asynchronously as part of a new task on behalf of the current actor,
/// with the provided cancellation source controlling cooperative cancellation.
///
/// A child task with the provided operation is created, cancellation of which is controlled by provided cancellation source.
/// In the event of cancellation child task is cancelled, while returning the value in the returned task.
/// In case you want to register and track the top-level task for cancellation use the async initializer instead.
/// A top-level task with the provided operation is created, cancellation of which is controlled by provided cancellation source.
/// In the event of cancellation top-level task is cancelled, while returning the value in the returned task.
///
/// - Parameters:
/// - priority: The priority of the task. Pass `nil` to use the priority from `Task.currentPriority`.
/// - cancellationSource: The cancellation source on which new task will be registered for cancellation.
/// - operation: The operation to perform.
///
/// - Returns: The newly created task.
/// - Note: In case you want to register and track the top-level task
/// for cancellation use the async initializer instead.
@discardableResult
init(
priority: TaskPriority? = nil,
Expand All @@ -169,16 +221,17 @@ public extension Task {
/// Runs the given throwing operation asynchronously as part of a new task on behalf of the current actor,
/// with the provided cancellation source controlling cooperative cancellation.
///
/// A child task with the provided operation is created, cancellation of which is controlled by provided cancellation source.
/// In the event of cancellation child task is cancelled, while propagating error in the returned task.
/// In case you want to register and track the top-level task for cancellation use the async initializer instead.
/// A top-level task with the provided operation is created, cancellation of which is controlled by provided cancellation source.
/// In the event of cancellation top-level task is cancelled, while propagating error in the returned task.
///
/// - Parameters:
/// - priority: The priority of the task. Pass `nil` to use the priority from `Task.currentPriority`.
/// - cancellationSource: The cancellation source on which new task will be registered for cancellation.
/// - operation: The operation to perform.
///
/// - Returns: The newly created task.
/// - Note: In case you want to register and track the top-level task
/// for cancellation use the async initializer instead.
@discardableResult
init(
priority: TaskPriority? = nil,
Expand All @@ -195,16 +248,17 @@ public extension Task {
/// Runs the given non-throwing operation asynchronously as part of a new task,
/// with the provided cancellation source controlling cooperative cancellation.
///
/// A child task with the provided operation is created, cancellation of which is controlled by provided cancellation source.
/// In the event of cancellation child task is cancelled, while returning the value in the returned task.
/// In case you want to register and track the top-level task for cancellation use the async initializer instead.
/// A top-level task with the provided operation is created, cancellation of which is controlled by provided cancellation source.
/// In the event of cancellation top-level task is cancelled, while returning the value in the returned task.
///
/// - Parameters:
/// - priority: The priority of the task. Pass `nil` to use the priority from `Task.currentPriority`.
/// - priority: The priority of the task.
/// - cancellationSource: The cancellation source on which new task will be registered for cancellation.
/// - operation: The operation to perform.
///
/// - Returns: The newly created task.
/// - Note: In case you want to register and track the top-level task
/// for cancellation use the async initializer instead.
@discardableResult
static func detached(
priority: TaskPriority? = nil,
Expand All @@ -221,16 +275,17 @@ public extension Task {
/// Runs the given throwing operation asynchronously as part of a new task,
/// with the provided cancellation source controlling cooperative cancellation.
///
/// A child task with the provided operation is created, cancellation of which is controlled by provided cancellation source.
/// In the event of cancellation child task is cancelled, while returning the value in the returned task.
/// In case you want to register and track the top-level task for cancellation use the async initializer instead.
/// A top-level task with the provided operation is created, cancellation of which is controlled by provided cancellation source.
/// In the event of cancellation top-level task is cancelled, while returning the value in the returned task.
///
/// - Parameters:
/// - priority: The priority of the task. Pass `nil` to use the priority from `Task.currentPriority`.
/// - priority: The priority of the task.
/// - cancellationSource: The cancellation source on which new task will be registered for cancellation.
/// - operation: The operation to perform.
///
/// - Returns: The newly created task.
/// - Note: In case you want to register and track the top-level task
/// for cancellation use the async initializer instead.
@discardableResult
static func detached(
priority: TaskPriority? = nil,
Expand Down Expand Up @@ -292,7 +347,7 @@ public extension Task {
/// The created task will be cancelled when cancellation event triggered on the provided cancellation source.
///
/// - Parameters:
/// - priority: The priority of the task. Pass `nil` to use the priority from `Task.currentPriority`.
/// - priority: The priority of the task.
/// - cancellationSource: The cancellation source on which new task will be registered for cancellation.
/// - operation: The operation to perform.
///
Expand All @@ -314,7 +369,7 @@ public extension Task {
/// The created task will be cancelled when cancellation event triggered on the provided cancellation source.
///
/// - Parameters:
/// - priority: The priority of the task. Pass `nil` to use the priority from `Task.currentPriority`.
/// - priority: The priority of the task.
/// - cancellationSource: The cancellation source on which new task will be registered for cancellation.
/// - operation: The operation to perform.
///
Expand Down
2 changes: 1 addition & 1 deletion Sources/AsyncObjects/Continuable.swift
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/// A type that allows to interface between synchronous and asynchronous code,
/// by representing task state and allowing task resuming with some value or error.
@usableFromInline
protocol Continuable: Sendable {
protocol Continuable {
/// The type of value to resume the continuation with in case of success.
associatedtype Success
/// The type of error to resume the continuation with in case of failure.
Expand Down

0 comments on commit 63fed91

Please sign in to comment.