crates/iceberg/src/arrow/reader.rs: 129 changes (106 additions, 23 deletions)
@@ -583,35 +583,25 @@ impl ArrowReader {
             true
         });

-        if column_map.len() != leaf_field_ids.len() {
-            let missing_fields = leaf_field_ids
-                .iter()
-                .filter(|field_id| !column_map.contains_key(field_id))
-                .collect::<Vec<_>>();
-            return Err(Error::new(
-                ErrorKind::DataInvalid,
-                format!(
-                    "Parquet schema {} and Iceberg schema {} do not match.",
-                    iceberg_schema, iceberg_schema_of_task
-                ),
-            )
-            .with_context("column_map", format! {"{:?}", column_map})
-            .with_context("field_ids", format! {"{:?}", leaf_field_ids})
-            .with_context("missing_fields", format! {"{:?}", missing_fields}));
-        }
-
+        // Only project columns that exist in the Parquet file.
+        // Missing columns will be added by RecordBatchTransformer with default/NULL values.
+        // This supports schema evolution where new columns are added to the table schema
+        // but old Parquet files don't have them yet.
         let mut indices = vec![];
         for field_id in leaf_field_ids {
             if let Some(col_idx) = column_map.get(&field_id) {
                 indices.push(*col_idx);
-            } else {
-                return Err(Error::new(
-                    ErrorKind::DataInvalid,
-                    format!("Field {} is not found in Parquet schema.", field_id),
-                ));
             }
+            // Skip fields that don't exist in the Parquet file - they will be added later
         }

-        Ok(ProjectionMask::leaves(parquet_schema, indices))
+        if indices.is_empty() {
+            // If no columns from the projection exist in the file, project all columns
+            // This can happen if all requested columns are new and need to be added by the transformer
+            Ok(ProjectionMask::all())
+        } else {
+            Ok(ProjectionMask::leaves(parquet_schema, indices))
+        }
     }
 }
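To make the intent of the new comments concrete, here is a minimal, self-contained sketch of the backfill idea using plain arrow-rs (arrow-array/arrow-schema). The helper name add_missing_column, the hard-coded target schema, and the column names are illustrative assumptions; this is not the crate's actual RecordBatchTransformer code.

// Hypothetical sketch (not iceberg-rs internals): backfill a column that is
// missing from the file with NULLs so the batch matches the requested schema.
use std::sync::Arc;

use arrow_array::{new_null_array, ArrayRef, Int32Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema};

fn add_missing_column(batch: &RecordBatch) -> RecordBatch {
    // Schema the caller asked for: 'a' exists in the file, 'b' does not.
    let target_schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int32, false),
        Field::new("b", DataType::Int32, true),
    ]));
    let mut columns: Vec<ArrayRef> = batch.columns().to_vec();
    // The missing optional column becomes an all-NULL array of the expected type.
    columns.push(new_null_array(&DataType::Int32, batch.num_rows()));
    RecordBatch::try_new(target_schema, columns).expect("columns must match the schema")
}

fn main() {
    let file_schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(
        file_schema,
        vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef],
    )
    .unwrap();
    let filled = add_missing_column(&batch);
    assert_eq!(filled.num_columns(), 2);
    assert_eq!(filled.column(1).null_count(), 3);
}

The fallback to ProjectionMask::all() in the change above covers the edge case where none of the projected field IDs exist in the file; the downstream transformer is then responsible for producing every requested column.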

@@ -1958,4 +1948,97 @@ message schema {

     Arc::new(SchemaDescriptor::new(Arc::new(schema)))
     }
+
+    /// Test schema evolution: reading old Parquet file (with only column 'a')
+    /// using a newer table schema (with columns 'a' and 'b').
+    /// This tests that:
+    /// 1. get_arrow_projection_mask allows missing columns
+    /// 2. RecordBatchTransformer adds missing column 'b' with NULL values
+    #[tokio::test]
+    async fn test_schema_evolution_add_column() {
+        use arrow_array::{Array, Int32Array};
+
+        // New table schema: columns 'a' and 'b' (b was added later, file only has 'a')
+        let new_schema = Arc::new(
+            Schema::builder()
+                .with_schema_id(2)
+                .with_fields(vec![
+                    NestedField::required(1, "a", Type::Primitive(PrimitiveType::Int)).into(),
+                    NestedField::optional(2, "b", Type::Primitive(PrimitiveType::Int)).into(),
+                ])
+                .build()
+                .unwrap(),
+        );
+
+        // Create Arrow schema for old Parquet file (only has column 'a')
+        let arrow_schema_old = Arc::new(ArrowSchema::new(vec![
+            Field::new("a", DataType::Int32, false).with_metadata(HashMap::from([(
+                PARQUET_FIELD_ID_META_KEY.to_string(),
+                "1".to_string(),
+            )])),
+        ]));
+
+        // Write old Parquet file with only column 'a'
+        let tmp_dir = TempDir::new().unwrap();
+        let table_location = tmp_dir.path().to_str().unwrap().to_string();
+        let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
+
+        let data_a = Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef;
+        let to_write = RecordBatch::try_new(arrow_schema_old.clone(), vec![data_a]).unwrap();
+
+        let props = WriterProperties::builder()
+            .set_compression(Compression::SNAPPY)
+            .build();
+        let file = File::create(format!("{}/old_file.parquet", &table_location)).unwrap();
+        let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap();
+        writer.write(&to_write).expect("Writing batch");
+        writer.close().unwrap();
+
+        // Read the old Parquet file using the NEW schema (with column 'b')
+        let reader = ArrowReaderBuilder::new(file_io).build();
+        let tasks = Box::pin(futures::stream::iter(
+            vec![Ok(FileScanTask {
+                start: 0,
+                length: 0,
+                record_count: None,
+                data_file_path: format!("{}/old_file.parquet", table_location),
+                data_file_format: DataFileFormat::Parquet,
+                schema: new_schema.clone(),
+                project_field_ids: vec![1, 2], // Request both columns 'a' and 'b'
+                predicate: None,
+                deletes: vec![],
+            })]
+            .into_iter(),
+        )) as FileScanTaskStream;
+
+        let result = reader
+            .read(tasks)
+            .unwrap()
+            .try_collect::<Vec<RecordBatch>>()
+            .await
+            .unwrap();
+
+        // Verify we got the correct data
+        assert_eq!(result.len(), 1);
+        let batch = &result[0];
+
+        // Should have 2 columns now
+        assert_eq!(batch.num_columns(), 2);
+        assert_eq!(batch.num_rows(), 3);
+
+        // Column 'a' should have the original data
+        let col_a = batch
+            .column(0)
+            .as_primitive::<arrow_array::types::Int32Type>();
+        assert_eq!(col_a.values(), &[1, 2, 3]);
+
+        // Column 'b' should be all NULLs (it didn't exist in the old file)
+        let col_b = batch
+            .column(1)
+            .as_primitive::<arrow_array::types::Int32Type>();
+        assert_eq!(col_b.null_count(), 3);
+        assert!(col_b.is_null(0));
+        assert!(col_b.is_null(1));
+        assert!(col_b.is_null(2));
+    }
 }
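The new test can be run on its own with something along the lines of cargo test -p iceberg test_schema_evolution_add_column, assuming the crate is published in the workspace as iceberg, matching its crates/iceberg directory.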