From 7983e0dd73a65995a0465722ce822bfde821b04c Mon Sep 17 00:00:00 2001 From: Hao Jiang Date: Wed, 20 May 2026 08:39:46 -0700 Subject: [PATCH 1/3] wip --- .../vectorized/VectorizedArrowReader.java | 11 ++- .../arrow/vectorized/TestArrowReader.java | 93 +++++++++++++++++++ 2 files changed, 103 insertions(+), 1 deletion(-) diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java index e9ebed2826f4..a820975107ed 100644 --- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java +++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java @@ -252,7 +252,16 @@ private static Types.NestedField getPhysicalType( // Use FixedSizeBinaryVector for binary backed decimal type = Types.FixedType.ofLength(primitive.getTypeLength()); } - physicalType = Types.NestedField.from(logicalType).ofType(type).build(); + // do not copy initialDefault/writeDefault from the logical type: those defaults are typed + // for the logical (decimal) type and cannot be cast to the underlying physical type + physicalType = + Types.NestedField.builder() + .isOptional(logicalType.isOptional()) + .withId(logicalType.fieldId()) + .withName(logicalType.name()) + .ofType(type) + .withDoc(logicalType.doc()) + .build(); } return physicalType; diff --git a/arrow/src/test/java/org/apache/iceberg/arrow/vectorized/TestArrowReader.java b/arrow/src/test/java/org/apache/iceberg/arrow/vectorized/TestArrowReader.java index cf3eb2700265..6afd336da83f 100644 --- a/arrow/src/test/java/org/apache/iceberg/arrow/vectorized/TestArrowReader.java +++ b/arrow/src/test/java/org/apache/iceberg/arrow/vectorized/TestArrowReader.java @@ -388,6 +388,99 @@ public void testTimestampMillisAreReadCorrectly() throws Exception { assertThat(totalRowsRead).as("Should read all rows").isEqualTo(millisValues.size()); } + /** + * Regression test: a decimal column whose Iceberg field carries an initialDefault/writeDefault + * must be readable by the vectorized reader. Before the fix to {@code + * VectorizedArrowReader#getPhysicalType}, allocating the vector copied the decimal-typed default + * onto the underlying physical type (int/long/fixed) and failed with {@code + * IllegalArgumentException: Cannot cast default value to ...}. + */ + @Test + public void testDecimalWithDefaultIsReadByVectorizedReader() throws Exception { + tables = new HadoopTables(); + Schema schema = + new Schema( + Types.NestedField.required("id").withId(1).ofType(Types.IntegerType.get()).build(), + Types.NestedField.optional("amount") + .withId(2) + .ofType(Types.DecimalType.of(5, 2)) + .withInitialDefault(Expressions.lit(new BigDecimal("0.00"))) + .withWriteDefault(Expressions.lit(new BigDecimal("0.00"))) + .build()); + + Table table = + tables.create( + schema, + PartitionSpec.unpartitioned(), + ImmutableMap.of(TableProperties.FORMAT_VERSION, "3"), + tableLocation); + + File testFile = new File(tempDir, "decimal-with-default.parquet"); + List ids = Lists.newArrayList(1, 2, 3); + // raw INT32 unscaled values for 1.23, 4.56, 7.89 at scale 2 + List rawAmounts = Lists.newArrayList(123, 456, 789); + + MessageType parquetSchema = + new MessageType( + "test", + primitive(PrimitiveType.PrimitiveTypeName.INT32, Type.Repetition.REQUIRED) + .id(1) + .named("id"), + primitive(PrimitiveType.PrimitiveTypeName.INT32, Type.Repetition.OPTIONAL) + .as(LogicalTypeAnnotation.decimalType(2, 5)) + .id(2) + .named("amount")); + + try (ParquetWriter writer = + ExampleParquetWriter.builder(new Path(testFile.toURI())) + .withType(parquetSchema) + // disable dictionary encoding so vector allocation goes through getPhysicalType, + // where the bug surfaces. With dict encoding, allocateDictEncodedVector is used + // and the buggy path is bypassed. + .withDictionaryEncoding(false) + .build()) { + SimpleGroupFactory factory = new SimpleGroupFactory(parquetSchema); + for (int i = 0; i < ids.size(); i++) { + Group group = factory.newGroup(); + group.add("id", ids.get(i)); + group.add("amount", rawAmounts.get(i)); + writer.write(group); + } + } + + DataFile dataFile = + DataFiles.builder(PartitionSpec.unpartitioned()) + .withPath(testFile.getAbsolutePath()) + .withFileSizeInBytes(testFile.length()) + .withFormat(FileFormat.PARQUET) + .withRecordCount(ids.size()) + .build(); + table.newAppend().appendFile(dataFile).commit(); + + int rowIndex = 0; + try (VectorizedTableScanIterable vectorizedReader = + new VectorizedTableScanIterable(table.newScan(), 1024, false)) { + for (ColumnarBatch batch : vectorizedReader) { + VectorSchemaRoot root = batch.createVectorSchemaRootFromVectors(); + FieldVector amountVector = root.getVector("amount"); + assertThat(amountVector) + .as("INT32-backed decimal should be exposed as IntVector by the vectorized reader") + .isInstanceOf(IntVector.class); + + IntVector amounts = (IntVector) amountVector; + for (int i = 0; i < root.getRowCount(); i++) { + assertThat(amounts.get(i)) + .as("Row %d: raw INT32 decimal value", rowIndex) + .isEqualTo(rawAmounts.get(rowIndex)); + rowIndex++; + } + root.close(); + } + } + + assertThat(rowIndex).as("Should read all rows").isEqualTo(ids.size()); + } + @ParameterizedTest @MethodSource("rejectedUnsignedIntegerCases") public void testUnsignedIntegerColumnThrowsException( From 5037e584022210550da4f0cd8f5be9eccfbf39d0 Mon Sep 17 00:00:00 2001 From: Hao Jiang Date: Thu, 21 May 2026 16:43:34 -0700 Subject: [PATCH 2/3] address comments --- .../arrow/vectorized/TestArrowReader.java | 93 ------------------- .../parquet/TestParquetVectorizedReads.java | 49 ++++++++++ 2 files changed, 49 insertions(+), 93 deletions(-) diff --git a/arrow/src/test/java/org/apache/iceberg/arrow/vectorized/TestArrowReader.java b/arrow/src/test/java/org/apache/iceberg/arrow/vectorized/TestArrowReader.java index 6afd336da83f..cf3eb2700265 100644 --- a/arrow/src/test/java/org/apache/iceberg/arrow/vectorized/TestArrowReader.java +++ b/arrow/src/test/java/org/apache/iceberg/arrow/vectorized/TestArrowReader.java @@ -388,99 +388,6 @@ public void testTimestampMillisAreReadCorrectly() throws Exception { assertThat(totalRowsRead).as("Should read all rows").isEqualTo(millisValues.size()); } - /** - * Regression test: a decimal column whose Iceberg field carries an initialDefault/writeDefault - * must be readable by the vectorized reader. Before the fix to {@code - * VectorizedArrowReader#getPhysicalType}, allocating the vector copied the decimal-typed default - * onto the underlying physical type (int/long/fixed) and failed with {@code - * IllegalArgumentException: Cannot cast default value to ...}. - */ - @Test - public void testDecimalWithDefaultIsReadByVectorizedReader() throws Exception { - tables = new HadoopTables(); - Schema schema = - new Schema( - Types.NestedField.required("id").withId(1).ofType(Types.IntegerType.get()).build(), - Types.NestedField.optional("amount") - .withId(2) - .ofType(Types.DecimalType.of(5, 2)) - .withInitialDefault(Expressions.lit(new BigDecimal("0.00"))) - .withWriteDefault(Expressions.lit(new BigDecimal("0.00"))) - .build()); - - Table table = - tables.create( - schema, - PartitionSpec.unpartitioned(), - ImmutableMap.of(TableProperties.FORMAT_VERSION, "3"), - tableLocation); - - File testFile = new File(tempDir, "decimal-with-default.parquet"); - List ids = Lists.newArrayList(1, 2, 3); - // raw INT32 unscaled values for 1.23, 4.56, 7.89 at scale 2 - List rawAmounts = Lists.newArrayList(123, 456, 789); - - MessageType parquetSchema = - new MessageType( - "test", - primitive(PrimitiveType.PrimitiveTypeName.INT32, Type.Repetition.REQUIRED) - .id(1) - .named("id"), - primitive(PrimitiveType.PrimitiveTypeName.INT32, Type.Repetition.OPTIONAL) - .as(LogicalTypeAnnotation.decimalType(2, 5)) - .id(2) - .named("amount")); - - try (ParquetWriter writer = - ExampleParquetWriter.builder(new Path(testFile.toURI())) - .withType(parquetSchema) - // disable dictionary encoding so vector allocation goes through getPhysicalType, - // where the bug surfaces. With dict encoding, allocateDictEncodedVector is used - // and the buggy path is bypassed. - .withDictionaryEncoding(false) - .build()) { - SimpleGroupFactory factory = new SimpleGroupFactory(parquetSchema); - for (int i = 0; i < ids.size(); i++) { - Group group = factory.newGroup(); - group.add("id", ids.get(i)); - group.add("amount", rawAmounts.get(i)); - writer.write(group); - } - } - - DataFile dataFile = - DataFiles.builder(PartitionSpec.unpartitioned()) - .withPath(testFile.getAbsolutePath()) - .withFileSizeInBytes(testFile.length()) - .withFormat(FileFormat.PARQUET) - .withRecordCount(ids.size()) - .build(); - table.newAppend().appendFile(dataFile).commit(); - - int rowIndex = 0; - try (VectorizedTableScanIterable vectorizedReader = - new VectorizedTableScanIterable(table.newScan(), 1024, false)) { - for (ColumnarBatch batch : vectorizedReader) { - VectorSchemaRoot root = batch.createVectorSchemaRootFromVectors(); - FieldVector amountVector = root.getVector("amount"); - assertThat(amountVector) - .as("INT32-backed decimal should be exposed as IntVector by the vectorized reader") - .isInstanceOf(IntVector.class); - - IntVector amounts = (IntVector) amountVector; - for (int i = 0; i < root.getRowCount(); i++) { - assertThat(amounts.get(i)) - .as("Row %d: raw INT32 decimal value", rowIndex) - .isEqualTo(rawAmounts.get(rowIndex)); - rowIndex++; - } - root.close(); - } - } - - assertThat(rowIndex).as("Should read all rows").isEqualTo(ids.size()); - } - @ParameterizedTest @MethodSource("rejectedUnsignedIntegerCases") public void testUnsignedIntegerColumnThrowsException( diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetVectorizedReads.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetVectorizedReads.java index 6011c6dad7d2..2ef94023347e 100644 --- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetVectorizedReads.java +++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetVectorizedReads.java @@ -28,6 +28,7 @@ import java.io.File; import java.io.IOException; import java.io.InputStream; +import java.math.BigDecimal; import java.net.URISyntaxException; import java.net.URL; import java.nio.file.Path; @@ -45,6 +46,7 @@ import org.apache.iceberg.data.Record; import org.apache.iceberg.data.parquet.GenericParquetReaders; import org.apache.iceberg.data.parquet.GenericParquetWriter; +import org.apache.iceberg.expressions.Literal; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.FileAppender; import org.apache.iceberg.parquet.Parquet; @@ -65,6 +67,7 @@ import org.apache.iceberg.types.TypeUtil; import org.apache.iceberg.types.Types; import org.apache.parquet.column.ParquetProperties; +import org.apache.parquet.hadoop.ParquetOutputFormat; import org.apache.parquet.schema.GroupType; import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.Type; @@ -254,6 +257,16 @@ FileAppender getParquetV2Writer(Schema schema, File testFile) throws IOE .build(); } + FileAppender getParquetWriterWithoutDictionary(Schema schema, File testFile) + throws IOException { + return Parquet.write(Files.localOutput(testFile)) + .schema(schema) + .createWriterFunc(GenericParquetWriter::create) + .named("test") + .set(ParquetOutputFormat.ENABLE_DICTIONARY, "false") + .build(); + } + void assertRecordsMatch( Schema schema, int expectedSize, @@ -460,6 +473,42 @@ public void testUuidReads() throws Exception { assertRecordsMatch(schema, numRows, data, dataFile, false, BATCH_SIZE); } + @Test + public void testDecimalWithDefaultValueNotDictionaryEncoded() throws Exception { + // Regression test for vector allocation of a decimal column whose Iceberg field carries a + // default value. The bug only surfaces when the column is not dictionary-encoded, because + // VectorizedArrowReader#allocateDictEncodedVector bypasses the buggy code path. + Schema schema = + new Schema( + required(100, "id", Types.LongType.get()), + Types.NestedField.optional("int_backed") + .withId(101) + .ofType(Types.DecimalType.of(5, 2)) + .withInitialDefault(Literal.of(new BigDecimal("0.00"))) + .withWriteDefault(Literal.of(new BigDecimal("0.00"))) + .build(), + Types.NestedField.optional("long_backed") + .withId(102) + .ofType(Types.DecimalType.of(15, 2)) + .withInitialDefault(Literal.of(new BigDecimal("0.00"))) + .withWriteDefault(Literal.of(new BigDecimal("0.00"))) + .build(), + Types.NestedField.optional("fixed_backed") + .withId(103) + .ofType(Types.DecimalType.of(25, 2)) + .withInitialDefault(Literal.of(new BigDecimal("0.00"))) + .withWriteDefault(Literal.of(new BigDecimal("0.00"))) + .build()); + + File dataFile = temp.resolve("decimal-no-dict.parquet").toFile(); + Iterable data = generateData(schema, 1000, 0L, 0.0f, IDENTITY); + try (FileAppender writer = getParquetWriterWithoutDictionary(schema, dataFile)) { + writer.addAll(data); + } + + assertRecordsMatch(schema, 1000, data, dataFile, false, BATCH_SIZE); + } + private void assertIdenticalFileContents( File actual, File expected, Schema schema, boolean vectorized) throws IOException { try (CloseableIterable expectedIterator = From 21f6a068260a0f04d0eecb3938f12c50eb4f0690 Mon Sep 17 00:00:00 2001 From: Hao Jiang Date: Fri, 22 May 2026 11:37:57 -0700 Subject: [PATCH 3/3] address comments and add new tests --- .../vectorized/VectorizedArrowReader.java | 12 +- .../TestVectorizedDefaultValues.java | 142 ++++++++++++++++++ .../parquet/TestParquetVectorizedReads.java | 7 +- 3 files changed, 149 insertions(+), 12 deletions(-) create mode 100644 arrow/src/test/java/org/apache/iceberg/arrow/vectorized/TestVectorizedDefaultValues.java diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java index a820975107ed..1727a2b7dd3c 100644 --- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java +++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java @@ -252,15 +252,13 @@ private static Types.NestedField getPhysicalType( // Use FixedSizeBinaryVector for binary backed decimal type = Types.FixedType.ofLength(primitive.getTypeLength()); } - // do not copy initialDefault/writeDefault from the logical type: those defaults are typed - // for the logical (decimal) type and cannot be cast to the underlying physical type + // drop initialDefault/writeDefault: they are typed for the logical (decimal) type and + // cannot be cast to the underlying physical type physicalType = - Types.NestedField.builder() - .isOptional(logicalType.isOptional()) - .withId(logicalType.fieldId()) - .withName(logicalType.name()) + Types.NestedField.from(logicalType) .ofType(type) - .withDoc(logicalType.doc()) + .withInitialDefault(null) + .withWriteDefault(null) .build(); } diff --git a/arrow/src/test/java/org/apache/iceberg/arrow/vectorized/TestVectorizedDefaultValues.java b/arrow/src/test/java/org/apache/iceberg/arrow/vectorized/TestVectorizedDefaultValues.java new file mode 100644 index 000000000000..5b50168d6167 --- /dev/null +++ b/arrow/src/test/java/org/apache/iceberg/arrow/vectorized/TestVectorizedDefaultValues.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.arrow.vectorized; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.File; +import java.math.BigDecimal; +import java.util.List; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DataFiles; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Files; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.parquet.GenericParquetWriter; +import org.apache.iceberg.expressions.Literal; +import org.apache.iceberg.hadoop.HadoopTables; +import org.apache.iceberg.io.FileAppender; +import org.apache.iceberg.parquet.Parquet; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.types.Types; +import org.apache.parquet.hadoop.ParquetOutputFormat; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +/** + * Vectorized-read tests focused on Iceberg field defaults. The reader has two paths that interact + * with defaults: + * + *
    + *
  • column missing from the Parquet file → defaults are applied via a {@code + * ConstantVectorReader} in {@code VectorizedReaderBuilder}; + *
  • column present in the Parquet file → defaults are not consumed for value reads, but they + * were historically copied through {@code VectorizedArrowReader#getPhysicalType} when the + * reader rewrote the field to its underlying physical type. For decimal columns, that copy + * failed because {@code DecimalLiteral.to(IntegerType | LongType | FixedType)} returns {@code + * null}, which trips {@code Preconditions.checkArgument} in {@code NestedField#castDefault}. + *
+ * + *

These tests exercise the second path. The bug only surfaces when the column is not + * dictionary-encoded — with dictionary encoding {@code allocateDictEncodedVector} is used and + * {@code getPhysicalType} is bypassed. So the parquet file is written with dictionary encoding + * disabled. + */ +public class TestVectorizedDefaultValues { + + @TempDir private File tempDir; + + @Test + public void testDecimalWithDefaultValueNotDictionaryEncoded() throws Exception { + Schema schema = + new Schema( + Types.NestedField.required("id").withId(1).ofType(Types.LongType.get()).build(), + Types.NestedField.optional("int_backed") + .withId(2) + .ofType(Types.DecimalType.of(5, 2)) + .withInitialDefault(Literal.of(new BigDecimal("0.00"))) + .withWriteDefault(Literal.of(new BigDecimal("0.00"))) + .build(), + Types.NestedField.optional("long_backed") + .withId(3) + .ofType(Types.DecimalType.of(15, 2)) + .withInitialDefault(Literal.of(new BigDecimal("0.00"))) + .withWriteDefault(Literal.of(new BigDecimal("0.00"))) + .build(), + Types.NestedField.optional("fixed_backed") + .withId(4) + .ofType(Types.DecimalType.of(25, 2)) + .withInitialDefault(Literal.of(new BigDecimal("0.00"))) + .withWriteDefault(Literal.of(new BigDecimal("0.00"))) + .build()); + + HadoopTables tables = new HadoopTables(); + Table table = + tables.create( + schema, + PartitionSpec.unpartitioned(), + ImmutableMap.of(TableProperties.FORMAT_VERSION, "3"), + tempDir.toURI().toString()); + + List records = Lists.newArrayList(); + GenericRecord template = GenericRecord.create(schema); + for (long i = 0; i < 5; i++) { + GenericRecord rec = template.copy(); + rec.setField("id", i); + rec.setField("int_backed", new BigDecimal("12.34")); + rec.setField("long_backed", new BigDecimal("1234567890.12")); + rec.setField("fixed_backed", new BigDecimal("1234567890123456789.12")); + records.add(rec); + } + + File dataFile = new File(tempDir, "decimal-no-dict.parquet"); + try (FileAppender writer = + Parquet.write(Files.localOutput(dataFile)) + .schema(schema) + .createWriterFunc(GenericParquetWriter::create) + .set(ParquetOutputFormat.ENABLE_DICTIONARY, "false") + .build()) { + writer.addAll(records); + } + + DataFile parquetFile = + DataFiles.builder(PartitionSpec.unpartitioned()) + .withPath(dataFile.getAbsolutePath()) + .withFileSizeInBytes(dataFile.length()) + .withFormat(FileFormat.PARQUET) + .withRecordCount(records.size()) + .build(); + table.newAppend().appendFile(parquetFile).commit(); + + int rowsRead = 0; + try (VectorizedTableScanIterable reader = + new VectorizedTableScanIterable(table.newScan(), 1024, false)) { + for (ColumnarBatch batch : reader) { + rowsRead += batch.numRows(); + } + } + + assertThat(rowsRead).isEqualTo(records.size()); + } +} diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetVectorizedReads.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetVectorizedReads.java index 2ef94023347e..4a24a725e0f5 100644 --- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetVectorizedReads.java +++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetVectorizedReads.java @@ -257,7 +257,7 @@ FileAppender getParquetV2Writer(Schema schema, File testFile) throws IOE .build(); } - FileAppender getParquetWriterWithoutDictionary(Schema schema, File testFile) + FileAppender parquetWriterWithoutDictionary(Schema schema, File testFile) throws IOException { return Parquet.write(Files.localOutput(testFile)) .schema(schema) @@ -475,9 +475,6 @@ public void testUuidReads() throws Exception { @Test public void testDecimalWithDefaultValueNotDictionaryEncoded() throws Exception { - // Regression test for vector allocation of a decimal column whose Iceberg field carries a - // default value. The bug only surfaces when the column is not dictionary-encoded, because - // VectorizedArrowReader#allocateDictEncodedVector bypasses the buggy code path. Schema schema = new Schema( required(100, "id", Types.LongType.get()), @@ -502,7 +499,7 @@ public void testDecimalWithDefaultValueNotDictionaryEncoded() throws Exception { File dataFile = temp.resolve("decimal-no-dict.parquet").toFile(); Iterable data = generateData(schema, 1000, 0L, 0.0f, IDENTITY); - try (FileAppender writer = getParquetWriterWithoutDictionary(schema, dataFile)) { + try (FileAppender writer = parquetWriterWithoutDictionary(schema, dataFile)) { writer.addAll(data); }