From 5000cd6ddf8b4a7cdec168c66991ebd69942b242 Mon Sep 17 00:00:00 2001
From: ChengHui Chen <27797326+chenghuichen@users.noreply.github.com>
Date: Mon, 20 Apr 2026 09:58:19 +0800
Subject: [PATCH] [lumina] Reuse LuminaSearcher across queries via searcher
pool
---
.../lumina/index/LuminaSearcherPool.java | 116 ++++++++++++++
.../index/LuminaVectorGlobalIndexReader.java | 144 ++++++++----------
.../index/LuminaVectorGlobalIndexer.java | 7 +-
.../LuminaVectorGlobalIndexerFactory.java | 12 +-
.../index/LuminaVectorIndexOptions.java | 10 ++
.../lumina/index/LuminaVectorBenchmark.java | 2 +-
.../index/LuminaVectorGlobalIndexTest.java | 61 ++++++--
7 files changed, 257 insertions(+), 95 deletions(-)
create mode 100644 paimon-lumina/src/main/java/org/apache/paimon/lumina/index/LuminaSearcherPool.java
diff --git a/paimon-lumina/src/main/java/org/apache/paimon/lumina/index/LuminaSearcherPool.java b/paimon-lumina/src/main/java/org/apache/paimon/lumina/index/LuminaSearcherPool.java
new file mode 100644
index 000000000000..2c9552a6b1cf
--- /dev/null
+++ b/paimon-lumina/src/main/java/org/apache/paimon/lumina/index/LuminaSearcherPool.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.lumina.index;
+
+import org.apache.paimon.fs.SeekableInputStream;
+import org.apache.paimon.utils.IOUtils;
+
+import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
+import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Caffeine;
+
+import javax.annotation.Nullable;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Pool of {@link LuminaIndex} instances keyed by index file identity ({@code filePath@size}).
+ *
+ * <p>Each entry holds the Lumina index open in native memory (including the loaded graph files),
+ * so repeated queries on the same shard avoid reloading the index from object storage.
+ *
+ * <p>At most one idle entry is kept per key. Under concurrent queries on the same shard, the last
+ * entry returned wins; the entry it displaces is closed by {@link #returnEntry}. Idle entries
+ * dropped by size eviction or TTL expiry are closed by the cache's removal listener. The total
+ * number of idle entries across all keys is bounded by {@code maxSize}; a non-positive {@code
+ * maxSize} disables pooling.
+ *
+ * <p>Thread-safe. Borrow/return semantics guarantee at most one thread uses a given entry at a
+ * time.
+ */
+public class LuminaSearcherPool {
+
+    static final long EXPIRE_AFTER_ACCESS_MINUTES = 30;
+
+    /** A borrowed searcher together with its backing stream and JNI bridge. */
+    static final class PooledEntry implements Closeable {
+        final LuminaIndex index;
+        final SeekableInputStream stream;
+        final LuminaVectorGlobalIndexReader.InputStreamFileInput fileInput;
+        final LuminaIndexMeta indexMeta;
+
+        PooledEntry(
+                LuminaIndex index,
+                SeekableInputStream stream,
+                LuminaVectorGlobalIndexReader.InputStreamFileInput fileInput,
+                LuminaIndexMeta indexMeta) {
+            this.index = index;
+            this.stream = stream;
+            this.fileInput = fileInput;
+            this.indexMeta = indexMeta;
+        }
+
+        /** Closes the index first, then its stream, both via {@code IOUtils.closeQuietly}. */
+        @Override
+        public void close() throws IOException {
+            IOUtils.closeQuietly(index);
+            IOUtils.closeQuietly(stream);
+        }
+    }
+
+    /** Idle entries by pool key; {@code null} when pooling is disabled. */
+    @Nullable private final Cache<String, PooledEntry> idleCache;
+
+    /**
+     * Creates a pool.
+     *
+     * @param maxSize maximum number of idle entries kept across all keys; a value of zero or less
+     *     disables pooling, so returned entries are closed immediately
+     */
+    public LuminaSearcherPool(int maxSize) {
+        if (maxSize <= 0) {
+            this.idleCache = null;
+        } else {
+            this.idleCache =
+                    Caffeine.newBuilder()
+                            .maximumSize(maxSize)
+                            .expireAfterAccess(EXPIRE_AFTER_ACCESS_MINUTES, TimeUnit.MINUTES)
+                            // Run listener callbacks inline on the thread that triggers them.
+                            .executor(Runnable::run)
+                            .removalListener(
+                                    (k, v, c) -> {
+                                        // Only close entries the cache itself dropped (size or
+                                        // expiry). An explicit removal is a borrow: the entry is
+                                        // handed to a caller and must stay open. A replacement is
+                                        // closed by returnEntry.
+                                        if (c.wasEvicted()) {
+                                            IOUtils.closeQuietly((PooledEntry) v);
+                                        }
+                                    })
+                            .build();
+        }
+    }
+
+    /**
+     * Borrow an idle entry for the given key, or {@code null} if the pool has none.
+     *
+     * <p>The caller must either {@link #returnEntry} or {@link PooledEntry#close} the entry when
+     * done.
+     */
+    @Nullable
+    public PooledEntry borrow(String key) {
+        if (idleCache == null) {
+            return null;
+        }
+        // Removing the mapping transfers ownership of the entry to the caller.
+        return idleCache.asMap().remove(key);
+    }
+
+    /**
+     * Return a previously borrowed entry to the pool. An idle entry already stored under the same
+     * key is displaced and closed here; entries later dropped by size eviction or TTL expiry are
+     * closed by the removal listener. When pooling is disabled the entry is closed immediately.
+     */
+    public void returnEntry(String key, PooledEntry entry) {
+        if (idleCache == null) {
+            IOUtils.closeQuietly(entry);
+            return;
+        }
+        PooledEntry displaced = idleCache.asMap().put(key, entry);
+        // Guard against closing the entry that was just stored if it is returned twice.
+        if (displaced != null && displaced != entry) {
+            IOUtils.closeQuietly(displaced);
+        }
+    }
+}
diff --git a/paimon-lumina/src/main/java/org/apache/paimon/lumina/index/LuminaVectorGlobalIndexReader.java b/paimon-lumina/src/main/java/org/apache/paimon/lumina/index/LuminaVectorGlobalIndexReader.java
index df20e14f851c..27f3f7d42450 100644
--- a/paimon-lumina/src/main/java/org/apache/paimon/lumina/index/LuminaVectorGlobalIndexReader.java
+++ b/paimon-lumina/src/main/java/org/apache/paimon/lumina/index/LuminaVectorGlobalIndexReader.java
@@ -51,6 +51,9 @@
*
 * <p>Each shard has exactly one Lumina index file. This reader loads the single index and performs
* vector similarity search.
+ *
+ * <p>On {@link #close()}, the index is returned to the {@link LuminaSearcherPool} rather than
+ * destroyed, so the native graph stays warm across queries and avoids repeated object storage GETs.
*/
public class LuminaVectorGlobalIndexReader implements GlobalIndexReader {
@@ -65,22 +68,24 @@ public class LuminaVectorGlobalIndexReader implements GlobalIndexReader {
private final GlobalIndexFileReader fileReader;
private final DataType fieldType;
private final LuminaVectorIndexOptions options;
+ private final LuminaSearcherPool searcherPool;
+ private final String poolKey;
- private volatile LuminaIndexMeta indexMeta;
- private volatile LuminaIndex index;
- private SeekableInputStream openStream;
- private InputStreamFileInput inputStreamFileInput;
+ private volatile LuminaSearcherPool.PooledEntry borrowed;
public LuminaVectorGlobalIndexReader(
GlobalIndexFileReader fileReader,
List ioMetas,
DataType fieldType,
- LuminaVectorIndexOptions options) {
+ LuminaVectorIndexOptions options,
+ LuminaSearcherPool searcherPool) {
checkArgument(ioMetas.size() == 1, "Expected exactly one index file per shard");
this.fileReader = fileReader;
this.ioMeta = ioMetas.get(0);
this.fieldType = fieldType;
this.options = options;
+ this.searcherPool = searcherPool;
+ this.poolKey = this.ioMeta.filePath().toString() + "@" + this.ioMeta.fileSize();
}
@Override
@@ -101,9 +106,9 @@ private ScoredGlobalIndexResult search(VectorSearch vectorSearch) throws IOExcep
validateSearchVector(vectorSearch.vector());
float[] queryVector = vectorSearch.vector().clone();
int limit = vectorSearch.limit();
- LuminaVectorMetric indexMetric = indexMeta.metric();
+ LuminaVectorMetric indexMetric = borrowed.indexMeta.metric();
- int effectiveK = (int) Math.min(limit, index.size());
+ int effectiveK = (int) Math.min(limit, borrowed.index.size());
if (effectiveK <= 0) {
return null;
}
@@ -132,18 +137,18 @@ private ScoredGlobalIndexResult search(VectorSearch vectorSearch) throws IOExcep
distances = new float[effectiveK];
labels = new long[effectiveK];
Map searchOptions = options.toLuminaOptions();
- searchOptions.putAll(indexMeta.options());
+ searchOptions.putAll(borrowed.indexMeta.options());
searchOptions.put("search.thread_safe_filter", "true");
ensureSearchListSize(searchOptions, effectiveK);
- index.searchWithFilter(
+ borrowed.index.searchWithFilter(
queryVector, 1, effectiveK, distances, labels, scopedIds, searchOptions);
} else {
distances = new float[effectiveK];
labels = new long[effectiveK];
Map searchOptions = options.toLuminaOptions();
- searchOptions.putAll(indexMeta.options());
+ searchOptions.putAll(borrowed.indexMeta.options());
ensureSearchListSize(searchOptions, effectiveK);
- index.search(queryVector, 1, effectiveK, distances, labels, searchOptions);
+ borrowed.index.search(queryVector, 1, effectiveK, distances, labels, searchOptions);
}
// Min-heap: smallest score at head, so we can evict the weakest candidate efficiently.
@@ -217,128 +222,105 @@ private void validateSearchVector(Object vector) {
+ fieldType);
}
int queryDim = ((float[]) vector).length;
- if (queryDim != indexMeta.dim()) {
+ if (queryDim != borrowed.indexMeta.dim()) {
throw new IllegalArgumentException(
String.format(
"Query vector dimension mismatch: index expects %d, but got %d",
- indexMeta.dim(), queryDim));
+ borrowed.indexMeta.dim(), queryDim));
}
}
+ /**
+ * Acquires the entry used by this reader on first use: borrows an idle entry for {@code
+ * poolKey} from the searcher pool, or builds a new one via {@link #createEntry()} when the
+ * pool has none. The null check is repeated inside {@code synchronized (this)}.
+ */
 private void ensureLoaded() throws IOException {
- if (index == null) {
+ if (borrowed == null) {
 synchronized (this) {
- if (index == null) {
- indexMeta = LuminaIndexMeta.deserialize(ioMeta.metadata());
- SeekableInputStream in = fileReader.getInputStream(ioMeta);
- try {
- InputStreamFileInput fileInput = new InputStreamFileInput(in);
- Map searcherOptions = options.toLuminaOptions();
- searcherOptions.putAll(indexMeta.options());
- index =
- LuminaIndex.fromStream(
- indexMeta.indexType(),
- fileInput,
- ioMeta.fileSize(),
- indexMeta.dim(),
- indexMeta.metric(),
- searcherOptions);
- fileInput.markOpenPhaseDone();
- openStream = in;
- inputStreamFileInput = fileInput;
- } catch (Exception e) {
- IOUtils.closeQuietly(in);
- throw e;
+ if (borrowed == null) {
+ LuminaSearcherPool.PooledEntry entry = searcherPool.borrow(poolKey);
+ if (entry == null) {
+ entry = createEntry();
 }
+ borrowed = entry;
 }
 }
 }
 }
+    /**
+     * Opens the shard's index file and loads a new {@link LuminaIndex} from it.
+     *
+     * <p>On success the opened stream is stored in the returned entry together with the index, the
+     * file input wrapper and the deserialized metadata. If loading fails, the stream is closed
+     * here before the exception propagates.
+     *
+     * @return a new entry wrapping the loaded index
+     * @throws IOException declared by this method; presumably raised when the index file cannot
+     *     be opened or read (not verifiable from this hunk)
+     */
+    private LuminaSearcherPool.PooledEntry createEntry() throws IOException {
+        LuminaIndexMeta indexMeta = LuminaIndexMeta.deserialize(ioMeta.metadata());
+        SeekableInputStream in = fileReader.getInputStream(ioMeta);
+        try {
+            InputStreamFileInput fileInput = new InputStreamFileInput(in);
+            // Options stored in the index metadata are applied on top of the reader's options.
+            Map<String, String> searcherOptions = options.toLuminaOptions();
+            searcherOptions.putAll(indexMeta.options());
+            LuminaIndex index =
+                    LuminaIndex.fromStream(
+                            indexMeta.indexType(),
+                            fileInput,
+                            ioMeta.fileSize(),
+                            indexMeta.dim(),
+                            indexMeta.metric(),
+                            searcherOptions);
+            // Later reads through fileInput are reported by the search-phase counters.
+            fileInput.markOpenPhaseDone();
+            return new LuminaSearcherPool.PooledEntry(index, in, fileInput, indexMeta);
+        } catch (Exception e) {
+            IOUtils.closeQuietly(in);
+            throw e;
+        }
+    }
+
/** Returns the total bytes read by the underlying {@link InputStreamFileInput}, or 0. */
public long getTotalBytesRead() {
- return inputStreamFileInput != null ? inputStreamFileInput.getTotalBytesRead() : 0;
+ return borrowed != null ? borrowed.fileInput.getTotalBytesRead() : 0;
}
// =================== open-phase I/O stats =====================
public long getOpenBytesRead() {
- return inputStreamFileInput != null ? inputStreamFileInput.getOpenBytesRead() : 0;
+ return borrowed != null ? borrowed.fileInput.getOpenBytesRead() : 0;
}
public long getOpenSeekCount() {
- return inputStreamFileInput != null ? inputStreamFileInput.getOpenSeekCount() : 0;
+ return borrowed != null ? borrowed.fileInput.getOpenSeekCount() : 0;
}
public long getOpenReadCount() {
- return inputStreamFileInput != null ? inputStreamFileInput.getOpenReadCount() : 0;
+ return borrowed != null ? borrowed.fileInput.getOpenReadCount() : 0;
}
public long getOpenReadTimeNanos() {
- return inputStreamFileInput != null ? inputStreamFileInput.getOpenReadTimeNanos() : 0;
+ return borrowed != null ? borrowed.fileInput.getOpenReadTimeNanos() : 0;
}
public long getOpenSeekTimeNanos() {
- return inputStreamFileInput != null ? inputStreamFileInput.getOpenSeekTimeNanos() : 0;
+ return borrowed != null ? borrowed.fileInput.getOpenSeekTimeNanos() : 0;
}
// =================== search-phase I/O stats =====================
public long getSearchBytesRead() {
- return inputStreamFileInput != null ? inputStreamFileInput.getSearchBytesRead() : 0;
+ return borrowed != null ? borrowed.fileInput.getSearchBytesRead() : 0;
}
public long getSearchSeekCount() {
- return inputStreamFileInput != null ? inputStreamFileInput.getSearchSeekCount() : 0;
+ return borrowed != null ? borrowed.fileInput.getSearchSeekCount() : 0;
}
public long getSearchReadCount() {
- return inputStreamFileInput != null ? inputStreamFileInput.getSearchReadCount() : 0;
+ return borrowed != null ? borrowed.fileInput.getSearchReadCount() : 0;
}
public long getSearchReadTimeNanos() {
- return inputStreamFileInput != null ? inputStreamFileInput.getSearchReadTimeNanos() : 0;
+ return borrowed != null ? borrowed.fileInput.getSearchReadTimeNanos() : 0;
}
public long getSearchSeekTimeNanos() {
- return inputStreamFileInput != null ? inputStreamFileInput.getSearchSeekTimeNanos() : 0;
+ return borrowed != null ? borrowed.fileInput.getSearchSeekTimeNanos() : 0;
}
+ /**
+ * Hands the borrowed entry, if any, back to the searcher pool under {@code poolKey} and clears
+ * this reader's reference to it. The index and stream are not closed directly here; the pool
+ * decides whether to keep or close the entry.
+ */
 @Override
 public void close() throws IOException {
- Throwable firstException = null;
-
- if (index != null) {
- try {
- index.close();
- } catch (Throwable t) {
- firstException = t;
- }
- index = null;
- }
-
- if (openStream != null) {
- try {
- openStream.close();
- } catch (Throwable t) {
- if (firstException == null) {
- firstException = t;
- } else {
- firstException.addSuppressed(t);
- }
- }
- openStream = null;
- }
-
- if (firstException != null) {
- if (firstException instanceof IOException) {
- throw (IOException) firstException;
- } else if (firstException instanceof RuntimeException) {
- throw (RuntimeException) firstException;
- } else {
- throw new RuntimeException(
- "Failed to close Lumina vector global index reader", firstException);
- }
+ if (borrowed != null) {
+ searcherPool.returnEntry(poolKey, borrowed);
+ borrowed = null;
 }
 }
@@ -419,7 +401,7 @@ public Optional visitNotIn(FieldRef fieldRef, List