From eba984724dda47188b1be2c0473761c79f14d373 Mon Sep 17 00:00:00 2001 From: "Sung Yun (CODE SIGNING KEY)" Date: Tue, 19 May 2026 21:22:53 -0400 Subject: [PATCH] fix int96 timestamp offset in arrow dictionary decode --- ...ectorizedParquetDefinitionLevelReader.java | 2 +- ...ectorizedParquetDefinitionLevelReader.java | 115 ++++++++++++++++++ 2 files changed, 116 insertions(+), 1 deletion(-) create mode 100644 arrow/src/test/java/org/apache/iceberg/arrow/vectorized/parquet/TestVectorizedParquetDefinitionLevelReader.java diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedParquetDefinitionLevelReader.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedParquetDefinitionLevelReader.java index 1ca3bfe809c0..d093d4c97989 100644 --- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedParquetDefinitionLevelReader.java +++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedParquetDefinitionLevelReader.java @@ -539,7 +539,7 @@ protected void nextDictEncodedVal( .toByteBuffer() .order(ByteOrder.LITTLE_ENDIAN); long timestampInt96 = ParquetUtil.extractTimestampInt96(buffer); - vector.getDataBuffer().setLong(idx, timestampInt96); + vector.getDataBuffer().setLong((long) idx * typeWidth, timestampInt96); break; default: throw new UnsupportedOperationException( diff --git a/arrow/src/test/java/org/apache/iceberg/arrow/vectorized/parquet/TestVectorizedParquetDefinitionLevelReader.java b/arrow/src/test/java/org/apache/iceberg/arrow/vectorized/parquet/TestVectorizedParquetDefinitionLevelReader.java new file mode 100644 index 000000000000..10bf3e64766c --- /dev/null +++ b/arrow/src/test/java/org/apache/iceberg/arrow/vectorized/parquet/TestVectorizedParquetDefinitionLevelReader.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.arrow.vectorized.parquet; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.BigIntVector; +import org.apache.parquet.column.Dictionary; +import org.apache.parquet.column.Encoding; +import org.apache.parquet.io.api.Binary; +import org.junit.jupiter.api.Test; + +public class TestVectorizedParquetDefinitionLevelReader { + private static final int UNIX_EPOCH_JULIAN_DAY = 2_440_588; + + @Test + public void timestampInt96ReaderPackedDictionaryDecodeDecodesRowsCorrectly() { + try (RootAllocator allocator = new RootAllocator(); + BigIntVector vector = new BigIntVector("ts", allocator)) { + vector.allocateNew(2); + vector.set(0, -1L); + vector.set(1, -1L); + + VectorizedParquetDefinitionLevelReader definitionReader = + new VectorizedParquetDefinitionLevelReader(1, 1, false); + VectorizedDictionaryEncodedParquetValuesReader dictionaryReader = + new VectorizedDictionaryEncodedParquetValuesReader(1, false); + + dictionaryReader.mode = BaseVectorizedParquetValuesReader.Mode.PACKED; + dictionaryReader.currentCount = 2; + dictionaryReader.packedValuesBuffer[0] = 0; + dictionaryReader.packedValuesBuffer[1] = 1; + + Dictionary dictionary = + new Dictionary(Encoding.PLAIN_DICTIONARY) { + @Override + public int getMaxId() { + return 1; + } + + @Override + public Binary decodeToBinary(int id) { + if (id == 0) { + return int96Binary(111_111L); + } else if (id == 1) { + return int96Binary(222_222L); + } + + throw new IllegalArgumentException("Unexpected dictionary id: " + id); + } + }; + + VectorizedParquetDefinitionLevelReader.TimestampInt96Reader timestampReader = + definitionReader.timestampInt96Reader(); + + timestampReader.nextDictEncodedVal( + vector, + 0, + dictionaryReader, + dictionary, + BaseVectorizedParquetValuesReader.Mode.PACKED, + 1, + null, + Long.BYTES); + timestampReader.nextDictEncodedVal( + vector, + 1, + dictionaryReader, + dictionary, + BaseVectorizedParquetValuesReader.Mode.PACKED, + 1, + null, + Long.BYTES); + + vector.setValueCount(2); + + assertThat(vector.get(0)) + .as("row 0 should receive the first decoded timestamp") + .isEqualTo(111_111L); + assertThat(vector.get(1)) + .as("row 1 should receive the second decoded timestamp") + .isEqualTo(222_222L); + } + } + + private static Binary int96Binary(long micros) { + long timeOfDayNanos = micros * 1_000L; + byte[] bytes = + ByteBuffer.allocate(12) + .order(ByteOrder.LITTLE_ENDIAN) + .putLong(timeOfDayNanos) + .putInt(UNIX_EPOCH_JULIAN_DAY) + .array(); + return Binary.fromConstantByteArray(bytes); + } +}