diff --git a/genkit-tools/common/src/manager/manager.ts b/genkit-tools/common/src/manager/manager.ts index 3a33457ad3..068e06eb50 100644 --- a/genkit-tools/common/src/manager/manager.ts +++ b/genkit-tools/common/src/manager/manager.ts @@ -16,9 +16,9 @@ import axios, { AxiosError } from 'axios'; import chokidar from 'chokidar'; +import EventEmitter from 'events'; import fs from 'fs/promises'; import path from 'path'; - import { Action, RunActionResponse, @@ -28,18 +28,12 @@ import * as apis from '../types/apis'; import { TraceData } from '../types/trace'; import { logger } from '../utils/logger'; import { checkServerHealth, findRuntimesDir } from '../utils/utils'; -import { GenkitToolsError, StreamingCallback } from './types'; - -interface RuntimeInfo { - /** Runtime ID (either user-set or `pid`). */ - id: string; - /** Process ID of the runtime. */ - pid: number; - /** URL of the reflection server. */ - reflectionServerUrl: string; - /** Timestamp when the runtime was started. */ - timestamp: string; -} +import { + GenkitToolsError, + RuntimeEvent, + RuntimeInfo, + StreamingCallback, +} from './types'; const STREAM_DELIMITER = '\n'; const HEALTH_CHECK_INTERVAL = 5000; @@ -54,6 +48,7 @@ interface RuntimeManagerOptions { export class RuntimeManager { private filenameToRuntimeMap: Record = {}; private idToFileMap: Record = {}; + private eventEmitter = new EventEmitter(); private constructor( readonly telemetryServerUrl?: string, @@ -110,6 +105,23 @@ export class RuntimeManager { ); } + /** + * Subscribe to changes to the available runtimes. e.g.) whenever a new + * runtime is added or removed. + * + * The `listener` will be called with the `eventType` that occured and the + * `runtime` to which it applies. + * + * @param listener the callback function. + */ + onRuntimeEvent( + listener: (eventType: RuntimeEvent, runtime: RuntimeInfo) => void + ) { + Object.values(RuntimeEvent).forEach((event) => + this.eventEmitter.on(event, (rt) => listener(event, rt)) + ); + } + /** * Retrieves all runnable actions. */ @@ -305,6 +317,7 @@ export class RuntimeManager { if (await checkServerHealth(runtimeInfo.reflectionServerUrl)) { this.filenameToRuntimeMap[fileName] = runtimeInfo; this.idToFileMap[runtimeInfo.id] = fileName; + this.eventEmitter.emit(RuntimeEvent.ADD, runtimeInfo); await this.notifyRuntime(runtimeInfo); logger.debug( `Added runtime with ID ${runtimeInfo.id} at URL: ${runtimeInfo.reflectionServerUrl}` @@ -326,10 +339,11 @@ export class RuntimeManager { private handleRemovedRuntime(filePath: string) { const fileName = path.basename(filePath); if (fileName in this.filenameToRuntimeMap) { - const id = this.filenameToRuntimeMap[fileName].id; + const runtime = this.filenameToRuntimeMap[fileName]; delete this.filenameToRuntimeMap[fileName]; - delete this.idToFileMap[id]; - logger.debug(`Removed runtime with id ${id}.`); + delete this.idToFileMap[runtime.id]; + this.eventEmitter.emit(RuntimeEvent.REMOVE, runtime); + logger.debug(`Removed runtime with id ${runtime.id}.`); } } diff --git a/genkit-tools/common/src/manager/types.ts b/genkit-tools/common/src/manager/types.ts index 5f1f20aff5..2a1ce7e016 100644 --- a/genkit-tools/common/src/manager/types.ts +++ b/genkit-tools/common/src/manager/types.ts @@ -26,3 +26,19 @@ export class GenkitToolsError extends Error { // Streaming callback function. export type StreamingCallback = (chunk: T) => void; + +export interface RuntimeInfo { + /** Runtime ID (either user-set or `pid`). */ + id: string; + /** Process ID of the runtime. */ + pid: number; + /** URL of the reflection server. */ + reflectionServerUrl: string; + /** Timestamp when the runtime was started. */ + timestamp: string; +} + +export enum RuntimeEvent { + ADD = 'add', + REMOVE = 'remove', +} diff --git a/genkit-tools/common/src/server/server.ts b/genkit-tools/common/src/server/server.ts index 9863c4e1b7..60ddbecf6f 100644 --- a/genkit-tools/common/src/server/server.ts +++ b/genkit-tools/common/src/server/server.ts @@ -79,6 +79,36 @@ export async function startServer(manager: RuntimeManager, port: number) { res.end(); }); + // General purpose endpoint for Server Side Events to the Developer UI. + // Currently only event type "current-time" is supported, which notifies the + // subsriber of the currently selected Genkit Runtime (typically most recent). + app.get('/api/sse', async (_, res) => { + res.writeHead(200, { + 'Access-Control-Allow-Origin': '*', + 'Cache-Control': 'no-cache', + 'Content-Type': 'text/event-stream', + Connection: 'keep-alive', + }); + + // On connection, immediately send the "current" runtime (i.e. most recent) + const runtimeInfo = JSON.stringify(manager.getMostRecentRuntime() ?? {}); + res.write('event: current-runtime\n'); + res.write(`data: ${runtimeInfo}\n\n`); + + // When runtimes are added or removed, notify the Dev UI which runtime + // is considered "current" (i.e. most recent). In the future, we could send + // updates and let the developer decide which to use. + manager.onRuntimeEvent(() => { + const runtimeInfo = JSON.stringify(manager.getMostRecentRuntime() ?? {}); + res.write('event: current-runtime\n'); + res.write(`data: ${runtimeInfo}\n\n`); + }); + + res.on('close', () => { + res.end(); + }); + }); + app.get('/api/__health', (_, res) => { res.status(200).send(''); }); diff --git a/genkit-tools/common/src/types/index.ts b/genkit-tools/common/src/types/index.ts index 6174af34a7..078d050f29 100644 --- a/genkit-tools/common/src/types/index.ts +++ b/genkit-tools/common/src/types/index.ts @@ -14,6 +14,7 @@ * limitations under the License. */ +export { RuntimeEvent, RuntimeInfo } from '../manager/types'; export * from './action'; export * from './analytics'; export * from './apis';