Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,120 +19,92 @@
package org.apache.parquet.column.values.plain;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import org.apache.parquet.bytes.ByteBufferInputStream;
import org.apache.parquet.bytes.LittleEndianDataInputStream;
import org.apache.parquet.column.values.ValuesReader;
import org.apache.parquet.io.ParquetDecodingException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Plain encoding for float, double, int, long
* Plain encoding for float, double, int, long.
*
* <p>Reads directly from a {@link ByteBuffer} with {@link ByteOrder#LITTLE_ENDIAN} byte order,
* bypassing the {@link LittleEndianDataInputStream} wrapper to avoid per-value virtual dispatch
* overhead. The underlying page data is obtained as a single contiguous {@link ByteBuffer} via
* {@link ByteBufferInputStream#slice(int)}.
*/
public abstract class PlainValuesReader extends ValuesReader {
private static final Logger LOG = LoggerFactory.getLogger(PlainValuesReader.class);

protected LittleEndianDataInputStream in;
ByteBuffer buffer;

@Override
public void initFromPage(int valueCount, ByteBufferInputStream stream) throws IOException {
LOG.debug("init from page at offset {} for length {}", stream.position(), stream.available());
this.in = new LittleEndianDataInputStream(stream.remainingStream());
int available = stream.available();
if (available > 0) {
this.buffer = stream.slice(available).order(ByteOrder.LITTLE_ENDIAN);
} else {
this.buffer = ByteBuffer.allocate(0).order(ByteOrder.LITTLE_ENDIAN);
}
}

@Override
public void skip() {
skip(1);
}

void skipBytesFully(int n) throws IOException {
int skipped = 0;
while (skipped < n) {
skipped += in.skipBytes(n - skipped);
}
}

public static class DoublePlainValuesReader extends PlainValuesReader {

@Override
public void skip(int n) {
try {
skipBytesFully(n * 8);
} catch (IOException e) {
throw new ParquetDecodingException("could not skip " + n + " double values", e);
}
buffer.position(buffer.position() + n * 8);
}

@Override
public double readDouble() {
try {
return in.readDouble();
} catch (IOException e) {
throw new ParquetDecodingException("could not read double", e);
}
return buffer.getDouble();
}
}

public static class FloatPlainValuesReader extends PlainValuesReader {

@Override
public void skip(int n) {
try {
skipBytesFully(n * 4);
} catch (IOException e) {
throw new ParquetDecodingException("could not skip " + n + " floats", e);
}
buffer.position(buffer.position() + n * 4);
}

@Override
public float readFloat() {
try {
return in.readFloat();
} catch (IOException e) {
throw new ParquetDecodingException("could not read float", e);
}
return buffer.getFloat();
}
}

public static class IntegerPlainValuesReader extends PlainValuesReader {

@Override
public void skip(int n) {
try {
in.skipBytes(n * 4);
} catch (IOException e) {
throw new ParquetDecodingException("could not skip " + n + " ints", e);
}
buffer.position(buffer.position() + n * 4);
}

@Override
public int readInteger() {
try {
return in.readInt();
} catch (IOException e) {
throw new ParquetDecodingException("could not read int", e);
}
return buffer.getInt();
}
}

public static class LongPlainValuesReader extends PlainValuesReader {

@Override
public void skip(int n) {
try {
in.skipBytes(n * 8);
} catch (IOException e) {
throw new ParquetDecodingException("could not skip " + n + " longs", e);
}
buffer.position(buffer.position() + n * 8);
}

@Override
public long readLong() {
try {
return in.readLong();
} catch (IOException e) {
throw new ParquetDecodingException("could not read long", e);
}
return buffer.getLong();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,24 @@
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

/**
* Based on DataInputStream but little endian and without the String/char methods
* Based on DataInputStream but little endian and without the String/char methods.
*
* @deprecated This class has no remaining production usages and is
* significantly slower than reading directly from a {@link ByteBuffer} configured with
* {@link ByteOrder#LITTLE_ENDIAN}. Each {@link #readInt()} performs four virtual
* {@code in.read()} calls and reassembles the value with bit shifts, while
* {@link ByteBuffer#getInt()} on a little-endian buffer is a HotSpot intrinsic that
* compiles to a single unaligned load on x86/ARM. For new code, prefer
* {@link ByteBufferInputStream#slice(int)} followed by
* {@code buffer.order(ByteOrder.LITTLE_ENDIAN)} and {@link ByteBuffer#getInt()} /
* {@link ByteBuffer#getLong()} / {@link ByteBuffer#getFloat()} /
* {@link ByteBuffer#getDouble()}. This class will be removed in a future release.
*/
@Deprecated
public final class LittleEndianDataInputStream extends InputStream {

private final InputStream in;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,8 @@
import static org.mockito.ArgumentMatchers.same;
import static org.mockito.Mockito.inOrder;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteOrder;
import java.util.HashMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
Expand All @@ -49,7 +48,6 @@
import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.bytes.DirectByteBufferAllocator;
import org.apache.parquet.bytes.HeapByteBufferAllocator;
import org.apache.parquet.bytes.LittleEndianDataInputStream;
import org.apache.parquet.bytes.TrackingByteBufferAllocator;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.Encoding;
Expand Down Expand Up @@ -249,12 +247,7 @@ public void test(Configuration config, ByteBufferAllocator allocator) throws Exc
}

private int intValue(BytesInput in) throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
in.writeAllTo(baos);
LittleEndianDataInputStream os = new LittleEndianDataInputStream(new ByteArrayInputStream(baos.toByteArray()));
int i = os.readInt();
os.close();
return i;
return in.toByteBuffer().order(ByteOrder.LITTLE_ENDIAN).getInt();
}

@Test
Expand Down
2 changes: 2 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -569,6 +569,8 @@
<exclude>org.apache.parquet.internal.column.columnindex.IndexIterator</exclude>
<!-- Removal of a protected method in a class that's not supposed to be subclassed by third-party code -->
<exclude>org.apache.parquet.column.values.bytestreamsplit.ByteStreamSplitValuesReader#gatherElementDataFromStreams(byte[])</exclude>
<!-- Removal of a protected internal field that should not have been part of the public API -->
<exclude>org.apache.parquet.column.values.plain.PlainValuesReader#in</exclude>
<!-- Due to the removal of deprecated methods -->
<exclude>org.apache.parquet.arrow.schema.SchemaMapping$TypeMappingVisitor#visit(org.apache.parquet.arrow.schema.SchemaMapping$MapTypeMapping)</exclude>
<!-- Make static variables final -->
Expand Down