Skip to content

Commit

Permalink
PARQUET-2134: Fix type checking in HadoopStreams.wrap
Browse files Browse the repository at this point in the history
  • Loading branch information
7c00 committed Mar 9, 2022
1 parent 4d062dc commit 5f4aa3b
Showing 1 changed file with 12 additions and 2 deletions.
Expand Up @@ -22,11 +22,12 @@
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.parquet.io.ParquetDecodingException;
import org.apache.parquet.io.SeekableInputStream;
import org.apache.parquet.io.PositionOutputStream;
import org.apache.parquet.io.SeekableInputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.InputStream;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.Objects;
Expand All @@ -51,7 +52,7 @@ public class HadoopStreams {
public static SeekableInputStream wrap(FSDataInputStream stream) {
Objects.requireNonNull(stream, "Cannot wrap a null input stream");
if (byteBufferReadableClass != null && h2SeekableConstructor != null &&
byteBufferReadableClass.isInstance(stream.getWrappedStream())) {
isWrappedStreamByteBufferReadable(stream)) {
try {
return h2SeekableConstructor.newInstance(stream);
} catch (InstantiationException | IllegalAccessException e) {
Expand All @@ -66,6 +67,15 @@ public static SeekableInputStream wrap(FSDataInputStream stream) {
}
}

private static boolean isWrappedStreamByteBufferReadable(FSDataInputStream stream) {
InputStream wrapped = stream.getWrappedStream();
if (wrapped instanceof FSDataInputStream) {
return isWrappedStreamByteBufferReadable(((FSDataInputStream) wrapped));
}
//noinspection ConstantConditions
return byteBufferReadableClass.isInstance(wrapped);
}

private static Class<?> getReadableClass() {
try {
return Class.forName("org.apache.hadoop.fs.ByteBufferReadable");
Expand Down

0 comments on commit 5f4aa3b

Please sign in to comment.