From acf6443b858bd5a82402e2f23a4e68ff4ce3d63c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Isma=C3=ABl=20Mej=C3=ADa?= Date: Fri, 15 May 2026 22:33:40 +0200 Subject: [PATCH] GH-3530: Optimize PLAIN encoding and decoding with direct ByteBuffer I/O Replace ByteBufferInputStream and LittleEndianDataInputStream wrappers with direct ByteBuffer access for all PLAIN value readers and writers. Readers (PlainValuesReader, BooleanPlainValuesReader, BinaryPlainValuesReader, FixedLenByteArrayPlainValuesReader) now hold a little-endian ByteBuffer obtained from initFromPage() and call getInt/getLong/getFloat/getDouble directly, eliminating per-value stream overhead. Writers (PlainValuesWriter, BooleanPlainValuesWriter, FixedLenByteArrayPlainValuesWriter) write through CapacityByteArrayOutputStream's new writeInt/writeLong methods, which put values directly into the NIO slab buffer in little-endian order, avoiding temporary byte-array allocation. Supporting changes: - CapacityByteArrayOutputStream: allocate slabs with ByteOrder.LITTLE_ENDIAN, add writeInt(int) and writeLong(long) for single-value NIO writes. - BytesInput: add zero-copy writeTo(ByteBuffer) and toByteArray() using bulk ByteBuffer.get() instead of stream copy. - LittleEndianDataOutputStream: batch single-byte writes into single write(buf, 0, N) calls for writeShort/writeInt. Includes JMH benchmarks (PlainEncodingBenchmark, PlainDecodingBenchmark) covering all 7 primitive types for both encoding and decoding. 
--- parquet-benchmarks/pom.xml | 9 + .../benchmarks/PlainDecodingBenchmark.java | 289 ++++++++++++++++++ .../benchmarks/PlainEncodingBenchmark.java | 257 ++++++++++++++++ .../parquet/benchmarks/TestDataFactory.java | 108 +++++++ .../values/plain/BinaryPlainValuesReader.java | 46 +-- .../plain/BooleanPlainValuesReader.java | 68 +++-- .../plain/BooleanPlainValuesWriter.java | 46 ++- .../FixedLenByteArrayPlainValuesReader.java | 32 +- .../FixedLenByteArrayPlainValuesWriter.java | 16 +- .../values/plain/PlainValuesReader.java | 75 ++--- .../values/plain/PlainValuesWriter.java | 43 +-- .../TestBinaryPlainValuesWriterReader.java | 132 ++++++++ .../TestBooleanPlainValuesWriterReader.java | 141 +++++++++ ...edLenByteArrayPlainValuesWriterReader.java | 168 ++++++++++ .../plain/TestPlainValuesWriterReader.java | 262 ++++++++++++++++ .../org/apache/parquet/bytes/BytesInput.java | 46 ++- .../bytes/CapacityByteArrayOutputStream.java | 30 ++ .../bytes/LittleEndianDataOutputStream.java | 20 +- .../apache/parquet/bytes/TestBytesInput.java | 23 ++ .../TestCapacityByteArrayOutputStream.java | 55 ++++ .../hadoop/TestColumnChunkPageWriteStore.java | 11 +- pom.xml | 2 + 22 files changed, 1678 insertions(+), 201 deletions(-) create mode 100644 parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/PlainDecodingBenchmark.java create mode 100644 parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/PlainEncodingBenchmark.java create mode 100644 parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/TestDataFactory.java create mode 100644 parquet-column/src/test/java/org/apache/parquet/column/values/plain/TestBinaryPlainValuesWriterReader.java create mode 100644 parquet-column/src/test/java/org/apache/parquet/column/values/plain/TestBooleanPlainValuesWriterReader.java create mode 100644 parquet-column/src/test/java/org/apache/parquet/column/values/plain/TestFixedLenByteArrayPlainValuesWriterReader.java create mode 100644 
parquet-column/src/test/java/org/apache/parquet/column/values/plain/TestPlainValuesWriterReader.java diff --git a/parquet-benchmarks/pom.xml b/parquet-benchmarks/pom.xml index d5a288b677..3ce4dce5ce 100644 --- a/parquet-benchmarks/pom.xml +++ b/parquet-benchmarks/pom.xml @@ -94,6 +94,15 @@ org.apache.maven.plugins maven-compiler-plugin + + + + org.openjdk.jmh + jmh-generator-annprocess + ${jmh.version} + + + org.apache.maven.plugins diff --git a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/PlainDecodingBenchmark.java b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/PlainDecodingBenchmark.java new file mode 100644 index 0000000000..59a67c90ce --- /dev/null +++ b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/PlainDecodingBenchmark.java @@ -0,0 +1,289 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.parquet.benchmarks; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Random; +import java.util.concurrent.TimeUnit; +import org.apache.parquet.bytes.ByteBufferInputStream; +import org.apache.parquet.bytes.HeapByteBufferAllocator; +import org.apache.parquet.column.values.ValuesReader; +import org.apache.parquet.column.values.plain.BinaryPlainValuesReader; +import org.apache.parquet.column.values.plain.BooleanPlainValuesReader; +import org.apache.parquet.column.values.plain.BooleanPlainValuesWriter; +import org.apache.parquet.column.values.plain.FixedLenByteArrayPlainValuesReader; +import org.apache.parquet.column.values.plain.FixedLenByteArrayPlainValuesWriter; +import org.apache.parquet.column.values.plain.PlainValuesReader; +import org.apache.parquet.column.values.plain.PlainValuesWriter; +import org.apache.parquet.io.api.Binary; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OperationsPerInvocation; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; + +/** + * Decoding micro-benchmarks for the PLAIN encoding across all Parquet primitive types: + * {@code BOOLEAN}, {@code INT32}, {@code INT64}, {@code FLOAT}, {@code DOUBLE}, + * {@code BINARY}, and {@code FIXED_LEN_BYTE_ARRAY}. + * + *

<p>Each invocation decodes {@value #VALUE_COUNT} values. Per-value methods measure + * scalar read throughput using direct ByteBuffer access. + * + *

<p>Fixed-width types (BOOLEAN through DOUBLE) are data-independent for PLAIN + * decoding -- the cost per value is constant regardless of the value content or + * distribution pattern -- so no {@code @Param} is needed. + * + *

<p>Variable/fixed-length byte types use inner {@link State} classes with + * {@code @Param} for the dimension that genuinely affects PLAIN throughput: + *

+ */ +@BenchmarkMode(Mode.Throughput) +@OutputTimeUnit(TimeUnit.SECONDS) +@Fork(1) +@Warmup(iterations = 3, time = 1) +@Measurement(iterations = 5, time = 1) +@State(Scope.Thread) +public class PlainDecodingBenchmark { + + static final int VALUE_COUNT = 100_000; + private static final int INIT_SLAB_SIZE = 64 * 1024; + private static final int PAGE_SIZE = 4 * 1024 * 1024; + + // ---- Pre-encoded pages for fixed-width types ---- + + private byte[] boolPage; + private byte[] intPage; + private byte[] longPage; + private byte[] floatPage; + private byte[] doublePage; + + @Setup(Level.Trial) + public void setup() throws IOException { + Random r = new Random(42); + + // Encode BOOLEAN + { + BooleanPlainValuesWriter w = new BooleanPlainValuesWriter(); + for (int i = 0; i < VALUE_COUNT; i++) { + w.writeBoolean(r.nextBoolean()); + } + boolPage = w.getBytes().toByteArray(); + w.close(); + } + + // Encode INT32 + { + PlainValuesWriter w = new PlainValuesWriter(INIT_SLAB_SIZE, PAGE_SIZE, new HeapByteBufferAllocator()); + for (int i = 0; i < VALUE_COUNT; i++) { + w.writeInteger(r.nextInt()); + } + intPage = w.getBytes().toByteArray(); + w.close(); + } + + // Encode INT64 + { + PlainValuesWriter w = new PlainValuesWriter(INIT_SLAB_SIZE, PAGE_SIZE, new HeapByteBufferAllocator()); + for (int i = 0; i < VALUE_COUNT; i++) { + w.writeLong(r.nextLong()); + } + longPage = w.getBytes().toByteArray(); + w.close(); + } + + // Encode FLOAT + { + PlainValuesWriter w = new PlainValuesWriter(INIT_SLAB_SIZE, PAGE_SIZE, new HeapByteBufferAllocator()); + for (int i = 0; i < VALUE_COUNT; i++) { + w.writeFloat(r.nextFloat()); + } + floatPage = w.getBytes().toByteArray(); + w.close(); + } + + // Encode DOUBLE + { + PlainValuesWriter w = new PlainValuesWriter(INIT_SLAB_SIZE, PAGE_SIZE, new HeapByteBufferAllocator()); + for (int i = 0; i < VALUE_COUNT; i++) { + w.writeDouble(r.nextDouble()); + } + doublePage = w.getBytes().toByteArray(); + w.close(); + } + } + + // ---- BINARY state: parameterized by 
string length ---- + + /** + * Separate state for BINARY decode benchmarks. Pre-encodes a page of binary values + * so the {@code stringLength} parameter only creates trials for binary-related + * benchmark methods. + */ + @State(Scope.Thread) + public static class BinaryState { + /** Short (10), medium (100), and long (1000) strings. */ + @Param({"10", "100", "1000"}) + public int stringLength; + + byte[] page; + + @Setup(Level.Trial) + public void setup() throws IOException { + Binary[] data = + TestDataFactory.generateBinaryData(VALUE_COUNT, stringLength, 0, TestDataFactory.DEFAULT_SEED); + PlainValuesWriter w = new PlainValuesWriter(INIT_SLAB_SIZE, PAGE_SIZE, new HeapByteBufferAllocator()); + for (Binary v : data) { + w.writeBytes(v); + } + page = w.getBytes().toByteArray(); + w.close(); + } + } + + // ---- FLBA state: parameterized by fixed byte length ---- + + /** + * Separate state for FIXED_LEN_BYTE_ARRAY decode benchmarks. Pre-encodes a page of + * FLBA values so the {@code fixedLength} parameter only creates trials for + * FLBA-related benchmark methods. Values: 2 = FLOAT16, 12 = INT96, 16 = UUID. + */ + @State(Scope.Thread) + public static class FlbaState { + /** FLOAT16 (2), INT96 (12), UUID (16). 
*/ + @Param({"2", "12", "16"}) + public int fixedLength; + + byte[] page; + + @Setup(Level.Trial) + public void setup() throws IOException { + Binary[] data = TestDataFactory.generateFixedLenByteArrays( + VALUE_COUNT, fixedLength, 0, TestDataFactory.DEFAULT_SEED); + FixedLenByteArrayPlainValuesWriter w = new FixedLenByteArrayPlainValuesWriter( + fixedLength, INIT_SLAB_SIZE, PAGE_SIZE, new HeapByteBufferAllocator()); + for (Binary v : data) { + w.writeBytes(v); + } + page = w.getBytes().toByteArray(); + w.close(); + } + } + + // ---- BOOLEAN ---- + + @Benchmark + @OperationsPerInvocation(VALUE_COUNT) + public void decodeBoolean(Blackhole bh) throws IOException { + ValuesReader reader = new BooleanPlainValuesReader(); + reader.initFromPage(VALUE_COUNT, ByteBufferInputStream.wrap(ByteBuffer.wrap(boolPage))); + for (int i = 0; i < VALUE_COUNT; i++) { + bh.consume(reader.readBoolean()); + } + } + + // ---- INT32 ---- + + @Benchmark + @OperationsPerInvocation(VALUE_COUNT) + public void decodeInt(Blackhole bh) throws IOException { + PlainValuesReader.IntegerPlainValuesReader reader = new PlainValuesReader.IntegerPlainValuesReader(); + reader.initFromPage(VALUE_COUNT, ByteBufferInputStream.wrap(ByteBuffer.wrap(intPage))); + for (int i = 0; i < VALUE_COUNT; i++) { + bh.consume(reader.readInteger()); + } + } + + // ---- INT64 ---- + + @Benchmark + @OperationsPerInvocation(VALUE_COUNT) + public void decodeLong(Blackhole bh) throws IOException { + PlainValuesReader.LongPlainValuesReader reader = new PlainValuesReader.LongPlainValuesReader(); + reader.initFromPage(VALUE_COUNT, ByteBufferInputStream.wrap(ByteBuffer.wrap(longPage))); + for (int i = 0; i < VALUE_COUNT; i++) { + bh.consume(reader.readLong()); + } + } + + // ---- FLOAT ---- + + @Benchmark + @OperationsPerInvocation(VALUE_COUNT) + public void decodeFloat(Blackhole bh) throws IOException { + PlainValuesReader.FloatPlainValuesReader reader = new PlainValuesReader.FloatPlainValuesReader(); + 
reader.initFromPage(VALUE_COUNT, ByteBufferInputStream.wrap(ByteBuffer.wrap(floatPage))); + for (int i = 0; i < VALUE_COUNT; i++) { + bh.consume(reader.readFloat()); + } + } + + // ---- DOUBLE ---- + + @Benchmark + @OperationsPerInvocation(VALUE_COUNT) + public void decodeDouble(Blackhole bh) throws IOException { + PlainValuesReader.DoublePlainValuesReader reader = new PlainValuesReader.DoublePlainValuesReader(); + reader.initFromPage(VALUE_COUNT, ByteBufferInputStream.wrap(ByteBuffer.wrap(doublePage))); + for (int i = 0; i < VALUE_COUNT; i++) { + bh.consume(reader.readDouble()); + } + } + + // ---- BINARY (parameterized by string length) ---- + + @Benchmark + @OperationsPerInvocation(VALUE_COUNT) + public void decodeBinary(BinaryState state, Blackhole bh) throws IOException { + BinaryPlainValuesReader reader = new BinaryPlainValuesReader(); + reader.initFromPage(VALUE_COUNT, ByteBufferInputStream.wrap(ByteBuffer.wrap(state.page))); + for (int i = 0; i < VALUE_COUNT; i++) { + bh.consume(reader.readBytes()); + } + } + + // ---- FIXED_LEN_BYTE_ARRAY (parameterized by fixed length) ---- + + @Benchmark + @OperationsPerInvocation(VALUE_COUNT) + public void decodeFixedLenByteArray(FlbaState state, Blackhole bh) throws IOException { + FixedLenByteArrayPlainValuesReader reader = new FixedLenByteArrayPlainValuesReader(state.fixedLength); + reader.initFromPage(VALUE_COUNT, ByteBufferInputStream.wrap(ByteBuffer.wrap(state.page))); + for (int i = 0; i < VALUE_COUNT; i++) { + bh.consume(reader.readBytes()); + } + } +} diff --git a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/PlainEncodingBenchmark.java b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/PlainEncodingBenchmark.java new file mode 100644 index 0000000000..71d8e6008a --- /dev/null +++ b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/PlainEncodingBenchmark.java @@ -0,0 +1,257 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor 
license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.benchmarks; + +import java.io.IOException; +import java.util.Random; +import java.util.concurrent.TimeUnit; +import org.apache.parquet.bytes.HeapByteBufferAllocator; +import org.apache.parquet.column.values.ValuesWriter; +import org.apache.parquet.column.values.plain.BooleanPlainValuesWriter; +import org.apache.parquet.column.values.plain.FixedLenByteArrayPlainValuesWriter; +import org.apache.parquet.column.values.plain.PlainValuesWriter; +import org.apache.parquet.io.api.Binary; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OperationsPerInvocation; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; + +/** + * Encoding micro-benchmarks for the PLAIN encoding across all Parquet primitive types: + * {@code BOOLEAN}, {@code INT32}, {@code INT64}, {@code 
FLOAT}, {@code DOUBLE}, + * {@code BINARY}, and {@code FIXED_LEN_BYTE_ARRAY}. + * + *

<p>Measures per-value scalar write throughput using direct ByteBuffer I/O + * through {@code CapacityByteArrayOutputStream}. + * + *

<p>Fixed-width types (BOOLEAN through DOUBLE) are data-independent for PLAIN + * encoding -- the cost per value is constant regardless of the value content or + * distribution pattern -- so no {@code @Param} is needed. + * + *

<p>Variable/fixed-length byte types use inner {@link State} classes with + * {@code @Param} for the dimension that genuinely affects PLAIN throughput: + *

+ */ +@BenchmarkMode(Mode.Throughput) +@OutputTimeUnit(TimeUnit.SECONDS) +@Fork(1) +@Warmup(iterations = 3, time = 1) +@Measurement(iterations = 5, time = 1) +@State(Scope.Thread) +public class PlainEncodingBenchmark { + + static final int VALUE_COUNT = 100_000; + private static final int INIT_SLAB_SIZE = 64 * 1024; + private static final int PAGE_SIZE = 4 * 1024 * 1024; + + // ---- Fixed-width data (no @Param needed -- PLAIN cost is constant per value) ---- + + private boolean[] boolData; + private int[] intData; + private long[] longData; + private float[] floatData; + private double[] doubleData; + + @Setup(Level.Trial) + public void setup() { + Random r = new Random(42); + boolData = new boolean[VALUE_COUNT]; + intData = new int[VALUE_COUNT]; + longData = new long[VALUE_COUNT]; + floatData = new float[VALUE_COUNT]; + doubleData = new double[VALUE_COUNT]; + for (int i = 0; i < VALUE_COUNT; i++) { + boolData[i] = r.nextBoolean(); + intData[i] = r.nextInt(); + longData[i] = r.nextLong(); + floatData[i] = r.nextFloat(); + doubleData[i] = r.nextDouble(); + } + } + + // ---- BINARY state: parameterized by string length ---- + + /** + * Separate state for BINARY benchmarks so the {@code stringLength} parameter only + * creates trials for binary-related benchmark methods, not for fixed-width types. + */ + @State(Scope.Thread) + public static class BinaryState { + /** Short (10), medium (100), and long (1000) strings. */ + @Param({"10", "100", "1000"}) + public int stringLength; + + Binary[] data; + + @Setup(Level.Trial) + public void setup() { + data = TestDataFactory.generateBinaryData(VALUE_COUNT, stringLength, 0, TestDataFactory.DEFAULT_SEED); + } + } + + // ---- FLBA state: parameterized by fixed byte length ---- + + /** + * Separate state for FIXED_LEN_BYTE_ARRAY benchmarks so the {@code fixedLength} + * parameter only creates trials for FLBA-related benchmark methods. + * Values: 2 = FLOAT16, 12 = INT96, 16 = UUID. 
+ */ + @State(Scope.Thread) + public static class FlbaState { + /** FLOAT16 (2), INT96 (12), UUID (16). */ + @Param({"2", "12", "16"}) + public int fixedLength; + + Binary[] data; + + @Setup(Level.Trial) + public void setup() { + data = TestDataFactory.generateFixedLenByteArrays( + VALUE_COUNT, fixedLength, 0, TestDataFactory.DEFAULT_SEED); + } + } + + // ---- Writer factories ---- + + private static PlainValuesWriter newWriter() { + return new PlainValuesWriter(INIT_SLAB_SIZE, PAGE_SIZE, new HeapByteBufferAllocator()); + } + + private static BooleanPlainValuesWriter newBoolWriter() { + return new BooleanPlainValuesWriter(); + } + + private static FixedLenByteArrayPlainValuesWriter newFlbaWriter(int fixedLength) { + return new FixedLenByteArrayPlainValuesWriter( + fixedLength, INIT_SLAB_SIZE, PAGE_SIZE, new HeapByteBufferAllocator()); + } + + // ---- BOOLEAN ---- + + @Benchmark + @OperationsPerInvocation(VALUE_COUNT) + public byte[] encodeBoolean() throws IOException { + ValuesWriter w = newBoolWriter(); + for (boolean v : boolData) { + w.writeBoolean(v); + } + byte[] bytes = w.getBytes().toByteArray(); + w.close(); + return bytes; + } + + // ---- INT32 ---- + + @Benchmark + @OperationsPerInvocation(VALUE_COUNT) + public byte[] encodeInt() throws IOException { + PlainValuesWriter w = newWriter(); + for (int v : intData) { + w.writeInteger(v); + } + byte[] bytes = w.getBytes().toByteArray(); + w.close(); + return bytes; + } + + // ---- INT64 ---- + + @Benchmark + @OperationsPerInvocation(VALUE_COUNT) + public byte[] encodeLong() throws IOException { + PlainValuesWriter w = newWriter(); + for (long v : longData) { + w.writeLong(v); + } + byte[] bytes = w.getBytes().toByteArray(); + w.close(); + return bytes; + } + + // ---- FLOAT ---- + + @Benchmark + @OperationsPerInvocation(VALUE_COUNT) + public byte[] encodeFloat() throws IOException { + PlainValuesWriter w = newWriter(); + for (float v : floatData) { + w.writeFloat(v); + } + byte[] bytes = 
w.getBytes().toByteArray(); + w.close(); + return bytes; + } + + // ---- DOUBLE ---- + + @Benchmark + @OperationsPerInvocation(VALUE_COUNT) + public byte[] encodeDouble() throws IOException { + PlainValuesWriter w = newWriter(); + for (double v : doubleData) { + w.writeDouble(v); + } + byte[] bytes = w.getBytes().toByteArray(); + w.close(); + return bytes; + } + + // ---- BINARY (parameterized by string length) ---- + + @Benchmark + @OperationsPerInvocation(VALUE_COUNT) + public byte[] encodeBinary(BinaryState state) throws IOException { + PlainValuesWriter w = newWriter(); + for (Binary v : state.data) { + w.writeBytes(v); + } + byte[] bytes = w.getBytes().toByteArray(); + w.close(); + return bytes; + } + + // ---- FIXED_LEN_BYTE_ARRAY (parameterized by fixed length) ---- + + @Benchmark + @OperationsPerInvocation(VALUE_COUNT) + public byte[] encodeFixedLenByteArray(FlbaState state) throws IOException { + FixedLenByteArrayPlainValuesWriter w = newFlbaWriter(state.fixedLength); + for (Binary v : state.data) { + w.writeBytes(v); + } + byte[] bytes = w.getBytes().toByteArray(); + w.close(); + return bytes; + } +} diff --git a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/TestDataFactory.java b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/TestDataFactory.java new file mode 100644 index 0000000000..e666ebe6f3 --- /dev/null +++ b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/TestDataFactory.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.benchmarks; + +import java.nio.charset.StandardCharsets; +import java.util.Random; +import org.apache.parquet.io.api.Binary; + +/** + * Utility class for generating test data for encoding benchmarks. + */ +public final class TestDataFactory { + + /** Default RNG seed used across benchmarks for deterministic data. */ + public static final long DEFAULT_SEED = 42L; + + private TestDataFactory() {} + + // ---- Fixed-length byte array data generation ---- + + /** + * Generates fixed-length byte arrays with the specified cardinality. + * + * @param count number of values + * @param length byte length of each value + * @param distinct number of distinct values (0 means all unique) + * @param seed RNG seed + */ + public static Binary[] generateFixedLenByteArrays(int count, int length, int distinct, long seed) { + Random random = new Random(seed); + if (distinct > 0) { + Binary[] palette = new Binary[distinct]; + for (int i = 0; i < distinct; i++) { + byte[] bytes = new byte[length]; + random.nextBytes(bytes); + palette[i] = Binary.fromConstantByteArray(bytes); + } + Binary[] data = new Binary[count]; + for (int i = 0; i < count; i++) { + data[i] = palette[random.nextInt(distinct)]; + } + return data; + } else { + Binary[] data = new Binary[count]; + for (int i = 0; i < count; i++) { + byte[] bytes = new byte[length]; + random.nextBytes(bytes); + data[i] = Binary.fromConstantByteArray(bytes); + } + return data; + } + } + + // ---- Binary data generation ---- + + /** + * Generates binary strings of the given length with the 
specified cardinality. + * + * @param count number of values + * @param stringLength length of each string + * @param distinct number of distinct values (0 means all unique) + * @param seed RNG seed + */ + public static Binary[] generateBinaryData(int count, int stringLength, int distinct, long seed) { + Random random = new Random(seed); + Binary[] data = new Binary[count]; + if (distinct > 0) { + Binary[] dictionary = new Binary[distinct]; + for (int i = 0; i < distinct; i++) { + dictionary[i] = Binary.fromConstantByteArray( + randomString(stringLength, random).getBytes(StandardCharsets.UTF_8)); + } + for (int i = 0; i < count; i++) { + data[i] = dictionary[random.nextInt(distinct)]; + } + } else { + for (int i = 0; i < count; i++) { + data[i] = Binary.fromConstantByteArray( + randomString(stringLength, random).getBytes(StandardCharsets.UTF_8)); + } + } + return data; + } + + private static String randomString(int length, Random random) { + StringBuilder sb = new StringBuilder(length); + for (int i = 0; i < length; i++) { + sb.append((char) ('a' + random.nextInt(26))); + } + return sb.toString(); + } +} diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BinaryPlainValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BinaryPlainValuesReader.java index 6ce2f31a43..532dd2a20c 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BinaryPlainValuesReader.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BinaryPlainValuesReader.java @@ -19,44 +19,44 @@ package org.apache.parquet.column.values.plain; import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; import org.apache.parquet.bytes.ByteBufferInputStream; -import org.apache.parquet.bytes.BytesUtils; import org.apache.parquet.column.values.ValuesReader; -import org.apache.parquet.io.ParquetDecodingException; import org.apache.parquet.io.api.Binary; -import 
org.slf4j.Logger; -import org.slf4j.LoggerFactory; +/** + * Plain encoding reader for BINARY values. + * + *

Reads directly from a {@link ByteBuffer} with {@link ByteOrder#LITTLE_ENDIAN} byte order, + * using {@link ByteBuffer#getInt()} for the 4-byte length prefix instead of 4 individual + * {@code InputStream.read()} calls through {@link org.apache.parquet.bytes.BytesUtils#readIntLittleEndian}. + */ public class BinaryPlainValuesReader extends ValuesReader { - private static final Logger LOG = LoggerFactory.getLogger(BinaryPlainValuesReader.class); - private ByteBufferInputStream in; + private ByteBuffer buffer; @Override public Binary readBytes() { - try { - int length = BytesUtils.readIntLittleEndian(in); - return Binary.fromConstantByteBuffer(in.slice(length)); - } catch (IOException | RuntimeException e) { - throw new ParquetDecodingException("could not read bytes at offset " + in.position(), e); - } + int length = buffer.getInt(); + ByteBuffer valueSlice = buffer.slice(); + valueSlice.limit(length); + buffer.position(buffer.position() + length); + return Binary.fromConstantByteBuffer(valueSlice); } @Override public void skip() { - try { - int length = BytesUtils.readIntLittleEndian(in); - in.skipFully(length); - } catch (IOException | RuntimeException e) { - throw new ParquetDecodingException("could not skip bytes at offset " + in.position(), e); - } + int length = buffer.getInt(); + buffer.position(buffer.position() + length); } @Override public void initFromPage(int valueCount, ByteBufferInputStream stream) throws IOException { - LOG.debug( - "init from page at offset {} for length {}", - stream.position(), - (stream.available() - stream.position())); - this.in = stream.remainingStream(); + int available = stream.available(); + if (available > 0) { + this.buffer = stream.slice(available).order(ByteOrder.LITTLE_ENDIAN); + } else { + this.buffer = ByteBuffer.allocate(0).order(ByteOrder.LITTLE_ENDIAN); + } } } diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BooleanPlainValuesReader.java 
b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BooleanPlainValuesReader.java index 22ca2d567c..79581842c4 100755 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BooleanPlainValuesReader.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BooleanPlainValuesReader.java @@ -18,57 +18,65 @@ */ package org.apache.parquet.column.values.plain; -import static org.apache.parquet.column.values.bitpacking.Packer.LITTLE_ENDIAN; - import java.io.IOException; +import java.nio.ByteBuffer; import org.apache.parquet.bytes.ByteBufferInputStream; +import org.apache.parquet.bytes.BytesUtils; import org.apache.parquet.column.values.ValuesReader; -import org.apache.parquet.column.values.bitpacking.ByteBitPackingValuesReader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * encodes boolean for the plain encoding: one bit at a time (0 = false) + * Decodes PLAIN-encoded booleans: one bit per value, packed 8 per byte, little-endian + * bit order (bit 0 of each byte is the first value). + * + *

Direct bit extraction from the page ByteBuffer avoids the overhead of the generic + * bit-packing machinery ({@code ByteBitPackingValuesReader}) and intermediate + * {@code int[8]} buffers. */ public class BooleanPlainValuesReader extends ValuesReader { private static final Logger LOG = LoggerFactory.getLogger(BooleanPlainValuesReader.class); - private ByteBitPackingValuesReader in = new ByteBitPackingValuesReader(1, LITTLE_ENDIAN); + private byte[] pageData; + private int pageOffset; + private int bitIndex; - /** - * {@inheritDoc} - * - * @see org.apache.parquet.column.values.ValuesReader#readBoolean() - */ @Override - public boolean readBoolean() { - return in.readInteger() == 0 ? false : true; + public void initFromPage(int valueCount, ByteBufferInputStream stream) throws IOException { + LOG.debug("init from page at offset {} for length {}", stream.position(), stream.available()); + int effectiveBitLength = valueCount; // bitWidth = 1 + int length = BytesUtils.paddedByteCountFromBits(effectiveBitLength); + length = Math.min(length, stream.available()); + ByteBuffer buf = stream.slice(length); + + // Bulk access: use backing array directly if available, otherwise copy once. 
+ if (buf.hasArray()) { + pageData = buf.array(); + pageOffset = buf.arrayOffset() + buf.position(); + } else { + pageData = new byte[length]; + buf.get(pageData); + pageOffset = 0; + } + bitIndex = 0; + updateNextOffset(length); } - /** - * {@inheritDoc} - * - * @see org.apache.parquet.column.values.ValuesReader#skip() - */ @Override - public void skip() { - in.readInteger(); + public boolean readBoolean() { + int byteIdx = pageOffset + (bitIndex >>> 3); + int bitPos = bitIndex & 7; + bitIndex++; + return ((pageData[byteIdx] >>> bitPos) & 1) != 0; } - /** - * {@inheritDoc} - * - * @see org.apache.parquet.column.values.ValuesReader#initFromPage(int, ByteBufferInputStream) - */ @Override - public void initFromPage(int valueCount, ByteBufferInputStream stream) throws IOException { - LOG.debug("init from page at offset {} for length {}", stream.position(), stream.available()); - this.in.initFromPage(valueCount, stream); + public void skip() { + bitIndex++; } - @Deprecated @Override - public int getNextOffset() { - return in.getNextOffset(); + public void skip(int n) { + bitIndex += n; } } diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BooleanPlainValuesWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BooleanPlainValuesWriter.java index 7f80ec150a..5fb899067b 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BooleanPlainValuesWriter.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BooleanPlainValuesWriter.java @@ -19,52 +19,74 @@ package org.apache.parquet.column.values.plain; import static org.apache.parquet.column.Encoding.PLAIN; -import static org.apache.parquet.column.values.bitpacking.Packer.LITTLE_ENDIAN; import org.apache.parquet.bytes.BytesInput; +import org.apache.parquet.bytes.CapacityByteArrayOutputStream; import org.apache.parquet.column.Encoding; import org.apache.parquet.column.values.ValuesWriter; -import 
org.apache.parquet.column.values.bitpacking.ByteBitPackingValuesWriter; /** - * An implementation of the PLAIN encoding + * An implementation of the PLAIN encoding for BOOLEAN values. + * + *
<p>
Packs booleans directly into bytes (8 per byte, LSB first) without + * going through the generic int-based bit-packing encoder. */ public class BooleanPlainValuesWriter extends ValuesWriter { - private ByteBitPackingValuesWriter bitPackingWriter; + private static final int INITIAL_SLAB_SIZE = 1024; + private static final int MAX_CAPACITY = 64 * 1024; + + private final CapacityByteArrayOutputStream baos; + private int currentByte; + private int bitsWritten; public BooleanPlainValuesWriter() { - bitPackingWriter = new ByteBitPackingValuesWriter(1, LITTLE_ENDIAN); + this.baos = new CapacityByteArrayOutputStream(INITIAL_SLAB_SIZE, MAX_CAPACITY); + this.currentByte = 0; + this.bitsWritten = 0; } @Override public final void writeBoolean(boolean v) { - bitPackingWriter.writeInteger(v ? 1 : 0); + currentByte |= ((v ? 1 : 0) << bitsWritten); + bitsWritten++; + if (bitsWritten == 8) { + baos.write(currentByte); + currentByte = 0; + bitsWritten = 0; + } } @Override public long getBufferedSize() { - return bitPackingWriter.getBufferedSize(); + return baos.size() + (bitsWritten > 0 ? 
1 : 0); } @Override public BytesInput getBytes() { - return bitPackingWriter.getBytes(); + if (bitsWritten > 0) { + baos.write(currentByte); + currentByte = 0; + bitsWritten = 0; + } + return BytesInput.from(baos); } @Override public void reset() { - bitPackingWriter.reset(); + baos.reset(); + currentByte = 0; + bitsWritten = 0; } @Override public void close() { - bitPackingWriter.close(); + baos.close(); } @Override public long getAllocatedSize() { - return bitPackingWriter.getAllocatedSize(); + return baos.getCapacity(); } @Override @@ -74,6 +96,6 @@ public Encoding getEncoding() { @Override public String memUsageString(String prefix) { - return bitPackingWriter.memUsageString(prefix); + return String.format("%s BooleanPlainValuesWriter %d bytes", prefix, getAllocatedSize()); } } diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesReader.java index adfc488924..8fb50542b4 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesReader.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesReader.java @@ -19,21 +19,26 @@ package org.apache.parquet.column.values.plain; import java.io.IOException; +import java.nio.ByteBuffer; import org.apache.parquet.bytes.ByteBufferInputStream; import org.apache.parquet.column.values.ValuesReader; -import org.apache.parquet.io.ParquetDecodingException; import org.apache.parquet.io.api.Binary; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * ValuesReader for FIXED_LEN_BYTE_ARRAY. + * + *
<p>
Reads directly from a {@link ByteBuffer}, bypassing the {@link ByteBufferInputStream} + * wrapper to avoid per-value stream overhead (remaining checks, IOException wrapping, + * virtual dispatch). The underlying page data is obtained as a single contiguous + * {@link ByteBuffer} via {@link ByteBufferInputStream#slice(int)}. */ public class FixedLenByteArrayPlainValuesReader extends ValuesReader { private static final Logger LOG = LoggerFactory.getLogger(FixedLenByteArrayPlainValuesReader.class); private final int length; - private ByteBufferInputStream in; + private ByteBuffer buffer; public FixedLenByteArrayPlainValuesReader(int length) { this.length = length; @@ -41,30 +46,29 @@ public FixedLenByteArrayPlainValuesReader(int length) { @Override public Binary readBytes() { - try { - return Binary.fromConstantByteBuffer(in.slice(length)); - } catch (IOException | RuntimeException e) { - throw new ParquetDecodingException("could not read bytes at offset " + in.position(), e); - } + Binary result = Binary.fromConstantByteBuffer(buffer, buffer.position(), length); + buffer.position(buffer.position() + length); + return result; } @Override public void skip() { - skip(1); + buffer.position(buffer.position() + length); } @Override public void skip(int n) { - try { - in.skipFully(n * length); - } catch (IOException | RuntimeException e) { - throw new ParquetDecodingException("could not skip bytes at offset " + in.position(), e); - } + buffer.position(buffer.position() + n * length); } @Override public void initFromPage(int valueCount, ByteBufferInputStream stream) throws IOException { LOG.debug("init from page at offset {} for length {}", stream.position(), stream.available()); - this.in = stream.remainingStream(); + int available = stream.available(); + if (available > 0) { + this.buffer = stream.slice(available); + } else { + this.buffer = ByteBuffer.allocate(0); + } } } diff --git 
a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesWriter.java index dec4d1be1b..b57fdef84a 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesWriter.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesWriter.java @@ -22,7 +22,6 @@ import org.apache.parquet.bytes.ByteBufferAllocator; import org.apache.parquet.bytes.BytesInput; import org.apache.parquet.bytes.CapacityByteArrayOutputStream; -import org.apache.parquet.bytes.LittleEndianDataOutputStream; import org.apache.parquet.column.Encoding; import org.apache.parquet.column.values.ValuesWriter; import org.apache.parquet.io.ParquetEncodingException; @@ -34,19 +33,15 @@ * ValuesWriter for FIXED_LEN_BYTE_ARRAY. */ public class FixedLenByteArrayPlainValuesWriter extends ValuesWriter { - private static final Logger LOG = LoggerFactory.getLogger(PlainValuesWriter.class); + private static final Logger LOG = LoggerFactory.getLogger(FixedLenByteArrayPlainValuesWriter.class); private CapacityByteArrayOutputStream arrayOut; - private LittleEndianDataOutputStream out; private int length; - private ByteBufferAllocator allocator; public FixedLenByteArrayPlainValuesWriter( int length, int initialSize, int pageSize, ByteBufferAllocator allocator) { this.length = length; - this.allocator = allocator; - this.arrayOut = new CapacityByteArrayOutputStream(initialSize, pageSize, this.allocator); - this.out = new LittleEndianDataOutputStream(arrayOut); + this.arrayOut = new CapacityByteArrayOutputStream(initialSize, pageSize, allocator); } @Override @@ -56,7 +51,7 @@ public final void writeBytes(Binary v) { "Fixed Binary size " + v.length() + " does not match field type length " + length); } try { - v.writeTo(out); + v.writeTo(arrayOut); } catch (IOException e) { throw 
new ParquetEncodingException("could not write fixed bytes", e); } @@ -69,11 +64,6 @@ public long getBufferedSize() { @Override public BytesInput getBytes() { - try { - out.flush(); - } catch (IOException e) { - throw new ParquetEncodingException("could not write page", e); - } LOG.debug("writing a buffer of size {}", arrayOut.size()); return BytesInput.from(arrayOut); } diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesReader.java index a0c7af7394..eff926a374 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesReader.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesReader.java @@ -19,25 +19,35 @@ package org.apache.parquet.column.values.plain; import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; import org.apache.parquet.bytes.ByteBufferInputStream; -import org.apache.parquet.bytes.LittleEndianDataInputStream; import org.apache.parquet.column.values.ValuesReader; -import org.apache.parquet.io.ParquetDecodingException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Plain encoding for float, double, int, long + * Plain encoding for float, double, int, long. + * + *
<p>
Reads directly from a {@link ByteBuffer} with {@link ByteOrder#LITTLE_ENDIAN} byte order, + * bypassing the deprecated {@code LittleEndianDataInputStream} wrapper to avoid per-value virtual + * dispatch overhead. The underlying page data is obtained as a single contiguous {@link ByteBuffer} + * via {@link ByteBufferInputStream#slice(int)}. */ public abstract class PlainValuesReader extends ValuesReader { private static final Logger LOG = LoggerFactory.getLogger(PlainValuesReader.class); - protected LittleEndianDataInputStream in; + protected ByteBuffer buffer; @Override public void initFromPage(int valueCount, ByteBufferInputStream stream) throws IOException { LOG.debug("init from page at offset {} for length {}", stream.position(), stream.available()); - this.in = new LittleEndianDataInputStream(stream.remainingStream()); + int available = stream.available(); + if (available > 0) { + this.buffer = stream.slice(available).order(ByteOrder.LITTLE_ENDIAN); + } else { + this.buffer = ByteBuffer.allocate(0).order(ByteOrder.LITTLE_ENDIAN); + } } @Override @@ -45,31 +55,16 @@ public void skip() { skip(1); } - void skipBytesFully(int n) throws IOException { - int skipped = 0; - while (skipped < n) { - skipped += in.skipBytes(n - skipped); - } - } - public static class DoublePlainValuesReader extends PlainValuesReader { @Override public void skip(int n) { - try { - skipBytesFully(n * 8); - } catch (IOException e) { - throw new ParquetDecodingException("could not skip " + n + " double values", e); - } + buffer.position(buffer.position() + n * 8); } @Override public double readDouble() { - try { - return in.readDouble(); - } catch (IOException e) { - throw new ParquetDecodingException("could not read double", e); - } + return buffer.getDouble(); } } @@ -77,20 +72,12 @@ public static class FloatPlainValuesReader extends PlainValuesReader { @Override public void skip(int n) { - try { - skipBytesFully(n * 4); - } catch (IOException e) { - throw new 
ParquetDecodingException("could not skip " + n + " floats", e); - } + buffer.position(buffer.position() + n * 4); } @Override public float readFloat() { - try { - return in.readFloat(); - } catch (IOException e) { - throw new ParquetDecodingException("could not read float", e); - } + return buffer.getFloat(); } } @@ -98,20 +85,12 @@ public static class IntegerPlainValuesReader extends PlainValuesReader { @Override public void skip(int n) { - try { - in.skipBytes(n * 4); - } catch (IOException e) { - throw new ParquetDecodingException("could not skip " + n + " ints", e); - } + buffer.position(buffer.position() + n * 4); } @Override public int readInteger() { - try { - return in.readInt(); - } catch (IOException e) { - throw new ParquetDecodingException("could not read int", e); - } + return buffer.getInt(); } } @@ -119,20 +98,12 @@ public static class LongPlainValuesReader extends PlainValuesReader { @Override public void skip(int n) { - try { - in.skipBytes(n * 8); - } catch (IOException e) { - throw new ParquetDecodingException("could not skip " + n + " longs", e); - } + buffer.position(buffer.position() + n * 8); } @Override public long readLong() { - try { - return in.readLong(); - } catch (IOException e) { - throw new ParquetDecodingException("could not read long", e); - } + return buffer.getLong(); } } } diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesWriter.java index c7069bc092..be5dfc0fa8 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesWriter.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesWriter.java @@ -23,7 +23,6 @@ import org.apache.parquet.bytes.ByteBufferAllocator; import org.apache.parquet.bytes.BytesInput; import org.apache.parquet.bytes.CapacityByteArrayOutputStream; -import org.apache.parquet.bytes.LittleEndianDataOutputStream; 
import org.apache.parquet.column.Encoding; import org.apache.parquet.column.values.ValuesWriter; import org.apache.parquet.io.ParquetEncodingException; @@ -41,18 +40,16 @@ public class PlainValuesWriter extends ValuesWriter { public static final Charset CHARSET = Charset.forName("UTF-8"); private CapacityByteArrayOutputStream arrayOut; - private LittleEndianDataOutputStream out; public PlainValuesWriter(int initialSize, int pageSize, ByteBufferAllocator allocator) { arrayOut = new CapacityByteArrayOutputStream(initialSize, pageSize, allocator); - out = new LittleEndianDataOutputStream(arrayOut); } @Override public final void writeBytes(Binary v) { try { - out.writeInt(v.length()); - v.writeTo(out); + arrayOut.writeInt(v.length()); + v.writeTo(arrayOut); } catch (IOException e) { throw new ParquetEncodingException("could not write bytes", e); } @@ -60,47 +57,27 @@ public final void writeBytes(Binary v) { @Override public final void writeInteger(int v) { - try { - out.writeInt(v); - } catch (IOException e) { - throw new ParquetEncodingException("could not write int", e); - } + arrayOut.writeInt(v); } @Override public final void writeLong(long v) { - try { - out.writeLong(v); - } catch (IOException e) { - throw new ParquetEncodingException("could not write long", e); - } + arrayOut.writeLong(v); } @Override public final void writeFloat(float v) { - try { - out.writeFloat(v); - } catch (IOException e) { - throw new ParquetEncodingException("could not write float", e); - } + arrayOut.writeInt(Float.floatToIntBits(v)); } @Override public final void writeDouble(double v) { - try { - out.writeDouble(v); - } catch (IOException e) { - throw new ParquetEncodingException("could not write double", e); - } + arrayOut.writeLong(Double.doubleToLongBits(v)); } @Override public void writeByte(int value) { - try { - out.write(value); - } catch (IOException e) { - throw new ParquetEncodingException("could not write byte", e); - } + arrayOut.write(value); } @Override @@ -110,11 +87,6 
@@ public long getBufferedSize() { @Override public BytesInput getBytes() { - try { - out.flush(); - } catch (IOException e) { - throw new ParquetEncodingException("could not write page", e); - } if (LOG.isDebugEnabled()) LOG.debug("writing a buffer of size {}", arrayOut.size()); return BytesInput.from(arrayOut); } @@ -127,7 +99,6 @@ public void reset() { @Override public void close() { arrayOut.close(); - out.close(); } @Override diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/plain/TestBinaryPlainValuesWriterReader.java b/parquet-column/src/test/java/org/apache/parquet/column/values/plain/TestBinaryPlainValuesWriterReader.java new file mode 100644 index 0000000000..da29bddb95 --- /dev/null +++ b/parquet-column/src/test/java/org/apache/parquet/column/values/plain/TestBinaryPlainValuesWriterReader.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.parquet.column.values.plain; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.nio.ByteBuffer; +import org.apache.parquet.bytes.ByteBufferInputStream; +import org.apache.parquet.bytes.HeapByteBufferAllocator; +import org.apache.parquet.bytes.TrackingByteBufferAllocator; +import org.apache.parquet.io.api.Binary; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +/** + * Tests for {@link BinaryPlainValuesReader} (variable-length BINARY) + * covering scalar round-trips via {@link PlainValuesWriter}. + */ +public class TestBinaryPlainValuesWriterReader { + + private TrackingByteBufferAllocator allocator; + + @Before + public void initAllocator() { + allocator = TrackingByteBufferAllocator.wrap(new HeapByteBufferAllocator()); + } + + @After + public void closeAllocator() { + allocator.close(); + } + + private PlainValuesWriter newWriter() { + return new PlainValuesWriter(1024, 64 * 1024, allocator); + } + + private ByteBufferInputStream wrapForReading(PlainValuesWriter writer) throws IOException { + byte[] bytes = writer.getBytes().toByteArray(); + return ByteBufferInputStream.wrap(ByteBuffer.wrap(bytes)); + } + + private Binary binaryFromString(String s) { + return Binary.fromConstantByteArray(s.getBytes(java.nio.charset.StandardCharsets.UTF_8)); + } + + // ---- Scalar round-trip ---- + + @Test + public void testScalarRoundTrip() throws IOException { + try (PlainValuesWriter writer = newWriter()) { + String[] strings = {"hello", "", "world", "a", "longer string value"}; + for (String s : strings) { + writer.writeBytes(binaryFromString(s)); + } + + BinaryPlainValuesReader reader = new BinaryPlainValuesReader(); + reader.initFromPage(strings.length, wrapForReading(writer)); + + for (String s : strings) { + assertEquals(s, reader.readBytes().toStringUsingUTF8()); + } + } + } + + // ---- Skip ---- + + @Test + public void 
testSkip() throws IOException { + try (PlainValuesWriter writer = newWriter()) { + writer.writeBytes(binaryFromString("first")); + writer.writeBytes(binaryFromString("second")); + writer.writeBytes(binaryFromString("third")); + + BinaryPlainValuesReader reader = new BinaryPlainValuesReader(); + reader.initFromPage(3, wrapForReading(writer)); + + reader.skip(); // skip "first" + assertEquals("second", reader.readBytes().toStringUsingUTF8()); + } + } + + // ---- Empty page ---- + + @Test + public void testEmptyPage() throws IOException { + try (PlainValuesWriter writer = newWriter()) { + BinaryPlainValuesReader reader = new BinaryPlainValuesReader(); + reader.initFromPage(0, wrapForReading(writer)); + // Should not throw + } + } + + // ---- Binary content (non-UTF8) ---- + + @Test + public void testBinaryContent() throws IOException { + try (PlainValuesWriter writer = newWriter()) { + byte[] raw1 = {0, 1, 2, (byte) 0xFF, (byte) 0xFE}; + byte[] raw2 = {(byte) 0x80, 0}; + Binary bin1 = Binary.fromConstantByteArray(raw1); + Binary bin2 = Binary.fromConstantByteArray(raw2); + + writer.writeBytes(bin1); + writer.writeBytes(bin2); + + BinaryPlainValuesReader reader = new BinaryPlainValuesReader(); + reader.initFromPage(2, wrapForReading(writer)); + + assertArrayEquals(raw1, reader.readBytes().getBytes()); + assertArrayEquals(raw2, reader.readBytes().getBytes()); + } + } +} diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/plain/TestBooleanPlainValuesWriterReader.java b/parquet-column/src/test/java/org/apache/parquet/column/values/plain/TestBooleanPlainValuesWriterReader.java new file mode 100644 index 0000000000..aba92ec68e --- /dev/null +++ b/parquet-column/src/test/java/org/apache/parquet/column/values/plain/TestBooleanPlainValuesWriterReader.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.column.values.plain; + +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.nio.ByteBuffer; +import org.apache.parquet.bytes.ByteBufferInputStream; +import org.apache.parquet.column.Encoding; +import org.junit.Test; + +/** + * Tests for {@link BooleanPlainValuesWriter} and {@link BooleanPlainValuesReader} + * covering scalar round-trips, edge cases with partial bytes, and skip. 
+ */ +public class TestBooleanPlainValuesWriterReader { + + private ByteBufferInputStream wrapForReading(BooleanPlainValuesWriter writer) throws IOException { + byte[] bytes = writer.getBytes().toByteArray(); + return ByteBufferInputStream.wrap(ByteBuffer.wrap(bytes)); + } + + // ---- Encoding metadata ---- + + @Test + public void testEncoding() { + try (BooleanPlainValuesWriter writer = new BooleanPlainValuesWriter()) { + assertEquals(Encoding.PLAIN, writer.getEncoding()); + } + } + + // ---- Scalar round-trip ---- + + @Test + public void testScalarRoundTrip() throws IOException { + try (BooleanPlainValuesWriter writer = new BooleanPlainValuesWriter()) { + boolean[] expected = {true, false, true, true, false, false, true, false}; + for (boolean v : expected) { + writer.writeBoolean(v); + } + + BooleanPlainValuesReader reader = new BooleanPlainValuesReader(); + reader.initFromPage(expected.length, wrapForReading(writer)); + + for (int i = 0; i < expected.length; i++) { + assertEquals("value at index " + i, expected[i], reader.readBoolean()); + } + } + } + + // ---- Partial byte (< 8 booleans) ---- + + @Test + public void testPartialByte() throws IOException { + try (BooleanPlainValuesWriter writer = new BooleanPlainValuesWriter()) { + boolean[] expected = {true, false, true}; + for (boolean v : expected) { + writer.writeBoolean(v); + } + + BooleanPlainValuesReader reader = new BooleanPlainValuesReader(); + reader.initFromPage(expected.length, wrapForReading(writer)); + + for (int i = 0; i < expected.length; i++) { + assertEquals("value at index " + i, expected[i], reader.readBoolean()); + } + } + } + + // ---- Skip ---- + + @Test + public void testSkip() throws IOException { + try (BooleanPlainValuesWriter writer = new BooleanPlainValuesWriter()) { + boolean[] values = {true, false, true, false, true}; + for (boolean v : values) { + writer.writeBoolean(v); + } + + BooleanPlainValuesReader reader = new BooleanPlainValuesReader(); + reader.initFromPage(values.length, 
wrapForReading(writer)); + + reader.skip(); // skip true + assertEquals(false, reader.readBoolean()); + reader.skip(2); // skip true, false + assertEquals(true, reader.readBoolean()); + } + } + + // ---- Single value ---- + + @Test + public void testSingleValue() throws IOException { + try (BooleanPlainValuesWriter writer = new BooleanPlainValuesWriter()) { + writer.writeBoolean(true); + + BooleanPlainValuesReader reader = new BooleanPlainValuesReader(); + reader.initFromPage(1, wrapForReading(writer)); + + assertEquals(true, reader.readBoolean()); + } + } + + // ---- Reset ---- + + @Test + public void testWriterReset() throws IOException { + try (BooleanPlainValuesWriter writer = new BooleanPlainValuesWriter()) { + writer.writeBoolean(true); + writer.writeBoolean(false); + writer.reset(); + assertEquals(0, writer.getBufferedSize()); + + writer.writeBoolean(false); + writer.writeBoolean(true); + + BooleanPlainValuesReader reader = new BooleanPlainValuesReader(); + reader.initFromPage(2, wrapForReading(writer)); + + assertEquals(false, reader.readBoolean()); + assertEquals(true, reader.readBoolean()); + } + } +} diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/plain/TestFixedLenByteArrayPlainValuesWriterReader.java b/parquet-column/src/test/java/org/apache/parquet/column/values/plain/TestFixedLenByteArrayPlainValuesWriterReader.java new file mode 100644 index 0000000000..09ec6dbe7a --- /dev/null +++ b/parquet-column/src/test/java/org/apache/parquet/column/values/plain/TestFixedLenByteArrayPlainValuesWriterReader.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.column.values.plain; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +import java.io.IOException; +import java.nio.ByteBuffer; +import org.apache.parquet.bytes.ByteBufferInputStream; +import org.apache.parquet.bytes.HeapByteBufferAllocator; +import org.apache.parquet.bytes.TrackingByteBufferAllocator; +import org.apache.parquet.column.Encoding; +import org.apache.parquet.io.api.Binary; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +/** + * Tests for {@link FixedLenByteArrayPlainValuesWriter} and + * {@link FixedLenByteArrayPlainValuesReader} covering scalar + * round-trips for fixed-length byte arrays. 
+ */ +public class TestFixedLenByteArrayPlainValuesWriterReader { + + private static final int FIXED_LEN = 12; + + private TrackingByteBufferAllocator allocator; + + @Before + public void initAllocator() { + allocator = TrackingByteBufferAllocator.wrap(new HeapByteBufferAllocator()); + } + + @After + public void closeAllocator() { + allocator.close(); + } + + private FixedLenByteArrayPlainValuesWriter newWriter() { + return new FixedLenByteArrayPlainValuesWriter(FIXED_LEN, 1024, 64 * 1024, allocator); + } + + private ByteBufferInputStream wrapForReading(FixedLenByteArrayPlainValuesWriter writer) throws IOException { + byte[] bytes = writer.getBytes().toByteArray(); + return ByteBufferInputStream.wrap(ByteBuffer.wrap(bytes)); + } + + private Binary fixedBinary(int seed) { + byte[] data = new byte[FIXED_LEN]; + for (int i = 0; i < FIXED_LEN; i++) { + data[i] = (byte) ((seed + i) & 0xFF); + } + return Binary.fromConstantByteArray(data); + } + + // ---- Encoding metadata ---- + + @Test + public void testEncoding() { + try (FixedLenByteArrayPlainValuesWriter writer = newWriter()) { + assertEquals(Encoding.PLAIN, writer.getEncoding()); + } + } + + // ---- Scalar round-trip ---- + + @Test + public void testScalarRoundTrip() throws IOException { + try (FixedLenByteArrayPlainValuesWriter writer = newWriter()) { + Binary[] expected = {fixedBinary(0), fixedBinary(100), fixedBinary(200)}; + for (Binary v : expected) { + writer.writeBytes(v); + } + + FixedLenByteArrayPlainValuesReader reader = new FixedLenByteArrayPlainValuesReader(FIXED_LEN); + reader.initFromPage(expected.length, wrapForReading(writer)); + + for (int i = 0; i < expected.length; i++) { + assertArrayEquals( + "value at index " + i, + expected[i].getBytes(), + reader.readBytes().getBytes()); + } + } + } + + // ---- Skip ---- + + @Test + public void testSkip() throws IOException { + try (FixedLenByteArrayPlainValuesWriter writer = newWriter()) { + writer.writeBytes(fixedBinary(1)); + 
writer.writeBytes(fixedBinary(2)); + writer.writeBytes(fixedBinary(3)); + writer.writeBytes(fixedBinary(4)); + + FixedLenByteArrayPlainValuesReader reader = new FixedLenByteArrayPlainValuesReader(FIXED_LEN); + reader.initFromPage(4, wrapForReading(writer)); + + reader.skip(); // skip 1 + assertArrayEquals(fixedBinary(2).getBytes(), reader.readBytes().getBytes()); + reader.skip(1); // skip 3 + assertArrayEquals(fixedBinary(4).getBytes(), reader.readBytes().getBytes()); + } + } + + // ---- Wrong length rejection ---- + + @Test + public void testRejectWrongLengthScalar() { + try (FixedLenByteArrayPlainValuesWriter writer = newWriter()) { + Binary wrongLen = Binary.fromConstantByteArray(new byte[FIXED_LEN + 1]); + try { + writer.writeBytes(wrongLen); + fail("Should have thrown IllegalArgumentException"); + } catch (IllegalArgumentException e) { + // expected + } + } + } + + // ---- Reset ---- + + @Test + public void testWriterReset() throws IOException { + try (FixedLenByteArrayPlainValuesWriter writer = newWriter()) { + writer.writeBytes(fixedBinary(99)); + writer.reset(); + assertEquals(0, writer.getBufferedSize()); + + writer.writeBytes(fixedBinary(42)); + + FixedLenByteArrayPlainValuesReader reader = new FixedLenByteArrayPlainValuesReader(FIXED_LEN); + reader.initFromPage(1, wrapForReading(writer)); + + assertArrayEquals(fixedBinary(42).getBytes(), reader.readBytes().getBytes()); + } + } + + // ---- Empty page ---- + + @Test + public void testEmptyPage() throws IOException { + try (FixedLenByteArrayPlainValuesWriter writer = newWriter()) { + FixedLenByteArrayPlainValuesReader reader = new FixedLenByteArrayPlainValuesReader(FIXED_LEN); + reader.initFromPage(0, wrapForReading(writer)); + // Should not throw + } + } +} diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/plain/TestPlainValuesWriterReader.java b/parquet-column/src/test/java/org/apache/parquet/column/values/plain/TestPlainValuesWriterReader.java new file mode 100644 index 
0000000000..6b98f74b07 --- /dev/null +++ b/parquet-column/src/test/java/org/apache/parquet/column/values/plain/TestPlainValuesWriterReader.java @@ -0,0 +1,262 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.column.values.plain; + +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.nio.ByteBuffer; +import org.apache.parquet.bytes.ByteBufferInputStream; +import org.apache.parquet.bytes.HeapByteBufferAllocator; +import org.apache.parquet.bytes.TrackingByteBufferAllocator; +import org.apache.parquet.column.Encoding; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +/** + * Tests for {@link PlainValuesWriter} and {@link PlainValuesReader} covering + * scalar read/write round-trips for int, long, float, and double. 
+ */
+public class TestPlainValuesWriterReader {
+
+  private TrackingByteBufferAllocator allocator;
+
+  @Before
+  public void initAllocator() {
+    allocator = TrackingByteBufferAllocator.wrap(new HeapByteBufferAllocator());
+  }
+
+  @After
+  public void closeAllocator() {
+    allocator.close(); // NOTE(review): presumably reports buffers that were never released -- confirm
+  }
+
+  private PlainValuesWriter newWriter() {
+    return new PlainValuesWriter(1024, 64 * 1024, allocator);
+  }
+
+  private ByteBufferInputStream wrapForReading(PlainValuesWriter writer) throws IOException {
+    byte[] bytes = writer.getBytes().toByteArray(); // the writer's encoded output as a plain byte[]
+    return ByteBufferInputStream.wrap(ByteBuffer.wrap(bytes));
+  }
+
+  // ---- Encoding metadata ----
+
+  @Test
+  public void testEncoding() {
+    try (PlainValuesWriter writer = newWriter()) {
+      assertEquals(Encoding.PLAIN, writer.getEncoding());
+    }
+  }
+
+  // ---- Integer scalar ----
+
+  @Test
+  public void testIntegerScalarRoundTrip() throws IOException {
+    try (PlainValuesWriter writer = newWriter()) {
+      int[] expected = {0, 1, -1, Integer.MIN_VALUE, Integer.MAX_VALUE, 42, -42};
+      for (int v : expected) {
+        writer.writeInteger(v);
+      }
+
+      PlainValuesReader.IntegerPlainValuesReader reader = new PlainValuesReader.IntegerPlainValuesReader();
+      reader.initFromPage(expected.length, wrapForReading(writer)); // value count, then the encoded page
+
+      for (int i = 0; i < expected.length; i++) {
+        assertEquals("value at index " + i, expected[i], reader.readInteger());
+      }
+    }
+  }
+
+  // ---- Long scalar ----
+
+  @Test
+  public void testLongScalarRoundTrip() throws IOException {
+    try (PlainValuesWriter writer = newWriter()) {
+      long[] expected = {0L, 1L, -1L, Long.MIN_VALUE, Long.MAX_VALUE, 123456789L};
+      for (long v : expected) {
+        writer.writeLong(v);
+      }
+
+      PlainValuesReader.LongPlainValuesReader reader = new PlainValuesReader.LongPlainValuesReader();
+      reader.initFromPage(expected.length, wrapForReading(writer));
+
+      for (int i = 0; i < expected.length; i++) {
+        assertEquals("value at index " + i, expected[i], reader.readLong());
+      }
+    }
+  }
+
+  // ---- Float scalar ----
+
+  @Test
+  public void testFloatScalarRoundTrip() throws IOException {
+    try (PlainValuesWriter writer = newWriter()) {
+      float[] expected = {
+        0.0f,
+        1.5f,
+        -1.5f,
+        Float.MIN_VALUE,
+        Float.MAX_VALUE,
+        Float.NaN,
+        Float.POSITIVE_INFINITY,
+        Float.NEGATIVE_INFINITY
+      };
+      for (float v : expected) {
+        writer.writeFloat(v);
+      }
+
+      PlainValuesReader.FloatPlainValuesReader reader = new PlainValuesReader.FloatPlainValuesReader();
+      reader.initFromPage(expected.length, wrapForReading(writer));
+
+      for (int i = 0; i < expected.length; i++) {
+        assertEquals(
+            "value at index " + i,
+            Float.floatToIntBits(expected[i]), // bit patterns, so the NaN entry compares equal to itself
+            Float.floatToIntBits(reader.readFloat()));
+      }
+    }
+  }
+
+  // ---- Double scalar ----
+
+  @Test
+  public void testDoubleScalarRoundTrip() throws IOException {
+    try (PlainValuesWriter writer = newWriter()) {
+      double[] expected = {
+        0.0,
+        1.5,
+        -1.5,
+        Double.MIN_VALUE,
+        Double.MAX_VALUE,
+        Double.NaN,
+        Double.POSITIVE_INFINITY,
+        Double.NEGATIVE_INFINITY
+      };
+      for (double v : expected) {
+        writer.writeDouble(v);
+      }
+
+      PlainValuesReader.DoublePlainValuesReader reader = new PlainValuesReader.DoublePlainValuesReader();
+      reader.initFromPage(expected.length, wrapForReading(writer));
+
+      for (int i = 0; i < expected.length; i++) {
+        assertEquals(
+            "value at index " + i,
+            Double.doubleToLongBits(expected[i]), // bit patterns, so the NaN entry compares equal to itself
+            Double.doubleToLongBits(reader.readDouble()));
+      }
+    }
+  }
+
+  // ---- Skip ----
+
+  @Test
+  public void testIntegerSkipThenRead() throws IOException {
+    try (PlainValuesWriter writer = newWriter()) {
+      writer.writeInteger(1);
+      writer.writeInteger(2);
+      writer.writeInteger(3);
+      writer.writeInteger(4);
+
+      PlainValuesReader.IntegerPlainValuesReader reader = new PlainValuesReader.IntegerPlainValuesReader();
+      reader.initFromPage(4, wrapForReading(writer));
+
+      reader.skip(); // no-arg skip: steps over the value 1
+      assertEquals(2, reader.readInteger());
+      reader.skip(1); // skip(int) with a count of one: steps over the value 3
+      assertEquals(4, reader.readInteger());
+    }
+  }
+
+  @Test
+  public void testLongSkip() throws IOException {
+    try (PlainValuesWriter writer = newWriter()) {
+      writer.writeLong(100L);
+      writer.writeLong(200L);
+      writer.writeLong(300L);
+
+      PlainValuesReader.LongPlainValuesReader reader = new PlainValuesReader.LongPlainValuesReader();
+      reader.initFromPage(3, wrapForReading(writer));
+
+      reader.skip(2); // steps over 100L and 200L in one call
+      assertEquals(300L, reader.readLong());
+    }
+  }
+
+  @Test
+  public void testFloatSkip() throws IOException {
+    try (PlainValuesWriter writer = newWriter()) {
+      writer.writeFloat(1.0f);
+      writer.writeFloat(2.0f);
+      writer.writeFloat(3.0f);
+
+      PlainValuesReader.FloatPlainValuesReader reader = new PlainValuesReader.FloatPlainValuesReader();
+      reader.initFromPage(3, wrapForReading(writer));
+
+      reader.skip(); // steps over 1.0f
+      assertEquals(Float.floatToIntBits(2.0f), Float.floatToIntBits(reader.readFloat()));
+    }
+  }
+
+  @Test
+  public void testDoubleSkip() throws IOException {
+    try (PlainValuesWriter writer = newWriter()) {
+      writer.writeDouble(1.0);
+      writer.writeDouble(2.0);
+      writer.writeDouble(3.0);
+
+      PlainValuesReader.DoublePlainValuesReader reader = new PlainValuesReader.DoublePlainValuesReader();
+      reader.initFromPage(3, wrapForReading(writer));
+
+      reader.skip(); // steps over 1.0
+      assertEquals(Double.doubleToLongBits(2.0), Double.doubleToLongBits(reader.readDouble()));
+    }
+  }
+
+  // ---- Reset ----
+
+  @Test
+  public void testWriterReset() throws IOException {
+    try (PlainValuesWriter writer = newWriter()) {
+      writer.writeInteger(999);
+      writer.reset(); // the 999 written above must not survive this call
+      assertEquals(0, writer.getBufferedSize());
+
+      writer.writeInteger(42);
+
+      PlainValuesReader.IntegerPlainValuesReader reader = new PlainValuesReader.IntegerPlainValuesReader();
+      reader.initFromPage(1, wrapForReading(writer));
+
+      assertEquals(42, reader.readInteger());
+    }
+  }
+
+  // ---- Empty page ----
+
+  @Test
+  public void testEmptyPage() throws IOException {
+    try (PlainValuesWriter writer = newWriter()) {
+      PlainValuesReader.IntegerPlainValuesReader reader = new PlainValuesReader.IntegerPlainValuesReader();
+      reader.initFromPage(0, wrapForReading(writer));
+      // Passes when initFromPage accepts a zero-value page without throwing; nothing is read back
+    }
+  }
+}
diff --git a/parquet-common/src/main/java/org/apache/parquet/bytes/BytesInput.java b/parquet-common/src/main/java/org/apache/parquet/bytes/BytesInput.java
index 0e66140744..0098b537fe 100644
--- a/parquet-common/src/main/java/org/apache/parquet/bytes/BytesInput.java
+++ b/parquet-common/src/main/java/org/apache/parquet/bytes/BytesInput.java
@@ -608,7 +608,14 @@ public void writeAllTo(OutputStream out) throws IOException {
 
     @Override
     void writeInto(ByteBuffer buffer) {
-      buffer.put(arrayOut.toByteArray());
+      // Use writeTo() which writes directly from the internal buf[] array,
+      // avoiding the toByteArray() copy that would allocate a full-size byte[]
+      try {
+        arrayOut.writeTo(new ByteBufferBackedOutputStream(buffer));
+      } catch (IOException e) {
+        // Not expected: ByteBufferBackedOutputStream's write methods declare no IOException
+        throw new RuntimeException("Unexpected IOException writing to ByteBuffer", e);
+      }
     }
 
     @Override
@@ -617,6 +624,29 @@ public long size() {
     }
   }
 
+  /**
+   * Thin adapter that allows writing to a {@link ByteBuffer} via the {@link OutputStream} interface.
+   * Used by {@link BAOSBytesInput#writeInto(ByteBuffer)} to avoid the intermediate copy from
+   * {@link ByteArrayOutputStream#toByteArray()}.
+   */
+  private static class ByteBufferBackedOutputStream extends OutputStream {
+    private final ByteBuffer buffer;
+
+    ByteBufferBackedOutputStream(ByteBuffer buffer) {
+      this.buffer = buffer;
+    }
+
+    @Override
+    public void write(int b) {
+      buffer.put((byte) b); // only the low-order 8 bits of b are written
+    }
+
+    @Override
+    public void write(byte[] b, int off, int len) {
+      buffer.put(b, off, len); // ByteBuffer.put throws BufferOverflowException if the target lacks room
+    }
+  }
+
   private static class ByteArrayBytesInput extends BytesInput {
 
     private final byte[] in;
@@ -643,6 +673,27 @@ public ByteBuffer toByteBuffer() throws IOException {
       return java.nio.ByteBuffer.wrap(in, offset, length);
     }
 
+    /**
+     * Zero-copy override: returns the backing array directly when fully used,
+     * skipping the base-class BAOS allocation + copy on every decompressor call.
+     * Otherwise the {@code [offset, offset + length)} range is copied.
+     *
+     * <p>In the zero-copy case the result aliases the array this input wraps, so callers
+     * must treat it as read-only: writes to it change what this input yields afterwards.
+     * NOTE(review): the claim that this is safe because the base class already exposes a
+     * mutable {@code BAOS.getBuf()} cannot be checked from this patch -- confirm that no
+     * caller modifies the array returned by {@code toByteArray()}.
+     *
+     * @return the bytes of this input; possibly the backing array itself rather than a copy
+     */
+    @Override
+    public byte[] toByteArray() {
+      if (offset == 0 && length == in.length) {
+        return in;
+      }
+      return Arrays.copyOfRange(in, offset, offset + length);
+    }
+
     @Override
     public long size() {
       return length;
diff --git a/parquet-common/src/main/java/org/apache/parquet/bytes/CapacityByteArrayOutputStream.java b/parquet-common/src/main/java/org/apache/parquet/bytes/CapacityByteArrayOutputStream.java
index d3d8b1b6de..3aa2c736a3 100644
--- a/parquet-common/src/main/java/org/apache/parquet/bytes/CapacityByteArrayOutputStream.java
+++ b/parquet-common/src/main/java/org/apache/parquet/bytes/CapacityByteArrayOutputStream.java
@@ -27,6 +27,7 @@
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.parquet.OutputStreamCloseException;
@@ -201,6 +202,7 @@ private void addSlab(int minimumSize) {
     LOG.debug("used {} slabs, adding new slab of size {}", slabs.size(), nextSlabSize);
 
     this.currentSlab = allocator.allocate(nextSlabSize);
+    this.currentSlab.order(ByteOrder.LITTLE_ENDIAN); // multi-byte puts on this slab are little-endian
     this.slabs.add(currentSlab);
     this.bytesAllocated = Math.addExact(this.bytesAllocated, nextSlabSize);
   }
@@ -232,6 +234,41 @@ public void write(byte b[], int off, int len) {
     bytesUsed = Math.addExact(bytesUsed, len);
   }
 
+  /**
+   * Writes an int in little-endian byte order directly to the underlying slab,
+   * bypassing intermediate byte array decomposition. Slabs are set to
+   * {@link ByteOrder#LITTLE_ENDIAN} order so {@code putInt} produces the correct encoding.
+   *
+   * <p>The value is never split across slabs: when fewer than 4 bytes remain in the current
+   * slab, a new slab is started and the leftover tail of the old one stays unused.
+   * NOTE(review): confirm that every reader of the slab list bounds a slab by its position
+   * rather than its limit or capacity; otherwise the skipped bytes shift index-based access.
+   *
+   * @param v the int value to write
+   */
+  public void writeInt(int v) {
+    if (currentSlab.remaining() < 4) {
+      addSlab(4); // start a fresh slab rather than splitting the value
+    }
+    currentSlab.putInt(v);
+    bytesUsed = Math.addExact(bytesUsed, 4);
+  }
+
+  /**
+   * Writes a long in little-endian byte order directly to the underlying slab.
+   * The value is never split across slabs: when fewer than 8 bytes remain in the current
+   * slab, a new slab is started first.
+   *
+   * @param v the long value to write
+   */
+  public void writeLong(long v) {
+    if (currentSlab.remaining() < 8) {
+      addSlab(8); // start a fresh slab rather than splitting the value
+    }
+    currentSlab.putLong(v);
+    bytesUsed = Math.addExact(bytesUsed, 8);
+  }
+
   private void writeToOutput(OutputStream out, ByteBuffer buf, int len) throws IOException {
     if (buf.hasArray()) {
       out.write(buf.array(), buf.arrayOffset(), len);
diff --git a/parquet-common/src/main/java/org/apache/parquet/bytes/LittleEndianDataOutputStream.java b/parquet-common/src/main/java/org/apache/parquet/bytes/LittleEndianDataOutputStream.java
index ef6c71bc86..baf7109836 100644
--- a/parquet-common/src/main/java/org/apache/parquet/bytes/LittleEndianDataOutputStream.java
+++ b/parquet-common/src/main/java/org/apache/parquet/bytes/LittleEndianDataOutputStream.java
@@ -30,6 +30,7 @@ public class LittleEndianDataOutputStream extends OutputStream {
   private static final Logger LOG = LoggerFactory.getLogger(LittleEndianDataOutputStream.class);
 
   private final OutputStream out;
+  private final byte[] writeBuffer = new byte[8]; // scratch space reused by the multi-byte write methods
 
   /**
    * Creates a new data output stream to write data to the specified
@@ -130,8 +131,9 @@ public final void writeByte(int v) throws IOException {
    * @see java.io.FilterOutputStream#out
    */
   public final void writeShort(int v) throws IOException {
-    out.write((v >>> 0) & 0xFF);
-    out.write((v >>> 8) & 0xFF);
+    writeBuffer[0] = (byte) (v >>> 0);
+    writeBuffer[1] = (byte) (v >>> 8);
+    out.write(writeBuffer, 0, 2); // one bulk write instead of two single-byte writes
   }
 
   /**
@@ -144,17 +146,13 @@ public final void writeShort(int v) throws IOException {
    * @see java.io.FilterOutputStream#out
    */
   public final void writeInt(int v) throws IOException {
-    // TODO: see note in LittleEndianDataInputStream: maybe faster
-    // to use Integer.reverseBytes() and then writeInt, or a ByteBuffer
-    // approach
-    out.write((v >>> 0) & 0xFF);
-    out.write((v >>> 8) & 0xFF);
-    out.write((v >>> 16) & 0xFF);
-    out.write((v >>> 24) & 0xFF);
+    writeBuffer[0] = (byte) (v >>> 0);
+    writeBuffer[1] = (byte) (v >>> 8);
+    writeBuffer[2] = (byte) (v >>> 16);
+    writeBuffer[3] = (byte) (v >>> 24);
+    out.write(writeBuffer, 0, 4); // one bulk write instead of four single-byte writes
   }
 
-  private byte writeBuffer[] = new byte[8];
-
   /**
    * Writes a long to the underlying output stream as eight
    * bytes, low byte first. In no exception is thrown, the counter
diff --git a/parquet-common/src/test/java/org/apache/parquet/bytes/TestBytesInput.java b/parquet-common/src/test/java/org/apache/parquet/bytes/TestBytesInput.java
index 38d4b79219..6aca1811fe 100644
--- a/parquet-common/src/test/java/org/apache/parquet/bytes/TestBytesInput.java
+++ b/parquet-common/src/test/java/org/apache/parquet/bytes/TestBytesInput.java
@@ -20,6 +20,7 @@
 
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
 import static org.junit.Assert.fail;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
@@ -128,6 +129,28 @@ public void testFromByteArray() throws IOException {
     validate(data, factory);
   }
 
+  @Test
+  public void testFromByteArrayToByteArrayZeroCopy() throws IOException {
+    // Full array (offset=0, length=array.length): toByteArray() returns the backing array directly
+    byte[] data = new byte[1000];
+    RANDOM.nextBytes(data);
+    BytesInput bi = BytesInput.from(data, 0, data.length);
+    byte[] result = bi.toByteArray();
+    assertSame("toByteArray() should return the backing array when offset=0 and length=full", data, result);
+  }
+
+  @Test
+  public void testFromByteArrayToByteArraySubRange() throws IOException {
+    // Sub-range (offset != 0): toByteArray() must return a copy of the specified range
+    byte[] input = new byte[1000];
+    RANDOM.nextBytes(input);
+    BytesInput bi = BytesInput.from(input, 10, 500); // offset 10, length 500
+    byte[] result = bi.toByteArray();
+    byte[] expected = new byte[500];
+    System.arraycopy(input, 10, expected, 0, 500);
+    assertArrayEquals(expected, result);
+  }
+
   @Test
   public void testFromInputStream() throws IOException {
     byte[] data = new byte[1000];
diff --git a/parquet-encoding/src/test/java/org/apache/parquet/bytes/TestCapacityByteArrayOutputStream.java b/parquet-encoding/src/test/java/org/apache/parquet/bytes/TestCapacityByteArrayOutputStream.java
index 583b902099..f272594691 100644
--- a/parquet-encoding/src/test/java/org/apache/parquet/bytes/TestCapacityByteArrayOutputStream.java
+++ b/parquet-encoding/src/test/java/org/apache/parquet/bytes/TestCapacityByteArrayOutputStream.java
@@ -300,4 +300,59 @@ private void validate(CapacityByteArrayOutputStream capacityByteArrayOutputStrea
       assertEquals(i, byteArray[i]);
     }
   }
+
+  // ---- Scalar write methods (writeInt, writeLong) ----
+
+  /**
+   * Reads a little-endian int from the byte array at position {@code pos}.
+   */
+  private static int readIntLE(byte[] b, int pos) {
+    return (b[pos] & 0xFF) | ((b[pos + 1] & 0xFF) << 8) | ((b[pos + 2] & 0xFF) << 16) | ((b[pos + 3] & 0xFF) << 24);
+  }
+
+  /**
+   * Reads a little-endian long from the byte array at position {@code pos}.
+   */
+  private static long readLongLE(byte[] b, int pos) {
+    return (b[pos] & 0xFFL) // the long mask widens each byte to 64 bits before it is shifted
+        | ((b[pos + 1] & 0xFFL) << 8)
+        | ((b[pos + 2] & 0xFFL) << 16)
+        | ((b[pos + 3] & 0xFFL) << 24)
+        | ((b[pos + 4] & 0xFFL) << 32)
+        | ((b[pos + 5] & 0xFFL) << 40)
+        | ((b[pos + 6] & 0xFFL) << 48)
+        | ((b[pos + 7] & 0xFFL) << 56);
+  }
+
+  @Test
+  public void testWriteInt() throws Throwable {
+    try (CapacityByteArrayOutputStream cbaos = newCapacityBAOS(10)) { // presumably a 10-byte first slab; confirm
+      int[] values = {0, 1, -1, Integer.MIN_VALUE, Integer.MAX_VALUE, 42};
+      for (int v : values) {
+        cbaos.writeInt(v);
+      }
+      assertEquals(values.length * 4, cbaos.size()); // exactly 4 bytes counted per int
+
+      byte[] bytes = BytesInput.from(cbaos).toByteArray();
+      for (int i = 0; i < values.length; i++) {
+        assertEquals("value at index " + i, values[i], readIntLE(bytes, i * 4));
+      }
+    }
+  }
+
+  @Test
+  public void testWriteLong() throws Throwable {
+    try (CapacityByteArrayOutputStream cbaos = newCapacityBAOS(10)) { // presumably a 10-byte first slab; confirm
+      long[] values = {0L, 1L, -1L, Long.MIN_VALUE, Long.MAX_VALUE, 123456789L};
+      for (long v : values) {
+        cbaos.writeLong(v);
+      }
+      assertEquals(values.length * 8, cbaos.size()); // exactly 8 bytes counted per long
+
+      byte[] bytes = BytesInput.from(cbaos).toByteArray();
+      for (int i = 0; i < values.length; i++) {
+        assertEquals("value at index " + i, values[i], readLongLE(bytes, i * 8));
+      }
+    }
+  }
 }
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java
index a17cf678f5..8d735b55ab 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java
@@ -38,9 +38,8 @@
 import static org.mockito.ArgumentMatchers.same;
 import static org.mockito.Mockito.inOrder;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.nio.ByteOrder;
 import java.util.HashMap;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -49,7 +48,6 @@
 import org.apache.parquet.bytes.BytesInput;
 import org.apache.parquet.bytes.DirectByteBufferAllocator;
 import org.apache.parquet.bytes.HeapByteBufferAllocator;
-import org.apache.parquet.bytes.LittleEndianDataInputStream;
 import org.apache.parquet.bytes.TrackingByteBufferAllocator;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.Encoding;
@@ -249,12 +247,7 @@ public void test(Configuration config, ByteBufferAllocator allocator) throws Exc
   }
 
   private int intValue(BytesInput in) throws IOException {
-    ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    in.writeAllTo(baos);
-    LittleEndianDataInputStream os = new LittleEndianDataInputStream(new ByteArrayInputStream(baos.toByteArray()));
-    int i = os.readInt();
-    os.close();
-    return i;
+    return in.toByteBuffer().order(ByteOrder.LITTLE_ENDIAN).getInt(); // 4 bytes at the buffer position, little-endian
   }
 
   @Test
diff --git a/pom.xml b/pom.xml
index 1bd9893d87..b2fd26c5d5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -594,6 +594,8 @@
 org.apache.parquet.internal.column.columnindex.IndexIterator
 org.apache.parquet.column.values.bytestreamsplit.ByteStreamSplitValuesReader#gatherElementDataFromStreams(byte[])
+
+org.apache.parquet.column.values.plain.PlainValuesReader#in
 org.apache.parquet.arrow.schema.SchemaMapping$TypeMappingVisitor#visit(org.apache.parquet.arrow.schema.SchemaMapping$MapTypeMapping)