Skip to content

Commit

Permalink
[HUDI-2780] Fix the issue of MOR log skipping complete blocks when reading data (apache#4015)
Browse files Browse the repository at this point in the history


Co-authored-by: huangjing02 <huangjing02@bilibili.com>
Co-authored-by: sivabalan <n.siva.b@gmail.com>
  • Loading branch information
3 people authored and fengjian committed Apr 5, 2023
1 parent 442ae75 commit 87d910d
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 37 deletions.
Expand Up @@ -150,21 +150,22 @@ private void addShutDownHook() {
// for max of Integer size
private HoodieLogBlock readBlock() throws IOException {
int blockSize;
long blockStartPos = inputStream.getPos();
try {
// 1 Read the total size of the block
blockSize = (int) inputStream.readLong();
} catch (EOFException | CorruptedLogFileException e) {
// An exception reading any of the above indicates a corrupt block
// Create a corrupt block by finding the next MAGIC marker or EOF
return createCorruptBlock();
return createCorruptBlock(blockStartPos);
}

// We may have had a crash which could have written this block partially
// Skip blockSize in the stream and we should either find a sync marker (start of the next
// block) or EOF. If we did not find either of it, then this block is a corrupted block.
boolean isCorrupted = isBlockCorrupted(blockSize);
if (isCorrupted) {
return createCorruptBlock();
return createCorruptBlock(blockStartPos);
}

// 2. Read the version for this log format
Expand Down Expand Up @@ -253,14 +254,14 @@ private HoodieLogBlockType tryReadBlockType(HoodieLogFormat.LogFormatVersion blo
return HoodieLogBlockType.values()[type];
}

private HoodieLogBlock createCorruptBlock() throws IOException {
LOG.info("Log " + logFile + " has a corrupted block at " + inputStream.getPos());
long currentPos = inputStream.getPos();
private HoodieLogBlock createCorruptBlock(long blockStartPos) throws IOException {
LOG.info("Log " + logFile + " has a corrupted block at " + blockStartPos);
inputStream.seek(blockStartPos);
long nextBlockOffset = scanForNextAvailableBlockOffset();
// Rewind to the initial start and read corrupted bytes till the nextBlockOffset
inputStream.seek(currentPos);
inputStream.seek(blockStartPos);
LOG.info("Next available block in " + logFile + " starts at " + nextBlockOffset);
int corruptedBlockSize = (int) (nextBlockOffset - currentPos);
int corruptedBlockSize = (int) (nextBlockOffset - blockStartPos);
long contentPosition = inputStream.getPos();
Option<byte[]> corruptedBytes = HoodieLogBlock.tryReadContent(inputStream, corruptedBlockSize, readBlockLazily);
HoodieLogBlock.HoodieLogBlockContentLocation logBlockContentLoc =
Expand Down
Expand Up @@ -700,20 +700,11 @@ public void testBasicAppendAndScanMultipleFiles(ExternalSpillableMap.DiskMapType

@Test
public void testAppendAndReadOnCorruptedLog() throws IOException, URISyntaxException, InterruptedException {
Writer writer =
HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION)
.withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build();
List<IndexedRecord> records = SchemaTestUtil.generateTestRecords(0, 100);
Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString());
HoodieDataBlock dataBlock = getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records, header);
writer.appendBlock(dataBlock);
writer.close();
HoodieLogFile logFile = addValidBlock("test-fileId1", "100", 100);

// Append some arbit byte[] to the end of the log (mimics a partially written commit)
fs = FSUtils.getFs(fs.getUri().toString(), fs.getConf());
FSDataOutputStream outputStream = fs.append(writer.getLogFile().getPath());
FSDataOutputStream outputStream = fs.append(logFile.getPath());
// create a block with
outputStream.write(HoodieLogFormat.MAGIC);
// Write out a length that does not confirm with the content
Expand All @@ -728,17 +719,10 @@ public void testAppendAndReadOnCorruptedLog() throws IOException, URISyntaxExcep
outputStream.close();

// Append a proper block that is of the missing length of the corrupted block
writer =
HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION)
.withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build();
records = SchemaTestUtil.generateTestRecords(0, 10);
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString());
dataBlock = getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records, header);
writer.appendBlock(dataBlock);
writer.close();
logFile = addValidBlock("test-fileId1", "100", 10);

// First round of reads - we should be able to read the first block and then EOF
Reader reader = HoodieLogFormat.newReader(fs, writer.getLogFile(), SchemaTestUtil.getSimpleSchema());
Reader reader = HoodieLogFormat.newReader(fs, logFile, SchemaTestUtil.getSimpleSchema());
assertTrue(reader.hasNext(), "First block should be available");
reader.next();
assertTrue(reader.hasNext(), "We should have corrupted block next");
Expand All @@ -751,7 +735,7 @@ public void testAppendAndReadOnCorruptedLog() throws IOException, URISyntaxExcep
reader.close();

// Simulate another failure back to back
outputStream = fs.append(writer.getLogFile().getPath());
outputStream = fs.append(logFile.getPath());
// create a block with
outputStream.write(HoodieLogFormat.MAGIC);
// Write out a length that does not confirm with the content
Expand All @@ -766,17 +750,10 @@ public void testAppendAndReadOnCorruptedLog() throws IOException, URISyntaxExcep
outputStream.close();

// Should be able to append a new block
writer =
HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION)
.withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build();
records = SchemaTestUtil.generateTestRecords(0, 100);
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString());
dataBlock = getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records, header);
writer.appendBlock(dataBlock);
writer.close();
logFile = addValidBlock("test-fileId1", "100", 100);

// Second round of reads - we should be able to read the first and last block
reader = HoodieLogFormat.newReader(fs, writer.getLogFile(), SchemaTestUtil.getSimpleSchema());
reader = HoodieLogFormat.newReader(fs, logFile, SchemaTestUtil.getSimpleSchema());
assertTrue(reader.hasNext(), "First block should be available");
reader.next();
assertTrue(reader.hasNext(), "We should get the 1st corrupted block next");
Expand All @@ -792,6 +769,48 @@ public void testAppendAndReadOnCorruptedLog() throws IOException, URISyntaxExcep
reader.close();
}

@Test
public void testMissingBlockExceptMagicBytes() throws IOException, URISyntaxException, InterruptedException {
  // Start from a log file that already holds one valid data block.
  HoodieLogFile logFile = addValidBlock("test-fileId1", "100", 100);

  // Simulate a writer crash that persisted only the MAGIC marker and nothing else:
  // append the magic bytes alone and close the stream.
  fs = FSUtils.getFs(fs.getUri().toString(), fs.getConf());
  FSDataOutputStream appendStream = fs.append(logFile.getPath());
  appendStream.write(HoodieLogFormat.MAGIC);
  appendStream.flush();
  appendStream.close();

  // A later writer appends a well-formed block after the dangling magic bytes.
  logFile = addValidBlock("test-fileId1", "100", 10);

  // Read back: expect valid block, then the magic-only stub surfacing as a
  // corrupt block, then the second valid block, then end of file.
  Reader logReader = HoodieLogFormat.newReader(fs, logFile, SchemaTestUtil.getSimpleSchema());
  assertTrue(logReader.hasNext(), "First block should be available");
  logReader.next();
  assertTrue(logReader.hasNext(), "We should have corrupted block next");
  HoodieLogBlock corruptBlock = logReader.next();
  assertEquals(HoodieLogBlockType.CORRUPT_BLOCK, corruptBlock.getBlockType(), "The read block should be a corrupt block");
  assertTrue(logReader.hasNext(), "Third block should be available");
  logReader.next();
  assertFalse(logReader.hasNext(), "There should be no more block left");

  logReader.close();
}

/**
 * Appends one well-formed data block containing {@code numRecords} generated test records to the
 * log file identified by {@code fileId}/{@code commitTime}, and returns the resulting log file.
 *
 * @param fileId     file id used to build the log writer
 * @param commitTime base commit for the writer and INSTANT_TIME header of the block
 * @param numRecords number of test records to generate into the block
 * @return the log file the block was appended to
 * @throws IOException            on write failure
 * @throws URISyntaxException     if test record generation fails to resolve its schema resource
 * @throws InterruptedException   if the underlying append is interrupted
 */
private HoodieLogFile addValidBlock(String fileId, String commitTime, int numRecords) throws IOException, URISyntaxException, InterruptedException {
  Writer writer =
      HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION)
          .withFileId(fileId).overBaseCommit(commitTime).withFs(fs).build();
  List<IndexedRecord> records = SchemaTestUtil.generateTestRecords(0, numRecords);
  Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
  // Use the caller-supplied commit time rather than a hard-coded "100" so the block header
  // stays consistent with the base commit passed to the writer builder above.
  header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, commitTime);
  header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString());
  HoodieDataBlock dataBlock = getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records, header);
  writer.appendBlock(dataBlock);
  writer.close();
  return writer.getLogFile();
}

@Test
public void testValidateCorruptBlockEndPosition() throws IOException, URISyntaxException, InterruptedException {
Writer writer =
Expand Down

0 comments on commit 87d910d

Please sign in to comment.