Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/const.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,3 +80,5 @@ export const ALGOLIA = {
apiKey: 'e97714a64e2b4b8b8fe0b01cd8592870', // search only (public) API key
indexName: 'test_test_apify_sdk',
};

export const PROGRESS_NOTIFICATION_INTERVAL_MS = 5_000; // 5 seconds
2 changes: 1 addition & 1 deletion src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ if (STANDBY_MODE) {
log.info('Actor is running in the STANDBY mode.');

app.listen(PORT, () => {
log.info(`The Actor web server is listening for user requests at ${HOST}`);
log.info(`The Actor web server is listening for user requests at ${HOST}:${PORT}`);
});
} else {
log.info('Actor is not designed to run in the NORMAL model (use this mode only for debugging purposes)');
Expand Down
52 changes: 38 additions & 14 deletions src/mcp/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import {
import { addRemoveTools, callActorGetDataset, defaultTools, getActorsAsTools, toolCategories } from '../tools/index.js';
import { actorNameToToolName, decodeDotPropertyNames } from '../tools/utils.js';
import type { ActorMcpTool, ActorTool, HelperTool, ToolEntry } from '../types.js';
import { createProgressTracker } from '../utils/progress.js';
import { connectMCPClient } from './client.js';
import { EXTERNAL_TOOL_CALL_TIMEOUT_MSEC } from './const.js';
import { processParamsGetTools } from './utils.js';
Expand Down Expand Up @@ -423,15 +424,26 @@ export class ActorsMcpServer {
// Handle internal tool
if (tool.type === 'internal') {
const internalTool = tool.tool as HelperTool;

// Only create progress tracker for call-actor tool
const progressTracker = internalTool.name === 'call-actor'
? createProgressTracker(progressToken, extra.sendNotification)
: null;

const res = await internalTool.call({
args,
extra,
apifyMcpServer: this,
mcpServer: this.server,
apifyToken,
userRentedActorIds,
progressTracker,
}) as object;

if (progressTracker) {
progressTracker.stop();
}

return { ...res };
}

Expand Down Expand Up @@ -477,21 +489,33 @@ export class ActorsMcpServer {
if (tool.type === 'actor') {
const actorTool = tool.tool as ActorTool;

// Create progress tracker if progressToken is available
const progressTracker = createProgressTracker(progressToken, extra.sendNotification);

const callOptions: ActorCallOptions = { memory: actorTool.memoryMbytes };
const { items } = await callActorGetDataset(
actorTool.actorFullName,
args,
apifyToken as string,
callOptions,
);
return {
content: items.items.map((item: Record<string, unknown>) => {
return {
type: 'text',
text: JSON.stringify(item),
};
}),
};

try {
const { items } = await callActorGetDataset(
actorTool.actorFullName,
args,
apifyToken as string,
callOptions,
progressTracker,
);

return {
content: items.items.map((item: Record<string, unknown>) => {
return {
type: 'text',
text: JSON.stringify(item),
};
}),
};
} finally {
if (progressTracker) {
progressTracker.stop();
}
}
}
} catch (error) {
if (error instanceof ApifyApiError) {
Expand Down
21 changes: 17 additions & 4 deletions src/tools/actor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import { actorDefinitionPrunedCache } from '../state.js';
import type { ActorDefinitionStorage, ActorInfo, InternalTool, ToolEntry } from '../types.js';
import { getActorDefinitionStorageFieldNames } from '../utils/actor.js';
import { getValuesByDotKeys } from '../utils/generic.js';
import type { ProgressTracker } from '../utils/progress.js';
import { getActorDefinition } from './build.js';
import {
actorNameToToolName,
Expand Down Expand Up @@ -50,6 +51,7 @@ export type CallActorGetDatasetResult = {
* @param {ActorCallOptions} callOptions - The options to pass to the actor.
* @param {unknown} input - The input to pass to the actor.
* @param {string} apifyToken - The Apify token to use for authentication.
* @param {ProgressTracker} progressTracker - Optional progress tracker for real-time updates.
* @returns {Promise<{ actorRun: any, items: object[] }>} - A promise that resolves to an object containing the actor run and dataset items.
* @throws {Error} - Throws an error if the `APIFY_TOKEN` is not set
*/
Expand All @@ -58,16 +60,26 @@ export async function callActorGetDataset(
input: unknown,
apifyToken: string,
callOptions: ActorCallOptions | undefined = undefined,
progressTracker?: ProgressTracker | null,
): Promise<CallActorGetDatasetResult> {
try {
log.info(`Calling Actor ${actorName} with input: ${JSON.stringify(input)}`);

const client = new ApifyClient({ token: apifyToken });
const actorClient = client.actor(actorName);

const actorRun: ActorRun = await actorClient.call(input, callOptions);
const dataset = client.dataset(actorRun.defaultDatasetId);
// const dataset = client.dataset('Ehtn0Y4wIKviFT2WB');
// Start the actor run but don't wait for completion
const actorRun: ActorRun = await actorClient.start(input, callOptions);

// Start progress tracking if tracker is provided
if (progressTracker) {
progressTracker.startActorRunUpdates(actorRun.id, apifyToken, actorName);
}

// Wait for the actor to complete
const completedRun = await client.run(actorRun.id).waitForFinish();

const dataset = client.dataset(completedRun.defaultDatasetId);
const [items, defaultBuild] = await Promise.all([
dataset.listItems(),
(await actorClient.defaultBuild()).get(),
Expand Down Expand Up @@ -301,7 +313,7 @@ export const callActor: ToolEntry = {
inputSchema: zodToJsonSchema(callActorArgs),
ajvValidate: ajv.compile(zodToJsonSchema(callActorArgs)),
call: async (toolArgs) => {
const { apifyMcpServer, args, apifyToken } = toolArgs;
const { apifyMcpServer, args, apifyToken, progressTracker } = toolArgs;
const { actor: actorName, input, callOptions } = callActorArgs.parse(args);

const actors = apifyMcpServer.listActorToolNames();
Expand Down Expand Up @@ -356,6 +368,7 @@ You can only use actors that are included in the list; actors not in the list ca
input,
apifyToken,
callOptions,
progressTracker,
);

return {
Expand Down
3 changes: 3 additions & 0 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import type { ActorDefaultRunOptions, ActorDefinition, ActorStoreList, PricingIn
import type { ACTOR_PRICING_MODEL } from './const.js';
import type { ActorsMcpServer } from './mcp/server.js';
import type { toolCategories } from './tools/index.js';
import type { ProgressTracker } from './utils/progress.js';

export interface ISchemaProperties {
type: string;
Expand Down Expand Up @@ -105,6 +106,8 @@ export type InternalToolArgs = {
apifyToken: string;
/** List of Actor IDs that the user has rented */
userRentedActorIds?: string[];
/** Optional progress tracker for long running internal tools, like call-actor */
progressTracker?: ProgressTracker | null;
}

/**
Expand Down
91 changes: 91 additions & 0 deletions src/utils/progress.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
import type { ProgressNotification } from '@modelcontextprotocol/sdk/types.js';

import { ApifyClient } from '../apify-client.js';
import { PROGRESS_NOTIFICATION_INTERVAL_MS } from '../const.js';

export class ProgressTracker {
private progressToken: string | number;
private sendNotification: (notification: ProgressNotification) => Promise<void>;
private currentProgress = 0;
private intervalId?: NodeJS.Timeout;

constructor(
progressToken: string | number,
sendNotification: (notification: ProgressNotification) => Promise<void>,
) {
this.progressToken = progressToken;
this.sendNotification = sendNotification;
}

async updateProgress(message?: string): Promise<void> {
this.currentProgress += 1;

try {
const notification: ProgressNotification = {
method: 'notifications/progress' as const,
params: {
progressToken: this.progressToken,
progress: this.currentProgress,
...(message && { message }),
},
};

await this.sendNotification(notification);
} catch {
// Silent fail - don't break execution
}
}

startActorRunUpdates(runId: string, apifyToken: string, actorName: string): void {
this.stop();
const client = new ApifyClient({ token: apifyToken });
let lastStatus = '';
let lastStatusMessage = '';

this.intervalId = setInterval(async () => {
try {
const run = await client.run(runId).get();
if (!run) return;

const { status, statusMessage } = run;

// Only send notification if status or statusMessage changed
if (status !== lastStatus || statusMessage !== lastStatusMessage) {
lastStatus = status;
lastStatusMessage = statusMessage || '';

const message = statusMessage
? `${actorName}: ${statusMessage}`
: `${actorName}: ${status}`;

await this.updateProgress(message);

// Stop polling if actor finished
if (status === 'SUCCEEDED' || status === 'FAILED' || status === 'ABORTED' || status === 'TIMED-OUT') {
this.stop();
}
}
} catch {
// Silent fail - continue polling
}
}, PROGRESS_NOTIFICATION_INTERVAL_MS);
}

stop(): void {
if (this.intervalId) {
clearInterval(this.intervalId);
this.intervalId = undefined;
}
}
}

export function createProgressTracker(
progressToken: string | number | undefined,
sendNotification: ((notification: ProgressNotification) => Promise<void>) | undefined,
): ProgressTracker | null {
if (!progressToken || !sendNotification) {
return null;
}

return new ProgressTracker(progressToken, sendNotification);
}
59 changes: 59 additions & 0 deletions tests/unit/utils.progress.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import { describe, expect, it, vi } from 'vitest';

import { ProgressTracker } from '../../src/utils/progress.js';

describe('ProgressTracker', () => {
it('should send progress notifications correctly', async () => {
const mockSendNotification = vi.fn();
const progressToken = 'test-token-123';
const tracker = new ProgressTracker(progressToken, mockSendNotification);

await tracker.updateProgress('Quarter done');

expect(mockSendNotification).toHaveBeenCalledWith({
method: 'notifications/progress',
params: {
progressToken,
progress: 1,
message: 'Quarter done',
},
});
});

it('should track actor run status updates', async () => {
const mockSendNotification = vi.fn();
const tracker = new ProgressTracker('test-token', mockSendNotification);

// Test with a simple manual update instead of mocking the full actor run flow
await tracker.updateProgress('test-actor: READY');
await tracker.updateProgress('test-actor: RUNNING');
await tracker.updateProgress('test-actor: SUCCEEDED');

expect(mockSendNotification).toHaveBeenCalledTimes(3);
expect(mockSendNotification).toHaveBeenNthCalledWith(1, {
method: 'notifications/progress',
params: {
progressToken: 'test-token',
progress: 1,
message: 'test-actor: READY',
},
});
expect(mockSendNotification).toHaveBeenNthCalledWith(3, {
method: 'notifications/progress',
params: {
progressToken: 'test-token',
progress: 3,
message: 'test-actor: SUCCEEDED',
},
});
});

it('should handle notification send errors gracefully', async () => {
const mockSendNotification = vi.fn().mockRejectedValue(new Error('Network error'));
const tracker = new ProgressTracker('test-token', mockSendNotification);

// Should not throw
await expect(tracker.updateProgress('Test')).resolves.toBeUndefined();
expect(mockSendNotification).toHaveBeenCalled();
});
});