From ee7427350f048bde21219a9783857bc96d9a3af7 Mon Sep 17 00:00:00 2001 From: Shengkai <1059623455@qq.com> Date: Thu, 25 Sep 2025 11:17:28 +0800 Subject: [PATCH] [FLINK-38423][table-api] Add VECTOR_SEARCH connector API --- .../source/VectorSearchTableSource.java | 117 ++++++++++++++++++ .../AsyncVectorSearchFunctionProvider.java | 38 ++++++ .../search/VectorSearchFunctionProvider.java | 37 ++++++ .../functions/AsyncVectorSearchFunction.java | 66 ++++++++++ .../table/functions/VectorSearchFunction.java | 62 ++++++++++ 5 files changed, 320 insertions(+) create mode 100644 flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/VectorSearchTableSource.java create mode 100644 flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/search/AsyncVectorSearchFunctionProvider.java create mode 100644 flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/search/VectorSearchFunctionProvider.java create mode 100644 flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/AsyncVectorSearchFunction.java create mode 100644 flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/VectorSearchFunction.java diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/VectorSearchTableSource.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/VectorSearchTableSource.java new file mode 100644 index 0000000000000..b10a9896dae9b --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/VectorSearchTableSource.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.connector.source; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.table.connector.source.search.AsyncVectorSearchFunctionProvider; +import org.apache.flink.table.connector.source.search.VectorSearchFunctionProvider; +import org.apache.flink.types.RowKind; + +import java.io.Serializable; + +/** + * A {@link DynamicTableSource} that search rows of an external storage system by one or more + * vectors during runtime. + * + *

Compared to {@link ScanTableSource}, the source does not have to read the entire table and can + * lazily fetch individual values from a (possibly continuously changing) external table when + * necessary. + * + *

Note: Compared to {@link ScanTableSource}, a {@link VectorSearchTableSource} does only support + * emitting insert-only changes currently (see also {@link RowKind}). Further abilities are not + * supported. + * + *

In the last step, the planner will call {@link #getSearchRuntimeProvider(VectorSearchContext)} + * for obtaining a provider of runtime implementation. The search fields that are required to + * perform a search are derived from a query by the planner and will be provided in the given {@link + * VectorSearchTableSource.VectorSearchContext#getSearchColumns()}. The values for those key fields + * are passed during runtime. + */ +@PublicEvolving +public interface VectorSearchTableSource extends DynamicTableSource { + + /** + * Returns a provider of runtime implementation for reading the data. + * + *

There exist different interfaces for runtime implementation which is why {@link + * VectorSearchRuntimeProvider} serves as the base interface. + * + *

Independent of the provider interface, a source implementation can work on either + * arbitrary objects or internal data structures (see {@link org.apache.flink.table.data} for + * more information). + * + *

The given {@link VectorSearchContext} offers utilities by the planner for creating runtime + * implementation with minimal dependencies to internal data structures. + * + * @see VectorSearchFunctionProvider + * @see AsyncVectorSearchFunctionProvider + */ + VectorSearchRuntimeProvider getSearchRuntimeProvider(VectorSearchContext context); + + // -------------------------------------------------------------------------------------------- + // Helper interfaces + // -------------------------------------------------------------------------------------------- + + /** + * Context for creating runtime implementation via a {@link VectorSearchRuntimeProvider}. + * + *

It offers utilities by the planner for creating runtime implementation with minimal + * dependencies to internal data structures. + * + *

Methods should be called in {@link #getSearchRuntimeProvider(VectorSearchContext)}. + * Returned instances that are {@link Serializable} can be directly passed into the runtime + * implementation class. + */ + @PublicEvolving + interface VectorSearchContext extends DynamicTableSource.Context { + + /** + * Returns an array of key index paths that should be used during the search. The indices + * are 0-based and support composite keys within (possibly nested) structures. + * + *

For example, given a table with data type {@code ROW < i INT, s STRING, r ROW < i2 + * INT, s2 STRING > >}, this method would return {@code [[0], [2, 1]]} when {@code i} and + * {@code s2} are used for performing a lookup. + * + * @return array of key index paths + */ + int[][] getSearchColumns(); + + /** + * Runtime config provided to provider. The config can be used by planner or vector search + * provider at runtime. For example, async options can be used by planner to choose async + * inference. Other config such as http timeout or retry can be used to configure search + * functions. + */ + ReadableConfig runtimeConfig(); + } + + /** + * Provides actual runtime implementation for reading the data. + * + *

There exist different interfaces for runtime implementation which is why {@link + * VectorSearchRuntimeProvider} serves as the base interface. + * + * @see VectorSearchFunctionProvider + * @see AsyncVectorSearchFunctionProvider + */ + @PublicEvolving + interface VectorSearchRuntimeProvider {} +} diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/search/AsyncVectorSearchFunctionProvider.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/search/AsyncVectorSearchFunctionProvider.java new file mode 100644 index 0000000000000..9dd7a5083dca8 --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/search/AsyncVectorSearchFunctionProvider.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.connector.source.search; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.connector.source.VectorSearchTableSource; +import org.apache.flink.table.functions.AsyncVectorSearchFunction; + +/** A provider for creating {@link AsyncVectorSearchFunction}. */ +@PublicEvolving +public interface AsyncVectorSearchFunctionProvider + extends VectorSearchTableSource.VectorSearchRuntimeProvider { + + /** Helper function for creating a static provider. */ + static AsyncVectorSearchFunctionProvider of( + AsyncVectorSearchFunction asyncVectorSearchFunction) { + return () -> asyncVectorSearchFunction; + } + + /** Creates an {@link AsyncVectorSearchFunction} instance. */ + AsyncVectorSearchFunction createAsyncVectorSearchFunction(); +} diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/search/VectorSearchFunctionProvider.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/search/VectorSearchFunctionProvider.java new file mode 100644 index 0000000000000..fe50ad585df3c --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/search/VectorSearchFunctionProvider.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.connector.source.search; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.connector.source.VectorSearchTableSource; +import org.apache.flink.table.functions.VectorSearchFunction; + +/** A provider for creating {@link VectorSearchFunction}. */ +@PublicEvolving +public interface VectorSearchFunctionProvider + extends VectorSearchTableSource.VectorSearchRuntimeProvider { + + /** Helper function for creating a static provider. */ + static VectorSearchFunctionProvider of(VectorSearchFunction searchFunction) { + return () -> searchFunction; + } + + /** Creates an {@link VectorSearchFunction} instance. */ + VectorSearchFunction createVectorSearchFunction(); +} diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/AsyncVectorSearchFunction.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/AsyncVectorSearchFunction.java new file mode 100644 index 0000000000000..5641f559e9e57 --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/AsyncVectorSearchFunction.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.functions; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; + +import java.util.Collection; +import java.util.concurrent.CompletableFuture; + +/** + * A wrapper class of {@link AsyncTableFunction} for asynchronous vector search. + * + *

The output type of this table function is fixed as {@link RowData}. + */ +@PublicEvolving +public abstract class AsyncVectorSearchFunction extends AsyncTableFunction { + + /** + * Asynchronously search result based on input row to find topK matched rows. + * + * @param topK - The number of topK matched rows to return. + * @param queryData - A {@link RowData} that wraps input for search function. + * @return A collection of all searched results. + */ + public abstract CompletableFuture> asyncVectorSearch( + int topK, RowData queryData); + + /** Invokes {@link #asyncVectorSearch} and chains futures. */ + public void eval(CompletableFuture> future, Object... args) { + int topK = (int) args[0]; + GenericRowData argsData = GenericRowData.of(args[1]); + asyncVectorSearch(topK, argsData) + .whenComplete( + (result, exception) -> { + if (exception != null) { + future.completeExceptionally( + new TableException( + String.format( + "Failed to execute asynchronously search with input row %s.", + argsData), + exception)); + return; + } + future.complete(result); + }); + } +} diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/VectorSearchFunction.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/VectorSearchFunction.java new file mode 100644 index 0000000000000..3364e56de9b1f --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/VectorSearchFunction.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.functions; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.util.FlinkRuntimeException; + +import java.io.IOException; +import java.util.Collection; + +/** + * A wrapper class of {@link TableFunction} for synchronous vector search. + * + *

The output type of this table function is fixed as {@link RowData}. + */ +@PublicEvolving +public abstract class VectorSearchFunction extends TableFunction { + + /** + * Synchronously search result based on input row to find topK matched rows. + * + * @param topK - The number of topK results to return. + * @param queryData - A {@link RowData} that wraps input for vector search function. + * @return A collection of predicted results. + */ + public abstract Collection vectorSearch(int topK, RowData queryData) + throws IOException; + + /** Invoke {@link #vectorSearch} and handle exceptions. */ + public final void eval(Object... args) { + int topK = (int) args[0]; + GenericRowData argsData = GenericRowData.of(args[1]); + try { + Collection results = vectorSearch(topK, argsData); + if (results == null) { + return; + } + results.forEach(this::collect); + } catch (Exception e) { + throw new FlinkRuntimeException( + String.format("Failed to execute search with input row %s.", argsData), e); + } + } +}