Skip to content

Commit

Permalink
fix: Return unsupported error for merging schemas in the presence of …
Browse files Browse the repository at this point in the history
…partiton columns
  • Loading branch information
emcake authored and rtyler committed May 7, 2024
1 parent cfb20f1 commit 35664c0
Showing 1 changed file with 88 additions and 2 deletions.
90 changes: 88 additions & 2 deletions crates/core/src/writer/record_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,12 @@ impl DeltaWriter<RecordBatch> for RecordBatchWriter {
values: RecordBatch,
mode: WriteMode,
) -> Result<(), DeltaTableError> {
if mode == WriteMode::MergeSchema && !self.partition_columns.is_empty() {
return Err(DeltaTableError::Generic(
"Merging Schemas with partition columns present is currently unsupported"
.to_owned(),
));
}
// Set the should_evolve flag for later in case the writer should perform schema evolution
// on its flush_and_commit
self.should_evolve = mode == WriteMode::MergeSchema;
Expand Down Expand Up @@ -237,8 +243,12 @@ impl DeltaWriter<RecordBatch> for RecordBatchWriter {

if self.arrow_schema_ref != self.original_schema_ref && self.should_evolve {
let schema: StructType = self.arrow_schema_ref.clone().try_into()?;
// TODO: Handle partition columns somehow? Can we even evolve partition columns? Maybe
// this should just propagate the existing columns in the new action
if !self.partition_columns.is_empty() {
return Err(DeltaTableError::Generic(
"Merging Schemas with partition columns present is currently unsupported"
.to_owned(),
));
}
let part_cols: Vec<String> = vec![];
let metadata = Metadata::try_new(schema, part_cols, HashMap::new())?;
adds.push(Action::Metadata(metadata));
Expand Down Expand Up @@ -662,6 +672,8 @@ mod tests {
// The following sets of tests are related to #1386 and mergeSchema support
// <https://github.com/delta-io/delta-rs/issues/1386>
mod schema_evolution {
use itertools::Itertools;

use super::*;

#[tokio::test]
Expand Down Expand Up @@ -772,6 +784,80 @@ mod tests {
);
}

#[tokio::test]
async fn test_write_schema_evolution_with_partition_columns_should_fail_as_unsupported() {
let table_schema = get_delta_schema();
let table_dir = tempfile::tempdir().unwrap();
let table_path = table_dir.path();

let mut table = CreateBuilder::new()
.with_location(table_path.to_str().unwrap())
.with_table_name("test-table")
.with_comment("A table for running tests")
.with_columns(table_schema.fields().clone())
.with_partition_columns(["id"])
.await
.unwrap();
table.load().await.expect("Failed to load table");
assert_eq!(table.version(), 0);

let batch = get_record_batch(None, false);
let mut writer = RecordBatchWriter::for_table(&table).unwrap();

writer.write(batch).await.unwrap();
let version = writer.flush_and_commit(&mut table).await.unwrap();
assert_eq!(version, 1);
table.load().await.expect("Failed to load table");
assert_eq!(table.version(), 1);

// Create a second batch with appended columns
let second_batch = {
let second = get_record_batch(None, false);
let second_schema = ArrowSchema::new(
second
.schema()
.fields
.iter()
.cloned()
.chain([
Field::new("vid", DataType::Int32, true).into(),
Field::new("name", DataType::Utf8, true).into(),
])
.collect_vec(),
);

let len = second.num_rows();

let second_arrays = second
.columns()
.iter()
.cloned()
.chain([
Arc::new(Int32Array::from(vec![Some(1); len])) as _, // vid
Arc::new(StringArray::from(vec![Some("will"); len])) as _, // name
])
.collect_vec();

RecordBatch::try_new(second_schema.into(), second_arrays).unwrap()
};

let result = writer
.write_with_mode(second_batch, WriteMode::MergeSchema)
.await;

assert!(result.is_err());

match result.unwrap_err() {
DeltaTableError::Generic(s) => {
assert_eq!(
s,
"Merging Schemas with partition columns present is currently unsupported"
)
}
e => panic!("unexpected error: {e:?}"),
}
}

#[tokio::test]
async fn test_schema_evolution_column_type_mismatch() {
let batch = get_record_batch(None, false);
Expand Down

0 comments on commit 35664c0

Please sign in to comment.