From d6aa264265803adee2cfdf697bc9c530e87726f6 Mon Sep 17 00:00:00 2001 From: Michal Kalita Date: Wed, 16 Jul 2025 16:28:35 +0200 Subject: [PATCH 1/4] feat: progress notification --- src/main.ts | 2 +- src/mcp/server.ts | 59 +++++++++++++---- src/tools/actor.ts | 19 +++++- src/utils/progress.ts | 105 ++++++++++++++++++++++++++++++ tests/unit/utils.progress.test.ts | 79 ++++++++++++++++++++++ 5 files changed, 246 insertions(+), 18 deletions(-) create mode 100644 src/utils/progress.ts create mode 100644 tests/unit/utils.progress.test.ts diff --git a/src/main.ts b/src/main.ts index 92c75859..4ddb64ce 100644 --- a/src/main.ts +++ b/src/main.ts @@ -56,7 +56,7 @@ if (STANDBY_MODE) { await Actor.fail('If you need to debug a specific Actor, please provide the debugActor and debugActorInput fields in the input'); } const options = { memory: input.maxActorMemoryBytes } as ActorCallOptions; - const { items } = await callActorGetDataset(input.debugActor!, input.debugActorInput!, process.env.APIFY_TOKEN, options); + const { items } = await callActorGetDataset(input.debugActor!, input.debugActorInput!, process.env.APIFY_TOKEN, options, null); await Actor.pushData(items); log.info(`Pushed ${items.count} items to the dataset`); diff --git a/src/mcp/server.ts b/src/mcp/server.ts index 75f00a4e..9ab9de9a 100644 --- a/src/mcp/server.ts +++ b/src/mcp/server.ts @@ -26,6 +26,7 @@ import { import { addRemoveTools, betaTools, callActorGetDataset, defaultTools, getActorsAsTools } from '../tools/index.js'; import { actorNameToToolName, decodeDotPropertyNames } from '../tools/utils.js'; import type { ActorMcpTool, ActorTool, HelperTool, ToolEntry } from '../types.js'; +import { createProgressTracker } from '../utils/progress.js'; import { connectMCPClient } from './client.js'; import { EXTERNAL_TOOL_CALL_TIMEOUT_MSEC } from './const.js'; import { processParamsGetTools } from './utils.js'; @@ -425,6 +426,16 @@ export class ActorsMcpServer { // Handle internal tool if (tool.type === 'internal') { const internalTool = tool.tool as HelperTool; + + // Only create progress tracker for call-actor tool + const progressTracker = internalTool.name === 'call-actor' + ? createProgressTracker(progressToken, extra.sendNotification) + : null; + + if (progressTracker) { + await progressTracker.updateProgress(0, `Starting: ${internalTool.name}`); + } + const res = await internalTool.call({ args, extra, @@ -434,6 +445,10 @@ export class ActorsMcpServer { userRentedActorIds, }) as object; + if (progressTracker) { + await progressTracker.complete(`Completed: ${internalTool.name}`); + } + return { ...res }; } @@ -479,21 +494,37 @@ export class ActorsMcpServer { if (tool.type === 'actor') { const actorTool = tool.tool as ActorTool; + // Create progress tracker if progressToken is available + const progressTracker = createProgressTracker(progressToken, extra.sendNotification); + const callOptions: ActorCallOptions = { memory: actorTool.memoryMbytes }; - const { items } = await callActorGetDataset( - actorTool.actorFullName, - args, - apifyToken as string, - callOptions, - ); - return { - content: items.items.map((item: Record) => { - return { - type: 'text', - text: JSON.stringify(item), - }; - }), - }; + + try { + const { items } = await callActorGetDataset( + actorTool.actorFullName, + args, + apifyToken as string, + callOptions, + progressTracker, + ); + + if (progressTracker) { + await progressTracker.complete(`Completed: ${actorTool.actorFullName}`); + } + + return { + content: items.items.map((item: Record) => { + return { + type: 'text', + text: JSON.stringify(item), + }; + }), + }; + } finally { + if (progressTracker) { + progressTracker.stopPeriodicUpdates(); + } + } } } catch (error) { if (error instanceof ApifyApiError) { diff --git a/src/tools/actor.ts b/src/tools/actor.ts index 5d3e9b5e..7e5373d0 100644 --- a/src/tools/actor.ts +++ b/src/tools/actor.ts @@ -19,6 +19,7 @@ import { actorDefinitionPrunedCache } from '../state.js'; import type { ActorDefinitionStorage, ActorInfo, InternalTool, ToolEntry } from '../types.js'; import { getActorDefinitionStorageFieldNames } from '../utils/actor.js'; import { getValuesByDotKeys } from '../utils/generic.js'; +import type { ProgressTracker } from '../utils/progress.js'; import { getActorDefinition } from './build.js'; import { actorNameToToolName, @@ -50,6 +51,7 @@ export type CallActorGetDatasetResult = { * @param {ActorCallOptions} callOptions - The options to pass to the actor. * @param {unknown} input - The input to pass to the actor. * @param {string} apifyToken - The Apify token to use for authentication. + * @param {ProgressTracker} progressTracker - Optional progress tracker for real-time updates. * @returns {Promise<{ actorRun: any, items: object[] }>} - A promise that resolves to an object containing the actor run and dataset items. * @throws {Error} - Throws an error if the `APIFY_TOKEN` is not set */ @@ -58,6 +60,7 @@ export async function callActorGetDataset( input: unknown, apifyToken: string, callOptions: ActorCallOptions | undefined = undefined, + progressTracker?: ProgressTracker | null, ): Promise { try { log.info(`Calling Actor ${actorName} with input: ${JSON.stringify(input)}`); @@ -65,9 +68,18 @@ export async function callActorGetDataset( const client = new ApifyClient({ token: apifyToken }); const actorClient = client.actor(actorName); - const actorRun: ActorRun = await actorClient.call(input, callOptions); - const dataset = client.dataset(actorRun.defaultDatasetId); - // const dataset = client.dataset('Ehtn0Y4wIKviFT2WB'); + // Start the actor run but don't wait for completion + const actorRun: ActorRun = await actorClient.start(input, callOptions); + + // Start progress tracking if tracker is provided + if (progressTracker) { + progressTracker.startActorRunUpdates(actorRun.id, apifyToken, actorName); + } + + // Wait for the actor to complete + const completedRun = await client.run(actorRun.id).waitForFinish(); + + const dataset = client.dataset(completedRun.defaultDatasetId); const [items, defaultBuild] = await Promise.all([ dataset.listItems(), (await actorClient.defaultBuild()).get(), @@ -356,6 +368,7 @@ You can only use actors that are included in the list; actors not in the list ca input, apifyToken, callOptions, + null, ); return { diff --git a/src/utils/progress.ts b/src/utils/progress.ts new file mode 100644 index 00000000..4f4e6182 --- /dev/null +++ b/src/utils/progress.ts @@ -0,0 +1,105 @@ +import type { ProgressNotification } from '@modelcontextprotocol/sdk/types.js'; + +import { ApifyClient } from '../apify-client.js'; + +export class ProgressTracker { + private progressToken: string | number; + private sendNotification: (notification: ProgressNotification) => Promise; + private currentProgress = 0; + private total = 100; + private intervalId?: NodeJS.Timeout; + + constructor( + progressToken: string | number, + sendNotification: (notification: ProgressNotification) => Promise, + total = 100, + ) { + this.progressToken = progressToken; + this.sendNotification = sendNotification; + this.total = total; + } + + async updateProgress(progress: number, message?: string): Promise { + this.currentProgress = Math.min(progress, this.total); + + try { + const notification: ProgressNotification = { + method: 'notifications/progress' as const, + params: { + progressToken: this.progressToken, + progress: this.currentProgress, + total: this.total, + ...(message && { message }), + }, + }; + + await this.sendNotification(notification); + } catch { + // Silent fail - don't break execution + } + } + + startActorRunUpdates(runId: string, apifyToken: string, actorName: string): void { + this.stopPeriodicUpdates(); + const client = new ApifyClient({ token: apifyToken }); + let lastStatus = ''; + let lastStatusMessage = ''; + + this.intervalId = setInterval(async () => { + try { + const run = await client.run(runId).get(); + if (!run) return; + + const { status, statusMessage } = run; + + // Only send notification if status or statusMessage changed + if (status !== lastStatus || statusMessage !== lastStatusMessage) { + lastStatus = status; + lastStatusMessage = statusMessage || ''; + + // Calculate progress based on status + let progress = 0; + if (status === 'RUNNING') progress = 50; + else if (status === 'SUCCEEDED') progress = 100; + else if (status === 'FAILED') progress = 100; + + const message = statusMessage + ? `${actorName}: ${statusMessage}` + : `${actorName}: ${status}`; + + await this.updateProgress(progress, message); + + // Stop polling if actor finished + if (status === 'SUCCEEDED' || status === 'FAILED' || status === 'ABORTED' || status === 'TIMED-OUT') { + this.stopPeriodicUpdates(); + } + } + } catch { + // Silent fail - continue polling + } + }, 5_000); + } + + stopPeriodicUpdates(): void { + if (this.intervalId) { + clearInterval(this.intervalId); + this.intervalId = undefined; + } + } + + async complete(message = 'Completed'): Promise { + this.stopPeriodicUpdates(); + await this.updateProgress(this.total, message); + } +} + +export function createProgressTracker( + progressToken: string | number | undefined, + sendNotification: ((notification: ProgressNotification) => Promise) | undefined, +): ProgressTracker | null { + if (!progressToken || !sendNotification) { + return null; + } + + return new ProgressTracker(progressToken, sendNotification); +} diff --git a/tests/unit/utils.progress.test.ts b/tests/unit/utils.progress.test.ts new file mode 100644 index 00000000..8247ded3 --- /dev/null +++ b/tests/unit/utils.progress.test.ts @@ -0,0 +1,79 @@ +import { describe, expect, it, vi } from 'vitest'; + +import { ProgressTracker } from '../../src/utils/progress.js'; + +describe('ProgressTracker', () => { + it('should send progress notifications correctly', async () => { + const mockSendNotification = vi.fn(); + const progressToken = 'test-token-123'; + const tracker = new ProgressTracker(progressToken, mockSendNotification, 100); + + await tracker.updateProgress(25, 'Quarter done'); + + expect(mockSendNotification).toHaveBeenCalledWith({ + method: 'notifications/progress', + params: { + progressToken, + progress: 25, + total: 100, + message: 'Quarter done', + }, + }); + }); + + it('should track actor run status updates', async () => { + const mockSendNotification = vi.fn(); + const tracker = new ProgressTracker('test-token', mockSendNotification, 100); + + // Test with a simple manual update instead of mocking the full actor run flow + await tracker.updateProgress(0, 'test-actor: READY'); + await tracker.updateProgress(50, 'test-actor: RUNNING'); + await tracker.updateProgress(100, 'test-actor: SUCCEEDED'); + + expect(mockSendNotification).toHaveBeenCalledTimes(3); + expect(mockSendNotification).toHaveBeenNthCalledWith(1, { + method: 'notifications/progress', + params: { + progressToken: 'test-token', + progress: 0, + total: 100, + message: 'test-actor: READY', + }, + }); + expect(mockSendNotification).toHaveBeenNthCalledWith(3, { + method: 'notifications/progress', + params: { + progressToken: 'test-token', + progress: 100, + total: 100, + message: 'test-actor: SUCCEEDED', + }, + }); + }); + + it('should complete correctly', async () => { + const mockSendNotification = vi.fn(); + const tracker = new ProgressTracker('test-token', mockSendNotification, 100); + + await tracker.complete('All done!'); + + expect(mockSendNotification).toHaveBeenCalledWith({ + method: 'notifications/progress', + params: { + progressToken: 'test-token', + progress: 100, + total: 100, + message: 'All done!', + }, + }); + }); + + it('should handle notification send errors gracefully', async () => { + const mockSendNotification = vi.fn().mockRejectedValue(new Error('Network error')); + const tracker = new ProgressTracker('test-token', mockSendNotification); + + // Should not throw + await expect(tracker.updateProgress(50, 'Test')).resolves.toBeUndefined(); + expect(mockSendNotification).toHaveBeenCalled(); + }); +}); From 2ea7ef3c2eb6bf38331572259d309dc710538b9c Mon Sep 17 00:00:00 2001 From: Michal Kalita Date: Thu, 17 Jul 2025 11:49:53 +0200 Subject: [PATCH 2/4] fix: remove optional parameter --- src/main.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main.ts b/src/main.ts index 4ddb64ce..92c75859 100644 --- a/src/main.ts +++ b/src/main.ts @@ -56,7 +56,7 @@ if (STANDBY_MODE) { await Actor.fail('If you need to debug a specific Actor, please provide the debugActor and debugActorInput fields in the input'); } const options = { memory: input.maxActorMemoryBytes } as ActorCallOptions; - const { items } = await callActorGetDataset(input.debugActor!, input.debugActorInput!, process.env.APIFY_TOKEN, options, null); + const { items } = await callActorGetDataset(input.debugActor!, input.debugActorInput!, process.env.APIFY_TOKEN, options); await Actor.pushData(items); log.info(`Pushed ${items.count} items to the dataset`); From 011e3e46dacc83ec3a520b113e3d5c9e0565b197 Mon Sep 17 00:00:00 2001 From: Michal Kalita Date: Fri, 18 Jul 2025 17:53:57 +0200 Subject: [PATCH 3/4] fix: increment progress number, ignore total --- src/const.ts | 2 ++ src/mcp/server.ts | 12 ++-------- src/utils/progress.ts | 30 +++++++---------------- tests/unit/utils.progress.test.ts | 40 ++++++++----------------------- 4 files changed, 22 insertions(+), 62 deletions(-) diff --git a/src/const.ts b/src/const.ts index b198653d..0152ea7a 100644 --- a/src/const.ts +++ b/src/const.ts @@ -80,3 +80,5 @@ export const ALGOLIA = { apiKey: 'e97714a64e2b4b8b8fe0b01cd8592870', // search only (public) API key indexName: 'test_test_apify_sdk', }; + +export const PROGRESS_NOTIFICATION_INTERVAL_MS = 5_000; // 5 seconds diff --git a/src/mcp/server.ts b/src/mcp/server.ts index d774a2b5..5d6bb593 100644 --- a/src/mcp/server.ts +++ b/src/mcp/server.ts @@ -430,10 +430,6 @@ export class ActorsMcpServer { ? createProgressTracker(progressToken, extra.sendNotification) : null; - if (progressTracker) { - await progressTracker.updateProgress(0, `Starting: ${internalTool.name}`); - } - const res = await internalTool.call({ args, extra, @@ -444,7 +440,7 @@ export class ActorsMcpServer { }) as object; if (progressTracker) { - await progressTracker.complete(`Completed: ${internalTool.name}`); + progressTracker.stop(); } return { ...res }; @@ -506,10 +502,6 @@ export class ActorsMcpServer { progressTracker, ); - if (progressTracker) { - await progressTracker.complete(`Completed: ${actorTool.actorFullName}`); - } - return { content: items.items.map((item: Record) => { return { @@ -520,7 +512,7 @@ export class ActorsMcpServer { }; } finally { if (progressTracker) { - progressTracker.stopPeriodicUpdates(); + progressTracker.stop(); } } } diff --git a/src/utils/progress.ts b/src/utils/progress.ts index 4f4e6182..385c90ff 100644 --- a/src/utils/progress.ts +++ b/src/utils/progress.ts @@ -1,26 +1,24 @@ import type { ProgressNotification } from '@modelcontextprotocol/sdk/types.js'; import { ApifyClient } from '../apify-client.js'; +import { PROGRESS_NOTIFICATION_INTERVAL_MS } from '../const.js'; export class ProgressTracker { private progressToken: string | number; private sendNotification: (notification: ProgressNotification) => Promise; private currentProgress = 0; - private total = 100; private intervalId?: NodeJS.Timeout; constructor( progressToken: string | number, sendNotification: (notification: ProgressNotification) => Promise, - total = 100, ) { this.progressToken = progressToken; this.sendNotification = sendNotification; - this.total = total; } - async updateProgress(progress: number, message?: string): Promise { - this.currentProgress = Math.min(progress, this.total); + async updateProgress(message?: string): Promise { + this.currentProgress += 1; try { const notification: ProgressNotification = { @@ -28,7 +26,6 @@ export class ProgressTracker { params: { progressToken: this.progressToken, progress: this.currentProgress, - total: this.total, ...(message && { message }), }, }; @@ -40,7 +37,7 @@ export class ProgressTracker { } startActorRunUpdates(runId: string, apifyToken: string, actorName: string): void { - this.stopPeriodicUpdates(); + this.stop(); const client = new ApifyClient({ token: apifyToken }); let lastStatus = ''; let lastStatusMessage = ''; @@ -57,40 +54,29 @@ export class ProgressTracker { lastStatus = status; lastStatusMessage = statusMessage || ''; - // Calculate progress based on status - let progress = 0; - if (status === 'RUNNING') progress = 50; - else if (status === 'SUCCEEDED') progress = 100; - else if (status === 'FAILED') progress = 100; - const message = statusMessage ? `${actorName}: ${statusMessage}` : `${actorName}: ${status}`; - await this.updateProgress(progress, message); + await this.updateProgress(message); // Stop polling if actor finished if (status === 'SUCCEEDED' || status === 'FAILED' || status === 'ABORTED' || status === 'TIMED-OUT') { - this.stopPeriodicUpdates(); + this.stop(); } } } catch { // Silent fail - continue polling } - }, 5_000); + }, PROGRESS_NOTIFICATION_INTERVAL_MS); } - stopPeriodicUpdates(): void { + stop(): void { if (this.intervalId) { clearInterval(this.intervalId); this.intervalId = undefined; } } - - async complete(message = 'Completed'): Promise { - this.stopPeriodicUpdates(); - await this.updateProgress(this.total, message); - } } export function createProgressTracker( diff --git a/tests/unit/utils.progress.test.ts b/tests/unit/utils.progress.test.ts index 8247ded3..dfb9e061 100644 --- a/tests/unit/utils.progress.test.ts +++ b/tests/unit/utils.progress.test.ts @@ -6,16 +6,15 @@ describe('ProgressTracker', () => { it('should send progress notifications correctly', async () => { const mockSendNotification = vi.fn(); const progressToken = 'test-token-123'; - const tracker = new ProgressTracker(progressToken, mockSendNotification, 100); + const tracker = new ProgressTracker(progressToken, mockSendNotification); - await tracker.updateProgress(25, 'Quarter done'); + await tracker.updateProgress('Quarter done'); expect(mockSendNotification).toHaveBeenCalledWith({ method: 'notifications/progress', params: { progressToken, - progress: 25, - total: 100, + progress: 1, message: 'Quarter done', }, }); @@ -23,20 +22,19 @@ describe('ProgressTracker', () => { it('should track actor run status updates', async () => { const mockSendNotification = vi.fn(); - const tracker = new ProgressTracker('test-token', mockSendNotification, 100); + const tracker = new ProgressTracker('test-token', mockSendNotification); // Test with a simple manual update instead of mocking the full actor run flow - await tracker.updateProgress(0, 'test-actor: READY'); - await tracker.updateProgress(50, 'test-actor: RUNNING'); - await tracker.updateProgress(100, 'test-actor: SUCCEEDED'); + await tracker.updateProgress('test-actor: READY'); + await tracker.updateProgress('test-actor: RUNNING'); + await tracker.updateProgress('test-actor: SUCCEEDED'); expect(mockSendNotification).toHaveBeenCalledTimes(3); expect(mockSendNotification).toHaveBeenNthCalledWith(1, { method: 'notifications/progress', params: { progressToken: 'test-token', - progress: 0, - total: 100, + progress: 1, message: 'test-actor: READY', }, }); @@ -44,36 +42,18 @@ describe('ProgressTracker', () => { method: 'notifications/progress', params: { progressToken: 'test-token', - progress: 100, - total: 100, + progress: 3, message: 'test-actor: SUCCEEDED', }, }); }); - it('should complete correctly', async () => { - const mockSendNotification = vi.fn(); - const tracker = new ProgressTracker('test-token', mockSendNotification, 100); - - await tracker.complete('All done!'); - - expect(mockSendNotification).toHaveBeenCalledWith({ - method: 'notifications/progress', - params: { - progressToken: 'test-token', - progress: 100, - total: 100, - message: 'All done!', - }, - }); - }); - it('should handle notification send errors gracefully', async () => { const mockSendNotification = vi.fn().mockRejectedValue(new Error('Network error')); const tracker = new ProgressTracker('test-token', mockSendNotification); // Should not throw - await expect(tracker.updateProgress(50, 'Test')).resolves.toBeUndefined(); + await expect(tracker.updateProgress('Test')).resolves.toBeUndefined(); expect(mockSendNotification).toHaveBeenCalled(); }); }); From 1e7cddec81030a6b0fd6879e0ec10f8723057867 Mon Sep 17 00:00:00 2001 From: Michal Kalita Date: Mon, 21 Jul 2025 09:55:16 +0200 Subject: [PATCH 4/4] fix: start tracker for call-actor internal tool --- src/main.ts | 2 +- src/mcp/server.ts | 1 + src/tools/actor.ts | 4 ++-- src/types.ts | 3 +++ 4 files changed, 7 insertions(+), 3 deletions(-) diff --git a/src/main.ts b/src/main.ts index 92c75859..d299a6e5 100644 --- a/src/main.ts +++ b/src/main.ts @@ -47,7 +47,7 @@ if (STANDBY_MODE) { log.info('Actor is running in the STANDBY mode.'); app.listen(PORT, () => { - log.info(`The Actor web server is listening for user requests at ${HOST}`); + log.info(`The Actor web server is listening for user requests at ${HOST}:${PORT}`); }); } else { log.info('Actor is not designed to run in the NORMAL model (use this mode only for debugging purposes)'); diff --git a/src/mcp/server.ts b/src/mcp/server.ts index 5d6bb593..cc022826 100644 --- a/src/mcp/server.ts +++ b/src/mcp/server.ts @@ -437,6 +437,7 @@ export class ActorsMcpServer { mcpServer: this.server, apifyToken, userRentedActorIds, + progressTracker, }) as object; if (progressTracker) { diff --git a/src/tools/actor.ts b/src/tools/actor.ts index 7e5373d0..f02b737d 100644 --- a/src/tools/actor.ts +++ b/src/tools/actor.ts @@ -313,7 +313,7 @@ export const callActor: ToolEntry = { inputSchema: zodToJsonSchema(callActorArgs), ajvValidate: ajv.compile(zodToJsonSchema(callActorArgs)), call: async (toolArgs) => { - const { apifyMcpServer, args, apifyToken } = toolArgs; + const { apifyMcpServer, args, apifyToken, progressTracker } = toolArgs; const { actor: actorName, input, callOptions } = callActorArgs.parse(args); const actors = apifyMcpServer.listActorToolNames(); @@ -368,7 +368,7 @@ You can only use actors that are included in the list; actors not in the list ca input, apifyToken, callOptions, - null, + progressTracker, ); return { diff --git a/src/types.ts b/src/types.ts index 99402292..7a68cb17 100644 --- a/src/types.ts +++ b/src/types.ts @@ -7,6 +7,7 @@ import type { ActorDefaultRunOptions, ActorDefinition, ActorStoreList, PricingIn import type { ACTOR_PRICING_MODEL } from './const.js'; import type { ActorsMcpServer } from './mcp/server.js'; import type { toolCategories } from './tools/index.js'; +import type { ProgressTracker } from './utils/progress.js'; export interface ISchemaProperties { type: string; @@ -105,6 +106,8 @@ export type InternalToolArgs = { apifyToken: string; /** List of Actor IDs that the user has rented */ userRentedActorIds?: string[]; + /** Optional progress tracker for long running internal tools, like call-actor */ + progressTracker?: ProgressTracker | null; } /**