PARQUET-2134: Fix type checking in HadoopStreams.wrap #951
Conversation
Related issue: prestodb/presto#17435
private static boolean isWrappedStreamByteBufferReadable(FSDataInputStream stream) {
  InputStream wrapped = stream.getWrappedStream();
  if (wrapped instanceof FSDataInputStream) {
    return isWrappedStreamByteBufferReadable(((FSDataInputStream) wrapped));
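The diff view cuts the excerpt off here. For reference, the pre-fix method presumably falls through to a reflective instanceof check on the innermost stream, roughly like this (a reconstruction, not the verbatim source; byteBufferReadableClass is the reflectively loaded org.apache.hadoop.fs.ByteBufferReadable held elsewhere in HadoopStreams):

private static boolean isWrappedStreamByteBufferReadable(FSDataInputStream stream) {
  InputStream wrapped = stream.getWrappedStream();
  if (wrapped instanceof FSDataInputStream) {
    // Unwrap nested FSDataInputStreams until we reach the underlying stream.
    return isWrappedStreamByteBufferReadable((FSDataInputStream) wrapped);
  }
  // Base case: test the innermost stream against the reflectively loaded interface.
  return byteBufferReadableClass != null && byteBufferReadableClass.isInstance(wrapped);
}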
Is there a corner case that can cause an infinite loop?
Yes, in principle, but it would be hard to create such a case. As its code shows, FSDataInputStream is a wrapper class around an InputStream, so checking the wrapped stream recursively eventually reaches a stream whose type is not FSDataInputStream. A developer could override getWrappedStream to return this and cause an infinite loop, but doing so makes no sense.
I understand it would be a very rare case, but once it happens, this 'hang' would be hard to debug. Let's do two things: 1) add a check for whether the wrapped stream is this, and throw an exception if it is; 2) add a debug log, so that when it hangs, a developer can enable debug logging and see what parquet-mr is doing.
Good suggestions! I have updated the PR accordingly.
Thanks for adding the check and debug log. LGTM! One more thing (sorry for not asking in the first-round review): do you think it makes sense to add tests?
@7c00 Do you have time to look into the last feedback?
@shangxinli Thanks for your comments, and sorry for the late reply. I think it's fine to add some unit tests; I am going to do that this weekend and will ping you when finished. Thank you in advance.
This would be a lot easier if HadoopStreams didn't use reflection to get at ByteBufferReadable. That's an API which came with Hadoop 2.0.2, so everything has it.
@@ -51,7 +52,7 @@ public class HadoopStreams {
   public static SeekableInputStream wrap(FSDataInputStream stream) {
     Objects.requireNonNull(stream, "Cannot wrap a null input stream");
     if (byteBufferReadableClass != null && h2SeekableConstructor != null &&
-        byteBufferReadableClass.isInstance(stream.getWrappedStream())) {
+        isWrappedStreamByteBufferReadable(stream)) {
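For context, the surrounding method after this change looks roughly as follows. This is a reconstruction around the diff, not the verbatim source; the reflective construction and the exact fallback handling are assumptions.

public static SeekableInputStream wrap(FSDataInputStream stream) {
  Objects.requireNonNull(stream, "Cannot wrap a null input stream");
  if (byteBufferReadableClass != null && h2SeekableConstructor != null &&
      isWrappedStreamByteBufferReadable(stream)) {
    try {
      // h2SeekableConstructor is looked up reflectively so this class still
      // loads on Hadoop versions that lack ByteBufferReadable.
      return (SeekableInputStream) h2SeekableConstructor.newInstance(stream);
    } catch (ReflectiveOperationException e) {
      // Assumption: fall back to byte-array reads if construction fails.
      return new H1SeekableInputStream(stream);
    }
  }
  return new H1SeekableInputStream(stream);
}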
This is really going into the internals of the Hadoop classes, and potentially tricky if there is any dynamic decision making in the inner class. The good news: I don't see anything doing that.
There is a way to ask (Hadoop 3.2+) whether a stream supports the API before calling it, using the StreamCapabilities interface.
https://issues.apache.org/jira/browse/HDFS-14111
if (stream.hasCapability("in:readbytebuffer")) {
  // stream is confident it has the API
} else {
  // do the checking of the inner class
}
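A sketch of what that combination could look like, assuming ByteBufferReadable is referenced directly rather than reflectively (the capability key is the one defined by HDFS-14111; FSDataInputStream implements StreamCapabilities since Hadoop 2.9):

private static boolean isWrappedStreamByteBufferReadable(FSDataInputStream stream) {
  // Since HDFS-14111 (Hadoop 3.3.0), every stream implementing
  // ByteBufferReadable answers true to this capability probe.
  if (stream.hasCapability("in:readbytebuffer")) {
    return true;
  }
  // Older Hadoop: fall back to inspecting the wrapped stream directly.
  InputStream wrapped = stream.getWrappedStream();
  if (wrapped instanceof FSDataInputStream) {
    return isWrappedStreamByteBufferReadable((FSDataInputStream) wrapped);
  }
  return wrapped instanceof ByteBufferReadable;
}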
private static boolean isWrappedStreamByteBufferReadable(FSDataInputStream stream) {
  InputStream wrapped = stream.getWrappedStream();
  if (wrapped == stream) {
    throw new ParquetDecodingException("Illegal FSDataInputStream as wrapped itself");
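The review view truncates the updated method at the throw; after the guard it presumably continues with the debug log and the recursive check discussed above, along these lines (a reconstruction, not the verbatim diff; the log message is an assumption):

private static boolean isWrappedStreamByteBufferReadable(FSDataInputStream stream) {
  InputStream wrapped = stream.getWrappedStream();
  if (wrapped == stream) {
    // Guard against a pathological getWrappedStream() returning `this`.
    throw new ParquetDecodingException("Illegal FSDataInputStream as wrapped itself");
  }
  LOG.debug("Checking whether wrapped stream {} is ByteBufferReadable", wrapped);
  if (wrapped instanceof FSDataInputStream) {
    return isWrappedStreamByteBufferReadable((FSDataInputStream) wrapped);
  }
  return byteBufferReadableClass != null && byteBufferReadableClass.isInstance(wrapped);
}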
This can't happen: the inner stream is set in the constructor, so it cannot take the not-yet-constructed class as an argument... no need to worry about recursion.
This extends apache#951. Since [HDFS-14111](https://issues.apache.org/jira/browse/HDFS-14111), all input streams in the hadoop codebase which implement `ByteBufferReadable` return true on the StreamCapabilities probe `stream.hasCapability("in:readbytebuffer")`; those which don't are forbidden to do so. This means that on Hadoop 3.3.0+ the preferred way to probe for the API is to ask the stream. The StreamCapabilities probe was added in Hadoop 2.9. Along with making all use of `ByteBufferReadable` non-reflective, this makes the checks fairly straightforward. Tests verify that if a stream implements `ByteBufferReadable` then it will be bonded to H2SeekableInputStream, even if multiply wrapped by FSDataInputStreams, and that if it doesn't, it won't.
I've taken this PR and added the changes I was suggesting, plus tests; see #971. If you take that extra commit and merge it in here, it should complete this PR.
Thanks @steveloughran @shangxinli. I have cherry-picked the commit from #971. Do I need to squash the two commits into one?
Whoever actually commits this can use the GitHub squash option to combine all commits into one before merging. FYI, I've just started writing a shim library so that apps compiling against Hadoop 3.2.0 will be able to invoke the 3.3+ API calls when present: HADOOP-18287. First, parquet will need to be able to compile/link against Hadoop 3.x: #976
 * @return true if it is safe to use an H2SeekableInputStream to access the data
 */
private static boolean isWrappedStreamByteBufferReadable(FSDataInputStream stream) {
  if (stream.hasCapability("in:readbytebuffer")) {
We don't have Hadoop 3.3.0 yet in Parquet. Does it mean we need to hold off this PR?
No, the StreamCapabilities probe has been around since Hadoop 2; it is just that in 3.3.0 all streams which implement the API return true for this probe, and the probe gets passed down to the wrapped streams. It avoids looking at the wrapped streams, as you should be able to trust the response (put differently: if something lied, it is in trouble).
@steveloughran @shangxinli it looks like the API is not available in Hadoop 2.8.x, so it will create issues for projects that want to use the latest version of Parquet but still want to keep Hadoop 2.8.x.
Also see the related JIRA: https://issues.apache.org/jira/browse/PARQUET-2276
@7c00 and @steveloughran Thank both of you for the great contribution! This PR comes from two authors. Can @7c00 add @steveloughran as a co-author to this PR? This is an example.
HadoopStreams.wrap produces a wrong H2SeekableInputStream if the passed-in FSDataInputStream wraps another FSDataInputStream. Since [HDFS-14111](https://issues.apache.org/jira/browse/HDFS-14111), all input streams in the hadoop codebase which implement `ByteBufferReadable` return true on the StreamCapabilities probe `stream.hasCapability("in:readbytebuffer")`; those which don't are forbidden to do so. This means that on Hadoop 3.3.0+ the preferred way to probe for the API is to ask the stream. The StreamCapabilities probe was added in Hadoop 2.9. Along with making all use of `ByteBufferReadable` non-reflective, this makes the checks fairly straightforward. Tests verify that if a stream implements `ByteBufferReadable` then it will be bonded to H2SeekableInputStream, even if multiply wrapped by FSDataInputStreams, and that if it doesn't, it won't. Co-authored-by: Steve Loughran <stevel@cloudera.com>
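A sketch of what such a test could look like. The class and mock names here are illustrative assumptions, not the actual tests in #971; note that FSDataInputStream only accepts a wrapped stream that implements Seekable and PositionedReadable.

import org.apache.hadoop.fs.FSDataInputStream;
import org.junit.Assert;
import org.junit.Test;

public class TestHadoopStreamsWrap {
  @Test
  public void testMultiplyWrappedByteBufferReadable() throws Exception {
    // MockByteBufferInputStream (hypothetical) extends InputStream and
    // implements Seekable, PositionedReadable and ByteBufferReadable.
    FSDataInputStream inner = new FSDataInputStream(new MockByteBufferInputStream());
    FSDataInputStream doublyWrapped = new FSDataInputStream(inner);
    // Even multiply wrapped, the stream should bind to the H2 path.
    Assert.assertTrue(HadoopStreams.wrap(doublyWrapped) instanceof H2SeekableInputStream);
  }

  @Test
  public void testPlainStreamFallsBackToH1() throws Exception {
    // MockPlainInputStream (hypothetical) implements Seekable and
    // PositionedReadable but not ByteBufferReadable.
    FSDataInputStream stream = new FSDataInputStream(new MockPlainInputStream());
    Assert.assertTrue(HadoopStreams.wrap(stream) instanceof H1SeekableInputStream);
  }
}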
@shangxinli Thank you for reminding me. I have squashed the PR and added @steveloughran as the co-author.
Thanks. Created HADOOP-18336.
LGTM |