
[SPARK-55683][SQL] Optimize VectorizedPlainValuesReader.readUnsignedLongs #54479

Closed

LuciferYang wants to merge 1 commit into apache:master from LuciferYang:SPARK-55683

Conversation


@LuciferYang LuciferYang commented Feb 25, 2026

What changes were proposed in this pull request?

This PR optimizes VectorizedPlainValuesReader.readUnsignedLongs by replacing the per-element BigInteger heap allocation chain with direct byte manipulation.

The original implementation allocates multiple objects per element:

// Old: String + BigInteger + internal int[] + byte[] allocations per element
c.putByteArray(rowId + i,
    new BigInteger(Long.toUnsignedString(buffer.getLong())).toByteArray());

The new implementation reads raw little-endian bytes directly from the ByteBuffer backing array (when available) and converts them to BigInteger-compatible big-endian encoding in a single pass:

// New: hasArray() fast path - operates directly on backing array, one byte[] per element
if (buffer.hasArray()) {
    byte[] src = buffer.array();
    int offset = buffer.arrayOffset() + buffer.position();
    for (int i = 0; i < total; i++, rowId++, offset += 8) {
        putLittleEndianBytesAsBigInteger(c, rowId, src, offset);
    }
} else {
    byte[] data = new byte[8];  // reused across all values in this batch
    for (int i = 0; i < total; i++, rowId++) {
        buffer.get(data);
        putLittleEndianBytesAsBigInteger(c, rowId, data, 0);
    }
}

The private helper putLittleEndianBytesAsBigInteger handles the conversion with output matching BigInteger.toByteArray() semantics:

  • Zero value: writes [0x00] (1 byte) rather than an empty array, since new BigInteger(new byte[0]) throws NumberFormatException
  • Sign byte: prepends 0x00 when the most significant byte has bit 7 set, to ensure the value is interpreted as positive by BigInteger
  • Byte order: reverses little-endian Parquet physical encoding to big-endian in a single loop
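The three conversion rules above can be sanity-checked outside Spark. The following standalone sketch uses a hypothetical `toBigIntegerBytes` helper that mirrors the described logic (it is not the merged code) and compares its output against `BigInteger.toByteArray()` for the boundary values:

```java
import java.math.BigInteger;
import java.util.Arrays;

public class UnsignedLongEncodingSketch {
    // Hypothetical helper mirroring the described conversion: treats the bits of v
    // as an unsigned 64-bit value and returns a BigInteger.toByteArray()-compatible
    // big-endian encoding.
    static byte[] toBigIntegerBytes(long v) {
        if (v == 0) return new byte[]{0};          // zero: [0x00], never an empty array
        byte[] le = new byte[8];                   // little-endian bytes, as in Parquet
        for (int i = 0; i < 8; i++) le[i] = (byte) (v >>> (8 * i));
        int msb = 7;
        while (msb > 0 && le[msb] == 0) msb--;     // strip leading zero bytes
        boolean signByte = (le[msb] & 0x80) != 0;  // bit 7 set: need a 0x00 sign byte
        byte[] out = new byte[(signByte ? 1 : 0) + msb + 1];
        int d = signByte ? 1 : 0;                  // out[0] stays 0x00 when needed
        for (int i = msb; i >= 0; i--) out[d++] = le[i];  // reverse to big-endian
        return out;
    }

    public static void main(String[] args) {
        long[] samples = {0L, 1L, 255L, Long.MAX_VALUE, Long.MIN_VALUE, -2L, -1L};
        for (long v : samples) {
            byte[] expected = new BigInteger(Long.toUnsignedString(v)).toByteArray();
            if (!Arrays.equals(toBigIntegerBytes(v), expected)) {
                throw new AssertionError("mismatch for " + Long.toUnsignedString(v));
            }
        }
        System.out.println("all boundary values match BigInteger.toByteArray()");
    }
}
```

Note for example that `-1L` (unsigned 2^64 - 1) must encode to 9 bytes, a `0x00` sign byte followed by eight `0xFF` bytes, exactly as `BigInteger.toByteArray()` produces.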

Why are the changes needed?

The original implementation constructs a BigInteger via Long.toUnsignedString + new BigInteger(String), which involves per-element allocations of a String, a BigInteger, its internal int[] magnitude array, and the final byte[]. For a typical batch of 4096 values this means ~16K object allocations, creating significant GC pressure in workloads reading large UINT_64 columns.

The new implementation reduces this to one byte[] allocation per element by operating directly on the raw bytes from the ByteBuffer, avoiding all intermediate object creation. Additionally, the direct buffer fallback path reuses a single byte[8] scratch buffer across the entire batch.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

  • The existing test SPARK-34817: Read UINT_64 as Decimal from parquet in ParquetIOSuite was extended with boundary values covering the critical edge cases of the new byte manipulation logic
  • Renamed the original implementation to OldVectorizedPlainValuesReader and compared the latency of the old and new readUnsignedLongs methods using JMH:

Benchmark code:
package org.apache.spark.sql.execution.datasources.parquet;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Random;
import java.util.concurrent.TimeUnit;

import org.openjdk.jmh.annotations.*;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;

import org.apache.parquet.bytes.ByteBufferInputStream;

import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
import org.apache.spark.sql.types.DataTypes;

@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
@State(Scope.Thread)
@Fork(value = 1, jvmArgs = {"-Xms4G", "-Xmx4G"})
@Warmup(iterations = 5, time = 1)
@Measurement(iterations = 10, time = 1)
public class VectorizedPlainValuesReaderJMHBenchmark {

    // ==================== Parameters ====================

    @Param({"10000000"})
    private int numValues;

    // ==================== Test Data ====================

    private byte[] longData;
    private static final int BATCH_SIZE = 4096;

    private OldVectorizedPlainValuesReader oldSingleBufferOnHeapReader;
    private OldVectorizedPlainValuesReader oldSingleBufferOffHeapReader;
    private VectorizedPlainValuesReader newSingleBufferOnHeapReader;
    private VectorizedPlainValuesReader newSingleBufferOffHeapReader;

    // ==================== State Classes ====================

    /**
     * Column vector state using DecimalType(20, 0), which is the correct type for UINT64.
     * Parquet UINT_64 logical type is mapped to DecimalType(20, 0) in Spark.
     * Using LongType would cause NullPointerException because readUnsignedLongs
     * calls arrayData() which requires childColumns, only initialized for DecimalType.
     */
    @State(Scope.Thread)
    public static class DecimalColumnVectorState {
        public WritableColumnVector decimalColumn;

        @Setup(Level.Iteration)
        public void setup() {
            // UINT64 -> DecimalType(20, 0): precision=20, scale=0
            decimalColumn = new OnHeapColumnVector(BATCH_SIZE, DataTypes.createDecimalType(20, 0));
        }

        @TearDown(Level.Iteration)
        public void tearDown() {
            decimalColumn.close();
        }

        @Setup(Level.Invocation)
        public void reset() {
            decimalColumn.reset();
        }
    }

    // ==================== Setup ====================

    @Setup(Level.Trial)
    public void setupTrial() {
        Random random = new Random(42);
        longData = generateLongData(numValues, random);
    }

    @Setup(Level.Invocation)
    public void setupInvocation() throws IOException {
        oldSingleBufferOnHeapReader = new OldVectorizedPlainValuesReader();
        oldSingleBufferOnHeapReader.initFromPage(numValues, createSingleBufferInputStream(longData));
        oldSingleBufferOffHeapReader = new OldVectorizedPlainValuesReader();
        oldSingleBufferOffHeapReader.initFromPage(numValues, createDirectSingleBufferInputStream(longData));
        newSingleBufferOnHeapReader = new VectorizedPlainValuesReader();
        newSingleBufferOnHeapReader.initFromPage(numValues, createSingleBufferInputStream(longData));
        newSingleBufferOffHeapReader = new VectorizedPlainValuesReader();
        newSingleBufferOffHeapReader.initFromPage(numValues, createDirectSingleBufferInputStream(longData));
    }

    // ==================== Data Generation ====================

    private byte[] generateLongData(int count, Random random) {
        ByteBuffer buffer = ByteBuffer.allocate(count * 8).order(ByteOrder.LITTLE_ENDIAN);
        for (int i = 0; i < count; i++) {
            buffer.putLong(random.nextLong()); // full unsigned long range
        }
        return buffer.array();
    }

    // ==================== ByteBufferInputStream Creation ====================

    private ByteBufferInputStream createSingleBufferInputStream(byte[] data) {
        ByteBuffer buffer = ByteBuffer.wrap(data).order(ByteOrder.LITTLE_ENDIAN);
        return ByteBufferInputStream.wrap(buffer);
    }

    private ByteBuffer createDirectBuffer(byte[] data) {
        ByteBuffer buffer = ByteBuffer.allocateDirect(data.length).order(ByteOrder.LITTLE_ENDIAN);
        buffer.put(data);
        buffer.flip();
        return buffer;
    }

    private ByteBufferInputStream createDirectSingleBufferInputStream(byte[] data) {
        ByteBuffer buffer = createDirectBuffer(data);
        return ByteBufferInputStream.wrap(buffer);
    }

    // ====================================================================================
    // readUnsignedLongs onHeap
    // ====================================================================================

    @Benchmark
    public void readUnsignedLongs_onHeap_Old(DecimalColumnVectorState state) throws IOException {
        for (int i = 0; i < numValues; i += BATCH_SIZE) {
            oldSingleBufferOnHeapReader.readUnsignedLongs(
                    Math.min(BATCH_SIZE, numValues - i), state.decimalColumn, 0);
        }
    }

    @Benchmark
    public void readUnsignedLongs_onHeap_New(DecimalColumnVectorState state) throws IOException {
        for (int i = 0; i < numValues; i += BATCH_SIZE) {
            newSingleBufferOnHeapReader.readUnsignedLongs(
                    Math.min(BATCH_SIZE, numValues - i), state.decimalColumn, 0);
        }
    }

    // ====================================================================================
    // readUnsignedLongs offHeap
    // ====================================================================================

    @Benchmark
    public void readUnsignedLongs_offHeap_Old(DecimalColumnVectorState state) throws IOException {
        for (int i = 0; i < numValues; i += BATCH_SIZE) {
            oldSingleBufferOffHeapReader.readUnsignedLongs(
                    Math.min(BATCH_SIZE, numValues - i), state.decimalColumn, 0);
        }
    }

    @Benchmark
    public void readUnsignedLongs_offHeap_New(DecimalColumnVectorState state) throws IOException {
        for (int i = 0; i < numValues; i += BATCH_SIZE) {
            newSingleBufferOffHeapReader.readUnsignedLongs(
                    Math.min(BATCH_SIZE, numValues - i), state.decimalColumn, 0);
        }
    }

    // ==================== Main Method ====================

    public static void main(String[] args) throws RunnerException {
        String filter = args.length > 0 ?
                args[0] : VectorizedPlainValuesReaderJMHBenchmark.class.getSimpleName();
        Options opt = new OptionsBuilder()
                .include(filter)
                .build();

        new Runner(opt).run();
    }
}

Run build/sbt "sql/Test/runMain org.apache.spark.sql.execution.datasources.parquet.VectorizedPlainValuesReaderJMHBenchmark" to execute the benchmark.

Benchmark results:

  • Java 17.0.18+8-LTS
Benchmark                                                              (numValues)  Mode  Cnt        Score       Error  Units
VectorizedPlainValuesReaderJMHBenchmark.readUnsignedLongs_offHeap_New     10000000  avgt   10   249413.824 ± 12242.331  us/op
VectorizedPlainValuesReaderJMHBenchmark.readUnsignedLongs_offHeap_Old     10000000  avgt   10  2301279.127 ± 14970.249  us/op
VectorizedPlainValuesReaderJMHBenchmark.readUnsignedLongs_onHeap_New      10000000  avgt   10   282651.747 ±  5031.717  us/op
VectorizedPlainValuesReaderJMHBenchmark.readUnsignedLongs_onHeap_Old      10000000  avgt   10  2382690.093 ± 10364.228  us/op
  • Java 21.0.10+7-LTS
Benchmark                                                              (numValues)  Mode  Cnt        Score       Error  Units
VectorizedPlainValuesReaderJMHBenchmark.readUnsignedLongs_offHeap_New     10000000  avgt   10   256621.630 ± 24087.509  us/op
VectorizedPlainValuesReaderJMHBenchmark.readUnsignedLongs_offHeap_Old     10000000  avgt   10  2120170.591 ±  4862.317  us/op
VectorizedPlainValuesReaderJMHBenchmark.readUnsignedLongs_onHeap_New      10000000  avgt   10   284058.229 ± 19966.179  us/op
VectorizedPlainValuesReaderJMHBenchmark.readUnsignedLongs_onHeap_Old      10000000  avgt   10  2190838.305 ±  7979.740  us/op

Both the onHeap and offHeap paths show an approximately 8x improvement.

Was this patch authored or co-authored using generative AI tooling?

Yes, Claude Sonnet 4.6 was used to assist in completing the code writing.

@LuciferYang LuciferYang marked this pull request as draft February 25, 2026 09:49
@LuciferYang

Testing first; the PR description and performance comparison will be updated later.

@LuciferYang LuciferYang marked this pull request as ready for review February 25, 2026 13:33
@pan3793 pan3793 requested a review from Copilot February 25, 2026 15:42

Copilot AI left a comment


Pull request overview

Optimizes Spark SQL’s Parquet vectorized PLAIN decoding for UINT_64 by replacing the per-value BigInteger(String) construction path with direct byte-level conversion, and extends the existing Parquet IO test to cover boundary cases for unsigned 64-bit decoding.

Changes:

  • Reworked VectorizedPlainValuesReader.readUnsignedLongs to convert little-endian UINT_64 bytes into BigInteger-compatible big-endian two’s-complement byte arrays without String/BigInteger intermediates.
  • Added a helper to produce BigInteger.toByteArray()-compatible encodings (zero handling + sign-byte handling).
  • Extended ParquetIOSuite’s UINT_64 test data with boundary values (0/1/max/min/-1/-2).

Reviewed changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated no comments.

Files changed:

  • sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java: implements the new byte-manipulation decoding path plus the helper for BigInteger-compatible output.
  • sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala: extends the existing UINT_64 Parquet read test with boundary values to validate the new encoding logic.
Comments suppressed due to low confidence (2)

sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java:278

  • putLittleEndianBytesAsBigInteger still allocates a new byte[] per value (new byte[totalLen], plus new byte[]{0} for zeros). Since WritableColumnVector.putByteArray(rowId, value, offset, count) copies the bytes into arrayData(), you can reuse a single scratch buffer (e.g., byte[9]) across the loop and pass the appropriate offset/length to avoid per-element heap allocations entirely.
    // Zero value: must write [0x00] rather than an empty array.
    // BigInteger.ZERO.toByteArray() returns [0x00], and new BigInteger(new byte[0])
    // throws NumberFormatException("Zero length BigInteger").
    if (msbIndex == offset && src[offset] == 0) {
      c.putByteArray(rowId, new byte[]{0});
      return;
    }

    // Prepend a 0x00 sign byte if the most significant byte has bit 7 set.
    // This matches BigInteger.toByteArray() behavior: positive values whose highest
    // magnitude byte has the MSB set are prefixed with 0x00 to distinguish them
    // from negative values in two's-complement encoding.
    boolean needSignByte = (src[msbIndex] & 0x80) != 0;
    int valueLen = msbIndex - offset + 1;
    int totalLen = needSignByte ? valueLen + 1 : valueLen;

    byte[] dest = new byte[totalLen];
    int destOffset = 0;
    if (needSignByte) {
      dest[destOffset++] = 0x00;
    }
    // Reverse byte order: little-endian src → big-endian dest
    for (int i = msbIndex; i >= offset; i--) {
      dest[destOffset++] = src[i];
    }
    c.putByteArray(rowId, dest, 0, totalLen);

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala:1275

  • The boundary values sequence is duplicated here and again when constructing boundaryExpected below. Consider defining a single val boundaryValues = Seq(...) and reusing it for both writing and expected rows to avoid accidental drift if the list changes in the future.
        // Boundary values: zero, one, signed extremes interpreted as unsigned
        Seq(0L, 1L, Long.MaxValue, Long.MinValue, -2L, -1L).foreach { v =>
          val group = factory.newGroup().append("a", v)
          writer.write(group)
        }


@dongjoon-hyun

Merged to master for Apache Spark 4.2.0. Thank you, @LuciferYang and @pan3793 .

@LuciferYang

LuciferYang commented Feb 26, 2026

sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java:278

  • putLittleEndianBytesAsBigInteger still allocates a new byte[] per value (new byte[totalLen], plus new byte[]{0} for zeros). Since WritableColumnVector.putByteArray(rowId, value, offset, count) copies the bytes into arrayData(), you can reuse a single scratch buffer (e.g., byte[9]) across the loop and pass the appropriate offset/length to avoid per-element heap allocations entirely.
    // Zero value: must write [0x00] rather than an empty array.
    // BigInteger.ZERO.toByteArray() returns [0x00], and new BigInteger(new byte[0])
    // throws NumberFormatException("Zero length BigInteger").
    if (msbIndex == offset && src[offset] == 0) {
      c.putByteArray(rowId, new byte[]{0});
      return;
    }

    // Prepend a 0x00 sign byte if the most significant byte has bit 7 set.
    // This matches BigInteger.toByteArray() behavior: positive values whose highest
    // magnitude byte has the MSB set are prefixed with 0x00 to distinguish them
    // from negative values in two's-complement encoding.
    boolean needSignByte = (src[msbIndex] & 0x80) != 0;
    int valueLen = msbIndex - offset + 1;
    int totalLen = needSignByte ? valueLen + 1 : valueLen;

    byte[] dest = new byte[totalLen];
    int destOffset = 0;
    if (needSignByte) {
      dest[destOffset++] = 0x00;
    }
    // Reverse byte order: little-endian src → big-endian dest
    for (int i = msbIndex; i >= offset; i--) {
      dest[destOffset++] = src[i];
    }
    c.putByteArray(rowId, dest, 0, totalLen);

This suggestion from Copilot sounds good. Let me try it out to see the effect. If there are benefits, I'll submit a FOLLOWUP.

@LuciferYang
It has shown stable optimization effects for the onHeap path, and I've opened a follow-up:

dongjoon-hyun pushed a commit that referenced this pull request Feb 26, 2026
…adUnsignedLongs` to reuse scratch buffer and avoid per-element allocations

### What changes were proposed in this pull request?
Following the suggestion from Copilot in #54479 (review), this PR further optimizes `VectorizedPlainValuesReader.readUnsignedLongs` by introducing a reusable scratch buffer to eliminate the per-element `byte[]` allocations introduced in the previous refactoring.

The previous implementation allocates a new `byte[]` per element for the encoded output:

```java
// Previous: new byte[totalLen] per element, plus new byte[]{0} for zero values
byte[] dest = new byte[totalLen];
...
c.putByteArray(rowId, dest, 0, totalLen);
```

The new implementation allocates a single `byte[9]` scratch buffer once per batch and reuses it across all elements. Since `WritableColumnVector.putByteArray` copies the bytes into its internal storage immediately, the scratch buffer can be safely overwritten on the next iteration:

```java
// New: one byte[9] allocated per batch, reused for every element
byte[] scratch = new byte[9];
for (...) {
    putLittleEndianBytesAsBigInteger(c, rowId, src, offset, scratch);
}
```

The scratch buffer is sized at 9 bytes to accommodate the worst case: 1 `0x00` sign byte + 8 value bytes. The zero value special case is also handled via scratch, avoiding the previous `new byte[]{0}` allocation.

### Why are the changes needed?
The previous implementation still allocates one `byte[]` per element for the encoded output. For a typical batch of 4096 values this means 4096 heap allocations per `readUnsignedLongs` call, creating GC pressure in workloads that read large `UINT_64` columns. With the scratch buffer approach, the entire batch produces only 2 allocations: `byte[9]` (scratch) and `byte[8]` (direct buffer fallback read buffer), regardless of batch size.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
- Passed GitHub Actions
- Reused the JMH benchmark provided in #54479, and the test results are as follows:

Java 17

```
[info] Benchmark                                                              (numValues)  Mode  Cnt       Score      Error  Units
[info] VectorizedPlainValuesReaderJMHBenchmark.readUnsignedLongs_offHeap_New     10000000  avgt   10  233820.658 ± 1888.523  us/op
[info] VectorizedPlainValuesReaderJMHBenchmark.readUnsignedLongs_offHeap_Old     10000000  avgt   10  255563.248 ± 3500.165  us/op
[info] VectorizedPlainValuesReaderJMHBenchmark.readUnsignedLongs_onHeap_New      10000000  avgt   10  228672.684 ± 2985.496  us/op
[info] VectorizedPlainValuesReaderJMHBenchmark.readUnsignedLongs_onHeap_Old      10000000  avgt   10  275756.804 ± 2065.405  us/op
```

Java 21

```
[info] Benchmark                                                              (numValues)  Mode  Cnt       Score       Error  Units
[info] VectorizedPlainValuesReaderJMHBenchmark.readUnsignedLongs_offHeap_New     10000000  avgt   10  241977.924 ± 15125.343  us/op
[info] VectorizedPlainValuesReaderJMHBenchmark.readUnsignedLongs_offHeap_Old     10000000  avgt   10  250343.470 ±  1342.509  us/op
[info] VectorizedPlainValuesReaderJMHBenchmark.readUnsignedLongs_onHeap_New      10000000  avgt   10  212929.948 ±  1387.671  us/op
[info] VectorizedPlainValuesReaderJMHBenchmark.readUnsignedLongs_onHeap_Old      10000000  avgt   10  274561.949 ±  1226.348  us/op
```

Judging from the test results, the onHeap path demonstrates approximately a 17-22% improvement, while the offHeap path shows roughly a 3-9% improvement across Java 17 and Java 21.

### Was this patch authored or co-authored using generative AI tooling?
Yes, Claude Sonnet 4.6 was used to assist in completing the code writing.

Closes #54510 from LuciferYang/SPARK-55683-FOLLOWUP.

Lead-authored-by: yangjie01 <yangjie01@baidu.com>
Co-authored-by: YangJie <yangjie01@baidu.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
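The scratch-buffer technique described in the follow-up can be sketched along these lines (the `encodeInto` helper name and signature are illustrative, not the merged code); since `WritableColumnVector.putByteArray` copies bytes immediately, a single 9-byte buffer can be safely overwritten on every iteration:

```java
import java.math.BigInteger;
import java.util.Arrays;

public class ScratchBufferSketch {
    // Illustrative helper: encodes the 8 little-endian bytes at src[offset..offset+7]
    // into scratch as a BigInteger-compatible big-endian value; returns bytes written.
    // scratch must hold at least 9 bytes (1 sign byte + 8 value bytes, worst case).
    static int encodeInto(byte[] src, int offset, byte[] scratch) {
        int msb = offset + 7;
        while (msb > offset && src[msb] == 0) msb--;   // strip leading zero bytes
        if (msb == offset && src[offset] == 0) {       // zero: single 0x00 byte
            scratch[0] = 0;
            return 1;
        }
        int d = 0;
        if ((src[msb] & 0x80) != 0) scratch[d++] = 0;  // sign byte keeps value positive
        for (int i = msb; i >= offset; i--) scratch[d++] = src[i]; // reverse byte order
        return d;
    }

    public static void main(String[] args) {
        byte[] scratch = new byte[9];  // allocated once, reused for every value
        long[] samples = {0L, 1L, Long.MAX_VALUE, Long.MIN_VALUE, -2L, -1L};
        byte[] le = new byte[8];
        for (long v : samples) {
            for (int i = 0; i < 8; i++) le[i] = (byte) (v >>> (8 * i));
            int len = encodeInto(le, 0, scratch);
            // copying here stands in for putByteArray's immediate copy into arrayData()
            BigInteger got = new BigInteger(Arrays.copyOf(scratch, len));
            if (!got.equals(new BigInteger(Long.toUnsignedString(v)))) {
                throw new AssertionError("mismatch for " + Long.toUnsignedString(v));
            }
        }
        System.out.println("scratch-buffer encoding matches for all samples");
    }
}
```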
@LuciferYang LuciferYang deleted the SPARK-55683 branch March 2, 2026 02:47