feat(appkit): migrate analytics plugin to TaskFlow#379
Draft
ditadi wants to merge 1 commit into
Draft
Conversation
This was referenced May 12, 2026
14d0870 to
206704d
Compare
c88f075 to
bcda23c
Compare
Analytics is the first production consumer of the TaskFlow surface
added in PRs 3 + 4. The plugin registers an `analytics:query` durable
task, routes OBO vs SP through `runInUserContext`, persists
`statement_submitted` events so recovery can re-attach via
`pollStatement` (the connector split from PR 2), and emits typed
`data` / `recovered` / `statement_submitted` events that PR 6's
frontend SSE client will consume.
End-to-end proof of the integration: the OBO guard, the connector
split, the typed events, and the recovery semantics all meet here.
`analytics/types.ts`:
- `AnalyticsQueryTaskInput` — `{ query_key, params?, isAsUser? }`. The
`isAsUser` discriminator is part of the input (not derived from the
active scope inside the handler) so recovery re-runs route to the
correct OBO / SP branch even when the original `runInUserContext`
frame is gone.
- `AnalyticsQueryTaskResult` — `{ data, columns, statementId?, fromCache? }`.
- `AnalyticsTaskEvents` — `{ statement_submitted, data, recovered }`.
The typed event map is what `executeTask`'s SSE bridge stamps onto
`event: <name>` / `data: <JSON.stringify(payload)>`.
`analytics/analytics.ts` — `~538 LOC net add` covering:
- `setup()` registers the `analytics:query` task with `autoRecover:
false` (OBO recovery cannot be safely auto-retried — the user's
token has a finite lifetime).
- `_runQueryTask(input, ctx)` — task body. Re-derives the OBO branch
from `userContextFromTaskCtx(ctx)` (sidecar carries the
`UserContext` across the FFI), throws with the F6 actionable
message when `input.isAsUser && !userCtx` (recovery without
context: req is a hard error, not a silent fall-through to SP).
- `_runQueryInner(input, ctx)` — recovery-aware execution. Scans
`ctx.previousEvents` for a `statement_submitted` checkpoint; if
found, skips `submitStatement` and re-attaches via
`pollStatement(statementId)` so the warehouse statement isn't
re-issued on crash recovery. Emits `recovered` before doing so for
client observability.
- `_emitDataFrame` — wraps `ctx.emit("data", ...)` with the typed
payload shape so wire-shape changes are caught at compile time.
- `_handleQueryRoute` — HTTP entrypoint goes through `executeTask`
when TaskFlow is enabled; falls back to the legacy `_queryDirect`
path when `createApp({ taskflow: false })`. Same wire-shape on
both paths so clients don't branch.
- `_queryDirect` — preserved verbatim for the opt-out path. Eventually
retire once internal consumers commit to TaskFlow.
- Idempotency-key derivation (`executorKey`) is shared between the
programmatic `query()` and `_handleQueryRoute` so the same logical
query produces the same IK regardless of entrypoint. Identity
comes only from `getCurrentUserContext()` — never from request
headers or settings.
Tests:
- `analytics.test.ts` — refactored for the durable wire. Uses
`createStubTaskflowService` from `tools/test-helpers.ts` (added in
PR 4) so each test exercises the registration + submission shape
without booting the real engine. `~307 LOC delta`.
- `analytics.integration.test.ts` — wire-shape assertions. Verifies
the SSE response carries `event: data` with the expected payload,
`event: statement_submitted` checkpoint, and terminal
`event: completed` framing. `~47 LOC delta`.
- `analytics.recovery.test.ts` (new, 142 LOC) — exercises the
`_runQueryTask` recovery branch with a pre-populated
`ctx.previousEvents` carrying `statement_submitted`, and asserts
the OBO-without-context throw produces the F6 actionable error
message rather than silently submitting as SP.
- `analytics/defaults.ts` removed — the legacy `queryDefaults`
(`PluginExecuteConfig` with retry/cache/timeout knobs) is no
longer consumed: TaskFlow handles those concerns natively.
`context/index.ts`:
- Re-exports `isInUserContext` from the barrel so analytics can
import it via `from "../../context"` alongside
`getCurrentUserContext`. (PR 4 had this temporarily off the
barrel because no consumer existed yet; PR 5 is that consumer.)
Verify:
- `pnpm -r typecheck`, `pnpm build`, `pnpm test` (123 files, 2281
tests) all green.
- `pnpm exec knip` clean — defaults.ts removal closes the
orphaned-file warning.
- `pnpm exec biome check` clean on touched files.
Risk. Performance cliff for sub-500ms cached queries: TaskFlow adds
WAL-write + dedup overhead per invocation, bounded but real. If the
regression is unacceptable in benchmarks, add a `direct: true`
opt-out as a follow-up — the legacy `_queryDirect` path is already
retained for `createApp({ taskflow: false })` and can be exposed
per-call without touching the recovery / OBO surface.
Not in this PR. No frontend changes (no React, no
`subscribeToTaskflowTask` — that's PR 6). No demo plugin. No doc
rewrite.
Stacked on: stack/taskflow/execute-task.
Signed-off-by: ditadi <victordperd@gmail.com>
bcda23c to
9a2926d
Compare
206704d to
13a91d1
Compare
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
🥞 Stacked PR
Use this link to review incremental changes.
Analytics is the first production consumer of the TaskFlow surface
added in PRs 3 + 4. The plugin registers an
analytics:querydurabletask, routes OBO vs SP through
runInUserContext, persistsstatement_submittedevents so recovery can re-attach viapollStatement(the connector split from PR 2), and emits typeddata/recovered/statement_submittedevents that PR 6'sfrontend SSE client will consume.
End-to-end proof of the integration: the OBO guard, the connector
split, the typed events, and the recovery semantics all meet here.
analytics/types.ts:AnalyticsQueryTaskInput—{ query_key, params?, isAsUser? }. TheisAsUserdiscriminator is part of the input (not derived from theactive scope inside the handler) so recovery re-runs route to the
correct OBO / SP branch even when the original
runInUserContextframe is gone.
AnalyticsQueryTaskResult—{ data, columns, statementId?, fromCache? }.AnalyticsTaskEvents—{ statement_submitted, data, recovered }.The typed event map is what
executeTask's SSE bridge stamps ontoevent: <name>/data: <JSON.stringify(payload)>.analytics/analytics.ts—~538 LOC net addcovering:setup()registers theanalytics:querytask withautoRecover: false(OBO recovery cannot be safely auto-retried — the user'stoken has a finite lifetime).
_runQueryTask(input, ctx)— task body. Re-derives the OBO branchfrom
userContextFromTaskCtx(ctx)(sidecar carries theUserContextacross the FFI), throws with the F6 actionablemessage when
input.isAsUser && !userCtx(recovery withoutcontext: req is a hard error, not a silent fall-through to SP).
_runQueryInner(input, ctx)— recovery-aware execution. Scansctx.previousEventsfor astatement_submittedcheckpoint; iffound, skips
submitStatementand re-attaches viapollStatement(statementId)so the warehouse statement isn'tre-issued on crash recovery. Emits
recoveredbefore doing so forclient observability.
_emitDataFrame— wrapsctx.emit("data", ...)with the typedpayload shape so wire-shape changes are caught at compile time.
_handleQueryRoute— HTTP entrypoint goes throughexecuteTaskwhen TaskFlow is enabled; falls back to the legacy
_queryDirectpath when
createApp({ taskflow: false }). Same wire-shape onboth paths so clients don't branch.
_queryDirect— preserved verbatim for the opt-out path. Eventuallyretire once internal consumers commit to TaskFlow.
executorKey) is shared between theprogrammatic
query()and_handleQueryRouteso the same logicalquery produces the same IK regardless of entrypoint. Identity
comes only from
getCurrentUserContext()— never from requestheaders or settings.
Tests:
analytics.test.ts— refactored for the durable wire. UsescreateStubTaskflowServicefromtools/test-helpers.ts(added inPR 4) so each test exercises the registration + submission shape
without booting the real engine.
~307 LOC delta.analytics.integration.test.ts— wire-shape assertions. Verifiesthe SSE response carries
event: datawith the expected payload,event: statement_submittedcheckpoint, and terminalevent: completedframing.~47 LOC delta.analytics.recovery.test.ts(new, 142 LOC) — exercises the_runQueryTaskrecovery branch with a pre-populatedctx.previousEventscarryingstatement_submitted, and assertsthe OBO-without-context throw produces the F6 actionable error
message rather than silently submitting as SP.
analytics/defaults.tsremoved — the legacyqueryDefaults(
PluginExecuteConfigwith retry/cache/timeout knobs) is nolonger consumed: TaskFlow handles those concerns natively.
context/index.ts:isInUserContextfrom the barrel so analytics canimport it via
from "../../context"alongsidegetCurrentUserContext. (PR 4 had this temporarily off thebarrel because no consumer existed yet; PR 5 is that consumer.)
Verify:
pnpm -r typecheck,pnpm build,pnpm test(123 files, 2281tests) all green.
pnpm exec knipclean — defaults.ts removal closes theorphaned-file warning.
pnpm exec biome checkclean on touched files.Risk. Performance cliff for sub-500ms cached queries: TaskFlow adds
WAL-write + dedup overhead per invocation, bounded but real. If the
regression is unacceptable in benchmarks, add a
direct: trueopt-out as a follow-up — the legacy
_queryDirectpath is alreadyretained for
createApp({ taskflow: false })and can be exposedper-call without touching the recovery / OBO surface.
Not in this PR. No frontend changes (no React, no
subscribeToTaskflowTask— that's PR 6). No demo plugin. No docrewrite.
Stacked on: stack/taskflow/execute-task.
Signed-off-by: ditadi victordperd@gmail.com