Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions genkit-tools/cli/config/nextjs.genkit.ts.template
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import * as z from 'zod';
// Import the Genkit core libraries and plugins.
import { generate } from '@genkit-ai/ai';
import { configureGenkit } from '@genkit-ai/core';
import { defineFlow, runFlow } from '@genkit-ai/flow';
import { defineFlow } from '@genkit-ai/flow';
$GENKIT_CONFIG_IMPORTS
$GENKIT_MODEL_IMPORT

Expand Down Expand Up @@ -47,6 +47,6 @@ const menuSuggestionFlow = defineFlow(
export async function callMenuSuggestionFlow() {
// Invoke the flow. The value you pass as the second parameter must conform to
// your flow's input schema.
const flowResponse = await runFlow(menuSuggestionFlow, 'banana');
const flowResponse = await menuSuggestionFlow('banana');
console.log(flowResponse);
}
32 changes: 20 additions & 12 deletions js/flow/src/context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ import { metadataPrefix } from './utils.js';
* Context object encapsulates flow execution state at runtime.
*/
export class Context<
I extends z.ZodTypeAny,
O extends z.ZodTypeAny,
S extends z.ZodTypeAny,
I extends z.ZodTypeAny = z.ZodTypeAny,
O extends z.ZodTypeAny = z.ZodTypeAny,
S extends z.ZodTypeAny = z.ZodTypeAny,
> {
private seenSteps: Record<string, number> = {};

Expand Down Expand Up @@ -73,7 +73,9 @@ export class Context<
}
}

// Runs provided function in the current context. The config can specify retry and other behaviors.
/**
* Runs provided function in the current context. The config can specify retry and other behaviors.
*/
async run<T>(
config: RunStepConfig,
input: any | undefined,
Expand Down Expand Up @@ -124,8 +126,10 @@ export class Context<
return name;
}

// Executes interrupt step in the current context.
async interrupt<I extends z.ZodTypeAny, O>(
/**
* Executes interrupt step.
*/
async interrupt(
stepName: string,
func: (payload: I) => Promise<O>,
responseSchema: I | null,
Expand Down Expand Up @@ -189,17 +193,18 @@ export class Context<
);
}

// Sleep for the specified number of seconds.
async sleep<I extends z.ZodTypeAny, O extends z.ZodTypeAny>(
stepName: string,
seconds: number
): Promise<O> {
/**
* Sleep for the specified number of seconds.
*/
async sleep(stepName: string, seconds: number): Promise<O> {
if (!this.flow.scheduler) {
throw new Error('Cannot sleep in a flow with no scheduler.');
}
const resolvedStepName = this.resolveStepName(stepName);
if (this.isCached(resolvedStepName)) {
setCustomMetadataAttribute(metadataPrefix('state'), 'skipped');
return this.getCached(resolvedStepName);
}

await this.flow.scheduler(
this.flow,
{
Expand Down Expand Up @@ -227,6 +232,9 @@ export class Context<
flowIds: string[];
pollingConfig?: PollingConfig;
}): Promise<Operation[]> {
if (!this.flow.scheduler) {
throw new Error('Cannot wait for a flow with no scheduler.');
}
const resolvedStepName = this.resolveStepName(opts.stepName);
if (this.isCached(resolvedStepName)) {
return this.getCached(resolvedStepName);
Expand Down
26 changes: 10 additions & 16 deletions js/flow/src/experimental.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ import {
FlowStillRunningError,
} from './errors.js';
import {
CallableFlow,
Flow,
FlowWrapper,
RunStepConfig,
StepsFunction,
defineFlow,
Expand All @@ -39,30 +39,27 @@ import { getActiveContext } from './utils.js';
export function durableFlow<
I extends z.ZodTypeAny = z.ZodTypeAny,
O extends z.ZodTypeAny = z.ZodTypeAny,
S extends z.ZodTypeAny = z.ZodTypeAny,
>(
config: {
name: string;
inputSchema?: I;
outputSchema?: O;
streamSchema?: S;
invoker?: Invoker<I, O, S>;
scheduler?: Scheduler<I, O, S>;
invoker?: Invoker<I, O, z.ZodVoid>;
scheduler?: Scheduler<I, O>;
},
steps: StepsFunction<I, O, S>
): Flow<I, O, S> {
steps: StepsFunction<I, O, z.ZodVoid>
): Flow<I, O, z.ZodVoid> {
return defineFlow(
{
name: config.name,
inputSchema: config.inputSchema,
outputSchema: config.outputSchema,
streamSchema: config.streamSchema,
invoker: config.invoker,
experimentalScheduler: config.scheduler,
experimentalDurable: true,
},
steps
);
).flow;
}

/**
Expand All @@ -71,9 +68,8 @@ export function durableFlow<
export async function scheduleFlow<
I extends z.ZodTypeAny = z.ZodTypeAny,
O extends z.ZodTypeAny = z.ZodTypeAny,
S extends z.ZodTypeAny = z.ZodTypeAny,
>(
flow: Flow<I, O, S> | FlowWrapper<I, O, S>,
flow: Flow<I, O, z.ZodVoid> | CallableFlow<I, O>,
payload: z.infer<I>,
delaySeconds?: number
): Promise<Operation> {
Expand All @@ -97,7 +93,7 @@ export async function resumeFlow<
O extends z.ZodTypeAny = z.ZodTypeAny,
S extends z.ZodTypeAny = z.ZodTypeAny,
>(
flow: Flow<I, O, S> | FlowWrapper<I, O, S>,
flow: Flow<I, O, z.ZodVoid> | CallableFlow<I, O>,
flowId: string,
payload: any
): Promise<Operation> {
Expand All @@ -118,9 +114,8 @@ export async function resumeFlow<
export async function getFlowState<
I extends z.ZodTypeAny,
O extends z.ZodTypeAny,
S extends z.ZodTypeAny,
>(
flow: Flow<I, O, S> | FlowWrapper<I, O, S>,
flow: Flow<I, O, z.ZodVoid> | CallableFlow<I, O>,
flowId: string
): Promise<Operation> {
if (!(flow instanceof Flow)) {
Expand Down Expand Up @@ -163,9 +158,8 @@ export function runAction<I extends z.ZodTypeAny, O extends z.ZodTypeAny>(
export async function waitFlowToComplete<
I extends z.ZodTypeAny = z.ZodTypeAny,
O extends z.ZodTypeAny = z.ZodTypeAny,
S extends z.ZodTypeAny = z.ZodTypeAny,
>(
flow: Flow<I, O, S> | FlowWrapper<I, O, S>,
flow: Flow<I, O, z.ZodVoid> | CallableFlow<I, O>,
flowId: string
): Promise<z.infer<O>> {
if (!(flow instanceof Flow)) {
Expand Down
Loading