Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,16 @@
import com.fasterxml.jackson.databind.ObjectMapper;

import io.modelcontextprotocol.client.McpSyncClient;
import io.modelcontextprotocol.client.transport.HttpClientStreamableHttpTransport;
import io.modelcontextprotocol.json.jackson.JacksonMcpJsonMapper;
import io.modelcontextprotocol.json.schema.jackson.DefaultJsonSchemaValidator;
import io.modelcontextprotocol.spec.McpClientTransport;
import io.modelcontextprotocol.spec.McpSchema;
import modelengine.fel.tool.mcp.client.McpClient;
import modelengine.fel.tool.mcp.client.elicitation.ElicitRequest;
import modelengine.fel.tool.mcp.client.elicitation.ElicitResult;
import modelengine.fel.tool.mcp.client.support.handler.McpClientLogHandler;
import modelengine.fel.tool.mcp.client.support.handler.McpElicitationHandler;
import modelengine.fel.tool.mcp.entity.Tool;
import modelengine.fitframework.inspection.Nullable;
import modelengine.fitframework.log.Logger;
import modelengine.fitframework.util.StringUtils;
import modelengine.fitframework.util.UuidUtils;
Expand All @@ -26,6 +30,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
Expand All @@ -37,41 +42,47 @@
* @author 黄可欣
* @since 2025-11-03
*/
public class DefaultMcpStreamableClient implements McpClient {
private static final Logger log = Logger.get(DefaultMcpStreamableClient.class);
public class DefaultMcpClient implements McpClient {
private static final Logger log = Logger.get(DefaultMcpClient.class);

private final String clientId;
private final McpSyncClient mcpSyncClient;
private final DefaultMcpClientLogHandler logHandler;

private volatile boolean initialized = false;
private volatile boolean closed = false;

/**
* Constructs a new instance of the DefaultMcpStreamableClient.
* Constructs a new instance of the DefaultMcpClient.
*
* @param baseUri The base URI of the MCP server.
* @param sseEndpoint The endpoint for the Server-Sent Events (SSE) connection.
* @param requestTimeoutSeconds The timeout duration of requests. Units: seconds.
*/
public DefaultMcpStreamableClient(String baseUri, String sseEndpoint, int requestTimeoutSeconds) {
public DefaultMcpClient(String baseUri, String sseEndpoint, McpClientTransport transport, int requestTimeoutSeconds,
@Nullable Function<ElicitRequest, ElicitResult> elicitationHandler) {
this.clientId = UuidUtils.randomUuidString();
notBlank(baseUri, "The MCP server base URI cannot be blank.");
notBlank(sseEndpoint, "The MCP server SSE endpoint cannot be blank.");
log.info("Creating MCP client. [clientId={}, baseUri={}]", this.clientId, baseUri);
ObjectMapper mapper = new ObjectMapper();
HttpClientStreamableHttpTransport transport = HttpClientStreamableHttpTransport.builder(baseUri)
.jsonMapper(new JacksonMcpJsonMapper(mapper))
.endpoint(sseEndpoint)
.build();

this.logHandler = new DefaultMcpClientLogHandler(this.clientId);
this.mcpSyncClient = io.modelcontextprotocol.client.McpClient.sync(transport)
.requestTimeout(Duration.ofSeconds(requestTimeoutSeconds))
.capabilities(McpSchema.ClientCapabilities.builder().build())
.loggingConsumer(this.logHandler::handleLoggingMessage)
.jsonSchemaValidator(new DefaultJsonSchemaValidator(mapper))
.build();
McpClientLogHandler logHandler = new McpClientLogHandler(this.clientId);
if (elicitationHandler != null) {
McpElicitationHandler mcpElicitationHandler =
new McpElicitationHandler(this.clientId, elicitationHandler);
this.mcpSyncClient = io.modelcontextprotocol.client.McpClient.sync(transport)
.capabilities(McpSchema.ClientCapabilities.builder().elicitation().build())
.loggingConsumer(logHandler::handleLoggingMessage)
.elicitation(mcpElicitationHandler::handleElicitationRequest)
.requestTimeout(Duration.ofSeconds(requestTimeoutSeconds))
.jsonSchemaValidator(new DefaultJsonSchemaValidator(new ObjectMapper()))
.build();
} else {
this.mcpSyncClient = io.modelcontextprotocol.client.McpClient.sync(transport)
.capabilities(McpSchema.ClientCapabilities.builder().build())
.loggingConsumer(logHandler::handleLoggingMessage)
.requestTimeout(Duration.ofSeconds(requestTimeoutSeconds))
.jsonSchemaValidator(new DefaultJsonSchemaValidator(new ObjectMapper()))
.build();
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,23 @@

package modelengine.fel.tool.mcp.client.support;

import com.fasterxml.jackson.databind.ObjectMapper;

import io.modelcontextprotocol.client.transport.HttpClientSseClientTransport;
import io.modelcontextprotocol.client.transport.HttpClientStreamableHttpTransport;
import io.modelcontextprotocol.json.jackson.JacksonMcpJsonMapper;
import modelengine.fel.tool.mcp.client.McpClient;
import modelengine.fel.tool.mcp.client.McpClientFactory;
import modelengine.fel.tool.mcp.client.elicitation.ElicitRequest;
import modelengine.fel.tool.mcp.client.elicitation.ElicitResult;
import modelengine.fitframework.annotation.Component;
import modelengine.fitframework.annotation.Value;
import modelengine.fitframework.inspection.Nullable;

import java.util.function.Function;

/**
* Represents a factory for creating instances of the {@link DefaultMcpStreamableClient}.
* Represents a factory for creating instances of the {@link DefaultMcpClient}.
* This class is responsible for initializing and configuring.
*
* @author 季聿阶
Expand All @@ -32,7 +42,22 @@ public DefaultMcpClientFactory(@Value("${mcp.client.request.timeout-seconds}") i
}

@Override
public McpClient create(String baseUri, String sseEndpoint) {
return new DefaultMcpStreamableClient(baseUri, sseEndpoint, requestTimeoutSeconds);
public McpClient createStreamable(String baseUri, String sseEndpoint,
@Nullable Function<ElicitRequest, ElicitResult> elicitationHandler) {
HttpClientStreamableHttpTransport transport = HttpClientStreamableHttpTransport.builder(baseUri)
.jsonMapper(new JacksonMcpJsonMapper(new ObjectMapper()))
.endpoint(sseEndpoint)
.build();
return new DefaultMcpClient(baseUri, sseEndpoint, transport, this.requestTimeoutSeconds, elicitationHandler);
}

@Override
public McpClient createSse(String baseUri, String sseEndpoint,
@Nullable Function<ElicitRequest, ElicitResult> elicitationHandler) {
HttpClientSseClientTransport transport = HttpClientSseClientTransport.builder(baseUri)
.jsonMapper(new JacksonMcpJsonMapper(new ObjectMapper()))
.sseEndpoint(sseEndpoint)
.build();
return new DefaultMcpClient(baseUri, sseEndpoint, transport, this.requestTimeoutSeconds, elicitationHandler);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
* Licensed under the MIT License. See License.txt in the project root for license information.
*--------------------------------------------------------------------------------------------*/

package modelengine.fel.tool.mcp.client.support;
package modelengine.fel.tool.mcp.client.support.handler;

import io.modelcontextprotocol.spec.McpSchema;
import modelengine.fitframework.log.Logger;
Expand All @@ -16,16 +16,16 @@
* @author 黄可欣
* @since 2025-11-03
*/
public class DefaultMcpClientLogHandler {
private static final Logger log = Logger.get(DefaultMcpClientLogHandler.class);
public class McpClientLogHandler {
private static final Logger log = Logger.get(McpClientLogHandler.class);
private final String clientId;

/**
* Constructs a new instance of DefaultMcpClientLogHandler.
* Constructs a new instance of McpClientLogHandler.
*
* @param clientId The unique identifier of the MCP client.
*/
public DefaultMcpClientLogHandler(String clientId) {
public McpClientLogHandler(String clientId) {
this.clientId = clientId;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*---------------------------------------------------------------------------------------------
* Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved.
* This file is a part of the ModelEngine Project.
* Licensed under the MIT License. See License.txt in the project root for license information.
*--------------------------------------------------------------------------------------------*/

package modelengine.fel.tool.mcp.client.support.handler;

import io.modelcontextprotocol.spec.McpSchema;
import modelengine.fel.tool.mcp.client.elicitation.ElicitRequest;
import modelengine.fel.tool.mcp.client.elicitation.ElicitResult;
import modelengine.fitframework.log.Logger;

import java.util.function.Function;

/**
* MCP elicitation handler that delegates to an external handler function.
*
* <p>Converts {@link McpSchema.ElicitRequest} to {@link ElicitRequest},
* calls the user's handler, and converts {@link ElicitResult} back to {@link McpSchema.ElicitResult}.</p>
*
* @author 黄可欣
* @since 2025-11-25
*/
public class McpElicitationHandler {
private static final Logger log = Logger.get(McpElicitationHandler.class);
private final String clientId;
private final Function<ElicitRequest, ElicitResult> elicitationHandler;

/**
* Constructs a new handler.
*
* @param clientId The client ID.
* @param elicitationHandler The user's handler function that processes {@link ElicitRequest}
* and returns {@link ElicitResult}.
*/
public McpElicitationHandler(String clientId, Function<ElicitRequest, ElicitResult> elicitationHandler) {
this.clientId = clientId;
this.elicitationHandler = elicitationHandler;
}

/**
* Handles an elicitation request by converting {@link McpSchema.ElicitRequest} to {@link ElicitRequest},
* delegating to the user's handler, and converting {@link ElicitResult} back to {@link McpSchema.ElicitResult}.
*
* @param request The {@link McpSchema.ElicitRequest} from MCP server.
* @return The {@link McpSchema.ElicitResult} to send back to MCP server.
*/
public McpSchema.ElicitResult handleElicitationRequest(McpSchema.ElicitRequest request) {
log.info("Received elicitation request from MCP server. [clientId={}, message={}, requestSchema={}]",
this.clientId,
request.message(),
request.requestedSchema());

try {
ElicitRequest elicitRequest = new ElicitRequest(request.message(), request.requestedSchema());
ElicitResult result = this.elicitationHandler.apply(elicitRequest);
log.info("Successfully handled elicitation request. [clientId={}, action={}, content={}]",
this.clientId,
result.action(),
result.content());

McpSchema.ElicitResult.Action mcpAction = switch (result.action()) {
case ACCEPT -> McpSchema.ElicitResult.Action.ACCEPT;
case DECLINE -> McpSchema.ElicitResult.Action.DECLINE;
case CANCEL -> McpSchema.ElicitResult.Action.CANCEL;
};
return new McpSchema.ElicitResult(mcpAction, result.content());
} catch (Exception e) {
log.error("Failed to handle elicitation request. [clientId={}, error={}]",
this.clientId,
e.getMessage(),
e);
throw new IllegalStateException("Failed to handle elicitation request: " + e.getMessage(), e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import modelengine.fel.tool.mcp.client.McpClient;
import modelengine.fel.tool.mcp.client.McpClientFactory;
import modelengine.fel.tool.mcp.client.elicitation.ElicitResult;
import modelengine.fel.tool.mcp.entity.Tool;
import modelengine.fit.http.annotation.GetMapping;
import modelengine.fit.http.annotation.PostMapping;
Expand All @@ -17,6 +18,7 @@
import modelengine.fitframework.annotation.Component;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -51,9 +53,27 @@ public TestController(McpClientFactory mcpClientFactory) {
* @return A string indicating that the initialization was successful.
*/
@PostMapping(path = "/initialize")
public String initialize(@RequestQuery(name = "baseUri") String baseUri,
public String initializeStreamable(@RequestQuery(name = "baseUri") String baseUri,
@RequestQuery(name = "sseEndpoint") String sseEndpoint) {
this.client = this.mcpClientFactory.create(baseUri, sseEndpoint);
this.client = this.mcpClientFactory.createStreamable(baseUri, sseEndpoint, null);
this.client.initialize();
return "Initialized";
}

@PostMapping(path = "/initialize-sse")
public String initializeSse(@RequestQuery(name = "baseUri") String baseUri,
@RequestQuery(name = "sseEndpoint") String sseEndpoint) {
this.client = this.mcpClientFactory.createSse(baseUri, sseEndpoint, null);
this.client.initialize();
return "Initialized";
}

@PostMapping(path = "/initialize-elicitation")
public String initializeElicitation(@RequestQuery(name = "baseUri") String baseUri,
@RequestQuery(name = "sseEndpoint") String sseEndpoint) {
this.client = this.mcpClientFactory.createStreamable(baseUri,
sseEndpoint,
request -> new ElicitResult(ElicitResult.Action.ACCEPT, Collections.emptyMap()));
this.client.initialize();
return "Initialized";
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,74 @@

package modelengine.fel.tool.mcp.client;

import modelengine.fel.tool.mcp.client.elicitation.ElicitRequest;
import modelengine.fel.tool.mcp.client.elicitation.ElicitResult;
import modelengine.fitframework.inspection.Nullable;

import java.util.function.Function;

/**
* Indicates the factory of {@link McpClient}.
* <p>
* Each {@link McpClient} instance created by this factory is designed to connect to a single specified MCP server.
* Factory for creating {@link McpClient} instances with SSE or Streamable HTTP transport.
* <p>Each client connects to a single MCP server.</p>
*
* @author 季聿阶
* @since 2025-05-21
*/
public interface McpClientFactory {
/**
* Creates a {@link McpClient} instance.
* Creates a client with streamable HTTP transport.
*
* @param baseUri The base URI of the MCP server.
* @param sseEndpoint The SSE endpoint of the MCP server.
* @param elicitationFunction The function to handle {@link ElicitRequest} and return {@link ElicitResult}.
* If null, elicitation will not be supported in MCP client.
* @return The created {@link McpClient} instance.
*/
McpClient createStreamable(String baseUri, String sseEndpoint,
@Nullable Function<ElicitRequest, ElicitResult> elicitationFunction);

/**
* Creates a client with SSE transport.
*
* @param baseUri The base URI of the MCP server.
* @param sseEndpoint The SSE endpoint of the MCP server.
* @param elicitationFunction The function to handle {@link ElicitRequest} and return {@link ElicitResult}.
* If null, elicitation will not be supported in MCP client.
* @return The created {@link McpClient} instance.
*/
McpClient createSse(String baseUri, String sseEndpoint,
@Nullable Function<ElicitRequest, ElicitResult> elicitationFunction);

/**
* Creates a client with streamable HTTP transport (default). No elicitation support.
*
* @param baseUri The base URI of the MCP server.
* @param sseEndpoint The SSE endpoint of the MCP server.
* @return The created {@link McpClient} instance.
*/
default McpClient create(String baseUri, String sseEndpoint) {
return this.createStreamable(baseUri, sseEndpoint, null);
}

/**
* Creates a client with streamable HTTP transport. No elicitation support.
*
* @param baseUri The base URI of the MCP server.
* @param sseEndpoint The SSE endpoint of the MCP server.
* @return The created {@link McpClient} instance.
*/
default McpClient createStreamable(String baseUri, String sseEndpoint) {
return this.createStreamable(baseUri, sseEndpoint, null);
}

/**
* Creates a client with SSE transport. No elicitation support.
*
* @param baseUri The base URI of the MCP server.
* @param sseEndpoint The SSE endpoint of the MCP server.
* @return The connected {@link McpClient} instance.
* @return The created {@link McpClient} instance.
*/
McpClient create(String baseUri, String sseEndpoint);
default McpClient createSse(String baseUri, String sseEndpoint) {
return this.createSse(baseUri, sseEndpoint, null);
}
}
Loading