diff --git a/.github/workflows/node.js.yml b/.github/workflows/node.js.yml index 5cc2680..1a86f01 100644 --- a/.github/workflows/node.js.yml +++ b/.github/workflows/node.js.yml @@ -14,7 +14,5 @@ jobs: - run: npm i - run: npm run build - run: npx pkg-pr-new publish - - run: cd example && npm i && cd .. - run: npm run typecheck - - run: cd example && npm run lint && cd .. - run: npm test diff --git a/eslint.config.js b/eslint.config.js index ad34bb3..4eb2464 100644 --- a/eslint.config.js +++ b/eslint.config.js @@ -34,7 +34,7 @@ export default [ ...typescriptEslint.configs["recommended"].rules, ...pluginJs.configs.recommended.rules, "@typescript-eslint/no-floating-promises": "error", - "@typescript-eslint/no-explicit-any": "warn", + "@typescript-eslint/no-explicit-any": "off", // allow (_arg: number) => {} and const _foo = 1; "no-unused-vars": "off", "@typescript-eslint/no-unused-vars": [ diff --git a/example/convex/_generated/api.d.ts b/example/convex/_generated/api.d.ts index 0d97802..6651140 100644 --- a/example/convex/_generated/api.d.ts +++ b/example/convex/_generated/api.d.ts @@ -11,6 +11,7 @@ import type * as admin from "../admin.js"; import type * as example from "../example.js"; import type * as transcription from "../transcription.js"; +import type * as userConfirmation from "../userConfirmation.js"; import type { ApiFromModules, @@ -30,6 +31,7 @@ declare const fullApi: ApiFromModules<{ admin: typeof admin; example: typeof example; transcription: typeof transcription; + userConfirmation: typeof userConfirmation; }>; declare const fullApiWithMounts: typeof fullApi; @@ -44,30 +46,80 @@ export declare const internal: FilterApi< export declare const components: { workflow: { + event: { + create: FunctionReference< + "mutation", + "internal", + { name: string; workflowId: string }, + string + >; + send: FunctionReference< + "mutation", + "internal", + { + eventId?: string; + name?: string; + result: + | { kind: "success"; returnValue: any } + | { error: string; kind: "failed" } + | { kind: "canceled" }; + workflowId: string; + workpoolOptions?: { + defaultRetryBehavior?: { + base: number; + initialBackoffMs: number; + maxAttempts: number; + }; + logLevel?: "DEBUG" | "TRACE" | "INFO" | "REPORT" | "WARN" | "ERROR"; + maxParallelism?: number; + retryActionsByDefault?: boolean; + }; + }, + string + >; + }; journal: { load: FunctionReference< "query", "internal", - { workflowId: string }, + { shortCircuit?: boolean; workflowId: string }, { + blocked?: boolean; journalEntries: Array<{ _creationTime: number; _id: string; - step: { - args: any; - argsSize: number; - completedAt?: number; - functionType: "query" | "mutation" | "action"; - handle: string; - inProgress: boolean; - name: string; - runResult?: - | { kind: "success"; returnValue: any } - | { error: string; kind: "failed" } - | { kind: "canceled" }; - startedAt: number; - workId?: string; - }; + step: + | { + args: any; + argsSize: number; + completedAt?: number; + functionType: "query" | "mutation" | "action"; + handle: string; + inProgress: boolean; + kind?: "function"; + name: string; + runResult?: + | { kind: "success"; returnValue: any } + | { error: string; kind: "failed" } + | { kind: "canceled" }; + startedAt: number; + workId?: string; + } + | { + args: { eventId?: string }; + argsSize: number; + completedAt?: number; + eventId?: string; + inProgress: boolean; + kind: "event"; + name: string; + runResult?: + | { kind: "success"; returnValue: any } + | { error: string; kind: "failed" } + | { kind: "canceled" }; + startedAt: number; + workId?: string; + }; stepNumber: number; workflowId: string; }>; @@ -101,21 +153,38 @@ export declare const components: { | boolean | { base: number; initialBackoffMs: number; maxAttempts: number }; schedulerOptions?: { runAt?: number } | { runAfter?: number }; - step: { - args: any; - argsSize: number; - completedAt?: number; - functionType: "query" | "mutation" | "action"; - handle: string; - inProgress: boolean; - name: string; - runResult?: - | { kind: "success"; returnValue: any } - | { error: string; kind: "failed" } - | { kind: "canceled" }; - startedAt: number; - workId?: string; - }; + step: + | { + args: any; + argsSize: number; + completedAt?: number; + functionType: "query" | "mutation" | "action"; + handle: string; + inProgress: boolean; + kind?: "function"; + name: string; + runResult?: + | { kind: "success"; returnValue: any } + | { error: string; kind: "failed" } + | { kind: "canceled" }; + startedAt: number; + workId?: string; + } + | { + args: { eventId?: string }; + argsSize: number; + completedAt?: number; + eventId?: string; + inProgress: boolean; + kind: "event"; + name: string; + runResult?: + | { kind: "success"; returnValue: any } + | { error: string; kind: "failed" } + | { kind: "canceled" }; + startedAt: number; + workId?: string; + }; }>; workflowId: string; workpoolOptions?: { @@ -132,21 +201,38 @@ export declare const components: { Array<{ _creationTime: number; _id: string; - step: { - args: any; - argsSize: number; - completedAt?: number; - functionType: "query" | "mutation" | "action"; - handle: string; - inProgress: boolean; - name: string; - runResult?: - | { kind: "success"; returnValue: any } - | { error: string; kind: "failed" } - | { kind: "canceled" }; - startedAt: number; - workId?: string; - }; + step: + | { + args: any; + argsSize: number; + completedAt?: number; + functionType: "query" | "mutation" | "action"; + handle: string; + inProgress: boolean; + kind?: "function"; + name: string; + runResult?: + | { kind: "success"; returnValue: any } + | { error: string; kind: "failed" } + | { kind: "canceled" }; + startedAt: number; + workId?: string; + } + | { + args: { eventId?: string }; + argsSize: number; + completedAt?: number; + eventId?: string; + inProgress: boolean; + kind: "event"; + name: string; + runResult?: + | { kind: "success"; returnValue: any } + | { error: string; kind: "failed" } + | { kind: "canceled" }; + startedAt: number; + workId?: string; + }; stepNumber: number; workflowId: string; }> @@ -199,21 +285,38 @@ export declare const components: { inProgress: Array<{ _creationTime: number; _id: string; - step: { - args: any; - argsSize: number; - completedAt?: number; - functionType: "query" | "mutation" | "action"; - handle: string; - inProgress: boolean; - name: string; - runResult?: - | { kind: "success"; returnValue: any } - | { error: string; kind: "failed" } - | { kind: "canceled" }; - startedAt: number; - workId?: string; - }; + step: + | { + args: any; + argsSize: number; + completedAt?: number; + functionType: "query" | "mutation" | "action"; + handle: string; + inProgress: boolean; + kind?: "function"; + name: string; + runResult?: + | { kind: "success"; returnValue: any } + | { error: string; kind: "failed" } + | { kind: "canceled" }; + startedAt: number; + workId?: string; + } + | { + args: { eventId?: string }; + argsSize: number; + completedAt?: number; + eventId?: string; + inProgress: boolean; + kind: "event"; + name: string; + runResult?: + | { kind: "success"; returnValue: any } + | { error: string; kind: "failed" } + | { kind: "canceled" }; + startedAt: number; + workId?: string; + }; stepNumber: number; workflowId: string; }>; diff --git a/example/convex/userConfirmation.ts b/example/convex/userConfirmation.ts new file mode 100644 index 0000000..3ba95b9 --- /dev/null +++ b/example/convex/userConfirmation.ts @@ -0,0 +1,50 @@ +import { defineEvent, vWorkflowId } from "@convex-dev/workflow"; +import { v } from "convex/values"; +import { internal } from "./_generated/api"; +import { internalAction, mutation } from "./_generated/server"; +import { workflow } from "./example"; + +const approvalEvent = defineEvent({ + name: "approval", + validator: v.union( + v.object({ approved: v.literal(true), choice: v.number() }), + v.object({ approved: v.literal(false), reason: v.string() }), + ), +}); + +export const confirmationWorkflow = workflow.define({ + args: { prompt: v.string() }, + returns: v.string(), + handler: async (step, args): Promise => { + const proposals = await step.runAction( + internal.userConfirmation.generateProposals, + { prompt: args.prompt }, + { retry: true }, + ); + const approval = await step.awaitEvent(approvalEvent); + if (!approval.approved) { + return "rejected: " + approval.reason; + } + const choice = proposals[approval.choice]; + return choice; + }, +}); + +export const generateProposals = internalAction({ + args: { prompt: v.string() }, + handler: async (ctx, args) => { + // imagine this is a call to an LLM + return ["proposal1", "proposal2", "proposal3"]; + }, +}); + +export const chooseProposal = mutation({ + args: { workflowId: vWorkflowId, choice: v.number() }, + handler: async (ctx, args) => { + await workflow.sendEvent(ctx, args.workflowId, approvalEvent, { + approved: true, + choice: args.choice, + }); + return true; + }, +}); diff --git a/src/client/events.ts b/src/client/events.ts new file mode 100644 index 0000000..bed12bf --- /dev/null +++ b/src/client/events.ts @@ -0,0 +1,35 @@ +import type { EventId, EventSpec, VEventId } from "../types.js"; +import { v, type Infer, type Validator } from "convex/values"; + +/** + * Define a named event with a validator. + * @param spec - The event spec. + * @returns Utility functions to specify type-safe events and results. + */ +export function defineEvent< + Name extends string, + V extends Validator, +>(spec: { + name: Name; + validator?: V; +}): EventSpec> & { + /** + * A validator for the named event ID. + */ + vEventId: VEventId; + /** + * Use this to provide an ID to `awaitEvent` or `sendEvent`. + */ + withId: (id: EventId) => EventSpec>; +} { + return { + ...spec, + withId: (id: EventId) => ({ ...spec, id }), + vEventId: v.string() as VEventId, + }; +} + +export type TypedRunResult = + | { kind: "success"; returnValue: T } + | { kind: "failed"; error: string } + | { kind: "canceled" }; diff --git a/src/client/index.ts b/src/client/index.ts index 2cc5c5b..9aac212 100644 --- a/src/client/index.ts +++ b/src/client/index.ts @@ -15,13 +15,20 @@ import { } from "convex/server"; import type { ObjectType, PropertyValidators, Validator } from "convex/values"; import type { Step } from "../component/schema.js"; -import type { OnCompleteArgs, WorkflowId } from "../types.js"; +import type { + EventId, + EventSpec, + OnCompleteArgs, + WorkflowId, +} from "../types.js"; import { safeFunctionName } from "./safeFunctionName.js"; import type { OpaqueIds, WorkflowComponent, WorkflowStep } from "./types.js"; import { workflowMutation } from "./workflowMutation.js"; +import { parse } from "convex-helpers/validators"; export { vWorkflowId, type WorkflowId } from "../types.js"; export type { RunOptions } from "./types.js"; +export { defineEvent } from "./events.js"; export type CallbackOptions = { /** @@ -56,16 +63,13 @@ export type CallbackOptions = { export type WorkflowDefinition< ArgsValidator extends PropertyValidators, - // eslint-disable-next-line @typescript-eslint/no-explicit-any ReturnsValidator extends Validator | void = any, - // eslint-disable-next-line @typescript-eslint/no-explicit-any - ReturnValue extends ReturnValueForOptionalValidator = any, > = { args?: ArgsValidator; handler: ( step: WorkflowStep, args: ObjectType, - ) => Promise; + ) => Promise>; returns?: ReturnsValidator; workpoolOptions?: WorkpoolRetryOptions; }; @@ -93,11 +97,16 @@ export class WorkflowManager { define< ArgsValidator extends PropertyValidators, ReturnsValidator extends Validator | void, - // eslint-disable-next-line @typescript-eslint/no-explicit-any - ReturnValue extends ReturnValueForOptionalValidator = any, >( - workflow: WorkflowDefinition, - ): RegisteredMutation<"internal", ObjectType, void> { + workflow: WorkflowDefinition, + ): RegisteredMutation< + "internal", + { + fn: "You should not call this directly, call workflow.start instead"; + args: ObjectType; + }, + void + > { return workflowMutation( this.component, workflow, @@ -116,7 +125,7 @@ export class WorkflowManager { async start>( ctx: RunMutationCtx, workflow: F, - args: FunctionArgs, + args: FunctionArgs["args"], options?: CallbackOptions & { /** * By default, during creation the workflow will be initiated immediately. @@ -204,6 +213,48 @@ export class WorkflowManager { workflowId, }); } + + /** + * Send an event to a workflow. + * + * @param ctx - Either ctx from a mutation/action or a workflow step. + * @param args - The event arguments. + */ + async sendEvent( + ctx: RunMutationCtx, + workflowId: WorkflowId, + args: EventSpec, + ...runResult: T extends null ? [] : [T] + ): Promise> { + let result = { + kind: "success" as const, + returnValue: runResult[0] ?? (null as T), + }; + if (args.validator && result.kind === "success") { + result = { + ...result, + returnValue: parse(args.validator, result.returnValue), + }; + } + return (await ctx.runMutation(this.component.event.send, { + eventId: args.id, + result, + name: args.name, + workflowId: workflowId, + workpoolOptions: this.options?.workpoolOptions, + })) as EventId; + } + + async createEvent( + ctx: RunMutationCtx, + component: WorkflowComponent, + args: { name: Name; workflowId: WorkflowId }, + ): Promise> { + return (await ctx.runMutation(component.event.create, { + name: args.name, + workflowId: args.workflowId, + })) as EventId; + } } type RunQueryCtx = { diff --git a/src/client/step.ts b/src/client/step.ts index 0e8a837..cc0d150 100644 --- a/src/client/step.ts +++ b/src/client/step.ts @@ -18,6 +18,8 @@ import { valueSize, } from "../component/schema.js"; import type { SchedulerOptions, WorkflowComponent } from "./types.js"; +import { MAX_JOURNAL_SIZE } from "../shared.js"; +import type { EventId } from "../types.js"; export type WorkerResult = | { type: "handlerDone"; runResult: RunResult } @@ -25,9 +27,17 @@ export type WorkerResult = export type StepRequest = { name: string; - functionType: FunctionType; - function: FunctionReference; - args: unknown; + target: + | { + kind: "function"; + functionType: FunctionType; + function: FunctionReference; + args: unknown; + } + | { + kind: "event"; + args: { eventId?: EventId }; + }; retry: RetryBehavior | boolean | undefined; schedulerOptions: SchedulerOptions; @@ -35,8 +45,6 @@ export type StepRequest = { reject: (error: unknown) => void; }; -const MAX_JOURNAL_SIZE = 8 << 20; - export class StepExecutor { private journalEntrySize: number; @@ -56,6 +64,7 @@ export class StepExecutor { ); if (this.journalEntrySize > MAX_JOURNAL_SIZE) { + // This should never happen, but we'll throw an error just in case. throw new Error(journalSizeError(this.journalEntrySize, this.workflowId)); } } @@ -76,7 +85,14 @@ export class StepExecutor { const message = await this.receiver.get(); messages.push(message); } - await this.startSteps(messages); + const entries = await this.startSteps(messages); + if (entries.every((entry) => entry.step.runResult)) { + for (let i = 0; i < entries.length; i++) { + const entry = entries[i]; + this.completeMessage(messages[i], entry); + } + continue; + } return { type: "executorBlocked", }; @@ -104,10 +120,12 @@ export class StepExecutor { ); } const stepArgsJson = JSON.stringify(convexToJson(entry.step.args)); - const messageArgsJson = JSON.stringify(convexToJson(message.args as Value)); + const messageArgsJson = JSON.stringify( + convexToJson(message.target.args as Value), + ); if (stepArgsJson !== messageArgsJson) { throw new Error( - `Journal entry mismatch: ${entry.step.args} !== ${message.args}`, + `Journal entry mismatch: ${entry.step.args} !== ${message.target.args}`, ); } if (entry.step.runResult === undefined) { @@ -131,17 +149,29 @@ export class StepExecutor { async startSteps(messages: StepRequest[]): Promise { const steps = await Promise.all( messages.map(async (message) => { - const step = { + const commonFields = { inProgress: true, name: message.name, - functionType: message.functionType, - handle: await createFunctionHandle(message.function), - args: message.args, - argsSize: valueSize(message.args as Value), + args: message.target.args, + argsSize: valueSize(message.target.args as Value), outcome: undefined, startedAt: this.now, completedAt: undefined, }; + const target = message.target; + const step = + target.kind === "function" + ? { + kind: "function" as const, + ...commonFields, + functionType: target.functionType, + handle: await createFunctionHandle(target.function), + } + : { + kind: "event" as const, + ...commonFields, + args: target.args, + }; return { retry: message.retry, schedulerOptions: message.schedulerOptions, diff --git a/src/client/stepContext.ts b/src/client/stepContext.ts index 8145672..f155fbe 100644 --- a/src/client/stepContext.ts +++ b/src/client/stepContext.ts @@ -9,7 +9,8 @@ import { safeFunctionName } from "./safeFunctionName.js"; import type { StepRequest } from "./step.js"; import type { RetryOption } from "@convex-dev/workpool"; import type { RunOptions, WorkflowStep } from "./types.js"; -import type { WorkflowId } from "../types.js"; +import type { EventSpec, WorkflowId } from "../types.js"; +import { parse } from "convex-helpers/validators"; export class StepContext implements WorkflowStep { constructor( @@ -41,6 +42,24 @@ export class StepContext implements WorkflowStep { return this.runFunction("action", action, args, opts); } + async awaitEvent( + event: EventSpec, + ): Promise { + const result = await this.run({ + name: event.name, + target: { + kind: "event", + args: { eventId: event.id }, + }, + retry: undefined, + schedulerOptions: {}, + }); + if (event.validator) { + return parse(event.validator, result); + } + return result as T; + } + private async runFunction< F extends FunctionReference, >( @@ -49,17 +68,27 @@ export class StepContext implements WorkflowStep { args: unknown, opts?: RunOptions & RetryOption, ): Promise { - let send: unknown; - const { name, ...rest } = opts ?? {}; - const { retry, ...schedulerOptions } = rest; - const p = new Promise((resolve, reject) => { - send = this.sender.push({ - name: name ?? safeFunctionName(f), + const { name, retry, ...schedulerOptions } = opts ?? {}; + return this.run({ + name: name ?? safeFunctionName(f), + target: { + kind: "function", functionType, function: f, args, - retry, - schedulerOptions, + }, + retry, + schedulerOptions, + }); + } + + private async run( + request: Omit, + ): Promise { + let send: unknown; + const p = new Promise((resolve, reject) => { + send = this.sender.push({ + ...request, resolve, reject, }); diff --git a/src/client/types.ts b/src/client/types.ts index 9e90065..1632ff8 100644 --- a/src/client/types.ts +++ b/src/client/types.ts @@ -7,7 +7,7 @@ import type { } from "convex/server"; import type { api } from "../component/_generated/api.js"; import type { GenericId } from "convex/values"; -import type { WorkflowId } from "../types.js"; +import type { EventSpec, WorkflowId } from "../types.js"; export type WorkflowComponent = UseApi; @@ -81,6 +81,21 @@ export type WorkflowStep = { args: FunctionArgs, opts?: RunOptions & RetryOption, ): Promise>; + + /** + * Blocks until a matching event is sent to this workflow. + * + * If an ID is specified, an event with that ID must already exist and must + * not already be "awaited" or "consumed". + * + * If a name is specified, the first available event is consumed that matches + * the name. If there is no available event, it will create one with that name + * with status "awaited". + * @param event + */ + awaitEvent( + event: EventSpec, + ): Promise; }; export type UseApi = Expand<{ diff --git a/src/client/workflowMutation.ts b/src/client/workflowMutation.ts index 2355a87..489aa1d 100644 --- a/src/client/workflowMutation.ts +++ b/src/client/workflowMutation.ts @@ -2,7 +2,9 @@ import { BaseChannel } from "async-channel"; import { assert } from "convex-helpers"; import { validate, ValidationError } from "convex-helpers/validators"; import { + createFunctionHandle, internalMutationGeneric, + makeFunctionReference, type RegisteredMutation, } from "convex/server"; import { @@ -22,12 +24,19 @@ import { type RunResult, type WorkpoolOptions } from "@convex-dev/workpool"; import { type WorkflowComponent } from "./types.js"; import { vWorkflowId } from "../types.js"; import { formatErrorWithStack } from "../shared.js"; +import { safeFunctionName } from "./safeFunctionName.js"; -const workflowArgs = v.object({ - workflowId: vWorkflowId, - generationNumber: v.number(), -}); -const INVALID_WORKFLOW_MESSAGE = `Invalid arguments for workflow: Did you invoke the workflow with ctx.runMutation() instead of workflow.start()?`; +const workflowArgs = v.union( + v.object({ + workflowId: vWorkflowId, + generationNumber: v.number(), + }), + v.object({ + fn: v.string(), + args: v.any(), + }), +); +const INVALID_WORKFLOW_MESSAGE = `Invalid arguments for workflow: Did you invoke the workflow with ctx.runMutation() instead of workflow.start()? Pro tip: to start a workflow directly from the CLI or dashboard, you can use args '{ fn: "path/to/file:workflowName", args: { ...your workflow args } }'`; // This function is defined in the calling component but then gets passed by // function handle to the workflow component for execution. This function runs @@ -37,7 +46,14 @@ export function workflowMutation( component: WorkflowComponent, registered: WorkflowDefinition, defaultWorkpoolOptions?: WorkpoolOptions, -): RegisteredMutation<"internal", ObjectType, void> { +): RegisteredMutation< + "internal", + { + fn: "You should not call this directly, call workflow.start instead"; + args: ObjectType; + }, + void +> { const workpoolOptions = { ...defaultWorkpoolOptions, ...registered.workpoolOptions, @@ -47,10 +63,20 @@ export function workflowMutation( if (!validate(workflowArgs, args)) { throw new Error(INVALID_WORKFLOW_MESSAGE); } + if ("fn" in args) { + const fn = makeFunctionReference(args.fn); + const workflowId = await ctx.runMutation(component.workflow.create, { + workflowName: safeFunctionName(fn), + workflowHandle: await createFunctionHandle(fn), + workflowArgs: args.args, + maxParallelism: workpoolOptions.maxParallelism, + }); + return workflowId; + } const { workflowId, generationNumber } = args; const { workflow, logLevel, journalEntries, ok } = await ctx.runQuery( component.journal.load, - { workflowId }, + { workflowId, shortCircuit: true }, ); const inProgress = journalEntries.filter(({ step }) => step.inProgress); const console = createLogger(logLevel); @@ -157,7 +183,6 @@ export function workflowMutation( } } }, - // eslint-disable-next-line @typescript-eslint/no-explicit-any }) as any; } diff --git a/src/component/_generated/api.d.ts b/src/component/_generated/api.d.ts index 2caf60c..32f5453 100644 --- a/src/component/_generated/api.d.ts +++ b/src/component/_generated/api.d.ts @@ -8,6 +8,7 @@ * @module */ +import type * as event from "../event.js"; import type * as journal from "../journal.js"; import type * as logging from "../logging.js"; import type * as model from "../model.js"; @@ -30,6 +31,7 @@ import type { * ``` */ declare const fullApi: ApiFromModules<{ + event: typeof event; journal: typeof journal; logging: typeof logging; model: typeof model; @@ -38,30 +40,80 @@ declare const fullApi: ApiFromModules<{ workflow: typeof workflow; }>; export type Mounts = { + event: { + create: FunctionReference< + "mutation", + "public", + { name: string; workflowId: string }, + string + >; + send: FunctionReference< + "mutation", + "public", + { + eventId?: string; + name?: string; + result: + | { kind: "success"; returnValue: any } + | { error: string; kind: "failed" } + | { kind: "canceled" }; + workflowId: string; + workpoolOptions?: { + defaultRetryBehavior?: { + base: number; + initialBackoffMs: number; + maxAttempts: number; + }; + logLevel?: "DEBUG" | "TRACE" | "INFO" | "REPORT" | "WARN" | "ERROR"; + maxParallelism?: number; + retryActionsByDefault?: boolean; + }; + }, + string + >; + }; journal: { load: FunctionReference< "query", "public", - { workflowId: string }, + { shortCircuit?: boolean; workflowId: string }, { + blocked?: boolean; journalEntries: Array<{ _creationTime: number; _id: string; - step: { - args: any; - argsSize: number; - completedAt?: number; - functionType: "query" | "mutation" | "action"; - handle: string; - inProgress: boolean; - name: string; - runResult?: - | { kind: "success"; returnValue: any } - | { error: string; kind: "failed" } - | { kind: "canceled" }; - startedAt: number; - workId?: string; - }; + step: + | { + args: any; + argsSize: number; + completedAt?: number; + functionType: "query" | "mutation" | "action"; + handle: string; + inProgress: boolean; + kind?: "function"; + name: string; + runResult?: + | { kind: "success"; returnValue: any } + | { error: string; kind: "failed" } + | { kind: "canceled" }; + startedAt: number; + workId?: string; + } + | { + args: { eventId?: string }; + argsSize: number; + completedAt?: number; + eventId?: string; + inProgress: boolean; + kind: "event"; + name: string; + runResult?: + | { kind: "success"; returnValue: any } + | { error: string; kind: "failed" } + | { kind: "canceled" }; + startedAt: number; + workId?: string; + }; stepNumber: number; workflowId: string; }>; @@ -95,21 +147,38 @@ export type Mounts = { | boolean | { base: number; initialBackoffMs: number; maxAttempts: number }; schedulerOptions?: { runAt?: number } | { runAfter?: number }; - step: { - args: any; - argsSize: number; - completedAt?: number; - functionType: "query" | "mutation" | "action"; - handle: string; - inProgress: boolean; - name: string; - runResult?: - | { kind: "success"; returnValue: any } - | { error: string; kind: "failed" } - | { kind: "canceled" }; - startedAt: number; - workId?: string; - }; + step: + | { + args: any; + argsSize: number; + completedAt?: number; + functionType: "query" | "mutation" | "action"; + handle: string; + inProgress: boolean; + kind?: "function"; + name: string; + runResult?: + | { kind: "success"; returnValue: any } + | { error: string; kind: "failed" } + | { kind: "canceled" }; + startedAt: number; + workId?: string; + } + | { + args: { eventId?: string }; + argsSize: number; + completedAt?: number; + eventId?: string; + inProgress: boolean; + kind: "event"; + name: string; + runResult?: + | { kind: "success"; returnValue: any } + | { error: string; kind: "failed" } + | { kind: "canceled" }; + startedAt: number; + workId?: string; + }; }>; workflowId: string; workpoolOptions?: { @@ -126,21 +195,38 @@ export type Mounts = { Array<{ _creationTime: number; _id: string; - step: { - args: any; - argsSize: number; - completedAt?: number; - functionType: "query" | "mutation" | "action"; - handle: string; - inProgress: boolean; - name: string; - runResult?: - | { kind: "success"; returnValue: any } - | { error: string; kind: "failed" } - | { kind: "canceled" }; - startedAt: number; - workId?: string; - }; + step: + | { + args: any; + argsSize: number; + completedAt?: number; + functionType: "query" | "mutation" | "action"; + handle: string; + inProgress: boolean; + kind?: "function"; + name: string; + runResult?: + | { kind: "success"; returnValue: any } + | { error: string; kind: "failed" } + | { kind: "canceled" }; + startedAt: number; + workId?: string; + } + | { + args: { eventId?: string }; + argsSize: number; + completedAt?: number; + eventId?: string; + inProgress: boolean; + kind: "event"; + name: string; + runResult?: + | { kind: "success"; returnValue: any } + | { error: string; kind: "failed" } + | { kind: "canceled" }; + startedAt: number; + workId?: string; + }; stepNumber: number; workflowId: string; }> @@ -193,21 +279,38 @@ export type Mounts = { inProgress: Array<{ _creationTime: number; _id: string; - step: { - args: any; - argsSize: number; - completedAt?: number; - functionType: "query" | "mutation" | "action"; - handle: string; - inProgress: boolean; - name: string; - runResult?: - | { kind: "success"; returnValue: any } - | { error: string; kind: "failed" } - | { kind: "canceled" }; - startedAt: number; - workId?: string; - }; + step: + | { + args: any; + argsSize: number; + completedAt?: number; + functionType: "query" | "mutation" | "action"; + handle: string; + inProgress: boolean; + kind?: "function"; + name: string; + runResult?: + | { kind: "success"; returnValue: any } + | { error: string; kind: "failed" } + | { kind: "canceled" }; + startedAt: number; + workId?: string; + } + | { + args: { eventId?: string }; + argsSize: number; + completedAt?: number; + eventId?: string; + inProgress: boolean; + kind: "event"; + name: string; + runResult?: + | { kind: "success"; returnValue: any } + | { error: string; kind: "failed" } + | { kind: "canceled" }; + startedAt: number; + workId?: string; + }; stepNumber: number; workflowId: string; }>; diff --git a/src/component/event.ts b/src/component/event.ts new file mode 100644 index 0000000..cf5ae72 --- /dev/null +++ b/src/component/event.ts @@ -0,0 +1,197 @@ +// Get event status + +import { v } from "convex/values"; +import { mutation, type MutationCtx } from "./_generated/server.js"; +import { vResultValidator } from "@convex-dev/workpool"; +import type { Doc, Id } from "./_generated/dataModel.js"; +import { assert } from "convex-helpers"; +import { enqueueWorkflow, getWorkpool, workpoolOptions } from "./pool.js"; + +export async function awaitEvent( + ctx: MutationCtx, + entry: Doc<"steps">, + args: { eventId?: Id<"events">; name: string }, +) { + const event = await getOrCreateEvent(ctx, entry.workflowId, args, [ + "sent", + "created", + ]); + switch (event.state.kind) { + case "consumed": { + throw new Error( + `Event already consumed: ${event._id} (${entry.step.name}) in workflow ${entry.workflowId} step ${entry.stepNumber} (${entry._id})`, + ); + } + case "waiting": { + throw new Error( + `Event already waiting: ${event._id} (${entry.step.name}) in workflow ${entry.workflowId} step ${entry.stepNumber} (${entry._id})`, + ); + } + } + + switch (event.state.kind) { + case "sent": { + await ctx.db.patch(event._id, { + state: { + kind: "consumed", + sentAt: event.state.sentAt, + waitingAt: Date.now(), + consumedAt: Date.now(), + stepId: entry._id, + }, + }); + entry.step.runResult = event.state.result; + entry.step.inProgress = false; + entry.step.completedAt = Date.now(); + break; + } + case "created": { + await ctx.db.patch(event._id, { + state: { + kind: "waiting", + waitingAt: Date.now(), + stepId: entry._id, + }, + }); + break; + } + } + assert(entry.step.kind === "event", "Step is not an event"); + entry.step.eventId = event._id; + await ctx.db.replace(entry._id, entry); + // if there's a name, see if there's one to consume. + // if it's there, mark it consumed and swap in the result. + return entry; +} + +async function getOrCreateEvent( + ctx: MutationCtx, + workflowId: Id<"workflows">, + args: { eventId?: Id<"events">; name?: string }, + statuses: Doc<"events">["state"]["kind"][], +): Promise> { + if (args.eventId) { + const event = await ctx.db.get(args.eventId); + if (!event) { + throw new Error( + `Event not found: ${args.eventId} (${args.name}) in workflow ${workflowId}`, + ); + } + return event; + } + assert(args.name, "Name is required if eventId is not specified"); + for (const status of statuses) { + const event = await ctx.db + .query("events") + .withIndex("workflowId_state", (q) => + q.eq("workflowId", workflowId).eq("state.kind", status), + ) + .filter((q) => q.eq(q.field("name"), args.name)) + .first(); + if (event) return event; + } + const eventId = await ctx.db.insert("events", { + workflowId, + name: args.name, + state: { + kind: "created", + }, + }); + return (await ctx.db.get(eventId))!; +} + +export const send = mutation({ + args: { + workflowId: v.id("workflows"), + eventId: v.optional(v.id("events")), + name: v.optional(v.string()), + result: vResultValidator, + workpoolOptions: v.optional(workpoolOptions), + }, + returns: v.id("events"), + handler: async (ctx, args) => { + const event = await getOrCreateEvent( + ctx, + args.workflowId, + { + eventId: args.eventId, + name: args.name, + }, + ["waiting", "created"], + ); + const name = args.name ?? event.name; + switch (event.state.kind) { + case "sent": { + throw new Error( + `Event already sent: ${event._id} (${name}) in workflow ${args.workflowId}`, + ); + } + case "consumed": { + throw new Error( + `Event already consumed: ${event._id} (${name}) in workflow ${args.workflowId}`, + ); + } + case "created": { + await ctx.db.patch(event._id, { + state: { kind: "sent", result: args.result, sentAt: Date.now() }, + }); + break; + } + case "waiting": { + const step = await ctx.db.get(event.state.stepId); + assert( + step, + `Entry ${event.state.stepId} not found when sending event ${event._id} (${name}) in workflow ${args.workflowId}`, + ); + assert(step.step.kind === "event", "Step is not an event"); + step.step.eventId = event._id; + step.step.runResult = args.result; + step.step.inProgress = false; + step.step.completedAt = Date.now(); + await ctx.db.replace(step._id, step); + await ctx.db.patch(event._id, { + state: { + kind: "consumed", + stepId: step._id, + waitingAt: event.state.waitingAt, + sentAt: Date.now(), + consumedAt: Date.now(), + }, + }); + const anyMoreEvents = await ctx.db + .query("events") + .withIndex("workflowId_state", (q) => + q.eq("workflowId", args.workflowId).eq("state.kind", "waiting"), + ) + .order("desc") + .first(); + if (!anyMoreEvents) { + const workflow = await ctx.db.get(args.workflowId); + assert(workflow, `Workflow ${args.workflowId} not found`); + const workpool = await getWorkpool(ctx, args.workpoolOptions); + await enqueueWorkflow(ctx, workflow, workpool); + } + break; + } + } + return event._id; + }, +}); + +export const create = mutation({ + args: { + name: v.string(), + workflowId: v.id("workflows"), + }, + returns: v.id("events"), + handler: async (ctx, args) => { + const eventId = await ctx.db.insert("events", { + workflowId: args.workflowId, + name: args.name, + state: { + kind: "created", + }, + }); + return eventId; + }, +}); diff --git a/src/component/journal.ts b/src/component/journal.ts index df9642e..8175423 100644 --- a/src/component/journal.ts +++ b/src/component/journal.ts @@ -19,33 +19,54 @@ import { internal } from "./_generated/api.js"; import { type FunctionHandle } from "convex/server"; import { getDefaultLogger } from "./utils.js"; import { assert } from "convex-helpers"; +import { MAX_JOURNAL_SIZE } from "../shared.js"; +import { awaitEvent } from "./event.js"; export const load = query({ args: { workflowId: v.id("workflows"), + shortCircuit: v.optional(v.boolean()), }, returns: v.object({ workflow: workflowDocument, journalEntries: v.array(journalDocument), ok: v.boolean(), logLevel, + blocked: v.optional(v.boolean()), }), - handler: async (ctx, { workflowId }) => { + handler: async (ctx, { workflowId, shortCircuit }) => { const workflow = await ctx.db.get(workflowId); assert(workflow, `Workflow not found: ${workflowId}`); const { logLevel } = await getDefaultLogger(ctx); const journalEntries: JournalEntry[] = []; - let sizeSoFar = 0; + let journalSize = 0; + if (shortCircuit) { + const inProgress = await ctx.db + .query("steps") + .withIndex("inProgress", (q) => + q.eq("step.inProgress", true).eq("workflowId", workflowId), + ) + .first(); + if (inProgress) { + return { + journalEntries: [inProgress], + blocked: true, + workflow, + logLevel, + ok: false, + }; + } + } for await (const entry of ctx.db .query("steps") .withIndex("workflow", (q) => q.eq("workflowId", workflowId))) { journalEntries.push(entry); - sizeSoFar += journalEntrySize(entry); - if (sizeSoFar > 4 * 1024 * 1024) { - return { journalEntries, ok: false, workflow, logLevel }; + journalSize += journalEntrySize(entry); + if (journalSize > MAX_JOURNAL_SIZE) { + return { journalEntries, workflow, logLevel, ok: false }; } } - return { journalEntries, ok: true, workflow, logLevel }; + return { journalEntries, workflow, logLevel, ok: true }; }, }); @@ -91,51 +112,70 @@ export const startSteps = mutation({ const entries = await Promise.all( args.steps.map(async (stepArgs, index) => { const { step, retry, schedulerOptions } = stepArgs; - const { name, handle, args } = step; const stepNumber = stepNumberBase + index; const stepId = await ctx.db.insert("steps", { workflowId: workflow._id, stepNumber, step, }); - const entry = await ctx.db.get(stepId); + let entry = await ctx.db.get(stepId); assert(entry, "Step not found"); - const context: OnCompleteContext = { - generationNumber, - stepId, - }; - let workId: WorkId; - switch (step.functionType) { - case "query": { - workId = await workpool.enqueueQuery( - ctx, - handle as FunctionHandle<"query">, - args, - { context, onComplete, name, ...schedulerOptions }, - ); - break; - } - case "mutation": { - workId = await workpool.enqueueMutation( - ctx, - handle as FunctionHandle<"mutation">, - args, - { context, onComplete, name, ...schedulerOptions }, - ); - break; + const { name } = step; + if (step.kind === "event") { + // Note: This modifies entry in place as well. + entry = await awaitEvent(ctx, entry, { + name, + eventId: step.args.eventId, + }); + if (entry.step.runResult) { + console.event("eventConsumed", { + workflowId: entry.workflowId, + workflowName: workflow.name, + status: entry.step.runResult.kind, + eventName: entry.step.name, + stepNumber: entry.stepNumber, + durationMs: entry.step.completedAt! - entry.step.startedAt, + }); } - case "action": { - workId = await workpool.enqueueAction( - ctx, - handle as FunctionHandle<"action">, - args, - { context, onComplete, name, retry, ...schedulerOptions }, - ); - break; + } else { + const context: OnCompleteContext = { + generationNumber, + stepId, + workpoolOptions: args.workpoolOptions, + }; + let workId: WorkId; + switch (step.functionType) { + case "query": { + workId = await workpool.enqueueQuery( + ctx, + step.handle as FunctionHandle<"query">, + step.args, + { context, onComplete, name, ...schedulerOptions }, + ); + break; + } + case "mutation": { + workId = await workpool.enqueueMutation( + ctx, + step.handle as FunctionHandle<"mutation">, + step.args, + { context, onComplete, name, ...schedulerOptions }, + ); + break; + } + case "action": { + workId = await workpool.enqueueAction( + ctx, + step.handle as FunctionHandle<"action">, + step.args, + { context, onComplete, name, retry, ...schedulerOptions }, + ); + break; + } } + entry.step.workId = workId; + await ctx.db.replace(entry._id, entry); } - entry.step.workId = workId; - await ctx.db.replace(entry._id, entry); console.event("started", { workflowId: workflow._id, diff --git a/src/component/pool.ts b/src/component/pool.ts index 781ad68..0663fa3 100644 --- a/src/component/pool.ts +++ b/src/component/pool.ts @@ -19,6 +19,7 @@ import { logLevel } from "./logging.js"; import { getWorkflow } from "./model.js"; import { getDefaultLogger } from "./utils.js"; import { completeHandler } from "./workflow.js"; +import type { Doc } from "./_generated/dataModel.js"; export const workpoolOptions = v.object({ logLevel: v.optional(logLevel), @@ -53,7 +54,7 @@ export async function getWorkpool( }); } -export const onCompleteContext = v.object({ +const onCompleteContext = v.object({ generationNumber: v.number(), stepId: v.id("steps"), workpoolOptions: v.optional(workpoolOptions), @@ -155,19 +156,28 @@ export const onComplete = internalMutation({ return; } const workpool = await getWorkpool(ctx, args.context.workpoolOptions); - await workpool.enqueueMutation( - ctx, - workflow.workflowHandle as FunctionHandle<"mutation">, - { workflowId: workflow._id, generationNumber }, - { - name: workflow.name, - onComplete: internal.pool.handlerOnComplete, - context: { workflowId, generationNumber }, - }, - ); + await enqueueWorkflow(ctx, workflow, workpool); }, }); +export async function enqueueWorkflow( + ctx: MutationCtx, + workflow: Doc<"workflows">, + workpool: Workpool, +) { + const { _id: workflowId, generationNumber, name, workflowHandle } = workflow; + await workpool.enqueueMutation( + ctx, + workflowHandle as FunctionHandle<"mutation">, + { workflowId, generationNumber }, + { + name, + onComplete: internal.pool.handlerOnComplete, + context: { workflowId, generationNumber }, + }, + ); +} + export type OnComplete = typeof onComplete extends RegisteredAction< "public", diff --git a/src/component/schema.ts b/src/component/schema.ts index c96c0af..749afaf 100644 --- a/src/component/schema.ts +++ b/src/component/schema.ts @@ -59,19 +59,31 @@ export const workflowDocument = v.object({ }); export type Workflow = Infer; -export const step = v.object({ +const stepCommonFields = { name: v.string(), inProgress: v.boolean(), workId: v.optional(vWorkIdValidator), - functionType: literals("query", "mutation", "action"), - handle: v.string(), argsSize: v.number(), args: v.any(), runResult: v.optional(vResultValidator), - startedAt: v.number(), completedAt: v.optional(v.number()), -}); +}; + +export const step = v.union( + v.object({ + kind: v.optional(v.literal("function")), + functionType: literals("query", "mutation", "action"), + handle: v.string(), + ...stepCommonFields, + }), + v.object({ + kind: v.literal("event"), + ...stepCommonFields, + eventId: v.optional(v.id("events")), + args: v.object({ eventId: v.optional(v.id("events")) }), + }), +); export type Step = Infer; function stepSize(step: Step): number { @@ -81,8 +93,11 @@ function stepSize(step: Step): number { if (step.workId) { size += step.workId.length; } - size += step.functionType.length; - size += step.handle.length; + if (step.kind) size += step.kind.length; + if (step.kind !== "event") { + size += step.functionType.length; + size += step.handle.length; + } size += 8 + step.argsSize; if (step.runResult) { size += resultSize(step.runResult); @@ -115,6 +130,33 @@ export const journalDocument = v.object({ }); export type JournalEntry = Infer; +export const event = { + workflowId: v.id("workflows"), + name: v.string(), + state: v.union( + v.object({ + kind: v.literal("created"), + }), + v.object({ + kind: v.literal("sent"), + result: vResultValidator, + sentAt: v.number(), + }), + v.object({ + kind: v.literal("waiting"), + waitingAt: v.number(), + stepId: v.id("steps"), + }), + v.object({ + kind: v.literal("consumed"), + waitingAt: v.number(), + sentAt: v.number(), + consumedAt: v.number(), + stepId: v.id("steps"), + }), + ), +}; + export default defineSchema({ config: defineTable({ logLevel: v.optional(logLevel), @@ -124,6 +166,10 @@ export default defineSchema({ steps: defineTable(journalObject) .index("workflow", ["workflowId", "stepNumber"]) .index("inProgress", ["step.inProgress", "workflowId"]), + events: defineTable(event).index("workflowId_state", [ + "workflowId", + "state.kind", + ]), onCompleteFailures: defineTable( v.union( v.object({ diff --git a/src/shared.ts b/src/shared.ts index bd4e39d..858ac3b 100644 --- a/src/shared.ts +++ b/src/shared.ts @@ -1,3 +1,5 @@ +export const MAX_JOURNAL_SIZE = 8 << 20; + export function formatErrorWithStack(error: unknown): string { if (error instanceof Error) { return error.toString() + (error.stack ? "\n" + error.stack : ""); diff --git a/src/types.ts b/src/types.ts index a0fb107..86d588d 100644 --- a/src/types.ts +++ b/src/types.ts @@ -1,9 +1,22 @@ import type { RunResult } from "@convex-dev/workpool"; -import { v, type VString } from "convex/values"; +import { v, type Validator, type VNull, type VString } from "convex/values"; export type WorkflowId = string & { __isWorkflowId: true }; export const vWorkflowId = v.string() as VString; +export type EventId = string & { + __isEventId: true; + __name: Name; +}; +export type VEventId = VString>; +export const vEventId = v.string() as VString>; + +export type EventSpec = { + name: Name; + validator?: Validator; + id?: EventId; +}; + export type OnCompleteArgs = { /** * The ID of the work that completed.