[elasticsearch] Add paimon-elasticsearch module with ES vector global index support#7777
[elasticsearch] Add paimon-elasticsearch module with ES vector global index support#7777CrownChu wants to merge 2 commits into
Conversation
Add ESVectorGlobalIndexWriter/Reader/Indexer with DiskBBQ codec support, archive-based index storage, and Lucene integration for vector search. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
JingsongLi
left a comment
There was a problem hiding this comment.
Review: paimon-elasticsearch module
Overall this is a solid addition that cleanly implements the GlobalIndex SPI for ES DiskBBQ vector search. The archive-based index packaging, the two-phase retrieval pipeline (coarse IVF + rescore), and the Lucene directory adapter are well-structured. A few issues worth addressing:
1. ESVectorMetric: COSINE incorrectly mapped to DOT_PRODUCT
COSINE("cosine", VectorSimilarityFunction.DOT_PRODUCT),
INNER_PRODUCT("inner_product", VectorSimilarityFunction.DOT_PRODUCT);Both cosine and inner_product are mapped to DOT_PRODUCT, making them behave identically. Lucene provides a separate VectorSimilarityFunction.COSINE which normalizes vectors before computing dot product. The COSINE enum value should map to VectorSimilarityFunction.COSINE to produce correct similarity scores for unnormalized inputs.
2. Thread-safety issues in ESVectorGlobalIndexReader
a) Statistics counters are racy:
totalBytesRead, totalReadCount, totalReadTimeNanos are plain longs updated in readRangeFromArchive(), which can be called concurrently from searchClustersParallel. These should be AtomicLong or LongAdder.
b) getOrCreateArchiveStream() has no synchronization:
private SeekableInputStream getOrCreateArchiveStream() throws IOException {
if (cachedArchiveStream == null) {
cachedArchiveStream = fileReader.getInputStream(archiveIOMeta);
}
return cachedArchiveStream;
}Two parallel threads can both see null, each create a stream, and one leaks. Consider synchronizing or using double-checked locking like ensureLoaded().
c) Parallel search serializes on a single stream:
readRangeFromArchive does synchronized (in) on the shared cachedArchiveStream. When searchClustersParallel submits multiple cluster reads, they all serialize on this lock, negating much of the parallelism benefit. Consider opening a per-thread stream or pre-reading cluster data before dispatching.
3. SNAPSHOT dependency in pom.xml
<es.paimon.version>9.3.2-SNAPSHOT</es.paimon.version>SNAPSHOT versions make builds non-reproducible and will break CI if the snapshot artifact is unavailable. This should point to a released version, or at minimum document how to resolve it (e.g., a custom Maven repository).
4. close() is not safe against concurrent readers
close() nulls centroidReader and clusterSearcher outside any synchronization. A thread currently inside search() (which passed the ensureLoaded() check) could NPE or operate on a closed searcher. Consider:
- Using a read/write lock or a
closedflag checked invisitVectorSearch. - Or documenting that the reader is not safe to close while searches are in flight.
5. ESIndexArchiveUtils uses System.out
The unpack() method uses System.out.println. Prefer SLF4J Logger for consistency with the rest of the module. (This also bleeds into test output when unpack is called programmatically.)
6. Integer cast risk for large archive files
byte[] mivfBytes = readRangeFromArchive(mivfRange[0], (int) mivfRange[1]);The cast (int) mivfRange[1] could overflow silently for files > 2GB. While this may be unlikely for centroid data, adding a bounds check (e.g., throw if length > Integer.MAX_VALUE) would make failures explicit.
7. Minor observations
DiskBBQCodecProviderhardcodes(1000, 8). The comment says these only affect writing, but if the ES library ever changes that contract, the reader will silently break. Consider reading these fromVectorIndexMetaif possible.- The
validateSearchVectormethod receives the vector fromvectorSearch.vector()which is alwaysfloat[]by contract, making theinstanceof float[]check redundant — though harmless as a defensive guard. - Tests like
ArchiveRebuilderandDiskBBQDataGeneratorare annotated with@Testbut are really utilities/generators. Consider making them manual/integration tests or moving them to a separate source set to avoid confusion.
Good work on the SPI integration and the layered reader architecture. The core write/read/search flow is correct and well-tested. Addressing the thread-safety and metric-mapping issues above would make this production-ready.
Summary
paimon-elasticsearchmodule implementing GlobalIndex SPI backed by Elasticsearch vector searchArchiveDirectory,ArchiveBackedIndexInput,ArchiveFlatVectorReader) for reading packed index segmentsKey Components
ESVectorGlobalIndexWriterESVectorGlobalIndexReaderESVectorGlobalIndexerGlobalIndexerESIndexArchiveUtilsESVectorIndexOptionsTest Plan
ESVectorGlobalIndexTestcovers write/read/search round-tripESVectorBenchmark,Sift1MBenchmark,LuceneVsESBenchmark