Add FairnessScheduler and TokenExecutor integration #568
| /// Test-only: Records session IDs in the order they executed, for verifying round-robin fairness. | ||
| private var _testExecutionHistory: [SessionID] = [] | ||
| #endif | ||
| private var executionScheduled = false |
This is a common enough pattern that there's a class to help you: IdempotentOperationJoiner. Use IdempotentOperationJoiner.asyncJoiner(iTermGCD.mutationQueue()) to create.
Done in c435547, this is much cleaner than manual flag management.
| semaphore.signal() | ||
| onSemaphoreSignaled?() | ||
| self.semaphore = nil | ||
| self.onSemaphoreSignaled = nil |
This is generally a safer way to call one-time closures:
if let closure = onSemaphoreSignaled {
onSemaphoreSignaled = nil
closure()
}
That way, if the closure assigns to onSemaphoreSignaled it still works as expected. I don't think it'll be an issue as written, but it's nice to remove the footgun. Same on line 108 and 123 and 135 and 144.
Also considering how many times this is done, I'd put the "signal and callback" code into a private function.
Agreed, helper function makes sense as well. Extracted signalAndRelease() and updated call sites at next(), consume(), skipToEnd(), didFinish(), cleanup(asyncFree:) in 1c42fdd.
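A minimal sketch of the "signal and callback" helper discussed above, under stated assumptions: `SlotHolder` and `setCallback` are hypothetical names used only for illustration, and only the one-shot logic is modeled, not the real TokenArray.

```swift
import Foundation

/// Hypothetical stand-in for the object that owns the semaphore and callback.
final class SlotHolder {
    private var semaphore: DispatchSemaphore?
    private var onSemaphoreSignaled: (() -> Void)?

    init(semaphore: DispatchSemaphore, onSemaphoreSignaled: @escaping () -> Void) {
        self.semaphore = semaphore
        self.onSemaphoreSignaled = onSemaphoreSignaled
    }

    /// Hypothetical setter; it may be called from inside the callback itself.
    func setCallback(_ closure: (() -> Void)?) {
        onSemaphoreSignaled = closure
    }

    /// Signals the semaphore and runs the callback at most once.
    func signalAndRelease() {
        guard let semaphore = semaphore else { return }
        self.semaphore = nil
        // Take the closure and clear the property before calling it, so a
        // reassignment made inside the closure is not overwritten afterwards.
        let closure = onSemaphoreSignaled
        onSemaphoreSignaled = nil
        semaphore.signal()
        closure?()
    }
}
```

Calling `signalAndRelease()` a second time is a no-op, which is what lets every call site share the same helper.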
| var accumulatedLength = ByteExecutionStats() | ||
|
|
||
| tokenQueue.enumerateTokenArrayGroups { [weak self] (group, priority) in | ||
| guard let self = self else { return false } |
Preserve the use of slownessDetector.measure that execute() has.
| DLog("cleanupForUnregistration") | ||
| // Discard all remaining tokens and trigger their consumption callbacks | ||
| // This ensures availableSlots is correctly incremented for unconsumed tokens | ||
| let unconsumedCount = tokenQueue.discardAllAndReturnCount() |
This could be reached from the following call chain:
-[VT100Screen terminate]
-[VT100Screen setTerminalEnabled:NO]
-[VT100ScreenMutableState setTerminalEnabled:NO]
iTermFairnessScheduler.unregister(sessionId:)
TokenExecutor.cleanupForUnregistration()
TokenExecutorImpl.cleanupForUnregistration()
But termination is undoable (i.e., you close a window and then select Edit > Undo Close Window before the timeout). Since setTerminalEnabled: could later be called with YES, we must not discard tokens yet.
Honestly, I don't think I ever knew that feature existed. I'll look into this more tomorrow. In the existing code flow, what happens if the queue is full when the session terminates? (That's mostly a reminder to myself to find out.)
IIRC the TaskNotifier stops selecting on the terminated session so it doesn't block the others.
Good catch. Tokens are now preserved in cleanupForUnregistration() to support session revive (undo termination).
The implementation required more than just removing the discard call:
- Moved FairnessScheduler registration from init to `setTerminalEnabled:YES`. This creates symmetry with unregistration in `setTerminalEnabled:NO`. On revive, re-registration gets a fresh sessionId.
- Added `[_tokenExecutor schedule]` after re-registration. This ensures preserved tokens are processed even if no new output arrives. Without this, queued tokens could remain stuck indefinitely.
- Tokens drain naturally when the terminal is re-enabled and re-registered, or are freed on dealloc if the session is never revived.
Commit: b159da5
| _tokenExecutor.delegate = self; | ||
|
|
||
| // Register with FairnessScheduler for round-robin token execution (if enabled) | ||
| if ([iTermAdvancedSettingsModel useFairnessScheduler]) { |
If you pass the useFairnessScheduler flag into TokenExecutor's init then it could take care of registering itself and setting its own fairnessSessionId. I think that makes more sense since those operations are logically part of TokenExecutor initialization.
Hmm, do we want TokenExecutor coupled to FairnessScheduler though? I fear I may have been so fixated on creating a parallel code path that I didn't think enough about SoC. When I think about it, the TokenExecutor is just a worker that VT100MutableState is putting on the FairnessScheduler, so TokenExecutor doesn't really need knowledge of the FairnessScheduler at all; it just needs something to notify when it has work to do. And in fact, VT100MutableState doesn't really need to be coupled to the FairnessScheduler either, at least not directly. A protocol + duck typing makes sense here to me.
What do you think about keeping the FairnessScheduler wiring in VT100MutableState?
id<iTermScheduler> _scheduler = FairnessScheduler.shared;
_schedSessionId = [_scheduler register:_tokenExecutor];
_tokenExecutor.onWorkEnqueued = ^{ [_scheduler sessionDidEnqueueWork:_schedSessionId]; };
Something like that? That actually makes testing more straightforward (despite the fact that a few dozen of my tests would need to be changed), and probably helps maintainability as well.
Ah, you're right. TokenExecutor should not directly depend on it. We could have other schedulers or other kinds of workers in the future. Let's not abstract things unnecessarily; it's ok as-is in this PR
|
Thanks for the feedback! I'll address these shortly. |
| _testExecutionHistory.append(sessionId) | ||
| #endif | ||
|
|
||
| executor.executeTurn(tokenBudget: Self.defaultTokenBudget) { [weak self] result in |
executeTurn takes an escaping completion block but it never escapes. We could save the unnecessary dispatch queue work by changing its interface to return the result instead of calling a completion block; then we could call sessionFinishedTurn immediately after it returns.
Yes, this is currently unnecessary. My thought was that we might have fully concurrent, per-session worker threads someday, but that's out of scope for this.
| } | ||
|
|
||
| // defer has fired — high-priority tasks completed before completion callback | ||
| completion(turnResult) |
As mentioned elsewhere, life would be easier if this method just returned a value instead of taking an escaping completion block (unless something's going to change later, but that would be a painful change!)
Hm, I think the completion block may be there just so the mocks can introduce a delay. I don't think adding testability is worth the tradeoff in complexity and additional latency in a fairly sensitive code path. I haven't read the mock code but I wonder if the mocks could be run in another thread instead?
| // MARK: - Private State | ||
|
|
||
| /// Lock protecting all scheduler state. Use lock.sync {} to access any state below. | ||
| private let lock = Mutex() |
A nice pattern when you have a bunch of state that needs to be synchronized by one lock is to use MutableAtomicObject on a struct that holds the values, such as how NSProcessInfo+iTerm.swift does with CPUUsageInfo. It prevents you from ever forgetting to take a lock since the values are inaccessible without it.
|
|
||
| /// Internal implementation - must be called while holding lock. | ||
| /// Returns true if ensureExecutionScheduled() should be called after releasing lock. | ||
| private func sessionDidEnqueueWorkLocked(_ sessionId: SessionID) -> Bool { |
When using MutableAtomicObject you can move this into a mutating method of the struct that holds the values.
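The suggestion in the two comments above can be sketched as follows. This is an illustration under assumptions, not the real MutableAtomicObject API: `Locked` and `SchedulerState` are hypothetical names, and the point is only that the state is unreachable without taking the lock.

```swift
import Foundation

/// Hypothetical lock-owning box. The wrapped state can only be touched inside
/// `mutate`, so forgetting to take the lock is not possible.
final class Locked<State> {
    private var state: State
    private let lock = NSLock()

    init(_ state: State) {
        self.state = state
    }

    /// Runs `body` with exclusive access to the state and returns its result.
    func mutate<T>(_ body: (inout State) -> T) -> T {
        lock.lock()
        defer { lock.unlock() }
        return body(&state)
    }
}

typealias SessionID = UInt64

/// Hypothetical struct holding the values that one lock protects.
struct SchedulerState {
    private(set) var busyList: [SessionID] = []
    private var busySet: Set<SessionID> = []

    /// Replaces a "...Locked" helper: returns true when the session was newly
    /// added, meaning the caller should schedule execution after the lock is
    /// released.
    mutating func sessionDidEnqueueWork(_ id: SessionID) -> Bool {
        guard busySet.insert(id).inserted else { return false }
        busyList.append(id)
        return true
    }
}
```

A call site would then read `let shouldSchedule = state.mutate { $0.sessionDidEnqueueWork(id) }`, with the scheduling call made outside the closure.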
- Add BackpressureLevel enum (none, light, moderate, heavy, blocked)
- Track available slots with atomic counter alongside DispatchSemaphore
- Add onSemaphoreSignaled callback to TokenArray for slot release notification
- Expose backpressureLevel property on TokenExecutor (callable from any queue)
This enables adaptive behavior based on queue load (e.g., join timeouts).
…pass
- Initialize availableSlots with correct value in property declaration using immediately-executed closure, avoiding race where backpressureLevel could return .blocked before init completes
- Document that high-priority tokens intentionally bypass backpressure, so metric only reflects normal PTY token load
- Add Comparable conformance to BackpressureLevel for threshold comparisons
- Add backpressureReleaseHandler callback (stub for future PTYTask integration)
- Add TwoTierTokenQueue.discardAllAndReturnCount() for cleanup accounting
- Call backpressureReleaseHandler when crossing out of heavy backpressure
- Fix backpressureLevel to handle negative availableSlots
These additions enable future fairness scheduler integration without changing current behavior; the handler starts as nil.
- Add FairnessScheduler.swift with round-robin busy list scheduling
- Add useFairnessScheduler feature flag (default OFF)
- Add FairnessSchedulerExecutor protocol conformance to TokenExecutor
- Add executeTurn() with budget enforcement for scheduler
- Add cleanupForUnregistration() for proper token cleanup
- Add fairnessSessionId property for scheduler registration
- Add notifyScheduler() with conditional dispatch based on flag
- When flag OFF: legacy execute() behavior preserved
- When flag ON: FairnessScheduler coordinates round-robin execution
Tested with multi-tab stress test:
- Flag ON: 18k-20k iterations/sec across 1-5 tabs
- Flag OFF: 20k iterations/sec (legacy mode)
- Start nextSessionId at 1 so 0 can be "not registered" sentinel
- Add setUseFairnessSchedulerForTesting: for unit test control
- Add VT100ScreenMutableState registration with FairnessScheduler
- Add feature flag gating tests proving flag controls code path
- Add FairnessScheduler and TokenExecutor test suites
- Add run_fairness_tests.sh test runner
- Added `private let useFairnessScheduler: Bool` to TokenExecutorImpl
- notifyScheduler() now uses cached flag as sole decision point
- Added assertion for programmer error: flag ON but not registered
- Added #if ITERM_DEBUG test hook `testSkipNotifyScheduler`
Test updates:
- Fixed test classes with proper flag settings in setUp
- Added cleanupForUnregistrationOnMutationQueue helper
- Skipped tests requiring milestone 3 non-blocking model
- Updated run_fairness_tests.sh with runtime warning
Use IdempotentOperationJoiner to coalesce multiple ensureExecutionScheduled() calls into a single async dispatch. The joiner handles the scheduling state atomically via setNeedsUpdate/updateIfNeeded pattern:
- Multiple calls before dispatch coalesce into one
- Calls during execution schedule a new dispatch (closure cleared before exec)
- No manual flag management needed
Removed executionScheduled from testReset() since the empty busyList guard in executeNextTurn() handles any stale dispatches.
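The coalescing behavior this commit message describes can be illustrated with a small sketch. `CoalescingJoiner` is a hypothetical stand-in written for this example and is not the real IdempotentOperationJoiner; it borrows the setNeedsUpdate/updateIfNeeded names from the message, takes the deferral mechanism as a parameter, and assumes all calls arrive on one serial queue (the real class is described as handling its state atomically).

```swift
/// Hypothetical coalescer; assumes single-queue use and is not thread-safe.
final class CoalescingJoiner {
    private var pending: (() -> Void)?
    private let schedule: (@escaping () -> Void) -> Void

    /// `schedule` defers a block, for example `{ queue.async(execute: $0) }`.
    init(schedule: @escaping (@escaping () -> Void) -> Void) {
        self.schedule = schedule
    }

    /// Records `work` and arranges one deferred run. Calls that arrive before
    /// that run replace the recorded work without scheduling again.
    func setNeedsUpdate(_ work: @escaping () -> Void) {
        let alreadyScheduled = pending != nil
        pending = work
        if !alreadyScheduled {
            schedule { [weak self] in self?.updateIfNeeded() }
        }
    }

    /// Runs the recorded work, if any. The closure is cleared before it runs,
    /// so a call made during execution schedules a fresh dispatch.
    func updateIfNeeded() {
        guard let work = pending else { return }
        pending = nil
        work()
    }
}
```

Injecting `schedule` keeps the sketch deterministic: a test can collect the deferred blocks in an array and run them by hand.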
Addresses PR comments 11-14:
1. Move tokenExecutorShouldQueueTokens() check to after high-priority tasks
and nil-delegate guard, matching execute() ordering
2. Wrap token enumeration in slownessDetector.measure() for instrumentation
3. Add DLog statements matching execute():
- "Will enumerate token arrays" before enumeration
- "Begin/Done executing a batch of tokens..." inside loop
- "Finished enumerating token arrays..." after enumeration
- Active session drain logging
4. Use labeled do{} block so defer { executeHighPriorityTasks() } fires
before completion() is called. Store result in local variable and use
break instead of return for early exits.
…ordering.
Tests validate that high-priority tasks scheduled during token execution run before the completion callback. Comments are precise about scope:
- testDeferFiresBeforeCompletionCallback: Verifies HP tasks run before completion, but does not isolate outer vs inner defer (both satisfy).
- testHighPriorityTaskInDeferAddingTokensTriggersNotifyScheduler: Tests safety net ensuring tokens added by HP tasks get processed. Documents that sessionDidEnqueueWork uses queue.async so can race with sessionFinishedTurn (either path works).
- testTurnResultReflectsTokenQueueStateAfterDefer: Documents (non-asserting) that turnResult is calculated before outer defer fires.
…bled
- Move FairnessScheduler registration from init to setTerminalEnabled:YES for symmetry with unregistration in setTerminalEnabled:NO
- cleanupForUnregistration() no longer discards tokens; they are preserved to support session revive and drain naturally when re-enabled
- On revive, re-registration gets a fresh sessionId
- Add TODO comments to tests that need adjustment for new behavior
…utation queue
FairnessScheduler.register() was using .sync to the mutation queue to protect its internal state. This caused a deadlock when called from a "joined block" context (where the mutation queue is deliberately blocked waiting for the main thread).
Solution: Use a private Mutex lock for scheduler bookkeeping instead of the mutation queue. This completely decouples scheduler synchronization from the join protocol. Cleanup still dispatches to mutation queue as required by TokenExecutor.cleanupForUnregistration().
Also includes related changes:
- TokenExecutor: Add isRegistered flag to explicitly track registration state (separate from sessionId). Change notifyScheduler() to guard on isRegistered instead of asserting on sessionId, allowing tokens to accumulate before registration completes.
- TokenExecutor: Add test hooks (testQueuedTokenCount, testTotalSlots, testAvailableSlots) for verifying token preservation in tests.
- TwoTierTokenQueue: Add count property to support testQueuedTokenCount.
…ation
- Add FairnessSchedulerSessionRestorationTests (4 tests) verifying re-registration after unregister, preserved token processing, and double-unregister safety
- Add TokenExecutorSessionReviveTests (5 tests) covering full disable→preserve→revive→drain cycle
- Update TokenExecutorCleanupTests to verify tokens are preserved (not discarded) after cleanupForUnregistration(); reduce token count to 30 (within 40-slot limit)
- Fix TokenExecutorAvailableSlotsBoundaryTests accounting assertions to match new preservation behavior
- Add missing isRegistered=true after registration in multiple test classes
- Add testFinished guard to prevent async polling crash after test timeout
- Enable fairness scheduler flag in TokenExecutorLegacyRemovalTests
- Register new test classes in run_fairness_tests.sh
- Add timeouts and state assertions to concurrency tests (testConcurrentRegisterAndUnregister, testConcurrentEnqueueAndUnregister)
- Add cleanupCallCount tracking to verify double-unregister does not call cleanup twice (testDoubleUnregister)
- Replace timing-dependent inverted expectations with deterministic mutation queue syncs for no-execution tests
- Rename testBackgroundSessionGetsEqualTurns to match actual behavior (testBackgroundSessionCanProcessTokens)
- Rename testNoDuplicateNotificationsForBusySession to testRapidAddTokensAllProcessed with direct byte count verification
- Delete non-asserting testTurnResultReflectsTokenQueueStateAfterDefer
- Delete obsolete skipped tests (testLegacyPathWhenSessionIdIsZero, testCodePathDiffersBetweenRegisteredAndUnregistered)
- Delete TwoTierTokenQueueTests class (tests dead discardAllAndReturnCount)
These methods were added but never called. TokenExecutor uses removeAll() in invalidate() and preserves tokens in cleanupForUnregistration().
…nce regression
The previous deadlock fix (be3df18) used a Mutex for all state access, but this caused ~13% throughput regression on the hot path (sessionDidEnqueueWork). The deadlock only affected register() which used .sync dispatch to the mutation queue. The hot path was always called from the mutation queue via async dispatch, so it never could have deadlocked.
This change:
- Uses Mutex only for ID allocation in register() (instant, no queue dispatch)
- Dispatches session creation async to mutation queue (avoids joined block deadlock)
- Restores async dispatch for sessionDidEnqueueWork() and all scheduling state
- Manages busyList/busySet directly on mutation queue (no lock needed)
Performance improvement: ~6-9% throughput increase, bringing the fairness scheduler within 4-8% of production baseline.
Future optimization: Consider an atomic 'hasBusyWork' flag on enqueue, only doing lock work when transitioning 0→1. This might reduce contention enough to make the synchronous path viable again.
Doubles the per-turn token budget to reduce turn overhead. With the previous 500 token budget, throughput was ~7% below production in low-contention scenarios (2 tabs). Doubling to 1000 brings performance within 1% of production. TODO: This fixed budget may not translate well to slower CPUs. Should be tested on minimum supported hardware for validation. Consider investigating adaptive token budgets based on throughput to auto-tune for different hardware capabilities.
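To make the effect of the per-turn budget concrete, here is a simplified, synchronous sketch of round-robin turns. It assumes a hypothetical `FakeExecutor` that only counts pending tokens and a hypothetical two-case `TurnResult`; the names `executeTurn(tokenBudget:)`, `defaultTokenBudget`, and `busyList` come from the diff context, but the bodies here are illustrative only.

```swift
typealias SessionID = UInt64

/// Hypothetical result type: `.yielded` means the budget ran out with work left.
enum TurnResult {
    case completed
    case yielded
}

/// Hypothetical executor that models a session as a count of pending tokens.
final class FakeExecutor {
    var pendingTokens: Int

    init(pendingTokens: Int) {
        self.pendingTokens = pendingTokens
    }

    /// Consumes at most `tokenBudget` tokens in one turn.
    func executeTurn(tokenBudget: Int) -> TurnResult {
        let consumed = min(tokenBudget, pendingTokens)
        pendingTokens -= consumed
        return pendingTokens > 0 ? .yielded : .completed
    }
}

struct RoundRobin {
    static let defaultTokenBudget = 1000
    var executors: [SessionID: FakeExecutor] = [:]
    var busyList: [SessionID] = []
    /// Order in which sessions received turns.
    var history: [SessionID] = []

    /// Gives the session at the head of the busy list one turn.
    /// Returns false when there is nothing left to run.
    mutating func executeNextTurn() -> Bool {
        guard !busyList.isEmpty else { return false }
        let id = busyList.removeFirst()
        guard let executor = executors[id] else { return true } // unregistered
        history.append(id)
        if executor.executeTurn(tokenBudget: Self.defaultTokenBudget) == .yielded {
            busyList.append(id) // back of the line: others go first
        }
        return true
    }
}
```

With sessions holding 2500 and 500 tokens, the turn order is 1, 2, 1, 1: the small session finishes after one turn instead of waiting behind the large one. A larger budget means fewer turns per session and so less per-turn overhead, at the cost of coarser interleaving.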
…king
- Use synchronous completion callback in FairnessScheduler to eliminate unnecessary async dispatch per turn (completion is already on mutationQueue)
- Add dispatchPrecondition to verify threading contract in DEBUG builds
- Skip legacy activeSessionsWithTokens tracking when using FairnessScheduler (round-robin scheduling already handles prioritization)
- Document threading contract for executeTurn protocol method
- Update tests to call completion on mutation queue per new contract
…hods Document that cleanupForUnregistration() is called on mutationQueue in the protocol definition and implementations.
Without this, notifyScheduler() early-returns and tokens are never scheduled for execution, resulting in no terminal output.
Documents thread safety requirements for all member variables in TokenExecutorImpl to match the documentation standard in FairnessScheduler.
- taskQueue, sideEffects: Thread-safe via iTermTaskQueue internal locking
- tokenQueue, executingCount, commit: Access on mutation queue only
- pauseCount: Thread-safe via atomic operations
- executingSideEffects: Thread-safe via MutableAtomicObject
- sideEffectScheduler: Period modified on mutation queue
- throughputEstimator: addByteCount from any queue
- isRegistered: Access on mutation queue only
f28b4d3 to
7a6fac7
Compare
Upstream added two new delegate methods that FakeSession needs to implement: - screenOffscreenCommandLineShouldBeVisibleForCurrentCommand - screenUpdateBlock:action:
The threading contract verification for the completion callback should only run during tests, not in regular debug builds. ITERM_DEBUG ensures the check runs when tests execute but avoids runtime overhead in both development and release builds.
ca490f8 to
688c0d1
Compare
|
Oops, I had some unpushed changes that were directly relevant to your comments.
1/2/3) The Mutex was a short-lived design change for thread safety.
4) The completion callback is necessary to validate scheduler behavior in tests, but I gated it behind ITERM_DEBUG to eliminate any potential impact on production. The delay is infrastructure, but not currently used.
All tests are passing, and I think performance looks good using the stress tests in #569. Throughput remains steady while refreshes and framerates are notably improved.
** Basic stress test
** Instrumented stress test with --inject and --title flags
|
… String? signature
|
|
||
| /// Lock protecting only nextSessionId for ID allocation. | ||
| /// This allows register() to be called from joined blocks without deadlock. | ||
| private let idLock = Mutex() |
Do you think the lock will grow to protect more variables? If not, it would be easy to adapt iTermAtomicInt64 to iTermAtomicUInt64 to make this safer and simpler.
Agreed, I was trying to minimize changes to make reviewing easier :). But an atomic int is appropriate here.
| return | ||
| } | ||
|
|
||
| if !busySet.contains(sessionId) { |
Consider making busySet + busyList into a standalone type. It would make it easier to verify that it has properties like "a session ID can't belong to the busy list more than once".
|
|
||
| /// Called when session is unregistered to clean up pending tokens. | ||
| /// Must be called on mutation queue. | ||
| func cleanupForUnregistration() { |
Is this now dead code or will we need to do something here later?
Dead code. Removed from protocol and implementation in 883be29
1. Replace Mutex with iTermAtomicInt64 for session ID allocation
   - Lock-free atomic increment instead of mutex sync
   - Simpler and more appropriate for single counter
2. Extract BusyQueue type to encapsulate busyList/busySet
   - Enforces invariant that set and list stay in sync
   - Uses it_fatalError to catch duplicate enqueue attempts
   - Cleaner API: enqueue/dequeue/contains/removeFromSet
3. Remove cleanupForUnregistration from protocol (YAGNI)
   - Method existed but intentionally did nothing
   - Tokens are preserved on unregister for session revival
   - No cleanup hook needed if there is nothing to clean up
| @objc static let shared = FairnessScheduler() | ||
|
|
||
| /// Session ID type - monotonically increasing counter | ||
| typealias SessionID = UInt64 |
since nextSessionIdAtomic is an Int64, shouldn't this also be Int64?
| list.append(id) | ||
| } | ||
|
|
||
| mutating func dequeue() -> SessionID? { |
What if dequeue() looped, removing values from list, until it found an id that is in set? Then BusyQueue would have the nice property that you can't dequeue removed values or values that contains says aren't there. You could even rename removeFromSet to remove.
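One way the looping dequeue() proposed above could look, as a sketch rather than the PR's actual BusyQueue. The `list`/`set` names follow the diff context; `precondition` stands in for the it_fatalError call mentioned in the commit message, and `remove` is the renamed removeFromSet.

```swift
typealias SessionID = UInt64

struct BusyQueue {
    private var list: [SessionID] = []
    private var set: Set<SessionID> = []

    func contains(_ id: SessionID) -> Bool {
        return set.contains(id)
    }

    /// Appends a session; enqueueing an id that is already present is a
    /// programmer error.
    mutating func enqueue(_ id: SessionID) {
        precondition(!set.contains(id), "duplicate enqueue")
        set.insert(id)
        list.append(id)
    }

    /// Removes the id from the set only; its stale entry stays in `list`
    /// until dequeue() skips over it.
    mutating func remove(_ id: SessionID) {
        set.remove(id)
    }

    /// Pops entries until one is found that is still in the set, so a removed
    /// id is never returned and an id is never returned twice.
    mutating func dequeue() -> SessionID? {
        while !list.isEmpty {
            let id = list.removeFirst()
            if set.remove(id) != nil {
                return id
            }
        }
        return nil
    }
}
```

One caveat of this lazy approach: if an id is removed and then enqueued again before its stale entry is skipped, it is dequeued at the stale entry's earlier position. Whether that matters for fairness is a design decision the sketch does not settle.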
|
This PR is now part of #580. |
Summary
This PR adds the `FairnessScheduler` and integrates it with `TokenExecutor` for round-robin token execution across terminal sessions. It builds on PR #567 (backpressure infrastructure) and is gated behind the `useFairnessScheduler` advanced setting (default OFF).
Motivation
The fairness scheduler prevents a single busy session from starving others by enforcing round-robin execution on the mutation queue. This PR provides:
`useFairnessScheduler`
This PR establishes the execution-side scheduling. The I/O-side changes (per-PTY dispatch sources) come in PR-3.
Changes
Seam
All scheduling code is gated by the feature flag:
Flag OFF produces identical behavior to PR-1.
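As an illustration of what such a seam could look like, the sketch below follows the commit messages in this PR (a cached `useFairnessScheduler` flag as the sole decision point in notifyScheduler(), plus an `isRegistered` guard). `ExecutorSketch` and its `log` array are hypothetical and exist only to make the two paths observable.

```swift
/// Hypothetical executor showing only the flag-gated decision.
final class ExecutorSketch {
    /// Cached at init so the hot path never re-reads the setting.
    private let useFairnessScheduler: Bool
    var isRegistered = false
    /// Records which path was taken; for illustration only.
    private(set) var log: [String] = []

    init(useFairnessScheduler: Bool) {
        self.useFairnessScheduler = useFairnessScheduler
    }

    func notifyScheduler() {
        guard useFairnessScheduler else {
            // Flag OFF: the legacy path runs unchanged.
            log.append("legacy execute()")
            return
        }
        // Flag ON but not yet registered: tokens simply accumulate.
        guard isRegistered else { return }
        log.append("sessionDidEnqueueWork")
    }
}
```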
Why Flag ON Isn't Performant Yet
With flag ON, stress tests show ~20k iter/s for 1 tab but only 9-21 iter/s for 2+ tabs. This is expected.
The scheduler controls execution order, but reading still goes through TaskNotifier's single-threaded select() loop. The backpressureReleaseHandler from PR-1 remains nil — nothing wires it up because there are no dispatch sources to control yet.
Without per-PTY dispatch sources (PR-3), reading is uncontrolled. Sessions accumulate tokens faster than the scheduler can drain them. PR-3 completes the circuit by adding dispatch sources that suspend when backpressureLevel >= .heavy and resume when the handler fires.
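The suspend/resume circuit described above can be sketched roughly as follows. The enum cases and the `>= .heavy` comparison come from this PR; the threshold fractions in `backpressureLevel(availableSlots:totalSlots:)` and the `ReadSourceSketch` type are assumptions made up for the example, since the real thresholds and the PR-3 dispatch source code are not shown here.

```swift
enum BackpressureLevel: Int, Comparable {
    case none, light, moderate, heavy, blocked

    static func < (lhs: BackpressureLevel, rhs: BackpressureLevel) -> Bool {
        return lhs.rawValue < rhs.rawValue
    }
}

/// Maps free slots to a level. The cutoffs are hypothetical. Non-positive
/// slot counts (including negative ones) are treated as blocked.
func backpressureLevel(availableSlots: Int, totalSlots: Int) -> BackpressureLevel {
    guard totalSlots > 0, availableSlots > 0 else { return .blocked }
    let free = Double(availableSlots) / Double(totalSlots)
    if free >= 0.75 { return .none }
    if free >= 0.5 { return .light }
    if free >= 0.25 { return .moderate }
    return .heavy
}

/// Hypothetical stand-in for a per-PTY read source.
final class ReadSourceSketch {
    private(set) var isSuspended = false

    /// Stops reading once the queue is heavily loaded.
    func update(for level: BackpressureLevel) {
        if level >= .heavy {
            isSuspended = true
        }
    }

    /// Would be wired to backpressureReleaseHandler: resume reading.
    func backpressureReleased() {
        isSuspended = false
    }
}
```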
Future PRs
Depends on #567.