Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.connector.source;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.connector.source.search.AsyncVectorSearchFunctionProvider;
import org.apache.flink.table.connector.source.search.VectorSearchFunctionProvider;
import org.apache.flink.types.RowKind;

import java.io.Serializable;

/**
* A {@link DynamicTableSource} that search rows of an external storage system by one or more
Contributor

nits:
search ->searches
during -> at

* vectors during runtime.
*
* <p>Compared to {@link ScanTableSource}, the source does not have to read the entire table and can
* lazily fetch individual values from a (possibly continuously changing) external table when
* necessary.
*
* <p>Note: Compared to {@link ScanTableSource}, a {@link VectorSearchTableSource} does only support
Contributor

nits:
does only support -> only supports
I would remove the word "currently".
I would remove "Further abilities are not supported." I think this is implied by the preceding word "only".

* emitting insert-only changes currently (see also {@link RowKind}). Further abilities are not
* supported.
*
* <p>In the last step, the planner will call {@link #getSearchRuntimeProvider(VectorSearchContext)}
Contributor

Is it always the last step? It might be safer to say "after having ... then the planner will".

* for obtaining a provider of runtime implementation. The search fields that are required to
Contributor

nit:
for obtaining a provider of runtime implementation -> to obtain the runtime implementation provider

* perform a search are derived from a query by the planner and will be provided in the given {@link
* VectorSearchTableSource.VectorSearchContext#getSearchColumns()}. The values for those key fields
Contributor

nit:
those key fields -> the search fields

* are passed during runtime.
*/
@PublicEvolving
public interface VectorSearchTableSource extends DynamicTableSource {

/**
* Returns a provider of runtime implementation for reading the data.
Contributor

nit:
Returns a provider of runtime implementation for reading the data. -> Returns a SearchRuntimeProvider

*
* <p>There exist different interfaces for runtime implementation which is why {@link
Contributor

nit:
VectorSearchRuntimeProvider is a base interface that should be extended (is this true?) by child interfaces for specialized vector searches.

* VectorSearchRuntimeProvider} serves as the base interface.
*
* <p>Independent of the provider interface, a source implementation can work on either
* arbitrary objects or internal data structures (see {@link org.apache.flink.table.data} for
* more information).
*
* <p>The given {@link VectorSearchContext} offers utilities by the planner for creating runtime
* implementation with minimal dependencies to internal data structures.
*
* @see VectorSearchFunctionProvider
* @see AsyncVectorSearchFunctionProvider
*/
VectorSearchRuntimeProvider getSearchRuntimeProvider(VectorSearchContext context);

// --------------------------------------------------------------------------------------------
// Helper interfaces
// --------------------------------------------------------------------------------------------

/**
* Context for creating runtime implementation via a {@link VectorSearchRuntimeProvider}.
*
* <p>It offers utilities by the planner for creating runtime implementation with minimal
Contributor

nit:
by the planner for -> for the planner to

* dependencies to internal data structures.
*
* <p>Methods should be called in {@link #getSearchRuntimeProvider(VectorSearchContext)}.
Contributor

I am not sure what "Methods should be called in " means. Maybe "classes needing access to the Vector search information should use {@link #getSearchRuntimeProvider(VectorSearchContext)}"

* Returned instances that are {@link Serializable} can be directly passed into the runtime
Contributor

I am not sure what the returned instances are here, the provider or what it provides?

* implementation class.
*/
@PublicEvolving
interface VectorSearchContext extends DynamicTableSource.Context {

/**
* Returns an array of key index paths that should be used during the search. The indices
* are 0-based and support composite keys within (possibly nested) structures.
*
* <p>For example, given a table with data type {@code ROW < i INT, s STRING, r ROW < i2
* INT, s2 STRING > >}, this method would return {@code [[0], [2, 1]]} when {@code i} and
* {@code s2} are used for performing a search.
*
* @return array of key index paths
*/
int[][] getSearchColumns();

/**
* Runtime config provided to provider. The config can be used by planner or vector search
Contributor
@davidradl Oct 3, 2025

nits:
provider -> the provider
planner -> the planner

* provider at runtime. For example, async options can be used by planner to choose async
* inference. Other config such as http timeout or retry can be used to configure search
* functions.
*/
ReadableConfig runtimeConfig();
}

/**
* Provides actual runtime implementation for reading the data.
*
* <p>There exist different interfaces for runtime implementation which is why {@link
Contributor

nit:
exist -> exists

* VectorSearchRuntimeProvider} serves as the base interface.
*
* @see VectorSearchFunctionProvider
* @see AsyncVectorSearchFunctionProvider
*/
@PublicEvolving
interface VectorSearchRuntimeProvider {}
}
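
The index-path convention described for `getSearchColumns()` can be illustrated with a minimal, self-contained sketch. `FieldSketch` and `SearchColumnPaths` are hypothetical stand-ins invented for this illustration and are not Flink classes; the sketch only shows how a 0-based path such as `[2, 1]` walks from the top-level row into a nested field.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in for a (possibly nested) row field; not a Flink class. */
final class FieldSketch {
    final String name;
    final List<FieldSketch> children;

    FieldSketch(String name, FieldSketch... children) {
        this.name = name;
        this.children = Arrays.asList(children);
    }
}

/** Hypothetical helper that resolves index paths to dotted field names. */
final class SearchColumnPaths {

    /** Follows one 0-based index path from the top-level row down to a field. */
    static String resolve(FieldSketch row, int[] path) {
        FieldSketch current = row;
        List<String> names = new ArrayList<>();
        for (int index : path) {
            // Each index selects a child of the field reached so far.
            current = current.children.get(index);
            names.add(current.name);
        }
        return String.join(".", names);
    }

    public static void main(String[] args) {
        // Models ROW<i INT, s STRING, r ROW<i2 INT, s2 STRING>> from the Javadoc example.
        FieldSketch row =
                new FieldSketch(
                        "row",
                        new FieldSketch("i"),
                        new FieldSketch("s"),
                        new FieldSketch("r", new FieldSketch("i2"), new FieldSketch("s2")));
        int[][] searchColumns = {{0}, {2, 1}};
        for (int[] path : searchColumns) {
            System.out.println(resolve(row, path)); // prints "i", then "r.s2"
        }
    }
}
```

Under these assumptions, `[0]` selects the top-level field `i`, while `[2, 1]` first selects `r` and then its second child `s2`.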
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.connector.source.search;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.table.connector.source.VectorSearchTableSource;
import org.apache.flink.table.functions.AsyncVectorSearchFunction;

/** A provider for creating {@link AsyncVectorSearchFunction}. */
@PublicEvolving
public interface AsyncVectorSearchFunctionProvider
extends VectorSearchTableSource.VectorSearchRuntimeProvider {

/** Helper function for creating a static provider. */
static AsyncVectorSearchFunctionProvider of(
AsyncVectorSearchFunction asyncVectorSearchFunction) {
return () -> asyncVectorSearchFunction;
}

/** Creates an {@link AsyncVectorSearchFunction} instance. */
AsyncVectorSearchFunction createAsyncVectorSearchFunction();
}
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.connector.source.search;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.table.connector.source.VectorSearchTableSource;
import org.apache.flink.table.functions.VectorSearchFunction;

/** A provider for creating {@link VectorSearchFunction}. */
@PublicEvolving
public interface VectorSearchFunctionProvider
extends VectorSearchTableSource.VectorSearchRuntimeProvider {

/** Helper function for creating a static provider. */
static VectorSearchFunctionProvider of(VectorSearchFunction searchFunction) {
return () -> searchFunction;
}

/** Creates a {@link VectorSearchFunction} instance. */
VectorSearchFunction createVectorSearchFunction();
}
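
Both provider interfaces above use the same shape: a single abstract `create...` method plus a static `of(...)` helper that returns a lambda. The sketch below reproduces that shape with hypothetical stand-in types (`SearchFunctionSketch`, `SearchFunctionProviderSketch`), which are assumptions for illustration and not the Flink classes. It shows one consequence of the pattern: a provider built with `of(...)` hands back the same captured instance on every call.

```java
import java.util.Collection;

/** Hypothetical stand-in for a search function; not the Flink VectorSearchFunction. */
abstract class SearchFunctionSketch {
    abstract Collection<String> search(int topK, String query);
}

/** Hypothetical stand-in for a provider with a single abstract method. */
interface SearchFunctionProviderSketch {

    /** Static helper: wraps an existing instance in a provider. */
    static SearchFunctionProviderSketch of(SearchFunctionSketch function) {
        // The lambda captures `function`, so every create call returns that same object.
        return () -> function;
    }

    /** The only abstract method, which is what allows the lambda above. */
    SearchFunctionSketch createSearchFunction();
}
```

A connector that needs a fresh function per call would implement the interface directly instead of using the static helper.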
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.functions;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;

import java.util.Collection;
import java.util.concurrent.CompletableFuture;

/**
* A wrapper class of {@link AsyncTableFunction} for asynchronous vector search.
*
* <p>The output type of this table function is fixed as {@link RowData}.
*/
@PublicEvolving
public abstract class AsyncVectorSearchFunction extends AsyncTableFunction<RowData> {

/**
* Asynchronously searches for the topK rows that best match the input row.
*
* @param topK - The number of matched rows to return.
* @param queryData - A {@link RowData} that wraps the input for the search function.
* @return A future that completes with a collection of all search results.
*/
public abstract CompletableFuture<Collection<RowData>> asyncVectorSearch(
int topK, RowData queryData);

/** Invokes {@link #asyncVectorSearch} and chains futures. */
public void eval(CompletableFuture<Collection<RowData>> future, Object... args) {
int topK = (int) args[0];
GenericRowData argsData = GenericRowData.of(args[1]);
asyncVectorSearch(topK, argsData)
.whenComplete(
(result, exception) -> {
if (exception != null) {
future.completeExceptionally(
new TableException(
String.format(
"Failed to execute asynchronous search with input row %s.",
argsData),
exception));
return;
}
future.complete(result);
});
}
}
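
The future chaining in `eval` above can be tried in isolation with plain `java.util.concurrent` types. This is a minimal sketch under stated assumptions: `AsyncEvalSketch` and its `chain` method are hypothetical names, and `RuntimeException` stands in for `TableException`. It forwards a successful result to the target future and wraps a failure with the input for context.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical helper mirroring the whenComplete chaining; not a Flink class. */
final class AsyncEvalSketch {

    /** Forwards the source result to the target, or wraps the failure with context. */
    static <T> void chain(CompletableFuture<T> source, CompletableFuture<T> target, Object input) {
        source.whenComplete(
                (result, exception) -> {
                    if (exception != null) {
                        // RuntimeException stands in for TableException in this sketch.
                        target.completeExceptionally(
                                new RuntimeException(
                                        String.format(
                                                "Failed to execute asynchronous search with input row %s.",
                                                input),
                                        exception));
                        return;
                    }
                    target.complete(result);
                });
    }
}
```

Note that this style of chaining only covers failures reported through the returned future. If the search method throws before returning a future, the exception propagates to the caller and the target future is never completed by this code.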
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.functions;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.FlinkRuntimeException;

import java.io.IOException;
import java.util.Collection;

/**
* A wrapper class of {@link TableFunction} for synchronous vector search.
*
* <p>The output type of this table function is fixed as {@link RowData}.
*/
@PublicEvolving
public abstract class VectorSearchFunction extends TableFunction<RowData> {

/**
* Synchronously searches for the topK rows that best match the input row.
*
* @param topK - The number of matched rows to return.
* @param queryData - A {@link RowData} that wraps the input for the vector search function.
* @return A collection of all search results.
*/
public abstract Collection<RowData> vectorSearch(int topK, RowData queryData)
throws IOException;

/** Invokes {@link #vectorSearch} and handles exceptions. */
public final void eval(Object... args) {
int topK = (int) args[0];
GenericRowData argsData = GenericRowData.of(args[1]);
try {
Collection<RowData> results = vectorSearch(topK, argsData);
if (results == null) {
return;
}
results.forEach(this::collect);
} catch (Exception e) {
throw new FlinkRuntimeException(
String.format("Failed to execute search with input row %s.", argsData), e);
}
}
}
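
The control flow of the synchronous `eval` above (unpack the arguments, call the abstract search method, skip a `null` result, wrap any exception) can be sketched without Flink on the classpath. `SyncEvalSketch` is a hypothetical stand-in: it collects `String` results into a list where the real class calls `collect` on `RowData`, and it throws `RuntimeException` where the real class throws `FlinkRuntimeException`.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

/** Hypothetical stand-in mirroring the eval template; not the Flink class. */
abstract class SyncEvalSketch {

    /** Stands in for the collector that TableFunction provides. */
    final List<String> collected = new ArrayList<>();

    abstract Collection<String> vectorSearch(int topK, String query) throws Exception;

    final void eval(Object... args) {
        int topK = (int) args[0];
        String query = String.valueOf(args[1]);
        try {
            Collection<String> results = vectorSearch(topK, query);
            if (results == null) {
                // A null result emits nothing, the same as an empty collection.
                return;
            }
            results.forEach(collected::add);
        } catch (Exception e) {
            // RuntimeException stands in for FlinkRuntimeException in this sketch.
            throw new RuntimeException(
                    String.format("Failed to execute search with input row %s.", query), e);
        }
    }
}
```

Because `eval` is `final`, subclasses only supply `vectorSearch`, and the null handling and error wrapping stay uniform across implementations.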