Remove dispatch backlog, replace with timeout lock acquisition#3290
Conversation
|
The latest updates on your projects. Learn more about Vercel for GitHub.
|
|
📝 Documentation updates detected! New suggestion: Update flow control env var from backlog size to lock acquisition timeout Tip: See how your feedback shapes Promptless in Agent Knowledge Base 🧠 |
| stopTime := time.Now().Add(timeout) | ||
| for time.Now().Before(stopTime) { | ||
| if worker.sendMu.TryLock() { | ||
| return true | ||
| } | ||
| time.Sleep(5 * time.Millisecond) // small backoff to avoid busy spinning | ||
| } |
There was a problem hiding this comment.
I don't love this pattern for a few reasons:
- It doesn't respect ordering, so workers can be crowded out if we happen to call
TryLockfrom a different task send - It feels unpredictable what sort of CPU load we'll see from TryLock running this often
Is there a way we can implement the worker mutex as a semaphore on a channel instead, and have a channel call using time.After, and then have something like:
select {
<- worker.sendSemaphore:
<- time.After:
}(Not real code but hopefully clear what I mean)
The semaphore can be guarded on either send or receive.
This should be efficient and also respect ordering?
There was a problem hiding this comment.
That makes sense. I changed it so it uses a chan as a semaphore.
| GRPCWorkerStreamMaxBacklogSize int `mapstructure:"grpcWorkerStreamMaxBacklogSize" json:"grpcWorkerStreamMaxBacklogSize,omitempty" default:"20"` | ||
| // GRPCWorkerMaxLockAcquisitionTimeMS is the maximum number of milliseconds that the dispatcher will wait while attempting | ||
| // to send messages to workers. If it waits longer, the request will be rejected. Default is 250 | ||
| GRPCWorkerMaxWorkerLockAcquisitionTimeMS int `mapstructure:"grpcWorkerMaxLockAcquisitionTimeMS" json:"grpcWorkerMaxLockAcquisitionTimeMS,omitempty" default:"250"` |
There was a problem hiding this comment.
Would it make more sense for this to be a time.Duration? I can't actually remember if viper supports unmarshalling into a time.Duration directly or if we need to parse a string. I think it might have the advantage of making the config slightly more readable
There was a problem hiding this comment.
Looks like Viper does support time.Duration unmarshalling. Fixed!
Benchmark resultsCompared against |
| func (worker *subscribedWorker) tryAcquireSendLockWithTimeout(timeout time.Duration) bool { | ||
| select { | ||
| // attempt to send to the semaphore, blocks on contention because it has a buffer of 1 | ||
| case worker.sendSemaphore <- struct{}{}: | ||
| return true | ||
| // timing out dequeues the semaphore send | ||
| case <-time.After(timeout): | ||
| return false | ||
| } | ||
| } | ||
|
|
||
| func (worker *subscribedWorker) releaseSendLock() { | ||
| <-worker.sendSemaphore | ||
| } |
There was a problem hiding this comment.
this looks much cleaner, nice!
There was a problem hiding this comment.
Pull request overview
This PR replaces the dispatcher’s per-worker “backlog size” flow control with a timeout-based lock acquisition mechanism to prevent unbounded queuing and reject sends when the worker stream is congested.
Changes:
- Replace backlog tracking (
backlogSize/maxBacklogSize) with a per-worker send semaphore + configurable lock acquisition timeout. - Add runtime/server config wiring for the new lock acquisition timeout setting and plumb it through dispatcher options.
- Update dispatcher worker construction to pass the new timeout (with some remaining hard-coded behavior in the v1
Listenendpoint).
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
| pkg/config/server/server.go | Replaces backlog-size runtime setting with a duration-based lock acquisition timeout and binds new env var. |
| internal/services/dispatcher/subscribed_worker_v1.go | Implements send lock acquisition with timeout and removes backlog counters in worker send paths. |
| internal/services/dispatcher/subscribed_worker.go | Updates subscribedWorker to use a semaphore + timeout instead of a mutex/backlog counters. |
| internal/services/dispatcher/server.go | Passes lock acquisition timeout when constructing subscribed workers (v1 Listen currently hard-coded). |
| internal/services/dispatcher/dispatcher.go | Renames/plumbs dispatcher option from backlog size to lock acquisition timeout (duration). |
| cmd/hatchet-engine/engine/run.go | Wires server runtime config into dispatcher via the new option. |
Comments suppressed due to low confidence (3)
internal/services/dispatcher/subscribed_worker_v1.go:160
sendToWorkerreleases the send lock viadefer worker.releaseSendLock()in the parent goroutine, but the actualstream.SendMsghappens in a spawned goroutine. Ifctx.Done()fires first,sendToWorkerreturns and releases the lock while the send goroutine may still be running, allowing concurrent sends on the same gRPC stream (not thread-safe). Move the lock release into the send goroutine so it is held untilSendMsgcompletes (similar to theCancelTaskpattern).
defer worker.releaseSendLock()
defer lockSpan.End()
telemetry.WithAttributes(span, telemetry.AttributeKV{
Key: "lock.duration_ms",
Value: time.Since(lockBegin).Milliseconds(),
})
_, streamSpan := telemetry.NewSpan(ctx, "send-worker-stream")
defer streamSpan.End()
sendMsgBegin := time.Now()
sentCh := make(chan error, 1)
go func() {
defer close(sentCh)
err = worker.stream.SendMsg(msg)
if err != nil {
span.RecordError(err)
}
if time.Since(sendMsgBegin) > 50*time.Millisecond {
span.SetStatus(codes.Error, "flow control detected")
span.RecordError(fmt.Errorf("send took too long, we may be in flow control: %s", time.Since(sendMsgBegin)))
}
sentCh <- err
}()
select {
case <-ctx.Done():
return fmt.Errorf("context done before send could complete: %w", ctx.Err())
case err = <-sentCh:
return err
}
internal/services/dispatcher/subscribed_worker_v1.go:153
- There is a data race on the outer
errvariable: the send goroutine assigns toerrwhile the parent goroutine can also assign/read it (case err = <-sentCh). Use a goroutine-local variable (e.g.,sendErr) and only communicate it throughsentCh(and use that value forRecordError).
go func() {
defer close(sentCh)
err = worker.stream.SendMsg(msg)
if err != nil {
span.RecordError(err)
}
if time.Since(sendMsgBegin) > 50*time.Millisecond {
span.SetStatus(codes.Error, "flow control detected")
span.RecordError(fmt.Errorf("send took too long, we may be in flow control: %s", time.Since(sendMsgBegin)))
}
sentCh <- err
}()
internal/services/dispatcher/subscribed_worker_v1.go:130
- The lock acquisition telemetry is currently misleading:
lockBeginis set after the lock is already acquired, andlockSpan.End()is deferred until the whole send completes, so neither the span norlock.duration_msreflect acquisition time. Start timing/span before attempting to acquire the lock, then end the acquisition span immediately after it is acquired (recording the actual wait duration).
if !worker.tryAcquireSendLockWithTimeout(worker.sendLockAcquisitionTimeout) {
err = fmt.Errorf("could not acquire worker send mutex, flow control is active")
span.RecordError(err)
span.SetStatus(codes.Error, "flow control is active")
return err
}
lockBegin := time.Now()
_, lockSpan := telemetry.NewSpan(ctx, "acquire-worker-stream-lock")
defer worker.releaseSendLock()
defer lockSpan.End()
telemetry.WithAttributes(span, telemetry.AttributeKV{
Key: "lock.duration_ms",
Value: time.Since(lockBegin).Milliseconds(),
})
You can also share your feedback on Copilot code review. Take the survey.
| select { | ||
| // attempt to send to the semaphore, blocks on contention because it has a buffer of 1 | ||
| case worker.sendSemaphore <- struct{}{}: | ||
| return true | ||
| // timing out dequeues the semaphore send | ||
| case <-time.After(timeout): |
There was a problem hiding this comment.
This advice might be outdated:
Before Go 1.23, this documentation warned that the underlying Timer would not be recovered by the garbage collector until the timer fired, and that if efficiency was a concern, code should use NewTimer instead and call Timer.Stop if the timer is no longer needed. As of Go 1.23, the garbage collector can recover unreferenced, unstopped timers. There is no reason to prefer NewTimer when After will do
There was a problem hiding this comment.
But perhaps it's not a bad idea for the worker to manage a single timer and to cleanup the timer when we return from this method
There was a problem hiding this comment.
Would that work with multiple goroutines at the same time? It's slightly unclear from the documentation.
There was a problem hiding this comment.
hmm, yeah I'm not entirely sure. can we try writing a Go benchmark to see how it performs? if we can do at least ~50k/second without much issue I think we'll be fine
|
📝 Documentation updates detected! Updated existing suggestion: Update flow control env var from backlog size to lock acquisition timeout Tip: Worried about broken links? Ask Promptless to find and fix them automatically 🔗 |
Description
Prior to this change, there was a fixed backlog of roughly 20 simultaneous worker dispatches before they would start erroring out to be rescheduled. This was suboptimal as it was adding an additional buffer on top of the existing gRPC and TCP flow control buffers that exist internally. The
SendMsgcall will only block once 1) TCP buffer is exhausted 2) gRPC buffer is exhausted, thus making the additional worker backlog only activate once 19 additional worker sends were queued after both of those buffers were exhausted. This change makes it so that a timeout controls whether we send tasks back to the scheduler by checking how long it takes to acquire the lock surrounding SendMsg.Fixes # (issue)
Type of change
What's Changed