Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions Sources/GRPCCore/Internal/Concurrency Primitives/Lock.swift
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,40 @@ public struct _LockedValueBox<Value> {
public func withLockedValue<T>(_ mutate: (inout Value) throws -> T) rethrows -> T {
return try self.storage.withLockedValue(mutate)
}

/// An unsafe view over the locked value box.
///
/// Prefer ``withLockedValue(_:)`` where possible.
public var unsafe: Unsafe {
Unsafe(storage: self.storage)
}

public struct Unsafe {
@usableFromInline
let storage: LockStorage<Value>

/// Manually acquire the lock.
@inlinable
public func lock() {
self.storage.lock()
}

/// Manually release the lock.
@inlinable
public func unlock() {
self.storage.unlock()
}

/// Mutate the value, assuming the lock has been acquired manually.
@inlinable
public func withValueAssumingLockIsAcquired<T>(
_ mutate: (inout Value) throws -> T
) rethrows -> T {
return try self.storage.withUnsafeMutablePointerToHeader { value in
try mutate(&value.pointee)
}
}
}
}

extension _LockedValueBox: Sendable where Value: Sendable {}
55 changes: 35 additions & 20 deletions Sources/GRPCCore/Streaming/Internal/BroadcastAsyncSequence.swift
Original file line number Diff line number Diff line change
Expand Up @@ -261,38 +261,44 @@ final class _BroadcastSequenceStorage<Element: Sendable>: Sendable {
func nextElement(
forSubscriber id: _BroadcastSequenceStateMachine<Element>.Subscriptions.ID
) async throws -> Element? {
let onNext = self._state.withLockedValue { $0.nextElement(forSubscriber: id) }
return try await withTaskCancellationHandler {
self._state.unsafe.lock()
let onNext = self._state.unsafe.withValueAssumingLockIsAcquired {
$0.nextElement(forSubscriber: id)
}

switch onNext {
case .return(let returnAndProduceMore):
returnAndProduceMore.producers.resume()
return try returnAndProduceMore.nextResult.get()
switch onNext {
case .return(let returnAndProduceMore):
self._state.unsafe.unlock()
returnAndProduceMore.producers.resume()
return try returnAndProduceMore.nextResult.get()

case .suspend:
return try await withTaskCancellationHandler {
case .suspend:
return try await withCheckedThrowingContinuation { continuation in
let onSetContinuation = self._state.withLockedValue { state in
let onSetContinuation = self._state.unsafe.withValueAssumingLockIsAcquired { state in
state.setContinuation(continuation, forSubscription: id)
}

self._state.unsafe.unlock()

switch onSetContinuation {
case .resume(let continuation, let result):
continuation.resume(with: result)
case .none:
()
}
}
} onCancel: {
let onCancel = self._state.withLockedValue { state in
state.cancelSubscription(withID: id)
}
}
} onCancel: {
let onCancel = self._state.withLockedValue { state in
state.cancelSubscription(withID: id)
}

switch onCancel {
case .resume(let continuation, let result):
continuation.resume(with: result)
case .none:
()
}
switch onCancel {
case .resume(let continuation, let result):
continuation.resume(with: result)
case .none:
()
}
}
}
Expand Down Expand Up @@ -572,9 +578,18 @@ struct _BroadcastSequenceStateMachine<Element: Sendable>: Sendable {
self.producerToken += 1
onYield = .suspend(token)
} else {
// No consumers are slow. Remove the oldest value.
// No consumers are slow, some subscribers exist, a subset of which are waiting
// for a value. Drop the oldest value and resume the fastest consumers.
self.elements.removeFirst()
onYield = .none
let continuations = self.subscriptions.takeContinuations().map {
ConsumerContinuations(continuations: $0, result: .success(element))
}

if let continuations = continuations {
onYield = .resume(continuations)
} else {
onYield = .none
}
}

case self.subscriptions.count:
Expand Down