diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesReader.java
index a0c7af7394..cab438a4b0 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesReader.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesReader.java
@@ -19,25 +19,36 @@
 package org.apache.parquet.column.values.plain;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
 import org.apache.parquet.bytes.ByteBufferInputStream;
 import org.apache.parquet.bytes.LittleEndianDataInputStream;
 import org.apache.parquet.column.values.ValuesReader;
-import org.apache.parquet.io.ParquetDecodingException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Plain encoding for float, double, int, long
+ * Plain encoding for float, double, int, long.
+ *
+ * <p>Reads directly from a {@link ByteBuffer} with {@link ByteOrder#LITTLE_ENDIAN} byte order,
+ * bypassing the {@link LittleEndianDataInputStream} wrapper to avoid per-value virtual dispatch
+ * overhead. The underlying page data is obtained as a single contiguous {@link ByteBuffer} via
+ * {@link ByteBufferInputStream#slice(int)}.
  */
 public abstract class PlainValuesReader extends ValuesReader {
   private static final Logger LOG = LoggerFactory.getLogger(PlainValuesReader.class);
 
-  protected LittleEndianDataInputStream in;
+  ByteBuffer buffer;
 
   @Override
   public void initFromPage(int valueCount, ByteBufferInputStream stream) throws IOException {
     LOG.debug("init from page at offset {} for length {}", stream.position(), stream.available());
-    this.in = new LittleEndianDataInputStream(stream.remainingStream());
+    int available = stream.available();
+    if (available > 0) {
+      this.buffer = stream.slice(available).order(ByteOrder.LITTLE_ENDIAN);
+    } else {
+      this.buffer = ByteBuffer.allocate(0).order(ByteOrder.LITTLE_ENDIAN);
+    }
   }
 
   @Override
@@ -45,31 +56,16 @@ public void skip() {
     skip(1);
   }
 
-  void skipBytesFully(int n) throws IOException {
-    int skipped = 0;
-    while (skipped < n) {
-      skipped += in.skipBytes(n - skipped);
-    }
-  }
-
   public static class DoublePlainValuesReader extends PlainValuesReader {
 
     @Override
     public void skip(int n) {
-      try {
-        skipBytesFully(n * 8);
-      } catch (IOException e) {
-        throw new ParquetDecodingException("could not skip " + n + " double values", e);
-      }
+      buffer.position(buffer.position() + n * 8);
     }
 
     @Override
     public double readDouble() {
-      try {
-        return in.readDouble();
-      } catch (IOException e) {
-        throw new ParquetDecodingException("could not read double", e);
-      }
+      return buffer.getDouble();
     }
   }
 
@@ -77,20 +73,12 @@ public static class FloatPlainValuesReader extends PlainValuesReader {
 
     @Override
     public void skip(int n) {
-      try {
-        skipBytesFully(n * 4);
-      } catch (IOException e) {
-        throw new ParquetDecodingException("could not skip " + n + " floats", e);
-      }
+      buffer.position(buffer.position() + n * 4);
     }
 
     @Override
     public float readFloat() {
-      try {
-        return in.readFloat();
-      } catch (IOException e) {
-        throw new ParquetDecodingException("could not read float", e);
-      }
+      return buffer.getFloat();
     }
   }
 
@@ -98,20 +86,12 @@ public static class IntegerPlainValuesReader extends PlainValuesReader {
 
     @Override
     public void skip(int n) {
-      try {
-        in.skipBytes(n * 4);
-      } catch (IOException e) {
-        throw new ParquetDecodingException("could not skip " + n + " ints", e);
-      }
+      buffer.position(buffer.position() + n * 4);
     }
 
     @Override
     public int readInteger() {
-      try {
-        return in.readInt();
-      } catch (IOException e) {
-        throw new ParquetDecodingException("could not read int", e);
-      }
+      return buffer.getInt();
     }
   }
 
@@ -119,20 +99,12 @@ public static class LongPlainValuesReader extends PlainValuesReader {
 
     @Override
     public void skip(int n) {
-      try {
-        in.skipBytes(n * 8);
-      } catch (IOException e) {
-        throw new ParquetDecodingException("could not skip " + n + " longs", e);
-      }
+      buffer.position(buffer.position() + n * 8);
     }
 
     @Override
     public long readLong() {
-      try {
-        return in.readLong();
-      } catch (IOException e) {
-        throw new ParquetDecodingException("could not read long", e);
-      }
+      return buffer.getLong();
     }
   }
 }
diff --git a/parquet-common/src/main/java/org/apache/parquet/bytes/LittleEndianDataInputStream.java b/parquet-common/src/main/java/org/apache/parquet/bytes/LittleEndianDataInputStream.java
index 723971f70a..fe002324e5 100644
--- a/parquet-common/src/main/java/org/apache/parquet/bytes/LittleEndianDataInputStream.java
+++ b/parquet-common/src/main/java/org/apache/parquet/bytes/LittleEndianDataInputStream.java
@@ -21,10 +21,24 @@
 import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
 
 /**
- * Based on DataInputStream but little endian and without the String/char methods
+ * Based on DataInputStream but little endian and without the String/char methods.
+ *
+ * @deprecated This class has no remaining production usages and is
+ * significantly slower than reading directly from a {@link ByteBuffer} configured with
+ * {@link ByteOrder#LITTLE_ENDIAN}. Each {@link #readInt()} performs four virtual
+ * {@code in.read()} calls and reassembles the value with bit shifts, while
+ * {@link ByteBuffer#getInt()} on a little-endian buffer is a HotSpot intrinsic that
+ * compiles to a single unaligned load on x86/ARM. For new code, prefer
+ * {@link ByteBufferInputStream#slice(int)} followed by
+ * {@code buffer.order(ByteOrder.LITTLE_ENDIAN)} and {@link ByteBuffer#getInt()} /
+ * {@link ByteBuffer#getLong()} / {@link ByteBuffer#getFloat()} /
+ * {@link ByteBuffer#getDouble()}. This class will be removed in a future release.
  */
+@Deprecated
 public final class LittleEndianDataInputStream extends InputStream {
 
   private final InputStream in;
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java
index a17cf678f5..8d735b55ab 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java
@@ -38,9 +38,8 @@
 import static org.mockito.ArgumentMatchers.same;
 import static org.mockito.Mockito.inOrder;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.nio.ByteOrder;
 import java.util.HashMap;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -49,7 +48,6 @@
 import org.apache.parquet.bytes.BytesInput;
 import org.apache.parquet.bytes.DirectByteBufferAllocator;
 import org.apache.parquet.bytes.HeapByteBufferAllocator;
-import org.apache.parquet.bytes.LittleEndianDataInputStream;
 import org.apache.parquet.bytes.TrackingByteBufferAllocator;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.Encoding;
@@ -249,12 +247,7 @@ public void test(Configuration config, ByteBufferAllocator allocator) throws Exc
   }
 
   private int intValue(BytesInput in) throws IOException {
-    ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    in.writeAllTo(baos);
-    LittleEndianDataInputStream os = new LittleEndianDataInputStream(new ByteArrayInputStream(baos.toByteArray()));
-    int i = os.readInt();
-    os.close();
-    return i;
+    return in.toByteBuffer().order(ByteOrder.LITTLE_ENDIAN).getInt();
   }
 
   @Test
diff --git a/pom.xml b/pom.xml
index 3b87ba5f12..e0b04801e2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -569,6 +569,8 @@
              <exclude>org.apache.parquet.internal.column.columnindex.IndexIterator</exclude>
              <exclude>org.apache.parquet.column.values.bytestreamsplit.ByteStreamSplitValuesReader#gatherElementDataFromStreams(byte[])</exclude>
+             <!-- NOTE(review): original added comment line was lost to markup stripping; presumably it explained the removal of the protected PlainValuesReader#in field - confirm against upstream -->
+             <exclude>org.apache.parquet.column.values.plain.PlainValuesReader#in</exclude>
              <exclude>org.apache.parquet.arrow.schema.SchemaMapping$TypeMappingVisitor#visit(org.apache.parquet.arrow.schema.SchemaMapping$MapTypeMapping)</exclude>