feat: optimistic scheduling #2867
Conversation
…elanger/optimistic-scheduling-2
abelanger5
left a comment
Need to address some comments; additionally, we should add the optimistic scheduling paths to the testing matrix.
```go
func (d *DispatcherImpl) handleRetries(
	ctx context.Context,
	tenantId string,
	toRetry []*sqlcv1.V1Task,
```
maybe this should also be a []*taskWithPayload for consistency? when I initially did this, I was mostly trying to remove the sqlcv1.V1Task from everywhere I could to make it less confusing what was what
```go
dispatcherIdToWorkerIdsToStepRuns[dispatcherId][workerId] = append(dispatcherIdToWorkerIdsToStepRuns[dispatcherId][workerId], bulkAssigned.QueueItem.TaskID)
if !bulkAssigned.IsAssignedLocally {
```
how much risk is there here that we forget to check this flag somewhere and end up with duplicate assigns?
this should be the only place we instantiate the tasks which get sent to the dispatchers
Pull request overview
This pull request implements optimistic scheduling, enabling up to 3x performance improvements (24ms → 8ms for single-task workflows) by processing task creation and assignment within a single transaction when workers are connected to the same gRPC session.
Changes:
- Adds optimistic scheduling with semaphore-based concurrency control (configurable via env var, default 10 slots)
- Implements new `OptimisticTx` transaction wrapper with post-commit hooks
- Refactors trigger logic to support both transactional and non-transactional paths
- Integrates optimistic scheduling into ingestor and admin services for local task dispatch
- Updates SQL triggers to conditionally insert into concurrency/dag tables
Reviewed changes
Copilot reviewed 33 out of 33 changed files in this pull request and generated 8 comments.
| File | Description |
|---|---|
| pkg/repository/trigger.go | Refactored to extract reusable prepareTriggerFrom* and triggerWorkflows methods supporting both transaction modes |
| pkg/repository/optimistic_tx.go | New transaction wrapper with post-commit hook support for optimistic path |
| pkg/repository/scheduler_optimistic.go | New repository implementation for transactional trigger+scheduling operations |
| pkg/scheduling/v1/pool.go | Adds semaphore-based slot management for optimistic scheduling |
| pkg/scheduling/v1/queuer.go | Implements optimistic queue processing within transactions |
| pkg/scheduling/v1/tenant_manager.go | Orchestrates optimistic scheduling with transaction coordination |
| internal/services/ingestor/ingestor_v1.go | Attempts optimistic scheduling for events before falling back to message queue |
| internal/services/admin/v1/server.go | Attempts optimistic scheduling for workflow triggers before falling back to message queue |
| internal/services/dispatcher/dispatcher_v1.go | Adds HandleLocalAssignments for synchronous local worker dispatch |
| cmd/hatchet-migrate/migrate/migrations/20260127201500_v1_0_72.sql | Adds conditional inserts to task trigger to avoid unnecessary queries |
```go
for _, f := range o.postCommit {
	f()
}
```
The post-commit hooks are executed synchronously after the transaction is committed. If any of these hooks panics or takes a long time, it will block the caller. Since post-commit hooks are typically used for side effects that should not affect transaction success (like sending messages or updating metrics), consider running them in a goroutine or adding error recovery. Additionally, there's no mechanism to handle errors from post-commit hooks - they're fire-and-forget which could lead to silent failures.
```go
for _, qr := range allQueueResults {
	t.resultsCh <- qr
}
```
If the transaction commits successfully but sending results to t.resultsCh blocks or panics (line 360), the function will not return the scheduled tasks. The channel write is unbuffered and could block indefinitely if the receiver is slow or not consuming. Consider making this a non-blocking select with a default case or running it in a goroutine to avoid blocking the optimistic scheduling return path.
```go
// we don't run this in a transaction because workflow versions won't change during the course of this operation
workflowVersionsByNames, err := r.queries.ListWorkflowsByNames(ctx, r.pool, sqlcv1.ListWorkflowsByNamesParams{
```
The comment on line 2111 states "we don't run this in a transaction" but this code can now be called with either r.pool or a transaction tx. When called from prepareTriggerFromWorkflowNames with a transaction, this ListWorkflowsByNames query incorrectly uses r.pool instead of the passed tx parameter, breaking transaction isolation. This should use the tx parameter that is passed to the function.
```diff
- // we don't run this in a transaction because workflow versions won't change during the course of this operation
- workflowVersionsByNames, err := r.queries.ListWorkflowsByNames(ctx, r.pool, sqlcv1.ListWorkflowsByNamesParams{
+ // use the provided executor (transaction or pool); workflow versions won't change during the course of this operation
+ workflowVersionsByNames, err := r.queries.ListWorkflowsByNames(ctx, tx, sqlcv1.ListWorkflowsByNamesParams{
```
```go
tenantIdWorkflowNameCache := expirable.NewLRU(10000, func(key string, value *sqlcv1.ListWorkflowsByNamesRow) {}, 5*time.Second)
stepsInWorkflowVersionCache := expirable.NewLRU(10000, func(key string, value []*sqlcv1.ListStepsByWorkflowVersionIdsRow) {}, 5*time.Second)
```
The cache TTLs for the new LRU caches are inconsistent with their usage patterns. tenantIdWorkflowNameCache and stepsInWorkflowVersionCache use 5 second expiry, which is very short and may result in excessive cache misses, especially given these are used in critical performance paths for optimistic scheduling. Consider using a longer TTL (e.g., 5 minutes like stepIdLabelsCache) or making these configurable. The short TTL may undermine the performance benefits of optimistic scheduling.
```diff
- tenantIdWorkflowNameCache := expirable.NewLRU(10000, func(key string, value *sqlcv1.ListWorkflowsByNamesRow) {}, 5*time.Second)
- stepsInWorkflowVersionCache := expirable.NewLRU(10000, func(key string, value []*sqlcv1.ListStepsByWorkflowVersionIdsRow) {}, 5*time.Second)
+ tenantIdWorkflowNameCache := expirable.NewLRU(10000, func(key string, value *sqlcv1.ListWorkflowsByNamesRow) {}, 5*time.Minute)
+ stepsInWorkflowVersionCache := expirable.NewLRU(10000, func(key string, value []*sqlcv1.ListStepsByWorkflowVersionIdsRow) {}, 5*time.Minute)
```
Pull request overview
Copilot reviewed 41 out of 41 changed files in this pull request and generated 9 comments.
```go
go func() {
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()
```
The goroutine spawned to signal optimistic scheduling results uses context.Background() with a 30-second timeout instead of using the original context. This means if the parent context is cancelled, these signals will still attempt to send for up to 30 seconds. Consider using context.WithTimeout(ctx, 30*time.Second) to respect parent cancellation.
```diff
  migrate-strategy: ["latest"]
- rabbitmq-enabled: ["true", "false"]
  pg-version: ["15-alpine"]
+ rabbitmq-enabled: ["true"]
```
The test matrix removed rabbitmq-enabled: "false" option and only tests with RabbitMQ enabled. This reduces test coverage and could miss issues that occur specifically when RabbitMQ is disabled. The previous configuration tested both scenarios.
```go
if !bulkAssigned.IsAssignedLocally {
	dispatcherIdToWorkerIdsToStepRuns[dispatcherId][workerId] = append(dispatcherIdToWorkerIdsToStepRuns[dispatcherId][workerId], bulkAssigned.QueueItem.TaskID)
}
```
In the optimistic scheduling path, tasks assigned locally skip sending messages to the scheduler via !bulkAssigned.IsAssignedLocally. However, there's no validation that these locally assigned tasks are actually being dispatched. If HandleLocalAssignments fails silently or doesn't cover all error cases, tasks could be "lost" without any retry mechanism.
```diff
- if !bulkAssigned.IsAssignedLocally {
- 	dispatcherIdToWorkerIdsToStepRuns[dispatcherId][workerId] = append(dispatcherIdToWorkerIdsToStepRuns[dispatcherId][workerId], bulkAssigned.QueueItem.TaskID)
- }
+ dispatcherIdToWorkerIdsToStepRuns[dispatcherId][workerId] = append(
+ 	dispatcherIdToWorkerIdsToStepRuns[dispatcherId][workerId],
+ 	bulkAssigned.QueueItem.TaskID,
+ )
```
```go
func doCallback(f func()) {
	go func() {
		defer func() {
			recover() // nolint: errcheck
```
The doCallback function uses a bare recover() that silently swallows all panics without logging. This makes debugging very difficult if something goes wrong in the callback. Consider at least logging the recovered value before discarding it.
```go
go func() {
	defer func() {
		if r := recover(); r != nil {
			if l != nil {
				l.Error().Interface("panic", r).Msg("panic in callback")
			}
		}
	}()
}()
```
The callback execution is moved into a goroutine but the panic recovery is also moved into it. This changes the behavior - previously the panic recovery protected the calling code, now it only protects the goroutine. If the callback panics before the goroutine is scheduled, it could crash the caller. Consider whether this behavior change is intentional.
```go
// store in the cache
k := fmt.Sprintf("%s:%s", tenantId, workflowVersion.WorkflowName)

r.tenantIdWorkflowNameCache.Add(k, workflowVersion)
```
The cache lookups now use .Get() which returns (value, bool), but the cache insertion uses .Add() instead of .Set(). The expirable.LRU's .Add() method returns a boolean indicating if an eviction occurred. If this behavior change is significant (e.g., for monitoring cache pressure), it should be handled. Otherwise, this is just a minor API difference to be aware of.
mrkaye97
left a comment
lgtm, would be great to run the python tests locally with / without optimistic mode on to see how it looks + just confirm
```yaml
required: true
name: Release
jobs:
# load:
```
do we want to uncomment/delete these?
* placeholder
* feat: db tables for user events (#2862)
* feat: db tables for user events
* move event payloads to payloads table, fix env var loading
* fix: address pr review comments
* missed save
* feat: optimistic scheduling (#2867)
* feat: db tables for user events
* move event payloads to payloads table, fix env var loading
* refactor: small changes to prepare optimistic txs
* feat: optimistic scheduling
* address pr review comments
* rm comments
* fix: rampup test race condition
* fix: goleak
* feat: grpc-side triggers
* fix: config and sem logic
* fix: respect optimistic scheduling env var
* add optimistic to testing matrix, remove pg-only mode
* fix cleanup of pubbuffers
* merge migrations
* last testing fixes
Description
New implementation to replace #2258
Adds support for "optimistic" scheduling, meaning that we can create tasks from the gRPC engine with transactional safety and schedule tasks on workers which are connected to the current gRPC session (these are two separate concepts, referred to in code by `localScheduler` and `localDispatcher`). We allocate a small set of semaphores for that.

Features:

- `task-completed`. We can similarly add `task-failed` and `task-cancelled` in the future.

Drawbacks:
Limitations:
If `n` engines are horizontally scaled, the chances of optimistic scheduling reduce by `1/n`: we only use local schedulers when they have a lease on a tenant. We will need to build out a sticky load balancing strategy to take advantage of optimistic scheduling in HA setups if we'd like to tackle that.

Type of change