[SPARK-56894][SQL] Add vectorized Parquet BYTE_STREAM_SPLIT reader#55921

Open
iemejia wants to merge 1 commit into apache:master from iemejia:SPARK-56894-byte-stream-split

@iemejia iemejia commented May 16, 2026

What changes were proposed in this pull request?

This PR adds a vectorized reader for the Parquet BYTE_STREAM_SPLIT encoding (VectorizedByteStreamSplitValuesReader), wired into VectorizedColumnReader.getValuesReader().

BYTE_STREAM_SPLIT de-interleaves N fixed-width values (W bytes each) into W separate byte streams. Decoding gathers the original bytes back: value[i] = {stream[0][i], stream[1][i], ..., stream[W-1][i]}. This encoding is particularly effective for time-series and scientific data where adjacent values share high-order bytes.
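The scatter/gather layout described above can be sketched for INT32 as follows. This is a minimal illustration, not the code in this PR: the class name `ByteStreamSplitSketch` and its methods are hypothetical, and it assumes values are laid out little-endian (as in Parquet PLAIN encoding), so stream 0 holds the least significant byte of each value.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Hypothetical illustration class; not part of Spark or parquet-mr.
final class ByteStreamSplitSketch {
  private static final int WIDTH = Integer.BYTES; // W = 4 for INT32

  // Encode: byte k of value i lands in stream k at slot i, i.e. offset k * n + i.
  static byte[] encode(int[] values) {
    int n = values.length;
    byte[] plain = new byte[n * WIDTH];
    // Assumption: values are laid out little-endian, as in Parquet PLAIN encoding.
    ByteBuffer.wrap(plain).order(ByteOrder.LITTLE_ENDIAN).asIntBuffer().put(values);
    byte[] streams = new byte[n * WIDTH];
    for (int i = 0; i < n; i++) {
      for (int k = 0; k < WIDTH; k++) {
        streams[k * n + i] = plain[i * WIDTH + k];
      }
    }
    return streams;
  }

  // Decode: gather stream[0][i] .. stream[W-1][i] back into value i.
  static int[] decode(byte[] streams) {
    int n = streams.length / WIDTH;
    int[] values = new int[n];
    for (int i = 0; i < n; i++) {
      int v = 0;
      for (int k = 0; k < WIDTH; k++) {
        v |= (streams[k * n + i] & 0xFF) << (8 * k);
      }
      values[i] = v;
    }
    return values;
  }
}
```

Because stream k occupies the contiguous range [k * N, (k + 1) * N), bytes at the same position of neighbouring values sit next to each other, which is what makes each stream compress well when adjacent values share high-order bytes.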

The new reader:

  • Loads the entire encoded page into a byte[] via initFromPage
  • Uses direct per-element assembleInt / assembleLong helpers for byte gathering
  • Implements all batch read methods (readIntegers, readLongs, readFloats, readDoubles, readBinary) and skip methods
  • Supports FLOAT (W=4), DOUBLE (W=8), INT32 (W=4), INT64 (W=8), and FIXED_LEN_BYTE_ARRAY (W=type length)
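A rough sketch of how such a reader could be organized is shown below. It is not the PR's implementation: `BssPageReaderSketch` is a hypothetical class, plain Java arrays stand in for Spark's column vectors, and the little-endian byte order is an assumption based on the Parquet PLAIN layout. The only names taken from the description are `assembleInt` and `assembleLong`.

```java
// Hypothetical stand-in for the reader described above; plain arrays replace
// Spark's column vectors so the sketch stays self-contained.
final class BssPageReaderSketch {
  private final byte[] page;    // whole encoded page, loaded eagerly
  private final int valueCount; // N: number of values in the page
  private int next = 0;         // index of the next value to decode

  BssPageReaderSketch(byte[] page, int width) {
    this.page = page;
    this.valueCount = page.length / width;
  }

  // Gather the 4 bytes of value i, one from each stream (assumed little-endian).
  private int assembleInt(int i) {
    int n = valueCount;
    return (page[i] & 0xFF)
        | (page[n + i] & 0xFF) << 8
        | (page[2 * n + i] & 0xFF) << 16
        | (page[3 * n + i] & 0xFF) << 24;
  }

  // Gather the 8 bytes of value i, one from each stream (assumed little-endian).
  private long assembleLong(int i) {
    long v = 0L;
    for (int k = 0; k < Long.BYTES; k++) {
      v |= (page[k * valueCount + i] & 0xFFL) << (8 * k);
    }
    return v;
  }

  // Batch read: decode `total` ints into out[offset..offset + total).
  void readIntegers(int total, int[] out, int offset) {
    for (int j = 0; j < total; j++) {
      out[offset + j] = assembleInt(next + j);
    }
    next += total;
  }

  // Doubles reuse the 8-byte gather and reinterpret the bits.
  void readDoubles(int total, double[] out, int offset) {
    for (int j = 0; j < total; j++) {
      out[offset + j] = Double.longBitsToDouble(assembleLong(next + j));
    }
    next += total;
  }

  // Skipping only advances the value index; every stream is indexed by it.
  void skip(int total) {
    next += total;
  }
}
```

In this layout a skip costs nothing beyond moving the index, because no stream has to be consumed sequentially.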

The VectorizedColumnReader change is a single case BYTE_STREAM_SPLIT -> block (12 lines) that resolves the type width from the column descriptor and yields the new reader.
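The width resolution might look roughly like the sketch below. The enum `PrimitiveKind` and the method `typeWidth` are hypothetical stand-ins for the column descriptor lookup, not names from this PR; the widths are the ones listed in the description.

```java
// Hypothetical sketch of resolving the per-value byte width W from a column's
// physical type. PrimitiveKind stands in for the descriptor's primitive type.
final class TypeWidthSketch {
  enum PrimitiveKind { INT32, INT64, FLOAT, DOUBLE, FIXED_LEN_BYTE_ARRAY, BINARY }

  // declaredTypeLength is only consulted for FIXED_LEN_BYTE_ARRAY columns.
  static int typeWidth(PrimitiveKind kind, int declaredTypeLength) {
    return switch (kind) {
      case INT32, FLOAT -> 4;
      case INT64, DOUBLE -> 8;
      case FIXED_LEN_BYTE_ARRAY -> declaredTypeLength;
      default -> throw new UnsupportedOperationException(
          "BYTE_STREAM_SPLIT is not supported for " + kind);
    };
  }
}
```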

Why are the changes needed?

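Before this PR, Spark fell back to parquet-mr's per-value ByteStreamSplitValuesReader for BSS-encoded columns. In the benchmark below, the new vectorized batch reader is 2.8-4.5x faster than that reader (ratio of the Rate columns: about 2.8x for INT64 and DOUBLE, 4.2x for FLOAT, and 4.5x for INT32):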

OpenJDK 64-Bit Server VM 17.0.19+10 on Linux 7.0.0-1004-azure
AMD EPYC 9V45 96-Core Processor

BYTE_STREAM_SPLIT INT32:                  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
Spark vectorized readIntegers                         1              1           0       1103.4           0.9       1.0X
parquet-mr readInteger (per-value)                    4              4           0        247.6           4.0       0.2X

BYTE_STREAM_SPLIT INT64:                  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
Spark vectorized readLongs                            2              3           0        428.1           2.3       1.0X
parquet-mr readLong (per-value)                       7              7           0        151.4           6.6       0.4X

BYTE_STREAM_SPLIT FLOAT:                  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
Spark vectorized readFloats                           1              1           0       1053.1           0.9       1.0X
parquet-mr readFloat (per-value)                      4              4           0        251.5           4.0       0.2X

BYTE_STREAM_SPLIT DOUBLE:                 Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
Spark vectorized readDoubles                          2              3           0        426.9           2.3       1.0X
parquet-mr readDouble (per-value)                     7              7           0        151.1           6.6       0.4X

Does this PR introduce any user-facing change?

No. This is an internal performance optimization. BSS-encoded Parquet columns that were already readable via the parquet-mr fallback are now decoded faster through the vectorized path. No API, configuration, or behavioral changes.

How was this patch tested?

  • 31 unit tests across 5 test suites in ParquetByteStreamSplitEncodingSuite.scala:
    • Abstract base ParquetByteStreamSplitEncodingSuite[T] with 7 shared test cases (roundtrip, nulls, skip, large batches, special values, sequential reads, mixed skip-read)
    • Concrete suites for Int, Long, Float, Double (Float/Double override assertEqual for bitwise NaN-safe comparison)
    • Standalone FLBA suite with 3 tests
  • Benchmark in VectorizedByteStreamSplitReaderBenchmark.scala comparing against parquet-mr per-value readers
  • All 260 existing + new Parquet tests pass on JDK 17
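The bitwise comparison mentioned for the Float and Double suites matters because `NaN == NaN` is false under IEEE 754, so a value-based check would reject a correct NaN roundtrip. A minimal sketch of such a comparison, using a hypothetical helper class rather than the suite's actual `assertEqual` override:

```java
// Hypothetical helper: compares floating-point values by their raw bit
// patterns, so NaN matches NaN and +0.0 is distinguished from -0.0.
final class BitwiseEqualitySketch {
  static boolean bitwiseEquals(float a, float b) {
    return Float.floatToRawIntBits(a) == Float.floatToRawIntBits(b);
  }

  static boolean bitwiseEquals(double a, double b) {
    return Double.doubleToRawLongBits(a) == Double.doubleToRawLongBits(b);
  }
}
```

Comparing raw bits is stricter than `==` in one direction (signed zeros differ) and looser in the other (identical NaN bit patterns match), which fits a decoder test where the bytes are expected to survive unchanged.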

Was this patch authored or co-authored using generative AI tooling?

Generated-by: OpenCode (Claude claude-opus-4.6)

Adds a vectorized reader for the Parquet BYTE_STREAM_SPLIT encoding,
enabling native batch decoding of BSS-encoded columns in Spark's
vectorized Parquet reader.

BYTE_STREAM_SPLIT de-interleaves the bytes of N fixed-width values into
W separate streams (one per byte position). Decoding gathers the bytes
back: value[i] = {stream[0][i], stream[1][i], ..., stream[W-1][i]}.

This encoding is increasingly used for time-series and scientific data
(e.g., IoT sensor readings, financial tick data) because adjacent values
typically share high-order bytes, making each stream highly compressible.
Before this PR, BSS-encoded columns threw
SparkUnsupportedOperationException in vectorized mode.

Changes:
- VectorizedByteStreamSplitValuesReader: new reader extending
  VectorizedReaderBase. Eagerly reads all page bytes in initFromPage,
  then assembles values from the W split byte streams. Per-element
  assembleInt/assembleLong helpers are used in both single-value and
  batch read methods. Supports INT32, INT64, FLOAT, DOUBLE, and
  FIXED_LEN_BYTE_ARRAY.
- VectorizedColumnReader.getValuesReader: added BYTE_STREAM_SPLIT case
  that dispatches by primitive type to determine typeWidth.
- 31 unit tests across 5 suites (Integer, Long, Float, Double, FLBA)
  covering batch reads, single-value reads, skip operations, special
  values (NaN, Inf, min/max), and direct ByteBuffer support.
- Benchmark comparing Spark vectorized reader vs parquet-mr per-value
  reader: 2.7-4.1x speedup across all types.