From 1126cfba39132063424a1e4aa7d9b4d115edf5d1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Isma=C3=ABl=20Mej=C3=ADa?= Date: Sat, 16 May 2026 16:45:48 +0200 Subject: [PATCH] GH-3530: Optimize RLE hybrid encoder/decoder scalar hot-path performance Targeted optimizations to the Run-Length Encoding / Bit-Packing hybrid codec (RunLengthBitPackingHybridDecoder and Encoder), focused on the scalar read/write hot path used by the default Parquet column reader. Decoder optimizations: 1. InputStream to ByteBuffer: Replace InputStream/DataInputStream with direct ByteBuffer (LITTLE_ENDIAN) access, enabling buffer.get(), getShort(), getInt() intrinsics. Add ByteBuffer overloads for BytesUtils.readUnsignedVarInt() and readIntLittleEndianPaddedOnBitWidth(). Remove checked IOException from readInt(), simplifying all call sites (DictionaryValuesReader, ColumnReaderBase, ValuesReader wrapper). 2. Buffer reuse: Allocate the int[] packed values buffer once per decoder and grow lazily, instead of allocating fresh arrays on every PACKED run. Unpack directly from the main ByteBuffer (zero-copy common case); use a reusable zero-padded ByteBuffer for end-of-data edge cases. 3. unpack32Values fast path: Batch 4 groups (32 values) into a single unpack32Values call instead of looping unpack8Values, symmetric to the encoder change. Falls back to unpack8Values for residual groups. Encoder optimizations: 4. pack32Values fast path: Buffer four 8-value groups (32 values) then pack via pack32Values instead of four separate pack8Values calls, reducing per-group overhead and enabling the packer's optimized 32-value code path. flushBitPackedValues()/flushBitPackedValuesIfFull() deduplication to avoid repeating the flush logic. Adapted consumers to ByteBuffer-based decoder: - DictionaryValuesReader: Uses in.slice() for ByteBuffer; removed all try/catch IOException wrappers from read methods. - ColumnReaderBase.newRLEIterator: Uses bytes.toByteBuffer() directly; RLEIntIterator.nextInt() no longer wraps IOException. - RunLengthBitPackingHybridValuesReader: Uses stream.slice() for ByteBuffer constructor; readInteger() no longer wraps IOException. JMH benchmarks: - RleEncodingBenchmark: scalar boolean encoding via ValuesWriter across 6 data patterns (ALL_TRUE, ALL_FALSE, ALTERNATING, RANDOM, MOSTLY_TRUE_99, MOSTLY_FALSE_99). - RleDecodingBenchmark: scalar boolean decoding via ValuesReader, same 6 patterns. - RleDictionaryIndexDecodingBenchmark: scalar encode/decode via encoder/decoder directly, plus ValuesReader wrapper decode for dictionary index pages. Parameterized by bit width (1, 4, 8, 10, 16) and data pattern (SEQUENTIAL, RANDOM, LOW_CARDINALITY, CONSTANT) to exercise pure RLE, pure packed, and mixed paths across packing densities. All 610 parquet-column and parquet-common tests pass. --- .../benchmarks/RleDecodingBenchmark.java | 98 ++++ .../RleDictionaryIndexDecodingBenchmark.java | 166 +++++++ .../benchmarks/RleEncodingBenchmark.java | 116 +++++ .../parquet/benchmarks/TestDataFactory.java | 434 ++++++++++++++++++ .../parquet/column/impl/ColumnReaderBase.java | 8 +- .../dictionary/DictionaryValuesReader.java | 52 +-- .../rle/RunLengthBitPackingHybridDecoder.java | 92 +++- .../rle/RunLengthBitPackingHybridEncoder.java | 46 +- ...RunLengthBitPackingHybridValuesReader.java | 11 +- ...LengthBitPackingHybridIntegrationTest.java | 4 +- .../TestRunLengthBitPackingHybridEncoder.java | 104 ++++- .../org/apache/parquet/bytes/BytesUtils.java | 44 ++ .../apache/parquet/bytes/TestBytesUtil.java | 100 ++++ pom.xml | 3 + 14 files changed, 1196 insertions(+), 82 deletions(-) create mode 100644 parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/RleDecodingBenchmark.java create mode 100644 parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/RleDictionaryIndexDecodingBenchmark.java create mode 100644 parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/RleEncodingBenchmark.java create mode 100644 parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/TestDataFactory.java diff --git a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/RleDecodingBenchmark.java b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/RleDecodingBenchmark.java new file mode 100644 index 0000000000..6db487677d --- /dev/null +++ b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/RleDecodingBenchmark.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.benchmarks; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.concurrent.TimeUnit; +import org.apache.parquet.bytes.ByteBufferInputStream; +import org.apache.parquet.bytes.HeapByteBufferAllocator; +import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridValuesReader; +import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridValuesWriter; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OperationsPerInvocation; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; + +/** + * Decoding-level micro-benchmarks for the RLE/bit-packing hybrid encoding used + * for {@code BOOLEAN} values in Parquet data pages V2. + * Encoding benchmarks live in {@link RleEncodingBenchmark}. + * + *

The {@code dataPattern} parameter exercises RLE's best cases (ALL_TRUE, + * ALL_FALSE), worst case (ALTERNATING), and realistic distributions (RANDOM, + * MOSTLY_TRUE_99, MOSTLY_FALSE_99). + * + *

Each invocation decodes {@value #VALUE_COUNT} values; throughput is + * reported per-value via {@link OperationsPerInvocation}. + */ +@BenchmarkMode(Mode.Throughput) +@OutputTimeUnit(TimeUnit.SECONDS) +@Fork(1) +@Warmup(iterations = 3, time = 1) +@Measurement(iterations = 5, time = 1) +@State(Scope.Thread) +public class RleDecodingBenchmark { + + static final int VALUE_COUNT = 100_000; + private static final int INIT_SLAB_SIZE = 64 * 1024; + private static final int PAGE_SIZE = 4 * 1024 * 1024; + + @Param({"ALL_TRUE", "ALL_FALSE", "ALTERNATING", "RANDOM", "MOSTLY_TRUE_99", "MOSTLY_FALSE_99"}) + public String dataPattern; + + /** RLE-encoded bytes with 4-byte LE length prefix (ValuesReader format). */ + private byte[] encodedWithLengthPrefix; + + @Setup(Level.Trial) + public void setup() throws IOException { + boolean[] data = RleEncodingBenchmark.generateData(dataPattern); + + // Encode using the scalar ValuesWriter path + RunLengthBitPackingHybridValuesWriter w = + new RunLengthBitPackingHybridValuesWriter(1, INIT_SLAB_SIZE, PAGE_SIZE, new HeapByteBufferAllocator()); + for (boolean v : data) { + w.writeBoolean(v); + } + encodedWithLengthPrefix = w.getBytes().toByteArray(); + w.close(); + } + + // ---- Scalar decode via ValuesReader ---- + + @Benchmark + @OperationsPerInvocation(VALUE_COUNT) + public void decodeBoolean(Blackhole bh) throws IOException { + RunLengthBitPackingHybridValuesReader r = new RunLengthBitPackingHybridValuesReader(1); + r.initFromPage(VALUE_COUNT, ByteBufferInputStream.wrap(ByteBuffer.wrap(encodedWithLengthPrefix))); + for (int i = 0; i < VALUE_COUNT; i++) { + bh.consume(r.readBoolean()); + } + } +} diff --git a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/RleDictionaryIndexDecodingBenchmark.java b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/RleDictionaryIndexDecodingBenchmark.java new file mode 100644 index 0000000000..15b7fff5a3 --- /dev/null +++ b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/RleDictionaryIndexDecodingBenchmark.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.benchmarks; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.concurrent.TimeUnit; +import org.apache.parquet.bytes.ByteBufferInputStream; +import org.apache.parquet.bytes.HeapByteBufferAllocator; +import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridDecoder; +import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridEncoder; +import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridValuesReader; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OperationsPerInvocation; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; + +/** + * Encoding and decoding micro-benchmarks for synthetic dictionary-id pages using + * {@link RunLengthBitPackingHybridEncoder} and {@link RunLengthBitPackingHybridDecoder}. + * This isolates the RLE/bit-packing hybrid codec paths and is intentionally + * separate from full INT32/INT64 value encode/decode path benchmarks. + * + *

The encode benchmark measures the RLE encoder's {@code pack32Values} fast path + * and bit-packing throughput. The decode benchmark measures the corresponding + * {@code unpack32Values} fast path and RLE run expansion. + * + *

The {@code bitWidth} parameter exercises different packing densities (1-bit to + * 16-bit), and the {@code indexPattern} parameter exercises pure RLE (CONSTANT), + * mixed (LOW_CARDINALITY), and pure bit-packing (SEQUENTIAL, RANDOM) paths. + * + *

Per-invocation overhead (encoder/decoder construction and {@link ByteBufferInputStream} + * wrapping) is amortized over {@value #VALUE_COUNT} reads via + * {@link OperationsPerInvocation}. + */ +@BenchmarkMode(Mode.Throughput) +@OutputTimeUnit(TimeUnit.SECONDS) +@Fork(1) +@Warmup(iterations = 3, time = 1) +@Measurement(iterations = 5, time = 1) +@State(Scope.Thread) +public class RleDictionaryIndexDecodingBenchmark { + + static final int VALUE_COUNT = 100_000; + private static final int INIT_SLAB_SIZE = 64 * 1024; + private static final int PAGE_SIZE = 1024 * 1024; + + @Param({"1", "4", "8", "10", "16"}) + public int bitWidth; + + @Param({"SEQUENTIAL", "RANDOM", "LOW_CARDINALITY", "CONSTANT"}) + public String indexPattern; + + /** Raw RLE-encoded bytes (no length prefix). */ + private byte[] encoded; + + private int[] ids; + + /** RLE-encoded bytes with 4-byte LE length prefix (ValuesReader format). */ + private byte[] encodedWithLengthPrefix; + + @Setup(Level.Trial) + public void setup() throws IOException { + int maxId = 1 << bitWidth; + ids = generateDictionaryIds(maxId); + try (RunLengthBitPackingHybridEncoder encoder = new RunLengthBitPackingHybridEncoder( + bitWidth, INIT_SLAB_SIZE, PAGE_SIZE, new HeapByteBufferAllocator())) { + for (int id : ids) { + encoder.writeInt(id); + } + encoded = encoder.toBytes().toByteArray(); + } + + // Prepend 4-byte LE length for ValuesReader.initFromPage() format + encodedWithLengthPrefix = new byte[4 + encoded.length]; + ByteBuffer.wrap(encodedWithLengthPrefix).order(ByteOrder.LITTLE_ENDIAN).putInt(encoded.length); + System.arraycopy(encoded, 0, encodedWithLengthPrefix, 4, encoded.length); + } + + private int[] generateDictionaryIds(int maxId) { + switch (indexPattern) { + case "SEQUENTIAL": + int[] sequential = new int[VALUE_COUNT]; + for (int i = 0; i < VALUE_COUNT; i++) { + sequential[i] = i % maxId; + } + return sequential; + case "RANDOM": + return TestDataFactory.generateLowCardinalityInts(VALUE_COUNT, maxId, TestDataFactory.DEFAULT_SEED); + case "LOW_CARDINALITY": + int distinct = Math.min(TestDataFactory.LOW_CARDINALITY_DISTINCT, maxId); + return TestDataFactory.generateLowCardinalityInts(VALUE_COUNT, distinct, TestDataFactory.DEFAULT_SEED); + case "CONSTANT": + int[] constant = new int[VALUE_COUNT]; + java.util.Arrays.fill(constant, 0); + return constant; + default: + throw new IllegalArgumentException("Unknown index pattern: " + indexPattern); + } + } + + // ---- Scalar encode via encoder ---- + + @Benchmark + @OperationsPerInvocation(VALUE_COUNT) + public byte[] encodeDictionaryIds() throws IOException { + try (RunLengthBitPackingHybridEncoder encoder = new RunLengthBitPackingHybridEncoder( + bitWidth, INIT_SLAB_SIZE, PAGE_SIZE, new HeapByteBufferAllocator())) { + for (int id : ids) { + encoder.writeInt(id); + } + return encoder.toBytes().toByteArray(); + } + } + + // ---- Scalar decode via decoder ---- + + @Benchmark + @OperationsPerInvocation(VALUE_COUNT) + public void decodeDictionaryIds(Blackhole bh) { + RunLengthBitPackingHybridDecoder decoder = + new RunLengthBitPackingHybridDecoder(bitWidth, ByteBuffer.wrap(encoded)); + for (int i = 0; i < VALUE_COUNT; i++) { + bh.consume(decoder.readInt()); + } + } + + // ---- Scalar decode via ValuesReader wrapper ---- + + @Benchmark + @OperationsPerInvocation(VALUE_COUNT) + public void decodeValuesReader(Blackhole bh) throws IOException { + RunLengthBitPackingHybridValuesReader reader = new RunLengthBitPackingHybridValuesReader(bitWidth); + reader.initFromPage(VALUE_COUNT, ByteBufferInputStream.wrap(ByteBuffer.wrap(encodedWithLengthPrefix))); + for (int i = 0; i < VALUE_COUNT; i++) { + bh.consume(reader.readInteger()); + } + } +} diff --git a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/RleEncodingBenchmark.java b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/RleEncodingBenchmark.java new file mode 100644 index 0000000000..8cb506a301 --- /dev/null +++ b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/RleEncodingBenchmark.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.benchmarks; + +import java.io.IOException; +import java.util.Random; +import java.util.concurrent.TimeUnit; +import org.apache.parquet.bytes.HeapByteBufferAllocator; +import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridValuesWriter; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OperationsPerInvocation; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; + +/** + * Encoding-level micro-benchmarks for the RLE/bit-packing hybrid encoding used + * for {@code BOOLEAN} values in Parquet data pages V2. + * Decoding benchmarks live in {@link RleDecodingBenchmark}. + * + *

The {@code dataPattern} parameter exercises RLE's best cases (ALL_TRUE, + * ALL_FALSE), worst case (ALTERNATING), and realistic distributions (RANDOM, + * MOSTLY_TRUE_99, MOSTLY_FALSE_99). + * + *

Each invocation encodes {@value #VALUE_COUNT} values; throughput is + * reported per-value via {@link OperationsPerInvocation}. + */ +@BenchmarkMode(Mode.Throughput) +@OutputTimeUnit(TimeUnit.SECONDS) +@Fork(1) +@Warmup(iterations = 3, time = 1) +@Measurement(iterations = 5, time = 1) +@State(Scope.Thread) +public class RleEncodingBenchmark { + + static final int VALUE_COUNT = 100_000; + private static final int INIT_SLAB_SIZE = 64 * 1024; + private static final int PAGE_SIZE = 4 * 1024 * 1024; + + @Param({"ALL_TRUE", "ALL_FALSE", "ALTERNATING", "RANDOM", "MOSTLY_TRUE_99", "MOSTLY_FALSE_99"}) + public String dataPattern; + + private boolean[] data; + + @Setup(Level.Trial) + public void setup() { + data = generateData(dataPattern); + } + + static boolean[] generateData(String pattern) { + boolean[] d = new boolean[VALUE_COUNT]; + Random rng = new Random(42); + switch (pattern) { + case "ALL_TRUE": + for (int i = 0; i < VALUE_COUNT; i++) d[i] = true; + break; + case "ALL_FALSE": + // already false + break; + case "ALTERNATING": + for (int i = 0; i < VALUE_COUNT; i++) d[i] = (i & 1) == 0; + break; + case "RANDOM": + for (int i = 0; i < VALUE_COUNT; i++) d[i] = rng.nextBoolean(); + break; + case "MOSTLY_TRUE_99": + for (int i = 0; i < VALUE_COUNT; i++) d[i] = rng.nextInt(100) != 0; + break; + case "MOSTLY_FALSE_99": + for (int i = 0; i < VALUE_COUNT; i++) d[i] = rng.nextInt(100) == 0; + break; + default: + throw new IllegalArgumentException("Unknown pattern: " + pattern); + } + return d; + } + + // ---- Scalar encode via ValuesWriter ---- + + @Benchmark + @OperationsPerInvocation(VALUE_COUNT) + public byte[] encodeBoolean() throws IOException { + RunLengthBitPackingHybridValuesWriter w = + new RunLengthBitPackingHybridValuesWriter(1, INIT_SLAB_SIZE, PAGE_SIZE, new HeapByteBufferAllocator()); + for (boolean v : data) { + w.writeBoolean(v); + } + byte[] bytes = w.getBytes().toByteArray(); + w.close(); + return bytes; + } +} diff --git a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/TestDataFactory.java b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/TestDataFactory.java new file mode 100644 index 0000000000..3453aba52f --- /dev/null +++ b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/TestDataFactory.java @@ -0,0 +1,434 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.benchmarks; + +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BOOLEAN; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FLOAT; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64; + +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Random; +import org.apache.parquet.example.data.Group; +import org.apache.parquet.example.data.simple.SimpleGroupFactory; +import org.apache.parquet.io.api.Binary; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.Types; + +/** + * Utility class for generating test schemas and data for benchmarks. + */ +public final class TestDataFactory { + + /** Default number of rows for file-level benchmarks. */ + public static final int DEFAULT_ROW_COUNT = 100_000; + + /** Number of distinct values for low-cardinality data patterns. */ + public static final int LOW_CARDINALITY_DISTINCT = 100; + + /** Default RNG seed used across benchmarks for deterministic data. */ + public static final long DEFAULT_SEED = 42L; + + /** Byte length of the fixed-length byte array field in the benchmark schema. */ + public static final int FLBA_LENGTH = 12; + + /** A standard multi-type schema used by file-level benchmarks. */ + public static final MessageType FILE_BENCHMARK_SCHEMA = Types.buildMessage() + .required(INT32) + .named("int32_field") + .required(INT64) + .named("int64_field") + .required(FLOAT) + .named("float_field") + .required(DOUBLE) + .named("double_field") + .required(BOOLEAN) + .named("boolean_field") + .required(BINARY) + .named("binary_field") + .required(FIXED_LEN_BYTE_ARRAY) + .length(FLBA_LENGTH) + .named("flba_field") + .named("benchmark_record"); + + private TestDataFactory() {} + + /** + * Creates a {@link SimpleGroupFactory} for the standard benchmark schema. + */ + public static SimpleGroupFactory newGroupFactory() { + return new SimpleGroupFactory(FILE_BENCHMARK_SCHEMA); + } + + /** + * Generates a single row of benchmark data. + * + * @param factory the group factory + * @param index the row index (used for deterministic data) + * @param random the random source + * @return a populated Group + */ + public static Group generateRow(SimpleGroupFactory factory, int index, Random random) { + byte[] flbaBytes = new byte[FLBA_LENGTH]; + random.nextBytes(flbaBytes); + return factory.newGroup() + .append("int32_field", index) + .append("int64_field", (long) index * 100) + .append("float_field", random.nextFloat()) + .append("double_field", random.nextDouble()) + .append("boolean_field", index % 2 == 0) + .append("binary_field", "value_" + (index % 1000)) + .append("flba_field", Binary.fromConstantByteArray(flbaBytes)); + } + + /** + * Generates a deterministic set of rows for file-level benchmarks. + */ + public static Group[] generateRows(SimpleGroupFactory factory, int rowCount, long seed) { + Group[] rows = new Group[rowCount]; + Random random = new Random(seed); + for (int i = 0; i < rowCount; i++) { + rows[i] = generateRow(factory, i, random); + } + return rows; + } + + // ---- Integer data generation for encoding benchmarks ---- + + /** + * Generates sequential integers: 0, 1, 2, ... + */ + public static int[] generateSequentialInts(int count) { + int[] data = new int[count]; + for (int i = 0; i < count; i++) { + data[i] = i; + } + return data; + } + + /** + * Generates uniformly random integers using the given seed. + */ + public static int[] generateRandomInts(int count, long seed) { + return generateRandomInts(count, new Random(seed)); + } + + /** + * Generates uniformly random integers. + * + *

Note: prefer {@link #generateRandomInts(int, long)} when call ordering between + * generators in the same setup must not influence the produced data. + */ + public static int[] generateRandomInts(int count, Random random) { + int[] data = new int[count]; + for (int i = 0; i < count; i++) { + data[i] = random.nextInt(); + } + return data; + } + + /** + * Generates low-cardinality integers (values drawn from a small set) using the given seed. + */ + public static int[] generateLowCardinalityInts(int count, int distinctValues, long seed) { + return generateLowCardinalityInts(count, distinctValues, new Random(seed)); + } + + /** + * Generates low-cardinality integers (values drawn from a small set). + */ + public static int[] generateLowCardinalityInts(int count, int distinctValues, Random random) { + int[] data = new int[count]; + for (int i = 0; i < count; i++) { + data[i] = random.nextInt(distinctValues); + } + return data; + } + + /** + * Generates high-cardinality integers (all unique in randomized order) using the given seed. + */ + public static int[] generateHighCardinalityInts(int count, long seed) { + return generateHighCardinalityInts(count, new Random(seed)); + } + + /** + * Generates high-cardinality integers (all unique in randomized order). + */ + public static int[] generateHighCardinalityInts(int count, Random random) { + int[] data = generateSequentialInts(count); + for (int i = count - 1; i > 0; i--) { + int swapIndex = random.nextInt(i + 1); + int tmp = data[i]; + data[i] = data[swapIndex]; + data[swapIndex] = tmp; + } + return data; + } + + // ---- Long data generation for encoding benchmarks ---- + + /** + * Generates sequential longs: 0, 1, 2, ... + */ + public static long[] generateSequentialLongs(int count) { + long[] data = new long[count]; + for (int i = 0; i < count; i++) { + data[i] = i; + } + return data; + } + + /** + * Generates uniformly random longs using the given seed. + */ + public static long[] generateRandomLongs(int count, long seed) { + Random random = new Random(seed); + long[] data = new long[count]; + for (int i = 0; i < count; i++) { + data[i] = random.nextLong(); + } + return data; + } + + /** + * Generates low-cardinality longs (values drawn from a small set). + */ + public static long[] generateLowCardinalityLongs(int count, int distinctValues, long seed) { + Random random = new Random(seed); + long[] palette = new long[distinctValues]; + for (int i = 0; i < distinctValues; i++) { + palette[i] = random.nextLong(); + } + long[] data = new long[count]; + for (int i = 0; i < count; i++) { + data[i] = palette[random.nextInt(distinctValues)]; + } + return data; + } + + /** + * Generates high-cardinality longs (all unique, shuffled). + */ + public static long[] generateHighCardinalityLongs(int count, long seed) { + Random random = new Random(seed); + long[] data = generateSequentialLongs(count); + for (int i = count - 1; i > 0; i--) { + int swapIndex = random.nextInt(i + 1); + long tmp = data[i]; + data[i] = data[swapIndex]; + data[swapIndex] = tmp; + } + return data; + } + + // ---- Float data generation for encoding benchmarks ---- + + /** + * Generates uniformly random floats using the given seed. + */ + public static float[] generateRandomFloats(int count, long seed) { + Random random = new Random(seed); + float[] data = new float[count]; + for (int i = 0; i < count; i++) { + data[i] = random.nextFloat() * 1000.0f; + } + return data; + } + + /** + * Generates low-cardinality floats (values drawn from a small set). + */ + public static float[] generateLowCardinalityFloats(int count, int distinctValues, long seed) { + Random random = new Random(seed); + float[] palette = new float[distinctValues]; + for (int i = 0; i < distinctValues; i++) { + palette[i] = random.nextFloat() * 1000.0f; + } + float[] data = new float[count]; + for (int i = 0; i < count; i++) { + data[i] = palette[random.nextInt(distinctValues)]; + } + return data; + } + + // ---- Double data generation for encoding benchmarks ---- + + /** + * Generates uniformly random doubles using the given seed. + */ + public static double[] generateRandomDoubles(int count, long seed) { + Random random = new Random(seed); + double[] data = new double[count]; + for (int i = 0; i < count; i++) { + data[i] = random.nextDouble() * 1000.0; + } + return data; + } + + /** + * Generates low-cardinality doubles (values drawn from a small set). + */ + public static double[] generateLowCardinalityDoubles(int count, int distinctValues, long seed) { + Random random = new Random(seed); + double[] palette = new double[distinctValues]; + for (int i = 0; i < distinctValues; i++) { + palette[i] = random.nextDouble() * 1000.0; + } + double[] data = new double[count]; + for (int i = 0; i < count; i++) { + data[i] = palette[random.nextInt(distinctValues)]; + } + return data; + } + + // ---- Fixed-length byte array data generation for encoding benchmarks ---- + + /** + * Generates fixed-length byte arrays with the specified cardinality. + * + * @param count number of values + * @param length byte length of each value + * @param distinct number of distinct values (0 means all unique) + * @param seed RNG seed + */ + public static Binary[] generateFixedLenByteArrays(int count, int length, int distinct, long seed) { + Random random = new Random(seed); + if (distinct > 0) { + Binary[] palette = new Binary[distinct]; + for (int i = 0; i < distinct; i++) { + byte[] bytes = new byte[length]; + random.nextBytes(bytes); + palette[i] = Binary.fromConstantByteArray(bytes); + } + Binary[] data = new Binary[count]; + for (int i = 0; i < count; i++) { + data[i] = palette[random.nextInt(distinct)]; + } + return data; + } else { + Binary[] data = new Binary[count]; + for (int i = 0; i < count; i++) { + byte[] bytes = new byte[length]; + random.nextBytes(bytes); + data[i] = Binary.fromConstantByteArray(bytes); + } + return data; + } + } + + // ---- Binary data generation for encoding benchmarks ---- + + /** + * Generates binary strings of the given length with the specified cardinality, using + * a deterministic seed. + */ + public static Binary[] generateBinaryData(int count, int stringLength, int distinct, long seed) { + return generateBinaryData(count, stringLength, distinct, new Random(seed)); + } + + /** + * Generates binary strings of the given length with the specified cardinality. + * + * @param count number of values + * @param stringLength length of each string + * @param distinct number of distinct values (0 means all unique) + * @param random random source + * @return array of Binary values + */ + public static Binary[] generateBinaryData(int count, int stringLength, int distinct, Random random) { + Binary[] data = new Binary[count]; + if (distinct > 0) { + // Pre-generate the distinct values + Binary[] dictionary = new Binary[distinct]; + for (int i = 0; i < distinct; i++) { + dictionary[i] = Binary.fromConstantByteArray( + randomString(stringLength, random).getBytes(StandardCharsets.UTF_8)); + } + for (int i = 0; i < count; i++) { + data[i] = dictionary[random.nextInt(distinct)]; + } + } else { + // All unique + for (int i = 0; i < count; i++) { + data[i] = Binary.fromConstantByteArray( + randomString(stringLength, random).getBytes(StandardCharsets.UTF_8)); + } + } + return data; + } + + // ---- Sorted data generation for delta encoding benchmarks ---- + + /** + * Generates all-unique binary strings of the given length, sorted in natural + * ({@link Binary#compareTo}) order. Useful for benchmarking DELTA_BYTE_ARRAY + * encoding, which benefits from prefix sharing between consecutive values. + */ + public static Binary[] generateSortedBinaryData(int count, int stringLength, long seed) { + Binary[] data = generateBinaryData(count, stringLength, 0, seed); + Arrays.sort(data); + return data; + } + + /** + * Generates all-unique fixed-length byte arrays, sorted in natural + * ({@link Binary#compareTo}) order. Useful for benchmarking DELTA_BYTE_ARRAY + * encoding with FIXED_LEN_BYTE_ARRAY values. + */ + public static Binary[] generateSortedFixedLenByteArrays(int count, int fixedLength, long seed) { + Binary[] data = generateFixedLenByteArrays(count, fixedLength, 0, seed); + Arrays.sort(data); + return data; + } + + // ---- Variable-length data generation for delta encoding benchmarks ---- + + /** + * Generates all-unique binary strings with lengths uniformly distributed in + * {@code [1, maxLength]}. Useful for benchmarking DELTA_LENGTH_BYTE_ARRAY + * encoding, where non-zero length deltas exercise the DELTA_BINARY_PACKED + * sub-encoding of lengths (unlike uniform-length data where deltas are all zero). + * + * @param count number of values + * @param maxLength maximum string length (inclusive) + * @param seed RNG seed + */ + public static Binary[] generateVariableLengthBinaryData(int count, int maxLength, long seed) { + Random random = new Random(seed); + Binary[] data = new Binary[count]; + for (int i = 0; i < count; i++) { + int length = 1 + random.nextInt(maxLength); + data[i] = Binary.fromConstantByteArray(randomString(length, random).getBytes(StandardCharsets.UTF_8)); + } + return data; + } + + private static String randomString(int length, Random random) { + StringBuilder sb = new StringBuilder(length); + for (int i = 0; i < length; i++) { + sb.append((char) ('a' + random.nextInt(26))); + } + return sb.toString(); + } +} diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReaderBase.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReaderBase.java index 2b3e47116c..ef9a386314 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReaderBase.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReaderBase.java @@ -773,7 +773,7 @@ private IntIterator newRLEIterator(int maxLevel, BytesInput bytes) { return new NullIntIterator(); } return new RLEIntIterator(new RunLengthBitPackingHybridDecoder( - BytesUtils.getWidthFromMaxInt(maxLevel), bytes.toInputStream())); + BytesUtils.getWidthFromMaxInt(maxLevel), bytes.toByteBuffer())); } catch (IOException e) { throw new ParquetDecodingException("could not read levels in page for col " + path, e); } @@ -832,11 +832,7 @@ public RLEIntIterator(RunLengthBitPackingHybridDecoder delegate) { @Override int nextInt() { - try { - return delegate.readInt(); - } catch (IOException e) { - throw new ParquetDecodingException(e); - } + return delegate.readInt(); } } diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/DictionaryValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/DictionaryValuesReader.java index 53fafc55dc..77e3784392 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/DictionaryValuesReader.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/DictionaryValuesReader.java @@ -19,6 +19,7 @@ package org.apache.parquet.column.values.dictionary; import java.io.IOException; +import java.nio.ByteBuffer; import org.apache.parquet.bytes.ByteBufferInputStream; import org.apache.parquet.bytes.BytesUtils; import org.apache.parquet.column.Dictionary; @@ -52,12 +53,13 @@ public void initFromPage(int valueCount, ByteBufferInputStream stream) throws IO LOG.debug("init from page at offset {} for length {}", stream.position(), stream.available()); int bitWidth = BytesUtils.readIntLittleEndianOnOneByte(in); LOG.debug("bit width {}", bitWidth); - decoder = new RunLengthBitPackingHybridDecoder(bitWidth, in); + ByteBuffer buf = in.slice(in.available()); + decoder = new RunLengthBitPackingHybridDecoder(bitWidth, buf); } else { - decoder = new RunLengthBitPackingHybridDecoder(1, in) { + decoder = new RunLengthBitPackingHybridDecoder(1, ByteBuffer.allocate(0)) { @Override - public int readInt() throws IOException { - throw new IOException("Attempt to read from empty page"); + public int readInt() { + throw new ParquetDecodingException("Attempt to read from empty page"); } }; } @@ -65,64 +67,36 @@ public int readInt() throws IOException { @Override public int readValueDictionaryId() { - try { - return decoder.readInt(); - } catch (IOException e) { - throw new ParquetDecodingException(e); - } + return decoder.readInt(); } @Override public Binary readBytes() { - try { - return dictionary.decodeToBinary(decoder.readInt()); - } catch (IOException e) { - throw new ParquetDecodingException(e); - } + return dictionary.decodeToBinary(decoder.readInt()); } @Override public float readFloat() { - try { - return dictionary.decodeToFloat(decoder.readInt()); - } catch (IOException e) { - throw new ParquetDecodingException(e); - } + return dictionary.decodeToFloat(decoder.readInt()); } @Override public double readDouble() { - try { - return dictionary.decodeToDouble(decoder.readInt()); - } catch (IOException e) { - throw new ParquetDecodingException(e); - } + return dictionary.decodeToDouble(decoder.readInt()); } @Override public int readInteger() { - try { - return dictionary.decodeToInt(decoder.readInt()); - } catch (IOException e) { - throw new ParquetDecodingException(e); - } + return dictionary.decodeToInt(decoder.readInt()); } @Override public long readLong() { - try { - return dictionary.decodeToLong(decoder.readInt()); - } catch (IOException e) { - throw new ParquetDecodingException(e); - } + return dictionary.decodeToLong(decoder.readInt()); } @Override public void skip() { - try { - decoder.readInt(); // Type does not matter as we are just skipping dictionary keys - } catch (IOException e) { - throw new ParquetDecodingException(e); - } + decoder.readInt(); // Type does not matter as we are just skipping dictionary keys } } diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridDecoder.java b/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridDecoder.java index e55b276b29..61ad542de1 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridDecoder.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridDecoder.java @@ -18,9 +18,8 @@ */ package org.apache.parquet.column.values.rle; -import java.io.DataInputStream; -import java.io.IOException; -import java.io.InputStream; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; import org.apache.parquet.Preconditions; import org.apache.parquet.bytes.BytesUtils; import org.apache.parquet.column.values.bitpacking.BytePacker; @@ -42,23 +41,35 @@ private static enum MODE { private final int bitWidth; private final BytePacker packer; - private final InputStream in; + private final ByteBuffer buffer; private MODE mode; private int currentCount; private int currentValue; private int[] currentBuffer; + private int currentBufferLength; - public RunLengthBitPackingHybridDecoder(int bitWidth, InputStream in) { + // Reusable buffer to avoid per-run allocation in PACKED mode + private int[] packedValuesBuffer = new int[0]; + // Reusable padded buffer for end-of-data edge case where fewer bytes remain than required + private ByteBuffer packedPaddedBuffer; + + public RunLengthBitPackingHybridDecoder(int bitWidth, ByteBuffer buffer) { LOG.debug("decoding bitWidth {}", bitWidth); Preconditions.checkArgument(bitWidth >= 0 && bitWidth <= 32, "bitWidth must be >= 0 and <= 32"); this.bitWidth = bitWidth; this.packer = Packer.LITTLE_ENDIAN.newBytePacker(bitWidth); - this.in = in; + this.buffer = buffer.order(ByteOrder.LITTLE_ENDIAN); } - public int readInt() throws IOException { + /** + * Reads the next int value from the RLE/Bit-Packing hybrid stream. + * + * @return the next decoded integer value + * @throws ParquetDecodingException if a decoding error occurs + */ + public int readInt() { if (currentCount == 0) { readNext(); } @@ -69,7 +80,7 @@ public int readInt() throws IOException { result = currentValue; break; case PACKED: - result = currentBuffer[currentBuffer.length - 1 - currentCount]; + result = currentBuffer[currentBufferLength - 1 - currentCount]; break; default: throw new ParquetDecodingException("not a valid mode " + mode); @@ -77,30 +88,69 @@ public int readInt() throws IOException { return result; } - private void readNext() throws IOException { - Preconditions.checkArgument(in.available() > 0, "Reading past RLE/BitPacking stream."); - final int header = BytesUtils.readUnsignedVarInt(in); + private void readNext() { + Preconditions.checkArgument(buffer.hasRemaining(), "Reading past RLE/BitPacking stream."); + final int header = BytesUtils.readUnsignedVarInt(buffer); mode = (header & 1) == 0 ? MODE.RLE : MODE.PACKED; switch (mode) { case RLE: currentCount = header >>> 1; LOG.debug("reading {} values RLE", currentCount); - currentValue = BytesUtils.readIntLittleEndianPaddedOnBitWidth(in, bitWidth); + currentValue = BytesUtils.readIntLittleEndianPaddedOnBitWidth(buffer, bitWidth); break; case PACKED: int numGroups = header >>> 1; currentCount = numGroups * 8; + currentBufferLength = currentCount; LOG.debug("reading {} values BIT PACKED", currentCount); - currentBuffer = new int[currentCount]; // TODO: reuse a buffer - byte[] bytes = new byte[numGroups * bitWidth]; + if (packedValuesBuffer.length < currentCount) { + packedValuesBuffer = new int[currentCount]; + } + currentBuffer = packedValuesBuffer; + int bytesRequired = numGroups * bitWidth; // At the end of the file RLE data though, there might not be that many bytes left. - int bytesToRead = (int) Math.ceil(currentCount * bitWidth / 8.0); - bytesToRead = Math.min(bytesToRead, in.available()); - new DataInputStream(in).readFully(bytes, 0, bytesToRead); - for (int valueIndex = 0, byteIndex = 0; - valueIndex < currentCount; - valueIndex += 8, byteIndex += bitWidth) { - packer.unpack8Values(bytes, byteIndex, currentBuffer, valueIndex); + int bytesToRead = Math.min(bytesRequired, buffer.remaining()); + + // Determine the source ByteBuffer and start offset for unpacking + ByteBuffer packedSource; + int packedBytesStart; + + if (bytesToRead >= bytesRequired) { + // Common case: unpack directly from the main ByteBuffer (zero-copy) + packedSource = buffer; + packedBytesStart = buffer.position(); + } else { + // End-of-data edge case: pad remaining bytes with zeros for safe unpacking + if (packedPaddedBuffer == null || packedPaddedBuffer.capacity() < bytesRequired) { + packedPaddedBuffer = ByteBuffer.allocate(bytesRequired).order(ByteOrder.LITTLE_ENDIAN); + } + packedPaddedBuffer.clear(); + java.util.Arrays.fill(packedPaddedBuffer.array(), 0, bytesRequired, (byte) 0); + for (int i = 0; i < bytesToRead; i++) { + packedPaddedBuffer.put(i, buffer.get(buffer.position() + i)); + } + buffer.position(buffer.position() + bytesToRead); + packedSource = packedPaddedBuffer; + packedBytesStart = 0; + } + + // Unpack 32 values (4 groups) at a time when possible — symmetric to the encoder's + // pack32Values fast path. Falls back to unpack8Values for any residual groups. + int groupIdx = 0; + int bufPos = packedBytesStart; + final int step32 = bitWidth * 4; + while (groupIdx + 4 <= numGroups) { + packer.unpack32Values(packedSource, bufPos, currentBuffer, groupIdx * 8); + groupIdx += 4; + bufPos += step32; + } + for (; groupIdx < numGroups; groupIdx++, bufPos += bitWidth) { + packer.unpack8Values(packedSource, bufPos, currentBuffer, groupIdx * 8); + } + // Advance the main buffer position past the consumed packed bytes (common case only; + // edge case already advanced above) + if (packedSource == buffer) { + buffer.position(packedBytesStart + bytesToRead); } break; default: diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridEncoder.java b/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridEncoder.java index e33824bff1..3fddce06c1 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridEncoder.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridEncoder.java @@ -74,6 +74,11 @@ public class RunLengthBitPackingHybridEncoder implements AutoCloseable { */ private final byte[] packBuffer; + /** + * Buffer four 8-value groups so we can use the packer's 32-value fast path. + */ + private final int[] bitPackedValuesBuffer; + /** * Previous value written, used to detect repeated values */ @@ -98,6 +103,8 @@ public class RunLengthBitPackingHybridEncoder implements AutoCloseable { */ private int bitPackedGroupCount; + private int numBitPackedValues; + /** * A "pointer" to a single byte in baos, * which we use as our bit-packed-header. It's really @@ -125,7 +132,8 @@ public RunLengthBitPackingHybridEncoder( this.bitWidth = bitWidth; this.baos = new CapacityByteArrayOutputStream(initialCapacity, pageSize, allocator); - this.packBuffer = new byte[bitWidth]; + this.packBuffer = new byte[bitWidth * 4]; + this.bitPackedValuesBuffer = new int[32]; this.bufferedValues = new int[8]; this.packer = Packer.LITTLE_ENDIAN.newBytePacker(bitWidth); reset(false); @@ -139,6 +147,7 @@ private void reset(boolean resetBaos) { this.numBufferedValues = 0; this.repeatCount = 0; this.bitPackedGroupCount = 0; + this.numBitPackedValues = 0; this.bitPackedRunHeaderPointer = -1; this.toBytesCalled = false; } @@ -196,8 +205,9 @@ private void writeOrAppendBitPackedRun() throws IOException { bitPackedRunHeaderPointer = baos.getCurrentIndex(); } - packer.pack8Values(bufferedValues, 0, packBuffer, 0); - baos.write(packBuffer); + System.arraycopy(bufferedValues, 0, bitPackedValuesBuffer, numBitPackedValues, 8); + numBitPackedValues += 8; + flushBitPackedValuesIfFull(); // empty the buffer, they've all been written numBufferedValues = 0; @@ -209,6 +219,34 @@ private void writeOrAppendBitPackedRun() throws IOException { ++bitPackedGroupCount; } + private void flushBitPackedValuesIfFull() { + if (numBitPackedValues == bitPackedValuesBuffer.length) { + packer.pack32Values(bitPackedValuesBuffer, 0, packBuffer, 0); + baos.write(packBuffer, 0, bitWidth * 4); + numBitPackedValues = 0; + } + } + + private void flushBitPackedValues() { + if (numBitPackedValues == 0) { + return; + } + + // Full 32-value block — use fast path + flushBitPackedValuesIfFull(); + + // Handle any residual values (< 32) + if (numBitPackedValues > 0) { + int outPos = 0; + for (int inPos = 0; inPos < numBitPackedValues; inPos += 8) { + packer.pack8Values(bitPackedValuesBuffer, inPos, packBuffer, outPos); + outPos += bitWidth; + } + baos.write(packBuffer, 0, outPos); + numBitPackedValues = 0; + } + } + /** * If we are currently writing a bit-packed-run, update the * bit-packed-header and consider this run to be over @@ -221,6 +259,8 @@ private void endPreviousBitPackedRun() { return; } + flushBitPackedValues(); + // create bit-packed-header, which needs to fit in 1 byte byte bitPackHeader = (byte) ((bitPackedGroupCount << 1) | 1); diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridValuesReader.java index 0bd5a18d2b..8050662f14 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridValuesReader.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridValuesReader.java @@ -19,10 +19,10 @@ package org.apache.parquet.column.values.rle; import java.io.IOException; +import java.nio.ByteBuffer; import org.apache.parquet.bytes.ByteBufferInputStream; import org.apache.parquet.bytes.BytesUtils; import org.apache.parquet.column.values.ValuesReader; -import org.apache.parquet.io.ParquetDecodingException; /** * This ValuesReader does all the reading in {@link #initFromPage} @@ -39,7 +39,8 @@ public RunLengthBitPackingHybridValuesReader(int bitWidth) { @Override public void initFromPage(int valueCountL, ByteBufferInputStream stream) throws IOException { int length = BytesUtils.readIntLittleEndian(stream); - this.decoder = new RunLengthBitPackingHybridDecoder(bitWidth, stream.sliceStream(length)); + ByteBuffer buf = stream.slice(length); + this.decoder = new RunLengthBitPackingHybridDecoder(bitWidth, buf); // 4 is for the length which is stored as 4 bytes little endian updateNextOffset(length + 4); @@ -47,11 +48,7 @@ public void initFromPage(int valueCountL, ByteBufferInputStream stream) throws I @Override public int readInteger() { - try { - return decoder.readInt(); - } catch (IOException e) { - throw new ParquetDecodingException(e); - } + return decoder.readInt(); } @Override diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridIntegrationTest.java b/parquet-column/src/test/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridIntegrationTest.java index 01a4c96e85..b06672cee9 100644 --- a/parquet-column/src/test/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridIntegrationTest.java +++ b/parquet-column/src/test/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridIntegrationTest.java @@ -21,7 +21,6 @@ import static org.junit.Assert.assertEquals; import java.nio.ByteBuffer; -import org.apache.parquet.bytes.ByteBufferInputStream; import org.apache.parquet.bytes.DirectByteBufferAllocator; import org.junit.Test; @@ -57,9 +56,8 @@ private void doIntegrationTest(int bitWidth) throws Exception { encoder.writeInt((int) (17 % modValue)); } ByteBuffer encodedBytes = encoder.toBytes().toByteBuffer(); - ByteBufferInputStream in = ByteBufferInputStream.wrap(encodedBytes); - RunLengthBitPackingHybridDecoder decoder = new RunLengthBitPackingHybridDecoder(bitWidth, in); + RunLengthBitPackingHybridDecoder decoder = new RunLengthBitPackingHybridDecoder(bitWidth, encodedBytes); for (int i = 0; i < 100; i++) { assertEquals(i % modValue, decoder.readInt()); diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/rle/TestRunLengthBitPackingHybridEncoder.java b/parquet-column/src/test/java/org/apache/parquet/column/values/rle/TestRunLengthBitPackingHybridEncoder.java index 93a6c8deb4..86d5650857 100644 --- a/parquet-column/src/test/java/org/apache/parquet/column/values/rle/TestRunLengthBitPackingHybridEncoder.java +++ b/parquet-column/src/test/java/org/apache/parquet/column/values/rle/TestRunLengthBitPackingHybridEncoder.java @@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals; import java.io.ByteArrayInputStream; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; import org.apache.parquet.bytes.BytesUtils; @@ -290,12 +291,109 @@ public void testGroupBoundary() throws Exception { // bit width 2. bytes[0] = (1 << 1) | 1; bytes[1] = (1 << 0) | (2 << 2) | (3 << 4); - ByteArrayInputStream stream = new ByteArrayInputStream(bytes); - RunLengthBitPackingHybridDecoder decoder = new RunLengthBitPackingHybridDecoder(2, stream); + ByteBuffer buffer = ByteBuffer.wrap(bytes); + RunLengthBitPackingHybridDecoder decoder = new RunLengthBitPackingHybridDecoder(2, buffer); assertEquals(decoder.readInt(), 1); assertEquals(decoder.readInt(), 2); assertEquals(decoder.readInt(), 3); - assertEquals(stream.available(), 0); + assertEquals(buffer.remaining(), 0); + } + + // ---- Decoder-specific edge case tests ---- + + /** + * Tests the packedPaddedBuffer edge case: when the last packed run in the + * stream has fewer bytes available than required (bytesToRead < bytesRequired). + * This exercises the zero-padded ByteBuffer fallback path in readNext(). + */ + @Test + public void testDecoderPaddedBufferEdgeCase() throws Exception { + // Encode 5 distinct values with bitWidth=3 using scalar writeInt. + // The encoder will produce a bit-packed run with header indicating 1 group (8 values), + // but only 5 values are meaningful. The stream has exactly 3 bytes (1 group * 3 bitWidth). + // To create a truncated stream, we manually construct the packed data with fewer bytes + // than required for the group count declared in the header. + int bitWidth = 3; + RunLengthBitPackingHybridEncoder encoder = getRunLengthBitPackingHybridEncoder(bitWidth, 100, 64000); + // Write 5 values — encoder will pad to 8 (1 group), producing header for 1 group and 3 bytes + for (int i = 0; i < 5; i++) encoder.writeInt(i); + byte[] fullEncoded = encoder.toBytes().toByteArray(); + + // Decode normally and verify correctness + RunLengthBitPackingHybridDecoder decoder = + new RunLengthBitPackingHybridDecoder(bitWidth, ByteBuffer.wrap(fullEncoded)); + for (int i = 0; i < 5; i++) { + assertEquals("mismatch at index " + i, i, decoder.readInt()); + } + + // Now construct a stream where we artificially increase the group count header + // to declare more groups than bytes available, forcing the padded buffer path. + // Header for 2 groups (16 values): (2 << 1) | 1 = 5 + // But only provide bytes for 1 group (3 bytes), so bytesToRead < bytesRequired. + byte[] truncated = new byte[1 + bitWidth]; // 1 byte header + 3 bytes data + truncated[0] = 5; // header = 2 groups packed + System.arraycopy(fullEncoded, 1, truncated, 1, bitWidth); // copy the 3 data bytes + + RunLengthBitPackingHybridDecoder paddedDecoder = + new RunLengthBitPackingHybridDecoder(bitWidth, ByteBuffer.wrap(truncated)); + // First 5 values should decode correctly from the real bytes + for (int i = 0; i < 5; i++) { + assertEquals("padded mismatch at index " + i, i, paddedDecoder.readInt()); + } + // Values 5-7 come from the real packed group's padding (encoder pads with 0s) + for (int i = 5; i < 8; i++) { + assertEquals("padded zero at index " + i, 0, paddedDecoder.readInt()); + } + // Values 8-15 come from the zero-padded buffer (no real data) — all should be 0 + for (int i = 8; i < 16; i++) { + assertEquals("zero-padded value at index " + i, 0, paddedDecoder.readInt()); + } + } + + /** + * Tests the unpack32Values fast path by ensuring that a packed run with + * exactly 4 or more groups (32+ values) is decoded correctly via scalar readInt. + */ + @Test + public void testDecoderUnpack32ValuesFastPath() throws Exception { + // bitWidth=3 with 40 distinct values forces 5 packed groups (40 values). + // The decoder should use unpack32Values for the first 4 groups, then + // unpack8Values for the last group. + int bitWidth = 3; + int numValues = 40; + int[] values = new int[numValues]; + RunLengthBitPackingHybridEncoder encoder = getRunLengthBitPackingHybridEncoder(bitWidth, 100, 64000); + for (int i = 0; i < numValues; i++) { + values[i] = i % 7; // stay within 3-bit range + encoder.writeInt(values[i]); + } + byte[] encoded = encoder.toBytes().toByteArray(); + + RunLengthBitPackingHybridDecoder decoder = + new RunLengthBitPackingHybridDecoder(bitWidth, ByteBuffer.wrap(encoded)); + for (int i = 0; i < numValues; i++) { + assertEquals("unpack32 fast path mismatch at " + i, values[i], decoder.readInt()); + } + } + + @Test + public void testDecoderUnpack32ValuesExact4Groups() throws Exception { + // Exactly 4 groups (32 values) — uses only the unpack32Values path, no residual + int bitWidth = 5; + int numValues = 32; + int[] values = new int[numValues]; + RunLengthBitPackingHybridEncoder encoder = getRunLengthBitPackingHybridEncoder(bitWidth, 100, 64000); + for (int i = 0; i < numValues; i++) { + values[i] = i % 31; + encoder.writeInt(values[i]); + } + byte[] encoded = encoder.toBytes().toByteArray(); + + RunLengthBitPackingHybridDecoder decoder = + new RunLengthBitPackingHybridDecoder(bitWidth, ByteBuffer.wrap(encoded)); + for (int i = 0; i < numValues; i++) { + assertEquals("exact 4 groups mismatch at " + i, values[i], decoder.readInt()); + } } private static List unpack(int bitWidth, int numValues, ByteArrayInputStream is) throws Exception { diff --git a/parquet-common/src/main/java/org/apache/parquet/bytes/BytesUtils.java b/parquet-common/src/main/java/org/apache/parquet/bytes/BytesUtils.java index b8373a898d..b791eb3f32 100644 --- a/parquet-common/src/main/java/org/apache/parquet/bytes/BytesUtils.java +++ b/parquet-common/src/main/java/org/apache/parquet/bytes/BytesUtils.java @@ -141,6 +141,33 @@ public static int readIntLittleEndianPaddedOnBitWidth(InputStream in, int bitWid } } + /** + * Reads a little-endian int padded to the byte count for the given bit width from a ByteBuffer. + * The buffer must be in {@link java.nio.ByteOrder#LITTLE_ENDIAN} order for 2-byte and 4-byte reads. + * + * @param in a ByteBuffer in LITTLE_ENDIAN order + * @param bitWidth the bit width determining how many bytes to read + * @return the value read + */ + public static int readIntLittleEndianPaddedOnBitWidth(ByteBuffer in, int bitWidth) { + int bytesWidth = paddedByteCountFromBits(bitWidth); + switch (bytesWidth) { + case 0: + return 0; + case 1: + return in.get() & 0xFF; + case 2: + return in.getShort() & 0xFFFF; + case 3: + return (in.get() & 0xFF) | ((in.get() & 0xFF) << 8) | ((in.get() & 0xFF) << 16); + case 4: + return in.getInt(); + default: + throw new IllegalArgumentException( + String.format("Encountered bitWidth (%d) that requires more than 4 bytes", bitWidth)); + } + } + public static void writeIntLittleEndianOnOneByte(OutputStream out, int v) throws IOException { out.write((v >>> 0) & 0xFF); } @@ -210,6 +237,23 @@ public static int readUnsignedVarInt(InputStream in) throws IOException { return value | (b << i); } + /** + * Reads an unsigned variable-length integer (varint) from a ByteBuffer. + * + * @param in a ByteBuffer + * @return the unsigned varint value + */ + public static int readUnsignedVarInt(ByteBuffer in) { + int value = 0; + int i = 0; + int b; + while (((b = in.get() & 0xFF) & 0x80) != 0) { + value |= (b & 0x7F) << i; + i += 7; + } + return value | (b << i); + } + /** * uses a trick mentioned in https://developers.google.com/protocol-buffers/docs/encoding to read zigZag encoded data * diff --git a/parquet-common/src/test/java/org/apache/parquet/bytes/TestBytesUtil.java b/parquet-common/src/test/java/org/apache/parquet/bytes/TestBytesUtil.java index a146570d8a..062180b529 100644 --- a/parquet-common/src/test/java/org/apache/parquet/bytes/TestBytesUtil.java +++ b/parquet-common/src/test/java/org/apache/parquet/bytes/TestBytesUtil.java @@ -21,6 +21,10 @@ import static org.apache.parquet.bytes.BytesUtils.getWidthFromMaxInt; import static org.junit.Assert.assertEquals; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; import org.junit.Test; public class TestBytesUtil { @@ -46,4 +50,100 @@ public void testWidth() { assertEquals(8, getWidthFromMaxInt(128)); assertEquals(8, getWidthFromMaxInt(255)); } + + // ---- ByteBuffer overload tests ---- + + @Test + public void testReadUnsignedVarIntByteBuffer() { + // Single-byte varint: value 0 + assertVarIntRoundTrip(0); + // Single-byte varint: value 1 + assertVarIntRoundTrip(1); + // Single-byte: max single-byte value = 127 + assertVarIntRoundTrip(127); + // Two-byte varint: 128 + assertVarIntRoundTrip(128); + // Two-byte varint: 300 + assertVarIntRoundTrip(300); + // Three-byte varint: 16384 + assertVarIntRoundTrip(16384); + // Larger value + assertVarIntRoundTrip(100000); + // Even larger + assertVarIntRoundTrip(Integer.MAX_VALUE); + } + + private void assertVarIntRoundTrip(int value) { + try { + // Write using the OutputStream API + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + BytesUtils.writeUnsignedVarInt(value, baos); + byte[] bytes = baos.toByteArray(); + + // Read back using the ByteBuffer overload + ByteBuffer buf = ByteBuffer.wrap(bytes); + int result = BytesUtils.readUnsignedVarInt(buf); + assertEquals("varint round-trip failed for value " + value, value, result); + assertEquals("ByteBuffer should be fully consumed for value " + value, 0, buf.remaining()); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Test + public void testReadIntLittleEndianPaddedOnBitWidthByteBuffer() throws IOException { + // bitWidth 0 -> 0 bytes -> returns 0 + assertPaddedReadRoundTrip(0, 0); + + // bitWidth 1-8 -> 1 byte + assertPaddedReadRoundTrip(1, 1); + assertPaddedReadRoundTrip(7, 127); + assertPaddedReadRoundTrip(8, 255); + + // bitWidth 9-16 -> 2 bytes + assertPaddedReadRoundTrip(9, 511); + assertPaddedReadRoundTrip(16, 65535); + + // bitWidth 17-24 -> 3 bytes + assertPaddedReadRoundTrip(17, 131071); + assertPaddedReadRoundTrip(24, 0xFFFFFF); + + // bitWidth 25-32 -> 4 bytes + assertPaddedReadRoundTrip(25, 33554431); + assertPaddedReadRoundTrip(32, 0x7FFFFFFF); + assertPaddedReadRoundTrip(32, -1); // all bits set + } + + private void assertPaddedReadRoundTrip(int bitWidth, int value) throws IOException { + // Write using the OutputStream API + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + BytesUtils.writeIntLittleEndianPaddedOnBitWidth(baos, value, bitWidth); + byte[] bytes = baos.toByteArray(); + + // Read back using the ByteBuffer overload + ByteBuffer buf = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN); + int result = BytesUtils.readIntLittleEndianPaddedOnBitWidth(buf, bitWidth); + assertEquals("padded read round-trip failed for bitWidth=" + bitWidth + " value=" + value, value, result); + assertEquals("ByteBuffer should be fully consumed for bitWidth=" + bitWidth, 0, buf.remaining()); + } + + @Test + public void testReadUnsignedVarIntByteBufferMultipleSequential() { + // Write multiple varints sequentially, then read them back from a single ByteBuffer + try { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + int[] values = {0, 1, 127, 128, 16384, 100000}; + for (int v : values) { + BytesUtils.writeUnsignedVarInt(v, baos); + } + ByteBuffer buf = ByteBuffer.wrap(baos.toByteArray()); + for (int expected : values) { + int actual = BytesUtils.readUnsignedVarInt(buf); + assertEquals("sequential varint mismatch for " + expected, expected, actual); + } + assertEquals("ByteBuffer should be fully consumed", 0, buf.remaining()); + } catch (IOException e) { + throw new RuntimeException(e); + } + } } diff --git a/pom.xml b/pom.xml index 1bd9893d87..3461b0dcb6 100644 --- a/pom.xml +++ b/pom.xml @@ -600,6 +600,9 @@ org.apache.parquet.thrift.pig.ParquetThriftStorer org.apache.parquet.thrift.pig.TupleToThriftWriteSupport org.apache.parquet.hadoop.thrift.AbstractThriftWriteSupport#isPigLoaded() + + org.apache.parquet.column.values.rle.RunLengthBitPackingHybridDecoder#readInt() + org.apache.parquet.column.values.rle.RunLengthBitPackingHybridDecoder#RunLengthBitPackingHybridDecoder(int,java.io.InputStream) org.apache.parquet.avro.AvroReadSupport#AVRO_REQUESTED_PROJECTION org.apache.parquet.avro.AvroReadSupport#AVRO_DATA_SUPPLIER