Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 33 additions & 33 deletions src/IMessage/Sources/IMessageCore/PassivelyAwareDispatchQueue.swift
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,8 @@ public final class PassivelyAwareDispatchQueue {

public let queue: DispatchQueue

private var pending = Protected<Int>(0)
private var passiveWorkItem: DispatchWorkItem?
private var activityState = Protected(ActivityState())
private var uponIdle = Protected<PassiveCallback?>()
private var activityEpoch = Protected<UInt>(0)
public private(set) var idleDelay: TimeInterval

public init(label: String, idleDelay: TimeInterval, qos: DispatchQoS = .unspecified) {
Expand All @@ -28,60 +26,67 @@ public final class PassivelyAwareDispatchQueue {
}

// Updating the idle callback is not itself considered "work" at all; it
// happens instantly and it'll run even if the passive work item was
// happens instantly and it'll run even if the passive work was
// scheduled before the callback was updated.
public func setIdleCallback(_ callback: PassiveCallback?) {
uponIdle.withLock { $0 = callback }
}

public func async(execute activeWork: @Sendable @escaping () -> Void) {
let (epoch, _) = bumpStateInResponseToWorkSubmission()
bumpStateInResponseToWorkSubmission()

queue.async { [self] in
activeWork()

let pendingPostDecrement = pending.withLock {
$0 -= 1
return $0
}
let (pendingPostDecrement, currentEpoch) = completeWork()
#if DEBUG
log.debug("\(queue.label): ✅ finished work, pending is now \(pendingPostDecrement)")
#endif
if pendingPostDecrement == 0 {
// There isn't any work left in the queue, so arm the passive
// work to potentially execute soon.
armPassive(expectingEpoch: epoch, quiescence: .began)
armPassive(expectingEpoch: currentEpoch, quiescence: .began)
}
}
}
}

private extension PassivelyAwareDispatchQueue {
private func bumpStateInResponseToWorkSubmission() -> (epoch: UInt, newCount: Int) {
let newEpoch = activityEpoch.withLock { $0 += 1; return $0 }
// If we had scheduled passive work, prevent it from running. This won't
// stop it if it already had a chance to begin executing, though.
passiveWorkItem?.cancel()
passiveWorkItem = nil
let newCount = pending.withLock { $0 += 1; return $0 }
struct ActivityState {
var pending = 0
var epoch: UInt = 0
}

private func bumpStateInResponseToWorkSubmission() {
let newCount = activityState.withLock { state in
state.epoch += 1
state.pending += 1
return state.pending
}
#if DEBUG
log.debug("\(queue.label): 🔄 enqueuing work, pending is now \(newCount)")
#endif
return (newEpoch, newCount)
}

// This is only ever called from the queue, so we don't need to protect `passiveWorkItem`.
func armPassive(expectingEpoch expectedEpoch: UInt, quiescence: Quiescence) {
passiveWorkItem?.cancel()
private func completeWork() -> (pending: Int, epoch: UInt) {
activityState.withLock { state in
state.pending -= 1
return (state.pending, state.epoch)
}
}

let workItem = DispatchWorkItem { [weak self] in
func armPassive(expectingEpoch expectedEpoch: UInt, quiescence: Quiescence) {
// Submission-side epoch changes logically cancel delayed idle checks
// without retaining and releasing DispatchWorkItems across threads.
queue.asyncAfter(deadline: .now() + idleDelay) { [weak self] in
guard let self else { return }
Comment on lines +78 to 82
#if DEBUG
// log.debug("\(queue.label): 💭 running passive work now")
#endif

let isQuiet = pending.read() == 0
let epochUnchanged = activityEpoch.read() == expectedEpoch
let (isQuiet, epochUnchanged) = activityState.withLock { state in
(state.pending == 0, state.epoch == expectedEpoch)
}
guard isQuiet, epochUnchanged else {
#if DEBUG
log.debug("\(queue.label): 🚫 backing out of passive work (quiet? \(isQuiet), epoch unchanged? \(epochUnchanged))")
Expand All @@ -91,19 +96,14 @@ private extension PassivelyAwareDispatchQueue {

uponIdle.read()?(quiescence)

if pending.read() == 0 {
let shouldContinue = activityState.withLock { state in
state.pending == 0 && state.epoch == expectedEpoch
}
if shouldContinue {
// If no active work was scheduled while we were busy with
// passive work, schedule the passive work to run again soon.
armPassive(expectingEpoch: expectedEpoch, quiescence: .continuing)
Comment on lines 97 to 105
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Stop rearming when the idle callback is nil.

uponIdle is optional, but the .continuing path ignores that and re-schedules forever as long as pending == 0 and the epoch stays unchanged. That means a queue with no idle callback registered — or one whose callback clears itself with setIdleCallback(nil) — will keep waking up indefinitely with nothing to run.

♻️ Proposed fix
             uponIdle.read()?(quiescence)

-            let shouldContinue = activityState.withLock { state in
-                state.pending == 0 && state.epoch == expectedEpoch
-            }
+            let shouldContinue = uponIdle.read() != nil && activityState.withLock { state in
+                state.pending == 0 && state.epoch == expectedEpoch
+            }
             if shouldContinue {
                 // If no active work was scheduled while we were busy with
                 // passive work, schedule the passive work to run again soon.
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
uponIdle.read()?(quiescence)
if pending.read() == 0 {
let shouldContinue = activityState.withLock { state in
state.pending == 0 && state.epoch == expectedEpoch
}
if shouldContinue {
// If no active work was scheduled while we were busy with
// passive work, schedule the passive work to run again soon.
armPassive(expectingEpoch: expectedEpoch, quiescence: .continuing)
uponIdle.read()?(quiescence)
let shouldContinue = uponIdle.read() != nil && activityState.withLock { state in
state.pending == 0 && state.epoch == expectedEpoch
}
if shouldContinue {
// If no active work was scheduled while we were busy with
// passive work, schedule the passive work to run again soon.
armPassive(expectingEpoch: expectedEpoch, quiescence: .continuing)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/IMessage/Sources/IMessageCore/PassivelyAwareDispatchQueue.swift` around
lines 97 - 105, The idle rearm logic currently always calls
armPassive(expectingEpoch:expectedEpoch, quiescence:.continuing) even when the
optional uponIdle callback is nil; change the condition so you only re-arm if an
idle callback is set (check uponIdle.read() != nil or equivalent) after
computing shouldContinue from activityState.withLock (so you still respect
pending and epoch), and ensure callers that clear the callback via
setIdleCallback(nil) stop further re-scheduling.

}
}

// Prime passive work, scheduling it to run at some point in the future.
//
// It is immediately cancelled when active work is submitted; in other
// words, the mere submission of active work preempts the execution of
// passive work.
queue.asyncAfter(deadline: .now() + idleDelay, execute: workItem)
passiveWorkItem = workItem
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
import Dispatch
import Foundation
import IMessageCore
import Testing

@Test func passivelyAwareQueueFiresIdleAfterActiveWorkDrains() {
let queue = PassivelyAwareDispatchQueue(label: testQueueLabel(), idleDelay: 0.02)
let observations = Protected<[String]>([])
let idleObserved = DispatchSemaphore(value: 0)
let workFinished = DispatchSemaphore(value: 0)

queue.setIdleCallback { quiescence in
observations.withLock { $0.append(label(for: quiescence)) }
idleObserved.signal()
}

queue.async {
workFinished.signal()
}

#expect(workFinished.wait(timeout: .now() + 1) == .success)
#expect(idleObserved.wait(timeout: .now() + 1) == .success)
#expect(observations.read().first == "began")
}

@Test func passivelyAwareQueueSuppressesStaleIdleCallbacksAfterNewWork() {
let queue = PassivelyAwareDispatchQueue(label: testQueueLabel(), idleDelay: 0.3)
let observations = Protected<[String]>([])
let idleObserved = DispatchSemaphore(value: 0)
let firstWorkFinished = DispatchSemaphore(value: 0)
let secondWorkFinished = DispatchSemaphore(value: 0)

queue.setIdleCallback { quiescence in
observations.withLock { $0.append(label(for: quiescence)) }
idleObserved.signal()
}

queue.async {
firstWorkFinished.signal()
}
#expect(firstWorkFinished.wait(timeout: .now() + 1) == .success)

Thread.sleep(forTimeInterval: 0.1)

queue.async {
secondWorkFinished.signal()
Comment on lines +43 to +46
}
#expect(secondWorkFinished.wait(timeout: .now() + 1) == .success)

#expect(idleObserved.wait(timeout: .now() + 0.25) == .timedOut)
#expect(idleObserved.wait(timeout: .now() + 1) == .success)
#expect(observations.read() == ["began"])
}

@Test func passivelyAwareQueueRepeatsContinuingIdleWhileQuiet() {
let queue = PassivelyAwareDispatchQueue(label: testQueueLabel(), idleDelay: 0.02)
let observations = Protected<[String]>([])
let idleObservedTwice = DispatchSemaphore(value: 0)

queue.setIdleCallback { quiescence in
let count = observations.withLock { observations in
observations.append(label(for: quiescence))
return observations.count
}
if count == 2 {
idleObservedTwice.signal()
}
}

queue.async {}

#expect(idleObservedTwice.wait(timeout: .now() + 1) == .success)
#expect(Array(observations.read().prefix(2)) == ["began", "continuing"])
}

@Test func passivelyAwareQueueHandlesRapidConcurrentSubmissions() {
let queue = PassivelyAwareDispatchQueue(label: testQueueLabel(), idleDelay: 0.01)
let totalWorkItems = 500
let completedCount = Protected<Int>(0)
let allWorkFinished = DispatchSemaphore(value: 0)
let idleObserved = DispatchSemaphore(value: 0)

queue.setIdleCallback { quiescence in
if case .began = quiescence {
idleObserved.signal()
}
}

DispatchQueue.concurrentPerform(iterations: totalWorkItems) { _ in
queue.async {
let completed = completedCount.withLock { count in
count += 1
return count
}
if completed == totalWorkItems {
allWorkFinished.signal()
}
}
}

#expect(allWorkFinished.wait(timeout: .now() + 2) == .success)
#expect(idleObserved.wait(timeout: .now() + 2) == .success)
#expect(completedCount.read() == totalWorkItems)
}

private func label(for quiescence: Quiescence) -> String {
switch quiescence {
case .began:
return "began"
case .continuing:
return "continuing"
}
}

private func testQueueLabel() -> String {
"passively-aware-dispatch-queue-test-\(UUID().uuidString)"
}