feat(vector): Add Spark VECTOR Search TVF with initial KNN algorithm #18432
yihua merged 15 commits into apache:master
Conversation
Implements a new Spark TVF `hudi_vector_search` that performs exact nearest-neighbor search over embeddings stored in a Hudi table using brute-force KNN, without requiring a persistent vector index.

New files:
- HoodieVectorSearchTableValuedFunction: unresolved logical node holding raw args; parses single-query (table, col, ARRAY(...), k [, metric]) and batch-query (corpus, col, query_table, col, k [, metric]) modes; supports cosine, l2/euclidean, dot_product distance metrics.
- HoodieVectorSearchPlanBuilder: builds the execution plan. Single-query mode uses withColumn + orderBy + limit(k); batch-query mode uses crossJoin(broadcast(queries)) + window rank per _query_id; handles Float/Double/Byte embedding types via cast to Double before the UDF. Fixes cross-join column ambiguity when corpus and query share column names by renaming clashing query columns to _query_<colname> before the join.

Modified files:
- TableValuedFunctions: registers hudi_vector_search alongside existing Hudi TVFs so all Spark adapters (3.3/3.4/3.5/4.0) inject it via SparkSessionExtensions.
- HoodieSparkBaseAnalysis (ResolveReferences): resolves HoodieVectorSearchTableValuedFunction; evaluates the query vector constant at analysis time, resolves the table by name or path, and delegates to HoodieVectorSearchPlanBuilder.
- TestHoodieVectorSearchFunction: end-to-end tests covering single/batch query modes, all three distance metrics, DataFrame API, path-based and view-based resolution, Float/Double embedding types, MOR tables, composability (WHERE, subqueries), error handling, exact distance verification, and a same-column-name batch query regression test.
    while (i < numElements) { result(i) = arrayData.getFloat(i).toDouble; i += 1 }
  case IntegerType =>
    while (i < numElements) { result(i) = arrayData.getInt(i).toDouble; i += 1 }
  case LongType =>
Why is LongType here? I thought VECTOR in Hudi only supports float, double, and int.
Spark SQL can infer integer literals as LongType in some contexts (e.g., ARRAY(1, 2, 3) might produce LongType depending on how the expression is folded). Supporting it defensively here avoids a confusing runtime error for users, even if Hudi's VECTOR type itself only stores float/double/int.
OK, let me look into adding this.
val meta = field.metadata
if (meta.contains(HoodieSchema.TYPE_METADATA_FIELD)) {
  val typeDesc = meta.getString(HoodieSchema.TYPE_METADATA_FIELD)
  val dimPattern = """VECTOR\((\d+)""".r
Is there a cleaner way of getting the dimension? This also feels brittle.
Unfortunately Spark's ArrayType doesn't encode dimension in the schema, so you'd always need to inspect actual data. One option: require the user to pass the dimension explicitly as a TVF argument, which avoids the first-row inspection entirely and makes validation upfront. Otherwise, evaluating the first row or the query vector length is about as clean as it gets.
I actually think this is a cleaner solution: we can leverage HoodieSchema.parseTypeDescriptor and then, if it is a VECTOR type, get the dimension.
private def extractVectorDimension(df: DataFrame, colName: String): Option[Int] = {
  df.schema.fields.find(_.name == colName).flatMap { field =>
    val meta = field.metadata
    if (meta.contains(HoodieSchema.TYPE_METADATA_FIELD)) {
      val typeDesc = meta.getString(HoodieSchema.TYPE_METADATA_FIELD)
      Try(HoodieSchema.parseTypeDescriptor(typeDesc)) match {
        case Success(v: HoodieSchema.Vector) => Some(v.getDimension)
        case Success(_) => throw new HoodieAnalysisException(
          s"Column '$colName' has type '$typeDesc' which is not a VECTOR type. " +
            "Only VECTOR columns are supported for vector search.")
        case Failure(e) => throw new HoodieAnalysisException(
          s"Column '$colName' has malformed type metadata '$typeDesc': ${e.getMessage}")
      }
    } else None
  }
}
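For reference, the dimension parse that parseTypeDescriptor would replace can be sketched in Python; the "VECTOR(<dim>)" descriptor shape is taken from the regex above, and the function name is hypothetical:

```python
import re

def parse_vector_dimension(type_desc: str) -> int:
    # Illustrative stand-in for a type-descriptor parser; accepts only a
    # complete "VECTOR(<dim>)" form, unlike the looser regex in the diff
    m = re.fullmatch(r"VECTOR\((\d+)\)", type_desc.strip())
    if m is None:
        raise ValueError(f"not a VECTOR type descriptor: {type_desc!r}")
    return int(m.group(1))
```

Requiring the full closing parenthesis avoids part of the brittleness the reviewer points out, since an unanchored `VECTOR\((\d+)` would also match a malformed descriptor prefix.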
// Rename any query column that clashes with a corpus column or internal columns.
// Uses a double prefix if the standard rename would itself clash (e.g. "id" -> "_query_id"
// would collide with the internal _query_id column).
val renamedQuery = queryWithId.columns.foldLeft(queryWithId) { (df, qCol) =>
Is there an easier way we can avoid clashes so we don't need to maintain this extra code?
One common approach is to alias the DataFrames before the join (e.g., corpusDf.as("corpus") / queryDf.as("query")) and reference columns by qualified name throughout. That way you avoid the rename/restore dance entirely, though it does require all downstream column references to be qualified.
@yihua I think you are right; that seems to be the cleanest solution, let me try this out.
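Before the alias-based rewrite, the rename rule being discussed looks roughly like this in Python; the column names and the _query_ prefix follow the comment above, everything else is a hypothetical sketch:

```python
def rename_clashing_query_columns(query_cols, corpus_cols,
                                  internal=("_query_id", "_hudi_distance")):
    # Map each query column to a non-clashing name: keep prepending "_query_"
    # until the name no longer clashes with a corpus or internal column
    # (e.g. "id" -> "_query_id" collides with the internal _query_id column,
    # so it becomes "_query__query_id")
    taken = set(corpus_cols) | set(internal)
    renames = {}
    for col in query_cols:
        new = col
        while new in taken:
            new = f"_query_{new}"
        renames[col] = new
        taken.add(new)
    return renames
```

The aliasing approach suggested above removes the need for this mapping entirely, at the cost of qualifying every downstream column reference.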
// doc_4 [0.707,0.707,0]: cosine distance to [1,0,0] = 1 - 0.707 ~= 0.293
assertEquals("doc_4", result(1).getAs[String]("id"))
assertEquals(1.0 - 0.70710678, result(1).getAs[Double]("_hudi_distance"), 1e-4)
Is there a cleaner way we can do these assertions?
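One option, sketched in Python for illustration: derive the expected distance from the raw vectors in a small helper instead of hard-coding constants like 1 - 0.70710678 in each assertion (the helper name is hypothetical):

```python
import math

def expected_cosine_distance(a, b):
    # Compute 1 - cos(a, b) from the raw embeddings, so the assertion reads
    # as "distance to doc_4's vector" rather than as a magic constant
    dot = sum(x * y for x, y in zip(a, b))
    return 1.0 - dot / (math.hypot(*a) * math.hypot(*b))
```

For doc_4's embedding [0.707, 0.707, 0] against the query [1, 0, 0], this returns roughly 0.2929, agreeing with the hard-coded 1.0 - 0.70710678 within the test's 1e-4 tolerance.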
voonhous
left a comment
Let's fix the PR validation failures. LGTM
@rahil-c could you check the lines that miss coverage and see if more tests should be added?
yihua
left a comment
Nice work on adding the vector search TVF. Left a few comments.
assertEquals(2, result.length)
assertEquals("b1", result(0).getAs[String]("id"))
assertEquals(0.0, result(0).getAs[Double]("_hudi_distance"), 1e-5)
// b2: sqrt(10^2 + 10^2) = sqrt(200) ~= 14.14
Is requiring element-type match (float vs double) between corpus and query table intentional? Many vector search systems silently upcast. If a user stores embeddings as float but their query pipeline produces double, this would be a confusing error. Worth a comment in the TVF docs if this is by design.
Yes, I think this is intentional for now to minimize the complexity around doing this implicit casting. If you happen to know which systems do this silent upcast, let me know, or I can do some more research on which systems allow it and follow up in another PR (I have some future optimization PRs planned for this work, so I can batch it there).
For now, I can leave a comment in the implementation saying we have this defensive type-matching requirement.
Will create a GitHub issue for follow-up.
I was thinking of the Spark SQL support for the VALUES clause with raw values in the query, where Spark can infer the value type. Not a blocker on this PR; we can follow up.
  requireSameLength(corpus.length, queryVector.length)
  distFn(new DenseVector(corpus.iterator.map(_.toDouble).toArray), queryDv, queryNorm)
})
case DoubleType => udf((corpus: Seq[Double]) => {
In the FloatType branch, corpus.iterator.map(_.toDouble).toArray allocates a full Array[Double] copy on every row. For the batch UDF factories (createFloatDistanceUdf at line 140), the same pattern appears for both arguments. Have you considered using a reusable buffer or at least corpus.view.map(...) to reduce GC pressure? For single-query mode this is on the hot path for every corpus row.
Let me look into this further.
@yihua I tried to think of some alternatives, but I think leveraging DenseVector from Spark MLlib makes this tricky, so we might just have to document this for now.
Some ideas considered which are not ideal:
- A reusable buffer inside the UDF is not safe: Spark doesn't guarantee single-threaded UDF execution per partition.
- corpus.view.map(...) does not help either: the DenseVector constructor forces materialization anyway, so the array allocation still happens.
- Dropping Spark MLlib's DenseVector and inlining the math ourselves: we'd lose MLlib's optimized BLAS-backed dot() and sqdist(). For high-dimensional vectors, the native BLAS path is significantly faster than a manual Scala loop.
Based on this, I think the Array[Double] allocation is fundamentally required by DenseVector. Every approach either has the same allocation or trades something off (losing BLAS, or unsafe concurrency).
Will file a GitHub issue to follow up on the micro-benchmark idea.
Got it. Overall, my concern is that per-vector processing through DenseVector could introduce latency overhead. Given this is the initial implementation, we can check in the code and should follow up with micro-benchmarks.
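For the follow-up micro-benchmark, the key comparison is per-row recomputation versus closing over precomputed query-side state, which the single-query UDF already does with queryDv and queryNorm. A minimal Python shape of that comparison (all names hypothetical; plain loops stand in for the BLAS-backed MLlib calls):

```python
import math

def cosine_dist_recompute(vec, query):
    # Baseline: recomputes the query norm on every corpus row
    qn = math.sqrt(sum(x * x for x in query))
    vn = math.sqrt(sum(x * x for x in vec))
    dot = sum(x * y for x, y in zip(vec, query))
    return 1.0 - dot / (vn * qn)

def make_cosine_dist(query):
    # Optimized shape: the query norm is computed once and closed over,
    # mirroring the precomputed queryDv/queryNorm in the single-query UDF
    qn = math.sqrt(sum(x * x for x in query))
    def dist(vec):
        vn = math.sqrt(sum(x * x for x in vec))
        dot = sum(x * y for x, y in zip(vec, query))
        return 1.0 - dot / (vn * qn)
    return dist
```

A benchmark would time both variants over a large corpus; the per-row Array[Double] allocation discussed above sits on top of whichever variant is used.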
private def resolveTableToDf(tableName: String): DataFrame = {
  if (tableName.contains(StoragePath.SEPARATOR)) {
    spark.read.format("hudi").load(tableName)
  } else {
When tableName doesn't contain a path separator, spark.table(tableName) is called — but this won't resolve a multi-part identifier like catalog.db.table. Is that intentional? Also, if the table doesn't exist, the Spark exception won't mention hudi_vector_search, which could be confusing. It might be worth wrapping this in a try-catch that rethrows with a more contextual message.
@yihua Actually, it seems that spark.table(tableName) can resolve multi-part identifiers. However, I will add the exception handling to make it clearer.
private def resolveTableToDf(tableName: String): DataFrame = {
  try {
    if (tableName.contains(StoragePath.SEPARATOR)) {
      spark.read.format("hudi").load(tableName)
    } else {
      // spark.table() supports multi-part identifiers (e.g. catalog.db.table)
      spark.table(tableName)
    }
  } catch {
    case e: Exception => throw new HoodieAnalysisException(
      s"hudi_vector_search: unable to resolve table '$tableName': ${e.getMessage}")
  }
}
@rahil-c to test this once.
// Verify output columns
val columns = resultDf.columns
assertTrue(columns.contains("_hudi_distance"))
assertTrue(columns.contains("_hudi_qid"))
Should the _hudi_qid column value be validated?
  |""".stripMargin
).collect()

assertEquals(3, result.length)
yihua
left a comment
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
CodeRabbit Walkthrough: Adds vector-search table-valued functions (hudi_vector_search, hudi_vector_search_batch) to Hudi Spark SQL, plus analysis, plan-building, distance UDFs, Maven dependency on Spark MLlib local, and an extensive end-to-end test suite.
Greptile Summary: This PR adds hudi_vector_search and hudi_vector_search_batch SQL table-valued functions for brute-force KNN vector similarity search over Hudi tables with support for cosine, L2, and dot-product distance metrics across float, double, and byte embeddings.
Key changes:
- New unresolved TVF plan nodes (HoodieVectorSearchTableValuedFunction / HoodieVectorSearchBatchTableValuedFunction) with robust argument parsing for 4–7 arguments
- BruteForceSearchAlgorithm implementing single-query (orderBy + limit) and batch-query (broadcast cross-join + row_number() window) plans behind a pluggable VectorSearchAlgorithm trait that cleanly supports future algorithm additions (HNSW, RowMatrix, etc.)
- VectorDistanceUtils with per-type UDF factories that pre-compute the query DenseVector once for single-query mode, avoiding per-row DenseVector allocation overhead
- evaluateQueryVector correctly handles DecimalType inference for SQL ARRAY(1.0, 0.5) literals, but is missing a type guard before asInstanceOf[ArrayData] that would surface as a ClassCastException instead of a user-friendly HoodieAnalysisException when a non-array expression is supplied as the query vector
Greptile Confidence Score: 4/5
Safe to merge after the ClassCastException guard in evaluateQueryVector is fixed; remaining issues are non-blocking style improvements
The feature is well-structured with a clean pluggable algorithm design, pre-compute optimizations for single-query UDFs, and comprehensive test coverage. One concrete P1 bug (ClassCastException on invalid query vector type) is straightforward to fix with a two-line type guard. The prior NPE issue in parseK has already been addressed. Score of 4 reflects a single targeted fix remaining before the happy path to merge.
HoodieSparkBaseAnalysis.scala (evaluateQueryVector method, lines 359-361)
Sequence Diagram (CodeRabbit):
sequenceDiagram
participant User as User / SQL
participant Analyzer as Spark Analyzer
participant ResolveRefs as ResolveReferences
participant PlanBuilder as HoodieVectorSearchPlanBuilder
participant DistanceUtils as VectorDistanceUtils
participant Executor as Spark Executor
User->>Analyzer: SELECT hudi_vector_search(...)
Analyzer->>ResolveRefs: Resolve TVF
ResolveRefs->>ResolveRefs: parse args, resolve table, eval query vector
ResolveRefs->>PlanBuilder: buildSingleQueryPlan(corpusDf, ..., queryVector, k, metric)
PlanBuilder->>DistanceUtils: create distance UDF
PlanBuilder->>PlanBuilder: build logical plan (filter, map distance, order, limit)
PlanBuilder->>Analyzer: return analyzed LogicalPlan
Analyzer->>Executor: execute plan
Executor->>Executor: compute distances, sort, return top-k
Sequence Diagram (CodeRabbit):
sequenceDiagram
participant User as User / SQL
participant Analyzer as Spark Analyzer
participant ResolveRefs as ResolveReferences
participant PlanBuilder as HoodieVectorSearchPlanBuilder
participant DistanceUtils as VectorDistanceUtils
participant Executor as Spark Executor
User->>Analyzer: SELECT hudi_vector_search_batch(corpus, query, ...)
Analyzer->>ResolveRefs: Resolve TVF
ResolveRefs->>ResolveRefs: parse args, resolve corpus & query tables
ResolveRefs->>PlanBuilder: buildBatchQueryPlan(corpusDf, queryDf, ...)
PlanBuilder->>DistanceUtils: create distance UDF
PlanBuilder->>PlanBuilder: build plan (broadcast, compute distances, window rank, top-k)
PlanBuilder->>Analyzer: return analyzed LogicalPlan
Analyzer->>Executor: execute plan
Executor->>Executor: cross-join, compute distances, rank per query, return results
Sequence Diagram (Greptile):
sequenceDiagram
participant User as SQL User
participant Spark as Spark Analyzer
participant TVF as HoodieVectorSearch TVF
participant Analysis as ResolveReferences Rule
participant Builder as BruteForceSearchAlgorithm
participant UDF as VectorDistanceUtils
User->>Spark: SELECT * FROM hudi_vector_search(...)
Spark->>TVF: Create HoodieVectorSearchTableValuedFunction(args)
Spark->>Analysis: Apply ResolveReferences rule
Analysis->>TVF: parseArgs(args)
TVF-->>Analysis: ParsedArgs(table, embeddingCol, queryVectorExpr, k, metric)
Analysis->>Analysis: resolveTableToDf(table) → corpusDf
Analysis->>Analysis: evaluateQueryVector(expr) → Array[Double]
Analysis->>Builder: buildSingleQueryPlan(spark, corpusDf, col, queryVector, k, metric)
Builder->>Builder: validateEmbeddingColumn(corpusDf, col)
Builder->>UDF: createSingleQueryDistanceUdf(metric, elemType, queryVector)
UDF-->>Builder: distanceUdf (closes over queryDv + queryNorm)
Builder->>Builder: filteredDf.withColumn(_hudi_distance).drop(col).orderBy.limit(k)
Builder-->>Analysis: analyzed LogicalPlan
Analysis-->>Spark: Resolved plan replaces TVF node
Spark-->>User: Result rows with _hudi_distance column
CodeRabbit: yihua#10 (review)
Greptile: yihua#10 (review)
      s"query vector element at index $i is null")
    getElement(i)
  }.toArray
}
Add validation that expr.dataType is ArrayType before casting.
If a user mistakenly passes a non-array expression as the query vector, lines 362 and 364 will throw ClassCastException instead of a descriptive HoodieAnalysisException. Consider adding an early check:
🛡️ Proposed fix to add type validation
  private def evaluateQueryVector(expr: Expression): Array[Double] = {
+   expr.dataType match {
+     case _: ArrayType => // valid
+     case other => throw new HoodieAnalysisException(
+       s"Function '${HoodieVectorSearchTableValuedFunction.FUNC_NAME}': " +
+       s"query vector must be an array type, got: $other")
+   }
    if (!expr.foldable) {
      throw new HoodieAnalysisException(

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSparkBaseAnalysis.scala`
around lines 350 - 386, In evaluateQueryVector, add an early check that
expr.dataType is an ArrayType before casting: if it's not an ArrayType throw a
HoodieAnalysisException with a clear message that the query vector must be an
array of numeric types; then safely cast expr.dataType.asInstanceOf[ArrayType]
(used for elementType) and proceed with existing element handling (preserving
the existing numeric element type matching and null checks). This ensures
non-array inputs to evaluateQueryVector raise a descriptive
HoodieAnalysisException instead of a ClassCastException.
— CodeRabbit (original) (source:comment#3036004780)
Don't think this is valid; not sure why a user would pass a non-array expression in this case, but will address.
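Whatever the likelihood, the guard itself is cheap. The same check in Python, for illustration (function name and message text hypothetical; the real fix would go in evaluateQueryVector):

```python
def parse_query_vector(value):
    # Fail fast with a descriptive error, instead of a cast error, when the
    # query-vector argument is not an array of numbers
    if not isinstance(value, (list, tuple)):
        raise TypeError(
            "hudi_vector_search: query vector must be an array type, "
            f"got {type(value).__name__}")
    out = []
    for i, elem in enumerate(value):
        if elem is None:
            raise ValueError(f"query vector element at index {i} is null")
        out.append(float(elem))
    return out
```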
  ).collect()
})
assertTrue(ex.getMessage.contains("nonexistent_col") ||
  ex.getCause.getMessage.contains("nonexistent_col"))
Stop dereferencing getCause blindly in these negative tests.
If Spark throws a top-level exception with no cause, these assertions NPE and hide the real regression. You already use the safer pattern on Lines 1404-1405; hoisting that into a shared helper would make these checks deterministic.
Suggested cleanup
+ private def rootMessage(t: Throwable): String =
+   Option(t.getCause).map(rootMessage).getOrElse(Option(t.getMessage).getOrElse(""))

- assertTrue(ex.getMessage.contains("nonexistent_col") ||
-   ex.getCause.getMessage.contains("nonexistent_col"))
+ assertTrue(rootMessage(ex).contains("nonexistent_col"))

Also applies to: 565-566, 582-583
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestHoodieVectorSearchFunction.scala`
around lines 545 - 546, The test currently dereferences ex.getCause blindly (in
TestHoodieVectorSearchFunction) causing NPEs; change the negative assertions to
safely check both the top-level message and the cause only if present (e.g.,
check ex.getMessage contains "nonexistent_col" OR (ex.getCause != null &&
ex.getCause.getMessage contains "nonexistent_col")), and extract that logic into
a small shared helper (e.g., assertExceptionContains(Throwable ex, String
substr)) used by the failing tests instead of repeating the pattern so lines
545-546 (and the similar checks at the other locations) become deterministic and
null-safe.
✅ Addressed in commits 41bcb03 to f43a6ea
— CodeRabbit (original) (source:comment#3036004782)
This is a minor problem. We can pass on this. I would suggest validating the full exception message, as a general way of writing stronger tests.
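The rootMessage helper suggested above translates directly; in Python terms (where Throwable.getCause roughly maps to __cause__), a sketch:

```python
def root_message(exc):
    # Walk the cause chain to the innermost exception and return its
    # message, so negative tests never dereference a missing cause
    while exc.__cause__ is not None:
        exc = exc.__cause__
    return str(exc)

# Nested failure analogous to Spark wrapping an analysis error
try:
    try:
        raise KeyError("nonexistent_col")
    except KeyError as inner:
        raise RuntimeError("analysis failed") from inner
except RuntimeError as wrapped:
    msg = root_message(wrapped)
```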
val arrayData = value.asInstanceOf[ArrayData]
val numElements = arrayData.numElements()
val elementType = expr.dataType.asInstanceOf[ArrayType].elementType
Unchecked cast to ArrayData/ArrayType yields ClassCastException on non-array input
If a user passes any non-array foldable expression as the query_vector argument — e.g. hudi_vector_search('t', 'emb', 1.0, 5) or hudi_vector_search('t', 'emb', 'text', 5) — both value.asInstanceOf[ArrayData] and expr.dataType.asInstanceOf[ArrayType] throw an unhandled ClassCastException rather than a HoodieAnalysisException. The user sees a raw JVM stack trace with no hint about how to fix the call.
A type guard on expr.dataType should precede both casts:
if (!expr.dataType.isInstanceOf[ArrayType]) {
  throw new HoodieAnalysisException(
    s"Function '${HoodieVectorSearchTableValuedFunction.FUNC_NAME}': " +
      s"query vector must be an array type, got ${expr.dataType.simpleString}")
}
val arrayData = value.asInstanceOf[ArrayData]
val numElements = arrayData.numElements()
val elementType = expr.dataType.asInstanceOf[ArrayType].elementType
— Greptile (original) (source:comment#3036042593)
I still do not understand why the user would pass a single scalar value like hudi_vector_search('t', 'emb', 1.0, 5) or hudi_vector_search('t', 'emb', 'text', 5). I can address this, but it seems highly unlikely.
I think this is a suggestion of defensive coding to avoid invalid input causing undefined behavior.
Codecov Report
❌ Patch coverage is …
Additional details and impacted files
@@ Coverage Diff @@
## master #18432 +/- ##
============================================
+ Coverage 68.52% 68.75% +0.22%
- Complexity 27968 28064 +96
============================================
Files 2440 2449 +9
Lines 134456 134765 +309
Branches 16226 16319 +93
============================================
+ Hits 92138 92655 +517
+ Misses 35054 34793 -261
- Partials 7264 7317 +53
Describe the issue this Pull Request addresses
This PR adds distributed vector search capability to Hudi's Spark SQL integration by introducing two new table-valued functions for single-query and batch-query KNN search. Currently there is no way to perform nearest-neighbor search over embedding columns stored in Hudi tables without an external vector database.
Summary and Changelog
Users can now find k-nearest neighbors over Hudi tables using standard Spark SQL, with support for cosine, L2, and dot product distance metrics across float, double, and byte embedding types.
Changes:
- hudi_vector_search TVF for single-query KNN search
- hudi_vector_search_batch TVF for batch-query KNN with broadcast cross-join and window-based top-k ranking

Impact
Two new Spark SQL table-valued functions with output columns _hudi_distance and _hudi_qid. Adds a spark-mllib-local dependency (local-only, no cluster overhead). No breaking changes to existing APIs, table format, or write/read paths.

Risk Level
Low - Purely additive feature isolated in new files, registered alongside existing Hudi TVFs using the same SparkSessionExtensions mechanism.
Documentation Update
Requires documentation for TVF syntax/parameters, supported distance metrics and embedding types, output schema contract, and batch-query semantics.
Contributor's checklist