[BEAM-2790] Use byte[] instead of ByteBuffer to read from Hadoop FS #3744
Conversation
Since the contained InputStream in some of the Hadoop FileSystem implementations may not implement ByteBufferReadable, we should take a conservative approach and read directly into a byte array.
// We avoid using the ByteBuffer based read for Hadoop because some FSInputStream
// implementations are not ByteBufferReadable.
// See https://issues.apache.org/jira/browse/HADOOP-14603
int read = inputStream.read(dst.array());
dst may have a limit on how much to read, and its start position may not be zero. dst may not have an accessible array because the bytes may be off heap. You should use position and limit to write into the ByteBuffer or you will not respect what the user has asked for. You should only use array() if you checked hasArray() and correctly calculate the offsets/limits with position(), arrayOffset() and remaining(). Otherwise you should use a fixed size buffer and copy the bytes twice.
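To illustrate the point above, here is a minimal sketch (not the PR's code; the helper name `readInto` is hypothetical) of a read that honors the buffer's position/limit, uses array() only after checking hasArray(), and falls back to a temporary heap array for off-heap buffers:

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;

// Sketch: read from an InputStream into a ByteBuffer while respecting
// the buffer's position/limit, with a copy fallback for direct buffers.
public class SafeByteBufferRead {
  static int readInto(InputStream in, ByteBuffer dst) throws IOException {
    if (dst.hasArray()) {
      // Heap buffer: read directly into the backing array at the correct offset.
      int read = in.read(dst.array(), dst.arrayOffset() + dst.position(), dst.remaining());
      if (read > 0) {
        dst.position(dst.position() + read);
      }
      return read;
    } else {
      // Direct (off-heap) buffer: copy through a temporary heap array.
      byte[] tmp = new byte[dst.remaining()];
      int read = in.read(tmp);
      if (read > 0) {
        dst.put(tmp, 0, read);
      }
      return read;
    }
  }

  public static void main(String[] args) throws IOException {
    byte[] data = "hello world".getBytes();
    // Heap buffer with a non-zero start position.
    ByteBuffer heap = ByteBuffer.allocate(16);
    heap.position(4);
    System.out.println(readInto(new ByteArrayInputStream(data), heap));   // prints 11
    // Direct buffer with only 8 bytes remaining.
    ByteBuffer direct = ByteBuffer.allocateDirect(8);
    System.out.println(readInto(new ByteArrayInputStream(data), direct)); // prints 8
  }
}
```

Note the deliberate choice to size the temporary array by remaining(): reading more than the caller asked for would overflow the destination buffer.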
I'd recommend looking at PositionedReadable.readFully(), which will handle partial reads of the dest buffer by reading in more data, etc. A simple read() could return any value, including 0.
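The readFully contract mentioned above can be sketched without a Hadoop dependency (Hadoop's PositionedReadable.readFully does the equivalent loop internally; this standalone version is only illustrative):

```java
import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;

// Sketch of readFully-style semantics: a single read() may return fewer
// bytes than requested (any value, including 0), so loop until the
// requested length is filled, failing if EOF arrives first.
public class ReadFullySketch {
  static void readFully(InputStream in, byte[] buf, int off, int len) throws IOException {
    int total = 0;
    while (total < len) {
      int n = in.read(buf, off + total, len - total);
      if (n < 0) {
        throw new EOFException("EOF after " + total + " of " + len + " bytes");
      }
      total += n;
    }
  }

  public static void main(String[] args) throws IOException {
    byte[] dst = new byte[5];
    readFully(new ByteArrayInputStream("abcdefgh".getBytes()), dst, 0, 5);
    System.out.println(new String(dst)); // prints "abcde"
  }
}
```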
@lukecwik Not sure I understand: if you look at TextSource, it defines a fixed size for the read buffer, so we have a limit. TextBasedReader already defines it to be 8192.
private static final int READ_BUFFER_SIZE = 8192;
private final ByteBuffer readBuffer = ByteBuffer.allocate(READ_BUFFER_SIZE);
And the method that iterates to read (by delegating to the HadoopSeekableByteChannel) is already moving the buffer start position to zero on each iteration so we cannot get off heap, no?
beam/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java
Lines 228 to 240 in 5181e61
private boolean tryToEnsureNumberOfBytesInBuffer(int minCapacity) throws IOException {
  // While we aren't at EOF or haven't fulfilled the minimum buffer capacity,
  // attempt to read more bytes.
  while (buffer.size() <= minCapacity && !eof) {
    eof = inChannel.read(readBuffer) == -1;
    readBuffer.flip();
    buffer = buffer.concat(ByteString.copyFrom(readBuffer));
    readBuffer.clear();
  }
  // Return true if we were able to honor the minimum buffer capacity request
  return buffer.size() >= minCapacity;
}
}
That is only true for TextSource as it is implemented today. We can't say that all user-implemented sources will always use in-memory byte buffers with a fixed size, or that the user will consume and reset the ByteBuffer on each read (for example, they may detect that not enough was read and ask for more).
@@ -189,7 +189,14 @@ public int read(ByteBuffer dst) throws IOException {
     if (closed) {
       throw new IOException("Channel is closed");
     }
-    return inputStream.read(dst);
+    // We avoid using the ByteBuffer based read for Hadoop because some FSInputStream
Add a test which exercises various bytebuffer setups.
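The kind of test asked for above might look like the following sketch (not the PR's actual test; it exercises a plain java.nio channel rather than HadoopSeekableByteChannel, just to show the buffer setups worth covering — non-zero position, a slice with non-zero arrayOffset, and a direct buffer with a reduced limit):

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;

// Hypothetical test sketch: exercise read(ByteBuffer) against several
// buffer setups and verify that position/limit are honored.
public class ByteBufferSetupsTest {
  static int readWith(ByteBuffer dst) throws IOException {
    ReadableByteChannel ch = Channels.newChannel(new ByteArrayInputStream("0123456789".getBytes()));
    return ch.read(dst);
  }

  public static void main(String[] args) throws IOException {
    // Heap buffer with a non-zero start position: the write must begin at index 6.
    ByteBuffer offsetBuf = ByteBuffer.allocate(16);
    offsetBuf.position(6);
    System.out.println(readWith(offsetBuf));       // prints 10

    // Slice of a larger buffer, so arrayOffset() is non-zero.
    ByteBuffer backing = ByteBuffer.allocate(16);
    backing.position(4);
    System.out.println(readWith(backing.slice())); // prints 10

    // Direct (off-heap) buffer with a limit smaller than the data.
    ByteBuffer direct = ByteBuffer.allocateDirect(16);
    direct.limit(5);
    System.out.println(readWith(direct));          // prints 5
  }
}
```

An implementation that reads into dst.array() from index 0, or that ignores the limit, fails at least one of these setups, which is exactly the class of bug discussed in this thread.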
@lukecwik @steveloughran, Ismaël is off for some weeks; I will continue this PR. For now I'm just starting to dig into the subject.
I did the fix in the code to deal with the offsets/limits to have the same behavior as …
I am closing this one because I suppose @echauchot will do a new one. See you in October.
Follow this checklist to help us incorporate your contribution quickly and easily:
- Make sure the PR title is formatted like: [BEAM-XXX] Fixes bug in ApproximateQuantiles, where you replace BEAM-XXX with the appropriate JIRA issue.
- Run mvn clean verify to make sure basic checks pass. A more thorough check will be performed on your pull request automatically.