From 3f87ebcb941fa0294999fbc79de48f9ea234a252 Mon Sep 17 00:00:00 2001
From: Dylan Chen
Date: Thu, 16 Oct 2025 20:45:33 +0800
Subject: [PATCH 1/2] fix

---
 crates/iceberg/src/arrow/reader.rs | 126 +++++++++++++++++++++++------
 1 file changed, 103 insertions(+), 23 deletions(-)

diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs
index 564b281107..02aed34830 100644
--- a/crates/iceberg/src/arrow/reader.rs
+++ b/crates/iceberg/src/arrow/reader.rs
@@ -583,35 +583,25 @@ impl ArrowReader {
             true
         });

-        if column_map.len() != leaf_field_ids.len() {
-            let missing_fields = leaf_field_ids
-                .iter()
-                .filter(|field_id| !column_map.contains_key(field_id))
-                .collect::<Vec<_>>();
-            return Err(Error::new(
-                ErrorKind::DataInvalid,
-                format!(
-                    "Parquet schema {} and Iceberg schema {} do not match.",
-                    iceberg_schema, iceberg_schema_of_task
-                ),
-            )
-            .with_context("column_map", format! {"{:?}", column_map})
-            .with_context("field_ids", format! {"{:?}", leaf_field_ids})
-            .with_context("missing_fields", format! {"{:?}", missing_fields}));
-        }
-
+        // Only project columns that exist in the Parquet file.
+        // Missing columns will be added by RecordBatchTransformer with default/NULL values.
+        // This supports schema evolution where new columns are added to the table schema
+        // but old Parquet files don't have them yet.
         let mut indices = vec![];
         for field_id in leaf_field_ids {
             if let Some(col_idx) = column_map.get(&field_id) {
                 indices.push(*col_idx);
-            } else {
-                return Err(Error::new(
-                    ErrorKind::DataInvalid,
-                    format!("Field {} is not found in Parquet schema.", field_id),
-                ));
             }
+            // Skip fields that don't exist in the Parquet file - they will be added later
+        }
+
+        if indices.is_empty() {
+            // If no columns from the projection exist in the file, project all columns.
+            // This can happen if all requested columns are new and need to be added by the transformer.
+            Ok(ProjectionMask::all())
+        } else {
+            Ok(ProjectionMask::leaves(parquet_schema, indices))
         }
-        Ok(ProjectionMask::leaves(parquet_schema, indices))
     }
 }

@@ -1958,4 +1948,94 @@ message schema {
         Arc::new(SchemaDescriptor::new(Arc::new(schema)))
     }
+
+    /// Test schema evolution: reading old Parquet file (with only column 'a')
+    /// using a newer table schema (with columns 'a' and 'b').
+    /// This tests that:
+    /// 1. get_arrow_projection_mask allows missing columns
+    /// 2. RecordBatchTransformer adds missing column 'b' with NULL values
+    #[tokio::test]
+    async fn test_schema_evolution_add_column() {
+        use arrow_array::cast::AsArray;
+        use arrow_array::types::Int32Type;
+        use arrow_array::{Array, Int32Array};
+
+        // New table schema: columns 'a' and 'b' (b was added later, file only has 'a')
+        let new_schema = Arc::new(
+            Schema::builder()
+                .with_schema_id(2)
+                .with_fields(vec![
+                    NestedField::required(1, "a", Type::Primitive(PrimitiveType::Int)).into(),
+                    NestedField::optional(2, "b", Type::Primitive(PrimitiveType::Int)).into(),
+                ])
+                .build()
+                .unwrap(),
+        );
+
+        // Create Arrow schema for old Parquet file (only has column 'a')
+        let arrow_schema_old = Arc::new(ArrowSchema::new(vec![
+            Field::new("a", DataType::Int32, false).with_metadata(HashMap::from([(
+                PARQUET_FIELD_ID_META_KEY.to_string(),
+                "1".to_string(),
+            )])),
+        ]));
+
+        // Write old Parquet file with only column 'a'
+        let tmp_dir = TempDir::new().unwrap();
+        let table_location = tmp_dir.path().to_str().unwrap().to_string();
+        let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
+
+        let data_a = Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef;
+        let to_write = RecordBatch::try_new(arrow_schema_old.clone(), vec![data_a]).unwrap();
+
+        let props = WriterProperties::builder()
+            .set_compression(Compression::SNAPPY)
+            .build();
+        let file = File::create(format!("{}/old_file.parquet", &table_location)).unwrap();
+        let mut writer =
+            ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap();
+        writer.write(&to_write).expect("Writing batch");
+        writer.close().unwrap();
+
+        // Read the old Parquet file using the NEW schema (with column 'b')
+        let reader = ArrowReaderBuilder::new(file_io).build();
+        let tasks = Box::pin(futures::stream::iter(
+            vec![Ok(FileScanTask {
+                start: 0,
+                length: 0,
+                record_count: None,
+                data_file_path: format!("{}/old_file.parquet", table_location),
+                data_file_format: DataFileFormat::Parquet,
+                schema: new_schema.clone(),
+                project_field_ids: vec![1, 2], // Request both columns 'a' and 'b'
+                predicate: None,
+                deletes: vec![],
+            })]
+            .into_iter(),
+        )) as FileScanTaskStream;
+
+        let result = reader
+            .read(tasks)
+            .unwrap()
+            .try_collect::<Vec<_>>()
+            .await
+            .unwrap();
+
+        // Verify we got the correct data
+        assert_eq!(result.len(), 1);
+        let batch = &result[0];
+
+        // Should have 2 columns now
+        assert_eq!(batch.num_columns(), 2);
+        assert_eq!(batch.num_rows(), 3);
+
+        // Column 'a' should have the original data
+        let col_a = batch.column(0).as_primitive::<Int32Type>();
+        assert_eq!(col_a.values(), &[1, 2, 3]);
+
+        // Column 'b' should be all NULLs (it didn't exist in the old file)
+        let col_b = batch.column(1).as_primitive::<Int32Type>();
+        assert_eq!(col_b.null_count(), 3);
+        assert!(col_b.is_null(0));
+        assert!(col_b.is_null(1));
+        assert!(col_b.is_null(2));
+    }
 }

From 58cba6b60bda2a3ba52b22372095040fa9c26922 Mon Sep 17 00:00:00 2001
From: Dylan Chen
Date: Thu, 16 Oct 2025 20:55:40 +0800
Subject: [PATCH 2/2] fmt

---
 crates/iceberg/src/arrow/reader.rs | 11 +++++++----
 1 file changed, 7 insertions(+), 4 deletions(-)

diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs
index 02aed34830..ff4cff0a64 100644
--- a/crates/iceberg/src/arrow/reader.rs
+++ b/crates/iceberg/src/arrow/reader.rs
@@ -1990,8 +1990,7 @@ message schema {
             .set_compression(Compression::SNAPPY)
             .build();
         let file = File::create(format!("{}/old_file.parquet", &table_location)).unwrap();
-        let mut writer =
-            ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap();
+        let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap();
         writer.write(&to_write).expect("Writing batch");
         writer.close().unwrap();

@@ -2028,11 +2027,15 @@ message schema {
         assert_eq!(batch.num_rows(), 3);

         // Column 'a' should have the original data
-        let col_a = batch.column(0).as_primitive::<Int32Type>();
+        let col_a = batch
+            .column(0)
+            .as_primitive::<Int32Type>();
         assert_eq!(col_a.values(), &[1, 2, 3]);

         // Column 'b' should be all NULLs (it didn't exist in the old file)
-        let col_b = batch.column(1).as_primitive::<Int32Type>();
+        let col_b = batch
+            .column(1)
+            .as_primitive::<Int32Type>();
         assert_eq!(col_b.null_count(), 3);
         assert!(col_b.is_null(0));
         assert!(col_b.is_null(1));
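
---

A minimal, self-contained sketch of the projection fallback this patch introduces, for readers who want to see the control flow in isolation. The `Projection` enum and `field_id_to_column` map are illustrative stand-ins, not iceberg-rust or parquet APIs: `Projection::All` plays the role of `ProjectionMask::all()` and `Projection::Leaves` that of `ProjectionMask::leaves(parquet_schema, indices)`.

use std::collections::HashMap;

#[derive(Debug, PartialEq)]
enum Projection {
    All,                // stands in for ProjectionMask::all()
    Leaves(Vec<usize>), // stands in for ProjectionMask::leaves(schema, indices)
}

fn project(leaf_field_ids: &[i32], field_id_to_column: &HashMap<i32, usize>) -> Projection {
    // Keep only the requested field ids that actually exist in the file;
    // missing ones are synthesized later (e.g. as NULL columns) by the
    // record-batch transformer, which is what enables schema evolution.
    let indices: Vec<usize> = leaf_field_ids
        .iter()
        .filter_map(|id| field_id_to_column.get(id).copied())
        .collect();
    if indices.is_empty() {
        // None of the requested columns exist in this file: read everything
        // and let the downstream transformer add the new columns.
        Projection::All
    } else {
        Projection::Leaves(indices)
    }
}

fn main() {
    // The file only contains field id 1, stored at column index 0.
    let columns = HashMap::from([(1, 0)]);
    // Requesting {1, 2} projects just the existing column; 'b' is filled later.
    assert_eq!(project(&[1, 2], &columns), Projection::Leaves(vec![0]));
    // Requesting only brand-new columns falls back to reading all columns.
    assert_eq!(project(&[2], &columns), Projection::All);
    println!("projection fallback behaves as expected");
}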