diff --git a/api/src/main/java/org/apache/flink/agents/api/AgentsExecutionEnvironment.java b/api/src/main/java/org/apache/flink/agents/api/AgentsExecutionEnvironment.java index 8efe6f46..df98e326 100644 --- a/api/src/main/java/org/apache/flink/agents/api/AgentsExecutionEnvironment.java +++ b/api/src/main/java/org/apache/flink/agents/api/AgentsExecutionEnvironment.java @@ -29,6 +29,8 @@ import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import javax.annotation.Nullable; + import java.lang.reflect.Method; import java.lang.reflect.Modifier; import java.util.HashMap; @@ -63,10 +65,11 @@ protected AgentsExecutionEnvironment() { * * @param env Optional StreamExecutionEnvironment for remote execution. If null, a local * execution environment will be created. + * @param tEnv Optional StreamTableEnvironment for table-to-stream conversion. * @return AgentsExecutionEnvironment appropriate for the execution context. */ public static AgentsExecutionEnvironment getExecutionEnvironment( - StreamExecutionEnvironment env) { + StreamExecutionEnvironment env, @Nullable StreamTableEnvironment tEnv) { if (env == null) { // Return local execution environment for testing/development try { @@ -86,14 +89,31 @@ public static AgentsExecutionEnvironment getExecutionEnvironment( "org.apache.flink.agents.runtime.env.RemoteExecutionEnvironment"); return (AgentsExecutionEnvironment) remoteEnvClass - .getDeclaredConstructor(StreamExecutionEnvironment.class) - .newInstance(env); + .getDeclaredConstructor( + StreamExecutionEnvironment.class, + StreamTableEnvironment.class) + .newInstance(env, tEnv); } catch (Exception e) { throw new RuntimeException("Failed to create RemoteExecutionEnvironment", e); } } } + /** + * Convenience method to get execution environment without Flink StreamTableEnvironment. If + * StreamTableEnvironment is needed during execution, the environment will auto create using + * StreamExecutionEnvironment. + * + *

* @param env Optional StreamExecutionEnvironment for remote execution. If null, a local + * execution environment will be created. + * + * @return AgentsExecutionEnvironment appropriate for the execution context. + */ + public static AgentsExecutionEnvironment getExecutionEnvironment( + StreamExecutionEnvironment env) { + return getExecutionEnvironment(env, null); + } + /** * Convenience method to get execution environment without Flink integration. * @@ -155,23 +175,20 @@ public AgentBuilder fromDataStream(DataStream input) { * and processing it through agents. * * @param input Table to be processed by agents. - * @param tableEnv StreamTableEnvironment for table-to-stream conversion. * @param keySelector Optional KeySelector for extracting keys from table rows. * @param Type of the key extracted by the KeySelector. * @return AgentBuilder for configuring the agent pipeline. */ - public abstract AgentBuilder fromTable( - Table input, StreamTableEnvironment tableEnv, KeySelector keySelector); + public abstract AgentBuilder fromTable(Table input, KeySelector keySelector); /** * Set input for agents from a Table without keying. * * @param input Table to be processed by agents. - * @param tableEnv StreamTableEnvironment for table-to-stream conversion. * @return AgentBuilder for configuring the agent pipeline. 
*/ - public AgentBuilder fromTable(Table input, StreamTableEnvironment tableEnv) { - return fromTable(input, tableEnv, null); + public AgentBuilder fromTable(Table input) { + return fromTable(input, null); } /** diff --git a/e2e-test/integration-test/pom.xml b/e2e-test/integration-test/pom.xml new file mode 100644 index 00000000..598da38b --- /dev/null +++ b/e2e-test/integration-test/pom.xml @@ -0,0 +1,69 @@ + + + + 4.0.0 + + org.apache.flink + flink-agents-e2e-tests + 0.1-SNAPSHOT + + + flink-agents-integration-tests + Flink Agents : E2E Tests: Integration + + + + org.apache.flink + flink-agents-api + ${project.version} + + + org.apache.flink + flink-agents-runtime + ${project.version} + + + org.apache.flink + flink-streaming-java + ${flink.version} + + + org.apache.flink + flink-table-api-java-bridge + ${flink.version} + + + org.apache.flink + flink-table-planner_2.12 + ${flink.version} + + + org.apache.flink + flink-clients + ${flink.version} + + + org.apache.flink + flink-agents-integrations-chat-models-ollama + ${project.version} + + + + \ No newline at end of file diff --git a/examples/src/main/java/org/apache/flink/agents/examples/AgentWithOllama.java b/e2e-test/integration-test/src/main/java/org/apache/flink/agents/integration/test/AgentWithOllama.java similarity index 99% rename from examples/src/main/java/org/apache/flink/agents/examples/AgentWithOllama.java rename to e2e-test/integration-test/src/main/java/org/apache/flink/agents/integration/test/AgentWithOllama.java index b3714447..00c41448 100644 --- a/examples/src/main/java/org/apache/flink/agents/examples/AgentWithOllama.java +++ b/e2e-test/integration-test/src/main/java/org/apache/flink/agents/integration/test/AgentWithOllama.java @@ -16,7 +16,7 @@ * limitations under the License. 
*/ -package org.apache.flink.agents.examples; +package org.apache.flink.agents.integration.test; import org.apache.flink.agents.api.Agent; import org.apache.flink.agents.api.InputEvent; diff --git a/examples/src/main/java/org/apache/flink/agents/examples/AgentWithOllamaExample.java b/e2e-test/integration-test/src/main/java/org/apache/flink/agents/integration/test/AgentWithOllamaExample.java similarity index 98% rename from examples/src/main/java/org/apache/flink/agents/examples/AgentWithOllamaExample.java rename to e2e-test/integration-test/src/main/java/org/apache/flink/agents/integration/test/AgentWithOllamaExample.java index 7a850c59..c58f6b49 100644 --- a/examples/src/main/java/org/apache/flink/agents/examples/AgentWithOllamaExample.java +++ b/e2e-test/integration-test/src/main/java/org/apache/flink/agents/integration/test/AgentWithOllamaExample.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.flink.agents.examples; +package org.apache.flink.agents.integration.test; import org.apache.flink.agents.api.AgentsExecutionEnvironment; import org.apache.flink.api.java.functions.KeySelector; diff --git a/examples/src/main/java/org/apache/flink/agents/examples/AgentWithResource.java b/e2e-test/integration-test/src/main/java/org/apache/flink/agents/integration/test/AgentWithResource.java similarity index 99% rename from examples/src/main/java/org/apache/flink/agents/examples/AgentWithResource.java rename to e2e-test/integration-test/src/main/java/org/apache/flink/agents/integration/test/AgentWithResource.java index 27ee17bb..101df786 100644 --- a/examples/src/main/java/org/apache/flink/agents/examples/AgentWithResource.java +++ b/e2e-test/integration-test/src/main/java/org/apache/flink/agents/integration/test/AgentWithResource.java @@ -16,7 +16,7 @@ * limitations under the License. 
*/ -package org.apache.flink.agents.examples; +package org.apache.flink.agents.integration.test; import org.apache.flink.agents.api.Agent; import org.apache.flink.agents.api.InputEvent; diff --git a/examples/src/main/java/org/apache/flink/agents/examples/AgentWithResourceExample.java b/e2e-test/integration-test/src/main/java/org/apache/flink/agents/integration/test/AgentWithResourceExample.java similarity index 98% rename from examples/src/main/java/org/apache/flink/agents/examples/AgentWithResourceExample.java rename to e2e-test/integration-test/src/main/java/org/apache/flink/agents/integration/test/AgentWithResourceExample.java index 8ca3c75f..c1828aa5 100644 --- a/examples/src/main/java/org/apache/flink/agents/examples/AgentWithResourceExample.java +++ b/e2e-test/integration-test/src/main/java/org/apache/flink/agents/integration/test/AgentWithResourceExample.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.flink.agents.examples; +package org.apache.flink.agents.integration.test; import org.apache.flink.agents.api.AgentsExecutionEnvironment; import org.apache.flink.api.java.functions.KeySelector; diff --git a/examples/src/main/java/org/apache/flink/agents/examples/DataStreamAgent.java b/e2e-test/integration-test/src/main/java/org/apache/flink/agents/integration/test/DataStreamAgent.java similarity index 96% rename from examples/src/main/java/org/apache/flink/agents/examples/DataStreamAgent.java rename to e2e-test/integration-test/src/main/java/org/apache/flink/agents/integration/test/DataStreamAgent.java index ffc10500..860412b0 100644 --- a/examples/src/main/java/org/apache/flink/agents/examples/DataStreamAgent.java +++ b/e2e-test/integration-test/src/main/java/org/apache/flink/agents/integration/test/DataStreamAgent.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.flink.agents.examples; +package org.apache.flink.agents.integration.test; import org.apache.flink.agents.api.Agent; import org.apache.flink.agents.api.Event; @@ -25,7 +25,7 @@ import org.apache.flink.agents.api.context.MemoryObject; import org.apache.flink.agents.api.context.MemoryRef; import org.apache.flink.agents.api.context.RunnerContext; -import org.apache.flink.agents.examples.DataStreamIntegrationExample.ItemData; +import org.apache.flink.agents.integration.test.DataStreamIntegrationExample.ItemData; /** * A simple example agent used for explaining integrating agents with DataStream. diff --git a/examples/src/main/java/org/apache/flink/agents/examples/DataStreamIntegrationExample.java b/e2e-test/integration-test/src/main/java/org/apache/flink/agents/integration/test/DataStreamIntegrationExample.java similarity index 98% rename from examples/src/main/java/org/apache/flink/agents/examples/DataStreamIntegrationExample.java rename to e2e-test/integration-test/src/main/java/org/apache/flink/agents/integration/test/DataStreamIntegrationExample.java index b3a38602..f87ec854 100644 --- a/examples/src/main/java/org/apache/flink/agents/examples/DataStreamIntegrationExample.java +++ b/e2e-test/integration-test/src/main/java/org/apache/flink/agents/integration/test/DataStreamIntegrationExample.java @@ -16,7 +16,7 @@ * limitations under the License. 
*/ -package org.apache.flink.agents.examples; +package org.apache.flink.agents.integration.test; import org.apache.flink.agents.api.AgentsExecutionEnvironment; import org.apache.flink.api.java.functions.KeySelector; diff --git a/e2e-test/integration-test/src/main/java/org/apache/flink/agents/integration/test/DataStreamTableIntegrationExample.java b/e2e-test/integration-test/src/main/java/org/apache/flink/agents/integration/test/DataStreamTableIntegrationExample.java new file mode 100644 index 00000000..625eb478 --- /dev/null +++ b/e2e-test/integration-test/src/main/java/org/apache/flink/agents/integration/test/DataStreamTableIntegrationExample.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.agents.integration.test; + +import org.apache.flink.agents.api.AgentsExecutionEnvironment; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.api.Table; + +public class DataStreamTableIntegrationExample { + /** Simple data class for the example. */ + public static class ItemData { + public final int id; + public final String name; + public final double value; + public int visit_count; + + public ItemData(int id, String name, double value) { + this.id = id; + this.name = name; + this.value = value; + this.visit_count = 0; + } + + @Override + public String toString() { + return String.format( + "ItemData{id=%d, name='%s', value=%.2f,visit_count=%d}", + id, name, value, visit_count); + } + } + + /** Key selector for extracting keys from ItemData. 
*/ + public static class ItemKeySelector + implements KeySelector { + @Override + public Integer getKey(DataStreamIntegrationExample.ItemData item) { + return item.id; + } + } + + public static void main(String[] args) throws Exception { + // Create the execution environment + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); + + // Create input DataStream + DataStream inputStream = + env.fromElements( + new DataStreamIntegrationExample.ItemData(1, "item1", 10.5), + new DataStreamIntegrationExample.ItemData(2, "item2", 20.0), + new DataStreamIntegrationExample.ItemData(3, "item3", 15.7), + new DataStreamIntegrationExample.ItemData(1, "item1_updated", 12.3), + new DataStreamIntegrationExample.ItemData(2, "item2_updated", 22.1), + new DataStreamIntegrationExample.ItemData(1, "item1_updated_again", 15.3)); + + // Create agents execution environment + AgentsExecutionEnvironment agentsEnv = + AgentsExecutionEnvironment.getExecutionEnvironment(env); + + // Define output schema + Schema outputSchema = Schema.newBuilder().column("f0", DataTypes.STRING()).build(); + + // Apply agent to the Table + Table outputTable = + agentsEnv + .fromDataStream( + inputStream, new DataStreamIntegrationExample.ItemKeySelector()) + .apply(new DataStreamAgent()) + .toTable(outputSchema); + + outputTable.execute().print(); + } +} diff --git a/examples/src/main/java/org/apache/flink/agents/examples/MemoryObjectAgent.java b/e2e-test/integration-test/src/main/java/org/apache/flink/agents/integration/test/MemoryObjectAgent.java similarity index 98% rename from examples/src/main/java/org/apache/flink/agents/examples/MemoryObjectAgent.java rename to e2e-test/integration-test/src/main/java/org/apache/flink/agents/integration/test/MemoryObjectAgent.java index bd34f8f7..a54316c3 100644 --- a/examples/src/main/java/org/apache/flink/agents/examples/MemoryObjectAgent.java +++ 
b/e2e-test/integration-test/src/main/java/org/apache/flink/agents/integration/test/MemoryObjectAgent.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.flink.agents.examples; +package org.apache.flink.agents.integration.test; import org.apache.flink.agents.api.Agent; import org.apache.flink.agents.api.Event; diff --git a/examples/src/main/java/org/apache/flink/agents/examples/MemoryObjectExample.java b/e2e-test/integration-test/src/main/java/org/apache/flink/agents/integration/test/MemoryObjectExample.java similarity index 97% rename from examples/src/main/java/org/apache/flink/agents/examples/MemoryObjectExample.java rename to e2e-test/integration-test/src/main/java/org/apache/flink/agents/integration/test/MemoryObjectExample.java index 8cac5670..a0e2304e 100644 --- a/examples/src/main/java/org/apache/flink/agents/examples/MemoryObjectExample.java +++ b/e2e-test/integration-test/src/main/java/org/apache/flink/agents/integration/test/MemoryObjectExample.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.flink.agents.examples; +package org.apache.flink.agents.integration.test; import org.apache.flink.agents.api.AgentsExecutionEnvironment; import org.apache.flink.api.java.functions.KeySelector; diff --git a/examples/src/main/java/org/apache/flink/agents/examples/ReActAgentExample.java b/e2e-test/integration-test/src/main/java/org/apache/flink/agents/integration/test/ReActAgentExample.java similarity index 98% rename from examples/src/main/java/org/apache/flink/agents/examples/ReActAgentExample.java rename to e2e-test/integration-test/src/main/java/org/apache/flink/agents/integration/test/ReActAgentExample.java index 251a7c24..af997563 100644 --- a/examples/src/main/java/org/apache/flink/agents/examples/ReActAgentExample.java +++ b/e2e-test/integration-test/src/main/java/org/apache/flink/agents/integration/test/ReActAgentExample.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.flink.agents.examples; +package org.apache.flink.agents.integration.test; import org.apache.flink.agents.api.Agent; import org.apache.flink.agents.api.AgentsExecutionEnvironment; @@ -66,7 +66,7 @@ public static void main(String[] args) throws Exception { // Create agents execution environment AgentsExecutionEnvironment agentsEnv = - AgentsExecutionEnvironment.getExecutionEnvironment(env); + AgentsExecutionEnvironment.getExecutionEnvironment(env, tableEnv); // register resource to agents execution environment. 
agentsEnv @@ -102,7 +102,6 @@ public static void main(String[] args) throws Exception { agentsEnv .fromTable( inputTable, - tableEnv, (KeySelector) value -> (Double) ((Row) value).getField("a")) .apply(agent) diff --git a/examples/src/main/java/org/apache/flink/agents/examples/SimpleAgent.java b/e2e-test/integration-test/src/main/java/org/apache/flink/agents/integration/test/SimpleAgent.java similarity index 98% rename from examples/src/main/java/org/apache/flink/agents/examples/SimpleAgent.java rename to e2e-test/integration-test/src/main/java/org/apache/flink/agents/integration/test/SimpleAgent.java index 121596e5..991162ee 100644 --- a/examples/src/main/java/org/apache/flink/agents/examples/SimpleAgent.java +++ b/e2e-test/integration-test/src/main/java/org/apache/flink/agents/integration/test/SimpleAgent.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.flink.agents.examples; +package org.apache.flink.agents.integration.test; import org.apache.flink.agents.api.Agent; import org.apache.flink.agents.api.Event; diff --git a/examples/src/main/java/org/apache/flink/agents/examples/TableAgent.java b/e2e-test/integration-test/src/main/java/org/apache/flink/agents/integration/test/TableAgent.java similarity index 98% rename from examples/src/main/java/org/apache/flink/agents/examples/TableAgent.java rename to e2e-test/integration-test/src/main/java/org/apache/flink/agents/integration/test/TableAgent.java index 44a79802..8582b176 100644 --- a/examples/src/main/java/org/apache/flink/agents/examples/TableAgent.java +++ b/e2e-test/integration-test/src/main/java/org/apache/flink/agents/integration/test/TableAgent.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.flink.agents.examples; +package org.apache.flink.agents.integration.test; import org.apache.flink.agents.api.Agent; import org.apache.flink.agents.api.Event; diff --git a/examples/src/main/java/org/apache/flink/agents/examples/TableIntegrationExample.java b/e2e-test/integration-test/src/main/java/org/apache/flink/agents/integration/test/TableIntegrationExample.java similarity index 96% rename from examples/src/main/java/org/apache/flink/agents/examples/TableIntegrationExample.java rename to e2e-test/integration-test/src/main/java/org/apache/flink/agents/integration/test/TableIntegrationExample.java index abce29c2..89835d73 100644 --- a/examples/src/main/java/org/apache/flink/agents/examples/TableIntegrationExample.java +++ b/e2e-test/integration-test/src/main/java/org/apache/flink/agents/integration/test/TableIntegrationExample.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.flink.agents.examples; +package org.apache.flink.agents.integration.test; import org.apache.flink.agents.api.AgentsExecutionEnvironment; import org.apache.flink.api.java.functions.KeySelector; @@ -78,7 +78,7 @@ public static void main(String[] args) throws Exception { // Create agents execution environment AgentsExecutionEnvironment agentsEnv = - AgentsExecutionEnvironment.getExecutionEnvironment(env); + AgentsExecutionEnvironment.getExecutionEnvironment(env, tableEnv); // Define output schema Schema outputSchema = Schema.newBuilder().column("f0", DataTypes.STRING()).build(); @@ -86,7 +86,7 @@ public static void main(String[] args) throws Exception { // Apply agent to the Table Table outputTable = agentsEnv - .fromTable(inputTable, tableEnv, new RowKeySelector()) + .fromTable(inputTable, new RowKeySelector()) .apply(new TableAgent()) .toTable(outputSchema); diff --git a/e2e-test/pom.xml b/e2e-test/pom.xml index e2895cdc..c85bc2e2 100644 --- a/e2e-test/pom.xml +++ b/e2e-test/pom.xml @@ -30,5 +30,6 @@ under the License. 
Flink Agents : E2E Tests: agent-plan-compatibility-test + integration-test \ No newline at end of file diff --git a/python/flink_agents/api/execution_environment.py b/python/flink_agents/api/execution_environment.py index 9ccc4a4e..a198e6c9 100644 --- a/python/flink_agents/api/execution_environment.py +++ b/python/flink_agents/api/execution_environment.py @@ -106,7 +106,9 @@ def resources(self) -> Dict[ResourceType, Dict[str, Any]]: @staticmethod def get_execution_environment( - env: StreamExecutionEnvironment | None = None, **kwargs: Dict[str, Any] + env: StreamExecutionEnvironment | None = None, + t_env: StreamTableEnvironment | None = None, + **kwargs: Dict[str, Any], ) -> "AgentsExecutionEnvironment": """Get agents execution environment. @@ -123,13 +125,13 @@ def get_execution_environment( if env is None: return importlib.import_module( "flink_agents.runtime.local_execution_environment" - ).create_instance(env=env, **kwargs) + ).create_instance(env=env, t_env=t_env, **kwargs) else: for path in files("flink_agents.lib").iterdir(): env.add_jars(f"file://{path}") return importlib.import_module( "flink_agents.runtime.remote_execution_environment" - ).create_instance(env=env, **kwargs) + ).create_instance(env=env, t_env=t_env, **kwargs) @abstractmethod def get_config(self, path: str | None = None) -> Configuration: @@ -180,7 +182,6 @@ def from_datastream( def from_table( self, input: Table, - t_env: StreamTableEnvironment, key_selector: KeySelector | Callable | None = None, ) -> AgentBuilder: """Set input for agents. Used for remote execution. diff --git a/python/flink_agents/e2e_tests/__init__.py b/python/flink_agents/e2e_tests/__init__.py new file mode 100644 index 00000000..e154fadd --- /dev/null +++ b/python/flink_agents/e2e_tests/__init__.py @@ -0,0 +1,17 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################# diff --git a/python/flink_agents/examples/agent_example.py b/python/flink_agents/e2e_tests/agent_example.py similarity index 100% rename from python/flink_agents/examples/agent_example.py rename to python/flink_agents/e2e_tests/agent_example.py diff --git a/python/flink_agents/examples/chat_model_example.py b/python/flink_agents/e2e_tests/chat_model_example.py similarity index 100% rename from python/flink_agents/examples/chat_model_example.py rename to python/flink_agents/e2e_tests/chat_model_example.py diff --git a/python/flink_agents/examples/common_tools.py b/python/flink_agents/e2e_tests/common_tools.py similarity index 100% rename from python/flink_agents/examples/common_tools.py rename to python/flink_agents/e2e_tests/common_tools.py diff --git a/python/flink_agents/e2e_tests/from_datastream_to_table.py b/python/flink_agents/e2e_tests/from_datastream_to_table.py new file mode 100644 index 00000000..02d7151f --- /dev/null +++ b/python/flink_agents/e2e_tests/from_datastream_to_table.py @@ -0,0 +1,108 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################# +from pathlib import Path + +from pyflink.common import Duration, WatermarkStrategy +from pyflink.common.typeinfo import BasicTypeInfo, ExternalTypeInfo, RowTypeInfo +from pyflink.datastream import ( + KeySelector, + RuntimeExecutionMode, + StreamExecutionEnvironment, +) +from pyflink.datastream.connectors.file_system import FileSource, StreamFormat +from pyflink.table import DataTypes, Schema, StreamTableEnvironment, TableDescriptor + +from flink_agents.api.execution_environment import AgentsExecutionEnvironment +from flink_agents.e2e_tests.my_agent import ( + DataStreamToTableAgent, + ItemData, +) + + +class MyKeySelector(KeySelector): + """KeySelector for extracting key.""" + + def get_key(self, value: ItemData) -> int: + """Extract key from ItemData.""" + return value.id + + +current_dir = Path(__file__).parent + +# if this example raises exception "No module named 'flink_agents'", you could set +# PYTHONPATH like "os.environ["PYTHONPATH"] = ($VENV_HOME/lib/$PYTHON_VERSION/ +# site-packages) in this file. 
+if __name__ == "__main__": + env = StreamExecutionEnvironment.get_execution_environment() + + env.set_runtime_mode(RuntimeExecutionMode.STREAMING) + env.set_parallelism(1) + t_env = StreamTableEnvironment.create(stream_execution_environment=env) + + # currently, bounded source is not supported due to runtime implementation, so + # we use continuous file source here. + input_datastream = env.from_source( + source=FileSource.for_record_stream_format( + StreamFormat.text_line_format(), f"file:///{current_dir}/resources" + ) + .monitor_continuously(Duration.of_minutes(1)) + .build(), + watermark_strategy=WatermarkStrategy.no_watermarks(), + source_name="streaming_agent_example", + ) + + deserialize_datastream = input_datastream.map( + lambda x: ItemData.model_validate_json(x) + ) + + agents_env = AgentsExecutionEnvironment.get_execution_environment( + env=env, t_env=t_env + ) + + output_type = ExternalTypeInfo( + RowTypeInfo( + [ + BasicTypeInfo.LONG_TYPE_INFO(), + BasicTypeInfo.STRING_TYPE_INFO(), + BasicTypeInfo.FLOAT_TYPE_INFO(), + ], + ["id", "review", "review_score"], + ) + ) + + schema = ( + Schema.new_builder() + .column("id", DataTypes.BIGINT()) + .column("review", DataTypes.STRING()) + .column("review_score", DataTypes.FLOAT()) + ).build() + + output_table = ( + agents_env.from_datastream( + input=deserialize_datastream, key_selector=MyKeySelector() + ) + .apply(DataStreamToTableAgent()) + .to_table(schema=schema, output_type=output_type) + ) + + t_env.create_temporary_table( + "sink", + TableDescriptor.for_connector("print").schema(schema).build(), + ) + + output_table.execute_insert("sink").wait() diff --git a/python/flink_agents/examples/integrate_datastream_with_agent_example.py b/python/flink_agents/e2e_tests/integrate_datastream_with_agent_example.py similarity index 97% rename from python/flink_agents/examples/integrate_datastream_with_agent_example.py rename to python/flink_agents/e2e_tests/integrate_datastream_with_agent_example.py index 
8e03d692..5f2e391b 100644 --- a/python/flink_agents/examples/integrate_datastream_with_agent_example.py +++ b/python/flink_agents/e2e_tests/integrate_datastream_with_agent_example.py @@ -26,7 +26,7 @@ from pyflink.datastream.connectors.file_system import FileSource, StreamFormat from flink_agents.api.execution_environment import AgentsExecutionEnvironment -from flink_agents.examples.my_agent import DataStreamAgent, ItemData +from flink_agents.e2e_tests.my_agent import DataStreamAgent, ItemData class MyKeySelector(KeySelector): diff --git a/python/flink_agents/examples/integrate_table_with_agent_example.py b/python/flink_agents/e2e_tests/integrate_table_with_agent_example.py similarity index 95% rename from python/flink_agents/examples/integrate_table_with_agent_example.py rename to python/flink_agents/e2e_tests/integrate_table_with_agent_example.py index 2069d511..5983ae7b 100644 --- a/python/flink_agents/examples/integrate_table_with_agent_example.py +++ b/python/flink_agents/e2e_tests/integrate_table_with_agent_example.py @@ -32,7 +32,7 @@ ) from flink_agents.api.execution_environment import AgentsExecutionEnvironment -from flink_agents.examples.my_agent import TableAgent +from flink_agents.e2e_tests.my_agent import TableAgent current_dir = Path(__file__).parent @@ -74,7 +74,7 @@ def get_key(self, value: Row) -> int: table = t_env.from_path("source") - agents_env = AgentsExecutionEnvironment.get_execution_environment(env=env) + agents_env = AgentsExecutionEnvironment.get_execution_environment(env=env, t_env=t_env) output_type = ExternalTypeInfo( RowTypeInfo( @@ -95,7 +95,7 @@ def get_key(self, value: Row) -> int: ).build() output_table = ( - agents_env.from_table(input=table, t_env=t_env, key_selector=MyKeySelector()) + agents_env.from_table(input=table, key_selector=MyKeySelector()) .apply(TableAgent()) .to_table(schema=schema, output_type=output_type) ) diff --git a/python/flink_agents/examples/integrate_table_with_react_agent_example.py 
b/python/flink_agents/e2e_tests/integrate_table_with_react_agent_example.py similarity index 96% rename from python/flink_agents/examples/integrate_table_with_react_agent_example.py rename to python/flink_agents/e2e_tests/integrate_table_with_react_agent_example.py index c767c2db..30aaeb3c 100644 --- a/python/flink_agents/examples/integrate_table_with_react_agent_example.py +++ b/python/flink_agents/e2e_tests/integrate_table_with_react_agent_example.py @@ -32,7 +32,7 @@ from flink_agents.api.prompts.prompt import Prompt from flink_agents.api.resource import ResourceDescriptor from flink_agents.api.tools.tool import Tool -from flink_agents.examples.common_tools import add, multiply +from flink_agents.e2e_tests.common_tools import add, multiply from flink_agents.integrations.chat_models.ollama_chat_model import ( OllamaChatModelConnection, OllamaChatModelSetup, @@ -72,7 +72,7 @@ def get_key(self, value: Row) -> int: ), ) - env = AgentsExecutionEnvironment.get_execution_environment(env=stream_env) + env = AgentsExecutionEnvironment.get_execution_environment(env=stream_env, t_env=t_env) # register resource to execution environment ( @@ -114,7 +114,7 @@ def get_key(self, value: Row) -> int: schema = (Schema.new_builder().column("result", DataTypes.INT())).build() output_table = ( - env.from_table(input=table, t_env=t_env, key_selector=MyKeySelector()) + env.from_table(input=table, key_selector=MyKeySelector()) .apply(agent) .to_table(schema=schema, output_type=output_type) ) diff --git a/python/flink_agents/examples/my_agent.py b/python/flink_agents/e2e_tests/my_agent.py similarity index 83% rename from python/flink_agents/examples/my_agent.py rename to python/flink_agents/e2e_tests/my_agent.py index 1af739e8..8b25a088 100644 --- a/python/flink_agents/examples/my_agent.py +++ b/python/flink_agents/e2e_tests/my_agent.py @@ -21,6 +21,7 @@ from typing import Any from pydantic import BaseModel +from pyflink.common import Row from flink_agents.api.agent import Agent from 
flink_agents.api.decorators import action, tool @@ -51,6 +52,7 @@ class ItemData(BaseModel): class MyEvent(Event): # noqa D101 value: Any + class DataStreamAgent(Agent): """Agent used for explaining integrating agents with DataStream. @@ -138,3 +140,30 @@ def second_action(event: Event, ctx: RunnerContext): # noqa D102 content = input content["review"] += " second action" ctx.send_event(OutputEvent(output=content)) + + +class DataStreamToTableAgent(Agent): + """Agent used for explaining integrating agents from table to datastream. + + Because pemja will find action in this class when execute Agent, we can't + define this class directly in example.py for module name will be set + to __main__. + """ + + @action(InputEvent) + @staticmethod + def first_action(event: Event, ctx: RunnerContext): # noqa D102 + input = event.input + content = copy.deepcopy(input) + content.review += " first action" + ctx.send_event(MyEvent(value=content)) + + @action(MyEvent) + @staticmethod + def second_action(event: Event, ctx: RunnerContext): # noqa D102 + input = event.value + content = input + content.review += " second action" + ctx.send_event( + OutputEvent(output=Row(**content.model_dump(exclude="memory_info"))) + ) diff --git a/python/flink_agents/examples/react_agent_example.py b/python/flink_agents/e2e_tests/react_agent_example.py similarity index 97% rename from python/flink_agents/examples/react_agent_example.py rename to python/flink_agents/e2e_tests/react_agent_example.py index 830bc6a3..6cfbfa42 100644 --- a/python/flink_agents/examples/react_agent_example.py +++ b/python/flink_agents/e2e_tests/react_agent_example.py @@ -25,7 +25,7 @@ from flink_agents.api.prompts.prompt import Prompt from flink_agents.api.resource import ResourceDescriptor from flink_agents.api.tools.tool import Tool -from flink_agents.examples.common_tools import add, multiply +from flink_agents.e2e_tests.common_tools import add, multiply from flink_agents.integrations.chat_models.ollama_chat_model import 
( OllamaChatModelConnection, OllamaChatModelSetup, diff --git a/python/flink_agents/examples/resources/input_data.txt b/python/flink_agents/e2e_tests/resources/input_data.txt similarity index 100% rename from python/flink_agents/examples/resources/input_data.txt rename to python/flink_agents/e2e_tests/resources/input_data.txt diff --git a/python/flink_agents/runtime/local_execution_environment.py b/python/flink_agents/runtime/local_execution_environment.py index 87d4c1be..ebaf78cb 100644 --- a/python/flink_agents/runtime/local_execution_environment.py +++ b/python/flink_agents/runtime/local_execution_environment.py @@ -41,13 +41,17 @@ class LocalAgentBuilder(AgentBuilder): __config: AgentConfiguration def __init__( - self, env: "LocalExecutionEnvironment", input: List[Dict[str, Any]], config: AgentConfiguration + self, + env: "LocalExecutionEnvironment", + input: List[Dict[str, Any]], + config: AgentConfiguration, ) -> None: """Init empty output list.""" self.__env = env self.__input = input self.__output = [] self.__config = config + def apply(self, agent: Agent) -> AgentBuilder: """Create local runner to execute given agent. @@ -130,7 +134,7 @@ def execute(self) -> None: self.__output.append(output) def from_datastream( - self, input: DataStream, key_selector : KeySelector | Callable | None = None + self, input: DataStream, key_selector: KeySelector | Callable | None = None ) -> AgentBuilder: """Set input DataStream of agent execution. @@ -142,7 +146,6 @@ def from_datastream( def from_table( self, input: Table, - t_env: StreamTableEnvironment, key_selector: KeySelector | Callable | None = None, ) -> AgentBuilder: """Set input Table of agent execution. @@ -154,7 +157,7 @@ def from_table( def create_instance( - env: StreamExecutionEnvironment, **kwargs: Dict[str, Any] + env: StreamExecutionEnvironment, t_env: StreamTableEnvironment, **kwargs: Any ) -> AgentsExecutionEnvironment: """Factory function to create a remote agents execution environment. 
@@ -162,6 +165,8 @@ def create_instance( ---------- env : StreamExecutionEnvironment Flink job execution environment. + t_env: StreamTableEnvironment + Flink job execution table environment. **kwargs : Dict[str, Any] The dict of parameters to configure the execution environment. diff --git a/python/flink_agents/runtime/remote_execution_environment.py b/python/flink_agents/runtime/remote_execution_environment.py index 7d649c31..878c527a 100644 --- a/python/flink_agents/runtime/remote_execution_environment.py +++ b/python/flink_agents/runtime/remote_execution_environment.py @@ -66,6 +66,15 @@ def __init__( self.__config = config self.__resources = resources + @property + def t_env(self) -> StreamTableEnvironment: + """Get or create table environment.""" + if self.__t_env is None: + self.__t_env = StreamTableEnvironment.create( + stream_execution_environment=self.__env + ) + return self.__t_env + def apply(self, agent: Agent) -> "AgentBuilder": """Set agent of execution environment. @@ -134,7 +143,7 @@ def to_table(self, schema: Schema, output_type: TypeInformation) -> Table: Table Output Table of agent execution. """ - return self.__t_env.from_data_stream(self.to_datastream(output_type), schema) + return self.t_env.from_data_stream(self.to_datastream(output_type), schema) def to_list(self) -> List[Dict[str, Any]]: """Get output list of agent execution. 
@@ -149,18 +158,33 @@ class RemoteExecutionEnvironment(AgentsExecutionEnvironment): """Implementation of AgentsExecutionEnvironment for execution with DataStream.""" __env: StreamExecutionEnvironment + __t_env: StreamTableEnvironment __config: AgentConfiguration - def __init__(self, env: StreamExecutionEnvironment) -> None: + def __init__( + self, + env: StreamExecutionEnvironment, + t_env: StreamTableEnvironment | None = None, + ) -> None: """Init method of RemoteExecutionEnvironment.""" super().__init__() self.__env = env + self.__t_env = t_env self.__config = AgentConfiguration() flink_conf_dir = os.environ.get("FLINK_CONF_DIR") if flink_conf_dir is not None: config_dir = Path(flink_conf_dir) / "config.yaml" self.__config.load_from_file(str(config_dir)) + @property + def t_env(self) -> StreamTableEnvironment: + """Get or create table environment.""" + if self.__t_env is None: + self.__t_env = StreamTableEnvironment.create( + stream_execution_environment=self.__env + ) + return self.__t_env + def get_config(self, path: str | None = None) -> AgentConfiguration: """Get the writable configuration for flink agents. @@ -200,13 +224,15 @@ def from_datastream( input = self.__process_input_datastream(input, key_selector) return RemoteAgentBuilder( - input=input, config=self.__config, resources=self.resources + input=input, + config=self.__config, + t_env=self.__t_env, + resources=self.resources, ) def from_table( self, input: Table, - t_env: StreamTableEnvironment, key_selector: KeySelector | Callable | None = None, ) -> AgentBuilder: """Set input Table of agent. @@ -215,18 +241,19 @@ def from_table( ---------- input : Table Receive a Table as input. - t_env: StreamTableEnvironment - table environment supports convert table to/from datastream. key_selector : KeySelector Extract key from each input record. 
""" - input = t_env.to_data_stream(table=input) + input = self.t_env.to_data_stream(table=input) input = input.map(lambda x: x, output_type=PickledBytesTypeInfo()) input = self.__process_input_datastream(input, key_selector) return RemoteAgentBuilder( - input=input, config=self.__config, t_env=t_env, resources=self.resources + input=input, + config=self.__config, + t_env=self.t_env, + resources=self.resources, ) def from_list(self, input: List[Dict[str, Any]]) -> "AgentsExecutionEnvironment": @@ -243,7 +270,7 @@ def execute(self) -> None: def create_instance( - env: StreamExecutionEnvironment, **kwargs: Dict[str, Any] + env: StreamExecutionEnvironment, t_env: StreamTableEnvironment, **kwargs: Any ) -> AgentsExecutionEnvironment: """Factory function to create a remote agents execution environment. @@ -251,6 +278,8 @@ def create_instance( ---------- env : StreamExecutionEnvironment Flink job execution environment. + t_env : StreamTableEnvironment + Flink job execution table environment. **kwargs : Dict[str, Any] The dict of parameters to configure the execution environment. @@ -259,4 +288,4 @@ def create_instance( AgentsExecutionEnvironment A configured agents execution environment instance. 
""" - return RemoteExecutionEnvironment(env=env) + return RemoteExecutionEnvironment(env=env, t_env=t_env) diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/env/RemoteExecutionEnvironment.java b/runtime/src/main/java/org/apache/flink/agents/runtime/env/RemoteExecutionEnvironment.java index 096d7aac..8b2eb292 100644 --- a/runtime/src/main/java/org/apache/flink/agents/runtime/env/RemoteExecutionEnvironment.java +++ b/runtime/src/main/java/org/apache/flink/agents/runtime/env/RemoteExecutionEnvironment.java @@ -34,6 +34,8 @@ import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import javax.annotation.Nullable; + import java.io.File; import java.util.HashMap; import java.util.List; @@ -48,17 +50,27 @@ public class RemoteExecutionEnvironment extends AgentsExecutionEnvironment { private final StreamExecutionEnvironment env; + private @Nullable StreamTableEnvironment tEnv; private final AgentConfiguration config; public static final String FLINK_CONF_FILENAME = "config.yaml"; - public RemoteExecutionEnvironment(StreamExecutionEnvironment env) { + public RemoteExecutionEnvironment( + StreamExecutionEnvironment env, @Nullable StreamTableEnvironment tEnv) { this.env = env; + this.tEnv = tEnv; final String configDir = System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR); this.config = loadAgentConfiguration(configDir); } + private StreamTableEnvironment getTableEnvironment() { + if (tEnv == null) { + tEnv = StreamTableEnvironment.create(env); + } + return tEnv; + } + @Override public AgentConfiguration getConfig() { return config; @@ -72,13 +84,13 @@ public AgentBuilder fromList(List input) { @Override public AgentBuilder fromDataStream(DataStream input, KeySelector keySelector) { - return new RemoteAgentBuilder<>(input, keySelector, env, config, resources); + return new RemoteAgentBuilder<>(input, tEnv, keySelector, env, config, resources); } @Override - public AgentBuilder fromTable( - Table input, 
StreamTableEnvironment tableEnv, KeySelector keySelector) { - return new RemoteAgentBuilder<>(input, tableEnv, keySelector, env, config, resources); + public AgentBuilder fromTable(Table input, KeySelector keySelector) { + return new RemoteAgentBuilder<>( + input, getTableEnvironment(), keySelector, env, config, resources); } @Override @@ -110,7 +122,7 @@ private static class RemoteAgentBuilder implements AgentBuilder { private final DataStream inputDataStream; private final KeySelector keySelector; private final StreamExecutionEnvironment env; - private final StreamTableEnvironment tableEnv; + private @Nullable StreamTableEnvironment tableEnv; private final AgentConfiguration config; private final Map> resources; @@ -120,6 +132,7 @@ private static class RemoteAgentBuilder implements AgentBuilder { // Constructor for DataStream input public RemoteAgentBuilder( DataStream inputDataStream, + @Nullable StreamTableEnvironment tableEnv, KeySelector keySelector, StreamExecutionEnvironment env, AgentConfiguration config, @@ -127,7 +140,7 @@ public RemoteAgentBuilder( this.inputDataStream = inputDataStream; this.keySelector = keySelector; this.env = env; - this.tableEnv = null; + this.tableEnv = tableEnv; this.config = config; this.resources = resources; } @@ -149,6 +162,13 @@ public RemoteAgentBuilder( this.resources = resources; } + private StreamTableEnvironment getTableEnvironment() { + if (tableEnv == null) { + tableEnv = StreamTableEnvironment.create(env); + } + return tableEnv; + } + @Override public AgentBuilder apply(Agent agent) { try { @@ -189,13 +209,8 @@ public DataStream toDataStream() { @Override public Table toTable(Schema schema) { - if (tableEnv == null) { - throw new IllegalStateException( - "Table environment not available. Use fromTable() method to enable table output."); - } - DataStream dataStream = toDataStream(); - return tableEnv.fromDataStream(dataStream, schema); + return getTableEnvironment().fromDataStream(dataStream, schema); } } }