Skip to content

Commit

Permalink
[HUDI-2780] Fix the issue of Mor log skipping complete blocks when re…
Browse files Browse the repository at this point in the history
…ading data
  • Loading branch information
huangjing02 committed Nov 17, 2021
1 parent aec5d11 commit 375927a
Showing 1 changed file with 4 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ private HoodieLogBlock readBlock() throws IOException {

int blocksize;
int type;
long blockStartPos = inputStream.getPos();
HoodieLogBlockType blockType = null;
Map<HeaderMetadataType, String> header = null;

Expand All @@ -194,7 +195,7 @@ private HoodieLogBlock readBlock() throws IOException {
// We may have had a crash which could have written this block partially
// Skip blocksize in the stream and we should either find a sync marker (start of the next
// block) or EOF. If we did not find either of it, then this block is a corrupted block.
boolean isCorrupted = isBlockCorrupt(blocksize);
boolean isCorrupted = isBlockCorrupt(blockStartPos, blocksize);
if (isCorrupted) {
return createCorruptBlock();
}
Expand Down Expand Up @@ -281,7 +282,7 @@ private HoodieLogBlock createCorruptBlock() throws IOException {
contentPosition, corruptedBlockSize, corruptedBlockSize, new HashMap<>(), new HashMap<>());
}

private boolean isBlockCorrupt(int blocksize) throws IOException {
private boolean isBlockCorrupt(long blockStartPos, int blocksize) throws IOException {
long currentPos = inputStream.getPos();
try {
inputStream.seek(currentPos + blocksize);
Expand All @@ -291,7 +292,7 @@ private boolean isBlockCorrupt(int blocksize) throws IOException {
// This seek is required because contract of seek() is different for naked DFSInputStream vs BufferedFSInputStream
// release-3.1.0-RC1/DFSInputStream.java#L1455
// release-3.1.0-RC1/BufferedFSInputStream.java#L73
inputStream.seek(currentPos);
inputStream.seek(blockStartPos);
return true;
}

Expand Down

0 comments on commit 375927a

Please sign in to comment.