From 7d3ac07408bb721d29abb9273604628cf6fa0132 Mon Sep 17 00:00:00 2001 From: Alex Pascal Date: Fri, 16 Aug 2024 11:54:29 -0700 Subject: [PATCH 1/9] Split out `defineFlow` into itself + `defineStreamingFlow`. --- js/flow/src/context.ts | 13 +- js/flow/src/experimental.ts | 26 +- js/flow/src/flow.ts | 461 +++++++++++------- js/flow/src/index.ts | 6 +- js/flow/src/types.ts | 2 +- js/flow/tests/flow_test.ts | 20 +- js/plugins/firebase/src/experimental.ts | 19 +- js/plugins/firebase/src/functions.ts | 20 +- .../functions/src/index.ts | 6 +- 9 files changed, 337 insertions(+), 236 deletions(-) diff --git a/js/flow/src/context.ts b/js/flow/src/context.ts index 91a5c51287..5a6fdadddf 100644 --- a/js/flow/src/context.ts +++ b/js/flow/src/context.ts @@ -190,18 +190,15 @@ export class Context< } // Sleep for the specified number of seconds. - async sleep( - stepName: string, - seconds: number - ): Promise { + async sleep(stepName: string, seconds: number): Promise { const resolvedStepName = this.resolveStepName(stepName); if (this.isCached(resolvedStepName)) { setCustomMetadataAttribute(metadataPrefix('state'), 'skipped'); return this.getCached(resolvedStepName); } - await this.flow.scheduler( - this.flow, + await this.flow.scheduler?.( + this.flow as unknown as Flow, { runScheduled: { flowId: this.flowId, @@ -244,8 +241,8 @@ export class Context< this.updateCachedValue(resolvedStepName, states); return ops; } - await this.flow.scheduler( - this.flow, + await this.flow.scheduler?.( + this.flow as unknown as Flow, { runScheduled: { flowId: this.flowId, diff --git a/js/flow/src/experimental.ts b/js/flow/src/experimental.ts index 790461eb50..30a14d9c5b 100644 --- a/js/flow/src/experimental.ts +++ b/js/flow/src/experimental.ts @@ -24,8 +24,8 @@ import { FlowStillRunningError, } from './errors.js'; import { + CallableFlow, Flow, - FlowWrapper, RunStepConfig, StepsFunction, defineFlow, @@ -39,30 +39,27 @@ import { getActiveContext } from './utils.js'; export function durableFlow< I extends z.ZodTypeAny = z.ZodTypeAny, O extends z.ZodTypeAny = z.ZodTypeAny, - S extends z.ZodTypeAny = z.ZodTypeAny, >( config: { name: string; inputSchema?: I; outputSchema?: O; - streamSchema?: S; - invoker?: Invoker; - scheduler?: Scheduler; + invoker?: Invoker; + scheduler?: Scheduler; }, - steps: StepsFunction -): Flow { + steps: StepsFunction +): Flow { return defineFlow( { name: config.name, inputSchema: config.inputSchema, outputSchema: config.outputSchema, - streamSchema: config.streamSchema, invoker: config.invoker, experimentalScheduler: config.scheduler, experimentalDurable: true, }, steps - ); + ).flow; } /** @@ -71,9 +68,8 @@ export function durableFlow< export async function scheduleFlow< I extends z.ZodTypeAny = z.ZodTypeAny, O extends z.ZodTypeAny = z.ZodTypeAny, - S extends z.ZodTypeAny = z.ZodTypeAny, >( - flow: Flow | FlowWrapper, + flow: Flow | CallableFlow, payload: z.infer, delaySeconds?: number ): Promise { @@ -97,7 +93,7 @@ export async function resumeFlow< O extends z.ZodTypeAny = z.ZodTypeAny, S extends z.ZodTypeAny = z.ZodTypeAny, >( - flow: Flow | FlowWrapper, + flow: Flow | CallableFlow, flowId: string, payload: any ): Promise { @@ -118,9 +114,8 @@ export async function resumeFlow< export async function getFlowState< I extends z.ZodTypeAny, O extends z.ZodTypeAny, - S extends z.ZodTypeAny, >( - flow: Flow | FlowWrapper, + flow: Flow | CallableFlow, flowId: string ): Promise { if (!(flow instanceof Flow)) { @@ -163,9 +158,8 @@ export function runAction( export async function waitFlowToComplete< I extends z.ZodTypeAny = z.ZodTypeAny, O extends z.ZodTypeAny = z.ZodTypeAny, - S extends z.ZodTypeAny = z.ZodTypeAny, >( - flow: Flow | FlowWrapper, + flow: Flow | CallableFlow, flowId: string ): Promise> { if (!(flow instanceof Flow)) { diff --git a/js/flow/src/flow.ts b/js/flow/src/flow.ts index 1eb51fb7b3..fa9e09fa18 100644 --- a/js/flow/src/flow.ts +++ b/js/flow/src/flow.ts @@ -102,32 +102,123 @@ export interface __RequestWithAuth extends express.Request { } /** - * Defines the flow. + * Base configuration for a flow. */ -export function defineFlow< +export interface BaseFlowConfig< + I extends z.ZodTypeAny = z.ZodTypeAny, + O extends z.ZodTypeAny = z.ZodTypeAny, + S extends z.ZodTypeAny = z.ZodTypeAny, +> { + /** Name of the flow. */ + name: string; + /** Schema of the input to the flow. */ + inputSchema?: I; + /** Schema of the output from the flow. */ + outputSchema?: O; + /** Auth policy. */ + authPolicy?: FlowAuthPolicy; + /** Middleware for HTTP requests. Not called for direct invocations. */ + middleware?: express.RequestHandler[]; + /** Invoker for the flow. Defaults to local dispatcher. */ + invoker?: Invoker; +} + +/** + * Configuration for a non-streaming flow. + */ +export interface FlowConfig< + I extends z.ZodTypeAny = z.ZodTypeAny, + O extends z.ZodTypeAny = z.ZodTypeAny, +> extends BaseFlowConfig { + /** Experimental. Whether the flow is durable. */ + experimentalDurable?: boolean; + /** Experimental. Scheduler for a durable flow. `experimentalDurable` must be true. */ + experimentalScheduler?: Scheduler; +} + +/** + * Configuration for a streaming flow. + */ +export interface StreamingFlowConfig< + I extends z.ZodTypeAny = z.ZodTypeAny, + O extends z.ZodTypeAny = z.ZodTypeAny, + S extends z.ZodTypeAny = z.ZodTypeAny, +> extends BaseFlowConfig { + /** Schema of the streaming chunks from the flow. */ + streamSchema?: S; +} + +/** + * Non-streaming flow that can be called directly like a function. + */ +export interface CallableFlow< + I extends z.ZodTypeAny = z.ZodTypeAny, + O extends z.ZodTypeAny = z.ZodTypeAny, +> { + ( + input?: z.infer, + opts?: { withLocalAuthContext?: unknown } + ): Promise>; + flow: Flow; +} + +/** + * Streaming flow that can be called directly like a function. + */ +export interface StreamableFlow< + I extends z.ZodTypeAny = z.ZodTypeAny, + O extends z.ZodTypeAny = z.ZodTypeAny, + S extends z.ZodTypeAny = z.ZodTypeAny, +> { + ( + input?: z.infer, + opts?: { withLocalAuthContext?: unknown } + ): StreamingResponse; + flow: Flow; +} + +/** + * Streaming response from a streaming flow. + */ +interface StreamingResponse< + O extends z.ZodTypeAny = z.ZodTypeAny, + S extends z.ZodTypeAny = z.ZodTypeAny, +> { + stream(): AsyncGenerator | undefined>; + output(): Promise>; +} + +/** + * Function to be executed in the flow. + */ +export type StepsFunction< I extends z.ZodTypeAny = z.ZodTypeAny, O extends z.ZodTypeAny = z.ZodTypeAny, S extends z.ZodTypeAny = z.ZodTypeAny, +> = ( + /** Input to the flow. */ + input: z.infer, + /** Streaming callback for streaming flows only. */ + streamingCallback?: S extends z.ZodVoid + ? undefined + : StreamingCallback> +) => Promise>; + +/** + * Defines a non-streaming flow. + */ +export function defineFlow< + I extends z.ZodTypeAny = z.ZodTypeAny, + O extends z.ZodTypeAny = z.ZodTypeAny, >( - config: { - name: string; - inputSchema?: I; - outputSchema?: O; - streamSchema?: S; - authPolicy?: FlowAuthPolicy; - middleware?: express.RequestHandler[]; - invoker?: Invoker; - experimentalDurable?: boolean; - experimentalScheduler?: Scheduler; - }, - steps: StepsFunction -): Flow { - const f = new Flow( + config: FlowConfig, + steps: StepsFunction +): CallableFlow { + const f = new Flow( { name: config.name, inputSchema: config.inputSchema, outputSchema: config.outputSchema, - streamSchema: config.streamSchema, experimentalDurable: !!config.experimentalDurable, stateStore: globalConfig ? () => globalConfig.getFlowStateStore() @@ -135,11 +226,11 @@ export function defineFlow< authPolicy: config.authPolicy, middleware: config.middleware, // We always use local dispatcher in dev mode or when one is not provided. - invoker: async (flow, msg, streamingCallback) => { + invoker: async (flow, msg) => { if (!isDevEnv() && config.invoker) { - return config.invoker(flow, msg, streamingCallback); + return config.invoker(flow, msg); } - const state = await flow.runEnvelope(msg, streamingCallback); + const state = await flow.runEnvelope(msg); return state.operation; }, scheduler: async (flow, msg, delay = 0) => { @@ -158,15 +249,53 @@ export function defineFlow< ); createdFlows().push(f); wrapAsAction(f); - return f; + const callableFlow: CallableFlow = async (input, opts) => { + return f.run(input, opts); + }; + callableFlow.flow = f; + return callableFlow; } -export interface FlowWrapper< +/** + * Defines a streaming flow. + */ +export function defineStreamingFlow< I extends z.ZodTypeAny = z.ZodTypeAny, O extends z.ZodTypeAny = z.ZodTypeAny, S extends z.ZodTypeAny = z.ZodTypeAny, -> { - flow: Flow; +>( + config: StreamingFlowConfig, + steps: StepsFunction +): StreamableFlow { + const f = new Flow( + { + name: config.name, + inputSchema: config.inputSchema, + outputSchema: config.outputSchema, + experimentalDurable: false, + stateStore: globalConfig + ? () => globalConfig.getFlowStateStore() + : undefined, + authPolicy: config.authPolicy, + middleware: config.middleware, + // We always use local dispatcher in dev mode or when one is not provided. + invoker: async (flow, msg, streamingCallback) => { + if (!isDevEnv() && config.invoker) { + return config.invoker(flow, msg, streamingCallback); + } + const state = await flow.runEnvelope(msg, streamingCallback); + return state.operation; + }, + }, + steps + ); + createdFlows().push(f); + wrapAsAction(f); + const streamableFlow: StreamableFlow = (input, opts) => { + return f.stream(input, opts); + }; + streamableFlow.flow = f; + return streamableFlow; } export class Flow< @@ -180,7 +309,7 @@ export class Flow< readonly streamSchema?: S; readonly stateStore?: () => Promise; readonly invoker: Invoker; - readonly scheduler: Scheduler; + readonly scheduler?: S extends z.ZodVoid ? Scheduler : undefined; readonly experimentalDurable: boolean; readonly authPolicy?: FlowAuthPolicy; readonly middleware?: express.RequestHandler[]; @@ -193,7 +322,7 @@ export class Flow< streamSchema?: S; stateStore?: () => Promise; invoker: Invoker; - scheduler: Scheduler; + scheduler?: S extends z.ZodVoid ? Scheduler : undefined; experimentalDurable: boolean; authPolicy?: FlowAuthPolicy; middleware?: express.RequestHandler[]; @@ -227,7 +356,9 @@ export class Flow< async runDirectly( input: unknown, opts: { - streamingCallback?: StreamingCallback; + streamingCallback?: S extends z.ZodVoid + ? undefined + : StreamingCallback>; labels?: Record; auth?: unknown; } @@ -256,7 +387,9 @@ export class Flow< */ async runEnvelope( req: FlowInvokeEnvelopeMessage, - streamingCallback?: StreamingCallback, + streamingCallback?: S extends z.ZodVoid + ? undefined + : StreamingCallback>, auth?: unknown ): Promise { logger.debug(req, 'runEnvelope'); @@ -282,8 +415,8 @@ export class Flow< const state = createNewState(flowId, this.name, req.schedule.input); try { await (await this.stateStore()).save(flowId, state); - await this.scheduler( - this, + await this.scheduler?.( + this as unknown as Flow, // TODO: Fix this hack? { runScheduled: { flowId } } as FlowInvokeEnvelopeMessage, req.schedule.delay ); @@ -383,13 +516,126 @@ export class Flow< ); } + /** + * Runs the flow. This is used when calling a flow from another flow. + */ + async run( + payload?: z.infer, + opts?: { withLocalAuthContext?: unknown } + ): Promise> { + const input = this.inputSchema ? this.inputSchema.parse(payload) : payload; + await this.authPolicy?.(opts?.withLocalAuthContext, payload); + + if (this.middleware) { + logger.warn( + `Flow (${this.name}) middleware won't run when invoked with runFlow.` + ); + } + + const state = await this.runEnvelope({ + start: { + input, + }, + }); + if (!state.operation.done) { + throw new FlowStillRunningError( + `flow ${state.name} did not finish execution` + ); + } + if (state.operation.result?.error) { + throw new FlowExecutionError( + state.operation.name, + state.operation.result?.error, + state.operation.result?.stacktrace + ); + } + return state.operation.result?.response; + } + + /** + * Runs the flow and streams results. This is used when calling a flow from another flow. + */ + stream( + payload?: z.infer, + opts?: { withLocalAuthContext?: unknown } + ): StreamingResponse { + let chunkStreamController: ReadableStreamController>; + const chunkStream = new ReadableStream>({ + start(controller) { + chunkStreamController = controller; + }, + pull() {}, + cancel() {}, + }); + + const authPromise = + this.authPolicy?.(opts?.withLocalAuthContext, payload) ?? + Promise.resolve(); + + const operationPromise = authPromise + .then(() => + this.runEnvelope( + { + start: { + input: this.inputSchema + ? this.inputSchema.parse(payload) + : payload, + }, + }, + ((chunk: z.infer) => { + chunkStreamController.enqueue(chunk); + }) as S extends z.ZodVoid ? undefined : StreamingCallback> + ) + ) + .then((s) => s.operation); + operationPromise.then((o) => { + chunkStreamController.close(); + return o; + }); + + return { + async output() { + return operationPromise.then((op) => { + if (!op.done) { + throw new FlowStillRunningError( + `flow ${op.name} did not finish execution` + ); + } + if (op.result?.error) { + throw new FlowExecutionError( + op.name, + op.result?.error, + op.result?.stacktrace + ); + } + return op.result?.response; + }); + }, + async *stream() { + const reader = chunkStream.getReader(); + while (true) { + const chunk = await reader.read(); + if (chunk.value) { + yield chunk.value; + } + if (chunk.done) { + break; + } + } + return await operationPromise; + }, + }; + } + // TODO: refactor me... this is a mess! private async executeSteps( ctx: Context, handler: StepsFunction, dispatchType: string, - streamingCallback: StreamingCallback | undefined, - labels: Record | undefined + streamingCallback?: S extends z.ZodVoid + ? undefined + : StreamingCallback>, + labels?: Record ) { const startTimeMs = performance.now(); await initializeAllPlugins(); @@ -575,9 +821,9 @@ export class Flow< }); try { const state = await this.runDirectly(input, { - streamingCallback: (chunk) => { + streamingCallback: ((chunk: z.infer) => { res.write(JSON.stringify(chunk) + streamDelimiter); - }, + }) as S extends z.ZodVoid ? undefined : StreamingCallback>, auth, }); res.write(JSON.stringify(state.operation)); @@ -640,140 +886,6 @@ export class Flow< } } -/** - * Runs the flow. If the flow does not get interrupted may return a completed (done=true) operation. - */ -export async function runFlow< - I extends z.ZodTypeAny = z.ZodTypeAny, - O extends z.ZodTypeAny = z.ZodTypeAny, - S extends z.ZodTypeAny = z.ZodTypeAny, ->( - flow: Flow | FlowWrapper, - payload?: z.infer, - opts?: { withLocalAuthContext?: unknown } -): Promise> { - if (!(flow instanceof Flow)) { - flow = flow.flow; - } - - const input = flow.inputSchema ? flow.inputSchema.parse(payload) : payload; - await flow.authPolicy?.(opts?.withLocalAuthContext, payload); - - if (flow.middleware) { - logger.warn( - `Flow (${flow.name}) middleware won't run when invoked with runFlow.` - ); - } - - const state = await flow.runEnvelope({ - start: { - input, - }, - }); - if (!state.operation.done) { - throw new FlowStillRunningError( - `flow ${state.name} did not finish execution` - ); - } - if (state.operation.result?.error) { - throw new FlowExecutionError( - state.operation.name, - state.operation.result?.error, - state.operation.result?.stacktrace - ); - } - return state.operation.result?.response; -} - -interface StreamingResponse< - O extends z.ZodTypeAny = z.ZodTypeAny, - S extends z.ZodTypeAny = z.ZodTypeAny, -> { - stream(): AsyncGenerator | undefined>; - output(): Promise>; -} - -/** - * Runs the flow and streams results. If the flow does not get interrupted may return a completed (done=true) operation. - */ -export function streamFlow< - I extends z.ZodTypeAny = z.ZodTypeAny, - O extends z.ZodTypeAny = z.ZodTypeAny, - S extends z.ZodTypeAny = z.ZodTypeAny, ->( - flowOrFlowWrapper: Flow | FlowWrapper, - payload?: z.infer, - opts?: { withLocalAuthContext?: unknown } -): StreamingResponse { - const flow = !(flowOrFlowWrapper instanceof Flow) - ? flowOrFlowWrapper.flow - : flowOrFlowWrapper; - - let chunkStreamController: ReadableStreamController>; - const chunkStream = new ReadableStream>({ - start(controller) { - chunkStreamController = controller; - }, - pull() {}, - cancel() {}, - }); - - const authPromise = - flow.authPolicy?.(opts?.withLocalAuthContext, payload) ?? Promise.resolve(); - - const operationPromise = authPromise - .then(() => - flow.runEnvelope( - { - start: { - input: flow.inputSchema ? flow.inputSchema.parse(payload) : payload, - }, - }, - (c) => { - chunkStreamController.enqueue(c); - } - ) - ) - .then((s) => s.operation); - operationPromise.then((o) => { - chunkStreamController.close(); - return o; - }); - - return { - output() { - return operationPromise.then((op) => { - if (!op.done) { - throw new FlowStillRunningError( - `flow ${op.name} did not finish execution` - ); - } - if (op.result?.error) { - throw new FlowExecutionError( - op.name, - op.result?.error, - op.result?.stacktrace - ); - } - return op.result?.response; - }); - }, - async *stream() { - const reader = chunkStream.getReader(); - while (true) { - const chunk = await reader.read(); - if (chunk.value) { - yield chunk.value; - } - if (chunk.done) { - break; - } - } - return await operationPromise; - }, - }; -} - function createNewState( flowId: string, name: string, @@ -795,15 +907,6 @@ function createNewState( }; } -export type StepsFunction< - I extends z.ZodTypeAny = z.ZodTypeAny, - O extends z.ZodTypeAny = z.ZodTypeAny, - S extends z.ZodTypeAny = z.ZodTypeAny, -> = ( - input: z.infer, - streamingCallback: StreamingCallback> | undefined -) => Promise>; - function wrapAsAction< I extends z.ZodTypeAny = z.ZodTypeAny, O extends z.ZodTypeAny = z.ZodTypeAny, @@ -834,7 +937,9 @@ function wrapAsAction< setCustomMetadataAttribute(metadataPrefix('wrapperAction'), 'true'); return await flow.runEnvelope( envelope, - getStreamingCallback(), + getStreamingCallback() as S extends z.ZodVoid + ? undefined + : StreamingCallback>, envelope.auth ); } diff --git a/js/flow/src/index.ts b/js/flow/src/index.ts index 2653fadca5..dff66b3e87 100644 --- a/js/flow/src/index.ts +++ b/js/flow/src/index.ts @@ -25,12 +25,12 @@ export { FirestoreStateStore } from './firestoreStateStore.js'; export { Flow, defineFlow, - runFlow, + defineStreamingFlow, startFlowsServer, - streamFlow, + type CallableFlow, type FlowAuthPolicy, - type FlowWrapper, type StepsFunction, + type StreamableFlow, type __RequestWithAuth, } from './flow.js'; export { run, runAction, runMap } from './steps.js'; diff --git a/js/flow/src/types.ts b/js/flow/src/types.ts index 1462aa906a..daece26408 100644 --- a/js/flow/src/types.ts +++ b/js/flow/src/types.ts @@ -25,7 +25,7 @@ export type Invoker< > = ( flow: Flow, msg: FlowInvokeEnvelopeMessage, - streamingCallback?: StreamingCallback + streamingCallback?: S extends z.ZodVoid ? undefined : StreamingCallback ) => Promise; export type Scheduler< diff --git a/js/flow/tests/flow_test.ts b/js/flow/tests/flow_test.ts index ff84ac9524..b47bb1194c 100644 --- a/js/flow/tests/flow_test.ts +++ b/js/flow/tests/flow_test.ts @@ -19,7 +19,7 @@ import { __hardResetRegistryForTesting } from '@genkit-ai/core/registry'; import assert from 'node:assert'; import { beforeEach, describe, it } from 'node:test'; import { z } from 'zod'; -import { defineFlow, runFlow, streamFlow } from '../src/flow.js'; +import { defineFlow, defineStreamingFlow } from '../src/flow.js'; import { configureInMemoryStateStore } from './testUtil.js'; function createTestFlow() { @@ -36,7 +36,7 @@ function createTestFlow() { } function createTestStreamingFlow() { - return defineFlow( + return defineStreamingFlow( { name: 'testFlow', inputSchema: z.number(), @@ -66,7 +66,7 @@ describe('flow', () => { configureInMemoryStateStore('prod'); const testFlow = createTestFlow(); - const result = await runFlow(testFlow, 'foo'); + const result = await testFlow('foo'); assert.equal(result, 'bar foo'); }); @@ -84,7 +84,7 @@ describe('flow', () => { } ); - await assert.rejects(async () => await runFlow(testFlow, 'foo'), { + await assert.rejects(async () => await testFlow('foo'), { name: 'Error', message: 'bad happened: foo', }); @@ -104,7 +104,7 @@ describe('flow', () => { ); await assert.rejects( - async () => await runFlow(testFlow, { foo: 'foo', bar: 'bar' } as any), + async () => await testFlow({ foo: 'foo', bar: 'bar' } as any), (err: Error) => { assert.strictEqual(err.name, 'ZodError'); assert.equal( @@ -122,7 +122,7 @@ describe('flow', () => { configureInMemoryStateStore('prod'); const testFlow = createTestStreamingFlow(); - const response = streamFlow(testFlow, 3); + const response = testFlow(3); const gotChunks: any[] = []; for await (const chunk of response.stream()) { @@ -135,7 +135,7 @@ describe('flow', () => { it('should rethrow the error', async () => { configureInMemoryStateStore('prod'); - const testFlow = defineFlow( + const testFlow = defineStreamingFlow( { name: 'throwing', inputSchema: z.string(), @@ -145,7 +145,7 @@ describe('flow', () => { } ); - const response = streamFlow(testFlow, 'foo'); + const response = testFlow('foo'); await assert.rejects(async () => await response.output(), { name: 'Error', message: 'bad happened: foo', @@ -163,7 +163,7 @@ describe('flow', () => { const stateStore = configureInMemoryStateStore('dev'); const testFlow = createTestFlow(); - const result = await runFlow(testFlow, 'foo'); + const result = await testFlow('foo'); assert.equal(result, 'bar foo'); assert.equal(Object.keys(stateStore.state).length, 1); @@ -190,7 +190,7 @@ describe('flow', () => { const stateStore = configureInMemoryStateStore('prod'); const testFlow = createTestFlow(); - const result = await runFlow(testFlow, 'foo'); + const result = await testFlow('foo'); assert.equal(result, 'bar foo'); assert.equal(Object.keys(stateStore.state).length, 0); diff --git a/js/plugins/firebase/src/experimental.ts b/js/plugins/firebase/src/experimental.ts index e55b46043b..6249f0bcd6 100644 --- a/js/plugins/firebase/src/experimental.ts +++ b/js/plugins/firebase/src/experimental.ts @@ -36,12 +36,10 @@ import { callHttpsFunction, getFunctionUrl, getLocation } from './helpers.js'; interface ScheduledFlowConfig< I extends z.ZodTypeAny = z.ZodTypeAny, O extends z.ZodTypeAny = z.ZodTypeAny, - S extends z.ZodTypeAny = z.ZodTypeAny, > { name: string; inputSchema?: I; outputSchema?: O; - streamSchema?: S; taskQueueOptions?: TaskQueueOptions; } @@ -53,18 +51,17 @@ interface ScheduledFlowConfig< export function onScheduledFlow< I extends z.ZodTypeAny = z.ZodTypeAny, O extends z.ZodTypeAny = z.ZodTypeAny, - S extends z.ZodTypeAny = z.ZodTypeAny, >( - config: ScheduledFlowConfig, - steps: StepsFunction -): FunctionFlow { + config: ScheduledFlowConfig, + steps: StepsFunction +): FunctionFlow { const f = durableFlow( { ...config, invoker: async (flow, data, streamingCallback) => { const responseJson = await callHttpsFunction( flow.name, - await getLocation(), + getLocation(), data, streamingCallback ); @@ -79,7 +76,7 @@ export function onScheduledFlow< const wrapped = wrapScheduledFlow(f, config); - const funcFlow = wrapped as FunctionFlow; + const funcFlow = wrapped as FunctionFlow; funcFlow.flow = f; return funcFlow; @@ -88,8 +85,10 @@ export function onScheduledFlow< function wrapScheduledFlow< I extends z.ZodTypeAny = z.ZodTypeAny, O extends z.ZodTypeAny = z.ZodTypeAny, - S extends z.ZodTypeAny = z.ZodTypeAny, ->(flow: Flow, config: ScheduledFlowConfig): HttpsFunction { +>( + flow: Flow, + config: ScheduledFlowConfig +): HttpsFunction { const tq = onTaskDispatched( { ...config.taskQueueOptions, diff --git a/js/plugins/firebase/src/functions.ts b/js/plugins/firebase/src/functions.ts index 4e21efc605..8e3fde8aad 100644 --- a/js/plugins/firebase/src/functions.ts +++ b/js/plugins/firebase/src/functions.ts @@ -17,11 +17,12 @@ import { OperationSchema } from '@genkit-ai/core'; import { logger } from '@genkit-ai/core/logging'; import { - defineFlow, + CallableFlow, + defineStreamingFlow, Flow, FlowAuthPolicy, - FlowWrapper, StepsFunction, + StreamableFlow, } from '@genkit-ai/flow'; import * as express from 'express'; import { getAppCheck } from 'firebase-admin/app-check'; @@ -40,8 +41,13 @@ import { export type FunctionFlow< I extends z.ZodTypeAny, O extends z.ZodTypeAny, +> = HttpsFunction & CallableFlow; + +export type StreamingFunctionFlow< + I extends z.ZodTypeAny, + O extends z.ZodTypeAny, S extends z.ZodTypeAny, -> = HttpsFunction & FlowWrapper; +> = HttpsFunction & StreamableFlow; export interface FunctionFlowAuth { provider: express.RequestHandler; @@ -73,8 +79,8 @@ export function onFlow< >( config: FunctionFlowConfig, steps: StepsFunction -): FunctionFlow { - const f = defineFlow( +): StreamingFunctionFlow { + const f = defineStreamingFlow( { ...config, authPolicy: config.authPolicy.policy, @@ -101,11 +107,11 @@ export function onFlow< }, }, steps - ); + ).flow; const wrapped = wrapHttpsFlow(f, config); - const funcFlow = wrapped as FunctionFlow; + const funcFlow = wrapped as StreamingFunctionFlow; funcFlow.flow = f; return funcFlow; diff --git a/js/testapps/firebase-functions-sample1/functions/src/index.ts b/js/testapps/firebase-functions-sample1/functions/src/index.ts index e6cf96b02b..2fc4bfdc5b 100644 --- a/js/testapps/firebase-functions-sample1/functions/src/index.ts +++ b/js/testapps/firebase-functions-sample1/functions/src/index.ts @@ -19,7 +19,7 @@ import { configureGenkit } from '@genkit-ai/core'; import { firebase } from '@genkit-ai/firebase'; import { firebaseAuth } from '@genkit-ai/firebase/auth'; import { noAuth, onFlow } from '@genkit-ai/firebase/functions'; -import { run, runFlow, streamFlow } from '@genkit-ai/flow'; +import { run } from '@genkit-ai/flow'; import { geminiPro, vertexAI } from '@genkit-ai/vertexai'; import { onRequest } from 'firebase-functions/v2/https'; import * as z from 'zod'; @@ -103,7 +103,7 @@ export const streamConsumer = onFlow( authPolicy: noAuth(), }, async () => { - const response = streamFlow(streamer, 5); + const response = streamer(5); for await (const chunk of response.stream()) { console.log('chunk', chunk); @@ -118,7 +118,7 @@ export const triggerJokeFlow = onRequest( async (req, res) => { const { subject } = req.query; console.log('req.query', req.query); - const op = await runFlow(jokeFlow, String(subject), { + const op = await jokeFlow(String(subject), { withLocalAuthContext: { admin: true }, }); console.log('operation', op); From d28ca6598126404cf140d86044abaddf0db864dd Mon Sep 17 00:00:00 2001 From: Alex Pascal Date: Fri, 16 Aug 2024 12:02:15 -0700 Subject: [PATCH 2/9] Cleaned up scheduler types. --- js/flow/src/flow.ts | 2 +- js/flow/src/types.ts | 3 +-- js/testapps/flow-sample1/src/index.ts | 8 ++++---- 3 files changed, 6 insertions(+), 7 deletions(-) diff --git a/js/flow/src/flow.ts b/js/flow/src/flow.ts index fa9e09fa18..8e502921df 100644 --- a/js/flow/src/flow.ts +++ b/js/flow/src/flow.ts @@ -133,7 +133,7 @@ export interface FlowConfig< /** Experimental. Whether the flow is durable. */ experimentalDurable?: boolean; /** Experimental. Scheduler for a durable flow. `experimentalDurable` must be true. */ - experimentalScheduler?: Scheduler; + experimentalScheduler?: Scheduler; } /** diff --git a/js/flow/src/types.ts b/js/flow/src/types.ts index daece26408..adc25248bf 100644 --- a/js/flow/src/types.ts +++ b/js/flow/src/types.ts @@ -31,9 +31,8 @@ export type Invoker< export type Scheduler< I extends z.ZodTypeAny = z.ZodTypeAny, O extends z.ZodTypeAny = z.ZodTypeAny, - S extends z.ZodTypeAny = z.ZodTypeAny, > = ( - flow: Flow, + flow: Flow, msg: FlowInvokeEnvelopeMessage, delaySeconds?: number ) => Promise; diff --git a/js/testapps/flow-sample1/src/index.ts b/js/testapps/flow-sample1/src/index.ts index 9c5c90a122..ab5ab2b0b0 100644 --- a/js/testapps/flow-sample1/src/index.ts +++ b/js/testapps/flow-sample1/src/index.ts @@ -18,8 +18,8 @@ import { configureGenkit } from '@genkit-ai/core'; import { firebase } from '@genkit-ai/firebase'; import { defineFlow, + defineStreamingFlow, run, - runFlow, runMap, startFlowsServer, } from '@genkit-ai/flow'; @@ -57,7 +57,7 @@ export const basic = defineFlow({ name: 'basic' }, async (subject) => { export const parent = defineFlow( { name: 'parent', outputSchema: z.string() }, async () => { - return JSON.stringify(await runFlow(basic, 'foo')); + return JSON.stringify(await basic('foo')); } ); @@ -187,7 +187,7 @@ export const waity = durableFlow( ); // genkit flow:run streamy 5 -s -export const streamy = defineFlow( +export const streamy = defineStreamingFlow( { name: 'streamy', inputSchema: z.number(), @@ -207,7 +207,7 @@ export const streamy = defineFlow( ); // genkit flow:run streamy 5 -s -export const streamyThrowy = defineFlow( +export const streamyThrowy = defineStreamingFlow( { name: 'streamyThrowy', inputSchema: z.number(), From 628ea7961737921658863c3ac431997b861bbd26 Mon Sep 17 00:00:00 2001 From: Alex Pascal Date: Fri, 16 Aug 2024 12:14:19 -0700 Subject: [PATCH 3/9] Added errors to durable flow methods on Context. --- js/flow/src/context.ts | 31 +++++++++++++++++++++---------- js/flow/src/flow.ts | 2 +- 2 files changed, 22 insertions(+), 11 deletions(-) diff --git a/js/flow/src/context.ts b/js/flow/src/context.ts index 5a6fdadddf..f648131a53 100644 --- a/js/flow/src/context.ts +++ b/js/flow/src/context.ts @@ -17,10 +17,10 @@ import { FlowState, FlowStateExecution, Operation } from '@genkit-ai/core'; import { toJsonSchema } from '@genkit-ai/core/schema'; import { - SPAN_TYPE_ATTR, runInNewSpan, setCustomMetadataAttribute, setCustomMetadataAttributes, + SPAN_TYPE_ATTR, } from '@genkit-ai/core/tracing'; import { logger } from 'firebase-functions/v1'; import { z } from 'zod'; @@ -73,7 +73,9 @@ export class Context< } } - // Runs provided function in the current context. The config can specify retry and other behaviors. + /** + * Runs provided function in the current context. The config can specify retry and other behaviors. + */ async run( config: RunStepConfig, input: any | undefined, @@ -124,8 +126,10 @@ export class Context< return name; } - // Executes interrupt step in the current context. - async interrupt( + /** + * Executes interrupt step. + */ + async interrupt( stepName: string, func: (payload: I) => Promise, responseSchema: I | null, @@ -189,16 +193,20 @@ export class Context< ); } - // Sleep for the specified number of seconds. + /** + * Sleep for the specified number of seconds. + */ async sleep(stepName: string, seconds: number): Promise { + if (!this.flow.scheduler) { + throw new Error('Cannot sleep in a flow with no scheduler.'); + } const resolvedStepName = this.resolveStepName(stepName); if (this.isCached(resolvedStepName)) { setCustomMetadataAttribute(metadataPrefix('state'), 'skipped'); return this.getCached(resolvedStepName); } - - await this.flow.scheduler?.( - this.flow as unknown as Flow, + await this.flow.scheduler( + this.flow, { runScheduled: { flowId: this.flowId, @@ -224,6 +232,9 @@ export class Context< flowIds: string[]; pollingConfig?: PollingConfig; }): Promise { + if (!this.flow.scheduler) { + throw new Error('Cannot wait for a flow with no scheduler.'); + } const resolvedStepName = this.resolveStepName(opts.stepName); if (this.isCached(resolvedStepName)) { return this.getCached(resolvedStepName); @@ -241,8 +252,8 @@ export class Context< this.updateCachedValue(resolvedStepName, states); return ops; } - await this.flow.scheduler?.( - this.flow as unknown as Flow, + await this.flow.scheduler( + this.flow, { runScheduled: { flowId: this.flowId, diff --git a/js/flow/src/flow.ts b/js/flow/src/flow.ts index 8e502921df..e0d0810792 100644 --- a/js/flow/src/flow.ts +++ b/js/flow/src/flow.ts @@ -198,7 +198,7 @@ export type StepsFunction< > = ( /** Input to the flow. */ input: z.infer, - /** Streaming callback for streaming flows only. */ + /** Callback for streaming functions only. */ streamingCallback?: S extends z.ZodVoid ? undefined : StreamingCallback> From b8eba2c46337fd0da242c8f89b22eb2cecc76457 Mon Sep 17 00:00:00 2001 From: Alex Pascal Date: Fri, 16 Aug 2024 12:30:07 -0700 Subject: [PATCH 4/9] Minor simplifications. --- js/flow/src/context.ts | 6 +++--- js/flow/src/flow.ts | 23 ++++++++--------------- 2 files changed, 11 insertions(+), 18 deletions(-) diff --git a/js/flow/src/context.ts b/js/flow/src/context.ts index f648131a53..9248898de8 100644 --- a/js/flow/src/context.ts +++ b/js/flow/src/context.ts @@ -32,9 +32,9 @@ import { metadataPrefix } from './utils.js'; * Context object encapsulates flow execution state at runtime. */ export class Context< - I extends z.ZodTypeAny, - O extends z.ZodTypeAny, - S extends z.ZodTypeAny, + I extends z.ZodTypeAny = z.ZodTypeAny, + O extends z.ZodTypeAny = z.ZodTypeAny, + S extends z.ZodTypeAny = z.ZodTypeAny, > { private seenSteps: Record = {}; diff --git a/js/flow/src/flow.ts b/js/flow/src/flow.ts index e0d0810792..c6475bcd93 100644 --- a/js/flow/src/flow.ts +++ b/js/flow/src/flow.ts @@ -216,15 +216,10 @@ export function defineFlow< ): CallableFlow { const f = new Flow( { - name: config.name, - inputSchema: config.inputSchema, - outputSchema: config.outputSchema, - experimentalDurable: !!config.experimentalDurable, + ...config, stateStore: globalConfig ? () => globalConfig.getFlowStateStore() : undefined, - authPolicy: config.authPolicy, - middleware: config.middleware, // We always use local dispatcher in dev mode or when one is not provided. invoker: async (flow, msg) => { if (!isDevEnv() && config.invoker) { @@ -269,15 +264,10 @@ export function defineStreamingFlow< ): StreamableFlow { const f = new Flow( { - name: config.name, - inputSchema: config.inputSchema, - outputSchema: config.outputSchema, - experimentalDurable: false, + ...config, stateStore: globalConfig ? () => globalConfig.getFlowStateStore() : undefined, - authPolicy: config.authPolicy, - middleware: config.middleware, // We always use local dispatcher in dev mode or when one is not provided. invoker: async (flow, msg, streamingCallback) => { if (!isDevEnv() && config.invoker) { @@ -323,7 +313,7 @@ export class Flow< stateStore?: () => Promise; invoker: Invoker; scheduler?: S extends z.ZodVoid ? Scheduler : undefined; - experimentalDurable: boolean; + experimentalDurable?: boolean; authPolicy?: FlowAuthPolicy; middleware?: express.RequestHandler[]; }, @@ -336,7 +326,7 @@ export class Flow< this.stateStore = config.stateStore; this.invoker = config.invoker; this.scheduler = config.scheduler; - this.experimentalDurable = config.experimentalDurable; + this.experimentalDurable = config.experimentalDurable ?? false; this.authPolicy = config.authPolicy; this.middleware = config.middleware; @@ -539,7 +529,7 @@ export class Flow< }); if (!state.operation.done) { throw new FlowStillRunningError( - `flow ${state.name} did not finish execution` + `Flow ${state.name} did not finish execution.` ); } if (state.operation.result?.error) { @@ -946,6 +936,9 @@ function wrapAsAction< ); } +/** + * Start the flows server. + */ export function startFlowsServer(params?: { flows?: Flow[]; port?: number; From 420a56a61c7dd1d34b61a9116d28325f3a25ca0c Mon Sep 17 00:00:00 2001 From: Alex Pascal Date: Fri, 16 Aug 2024 12:40:12 -0700 Subject: [PATCH 5/9] Made StreamingResponse properties values rather than functions. --- js/flow/src/flow.ts | 44 +++++++++++++++++++------------------- js/flow/tests/flow_test.ts | 6 +++--- 2 files changed, 25 insertions(+), 25 deletions(-) diff --git a/js/flow/src/flow.ts b/js/flow/src/flow.ts index c6475bcd93..b0a04946eb 100644 --- a/js/flow/src/flow.ts +++ b/js/flow/src/flow.ts @@ -178,14 +178,16 @@ export interface StreamableFlow< } /** - * Streaming response from a streaming flow. + * Response from a streaming flow. */ interface StreamingResponse< O extends z.ZodTypeAny = z.ZodTypeAny, S extends z.ZodTypeAny = z.ZodTypeAny, > { - stream(): AsyncGenerator | undefined>; - output(): Promise>; + /** Iterator over the streaming chunks. */ + stream: AsyncGenerator | undefined>; + /** Final output of the flow. */ + output: Promise>; } /** @@ -584,24 +586,22 @@ export class Flow< }); return { - async output() { - return operationPromise.then((op) => { - if (!op.done) { - throw new FlowStillRunningError( - `flow ${op.name} did not finish execution` - ); - } - if (op.result?.error) { - throw new FlowExecutionError( - op.name, - op.result?.error, - op.result?.stacktrace - ); - } - return op.result?.response; - }); - }, - async *stream() { + output: operationPromise.then((op) => { + if (!op.done) { + throw new FlowStillRunningError( + `flow ${op.name} did not finish execution` + ); + } + if (op.result?.error) { + throw new FlowExecutionError( + op.name, + op.result?.error, + op.result?.stacktrace + ); + } + return op.result?.response; + }), + stream: (async function* () { const reader = chunkStream.getReader(); while (true) { const chunk = await reader.read(); @@ -613,7 +613,7 @@ export class Flow< } } return await operationPromise; - }, + })(), }; } diff --git a/js/flow/tests/flow_test.ts b/js/flow/tests/flow_test.ts index b47bb1194c..0ccc1e85b7 100644 --- a/js/flow/tests/flow_test.ts +++ b/js/flow/tests/flow_test.ts @@ -125,11 +125,11 @@ describe('flow', () => { const response = testFlow(3); const gotChunks: any[] = []; - for await (const chunk of response.stream()) { + for await (const chunk of response.stream) { gotChunks.push(chunk); } - assert.equal(await response.output(), 'bar 3 true'); + assert.equal(await response.output, 'bar 3 true'); assert.deepEqual(gotChunks, [{ count: 0 }, { count: 1 }, { count: 2 }]); }); @@ -146,7 +146,7 @@ describe('flow', () => { ); const response = testFlow('foo'); - await assert.rejects(async () => await response.output(), { + await assert.rejects(async () => await response.output, { name: 'Error', message: 'bad happened: foo', }); From a14c58f50b01fff6e7c94bc076cc0f398be5c4ed Mon Sep 17 00:00:00 2001 From: Alex Pascal Date: Fri, 16 Aug 2024 13:43:16 -0700 Subject: [PATCH 6/9] Updated tests. --- js/flow/tests/durable_test.ts | 8 ++++---- js/plugins/google-cloud/tests/logs_test.ts | 12 ++++++------ 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/js/flow/tests/durable_test.ts b/js/flow/tests/durable_test.ts index 5d0ace5c30..6a29bfdf6a 100644 --- a/js/flow/tests/durable_test.ts +++ b/js/flow/tests/durable_test.ts @@ -25,7 +25,7 @@ import { resumeFlow, scheduleFlow, } from '../src/experimental.js'; -import { defineFlow, runFlow } from '../src/flow.js'; +import { defineFlow } from '../src/flow.js'; import { asyncTurn, configureInMemoryStateStore } from './testUtil.js'; @@ -55,7 +55,7 @@ describe('durable', () => { configureInMemoryStateStore('prod'); const testFlow = createSimpleTestDurableFlow(); - const result = await runFlow(testFlow, 'foo'); + const result = await testFlow('foo'); assert.equal(result, 'bar foo'); }); @@ -114,7 +114,7 @@ describe('durable', () => { const stateStore = configureInMemoryStateStore('dev'); const testFlow = createSimpleTestDurableFlow(); - const result = await runFlow(testFlow, 'foo'); + const result = await testFlow('foo'); assert.equal(result, 'bar foo'); assert.equal(Object.keys(stateStore.state).length, 1); @@ -126,7 +126,7 @@ describe('durable', () => { const stateStore = configureInMemoryStateStore('prod'); const testFlow = createSimpleTestDurableFlow(); - const result = await runFlow(testFlow, 'foo'); + const result = await testFlow('foo'); assert.equal(result, 'bar foo'); assert.equal(Object.keys(stateStore.state).length, 1); diff --git a/js/plugins/google-cloud/tests/logs_test.ts b/js/plugins/google-cloud/tests/logs_test.ts index 543e94be48..883aba55c1 100644 --- a/js/plugins/google-cloud/tests/logs_test.ts +++ b/js/plugins/google-cloud/tests/logs_test.ts @@ -24,7 +24,7 @@ import { FlowStateStore, } from '@genkit-ai/core'; import { registerFlowStateStore } from '@genkit-ai/core/registry'; -import { defineFlow, run, runFlow } from '@genkit-ai/flow'; +import { defineFlow, run } from '@genkit-ai/flow'; import { __addTransportStreamForTesting, googleCloud, @@ -75,7 +75,7 @@ describe('GoogleCloudLogs', () => { it('writes path logs', async () => { const testFlow = createFlow('testFlow'); - await runFlow(testFlow); + await testFlow(); const logMessages = await getLogs(); assert.equal(logMessages.includes('[info] Paths[testFlow]'), true); @@ -88,7 +88,7 @@ describe('GoogleCloudLogs', () => { }); assert.rejects(async () => { - await runFlow(testFlow); + await testFlow(); }); const logMessages = await getLogs(); @@ -145,7 +145,7 @@ describe('GoogleCloudLogs', () => { }); }); - await runFlow(testFlow); + await testFlow(); const logMessages = await getLogs(); assert.equal( @@ -194,14 +194,14 @@ describe('GoogleCloudLogs', () => { async function waitForLogsInit() { await import('winston'); const testFlow = createFlow('testFlow'); - await runFlow(testFlow); + await testFlow(); await getLogs(1); } async function getLogs( logCount: number = 1, maxAttempts: number = 100 - ): promise { + ): Promise { var attempts = 0; while (attempts++ < maxAttempts) { await new Promise((resolve) => setTimeout(resolve, 100)); From ff6cc85f0194af6cf81d72ec27cfe62ae3d43813 Mon Sep 17 00:00:00 2001 From: Alex Pascal Date: Fri, 16 Aug 2024 14:10:49 -0700 Subject: [PATCH 7/9] Updated all samples and tests. --- .../cli/config/nextjs.genkit.ts.template | 4 +-- js/plugins/google-cloud/tests/metrics_test.ts | 32 +++++++++---------- js/plugins/google-cloud/tests/traces_test.ts | 14 ++++---- js/testapps/cat-eval/src/setup.ts | 4 +-- js/testapps/dev-ui-gallery/src/main/flows.ts | 10 +++--- js/testapps/docs-menu-rag/src/index.ts | 4 +-- js/testapps/express/src/index.ts | 4 +-- js/testapps/menu/src/05/flows.ts | 6 ++-- samples/js-coffee-shop/src/index.ts | 6 ++-- samples/js-menu/src/05/flows.ts | 6 ++-- 10 files changed, 45 insertions(+), 45 deletions(-) diff --git a/genkit-tools/cli/config/nextjs.genkit.ts.template b/genkit-tools/cli/config/nextjs.genkit.ts.template index 2ff912ef12..5538db5e1c 100644 --- a/genkit-tools/cli/config/nextjs.genkit.ts.template +++ b/genkit-tools/cli/config/nextjs.genkit.ts.template @@ -5,7 +5,7 @@ import * as z from 'zod'; // Import the Genkit core libraries and plugins. import { generate } from '@genkit-ai/ai'; import { configureGenkit } from '@genkit-ai/core'; -import { defineFlow, runFlow } from '@genkit-ai/flow'; +import { defineFlow } from '@genkit-ai/flow'; $GENKIT_CONFIG_IMPORTS $GENKIT_MODEL_IMPORT @@ -47,6 +47,6 @@ const menuSuggestionFlow = defineFlow( export async function callMenuSuggestionFlow() { // Invoke the flow. The value you pass as the second parameter must conform to // your flow's input schema. - const flowResponse = await runFlow(menuSuggestionFlow, 'banana'); + const flowResponse = await menuSuggestionFlow('banana'); console.log(flowResponse); } diff --git a/js/plugins/google-cloud/tests/metrics_test.ts b/js/plugins/google-cloud/tests/metrics_test.ts index 658bfdfa08..91e7945bcc 100644 --- a/js/plugins/google-cloud/tests/metrics_test.ts +++ b/js/plugins/google-cloud/tests/metrics_test.ts @@ -25,7 +25,7 @@ import { FlowStateStore, } from '@genkit-ai/core'; import { registerFlowStateStore } from '@genkit-ai/core/registry'; -import { defineFlow, run, runAction, runFlow } from '@genkit-ai/flow'; +import { defineFlow, run, runAction } from '@genkit-ai/flow'; import { __getMetricExporterForTesting, GcpOpenTelemetry, @@ -73,8 +73,8 @@ describe('GoogleCloudMetrics', () => { it('writes flow metrics', async () => { const testFlow = createFlow('testFlow'); - await runFlow(testFlow); - await runFlow(testFlow); + await testFlow(); + await testFlow(); const requestCounter = await getCounterMetric('genkit/flow/requests'); const latencyHistogram = await getHistogramMetric('genkit/flow/latency'); @@ -97,7 +97,7 @@ describe('GoogleCloudMetrics', () => { }); assert.rejects(async () => { - await runFlow(testFlow); + await testFlow(); }); const requestCounter = await getCounterMetric('genkit/flow/requests'); @@ -118,8 +118,8 @@ describe('GoogleCloudMetrics', () => { ]); }); - await runFlow(testFlow); - await runFlow(testFlow); + await testFlow(); + await testFlow(); const requestCounter = await getCounterMetric('genkit/action/requests'); const latencyHistogram = await getHistogramMetric('genkit/action/latency'); @@ -138,7 +138,7 @@ describe('GoogleCloudMetrics', () => { it('truncates metric dimensions', async () => { const testFlow = createFlow('anExtremelyLongFlowNameThatIsTooBig'); - await runFlow(testFlow); + await testFlow(); const requestCounter = await getCounterMetric('genkit/flow/requests'); const latencyHistogram = await getHistogramMetric('genkit/flow/latency'); @@ -162,7 +162,7 @@ describe('GoogleCloudMetrics', () => { }); assert.rejects(async () => { - await runFlow(testFlow); + await testFlow(); }); const requestCounter = await getCounterMetric('genkit/action/requests'); @@ -304,7 +304,7 @@ describe('GoogleCloudMetrics', () => { return await runAction(testAction); }); - await runFlow(flow); + await flow(); const requestCounter = await getCounterMetric('genkit/action/requests'); const latencyHistogram = await getHistogramMetric('genkit/action/latency'); @@ -346,7 +346,7 @@ describe('GoogleCloudMetrics', () => { }); }); - await runFlow(flow); + await flow(); const metrics = [ await getCounterMetric('genkit/ai/generate/requests'), @@ -374,7 +374,7 @@ describe('GoogleCloudMetrics', () => { return step1Result + step2Result; }); - await runFlow(flow); + await flow(); const expectedPaths = new Set([ '/{pathTestFlow,t:flow}/{step2,t:flowStep}', @@ -415,7 +415,7 @@ describe('GoogleCloudMetrics', () => { }); assert.rejects(async () => { - await runFlow(flow); + await flow(); }); const reqPoints = await getCounterDataPoints('genkit/flow/path/requests'); @@ -452,7 +452,7 @@ describe('GoogleCloudMetrics', () => { }); assert.rejects(async () => { - await runFlow(flow); + await flow(); }); const reqPoints = await getCounterDataPoints('genkit/flow/path/requests'); @@ -493,7 +493,7 @@ describe('GoogleCloudMetrics', () => { }); assert.rejects(async () => { - await runFlow(flow); + await flow(); }); const reqPoints = await getCounterDataPoints('genkit/flow/path/requests'); @@ -536,7 +536,7 @@ describe('GoogleCloudMetrics', () => { }); assert.rejects(async () => { - await runFlow(flow); + await flow(); }); const reqPoints = await getCounterDataPoints('genkit/flow/path/requests'); @@ -590,7 +590,7 @@ describe('GoogleCloudMetrics', () => { async function getGenkitMetrics( name: string = 'genkit', maxAttempts: number = 100 - ): promise { + ): Promise { var attempts = 0; while (attempts++ < maxAttempts) { await new Promise((resolve) => setTimeout(resolve, 50)); diff --git a/js/plugins/google-cloud/tests/traces_test.ts b/js/plugins/google-cloud/tests/traces_test.ts index bcdefa9dce..3127a08fe4 100644 --- a/js/plugins/google-cloud/tests/traces_test.ts +++ b/js/plugins/google-cloud/tests/traces_test.ts @@ -22,7 +22,7 @@ import { configureGenkit, } from '@genkit-ai/core'; import { registerFlowStateStore } from '@genkit-ai/core/registry'; -import { defineFlow, run, runFlow } from '@genkit-ai/flow'; +import { defineFlow, run } from '@genkit-ai/flow'; import { __forceFlushSpansForTesting, __getSpanExporterForTesting, @@ -62,7 +62,7 @@ describe('GoogleCloudTracing', () => { it('writes traces', async () => { const testFlow = createFlow('testFlow'); - await runFlow(testFlow); + await testFlow(); const spans = await getExportedSpans(); assert.equal(spans.length, 1); @@ -72,7 +72,7 @@ describe('GoogleCloudTracing', () => { it('Adjusts attributes to support GCP trace filtering', async () => { const testFlow = createFlow('testFlow'); - await runFlow(testFlow); + await testFlow(); const spans = await getExportedSpans(); // Check some common attributes @@ -95,7 +95,7 @@ describe('GoogleCloudTracing', () => { }); }); - await runFlow(testFlow); + await testFlow(); const spans = await getExportedSpans(); assert.equal(spans.length, 3); @@ -111,8 +111,8 @@ describe('GoogleCloudTracing', () => { const testFlow1 = createFlow('testFlow1'); const testFlow2 = createFlow('testFlow2'); - await runFlow(testFlow1); - await runFlow(testFlow2); + await testFlow1(); + await testFlow2(); const spans = await getExportedSpans(); assert.equal(spans.length, 2); @@ -136,7 +136,7 @@ describe('GoogleCloudTracing', () => { async function getExportedSpans( name: string = 'genkit', maxAttempts: number = 100 - ): promise { + ): Promise { __forceFlushSpansForTesting(); var attempts = 0; while (attempts++ < maxAttempts) { diff --git a/js/testapps/cat-eval/src/setup.ts b/js/testapps/cat-eval/src/setup.ts index a8f74b8cfc..3d830bd646 100644 --- a/js/testapps/cat-eval/src/setup.ts +++ b/js/testapps/cat-eval/src/setup.ts @@ -14,7 +14,7 @@ * limitations under the License. */ -import { defineFlow, runFlow } from '@genkit-ai/flow'; +import { defineFlow } from '@genkit-ai/flow'; import * as z from 'zod'; import { indexPdf } from './pdf_rag.js'; @@ -37,7 +37,7 @@ export const setup = defineFlow( await Promise.all( documentArr.map(async (document) => { console.log(`Indexed ${document}`); - return runFlow(indexPdf, document); + return indexPdf(document); }) ); } diff --git a/js/testapps/dev-ui-gallery/src/main/flows.ts b/js/testapps/dev-ui-gallery/src/main/flows.ts index 4033697b51..ce49103403 100644 --- a/js/testapps/dev-ui-gallery/src/main/flows.ts +++ b/js/testapps/dev-ui-gallery/src/main/flows.ts @@ -14,7 +14,7 @@ * limitations under the License. */ -import { defineFlow, run, runFlow, runMap } from '@genkit-ai/flow'; +import { defineFlow, defineStreamingFlow, run, runMap } from '@genkit-ai/flow'; import * as z from 'zod'; import { generateString } from '../common/util'; @@ -55,8 +55,8 @@ const flowMultiStep = defineFlow({ name: 'flowMultiStep' }, async (input) => { defineFlow({ name: 'flowNested', outputSchema: z.string() }, async () => { return JSON.stringify( { - firstResult: await runFlow(flowSingleStep, 'hello, world!'), - secondResult: await runFlow(flowMultiStep, 'hello, world!'), + firstResult: await flowSingleStep('hello, world!'), + secondResult: await flowMultiStep('hello, world!'), }, null, 2 @@ -67,7 +67,7 @@ defineFlow({ name: 'flowNested', outputSchema: z.string() }, async () => { // Flow - streaming // -defineFlow( +defineStreamingFlow( { name: 'flowStreaming', inputSchema: z.number(), @@ -167,7 +167,7 @@ defineFlow({ name: 'flowMultiStepCaughtError' }, async (input) => { // Flow - streamingThrows // -defineFlow( +defineStreamingFlow( { name: 'flowStreamingThrows', inputSchema: z.number(), diff --git a/js/testapps/docs-menu-rag/src/index.ts b/js/testapps/docs-menu-rag/src/index.ts index cf3a56d08e..43b4d2d95c 100644 --- a/js/testapps/docs-menu-rag/src/index.ts +++ b/js/testapps/docs-menu-rag/src/index.ts @@ -16,7 +16,7 @@ import { configureGenkit } from '@genkit-ai/core'; import { devLocalVectorstore } from '@genkit-ai/dev-local-vectorstore'; -import { defineFlow, runFlow } from '@genkit-ai/flow'; +import { defineFlow } from '@genkit-ai/flow'; import { textEmbeddingGecko, vertexAI } from '@genkit-ai/vertexai'; import * as z from 'zod'; import { indexMenu } from './indexer'; @@ -56,7 +56,7 @@ export const setup = defineFlow( await Promise.all( documentArr.map(async (document) => { console.log(`Indexed ${document}`); - return runFlow(indexMenu, document); + return indexMenu(document); }) ); } diff --git a/js/testapps/express/src/index.ts b/js/testapps/express/src/index.ts index f5bbc87c20..1d592cb46e 100644 --- a/js/testapps/express/src/index.ts +++ b/js/testapps/express/src/index.ts @@ -17,7 +17,7 @@ import { generate } from '@genkit-ai/ai'; import { configureGenkit } from '@genkit-ai/core'; import { firebase } from '@genkit-ai/firebase'; -import { defineFlow, run, runFlow } from '@genkit-ai/flow'; +import { defineFlow, run } from '@genkit-ai/flow'; import { googleAI } from '@genkit-ai/googleai'; import { vertexAI } from '@genkit-ai/vertexai'; import express, { Request, Response } from 'express'; @@ -70,7 +70,7 @@ app.get('/jokeWithFlow', async (req: Request, res: Response) => { res.status(400).send('provide subject query param'); return; } - res.send(await runFlow(jokeFlow, subject)); + res.send(await jokeFlow(subject)); }); app.get('/jokeStream', async (req: Request, res: Response) => { diff --git a/js/testapps/menu/src/05/flows.ts b/js/testapps/menu/src/05/flows.ts index 644e2e9ae9..a91b90bc1b 100644 --- a/js/testapps/menu/src/05/flows.ts +++ b/js/testapps/menu/src/05/flows.ts @@ -14,7 +14,7 @@ * limitations under the License. */ -import { defineFlow, runFlow } from '@genkit-ai/flow'; +import { defineFlow } from '@genkit-ai/flow'; import fs from 'fs'; import path from 'path'; @@ -78,11 +78,11 @@ export const s05_visionMenuQuestionFlow = defineFlow( }, async (input) => { // Run the first flow to read the menu image. - const menuResult = await runFlow(s05_readMenuFlow); + const menuResult = await s05_readMenuFlow(); // Pass the text of the menu and the question to the second flow // and return the answer as this output. - return runFlow(s05_textMenuQuestionFlow, { + return s05_textMenuQuestionFlow({ question: input.question, menuText: menuResult.menuText, }); diff --git a/samples/js-coffee-shop/src/index.ts b/samples/js-coffee-shop/src/index.ts index e6735a72a1..ffb80702bf 100644 --- a/samples/js-coffee-shop/src/index.ts +++ b/samples/js-coffee-shop/src/index.ts @@ -17,7 +17,7 @@ import { configureGenkit } from '@genkit-ai/core'; import { defineDotprompt, dotprompt } from '@genkit-ai/dotprompt'; import { firebase } from '@genkit-ai/firebase'; -import { defineFlow, runFlow } from '@genkit-ai/flow'; +import { defineFlow } from '@genkit-ai/flow'; import googleAI, { geminiPro } from '@genkit-ai/googleai'; import * as z from 'zod'; @@ -125,8 +125,8 @@ export const testAllCoffeeFlows = defineFlow( }), }, async () => { - const test1 = runFlow(simpleGreetingFlow, { customerName: 'Sam' }); - const test2 = runFlow(greetingWithHistoryFlow, { + const test1 = simpleGreetingFlow({ customerName: 'Sam' }); + const test2 = greetingWithHistoryFlow({ customerName: 'Sam', currentTime: '09:45am', previousOrder: 'Caramel Macchiato', diff --git a/samples/js-menu/src/05/flows.ts b/samples/js-menu/src/05/flows.ts index 4caddfa6f1..1def8ec34c 100644 --- a/samples/js-menu/src/05/flows.ts +++ b/samples/js-menu/src/05/flows.ts @@ -14,7 +14,7 @@ * limitations under the License. */ -import { defineFlow, runFlow } from '@genkit-ai/flow'; +import { defineFlow } from '@genkit-ai/flow'; import fs from 'fs'; import path from 'path'; @@ -78,11 +78,11 @@ export const s05_visionMenuQuestionFlow = defineFlow( }, async (input) => { // Run the first flow to read the menu image. - const menuResult = await runFlow(s05_readMenuFlow); + const menuResult = await s05_readMenuFlow(); // Pass the text of the menu and the question to the second flow // and return the answer as this output. - return runFlow(s05_textMenuQuestionFlow, { + return s05_textMenuQuestionFlow({ question: input.question, menuText: menuResult.menuText, }); From 919806fcf4da60b6c65ee7b2373ba249cd82e2a1 Mon Sep 17 00:00:00 2001 From: Alex Pascal Date: Fri, 16 Aug 2024 14:21:04 -0700 Subject: [PATCH 8/9] Better streaming callback type. --- js/flow/src/types.ts | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/js/flow/src/types.ts b/js/flow/src/types.ts index adc25248bf..91fd4e7076 100644 --- a/js/flow/src/types.ts +++ b/js/flow/src/types.ts @@ -25,7 +25,9 @@ export type Invoker< > = ( flow: Flow, msg: FlowInvokeEnvelopeMessage, - streamingCallback?: S extends z.ZodVoid ? undefined : StreamingCallback + streamingCallback?: S extends z.ZodVoid + ? undefined + : StreamingCallback> ) => Promise; export type Scheduler< From d2805fb893018fa3faeb8737b6a1c61b08da6ce6 Mon Sep 17 00:00:00 2001 From: Alex Pascal Date: Thu, 5 Sep 2024 07:28:29 -0700 Subject: [PATCH 9/9] Fix. --- js/flow/src/context.ts | 2 +- js/testapps/firebase-functions-sample1/functions/src/index.ts | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/js/flow/src/context.ts b/js/flow/src/context.ts index 9248898de8..a50ceebe92 100644 --- a/js/flow/src/context.ts +++ b/js/flow/src/context.ts @@ -17,10 +17,10 @@ import { FlowState, FlowStateExecution, Operation } from '@genkit-ai/core'; import { toJsonSchema } from '@genkit-ai/core/schema'; import { + SPAN_TYPE_ATTR, runInNewSpan, setCustomMetadataAttribute, setCustomMetadataAttributes, - SPAN_TYPE_ATTR, } from '@genkit-ai/core/tracing'; import { logger } from 'firebase-functions/v1'; import { z } from 'zod'; diff --git a/js/testapps/firebase-functions-sample1/functions/src/index.ts b/js/testapps/firebase-functions-sample1/functions/src/index.ts index 2fc4bfdc5b..c8814705b6 100644 --- a/js/testapps/firebase-functions-sample1/functions/src/index.ts +++ b/js/testapps/firebase-functions-sample1/functions/src/index.ts @@ -105,11 +105,11 @@ export const streamConsumer = onFlow( async () => { const response = streamer(5); - for await (const chunk of response.stream()) { + for await (const chunk of response.stream) { console.log('chunk', chunk); } - console.log('streamConsumer done', await response.output()); + console.log('streamConsumer done', await response.output); } );