Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,21 @@
import org.apache.paimon.sst.BlockHandle;
import org.apache.paimon.sst.BlockIterator;
import org.apache.paimon.sst.SstFileReader;
import org.apache.paimon.sst.SstFileReader.SstFileIterator;
import org.apache.paimon.utils.FileBasedBloomFilter;
import org.apache.paimon.utils.LazyField;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.RoaringNavigableMap64;

import javax.annotation.Nullable;

import java.io.IOException;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.zip.CRC32;

Expand Down Expand Up @@ -355,6 +359,15 @@ public Optional<GlobalIndexResult> visitBetween(FieldRef fieldRef, Object from,
}));
}

/**
* Create an iterator to traverse all key-rowId pairs in this index file.
*
* @return an iterator over all entries
*/
public Iterator<Pair<Object, long[]>> createIterator() {
return new BTreeEntryIterator();
}

private RoaringNavigableMap64 allNonNullRows() throws IOException {
// Traverse all data to avoid returning null values, which is very advantageous in
// situations where there are many null values
Expand All @@ -376,7 +389,7 @@ private RoaringNavigableMap64 allNonNullRows() throws IOException {
*/
private RoaringNavigableMap64 rangeQuery(
Object from, Object to, boolean fromInclusive, boolean toInclusive) throws IOException {
SstFileReader.SstFileIterator fileIter = reader.createIterator();
SstFileIterator fileIter = reader.createIterator();
fileIter.seekTo(keySerializer.serialize(from));

RoaringNavigableMap64 result = new RoaringNavigableMap64();
Expand Down Expand Up @@ -414,4 +427,75 @@ private long[] deserializeRowIds(MemorySlice slice) {
}
return ids;
}

/** An iterator to traverse all key-rowId pairs in the BTree index file. */
private class BTreeEntryIterator implements Iterator<Pair<Object, long[]>> {

private final SstFileIterator fileIter;
private BlockIterator dataIter;
private Pair<Object, long[]> nextEntry;
private boolean nullOutputted = false;

private BTreeEntryIterator() {
this.fileIter = reader.createIterator();
if (minKey != null) {
fileIter.seekTo(keySerializer.serialize(minKey));
}
advance();
}

private void advance() {
// First output null values from nullBitmap
if (!nullOutputted) {
nullOutputted = true;
RoaringNavigableMap64 nulls = nullBitmap.get();
if (nulls != null && !nulls.isEmpty()) {
long[] nullRowIds = new long[nulls.getIntCardinality()];
int index = 0;
for (Long rowId : nulls) {
nullRowIds[index++] = rowId;
}
nextEntry = Pair.of(null, nullRowIds);
return;
}
}

nextEntry = null;
while (true) {
// try to get next entry from current data block
if (dataIter != null && dataIter.hasNext()) {
Map.Entry<MemorySlice, MemorySlice> entry = dataIter.next();
Object key = keySerializer.deserialize(entry.getKey());
long[] rowIds = deserializeRowIds(entry.getValue());
nextEntry = Pair.of(key, rowIds);
return;
}

// try to read next data block
try {
dataIter = fileIter.readBatch();
if (dataIter == null) {
return; // no more data
}
} catch (IOException e) {
throw new RuntimeException("Failed to read next batch from SST file", e);
}
}
}

@Override
public boolean hasNext() {
return nextEntry != null;
}

@Override
public Pair<Object, long[]> next() {
if (nextEntry == null) {
throw new NoSuchElementException();
}
Pair<Object, long[]> result = nextEntry;
advance();
return result;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,21 @@
import org.apache.paimon.globalindex.GlobalIndexIOMeta;
import org.apache.paimon.globalindex.GlobalIndexReader;
import org.apache.paimon.testutils.junit.parameterized.ParameterizedTestExtension;
import org.apache.paimon.utils.Pair;

import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

import static org.assertj.core.api.Assertions.assertThat;

/** Test for {@link LazyFilteredBTreeReader} to read multiple files. */
@ExtendWith(ParameterizedTestExtension.class)
Expand Down Expand Up @@ -56,4 +66,62 @@ private List<GlobalIndexIOMeta> writeData() throws Exception {

return written;
}

@TestTemplate
public void testUnorderedIterator() throws Exception {
// Set some null values
for (int i = dataNum - 1; i >= dataNum * 0.85; i--) {
data.get(i).setLeft(null);
}

List<GlobalIndexIOMeta> written = writeData();

Comparator<Object> nullComparator = Comparator.nullsFirst(comparator);

// Build expected map from original data
Map<Object, Set<Long>> expectedMap = new TreeMap<>(nullComparator);
for (Pair<Object, Long> pair : data) {
Object key = pair.getKey();
Long rowId = pair.getValue();

if (!expectedMap.containsKey(key)) {
expectedMap.put(key, new TreeSet<>());
}
expectedMap.get(key).add(rowId);
}

Map<Object, Set<Long>> actualMap = new TreeMap<>(nullComparator);

for (GlobalIndexIOMeta index : written) {
try (BTreeIndexReader reader =
new BTreeIndexReader(keySerializer, fileReader, index, CACHE_MANAGER)) {

Iterator<Pair<Object, long[]>> iter = reader.createIterator();

// Collect all entries from iterator
while (iter.hasNext()) {
Pair<Object, long[]> entry = iter.next();
Object key = entry.getLeft();
long[] rowIds = entry.getRight();

if (!actualMap.containsKey(key)) {
actualMap.put(key, new TreeSet<>());
}
for (long rowId : rowIds) {
actualMap.get(key).add(rowId);
}
}
}
}

// Verify all keys are present
assertThat(actualMap.keySet()).containsExactlyElementsOf(expectedMap.keySet());

// Verify all rowIds for each key
for (Object key : expectedMap.keySet()) {
assertThat(actualMap.get(key))
.as("RowIds for key: " + key)
.containsExactlyElementsOf(expectedMap.get(key));
}
}
}
Loading