From 485d66eec07cc7b24b0a9466703484d11746b792 Mon Sep 17 00:00:00 2001 From: twosom Date: Sun, 19 Oct 2025 00:54:56 +0900 Subject: [PATCH 1/5] chore: handle immutable collections in ChatModelAction message context --- .../agents/plan/actions/ChatModelAction.java | 42 +++++++++++++++++-- 1 file changed, 38 insertions(+), 4 deletions(-) diff --git a/plan/src/main/java/org/apache/flink/agents/plan/actions/ChatModelAction.java b/plan/src/main/java/org/apache/flink/agents/plan/actions/ChatModelAction.java index f20fdc17..afe7d964 100644 --- a/plan/src/main/java/org/apache/flink/agents/plan/actions/ChatModelAction.java +++ b/plan/src/main/java/org/apache/flink/agents/plan/actions/ChatModelAction.java @@ -31,10 +31,7 @@ import org.apache.flink.agents.api.tools.ToolResponse; import org.apache.flink.agents.plan.JavaFunction; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.UUID; +import java.util.*; /** Built-in action for processing chat request and tool call result. */ public class ChatModelAction { @@ -84,6 +81,11 @@ public static void chat( } List messageContext = (List) toolCallContext.get(initialRequestId); + + if (isImmutable(messageContext)) { + messageContext = new ArrayList<>(messageContext); + } + messageContext.add(response); stm.set(TOOL_CALL_CONTEXT, toolCallContext); @@ -118,6 +120,34 @@ public static void chat( } } + /** + * Checks if the provided list of chat messages is an immutable collection. + * + *

This method determines whether the list is an instance of Java's immutable collections by + * checking specific class name patterns used by the JDK implementation. + * + *

Note: This implementation is JDK-specific and may need updates if the JDK's internal + * implementation changes in future versions. + * + * @param maybeImmutableList The list of chat messages to check + * @return {@code true} if the list is an immutable collection, {@code false} otherwise + */ + private static boolean isImmutable(List maybeImmutableList) { + String className = maybeImmutableList.getClass().getName(); + // Check for Collections.unmodifiableList() and similar + if (className.startsWith("java.util.ImmutableCollections") + || className.startsWith("java.util.Collections$Unmodifiable")) { + return true; + } + + // Check for List.of() (Java 9+) + if (className.startsWith("java.util.ImmutableCollections$List")) { + return true; + } + + return false; + } + /** * Built-in action for processing chat request and tool call result. * @@ -160,6 +190,10 @@ public static void processChatRequestOrToolResponse(Event event, RunnerContext c (Map) stm.get(TOOL_CALL_CONTEXT).getValue(); // update tool call context List messages = (List) toolCallContext.get(initialRequestId); + if (isImmutable(messages)) { + messages = new ArrayList<>(messages); + } + for (Map.Entry entry : responses.entrySet()) { Map extraArgs = new HashMap<>(); String toolCallId = entry.getKey(); From 0514b6d4810154172bb2eebcbf0feee272f8479a Mon Sep 17 00:00:00 2001 From: twosom Date: Sun, 19 Oct 2025 00:55:20 +0900 Subject: [PATCH 2/5] bump ollama4j dependency from 1.1.0 to 1.1.1 --- integrations/chat-models/ollama/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integrations/chat-models/ollama/pom.xml b/integrations/chat-models/ollama/pom.xml index 96850c44..4c704e26 100644 --- a/integrations/chat-models/ollama/pom.xml +++ b/integrations/chat-models/ollama/pom.xml @@ -46,7 +46,7 @@ under the License. io.github.ollama4j ollama4j - 1.1.0 + 1.1.1 From 9520fab5d980de6f2a2c52eeafa1d5a7f5f4d1e6 Mon Sep 17 00:00:00 2001 From: twosom Date: Sun, 19 Oct 2025 00:57:32 +0900 Subject: [PATCH 3/5] feat: improve tool handling --- .../ollama/OllamaChatModelConnection.java | 154 ++++++++++-------- .../ollama/OllamaChatModelSetup.java | 3 + 2 files changed, 93 insertions(+), 64 deletions(-) diff --git a/integrations/chat-models/ollama/src/main/java/org/apache/flink/agents/integrations/chatmodels/ollama/OllamaChatModelConnection.java b/integrations/chat-models/ollama/src/main/java/org/apache/flink/agents/integrations/chatmodels/ollama/OllamaChatModelConnection.java index 72b291c9..a606adc3 100644 --- a/integrations/chat-models/ollama/src/main/java/org/apache/flink/agents/integrations/chatmodels/ollama/OllamaChatModelConnection.java +++ b/integrations/chat-models/ollama/src/main/java/org/apache/flink/agents/integrations/chatmodels/ollama/OllamaChatModelConnection.java @@ -20,11 +20,9 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; -import io.github.ollama4j.OllamaAPI; import io.github.ollama4j.exceptions.RoleNotFoundException; -import io.github.ollama4j.models.chat.OllamaChatMessage; -import io.github.ollama4j.models.chat.OllamaChatMessageRole; -import io.github.ollama4j.models.chat.OllamaChatResult; +import io.github.ollama4j.models.chat.*; +import io.github.ollama4j.models.request.OllamaChatEndpointCaller; import io.github.ollama4j.tools.Tools; import org.apache.flink.agents.api.chat.messages.ChatMessage; import org.apache.flink.agents.api.chat.messages.MessageRole; @@ -33,14 +31,9 @@ import org.apache.flink.agents.api.resource.ResourceDescriptor; import org.apache.flink.agents.api.resource.ResourceType; import org.apache.flink.agents.api.tools.Tool; -import org.apache.flink.agents.api.tools.ToolParameters; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.function.BiFunction; -import java.util.regex.Matcher; -import java.util.regex.Pattern; import java.util.stream.Collectors; /** @@ -66,8 +59,8 @@ * } */ public class OllamaChatModelConnection extends BaseChatModelConnection { - private final OllamaAPI client; - private final Pattern pattern; + + private final OllamaChatEndpointCaller caller; /** * Creates a new ollama chat model connection. @@ -83,13 +76,10 @@ public OllamaChatModelConnection( if (endpoint == null || endpoint.isEmpty()) { throw new IllegalArgumentException("endpoint should not be null or empty."); } - this.client = new OllamaAPI(endpoint); - Integer maxChatToolCallRetries = descriptor.getArgument("maxChatToolCallRetries"); - this.client.setMaxChatToolCallRetries( - maxChatToolCallRetries != null ? maxChatToolCallRetries : 10); Integer requestTimeout = descriptor.getArgument("requestTimeout"); - this.client.setRequestTimeoutSeconds(requestTimeout != null ? requestTimeout : 10); - this.pattern = Pattern.compile("(.*?)", Pattern.DOTALL); + this.caller = + new OllamaChatEndpointCaller( + endpoint, null, requestTimeout != null ? requestTimeout : 10); } /** @@ -108,18 +98,20 @@ public OllamaChatModelConnection( } /** - * Registers tools with the Ollama client based on tool resource names. + * Converts Flink Agent tools to Ollama compatible tool specifications. * *

Each tool's input schema is expected to be a JSON schema containing "properties" and * "required" keys. The schema is converted into the function/tool specification that Ollama - * understands, and a callable is wired to invoke the underlying BaseTool with ToolParameters. + * understands, and each tool is properly formatted for Ollama API integration. * - * @param tools tools to be registered to the client - * @throws RuntimeException if schema parsing or registration fails + * @param tools List of Flink Agent tools to be converted to Ollama tools + * @return List of Ollama compatible tool specifications + * @throws RuntimeException if schema parsing or conversion fails */ @SuppressWarnings("unchecked") - private void registerTools(List tools) { + private List convertToOllamaTools(List tools) { final ObjectMapper mapper = new ObjectMapper(); + final List ollamaTools = new ArrayList<>(); try { for (Tool tool : tools) { final Map schema = @@ -130,7 +122,7 @@ private void registerTools(List tools) { (Map>) schema.get("properties"); final List required = (List) schema.get("required"); - Map propertiesMap = new HashMap<>(); + Map propertiesMap = new HashMap<>(); for (Map.Entry> entry : properties.entrySet()) { final String paramName = entry.getKey(); @@ -140,40 +132,26 @@ private void registerTools(List tools) { propertiesMap.put( paramName, - Tools.PromptFuncDefinition.Property.builder() + Tools.Property.builder() .type(type) .description(description) .required(required.contains(paramName)) .build()); } - final Tools.ToolSpecification toolSpec = - Tools.ToolSpecification.builder() - .functionName(tool.getName()) - .functionDescription(tool.getDescription()) - .toolPrompt( - Tools.PromptFuncDefinition.builder() - .type("prompt") - .function( - Tools.PromptFuncDefinition.PromptFuncSpec - .builder() - .name(tool.getName()) - .description(tool.getDescription()) - .parameters( - Tools.PromptFuncDefinition - .Parameters - .builder() - .type("object") - .properties( - propertiesMap) - .build()) - .build()) + final Tools.Tool toolSpec = + Tools.Tool.builder() + .toolSpec( + Tools.ToolSpec.builder() + .name(tool.getName()) + .description(tool.getDescription()) + .parameters(Tools.Parameters.of(propertiesMap)) .build()) - .toolFunction(arguments -> tool.call(new ToolParameters(arguments))) .build(); - - this.client.registerTool(toolSpec); + ollamaTools.add(toolSpec); } + + return ollamaTools; } catch (Exception e) { throw new RuntimeException(e); } @@ -201,32 +179,80 @@ private OllamaChatMessage convertToOllamaChatMessages(ChatMessage message) { public ChatMessage chat( List messages, List tools, Map arguments) { try { - registerTools(tools); + final boolean extractReasoning = + (boolean) arguments.getOrDefault("extract_reasoning", false); + + final List ollamaTools = this.convertToOllamaTools(tools); final List ollamaChatMessages = messages.stream() .map(this::convertToOllamaChatMessages) .collect(Collectors.toList()); - final OllamaChatResult ollamaChatResult = - this.client.chat((String) arguments.get("model"), ollamaChatMessages); + final OllamaChatRequestBuilder chatRequestBuilder = + OllamaChatRequestBuilder.builder() + .withMessages(ollamaChatMessages) + .withModel((String) arguments.get("model")) + .withThinking(extractReasoning); + + chatRequestBuilder.setUseTools(false); + + final OllamaChatRequest chatRequest = chatRequestBuilder.build(); + + chatRequest.setTools(ollamaTools); + final OllamaChatResult ollamaChatResult = this.caller.callSync(chatRequest); + final OllamaChatResponseModel ollamaChatResponse = ollamaChatResult.getResponseModel(); + final OllamaChatMessage ollamaChatMessage = ollamaChatResponse.getMessage(); + + Map extraArgs = new HashMap<>(); + if (extractReasoning) { + extraArgs.put("reasoning", ollamaChatMessage.getThinking()); + } + + final List ollamaToolCalls = ollamaChatMessage.getToolCalls(); + final ChatMessage chatMessage = ChatMessage.assistant(ollamaChatMessage.getResponse()); + chatMessage.setExtraArgs(extraArgs); + + if (ollamaToolCalls != null) { + final List> toolCalls = convertToAgentsTools(ollamaToolCalls); + chatMessage.setToolCalls(toolCalls); + } - return extraReasoning(ollamaChatResult.getResponse()); + return chatMessage; } catch (Exception e) { throw new RuntimeException(e); } } - private ChatMessage extraReasoning(String response) { - Matcher matcher = pattern.matcher(response); - StringBuilder reasoning = new StringBuilder(); - while (matcher.find()) { - reasoning.append(matcher.group(1)); + /** + * Converts Ollama tool calls to the format expected by the Flink Agents framework. + * + *

This method transforms Ollama-specific tool call representations into a generic format + * that can be used by the Flink Agents framework. Each tool call is assigned a unique ID and + * structured with the appropriate function name and arguments. + * + * @param ollamaToolCalls the list of tool calls returned from Ollama API + * @return a list of tool calls formatted for Flink Agents, where each tool call is represented + * as a map containing id, type, and function details + */ + private List> convertToAgentsTools( + List ollamaToolCalls) { + final List> toolCalls = new ArrayList<>(ollamaToolCalls.size()); + for (OllamaChatToolCalls ollamaToolCall : ollamaToolCalls) { + final UUID id = UUID.randomUUID(); + final Map toolCall = + Map.of( + "id", + id, + "type", + "function", + "function", + Map.of( + "name", + ollamaToolCall.getFunction().getName(), + "arguments", + ollamaToolCall.getFunction().getArguments())); + toolCalls.add(toolCall); } - response = matcher.replaceAll("").strip(); - ChatMessage responseMessage = ChatMessage.assistant(response); - Map extraArgs = new HashMap<>(); - extraArgs.put("reasoning", reasoning.toString().strip()); - responseMessage.setExtraArgs(extraArgs); - return responseMessage; + return toolCalls; } } diff --git a/integrations/chat-models/ollama/src/main/java/org/apache/flink/agents/integrations/chatmodels/ollama/OllamaChatModelSetup.java b/integrations/chat-models/ollama/src/main/java/org/apache/flink/agents/integrations/chatmodels/ollama/OllamaChatModelSetup.java index 80bdeb33..8b78f8f9 100644 --- a/integrations/chat-models/ollama/src/main/java/org/apache/flink/agents/integrations/chatmodels/ollama/OllamaChatModelSetup.java +++ b/integrations/chat-models/ollama/src/main/java/org/apache/flink/agents/integrations/chatmodels/ollama/OllamaChatModelSetup.java @@ -56,11 +56,13 @@ public class OllamaChatModelSetup extends BaseChatModelSetup { private final String model; + private final boolean extractReasoning; public OllamaChatModelSetup( ResourceDescriptor descriptor, BiFunction getResource) { super(descriptor, getResource); this.model = descriptor.getArgument("model"); + this.extractReasoning = Boolean.parseBoolean(descriptor.getArgument("extract_reasoning")); } /** @@ -88,6 +90,7 @@ public OllamaChatModelSetup( public Map getParameters() { Map params = new HashMap<>(); params.put("model", model); + params.put("extract_reasoning", extractReasoning); return params; } } From f0753e60d7963a48990d1b5f28a1ddb59e697ee7 Mon Sep 17 00:00:00 2001 From: twosom Date: Tue, 21 Oct 2025 12:18:51 +0900 Subject: [PATCH 4/5] feat: upgrade ollama4j to 1.1.2 and refactor chat request builder Signed-off-by: hope --- integrations/chat-models/ollama/pom.xml | 2 +- .../chatmodels/ollama/OllamaChatModelConnection.java | 12 +++++------- 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/integrations/chat-models/ollama/pom.xml b/integrations/chat-models/ollama/pom.xml index 4c704e26..04779623 100644 --- a/integrations/chat-models/ollama/pom.xml +++ b/integrations/chat-models/ollama/pom.xml @@ -46,7 +46,7 @@ under the License. io.github.ollama4j ollama4j - 1.1.1 + 1.1.2 diff --git a/integrations/chat-models/ollama/src/main/java/org/apache/flink/agents/integrations/chatmodels/ollama/OllamaChatModelConnection.java b/integrations/chat-models/ollama/src/main/java/org/apache/flink/agents/integrations/chatmodels/ollama/OllamaChatModelConnection.java index a606adc3..633514c7 100644 --- a/integrations/chat-models/ollama/src/main/java/org/apache/flink/agents/integrations/chatmodels/ollama/OllamaChatModelConnection.java +++ b/integrations/chat-models/ollama/src/main/java/org/apache/flink/agents/integrations/chatmodels/ollama/OllamaChatModelConnection.java @@ -188,15 +188,13 @@ public ChatMessage chat( .map(this::convertToOllamaChatMessages) .collect(Collectors.toList()); - final OllamaChatRequestBuilder chatRequestBuilder = - OllamaChatRequestBuilder.builder() + final OllamaChatRequest chatRequest = + OllamaChatRequest.builder() .withMessages(ollamaChatMessages) .withModel((String) arguments.get("model")) - .withThinking(extractReasoning); - - chatRequestBuilder.setUseTools(false); - - final OllamaChatRequest chatRequest = chatRequestBuilder.build(); + .withThinking(extractReasoning) + .withUseTools(false) + .build(); chatRequest.setTools(ollamaTools); final OllamaChatResult ollamaChatResult = this.caller.callSync(chatRequest); From 6e6eebb32c2678a7e1fc12c5931437c3e6f147e3 Mon Sep 17 00:00:00 2001 From: twosom Date: Tue, 21 Oct 2025 23:00:58 +0900 Subject: [PATCH 5/5] chore: simplify message list handling by always creating mutable copy --- .../agents/plan/actions/ChatModelAction.java | 40 ++----------------- 1 file changed, 3 insertions(+), 37 deletions(-) diff --git a/plan/src/main/java/org/apache/flink/agents/plan/actions/ChatModelAction.java b/plan/src/main/java/org/apache/flink/agents/plan/actions/ChatModelAction.java index afe7d964..6fdbd922 100644 --- a/plan/src/main/java/org/apache/flink/agents/plan/actions/ChatModelAction.java +++ b/plan/src/main/java/org/apache/flink/agents/plan/actions/ChatModelAction.java @@ -80,11 +80,7 @@ public static void chat( toolCallContext.put(initialRequestId, messages); } List messageContext = - (List) toolCallContext.get(initialRequestId); - - if (isImmutable(messageContext)) { - messageContext = new ArrayList<>(messageContext); - } + new ArrayList<>((List) toolCallContext.get(initialRequestId)); messageContext.add(response); stm.set(TOOL_CALL_CONTEXT, toolCallContext); @@ -120,34 +116,6 @@ public static void chat( } } - /** - * Checks if the provided list of chat messages is an immutable collection. - * - *

This method determines whether the list is an instance of Java's immutable collections by - * checking specific class name patterns used by the JDK implementation. - * - *

Note: This implementation is JDK-specific and may need updates if the JDK's internal - * implementation changes in future versions. - * - * @param maybeImmutableList The list of chat messages to check - * @return {@code true} if the list is an immutable collection, {@code false} otherwise - */ - private static boolean isImmutable(List maybeImmutableList) { - String className = maybeImmutableList.getClass().getName(); - // Check for Collections.unmodifiableList() and similar - if (className.startsWith("java.util.ImmutableCollections") - || className.startsWith("java.util.Collections$Unmodifiable")) { - return true; - } - - // Check for List.of() (Java 9+) - if (className.startsWith("java.util.ImmutableCollections$List")) { - return true; - } - - return false; - } - /** * Built-in action for processing chat request and tool call result. * @@ -189,10 +157,8 @@ public static void processChatRequestOrToolResponse(Event event, RunnerContext c Map toolCallContext = (Map) stm.get(TOOL_CALL_CONTEXT).getValue(); // update tool call context - List messages = (List) toolCallContext.get(initialRequestId); - if (isImmutable(messages)) { - messages = new ArrayList<>(messages); - } + List messages = + new ArrayList<>((List) toolCallContext.get(initialRequestId)); for (Map.Entry entry : responses.entrySet()) { Map extraArgs = new HashMap<>();