[FLINK-39568] YAML built-in AI Function and model providers overhaul#4394
[FLINK-39568] YAML built-in AI Function and model providers overhaul#4394yuxiqian wants to merge 9 commits into
Conversation
There was a problem hiding this comment.
Pull request overview
This PR overhauls the YAML-based AI function and model integration in Flink CDC. It removes the previous design where each model was a UDF subclass implementing both AI logic and model communication, and replaces it with: (1) a small SPI for "AI Model Clients" (AiModelClient + ability interfaces + AiModelClientFactory) loaded via ServiceLoader; (2) a fixed catalog of built-in AI SQL functions (AI_COMPLETE, AI_SUMMARIZE, AI_EMBED) wired into the SQL operator table and into Janino code generation; and (3) two new model provider modules (dummy, openai-compatible) that ship as separate jars. The pipeline YAML schema for pipeline.model is reshaped to use name + type + free-form options instead of model-name + class-name.
Changes:
- Introduce common AI client SPI (
AiModelClient,AiModelClientFactory,ModelContext,SupportsTextGeneration/SupportsEmbedding) and discover providers viaServiceLoaderfrom the composer. - Define built-in AI function metadata (
AiTextFunctionDef,AiEmbeddingFunctionDef), expose them throughAiFunctionSqlOperatorTable, and rewrite their SQL→Janino translation inJaninoCompilerplus a newAiFunctionsstatic utility. - Plumb
Map<String, AiModelClient>throughPostTransformOperator, projection/filter processors, and the expression compiler; renameModelDeffields and tighten YAML model parsing; remove the legacyflink-cdc-pipeline-modelmodule contents and adddummy+openai-compatiblesubmodules.
Reviewed changes
Copilot reviewed 53 out of 53 changed files in this pull request and generated 19 comments.
Show a summary per file
| File | Description |
|---|---|
| flink-cdc-common/src/main/java/.../model/AiModelClient.java | New marker interface for serializable AI clients with default open/close. |
| flink-cdc-common/src/main/java/.../model/AiModelClientFactory.java | SPI factory contract with default option validation. |
| flink-cdc-common/src/main/java/.../model/ModelContext.java | Context passed to factories at assembly time. |
| flink-cdc-common/src/main/java/.../model/abilities/{SupportsTextGeneration,SupportsEmbedding}.java | Capability marker interfaces. |
| flink-cdc-common/src/test/java/.../model/AiModelClientFactoryTest.java | Tests for the default validate() logic. |
| flink-cdc-runtime/src/main/java/.../ai/AiTextFunctionDef.java | Enum holding prompt templates for text-gen functions. |
| flink-cdc-runtime/src/main/java/.../ai/AiEmbeddingFunctionDef.java | Enum holding type info for embedding functions. |
| flink-cdc-runtime/src/main/java/.../functions/impl/AiFunctions.java | Static helpers invoked from generated Janino code. |
| flink-cdc-runtime/src/main/java/.../parser/metadata/AiFunctionSqlOperatorTable.java | Builds Calcite operator table for AI functions. |
| flink-cdc-runtime/src/main/java/.../parser/metadata/TransformSqlOperatorTable.java | Removes obsolete AI_CHAT_PREDICT/GET_EMBEDDING/AI_LANGCHAIN_PREDICT. |
| flink-cdc-runtime/src/main/java/.../parser/{TransformParser,JaninoCompiler}.java | Wire new operator table; rewrite AI function model arg into a parameter reference. |
| flink-cdc-runtime/src/main/java/.../operators/transform/{PostTransformOperator,PostTransformOperatorBuilder,TransformProjectionProcessor,TransformFilterProcessor,ProjectionColumnProcessor,TransformExpressionCompiler}.java | Thread Map<String, AiModelClient> through all processors and lifecycle. |
| flink-cdc-runtime/src/test/java/.../parser/AiFunctionParserTest.java | Parser/projection translation tests for AI funcs. |
| flink-cdc-runtime/src/test/java/.../functions/impl/AiFunctionsTest.java | Unit tests for AiFunctions. |
| flink-cdc-runtime/src/test/java/.../UserDefinedFunctionDescriptorTest.java | Drops removed embedding-model assertion. |
| flink-cdc-runtime/pom.xml | Drops test dep on legacy flink-cdc-pipeline-model. |
| flink-cdc-pipeline-model/pom.xml + new submodule poms | Convert to multi-module aggregator with dummy + openai-compatible. |
| flink-cdc-pipeline-model/src/main/java/.../model/* | Delete legacy OpenAIChatModel, OpenAIEmbeddingModel, ModelOptions. |
| flink-cdc-pipeline-model/.../dummy/* | New deterministic dummy provider. |
| flink-cdc-pipeline-model/.../openai-compatible/* | New OpenAI-SDK-based provider with shading and SPI registration. |
| flink-cdc-composer/src/main/java/.../FlinkPipelineComposer.java + TransformTranslator.java | Drop pre-transform model plumbing; load model clients via SPI in post-transform. |
| flink-cdc-composer/src/main/java/.../definition/ModelDef.java | Rename fields to name/type/options. |
| flink-cdc-composer/src/test/...AiFunctionITCase.java + FlinkPipelineUdfITCase.java | New AI function ITs; remove disabled OpenAI manual test. |
| flink-cdc-composer/src/test/resources/META-INF/services/...AiModelClientFactory | Register dummy provider for tests. |
| flink-cdc-cli/src/main/java/.../YamlPipelineDefinitionParser.java | New model parsing: rename keys, validate identifier, deduplicate, allow object form. |
| flink-cdc-cli/src/test/java/.../YamlPipelineDefinitionParserTest.java + resources | Update expected YAML and add many new model parsing tests. |
| flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/* | New AI function e2e test, register new provider jars, update malformed YAML test. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
This closes FLINK-39568.
Currently, YAML model is merely a UDF function, combining AI function logic and corresponding model implementations.
This PR introduce dynamically loaded "Model Providers" for low-level requests, and decouple them from actual AI functions.