From 68b271b7bb1de1b797bb2b74ff48ceeeb47d7170 Mon Sep 17 00:00:00 2001 From: MQ Date: Wed, 17 Sep 2025 09:56:09 +0200 Subject: [PATCH 1/3] refactor: move call-actor to a separate file and Actor related logic to utils --- src/tools/actor.ts | 456 ------------------------------------ src/tools/call-actor.ts | 182 ++++++++++++++ src/tools/index.ts | 3 +- src/utils/actor-response.ts | 2 +- src/utils/actor.ts | 285 +++++++++++++++++++++- src/utils/tools-loader.ts | 2 +- 6 files changed, 470 insertions(+), 460 deletions(-) delete mode 100644 src/tools/actor.ts create mode 100644 src/tools/call-actor.ts diff --git a/src/tools/actor.ts b/src/tools/actor.ts deleted file mode 100644 index 139ac558..00000000 --- a/src/tools/actor.ts +++ /dev/null @@ -1,456 +0,0 @@ -import type { Client } from '@modelcontextprotocol/sdk/client/index.js'; -import type { ActorCallOptions, ActorRun } from 'apify-client'; -import { z } from 'zod'; -import zodToJsonSchema from 'zod-to-json-schema'; - -import log from '@apify/log'; - -import { ApifyClient } from '../apify-client.js'; -import { - ACTOR_ADDITIONAL_INSTRUCTIONS, - ACTOR_MAX_MEMORY_MBYTES, - HelperTools, - SKYFIRE_TOOL_INSTRUCTIONS, - TOOL_MAX_OUTPUT_CHARS, -} from '../const.js'; -import { getActorMCPServerPath, getActorMCPServerURL } from '../mcp/actors.js'; -import { connectMCPClient } from '../mcp/client.js'; -import { getMCPServerTools } from '../mcp/proxy.js'; -import { actorDefinitionPrunedCache } from '../state.js'; -import type { ActorDefinitionStorage, ActorInfo, ApifyToken, DatasetItem, ToolEntry } from '../types.js'; -import { ensureOutputWithinCharLimit, getActorDefinitionStorageFieldNames } from '../utils/actor.js'; -import { fetchActorDetails } from '../utils/actor-details.js'; -import { buildActorResponseContent } from '../utils/actor-response.js'; -import { ajv } from '../utils/ajv.js'; -import type { ProgressTracker } from '../utils/progress.js'; -import type { JsonSchemaProperty } from '../utils/schema-generation.js'; -import { generateSchemaFromItems } from '../utils/schema-generation.js'; -import { getActorDefinition } from './build.js'; -import { actorNameToToolName, fixedAjvCompile, getToolSchemaID, transformActorInputSchemaProperties } from './utils.js'; - -// Define a named return type for callActorGetDataset -export type CallActorGetDatasetResult = { - runId: string; - datasetId: string; - itemCount: number; - schema: JsonSchemaProperty; - previewItems: DatasetItem[]; -}; - -/** - * Calls an Apify Actor and retrieves metadata about the dataset results. - * - * This function executes an Actor and returns summary information instead with a result items preview of the full dataset - * to prevent overwhelming responses. The actual data can be retrieved using the get-actor-output tool. - * - * It requires the `APIFY_TOKEN` environment variable to be set. - * If the `APIFY_IS_AT_HOME` the dataset items are pushed to the Apify dataset. - * - * @param {string} actorName - The name of the Actor to call. - * @param {unknown} input - The input to pass to the actor. - * @param {ApifyClient} apifyClient - The Apify client to use for authentication. - * @param {ActorCallOptions} callOptions - The options to pass to the Actor. - * @param {ProgressTracker} progressTracker - Optional progress tracker for real-time updates. - * @param {AbortSignal} abortSignal - Optional abort signal to cancel the actor run. - * @returns {Promise} - A promise that resolves to an object containing the actor run and dataset items. - * @throws {Error} - Throws an error if the `APIFY_TOKEN` is not set - */ -export async function callActorGetDataset( - actorName: string, - input: unknown, - apifyClient: ApifyClient, - callOptions: ActorCallOptions | undefined = undefined, - progressTracker?: ProgressTracker | null, - abortSignal?: AbortSignal, -): Promise { - const CLIENT_ABORT = Symbol('CLIENT_ABORT'); // Just internal symbol to identify client abort - const actorClient = apifyClient.actor(actorName); - - // Start the actor run - const actorRun: ActorRun = await actorClient.start(input, callOptions); - - // Start progress tracking if tracker is provided - if (progressTracker) { - progressTracker.startActorRunUpdates(actorRun.id, apifyClient, actorName); - } - - // Create abort promise that handles both API abort and race rejection - const abortPromise = async () => new Promise((resolve) => { - abortSignal?.addEventListener('abort', async () => { - // Abort the actor run via API - try { - await apifyClient.run(actorRun.id).abort({ gracefully: false }); - } catch (e) { - log.error('Error aborting Actor run', { error: e, runId: actorRun.id }); - } - // Reject to stop waiting - resolve(CLIENT_ABORT); - }, { once: true }); - }); - - // Wait for completion or cancellation - const potentialAbortedRun = await Promise.race([ - apifyClient.run(actorRun.id).waitForFinish(), - ...(abortSignal ? [abortPromise()] : []), - ]); - - if (potentialAbortedRun === CLIENT_ABORT) { - log.info('Actor run aborted by client', { actorName, input }); - return null; - } - const completedRun = potentialAbortedRun as ActorRun; - - // Process the completed run - const dataset = apifyClient.dataset(completedRun.defaultDatasetId); - const [datasetItems, defaultBuild] = await Promise.all([ - dataset.listItems(), - (await actorClient.defaultBuild()).get(), - ]); - - // Generate schema using the shared utility - const generatedSchema = generateSchemaFromItems(datasetItems.items, { - clean: true, - arrayMode: 'all', - }); - const schema = generatedSchema || { type: 'object', properties: {} }; - - /** - * Get important fields that are using in any dataset view as they MAY be used in filtering to ensure the output fits - * the tool output limits. Client has to use the get-actor-output tool to retrieve the full dataset or filtered out fields. - */ - const storageDefinition = defaultBuild?.actorDefinition?.storages?.dataset as ActorDefinitionStorage | undefined; - const importantProperties = getActorDefinitionStorageFieldNames(storageDefinition || {}); - const previewItems = ensureOutputWithinCharLimit(datasetItems.items, importantProperties, TOOL_MAX_OUTPUT_CHARS); - - return { - runId: actorRun.id, - datasetId: completedRun.defaultDatasetId, - itemCount: datasetItems.count, - schema, - previewItems, - }; -} - -/** - * This function is used to fetch normal non-MCP server Actors as a tool. - * - * Fetches Actor input schemas by Actor IDs or Actor full names and creates MCP tools. - * - * This function retrieves the input schemas for the specified Actors and compiles them into MCP tools. - * It uses the AJV library to validate the input schemas. - * - * Tool name can't contain /, so it is replaced with _ - * - * The input schema processing workflow: - * 1. Properties are marked as required using markInputPropertiesAsRequired() to add "REQUIRED" prefix to descriptions - * 2. Nested properties are built by analyzing editor type (proxy, requestListSources) using buildNestedProperties() - * 3. Properties are filtered using filterSchemaProperties() - * 4. Properties are shortened using shortenProperties() - * 5. Enums are added to descriptions with examples using addEnumsToDescriptionsWithExamples() - * - * @param {ActorInfo[]} actorsInfo - An array of ActorInfo objects with webServerMcpPath and actorDefinitionPruned. - * @returns {Promise} - A promise that resolves to an array of MCP tools. - */ -export async function getNormalActorsAsTools( - actorsInfo: ActorInfo[], -): Promise { - const tools: ToolEntry[] = []; - - // Zip the results with their corresponding actorIDs - for (const actorInfo of actorsInfo) { - const { actorDefinitionPruned } = actorInfo; - - if (actorDefinitionPruned) { - const schemaID = getToolSchemaID(actorDefinitionPruned.actorFullName); - if (actorDefinitionPruned.input && 'properties' in actorDefinitionPruned.input && actorDefinitionPruned.input) { - actorDefinitionPruned.input.properties = transformActorInputSchemaProperties(actorDefinitionPruned.input); - // Add schema $id, each valid JSON schema should have a unique $id - // see https://json-schema.org/understanding-json-schema/basics#declaring-a-unique-identifier - actorDefinitionPruned.input.$id = schemaID; - } - try { - const memoryMbytes = actorDefinitionPruned.defaultRunOptions?.memoryMbytes || ACTOR_MAX_MEMORY_MBYTES; - const tool: ToolEntry = { - type: 'actor', - tool: { - name: actorNameToToolName(actorDefinitionPruned.actorFullName), - actorFullName: actorDefinitionPruned.actorFullName, - description: `This tool calls the Actor "${actorDefinitionPruned.actorFullName}" and retrieves its output results. Use this tool instead of the "${HelperTools.ACTOR_CALL}" if user requests to use this specific Actor. -Actor description: ${actorDefinitionPruned.description} -Instructions: ${ACTOR_ADDITIONAL_INSTRUCTIONS}`, - inputSchema: actorDefinitionPruned.input - // So Actor without input schema works - MCP client expects JSON schema valid output - || { - type: 'object', - properties: {}, - required: [], - }, - // Additional props true to allow skyfire-pay-id - ajvValidate: fixedAjvCompile(ajv, { ...actorDefinitionPruned.input, additionalProperties: true }), - memoryMbytes: memoryMbytes > ACTOR_MAX_MEMORY_MBYTES ? ACTOR_MAX_MEMORY_MBYTES : memoryMbytes, - }, - }; - tools.push(tool); - } catch (validationError) { - log.error('Failed to compile AJV schema for Actor', { actorName: actorDefinitionPruned.actorFullName, error: validationError }); - } - } - } - return tools; -} - -async function getMCPServersAsTools( - actorsInfo: ActorInfo[], - apifyToken: ApifyToken, -): Promise { - /** - * This is case for the Skyfire request without any Apify token, we do not support - * standby Actors in this case so we can skip MCP servers since they would fail anyway (they are standby Actors). - */ - if (apifyToken === null || apifyToken === undefined) { - return []; - } - - const actorsMCPServerTools: ToolEntry[] = []; - for (const actorInfo of actorsInfo) { - const actorId = actorInfo.actorDefinitionPruned.id; - if (!actorInfo.webServerMcpPath) { - log.warning('Actor does not have a web server MCP path, skipping', { - actorFullName: actorInfo.actorDefinitionPruned.actorFullName, - actorId, - }); - continue; - } - const mcpServerUrl = await getActorMCPServerURL( - actorInfo.actorDefinitionPruned.id, // Real ID of the Actor - actorInfo.webServerMcpPath, - ); - log.debug('Retrieved MCP server URL for Actor', { - actorFullName: actorInfo.actorDefinitionPruned.actorFullName, - actorId, - mcpServerUrl, - }); - - let client: Client | undefined; - try { - client = await connectMCPClient(mcpServerUrl, apifyToken); - const serverTools = await getMCPServerTools(actorId, client, mcpServerUrl); - actorsMCPServerTools.push(...serverTools); - } finally { - if (client) await client.close(); - } - } - - return actorsMCPServerTools; -} - -export async function getActorsAsTools( - actorIdsOrNames: string[], - apifyClient: ApifyClient, -): Promise { - log.debug('Fetching Actors as tools', { actorNames: actorIdsOrNames }); - - const actorsInfo: (ActorInfo | null)[] = await Promise.all( - actorIdsOrNames.map(async (actorIdOrName) => { - const actorDefinitionPrunedCached = actorDefinitionPrunedCache.get(actorIdOrName); - if (actorDefinitionPrunedCached) { - return { - actorDefinitionPruned: actorDefinitionPrunedCached, - webServerMcpPath: getActorMCPServerPath(actorDefinitionPrunedCached), - - } as ActorInfo; - } - - const actorDefinitionPruned = await getActorDefinition(actorIdOrName, apifyClient); - if (!actorDefinitionPruned) { - log.error('Actor not found or definition is not available', { actorName: actorIdOrName }); - return null; - } - // Cache the pruned Actor definition - actorDefinitionPrunedCache.set(actorIdOrName, actorDefinitionPruned); - return { - actorDefinitionPruned, - webServerMcpPath: getActorMCPServerPath(actorDefinitionPruned), - } as ActorInfo; - }), - ); - - const clonedActors = structuredClone(actorsInfo); - - // Filter out nulls and separate Actors with MCP servers and normal Actors - const actorMCPServersInfo = clonedActors.filter((actorInfo) => actorInfo && actorInfo.webServerMcpPath) as ActorInfo[]; - const normalActorsInfo = clonedActors.filter((actorInfo) => actorInfo && !actorInfo.webServerMcpPath) as ActorInfo[]; - - const [normalTools, mcpServerTools] = await Promise.all([ - getNormalActorsAsTools(normalActorsInfo), - getMCPServersAsTools(actorMCPServersInfo, apifyClient.token), - ]); - - return [...normalTools, ...mcpServerTools]; -} - -const callActorArgs = z.object({ - actor: z.string() - .describe('The name of the Actor to call. For example, "apify/rag-web-browser".'), - step: z.enum(['info', 'call']) - .default('info') - .describe(`Step to perform: "info" to get Actor details and input schema (required first step), "call" to execute the Actor (only after getting info).`), - input: z.object({}).passthrough() - .optional() - .describe(`The input JSON to pass to the Actor. For example, {"query": "apify", "maxResults": 5, "outputFormats": ["markdown"]}. Required only when step is "call".`), - callOptions: z.object({ - memory: z.number() - .min(128, 'Memory must be at least 128 MB') - .max(32768, 'Memory cannot exceed 32 GB (32768 MB)') - .optional() - .describe(`Memory allocation for the Actor in MB. Must be a power of 2 (e.g., 128, 256, 512, 1024, 2048, 4096, 8192, 16384, 32768). Minimum: 128 MB, Maximum: 32768 MB (32 GB).`), - timeout: z.number() - .min(0, 'Timeout must be 0 or greater') - .optional() - .describe(`Maximum runtime for the Actor in seconds. After this time elapses, the Actor will be automatically terminated. Use 0 for infinite timeout (no time limit). Minimum: 0 seconds (infinite).`), - }).optional() - .describe('Optional call options for the Actor run configuration.'), -}); - -export const callActor: ToolEntry = { - type: 'internal', - tool: { - name: HelperTools.ACTOR_CALL, - actorFullName: HelperTools.ACTOR_CALL, - description: `Call Any Actor from Apify Store - Two-Step Process - -This tool uses a mandatory two-step process to safely call any Actor from the Apify store. - -USAGE: -• ONLY for Actors that are NOT available as dedicated tools -• If a dedicated tool exists (e.g., ${actorNameToToolName('apify/rag-web-browser')}), use that instead - -MANDATORY TWO-STEP WORKFLOW: - -Step 1: Get Actor Info (step="info", default) -• First call this tool with step="info" to get Actor details and input schema -• This returns the Actor description, documentation, and required input schema -• You MUST do this step first - it's required to understand how to call the Actor - -Step 2: Call Actor (step="call") -• Only after step 1, call again with step="call" and proper input based on the schema -• This executes the Actor and returns the results - -The step parameter enforces this workflow - you cannot call an Actor without first getting its info.`, - inputSchema: zodToJsonSchema(callActorArgs), - ajvValidate: ajv.compile({ - ...zodToJsonSchema(callActorArgs), - // Additional props true to allow skyfire-pay-id - additionalProperties: true, - }), - call: async (toolArgs) => { - const { args, apifyToken, progressTracker, extra, apifyMcpServer } = toolArgs; - const { actor: actorName, step, input, callOptions } = callActorArgs.parse(args); - - try { - if (step === 'info') { - const apifyClient = new ApifyClient({ token: apifyToken }); - // Step 1: Return Actor card and schema directly - const details = await fetchActorDetails(apifyClient, actorName); - if (!details) { - return { - content: [{ type: 'text', text: `Actor information for '${actorName}' was not found. Please check the Actor ID or name and ensure the Actor exists.` }], - }; - } - const content = [ - { type: 'text', text: `**Input Schema:**\n${JSON.stringify(details.inputSchema, null, 0)}` }, - ]; - /** - * Add Skyfire instructions also in the info step since clients are most likely truncating the long tool description of the call-actor. - */ - if (apifyMcpServer.options.skyfireMode) { - content.push({ - type: 'text', - text: SKYFIRE_TOOL_INSTRUCTIONS, - }); - } - return { content }; - } - - /** - * In Skyfire mode, we check for the presence of `skyfire-pay-id`. - * If it is missing, we return instructions to the LLM on how to create it and pass it to the tool. - */ - if (apifyMcpServer.options.skyfireMode - && args['skyfire-pay-id'] === undefined - ) { - return { - content: [{ - type: 'text', - text: SKYFIRE_TOOL_INSTRUCTIONS, - }], - }; - } - - /** - * Create Apify token, for Skyfire mode use `skyfire-pay-id` and for normal mode use `apifyToken`. - */ - const apifyClient = apifyMcpServer.options.skyfireMode && typeof args['skyfire-pay-id'] === 'string' - ? new ApifyClient({ skyfirePayId: args['skyfire-pay-id'] }) - : new ApifyClient({ token: apifyToken }); - - // Step 2: Call the Actor - if (!input) { - return { - content: [ - { type: 'text', text: `Input is required when step="call". Please provide the input parameter based on the Actor's input schema.` }, - ], - }; - } - - const [actor] = await getActorsAsTools([actorName], apifyClient); - - if (!actor) { - return { - content: [ - { type: 'text', text: `Actor '${actorName}' not found.` }, - ], - }; - } - - if (!actor.tool.ajvValidate(input)) { - const { errors } = actor.tool.ajvValidate; - if (errors && errors.length > 0) { - return { - content: [ - { type: 'text', text: `Input validation failed for Actor '${actorName}': ${errors.map((e) => e.message).join(', ')}` }, - { type: 'text', text: `Input Schema:\n${JSON.stringify(actor.tool.inputSchema)}` }, - ], - }; - } - } - - const callResult = await callActorGetDataset( - actorName, - input, - apifyClient, - callOptions, - progressTracker, - extra.signal, - ); - - if (!callResult) { - // Receivers of cancellation notifications SHOULD NOT send a response for the cancelled request - // https://modelcontextprotocol.io/specification/2025-06-18/basic/utilities/cancellation#behavior-requirements - return { }; - } - - const content = buildActorResponseContent(actorName, callResult); - - return { content }; - } catch (error) { - log.error('Error with Actor operation', { error, actorName, step }); - return { - content: [ - { type: 'text', text: `Error with Actor operation: ${error instanceof Error ? error.message : String(error)}` }, - ], - }; - } - }, - }, -}; diff --git a/src/tools/call-actor.ts b/src/tools/call-actor.ts new file mode 100644 index 00000000..5dc95810 --- /dev/null +++ b/src/tools/call-actor.ts @@ -0,0 +1,182 @@ +import { z } from 'zod'; +import zodToJsonSchema from 'zod-to-json-schema'; + +import log from '@apify/log'; + +import { ApifyClient } from '../apify-client.js'; +import { + HelperTools, + SKYFIRE_TOOL_INSTRUCTIONS, +} from '../const.js'; +import type { ToolEntry } from '../types.js'; +import { callActorGetDataset, getActorsAsTools } from '../utils/actor.js'; +import { fetchActorDetails } from '../utils/actor-details.js'; +import { buildActorResponseContent } from '../utils/actor-response.js'; +import { ajv } from '../utils/ajv.js'; +import { actorNameToToolName } from './utils.js'; + +const callActorArgs = z.object({ + actor: z.string() + .describe('The name of the Actor to call. For example, "apify/rag-web-browser".'), + step: z.enum(['info', 'call']) + .default('info') + .describe(`Step to perform: "info" to get Actor details and input schema (required first step), "call" to execute the Actor (only after getting info).`), + input: z.object({}).passthrough() + .optional() + .describe(`The input JSON to pass to the Actor. For example, {"query": "apify", "maxResults": 5, "outputFormats": ["markdown"]}. Required only when step is "call".`), + callOptions: z.object({ + memory: z.number() + .min(128, 'Memory must be at least 128 MB') + .max(32768, 'Memory cannot exceed 32 GB (32768 MB)') + .optional() + .describe(`Memory allocation for the Actor in MB. Must be a power of 2 (e.g., 128, 256, 512, 1024, 2048, 4096, 8192, 16384, 32768). Minimum: 128 MB, Maximum: 32768 MB (32 GB).`), + timeout: z.number() + .min(0, 'Timeout must be 0 or greater') + .optional() + .describe(`Maximum runtime for the Actor in seconds. After this time elapses, the Actor will be automatically terminated. Use 0 for infinite timeout (no time limit). Minimum: 0 seconds (infinite).`), + }).optional() + .describe('Optional call options for the Actor run configuration.'), +}); + +export const callActor: ToolEntry = { + type: 'internal', + tool: { + name: HelperTools.ACTOR_CALL, + actorFullName: HelperTools.ACTOR_CALL, + description: `Call Any Actor from Apify Store - Two-Step Process + +This tool uses a mandatory two-step process to safely call any Actor from the Apify store. + +USAGE: +• ONLY for Actors that are NOT available as dedicated tools +• If a dedicated tool exists (e.g., ${actorNameToToolName('apify/rag-web-browser')}), use that instead + +MANDATORY TWO-STEP WORKFLOW: + +Step 1: Get Actor Info (step="info", default) +• First call this tool with step="info" to get Actor details and input schema +• This returns the Actor description, documentation, and required input schema +• You MUST do this step first - it's required to understand how to call the Actor + +Step 2: Call Actor (step="call") +• Only after step 1, call again with step="call" and proper input based on the schema +• This executes the Actor and returns the results + +The step parameter enforces this workflow - you cannot call an Actor without first getting its info.`, + inputSchema: zodToJsonSchema(callActorArgs), + ajvValidate: ajv.compile({ + ...zodToJsonSchema(callActorArgs), + // Additional props true to allow skyfire-pay-id + additionalProperties: true, + }), + call: async (toolArgs) => { + const { args, apifyToken, progressTracker, extra, apifyMcpServer } = toolArgs; + const { actor: actorName, step, input, callOptions } = callActorArgs.parse(args); + + try { + if (step === 'info') { + const apifyClient = new ApifyClient({ token: apifyToken }); + // Step 1: Return Actor card and schema directly + const details = await fetchActorDetails(apifyClient, actorName); + if (!details) { + return { + content: [{ type: 'text', text: `Actor information for '${actorName}' was not found. Please check the Actor ID or name and ensure the Actor exists.` }], + }; + } + const content = [ + { type: 'text', text: `**Input Schema:**\n${JSON.stringify(details.inputSchema, null, 0)}` }, + ]; + /** + * Add Skyfire instructions also in the info step since clients are most likely truncating the long tool description of the call-actor. + */ + if (apifyMcpServer.options.skyfireMode) { + content.push({ + type: 'text', + text: SKYFIRE_TOOL_INSTRUCTIONS, + }); + } + return { content }; + } + + /** + * In Skyfire mode, we check for the presence of `skyfire-pay-id`. + * If it is missing, we return instructions to the LLM on how to create it and pass it to the tool. + */ + if (apifyMcpServer.options.skyfireMode + && args['skyfire-pay-id'] === undefined + ) { + return { + content: [{ + type: 'text', + text: SKYFIRE_TOOL_INSTRUCTIONS, + }], + }; + } + + /** + * Create Apify token, for Skyfire mode use `skyfire-pay-id` and for normal mode use `apifyToken`. + */ + const apifyClient = apifyMcpServer.options.skyfireMode && typeof args['skyfire-pay-id'] === 'string' + ? new ApifyClient({ skyfirePayId: args['skyfire-pay-id'] }) + : new ApifyClient({ token: apifyToken }); + + // Step 2: Call the Actor + if (!input) { + return { + content: [ + { type: 'text', text: `Input is required when step="call". Please provide the input parameter based on the Actor's input schema.` }, + ], + }; + } + + const [actor] = await getActorsAsTools([actorName], apifyClient); + + if (!actor) { + return { + content: [ + { type: 'text', text: `Actor '${actorName}' not found.` }, + ], + }; + } + + if (!actor.tool.ajvValidate(input)) { + const { errors } = actor.tool.ajvValidate; + if (errors && errors.length > 0) { + return { + content: [ + { type: 'text', text: `Input validation failed for Actor '${actorName}': ${errors.map((e) => e.message).join(', ')}` }, + { type: 'text', text: `Input Schema:\n${JSON.stringify(actor.tool.inputSchema)}` }, + ], + }; + } + } + + const callResult = await callActorGetDataset( + actorName, + input, + apifyClient, + callOptions, + progressTracker, + extra.signal, + ); + + if (!callResult) { + // Receivers of cancellation notifications SHOULD NOT send a response for the cancelled request + // https://modelcontextprotocol.io/specification/2025-06-18/basic/utilities/cancellation#behavior-requirements + return { }; + } + + const content = buildActorResponseContent(actorName, callResult); + + return { content }; + } catch (error) { + log.error('Error with Actor operation', { error, actorName, step }); + return { + content: [ + { type: 'text', text: `Error with Actor operation: ${error instanceof Error ? error.message : String(error)}` }, + ], + }; + } + }, + }, +}; diff --git a/src/tools/index.ts b/src/tools/index.ts index d03ec75f..546e9ef8 100644 --- a/src/tools/index.ts +++ b/src/tools/index.ts @@ -1,7 +1,8 @@ // Import specific tools that are being used import type { ToolCategory } from '../types.js'; +import { callActorGetDataset, getActorsAsTools } from '../utils/actor.js'; import { getExpectedToolsByCategories } from '../utils/tools.js'; -import { callActor, callActorGetDataset, getActorsAsTools } from './actor.js'; +import { callActor } from './call-actor.js'; import { getDataset, getDatasetItems, getDatasetSchema } from './dataset.js'; import { getUserDatasetsList } from './dataset_collection.js'; import { fetchActorDetailsTool } from './fetch-actor-details.js'; diff --git a/src/utils/actor-response.ts b/src/utils/actor-response.ts index 5dc77b03..df913a21 100644 --- a/src/utils/actor-response.ts +++ b/src/utils/actor-response.ts @@ -1,4 +1,4 @@ -import type { CallActorGetDatasetResult } from '../tools/actor'; +import type { CallActorGetDatasetResult } from './actor.js'; /** * Builds the response content for Actor tool calls. diff --git a/src/utils/actor.ts b/src/utils/actor.ts index 9871d674..ac8ff63d 100644 --- a/src/utils/actor.ts +++ b/src/utils/actor.ts @@ -1,6 +1,289 @@ -import type { ActorDefinitionStorage, DatasetItem } from '../types.js'; +import type { Client } from '@modelcontextprotocol/sdk/client/index.js'; +import type { ActorCallOptions, ActorRun } from 'apify-client'; + +import log from '@apify/log'; + +import type { ApifyClient } from '../apify-client.js'; +import { + ACTOR_ADDITIONAL_INSTRUCTIONS, + ACTOR_MAX_MEMORY_MBYTES, + HelperTools, + TOOL_MAX_OUTPUT_CHARS, +} from '../const.js'; +import { getActorMCPServerPath, getActorMCPServerURL } from '../mcp/actors.js'; +import { connectMCPClient } from '../mcp/client.js'; +import { getMCPServerTools } from '../mcp/proxy.js'; +import { actorDefinitionPrunedCache } from '../state.js'; +import { getActorDefinition } from '../tools/build.js'; +import { actorNameToToolName, fixedAjvCompile, getToolSchemaID, transformActorInputSchemaProperties } from '../tools/utils.js'; +import type { ActorDefinitionStorage, ActorInfo, ApifyToken, DatasetItem, ToolEntry } from '../types.js'; +import { ajv } from '../utils/ajv.js'; +import type { ProgressTracker } from '../utils/progress.js'; +import type { JsonSchemaProperty } from '../utils/schema-generation.js'; +import { generateSchemaFromItems } from '../utils/schema-generation.js'; import { getValuesByDotKeys } from './generic.js'; +// Define a named return type for callActorGetDataset +export type CallActorGetDatasetResult = { + runId: string; + datasetId: string; + itemCount: number; + schema: JsonSchemaProperty; + previewItems: DatasetItem[]; +}; + +/** + * Calls an Apify Actor and retrieves metadata about the dataset results. + * + * This function executes an Actor and returns summary information instead with a result items preview of the full dataset + * to prevent overwhelming responses. The actual data can be retrieved using the get-actor-output tool. + * + * It requires the `APIFY_TOKEN` environment variable to be set. + * If the `APIFY_IS_AT_HOME` the dataset items are pushed to the Apify dataset. + * + * @param {string} actorName - The name of the Actor to call. + * @param {unknown} input - The input to pass to the actor. + * @param {ApifyClient} apifyClient - The Apify client to use for authentication. + * @param {ActorCallOptions} callOptions - The options to pass to the Actor. + * @param {ProgressTracker} progressTracker - Optional progress tracker for real-time updates. + * @param {AbortSignal} abortSignal - Optional abort signal to cancel the actor run. + * @returns {Promise} - A promise that resolves to an object containing the actor run and dataset items. + * @throws {Error} - Throws an error if the `APIFY_TOKEN` is not set + */ +export async function callActorGetDataset( + actorName: string, + input: unknown, + apifyClient: ApifyClient, + callOptions: ActorCallOptions | undefined = undefined, + progressTracker?: ProgressTracker | null, + abortSignal?: AbortSignal, +): Promise { + const CLIENT_ABORT = Symbol('CLIENT_ABORT'); // Just internal symbol to identify client abort + const actorClient = apifyClient.actor(actorName); + + // Start the actor run + const actorRun: ActorRun = await actorClient.start(input, callOptions); + + // Start progress tracking if tracker is provided + if (progressTracker) { + progressTracker.startActorRunUpdates(actorRun.id, apifyClient, actorName); + } + + // Create abort promise that handles both API abort and race rejection + const abortPromise = async () => new Promise((resolve) => { + abortSignal?.addEventListener('abort', async () => { + // Abort the actor run via API + try { + await apifyClient.run(actorRun.id).abort({ gracefully: false }); + } catch (e) { + log.error('Error aborting Actor run', { error: e, runId: actorRun.id }); + } + // Reject to stop waiting + resolve(CLIENT_ABORT); + }, { once: true }); + }); + + // Wait for completion or cancellation + const potentialAbortedRun = await Promise.race([ + apifyClient.run(actorRun.id).waitForFinish(), + ...(abortSignal ? [abortPromise()] : []), + ]); + + if (potentialAbortedRun === CLIENT_ABORT) { + log.info('Actor run aborted by client', { actorName, input }); + return null; + } + const completedRun = potentialAbortedRun as ActorRun; + + // Process the completed run + const dataset = apifyClient.dataset(completedRun.defaultDatasetId); + const [datasetItems, defaultBuild] = await Promise.all([ + dataset.listItems(), + (await actorClient.defaultBuild()).get(), + ]); + + // Generate schema using the shared utility + const generatedSchema = generateSchemaFromItems(datasetItems.items, { + clean: true, + arrayMode: 'all', + }); + const schema = generatedSchema || { type: 'object', properties: {} }; + + /** + * Get important fields that are using in any dataset view as they MAY be used in filtering to ensure the output fits + * the tool output limits. Client has to use the get-actor-output tool to retrieve the full dataset or filtered out fields. + */ + const storageDefinition = defaultBuild?.actorDefinition?.storages?.dataset as ActorDefinitionStorage | undefined; + const importantProperties = getActorDefinitionStorageFieldNames(storageDefinition || {}); + const previewItems = ensureOutputWithinCharLimit(datasetItems.items, importantProperties, TOOL_MAX_OUTPUT_CHARS); + + return { + runId: actorRun.id, + datasetId: completedRun.defaultDatasetId, + itemCount: datasetItems.count, + schema, + previewItems, + }; +} + +/** + * This function is used to fetch normal non-MCP server Actors as a tool. + * + * Fetches Actor input schemas by Actor IDs or Actor full names and creates MCP tools. + * + * This function retrieves the input schemas for the specified Actors and compiles them into MCP tools. + * It uses the AJV library to validate the input schemas. + * + * Tool name can't contain /, so it is replaced with _ + * + * The input schema processing workflow: + * 1. Properties are marked as required using markInputPropertiesAsRequired() to add "REQUIRED" prefix to descriptions + * 2. Nested properties are built by analyzing editor type (proxy, requestListSources) using buildNestedProperties() + * 3. Properties are filtered using filterSchemaProperties() + * 4. Properties are shortened using shortenProperties() + * 5. Enums are added to descriptions with examples using addEnumsToDescriptionsWithExamples() + * + * @param {ActorInfo[]} actorsInfo - An array of ActorInfo objects with webServerMcpPath and actorDefinitionPruned. + * @returns {Promise} - A promise that resolves to an array of MCP tools. + */ +export async function getNormalActorsAsTools( + actorsInfo: ActorInfo[], +): Promise { + const tools: ToolEntry[] = []; + + // Zip the results with their corresponding actorIDs + for (const actorInfo of actorsInfo) { + const { actorDefinitionPruned } = actorInfo; + + if (actorDefinitionPruned) { + const schemaID = getToolSchemaID(actorDefinitionPruned.actorFullName); + if (actorDefinitionPruned.input && 'properties' in actorDefinitionPruned.input && actorDefinitionPruned.input) { + actorDefinitionPruned.input.properties = transformActorInputSchemaProperties(actorDefinitionPruned.input); + // Add schema $id, each valid JSON schema should have a unique $id + // see https://json-schema.org/understanding-json-schema/basics#declaring-a-unique-identifier + actorDefinitionPruned.input.$id = schemaID; + } + try { + const memoryMbytes = actorDefinitionPruned.defaultRunOptions?.memoryMbytes || ACTOR_MAX_MEMORY_MBYTES; + const tool: ToolEntry = { + type: 'actor', + tool: { + name: actorNameToToolName(actorDefinitionPruned.actorFullName), + actorFullName: actorDefinitionPruned.actorFullName, + description: `This tool calls the Actor "${actorDefinitionPruned.actorFullName}" and retrieves its output results. Use this tool instead of the "${HelperTools.ACTOR_CALL}" if user requests to use this specific Actor. +Actor description: ${actorDefinitionPruned.description} +Instructions: ${ACTOR_ADDITIONAL_INSTRUCTIONS}`, + inputSchema: actorDefinitionPruned.input + // So Actor without input schema works - MCP client expects JSON schema valid output + || { + type: 'object', + properties: {}, + required: [], + }, + // Additional props true to allow skyfire-pay-id + ajvValidate: fixedAjvCompile(ajv, { ...actorDefinitionPruned.input, additionalProperties: true }), + memoryMbytes: memoryMbytes > ACTOR_MAX_MEMORY_MBYTES ? ACTOR_MAX_MEMORY_MBYTES : memoryMbytes, + }, + }; + tools.push(tool); + } catch (validationError) { + log.error('Failed to compile AJV schema for Actor', { actorName: actorDefinitionPruned.actorFullName, error: validationError }); + } + } + } + return tools; +} + +async function getMCPServersAsTools( + actorsInfo: ActorInfo[], + apifyToken: ApifyToken, +): Promise { + /** + * This is case for the Skyfire request without any Apify token, we do not support + * standby Actors in this case so we can skip MCP servers since they would fail anyway (they are standby Actors). + */ + if (apifyToken === null || apifyToken === undefined) { + return []; + } + + const actorsMCPServerTools: ToolEntry[] = []; + for (const actorInfo of actorsInfo) { + const actorId = actorInfo.actorDefinitionPruned.id; + if (!actorInfo.webServerMcpPath) { + log.warning('Actor does not have a web server MCP path, skipping', { + actorFullName: actorInfo.actorDefinitionPruned.actorFullName, + actorId, + }); + continue; + } + const mcpServerUrl = await getActorMCPServerURL( + actorInfo.actorDefinitionPruned.id, // Real ID of the Actor + actorInfo.webServerMcpPath, + ); + log.debug('Retrieved MCP server URL for Actor', { + actorFullName: actorInfo.actorDefinitionPruned.actorFullName, + actorId, + mcpServerUrl, + }); + + let client: Client | undefined; + try { + client = await connectMCPClient(mcpServerUrl, apifyToken); + const serverTools = await getMCPServerTools(actorId, client, mcpServerUrl); + actorsMCPServerTools.push(...serverTools); + } finally { + if (client) await client.close(); + } + } + + return actorsMCPServerTools; +} + +export async function getActorsAsTools( + actorIdsOrNames: string[], + apifyClient: ApifyClient, +): Promise { + log.debug('Fetching Actors as tools', { actorNames: actorIdsOrNames }); + + const actorsInfo: (ActorInfo | null)[] = await Promise.all( + actorIdsOrNames.map(async (actorIdOrName) => { + const actorDefinitionPrunedCached = actorDefinitionPrunedCache.get(actorIdOrName); + if (actorDefinitionPrunedCached) { + return { + actorDefinitionPruned: actorDefinitionPrunedCached, + webServerMcpPath: getActorMCPServerPath(actorDefinitionPrunedCached), + + } as ActorInfo; + } + + const actorDefinitionPruned = await getActorDefinition(actorIdOrName, apifyClient); + if (!actorDefinitionPruned) { + log.error('Actor not found or definition is not available', { actorName: actorIdOrName }); + return null; + } + // Cache the pruned Actor definition + actorDefinitionPrunedCache.set(actorIdOrName, actorDefinitionPruned); + return { + actorDefinitionPruned, + webServerMcpPath: getActorMCPServerPath(actorDefinitionPruned), + } as ActorInfo; + }), + ); + + const clonedActors = structuredClone(actorsInfo); + + // Filter out nulls and separate Actors with MCP servers and normal Actors + const actorMCPServersInfo = clonedActors.filter((actorInfo) => actorInfo && actorInfo.webServerMcpPath) as ActorInfo[]; + const normalActorsInfo = clonedActors.filter((actorInfo) => actorInfo && !actorInfo.webServerMcpPath) as ActorInfo[]; + + const [normalTools, mcpServerTools] = await Promise.all([ + getNormalActorsAsTools(normalActorsInfo), + getMCPServersAsTools(actorMCPServersInfo, apifyClient.token), + ]); + + return [...normalTools, ...mcpServerTools]; +} + /** * Returns an array of all field names mentioned in the display.properties * of all views in the given ActorDefinitionStorage object. diff --git a/src/utils/tools-loader.ts b/src/utils/tools-loader.ts index f1ee806d..c2685357 100644 --- a/src/utils/tools-loader.ts +++ b/src/utils/tools-loader.ts @@ -8,7 +8,7 @@ import type { ApifyClient } from 'apify'; import log from '@apify/log'; import { defaults } from '../const.js'; -import { callActor } from '../tools/actor.js'; +import { callActor } from '../tools/call-actor.js'; import { getActorOutput } from '../tools/get-actor-output.js'; import { addTool } from '../tools/helpers.js'; import { getActorsAsTools, toolCategories, toolCategoriesEnabledByDefault } from '../tools/index.js'; From 2a4763bb70e58a820a9d57b87f2666157d9fb66e Mon Sep 17 00:00:00 2001 From: MQ Date: Wed, 17 Sep 2025 12:42:27 +0200 Subject: [PATCH 2/3] feat: call-actor add support for MCP server Actors --- .gitignore | 5 ++ src/const.ts | 2 + src/state.ts | 8 ++ src/tools/call-actor.ts | 148 +++++++++++++++++++++++++------------ src/utils/actor.ts | 29 +++++++- tests/integration/suite.ts | 32 ++++++++ 6 files changed, 177 insertions(+), 47 deletions(-) diff --git a/.gitignore b/.gitignore index 75950be2..c32757f6 100644 --- a/.gitignore +++ b/.gitignore @@ -18,4 +18,9 @@ storage/key_value_stores/default/* # Added by Apify CLI .venv .env + +# Aider coding agent files .aider* + +# Ignore MCP config for Opencode client +opencode.json diff --git a/src/const.ts b/src/const.ts index 5d4575b5..849ef663 100644 --- a/src/const.ts +++ b/src/const.ts @@ -72,6 +72,8 @@ export const APIFY_DOCS_CACHE_MAX_SIZE = 500; export const APIFY_DOCS_CACHE_TTL_SECS = 60 * 60; // 1 hour export const GET_HTML_SKELETON_CACHE_TTL_SECS = 5 * 60; // 5 minutes export const GET_HTML_SKELETON_CACHE_MAX_SIZE = 200; +export const MCP_SERVER_CACHE_MAX_SIZE = 500; +export const MCP_SERVER_CACHE_TTL_SECS = 30 * 60; // 30 minutes export const ACTOR_PRICING_MODEL = { /** Rental Actors */ diff --git a/src/state.ts b/src/state.ts index 4d305538..b8380d75 100644 --- a/src/state.ts +++ b/src/state.ts @@ -5,6 +5,8 @@ import { APIFY_DOCS_CACHE_TTL_SECS, GET_HTML_SKELETON_CACHE_MAX_SIZE, GET_HTML_SKELETON_CACHE_TTL_SECS, + MCP_SERVER_CACHE_MAX_SIZE, + MCP_SERVER_CACHE_TTL_SECS, } from './const.js'; import type { ActorDefinitionPruned, ApifyDocsSearchResult } from './types.js'; import { TTLLRUCache } from './utils/ttl-lru.js'; @@ -15,3 +17,9 @@ export const searchApifyDocsCache = new TTLLRUCache(API export const fetchApifyDocsCache = new TTLLRUCache(APIFY_DOCS_CACHE_MAX_SIZE, APIFY_DOCS_CACHE_TTL_SECS); /** Stores HTML content per URL so we can paginate the tool output */ export const getHtmlSkeletonCache = new TTLLRUCache(GET_HTML_SKELETON_CACHE_MAX_SIZE, GET_HTML_SKELETON_CACHE_TTL_SECS); +/** + * Stores MCP server resolution per actor: + * - false: not an MCP server + * - string: MCP server URL + */ +export const mcpServerCache = new TTLLRUCache(MCP_SERVER_CACHE_MAX_SIZE, MCP_SERVER_CACHE_TTL_SECS); diff --git a/src/tools/call-actor.ts b/src/tools/call-actor.ts index 5dc95810..0481bd2a 100644 --- a/src/tools/call-actor.ts +++ b/src/tools/call-actor.ts @@ -1,3 +1,4 @@ +import type { Client } from '@modelcontextprotocol/sdk/client/index.js'; import { z } from 'zod'; import zodToJsonSchema from 'zod-to-json-schema'; @@ -8,11 +9,13 @@ import { HelperTools, SKYFIRE_TOOL_INSTRUCTIONS, } from '../const.js'; +import { connectMCPClient } from '../mcp/client.js'; import type { ToolEntry } from '../types.js'; -import { callActorGetDataset, getActorsAsTools } from '../utils/actor.js'; +import { callActorGetDataset, getActorMcpUrlCached, getActorsAsTools } from '../utils/actor.js'; import { fetchActorDetails } from '../utils/actor-details.js'; import { buildActorResponseContent } from '../utils/actor-response.js'; import { ajv } from '../utils/ajv.js'; +import { buildMCPResponse } from '../utils/mcp.js'; import { actorNameToToolName } from './utils.js'; const callActorArgs = z.object({ @@ -55,12 +58,19 @@ MANDATORY TWO-STEP WORKFLOW: Step 1: Get Actor Info (step="info", default) • First call this tool with step="info" to get Actor details and input schema -• This returns the Actor description, documentation, and required input schema +• For regular Actors: returns the Actor input schema +• For MCP server Actors: returns list of available tools with their schemas • You MUST do this step first - it's required to understand how to call the Actor -Step 2: Call Actor (step="call") +Step 2: Call Actor (step="call") • Only after step 1, call again with step="call" and proper input based on the schema -• This executes the Actor and returns the results +• For regular Actors: executes the Actor and returns results +• For MCP server Actors: use format "actor-name:tool-name" to call specific tools + +MCP SERVER ACTORS: +• For MCP server actors, step="info" lists available tools instead of input schema +• To call an MCP tool, use actor name format: "actor-name:tool-name" with step="call" +• Example: actor="apify/my-mcp-actor:search-tool", step="call", input={...} The step parameter enforces this workflow - you cannot call an Actor without first getting its info.`, inputSchema: zodToJsonSchema(callActorArgs), @@ -73,29 +83,66 @@ The step parameter enforces this workflow - you cannot call an Actor without fir const { args, apifyToken, progressTracker, extra, apifyMcpServer } = toolArgs; const { actor: actorName, step, input, callOptions } = callActorArgs.parse(args); + // Parse special format: actor:tool + const mcpToolMatch = actorName.match(/^(.+):(.+)$/); + let baseActorName = actorName; + let mcpToolName: string | undefined; + + if (mcpToolMatch) { + baseActorName = mcpToolMatch[1]; + mcpToolName = mcpToolMatch[2]; + } + + // For definition resolution we always use token-based client; Skyfire is only for actual Actor runs + const apifyClientForDefinition = new ApifyClient({ token: apifyToken }); + // Resolve MCP server URL + const needsMcpUrl = mcpToolName !== undefined || step === 'info'; + const mcpServerUrlOrFalse = needsMcpUrl ? await getActorMcpUrlCached(baseActorName, apifyClientForDefinition) : false; + const isActorMcpServer = mcpServerUrlOrFalse && typeof mcpServerUrlOrFalse === 'string'; + + // Standby Actors, thus MCPs, are not supported in Skyfire mode + if (isActorMcpServer && apifyMcpServer.options.skyfireMode) { + return buildMCPResponse([`MCP server Actors are not supported in Skyfire mode. Please use a regular Apify token without Skyfire.`]); + } + try { if (step === 'info') { - const apifyClient = new ApifyClient({ token: apifyToken }); - // Step 1: Return Actor card and schema directly - const details = await fetchActorDetails(apifyClient, actorName); - if (!details) { - return { - content: [{ type: 'text', text: `Actor information for '${actorName}' was not found. Please check the Actor ID or name and ensure the Actor exists.` }], - }; - } - const content = [ - { type: 'text', text: `**Input Schema:**\n${JSON.stringify(details.inputSchema, null, 0)}` }, - ]; - /** - * Add Skyfire instructions also in the info step since clients are most likely truncating the long tool description of the call-actor. - */ - if (apifyMcpServer.options.skyfireMode) { - content.push({ - type: 'text', - text: SKYFIRE_TOOL_INSTRUCTIONS, - }); + if (isActorMcpServer) { + // MCP server: list tools + const mcpServerUrl = mcpServerUrlOrFalse; + let client: Client | undefined; + // Nested try to ensure client is closed + try { + client = await connectMCPClient(mcpServerUrl, apifyToken); + const toolsResponse = await client.listTools(); + + const toolsInfo = toolsResponse.tools.map((tool) => `**${tool.name}**\n${tool.description || 'No description'}\nInput Schema: ${JSON.stringify(tool.inputSchema, null, 2)}`, + ).join('\n\n'); + + return buildMCPResponse([`This is an MCP Server Actor with the following tools:\n\n${toolsInfo}\n\nTo call a tool, use step="call" with actor name format: "${baseActorName}:{toolName}"`]); + } finally { + if (client) await client.close(); + } + } else { + // Regular actor: return schema + const details = await fetchActorDetails(apifyClientForDefinition, baseActorName); + if (!details) { + return buildMCPResponse([`Actor information for '${baseActorName}' was not found. Please check the Actor ID or name and ensure the Actor exists.`]); + } + const content = [ + { type: 'text', text: `**Input Schema:**\n${JSON.stringify(details.inputSchema, null, 0)}` }, + ]; + /** + * Add Skyfire instructions also in the info step since clients are most likely truncating the long tool description of the call-actor. + */ + if (apifyMcpServer.options.skyfireMode) { + content.push({ + type: 'text', + text: SKYFIRE_TOOL_INSTRUCTIONS, + }); + } + return { content }; } - return { content }; } /** @@ -122,32 +169,45 @@ The step parameter enforces this workflow - you cannot call an Actor without fir // Step 2: Call the Actor if (!input) { - return { - content: [ - { type: 'text', text: `Input is required when step="call". Please provide the input parameter based on the Actor's input schema.` }, - ], - }; + return buildMCPResponse([`Input is required when step="call". Please provide the input parameter based on the Actor's input schema.`]); } + // Handle MCP tool calls + if (mcpToolName) { + if (!isActorMcpServer) { + return buildMCPResponse([`Actor '${baseActorName}' is not an MCP server.`]); + } + + const mcpServerUrl = mcpServerUrlOrFalse; + let client: Client | undefined; + try { + client = await connectMCPClient(mcpServerUrl, apifyToken); + + const result = await client.callTool({ + name: mcpToolName, + arguments: input, + }); + + return { content: result.content }; + } finally { + if (client) await client.close(); + } + } + + // Handle regular Actor calls const [actor] = await getActorsAsTools([actorName], apifyClient); if (!actor) { - return { - content: [ - { type: 'text', text: `Actor '${actorName}' not found.` }, - ], - }; + return buildMCPResponse([`Actor '${actorName}' was not found.`]); } if (!actor.tool.ajvValidate(input)) { const { errors } = actor.tool.ajvValidate; if (errors && errors.length > 0) { - return { - content: [ - { type: 'text', text: `Input validation failed for Actor '${actorName}': ${errors.map((e) => e.message).join(', ')}` }, - { type: 'text', text: `Input Schema:\n${JSON.stringify(actor.tool.inputSchema)}` }, - ], - }; + return buildMCPResponse([ + `Input validation failed for Actor '${actorName}': ${errors.map((e) => e.message).join(', ')}`, + `Input Schema:\n${JSON.stringify(actor.tool.inputSchema)}`, + ]); } } @@ -170,12 +230,8 @@ The step parameter enforces this workflow - you cannot call an Actor without fir return { content }; } catch (error) { - log.error('Error with Actor operation', { error, actorName, step }); - return { - content: [ - { type: 'text', text: `Error with Actor operation: ${error instanceof Error ? error.message : String(error)}` }, - ], - }; + log.error('Failed to call Actor', { error, actorName, step }); + return buildMCPResponse([`Failed to call Actor '${actorName}': ${error instanceof Error ? error.message : String(error)}`]); } }, }, diff --git a/src/utils/actor.ts b/src/utils/actor.ts index ac8ff63d..9296a6f8 100644 --- a/src/utils/actor.ts +++ b/src/utils/actor.ts @@ -13,7 +13,7 @@ import { import { getActorMCPServerPath, getActorMCPServerURL } from '../mcp/actors.js'; import { connectMCPClient } from '../mcp/client.js'; import { getMCPServerTools } from '../mcp/proxy.js'; -import { actorDefinitionPrunedCache } from '../state.js'; +import { actorDefinitionPrunedCache, mcpServerCache } from '../state.js'; import { getActorDefinition } from '../tools/build.js'; import { actorNameToToolName, fixedAjvCompile, getToolSchemaID, transformActorInputSchemaProperties } from '../tools/utils.js'; import type { ActorDefinitionStorage, ActorInfo, ApifyToken, DatasetItem, ToolEntry } from '../types.js'; @@ -32,6 +32,33 @@ export type CallActorGetDatasetResult = { previewItems: DatasetItem[]; }; +/** + * Resolve and cache the MCP server URL for the given Actor. + * - Returns a string URL when the Actor exposes an MCP server + * - Returns false when the Actor is not an MCP server + * Uses a TTL LRU cache to avoid repeated API calls. + */ +export async function getActorMcpUrlCached( + actorIdOrName: string, + apifyClient: ApifyClient, +): Promise { + const cached = mcpServerCache.get(actorIdOrName); + if (cached !== null && cached !== undefined) { + return cached as string | false; + } + + const actorDefinitionPruned = await getActorDefinition(actorIdOrName, apifyClient); + const mcpPath = actorDefinitionPruned && getActorMCPServerPath(actorDefinitionPruned); + if (actorDefinitionPruned && mcpPath) { + const url = await getActorMCPServerURL(actorDefinitionPruned.id, mcpPath); + mcpServerCache.set(actorIdOrName, url); + return url; + } + + mcpServerCache.set(actorIdOrName, false); + return false; +} + /** * Calls an Apify Actor and retrieves metadata about the dataset results. * diff --git a/tests/integration/suite.ts b/tests/integration/suite.ts index 954bcecb..5fad3845 100644 --- a/tests/integration/suite.ts +++ b/tests/integration/suite.ts @@ -486,6 +486,38 @@ export function createIntegrationTestsSuite( expect(result.content).toBeDefined(); }); + it('should call MCP server Actor via call-actor and invoke fetch-apify-docs tool', async () => { + client = await createClientFn({ tools: ['actors'] }); + + // Step 1: info - ensure the MCP server Actor lists tools including fetch-apify-docs + const infoResult = await client.callTool({ + name: HelperTools.ACTOR_CALL, + arguments: { + actor: ACTOR_MCP_SERVER_ACTOR_NAME, + step: 'info', + }, + }); + + expect(infoResult.content).toBeDefined(); + const infoContent = infoResult.content as { text: string }[]; + expect(infoContent.some((item) => item.text.includes('fetch-apify-docs'))).toBe(true); + + // Step 2: call - invoke the MCP tool fetch-apify-docs via actor:tool syntax + const DOCS_URL = 'https://docs.apify.com'; + const callResult = await client.callTool({ + name: HelperTools.ACTOR_CALL, + arguments: { + actor: `${ACTOR_MCP_SERVER_ACTOR_NAME}:fetch-apify-docs`, + step: 'call', + input: { url: DOCS_URL }, + }, + }); + + expect(callResult.content).toBeDefined(); + const callContent = callResult.content as { text: string }[]; + expect(callContent.some((item) => item.text.includes(`Fetched content from ${DOCS_URL}`))).toBe(true); + }); + it('should search Apify documentation', async () => { client = await createClientFn({ tools: ['docs'], From 55682fd3e73f3991f5390ca771131594fdb19287 Mon Sep 17 00:00:00 2001 From: MQ Date: Thu, 18 Sep 2025 16:13:17 +0200 Subject: [PATCH 3/3] revert all the refactors --- src/tools/actor.ts | 510 ++++++++++++++++++++++++++++++++++++ src/tools/call-actor.ts | 238 ----------------- src/tools/index.ts | 3 +- src/utils/actor-response.ts | 2 +- src/utils/actor.ts | 283 +------------------- src/utils/tools-loader.ts | 2 +- 6 files changed, 515 insertions(+), 523 deletions(-) create mode 100644 src/tools/actor.ts delete mode 100644 src/tools/call-actor.ts diff --git a/src/tools/actor.ts b/src/tools/actor.ts new file mode 100644 index 00000000..200c3545 --- /dev/null +++ b/src/tools/actor.ts @@ -0,0 +1,510 @@ +import type { Client } from '@modelcontextprotocol/sdk/client/index.js'; +import type { ActorCallOptions, ActorRun } from 'apify-client'; +import { z } from 'zod'; +import zodToJsonSchema from 'zod-to-json-schema'; + +import log from '@apify/log'; + +import { ApifyClient } from '../apify-client.js'; +import { + ACTOR_ADDITIONAL_INSTRUCTIONS, + ACTOR_MAX_MEMORY_MBYTES, + HelperTools, + SKYFIRE_TOOL_INSTRUCTIONS, + TOOL_MAX_OUTPUT_CHARS, +} from '../const.js'; +import { getActorMCPServerPath, getActorMCPServerURL } from '../mcp/actors.js'; +import { connectMCPClient } from '../mcp/client.js'; +import { getMCPServerTools } from '../mcp/proxy.js'; +import { actorDefinitionPrunedCache } from '../state.js'; +import type { ActorDefinitionStorage, ActorInfo, ApifyToken, DatasetItem, ToolEntry } from '../types.js'; +import { ensureOutputWithinCharLimit, getActorDefinitionStorageFieldNames, getActorMcpUrlCached } from '../utils/actor.js'; +import { fetchActorDetails } from '../utils/actor-details.js'; +import { buildActorResponseContent } from '../utils/actor-response.js'; +import { ajv } from '../utils/ajv.js'; +import { buildMCPResponse } from '../utils/mcp.js'; +import type { ProgressTracker } from '../utils/progress.js'; +import type { JsonSchemaProperty } from '../utils/schema-generation.js'; +import { generateSchemaFromItems } from '../utils/schema-generation.js'; +import { getActorDefinition } from './build.js'; +import { actorNameToToolName, fixedAjvCompile, getToolSchemaID, transformActorInputSchemaProperties } from './utils.js'; + +// Define a named return type for callActorGetDataset +export type CallActorGetDatasetResult = { + runId: string; + datasetId: string; + itemCount: number; + schema: JsonSchemaProperty; + previewItems: DatasetItem[]; +}; + +/** + * Calls an Apify Actor and retrieves metadata about the dataset results. + * + * This function executes an Actor and returns summary information instead with a result items preview of the full dataset + * to prevent overwhelming responses. The actual data can be retrieved using the get-actor-output tool. + * + * It requires the `APIFY_TOKEN` environment variable to be set. + * If the `APIFY_IS_AT_HOME` the dataset items are pushed to the Apify dataset. + * + * @param {string} actorName - The name of the Actor to call. + * @param {unknown} input - The input to pass to the actor. + * @param {ApifyClient} apifyClient - The Apify client to use for authentication. + * @param {ActorCallOptions} callOptions - The options to pass to the Actor. + * @param {ProgressTracker} progressTracker - Optional progress tracker for real-time updates. + * @param {AbortSignal} abortSignal - Optional abort signal to cancel the actor run. + * @returns {Promise} - A promise that resolves to an object containing the actor run and dataset items. + * @throws {Error} - Throws an error if the `APIFY_TOKEN` is not set + */ +export async function callActorGetDataset( + actorName: string, + input: unknown, + apifyClient: ApifyClient, + callOptions: ActorCallOptions | undefined = undefined, + progressTracker?: ProgressTracker | null, + abortSignal?: AbortSignal, +): Promise { + const CLIENT_ABORT = Symbol('CLIENT_ABORT'); // Just internal symbol to identify client abort + const actorClient = apifyClient.actor(actorName); + + // Start the actor run + const actorRun: ActorRun = await actorClient.start(input, callOptions); + + // Start progress tracking if tracker is provided + if (progressTracker) { + progressTracker.startActorRunUpdates(actorRun.id, apifyClient, actorName); + } + + // Create abort promise that handles both API abort and race rejection + const abortPromise = async () => new Promise((resolve) => { + abortSignal?.addEventListener('abort', async () => { + // Abort the actor run via API + try { + await apifyClient.run(actorRun.id).abort({ gracefully: false }); + } catch (e) { + log.error('Error aborting Actor run', { error: e, runId: actorRun.id }); + } + // Reject to stop waiting + resolve(CLIENT_ABORT); + }, { once: true }); + }); + + // Wait for completion or cancellation + const potentialAbortedRun = await Promise.race([ + apifyClient.run(actorRun.id).waitForFinish(), + ...(abortSignal ? [abortPromise()] : []), + ]); + + if (potentialAbortedRun === CLIENT_ABORT) { + log.info('Actor run aborted by client', { actorName, input }); + return null; + } + const completedRun = potentialAbortedRun as ActorRun; + + // Process the completed run + const dataset = apifyClient.dataset(completedRun.defaultDatasetId); + const [datasetItems, defaultBuild] = await Promise.all([ + dataset.listItems(), + (await actorClient.defaultBuild()).get(), + ]); + + // Generate schema using the shared utility + const generatedSchema = generateSchemaFromItems(datasetItems.items, { + clean: true, + arrayMode: 'all', + }); + const schema = generatedSchema || { type: 'object', properties: {} }; + + /** + * Get important fields that are using in any dataset view as they MAY be used in filtering to ensure the output fits + * the tool output limits. Client has to use the get-actor-output tool to retrieve the full dataset or filtered out fields. + */ + const storageDefinition = defaultBuild?.actorDefinition?.storages?.dataset as ActorDefinitionStorage | undefined; + const importantProperties = getActorDefinitionStorageFieldNames(storageDefinition || {}); + const previewItems = ensureOutputWithinCharLimit(datasetItems.items, importantProperties, TOOL_MAX_OUTPUT_CHARS); + + return { + runId: actorRun.id, + datasetId: completedRun.defaultDatasetId, + itemCount: datasetItems.count, + schema, + previewItems, + }; +} + +/** + * This function is used to fetch normal non-MCP server Actors as a tool. + * + * Fetches Actor input schemas by Actor IDs or Actor full names and creates MCP tools. + * + * This function retrieves the input schemas for the specified Actors and compiles them into MCP tools. + * It uses the AJV library to validate the input schemas. + * + * Tool name can't contain /, so it is replaced with _ + * + * The input schema processing workflow: + * 1. Properties are marked as required using markInputPropertiesAsRequired() to add "REQUIRED" prefix to descriptions + * 2. Nested properties are built by analyzing editor type (proxy, requestListSources) using buildNestedProperties() + * 3. Properties are filtered using filterSchemaProperties() + * 4. Properties are shortened using shortenProperties() + * 5. Enums are added to descriptions with examples using addEnumsToDescriptionsWithExamples() + * + * @param {ActorInfo[]} actorsInfo - An array of ActorInfo objects with webServerMcpPath and actorDefinitionPruned. + * @returns {Promise} - A promise that resolves to an array of MCP tools. + */ +export async function getNormalActorsAsTools( + actorsInfo: ActorInfo[], +): Promise { + const tools: ToolEntry[] = []; + + // Zip the results with their corresponding actorIDs + for (const actorInfo of actorsInfo) { + const { actorDefinitionPruned } = actorInfo; + + if (actorDefinitionPruned) { + const schemaID = getToolSchemaID(actorDefinitionPruned.actorFullName); + if (actorDefinitionPruned.input && 'properties' in actorDefinitionPruned.input && actorDefinitionPruned.input) { + actorDefinitionPruned.input.properties = transformActorInputSchemaProperties(actorDefinitionPruned.input); + // Add schema $id, each valid JSON schema should have a unique $id + // see https://json-schema.org/understanding-json-schema/basics#declaring-a-unique-identifier + actorDefinitionPruned.input.$id = schemaID; + } + try { + const memoryMbytes = actorDefinitionPruned.defaultRunOptions?.memoryMbytes || ACTOR_MAX_MEMORY_MBYTES; + const tool: ToolEntry = { + type: 'actor', + tool: { + name: actorNameToToolName(actorDefinitionPruned.actorFullName), + actorFullName: actorDefinitionPruned.actorFullName, + description: `This tool calls the Actor "${actorDefinitionPruned.actorFullName}" and retrieves its output results. Use this tool instead of the "${HelperTools.ACTOR_CALL}" if user requests to use this specific Actor. +Actor description: ${actorDefinitionPruned.description} +Instructions: ${ACTOR_ADDITIONAL_INSTRUCTIONS}`, + inputSchema: actorDefinitionPruned.input + // So Actor without input schema works - MCP client expects JSON schema valid output + || { + type: 'object', + properties: {}, + required: [], + }, + // Additional props true to allow skyfire-pay-id + ajvValidate: fixedAjvCompile(ajv, { ...actorDefinitionPruned.input, additionalProperties: true }), + memoryMbytes: memoryMbytes > ACTOR_MAX_MEMORY_MBYTES ? ACTOR_MAX_MEMORY_MBYTES : memoryMbytes, + }, + }; + tools.push(tool); + } catch (validationError) { + log.error('Failed to compile AJV schema for Actor', { actorName: actorDefinitionPruned.actorFullName, error: validationError }); + } + } + } + return tools; +} + +async function getMCPServersAsTools( + actorsInfo: ActorInfo[], + apifyToken: ApifyToken, +): Promise { + /** + * This is case for the Skyfire request without any Apify token, we do not support + * standby Actors in this case so we can skip MCP servers since they would fail anyway (they are standby Actors). + */ + if (apifyToken === null || apifyToken === undefined) { + return []; + } + + const actorsMCPServerTools: ToolEntry[] = []; + for (const actorInfo of actorsInfo) { + const actorId = actorInfo.actorDefinitionPruned.id; + if (!actorInfo.webServerMcpPath) { + log.warning('Actor does not have a web server MCP path, skipping', { + actorFullName: actorInfo.actorDefinitionPruned.actorFullName, + actorId, + }); + continue; + } + const mcpServerUrl = await getActorMCPServerURL( + actorInfo.actorDefinitionPruned.id, // Real ID of the Actor + actorInfo.webServerMcpPath, + ); + log.debug('Retrieved MCP server URL for Actor', { + actorFullName: actorInfo.actorDefinitionPruned.actorFullName, + actorId, + mcpServerUrl, + }); + + let client: Client | undefined; + try { + client = await connectMCPClient(mcpServerUrl, apifyToken); + const serverTools = await getMCPServerTools(actorId, client, mcpServerUrl); + actorsMCPServerTools.push(...serverTools); + } finally { + if (client) await client.close(); + } + } + + return actorsMCPServerTools; +} + +export async function getActorsAsTools( + actorIdsOrNames: string[], + apifyClient: ApifyClient, +): Promise { + log.debug('Fetching Actors as tools', { actorNames: actorIdsOrNames }); + + const actorsInfo: (ActorInfo | null)[] = await Promise.all( + actorIdsOrNames.map(async (actorIdOrName) => { + const actorDefinitionPrunedCached = actorDefinitionPrunedCache.get(actorIdOrName); + if (actorDefinitionPrunedCached) { + return { + actorDefinitionPruned: actorDefinitionPrunedCached, + webServerMcpPath: getActorMCPServerPath(actorDefinitionPrunedCached), + + } as ActorInfo; + } + + const actorDefinitionPruned = await getActorDefinition(actorIdOrName, apifyClient); + if (!actorDefinitionPruned) { + log.error('Actor not found or definition is not available', { actorName: actorIdOrName }); + return null; + } + // Cache the pruned Actor definition + actorDefinitionPrunedCache.set(actorIdOrName, actorDefinitionPruned); + return { + actorDefinitionPruned, + webServerMcpPath: getActorMCPServerPath(actorDefinitionPruned), + } as ActorInfo; + }), + ); + + const clonedActors = structuredClone(actorsInfo); + + // Filter out nulls and separate Actors with MCP servers and normal Actors + const actorMCPServersInfo = clonedActors.filter((actorInfo) => actorInfo && actorInfo.webServerMcpPath) as ActorInfo[]; + const normalActorsInfo = clonedActors.filter((actorInfo) => actorInfo && !actorInfo.webServerMcpPath) as ActorInfo[]; + + const [normalTools, mcpServerTools] = await Promise.all([ + getNormalActorsAsTools(normalActorsInfo), + getMCPServersAsTools(actorMCPServersInfo, apifyClient.token), + ]); + + return [...normalTools, ...mcpServerTools]; +} + +const callActorArgs = z.object({ + actor: z.string() + .describe('The name of the Actor to call. For example, "apify/rag-web-browser".'), + step: z.enum(['info', 'call']) + .default('info') + .describe(`Step to perform: "info" to get Actor details and input schema (required first step), "call" to execute the Actor (only after getting info).`), + input: z.object({}).passthrough() + .optional() + .describe(`The input JSON to pass to the Actor. For example, {"query": "apify", "maxResults": 5, "outputFormats": ["markdown"]}. Required only when step is "call".`), + callOptions: z.object({ + memory: z.number() + .min(128, 'Memory must be at least 128 MB') + .max(32768, 'Memory cannot exceed 32 GB (32768 MB)') + .optional() + .describe(`Memory allocation for the Actor in MB. Must be a power of 2 (e.g., 128, 256, 512, 1024, 2048, 4096, 8192, 16384, 32768). Minimum: 128 MB, Maximum: 32768 MB (32 GB).`), + timeout: z.number() + .min(0, 'Timeout must be 0 or greater') + .optional() + .describe(`Maximum runtime for the Actor in seconds. After this time elapses, the Actor will be automatically terminated. Use 0 for infinite timeout (no time limit). Minimum: 0 seconds (infinite).`), + }).optional() + .describe('Optional call options for the Actor run configuration.'), +}); + +export const callActor: ToolEntry = { + type: 'internal', + tool: { + name: HelperTools.ACTOR_CALL, + actorFullName: HelperTools.ACTOR_CALL, + description: `Call Any Actor from Apify Store - Two-Step Process + +This tool uses a mandatory two-step process to safely call any Actor from the Apify store. + +USAGE: +• ONLY for Actors that are NOT available as dedicated tools +• If a dedicated tool exists (e.g., ${actorNameToToolName('apify/rag-web-browser')}), use that instead + +MANDATORY TWO-STEP WORKFLOW: + +Step 1: Get Actor Info (step="info", default) +• First call this tool with step="info" to get Actor details and input schema +• For regular Actors: returns the Actor input schema +• For MCP server Actors: returns list of available tools with their schemas +• You MUST do this step first - it's required to understand how to call the Actor + +Step 2: Call Actor (step="call") +• Only after step 1, call again with step="call" and proper input based on the schema +• For regular Actors: executes the Actor and returns results +• For MCP server Actors: use format "actor-name:tool-name" to call specific tools + +MCP SERVER ACTORS: +• For MCP server actors, step="info" lists available tools instead of input schema +• To call an MCP tool, use actor name format: "actor-name:tool-name" with step="call" +• Example: actor="apify/my-mcp-actor:search-tool", step="call", input={...} + +The step parameter enforces this workflow - you cannot call an Actor without first getting its info.`, + inputSchema: zodToJsonSchema(callActorArgs), + ajvValidate: ajv.compile({ + ...zodToJsonSchema(callActorArgs), + // Additional props true to allow skyfire-pay-id + additionalProperties: true, + }), + call: async (toolArgs) => { + const { args, apifyToken, progressTracker, extra, apifyMcpServer } = toolArgs; + const { actor: actorName, step, input, callOptions } = callActorArgs.parse(args); + + // Parse special format: actor:tool + const mcpToolMatch = actorName.match(/^(.+):(.+)$/); + let baseActorName = actorName; + let mcpToolName: string | undefined; + + if (mcpToolMatch) { + baseActorName = mcpToolMatch[1]; + mcpToolName = mcpToolMatch[2]; + } + + // For definition resolution we always use token-based client; Skyfire is only for actual Actor runs + const apifyClientForDefinition = new ApifyClient({ token: apifyToken }); + // Resolve MCP server URL + const needsMcpUrl = mcpToolName !== undefined || step === 'info'; + const mcpServerUrlOrFalse = needsMcpUrl ? await getActorMcpUrlCached(baseActorName, apifyClientForDefinition) : false; + const isActorMcpServer = mcpServerUrlOrFalse && typeof mcpServerUrlOrFalse === 'string'; + + // Standby Actors, thus MCPs, are not supported in Skyfire mode + if (isActorMcpServer && apifyMcpServer.options.skyfireMode) { + return buildMCPResponse([`MCP server Actors are not supported in Skyfire mode. Please use a regular Apify token without Skyfire.`]); + } + + try { + if (step === 'info') { + if (isActorMcpServer) { + // MCP server: list tools + const mcpServerUrl = mcpServerUrlOrFalse; + let client: Client | undefined; + // Nested try to ensure client is closed + try { + client = await connectMCPClient(mcpServerUrl, apifyToken); + const toolsResponse = await client.listTools(); + + const toolsInfo = toolsResponse.tools.map((tool) => `**${tool.name}**\n${tool.description || 'No description'}\nInput Schema: ${JSON.stringify(tool.inputSchema, null, 2)}`, + ).join('\n\n'); + + return buildMCPResponse([`This is an MCP Server Actor with the following tools:\n\n${toolsInfo}\n\nTo call a tool, use step="call" with actor name format: "${baseActorName}:{toolName}"`]); + } finally { + if (client) await client.close(); + } + } else { + // Regular actor: return schema + const details = await fetchActorDetails(apifyClientForDefinition, baseActorName); + if (!details) { + return buildMCPResponse([`Actor information for '${baseActorName}' was not found. Please check the Actor ID or name and ensure the Actor exists.`]); + } + const content = [ + { type: 'text', text: `**Input Schema:**\n${JSON.stringify(details.inputSchema, null, 0)}` }, + ]; + /** + * Add Skyfire instructions also in the info step since clients are most likely truncating the long tool description of the call-actor. + */ + if (apifyMcpServer.options.skyfireMode) { + content.push({ + type: 'text', + text: SKYFIRE_TOOL_INSTRUCTIONS, + }); + } + return { content }; + } + } + + /** + * In Skyfire mode, we check for the presence of `skyfire-pay-id`. + * If it is missing, we return instructions to the LLM on how to create it and pass it to the tool. + */ + if (apifyMcpServer.options.skyfireMode + && args['skyfire-pay-id'] === undefined + ) { + return { + content: [{ + type: 'text', + text: SKYFIRE_TOOL_INSTRUCTIONS, + }], + }; + } + + /** + * Create Apify token, for Skyfire mode use `skyfire-pay-id` and for normal mode use `apifyToken`. + */ + const apifyClient = apifyMcpServer.options.skyfireMode && typeof args['skyfire-pay-id'] === 'string' + ? new ApifyClient({ skyfirePayId: args['skyfire-pay-id'] }) + : new ApifyClient({ token: apifyToken }); + + // Step 2: Call the Actor + if (!input) { + return buildMCPResponse([`Input is required when step="call". Please provide the input parameter based on the Actor's input schema.`]); + } + + // Handle MCP tool calls + if (mcpToolName) { + if (!isActorMcpServer) { + return buildMCPResponse([`Actor '${baseActorName}' is not an MCP server.`]); + } + + const mcpServerUrl = mcpServerUrlOrFalse; + let client: Client | undefined; + try { + client = await connectMCPClient(mcpServerUrl, apifyToken); + + const result = await client.callTool({ + name: mcpToolName, + arguments: input, + }); + + return { content: result.content }; + } finally { + if (client) await client.close(); + } + } + + // Handle regular Actor calls + const [actor] = await getActorsAsTools([actorName], apifyClient); + + if (!actor) { + return buildMCPResponse([`Actor '${actorName}' was not found.`]); + } + + if (!actor.tool.ajvValidate(input)) { + const { errors } = actor.tool.ajvValidate; + if (errors && errors.length > 0) { + return buildMCPResponse([ + `Input validation failed for Actor '${actorName}': ${errors.map((e) => e.message).join(', ')}`, + `Input Schema:\n${JSON.stringify(actor.tool.inputSchema)}`, + ]); + } + } + + const callResult = await callActorGetDataset( + actorName, + input, + apifyClient, + callOptions, + progressTracker, + extra.signal, + ); + + if (!callResult) { + // Receivers of cancellation notifications SHOULD NOT send a response for the cancelled request + // https://modelcontextprotocol.io/specification/2025-06-18/basic/utilities/cancellation#behavior-requirements + return { }; + } + + const content = buildActorResponseContent(actorName, callResult); + + return { content }; + } catch (error) { + log.error('Failed to call Actor', { error, actorName, step }); + return buildMCPResponse([`Failed to call Actor '${actorName}': ${error instanceof Error ? error.message : String(error)}`]); + } + }, + }, +}; diff --git a/src/tools/call-actor.ts b/src/tools/call-actor.ts deleted file mode 100644 index 0481bd2a..00000000 --- a/src/tools/call-actor.ts +++ /dev/null @@ -1,238 +0,0 @@ -import type { Client } from '@modelcontextprotocol/sdk/client/index.js'; -import { z } from 'zod'; -import zodToJsonSchema from 'zod-to-json-schema'; - -import log from '@apify/log'; - -import { ApifyClient } from '../apify-client.js'; -import { - HelperTools, - SKYFIRE_TOOL_INSTRUCTIONS, -} from '../const.js'; -import { connectMCPClient } from '../mcp/client.js'; -import type { ToolEntry } from '../types.js'; -import { callActorGetDataset, getActorMcpUrlCached, getActorsAsTools } from '../utils/actor.js'; -import { fetchActorDetails } from '../utils/actor-details.js'; -import { buildActorResponseContent } from '../utils/actor-response.js'; -import { ajv } from '../utils/ajv.js'; -import { buildMCPResponse } from '../utils/mcp.js'; -import { actorNameToToolName } from './utils.js'; - -const callActorArgs = z.object({ - actor: z.string() - .describe('The name of the Actor to call. For example, "apify/rag-web-browser".'), - step: z.enum(['info', 'call']) - .default('info') - .describe(`Step to perform: "info" to get Actor details and input schema (required first step), "call" to execute the Actor (only after getting info).`), - input: z.object({}).passthrough() - .optional() - .describe(`The input JSON to pass to the Actor. For example, {"query": "apify", "maxResults": 5, "outputFormats": ["markdown"]}. Required only when step is "call".`), - callOptions: z.object({ - memory: z.number() - .min(128, 'Memory must be at least 128 MB') - .max(32768, 'Memory cannot exceed 32 GB (32768 MB)') - .optional() - .describe(`Memory allocation for the Actor in MB. Must be a power of 2 (e.g., 128, 256, 512, 1024, 2048, 4096, 8192, 16384, 32768). Minimum: 128 MB, Maximum: 32768 MB (32 GB).`), - timeout: z.number() - .min(0, 'Timeout must be 0 or greater') - .optional() - .describe(`Maximum runtime for the Actor in seconds. After this time elapses, the Actor will be automatically terminated. Use 0 for infinite timeout (no time limit). Minimum: 0 seconds (infinite).`), - }).optional() - .describe('Optional call options for the Actor run configuration.'), -}); - -export const callActor: ToolEntry = { - type: 'internal', - tool: { - name: HelperTools.ACTOR_CALL, - actorFullName: HelperTools.ACTOR_CALL, - description: `Call Any Actor from Apify Store - Two-Step Process - -This tool uses a mandatory two-step process to safely call any Actor from the Apify store. - -USAGE: -• ONLY for Actors that are NOT available as dedicated tools -• If a dedicated tool exists (e.g., ${actorNameToToolName('apify/rag-web-browser')}), use that instead - -MANDATORY TWO-STEP WORKFLOW: - -Step 1: Get Actor Info (step="info", default) -• First call this tool with step="info" to get Actor details and input schema -• For regular Actors: returns the Actor input schema -• For MCP server Actors: returns list of available tools with their schemas -• You MUST do this step first - it's required to understand how to call the Actor - -Step 2: Call Actor (step="call") -• Only after step 1, call again with step="call" and proper input based on the schema -• For regular Actors: executes the Actor and returns results -• For MCP server Actors: use format "actor-name:tool-name" to call specific tools - -MCP SERVER ACTORS: -• For MCP server actors, step="info" lists available tools instead of input schema -• To call an MCP tool, use actor name format: "actor-name:tool-name" with step="call" -• Example: actor="apify/my-mcp-actor:search-tool", step="call", input={...} - -The step parameter enforces this workflow - you cannot call an Actor without first getting its info.`, - inputSchema: zodToJsonSchema(callActorArgs), - ajvValidate: ajv.compile({ - ...zodToJsonSchema(callActorArgs), - // Additional props true to allow skyfire-pay-id - additionalProperties: true, - }), - call: async (toolArgs) => { - const { args, apifyToken, progressTracker, extra, apifyMcpServer } = toolArgs; - const { actor: actorName, step, input, callOptions } = callActorArgs.parse(args); - - // Parse special format: actor:tool - const mcpToolMatch = actorName.match(/^(.+):(.+)$/); - let baseActorName = actorName; - let mcpToolName: string | undefined; - - if (mcpToolMatch) { - baseActorName = mcpToolMatch[1]; - mcpToolName = mcpToolMatch[2]; - } - - // For definition resolution we always use token-based client; Skyfire is only for actual Actor runs - const apifyClientForDefinition = new ApifyClient({ token: apifyToken }); - // Resolve MCP server URL - const needsMcpUrl = mcpToolName !== undefined || step === 'info'; - const mcpServerUrlOrFalse = needsMcpUrl ? await getActorMcpUrlCached(baseActorName, apifyClientForDefinition) : false; - const isActorMcpServer = mcpServerUrlOrFalse && typeof mcpServerUrlOrFalse === 'string'; - - // Standby Actors, thus MCPs, are not supported in Skyfire mode - if (isActorMcpServer && apifyMcpServer.options.skyfireMode) { - return buildMCPResponse([`MCP server Actors are not supported in Skyfire mode. Please use a regular Apify token without Skyfire.`]); - } - - try { - if (step === 'info') { - if (isActorMcpServer) { - // MCP server: list tools - const mcpServerUrl = mcpServerUrlOrFalse; - let client: Client | undefined; - // Nested try to ensure client is closed - try { - client = await connectMCPClient(mcpServerUrl, apifyToken); - const toolsResponse = await client.listTools(); - - const toolsInfo = toolsResponse.tools.map((tool) => `**${tool.name}**\n${tool.description || 'No description'}\nInput Schema: ${JSON.stringify(tool.inputSchema, null, 2)}`, - ).join('\n\n'); - - return buildMCPResponse([`This is an MCP Server Actor with the following tools:\n\n${toolsInfo}\n\nTo call a tool, use step="call" with actor name format: "${baseActorName}:{toolName}"`]); - } finally { - if (client) await client.close(); - } - } else { - // Regular actor: return schema - const details = await fetchActorDetails(apifyClientForDefinition, baseActorName); - if (!details) { - return buildMCPResponse([`Actor information for '${baseActorName}' was not found. Please check the Actor ID or name and ensure the Actor exists.`]); - } - const content = [ - { type: 'text', text: `**Input Schema:**\n${JSON.stringify(details.inputSchema, null, 0)}` }, - ]; - /** - * Add Skyfire instructions also in the info step since clients are most likely truncating the long tool description of the call-actor. - */ - if (apifyMcpServer.options.skyfireMode) { - content.push({ - type: 'text', - text: SKYFIRE_TOOL_INSTRUCTIONS, - }); - } - return { content }; - } - } - - /** - * In Skyfire mode, we check for the presence of `skyfire-pay-id`. - * If it is missing, we return instructions to the LLM on how to create it and pass it to the tool. - */ - if (apifyMcpServer.options.skyfireMode - && args['skyfire-pay-id'] === undefined - ) { - return { - content: [{ - type: 'text', - text: SKYFIRE_TOOL_INSTRUCTIONS, - }], - }; - } - - /** - * Create Apify token, for Skyfire mode use `skyfire-pay-id` and for normal mode use `apifyToken`. - */ - const apifyClient = apifyMcpServer.options.skyfireMode && typeof args['skyfire-pay-id'] === 'string' - ? new ApifyClient({ skyfirePayId: args['skyfire-pay-id'] }) - : new ApifyClient({ token: apifyToken }); - - // Step 2: Call the Actor - if (!input) { - return buildMCPResponse([`Input is required when step="call". Please provide the input parameter based on the Actor's input schema.`]); - } - - // Handle MCP tool calls - if (mcpToolName) { - if (!isActorMcpServer) { - return buildMCPResponse([`Actor '${baseActorName}' is not an MCP server.`]); - } - - const mcpServerUrl = mcpServerUrlOrFalse; - let client: Client | undefined; - try { - client = await connectMCPClient(mcpServerUrl, apifyToken); - - const result = await client.callTool({ - name: mcpToolName, - arguments: input, - }); - - return { content: result.content }; - } finally { - if (client) await client.close(); - } - } - - // Handle regular Actor calls - const [actor] = await getActorsAsTools([actorName], apifyClient); - - if (!actor) { - return buildMCPResponse([`Actor '${actorName}' was not found.`]); - } - - if (!actor.tool.ajvValidate(input)) { - const { errors } = actor.tool.ajvValidate; - if (errors && errors.length > 0) { - return buildMCPResponse([ - `Input validation failed for Actor '${actorName}': ${errors.map((e) => e.message).join(', ')}`, - `Input Schema:\n${JSON.stringify(actor.tool.inputSchema)}`, - ]); - } - } - - const callResult = await callActorGetDataset( - actorName, - input, - apifyClient, - callOptions, - progressTracker, - extra.signal, - ); - - if (!callResult) { - // Receivers of cancellation notifications SHOULD NOT send a response for the cancelled request - // https://modelcontextprotocol.io/specification/2025-06-18/basic/utilities/cancellation#behavior-requirements - return { }; - } - - const content = buildActorResponseContent(actorName, callResult); - - return { content }; - } catch (error) { - log.error('Failed to call Actor', { error, actorName, step }); - return buildMCPResponse([`Failed to call Actor '${actorName}': ${error instanceof Error ? error.message : String(error)}`]); - } - }, - }, -}; diff --git a/src/tools/index.ts b/src/tools/index.ts index 546e9ef8..d03ec75f 100644 --- a/src/tools/index.ts +++ b/src/tools/index.ts @@ -1,8 +1,7 @@ // Import specific tools that are being used import type { ToolCategory } from '../types.js'; -import { callActorGetDataset, getActorsAsTools } from '../utils/actor.js'; import { getExpectedToolsByCategories } from '../utils/tools.js'; -import { callActor } from './call-actor.js'; +import { callActor, callActorGetDataset, getActorsAsTools } from './actor.js'; import { getDataset, getDatasetItems, getDatasetSchema } from './dataset.js'; import { getUserDatasetsList } from './dataset_collection.js'; import { fetchActorDetailsTool } from './fetch-actor-details.js'; diff --git a/src/utils/actor-response.ts b/src/utils/actor-response.ts index df913a21..fe998655 100644 --- a/src/utils/actor-response.ts +++ b/src/utils/actor-response.ts @@ -1,4 +1,4 @@ -import type { CallActorGetDatasetResult } from './actor.js'; +import type { CallActorGetDatasetResult } from '../tools/actor.js'; /** * Builds the response content for Actor tool calls. diff --git a/src/utils/actor.ts b/src/utils/actor.ts index 9296a6f8..e9645a9d 100644 --- a/src/utils/actor.ts +++ b/src/utils/actor.ts @@ -1,37 +1,10 @@ -import type { Client } from '@modelcontextprotocol/sdk/client/index.js'; -import type { ActorCallOptions, ActorRun } from 'apify-client'; - -import log from '@apify/log'; - import type { ApifyClient } from '../apify-client.js'; -import { - ACTOR_ADDITIONAL_INSTRUCTIONS, - ACTOR_MAX_MEMORY_MBYTES, - HelperTools, - TOOL_MAX_OUTPUT_CHARS, -} from '../const.js'; import { getActorMCPServerPath, getActorMCPServerURL } from '../mcp/actors.js'; -import { connectMCPClient } from '../mcp/client.js'; -import { getMCPServerTools } from '../mcp/proxy.js'; -import { actorDefinitionPrunedCache, mcpServerCache } from '../state.js'; +import { mcpServerCache } from '../state.js'; import { getActorDefinition } from '../tools/build.js'; -import { actorNameToToolName, fixedAjvCompile, getToolSchemaID, transformActorInputSchemaProperties } from '../tools/utils.js'; -import type { ActorDefinitionStorage, ActorInfo, ApifyToken, DatasetItem, ToolEntry } from '../types.js'; -import { ajv } from '../utils/ajv.js'; -import type { ProgressTracker } from '../utils/progress.js'; -import type { JsonSchemaProperty } from '../utils/schema-generation.js'; -import { generateSchemaFromItems } from '../utils/schema-generation.js'; +import type { ActorDefinitionStorage, DatasetItem } from '../types.js'; import { getValuesByDotKeys } from './generic.js'; -// Define a named return type for callActorGetDataset -export type CallActorGetDatasetResult = { - runId: string; - datasetId: string; - itemCount: number; - schema: JsonSchemaProperty; - previewItems: DatasetItem[]; -}; - /** * Resolve and cache the MCP server URL for the given Actor. * - Returns a string URL when the Actor exposes an MCP server @@ -59,258 +32,6 @@ export async function getActorMcpUrlCached( return false; } -/** - * Calls an Apify Actor and retrieves metadata about the dataset results. - * - * This function executes an Actor and returns summary information instead with a result items preview of the full dataset - * to prevent overwhelming responses. The actual data can be retrieved using the get-actor-output tool. - * - * It requires the `APIFY_TOKEN` environment variable to be set. - * If the `APIFY_IS_AT_HOME` the dataset items are pushed to the Apify dataset. - * - * @param {string} actorName - The name of the Actor to call. - * @param {unknown} input - The input to pass to the actor. - * @param {ApifyClient} apifyClient - The Apify client to use for authentication. - * @param {ActorCallOptions} callOptions - The options to pass to the Actor. - * @param {ProgressTracker} progressTracker - Optional progress tracker for real-time updates. - * @param {AbortSignal} abortSignal - Optional abort signal to cancel the actor run. - * @returns {Promise} - A promise that resolves to an object containing the actor run and dataset items. - * @throws {Error} - Throws an error if the `APIFY_TOKEN` is not set - */ -export async function callActorGetDataset( - actorName: string, - input: unknown, - apifyClient: ApifyClient, - callOptions: ActorCallOptions | undefined = undefined, - progressTracker?: ProgressTracker | null, - abortSignal?: AbortSignal, -): Promise { - const CLIENT_ABORT = Symbol('CLIENT_ABORT'); // Just internal symbol to identify client abort - const actorClient = apifyClient.actor(actorName); - - // Start the actor run - const actorRun: ActorRun = await actorClient.start(input, callOptions); - - // Start progress tracking if tracker is provided - if (progressTracker) { - progressTracker.startActorRunUpdates(actorRun.id, apifyClient, actorName); - } - - // Create abort promise that handles both API abort and race rejection - const abortPromise = async () => new Promise((resolve) => { - abortSignal?.addEventListener('abort', async () => { - // Abort the actor run via API - try { - await apifyClient.run(actorRun.id).abort({ gracefully: false }); - } catch (e) { - log.error('Error aborting Actor run', { error: e, runId: actorRun.id }); - } - // Reject to stop waiting - resolve(CLIENT_ABORT); - }, { once: true }); - }); - - // Wait for completion or cancellation - const potentialAbortedRun = await Promise.race([ - apifyClient.run(actorRun.id).waitForFinish(), - ...(abortSignal ? [abortPromise()] : []), - ]); - - if (potentialAbortedRun === CLIENT_ABORT) { - log.info('Actor run aborted by client', { actorName, input }); - return null; - } - const completedRun = potentialAbortedRun as ActorRun; - - // Process the completed run - const dataset = apifyClient.dataset(completedRun.defaultDatasetId); - const [datasetItems, defaultBuild] = await Promise.all([ - dataset.listItems(), - (await actorClient.defaultBuild()).get(), - ]); - - // Generate schema using the shared utility - const generatedSchema = generateSchemaFromItems(datasetItems.items, { - clean: true, - arrayMode: 'all', - }); - const schema = generatedSchema || { type: 'object', properties: {} }; - - /** - * Get important fields that are using in any dataset view as they MAY be used in filtering to ensure the output fits - * the tool output limits. Client has to use the get-actor-output tool to retrieve the full dataset or filtered out fields. - */ - const storageDefinition = defaultBuild?.actorDefinition?.storages?.dataset as ActorDefinitionStorage | undefined; - const importantProperties = getActorDefinitionStorageFieldNames(storageDefinition || {}); - const previewItems = ensureOutputWithinCharLimit(datasetItems.items, importantProperties, TOOL_MAX_OUTPUT_CHARS); - - return { - runId: actorRun.id, - datasetId: completedRun.defaultDatasetId, - itemCount: datasetItems.count, - schema, - previewItems, - }; -} - -/** - * This function is used to fetch normal non-MCP server Actors as a tool. - * - * Fetches Actor input schemas by Actor IDs or Actor full names and creates MCP tools. - * - * This function retrieves the input schemas for the specified Actors and compiles them into MCP tools. - * It uses the AJV library to validate the input schemas. - * - * Tool name can't contain /, so it is replaced with _ - * - * The input schema processing workflow: - * 1. Properties are marked as required using markInputPropertiesAsRequired() to add "REQUIRED" prefix to descriptions - * 2. Nested properties are built by analyzing editor type (proxy, requestListSources) using buildNestedProperties() - * 3. Properties are filtered using filterSchemaProperties() - * 4. Properties are shortened using shortenProperties() - * 5. Enums are added to descriptions with examples using addEnumsToDescriptionsWithExamples() - * - * @param {ActorInfo[]} actorsInfo - An array of ActorInfo objects with webServerMcpPath and actorDefinitionPruned. - * @returns {Promise} - A promise that resolves to an array of MCP tools. - */ -export async function getNormalActorsAsTools( - actorsInfo: ActorInfo[], -): Promise { - const tools: ToolEntry[] = []; - - // Zip the results with their corresponding actorIDs - for (const actorInfo of actorsInfo) { - const { actorDefinitionPruned } = actorInfo; - - if (actorDefinitionPruned) { - const schemaID = getToolSchemaID(actorDefinitionPruned.actorFullName); - if (actorDefinitionPruned.input && 'properties' in actorDefinitionPruned.input && actorDefinitionPruned.input) { - actorDefinitionPruned.input.properties = transformActorInputSchemaProperties(actorDefinitionPruned.input); - // Add schema $id, each valid JSON schema should have a unique $id - // see https://json-schema.org/understanding-json-schema/basics#declaring-a-unique-identifier - actorDefinitionPruned.input.$id = schemaID; - } - try { - const memoryMbytes = actorDefinitionPruned.defaultRunOptions?.memoryMbytes || ACTOR_MAX_MEMORY_MBYTES; - const tool: ToolEntry = { - type: 'actor', - tool: { - name: actorNameToToolName(actorDefinitionPruned.actorFullName), - actorFullName: actorDefinitionPruned.actorFullName, - description: `This tool calls the Actor "${actorDefinitionPruned.actorFullName}" and retrieves its output results. Use this tool instead of the "${HelperTools.ACTOR_CALL}" if user requests to use this specific Actor. -Actor description: ${actorDefinitionPruned.description} -Instructions: ${ACTOR_ADDITIONAL_INSTRUCTIONS}`, - inputSchema: actorDefinitionPruned.input - // So Actor without input schema works - MCP client expects JSON schema valid output - || { - type: 'object', - properties: {}, - required: [], - }, - // Additional props true to allow skyfire-pay-id - ajvValidate: fixedAjvCompile(ajv, { ...actorDefinitionPruned.input, additionalProperties: true }), - memoryMbytes: memoryMbytes > ACTOR_MAX_MEMORY_MBYTES ? ACTOR_MAX_MEMORY_MBYTES : memoryMbytes, - }, - }; - tools.push(tool); - } catch (validationError) { - log.error('Failed to compile AJV schema for Actor', { actorName: actorDefinitionPruned.actorFullName, error: validationError }); - } - } - } - return tools; -} - -async function getMCPServersAsTools( - actorsInfo: ActorInfo[], - apifyToken: ApifyToken, -): Promise { - /** - * This is case for the Skyfire request without any Apify token, we do not support - * standby Actors in this case so we can skip MCP servers since they would fail anyway (they are standby Actors). - */ - if (apifyToken === null || apifyToken === undefined) { - return []; - } - - const actorsMCPServerTools: ToolEntry[] = []; - for (const actorInfo of actorsInfo) { - const actorId = actorInfo.actorDefinitionPruned.id; - if (!actorInfo.webServerMcpPath) { - log.warning('Actor does not have a web server MCP path, skipping', { - actorFullName: actorInfo.actorDefinitionPruned.actorFullName, - actorId, - }); - continue; - } - const mcpServerUrl = await getActorMCPServerURL( - actorInfo.actorDefinitionPruned.id, // Real ID of the Actor - actorInfo.webServerMcpPath, - ); - log.debug('Retrieved MCP server URL for Actor', { - actorFullName: actorInfo.actorDefinitionPruned.actorFullName, - actorId, - mcpServerUrl, - }); - - let client: Client | undefined; - try { - client = await connectMCPClient(mcpServerUrl, apifyToken); - const serverTools = await getMCPServerTools(actorId, client, mcpServerUrl); - actorsMCPServerTools.push(...serverTools); - } finally { - if (client) await client.close(); - } - } - - return actorsMCPServerTools; -} - -export async function getActorsAsTools( - actorIdsOrNames: string[], - apifyClient: ApifyClient, -): Promise { - log.debug('Fetching Actors as tools', { actorNames: actorIdsOrNames }); - - const actorsInfo: (ActorInfo | null)[] = await Promise.all( - actorIdsOrNames.map(async (actorIdOrName) => { - const actorDefinitionPrunedCached = actorDefinitionPrunedCache.get(actorIdOrName); - if (actorDefinitionPrunedCached) { - return { - actorDefinitionPruned: actorDefinitionPrunedCached, - webServerMcpPath: getActorMCPServerPath(actorDefinitionPrunedCached), - - } as ActorInfo; - } - - const actorDefinitionPruned = await getActorDefinition(actorIdOrName, apifyClient); - if (!actorDefinitionPruned) { - log.error('Actor not found or definition is not available', { actorName: actorIdOrName }); - return null; - } - // Cache the pruned Actor definition - actorDefinitionPrunedCache.set(actorIdOrName, actorDefinitionPruned); - return { - actorDefinitionPruned, - webServerMcpPath: getActorMCPServerPath(actorDefinitionPruned), - } as ActorInfo; - }), - ); - - const clonedActors = structuredClone(actorsInfo); - - // Filter out nulls and separate Actors with MCP servers and normal Actors - const actorMCPServersInfo = clonedActors.filter((actorInfo) => actorInfo && actorInfo.webServerMcpPath) as ActorInfo[]; - const normalActorsInfo = clonedActors.filter((actorInfo) => actorInfo && !actorInfo.webServerMcpPath) as ActorInfo[]; - - const [normalTools, mcpServerTools] = await Promise.all([ - getNormalActorsAsTools(normalActorsInfo), - getMCPServersAsTools(actorMCPServersInfo, apifyClient.token), - ]); - - return [...normalTools, ...mcpServerTools]; -} - /** * Returns an array of all field names mentioned in the display.properties * of all views in the given ActorDefinitionStorage object. diff --git a/src/utils/tools-loader.ts b/src/utils/tools-loader.ts index c2685357..f1ee806d 100644 --- a/src/utils/tools-loader.ts +++ b/src/utils/tools-loader.ts @@ -8,7 +8,7 @@ import type { ApifyClient } from 'apify'; import log from '@apify/log'; import { defaults } from '../const.js'; -import { callActor } from '../tools/call-actor.js'; +import { callActor } from '../tools/actor.js'; import { getActorOutput } from '../tools/get-actor-output.js'; import { addTool } from '../tools/helpers.js'; import { getActorsAsTools, toolCategories, toolCategoriesEnabledByDefault } from '../tools/index.js';