diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesReader.java index c8ab3043bd..d40bbc90de 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesReader.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesReader.java @@ -49,17 +49,87 @@ protected int nextElementByteOffset() { return offset; } - // Decode an entire data page + /** + * Advances the stream position by {@code count} elements and returns the byte offset + * of the first element. Used by batch read methods in subclasses. + */ + protected int advanceByteOffset(int count) { + if (indexInStream + count > valuesCount) { + throw new ParquetDecodingException("Byte-stream data was already exhausted."); + } + int offset = indexInStream * elementSizeInBytes; + indexInStream += count; + return offset; + } + + // Decode an entire data page by transposing from stream-split layout to interleaved layout. + // The encoded data has elementSizeInBytes separate streams of valuesCount bytes each. + // The decoded data has valuesCount elements of elementSizeInBytes bytes each. private byte[] decodeData(ByteBuffer encoded, int valuesCount) { - assert encoded.limit() == valuesCount * elementSizeInBytes; - byte[] decoded = new byte[encoded.limit()]; - int destByteIndex = 0; - for (int srcValueIndex = 0; srcValueIndex < valuesCount; ++srcValueIndex) { - for (int stream = 0; stream < elementSizeInBytes; ++stream, ++destByteIndex) { - decoded[destByteIndex] = encoded.get(srcValueIndex + stream * valuesCount); + int totalBytes = valuesCount * elementSizeInBytes; + assert encoded.remaining() >= totalBytes; + + // Bulk access: use the backing array directly if available, otherwise copy once. + // This eliminates per-byte ByteBuffer.get(index) bounds checking in the hot loop. + byte[] src; + int srcBase; + if (encoded.hasArray()) { + src = encoded.array(); + srcBase = encoded.arrayOffset() + encoded.position(); + } else { + src = new byte[totalBytes]; + encoded.get(src); + srcBase = 0; + } + + byte[] decoded = new byte[totalBytes]; + + // Specialized single-pass loops for common element sizes. + // Single-pass writes each output location exactly once (sequentially), which is + // cache-friendlier than multi-pass for large pages where the output array exceeds L1. + // The reads come from elementSizeInBytes concurrent sequential streams, which modern + // hardware prefetchers handle well (typically 8-16 tracked streams per core). + if (elementSizeInBytes == 4) { + int s0 = srcBase, + s1 = srcBase + valuesCount, + s2 = srcBase + 2 * valuesCount, + s3 = srcBase + 3 * valuesCount; + for (int i = 0; i < valuesCount; ++i) { + int di = i * 4; + decoded[di] = src[s0 + i]; + decoded[di + 1] = src[s1 + i]; + decoded[di + 2] = src[s2 + i]; + decoded[di + 3] = src[s3 + i]; + } + } else if (elementSizeInBytes == 8) { + int s0 = srcBase, + s1 = srcBase + valuesCount, + s2 = srcBase + 2 * valuesCount, + s3 = srcBase + 3 * valuesCount, + s4 = srcBase + 4 * valuesCount, + s5 = srcBase + 5 * valuesCount, + s6 = srcBase + 6 * valuesCount, + s7 = srcBase + 7 * valuesCount; + for (int i = 0; i < valuesCount; ++i) { + int di = i * 8; + decoded[di] = src[s0 + i]; + decoded[di + 1] = src[s1 + i]; + decoded[di + 2] = src[s2 + i]; + decoded[di + 3] = src[s3 + i]; + decoded[di + 4] = src[s4 + i]; + decoded[di + 5] = src[s5 + i]; + decoded[di + 6] = src[s6 + i]; + decoded[di + 7] = src[s7 + i]; + } + } else { + // Generic fallback for arbitrary element sizes (e.g. FIXED_LEN_BYTE_ARRAY) + for (int stream = 0; stream < elementSizeInBytes; ++stream) { + int srcOffset = srcBase + stream * valuesCount; + for (int i = 0; i < valuesCount; ++i) { + decoded[i * elementSizeInBytes + stream] = src[srcOffset + i]; + } } } - assert destByteIndex == decoded.length; return decoded; }