Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,17 +49,87 @@ protected int nextElementByteOffset() {
return offset;
}

/**
 * Moves the stream cursor forward by {@code count} elements and returns the byte
 * offset at which the first of those elements begins in the decoded buffer.
 * Batch read methods in subclasses use this to locate a run of values.
 *
 * @param count number of elements to consume
 * @return byte offset of the first consumed element
 * @throws ParquetDecodingException if fewer than {@code count} elements remain
 */
protected int advanceByteOffset(int count) {
  int startIndex = indexInStream;
  if (startIndex + count > valuesCount) {
    throw new ParquetDecodingException("Byte-stream data was already exhausted.");
  }
  indexInStream = startIndex + count;
  return startIndex * elementSizeInBytes;
}

// Decode an entire data page by transposing from stream-split layout to interleaved layout.
// The encoded data has elementSizeInBytes separate streams of valuesCount bytes each.
// The decoded data has valuesCount elements of elementSizeInBytes bytes each.
private byte[] decodeData(ByteBuffer encoded, int valuesCount) {
assert encoded.limit() == valuesCount * elementSizeInBytes;
byte[] decoded = new byte[encoded.limit()];
int destByteIndex = 0;
for (int srcValueIndex = 0; srcValueIndex < valuesCount; ++srcValueIndex) {
for (int stream = 0; stream < elementSizeInBytes; ++stream, ++destByteIndex) {
decoded[destByteIndex] = encoded.get(srcValueIndex + stream * valuesCount);
int totalBytes = valuesCount * elementSizeInBytes;
assert encoded.remaining() >= totalBytes;

// Bulk access: use the backing array directly if available, otherwise copy once.
// This eliminates per-byte ByteBuffer.get(index) bounds checking in the hot loop.
byte[] src;
int srcBase;
if (encoded.hasArray()) {
src = encoded.array();
srcBase = encoded.arrayOffset() + encoded.position();
} else {
src = new byte[totalBytes];
encoded.get(src);
srcBase = 0;
}

byte[] decoded = new byte[totalBytes];

// Specialized single-pass loops for common element sizes.
// Single-pass writes each output location exactly once (sequentially), which is
// cache-friendlier than multi-pass for large pages where the output array exceeds L1.
// The reads come from elementSizeInBytes concurrent sequential streams, which modern
// hardware prefetchers handle well (typically 8-16 tracked streams per core).
if (elementSizeInBytes == 4) {
int s0 = srcBase,
s1 = srcBase + valuesCount,
s2 = srcBase + 2 * valuesCount,
s3 = srcBase + 3 * valuesCount;
for (int i = 0; i < valuesCount; ++i) {
int di = i * 4;
decoded[di] = src[s0 + i];
decoded[di + 1] = src[s1 + i];
decoded[di + 2] = src[s2 + i];
decoded[di + 3] = src[s3 + i];
}
} else if (elementSizeInBytes == 8) {
int s0 = srcBase,
s1 = srcBase + valuesCount,
s2 = srcBase + 2 * valuesCount,
s3 = srcBase + 3 * valuesCount,
s4 = srcBase + 4 * valuesCount,
s5 = srcBase + 5 * valuesCount,
s6 = srcBase + 6 * valuesCount,
s7 = srcBase + 7 * valuesCount;
for (int i = 0; i < valuesCount; ++i) {
int di = i * 8;
decoded[di] = src[s0 + i];
decoded[di + 1] = src[s1 + i];
decoded[di + 2] = src[s2 + i];
decoded[di + 3] = src[s3 + i];
decoded[di + 4] = src[s4 + i];
decoded[di + 5] = src[s5 + i];
decoded[di + 6] = src[s6 + i];
decoded[di + 7] = src[s7 + i];
}
} else {
// Generic fallback for arbitrary element sizes (e.g. FIXED_LEN_BYTE_ARRAY)
for (int stream = 0; stream < elementSizeInBytes; ++stream) {
int srcOffset = srcBase + stream * valuesCount;
for (int i = 0; i < valuesCount; ++i) {
decoded[i * elementSizeInBytes + stream] = src[srcOffset + i];
}
}
}
assert destByteIndex == decoded.length;
return decoded;
}

Expand Down
Loading