Support passing state from execute() to finalize() #64

@deepjoy

Problem

TypedExecutor::finalize() receives the same deserialized payload T as execute(). When execute() computes values that finalize() needs (timestamps, aggregated results, resource handles), there's no built-in way to pass them through.

Current workarounds are all suboptimal:

  1. External shared state (e.g. AtomicI64 on app state) — couples the executor to mutable shared state unnecessarily, especially when max_concurrency(1) makes atomics overkill.
  2. Persist to DB — adds IO and schema for what's fundamentally in-process state transfer.
  3. Encode into the task payload — not possible since the payload is immutable after submission.

Example

An orchestrator records scan_start_ns in execute(), spawns children, and needs that timestamp in finalize() for post-scan cleanup:

impl TypedExecutor<ScanL1Task> for ScanL1Executor {
    async fn execute(&self, task: ScanL1Task, ctx: &TaskContext) -> Result<(), TaskError> {
        let scan_start_ns = now_ns();
        // spawn children...
        // How does finalize() get scan_start_ns?
        Ok(())
    }

    async fn finalize(&self, task: ScanL1Task, ctx: &TaskContext) -> Result<(), TaskError> {
        // Need scan_start_ns here, but task payload doesn't have it
        // and there's no state bridge from execute()
        Ok(())
    }
}

Proposals

Option A: Executor-scoped state bag on TaskContext

Let execute() store arbitrary typed values that finalize() and on_cancel() can retrieve:

// In execute():
ctx.set_state(scan_start_ns);

// In finalize():
let scan_start_ns = ctx.state::<i64>().unwrap();

Requires TaskContext to carry a per-task-instance TypeMap that survives the execute → waiting → finalize lifecycle. Ergonomic but in-process only — lost on restart.
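A minimal sketch of what such a TypeMap could look like, using only the standard library. The names `StateBag`, `ScanStartNs`, and `demo` are hypothetical and not part of the crate. The `Mutex` is an assumption made because `ctx` is passed as `&TaskContext`, so writes need interior mutability. Values are keyed by type, so the sketch wraps the timestamp in a newtype to avoid colliding with any other `i64`.

```rust
use std::any::{Any, TypeId};
use std::collections::HashMap;
use std::sync::Mutex;

/// Hypothetical per-task-instance state bag: at most one value per concrete type.
#[derive(Default)]
pub struct StateBag {
    slots: Mutex<HashMap<TypeId, Box<dyn Any + Send + Sync>>>,
}

impl StateBag {
    /// Store `value`, replacing any earlier value of the same type.
    pub fn set_state<V: Any + Send + Sync>(&self, value: V) {
        let mut slots = self.slots.lock().unwrap();
        slots.insert(TypeId::of::<V>(), Box::new(value));
    }

    /// Return a clone of the stored value of type `V`, or None if none was set.
    pub fn state<V: Any + Send + Sync + Clone>(&self) -> Option<V> {
        let slots = self.slots.lock().unwrap();
        slots
            .get(&TypeId::of::<V>())
            .and_then(|boxed| boxed.downcast_ref::<V>())
            .cloned()
    }
}

/// Hypothetical newtype so the timestamp gets its own slot.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct ScanStartNs(pub i64);

/// Simulates execute() writing and finalize() reading through the same bag.
pub fn demo() -> Option<ScanStartNs> {
    let bag = StateBag::default();
    bag.set_state(ScanStartNs(42)); // what execute() would do
    bag.state::<ScanStartNs>() // what finalize() would do
}
```

Returning a clone instead of a reference keeps the lock from being held across `.await` points, at the cost of requiring `Clone` on stored values.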

Option B: TaskContext::set_memo() / memo() with JSON persistence

Store a JSON blob on the task record that persists through the waiting state:

// In execute():
ctx.set_memo(&serde_json::json!({ "scan_start_ns": scan_start_ns }))?;

// In finalize():
let memo: serde_json::Value = ctx.memo()?;
let scan_start_ns = memo["scan_start_ns"].as_i64().unwrap();

Persisted to SQLite alongside the task record, so it survives restarts too.

Option C: Return state from execute()

Change the return type to allow an optional state blob:

async fn execute(&self, payload: T, ctx: &TaskContext) -> Result<Option<Vec<u8>>, TaskError>;
async fn finalize(&self, payload: T, ctx: &TaskContext, state: Option<&[u8]>) -> Result<(), TaskError>;

More explicit but less ergonomic and breaks the current trait signature.
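To illustrate the ergonomic cost, here is a rough sketch of the hand-written encoding each executor would need under Option C for the scan_start_ns example. The helper names `encode_scan_start` and `decode_scan_start` are hypothetical, and the little-endian layout is an arbitrary choice for this sketch, not something the proposal specifies.

```rust
use std::convert::TryFrom;

/// Hypothetical helper: build the opaque blob execute() would return.
pub fn encode_scan_start(scan_start_ns: i64) -> Vec<u8> {
    scan_start_ns.to_le_bytes().to_vec()
}

/// Hypothetical helper: recover the timestamp in finalize().
/// Returns None if no state was passed or the blob is not exactly 8 bytes.
pub fn decode_scan_start(state: Option<&[u8]>) -> Option<i64> {
    let bytes = <[u8; 8]>::try_from(state?).ok()?;
    Some(i64::from_le_bytes(bytes))
}
```

Every executor with richer state would need its own encoding and its own handling of malformed blobs, which is the boilerplate Options A and B avoid.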

Preference

Option A is the most ergonomic for in-process use. Option B is the most robust (survives restarts). Either would unblock the orchestrator pattern cleanly.
