chore: improve queue processing time limit handling#366
Conversation
WalkthroughImplements per-room maximum processing time enforcement in the federation queue staging area. When processing a room exceeds the configured time limit (default 30 seconds), the room is re-enqueued and processing is paused, allowing other rooms to be processed. Changes
Estimated code review effort🎯 2 (Simple) | ⏱️ ~8 minutes Suggested labels
🚥 Pre-merge checks | ✅ 5✅ Passed checks (5 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #366 +/- ##
==========================================
- Coverage 50.79% 50.75% -0.05%
==========================================
Files 99 99
Lines 11262 11273 +11
==========================================
+ Hits 5721 5722 +1
- Misses 5541 5551 +10 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
There was a problem hiding this comment.
1 issue found across 1 file
Prompt for AI agents (unresolved issues)
Check if these issues are valid — if so, understand the root cause of each and fix them. If appropriate, use sub-agents to investigate and fix each issue separately.
<file name="packages/federation-sdk/src/queues/staging-area.queue.ts">
<violation number="1" location="packages/federation-sdk/src/queues/staging-area.queue.ts:10">
P2: Validate the env value before using it. As written, a malformed FEDERATION_QUEUE_MAX_TIME_PER_ROOM produces NaN and disables the time limit check.</violation>
</file>
Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.
|
|
||
| type QueueHandler = (roomId: RoomID) => AsyncGenerator<unknown | undefined>; | ||
|
|
||
| const QUEUE_MAX_TIME_PER_ROOM = parseInt(process.env.FEDERATION_QUEUE_MAX_TIME_PER_ROOM || '30', 10) * 1000; |
There was a problem hiding this comment.
P2: Validate the env value before using it. As written, a malformed FEDERATION_QUEUE_MAX_TIME_PER_ROOM produces NaN and disables the time limit check.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At packages/federation-sdk/src/queues/staging-area.queue.ts, line 10:
<comment>Validate the env value before using it. As written, a malformed FEDERATION_QUEUE_MAX_TIME_PER_ROOM produces NaN and disables the time limit check.</comment>
<file context>
@@ -7,6 +7,8 @@ import { ConfigService } from '../services/config.service';
type QueueHandler = (roomId: RoomID) => AsyncGenerator<unknown | undefined>;
+const QUEUE_MAX_TIME_PER_ROOM = parseInt(process.env.FEDERATION_QUEUE_MAX_TIME_PER_ROOM || '30', 10) * 1000;
+
@singleton()
</file context>
| const QUEUE_MAX_TIME_PER_ROOM = parseInt(process.env.FEDERATION_QUEUE_MAX_TIME_PER_ROOM || '30', 10) * 1000; | |
| const QUEUE_MAX_TIME_PER_ROOM = (() => { | |
| const raw = Number.parseInt(process.env.FEDERATION_QUEUE_MAX_TIME_PER_ROOM || '30', 10); | |
| return Number.isFinite(raw) && raw > 0 ? raw * 1000 : 30 * 1000; | |
| })(); |
There was a problem hiding this comment.
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@packages/federation-sdk/src/queues/staging-area.queue.ts`:
- Around line 62-73: The loop in staging-area.queue.ts only regains control when
this.handler yields, so long-running single events (inside
StagingAreaService.processEventForRoom which calls handlePdu, notify,
markEventAsUnstaged) can exceed QUEUE_MAX_TIME_PER_ROOM; update the code so the
handler (or processEventForRoom) checks the deadline before starting each event
and yields/returns early when Date.now() - startTime >= QUEUE_MAX_TIME_PER_ROOM
so the queue loop can re-add the room and stop processing further events;
specifically modify StagingAreaService.processEventForRoom (or the generator
used by this.handler) to accept the startTime or deadline and perform a
pre-event check and yield/return to let the outer loop requeue via
queue.add(roomId).
- Line 10: The QUEUE_MAX_TIME_PER_ROOM value is parsed from an env var but not
validated, so parseInt returning NaN will silently disable the timeout; change
the initialization for QUEUE_MAX_TIME_PER_ROOM to validate the parsed value
(e.g., parse process.env.FEDERATION_QUEUE_MAX_TIME_PER_ROOM, convert to number,
check isFinite && >0) and fall back to the default 30 seconds if invalid, then
multiply by 1000; ensure any downstream checks that rely on
QUEUE_MAX_TIME_PER_ROOM (the constant defined with parseInt) behave correctly by
using the validated value.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: eaa4d475-6e68-4e89-8bc2-4e0b46aa835b
📒 Files selected for processing (1)
packages/federation-sdk/src/queues/staging-area.queue.ts
📜 Review details
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: cubic · AI code reviewer
🧰 Additional context used
🧠 Learnings (2)
📓 Common learnings
Learnt from: ricardogarim
Repo: RocketChat/homeserver PR: 184
File: packages/federation-sdk/src/services/media.service.ts:21-31
Timestamp: 2025-09-14T13:15:46.588Z
Learning: In the RocketChat homeserver project, ricardogarim prefers to defer timeout and retry enhancements for media downloads when there's already a TODO to make timeouts configurable, indicating they manage technical debt systematically rather than implementing every suggested improvement immediately.
📚 Learning: 2026-03-04T13:37:36.036Z
Learnt from: sampaiodiego
Repo: RocketChat/homeserver PR: 338
File: packages/federation-sdk/src/queues/per-destination.queue.ts:204-208
Timestamp: 2026-03-04T13:37:36.036Z
Learning: In `packages/federation-sdk/src/queues/per-destination.queue.ts`, the `PerDestinationQueue` intentionally generates a new `txnId` for each retry attempt. This is by design: retries are not strict resends of the same payload — new PDUs/EDUs may be added to the queue during the backoff window and batched into the next attempt, making each transaction distinct. Reusing the same `txnId` across retries would therefore be incorrect for this implementation, even though the Matrix federation spec requires txnId reuse for identical payload retries.
Applied to files:
packages/federation-sdk/src/queues/staging-area.queue.ts
|
|
||
| type QueueHandler = (roomId: RoomID) => AsyncGenerator<unknown | undefined>; | ||
|
|
||
| const QUEUE_MAX_TIME_PER_ROOM = parseInt(process.env.FEDERATION_QUEUE_MAX_TIME_PER_ROOM || '30', 10) * 1000; |
There was a problem hiding this comment.
Validate the timeout env var before using it.
A bad value here silently disables the limit: parseInt('abc', 10) becomes NaN, and then Line 70 is always false. That would remove the fairness guardrail this PR is adding.
Proposed fix
-const QUEUE_MAX_TIME_PER_ROOM = parseInt(process.env.FEDERATION_QUEUE_MAX_TIME_PER_ROOM || '30', 10) * 1000;
+const queueMaxTimePerRoomSeconds = Number(process.env.FEDERATION_QUEUE_MAX_TIME_PER_ROOM ?? '30');
+
+if (!Number.isFinite(queueMaxTimePerRoomSeconds) || queueMaxTimePerRoomSeconds <= 0) {
+ throw new Error('FEDERATION_QUEUE_MAX_TIME_PER_ROOM must be a positive number of seconds');
+}
+
+const QUEUE_MAX_TIME_PER_ROOM = queueMaxTimePerRoomSeconds * 1000;📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| const QUEUE_MAX_TIME_PER_ROOM = parseInt(process.env.FEDERATION_QUEUE_MAX_TIME_PER_ROOM || '30', 10) * 1000; | |
| const queueMaxTimePerRoomSeconds = Number(process.env.FEDERATION_QUEUE_MAX_TIME_PER_ROOM ?? '30'); | |
| if (!Number.isFinite(queueMaxTimePerRoomSeconds) || queueMaxTimePerRoomSeconds <= 0) { | |
| throw new Error('FEDERATION_QUEUE_MAX_TIME_PER_ROOM must be a positive number of seconds'); | |
| } | |
| const QUEUE_MAX_TIME_PER_ROOM = queueMaxTimePerRoomSeconds * 1000; |
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@packages/federation-sdk/src/queues/staging-area.queue.ts` at line 10, The
QUEUE_MAX_TIME_PER_ROOM value is parsed from an env var but not validated, so
parseInt returning NaN will silently disable the timeout; change the
initialization for QUEUE_MAX_TIME_PER_ROOM to validate the parsed value (e.g.,
parse process.env.FEDERATION_QUEUE_MAX_TIME_PER_ROOM, convert to number, check
isFinite && >0) and fall back to the default 30 seconds if invalid, then
multiply by 1000; ensure any downstream checks that rely on
QUEUE_MAX_TIME_PER_ROOM (the constant defined with parseInt) behave correctly by
using the validated value.
| const startTime = Date.now(); | ||
|
|
||
| // eslint-disable-next-line no-await-in-loop --- this is valid since this.handler is an async generator | ||
| for await (const _ of this.handler(roomId)) { | ||
| // remove the item from the queue in case it was re-enqueued while processing | ||
| this.queue.delete(roomId); | ||
|
|
||
| const elapsed = Date.now() - startTime; | ||
| if (elapsed > QUEUE_MAX_TIME_PER_ROOM) { | ||
| this.queue.add(roomId); | ||
| break; | ||
| } |
There was a problem hiding this comment.
This only slices between events, not within a long event.
Line 65 regains control only when the handler yields, but packages/federation-sdk/src/services/staging-area.service.ts:95-126 yields after handlePdu(), notify(), and markEventAsUnstaged() finish. A single pathological event can therefore run well past the 30s budget before Line 70 ever fires, which leaves the worst-case starvation path open.
Suggested direction
-type QueueHandler = (roomId: RoomID) => AsyncGenerator<unknown | undefined>;
+type QueueHandler = (roomId: RoomID, deadlineAt: number) => AsyncGenerator<unknown | undefined>;
...
- const startTime = Date.now();
+ const deadlineAt = Date.now() + QUEUE_MAX_TIME_PER_ROOM;
...
- for await (const _ of this.handler(roomId)) {
+ for await (const _ of this.handler(roomId, deadlineAt)) {
// remove the item from the queue in case it was re-enqueued while processing
this.queue.delete(roomId);
- const elapsed = Date.now() - startTime;
- if (elapsed > QUEUE_MAX_TIME_PER_ROOM) {
+ if (Date.now() > deadlineAt) {
this.queue.add(roomId);
break;
}Then have StagingAreaService.processEventForRoom() stop before starting the next event once the deadline has passed.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| const startTime = Date.now(); | |
| // eslint-disable-next-line no-await-in-loop --- this is valid since this.handler is an async generator | |
| for await (const _ of this.handler(roomId)) { | |
| // remove the item from the queue in case it was re-enqueued while processing | |
| this.queue.delete(roomId); | |
| const elapsed = Date.now() - startTime; | |
| if (elapsed > QUEUE_MAX_TIME_PER_ROOM) { | |
| this.queue.add(roomId); | |
| break; | |
| } | |
| const deadlineAt = Date.now() + QUEUE_MAX_TIME_PER_ROOM; | |
| // eslint-disable-next-line no-await-in-loop --- this is valid since this.handler is an async generator | |
| for await (const _ of this.handler(roomId, deadlineAt)) { | |
| // remove the item from the queue in case it was re-enqueued while processing | |
| this.queue.delete(roomId); | |
| if (Date.now() > deadlineAt) { | |
| this.queue.add(roomId); | |
| break; | |
| } |
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@packages/federation-sdk/src/queues/staging-area.queue.ts` around lines 62 -
73, The loop in staging-area.queue.ts only regains control when this.handler
yields, so long-running single events (inside
StagingAreaService.processEventForRoom which calls handlePdu, notify,
markEventAsUnstaged) can exceed QUEUE_MAX_TIME_PER_ROOM; update the code so the
handler (or processEventForRoom) checks the deadline before starting each event
and yields/returns early when Date.now() - startTime >= QUEUE_MAX_TIME_PER_ROOM
so the queue loop can re-add the room and stop processing further events;
specifically modify StagingAreaService.processEventForRoom (or the generator
used by this.handler) to accept the startTime or deadline and perform a
pre-event check and yield/return to let the outer loop requeue via
queue.add(roomId).
FGA-9
Summary by CodeRabbit