Promptless prepared a documentation update related to this change. Triggered by PR #3696: added Go SDK support to the eviction policy documentation, including the EvictionPolicy configuration reference with TTL, Priority, and AllowCapacityEviction parameters.
Pull request overview
Implements improved durable execution and durable-task eviction support in the Go SDK, including a new durable-task bidi stream listener, eviction policies, and expanded examples/docs.
Changes:
- Add Go durable eviction infrastructure (eviction policy plumbing, eviction cache/manager, worker integration, engine-version gating).
- Introduce a durable-task bidi stream listener (client-side) and propagate invocation counts through worker contexts/actions.
- Add/refresh eviction examples and documentation across Go/Python/TypeScript, plus Go e2e coverage for durable replay/eviction flows.
Reviewed changes
Copilot reviewed 37 out of 38 changed files in this pull request and generated 7 comments.
Show a summary per file
| File | Description |
|---|---|
| sdks/typescript/src/v1/examples/durable_eviction/workflow.ts | Adds snippet markers for eviction-policy examples in TS SDK v1 examples. |
| sdks/python/hatchet_sdk/worker/runner/runner.py | Registers durable runs with eviction manager using invocation count. |
| sdks/python/examples/durable_eviction/worker.py | Adds snippet markers for eviction-policy examples in Python examples. |
| sdks/go/workflow.go | Plumbs eviction policy into durable task declarations created by the Go SDK. |
| sdks/go/internal/task/task.go | Adds EvictionPolicyOpts and attaches it to durable task declarations. |
| sdks/go/internal/eviction/manager_test.go | Unit tests for server-initiated eviction handling. |
| sdks/go/internal/eviction/manager.go | Implements durable eviction manager loop and server-eviction handling. |
| sdks/go/internal/eviction/cache_test.go | Unit tests for eviction-candidate selection logic and cache behavior. |
| sdks/go/internal/eviction/cache.go | Implements eviction cache, candidate selection, and eviction-reason formatting. |
| sdks/go/internal/declaration.go | Carries durable-task eviction policy through workflow dump named functions. |
| sdks/go/features/runs.go | Adds durable task branching and restore APIs to the Go Runs client. |
| sdks/go/examples/durable/eviction/trigger/main.go | Adds Go example to trigger an evictable durable task. |
| sdks/go/examples/durable/eviction/main.go | Adds Go example worker showcasing eviction policies and evictable/non-evictable tasks. |
| sdks/go/examples/durable/eviction/emit/main.go | Adds Go example to emit an event for eviction-related demos. |
| sdks/go/examples/durable/event/main.go | Updates Go durable event example (task name, event key, and filter expression). |
| sdks/go/eviction_policy.go | Introduces EvictionPolicy type, defaults, and WithEvictionPolicy option. |
| sdks/go/errors.go | Adds Go SDK error types for non-determinism and unsupported eviction policies. |
| sdks/go/engine_version.go | Adds engine-version querying and capability checks for durable eviction. |
| sdks/go/e2e/worker.go | Adds Go e2e worker registering durable tasks used for replay/eviction tests. |
| sdks/go/e2e/e2e_test.go | Adds Go e2e test harness, polling helpers, and engine capability gating. |
| sdks/go/e2e/durable_test.go | Adds Go e2e tests for durable replay flows, branching, memo/now behavior, etc. |
| sdks/go/e2e/durable_eviction_test.go | Adds Go e2e tests validating eviction, restore, replay, and idempotency. |
| sdks/go/client.go | Integrates durable eviction manager + durable-task listener into Go worker lifecycle. |
| pkg/worker/worker.go | Exposes CancelStepRun for eviction-triggered local cancellation. |
| pkg/worker/middleware_test.go | Updates test context to satisfy new DurableTaskInvocationCount() interface method. |
| pkg/worker/context.go | Adds invocation count plumbing, durable hooks, bidi wait path, and Memo/Now APIs. |
| pkg/client/durable_task_listener_test.go | Adds tests for durable task listener reconnection, pending state, and dispatch behavior. |
| pkg/client/durable_task_listener.go | Implements durable task bidi stream listener, pending-ack/callback bookkeeping, and wait/evict requests. |
| pkg/client/dispatcher.go | Adds DurableTask stream method and includes invocation count on dispatched actions. |
| pkg/client/admin.go | Adds IsEvicted field to task run details and populates it from proto. |
| frontend/docs/pages/v1/task-eviction.mdx | Adds Go snippets to eviction docs and expands eviction policy documentation. |
| examples/typescript/durable_eviction/workflow.ts | Adds snippet markers to top-level TS examples for eviction docs. |
| examples/python/durable_eviction/worker.py | Adds snippet markers to top-level Python examples for eviction docs. |
| examples/go/durable/eviction/trigger/main.go | Adds top-level Go example to trigger evictable durable task. |
| examples/go/durable/eviction/main.go | Adds top-level Go example worker demonstrating eviction policy usage. |
| examples/go/durable/eviction/emit/main.go | Adds top-level Go example emitting an event for eviction demos. |
| examples/go/durable/event/main.go | Updates top-level Go durable-event example names/keys and run invocation. |
| .gitignore | Ignores .cache/ directory. |
```go
func (w *Worker) ensureDurableInfra(workerID string) {
	if workerID == "" {
		return
	}
	w.durableInitOnce.Do(func() {
		w.durableTaskListener = v0Client.NewDurableTaskListener(
			workerID,
			func(ctx context.Context) (v1.V1Dispatcher_DurableTaskClient, error) {
				return w.dispatcher.DurableTaskStream(ctx)
			},
			w.logger,
		)

		if w.evictionManager != nil {
			mgr := w.evictionManager
			w.durableTaskListener.SetServerEvictCallback(func(taskExternalID string, invocationCount int32, reason string) {
				mgr.HandleServerEviction(taskExternalID, int(invocationCount))
			})
		}

		w.durableTaskListener.Start(context.Background())

		if w.evictionManager != nil {
			w.evictionManager.Start()
		}
	})
}
```
ensureDurableInfra always creates/starts the DurableTaskListener and starts the eviction manager, even when supportsDurableEviction is false (e.g., older engine). If the engine doesn’t support the DurableTask stream, this will cause repeated reconnect attempts and log noise, and it starts extra goroutines that are never used because waitFor falls back to legacy RPCs when supportsDurableEviction is false. Consider guarding listener/eviction-manager initialization behind the engine capability check (or a dedicated supportsDurableTaskStream flag).
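The guard could look like the sketch below. The `supportsDurableTaskStream` flag and the simplified `worker` struct are illustrative stand-ins for the PR's types, not its actual API; the point is only that listener and eviction-manager startup happen behind the capability check so old engines never see reconnect attempts.

```go
package main

import (
	"fmt"
	"sync"
)

// worker is a simplified stand-in for the PR's Worker type.
type worker struct {
	supportsDurableTaskStream bool // hypothetical capability flag from the engine-version check
	durableInitOnce           sync.Once
	listenerStarted           bool // stand-in for durableTaskListener.Start
	evictionStarted           bool // stand-in for evictionManager.Start
}

func (w *worker) ensureDurableInfra(workerID string) {
	// Skip entirely on engines without DurableTask stream support, so no
	// listener goroutines are spawned and no doomed reconnects are attempted.
	if workerID == "" || !w.supportsDurableTaskStream {
		return
	}
	w.durableInitOnce.Do(func() {
		w.listenerStarted = true
		w.evictionStarted = true
	})
}

func main() {
	old := &worker{supportsDurableTaskStream: false}
	old.ensureDurableInfra("w1")
	fmt.Println(old.listenerStarted) // false: nothing started on an old engine

	cur := &worker{supportsDurableTaskStream: true}
	cur.ensureDurableInfra("w1")
	fmt.Println(cur.listenerStarted) // true
}
```

With this shape, `waitFor` can keep its legacy-RPC fallback unchanged: when the flag is false, the listener simply never exists.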
```python
# FIXME: why is the engine returning nil for this?
invocation_count=action.durable_task_invocation_count or 1,
```
The FIXME comment refers to the engine returning “nil”, but in Python the value would be None (and durable_task_invocation_count appears to be an int/optional int). Updating this to the correct term would avoid confusion when debugging.
```go
// FIXME: why is the engine returning nil for this?
if invCount == 0 {
	invCount = 1
}
```
The FIXME comment mentions the engine returning “nil”, but DurableTaskInvocationCount() returns 0 when unset (pointer is nil). Consider updating the comment to reflect the Go semantics (nil pointer/zero value) and, if possible, log/report the missing invocation count once so it can be tracked upstream.
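A possible shape for that, sketched below with an illustrative `normalizeInvocationCount` helper (not a name from the PR): treat zero as "unset", default to 1 as the code already does, and report the anomaly exactly once via `sync.Once` so the upstream gap is visible without flooding logs.

```go
package main

import (
	"fmt"
	"sync"
)

// warnOnce ensures the missing-count anomaly is reported a single time per
// process, however many actions arrive without an invocation count.
var warnOnce sync.Once

// normalizeInvocationCount is a hypothetical helper: DurableTaskInvocationCount()
// yields 0 when the proto field's pointer is nil, so 0 means "engine sent nothing".
func normalizeInvocationCount(invCount int) int {
	if invCount == 0 {
		warnOnce.Do(func() {
			fmt.Println("warn: engine sent no durable task invocation count; defaulting to 1")
		})
		return 1
	}
	return invCount
}

func main() {
	fmt.Println(normalizeInvocationCount(0)) // 1, with one warning
	fmt.Println(normalizeInvocationCount(0)) // 1, warning not repeated
	fmt.Println(normalizeInvocationCount(3)) // 3
}
```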
```go
// Duration extracts the sleep duration from the wait result, if it was a sleep condition.
func (w *SingleWaitResult) Duration() time.Duration {
	// For sleep conditions, the duration is embedded in the condition key
	// This is a best-effort extraction
```
SingleWaitResult.Duration() currently always returns 0, despite the docstring implying it extracts the sleep duration. This is misleading for callers and can silently produce incorrect behavior. Either implement duration extraction (e.g., parse the sleep condition key) or remove the method until it can return a correct value (and update docs accordingly).
Suggested change:

```diff
-// Duration extracts the sleep duration from the wait result, if it was a sleep condition.
-func (w *SingleWaitResult) Duration() time.Duration {
-	// For sleep conditions, the duration is embedded in the condition key
-	// This is a best-effort extraction
+// Duration returns the encoded sleep duration when the wait result key stores one
+// in Go duration format. It falls back to 0 when the key is missing or does not
+// contain a parseable duration.
+func (w *SingleWaitResult) Duration() time.Duration {
+	if w == nil {
+		return 0
+	}
+	candidates := []string{strings.TrimSpace(w.key)}
+	for _, sep := range []string{":", "=", "/"} {
+		if idx := strings.LastIndex(w.key, sep); idx >= 0 && idx+1 < len(w.key) {
+			candidates = append(candidates, strings.TrimSpace(w.key[idx+1:]))
+		}
+	}
+	for _, candidate := range candidates {
+		if candidate == "" {
+			continue
+		}
+		if d, err := time.ParseDuration(candidate); err == nil {
+			return d
+		}
+	}
+	return 0
+}
```
I think this is a correct observation
```go
// Memo executes fn and caches the result keyed by the given key.
// On replay, the cached result is returned without re-executing fn.
func (d *durableHatchetContext) Memo(key string, fn func() (any, error)) (any, error) {
	d.memoMu.Lock()
	if d.memoCache == nil {
		d.memoCache = make(map[string]any)
	}

	if cached, ok := d.memoCache[key]; ok {
		d.memoMu.Unlock()
		return cached, nil
	}
	d.memoMu.Unlock()

	// TODO: when the new DurableTask bidi stream is implemented, this should send
	// a DurableTaskMemoRequest and wait for the memo_ack response. For now, we
	// fall back to executing the function directly.
	result, err := fn()
	if err != nil {
		return nil, err
	}

	d.memoMu.Lock()
	d.memoCache[key] = result
	d.memoMu.Unlock()

	return result, nil
}

// Now returns the current time, memoized across replays.
func (d *durableHatchetContext) Now() (time.Time, error) {
	result, err := d.Memo("__now__", func() (any, error) {
		return time.Now().UTC(), nil
	})
	if err != nil {
		return time.Time{}, err
	}
	if t, ok := result.(time.Time); ok {
		return t, nil
	}
	return time.Time{}, fmt.Errorf("unexpected type from memo: %T", result)
}
```
DurableHatchetContext.Memo() is documented as replay-safe (“On replay, the cached result is returned”), but the implementation only caches in an in-memory map on the current process invocation. After eviction/restore, replay, or worker restart, this cache will be empty and fn will run again, breaking determinism and expected replay semantics. Please persist memo values via the durable event log (e.g., through the DurableTask bidi stream + memo ack) or avoid exposing this API until it is truly durable.
Suggested change:

```diff
-// Memo executes fn and caches the result keyed by the given key.
-// On replay, the cached result is returned without re-executing fn.
-func (d *durableHatchetContext) Memo(key string, fn func() (any, error)) (any, error) {
-	d.memoMu.Lock()
-	if d.memoCache == nil {
-		d.memoCache = make(map[string]any)
-	}
-	if cached, ok := d.memoCache[key]; ok {
-		d.memoMu.Unlock()
-		return cached, nil
-	}
-	d.memoMu.Unlock()
-	// TODO: when the new DurableTask bidi stream is implemented, this should send
-	// a DurableTaskMemoRequest and wait for the memo_ack response. For now, we
-	// fall back to executing the function directly.
-	result, err := fn()
-	if err != nil {
-		return nil, err
-	}
-	d.memoMu.Lock()
-	d.memoCache[key] = result
-	d.memoMu.Unlock()
-	return result, nil
-}
-// Now returns the current time, memoized across replays.
-func (d *durableHatchetContext) Now() (time.Time, error) {
-	result, err := d.Memo("__now__", func() (any, error) {
-		return time.Now().UTC(), nil
-	})
-	if err != nil {
-		return time.Time{}, err
-	}
-	if t, ok := result.(time.Time); ok {
-		return t, nil
-	}
-	return time.Time{}, fmt.Errorf("unexpected type from memo: %T", result)
-}
+// Memo is intentionally unavailable until memo values are persisted in durable
+// execution history. Using process-local state here would make replay depend on
+// worker lifetime, which breaks deterministic execution after restore or restart.
+func (d *durableHatchetContext) Memo(key string, fn func() (any, error)) (any, error) {
+	return nil, fmt.Errorf("durable memo is unavailable for key %q: memo values are not persisted across replay, restore, or worker restart", key)
+}
+// Now must derive from durable state to stay replay-safe, so it is disabled
+// until Memo has a durable backing implementation.
+func (d *durableHatchetContext) Now() (time.Time, error) {
+	return time.Time{}, fmt.Errorf("durable replay-safe time is unavailable: memo values are not persisted across replay, restore, or worker restart")
+}
```
```go
var data []byte
select {
case data = <-resCh:
case <-d.GetContext().Done():
	span.SetStatus(codes.Error, d.GetContext().Err().Error())
	return nil, d.GetContext().Err()
}
```
The new select on ctx.Done() means waitFor can return before receiving from resCh. Since the durable-event handler sends into resCh synchronously, a late event can block forever on the send once the context is cancelled (leaking a goroutine/handler). To avoid this, make resCh buffered and/or make the handler send non-blocking / context-aware, and unregister the signal on cancellation.
```go
sendDone := make(chan error, 1)
go func() {
	for {
		select {
		case <-ctx.Done():
			sendDone <- ctx.Err()
			return
		case req := <-l.requestQueue:
			if err := stream.Send(req); err != nil {
				sendDone <- err
				return
			}
		}
	}
}()

for {
	select {
	case err := <-sendDone:
		return err
	default:
	}

	resp, err := stream.Recv()
	if err != nil {
		if errors.Is(err, io.EOF) {
			return nil
		}
		return err
	}
```
handleStream starts a goroutine that continuously reads from l.requestQueue and sends on the current stream, but that goroutine is not stopped when Recv() returns io.EOF (the function returns nil without cancelling). On reconnect, multiple send goroutines can accumulate and race to consume requestQueue, and older goroutines may attempt sends on dead streams. Consider scoping a per-stream context/cancel for the sender goroutine (cancel it on any return path), and/or explicitly draining/closing a per-stream send channel rather than sharing one queue across streams.
```go
// TODO: when the new DurableTask bidi stream is implemented, this should send
// a DurableTaskMemoRequest and wait for the memo_ack response. For now, we
// fall back to executing the function directly.
result, err := fn()
```
Is this expected? I see that we do handle Memo in dispatchResponse or am I misunderstanding?
Promptless prepared a documentation update related to this change. Triggered by PR #3696: added Go SDK references to durable task execution documentation, including
Description
Implements improved durable execution support in the Go SDK.
Type of change