diff --git a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/RleDecodingBenchmark.java b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/RleDecodingBenchmark.java new file mode 100644 index 0000000000..6db487677d --- /dev/null +++ b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/RleDecodingBenchmark.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.benchmarks; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.concurrent.TimeUnit; +import org.apache.parquet.bytes.ByteBufferInputStream; +import org.apache.parquet.bytes.HeapByteBufferAllocator; +import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridValuesReader; +import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridValuesWriter; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OperationsPerInvocation; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; + +/** + * Decoding-level micro-benchmarks for the RLE/bit-packing hybrid encoding used + * for {@code BOOLEAN} values in Parquet data pages V2. + * Encoding benchmarks live in {@link RleEncodingBenchmark}. + * + *
The {@code dataPattern} parameter exercises RLE's best cases (ALL_TRUE, + * ALL_FALSE), worst case (ALTERNATING), and realistic distributions (RANDOM, + * MOSTLY_TRUE_99, MOSTLY_FALSE_99). + * + *
Each invocation decodes {@value #VALUE_COUNT} values; throughput is + * reported per-value via {@link OperationsPerInvocation}. + */ +@BenchmarkMode(Mode.Throughput) +@OutputTimeUnit(TimeUnit.SECONDS) +@Fork(1) +@Warmup(iterations = 3, time = 1) +@Measurement(iterations = 5, time = 1) +@State(Scope.Thread) +public class RleDecodingBenchmark { + + static final int VALUE_COUNT = 100_000; + private static final int INIT_SLAB_SIZE = 64 * 1024; + private static final int PAGE_SIZE = 4 * 1024 * 1024; + + @Param({"ALL_TRUE", "ALL_FALSE", "ALTERNATING", "RANDOM", "MOSTLY_TRUE_99", "MOSTLY_FALSE_99"}) + public String dataPattern; + + /** RLE-encoded bytes with 4-byte LE length prefix (ValuesReader format). */ + private byte[] encodedWithLengthPrefix; + + @Setup(Level.Trial) + public void setup() throws IOException { + boolean[] data = RleEncodingBenchmark.generateData(dataPattern); + + // Encode using the scalar ValuesWriter path + RunLengthBitPackingHybridValuesWriter w = + new RunLengthBitPackingHybridValuesWriter(1, INIT_SLAB_SIZE, PAGE_SIZE, new HeapByteBufferAllocator()); + for (boolean v : data) { + w.writeBoolean(v); + } + encodedWithLengthPrefix = w.getBytes().toByteArray(); + w.close(); + } + + // ---- Scalar decode via ValuesReader ---- + + @Benchmark + @OperationsPerInvocation(VALUE_COUNT) + public void decodeBoolean(Blackhole bh) throws IOException { + RunLengthBitPackingHybridValuesReader r = new RunLengthBitPackingHybridValuesReader(1); + r.initFromPage(VALUE_COUNT, ByteBufferInputStream.wrap(ByteBuffer.wrap(encodedWithLengthPrefix))); + for (int i = 0; i < VALUE_COUNT; i++) { + bh.consume(r.readBoolean()); + } + } +} diff --git a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/RleDictionaryIndexDecodingBenchmark.java b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/RleDictionaryIndexDecodingBenchmark.java new file mode 100644 index 0000000000..15b7fff5a3 --- /dev/null +++ b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/RleDictionaryIndexDecodingBenchmark.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.benchmarks; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.concurrent.TimeUnit; +import org.apache.parquet.bytes.ByteBufferInputStream; +import org.apache.parquet.bytes.HeapByteBufferAllocator; +import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridDecoder; +import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridEncoder; +import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridValuesReader; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OperationsPerInvocation; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; + +/** + * Encoding and decoding micro-benchmarks for synthetic dictionary-id pages using + * {@link RunLengthBitPackingHybridEncoder} and {@link RunLengthBitPackingHybridDecoder}. + * This isolates the RLE/bit-packing hybrid codec paths and is intentionally + * separate from full INT32/INT64 value encode/decode path benchmarks. + * + *
The encode benchmark measures the RLE encoder's {@code pack32Values} fast path + * and bit-packing throughput. The decode benchmark measures the corresponding + * {@code unpack32Values} fast path and RLE run expansion. + * + *
The {@code bitWidth} parameter exercises different packing densities (1-bit to + * 16-bit), and the {@code indexPattern} parameter exercises pure RLE (CONSTANT), + * mixed (LOW_CARDINALITY), and pure bit-packing (SEQUENTIAL, RANDOM) paths. + * + *
Per-invocation overhead (encoder/decoder construction and {@link ByteBufferInputStream} + * wrapping) is amortized over {@value #VALUE_COUNT} reads via + * {@link OperationsPerInvocation}. + */ +@BenchmarkMode(Mode.Throughput) +@OutputTimeUnit(TimeUnit.SECONDS) +@Fork(1) +@Warmup(iterations = 3, time = 1) +@Measurement(iterations = 5, time = 1) +@State(Scope.Thread) +public class RleDictionaryIndexDecodingBenchmark { + + static final int VALUE_COUNT = 100_000; + private static final int INIT_SLAB_SIZE = 64 * 1024; + private static final int PAGE_SIZE = 1024 * 1024; + + @Param({"1", "4", "8", "10", "16"}) + public int bitWidth; + + @Param({"SEQUENTIAL", "RANDOM", "LOW_CARDINALITY", "CONSTANT"}) + public String indexPattern; + + /** Raw RLE-encoded bytes (no length prefix). */ + private byte[] encoded; + + private int[] ids; + + /** RLE-encoded bytes with 4-byte LE length prefix (ValuesReader format). */ + private byte[] encodedWithLengthPrefix; + + @Setup(Level.Trial) + public void setup() throws IOException { + int maxId = 1 << bitWidth; + ids = generateDictionaryIds(maxId); + try (RunLengthBitPackingHybridEncoder encoder = new RunLengthBitPackingHybridEncoder( + bitWidth, INIT_SLAB_SIZE, PAGE_SIZE, new HeapByteBufferAllocator())) { + for (int id : ids) { + encoder.writeInt(id); + } + encoded = encoder.toBytes().toByteArray(); + } + + // Prepend 4-byte LE length for ValuesReader.initFromPage() format + encodedWithLengthPrefix = new byte[4 + encoded.length]; + ByteBuffer.wrap(encodedWithLengthPrefix).order(ByteOrder.LITTLE_ENDIAN).putInt(encoded.length); + System.arraycopy(encoded, 0, encodedWithLengthPrefix, 4, encoded.length); + } + + private int[] generateDictionaryIds(int maxId) { + switch (indexPattern) { + case "SEQUENTIAL": + int[] sequential = new int[VALUE_COUNT]; + for (int i = 0; i < VALUE_COUNT; i++) { + sequential[i] = i % maxId; + } + return sequential; + case "RANDOM": + return TestDataFactory.generateLowCardinalityInts(VALUE_COUNT, maxId, TestDataFactory.DEFAULT_SEED); + case "LOW_CARDINALITY": + int distinct = Math.min(TestDataFactory.LOW_CARDINALITY_DISTINCT, maxId); + return TestDataFactory.generateLowCardinalityInts(VALUE_COUNT, distinct, TestDataFactory.DEFAULT_SEED); + case "CONSTANT": + int[] constant = new int[VALUE_COUNT]; + java.util.Arrays.fill(constant, 0); + return constant; + default: + throw new IllegalArgumentException("Unknown index pattern: " + indexPattern); + } + } + + // ---- Scalar encode via encoder ---- + + @Benchmark + @OperationsPerInvocation(VALUE_COUNT) + public byte[] encodeDictionaryIds() throws IOException { + try (RunLengthBitPackingHybridEncoder encoder = new RunLengthBitPackingHybridEncoder( + bitWidth, INIT_SLAB_SIZE, PAGE_SIZE, new HeapByteBufferAllocator())) { + for (int id : ids) { + encoder.writeInt(id); + } + return encoder.toBytes().toByteArray(); + } + } + + // ---- Scalar decode via decoder ---- + + @Benchmark + @OperationsPerInvocation(VALUE_COUNT) + public void decodeDictionaryIds(Blackhole bh) { + RunLengthBitPackingHybridDecoder decoder = + new RunLengthBitPackingHybridDecoder(bitWidth, ByteBuffer.wrap(encoded)); + for (int i = 0; i < VALUE_COUNT; i++) { + bh.consume(decoder.readInt()); + } + } + + // ---- Scalar decode via ValuesReader wrapper ---- + + @Benchmark + @OperationsPerInvocation(VALUE_COUNT) + public void decodeValuesReader(Blackhole bh) throws IOException { + RunLengthBitPackingHybridValuesReader reader = new RunLengthBitPackingHybridValuesReader(bitWidth); + reader.initFromPage(VALUE_COUNT, ByteBufferInputStream.wrap(ByteBuffer.wrap(encodedWithLengthPrefix))); + for (int i = 0; i < VALUE_COUNT; i++) { + bh.consume(reader.readInteger()); + } + } +} diff --git a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/RleEncodingBenchmark.java b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/RleEncodingBenchmark.java new file mode 100644 index 0000000000..8cb506a301 --- /dev/null +++ b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/RleEncodingBenchmark.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.benchmarks; + +import java.io.IOException; +import java.util.Random; +import java.util.concurrent.TimeUnit; +import org.apache.parquet.bytes.HeapByteBufferAllocator; +import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridValuesWriter; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OperationsPerInvocation; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; + +/** + * Encoding-level micro-benchmarks for the RLE/bit-packing hybrid encoding used + * for {@code BOOLEAN} values in Parquet data pages V2. + * Decoding benchmarks live in {@link RleDecodingBenchmark}. + * + *
The {@code dataPattern} parameter exercises RLE's best cases (ALL_TRUE, + * ALL_FALSE), worst case (ALTERNATING), and realistic distributions (RANDOM, + * MOSTLY_TRUE_99, MOSTLY_FALSE_99). + * + *
Each invocation encodes {@value #VALUE_COUNT} values; throughput is + * reported per-value via {@link OperationsPerInvocation}. + */ +@BenchmarkMode(Mode.Throughput) +@OutputTimeUnit(TimeUnit.SECONDS) +@Fork(1) +@Warmup(iterations = 3, time = 1) +@Measurement(iterations = 5, time = 1) +@State(Scope.Thread) +public class RleEncodingBenchmark { + + static final int VALUE_COUNT = 100_000; + private static final int INIT_SLAB_SIZE = 64 * 1024; + private static final int PAGE_SIZE = 4 * 1024 * 1024; + + @Param({"ALL_TRUE", "ALL_FALSE", "ALTERNATING", "RANDOM", "MOSTLY_TRUE_99", "MOSTLY_FALSE_99"}) + public String dataPattern; + + private boolean[] data; + + @Setup(Level.Trial) + public void setup() { + data = generateData(dataPattern); + } + + static boolean[] generateData(String pattern) { + boolean[] d = new boolean[VALUE_COUNT]; + Random rng = new Random(42); + switch (pattern) { + case "ALL_TRUE": + for (int i = 0; i < VALUE_COUNT; i++) d[i] = true; + break; + case "ALL_FALSE": + // already false + break; + case "ALTERNATING": + for (int i = 0; i < VALUE_COUNT; i++) d[i] = (i & 1) == 0; + break; + case "RANDOM": + for (int i = 0; i < VALUE_COUNT; i++) d[i] = rng.nextBoolean(); + break; + case "MOSTLY_TRUE_99": + for (int i = 0; i < VALUE_COUNT; i++) d[i] = rng.nextInt(100) != 0; + break; + case "MOSTLY_FALSE_99": + for (int i = 0; i < VALUE_COUNT; i++) d[i] = rng.nextInt(100) == 0; + break; + default: + throw new IllegalArgumentException("Unknown pattern: " + pattern); + } + return d; + } + + // ---- Scalar encode via ValuesWriter ---- + + @Benchmark + @OperationsPerInvocation(VALUE_COUNT) + public byte[] encodeBoolean() throws IOException { + RunLengthBitPackingHybridValuesWriter w = + new RunLengthBitPackingHybridValuesWriter(1, INIT_SLAB_SIZE, PAGE_SIZE, new HeapByteBufferAllocator()); + for (boolean v : data) { + w.writeBoolean(v); + } + byte[] bytes = w.getBytes().toByteArray(); + w.close(); + return bytes; + } +} diff --git a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/TestDataFactory.java b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/TestDataFactory.java new file mode 100644 index 0000000000..3453aba52f --- /dev/null +++ b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/TestDataFactory.java @@ -0,0 +1,434 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.benchmarks; + +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BOOLEAN; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FLOAT; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64; + +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Random; +import org.apache.parquet.example.data.Group; +import org.apache.parquet.example.data.simple.SimpleGroupFactory; +import org.apache.parquet.io.api.Binary; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.Types; + +/** + * Utility class for generating test schemas and data for benchmarks. + */ +public final class TestDataFactory { + + /** Default number of rows for file-level benchmarks. */ + public static final int DEFAULT_ROW_COUNT = 100_000; + + /** Number of distinct values for low-cardinality data patterns. */ + public static final int LOW_CARDINALITY_DISTINCT = 100; + + /** Default RNG seed used across benchmarks for deterministic data. */ + public static final long DEFAULT_SEED = 42L; + + /** Byte length of the fixed-length byte array field in the benchmark schema. */ + public static final int FLBA_LENGTH = 12; + + /** A standard multi-type schema used by file-level benchmarks. */ + public static final MessageType FILE_BENCHMARK_SCHEMA = Types.buildMessage() + .required(INT32) + .named("int32_field") + .required(INT64) + .named("int64_field") + .required(FLOAT) + .named("float_field") + .required(DOUBLE) + .named("double_field") + .required(BOOLEAN) + .named("boolean_field") + .required(BINARY) + .named("binary_field") + .required(FIXED_LEN_BYTE_ARRAY) + .length(FLBA_LENGTH) + .named("flba_field") + .named("benchmark_record"); + + private TestDataFactory() {} + + /** + * Creates a {@link SimpleGroupFactory} for the standard benchmark schema. + */ + public static SimpleGroupFactory newGroupFactory() { + return new SimpleGroupFactory(FILE_BENCHMARK_SCHEMA); + } + + /** + * Generates a single row of benchmark data. + * + * @param factory the group factory + * @param index the row index (used for deterministic data) + * @param random the random source + * @return a populated Group + */ + public static Group generateRow(SimpleGroupFactory factory, int index, Random random) { + byte[] flbaBytes = new byte[FLBA_LENGTH]; + random.nextBytes(flbaBytes); + return factory.newGroup() + .append("int32_field", index) + .append("int64_field", (long) index * 100) + .append("float_field", random.nextFloat()) + .append("double_field", random.nextDouble()) + .append("boolean_field", index % 2 == 0) + .append("binary_field", "value_" + (index % 1000)) + .append("flba_field", Binary.fromConstantByteArray(flbaBytes)); + } + + /** + * Generates a deterministic set of rows for file-level benchmarks. + */ + public static Group[] generateRows(SimpleGroupFactory factory, int rowCount, long seed) { + Group[] rows = new Group[rowCount]; + Random random = new Random(seed); + for (int i = 0; i < rowCount; i++) { + rows[i] = generateRow(factory, i, random); + } + return rows; + } + + // ---- Integer data generation for encoding benchmarks ---- + + /** + * Generates sequential integers: 0, 1, 2, ... + */ + public static int[] generateSequentialInts(int count) { + int[] data = new int[count]; + for (int i = 0; i < count; i++) { + data[i] = i; + } + return data; + } + + /** + * Generates uniformly random integers using the given seed. + */ + public static int[] generateRandomInts(int count, long seed) { + return generateRandomInts(count, new Random(seed)); + } + + /** + * Generates uniformly random integers. + * + *
Note: prefer {@link #generateRandomInts(int, long)} when call ordering between
+ * generators in the same setup must not influence the produced data.
+ */
+ public static int[] generateRandomInts(int count, Random random) {
+ int[] data = new int[count];
+ for (int i = 0; i < count; i++) {
+ data[i] = random.nextInt();
+ }
+ return data;
+ }
+
+ /**
+ * Generates low-cardinality integers (values drawn from a small set) using the given seed.
+ */
+ public static int[] generateLowCardinalityInts(int count, int distinctValues, long seed) {
+ return generateLowCardinalityInts(count, distinctValues, new Random(seed));
+ }
+
+ /**
+ * Generates low-cardinality integers (values drawn from a small set).
+ */
+ public static int[] generateLowCardinalityInts(int count, int distinctValues, Random random) {
+ int[] data = new int[count];
+ for (int i = 0; i < count; i++) {
+ data[i] = random.nextInt(distinctValues);
+ }
+ return data;
+ }
+
+ /**
+ * Generates high-cardinality integers (all unique in randomized order) using the given seed.
+ */
+ public static int[] generateHighCardinalityInts(int count, long seed) {
+ return generateHighCardinalityInts(count, new Random(seed));
+ }
+
+ /**
+ * Generates high-cardinality integers (all unique in randomized order).
+ */
+ public static int[] generateHighCardinalityInts(int count, Random random) {
+ int[] data = generateSequentialInts(count);
+ for (int i = count - 1; i > 0; i--) {
+ int swapIndex = random.nextInt(i + 1);
+ int tmp = data[i];
+ data[i] = data[swapIndex];
+ data[swapIndex] = tmp;
+ }
+ return data;
+ }
+
+ // ---- Long data generation for encoding benchmarks ----
+
+ /**
+ * Generates sequential longs: 0, 1, 2, ...
+ */
+ public static long[] generateSequentialLongs(int count) {
+ long[] data = new long[count];
+ for (int i = 0; i < count; i++) {
+ data[i] = i;
+ }
+ return data;
+ }
+
+ /**
+ * Generates uniformly random longs using the given seed.
+ */
+ public static long[] generateRandomLongs(int count, long seed) {
+ Random random = new Random(seed);
+ long[] data = new long[count];
+ for (int i = 0; i < count; i++) {
+ data[i] = random.nextLong();
+ }
+ return data;
+ }
+
+ /**
+ * Generates low-cardinality longs (values drawn from a small set).
+ */
+ public static long[] generateLowCardinalityLongs(int count, int distinctValues, long seed) {
+ Random random = new Random(seed);
+ long[] palette = new long[distinctValues];
+ for (int i = 0; i < distinctValues; i++) {
+ palette[i] = random.nextLong();
+ }
+ long[] data = new long[count];
+ for (int i = 0; i < count; i++) {
+ data[i] = palette[random.nextInt(distinctValues)];
+ }
+ return data;
+ }
+
+ /**
+ * Generates high-cardinality longs (all unique, shuffled).
+ */
+ public static long[] generateHighCardinalityLongs(int count, long seed) {
+ Random random = new Random(seed);
+ long[] data = generateSequentialLongs(count);
+ for (int i = count - 1; i > 0; i--) {
+ int swapIndex = random.nextInt(i + 1);
+ long tmp = data[i];
+ data[i] = data[swapIndex];
+ data[swapIndex] = tmp;
+ }
+ return data;
+ }
+
+ // ---- Float data generation for encoding benchmarks ----
+
+ /**
+ * Generates uniformly random floats using the given seed.
+ */
+ public static float[] generateRandomFloats(int count, long seed) {
+ Random random = new Random(seed);
+ float[] data = new float[count];
+ for (int i = 0; i < count; i++) {
+ data[i] = random.nextFloat() * 1000.0f;
+ }
+ return data;
+ }
+
+ /**
+ * Generates low-cardinality floats (values drawn from a small set).
+ */
+ public static float[] generateLowCardinalityFloats(int count, int distinctValues, long seed) {
+ Random random = new Random(seed);
+ float[] palette = new float[distinctValues];
+ for (int i = 0; i < distinctValues; i++) {
+ palette[i] = random.nextFloat() * 1000.0f;
+ }
+ float[] data = new float[count];
+ for (int i = 0; i < count; i++) {
+ data[i] = palette[random.nextInt(distinctValues)];
+ }
+ return data;
+ }
+
+ // ---- Double data generation for encoding benchmarks ----
+
+ /**
+ * Generates uniformly random doubles using the given seed.
+ */
+ public static double[] generateRandomDoubles(int count, long seed) {
+ Random random = new Random(seed);
+ double[] data = new double[count];
+ for (int i = 0; i < count; i++) {
+ data[i] = random.nextDouble() * 1000.0;
+ }
+ return data;
+ }
+
+ /**
+ * Generates low-cardinality doubles (values drawn from a small set).
+ */
+ public static double[] generateLowCardinalityDoubles(int count, int distinctValues, long seed) {
+ Random random = new Random(seed);
+ double[] palette = new double[distinctValues];
+ for (int i = 0; i < distinctValues; i++) {
+ palette[i] = random.nextDouble() * 1000.0;
+ }
+ double[] data = new double[count];
+ for (int i = 0; i < count; i++) {
+ data[i] = palette[random.nextInt(distinctValues)];
+ }
+ return data;
+ }
+
+ // ---- Fixed-length byte array data generation for encoding benchmarks ----
+
+ /**
+ * Generates fixed-length byte arrays with the specified cardinality.
+ *
+ * @param count number of values
+ * @param length byte length of each value
+ * @param distinct number of distinct values (0 means all unique)
+ * @param seed RNG seed
+ */
+ public static Binary[] generateFixedLenByteArrays(int count, int length, int distinct, long seed) {
+ Random random = new Random(seed);
+ if (distinct > 0) {
+ Binary[] palette = new Binary[distinct];
+ for (int i = 0; i < distinct; i++) {
+ byte[] bytes = new byte[length];
+ random.nextBytes(bytes);
+ palette[i] = Binary.fromConstantByteArray(bytes);
+ }
+ Binary[] data = new Binary[count];
+ for (int i = 0; i < count; i++) {
+ data[i] = palette[random.nextInt(distinct)];
+ }
+ return data;
+ } else {
+ Binary[] data = new Binary[count];
+ for (int i = 0; i < count; i++) {
+ byte[] bytes = new byte[length];
+ random.nextBytes(bytes);
+ data[i] = Binary.fromConstantByteArray(bytes);
+ }
+ return data;
+ }
+ }
+
+ // ---- Binary data generation for encoding benchmarks ----
+
+ /**
+ * Generates binary strings of the given length with the specified cardinality, using
+ * a deterministic seed.
+ */
+ public static Binary[] generateBinaryData(int count, int stringLength, int distinct, long seed) {
+ return generateBinaryData(count, stringLength, distinct, new Random(seed));
+ }
+
+ /**
+ * Generates binary strings of the given length with the specified cardinality.
+ *
+ * @param count number of values
+ * @param stringLength length of each string
+ * @param distinct number of distinct values (0 means all unique)
+ * @param random random source
+ * @return array of Binary values
+ */
+ public static Binary[] generateBinaryData(int count, int stringLength, int distinct, Random random) {
+ Binary[] data = new Binary[count];
+ if (distinct > 0) {
+ // Pre-generate the distinct values
+ Binary[] dictionary = new Binary[distinct];
+ for (int i = 0; i < distinct; i++) {
+ dictionary[i] = Binary.fromConstantByteArray(
+ randomString(stringLength, random).getBytes(StandardCharsets.UTF_8));
+ }
+ for (int i = 0; i < count; i++) {
+ data[i] = dictionary[random.nextInt(distinct)];
+ }
+ } else {
+ // All unique
+ for (int i = 0; i < count; i++) {
+ data[i] = Binary.fromConstantByteArray(
+ randomString(stringLength, random).getBytes(StandardCharsets.UTF_8));
+ }
+ }
+ return data;
+ }
+
+ // ---- Sorted data generation for delta encoding benchmarks ----
+
+ /**
+ * Generates all-unique binary strings of the given length, sorted in natural
+ * ({@link Binary#compareTo}) order. Useful for benchmarking DELTA_BYTE_ARRAY
+ * encoding, which benefits from prefix sharing between consecutive values.
+ */
+ public static Binary[] generateSortedBinaryData(int count, int stringLength, long seed) {
+ Binary[] data = generateBinaryData(count, stringLength, 0, seed);
+ Arrays.sort(data);
+ return data;
+ }
+
+ /**
+ * Generates all-unique fixed-length byte arrays, sorted in natural
+ * ({@link Binary#compareTo}) order. Useful for benchmarking DELTA_BYTE_ARRAY
+ * encoding with FIXED_LEN_BYTE_ARRAY values.
+ */
+ public static Binary[] generateSortedFixedLenByteArrays(int count, int fixedLength, long seed) {
+ Binary[] data = generateFixedLenByteArrays(count, fixedLength, 0, seed);
+ Arrays.sort(data);
+ return data;
+ }
+
+ // ---- Variable-length data generation for delta encoding benchmarks ----
+
+ /**
+ * Generates all-unique binary strings with lengths uniformly distributed in
+ * {@code [1, maxLength]}. Useful for benchmarking DELTA_LENGTH_BYTE_ARRAY
+ * encoding, where non-zero length deltas exercise the DELTA_BINARY_PACKED
+ * sub-encoding of lengths (unlike uniform-length data where deltas are all zero).
+ *
+ * @param count number of values
+ * @param maxLength maximum string length (inclusive)
+ * @param seed RNG seed
+ */
+ public static Binary[] generateVariableLengthBinaryData(int count, int maxLength, long seed) {
+ Random random = new Random(seed);
+ Binary[] data = new Binary[count];
+ for (int i = 0; i < count; i++) {
+ int length = 1 + random.nextInt(maxLength);
+ data[i] = Binary.fromConstantByteArray(randomString(length, random).getBytes(StandardCharsets.UTF_8));
+ }
+ return data;
+ }
+
+ private static String randomString(int length, Random random) {
+ StringBuilder sb = new StringBuilder(length);
+ for (int i = 0; i < length; i++) {
+ sb.append((char) ('a' + random.nextInt(26)));
+ }
+ return sb.toString();
+ }
+}
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReaderBase.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReaderBase.java
index 2b3e47116c..ef9a386314 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReaderBase.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReaderBase.java
@@ -773,7 +773,7 @@ private IntIterator newRLEIterator(int maxLevel, BytesInput bytes) {
return new NullIntIterator();
}
return new RLEIntIterator(new RunLengthBitPackingHybridDecoder(
- BytesUtils.getWidthFromMaxInt(maxLevel), bytes.toInputStream()));
+ BytesUtils.getWidthFromMaxInt(maxLevel), bytes.toByteBuffer()));
} catch (IOException e) {
throw new ParquetDecodingException("could not read levels in page for col " + path, e);
}
@@ -832,11 +832,7 @@ public RLEIntIterator(RunLengthBitPackingHybridDecoder delegate) {
@Override
int nextInt() {
- try {
- return delegate.readInt();
- } catch (IOException e) {
- throw new ParquetDecodingException(e);
- }
+ return delegate.readInt();
}
}
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/DictionaryValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/DictionaryValuesReader.java
index 53fafc55dc..77e3784392 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/DictionaryValuesReader.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/DictionaryValuesReader.java
@@ -19,6 +19,7 @@
package org.apache.parquet.column.values.dictionary;
import java.io.IOException;
+import java.nio.ByteBuffer;
import org.apache.parquet.bytes.ByteBufferInputStream;
import org.apache.parquet.bytes.BytesUtils;
import org.apache.parquet.column.Dictionary;
@@ -52,12 +53,13 @@ public void initFromPage(int valueCount, ByteBufferInputStream stream) throws IO
LOG.debug("init from page at offset {} for length {}", stream.position(), stream.available());
int bitWidth = BytesUtils.readIntLittleEndianOnOneByte(in);
LOG.debug("bit width {}", bitWidth);
- decoder = new RunLengthBitPackingHybridDecoder(bitWidth, in);
+ ByteBuffer buf = in.slice(in.available());
+ decoder = new RunLengthBitPackingHybridDecoder(bitWidth, buf);
} else {
- decoder = new RunLengthBitPackingHybridDecoder(1, in) {
+ decoder = new RunLengthBitPackingHybridDecoder(1, ByteBuffer.allocate(0)) {
@Override
- public int readInt() throws IOException {
- throw new IOException("Attempt to read from empty page");
+ public int readInt() {
+ throw new ParquetDecodingException("Attempt to read from empty page");
}
};
}
@@ -65,64 +67,36 @@ public int readInt() throws IOException {
@Override
public int readValueDictionaryId() {
- try {
- return decoder.readInt();
- } catch (IOException e) {
- throw new ParquetDecodingException(e);
- }
+ return decoder.readInt();
}
@Override
public Binary readBytes() {
- try {
- return dictionary.decodeToBinary(decoder.readInt());
- } catch (IOException e) {
- throw new ParquetDecodingException(e);
- }
+ return dictionary.decodeToBinary(decoder.readInt());
}
@Override
public float readFloat() {
- try {
- return dictionary.decodeToFloat(decoder.readInt());
- } catch (IOException e) {
- throw new ParquetDecodingException(e);
- }
+ return dictionary.decodeToFloat(decoder.readInt());
}
@Override
public double readDouble() {
- try {
- return dictionary.decodeToDouble(decoder.readInt());
- } catch (IOException e) {
- throw new ParquetDecodingException(e);
- }
+ return dictionary.decodeToDouble(decoder.readInt());
}
@Override
public int readInteger() {
- try {
- return dictionary.decodeToInt(decoder.readInt());
- } catch (IOException e) {
- throw new ParquetDecodingException(e);
- }
+ return dictionary.decodeToInt(decoder.readInt());
}
@Override
public long readLong() {
- try {
- return dictionary.decodeToLong(decoder.readInt());
- } catch (IOException e) {
- throw new ParquetDecodingException(e);
- }
+ return dictionary.decodeToLong(decoder.readInt());
}
@Override
public void skip() {
- try {
- decoder.readInt(); // Type does not matter as we are just skipping dictionary keys
- } catch (IOException e) {
- throw new ParquetDecodingException(e);
- }
+ decoder.readInt(); // Type does not matter as we are just skipping dictionary keys
}
}
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridDecoder.java b/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridDecoder.java
index e55b276b29..61ad542de1 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridDecoder.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridDecoder.java
@@ -18,9 +18,8 @@
*/
package org.apache.parquet.column.values.rle;
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
import org.apache.parquet.Preconditions;
import org.apache.parquet.bytes.BytesUtils;
import org.apache.parquet.column.values.bitpacking.BytePacker;
@@ -42,23 +41,35 @@ private static enum MODE {
private final int bitWidth;
private final BytePacker packer;
- private final InputStream in;
+ private final ByteBuffer buffer;
private MODE mode;
private int currentCount;
private int currentValue;
private int[] currentBuffer;
+ private int currentBufferLength;
- public RunLengthBitPackingHybridDecoder(int bitWidth, InputStream in) {
+ // Reusable buffer to avoid per-run allocation in PACKED mode
+ private int[] packedValuesBuffer = new int[0];
+ // Reusable padded buffer for end-of-data edge case where fewer bytes remain than required
+ private ByteBuffer packedPaddedBuffer;
+
+ public RunLengthBitPackingHybridDecoder(int bitWidth, ByteBuffer buffer) {
LOG.debug("decoding bitWidth {}", bitWidth);
Preconditions.checkArgument(bitWidth >= 0 && bitWidth <= 32, "bitWidth must be >= 0 and <= 32");
this.bitWidth = bitWidth;
this.packer = Packer.LITTLE_ENDIAN.newBytePacker(bitWidth);
- this.in = in;
+ this.buffer = buffer.order(ByteOrder.LITTLE_ENDIAN);
}
- public int readInt() throws IOException {
+ /**
+ * Reads the next int value from the RLE/Bit-Packing hybrid stream.
+ *
+ * @return the next decoded integer value
+ * @throws ParquetDecodingException if a decoding error occurs
+ */
+ public int readInt() {
if (currentCount == 0) {
readNext();
}
@@ -69,7 +80,7 @@ public int readInt() throws IOException {
result = currentValue;
break;
case PACKED:
- result = currentBuffer[currentBuffer.length - 1 - currentCount];
+ result = currentBuffer[currentBufferLength - 1 - currentCount];
break;
default:
throw new ParquetDecodingException("not a valid mode " + mode);
@@ -77,30 +88,69 @@ public int readInt() throws IOException {
return result;
}
- private void readNext() throws IOException {
- Preconditions.checkArgument(in.available() > 0, "Reading past RLE/BitPacking stream.");
- final int header = BytesUtils.readUnsignedVarInt(in);
+ private void readNext() {
+ Preconditions.checkArgument(buffer.hasRemaining(), "Reading past RLE/BitPacking stream.");
+ final int header = BytesUtils.readUnsignedVarInt(buffer);
mode = (header & 1) == 0 ? MODE.RLE : MODE.PACKED;
switch (mode) {
case RLE:
currentCount = header >>> 1;
LOG.debug("reading {} values RLE", currentCount);
- currentValue = BytesUtils.readIntLittleEndianPaddedOnBitWidth(in, bitWidth);
+ currentValue = BytesUtils.readIntLittleEndianPaddedOnBitWidth(buffer, bitWidth);
break;
case PACKED:
int numGroups = header >>> 1;
currentCount = numGroups * 8;
+ currentBufferLength = currentCount;
LOG.debug("reading {} values BIT PACKED", currentCount);
- currentBuffer = new int[currentCount]; // TODO: reuse a buffer
- byte[] bytes = new byte[numGroups * bitWidth];
+ if (packedValuesBuffer.length < currentCount) {
+ packedValuesBuffer = new int[currentCount];
+ }
+ currentBuffer = packedValuesBuffer;
+ int bytesRequired = numGroups * bitWidth;
// At the end of the file RLE data though, there might not be that many bytes left.
- int bytesToRead = (int) Math.ceil(currentCount * bitWidth / 8.0);
- bytesToRead = Math.min(bytesToRead, in.available());
- new DataInputStream(in).readFully(bytes, 0, bytesToRead);
- for (int valueIndex = 0, byteIndex = 0;
- valueIndex < currentCount;
- valueIndex += 8, byteIndex += bitWidth) {
- packer.unpack8Values(bytes, byteIndex, currentBuffer, valueIndex);
+ int bytesToRead = Math.min(bytesRequired, buffer.remaining());
+
+ // Determine the source ByteBuffer and start offset for unpacking
+ ByteBuffer packedSource;
+ int packedBytesStart;
+
+ if (bytesToRead >= bytesRequired) {
+ // Common case: unpack directly from the main ByteBuffer (zero-copy)
+ packedSource = buffer;
+ packedBytesStart = buffer.position();
+ } else {
+ // End-of-data edge case: pad remaining bytes with zeros for safe unpacking
+ if (packedPaddedBuffer == null || packedPaddedBuffer.capacity() < bytesRequired) {
+ packedPaddedBuffer = ByteBuffer.allocate(bytesRequired).order(ByteOrder.LITTLE_ENDIAN);
+ }
+ packedPaddedBuffer.clear();
+ java.util.Arrays.fill(packedPaddedBuffer.array(), 0, bytesRequired, (byte) 0);
+ for (int i = 0; i < bytesToRead; i++) {
+ packedPaddedBuffer.put(i, buffer.get(buffer.position() + i));
+ }
+ buffer.position(buffer.position() + bytesToRead);
+ packedSource = packedPaddedBuffer;
+ packedBytesStart = 0;
+ }
+
+ // Unpack 32 values (4 groups) at a time when possible — symmetric to the encoder's
+ // pack32Values fast path. Falls back to unpack8Values for any residual groups.
+ int groupIdx = 0;
+ int bufPos = packedBytesStart;
+ final int step32 = bitWidth * 4;
+ while (groupIdx + 4 <= numGroups) {
+ packer.unpack32Values(packedSource, bufPos, currentBuffer, groupIdx * 8);
+ groupIdx += 4;
+ bufPos += step32;
+ }
+ for (; groupIdx < numGroups; groupIdx++, bufPos += bitWidth) {
+ packer.unpack8Values(packedSource, bufPos, currentBuffer, groupIdx * 8);
+ }
+ // Advance the main buffer position past the consumed packed bytes (common case only;
+ // edge case already advanced above)
+ if (packedSource == buffer) {
+ buffer.position(packedBytesStart + bytesToRead);
}
break;
default:
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridEncoder.java b/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridEncoder.java
index e33824bff1..3fddce06c1 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridEncoder.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridEncoder.java
@@ -74,6 +74,11 @@ public class RunLengthBitPackingHybridEncoder implements AutoCloseable {
*/
private final byte[] packBuffer;
+ /**
+ * Buffer four 8-value groups so we can use the packer's 32-value fast path.
+ */
+ private final int[] bitPackedValuesBuffer;
+
/**
* Previous value written, used to detect repeated values
*/
@@ -98,6 +103,8 @@ public class RunLengthBitPackingHybridEncoder implements AutoCloseable {
*/
private int bitPackedGroupCount;
+ private int numBitPackedValues;
+
/**
* A "pointer" to a single byte in baos,
* which we use as our bit-packed-header. It's really
@@ -125,7 +132,8 @@ public RunLengthBitPackingHybridEncoder(
this.bitWidth = bitWidth;
this.baos = new CapacityByteArrayOutputStream(initialCapacity, pageSize, allocator);
- this.packBuffer = new byte[bitWidth];
+ this.packBuffer = new byte[bitWidth * 4];
+ this.bitPackedValuesBuffer = new int[32];
this.bufferedValues = new int[8];
this.packer = Packer.LITTLE_ENDIAN.newBytePacker(bitWidth);
reset(false);
@@ -139,6 +147,7 @@ private void reset(boolean resetBaos) {
this.numBufferedValues = 0;
this.repeatCount = 0;
this.bitPackedGroupCount = 0;
+ this.numBitPackedValues = 0;
this.bitPackedRunHeaderPointer = -1;
this.toBytesCalled = false;
}
@@ -196,8 +205,9 @@ private void writeOrAppendBitPackedRun() throws IOException {
bitPackedRunHeaderPointer = baos.getCurrentIndex();
}
- packer.pack8Values(bufferedValues, 0, packBuffer, 0);
- baos.write(packBuffer);
+ System.arraycopy(bufferedValues, 0, bitPackedValuesBuffer, numBitPackedValues, 8);
+ numBitPackedValues += 8;
+ flushBitPackedValuesIfFull();
// empty the buffer, they've all been written
numBufferedValues = 0;
@@ -209,6 +219,34 @@ private void writeOrAppendBitPackedRun() throws IOException {
++bitPackedGroupCount;
}
+ private void flushBitPackedValuesIfFull() {
+ if (numBitPackedValues == bitPackedValuesBuffer.length) {
+ packer.pack32Values(bitPackedValuesBuffer, 0, packBuffer, 0);
+ baos.write(packBuffer, 0, bitWidth * 4);
+ numBitPackedValues = 0;
+ }
+ }
+
+ private void flushBitPackedValues() {
+ if (numBitPackedValues == 0) {
+ return;
+ }
+
+ // Full 32-value block — use fast path
+ flushBitPackedValuesIfFull();
+
+ // Handle any residual values (< 32)
+ if (numBitPackedValues > 0) {
+ int outPos = 0;
+ for (int inPos = 0; inPos < numBitPackedValues; inPos += 8) {
+ packer.pack8Values(bitPackedValuesBuffer, inPos, packBuffer, outPos);
+ outPos += bitWidth;
+ }
+ baos.write(packBuffer, 0, outPos);
+ numBitPackedValues = 0;
+ }
+ }
+
/**
* If we are currently writing a bit-packed-run, update the
* bit-packed-header and consider this run to be over
@@ -221,6 +259,8 @@ private void endPreviousBitPackedRun() {
return;
}
+ flushBitPackedValues();
+
// create bit-packed-header, which needs to fit in 1 byte
byte bitPackHeader = (byte) ((bitPackedGroupCount << 1) | 1);
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridValuesReader.java
index 0bd5a18d2b..8050662f14 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridValuesReader.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridValuesReader.java
@@ -19,10 +19,10 @@
package org.apache.parquet.column.values.rle;
import java.io.IOException;
+import java.nio.ByteBuffer;
import org.apache.parquet.bytes.ByteBufferInputStream;
import org.apache.parquet.bytes.BytesUtils;
import org.apache.parquet.column.values.ValuesReader;
-import org.apache.parquet.io.ParquetDecodingException;
/**
* This ValuesReader does all the reading in {@link #initFromPage}
@@ -39,7 +39,8 @@ public RunLengthBitPackingHybridValuesReader(int bitWidth) {
@Override
public void initFromPage(int valueCountL, ByteBufferInputStream stream) throws IOException {
int length = BytesUtils.readIntLittleEndian(stream);
- this.decoder = new RunLengthBitPackingHybridDecoder(bitWidth, stream.sliceStream(length));
+ ByteBuffer buf = stream.slice(length);
+ this.decoder = new RunLengthBitPackingHybridDecoder(bitWidth, buf);
// 4 is for the length which is stored as 4 bytes little endian
updateNextOffset(length + 4);
@@ -47,11 +48,7 @@ public void initFromPage(int valueCountL, ByteBufferInputStream stream) throws I
@Override
public int readInteger() {
- try {
- return decoder.readInt();
- } catch (IOException e) {
- throw new ParquetDecodingException(e);
- }
+ return decoder.readInt();
}
@Override
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridIntegrationTest.java b/parquet-column/src/test/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridIntegrationTest.java
index 01a4c96e85..b06672cee9 100644
--- a/parquet-column/src/test/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridIntegrationTest.java
+++ b/parquet-column/src/test/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridIntegrationTest.java
@@ -21,7 +21,6 @@
import static org.junit.Assert.assertEquals;
import java.nio.ByteBuffer;
-import org.apache.parquet.bytes.ByteBufferInputStream;
import org.apache.parquet.bytes.DirectByteBufferAllocator;
import org.junit.Test;
@@ -57,9 +56,8 @@ private void doIntegrationTest(int bitWidth) throws Exception {
encoder.writeInt((int) (17 % modValue));
}
ByteBuffer encodedBytes = encoder.toBytes().toByteBuffer();
- ByteBufferInputStream in = ByteBufferInputStream.wrap(encodedBytes);
- RunLengthBitPackingHybridDecoder decoder = new RunLengthBitPackingHybridDecoder(bitWidth, in);
+ RunLengthBitPackingHybridDecoder decoder = new RunLengthBitPackingHybridDecoder(bitWidth, encodedBytes);
for (int i = 0; i < 100; i++) {
assertEquals(i % modValue, decoder.readInt());
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/rle/TestRunLengthBitPackingHybridEncoder.java b/parquet-column/src/test/java/org/apache/parquet/column/values/rle/TestRunLengthBitPackingHybridEncoder.java
index 93a6c8deb4..86d5650857 100644
--- a/parquet-column/src/test/java/org/apache/parquet/column/values/rle/TestRunLengthBitPackingHybridEncoder.java
+++ b/parquet-column/src/test/java/org/apache/parquet/column/values/rle/TestRunLengthBitPackingHybridEncoder.java
@@ -21,6 +21,7 @@
import static org.junit.Assert.assertEquals;
import java.io.ByteArrayInputStream;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import org.apache.parquet.bytes.BytesUtils;
@@ -290,12 +291,109 @@ public void testGroupBoundary() throws Exception {
// bit width 2.
bytes[0] = (1 << 1) | 1;
bytes[1] = (1 << 0) | (2 << 2) | (3 << 4);
- ByteArrayInputStream stream = new ByteArrayInputStream(bytes);
- RunLengthBitPackingHybridDecoder decoder = new RunLengthBitPackingHybridDecoder(2, stream);
+ ByteBuffer buffer = ByteBuffer.wrap(bytes);
+ RunLengthBitPackingHybridDecoder decoder = new RunLengthBitPackingHybridDecoder(2, buffer);
assertEquals(decoder.readInt(), 1);
assertEquals(decoder.readInt(), 2);
assertEquals(decoder.readInt(), 3);
- assertEquals(stream.available(), 0);
+ assertEquals(buffer.remaining(), 0);
+ }
+
+ // ---- Decoder-specific edge case tests ----
+
+ /**
+ * Tests the packedPaddedBuffer edge case: when the last packed run in the
+ * stream has fewer bytes available than required (bytesToRead < bytesRequired).
+ * This exercises the zero-padded ByteBuffer fallback path in readNext().
+ */
+ @Test
+ public void testDecoderPaddedBufferEdgeCase() throws Exception {
+ // Encode 5 distinct values with bitWidth=3 using scalar writeInt.
+ // The encoder will produce a bit-packed run with header indicating 1 group (8 values),
+ // but only 5 values are meaningful. The stream has exactly 3 bytes (1 group * 3 bitWidth).
+ // To create a truncated stream, we manually construct the packed data with fewer bytes
+ // than required for the group count declared in the header.
+ int bitWidth = 3;
+ RunLengthBitPackingHybridEncoder encoder = getRunLengthBitPackingHybridEncoder(bitWidth, 100, 64000);
+ // Write 5 values — encoder will pad to 8 (1 group), producing header for 1 group and 3 bytes
+ for (int i = 0; i < 5; i++) encoder.writeInt(i);
+ byte[] fullEncoded = encoder.toBytes().toByteArray();
+
+ // Decode normally and verify correctness
+ RunLengthBitPackingHybridDecoder decoder =
+ new RunLengthBitPackingHybridDecoder(bitWidth, ByteBuffer.wrap(fullEncoded));
+ for (int i = 0; i < 5; i++) {
+ assertEquals("mismatch at index " + i, i, decoder.readInt());
+ }
+
+ // Now construct a stream where we artificially increase the group count header
+ // to declare more groups than bytes available, forcing the padded buffer path.
+ // Header for 2 groups (16 values): (2 << 1) | 1 = 5
+ // But only provide bytes for 1 group (3 bytes), so bytesToRead < bytesRequired.
+ byte[] truncated = new byte[1 + bitWidth]; // 1 byte header + 3 bytes data
+ truncated[0] = 5; // header = 2 groups packed
+ System.arraycopy(fullEncoded, 1, truncated, 1, bitWidth); // copy the 3 data bytes
+
+ RunLengthBitPackingHybridDecoder paddedDecoder =
+ new RunLengthBitPackingHybridDecoder(bitWidth, ByteBuffer.wrap(truncated));
+ // First 5 values should decode correctly from the real bytes
+ for (int i = 0; i < 5; i++) {
+ assertEquals("padded mismatch at index " + i, i, paddedDecoder.readInt());
+ }
+ // Values 5-7 come from the real packed group's padding (encoder pads with 0s)
+ for (int i = 5; i < 8; i++) {
+ assertEquals("padded zero at index " + i, 0, paddedDecoder.readInt());
+ }
+ // Values 8-15 come from the zero-padded buffer (no real data) — all should be 0
+ for (int i = 8; i < 16; i++) {
+ assertEquals("zero-padded value at index " + i, 0, paddedDecoder.readInt());
+ }
+ }
+
+ /**
+ * Tests the unpack32Values fast path by ensuring that a packed run with
+ * exactly 4 or more groups (32+ values) is decoded correctly via scalar readInt.
+ */
+ @Test
+ public void testDecoderUnpack32ValuesFastPath() throws Exception {
+ // bitWidth=3 with 40 distinct values forces 5 packed groups (40 values).
+ // The decoder should use unpack32Values for the first 4 groups, then
+ // unpack8Values for the last group.
+ int bitWidth = 3;
+ int numValues = 40;
+ int[] values = new int[numValues];
+ RunLengthBitPackingHybridEncoder encoder = getRunLengthBitPackingHybridEncoder(bitWidth, 100, 64000);
+ for (int i = 0; i < numValues; i++) {
+ values[i] = i % 7; // stay within 3-bit range
+ encoder.writeInt(values[i]);
+ }
+ byte[] encoded = encoder.toBytes().toByteArray();
+
+ RunLengthBitPackingHybridDecoder decoder =
+ new RunLengthBitPackingHybridDecoder(bitWidth, ByteBuffer.wrap(encoded));
+ for (int i = 0; i < numValues; i++) {
+ assertEquals("unpack32 fast path mismatch at " + i, values[i], decoder.readInt());
+ }
+ }
+
+ @Test
+ public void testDecoderUnpack32ValuesExact4Groups() throws Exception {
+ // Exactly 4 groups (32 values) — uses only the unpack32Values path, no residual
+ int bitWidth = 5;
+ int numValues = 32;
+ int[] values = new int[numValues];
+ RunLengthBitPackingHybridEncoder encoder = getRunLengthBitPackingHybridEncoder(bitWidth, 100, 64000);
+ for (int i = 0; i < numValues; i++) {
+ values[i] = i % 31;
+ encoder.writeInt(values[i]);
+ }
+ byte[] encoded = encoder.toBytes().toByteArray();
+
+ RunLengthBitPackingHybridDecoder decoder =
+ new RunLengthBitPackingHybridDecoder(bitWidth, ByteBuffer.wrap(encoded));
+ for (int i = 0; i < numValues; i++) {
+ assertEquals("exact 4 groups mismatch at " + i, values[i], decoder.readInt());
+ }
}
private static List