Skip to content

Commit

Permalink
perf(TaskOperation)!: use platform lock primitive instead of `Dispa…
Browse files Browse the repository at this point in the history
…tchQueue` for synchronization
  • Loading branch information
soumyamahunt committed Aug 23, 2022
1 parent d8ee18a commit f28ee66
Show file tree
Hide file tree
Showing 16 changed files with 758 additions and 457 deletions.
782 changes: 395 additions & 387 deletions AsyncObjects.xcodeproj/project.pbxproj

Large diffs are not rendered by default.

8 changes: 4 additions & 4 deletions Sources/AsyncObjects/CancellationSource.swift
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ public extension Task {
priority: TaskPriority? = nil,
cancellationSource: CancellationSource,
operation: @escaping @Sendable () async throws -> Success
) rethrows where Failure == Error {
) where Failure == Error {
self.init(priority: priority) {
let task = Self.init(priority: priority, operation: operation)
await cancellationSource.register(task: task)
Expand Down Expand Up @@ -235,7 +235,7 @@ public extension Task {
priority: TaskPriority? = nil,
cancellationSource: CancellationSource,
operation: @escaping @Sendable () async throws -> Success
) rethrows -> Self where Failure == Error {
) -> Self where Failure == Error {
return Task.detached(priority: priority) {
let task = Self.init(priority: priority, operation: operation)
await cancellationSource.register(task: task)
Expand Down Expand Up @@ -280,7 +280,7 @@ public extension Task {
priority: TaskPriority? = nil,
cancellationSource: CancellationSource,
operation: @escaping @Sendable () async throws -> Success
) async rethrows where Failure == Error {
) async where Failure == Error {
self.init(priority: priority, operation: operation)
await cancellationSource.register(task: self)
}
Expand Down Expand Up @@ -323,7 +323,7 @@ public extension Task {
priority: TaskPriority? = nil,
cancellationSource: CancellationSource,
operation: @escaping @Sendable () async throws -> Success
) async rethrows -> Self where Failure == Error {
) async -> Self where Failure == Error {
let task = Task.detached(priority: priority, operation: operation)
await cancellationSource.register(task: task)
return task
Expand Down
129 changes: 129 additions & 0 deletions Sources/AsyncObjects/Locker.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
#if canImport(Darwin)
@_implementationOnly import Darwin
#elseif canImport(Glibc)
@_implementationOnly import Glibc
#elseif canImport(WinSDK)
@_implementationOnly import WinSDK
#endif

/// A synchronization object that can be used to provide exclusive access to threads.
///
/// The values stored in the lock should be considered opaque and implementation defined,
/// they contain thread ownership information that the system may use to attempt to resolve priority inversions.
/// This lock must be unlocked from the same thread that locked it,
/// attempts to unlock from a different thread will cause an assertion aborting the process.
/// This lock must not be accessed from multiple processes or threads via shared or multiply-mapped memory,
/// the lock implementation relies on the address of the lock value and owning process.
public final class Locker: Equatable, Hashable, Sendable {
#if canImport(Darwin)
/// A type representing data for an unfair lock.
typealias Primitive = os_unfair_lock
#elseif canImport(Glibc)
/// A type representing a MUTual EXclusion object.
typealias Primitive = pthread_mutex_t
#elseif canImport(WinSDK)
/// A type representing a slim reader/writer (SRW) lock.
typealias Primitive = SRWLOCK
#endif

/// Pointer type pointing to platform dependent lock primitive.
typealias PlatformLock = UnsafeMutablePointer<Primitive>
/// Pointer to platform dependent lock primitive.
let platformLock: PlatformLock

/// Creates lock object with the provided pointer to platform dependent lock primitive.
///
/// - Parameter platformLock: Pointer to platform dependent lock primitive.
/// - Returns: The newly created lock object.
init(withLock platformLock: PlatformLock) {
self.platformLock = platformLock
}

/// Allocates and initializes platform dependent lock primitive.
///
/// - Returns: The newly created lock object.
public init() {
let platformLock = PlatformLock.allocate(capacity: 1)
#if canImport(Darwin)
platformLock.initialize(to: os_unfair_lock())
#elseif canImport(Glibc)
pthread_mutex_init(platformLock, nil)
#elseif canImport(WinSDK)
InitializeSRWLock(platformLock)
#endif
self.platformLock = platformLock
}

deinit {
#if canImport(Glibc)
pthread_mutex_destroy(platformLock)
#endif
platformLock.deinitialize(count: 1)
}

/// Acquires exclusive lock.
///
/// If a thread has already acquired lock and hasn't released lock yet,
/// other threads will wait for lock to be released and then acquire lock
/// in order of their request.
public func lock() {
#if canImport(Darwin)
os_unfair_lock_lock(platformLock)
#elseif canImport(Glibc)
pthread_mutex_lock(platformLock)
#elseif canImport(WinSDK)
AcquireSRWLockExclusive(platformLock)
#endif
}

/// Releases exclusive lock.
///
/// A lock must be unlocked only from the same thread in which it was locked.
/// Attempting to unlock from a different thread causes a runtime error.
public func unlock() {
#if canImport(Darwin)
os_unfair_lock_unlock(platformLock)
#elseif canImport(Glibc)
pthread_mutex_unlock(platformLock)
#elseif canImport(WinSDK)
ReleaseSRWLockExclusive(platformLock)
#endif
}

/// Performs a critical piece of work synchronously after acquiring the lock
/// and releases lock when task completes.
///
/// Use this to perform critical tasks or provide access to critical resource
/// that require exclusivity among other concurrent tasks.
///
/// - Parameter critical: The critical task to perform.
/// - Returns: The result from the critical task.
@discardableResult
public func perform<R>(_ critical: () throws -> R) rethrows -> R {
lock()
defer { unlock() }
return try critical()
}

/// Returns a Boolean value indicating whether two locks are equal.
///
/// Checks if two lock objects point to the same platform dependent lock primitive.
///
/// - Parameters:
/// - lhs: A lock to compare.
/// - rhs: Another lock to compare.
///
/// - Returns: If the lock objects compared are equal.
public static func == (lhs: Locker, rhs: Locker) -> Bool {
return lhs.platformLock == rhs.platformLock
}

/// Hashes the pointer to platform dependent lock primitive
/// by feeding into the given hasher.
///
/// - Parameter hasher: The hasher to use when combining
/// the components of this instance.
public func hash(into hasher: inout Hasher) {
hasher.combine(platformLock)
}
}
71 changes: 31 additions & 40 deletions Sources/AsyncObjects/TaskOperation.swift
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@ import Dispatch
public final class TaskOperation<R: Sendable>: Operation, AsyncObject,
@unchecked Sendable
{
/// The dispatch queue used to synchronize data access and modifications.
/// The platform dependent lock used to
/// synchronize data access and modifications.
@usableFromInline
let propQueue: DispatchQueue
let locker: Locker
/// The asynchronous action to perform as part of the operation..
private let underlyingAction: @Sendable () async throws -> R
/// The top-level task that executes asynchronous action provided
Expand All @@ -35,33 +36,35 @@ public final class TaskOperation<R: Sendable>: Operation, AsyncObject,
public override var isCancelled: Bool { execTask?.isCancelled ?? false }

/// Private store for boolean value indicating whether the operation is currently executing.
private var _isExecuting: Bool = false
@usableFromInline
var _isExecuting: Bool = false
/// A Boolean value indicating whether the operation is currently executing.
///
/// The value of this property is true if the operation is currently executing
/// provided asynchronous operation or false if it is not.
public override internal(set) var isExecuting: Bool {
get { propQueue.sync { _isExecuting } }
get { locker.perform { _isExecuting } }
@usableFromInline
set {
willChangeValue(forKey: "isExecuting")
propQueue.sync(flags: [.barrier]) { _isExecuting = newValue }
locker.perform { _isExecuting = newValue }
didChangeValue(forKey: "isExecuting")
}
}

/// Private store for boolean value indicating whether the operation has finished executing its task.
private var _isFinished: Bool = false
@usableFromInline
var _isFinished: Bool = false
/// A Boolean value indicating whether the operation has finished executing its task.
///
/// The value of this property is true if the operation is finished executing or cancelled
/// provided asynchronous operation or false if it is not.
public override internal(set) var isFinished: Bool {
get { propQueue.sync { _isFinished } }
get { locker.perform { _isFinished } }
@usableFromInline
set {
willChangeValue(forKey: "isFinished")
propQueue.sync(flags: [.barrier]) {
locker.perform {
_isFinished = newValue
guard newValue, !continuations.isEmpty else { return }
continuations.forEach { $0.value.resume() }
Expand All @@ -79,44 +82,30 @@ public final class TaskOperation<R: Sendable>: Operation, AsyncObject,
get async { (await execTask?.result) ?? .failure(CancellationError()) }
}

/// Creates a new operation that executes the provided throwing asynchronous task.
/// Creates a new operation that executes the provided asynchronous task.
///
/// The provided dispatch queue is used to synchronize operation property access and modifications
/// and prevent data races.
/// The operation execution only starts after ``start()`` is invoked.
/// Operation completes when underlying asynchronous task finishes.
/// The provided lock is used to synchronize operation property access and modifications
/// to prevent data races.
///
/// - Parameters:
/// - queue: The dispatch queue to be used to synchronize data access and modifications.
/// - operation: The throwing asynchronous operation to execute.
/// - locker: The locker to use to synchronize property read and mutations.
/// New lock object is created in case none provided.
/// - operation: The asynchronous operation to execute.
///
/// - Returns: The newly created asynchronous operation.
public init(
queue: DispatchQueue,
synchronizedWith locker: Locker = .init(),
operation: @escaping @Sendable () async throws -> R
) {
self.propQueue = queue
self.locker = locker
self.underlyingAction = operation
super.init()
}

deinit { self.continuations.forEach { $0.value.cancel() } }

/// Creates a new operation that executes the provided non-throwing asynchronous task.
///
/// The provided dispatch queue is used to synchronize operation property access and modifications
/// and prevent data races.
///
/// - Parameters:
/// - queue: The dispatch queue to be used to synchronize data access and modifications.
/// - operation: The non-throwing asynchronous operation to execute.
///
/// - Returns: The newly created asynchronous operation.
public init(
queue: DispatchQueue,
operation: @escaping @Sendable () async -> R
) {
self.propQueue = queue
self.underlyingAction = operation
super.init()
deinit {
locker.perform { self.continuations.forEach { $0.value.cancel() } }
}

/// Begins the execution of the operation.
Expand All @@ -138,9 +127,11 @@ public final class TaskOperation<R: Sendable>: Operation, AsyncObject,
public override func main() {
guard isExecuting, execTask == nil else { return }
execTask = Task { [weak self] in
guard let self = self else { throw CancellationError() }
defer { self._finish() }
let result = try await underlyingAction()
guard
let action = self?.underlyingAction
else { throw CancellationError() }
defer { self?._finish() }
let result = try await action()
return result
}
}
Expand Down Expand Up @@ -185,8 +176,8 @@ public final class TaskOperation<R: Sendable>: Operation, AsyncObject,
_ continuation: Continuation,
withKey key: UUID
) {
propQueue.sync(flags: [.barrier]) {
if isFinished { continuation.resume(); return }
locker.perform {
if _isFinished { continuation.resume(); return }
continuations[key] = continuation
}
}
Expand All @@ -197,7 +188,7 @@ public final class TaskOperation<R: Sendable>: Operation, AsyncObject,
/// - Parameter key: The key in the map.
@inlinable
func _removeContinuation(withKey key: UUID) {
propQueue.sync(flags: [.barrier]) {
locker.perform {
let continuation = continuations.removeValue(forKey: key)
continuation?.cancel()
}
Expand Down
2 changes: 1 addition & 1 deletion Sources/AsyncObjects/TaskQueue.swift
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import OrderedCollections

/// An object that acts as a concurrent queue executing submitted tasks concurrently.
///
/// You can use the ``exec(priority:flags:operation:)-92nww``
/// You can use the ``exec(priority:flags:operation:)-2ll3k``
/// or its non-throwing/non-cancellable version to run tasks concurrently.
/// Additionally, you can provide priority of task and ``Flags``
/// to customize execution of submitted operation.
Expand Down
12 changes: 12 additions & 0 deletions Tests/AsyncObjectsTests/AsyncCountdownEventTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -158,4 +158,16 @@ class AsyncCountdownEventTests: XCTestCase {
await event.wait()
}
}

func testDeinit() async throws {
let event = AsyncCountdownEvent(until: 0, initial: 1)
Task.detached {
try await Self.sleep(seconds: 1)
await event.signal()
}
await event.wait()
self.addTeardownBlock { [weak event] in
XCTAssertNil(event)
}
}
}
12 changes: 12 additions & 0 deletions Tests/AsyncObjectsTests/AsyncEventTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -73,4 +73,16 @@ class AsyncEventTests: XCTestCase {
}
XCTAssertEqual(result, .success)
}

func testDeinit() async throws {
let event = AsyncEvent(signaledInitially: false)
Task.detached {
try await Self.sleep(seconds: 1)
await event.signal()
}
await event.wait()
self.addTeardownBlock { [weak event] in
XCTAssertNil(event)
}
}
}
4 changes: 2 additions & 2 deletions Tests/AsyncObjectsTests/AsyncObjectTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class AsyncObjectTests: XCTestCase {
func testMultipleObjectWaitMultiple() async throws {
let event = AsyncEvent(signaledInitially: false)
let mutex = AsyncSemaphore()
let op = TaskOperation(queue: .global(qos: .background)) {
let op = TaskOperation {
try await Self.sleep(seconds: 3)
}
Task.detached {
Expand Down Expand Up @@ -80,7 +80,7 @@ class AsyncObjectTests: XCTestCase {
var result: TaskTimeoutResult = .success
let event = AsyncEvent(signaledInitially: false)
let mutex = AsyncSemaphore()
let op = TaskOperation(queue: .global(qos: .background)) {
let op = TaskOperation {
try await Self.sleep(seconds: 4)
}
Task.detached {
Expand Down
12 changes: 12 additions & 0 deletions Tests/AsyncObjectsTests/AsyncSemaphoreTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,18 @@ class AsyncSemaphoreTests: XCTestCase {
}
XCTAssertEqual(data.items.count, 10)
}

func testDeinit() async throws {
let semaphore = AsyncSemaphore()
Task.detached {
try await Self.sleep(seconds: 1)
await semaphore.signal()
}
await semaphore.wait()
self.addTeardownBlock { [weak semaphore] in
XCTAssertNil(semaphore)
}
}
}

actor TaskTimeoutStore {
Expand Down

0 comments on commit f28ee66

Please sign in to comment.