Skip to content

Commit

Permalink
Remove reverseReader and readBlocksLazily configs
Browse files Browse the repository at this point in the history
  • Loading branch information
suryaprasanna committed May 11, 2022
1 parent 5831902 commit 4e7a316
Show file tree
Hide file tree
Showing 15 changed files with 79 additions and 384 deletions.
Expand Up @@ -101,11 +101,6 @@ public abstract class AbstractHoodieLogRecordReader {
private Option<Pair<String, String>> simpleKeyGenFields = Option.empty();
// Log File Paths
protected final List<String> logFilePaths;
// Read Lazily flag
private final boolean readBlocksLazily;
// Reverse reader - Not implemented yet (NA -> Why do we need ?)
// but present here for plumbing for future implementation
private final boolean reverseReader;
// Buffer Size for log file reader
private final int bufferSize;
// optional instant range for incremental block filtering
Expand Down Expand Up @@ -141,19 +136,18 @@ public abstract class AbstractHoodieLogRecordReader {
private boolean populateMetaFields = true;

protected AbstractHoodieLogRecordReader(FileSystem fs, String basePath, List<String> logFilePaths,
Schema readerSchema,
String latestInstantTime, boolean readBlocksLazily, boolean reverseReader,
Schema readerSchema, String latestInstantTime,
int bufferSize, Option<InstantRange> instantRange,
boolean withOperationField) {
this(fs, basePath, logFilePaths, readerSchema, latestInstantTime, readBlocksLazily, reverseReader, bufferSize,
this(fs, basePath, logFilePaths, readerSchema, latestInstantTime, bufferSize,
instantRange, withOperationField, true, Option.empty(), InternalSchema.getEmptyInternalSchema());
}

protected AbstractHoodieLogRecordReader(FileSystem fs, String basePath, List<String> logFilePaths,
Schema readerSchema, String latestInstantTime, boolean readBlocksLazily,
boolean reverseReader, int bufferSize, Option<InstantRange> instantRange,
boolean withOperationField, boolean forceFullScan,
Option<String> partitionName, InternalSchema internalSchema) {
Schema readerSchema, String latestInstantTime, int bufferSize,
Option<InstantRange> instantRange, boolean withOperationField,
boolean forceFullScan, Option<String> partitionName,
InternalSchema internalSchema) {
this.readerSchema = readerSchema;
this.latestInstantTime = latestInstantTime;
this.hoodieTableMetaClient = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(basePath).build();
Expand All @@ -163,8 +157,6 @@ protected AbstractHoodieLogRecordReader(FileSystem fs, String basePath, List<Str
this.preCombineField = tableConfig.getPreCombineField();
this.totalLogFiles.addAndGet(logFilePaths.size());
this.logFilePaths = logFilePaths;
this.reverseReader = reverseReader;
this.readBlocksLazily = readBlocksLazily;
this.fs = fs;
this.bufferSize = bufferSize;
this.instantRange = instantRange;
Expand Down Expand Up @@ -219,36 +211,22 @@ protected synchronized void scanInternal(Option<KeySpec> keySpecOpt) {
boolean enableRecordLookups = !forceFullScan;
logFormatReaderWrapper = new HoodieLogFormatReader(fs,
logFilePaths.stream().map(logFile -> new HoodieLogFile(new Path(logFile))).collect(Collectors.toList()),
readerSchema, readBlocksLazily, reverseReader, bufferSize, enableRecordLookups, keyField, internalSchema);
readerSchema, bufferSize, enableRecordLookups, keyField, internalSchema);

/**
* Traversal of log blocks from log files can be done in two directions.
* 1. Forward traversal
* 2. Reverse traversal
* For example: BaseFile, LogFile1(LogBlock11,LogBlock12,LogBlock13), LogFile2(LogBlock21,LogBlock22,LogBlock23)
* Forward traversal look like,
* LogBlock11, LogBlock12, LogBlock13, LogBlock21, LogBlock22, LogBlock23
* If we are considering reverse traversal including log blocks,
* LogBlock23, LogBlock22, LogBlock21, LogBlock13, LogBlock12, LogBlock11
* Here, reverse traversal also traverses blocks in reverse order of creation.
* Scanning log blocks require two traversals on the log blocks.
* First traversal to identify the rollback blocks and
*
* 1. Forward traversal
* Forward traversal is easy to do in single writer mode, where the rollback block is right after the affected data blocks.
* Scanning blocks is easy to do in single writer mode, where the rollback block is right after the affected data blocks.
* With multiwriter mode the blocks can be out of sync. An example scenario.
* B1, B2, B3, B4, R1(B3), B5
* In this case, rollback block R1 is invalidating the B3 which is not the previous block.
* This becomes more complicated if we have compacted blocks, which are data blocks created using log compaction.
* TODO: Include support for log compacted blocks. https://issues.apache.org/jira/browse/HUDI-3580
*
* To solve this do traversal twice.
* To solve this, the traversal needs to be done twice.
* In first traversal, collect all the valid data and delete blocks that are not corrupted along with the rollback block's target instant times.
* For second traversal, traverse on the collected data blocks by considering the rollback instants.
* 2. Reverse traversal
* Reverse traversal is more intuitive in multiwriter mode. Reverse traversal would mean not just traversing
* log files in reverse order, but also the log blocks within them.
* This is harder to achieve when there are corrupt blocks, since the blocks size information
* might not be stored at the end of the corrupt block. So, hopping to the starting of the block is not possible.
* So, defaulting to use forward traversal and lazy read as true.
*/

// Collect targetRollbackInstants, using which we can determine which blocks are invalid.
Expand All @@ -273,7 +251,7 @@ protected synchronized void scanInternal(Option<KeySpec> keySpecOpt) {
&& !HoodieTimeline.compareTimestamps(logBlock.getLogBlockHeader().get(INSTANT_TIME), HoodieTimeline.LESSER_THAN_OR_EQUALS, this.latestInstantTime
)) {
// hit a block with instant time greater than should be processed, stop processing further
break;
continue;
}
if (logBlock.getBlockType() != CORRUPT_BLOCK && logBlock.getBlockType() != COMMAND_BLOCK) {
if (!completedInstantsTimeline.containsOrBeforeTimelineStarts(instantTime)
Expand All @@ -286,13 +264,8 @@ protected synchronized void scanInternal(Option<KeySpec> keySpecOpt) {
continue;
}
}
if (logBlock.getBlockType().equals(CORRUPT_BLOCK)) {
LOG.info("Found a corrupt block in " + logFile.getPath());
totalCorruptBlocks.incrementAndGet();
continue;
}

// Rollback blocks contain information of instants that are failed, collect them in a set..
// First traversal to collect data and delete blocks and rollback block's target instant times.
switch (logBlock.getBlockType()) {
case HFILE_DATA_BLOCK:
case AVRO_DATA_BLOCK:
Expand All @@ -308,18 +281,23 @@ protected synchronized void scanInternal(Option<KeySpec> keySpecOpt) {
totalRollbacks.incrementAndGet();
String targetInstantForCommandBlock =
logBlock.getLogBlockHeader().get(TARGET_INSTANT_TIME);
// Rollback blocks contain information about instants that have failed; collect them in a set.
targetRollbackInstants.add(targetInstantForCommandBlock);
} else {
throw new UnsupportedOperationException("Command type not yet supported.");
}
break;
case CORRUPT_BLOCK:
LOG.info("Found a corrupt block in " + logFile.getPath());
totalCorruptBlocks.incrementAndGet();
break;
default:
throw new UnsupportedOperationException("Block type not yet supported.");
}
}

int numBlocksRolledBack = 0;
// This is a reverse traversal on the collected data blocks.
// Second traversal to filter out the blocks whose block instant times are part of targetRollbackInstants set.
for (HoodieLogBlock logBlock : dataAndDeleteBlocks) {
String blockInstantTime = logBlock.getLogBlockHeader().get(INSTANT_TIME);

Expand Down
Expand Up @@ -78,10 +78,6 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader {
private final Schema readerSchema;
private InternalSchema internalSchema = InternalSchema.getEmptyInternalSchema();
private final String keyField;
private boolean readBlockLazily;
private long reverseLogFilePosition;
private long lastReverseLogFilePosition;
private boolean reverseReader;
private boolean enableRecordLookups;
private boolean closed = false;
private transient Thread shutdownThread = null;
Expand All @@ -93,35 +89,26 @@ public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSc

public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema, int bufferSize,
boolean readBlockLazily, boolean reverseReader) throws IOException {
this(fs, logFile, readerSchema, bufferSize, readBlockLazily, reverseReader, false,
HoodieRecord.RECORD_KEY_METADATA_FIELD);
this(fs, logFile, readerSchema, bufferSize, false, HoodieRecord.RECORD_KEY_METADATA_FIELD);
}

public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema, int bufferSize,
boolean readBlockLazily, boolean reverseReader, boolean enableRecordLookups,
String keyField) throws IOException {
this(fs, logFile, readerSchema, bufferSize, readBlockLazily, reverseReader, enableRecordLookups, keyField, InternalSchema.getEmptyInternalSchema());
boolean enableRecordLookups, String keyField) throws IOException {
this(fs, logFile, readerSchema, bufferSize, enableRecordLookups, keyField, InternalSchema.getEmptyInternalSchema());
}

public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema, int bufferSize,
boolean readBlockLazily, boolean reverseReader, boolean enableRecordLookups,
String keyField, InternalSchema internalSchema) throws IOException {
boolean enableRecordLookups, String keyField, InternalSchema internalSchema) throws IOException {
this.hadoopConf = fs.getConf();
// NOTE: We repackage {@code HoodieLogFile} here to make sure that the provided path
// is prefixed with an appropriate scheme given that we're not propagating the FS
// further
this.logFile = new HoodieLogFile(FSUtils.makeQualified(fs, logFile.getPath()), logFile.getFileSize());
this.inputStream = getFSDataInputStream(fs, this.logFile, bufferSize);
this.readerSchema = readerSchema;
this.readBlockLazily = readBlockLazily;
this.reverseReader = reverseReader;
this.enableRecordLookups = enableRecordLookups;
this.keyField = keyField;
this.internalSchema = internalSchema == null ? InternalSchema.getEmptyInternalSchema() : internalSchema;
if (this.reverseReader) {
this.reverseLogFilePosition = this.lastReverseLogFilePosition = this.logFile.getFileSize();
}

addShutDownHook();
}

Expand Down Expand Up @@ -185,8 +172,8 @@ private HoodieLogBlock readBlock() throws IOException {

// 6. Read the content or skip content based on IO vs Memory trade-off by client
long contentPosition = inputStream.getPos();
boolean shouldReadLazily = readBlockLazily && nextBlockVersion.getVersion() != HoodieLogFormatVersion.DEFAULT_VERSION;
Option<byte[]> content = HoodieLogBlock.tryReadContent(inputStream, contentLength, shouldReadLazily);
boolean shouldReadLazily = nextBlockVersion.getVersion() != HoodieLogFormatVersion.DEFAULT_VERSION;
Option<byte[]> content = HoodieLogBlock.tryReadContent(inputStream, contentLength);

// 7. Read footer if any
Map<HeaderMetadataType, String> footer =
Expand All @@ -209,29 +196,29 @@ private HoodieLogBlock readBlock() throws IOException {
if (nextBlockVersion.getVersion() == HoodieLogFormatVersion.DEFAULT_VERSION) {
return HoodieAvroDataBlock.getBlock(content.get(), readerSchema, internalSchema);
} else {
return new HoodieAvroDataBlock(inputStream, content, readBlockLazily, logBlockContentLoc,
return new HoodieAvroDataBlock(inputStream, content, logBlockContentLoc,
Option.ofNullable(readerSchema), header, footer, keyField, internalSchema);
}

case HFILE_DATA_BLOCK:
checkState(nextBlockVersion.getVersion() != HoodieLogFormatVersion.DEFAULT_VERSION,
String.format("HFile block could not be of version (%d)", HoodieLogFormatVersion.DEFAULT_VERSION));

return new HoodieHFileDataBlock(inputStream, content, readBlockLazily, logBlockContentLoc,
return new HoodieHFileDataBlock(inputStream, content, logBlockContentLoc,
Option.ofNullable(readerSchema), header, footer, enableRecordLookups, logFile.getPath());

case PARQUET_DATA_BLOCK:
checkState(nextBlockVersion.getVersion() != HoodieLogFormatVersion.DEFAULT_VERSION,
String.format("Parquet block could not be of version (%d)", HoodieLogFormatVersion.DEFAULT_VERSION));

return new HoodieParquetDataBlock(inputStream, content, readBlockLazily, logBlockContentLoc,
return new HoodieParquetDataBlock(inputStream, content, logBlockContentLoc,
Option.ofNullable(readerSchema), header, footer, keyField);

case DELETE_BLOCK:
return new HoodieDeleteBlock(content, inputStream, readBlockLazily, Option.of(logBlockContentLoc), header, footer);
return new HoodieDeleteBlock(content, inputStream, Option.of(logBlockContentLoc), header, footer);

case COMMAND_BLOCK:
return new HoodieCommandBlock(content, inputStream, readBlockLazily, Option.of(logBlockContentLoc), header, footer);
return new HoodieCommandBlock(content, inputStream, Option.of(logBlockContentLoc), header, footer);

default:
throw new HoodieNotSupportedException("Unsupported Block " + blockType);
Expand All @@ -258,10 +245,10 @@ private HoodieLogBlock createCorruptBlock() throws IOException {
LOG.info("Next available block in " + logFile + " starts at " + nextBlockOffset);
int corruptedBlockSize = (int) (nextBlockOffset - currentPos);
long contentPosition = inputStream.getPos();
Option<byte[]> corruptedBytes = HoodieLogBlock.tryReadContent(inputStream, corruptedBlockSize, readBlockLazily);
Option<byte[]> corruptedBytes = HoodieLogBlock.tryReadContent(inputStream, corruptedBlockSize);
HoodieLogBlock.HoodieLogBlockContentLocation logBlockContentLoc =
new HoodieLogBlock.HoodieLogBlockContentLocation(hadoopConf, logFile, contentPosition, corruptedBlockSize, nextBlockOffset);
return new HoodieCorruptBlock(corruptedBytes, inputStream, readBlockLazily, Option.of(logBlockContentLoc), new HashMap<>(), new HashMap<>());
return new HoodieCorruptBlock(corruptedBytes, inputStream, Option.of(logBlockContentLoc), new HashMap<>(), new HashMap<>());
}

private boolean isBlockCorrupted(int blocksize) throws IOException {
Expand Down Expand Up @@ -390,24 +377,13 @@ public HoodieLogBlock next() {
}
}

// TODO: remove method to iterate in reverse order.
/**
* hasPrev is not idempotent.
*/
@Override
public boolean hasPrev() {
try {
if (!this.reverseReader) {
throw new HoodieNotSupportedException("Reverse log reader has not been enabled");
}
reverseLogFilePosition = lastReverseLogFilePosition;
reverseLogFilePosition -= Long.BYTES;
lastReverseLogFilePosition = reverseLogFilePosition;
inputStream.seek(reverseLogFilePosition);
} catch (Exception e) {
// Either reached EOF while reading backwards or an exception
return false;
}
return true;
throw new HoodieNotSupportedException("Reverse log reader is not supported.");
}

/**
Expand All @@ -417,25 +393,7 @@ public boolean hasPrev() {
*/
@Override
public HoodieLogBlock prev() throws IOException {

if (!this.reverseReader) {
throw new HoodieNotSupportedException("Reverse log reader has not been enabled");
}
long blockSize = inputStream.readLong();
long blockEndPos = inputStream.getPos();
// blocksize should read everything about a block including the length as well
try {
inputStream.seek(reverseLogFilePosition - blockSize);
} catch (Exception e) {
// this could be a corrupt block
inputStream.seek(blockEndPos);
throw new CorruptedLogFileException("Found possible corrupted block, cannot read log file in reverse, "
+ "fallback to forward reading of logfile");
}
boolean hasNext = hasNext();
reverseLogFilePosition -= blockSize;
lastReverseLogFilePosition = reverseLogFilePosition;
return next();
throw new HoodieNotSupportedException("Reverse log reader is not supported.");
}

/**
Expand All @@ -444,17 +402,7 @@ public HoodieLogBlock prev() throws IOException {
* position returned from the method to expect correct results
*/
public long moveToPrev() throws IOException {

if (!this.reverseReader) {
throw new HoodieNotSupportedException("Reverse log reader has not been enabled");
}
inputStream.seek(lastReverseLogFilePosition);
long blockSize = inputStream.readLong();
// blocksize should be everything about a block including the length as well
inputStream.seek(reverseLogFilePosition - blockSize);
reverseLogFilePosition -= blockSize;
lastReverseLogFilePosition = reverseLogFilePosition;
return reverseLogFilePosition;
throw new HoodieNotSupportedException("Reverse log reader is not supported.");
}

@Override
Expand Down

0 comments on commit 4e7a316

Please sign in to comment.