From 4838e851aa3a78e18fc19d7c176579a8f68f8738 Mon Sep 17 00:00:00 2001 From: MQ Date: Tue, 1 Jul 2025 15:19:38 +0200 Subject: [PATCH 1/5] refactor Actor tool fetching logic with caching, server streamable add DELETE endpoint, rewrite server logic to use multiple internal MCP servers - this had to be done as the legacy SSE transport test were hanging for some reason, bump MCP sdk and apify and apify-client versions, split tests touching MCP server internals into separate file, remove tool loading from the root get endpoint (!!!), prepare support for streamable Actorized MCP servers --- package-lock.json | 68 +++---- package.json | 6 +- src/actor/server.ts | 78 ++++++-- src/const.ts | 6 +- src/main.ts | 24 ++- src/mcp/actors.ts | 62 ++++--- src/state.ts | 5 + src/tools/actor.ts | 157 ++++++++-------- src/tools/build.ts | 2 + src/tools/run_collection.ts | 2 +- src/types.ts | 21 ++- src/utils/ttl-lru.ts | 69 +++++++ tests/const.ts | 7 + tests/integration/actor.server-sse.test.ts | 16 +- .../actor.server-streamable.test.ts | 16 +- tests/integration/internals.test.ts | 145 +++++++++++++++ tests/integration/stdio.test.ts | 1 + tests/integration/suite.ts | 174 ++---------------- tests/unit/mcp.actors.test.ts | 59 ++++++ tests/unit/utils.ttl-lru.test.ts | 46 +++++ 20 files changed, 605 insertions(+), 359 deletions(-) create mode 100644 src/state.ts create mode 100644 src/utils/ttl-lru.ts create mode 100644 tests/const.ts create mode 100644 tests/integration/internals.test.ts create mode 100644 tests/unit/mcp.actors.test.ts create mode 100644 tests/unit/utils.ttl-lru.test.ts diff --git a/package-lock.json b/package-lock.json index 236cfe8b..3188236a 100644 --- a/package-lock.json +++ b/package-lock.json @@ -11,10 +11,10 @@ "dependencies": { "@apify/datastructures": "^2.0.3", "@apify/log": "^2.5.16", - "@modelcontextprotocol/sdk": "^1.11.5", + "@modelcontextprotocol/sdk": "^1.13.2", "ajv": "^8.17.1", - "apify": "^3.4.0", - "apify-client": "^2.12.3", + "apify": "^3.4.2", + "apify-client": "^2.12.6", "express": "^4.21.2", "yargs": "^17.7.2", "zod": "^3.24.1", @@ -1066,11 +1066,12 @@ "license": "MIT" }, "node_modules/@modelcontextprotocol/sdk": { - "version": "1.11.5", - "resolved": "https://registry.npmjs.org/@modelcontextprotocol/sdk/-/sdk-1.11.5.tgz", - "integrity": "sha512-gS7Q7IHpKxjVaNLMUZyTtatZ63ca3h418zPPntAhu/MvG5yfz/8HMcDAOpvpQfx3V3dsw9QQxk8RuFNrQhLlgA==", + "version": "1.13.2", + "resolved": "https://registry.npmjs.org/@modelcontextprotocol/sdk/-/sdk-1.13.2.tgz", + "integrity": "sha512-Vx7qOcmoKkR3qhaQ9qf3GxiVKCEu+zfJddHv6x3dY/9P6+uIwJnmuAur5aB+4FDXf41rRrDnOEGkviX5oYZ67w==", + "license": "MIT", "dependencies": { - "ajv": "^8.17.1", + "ajv": "^6.12.6", "content-type": "^1.0.5", "cors": "^2.8.5", "cross-spawn": "^7.0.5", @@ -1099,6 +1100,22 @@ "node": ">= 0.6" } }, + "node_modules/@modelcontextprotocol/sdk/node_modules/ajv": { + "version": "6.12.6", + "resolved": "https://registry.npmjs.org/ajv/-/ajv-6.12.6.tgz", + "integrity": "sha512-j3fVLgvTo527anyYyJOGTYJbG+vnnQYvE0m5mmkc1TK+nxAppkCLMIL0aZ4dblVCNoGShhm+kzE4ZUykBoMg4g==", + "license": "MIT", + "dependencies": { + "fast-deep-equal": "^3.1.1", + "fast-json-stable-stringify": "^2.0.0", + "json-schema-traverse": "^0.4.1", + "uri-js": "^4.2.2" + }, + "funding": { + "type": "github", + "url": "https://github.com/sponsors/epoberezkin" + } + }, "node_modules/@modelcontextprotocol/sdk/node_modules/body-parser": { "version": "2.2.0", "resolved": "https://registry.npmjs.org/body-parser/-/body-parser-2.2.0.tgz", @@ -1237,6 +1254,12 @@ "node": ">=0.10.0" } }, + "node_modules/@modelcontextprotocol/sdk/node_modules/json-schema-traverse": { + "version": "0.4.1", + "resolved": "https://registry.npmjs.org/json-schema-traverse/-/json-schema-traverse-0.4.1.tgz", + "integrity": "sha512-xbbCH5dCYU5T8LcEhhuh7HJ88HXuW3qsI3Y0zOZFKfZEHcpWiHU/Jxzk629Brsab/mMiHQti9wMP+845RPe3Vg==", + "license": "MIT" + }, "node_modules/@modelcontextprotocol/sdk/node_modules/media-typer": { "version": "1.1.0", "resolved": "https://registry.npmjs.org/media-typer/-/media-typer-1.1.0.tgz", @@ -2460,9 +2483,9 @@ } }, "node_modules/apify": { - "version": "3.4.0", - "resolved": "https://registry.npmjs.org/apify/-/apify-3.4.0.tgz", - "integrity": "sha512-jVxyujEpZq9XnKJQnqH9PHIC4MlQqXbXlhdgvM/LOcnvtVdVJCbfXgyYZFOARgeX+Asc47uv9s66v23fDoc0AA==", + "version": "3.4.2", + "resolved": "https://registry.npmjs.org/apify/-/apify-3.4.2.tgz", + "integrity": "sha512-ev1OdyyHJdvPt19aZ8uzzRPcAzsCivv3IYGZdN634U7wWk3EdVv4kLulfs4AOdanj2TnzbDPbYme91io71/RtA==", "license": "Apache-2.0", "dependencies": { "@apify/consts": "^2.23.0", @@ -2485,9 +2508,9 @@ } }, "node_modules/apify-client": { - "version": "2.12.3", - "resolved": "https://registry.npmjs.org/apify-client/-/apify-client-2.12.3.tgz", - "integrity": "sha512-z12/QmvgJoVustbuujJ4fASfRq8K3b3fdxcXRm0mxFP6ufjmJt9k13755rXytvNnHaKRmKh0ViUIRUQ4al9lOg==", + "version": "2.12.6", + "resolved": "https://registry.npmjs.org/apify-client/-/apify-client-2.12.6.tgz", + "integrity": "sha512-6KTvH4UPjW0/yYu2Dx/51O5DVx+835A8oxlJLvpr1OvjWYAxBiR9MLo0V2xwAMdT8W9a7Bxk3hhjnG6FOAd/gA==", "license": "Apache-2.0", "dependencies": { "@apify/consts": "^2.25.0", @@ -2498,7 +2521,6 @@ "axios": "^1.6.7", "content-type": "^1.0.5", "ow": "^0.28.2", - "prettier": "^3.5.3", "tslib": "^2.5.0", "type-fest": "^4.0.0" } @@ -4228,7 +4250,6 @@ "version": "2.1.0", "resolved": "https://registry.npmjs.org/fast-json-stable-stringify/-/fast-json-stable-stringify-2.1.0.tgz", "integrity": "sha512-lhd/wF+Lk98HZoTCtlVraHtfh5XYijIjalXck7saUtuanSDyLMxnHhSXEDJqHxD7msR8D0uCmqlkwjCV8xvwHw==", - "dev": true, "license": "MIT" }, "node_modules/fast-levenshtein": { @@ -6387,21 +6408,6 @@ "node": ">= 0.8.0" } }, - "node_modules/prettier": { - "version": "3.5.3", - "resolved": "https://registry.npmjs.org/prettier/-/prettier-3.5.3.tgz", - "integrity": "sha512-QQtaxnoDJeAkDvDKWCLiwIXkTgRhwYDEQCghU9Z6q03iyek/rxRh/2lC3HB7P8sWT2xC/y5JDctPLBIGzHKbhw==", - "license": "MIT", - "bin": { - "prettier": "bin/prettier.cjs" - }, - "engines": { - "node": ">=14" - }, - "funding": { - "url": "https://github.com/prettier/prettier?sponsor=1" - } - }, "node_modules/proper-lockfile": { "version": "4.1.2", "resolved": "https://registry.npmjs.org/proper-lockfile/-/proper-lockfile-4.1.2.tgz", @@ -6445,7 +6451,6 @@ "version": "2.3.1", "resolved": "https://registry.npmjs.org/punycode/-/punycode-2.3.1.tgz", "integrity": "sha512-vYt7UD1U9Wg6138shLtLOvdAu+8DsC/ilFtEVHcH+wydcSpNE20AfSOduf6MkRFahL5FY7X1oU7nKVZFtfq8Fg==", - "dev": true, "license": "MIT", "engines": { "node": ">=6" @@ -7775,7 +7780,6 @@ "version": "4.4.1", "resolved": "https://registry.npmjs.org/uri-js/-/uri-js-4.4.1.tgz", "integrity": "sha512-7rKUyy33Q1yc98pQ1DAmLtwX109F7TIfWlW1Ydo8Wl1ii1SeHieeh0HHfPeL2fMXK6z0s8ecKs9frCuLJvndBg==", - "dev": true, "license": "BSD-2-Clause", "dependencies": { "punycode": "^2.1.0" diff --git a/package.json b/package.json index 9dbb1ff9..55f9a653 100644 --- a/package.json +++ b/package.json @@ -32,10 +32,10 @@ "dependencies": { "@apify/datastructures": "^2.0.3", "@apify/log": "^2.5.16", - "@modelcontextprotocol/sdk": "^1.11.5", + "@modelcontextprotocol/sdk": "^1.13.2", "ajv": "^8.17.1", - "apify": "^3.4.0", - "apify-client": "^2.12.3", + "apify": "^3.4.2", + "apify-client": "^2.12.6", "express": "^4.21.2", "yargs": "^17.7.2", "zod": "^3.24.1", diff --git a/src/actor/server.ts b/src/actor/server.ts index 95b51c68..de68e4f9 100644 --- a/src/actor/server.ts +++ b/src/actor/server.ts @@ -11,8 +11,9 @@ import express from 'express'; import log from '@apify/log'; -import { type ActorsMcpServer } from '../mcp/server.js'; -import { parseInputParamsFromUrl, processParamsGetTools } from '../mcp/utils.js'; +import { ActorsMcpServer } from '../mcp/server.js'; +import { parseInputParamsFromUrl } from '../mcp/utils.js'; +import { getActorsAsTools } from '../tools/actor.js'; import { getHelpMessage, HEADER_READINESS_PROBE, Routes } from './const.js'; import { getActorRunData } from './utils.js'; @@ -34,10 +35,15 @@ async function loadToolsAndActors(mcpServer: ActorsMcpServer, url: string, apify export function createExpressApp( host: string, - mcpServer: ActorsMcpServer, + mcpServerOptions: { + enableAddingActors?: boolean; + enableDefaultActors?: boolean; + actors?: string[]; + }, ): express.Express { const app = express(); - let transportSSE: SSEServerTransport; + const mcpServers: { [sessionId: string]: ActorsMcpServer } = {}; + const transportsSSE: { [sessionId: string]: SSEServerTransport } = {}; const transports: { [sessionId: string]: StreamableHTTPServerTransport } = {}; function respondWithError(res: Response, error: unknown, logMessage: string, statusCode = 500) { @@ -62,11 +68,6 @@ export function createExpressApp( } try { log.info(`Received GET message at: ${Routes.ROOT}`); - // TODO: I think we should remove this logic, root should return only help message - const tools = await processParamsGetTools(req.url, process.env.APIFY_TOKEN as string); - if (tools) { - mcpServer.upsertTools(tools); - } res.setHeader('Content-Type', 'text/event-stream'); res.setHeader('Cache-Control', 'no-cache'); res.setHeader('Connection', 'keep-alive'); @@ -83,9 +84,25 @@ export function createExpressApp( app.get(Routes.SSE, async (req: Request, res: Response) => { try { log.info(`Received GET message at: ${Routes.SSE}`); + const mcpServer = new ActorsMcpServer(mcpServerOptions, false); + // Load tools from Actor input for backwards compatibility + if (mcpServerOptions.actors && mcpServerOptions.actors.length > 0) { + const tools = await getActorsAsTools(mcpServerOptions.actors, process.env.APIFY_TOKEN as string); + mcpServer.upsertTools(tools); + } await loadToolsAndActors(mcpServer, req.url, process.env.APIFY_TOKEN as string); - transportSSE = new SSEServerTransport(Routes.MESSAGE, res); - await mcpServer.connect(transportSSE); + const transport = new SSEServerTransport(Routes.MESSAGE, res); + transportsSSE[transport.sessionId] = transport; + mcpServers[transport.sessionId] = mcpServer; + await mcpServer.connect(transport); + + res.on('close', () => { + log.info('Connection closed, cleaning up', { + sessionId: transport.sessionId, + }); + delete transportsSSE[transport.sessionId]; + delete mcpServers[transport.sessionId]; + }); } catch (error) { respondWithError(res, error, `Error in GET ${Routes.SSE}`); } @@ -94,8 +111,22 @@ export function createExpressApp( app.post(Routes.MESSAGE, async (req: Request, res: Response) => { try { log.info(`Received POST message at: ${Routes.MESSAGE}`); - if (transportSSE) { - await transportSSE.handlePostMessage(req, res); + const sessionId = new URL(req.url, `http://${req.headers.host}`).searchParams.get('sessionId'); + if (!sessionId) { + log.error('No session ID provided in POST request'); + res.status(400).json({ + jsonrpc: '2.0', + error: { + code: -32000, + message: 'Bad Request: No session ID provided', + }, + id: null, + }); + return; + } + const transport = transportsSSE[sessionId]; + if (transport) { + await transport.handlePostMessage(req, res); } else { log.error('Server is not connected to the client.'); res.status(400).json({ @@ -132,6 +163,12 @@ export function createExpressApp( sessionIdGenerator: () => randomUUID(), enableJsonResponse: false, // Use SSE response mode }); + const mcpServer = new ActorsMcpServer(mcpServerOptions, false); + // Load tools from Actor input for backwards compatibility + if (mcpServerOptions.actors && mcpServerOptions.actors.length > 0) { + const tools = await getActorsAsTools(mcpServerOptions.actors, process.env.APIFY_TOKEN as string); + mcpServer.upsertTools(tools); + } // Load MCP server tools await loadToolsAndActors(mcpServer, req.url, process.env.APIFY_TOKEN as string); // Connect the transport to the MCP server BEFORE handling the request @@ -143,6 +180,7 @@ export function createExpressApp( // Store the transport by session ID for future requests if (transport.sessionId) { transports[transport.sessionId] = transport; + mcpServers[transport.sessionId] = mcpServer; } return; // Already handled } else { @@ -172,6 +210,20 @@ export function createExpressApp( res.status(405).set('Allow', 'POST').send('Method Not Allowed'); }); + app.delete(Routes.MCP, async (req: Request, res: Response) => { + const sessionId = req.headers['mcp-session-id'] as string | undefined; + + const transport = transports[sessionId || ''] as StreamableHTTPServerTransport | undefined; + if (transport) { + log.info(`Deleting MCP session with ID: ${sessionId}`); + await transport.handleRequest(req, res, req.body); + return; + } + + log.error('Session not found', { sessionId }); + res.status(400).send('Bad Request: Session not found').end(); + }); + // Catch-all for undefined routes app.use((req: Request, res: Response) => { res.status(404).json({ message: `There is nothing at route ${req.method} ${req.originalUrl}. ${getHelpMessage(host)}` }).end(); diff --git a/src/const.ts b/src/const.ts index b27f5d3c..80cfc7a0 100644 --- a/src/const.ts +++ b/src/const.ts @@ -54,8 +54,8 @@ export const ACTOR_ADDITIONAL_INSTRUCTIONS = `Never call/execute tool/Actor unle You can always use ${HelperTools.DATASET_GET_ITEMS} tool to get more items from the dataset. Actor run input is always stored in the key-value store, recordKey: INPUT.`; -export const TOOL_CACHE_MAX_SIZE = 500; -export const TOOL_CACHE_TTL_SECS = 30 * 60; +export const ACTOR_CACHE_MAX_SIZE = 500; +export const ACTOR_CACHE_TTL_SECS = 30 * 60; // 30 minutes export const ACTOR_PRICING_MODEL = { /** Rental actors */ @@ -72,3 +72,5 @@ export const ACTOR_PRICING_MODEL = { * so we can safely filter out rental Actors from the search and ensure we return some results. */ export const ACTOR_SEARCH_ABOVE_LIMIT = 50; + +export const MCP_STREAMABLE_ENDPOINT = '/mcp'; diff --git a/src/main.ts b/src/main.ts index 7ac99668..d103a77e 100644 --- a/src/main.ts +++ b/src/main.ts @@ -10,8 +10,7 @@ import log from '@apify/log'; import { createExpressApp } from './actor/server.js'; import { processInput } from './input.js'; -import { ActorsMcpServer } from './mcp/server.js'; -import { callActorGetDataset, getActorsAsTools } from './tools/index.js'; +import { callActorGetDataset } from './tools/index.js'; import type { Input } from './types.js'; const STANDBY_MODE = Actor.getEnv().metaOrigin === 'STANDBY'; @@ -30,22 +29,21 @@ const input = processInput((await Actor.getInput>()) ?? ({} as In log.info(`Loaded input: ${JSON.stringify(input)} `); if (STANDBY_MODE) { - const mcpServer = new ActorsMcpServer({ - enableAddingActors: Boolean(input.enableAddingActors), - enableDefaultActors: false, - }); - - const app = createExpressApp(HOST, mcpServer); - log.info('Actor is running in the STANDBY mode.'); - + let actorsToLoad: string[] = []; // Load only Actors specified in the input // If you wish to start without any Actor, create a task and leave the input empty if (input.actors && input.actors.length > 0) { const { actors } = input; - const actorsToLoad = Array.isArray(actors) ? actors : actors.split(','); - const tools = await getActorsAsTools(actorsToLoad, process.env.APIFY_TOKEN as string); - mcpServer.upsertTools(tools); + actorsToLoad = Array.isArray(actors) ? actors : actors.split(','); } + // Include Actors to load in the MCP server options for backwards compatibility + const app = createExpressApp(HOST, { + enableAddingActors: Boolean(input.enableAddingActors), + enableDefaultActors: false, + actors: actorsToLoad, + }); + log.info('Actor is running in the STANDBY mode.'); + app.listen(PORT, () => { log.info(`The Actor web server is listening for user requests at ${HOST}`); }); diff --git a/src/mcp/actors.ts b/src/mcp/actors.ts index 39c86422..01859789 100644 --- a/src/mcp/actors.ts +++ b/src/mcp/actors.ts @@ -1,59 +1,67 @@ import type { ActorDefinition } from 'apify-client'; import { ApifyClient } from '../apify-client.js'; +import { MCP_STREAMABLE_ENDPOINT } from '../const.js'; +import type { ActorDefinitionPruned } from '../types.js'; -export async function isActorMCPServer(actorID: string, apifyToken: string): Promise { - const mcpPath = await getActorsMCPServerPath(actorID, apifyToken); - return (mcpPath?.length || 0) > 0; -} +/** + * Returns the MCP server path for the given Actor ID. + * Prioritizes the streamable transport path if available. + * The `webServerMcpPath` is a string containing MCP endpoint or endpoints separated by commas. + */ +export function getActorMCPServerPath(actorDefinition: ActorDefinition | ActorDefinitionPruned): string | null { + if ('webServerMcpPath' in actorDefinition && typeof actorDefinition.webServerMcpPath === 'string') { + const webServerMcpPath = actorDefinition.webServerMcpPath.trim(); -export async function getActorsMCPServerPath(actorID: string, apifyToken: string): Promise { - const actorDefinition = await getActorDefinition(actorID, apifyToken); + const paths = webServerMcpPath.split(',').map((path) => path.trim()); + // If there is only one path, return it directly + if (paths.length === 1) { + return paths[0]; + } - if ('webServerMcpPath' in actorDefinition && typeof actorDefinition.webServerMcpPath === 'string') { - return actorDefinition.webServerMcpPath; + // If there are multiple paths, prioritize the streamable transport path + // otherwise return the first one. + const streamablePath = paths.find((path) => path === MCP_STREAMABLE_ENDPOINT); + if (streamablePath) { + return streamablePath; + } + // Otherwise, return the first path + return paths[0]; } - return undefined; + return null; } -export async function getActorsMCPServerURL(actorID: string, apifyToken: string): Promise { +/** + * Returns the MCP server URL for the given Actor ID. + */ +export async function getActorMCPServerURL(realActorId: string, mcpServerPath: string): Promise { // TODO: get from API instead const standbyBaseUrl = process.env.HOSTNAME === 'mcp-securitybyobscurity.apify.com' ? 'securitybyobscurity.apify.actor' : 'apify.actor'; - const standbyUrl = await getActorStandbyURL(actorID, apifyToken, standbyBaseUrl); - const mcpPath = await getActorsMCPServerPath(actorID, apifyToken); - return `${standbyUrl}${mcpPath}`; + const standbyUrl = await getActorStandbyURL(realActorId, standbyBaseUrl); + return `${standbyUrl}${mcpServerPath}`; } /** * Gets Actor ID from the Actor object. -* -* @param actorID -* @param apifyToken */ -export async function getRealActorID(actorID: string, apifyToken: string): Promise { +export async function getRealActorID(actorIdOrName: string, apifyToken: string): Promise { const apifyClient = new ApifyClient({ token: apifyToken }); - const actor = apifyClient.actor(actorID); + const actor = apifyClient.actor(actorIdOrName); const info = await actor.get(); if (!info) { - throw new Error(`Actor ${actorID} not found`); + throw new Error(`Actor ${actorIdOrName} not found`); } return info.id; } /** * Returns standby URL for given Actor ID. -* -* @param actorID -* @param standbyBaseUrl -* @param apifyToken -* @returns */ -export async function getActorStandbyURL(actorID: string, apifyToken: string, standbyBaseUrl = 'apify.actor'): Promise { - const actorRealID = await getRealActorID(actorID, apifyToken); - return `https://${actorRealID}.${standbyBaseUrl}`; +export async function getActorStandbyURL(realActorId: string, standbyBaseUrl = 'apify.actor'): Promise { + return `https://${realActorId}.${standbyBaseUrl}`; } export async function getActorDefinition(actorID: string, apifyToken: string): Promise { diff --git a/src/state.ts b/src/state.ts new file mode 100644 index 00000000..1eff0864 --- /dev/null +++ b/src/state.ts @@ -0,0 +1,5 @@ +import { ACTOR_CACHE_MAX_SIZE, ACTOR_CACHE_TTL_SECS } from './const.js'; +import type { ActorDefinitionPruned } from './types.js'; +import { TTLLRUCache } from './utils/ttl-lru.js'; + +export const actorDefinitionPrunedCache = new TTLLRUCache(ACTOR_CACHE_MAX_SIZE, ACTOR_CACHE_TTL_SECS); diff --git a/src/tools/actor.ts b/src/tools/actor.ts index 6619b0c2..ae6153a4 100644 --- a/src/tools/actor.ts +++ b/src/tools/actor.ts @@ -4,7 +4,6 @@ import type { ActorCallOptions, ActorRun, Dataset, PaginatedList } from 'apify-c import { z } from 'zod'; import zodToJsonSchema from 'zod-to-json-schema'; -import { LruCache } from '@apify/datastructures'; import log from '@apify/log'; import { ApifyClient } from '../apify-client.js'; @@ -13,13 +12,12 @@ import { ACTOR_MAX_MEMORY_MBYTES, ACTOR_RUN_DATASET_OUTPUT_MAX_ITEMS, HelperTools, - TOOL_CACHE_MAX_SIZE, - TOOL_CACHE_TTL_SECS, } from '../const.js'; -import { getActorsMCPServerURL, isActorMCPServer } from '../mcp/actors.js'; +import { getActorMCPServerPath, getActorMCPServerURL } from '../mcp/actors.js'; import { createMCPClient } from '../mcp/client.js'; import { getMCPServerTools } from '../mcp/proxy.js'; -import type { InternalTool, ToolCacheEntry, ToolEntry } from '../types.js'; +import { actorDefinitionPrunedCache } from '../state.js'; +import type { ActorInfo, InternalTool, ToolEntry } from '../types.js'; import { getActorDefinition } from './build.js'; import { actorNameToToolName, @@ -41,11 +39,6 @@ export type CallActorGetDatasetResult = { items: PaginatedList>; }; -// Cache for normal Actor tools -const normalActorToolsCache = new LruCache({ - maxLength: TOOL_CACHE_MAX_SIZE, -}); - /** * Calls an Apify actor and retrieves the dataset items. * @@ -111,69 +104,42 @@ export async function callActorGetDataset( * @returns {Promise} - A promise that resolves to an array of MCP tools. */ export async function getNormalActorsAsTools( - actors: string[], - apifyToken: string, + actorsInfo: ActorInfo[], ): Promise { const tools: ToolEntry[] = []; - const actorsToLoad: string[] = []; - for (const actorID of actors) { - const cacheEntry = normalActorToolsCache.get(actorID); - if (cacheEntry && cacheEntry.expiresAt > Date.now()) { - tools.push(cacheEntry.tool); - } else { - actorsToLoad.push(actorID); - } - } - if (actorsToLoad.length === 0) { - return tools; - } - - const getActorDefinitionWithToken = async (actorId: string) => { - return await getActorDefinition(actorId, apifyToken); - }; - const results = await Promise.all(actorsToLoad.map(getActorDefinitionWithToken)); // Zip the results with their corresponding actorIDs - for (let i = 0; i < results.length; i++) { - const result = results[i]; - // We need to get the orignal input from the user - // sonce the user can input real Actor ID like '3ox4R101TgZz67sLr' instead of - // 'username/actorName' even though we encourage that. - // And the getActorDefinition does not return the original input it received, just the actorFullName or actorID - const actorIDOrName = actorsToLoad[i]; + for (const actorInfo of actorsInfo) { + const { actorDefinitionPruned } = actorInfo; - if (result) { - const schemaID = getToolSchemaID(result.actorFullName); - if (result.input && 'properties' in result.input && result.input) { - result.input.properties = markInputPropertiesAsRequired(result.input); - result.input.properties = buildNestedProperties(result.input.properties); - result.input.properties = filterSchemaProperties(result.input.properties); - result.input.properties = shortenProperties(result.input.properties); - result.input.properties = addEnumsToDescriptionsWithExamples(result.input.properties); + if (actorDefinitionPruned) { + const schemaID = getToolSchemaID(actorDefinitionPruned.actorFullName); + if (actorDefinitionPruned.input && 'properties' in actorDefinitionPruned.input && actorDefinitionPruned.input) { + actorDefinitionPruned.input.properties = markInputPropertiesAsRequired(actorDefinitionPruned.input); + actorDefinitionPruned.input.properties = buildNestedProperties(actorDefinitionPruned.input.properties); + actorDefinitionPruned.input.properties = filterSchemaProperties(actorDefinitionPruned.input.properties); + actorDefinitionPruned.input.properties = shortenProperties(actorDefinitionPruned.input.properties); + actorDefinitionPruned.input.properties = addEnumsToDescriptionsWithExamples(actorDefinitionPruned.input.properties); // Add schema $id, each valid JSON schema should have a unique $id // see https://json-schema.org/understanding-json-schema/basics#declaring-a-unique-identifier - result.input.$id = schemaID; + actorDefinitionPruned.input.$id = schemaID; } try { - const memoryMbytes = result.defaultRunOptions?.memoryMbytes || ACTOR_MAX_MEMORY_MBYTES; + const memoryMbytes = actorDefinitionPruned.defaultRunOptions?.memoryMbytes || ACTOR_MAX_MEMORY_MBYTES; const tool: ToolEntry = { type: 'actor', tool: { - name: actorNameToToolName(result.actorFullName), - actorFullName: result.actorFullName, - description: `${result.description} Instructions: ${ACTOR_ADDITIONAL_INSTRUCTIONS}`, - inputSchema: result.input || {}, - ajvValidate: fixedAjvCompile(ajv, result.input || {}), + name: actorNameToToolName(actorDefinitionPruned.actorFullName), + actorFullName: actorDefinitionPruned.actorFullName, + description: `${actorDefinitionPruned.description} Instructions: ${ACTOR_ADDITIONAL_INSTRUCTIONS}`, + inputSchema: actorDefinitionPruned.input || {}, + ajvValidate: fixedAjvCompile(ajv, actorDefinitionPruned.input || {}), memoryMbytes: memoryMbytes > ACTOR_MAX_MEMORY_MBYTES ? ACTOR_MAX_MEMORY_MBYTES : memoryMbytes, }, }; tools.push(tool); - normalActorToolsCache.add(actorIDOrName, { - tool, - expiresAt: Date.now() + TOOL_CACHE_TTL_SECS * 1000, - }); } catch (validationError) { - log.error(`Failed to compile AJV schema for Actor: ${result.actorFullName}. Error: ${validationError}`); + log.error(`Failed to compile AJV schema for Actor: ${actorDefinitionPruned.actorFullName}. Error: ${validationError}`); } } } @@ -181,18 +147,33 @@ export async function getNormalActorsAsTools( } async function getMCPServersAsTools( - actors: string[], + actorsInfo: ActorInfo[], apifyToken: string, ): Promise { const actorsMCPServerTools: ToolEntry[] = []; - for (const actorID of actors) { - const serverUrl = await getActorsMCPServerURL(actorID, apifyToken); - log.info(`ActorID: ${actorID} MCP server URL: ${serverUrl}`); + for (const actorInfo of actorsInfo) { + const actorId = actorInfo.actorDefinitionPruned.id; + if (!actorInfo.webServerMcpPath) { + log.warning('Actor does not have a web server MCP path, skipping', { + actorFullName: actorInfo.actorDefinitionPruned.actorFullName, + actorId, + }); + continue; + } + const mcpServerUrl = await getActorMCPServerURL( + actorInfo.actorDefinitionPruned.id, // Real ID of the Actor + actorInfo.webServerMcpPath, + ); + log.info('Retrieved MCP server URL for Actor', { + actorFullName: actorInfo.actorDefinitionPruned.actorFullName, + actorId, + mcpServerUrl, + }); let client: Client | undefined; try { - client = await createMCPClient(serverUrl, apifyToken); - const serverTools = await getMCPServerTools(actorID, client, serverUrl); + client = await createMCPClient(mcpServerUrl, apifyToken); + const serverTools = await getMCPServerTools(actorId, client, mcpServerUrl); actorsMCPServerTools.push(...serverTools); } finally { if (client) await client.close(); @@ -203,29 +184,45 @@ async function getMCPServersAsTools( } export async function getActorsAsTools( - actors: string[], + actorIdsOrNames: string[], apifyToken: string, ): Promise { log.debug(`Fetching actors as tools...`); - log.debug(`Actors: ${actors}`); - // Actorized MCP servers - const actorsMCPServers: string[] = []; - for (const actorID of actors) { - // TODO: rework, we are fetching actor definition from API twice - in the getMCPServerTools - if (await isActorMCPServer(actorID, apifyToken)) { - actorsMCPServers.push(actorID); - } - } - // Normal Actors as a tool - const toolActors = actors.filter((actorID) => !actorsMCPServers.includes(actorID)); - log.debug(`actorsMCPserver: ${actorsMCPServers}`); - log.debug(`toolActors: ${toolActors}`); + log.debug(`Actors: ${actorIdsOrNames}`); + + const actorsInfo: (ActorInfo | null)[] = await Promise.all( + actorIdsOrNames.map(async (actorIdOrName) => { + const actorDefinitionPrunedCached = actorDefinitionPrunedCache.get(actorIdOrName); + if (actorDefinitionPrunedCached) { + return { + actorDefinitionPruned: actorDefinitionPrunedCached, + webServerMcpPath: getActorMCPServerPath(actorDefinitionPrunedCached), + + } as ActorInfo; + } + + const actorDefinitionPruned = await getActorDefinition(actorIdOrName, apifyToken); + if (!actorDefinitionPruned) { + log.error('Actor not found or definition is not available', { actorIdOrName }); + return null; + } + // Cache the pruned Actor definition + actorDefinitionPrunedCache.set(actorIdOrName, actorDefinitionPruned); + return { + actorDefinitionPruned, + webServerMcpPath: getActorMCPServerPath(actorDefinitionPruned), + } as ActorInfo; + }), + ); - // Normal Actors as a tool - const normalTools = await getNormalActorsAsTools(toolActors, apifyToken); + // Filter out nulls and separate Actors with MCP servers and normal Actors + const actorMCPServersInfo = actorsInfo.filter((actorInfo) => actorInfo && actorInfo.webServerMcpPath) as ActorInfo[]; + const normalActorsInfo = actorsInfo.filter((actorInfo) => actorInfo && !actorInfo.webServerMcpPath) as ActorInfo[]; - // Tools from Actorized MCP servers - const mcpServerTools = await getMCPServersAsTools(actorsMCPServers, apifyToken); + const [normalTools, mcpServerTools] = await Promise.all([ + getNormalActorsAsTools(normalActorsInfo), + getMCPServersAsTools(actorMCPServersInfo, apifyToken), + ]); return [...normalTools, ...mcpServerTools]; } diff --git a/src/tools/build.ts b/src/tools/build.ts index 91daec09..9b91dbab 100644 --- a/src/tools/build.ts +++ b/src/tools/build.ts @@ -46,6 +46,7 @@ export async function getActorDefinition( if (buildDetails?.actorDefinition) { const actorDefinitions = buildDetails?.actorDefinition as ActorDefinitionWithDesc; + // We set actorDefinition ID to Actor ID actorDefinitions.id = actor.id; actorDefinitions.readme = truncateActorReadme(actorDefinitions.readme || '', limit); actorDefinitions.description = actor.description || ''; @@ -75,6 +76,7 @@ function pruneActorDefinition(response: ActorDefinitionWithDesc): ActorDefinitio : undefined, description: response.description, defaultRunOptions: response.defaultRunOptions, + webServerMcpPath: 'webServerMcpPath' in response ? response.webServerMcpPath as string : undefined, }; } /** Prune Actor README if it is too long diff --git a/src/tools/run_collection.ts b/src/tools/run_collection.ts index ce3cbd93..03560a0f 100644 --- a/src/tools/run_collection.ts +++ b/src/tools/run_collection.ts @@ -19,7 +19,7 @@ const getUserRunsListArgs = z.object({ desc: z.boolean() .describe('If true or 1 then the runs are sorted by the startedAt field in descending order. Default: sorted in ascending order.') .default(false), - status: z.enum(['READY', 'RUNNING', 'SUCCEEDED', 'FAILED', 'TIMING_OUT', 'TIMED_OUT', 'ABORTING', 'ABORTED']) + status: z.enum(['READY', 'RUNNING', 'SUCCEEDED', 'FAILED', 'TIMING-OUT', 'TIMED-OUT', 'ABORTING', 'ABORTED']) .optional() .describe('Return only runs with the provided status.'), }); diff --git a/src/types.ts b/src/types.ts index cb4fb39e..0f1977bd 100644 --- a/src/types.ts +++ b/src/types.ts @@ -47,8 +47,14 @@ export type ActorDefinitionWithDesc = Omit & { input?: IActorInputSchema; } +/** + * Pruned Actor definition type. + * The `id` property is set to Actor ID. + */ export type ActorDefinitionPruned = Pick + 'id' | 'actorFullName' | 'buildTag' | 'readme' | 'input' | 'description' | 'defaultRunOptions'> & { + webServerMcpPath?: string; // Optional, used for Actorized MCP server tools + } /** * Base interface for all tools in the MCP server. @@ -198,10 +204,13 @@ export type Input = { debugActorInput?: unknown; }; -export interface ToolCacheEntry { - expiresAt: number; - tool: ToolEntry; -} - // Utility type to get a union of values from an object type export type ActorPricingModel = (typeof ACTOR_PRICING_MODEL)[keyof typeof ACTOR_PRICING_MODEL]; + +/** + * Type representing the Actor information needed in order to turn it into an MCP server tool. + */ +export interface ActorInfo { + webServerMcpPath: string | null; // To determined if the Actor is an MCP server + actorDefinitionPruned: ActorDefinitionPruned; +} diff --git a/src/utils/ttl-lru.ts b/src/utils/ttl-lru.ts new file mode 100644 index 00000000..e2eada05 --- /dev/null +++ b/src/utils/ttl-lru.ts @@ -0,0 +1,69 @@ +import { LruCache } from '@apify/datastructures'; + +/** + * LRU cache with TTL (time-to-live) for storing entries. + * + * This class wraps an LRU cache and adds a time-to-live (TTL) expiration to each entry. + * When an entry is accessed, it is checked for expiration and removed if expired. + * + * Usage: + * ```typescript + * const cache = new TTLLRUCache(100, 60); // 100 items, 60 seconds TTL + * cache.set('key', 'value'); + * const value = cache.get('key'); + * ``` + */ +export class TTLLRUCache { + // Internal LRU cache storing value and expiration timestamp for each entry + private readonly cache: LruCache<{ + value: T; + expiresAt: number; + }>; + + // Time-to-live in milliseconds for each entry + private readonly ttlMillis: number; + + /** + * @param maxLength Maximum number of items in the cache (LRU eviction) + * @param ttlSecs Time-to-live for each entry, in seconds + */ + constructor(maxLength: number, ttlSecs: number) { + this.ttlMillis = ttlSecs * 1000; + this.cache = new LruCache<{ + value: T; + expiresAt: number; + }>({ + maxLength, + }); + } + + /** + * Set a value in the cache with the given key. If the key exists, it is updated and TTL is reset. + * @param key Cache key + * @param value Value to store + */ + set(key: string, value: T) { + // If the key already exists, remove it to update the value and reset TTL + if (this.cache.get(key)) { + this.cache.remove(key); + } + this.cache.add(key, { + value, + expiresAt: Date.now() + this.ttlMillis, + }); + } + + /** + * Get a value from the cache by key. Returns null if not found or expired. + * @param key Cache key + * @returns The value if present and not expired, otherwise null + */ + get(key: string): T | null { + const entry = this.cache.get(key); + if (entry && entry.expiresAt > Date.now()) { + return entry.value; + } + this.cache.remove(key); // Remove expired entry + return null; + } +} diff --git a/tests/const.ts b/tests/const.ts new file mode 100644 index 00000000..971760e2 --- /dev/null +++ b/tests/const.ts @@ -0,0 +1,7 @@ +import { defaults } from '../src/const.js'; +import { defaultTools } from '../src/tools/index.js'; +import { actorNameToToolName } from '../src/tools/utils.js'; + +export const ACTOR_PYTHON_EXAMPLE = 'apify/python-example'; +export const DEFAULT_TOOL_NAMES = defaultTools.map((tool) => tool.tool.name); +export const DEFAULT_ACTOR_NAMES = defaults.actors.map((tool) => actorNameToToolName(tool)); diff --git a/tests/integration/actor.server-sse.test.ts b/tests/integration/actor.server-sse.test.ts index c122051d..3547e863 100644 --- a/tests/integration/actor.server-sse.test.ts +++ b/tests/integration/actor.server-sse.test.ts @@ -5,12 +5,10 @@ import type { Express } from 'express'; import log from '@apify/log'; import { createExpressApp } from '../../src/actor/server.js'; -import { ActorsMcpServer } from '../../src/mcp/server.js'; import { createMcpSseClient } from '../helpers.js'; import { createIntegrationTestsSuite } from './suite.js'; let app: Express; -let mcpServer: ActorsMcpServer; let httpServer: HttpServer; const httpServerPort = 50000; const httpServerHost = `http://localhost:${httpServerPort}`; @@ -18,26 +16,24 @@ const mcpUrl = `${httpServerHost}/sse`; createIntegrationTestsSuite({ suiteName: 'Actors MCP Server SSE', - getActorsMcpServer: () => mcpServer, + transport: 'sse', createClientFn: async (options) => await createMcpSseClient(mcpUrl, options), beforeAllFn: async () => { - mcpServer = new ActorsMcpServer({ enableAddingActors: false, enableDefaultActors: false }); log.setLevel(log.LEVELS.OFF); // Create an express app using the proper server setup - app = createExpressApp(httpServerHost, mcpServer); + const mcpServerOptions = { + enableAddingActors: false, + enableDefaultActors: false, + }; + app = createExpressApp(httpServerHost, mcpServerOptions); // Start a test server await new Promise((resolve) => { httpServer = app.listen(httpServerPort, () => resolve()); }); }, - beforeEachFn: async () => { - mcpServer.disableDynamicActorTools(); - await mcpServer.reset(); - }, afterAllFn: async () => { - await mcpServer.close(); await new Promise((resolve) => { httpServer.close(() => resolve()); }); diff --git a/tests/integration/actor.server-streamable.test.ts b/tests/integration/actor.server-streamable.test.ts index a42e2301..b877c2ad 100644 --- a/tests/integration/actor.server-streamable.test.ts +++ b/tests/integration/actor.server-streamable.test.ts @@ -5,12 +5,10 @@ import type { Express } from 'express'; import log from '@apify/log'; import { createExpressApp } from '../../src/actor/server.js'; -import { ActorsMcpServer } from '../../src/mcp/server.js'; import { createMcpStreamableClient } from '../helpers.js'; import { createIntegrationTestsSuite } from './suite.js'; let app: Express; -let mcpServer: ActorsMcpServer; let httpServer: HttpServer; const httpServerPort = 50001; const httpServerHost = `http://localhost:${httpServerPort}`; @@ -18,23 +16,23 @@ const mcpUrl = `${httpServerHost}/mcp`; createIntegrationTestsSuite({ suiteName: 'Actors MCP Server Streamable HTTP', - getActorsMcpServer: () => mcpServer, + transport: 'streamable-http', createClientFn: async (options) => await createMcpStreamableClient(mcpUrl, options), beforeAllFn: async () => { log.setLevel(log.LEVELS.OFF); + // Create an express app using the proper server setup - mcpServer = new ActorsMcpServer({ enableAddingActors: false, enableDefaultActors: false }); - app = createExpressApp(httpServerHost, mcpServer); + const mcpServerOptions = { + enableAddingActors: false, + enableDefaultActors: false, + }; + app = createExpressApp(httpServerHost, mcpServerOptions); // Start a test server await new Promise((resolve) => { httpServer = app.listen(httpServerPort, () => resolve()); }); }, - beforeEachFn: async () => { - mcpServer.disableDynamicActorTools(); - await mcpServer.reset(); - }, afterAllFn: async () => { await new Promise((resolve) => { httpServer.close(() => resolve()); diff --git a/tests/integration/internals.test.ts b/tests/integration/internals.test.ts new file mode 100644 index 00000000..75ff56fa --- /dev/null +++ b/tests/integration/internals.test.ts @@ -0,0 +1,145 @@ +import { beforeAll, describe, expect, it } from 'vitest'; + +import log from '@apify/log'; + +import { actorNameToToolName } from '../../dist/tools/utils.js'; +import { defaults } from '../../src/const.js'; +import { ActorsMcpServer } from '../../src/index.js'; +import { addRemoveTools, defaultTools, getActorsAsTools } from '../../src/tools/index.js'; +import { ACTOR_PYTHON_EXAMPLE, DEFAULT_TOOL_NAMES } from '../const.js'; +import { expectArrayWeakEquals } from '../helpers.js'; + +beforeAll(() => { + log.setLevel(log.LEVELS.OFF); +}); + +describe('MCP server internals integration tests', () => { + it('should load and restore tools from a tool list', async () => { + const actorsMcpServer = new ActorsMcpServer({ enableDefaultActors: true, enableAddingActors: true }, false); + await actorsMcpServer.initialize(); + + // Load new tool + const newTool = await getActorsAsTools([ACTOR_PYTHON_EXAMPLE], process.env.APIFY_TOKEN as string); + actorsMcpServer.upsertTools(newTool); + + // Store the tool name list + const names = actorsMcpServer.listAllToolNames(); + const expectedToolNames = [ + ...DEFAULT_TOOL_NAMES, + ...defaults.actors, + ...addRemoveTools.map((tool) => tool.tool.name), + ...[ACTOR_PYTHON_EXAMPLE], + ]; + expectArrayWeakEquals(expectedToolNames, names); + + // Remove all tools + actorsMcpServer.tools.clear(); + expect(actorsMcpServer.listAllToolNames()).toEqual([]); + + // Load the tool state from the tool name list + await actorsMcpServer.loadToolsByName(names, process.env.APIFY_TOKEN as string); + + // Check if the tool name list is restored + expectArrayWeakEquals(actorsMcpServer.listAllToolNames(), expectedToolNames); + }); + + it('should reset and restore tool state with default tools', async () => { + const actorsMCPServer = new ActorsMcpServer({ enableDefaultActors: true, enableAddingActors: true }, false); + await actorsMCPServer.initialize(); + + const numberOfTools = defaultTools.length + addRemoveTools.length + defaults.actors.length; + const toolList = actorsMCPServer.listAllToolNames(); + expect(toolList.length).toEqual(numberOfTools); + // Add a new Actor + const newTool = await getActorsAsTools([ACTOR_PYTHON_EXAMPLE], process.env.APIFY_TOKEN as string); + actorsMCPServer.upsertTools(newTool); + + // Store the tool name list + const toolListWithActor = actorsMCPServer.listAllToolNames(); + expect(toolListWithActor.length).toEqual(numberOfTools + 1); // + 1 for the added Actor + + // Remove all tools + await actorsMCPServer.reset(); + // We connect second client so that the default tools are loaded + // if no specific list of Actors is provided + const toolListAfterReset = actorsMCPServer.listAllToolNames(); + expect(toolListAfterReset.length).toEqual(numberOfTools); + }); + + it('should notify tools changed handler on tool modifications', async () => { + let latestTools: string[] = []; + const numberOfTools = defaultTools.length + addRemoveTools.length + defaults.actors.length; + + let toolNotificationCount = 0; + const onToolsChanged = (tools: string[]) => { + latestTools = tools; + toolNotificationCount++; + }; + + const actorsMCPServer = new ActorsMcpServer({ enableDefaultActors: true, enableAddingActors: true }, false); + await actorsMCPServer.initialize(); + actorsMCPServer.registerToolsChangedHandler(onToolsChanged); + + // Add a new Actor + const actor = ACTOR_PYTHON_EXAMPLE; + const newTool = await getActorsAsTools([actor], process.env.APIFY_TOKEN as string); + actorsMCPServer.upsertTools(newTool, true); + + // Check if the notification was received with the correct tools + expect(toolNotificationCount).toBe(1); + expect(latestTools.length).toBe(numberOfTools + 1); + expect(latestTools).toContain(actor); + for (const tool of [...defaultTools, ...addRemoveTools]) { + expect(latestTools).toContain(tool.tool.name); + } + for (const tool of defaults.actors) { + expect(latestTools).toContain(tool); + } + + // Remove the Actor + actorsMCPServer.removeToolsByName([actorNameToToolName(actor)], true); + + // Check if the notification was received with the correct tools + expect(toolNotificationCount).toBe(2); + expect(latestTools.length).toBe(numberOfTools); + expect(latestTools).not.toContain(actor); + for (const tool of [...defaultTools, ...addRemoveTools]) { + expect(latestTools).toContain(tool.tool.name); + } + for (const tool of defaults.actors) { + expect(latestTools).toContain(tool); + } + }); + + it('should stop notifying after unregistering tools changed handler', async () => { + let latestTools: string[] = []; + let notificationCount = 0; + const numberOfTools = defaultTools.length + addRemoveTools.length + defaults.actors.length; + const onToolsChanged = (tools: string[]) => { + latestTools = tools; + notificationCount++; + }; + + const actorsMCPServer = new ActorsMcpServer({ enableDefaultActors: true, enableAddingActors: true }, false); + await actorsMCPServer.initialize(); + actorsMCPServer.registerToolsChangedHandler(onToolsChanged); + + // Add a new Actor + const actor = ACTOR_PYTHON_EXAMPLE; + const newTool = await getActorsAsTools([actor], process.env.APIFY_TOKEN as string); + actorsMCPServer.upsertTools(newTool, true); + + // Check if the notification was received + expect(notificationCount).toBe(1); + expect(latestTools.length).toBe(numberOfTools + 1); + expect(latestTools).toContain(actor); + + actorsMCPServer.unregisterToolsChangedHandler(); + + // Remove the Actor + actorsMCPServer.removeToolsByName([actorNameToToolName(actor)], true); + + // Check if the notification was NOT received + expect(notificationCount).toBe(1); + }); +}); diff --git a/tests/integration/stdio.test.ts b/tests/integration/stdio.test.ts index e11a3110..984c7f70 100644 --- a/tests/integration/stdio.test.ts +++ b/tests/integration/stdio.test.ts @@ -3,5 +3,6 @@ import { createIntegrationTestsSuite } from './suite.js'; createIntegrationTestsSuite({ suiteName: 'MCP stdio', + transport: 'stdio', createClientFn: createMcpStdioClient, }); diff --git a/tests/integration/suite.ts b/tests/integration/suite.ts index dc4521bd..4199d6b1 100644 --- a/tests/integration/suite.ts +++ b/tests/integration/suite.ts @@ -1,16 +1,17 @@ import type { Client } from '@modelcontextprotocol/sdk/client/index.js'; +import type { StreamableHTTPClientTransport } from '@modelcontextprotocol/sdk/client/streamableHttp.js'; import { ToolListChangedNotificationSchema } from '@modelcontextprotocol/sdk/types.js'; import { afterAll, afterEach, beforeAll, beforeEach, describe, expect, it } from 'vitest'; import { defaults, HelperTools } from '../../src/const.js'; -import type { ActorsMcpServer } from '../../src/index.js'; import { addRemoveTools, defaultTools } from '../../src/tools/index.js'; import { actorNameToToolName } from '../../src/tools/utils.js'; -import { addActor, expectArrayWeakEquals, type McpClientOptions } from '../helpers.js'; +import { ACTOR_PYTHON_EXAMPLE, DEFAULT_ACTOR_NAMES, DEFAULT_TOOL_NAMES } from '../const.js'; +import { type McpClientOptions } from '../helpers.js'; interface IntegrationTestsSuiteOptions { suiteName: string; - getActorsMcpServer?: () => ActorsMcpServer; + transport: 'sse' | 'streamable-http' | 'stdio'; createClientFn: (options?: McpClientOptions) => Promise; beforeAllFn?: () => Promise; afterAllFn?: () => Promise; @@ -18,10 +19,6 @@ interface IntegrationTestsSuiteOptions { afterEachFn?: () => Promise; } -const ACTOR_PYTHON_EXAMPLE = 'apify/python-example'; -const DEFAULT_TOOL_NAMES = defaultTools.map((tool) => tool.tool.name); -const DEFAULT_ACTOR_NAMES = defaults.actors.map((tool) => actorNameToToolName(tool)); - function getToolNames(tools: { tools: { name: string }[] }) { return tools.tools.map((tool) => tool.name); } @@ -57,7 +54,6 @@ export function createIntegrationTestsSuite( ) { const { suiteName, - getActorsMcpServer, createClientFn, beforeAllFn, afterAllFn, @@ -212,161 +208,6 @@ export function createIntegrationTestsSuite( await client.close(); }); - // Execute only when we can get the MCP server instance - currently skips only stdio - // is skipped because we are running a compiled version through node and there is no way (easy) - // to get the MCP server instance - it.runIf(getActorsMcpServer)('should load and restore tools from a tool list', async () => { - const client = await createClientFn({ enableAddingActors: true }); - const actorsMcpServer = getActorsMcpServer!(); - - // Add a new Actor - await addActor(client, ACTOR_PYTHON_EXAMPLE); - - // Store the tool name list - const names = actorsMcpServer.listAllToolNames(); - const expectedToolNames = [ - ...DEFAULT_TOOL_NAMES, - ...defaults.actors, - ...addRemoveTools.map((tool) => tool.tool.name), - ...[ACTOR_PYTHON_EXAMPLE], - ]; - expectArrayWeakEquals(expectedToolNames, names); - - // Remove all tools - actorsMcpServer.tools.clear(); - expect(actorsMcpServer.listAllToolNames()).toEqual([]); - - // Load the tool state from the tool name list - await actorsMcpServer.loadToolsByName(names, process.env.APIFY_TOKEN as string); - - // Check if the tool name list is restored - expectArrayWeakEquals(actorsMcpServer.listAllToolNames(), expectedToolNames); - - await client.close(); - }); - - it.runIf(getActorsMcpServer)('should reset and restore tool state with default tools', async () => { - const firstClient = await createClientFn({ enableAddingActors: true }); - const actorsMCPServer = getActorsMcpServer!(); - const numberOfTools = defaultTools.length + addRemoveTools.length + defaults.actors.length; - const toolList = actorsMCPServer.listAllToolNames(); - expect(toolList.length).toEqual(numberOfTools); - // Add a new Actor - await addActor(firstClient, ACTOR_PYTHON_EXAMPLE); - - // Store the tool name list - const toolListWithActor = actorsMCPServer.listAllToolNames(); - expect(toolListWithActor.length).toEqual(numberOfTools + 1); // + 1 for the added Actor - await firstClient.close(); - - // Remove all tools - await actorsMCPServer.reset(); - // We connect second client so that the default tools are loaded - // if no specific list of Actors is provided - const secondClient = await createClientFn({ enableAddingActors: true }); - const toolListAfterReset = actorsMCPServer.listAllToolNames(); - expect(toolListAfterReset.length).toEqual(numberOfTools); - await secondClient.close(); - }); - - it.runIf(getActorsMcpServer)('should notify tools changed handler on tool modifications', async () => { - const client = await createClientFn({ enableAddingActors: true }); - let latestTools: string[] = []; - const numberOfTools = defaultTools.length + addRemoveTools.length + defaults.actors.length; - - let toolNotificationCount = 0; - const onToolsChanged = (tools: string[]) => { - latestTools = tools; - toolNotificationCount++; - }; - - const actorsMCPServer = getActorsMcpServer!(); - actorsMCPServer.registerToolsChangedHandler(onToolsChanged); - - // Add a new Actor - const actor = ACTOR_PYTHON_EXAMPLE; - await client.callTool({ - name: HelperTools.ACTOR_ADD, - arguments: { - actorName: actor, - }, - }); - - // Check if the notification was received with the correct tools - expect(toolNotificationCount).toBe(1); - expect(latestTools.length).toBe(numberOfTools + 1); - expect(latestTools).toContain(actor); - for (const tool of [...defaultTools, ...addRemoveTools]) { - expect(latestTools).toContain(tool.tool.name); - } - for (const tool of defaults.actors) { - expect(latestTools).toContain(tool); - } - - // Remove the Actor - await client.callTool({ - name: HelperTools.ACTOR_REMOVE, - arguments: { - toolName: actorNameToToolName(actor), - }, - }); - - // Check if the notification was received with the correct tools - expect(toolNotificationCount).toBe(2); - expect(latestTools.length).toBe(numberOfTools); - expect(latestTools).not.toContain(actor); - for (const tool of [...defaultTools, ...addRemoveTools]) { - expect(latestTools).toContain(tool.tool.name); - } - for (const tool of defaults.actors) { - expect(latestTools).toContain(tool); - } - - await client.close(); - }); - - it.runIf(getActorsMcpServer)('should stop notifying after unregistering tools changed handler', async () => { - const client = await createClientFn({ enableAddingActors: true }); - let latestTools: string[] = []; - let notificationCount = 0; - const numberOfTools = defaultTools.length + addRemoveTools.length + defaults.actors.length; - const onToolsChanged = (tools: string[]) => { - latestTools = tools; - notificationCount++; - }; - - const actorsMCPServer = getActorsMcpServer!(); - actorsMCPServer.registerToolsChangedHandler(onToolsChanged); - - // Add a new Actor - const actor = ACTOR_PYTHON_EXAMPLE; - await client.callTool({ - name: HelperTools.ACTOR_ADD, - arguments: { - actorName: actor, - }, - }); - - // Check if the notification was received - expect(notificationCount).toBe(1); - expect(latestTools.length).toBe(numberOfTools + 1); - expect(latestTools).toContain(actor); - - actorsMCPServer.unregisterToolsChangedHandler(); - - // Remove the Actor - await client.callTool({ - name: HelperTools.ACTOR_REMOVE, - arguments: { - toolName: actorNameToToolName(actor), - }, - }); - - // Check if the notification was NOT received - expect(notificationCount).toBe(1); - await client.close(); - }); - it('should notify client about tool list changed', async () => { const client = await createClientFn({ enableAddingActors: true }); @@ -385,5 +226,12 @@ export function createIntegrationTestsSuite( await client.close(); }); + + it.runIf(options.transport === 'streamable-http')('should successfully terminate streamable session', async () => { + const client = await createClientFn(); + await client.listTools(); + await (client.transport as StreamableHTTPClientTransport).terminateSession(); + await client.close(); + }); }); } diff --git a/tests/unit/mcp.actors.test.ts b/tests/unit/mcp.actors.test.ts new file mode 100644 index 00000000..500fe2f1 --- /dev/null +++ b/tests/unit/mcp.actors.test.ts @@ -0,0 +1,59 @@ +import type { ActorDefinition } from 'apify-client'; +import { describe, expect, it } from 'vitest'; + +import { MCP_STREAMABLE_ENDPOINT } from '../../src/const.js'; +import { getActorMCPServerPath } from '../../src/mcp/actors.js'; + +// Helper to create a valid ActorDefinition and allow webServerMcpPath for testing +function makeActorDefinitionWithPath(webServerMcpPath?: unknown): ActorDefinition { + return { + actorSpecification: 0, + name: 'dummy', + version: '0.0', + ...(webServerMcpPath !== undefined ? { webServerMcpPath } : {}), + }; +} + +describe('getActorMCPServerPath', () => { + it('should return null if webServerMcpPath is missing', async () => { + const actorDefinition = makeActorDefinitionWithPath(); + const result = await getActorMCPServerPath(actorDefinition); + expect(result).toBeNull(); + }); + + it('should return null if webServerMcpPath is not a string', async () => { + const actorDefinition = makeActorDefinitionWithPath(123); + const result = await getActorMCPServerPath(actorDefinition); + expect(result).toBeNull(); + }); + + it('should return the single path if only one is present', async () => { + const actorDefinition = makeActorDefinitionWithPath('/mcp'); + const result = await getActorMCPServerPath(actorDefinition); + expect(result).toBe('/mcp'); + }); + + it('should return the streamable path if present among multiple', async () => { + const actorDefinition = makeActorDefinitionWithPath(`/foo, ${MCP_STREAMABLE_ENDPOINT}, /bar`); + const result = await getActorMCPServerPath(actorDefinition); + expect(result).toBe(MCP_STREAMABLE_ENDPOINT); + }); + + it('should return the first path if streamable is not present', async () => { + const actorDefinition = makeActorDefinitionWithPath('/foo, /bar, /baz'); + const result = await getActorMCPServerPath(actorDefinition); + expect(result).toBe('/foo'); + }); + + it('should trim whitespace from paths', async () => { + const actorDefinition = makeActorDefinitionWithPath(' /foo , /bar '); + const result = await getActorMCPServerPath(actorDefinition); + expect(result).toBe('/foo'); + }); + + it('should handle streamable path with whitespace', async () => { + const actorDefinition = makeActorDefinitionWithPath(` /foo , ${MCP_STREAMABLE_ENDPOINT} , /bar `); + const result = await getActorMCPServerPath(actorDefinition); + expect(result).toBe(MCP_STREAMABLE_ENDPOINT); + }); +}); diff --git a/tests/unit/utils.ttl-lru.test.ts b/tests/unit/utils.ttl-lru.test.ts new file mode 100644 index 00000000..d5889fdb --- /dev/null +++ b/tests/unit/utils.ttl-lru.test.ts @@ -0,0 +1,46 @@ +import { describe, expect, it } from 'vitest'; + +import { TTLLRUCache } from '../../src/utils/ttl-lru.js'; + +describe('TTLLRUCache', () => { + it('should set and get values before TTL expires', () => { + const cache = new TTLLRUCache(2, 2); // 2 seconds TTL + cache.set('a', 'valueA'); + expect(cache.get('a')).toBe('valueA'); + }); + + it('should return null after TTL expires', async () => { + const cache = new TTLLRUCache(2, 1); // 1 second TTL + cache.set('a', 'valueA'); + await new Promise((r) => { setTimeout(r, 1100); }); + expect(cache.get('a')).toBeNull(); + }); + + it('should evict least recently used items when maxLength is exceeded', () => { + const cache = new TTLLRUCache(2, 10); // Large TTL + cache.set('a', 'valueA'); + cache.set('b', 'valueB'); + cache.set('c', 'valueC'); // Should evict 'a' + expect(cache.get('a')).toBeNull(); + expect(cache.get('b')).toBe('valueB'); + expect(cache.get('c')).toBe('valueC'); + }); + + it('should update value and TTL on set for existing key', async () => { + const cache = new TTLLRUCache(2, 1); // 1 second TTL + cache.set('a', 'valueA'); + await new Promise((r) => { setTimeout(r, 700); }); + cache.set('a', 'valueA2'); // Reset TTL + await new Promise((r) => { setTimeout(r, 700); }); + expect(cache.get('a')).toBe('valueA2'); + }); + + it('should remove expired entry on get', async () => { + const cache = new TTLLRUCache(2, 1); // 1 second TTL + cache.set('a', 'valueA'); + await new Promise((r) => { setTimeout(r, 1100); }); + expect(cache.get('a')).toBeNull(); + // Should not throw if called again + expect(cache.get('a')).toBeNull(); + }); +}); From bec6dfa7ebdcab54397a86d6423ae116d6297444 Mon Sep 17 00:00:00 2001 From: MQ Date: Tue, 1 Jul 2025 16:08:54 +0200 Subject: [PATCH 2/5] add actorized mcp servers integration test, internal mcp server mcp client add support for streamable - use it first then fallback to legacy sse --- src/mcp/client.ts | 49 +++++++++++++++++++++++++++++++++++++- src/mcp/server.ts | 4 ++-- src/tools/actor.ts | 4 ++-- tests/const.ts | 1 + tests/integration/suite.ts | 36 ++++++++++++++++++++++++++-- 5 files changed, 87 insertions(+), 7 deletions(-) diff --git a/src/mcp/client.ts b/src/mcp/client.ts index 2f5537c1..cfb73b82 100644 --- a/src/mcp/client.ts +++ b/src/mcp/client.ts @@ -1,12 +1,33 @@ import { Client } from '@modelcontextprotocol/sdk/client/index.js'; import { SSEClientTransport } from '@modelcontextprotocol/sdk/client/sse.js'; +import { StreamableHTTPClientTransport } from '@modelcontextprotocol/sdk/client/streamableHttp.js'; + +import log from '@apify/log'; import { getMCPServerID } from './utils.js'; +/** + * Creates and connects a ModelContextProtocol client. + * First tries streamable HTTP transport, then falls back to SSE transport. + */ +export async function connectMCPClient( + url: string, token: string, +): Promise { + try { + return await createMCPStreamableClient(url, token); + } catch { + // If streamable HTTP transport fails, fall back to SSE transport + log.info('Streamable HTTP transport failed, falling back to SSE transport', { + url, + }); + return await createMCPSSEClient(url, token); + } +} + /** * Creates and connects a ModelContextProtocol client. */ -export async function createMCPClient( +async function createMCPSSEClient( url: string, token: string, ): Promise { const transport = new SSEClientTransport( @@ -39,3 +60,29 @@ export async function createMCPClient( return client; } + +/** + * Creates and connects a ModelContextProtocol client using the streamable HTTP transport. + */ +async function createMCPStreamableClient( + url: string, token: string, +): Promise { + const transport = new StreamableHTTPClientTransport( + new URL(url), + { + requestInit: { + headers: { + authorization: `Bearer ${token}`, + }, + }, + }); + + const client = new Client({ + name: getMCPServerID(url), + version: '1.0.0', + }); + + await client.connect(transport); + + return client; +} diff --git a/src/mcp/server.ts b/src/mcp/server.ts index 75c217eb..379be736 100644 --- a/src/mcp/server.ts +++ b/src/mcp/server.ts @@ -27,7 +27,7 @@ import { import { addRemoveTools, callActorGetDataset, defaultTools, getActorsAsTools } from '../tools/index.js'; import { actorNameToToolName } from '../tools/utils.js'; import type { ActorMcpTool, ActorTool, HelperTool, ToolEntry } from '../types.js'; -import { createMCPClient } from './client.js'; +import { connectMCPClient } from './client.js'; import { EXTERNAL_TOOL_CALL_TIMEOUT_MSEC } from './const.js'; import { processParamsGetTools } from './utils.js'; @@ -432,7 +432,7 @@ export class ActorsMcpServer { const serverTool = tool.tool as ActorMcpTool; let client: Client | undefined; try { - client = await createMCPClient(serverTool.serverUrl, apifyToken); + client = await connectMCPClient(serverTool.serverUrl, apifyToken); const res = await client.callTool({ name: serverTool.originToolName, arguments: args, diff --git a/src/tools/actor.ts b/src/tools/actor.ts index ae6153a4..998f546c 100644 --- a/src/tools/actor.ts +++ b/src/tools/actor.ts @@ -14,7 +14,7 @@ import { HelperTools, } from '../const.js'; import { getActorMCPServerPath, getActorMCPServerURL } from '../mcp/actors.js'; -import { createMCPClient } from '../mcp/client.js'; +import { connectMCPClient } from '../mcp/client.js'; import { getMCPServerTools } from '../mcp/proxy.js'; import { actorDefinitionPrunedCache } from '../state.js'; import type { ActorInfo, InternalTool, ToolEntry } from '../types.js'; @@ -172,7 +172,7 @@ async function getMCPServersAsTools( let client: Client | undefined; try { - client = await createMCPClient(mcpServerUrl, apifyToken); + client = await connectMCPClient(mcpServerUrl, apifyToken); const serverTools = await getMCPServerTools(actorId, client, mcpServerUrl); actorsMCPServerTools.push(...serverTools); } finally { diff --git a/tests/const.ts b/tests/const.ts index 971760e2..785c35e2 100644 --- a/tests/const.ts +++ b/tests/const.ts @@ -3,5 +3,6 @@ import { defaultTools } from '../src/tools/index.js'; import { actorNameToToolName } from '../src/tools/utils.js'; export const ACTOR_PYTHON_EXAMPLE = 'apify/python-example'; +export const ACTOR_MCP_SERVER_ACTOR_NAME = 'apify/actors-mcp-server'; export const DEFAULT_TOOL_NAMES = defaultTools.map((tool) => tool.tool.name); export const DEFAULT_ACTOR_NAMES = defaults.actors.map((tool) => actorNameToToolName(tool)); diff --git a/tests/integration/suite.ts b/tests/integration/suite.ts index 4199d6b1..8de5c97d 100644 --- a/tests/integration/suite.ts +++ b/tests/integration/suite.ts @@ -6,8 +6,8 @@ import { afterAll, afterEach, beforeAll, beforeEach, describe, expect, it } from import { defaults, HelperTools } from '../../src/const.js'; import { addRemoveTools, defaultTools } from '../../src/tools/index.js'; import { actorNameToToolName } from '../../src/tools/utils.js'; -import { ACTOR_PYTHON_EXAMPLE, DEFAULT_ACTOR_NAMES, DEFAULT_TOOL_NAMES } from '../const.js'; -import { type McpClientOptions } from '../helpers.js'; +import { ACTOR_MCP_SERVER_ACTOR_NAME, ACTOR_PYTHON_EXAMPLE, DEFAULT_ACTOR_NAMES, DEFAULT_TOOL_NAMES } from '../const.js'; +import { addActor, type McpClientOptions } from '../helpers.js'; interface IntegrationTestsSuiteOptions { suiteName: string; @@ -227,6 +227,38 @@ export function createIntegrationTestsSuite( await client.close(); }); + it('should be able to add and call Actorized MCP server', async () => { + const client = await createClientFn({ enableAddingActors: true }); + + const toolNamesBefore = getToolNames(await client.listTools()); + const searchToolCountBefore = toolNamesBefore.filter((name) => name.includes(HelperTools.STORE_SEARCH)).length; + expect(searchToolCountBefore).toBe(1); + + // Add self as an Actorized MCP server + await addActor(client, ACTOR_MCP_SERVER_ACTOR_NAME); + + const toolNamesAfter = getToolNames(await client.listTools()); + const searchToolCountAfter = toolNamesAfter.filter((name) => name.includes(HelperTools.STORE_SEARCH)).length; + expect(searchToolCountAfter).toBe(2); + + // Find the search tool from the Actorized MCP server + const actorizedMCPSearchTool = toolNamesAfter.find( + (name) => name.includes(HelperTools.STORE_SEARCH) && name !== HelperTools.STORE_SEARCH); + expect(actorizedMCPSearchTool).toBeDefined(); + + const result = await client.callTool({ + name: actorizedMCPSearchTool as string, + arguments: { + search: ACTOR_MCP_SERVER_ACTOR_NAME, + limit: 1, + }, + }); + expect(result.content).toBeDefined(); + + await client.close(); + }); + + // Session termination is only possible for streamable HTTP transport. it.runIf(options.transport === 'streamable-http')('should successfully terminate streamable session', async () => { const client = await createClientFn(); await client.listTools(); From 22c5e7e29fcf5593e9310356796fec78590e28f0 Mon Sep 17 00:00:00 2001 From: MQ Date: Tue, 1 Jul 2025 16:33:38 +0200 Subject: [PATCH 3/5] update readme about default Actors loading using query param, add TODO to remove Actor loading from Actor input directly since it probably does not work --- .actor/ACTOR.md | 8 ++++++-- src/main.ts | 2 ++ 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/.actor/ACTOR.md b/.actor/ACTOR.md index 72cf2273..b702f17c 100644 --- a/.actor/ACTOR.md +++ b/.actor/ACTOR.md @@ -67,13 +67,17 @@ Interested in building and monetizing your own AI agent on Apify? Check out our ## Tools ### Actors - Any [Apify Actor](https://apify.com/store) can be used as a tool. -By default, the server is pre-configured with the Actors specified below, but this can be overridden by providing Actor input. +By default, the server is pre-configured with the Actors specified below, but this can be overridden by providing the `?actors` URL query parameter. ```text 'apify/rag-web-browser' ``` +For example, to additionally load the `apify/instagram-scraper` Actor, you can start the server with the following URL: +```text +https://actors-mcp-server.apify.actor?token=&actors=apify/rag-web-browser,apify/instagram-scraper +``` + The MCP server loads the Actor input schema and creates MCP tools corresponding to the Actors. See this example of input schema for the [RAG Web Browser](https://apify.com/apify/rag-web-browser/input-schema). diff --git a/src/main.ts b/src/main.ts index d103a77e..4e857532 100644 --- a/src/main.ts +++ b/src/main.ts @@ -30,6 +30,8 @@ log.info(`Loaded input: ${JSON.stringify(input)} `); if (STANDBY_MODE) { let actorsToLoad: string[] = []; + // TODO: in standby mode the input loading does not actually work, + // we should remove this since we are using the URL query parameters to load Actors // Load only Actors specified in the input // If you wish to start without any Actor, create a task and leave the input empty if (input.actors && input.actors.length > 0) { From 571582b307ec23b1b4153e219148e239e76752cd Mon Sep 17 00:00:00 2001 From: MQ Date: Tue, 1 Jul 2025 16:35:25 +0200 Subject: [PATCH 4/5] lint --- tests/unit/mcp.actors.test.ts | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/tests/unit/mcp.actors.test.ts b/tests/unit/mcp.actors.test.ts index 500fe2f1..416efa6d 100644 --- a/tests/unit/mcp.actors.test.ts +++ b/tests/unit/mcp.actors.test.ts @@ -15,45 +15,45 @@ function makeActorDefinitionWithPath(webServerMcpPath?: unknown): ActorDefinitio } describe('getActorMCPServerPath', () => { - it('should return null if webServerMcpPath is missing', async () => { + it('should return null if webServerMcpPath is missing', () => { const actorDefinition = makeActorDefinitionWithPath(); - const result = await getActorMCPServerPath(actorDefinition); + const result = getActorMCPServerPath(actorDefinition); expect(result).toBeNull(); }); - it('should return null if webServerMcpPath is not a string', async () => { + it('should return null if webServerMcpPath is not a string', () => { const actorDefinition = makeActorDefinitionWithPath(123); - const result = await getActorMCPServerPath(actorDefinition); + const result = getActorMCPServerPath(actorDefinition); expect(result).toBeNull(); }); - it('should return the single path if only one is present', async () => { + it('should return the single path if only one is present', () => { const actorDefinition = makeActorDefinitionWithPath('/mcp'); - const result = await getActorMCPServerPath(actorDefinition); + const result = getActorMCPServerPath(actorDefinition); expect(result).toBe('/mcp'); }); - it('should return the streamable path if present among multiple', async () => { + it('should return the streamable path if present among multiple', () => { const actorDefinition = makeActorDefinitionWithPath(`/foo, ${MCP_STREAMABLE_ENDPOINT}, /bar`); - const result = await getActorMCPServerPath(actorDefinition); + const result = getActorMCPServerPath(actorDefinition); expect(result).toBe(MCP_STREAMABLE_ENDPOINT); }); - it('should return the first path if streamable is not present', async () => { + it('should return the first path if streamable is not present', () => { const actorDefinition = makeActorDefinitionWithPath('/foo, /bar, /baz'); - const result = await getActorMCPServerPath(actorDefinition); + const result = getActorMCPServerPath(actorDefinition); expect(result).toBe('/foo'); }); - it('should trim whitespace from paths', async () => { + it('should trim whitespace from paths', () => { const actorDefinition = makeActorDefinitionWithPath(' /foo , /bar '); - const result = await getActorMCPServerPath(actorDefinition); + const result = getActorMCPServerPath(actorDefinition); expect(result).toBe('/foo'); }); - it('should handle streamable path with whitespace', async () => { + it('should handle streamable path with whitespace', () => { const actorDefinition = makeActorDefinitionWithPath(` /foo , ${MCP_STREAMABLE_ENDPOINT} , /bar `); - const result = await getActorMCPServerPath(actorDefinition); + const result = getActorMCPServerPath(actorDefinition); expect(result).toBe(MCP_STREAMABLE_ENDPOINT); }); }); From 255e789ba25db53c5c2668b4bfc5f37716dfa1cc Mon Sep 17 00:00:00 2001 From: MQ Date: Tue, 1 Jul 2025 18:21:31 +0200 Subject: [PATCH 5/5] try to route actorized server notifications --- src/main.ts | 6 ++++++ src/mcp/server.ts | 17 +++++++++++++++++ 2 files changed, 23 insertions(+) diff --git a/src/main.ts b/src/main.ts index 4e857532..cf0b26cc 100644 --- a/src/main.ts +++ b/src/main.ts @@ -62,3 +62,9 @@ if (STANDBY_MODE) { log.info(`Pushed ${datasetInfo?.itemCount} items to the dataset`); await Actor.exit(); } + +// So Ctrl+C works locally +process.on('SIGINT', async () => { + log.info('Received SIGINT, shutting down gracefully...'); + await Actor.exit(); +}); diff --git a/src/mcp/server.ts b/src/mcp/server.ts index 379be736..cf6b5c0d 100644 --- a/src/mcp/server.ts +++ b/src/mcp/server.ts @@ -11,6 +11,7 @@ import { ErrorCode, ListToolsRequestSchema, McpError, + ServerNotificationSchema, } from '@modelcontextprotocol/sdk/types.js'; import type { ValidateFunction } from 'ajv'; import { type ActorCallOptions, ApifyApiError } from 'apify-client'; @@ -433,6 +434,22 @@ export class ActorsMcpServer { let client: Client | undefined; try { client = await connectMCPClient(serverTool.serverUrl, apifyToken); + + // TODO: for some reason the client does not receive notifications + // we need to investigate this + // Set up notification handlers for the client + for (const schema of ServerNotificationSchema.options) { + const method = schema.shape.method.value; + // Forward notifications from the proxy client to the server + client.setNotificationHandler(schema, async (notification) => { + log.info('Sending MCP notification', { + method, + notification, + }); + await extra.sendNotification(notification); + }); + } + const res = await client.callTool({ name: serverTool.originToolName, arguments: args,