Using BufferedFsInputStream to wrap FSInputStream for FSDataInputStream #373
Conversation
Force-pushed 05fa0ae to 610402f
@@ -69,7 +70,7 @@ public HoodieTestDataGenerator(String[] partitionPaths) {
   }

   public HoodieTestDataGenerator() {
-    this(new String[] {"2016/03/15", "2015/03/16", "2015/03/17"});
+    this(new String[]{"2016/03/15", "2015/03/16", "2015/03/17"});
Not sure why this is happening, my reformat does this.
@@ -237,6 +241,10 @@ private boolean isBlockCorrupt(int blocksize) throws IOException {
     inputStream.seek(currentPos + blocksize);
   } catch (EOFException e) {
     // this is corrupt
+    // This seek is required because the contract of seek() is different for a naked DFSInputStream vs a BufferedFSInputStream
This was one of the most tedious things to track down, phew
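The same class of position mismatch is easy to reproduce with plain JDK streams. The sketch below is an analogy only (it uses `java.io`, not Hadoop's `DFSInputStream`/`BufferedFSInputStream`, and the class and method names are made up): a buffered wrapper's logical read position diverges from the underlying stream's physical position, which is exactly why a `seek()` after buffered reads must account for which position it is acting on.

```java
import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.file.Path;

// Analogy sketch (plain JDK, not Hadoop): after reading N bytes through a
// buffered wrapper, the underlying stream has advanced by a whole
// buffer-full, not by N. A naked stream's position would equal N.
public class BufferedSeekDemo {
    public static long underlyingPosAfterRead(Path p, int bufSize, int bytesToRead)
            throws IOException {
        try (FileInputStream fis = new FileInputStream(p.toFile());
             BufferedInputStream bis = new BufferedInputStream(fis, bufSize)) {
            for (int i = 0; i < bytesToRead; i++) {
                bis.read(); // logical position advances by 1 per call
            }
            // The wrapper read ahead a full buffer, so the file position is
            // bufSize (for a file larger than the buffer), not bytesToRead.
            return fis.getChannel().position();
        }
    }
}
```

For a 100-byte file with a 64-byte buffer, reading a single byte leaves the underlying file position at 64, not 1 — the two notions of "current position" disagree, just as the diff comment warns for DFS streams.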
private Configuration hadoopConf;
// NOTE: Be careful in using DFS (FileSystem.class) vs LocalFs (RawLocalFileSystem.class).
// The implementation and guarantees of many APIs differ; for example, check rename(src, dst).
+ // We need to use DFS here instead of LocalFs since FsDataInputStream.getWrappedStream() returns a FsDataInputStream
Another extremely tedious thing to track down, wish LocalFS and DFS had same contracts :(
Force-pushed 610402f to faab93d
Force-pushed faab93d to fee5a92
@vinothchandar @bvaradar I'm keeping the BufferedInputStream changes local to the LogReader for now. We can open another issue on whether we want to make this change in HoodieWrapperFileSystem, since that will require wider testing.
Looks good to me. Let me know how testing goes. And yes, we can keep this localized to LogReader for now.
Force-pushed fee5a92 to f3861a4
Results from testing in production:

8 MB block size:
18/04/05 21:17:34 INFO log.HoodieLogFileReader: Log HoodieLogFile {.....} has a corrupted block at 14

64 MB block size:
18/04/05 21:41:29 INFO log.HoodieLogFileReader: Log HoodieLogFile {....} has a corrupted block at 14

256 MB block size:
18/04/05 21:49:20 INFO log.HoodieLogFileReader: Log HoodieLogFile {....} has a corrupted block at 14

256 MB block size (old PR):
18/04/05 22:36:41 INFO log.HoodieLogFileReader: Log HoodieLogFile {....} has a corrupted block at 14

Summary for a corrupt block of around 8.5 MB.
Force-pushed f3861a4 to 4b3461f
So something like 64 MB should work? It's still kind of weird, though, that seeking past 64 MB takes 20 sec. Is it due to some syscall overhead?
Yeah, 20 secs is way too long; I'm going to delve into what's going on soon. BTW, 64 MB works in this example since the corrupted bytes are > 8 MB and < 64 MB. Whenever the corrupted bytes are > BLOCK_SIZE, we will see a much larger amount of time taken to find the corrupt block. So, if we want to be really safe, we want to keep the BLOCK_SIZE as large as the LOG_BLOCK_SIZE. WDYT?
Tried a few things, like using System.arraycopy(..) instead of calling inputStream.readFully(..) twice, but that doesn't reduce the time by a lot. Essentially, each byte scanned results in a read of (6 bytes + 4 bytes) for each MAGIC header. So, an 8.5 MB corrupt block results in 10*8000000 bytes read, plus the corresponding per-byte operations (comparing MAGIC headers) etc. To avoid large times for corrupt blocks, the best buffer size to set is BLOCK_SIZE, which is 256 MB. This will add a 256 MB memory overhead. The build is failing with OOM; looking into why that's happening on Jenkins but not on my local machine.
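To make the buffer-size trade-off concrete: the sketch below (plain JDK, illustrative only; the class name is made up and this is not the Hudi scanner itself) counts how many reads actually reach the underlying stream during a byte-by-byte scan. Scanning N bytes through a B-byte buffer costs only about N/B underlying reads, which is why a bigger buffer cuts the per-byte overhead at the price of memory.

```java
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;

// Counts the reads that reach the wrapped (underlying) stream, so the effect
// of a BufferedInputStream's buffer size on a byte-by-byte scan is visible.
public class CountingInputStream extends FilterInputStream {
    public int underlyingReads = 0;

    public CountingInputStream(InputStream in) {
        super(in);
    }

    @Override
    public int read() throws IOException {
        underlyingReads++;
        return super.read();
    }

    @Override
    public int read(byte[] b, int off, int len) throws IOException {
        underlyingReads++;
        return super.read(b, off, len);
    }
}
```

Wrapping this in a `BufferedInputStream` with a 1 KB buffer and reading 8192 bytes one at a time triggers only 8 underlying reads, one per buffer fill; an unbuffered scan would trigger 8192.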
Force-pushed 0ca2590 to daf9078
Force-pushed aa4ae94 to c81ee77
@vinothchandar Please take a pass at this now. Updated time taken:

16 MB block:
18/04/15 22:54:33 INFO log.HoodieLogFileReader: Log HoodieLogFile {....log.1} has a corrupted block at 14

The extra time was spent in the fact that the code was throwing an exception for each byte scanned. Throwing and catching exceptions is an expensive operation; some more info here: http://java-performance.info/throwing-an-exception-in-java-is-very-slow/. I've refactored the code to avoid throwing exceptions and instead return a boolean.
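The shape of that refactor can be sketched as follows (illustrative only: the class, method names, and magic bytes are made up, not Hudi's actual code). The per-byte check returns `false` on a mismatch instead of throwing, since constructing an exception (including filling in the stack trace) is far too expensive inside a per-byte scan loop.

```java
// Hypothetical magic-marker check: returns a boolean instead of throwing,
// so the hot scan loop pays no exception-construction cost on mismatches.
public class MagicCheck {
    // Hypothetical 6-byte magic marker; the real value in Hudi may differ.
    static final byte[] MAGIC = {'#', 'H', 'U', 'D', 'I', '#'};

    // Returns true iff the magic marker starts at buf[pos].
    public static boolean isMagicAt(byte[] buf, int pos) {
        if (pos < 0 || pos + MAGIC.length > buf.length) {
            return false; // not enough bytes left; previously an EOF-style throw
        }
        for (int i = 0; i < MAGIC.length; i++) {
            if (buf[pos + i] != MAGIC[i]) {
                return false; // mismatch; previously a thrown exception
            }
        }
        return true;
    }
}
```

The caller then advances one byte and retries on `false`, which is cheap, instead of catching an exception per scanned byte.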
Force-pushed e4389ff to dd7ad31
Just a few renames
@@ -172,7 +175,8 @@ public String showLogFileRecords(@CliOption(key = {
         .getTimestamp(),
     Long.valueOf(HoodieMemoryConfig.DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES),
     Boolean.valueOf(HoodieCompactionConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED),
-    Boolean.valueOf(HoodieCompactionConfig.DEFAULT_COMPACTION_REVERSE_LOG_READ_ENABLED));
+    Boolean.valueOf(HoodieCompactionConfig.DEFAULT_COMPACTION_REVERSE_LOG_READ_ENABLED),
+    Integer.valueOf(HoodieMemoryConfig.DEFAULT_MAX_HDFS_STREAM_BUFFER_SIZE));
rename to : ..MAX_DFS_STREAM_BUFFER_SIZE
done
@@ -92,6 +92,9 @@ public String showLogFileCommits(
   if (n instanceof HoodieCorruptBlock) {
     try {
       instantTime = n.getLogBlockHeader().get(HeaderMetadataType.INSTANT_TIME);
+      if (instantTime == null) {
+        throw new Exception("Invalid value " + instantTime + " for instant time ");
+      }
change to : "Invalid instant time" + null
done
@@ -45,6 +45,9 @@
   public static final String MAX_MEMORY_FOR_MERGE_PROP = "hoodie.memory.merge.max.size";
   // Property to set the max memory for compaction
   public static final String MAX_MEMORY_FOR_COMPACTION_PROP = "hoodie.memory.compaction.max.size";
+  // Property to set the max memory for hdfs inputstream buffer size
+  public static final String MAX_HDFS_STREAM_BUFFER_SIZE_PROP = "hoodie.memory.hdfs.buffer.max.size";
rename here as well
done
// Property to set the max memory for hdfs inputstream buffer size
public static final String MAX_HDFS_STREAM_BUFFER_SIZE_PROP = "hoodie.memory.hdfs.buffer.max.size";
// Setting this to a lower value of 1 MB since there is no control over how many RecordReaders will be started in a mapper
public static final int DEFAULT_MAX_HDFS_STREAM_BUFFER_SIZE = 1 * 1024 * 1024; // 1 MB
@vinothchandar Kept this lower since we don't know the memory settings for the mapper. WDYT?
@vinothchandar made renames and left one comment
Force-pushed cf9b23b to 5c74e61
@@ -45,6 +45,9 @@
   public static final String MAX_MEMORY_FOR_MERGE_PROP = "hoodie.memory.merge.max.size";
   // Property to set the max memory for compaction
   public static final String MAX_MEMORY_FOR_COMPACTION_PROP = "hoodie.memory.compaction.max.size";
+  // Property to set the max memory for hdfs inputstream buffer size
+  public static final String MAX_DFS_STREAM_BUFFER_SIZE_PROP = "hoodie.memory.hdfs.buffer.max.size";
hoodie.memory.dfs.buffer.max.size?
ahh, apologies for the oversight, will rename correctly
@@ -86,6 +89,12 @@ public Builder withMaxMemoryFractionPerCompaction(long maxMemoryFractionPerCompa
     return this;
   }

+  public Builder withMaxStreamBufferSize(int maxStreamBufferSize) {
rename: withMaxDFSStreamBufferSize(..)
done
@@ -369,6 +369,12 @@ public Long getMaxMemoryPerCompaction() {
     props.getProperty(HoodieMemoryConfig.MAX_MEMORY_FOR_COMPACTION_PROP));
   }

+  public int getMaxBufferSize() {
make consistent with withMaxDFSStreamBufferSize above?
done
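Putting the renames from this review round together, the resulting config surface can be sketched roughly as below. This is a minimal illustrative sketch, not the actual HoodieMemoryConfig class: the property name follows the `hoodie.memory.dfs.buffer.max.size` suggestion above, and the builder/getter names follow the `withMaxDFSStreamBufferSize` rename.

```java
// Illustrative mini-config mirroring the renamed property, builder method,
// and getter discussed in the review (not the real HoodieMemoryConfig).
public class MemoryConfigSketch {
    public static final String MAX_DFS_STREAM_BUFFER_SIZE_PROP =
        "hoodie.memory.dfs.buffer.max.size";
    // Kept at a conservative 1 MB since there is no control over how many
    // RecordReaders a mapper may start, each holding its own buffer.
    public static final int DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE = 1 * 1024 * 1024;

    private int maxDFSStreamBufferSize = DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE;

    public MemoryConfigSketch withMaxDFSStreamBufferSize(int size) {
        this.maxDFSStreamBufferSize = size;
        return this;
    }

    public int getMaxDFSStreamBufferSize() {
        return maxDFSStreamBufferSize;
    }
}
```

A caller wanting the larger buffer discussed earlier would do `new MemoryConfigSketch().withMaxDFSStreamBufferSize(16 * 1024 * 1024)`, trading memory per reader for a faster corrupt-block scan.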
Force-pushed 9c1a038 to 1c955fa
Force-pushed 1c955fa to 30102a4
This PR is not ready for any review.