diff --git a/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeIndexReader.java b/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeIndexReader.java index 068cf9993b52..c079730541b4 100644 --- a/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeIndexReader.java +++ b/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeIndexReader.java @@ -33,8 +33,10 @@ import org.apache.paimon.sst.BlockHandle; import org.apache.paimon.sst.BlockIterator; import org.apache.paimon.sst.SstFileReader; +import org.apache.paimon.sst.SstFileReader.SstFileIterator; import org.apache.paimon.utils.FileBasedBloomFilter; import org.apache.paimon.utils.LazyField; +import org.apache.paimon.utils.Pair; import org.apache.paimon.utils.Preconditions; import org.apache.paimon.utils.RoaringNavigableMap64; @@ -42,8 +44,10 @@ import java.io.IOException; import java.util.Comparator; +import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.NoSuchElementException; import java.util.Optional; import java.util.zip.CRC32; @@ -355,6 +359,15 @@ public Optional visitBetween(FieldRef fieldRef, Object from, })); } + /** + * Create an iterator to traverse all key-rowId pairs in this index file. + * + * @return an iterator over all entries + */ + public Iterator> createIterator() { + return new BTreeEntryIterator(); + } + private RoaringNavigableMap64 allNonNullRows() throws IOException { // Traverse all data to avoid returning null values, which is very advantageous in // situations where there are many null values @@ -376,7 +389,7 @@ private RoaringNavigableMap64 allNonNullRows() throws IOException { */ private RoaringNavigableMap64 rangeQuery( Object from, Object to, boolean fromInclusive, boolean toInclusive) throws IOException { - SstFileReader.SstFileIterator fileIter = reader.createIterator(); + SstFileIterator fileIter = reader.createIterator(); fileIter.seekTo(keySerializer.serialize(from)); RoaringNavigableMap64 result = new RoaringNavigableMap64(); @@ -414,4 +427,75 @@ private long[] deserializeRowIds(MemorySlice slice) { } return ids; } + + /** An iterator to traverse all key-rowId pairs in the BTree index file. */ + private class BTreeEntryIterator implements Iterator> { + + private final SstFileIterator fileIter; + private BlockIterator dataIter; + private Pair nextEntry; + private boolean nullOutputted = false; + + private BTreeEntryIterator() { + this.fileIter = reader.createIterator(); + if (minKey != null) { + fileIter.seekTo(keySerializer.serialize(minKey)); + } + advance(); + } + + private void advance() { + // First output null values from nullBitmap + if (!nullOutputted) { + nullOutputted = true; + RoaringNavigableMap64 nulls = nullBitmap.get(); + if (nulls != null && !nulls.isEmpty()) { + long[] nullRowIds = new long[nulls.getIntCardinality()]; + int index = 0; + for (Long rowId : nulls) { + nullRowIds[index++] = rowId; + } + nextEntry = Pair.of(null, nullRowIds); + return; + } + } + + nextEntry = null; + while (true) { + // try to get next entry from current data block + if (dataIter != null && dataIter.hasNext()) { + Map.Entry entry = dataIter.next(); + Object key = keySerializer.deserialize(entry.getKey()); + long[] rowIds = deserializeRowIds(entry.getValue()); + nextEntry = Pair.of(key, rowIds); + return; + } + + // try to read next data block + try { + dataIter = fileIter.readBatch(); + if (dataIter == null) { + return; // no more data + } + } catch (IOException e) { + throw new RuntimeException("Failed to read next batch from SST file", e); + } + } + } + + @Override + public boolean hasNext() { + return nextEntry != null; + } + + @Override + public Pair next() { + if (nextEntry == null) { + throw new NoSuchElementException(); + } + Pair result = nextEntry; + advance(); + return result; + } + } } diff --git a/paimon-common/src/test/java/org/apache/paimon/globalindex/btree/LazyFilteredBTreeIndexReaderTest.java b/paimon-common/src/test/java/org/apache/paimon/globalindex/btree/LazyFilteredBTreeIndexReaderTest.java index d3141c58fb3f..0182706770b5 100644 --- a/paimon-common/src/test/java/org/apache/paimon/globalindex/btree/LazyFilteredBTreeIndexReaderTest.java +++ b/paimon-common/src/test/java/org/apache/paimon/globalindex/btree/LazyFilteredBTreeIndexReaderTest.java @@ -21,11 +21,21 @@ import org.apache.paimon.globalindex.GlobalIndexIOMeta; import org.apache.paimon.globalindex.GlobalIndexReader; import org.apache.paimon.testutils.junit.parameterized.ParameterizedTestExtension; +import org.apache.paimon.utils.Pair; +import org.junit.jupiter.api.TestTemplate; import org.junit.jupiter.api.extension.ExtendWith; import java.util.ArrayList; +import java.util.Comparator; +import java.util.Iterator; import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.TreeSet; + +import static org.assertj.core.api.Assertions.assertThat; /** Test for {@link LazyFilteredBTreeReader} to read multiple files. */ @ExtendWith(ParameterizedTestExtension.class) @@ -56,4 +66,62 @@ private List writeData() throws Exception { return written; } + + @TestTemplate + public void testUnorderedIterator() throws Exception { + // Set some null values + for (int i = dataNum - 1; i >= dataNum * 0.85; i--) { + data.get(i).setLeft(null); + } + + List written = writeData(); + + Comparator nullComparator = Comparator.nullsFirst(comparator); + + // Build expected map from original data + Map> expectedMap = new TreeMap<>(nullComparator); + for (Pair pair : data) { + Object key = pair.getKey(); + Long rowId = pair.getValue(); + + if (!expectedMap.containsKey(key)) { + expectedMap.put(key, new TreeSet<>()); + } + expectedMap.get(key).add(rowId); + } + + Map> actualMap = new TreeMap<>(nullComparator); + + for (GlobalIndexIOMeta index : written) { + try (BTreeIndexReader reader = + new BTreeIndexReader(keySerializer, fileReader, index, CACHE_MANAGER)) { + + Iterator> iter = reader.createIterator(); + + // Collect all entries from iterator + while (iter.hasNext()) { + Pair entry = iter.next(); + Object key = entry.getLeft(); + long[] rowIds = entry.getRight(); + + if (!actualMap.containsKey(key)) { + actualMap.put(key, new TreeSet<>()); + } + for (long rowId : rowIds) { + actualMap.get(key).add(rowId); + } + } + } + } + + // Verify all keys are present + assertThat(actualMap.keySet()).containsExactlyElementsOf(expectedMap.keySet()); + + // Verify all rowIds for each key + for (Object key : expectedMap.keySet()) { + assertThat(actualMap.get(key)) + .as("RowIds for key: " + key) + .containsExactlyElementsOf(expectedMap.get(key)); + } + } }