Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 5 additions & 8 deletions genkit-tools/cli/src/commands/flow-batch-run.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
* limitations under the License.
*/

import { FlowInvokeEnvelopeMessage } from '@genkit-ai/tools-common';
import { logger } from '@genkit-ai/tools-common/utils';
import { Command } from 'commander';
import { readFile, writeFile } from 'fs/promises';
Expand Down Expand Up @@ -59,13 +58,11 @@ export const flowBatchRun = new Command('flow:batchRun')
logger.info(`Running '/flow/${flowName}'...`);
let response = await manager.runAction({
key: `/flow/${flowName}`,
input: {
start: {
input: data,
labels: options.label ? { batchRun: options.label } : undefined,
auth: options.auth ? JSON.parse(options.auth) : undefined,
},
} as FlowInvokeEnvelopeMessage,
input: data,
context: options.auth ? JSON.parse(options.auth) : undefined,
telemetryLabels: options.label
? { batchRun: options.label }
: undefined,
});
logger.info(
'Result:\n' + JSON.stringify(response.result, undefined, ' ')
Expand Down
9 changes: 2 additions & 7 deletions genkit-tools/cli/src/commands/flow-run.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
* limitations under the License.
*/

import { FlowInvokeEnvelopeMessage } from '@genkit-ai/tools-common';
import { logger } from '@genkit-ai/tools-common/utils';
import { Command } from 'commander';
import { writeFile } from 'fs/promises';
Expand Down Expand Up @@ -50,12 +49,8 @@ export const flowRun = new Command('flow:run')
await manager.runAction(
{
key: `/flow/${flowName}`,
input: {
start: {
input: data ? JSON.parse(data) : undefined,
},
auth: options.auth ? JSON.parse(options.auth) : undefined,
} as FlowInvokeEnvelopeMessage,
input: data ? JSON.parse(data) : undefined,
context: options.auth ? JSON.parse(options.auth) : undefined,
},
options.stream
? (chunk) => console.log(JSON.stringify(chunk, undefined, ' '))
Expand Down
10 changes: 2 additions & 8 deletions genkit-tools/common/src/eval/evaluate.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import {
EvalKeyAugments,
EvalRun,
EvalRunKey,
FlowActionInputSchema,
GenerateRequest,
GenerateRequestSchema,
GenerateResponseSchema,
Expand Down Expand Up @@ -257,15 +256,10 @@ async function runFlowAction(params: {
const { manager, actionRef, testCase, auth } = { ...params };
let state: InferenceRunState;
try {
const flowInput = FlowActionInputSchema.parse({
start: {
input: testCase.input,
},
auth: auth ? JSON.parse(auth) : undefined,
});
const runActionResponse = await manager.runAction({
key: actionRef,
input: flowInput,
input: testCase.input,
context: auth ? JSON.parse(auth) : undefined,
});
state = {
...testCase,
Expand Down
23 changes: 23 additions & 0 deletions genkit-tools/common/src/manager/manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import {

const STREAM_DELIMITER = '\n';
const HEALTH_CHECK_INTERVAL = 5000;
export const GENKIT_REFLECTION_API_SPEC_VERSION = 1;

interface RuntimeManagerOptions {
/** URL of the telemetry server. */
Expand Down Expand Up @@ -278,6 +279,7 @@ export class RuntimeManager {
try {
await axios.post(`${runtime.reflectionServerUrl}/api/notify`, {
telemetryServerUrl: this.telemetryServerUrl,
reflectionApiSpecVersion: GENKIT_REFLECTION_API_SPEC_VERSION,
});
} catch (error) {
logger.error(`Failed to notify runtime ${runtime.id}: ${error}`);
Expand Down Expand Up @@ -326,6 +328,27 @@ export class RuntimeManager {
if (isValidRuntimeInfo(runtimeInfo)) {
const fileName = path.basename(filePath);
if (await checkServerHealth(runtimeInfo.reflectionServerUrl)) {
if (
runtimeInfo.reflectionApiSpecVersion !=
GENKIT_REFLECTION_API_SPEC_VERSION
) {
if (
!runtimeInfo.reflectionApiSpecVersion ||
runtimeInfo.reflectionApiSpecVersion <
GENKIT_REFLECTION_API_SPEC_VERSION
) {
logger.warn(
'Genkit CLI is newer than runtime library. Some feature may not be supported. ' +
'Consider upgrading your runtime library version (debug info: expected ' +
`${GENKIT_REFLECTION_API_SPEC_VERSION}, got ${runtimeInfo.reflectionApiSpecVersion}).`
);
} else {
logger.error(
'Genkit CLI version is outdated. Please update `genkit-cli` to the latest version.'
);
process.exit(1);
}
}
this.filenameToRuntimeMap[fileName] = runtimeInfo;
this.idToFileMap[runtimeInfo.id] = fileName;
this.eventEmitter.emit(RuntimeEvent.ADD, runtimeInfo);
Expand Down
4 changes: 4 additions & 0 deletions genkit-tools/common/src/manager/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ export interface RuntimeInfo {
timestamp: string;
/** Display name for the project, typically basename of the root folder */
projectName?: string;
/** Genkit runtime library version. Ex: nodejs/0.9.5 or go/0.2.0 */
genkitVersion?: string;
/** Reflection API specification version. Ex: 1 */
reflectionApiSpecVersion?: number;
}

export enum RuntimeEvent {
Expand Down
4 changes: 2 additions & 2 deletions genkit-tools/common/src/server/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,15 +64,15 @@ export function startServer(manager: RuntimeManager, port: number) {
});

app.post('/api/streamAction', bodyParser.json(), async (req, res) => {
const { key, input } = req.body;
const { key, input, context } = req.body;
res.writeHead(200, {
'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Headers': 'Content-Type',
'Content-Type': 'text/plain',
'Transfer-Encoding': 'chunked',
});

const result = await manager.runAction({ key, input }, (chunk) => {
const result = await manager.runAction({ key, input, context }, (chunk) => {
res.write(JSON.stringify(chunk) + '\n');
});
res.write(JSON.stringify(result));
Expand Down
8 changes: 8 additions & 0 deletions genkit-tools/common/src/types/apis.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,14 @@ export const RunActionRequestSchema = z.object({
.any()
.optional()
.describe('An input with the type that this action expects.'),
context: z
.any()
.optional()
.describe('Additional runtime context data (ex. auth context data).'),
telemetryLabels: z
.record(z.string(), z.string())
.optional()
.describe('Labels to be applied to telemetry data.'),
});

export type RunActionRequest = z.infer<typeof RunActionRequestSchema>;
Expand Down
100 changes: 0 additions & 100 deletions genkit-tools/common/src/types/flow.ts

This file was deleted.

1 change: 0 additions & 1 deletion genkit-tools/common/src/types/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ export * from './apis';
export * from './env';
export * from './eval';
export * from './evaluators';
export * from './flow';
export * from './model';
export * from './prompt';
export * from './retrievers';
Expand Down
1 change: 0 additions & 1 deletion genkit-tools/common/tests/utils/trace.ts
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,6 @@ export class MockTrace {
let baseFlowSpan = { ...this.BASE_FLOW_SPAN };
baseFlowSpan.attributes['genkit:input'] = JSON.stringify(flowInput);
baseFlowSpan.attributes['genkit:output'] = JSON.stringify(flowOutput);
baseFlowSpan.attributes['genkit:metadata:flow:state'] = baseFlowState;

let wrapperActionSpan = { ...this.WRAPPER_ACTION_SPAN };
wrapperActionSpan.attributes['genkit:input'] = JSON.stringify({
Expand Down
Loading
Loading