diff --git a/src/const.ts b/src/const.ts index b198653d..0152ea7a 100644 --- a/src/const.ts +++ b/src/const.ts @@ -80,3 +80,5 @@ export const ALGOLIA = { apiKey: 'e97714a64e2b4b8b8fe0b01cd8592870', // search only (public) API key indexName: 'test_test_apify_sdk', }; + +export const PROGRESS_NOTIFICATION_INTERVAL_MS = 5_000; // 5 seconds diff --git a/src/main.ts b/src/main.ts index 92c75859..d299a6e5 100644 --- a/src/main.ts +++ b/src/main.ts @@ -47,7 +47,7 @@ if (STANDBY_MODE) { log.info('Actor is running in the STANDBY mode.'); app.listen(PORT, () => { - log.info(`The Actor web server is listening for user requests at ${HOST}`); + log.info(`The Actor web server is listening for user requests at ${HOST}:${PORT}`); }); } else { log.info('Actor is not designed to run in the NORMAL model (use this mode only for debugging purposes)'); diff --git a/src/mcp/server.ts b/src/mcp/server.ts index 73565dc6..cc022826 100644 --- a/src/mcp/server.ts +++ b/src/mcp/server.ts @@ -26,6 +26,7 @@ import { import { addRemoveTools, callActorGetDataset, defaultTools, getActorsAsTools, toolCategories } from '../tools/index.js'; import { actorNameToToolName, decodeDotPropertyNames } from '../tools/utils.js'; import type { ActorMcpTool, ActorTool, HelperTool, ToolEntry } from '../types.js'; +import { createProgressTracker } from '../utils/progress.js'; import { connectMCPClient } from './client.js'; import { EXTERNAL_TOOL_CALL_TIMEOUT_MSEC } from './const.js'; import { processParamsGetTools } from './utils.js'; @@ -423,6 +424,12 @@ export class ActorsMcpServer { // Handle internal tool if (tool.type === 'internal') { const internalTool = tool.tool as HelperTool; + + // Only create progress tracker for call-actor tool + const progressTracker = internalTool.name === 'call-actor' + ? createProgressTracker(progressToken, extra.sendNotification) + : null; + const res = await internalTool.call({ args, extra, @@ -430,8 +437,13 @@ export class ActorsMcpServer { mcpServer: this.server, apifyToken, userRentedActorIds, + progressTracker, }) as object; + if (progressTracker) { + progressTracker.stop(); + } + return { ...res }; } @@ -477,21 +489,33 @@ export class ActorsMcpServer { if (tool.type === 'actor') { const actorTool = tool.tool as ActorTool; + // Create progress tracker if progressToken is available + const progressTracker = createProgressTracker(progressToken, extra.sendNotification); + const callOptions: ActorCallOptions = { memory: actorTool.memoryMbytes }; - const { items } = await callActorGetDataset( - actorTool.actorFullName, - args, - apifyToken as string, - callOptions, - ); - return { - content: items.items.map((item: Record) => { - return { - type: 'text', - text: JSON.stringify(item), - }; - }), - }; + + try { + const { items } = await callActorGetDataset( + actorTool.actorFullName, + args, + apifyToken as string, + callOptions, + progressTracker, + ); + + return { + content: items.items.map((item: Record) => { + return { + type: 'text', + text: JSON.stringify(item), + }; + }), + }; + } finally { + if (progressTracker) { + progressTracker.stop(); + } + } } } catch (error) { if (error instanceof ApifyApiError) { diff --git a/src/tools/actor.ts b/src/tools/actor.ts index 5d3e9b5e..f02b737d 100644 --- a/src/tools/actor.ts +++ b/src/tools/actor.ts @@ -19,6 +19,7 @@ import { actorDefinitionPrunedCache } from '../state.js'; import type { ActorDefinitionStorage, ActorInfo, InternalTool, ToolEntry } from '../types.js'; import { getActorDefinitionStorageFieldNames } from '../utils/actor.js'; import { getValuesByDotKeys } from '../utils/generic.js'; +import type { ProgressTracker } from '../utils/progress.js'; import { getActorDefinition } from './build.js'; import { actorNameToToolName, @@ -50,6 +51,7 @@ export type CallActorGetDatasetResult = { * @param {ActorCallOptions} callOptions - The options to pass to the actor. * @param {unknown} input - The input to pass to the actor. * @param {string} apifyToken - The Apify token to use for authentication. + * @param {ProgressTracker} progressTracker - Optional progress tracker for real-time updates. * @returns {Promise<{ actorRun: any, items: object[] }>} - A promise that resolves to an object containing the actor run and dataset items. * @throws {Error} - Throws an error if the `APIFY_TOKEN` is not set */ @@ -58,6 +60,7 @@ export async function callActorGetDataset( input: unknown, apifyToken: string, callOptions: ActorCallOptions | undefined = undefined, + progressTracker?: ProgressTracker | null, ): Promise { try { log.info(`Calling Actor ${actorName} with input: ${JSON.stringify(input)}`); @@ -65,9 +68,18 @@ export async function callActorGetDataset( const client = new ApifyClient({ token: apifyToken }); const actorClient = client.actor(actorName); - const actorRun: ActorRun = await actorClient.call(input, callOptions); - const dataset = client.dataset(actorRun.defaultDatasetId); - // const dataset = client.dataset('Ehtn0Y4wIKviFT2WB'); + // Start the actor run but don't wait for completion + const actorRun: ActorRun = await actorClient.start(input, callOptions); + + // Start progress tracking if tracker is provided + if (progressTracker) { + progressTracker.startActorRunUpdates(actorRun.id, apifyToken, actorName); + } + + // Wait for the actor to complete + const completedRun = await client.run(actorRun.id).waitForFinish(); + + const dataset = client.dataset(completedRun.defaultDatasetId); const [items, defaultBuild] = await Promise.all([ dataset.listItems(), (await actorClient.defaultBuild()).get(), @@ -301,7 +313,7 @@ export const callActor: ToolEntry = { inputSchema: zodToJsonSchema(callActorArgs), ajvValidate: ajv.compile(zodToJsonSchema(callActorArgs)), call: async (toolArgs) => { - const { apifyMcpServer, args, apifyToken } = toolArgs; + const { apifyMcpServer, args, apifyToken, progressTracker } = toolArgs; const { actor: actorName, input, callOptions } = callActorArgs.parse(args); const actors = apifyMcpServer.listActorToolNames(); @@ -356,6 +368,7 @@ You can only use actors that are included in the list; actors not in the list ca input, apifyToken, callOptions, + progressTracker, ); return { diff --git a/src/types.ts b/src/types.ts index 99402292..7a68cb17 100644 --- a/src/types.ts +++ b/src/types.ts @@ -7,6 +7,7 @@ import type { ActorDefaultRunOptions, ActorDefinition, ActorStoreList, PricingIn import type { ACTOR_PRICING_MODEL } from './const.js'; import type { ActorsMcpServer } from './mcp/server.js'; import type { toolCategories } from './tools/index.js'; +import type { ProgressTracker } from './utils/progress.js'; export interface ISchemaProperties { type: string; @@ -105,6 +106,8 @@ export type InternalToolArgs = { apifyToken: string; /** List of Actor IDs that the user has rented */ userRentedActorIds?: string[]; + /** Optional progress tracker for long running internal tools, like call-actor */ + progressTracker?: ProgressTracker | null; } /** diff --git a/src/utils/progress.ts b/src/utils/progress.ts new file mode 100644 index 00000000..385c90ff --- /dev/null +++ b/src/utils/progress.ts @@ -0,0 +1,91 @@ +import type { ProgressNotification } from '@modelcontextprotocol/sdk/types.js'; + +import { ApifyClient } from '../apify-client.js'; +import { PROGRESS_NOTIFICATION_INTERVAL_MS } from '../const.js'; + +export class ProgressTracker { + private progressToken: string | number; + private sendNotification: (notification: ProgressNotification) => Promise; + private currentProgress = 0; + private intervalId?: NodeJS.Timeout; + + constructor( + progressToken: string | number, + sendNotification: (notification: ProgressNotification) => Promise, + ) { + this.progressToken = progressToken; + this.sendNotification = sendNotification; + } + + async updateProgress(message?: string): Promise { + this.currentProgress += 1; + + try { + const notification: ProgressNotification = { + method: 'notifications/progress' as const, + params: { + progressToken: this.progressToken, + progress: this.currentProgress, + ...(message && { message }), + }, + }; + + await this.sendNotification(notification); + } catch { + // Silent fail - don't break execution + } + } + + startActorRunUpdates(runId: string, apifyToken: string, actorName: string): void { + this.stop(); + const client = new ApifyClient({ token: apifyToken }); + let lastStatus = ''; + let lastStatusMessage = ''; + + this.intervalId = setInterval(async () => { + try { + const run = await client.run(runId).get(); + if (!run) return; + + const { status, statusMessage } = run; + + // Only send notification if status or statusMessage changed + if (status !== lastStatus || statusMessage !== lastStatusMessage) { + lastStatus = status; + lastStatusMessage = statusMessage || ''; + + const message = statusMessage + ? `${actorName}: ${statusMessage}` + : `${actorName}: ${status}`; + + await this.updateProgress(message); + + // Stop polling if actor finished + if (status === 'SUCCEEDED' || status === 'FAILED' || status === 'ABORTED' || status === 'TIMED-OUT') { + this.stop(); + } + } + } catch { + // Silent fail - continue polling + } + }, PROGRESS_NOTIFICATION_INTERVAL_MS); + } + + stop(): void { + if (this.intervalId) { + clearInterval(this.intervalId); + this.intervalId = undefined; + } + } +} + +export function createProgressTracker( + progressToken: string | number | undefined, + sendNotification: ((notification: ProgressNotification) => Promise) | undefined, +): ProgressTracker | null { + if (!progressToken || !sendNotification) { + return null; + } + + return new ProgressTracker(progressToken, sendNotification); +} diff --git a/tests/unit/utils.progress.test.ts b/tests/unit/utils.progress.test.ts new file mode 100644 index 00000000..dfb9e061 --- /dev/null +++ b/tests/unit/utils.progress.test.ts @@ -0,0 +1,59 @@ +import { describe, expect, it, vi } from 'vitest'; + +import { ProgressTracker } from '../../src/utils/progress.js'; + +describe('ProgressTracker', () => { + it('should send progress notifications correctly', async () => { + const mockSendNotification = vi.fn(); + const progressToken = 'test-token-123'; + const tracker = new ProgressTracker(progressToken, mockSendNotification); + + await tracker.updateProgress('Quarter done'); + + expect(mockSendNotification).toHaveBeenCalledWith({ + method: 'notifications/progress', + params: { + progressToken, + progress: 1, + message: 'Quarter done', + }, + }); + }); + + it('should track actor run status updates', async () => { + const mockSendNotification = vi.fn(); + const tracker = new ProgressTracker('test-token', mockSendNotification); + + // Test with a simple manual update instead of mocking the full actor run flow + await tracker.updateProgress('test-actor: READY'); + await tracker.updateProgress('test-actor: RUNNING'); + await tracker.updateProgress('test-actor: SUCCEEDED'); + + expect(mockSendNotification).toHaveBeenCalledTimes(3); + expect(mockSendNotification).toHaveBeenNthCalledWith(1, { + method: 'notifications/progress', + params: { + progressToken: 'test-token', + progress: 1, + message: 'test-actor: READY', + }, + }); + expect(mockSendNotification).toHaveBeenNthCalledWith(3, { + method: 'notifications/progress', + params: { + progressToken: 'test-token', + progress: 3, + message: 'test-actor: SUCCEEDED', + }, + }); + }); + + it('should handle notification send errors gracefully', async () => { + const mockSendNotification = vi.fn().mockRejectedValue(new Error('Network error')); + const tracker = new ProgressTracker('test-token', mockSendNotification); + + // Should not throw + await expect(tracker.updateProgress('Test')).resolves.toBeUndefined(); + expect(mockSendNotification).toHaveBeenCalled(); + }); +});