Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.lumina.index;

import org.apache.paimon.fs.SeekableInputStream;
import org.apache.paimon.utils.IOUtils;

import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Caffeine;
import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.RemovalCause;

import javax.annotation.Nullable;

import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.TimeUnit;

/**
 * Pool of {@link LuminaIndex} instances keyed by index file identity ({@code filePath@size}).
 *
 * <p>Each entry holds the Lumina index open in native memory (including the loaded graph files),
 * so repeated queries on the same shard avoid reloading the index from object storage.
 *
 * <p>At most one idle entry is kept per key. Under concurrent queries on the same shard, the last
 * entry returned wins; displaced entries are closed automatically via the removal listener. The
 * total number of idle entries across all keys is bounded by {@code maxSize}.
 *
 * <p>Ownership rule: an entry sitting in the idle cache is owned by the pool and is closed when
 * the cache drops it (size eviction, expiry, or replacement by a newer entry for the same key).
 * An entry handed out by {@link #borrow} is owned by the caller and is <em>not</em> closed by the
 * pool until it is given back through {@link #returnEntry}.
 *
 * <p>Thread-safe. Borrow/return semantics guarantee at most one thread uses a given entry at a
 * time.
 */
public class LuminaSearcherPool {

    /** Idle entries that have not been borrowed or returned for this long are dropped and closed. */
    static final long EXPIRE_AFTER_ACCESS_MINUTES = 30;

    /** A borrowed searcher together with its backing stream and JNI bridge. */
    static final class PooledEntry implements Closeable {
        final LuminaIndex index;
        final SeekableInputStream stream;
        final LuminaVectorGlobalIndexReader.InputStreamFileInput fileInput;
        final LuminaIndexMeta indexMeta;

        PooledEntry(
                LuminaIndex index,
                SeekableInputStream stream,
                LuminaVectorGlobalIndexReader.InputStreamFileInput fileInput,
                LuminaIndexMeta indexMeta) {
            this.index = index;
            this.stream = stream;
            this.fileInput = fileInput;
            this.indexMeta = indexMeta;
        }

        /**
         * Releases the index first and then its backing stream. Failures from either close are
         * suppressed, so this method never actually throws.
         */
        @Override
        public void close() throws IOException {
            IOUtils.closeQuietly(index);
            IOUtils.closeQuietly(stream);
        }
    }

    /** Idle entries by pool key, or {@code null} when pooling is disabled. */
    @Nullable private final Cache<String, PooledEntry> idleCache;

    /**
     * Creates a pool that keeps at most {@code maxSize} idle entries.
     *
     * @param maxSize upper bound on idle entries across all keys; a value {@code <= 0} disables
     *     pooling, in which case {@link #borrow} always returns {@code null} and {@link
     *     #returnEntry} closes the entry immediately
     */
    public LuminaSearcherPool(int maxSize) {
        if (maxSize <= 0) {
            this.idleCache = null;
        } else {
            this.idleCache =
                    Caffeine.newBuilder()
                            .maximumSize(maxSize)
                            .expireAfterAccess(EXPIRE_AFTER_ACCESS_MINUTES, TimeUnit.MINUTES)
                            // Run the listener on the calling thread so displaced entries are
                            // released synchronously instead of on a background executor.
                            .executor(Runnable::run)
                            .removalListener(
                                    (key, value, cause) -> {
                                        // The listener fires for every removal, including the
                                        // explicit remove() performed by borrow(). That entry
                                        // is being handed to a caller and must stay open; only
                                        // entries the cache itself drops are closed here.
                                        if (cause != RemovalCause.EXPLICIT) {
                                            IOUtils.closeQuietly((PooledEntry) value);
                                        }
                                    })
                            .build();
        }
    }

    /**
     * Borrow an idle entry for the given key, or {@code null} if the pool has none.
     *
     * <p>The entry is removed from the pool and ownership passes to the caller, who must either
     * {@link #returnEntry} or {@link PooledEntry#close} the entry when done.
     *
     * @param key pool key identifying the index file
     * @return an open idle entry, or {@code null} when pooling is disabled or nothing is cached
     */
    @Nullable
    public PooledEntry borrow(String key) {
        if (idleCache == null) {
            return null;
        }
        return idleCache.asMap().remove(key);
    }

    /**
     * Return a previously borrowed entry to the pool. Any entry displaced by size eviction, TTL
     * expiry, or key replacement is closed automatically via the removal listener.
     *
     * <p>When pooling is disabled the entry is closed immediately.
     *
     * @param key pool key the entry was borrowed (or created) for
     * @param entry the entry to hand back; the caller must not use it afterwards
     */
    public void returnEntry(String key, PooledEntry entry) {
        if (idleCache == null) {
            IOUtils.closeQuietly(entry);
            return;
        }
        idleCache.put(key, entry);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@
*
* <p>Each shard has exactly one Lumina index file. This reader loads the single index and performs
* vector similarity search.
*
* <p>On {@link #close()}, the index is returned to the {@link LuminaSearcherPool} rather than
* destroyed, so the native graph stays warm across queries and avoids repeated object storage GETs.
*/
public class LuminaVectorGlobalIndexReader implements GlobalIndexReader {

Expand All @@ -65,22 +68,24 @@ public class LuminaVectorGlobalIndexReader implements GlobalIndexReader {
private final GlobalIndexFileReader fileReader;
private final DataType fieldType;
private final LuminaVectorIndexOptions options;
private final LuminaSearcherPool searcherPool;
private final String poolKey;

private volatile LuminaIndexMeta indexMeta;
private volatile LuminaIndex index;
private SeekableInputStream openStream;
private InputStreamFileInput inputStreamFileInput;
private volatile LuminaSearcherPool.PooledEntry borrowed;

public LuminaVectorGlobalIndexReader(
GlobalIndexFileReader fileReader,
List<GlobalIndexIOMeta> ioMetas,
DataType fieldType,
LuminaVectorIndexOptions options) {
LuminaVectorIndexOptions options,
LuminaSearcherPool searcherPool) {
checkArgument(ioMetas.size() == 1, "Expected exactly one index file per shard");
this.fileReader = fileReader;
this.ioMeta = ioMetas.get(0);
this.fieldType = fieldType;
this.options = options;
this.searcherPool = searcherPool;
this.poolKey = this.ioMeta.filePath().toString() + "@" + this.ioMeta.fileSize();
}

@Override
Expand All @@ -101,9 +106,9 @@ private ScoredGlobalIndexResult search(VectorSearch vectorSearch) throws IOExcep
validateSearchVector(vectorSearch.vector());
float[] queryVector = vectorSearch.vector().clone();
int limit = vectorSearch.limit();
LuminaVectorMetric indexMetric = indexMeta.metric();
LuminaVectorMetric indexMetric = borrowed.indexMeta.metric();

int effectiveK = (int) Math.min(limit, index.size());
int effectiveK = (int) Math.min(limit, borrowed.index.size());
if (effectiveK <= 0) {
return null;
}
Expand Down Expand Up @@ -132,18 +137,18 @@ private ScoredGlobalIndexResult search(VectorSearch vectorSearch) throws IOExcep
distances = new float[effectiveK];
labels = new long[effectiveK];
Map<String, String> searchOptions = options.toLuminaOptions();
searchOptions.putAll(indexMeta.options());
searchOptions.putAll(borrowed.indexMeta.options());
searchOptions.put("search.thread_safe_filter", "true");
ensureSearchListSize(searchOptions, effectiveK);
index.searchWithFilter(
borrowed.index.searchWithFilter(
queryVector, 1, effectiveK, distances, labels, scopedIds, searchOptions);
} else {
distances = new float[effectiveK];
labels = new long[effectiveK];
Map<String, String> searchOptions = options.toLuminaOptions();
searchOptions.putAll(indexMeta.options());
searchOptions.putAll(borrowed.indexMeta.options());
ensureSearchListSize(searchOptions, effectiveK);
index.search(queryVector, 1, effectiveK, distances, labels, searchOptions);
borrowed.index.search(queryVector, 1, effectiveK, distances, labels, searchOptions);
}

// Min-heap: smallest score at head, so we can evict the weakest candidate efficiently.
Expand Down Expand Up @@ -217,128 +222,105 @@ private void validateSearchVector(Object vector) {
+ fieldType);
}
int queryDim = ((float[]) vector).length;
if (queryDim != indexMeta.dim()) {
if (queryDim != borrowed.indexMeta.dim()) {
throw new IllegalArgumentException(
String.format(
"Query vector dimension mismatch: index expects %d, but got %d",
indexMeta.dim(), queryDim));
borrowed.indexMeta.dim(), queryDim));
}
}

private void ensureLoaded() throws IOException {
if (index == null) {
if (borrowed == null) {
synchronized (this) {
if (index == null) {
indexMeta = LuminaIndexMeta.deserialize(ioMeta.metadata());
SeekableInputStream in = fileReader.getInputStream(ioMeta);
try {
InputStreamFileInput fileInput = new InputStreamFileInput(in);
Map<String, String> searcherOptions = options.toLuminaOptions();
searcherOptions.putAll(indexMeta.options());
index =
LuminaIndex.fromStream(
indexMeta.indexType(),
fileInput,
ioMeta.fileSize(),
indexMeta.dim(),
indexMeta.metric(),
searcherOptions);
fileInput.markOpenPhaseDone();
openStream = in;
inputStreamFileInput = fileInput;
} catch (Exception e) {
IOUtils.closeQuietly(in);
throw e;
if (borrowed == null) {
LuminaSearcherPool.PooledEntry entry = searcherPool.borrow(poolKey);
if (entry == null) {
entry = createEntry();
}
borrowed = entry;
}
}
}
}

/**
 * Opens the shard's index file and loads a fresh {@link LuminaIndex} from it, bundling the index,
 * its stream, the stream adapter and the deserialized metadata into one pool entry.
 *
 * <p>If anything fails after the stream has been opened, the stream is closed quietly and the
 * original exception is rethrown.
 */
private LuminaSearcherPool.PooledEntry createEntry() throws IOException {
    LuminaIndexMeta meta = LuminaIndexMeta.deserialize(ioMeta.metadata());
    SeekableInputStream stream = fileReader.getInputStream(ioMeta);
    try {
        InputStreamFileInput input = new InputStreamFileInput(stream);

        // Reader-level options first; entries from the index metadata overwrite matching keys.
        Map<String, String> luminaOptions = options.toLuminaOptions();
        luminaOptions.putAll(meta.options());

        LuminaIndex loaded =
                LuminaIndex.fromStream(
                        meta.indexType(),
                        input,
                        ioMeta.fileSize(),
                        meta.dim(),
                        meta.metric(),
                        luminaOptions);
        input.markOpenPhaseDone();

        LuminaSearcherPool.PooledEntry created =
                new LuminaSearcherPool.PooledEntry(loaded, stream, input, meta);
        return created;
    } catch (Exception e) {
        IOUtils.closeQuietly(stream);
        throw e;
    }
}

/** Returns the total bytes read by the underlying {@link InputStreamFileInput}, or 0. */
public long getTotalBytesRead() {
return inputStreamFileInput != null ? inputStreamFileInput.getTotalBytesRead() : 0;
return borrowed != null ? borrowed.fileInput.getTotalBytesRead() : 0;
}

// =================== open-phase I/O stats =====================

public long getOpenBytesRead() {
return inputStreamFileInput != null ? inputStreamFileInput.getOpenBytesRead() : 0;
return borrowed != null ? borrowed.fileInput.getOpenBytesRead() : 0;
}

public long getOpenSeekCount() {
return inputStreamFileInput != null ? inputStreamFileInput.getOpenSeekCount() : 0;
return borrowed != null ? borrowed.fileInput.getOpenSeekCount() : 0;
}

public long getOpenReadCount() {
return inputStreamFileInput != null ? inputStreamFileInput.getOpenReadCount() : 0;
return borrowed != null ? borrowed.fileInput.getOpenReadCount() : 0;
}

public long getOpenReadTimeNanos() {
return inputStreamFileInput != null ? inputStreamFileInput.getOpenReadTimeNanos() : 0;
return borrowed != null ? borrowed.fileInput.getOpenReadTimeNanos() : 0;
}

public long getOpenSeekTimeNanos() {
return inputStreamFileInput != null ? inputStreamFileInput.getOpenSeekTimeNanos() : 0;
return borrowed != null ? borrowed.fileInput.getOpenSeekTimeNanos() : 0;
}

// =================== search-phase I/O stats =====================

public long getSearchBytesRead() {
return inputStreamFileInput != null ? inputStreamFileInput.getSearchBytesRead() : 0;
return borrowed != null ? borrowed.fileInput.getSearchBytesRead() : 0;
}

public long getSearchSeekCount() {
return inputStreamFileInput != null ? inputStreamFileInput.getSearchSeekCount() : 0;
return borrowed != null ? borrowed.fileInput.getSearchSeekCount() : 0;
}

public long getSearchReadCount() {
return inputStreamFileInput != null ? inputStreamFileInput.getSearchReadCount() : 0;
return borrowed != null ? borrowed.fileInput.getSearchReadCount() : 0;
}

public long getSearchReadTimeNanos() {
return inputStreamFileInput != null ? inputStreamFileInput.getSearchReadTimeNanos() : 0;
return borrowed != null ? borrowed.fileInput.getSearchReadTimeNanos() : 0;
}

public long getSearchSeekTimeNanos() {
return inputStreamFileInput != null ? inputStreamFileInput.getSearchSeekTimeNanos() : 0;
return borrowed != null ? borrowed.fileInput.getSearchSeekTimeNanos() : 0;
}

@Override
public void close() throws IOException {
Throwable firstException = null;

if (index != null) {
try {
index.close();
} catch (Throwable t) {
firstException = t;
}
index = null;
}

if (openStream != null) {
try {
openStream.close();
} catch (Throwable t) {
if (firstException == null) {
firstException = t;
} else {
firstException.addSuppressed(t);
}
}
openStream = null;
}

if (firstException != null) {
if (firstException instanceof IOException) {
throw (IOException) firstException;
} else if (firstException instanceof RuntimeException) {
throw (RuntimeException) firstException;
} else {
throw new RuntimeException(
"Failed to close Lumina vector global index reader", firstException);
}
if (borrowed != null) {
searcherPool.returnEntry(poolKey, borrowed);
borrowed = null;
}
}

Expand Down Expand Up @@ -419,7 +401,7 @@ public Optional<GlobalIndexResult> visitNotIn(FieldRef fieldRef, List<Object> li
*
* <p>This mirrors the C++ {@code LuminaFileReader} adapter that bridges Paimon's {@code
* InputStream} to Lumina's {@code FileReader} interface. The stream lifecycle is managed by the
* enclosing reader, not by this adapter.
* enclosing pool entry, not by this adapter.
*/
static class InputStreamFileInput implements LuminaFileInput {
private final SeekableInputStream in;
Expand Down Expand Up @@ -520,7 +502,7 @@ long getSearchSeekTimeNanos() {

@Override
public void close() {
// Stream lifecycle is managed by the enclosing Reader.
// Stream lifecycle is managed by the enclosing pool entry.
}
}

Expand Down
Loading
Loading