[HUDI-3919] [UBER] Support out of order rollback blocks in AbstractHoodieLogRecordReader #5341
Conversation
Branch …odieLogRecordReader updated: e7b633c to 8cdf174, then 8cdf174 to 5831902.
@alexeykudinkin: can you review this?
@@ -218,7 +221,45 @@ protected synchronized void scanInternal(Option<KeySpec> keySpecOpt) {
logFilePaths.stream().map(logFile -> new HoodieLogFile(new Path(logFile))).collect(Collectors.toList()),
readerSchema, readBlocksLazily, reverseReader, bufferSize, enableRecordLookups, keyField, internalSchema);

/**
 * Traversal of log blocks from log files can be done in two directions.
Please simplify this comment and provide only the current logic.
@@ -245,97 +286,53 @@ protected synchronized void scanInternal(Option<KeySpec> keySpecOpt) {
    continue;
  }
}
if (logBlock.getBlockType().equals(CORRUPT_BLOCK)) {
Can we handle this too in the switch that follows? Having a common way to handle the various block types makes the code flow easier to follow.
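For illustration, funneling the corrupt-block case into the same switch as the other block types could look roughly like this. This is only a sketch: the `BlockType` enum, the `handle` method, and the returned action strings are invented stand-ins, not Hudi's actual `HoodieLogBlockType` handling.

```java
public class BlockSwitchSketch {
    // Hypothetical stand-in for Hudi's log block type enum.
    public enum BlockType { DATA_BLOCK, DELETE_BLOCK, ROLLBACK_BLOCK, CORRUPT_BLOCK }

    // Classify every block in one switch so all types, including corrupt
    // blocks, are handled in one place rather than via a separate if-check.
    public static String handle(BlockType type) {
        switch (type) {
            case DATA_BLOCK:
                return "buffer-data";
            case DELETE_BLOCK:
                return "buffer-delete";
            case ROLLBACK_BLOCK:
                return "record-target-instant";
            case CORRUPT_BLOCK:
                return "skip";
            default:
                throw new IllegalArgumentException("Unknown block type: " + type);
        }
    }
}
```

With this shape, adding a new block type forces a decision in the same switch instead of scattering special cases before it.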
  continue;
}

// Rollback blocks contain information of instants that are failed, collect them in a set.
This comment seems more relevant to where the rollback block is handled later.
  }
}

int numBlocksRolledBack = 0;
// This is a reverse traversal on the collected data blocks.
collected data and delete blocks.
Also, how is this a reverse traversal? Isn't the for-loop a forward traversal?
@@ -839,20 +839,24 @@ public void testAvroLogRecordReaderWithRollbackTombstone(ExternalSpillableMap.Di
writer.appendBlock(dataBlock);

// Write 2
header = new HashMap<>();
header.clear() also works here, instead of allocating a new HashMap each time.
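The suggestion is a small allocation-saving pattern: reuse one header map across writes and empty it with `clear()` rather than constructing a fresh `HashMap` per block. A hypothetical sketch (the method and key names are invented for illustration, not the test's actual code):

```java
import java.util.HashMap;
import java.util.Map;

public class HeaderReuseSketch {
    // One map reused across "writes"; clear() empties it in place.
    private static final Map<String, String> reusedHeader = new HashMap<>();

    public static Map<String, String> nextHeader(String instantTime, String schema) {
        reusedHeader.clear();               // instead of: header = new HashMap<>();
        reusedHeader.put("INSTANT_TIME", instantTime);
        reusedHeader.put("SCHEMA", schema);
        return reusedHeader;
    }
}
```

Successive calls return the same map instance with fresh contents, so no garbage is generated per block; the trade-off is that callers must not hold on to a previously returned map.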
@@ -218,7 +221,45 @@ protected synchronized void scanInternal(Option<KeySpec> keySpecOpt) {
logFilePaths.stream().map(logFile -> new HoodieLogFile(new Path(logFile))).collect(Collectors.toList()),
readerSchema, readBlocksLazily, reverseReader, bufferSize, enableRecordLookups, keyField, internalSchema);
Let's also remove the readBlocksLazily argument, as it is now required to always be true.
@@ -218,7 +221,45 @@ protected synchronized void scanInternal(Option<KeySpec> keySpecOpt) {
logFilePaths.stream().map(logFile -> new HoodieLogFile(new Path(logFile))).collect(Collectors.toList()),
readerSchema, readBlocksLazily, reverseReader, bufferSize, enableRecordLookups, keyField, internalSchema);
Let's also remove reverseReader, as it is no longer supported.
@prashantwason Addressed the review comments. Removed the readBlocksLazily and reverseReader flags from AbstractHoodieLogRecordReader and the log file reader classes.
...di-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackHelper.java
(resolved)
 * This becomes more complicated if we have compacted blocks, which are data blocks created using log compaction.
 * TODO: Include support for log compacted blocks. https://issues.apache.org/jira/browse/HUDI-3580
 *
 * To solve this do traversal twice.
I assume we will employ two traversals only when needed, i.e. when minor compactions are enabled. If not, can we avoid it and fall back to the old behavior?
Two traversals are needed to support multi-writer scenarios, where a rollback block can land far away from the block it is targeting. With minor compaction it becomes trickier still, since we can have compacted blocks comprising other compacted blocks. So this PR tackles the multi-writer scenarios first.
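The two-traversal idea can be sketched as follows: a first forward pass collects the instants targeted by rollback blocks, wherever those rollback blocks sit in the file, and a second pass keeps only the data blocks whose instant was not rolled back. The `Block` class and method names below are hypothetical simplifications of Hudi's log blocks, not the PR's actual code:

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class TwoPassScanSketch {
    // Minimal stand-in for a log block: its own instant time, plus the
    // instant a rollback block targets (null for ordinary data blocks).
    public static final class Block {
        final String instantTime;
        final String rollbackTarget;  // non-null => this is a rollback block

        public Block(String instantTime, String rollbackTarget) {
            this.instantTime = instantTime;
            this.rollbackTarget = rollbackTarget;
        }
    }

    public static List<String> validDataInstants(List<Block> blocks) {
        // Pass 1: collect instants targeted by rollback blocks, regardless
        // of where in the file the rollback block appears.
        Set<String> rolledBack = new HashSet<>();
        for (Block b : blocks) {
            if (b.rollbackTarget != null) {
                rolledBack.add(b.rollbackTarget);
            }
        }
        // Pass 2: keep data blocks whose instant was not rolled back.
        List<String> valid = new ArrayList<>();
        for (Block b : blocks) {
            if (b.rollbackTarget == null && !rolledBack.contains(b.instantTime)) {
                valid.add(b.instantTime);
            }
        }
        return valid;
    }
}
```

This is why a single forward pass is insufficient in the multi-writer case: when the scan reaches a data block, a rollback targeting it may not have been seen yet.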
Hey Surya, thanks for the patch. Wondering, for the single-writer scenario, do we think we can retain the old behavior? Only for multi-writer and minor log compactions might we have to take the new route.
@@ -232,7 +251,7 @@ protected synchronized void scanInternal(Option<KeySpec> keySpecOpt) {
    && !HoodieTimeline.compareTimestamps(logBlock.getLogBlockHeader().get(INSTANT_TIME), HoodieTimeline.LESSER_THAN_OR_EQUALS, this.latestInstantTime
    )) {
  // hit a block with instant time greater than should be processed, stop processing further
-  break;
+  continue;
Why continue? Blocks with an instant time greater than the latest known instant time can be skipped altogether, right?
Thanks for catching this. It is a mistake; I am removing it.
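To illustrate the break-vs-continue point being discussed: with `break`, the scan stops at the first block newer than the latest known instant, which is only valid when instant times are non-decreasing within the file; with `continue`, newer blocks are skipped individually but scanning proceeds. A minimal sketch with made-up method names, using plain integers for instant times:

```java
import java.util.ArrayList;
import java.util.List;

public class InstantFilterSketch {
    // 'break' semantics: stop at the first too-new block.
    public static List<Integer> scanWithBreak(List<Integer> instants, int latestInstant) {
        List<Integer> processed = new ArrayList<>();
        for (int t : instants) {
            if (t > latestInstant) {
                break;      // nothing after this point is processed
            }
            processed.add(t);
        }
        return processed;
    }

    // 'continue' semantics: skip too-new blocks but keep scanning.
    public static List<Integer> scanWithContinue(List<Integer> instants, int latestInstant) {
        List<Integer> processed = new ArrayList<>();
        for (int t : instants) {
            if (t > latestInstant) {
                continue;   // only this block is skipped
            }
            processed.add(t);
        }
        return processed;
    }
}
```

On an out-of-order sequence such as 100, 105, 101 with a latest instant of 101, the two behaviors diverge: `break` never reaches the valid block at 101, while `continue` does.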
this.enableRecordLookups = enableRecordLookups;
this.keyField = keyField;
this.internalSchema = internalSchema == null ? InternalSchema.getEmptyInternalSchema() : internalSchema;
if (this.reverseReader) {
Why are we removing the reverse reader? Can you help me understand?
My understanding is that when iterating in reverse order there is an issue when we encounter a corrupt block. We cannot jump across the corrupt block, since corrupt blocks don't have the block size stored at the end, so we end up ignoring all the blocks older than the corrupt block.
That is the reason for removing the reverseReader lookup: this case cannot be handled.
It also becomes more complicated when introducing log compaction, where we need to move the compacted blocks to a different slot, so it is not a straightforward traversal. Removing this logic reduces the complexity involved.
Please let me know what you think.
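The corrupt-block problem described above can be simulated with a toy model: each block is represented only by its trailing length footer, with -1 standing in for a corrupt block whose footer is missing or garbage. A reverse reader seeks backwards by reading each footer to find the previous block's end; once it hits the corrupt block, every older block becomes unreachable. This is an illustrative model, not Hudi's actual reader:

```java
import java.util.List;

public class ReverseSeekSketch {
    // Count how many blocks a reverse traversal can reach before a
    // corrupt block (footer length < 0) makes older blocks unreachable.
    public static int blocksReachableInReverse(List<Integer> footerLengths) {
        int reachable = 0;
        for (int i = footerLengths.size() - 1; i >= 0; i--) {
            if (footerLengths.get(i) < 0) {
                // Without a trailing length we cannot compute the previous
                // block's end offset, so the backward seek must stop here.
                break;
            }
            reachable++;
        }
        return reachable;
    }
}
```

A forward reader does not have this problem, because it can scan ahead from a known offset to resynchronize past a corrupt region; the backward direction has no such anchor.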
@@ -414,7 +411,7 @@ public void testHugeLogFileWrite() throws IOException, URISyntaxException, Inter
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString());
byte[] dataBlockContentBytes = getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records, header).getContentBytes();
HoodieLogBlock.HoodieLogBlockContentLocation logBlockContentLoc = new HoodieLogBlock.HoodieLogBlockContentLocation(new Configuration(), null, 0, dataBlockContentBytes.length, 0);
-HoodieDataBlock reusableDataBlock = new HoodieAvroDataBlock(null, Option.ofNullable(dataBlockContentBytes), false,
+HoodieDataBlock reusableDataBlock = new HoodieAvroDataBlock(null, Option.ofNullable(dataBlockContentBytes),
Do we have tests for the multi-writer scenario, i.e. a rollback log block appended after a few other valid log blocks? If not, can we add one?
Sure, I will add them.
Does that mean we need to pass a multi-writer-enabled flag to AbstractHoodieLogRecordReader, and use that flag to toggle between one traversal and two traversals?
Closing in favor of #5958
What is the purpose of the pull request
This pull request adds support for out of order rollback blocks in AbstractHoodieLogRecordReader.
Brief change log
Verify this pull request
This pull request is already covered by existing tests; a test case was also modified to verify the changes.
Committer checklist
Has a corresponding JIRA in PR title & commit
Commit message is descriptive of the change
CI is green
Necessary doc changes done or have another open PR
For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.