diff --git a/src/IMessage/Sources/IMessageCore/PassivelyAwareDispatchQueue.swift b/src/IMessage/Sources/IMessageCore/PassivelyAwareDispatchQueue.swift index 80403c1d..05e56a97 100644 --- a/src/IMessage/Sources/IMessageCore/PassivelyAwareDispatchQueue.swift +++ b/src/IMessage/Sources/IMessageCore/PassivelyAwareDispatchQueue.swift @@ -16,10 +16,8 @@ public final class PassivelyAwareDispatchQueue { public let queue: DispatchQueue - private var pending = Protected(0) - private var passiveWorkItem: DispatchWorkItem? + private var activityState = Protected(ActivityState()) private var uponIdle = Protected() - private var activityEpoch = Protected(0) public private(set) var idleDelay: TimeInterval public init(label: String, idleDelay: TimeInterval, qos: DispatchQoS = .unspecified) { @@ -28,60 +26,67 @@ public final class PassivelyAwareDispatchQueue { } // Updating the idle callback is not itself considered "work" at all; it - // happens instantly and it'll run even if the passive work item was + // happens instantly and it'll run even if the passive work was // scheduled before the callback was updated. public func setIdleCallback(_ callback: PassiveCallback?) { uponIdle.withLock { $0 = callback } } public func async(execute activeWork: @Sendable @escaping () -> Void) { - let (epoch, _) = bumpStateInResponseToWorkSubmission() + bumpStateInResponseToWorkSubmission() queue.async { [self] in activeWork() - let pendingPostDecrement = pending.withLock { - $0 -= 1 - return $0 - } + let (pendingPostDecrement, currentEpoch) = completeWork() #if DEBUG log.debug("\(queue.label): ✅ finished work, pending is now \(pendingPostDecrement)") #endif if pendingPostDecrement == 0 { // There isn't any work left in the queue, so arm the passive // work to potentially execute soon. - armPassive(expectingEpoch: epoch, quiescence: .began) + armPassive(expectingEpoch: currentEpoch, quiescence: .began) } } } } private extension PassivelyAwareDispatchQueue { - private func bumpStateInResponseToWorkSubmission() -> (epoch: UInt, newCount: Int) { - let newEpoch = activityEpoch.withLock { $0 += 1; return $0 } - // If we had scheduled passive work, prevent it from running. This won't - // stop it if it already had a chance to begin executing, though. - passiveWorkItem?.cancel() - passiveWorkItem = nil - let newCount = pending.withLock { $0 += 1; return $0 } + struct ActivityState { + var pending = 0 + var epoch: UInt = 0 + } + + private func bumpStateInResponseToWorkSubmission() { + let newCount = activityState.withLock { state in + state.epoch += 1 + state.pending += 1 + return state.pending + } #if DEBUG log.debug("\(queue.label): 🔄 enqueuing work, pending is now \(newCount)") #endif - return (newEpoch, newCount) } - // This is only ever called from the queue, so we don't need to protect `passiveWorkItem`. - func armPassive(expectingEpoch expectedEpoch: UInt, quiescence: Quiescence) { - passiveWorkItem?.cancel() + private func completeWork() -> (pending: Int, epoch: UInt) { + activityState.withLock { state in + state.pending -= 1 + return (state.pending, state.epoch) + } + } - let workItem = DispatchWorkItem { [weak self] in + func armPassive(expectingEpoch expectedEpoch: UInt, quiescence: Quiescence) { + // Submission-side epoch changes logically cancel delayed idle checks + // without retaining and releasing DispatchWorkItems across threads. + queue.asyncAfter(deadline: .now() + idleDelay) { [weak self] in guard let self else { return } #if DEBUG // log.debug("\(queue.label): 💭 running passive work now") #endif - let isQuiet = pending.read() == 0 - let epochUnchanged = activityEpoch.read() == expectedEpoch + let (isQuiet, epochUnchanged) = activityState.withLock { state in + (state.pending == 0, state.epoch == expectedEpoch) + } guard isQuiet, epochUnchanged else { #if DEBUG log.debug("\(queue.label): 🚫 backing out of passive work (quiet? \(isQuiet), epoch unchanged? \(epochUnchanged))") @@ -91,19 +96,14 @@ private extension PassivelyAwareDispatchQueue { uponIdle.read()?(quiescence) - if pending.read() == 0 { + let shouldContinue = activityState.withLock { state in + state.pending == 0 && state.epoch == expectedEpoch + } + if shouldContinue { // If no active work was scheduled while we were busy with // passive work, schedule the passive work to run again soon. armPassive(expectingEpoch: expectedEpoch, quiescence: .continuing) } } - - // Prime passive work, scheduling it to run at some point in the future. - // - // It is immediately cancelled when active work is submitted; in other - // words, the mere submission of active work preempts the execution of - // passive work. - queue.asyncAfter(deadline: .now() + idleDelay, execute: workItem) - passiveWorkItem = workItem } } diff --git a/src/IMessage/Sources/IMessageTests/PassivelyAwareDispatchQueueTests.swift b/src/IMessage/Sources/IMessageTests/PassivelyAwareDispatchQueueTests.swift new file mode 100644 index 00000000..323e0f91 --- /dev/null +++ b/src/IMessage/Sources/IMessageTests/PassivelyAwareDispatchQueueTests.swift @@ -0,0 +1,117 @@ +import Dispatch +import Foundation +import IMessageCore +import Testing + +@Test func passivelyAwareQueueFiresIdleAfterActiveWorkDrains() { + let queue = PassivelyAwareDispatchQueue(label: testQueueLabel(), idleDelay: 0.02) + let observations = Protected<[String]>([]) + let idleObserved = DispatchSemaphore(value: 0) + let workFinished = DispatchSemaphore(value: 0) + + queue.setIdleCallback { quiescence in + observations.withLock { $0.append(label(for: quiescence)) } + idleObserved.signal() + } + + queue.async { + workFinished.signal() + } + + #expect(workFinished.wait(timeout: .now() + 1) == .success) + #expect(idleObserved.wait(timeout: .now() + 1) == .success) + #expect(observations.read().first == "began") +} + +@Test func passivelyAwareQueueSuppressesStaleIdleCallbacksAfterNewWork() { + let queue = PassivelyAwareDispatchQueue(label: testQueueLabel(), idleDelay: 0.3) + let observations = Protected<[String]>([]) + let idleObserved = DispatchSemaphore(value: 0) + let firstWorkFinished = DispatchSemaphore(value: 0) + let secondWorkFinished = DispatchSemaphore(value: 0) + + queue.setIdleCallback { quiescence in + observations.withLock { $0.append(label(for: quiescence)) } + idleObserved.signal() + } + + queue.async { + firstWorkFinished.signal() + } + #expect(firstWorkFinished.wait(timeout: .now() + 1) == .success) + + Thread.sleep(forTimeInterval: 0.1) + + queue.async { + secondWorkFinished.signal() + } + #expect(secondWorkFinished.wait(timeout: .now() + 1) == .success) + + #expect(idleObserved.wait(timeout: .now() + 0.25) == .timedOut) + #expect(idleObserved.wait(timeout: .now() + 1) == .success) + #expect(observations.read() == ["began"]) +} + +@Test func passivelyAwareQueueRepeatsContinuingIdleWhileQuiet() { + let queue = PassivelyAwareDispatchQueue(label: testQueueLabel(), idleDelay: 0.02) + let observations = Protected<[String]>([]) + let idleObservedTwice = DispatchSemaphore(value: 0) + + queue.setIdleCallback { quiescence in + let count = observations.withLock { observations in + observations.append(label(for: quiescence)) + return observations.count + } + if count == 2 { + idleObservedTwice.signal() + } + } + + queue.async {} + + #expect(idleObservedTwice.wait(timeout: .now() + 1) == .success) + #expect(Array(observations.read().prefix(2)) == ["began", "continuing"]) +} + +@Test func passivelyAwareQueueHandlesRapidConcurrentSubmissions() { + let queue = PassivelyAwareDispatchQueue(label: testQueueLabel(), idleDelay: 0.01) + let totalWorkItems = 500 + let completedCount = Protected(0) + let allWorkFinished = DispatchSemaphore(value: 0) + let idleObserved = DispatchSemaphore(value: 0) + + queue.setIdleCallback { quiescence in + if case .began = quiescence { + idleObserved.signal() + } + } + + DispatchQueue.concurrentPerform(iterations: totalWorkItems) { _ in + queue.async { + let completed = completedCount.withLock { count in + count += 1 + return count + } + if completed == totalWorkItems { + allWorkFinished.signal() + } + } + } + + #expect(allWorkFinished.wait(timeout: .now() + 2) == .success) + #expect(idleObserved.wait(timeout: .now() + 2) == .success) + #expect(completedCount.read() == totalWorkItems) +} + +private func label(for quiescence: Quiescence) -> String { + switch quiescence { + case .began: + return "began" + case .continuing: + return "continuing" + } +} + +private func testQueueLabel() -> String { + "passively-aware-dispatch-queue-test-\(UUID().uuidString)" +}