Skip to content

Commit

Permalink
[HUDI-3580] [UBER] Add support for compacted log blocks
Browse files Browse the repository at this point in the history
  • Loading branch information
suryaprasanna committed Apr 19, 2022
1 parent 9e8664f commit e7b633c
Show file tree
Hide file tree
Showing 7 changed files with 116 additions and 100 deletions.
Expand Up @@ -210,7 +210,7 @@ protected Map<HoodieLogBlock.HeaderMetadataType, String> generateHeader(String c
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, metaClient.getActiveTimeline().lastInstant().get().getTimestamp());
header.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, commit);
header.put(HoodieLogBlock.HeaderMetadataType.COMMAND_BLOCK_TYPE,
String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK.ordinal()));
String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_BLOCK.ordinal()));
return header;
}
}
Expand Up @@ -65,7 +65,7 @@ static Map<HoodieLogBlock.HeaderMetadataType, String> generateHeader(String inst
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, rollbackInstantTime);
header.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, instantToRollback);
header.put(HoodieLogBlock.HeaderMetadataType.COMMAND_BLOCK_TYPE,
String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK.ordinal()));
String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_BLOCK.ordinal()));
return header;
}

Expand Down
Expand Up @@ -56,6 +56,7 @@

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashSet;
Expand All @@ -64,7 +65,9 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

import static org.apache.hudi.common.table.log.block.HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_BLOCK;
import static org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType.INSTANT_TIME;
import static org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME;
import static org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockType.COMMAND_BLOCK;
import static org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockType.CORRUPT_BLOCK;

Expand Down Expand Up @@ -218,7 +221,46 @@ protected synchronized void scanInternal(Option<KeySpec> keySpecOpt) {
logFilePaths.stream().map(logFile -> new HoodieLogFile(new Path(logFile))).collect(Collectors.toList()),
readerSchema, readBlocksLazily, reverseReader, bufferSize, enableRecordLookups, keyField, internalSchema);

/**
* Traversal of log blocks from log files can be done in two directions.
* 1. Forward traversal
* 2. Reverse traversal
* For example: BaseFile, LogFile1(LogBlock11,LogBlock12,LogBlock13), LofFile2(LogBlock21,LogBlock22,LogBlock23)
* Forward traversal look like,
* LogBlock11, LogBlock12, LogBlock13, LogBlock21, LogBlock22, LogBlock23
* If we are considering reverse traversal including log blocks,
* LogBlock23, LogBlock22, LogBlock21, LogBlock13, LogBlock12, LogBlock11
* Here, reverse traversal also traverses blocks in reverse order of creation.
*
* 1. Forward traversal
* Forward traversal is easy to do in single writer mode. Where the rollback block is right after the effected data blocks.
* With multiwriter mode the blocks can be out of sync. An example scenario.
* B1, B2, B3, B4, R1(B3), B5
* In this case, rollback block R1 is invalidating the B3 which is not the previous block.
* This becomes more complicated if we have compacted blocks, which are data blocks but are created using log compaction.
* TODO: Include support for log compacted blocks. https://issues.apache.org/jira/browse/HUDI-3580
*
* To solve this do traversal twice.
* In first traversal, collect all the valid data and delete blocks that are not corrupted along with the rollback block's target instant times.
* For second traversal, traverse on the collected data blocks by considering the rollback blocks and compacted blocks.
* By doing a reverse traversal, we can know what all blocks are already compacted
* and also invalid blocks whose instants is tracked under rollback target instants
* 2. Reverse traversal
* Reverse traversal is more intuitive in multiwriter mode. Reverse traversal would mean not just traversing
* log files in reverse order, but also the log blocks within them.
* This is harder to achieve when there are corrupt blocks, since the blocks size information
* might not be stored at the end of the corrupt block. So, hopping to the starting of the block is not possible.
*/

// Collect targetRollbackInstants, using which we can determine which blocks are invalid.
Set<String> targetRollbackInstants = new HashSet<>();
// This will only contain data and delete blocks, corrupt blocks will be ignored and
// target instants from rollback block are collected in targetRolbackInstants set.
List<HoodieLogBlock> dataAndDeleteBlocks = new ArrayList<>();

Set<HoodieLogFile> scannedLogFiles = new HashSet<>();

// Do a forward traversal for all files and blocks.
while (logFormatReaderWrapper.hasNext()) {
HoodieLogFile logFile = logFormatReaderWrapper.getLogFile();
LOG.info("Scanning log file " + logFile);
Expand All @@ -245,97 +287,49 @@ protected synchronized void scanInternal(Option<KeySpec> keySpecOpt) {
continue;
}
}
if (logBlock.getBlockType().equals(CORRUPT_BLOCK)) {
LOG.info("Found a corrupt block in " + logFile.getPath());
totalCorruptBlocks.incrementAndGet();
continue;
}

// Rollback blocks contain information of instants that are failed, collect them in a set..
switch (logBlock.getBlockType()) {
case HFILE_DATA_BLOCK:
case AVRO_DATA_BLOCK:
case PARQUET_DATA_BLOCK:
LOG.info("Reading a data block from file " + logFile.getPath() + " at instant "
+ logBlock.getLogBlockHeader().get(INSTANT_TIME));
if (isNewInstantBlock(logBlock) && !readBlocksLazily) {
// If this is an avro data block belonging to a different commit/instant,
// then merge the last blocks and records into the main result
processQueuedBlocksForInstant(currentInstantLogBlocks, scannedLogFiles.size(), keySpecOpt);
}
// store the current block
currentInstantLogBlocks.push(logBlock);
break;
case DELETE_BLOCK:
LOG.info("Reading a delete block from file " + logFile.getPath());
if (isNewInstantBlock(logBlock) && !readBlocksLazily) {
// If this is a delete data block belonging to a different commit/instant,
// then merge the last blocks and records into the main result
processQueuedBlocksForInstant(currentInstantLogBlocks, scannedLogFiles.size(), keySpecOpt);
}
// store deletes so can be rolled back
currentInstantLogBlocks.push(logBlock);
dataAndDeleteBlocks.add(logBlock);
break;
case COMMAND_BLOCK:
// Consider the following scenario
// (Time 0, C1, Task T1) -> Running
// (Time 1, C1, Task T1) -> Failed (Wrote either a corrupt block or a correct
// DataBlock (B1) with commitTime C1
// (Time 2, C1, Task T1.2) -> Running (Task T1 was retried and the attempt number is 2)
// (Time 3, C1, Task T1.2) -> Finished (Wrote a correct DataBlock B2)
// Now a logFile L1 can have 2 correct Datablocks (B1 and B2) which are the same.
// Say, commit C1 eventually failed and a rollback is triggered.
// Rollback will write only 1 rollback block (R1) since it assumes one block is
// written per ingestion batch for a file but in reality we need to rollback (B1 & B2)
// The following code ensures the same rollback block (R1) is used to rollback
// both B1 & B2
LOG.info("Reading a command block from file " + logFile.getPath());
// This is a command block - take appropriate action based on the command
HoodieCommandBlock commandBlock = (HoodieCommandBlock) logBlock;
String targetInstantForCommandBlock =
logBlock.getLogBlockHeader().get(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME);
switch (commandBlock.getType()) { // there can be different types of command blocks
case ROLLBACK_PREVIOUS_BLOCK:
// Rollback the last read log block
// Get commit time from last record block, compare with targetCommitTime,
// rollback only if equal, this is required in scenarios of invalid/extra
// rollback blocks written due to failures during the rollback operation itself
// and ensures the same rollback block (R1) is used to rollback both B1 & B2 with
// same instant_time
int numBlocksRolledBack = 0;
totalRollbacks.incrementAndGet();
while (!currentInstantLogBlocks.isEmpty()) {
HoodieLogBlock lastBlock = currentInstantLogBlocks.peek();
// handle corrupt blocks separately since they may not have metadata
if (lastBlock.getBlockType() == CORRUPT_BLOCK) {
LOG.info("Rolling back the last corrupted log block read in " + logFile.getPath());
currentInstantLogBlocks.pop();
numBlocksRolledBack++;
} else if (targetInstantForCommandBlock.contentEquals(lastBlock.getLogBlockHeader().get(INSTANT_TIME))) {
// rollback last data block or delete block
LOG.info("Rolling back the last log block read in " + logFile.getPath());
currentInstantLogBlocks.pop();
numBlocksRolledBack++;
} else if (!targetInstantForCommandBlock
.contentEquals(currentInstantLogBlocks.peek().getLogBlockHeader().get(INSTANT_TIME))) {
// invalid or extra rollback block
LOG.warn("TargetInstantTime " + targetInstantForCommandBlock
+ " invalid or extra rollback command block in " + logFile.getPath());
break;
} else {
// this should not happen ideally
LOG.warn("Unable to apply rollback command block in " + logFile.getPath());
}
}
LOG.info("Number of applied rollback blocks " + numBlocksRolledBack);
break;
default:
throw new UnsupportedOperationException("Command type not yet supported.");
if (commandBlock.getType().equals(ROLLBACK_BLOCK)) {
String targetInstantForCommandBlock =
logBlock.getLogBlockHeader().get(TARGET_INSTANT_TIME);
targetRollbackInstants.add(targetInstantForCommandBlock);
} else {
throw new UnsupportedOperationException("Command type not yet supported.");
}
break;
case CORRUPT_BLOCK:
LOG.info("Found a corrupt block in " + logFile.getPath());
totalCorruptBlocks.incrementAndGet();
// If there is a corrupt block - we will assume that this was the next data block
currentInstantLogBlocks.push(logBlock);
break;
default:
throw new UnsupportedOperationException("Block type not supported yet");
throw new UnsupportedOperationException("Block type not yet supported.");
}
}

// This is a reverse traversal on the collected data blocks.
for (HoodieLogBlock logBlock : dataAndDeleteBlocks) {
String blockInstantTime = logBlock.getLogBlockHeader().get(INSTANT_TIME);

// Exclude the blocks for rollback blocks exist.
// Here, rollback can include instants affiliated to deltacommits or log compaction commits.
if (targetRollbackInstants.contains(blockInstantTime)) {
continue;
}
currentInstantLogBlocks.push(logBlock);
}

// merge the last read block when all the blocks are done reading
if (!currentInstantLogBlocks.isEmpty()) {
LOG.info("Merging the final data blocks");
Expand Down
Expand Up @@ -36,7 +36,7 @@ public class HoodieCommandBlock extends HoodieLogBlock {
* Hoodie command block type enum.
*/
public enum HoodieCommandBlockTypeEnum {
ROLLBACK_PREVIOUS_BLOCK
ROLLBACK_BLOCK
}

public HoodieCommandBlock(Map<HeaderMetadataType, String> header) {
Expand Down

0 comments on commit e7b633c

Please sign in to comment.