(WIP) Optimizing search for start and end of corrupt blocks when a corrupt block needs to be created #361

Closed
wants to merge 1 commit

Conversation

n3nash (Contributor) commented Mar 22, 2018

Current algorithm
inputStream.seek() advances one byte at a time, sliding the read window byte by byte. This requires a seek(..) and a readFully(..) over the FSDataInputStream for every single byte.
New algorithm
inputStream.readFully(some_sample_block_size) reads a chunk into memory, and the sliding-window search for the magic header runs over the in-memory byte array rather than over the input stream.
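
For illustration, a minimal sketch of the in-memory sliding-window search, assuming the chunk has already been read into a byte array with readFully(..) (the helper below is illustrative, not the exact code in this PR):

class MagicScanSketch {
  // Scan an in-memory chunk for the magic header instead of issuing a
  // seek(..)/readFully(..) against the FSDataInputStream for every byte.
  static int indexOfMagic(byte[] chunk, int validLength, byte[] magic) {
    for (int start = 0; start + magic.length <= validLength; start++) {
      int matched = 0;
      while (matched < magic.length && chunk[start + matched] == magic[matched]) {
        matched++;
      }
      if (matched == magic.length) {
        return start; // offset of the next block's magic header within the chunk
      }
    }
    return -1; // no magic header found in this chunk
  }
}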

Test Results
The earlier algorithm takes approximately 1-2 ms per byte, since every byte requires a seek(..) and a readFully(..) to compare against the magic header. For a 256 MB corrupt block, finding the next data block takes ~256,000,000 ms, which equates to roughly 71 hours; so whenever a corrupt block has been written, the job effectively takes forever.

The new algorithm reads the entire 256 MB worth of bytes into memory using readFully(..) and then slides over the byte array to find the next data block. **This completes in < 5 secs.**

This came up during a performance test; an example log is below:

18/03/16 21:34:58 INFO collection.DiskBasedMap: Spilling to file location ...
18/03/16 21:34:58 INFO log.HoodieCompactedLogRecordScanner: Scanning log file HoodieLogFile {some.log.1}
18/03/16 21:34:58 INFO log.HoodieLogFileReader: Log HoodieLogFile {somelog.1} has a corrupted block at 14
18/03/16 23:00:55 ERROR executor.CoarseGrainedExecutorBackend: Executor self-exiting due to : Driver disassociated! Shutting down.

Notice that the task ran for about 1.5 hrs before it timed out.

@n3nash n3nash changed the title Optimizing search for start and end of corrupt blocks when a corrupt block needs to be created (WIP) Optimizing search for start and end of corrupt blocks when a corrupt block needs to be created Mar 22, 2018
@n3nash n3nash force-pushed the corrupt_block_optimizations branch from ddd5719 to 408e327 Compare March 22, 2018 05:39
n3nash (Contributor, Author) commented Mar 22, 2018

@vinothchandar Please take a pass at this PR too tomorrow when you look over the other 2.

@n3nash n3nash force-pushed the corrupt_block_optimizations branch from 408e327 to 9c49a9f Compare March 22, 2018 05:44
@n3nash n3nash changed the title (WIP) Optimizing search for start and end of corrupt blocks when a corrupt block needs to be created Optimizing search for start and end of corrupt blocks when a corrupt block needs to be created Mar 22, 2018
vinothchandar (Member) left a comment:

Need to spend more time on reviewing this.

.convert(SchemaUtil
.readSchemaFromLogFile(HoodieCLI.tableMetadata.getFs(), new Path(logFilePath)));
} catch(NullPointerException e) {
// unable to read schema
Member:

Can we handle this more directly with null checks as needed? The empty catch block for the NPE could be cleaner.

Contributor (Author):

Yeah, just did that to fix it quickly, cleaned it now.

@@ -49,6 +48,7 @@
class HoodieLogFileReader implements HoodieLogFormat.Reader {

private static final int DEFAULT_BUFFER_SIZE = 4096;
private static final int DEFAULT_LOG_BLOCK_SIZE = 256*1024*1024;
Member:

Should the default be this high? This will also add to the memory pressure, correct?

Contributor (Author):

The 256 MB is just read once as a byte[] and then discarded, so it should be OK I think.

boolean done = true;
// read upto logblocksize to find next magic header
do {
corruptedBytes = new byte[logBlockSize];
Member:

Instead of handling this manually, should we just wrap this in a BufferedInputStream?

https://docs.oracle.com/javase/7/docs/api/java/io/BufferedInputStream.html

Member:

I see this is a DataInputStream.. still, is there some standard class that can give you the buffering for free?

Contributor (Author):

Yeah, that's the reason I didn't use BufferedInputStream, but since you mentioned it I looked again and realized that DataInputStream actually extends FilterInputStream! So I was able to change this :)

} catch (EOFException e) {
// in case unable to read logBlockSize worth of bytes
inputStream.seek(currentPos);
numberOfBytesRead = inputStream.available();
Member:

Why not use available() to estimate the actual bytes to begin with, instead of course-correcting in the catch block?

Contributor (Author):

The reason for doing this was the contract of available(..) described here https://docs.oracle.com/javase/7/docs/api/java/io/FilterInputStream.html#available() and the fact that we want to read a minimum of the default size. But I changed this to use the BufferedInputStream, so probably no need to discuss further..

@n3nash n3nash force-pushed the corrupt_block_optimizations branch from 9c49a9f to 650fc3d Compare March 24, 2018 00:15
@n3nash n3nash force-pushed the corrupt_block_optimizations branch from 650fc3d to 0c4c0a4 Compare March 24, 2018 04:18
n3nash (Contributor, Author) commented Mar 25, 2018

@vinothchandar addressed your comments, please take another pass.

byte[] corruptedBytes;
int corruptedBlockSize = 0;
boolean done = true;
BufferedInputStream bufferedInputStream = new BufferedInputStream(inputStream);
Contributor (Author):

We can wrap this whole block in a CustomBufferedReader sort of implementation and just return the corruptedBytesSize on termination for cleanliness...

Member:

Not sure if I fully follow.. But it would be good to have a Buffered, seekable abstraction over the log file.. Corrupt block handling etc should be left at this level IMO

Member:

Let's file a task for this?

@@ -97,7 +97,8 @@
private final static Logger log = LogManager.getLogger(WriterBuilder.class);
// Default max log file size 512 MB
public static final long DEFAULT_SIZE_THRESHOLD = 512 * 1024 * 1024L;

// Default max log block size 512 MB
public static final int DEFAULT_LOG_BLOCK_SIZE_THRESHOLD = 256 * 1024 * 1024;
Member:

The actual value seems to be 256 MB.

Contributor (Author):

fixed

Member:

The comment still says 512 MB.. and also, is 256 MB too high? What about the additional memory needs?

@@ -125,6 +128,7 @@ private HoodieLogBlock readBlock() throws IOException {

// 2 Read the total size of the block
blocksize = inputStream.readInt();
this.logBlockSize = blocksize;
Member:

Can we make the logBlockSize part of the LogBlock itself?

Member:

Ping..

do {
// read upto logblocksize to find next magic header
corruptedBytes = new byte[logBlockSize];
int numberOfBytesRead = bufferedInputStream.read(corruptedBytes, 0, logBlockSize);
Member:

If you are buffering already, why do we have to issue such a large read? If you iterate byte-by-byte like before on a buffered reader, do you still have the same issue?

Member:

I know there probably isn't a seek() to go back and forth.. mark (https://docs.oracle.com/javase/7/docs/api/java/io/BufferedInputStream.html#mark(int)) should be helpful there to rewind, no?
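
A rough sketch of that mark()/reset() idea, assuming a BufferedInputStream over the log stream (peekForMagic and its parameters are hypothetical names, not code from this PR):

import java.io.BufferedInputStream;
import java.io.IOException;
import java.util.Arrays;

class MarkResetSketch {
  // Peek ahead for the magic header; if it is not there, reset() rewinds within the
  // buffer (no new HDFS fetch) and skip(1) slides the window forward by one byte.
  // A single read(..) is assumed to fill the peek buffer here, for brevity.
  static boolean peekForMagic(BufferedInputStream in, byte[] magic) throws IOException {
    in.mark(magic.length); // remember the current position; the readlimit covers the peek
    byte[] candidate = new byte[magic.length];
    int read = in.read(candidate, 0, candidate.length);
    if (read == magic.length && Arrays.equals(candidate, magic)) {
      return true; // header found at the current position
    }
    in.reset(); // rewind to the marked position
    in.skip(1); // advance one byte and let the caller try again
    return false;
  }
}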

Member:

Ping.

done = false;
}
corruptedBlockSize += estimatedCorruptBlockSize;
} while (!done);
Member:

So we return 1 corrupt block if there are back-to-back corrupt blocks?

Contributor (Author):

Yes; ideally there shouldn't be back-to-back corrupt blocks, but this covers the case where the writer failed to write the magic header for the next block too..

private long scanForNextAvailableBlockOffset() throws IOException {
while (true) {
long currentPos = inputStream.getPos();
private long scanForNextAvailableBlockOffset(byte[] bytes, int numberOfByesRead)
Member:

typo: numberOfBytesRead

Contributor (Author):

fixed

int numberOfBytesRead = bufferedInputStream.read(corruptedBytes, 0, logBlockSize);
int estimatedCorruptBlockSize = (int) scanForNextAvailableBlockOffset(corruptedBytes,
numberOfBytesRead);
if (numberOfBytesRead == logBlockSize && estimatedCorruptBlockSize == numberOfBytesRead) {
Member:

This seems to be checking: we were able to read a block size worth of bytes, and the bytes after that matched up to a next available block offset?

// No luck - advance and try again
inputStream.seek(currentPos + 1);
Member:

would BufferedInputStream with mark and reset have helped resolve this issue differently?

return false;
} catch (EOFException e) {
// We have reached the EOF
return true;
}
}

private int readMagic(byte[] bytes, int offset) throws IOException {
Member:

javadocs on what this method is returning

Member:

Ping

bvaradar (Contributor) left a comment:

I looked at FSDataInputStream (and the underlying DFSInputStream) to understand why you are seeing very high latency per byte (amortized). The culprit is the backwards seek we are doing in readMagic and scanForNextAvailableBlockOffset. The rewind implementation in DFSInputStream clears out the current (HDFS) block reader, and the next read() (of 1 byte) causes the HDFS data block to be fetched again.
DFSInputStream does the correct buffering for sequential reading, though (reading block by block).

So, lessons for us:
Don't use a naked DFSInputStream or FSDataInputStream (created by DistributedFileSystem.open()). Change HoodieWrapperFileSystem.open() to make sure the FSDataInputStream wraps a BufferedFSInputStream, which in turn wraps the DFSInputStream. This way we can (a) buffer and (b) avoid a refetch when rewinding by a few bytes (sketched below).
(or)
Change the scan logic in HoodieLogReader to not rewind the input stream but to keep moving forward.

Also, it looks like DFSInputStream maintains read statistics. We can track them to correlate and debug latency hits.

Since it is good to fix the root cause, can we change the logic to do buffering by default for all FileSystem.open() calls instead of turning on buffering only for corrupt blocks? This way, we will not encounter similar issues if we introduce new logic that requires rewinding file pointers.
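
A minimal sketch of the first option, assuming Hadoop's BufferedFSInputStream and FSDataInputStream APIs (openBuffered and bufferSize are illustrative names, not the actual HoodieWrapperFileSystem change):

import java.io.IOException;
import org.apache.hadoop.fs.BufferedFSInputStream;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

class BufferedOpenSketch {
  // Open the path so reads go through BufferedFSInputStream: sequential reads are
  // buffered and small backwards seeks stay within the buffer instead of forcing
  // DFSInputStream to drop its block reader and refetch the HDFS block.
  static FSDataInputStream openBuffered(FileSystem fs, Path path, int bufferSize) throws IOException {
    FSDataInputStream raw = fs.open(path);
    // For a DistributedFileSystem the wrapped stream is a DFSInputStream, which
    // extends FSInputStream, so it can be re-wrapped here (assumption for other FS types).
    return new FSDataInputStream(
        new BufferedFSInputStream((FSInputStream) raw.getWrappedStream(), bufferSize));
  }
}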

@@ -125,6 +128,7 @@ private HoodieLogBlock readBlock() throws IOException {

// 2 Read the total size of the block
blocksize = inputStream.readInt();
this.logBlockSize = blocksize;
Contributor:

So, this.logBlockSize will be set to the block size of the last uncorrupted block, right?
In that case, createCorruptedBlock() will be using this buffer size while reading the corrupted block. Is this a heuristic to guess how much data you want to read ahead?

Contributor (Author):

Yes, you are right and this is the DEFAULT_LOG_BLOCK_SIZE.

@@ -61,6 +62,7 @@
private long reverseLogFilePosition;
private long lastReverseLogFilePosition;
private boolean reverseReader;
private int logBlockSize;

HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema, int bufferSize,
boolean readBlockLazily, boolean reverseReader) throws IOException {
Contributor:

Important Note: The buffer size passed here is never used when constructing DFSInputStream.

Contributor (Author):

It's used in the next line?

Contributor:

No, I meant the HDFS client implementation (DFSClient.open(), called via FS.open()), which is supposed to use the bufferSize, drops it. The implementation uses the HDFS block read from the network as an implicit buffer.

@@ -125,6 +128,7 @@ private HoodieLogBlock readBlock() throws IOException {

// 2 Read the total size of the block
blocksize = inputStream.readInt();
this.logBlockSize = blocksize;
Contributor:

Not related to this change but important to fix: in line 137, we are catching all exceptions and treating them as a corrupt block. The underlying inputStream could throw an IOException (other than EOFException) because of transient failures (or the stream getting closed), and those don't mean the block is corrupt. These IOException cases need to be raised to the caller instead of returning a corrupt block.

Contributor (Author):

Good point, fixed it.

n3nash (Contributor, Author) commented Apr 3, 2018

@bvaradar thanks for digging into the root cause and great analysis! I looked at the code too and you're right, the rewind implementation in DFSInputStream results in clearing out the current block reader. I made some local code changes and tried using BufferedFSInputStream; it indeed works much better, but still not as performant as the prefetching and buffering implemented in this PR (I'm wondering what the difference might be but haven't dug into it).
I think the question is the memory implications of using BufferedFSInputStream and how large we should choose the buffer size to be. In my tests I chose 256 MB (that's the HDFS block size). @vinothchandar ^ WDYT?

To help understand my changes, here is a temporary PR with them: https://github.com/uber/hudi/pull/373/files

BufferedInputStream bufferedInputStream = new BufferedInputStream(inputStream);
do {
// read upto logblocksize to find next magic header
corruptedBytes = new byte[logBlockSize];
bvaradar (Contributor) commented Apr 4, 2018:

What if the MAGIC string crosses the corruptedBytes boundary (meaning it appears partly at the end of the first read block and partly at the beginning of the next block)? Are you taking care of that?

Member:

Good point.. @n3nash can you confirm? It also highlights that the byte stream abstraction is still easier to reason with.
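
One way to take care of the boundary case, sketched under the assumption that the scan proceeds chunk by chunk (findMagicOffset is a hypothetical helper, not this PR's code): carry the last MAGIC.length - 1 bytes of each chunk over to the front of the next chunk before scanning, so a header split across two reads is still found.

import java.io.IOException;
import java.io.InputStream;

class ChunkedMagicScanSketch {
  // Scan an input stream chunk by chunk for `magic`, keeping an overlap of
  // magic.length - 1 bytes between chunks so a split header is not missed.
  static long findMagicOffset(InputStream in, byte[] magic, int chunkSize) throws IOException {
    byte[] buf = new byte[chunkSize + magic.length - 1];
    int carry = 0;      // bytes carried over from the previous chunk
    long consumed = 0;  // absolute stream offset corresponding to buf[0]
    while (true) {
      int read = in.read(buf, carry, chunkSize);
      if (read <= 0) {
        return -1;      // EOF without finding the header
      }
      int valid = carry + read;
      for (int i = 0; i + magic.length <= valid; i++) {
        int j = 0;
        while (j < magic.length && buf[i + j] == magic[j]) {
          j++;
        }
        if (j == magic.length) {
          return consumed + i; // absolute offset of the header
        }
      }
      // Keep the tail that could be the start of a split header, drop the rest.
      carry = Math.min(magic.length - 1, valid);
      System.arraycopy(buf, valid - carry, buf, 0, carry);
      consumed += valid - carry;
    }
  }
}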

bvaradar (Contributor) commented Apr 4, 2018

@n3nash: Regarding the performance difference, I see that you have reduced the number of readFully() calls from 2 to 1 for each round when dealing with OLD_MAGIC vs NEW_MAGIC. Could this explain the perf difference you are seeing ?

n3nash (Contributor, Author) commented Apr 4, 2018

@vinothchandar Not sure if you missed my comment earlier. Please look at my comment above about whether or not to use BufferedFSInputStream; based on that, I will either make changes in this PR or go with a PR similar to this, #373.

vinothchandar (Member) commented:

Yeah, I think I did miss that. Apologies.

I am in favor of using a cleaner byte stream abstraction. Can we have the buffer size be configurable? I am a bit concerned about having this large a buffer for the RecordReader; also, theoretically, a few MB should be enough to amortize the seek costs (the main issue in this PR).

@n3nash n3nash changed the title Optimizing search for start and end of corrupt blocks when a corrupt block needs to be created (WIP) Optimizing search for start and end of corrupt blocks when a corrupt block needs to be created Apr 5, 2018
n3nash (Contributor, Author) commented Apr 5, 2018

Moved the PR to #373. Let's discuss there.

vinothchandar (Member) commented:

Closing this since #373 is in a more final shape.
