feat: add configurable limit on concurrent bulk dispatch goroutines#6751
feat: add configurable limit on concurrent bulk dispatch goroutines#6751ycombinator wants to merge 6 commits intoelastic:mainfrom
Conversation
When agent count exceeds what the bulk engine can process, goroutines pile up in dispatch() waiting to send on the 32-slot channel. Each blocked goroutine holds its stack plus the bulkT object. With 30k+ agents under upgrade/policy storms, this grows unbounded until OOM. This adds an optional cap (max_pending_dispatches) on concurrent dispatch goroutines. When the limit is reached, new dispatches are rejected immediately with ErrTooManyDispatches, which maps to HTTP 429. Agents retry on their next checkin interval, spreading load over time. The default is 0 (no limit) so existing deployments are unaffected. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
|
This pull request does not have a backport label. Could you fix it @ycombinator? 🙏
|
michel-laterman
left a comment
There was a problem hiding this comment.
We really should be consistent with the type of maxPendingBulkDispatches; either have it as an int64 in the implementation + config or just an int
Also the new tests don't follow our existing test structures with the use of the require test package
| if resp.err != ErrTooManyBulkDispatches { | ||
| t.Fatalf("expected ErrTooManyBulkDispatches, got: %v", resp.err) | ||
| } |
There was a problem hiding this comment.
As the linter says, errors.Is should be used.
Or the require package for the error check
| if resp.err != nil { | ||
| t.Fatalf("expected no error, got: %v", resp.err) | ||
| } |
There was a problem hiding this comment.
We should add something to AGENTS.md that gets claude to use the require package
internal/pkg/bulk/opt.go
Outdated
| blockQueueSz int | ||
| apikeyMaxParallel int | ||
| apikeyMaxReqSize int | ||
| maxPendingBulkDispatches int |
There was a problem hiding this comment.
Shouldn't this be int64?
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
What is the problem this PR solves?
During a 30k Serverless scale test, 22 of 39 fleet-server pods were OOMKilled. Analysis of the captured pod logs showed:
dispatch()waiting to enqueue onto the bulk engine's channel (capacity 32).bulkTobject. With no upper bound on concurrent dispatches, goroutines piled up until pods hit their memory limit (~154 Mi) and were killed.How does this PR solve the problem?
This adds an optional cap on concurrent dispatch goroutines to bound memory usage.
The limit check runs at the top of
dispatch(), before blocking on the channel:fleet-server/internal/pkg/bulk/engine.go
Lines 608 to 622 in 1f456fb
When the limit is reached, the dispatch is rejected immediately with
ErrTooManyDispatches, which maps to HTTP 429 so agents retry on their next checkin interval:fleet-server/internal/pkg/api/error.go
Lines 181 to 189 in 1f456fb
The limit is configurable via
max_pending_dispatchesin the bulk config:fleet-server/internal/pkg/config/input.go
Line 52 in 1f456fb
The default is 0 (no limit) so existing deployments are unaffected. Operators opt in by setting a value appropriate for their deployment size.
How to test this PR locally
Design Checklist
Checklist
Related issues
🤖 Generated with Claude Code