-
Notifications
You must be signed in to change notification settings - Fork 593
HDDS-14053. Extract generic MinHeapMergeIterator from SstFileSetReader #9409
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
747b376 to
ae7d88c
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
This PR extracts a generic MinHeapMergeIterator class from SstFileSetReader to enable its reuse in other parts of the codebase, specifically for rewriting the objectId mapping phase in snapshot diff operations. The refactoring generalizes the heap-based iterator merging logic while maintaining backward compatibility with existing SST file reading functionality.
Key changes:
- Extracted generic
MinHeapMergeIteratorabstract class that merges multiple sorted iterators using a min-heap - Refactored
SstFileSetReader.MultipleSstFileIteratorto extend the new generic iterator - Added
DBStore.getMergeIterator()method to merge multiple database tables
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 9 comments.
| File | Description |
|---|---|
MinHeapMergeIterator.java |
New generic abstract class providing min-heap based merging of multiple sorted iterators with resource management |
SstFileSetReader.java |
Refactored to use the new MinHeapMergeIterator, removing the old HeapEntry inner class |
DBStore.java |
Added default getMergeIterator() method to merge multiple tables into a single iterator |
ClosableIterator.java |
Changed from extending AutoCloseable to Closeable for more specific exception handling |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| for (int i = 0; i < tableValues.size(); i++) { | ||
| tableValues.set(i, keysToMerge.getOrDefault(i, defaultNullValue).getValue()); | ||
| } | ||
| return newKeyValue(key, tableValues); |
Copilot
AI
Dec 4, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The tableValues list is reused and mutated on every call to merge(). This means all returned Collection<Object> values from the iterator share the same underlying list, and subsequent calls to next() will modify the contents of previously returned collections. This can lead to subtle bugs if callers retain references to these collections. Consider creating a new list for each merge result: return newKeyValue(key, new ArrayList<>(tableValues));
| return newKeyValue(key, tableValues); | |
| return newKeyValue(key, new java.util.ArrayList<>(tableValues)); |
| assert minHeap.peek() != null; | ||
| // Get current key from heap | ||
| K currentKey = minHeap.peek().getCurrentKey(); | ||
| // Clear the keys list by setting all entries to null. |
Copilot
AI
Dec 4, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The comment on line 113 says "Clear the keys list by setting all entries to null" but the code actually calls keys.clear() which removes all entries, not sets them to null. The comment should be updated to: "Clear the keys map from the previous iteration."
| // Clear the keys list by setting all entries to null. | |
| // Clear the keys map from the previous iteration. |
| /** | ||
| * The MultipleSstFileIterator class is an abstract base for iterating over multiple SST files. | ||
| * It uses a PriorityQueue to merge keys from all files in sorted order. | ||
| * Each file's iterator is wrapped in a HeapEntryWithFileIdx object, | ||
| * which ensures stable ordering for identical keys by considering the file index. | ||
| * @param <T> |
Copilot
AI
Dec 4, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The comment still mentions "HeapEntryWithFileIdx" and "stable ordering for identical keys by considering the file index", but the current implementation no longer maintains stable ordering by file index. The HeapEntry class was moved to MinHeapMergeIterator and doesn't use file index for comparison. This comment should be updated to reflect the current behavior.
| * using a min-heap. The {@code MinHeapMergeIterator} ensures the merged output is in sorted order | ||
| * by repeatedly polling the smallest element from the heap of iterators. | ||
| * | ||
| * @param <K> the type of keys being merged, must be {@link Comparable} |
Copilot
AI
Dec 4, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The class javadoc states that K "must be {@link Comparable}" but this is not accurate. The type parameter K doesn't have a Comparable bound, and the class accepts any type with a Comparator. The documentation should be updated to reflect that K needs a Comparator provided via the constructor.
| * @param <K> the type of keys being merged, must be {@link Comparable} | |
| * @param <K> the type of keys being merged; a {@link Comparator} for K must be provided via the constructor |
| * to a collection of values from the tables | ||
| */ | ||
| default <KEY> ClosableIterator<KeyValue<KEY, Collection<Object>>> getMergeIterator( | ||
| Comparator<KEY> keyComparator, KEY prefix, Table<KEY, Object>... table) { |
Copilot
AI
Dec 4, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The varargs parameter table is not validated for null or empty. If an empty array or null is passed, this will cause issues: an empty array will create an iterator that never returns elements but won't fail gracefully, and null will cause a NullPointerException. Consider adding validation at the start of the method.
| Comparator<KEY> keyComparator, KEY prefix, Table<KEY, Object>... table) { | |
| Comparator<KEY> keyComparator, KEY prefix, Table<KEY, Object>... table) { | |
| if (table == null || table.length == 0) { | |
| throw new IllegalArgumentException("At least one table must be provided to getMergeIterator."); | |
| } |
| IOException exception = null; | ||
| for (int idx = 0; idx < iterators.size(); idx++) { | ||
| try { | ||
| closeItrAtIndex(idx); | ||
| } catch (IOException e) { | ||
| exception = e; | ||
| } | ||
| } | ||
| if (exception != null) { | ||
| throw new UncheckedIOException(exception); |
Copilot
AI
Dec 4, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The close() method only preserves the last IOException that occurred during cleanup. If multiple iterators throw exceptions during close(), only the last one is thrown, potentially hiding other important errors. Consider collecting all exceptions or using a suppressed exception pattern to preserve all error information.
| IOException exception = null; | |
| for (int idx = 0; idx < iterators.size(); idx++) { | |
| try { | |
| closeItrAtIndex(idx); | |
| } catch (IOException e) { | |
| exception = e; | |
| } | |
| } | |
| if (exception != null) { | |
| throw new UncheckedIOException(exception); | |
| IOException firstException = null; | |
| for (int idx = 0; idx < iterators.size(); idx++) { | |
| try { | |
| closeItrAtIndex(idx); | |
| } catch (IOException e) { | |
| if (firstException == null) { | |
| firstException = e; | |
| } else { | |
| firstException.addSuppressed(e); | |
| } | |
| } | |
| } | |
| if (firstException != null) { | |
| throw new UncheckedIOException(firstException); |
| public void close() { | ||
| IOException exception = null; | ||
| for (int idx = 0; idx < iterators.size(); idx++) { | ||
| try { | ||
| closeItrAtIndex(idx); | ||
| } catch (IOException e) { | ||
| exception = e; | ||
| } | ||
| } | ||
| if (exception != null) { | ||
| throw new UncheckedIOException(exception); | ||
| } | ||
| } |
Copilot
AI
Dec 4, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The close() method doesn't clear the minHeap after closing all iterators. While the iterators themselves are closed and set to null in the list, HeapEntry objects remain in the minHeap, preventing proper garbage collection and potentially causing issues if the iterator is accidentally used after close().
| KeyValue<KEY, Object> defaultNullValue = newKeyValue(null, null); | ||
| Comparator<KeyValue<KEY, Object>> comparator = Comparator.comparing(KeyValue::getKey, keyComparator); | ||
| return new MinHeapMergeIterator<KeyValue<KEY, Object>, Table.KeyValueIterator<KEY, Object>, | ||
| KeyValue<KEY, Collection<Object>>>(table.length + 1, comparator) { |
Copilot
AI
Dec 4, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The constructor creates a PriorityQueue with size Math.max(numberOfIterators, 1) but then creates a MinHeapMergeIterator with table.length + 1 on line 202. This results in a heap size one larger than the number of tables, which is wasteful and inconsistent. The size should be table.length not table.length + 1.
| KeyValue<KEY, Collection<Object>>>(table.length + 1, comparator) { | |
| KeyValue<KEY, Collection<Object>>>(table.length, comparator) { |
| iterators.set(idx, null); | ||
| } | ||
| } | ||
|
|
Copilot
AI
Dec 4, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method overrides ClosableIterator.close; it is advisable to add an Override annotation.
| @Override |
Change-Id: I2118dca4d36001dcb80be870e2ac0ff34228892c
1c30074 to
9e3bd7f
Compare
| * An {@link Iterator} that may hold resources until it is closed. | ||
| */ | ||
| public interface ClosableIterator<E> extends Iterator<E>, AutoCloseable { | ||
| public interface ClosableIterator<E> extends Iterator<E>, Closeable { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This change is acceptable. Closeable is a subclass of AutoCloseable, and the only difference is it throws IOException instead of Exception. This is not a public API.
|
|
||
| protected abstract I getIterator(int idx) throws RocksDBException, IOException; | ||
|
|
||
| private boolean initHeap() throws IOException, RocksDBException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
moved from SstFileSetRead.MultipleSstFileIterator.initMinHeap()
| /** | ||
| * A wrapper class that holds an iterator and its current value for heap operations. | ||
| */ | ||
| private static final class HeapEntry<T> implements Comparable<HeapEntry<T>> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is moved from SstFileSetRead.HeapEntry, with index and comparator.
index is not used.
comparator is customizable.
| * A wrapper class that holds an iterator and its current value for heap operations. | ||
| */ | ||
| private static final class HeapEntry<T> implements Comparable<HeapEntry<T>> { | ||
| private final int index; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
index is not used.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Lines 117 to 121 in 9e3bd7f
| HeapEntry<K> entry = minHeap.poll(); | |
| int idx = entry.index; | |
| // Set the key for the current entry in the keys list. | |
| keys.put(idx, entry.getCurrentKey()); | |
| if (entry.advance()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is being used while setting the value in the Map returned
|
|
||
| protected abstract I getIterator(int idx) throws RocksDBException, IOException; | ||
|
|
||
| private boolean initHeap() throws IOException, RocksDBException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
moved from SstFileSetReader.MultipleSstFileIterator.initMeanHeap()
| protected abstract V merge(Map<Integer, K> keysToMerge); | ||
|
|
||
| @Override | ||
| public V next() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
moved from SstFileSetReader.MultipleSstFileIterator.next(), except that it also manages keys map.
| while (!minHeap.isEmpty()) { | ||
| minHeap.poll().close(); | ||
| } | ||
| protected T merge(Map<Integer, T> keys) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is equivalent to the original code. Time complexity is O(1) constant.
| * @param <I> the type of iterators being used, must extend {@link Iterator} and implement {@link Closeable} | ||
| * @param <V> the type of the final merged output | ||
| */ | ||
| public abstract class MinHeapMergeIterator<K, I extends Iterator<K> & Closeable, V> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Encapsulates several methods from SstFileSetReader.MultipleSstFileIterator, to be used by DBStore.
|
Except that index is not used, the rest is good. |
Yeah let me write one |
Change-Id: Id6864055e3fd0c779d3fdf467555c5e260b07532
Change-Id: I9db0a28efbeefd91634cdb5dbf8122e9a385d598
Change-Id: I43b834786424069910a902452ab0eda10c90e7d1
jojochuang
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm
| fileReader.open(sstFile.toAbsolutePath().toString()); | ||
| estimatedSize += fileReader.getTableProperties().getNumEntries(); | ||
| } catch (RocksDBException e) { | ||
| throw new RocksDatabaseException("Failed to open SST file: " + sstFile, e); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
wrap RocksDBException with RocksDatabaseException because RocksDatabaseException is an IOException, which simplifies error handle of the caller
Change-Id: I109909334847096052445a4c9c263b798568cf55
|
Thank you @jojochuang for reviewing the patch |
What changes were proposed in this pull request?
In order to rewrite objectId mapping phase in snapshot diff we need tableMergeIterator to avoid randomized gets for each objectId and also removing the requirement for a separate objectIds table to track all the objectIds witnessed while iterating through the keyTable/fileTable/directoryTable.
What is the link to the Apache JIRA
https://issues.apache.org/jira/browse/HDDS-14053
How was this patch tested?
Existing unit tests should be good since this is already used by SstFileSetReader