diff --git a/crates/iceberg/src/arrow/record_batch_transformer.rs b/crates/iceberg/src/arrow/record_batch_transformer.rs
index 5fbbbb106a..298b8745ff 100644
--- a/crates/iceberg/src/arrow/record_batch_transformer.rs
+++ b/crates/iceberg/src/arrow/record_batch_transformer.rs
@@ -21,7 +21,9 @@ use std::sync::Arc;
 use arrow_array::{
     Array as ArrowArray, ArrayRef, BinaryArray, BooleanArray, Date32Array, Float32Array,
     Float64Array, Int32Array, Int64Array, NullArray, RecordBatch, RecordBatchOptions, StringArray,
+    StructArray,
 };
+use arrow_buffer::NullBuffer;
 use arrow_cast::cast;
 use arrow_schema::{
     DataType, FieldRef, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef, SchemaRef,
@@ -443,6 +445,21 @@
                 let vals: Vec<Option<&[u8]>> = vec![None; num_rows];
                 Arc::new(BinaryArray::from_opt_vec(vals))
             }
+            (DataType::Struct(fields), None) => {
+                // Create a StructArray filled with nulls. Per the Iceberg spec, an
+                // optional struct field added to the schema defaults to null; non-null
+                // default struct values are not yet implemented.
+                let null_arrays: Vec<ArrayRef> = fields
+                    .iter()
+                    .map(|field| Self::create_column(field.data_type(), &None, num_rows))
+                    .collect::<Result<Vec<_>>>()?;
+
+                Arc::new(StructArray::new(
+                    fields.clone(),
+                    null_arrays,
+                    Some(NullBuffer::new_null(num_rows)),
+                ))
+            }
             (DataType::Null, _) => Arc::new(NullArray::new(num_rows)),
             (dt, _) => {
                 return Err(Error::new(
@@ -585,6 +602,82 @@ mod test {
         assert!(date_column.is_null(2));
     }
 
+    #[test]
+    fn schema_evolution_adds_struct_column_with_nulls() {
+        // Test that when a struct column is added after data files are written,
+        // the transformer can materialize the missing struct column with null values.
+        // This reproduces the scenario from the Iceberg 1.10.0 TestSparkReaderDeletes
+        // tests, where binaryData and structData columns were added to the schema.
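+        //
+        // Note that the file schema below intentionally omits field 3, so the
+        // transformer must synthesize the struct column entirely from the
+        // snapshot schema.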
+        let snapshot_schema = Arc::new(
+            Schema::builder()
+                .with_schema_id(1)
+                .with_fields(vec![
+                    NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
+                    NestedField::required(2, "data", Type::Primitive(PrimitiveType::String)).into(),
+                    NestedField::optional(
+                        3,
+                        "struct_col",
+                        Type::Struct(crate::spec::StructType::new(vec![
+                            NestedField::optional(
+                                100,
+                                "inner_field",
+                                Type::Primitive(PrimitiveType::String),
+                            )
+                            .into(),
+                        ])),
+                    )
+                    .into(),
+                ])
+                .build()
+                .unwrap(),
+        );
+        let projected_iceberg_field_ids = [1, 2, 3];
+
+        let mut transformer =
+            RecordBatchTransformer::build(snapshot_schema, &projected_iceberg_field_ids);
+
+        let file_schema = Arc::new(ArrowSchema::new(vec![
+            simple_field("id", DataType::Int32, false, "1"),
+            simple_field("data", DataType::Utf8, false, "2"),
+        ]));
+
+        let file_batch = RecordBatch::try_new(file_schema, vec![
+            Arc::new(Int32Array::from(vec![1, 2, 3])),
+            Arc::new(StringArray::from(vec!["a", "b", "c"])),
+        ])
+        .unwrap();
+
+        let result = transformer.process_record_batch(file_batch).unwrap();
+
+        assert_eq!(result.num_columns(), 3);
+        assert_eq!(result.num_rows(), 3);
+
+        let id_column = result
+            .column(0)
+            .as_any()
+            .downcast_ref::<Int32Array>()
+            .unwrap();
+        assert_eq!(id_column.values(), &[1, 2, 3]);
+
+        let data_column = result
+            .column(1)
+            .as_any()
+            .downcast_ref::<StringArray>()
+            .unwrap();
+        assert_eq!(data_column.value(0), "a");
+        assert_eq!(data_column.value(1), "b");
+        assert_eq!(data_column.value(2), "c");
+
+        let struct_column = result
+            .column(2)
+            .as_any()
+            .downcast_ref::<StructArray>()
+            .unwrap();
+        assert!(struct_column.is_null(0));
+        assert!(struct_column.is_null(1));
+        assert!(struct_column.is_null(2));
+    }
+
     pub fn source_record_batch() -> RecordBatch {
         RecordBatch::try_new(
             arrow_schema_promotion_addition_and_renaming_required(),