Introduce schema evolution on RecordBatchWriter
This commit introduces the `WriteMode` enum and the ability to specify
writes which should enable [schema
evolution](https://delta.io/blog/2023-02-08-delta-lake-schema-evolution/).

The result is a new `metaData` action added to the transaction log alongside the
write, reflecting the updated schema.

There are some caveats, however: for example, every write must still include the table's non-nullable columns.
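
For illustration, here is a minimal sketch of what a schema-evolving write could look like from the writer API. The table location, the `tokio` entry point, and the merging variant name (`WriteMode::MergeSchema`) are assumptions made for this example, not a statement of the final API:

```rust
use std::sync::Arc;

use arrow::array::{Int32Array, StringArray};
use arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
use arrow::record_batch::RecordBatch;
use deltalake::writer::{DeltaWriter, RecordBatchWriter, WriteMode};
use deltalake::{open_table, DeltaTableError};

#[tokio::main]
async fn main() -> Result<(), DeltaTableError> {
    // Assumes a Delta table already exists at this (illustrative) location.
    let mut table = open_table("./data/example-table").await?;

    // A batch carrying a column the table schema does not yet contain.
    let schema = Arc::new(ArrowSchema::new(vec![
        Field::new("id", DataType::Int32, true),
        Field::new("name", DataType::Utf8, true),
    ]));
    let batch = RecordBatch::try_new(
        schema,
        vec![
            Arc::new(Int32Array::from(vec![Some(1), Some(2)])),
            Arc::new(StringArray::from(vec![Some("will"), Some("robert")])),
        ],
    )?;

    let mut writer = RecordBatchWriter::for_table(&table)?;
    // WriteMode::Default rejects a mismatched schema; the merging mode asks the
    // writer to evolve the table schema and emit the new `metaData` action when
    // the write is committed. Non-nullable columns must still be present.
    writer.write_with_mode(batch, WriteMode::MergeSchema).await?;
    writer.flush_and_commit(&mut table).await?;
    Ok(())
}
```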

This change does not modify the Write operation, which has a datafusion
dependency. Unfortunately we have some redundancy in the API surface insofar
as the writer in src/operations/ only performs parquet writes, while the
Write operation requires datafusion and will actually perform the
transaction log writes.

Fixes #1386

Sponsored-by: Raft, LLC.
rtyler committed Feb 5, 2024
1 parent f85152b commit 105fb5d
Showing 6 changed files with 590 additions and 75 deletions.
1 change: 0 additions & 1 deletion crates/core/src/operations/write.rs
@@ -1,4 +1,3 @@
//! Used to write [RecordBatch]es into a delta table.
//!
//! New Table Semantics
//! - The schema of the [RecordBatch] is used to initialize the table.
111 changes: 91 additions & 20 deletions crates/core/src/operations/writer.rs
@@ -66,6 +66,7 @@ impl From<WriteError> for DeltaTableError {
}

/// Configuration to write data into Delta tables
#[derive(Debug)]
pub struct WriterConfig {
/// Schema of the delta table
table_schema: ArrowSchemaRef,
@@ -112,6 +113,7 @@ impl WriterConfig {
}
}

#[derive(Debug)]
/// A parquet writer implementation tailored to the needs of writing data to a delta table.
pub struct DeltaWriter {
/// An object store pointing at Delta table root
@@ -212,6 +214,7 @@ impl DeltaWriter {
}
}

#[derive(Debug)]
pub(crate) struct PartitionWriterConfig {
/// Schema of the data written to disk
file_schema: ArrowSchemaRef,
@@ -257,6 +260,7 @@ impl PartitionWriterConfig {
}
}

#[derive(Debug)]
pub(crate) struct PartitionWriter {
object_store: ObjectStoreRef,
writer_id: uuid::Uuid,
@@ -396,12 +400,47 @@ impl PartitionWriter {
mod tests {
use super::*;
use crate::storage::utils::flatten_list_stream as list;
use crate::writer::test_utils::get_record_batch;
use crate::writer::test_utils::*;
use crate::DeltaTableBuilder;
use arrow::array::{Int32Array, StringArray};
use arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
use std::sync::Arc;

fn get_delta_writer(
object_store: ObjectStoreRef,
batch: &RecordBatch,
writer_properties: Option<WriterProperties>,
target_file_size: Option<usize>,
write_batch_size: Option<usize>,
) -> DeltaWriter {
let config = WriterConfig::new(
batch.schema(),
vec![],
writer_properties,
target_file_size,
write_batch_size,
);
DeltaWriter::new(object_store, config)
}

fn get_partition_writer(
object_store: ObjectStoreRef,
batch: &RecordBatch,
writer_properties: Option<WriterProperties>,
target_file_size: Option<usize>,
write_batch_size: Option<usize>,
) -> PartitionWriter {
let config = PartitionWriterConfig::try_new(
batch.schema(),
IndexMap::new(),
writer_properties,
target_file_size,
write_batch_size,
)
.unwrap();
PartitionWriter::try_with_config(object_store, config).unwrap()
}

#[tokio::test]
async fn test_write_partition() {
let log_store = DeltaTableBuilder::from_uri("memory://")
@@ -411,7 +450,7 @@ mod tests {
let batch = get_record_batch(None, false);

// write single un-partitioned batch
let mut writer = get_writer(object_store.clone(), &batch, None, None, None);
let mut writer = get_partition_writer(object_store.clone(), &batch, None, None, None);
writer.write(&batch).await.unwrap();
let files = list(object_store.as_ref(), None).await.unwrap();
assert_eq!(files.len(), 0);
@@ -443,8 +482,9 @@ mod tests {
let properties = WriterProperties::builder()
.set_max_row_group_size(1024)
.build();
// configure small target file size and row group size so we can observe multiple files written
let mut writer = get_writer(object_store, &batch, Some(properties), Some(10_000), None);
// configure small target file size and row group size so we can observe multiple files written
let mut writer =
get_partition_writer(object_store, &batch, Some(properties), Some(10_000), None);
writer.write(&batch).await.unwrap();

// check that we have written more than one file, and no more than 1 is below target size
@@ -471,7 +511,7 @@ mod tests {
.unwrap()
.object_store();
// configure small target file size so we can observe multiple files written
let mut writer = get_writer(object_store, &batch, None, Some(10_000), None);
let mut writer = get_partition_writer(object_store, &batch, None, Some(10_000), None);
writer.write(&batch).await.unwrap();

// check that we have written more than one file, and no more than 1 is below target size
@@ -499,28 +539,59 @@ mod tests {
.object_store();
// configure high batch size and low file size to observe one file written and flushed immediately
// upon writing batch, then ensures the buffer is empty upon closing writer
let mut writer = get_writer(object_store, &batch, None, Some(9000), Some(10000));
let mut writer = get_partition_writer(object_store, &batch, None, Some(9000), Some(10000));
writer.write(&batch).await.unwrap();

let adds = writer.close().await.unwrap();
assert!(adds.len() == 1);
}

fn get_writer(
object_store: ObjectStoreRef,
batch: &RecordBatch,
writer_properties: Option<WriterProperties>,
target_file_size: Option<usize>,
write_batch_size: Option<usize>,
) -> PartitionWriter {
let config = PartitionWriterConfig::try_new(
batch.schema(),
IndexMap::new(),
writer_properties,
target_file_size,
write_batch_size,
#[tokio::test]
async fn test_write_mismatched_schema() {
let log_store = DeltaTableBuilder::from_uri("memory://")
.build_storage()
.unwrap();
let object_store = log_store.object_store();
let batch = get_record_batch(None, false);

// write single un-partitioned batch
let mut writer = get_delta_writer(object_store.clone(), &batch, None, None, None);
writer.write(&batch).await.unwrap();
// Ensure the write hasn't been flushed
let files = list(object_store.as_ref(), None).await.unwrap();
assert_eq!(files.len(), 0);

// Create a second batch with a different schema
let second_schema = Arc::new(ArrowSchema::new(vec![
Field::new("id", DataType::Int32, true),
Field::new("name", DataType::Utf8, true),
]));
let second_batch = RecordBatch::try_new(
second_schema,
vec![
Arc::new(Int32Array::from(vec![Some(1), Some(2)])),
Arc::new(StringArray::from(vec![Some("will"), Some("robert")])),
],
)
.unwrap();
PartitionWriter::try_with_config(object_store, config).unwrap()

let result = writer.write(&second_batch).await;
assert!(result.is_err());

match result {
Ok(_) => {
assert!(false, "Should not have successfully written");
}
Err(e) => {
match e {
DeltaTableError::SchemaMismatch { .. } => {
// this is expected
}
others => {
assert!(false, "Got the wrong error: {others:?}");
}
}
}
};
}
}
1 change: 0 additions & 1 deletion crates/core/src/protocol/checkpoints.rs
@@ -531,7 +531,6 @@ mod tests {
use serde_json::json;

use super::*;
use crate::kernel::Format;
use crate::kernel::StructType;
use crate::operations::DeltaOps;
use crate::protocol::Metadata;
115 changes: 113 additions & 2 deletions crates/core/src/writer/json.rs
@@ -22,7 +22,7 @@ use super::utils::{
arrow_schema_without_partitions, next_data_path, record_batch_from_message,
record_batch_without_partitions,
};
use super::{DeltaWriter, DeltaWriterError};
use super::{DeltaWriter, DeltaWriterError, WriteMode};
use crate::errors::DeltaTableError;
use crate::kernel::{Add, PartitionsExt, Scalar, StructType};
use crate::table::builder::DeltaTableBuilder;
@@ -286,8 +286,20 @@ impl JsonWriter {

#[async_trait::async_trait]
impl DeltaWriter<Vec<Value>> for JsonWriter {
/// Writes the given values to internal parquet buffers for each represented partition.
/// Write a chunk of values into the internal write buffers with the default write mode
async fn write(&mut self, values: Vec<Value>) -> Result<(), DeltaTableError> {
self.write_with_mode(values, WriteMode::Default).await
}

/// Writes the given values to internal parquet buffers for each represented partition.
async fn write_with_mode(
&mut self,
values: Vec<Value>,
mode: WriteMode,
) -> Result<(), DeltaTableError> {
if mode != WriteMode::Default {
warn!("The JsonWriter does not currently support non-default write modes, falling back to default mode");
}
let mut partial_writes: Vec<(Value, ParquetError)> = Vec::new();
let arrow_schema = self.arrow_schema();
let divided = self.divide_by_partition_values(values)?;
@@ -544,4 +556,103 @@ mod tests {
})
));
}

// The following sets of tests are related to #1386 and mergeSchema support
// <https://github.com/delta-io/delta-rs/issues/1386>
mod schema_evolution {
use super::*;

#[tokio::test]
async fn test_json_write_mismatched_values() {
let table_dir = tempfile::tempdir().unwrap();
let schema = get_delta_schema();
let path = table_dir.path().to_str().unwrap().to_string();

let arrow_schema = <ArrowSchema as TryFrom<&StructType>>::try_from(&schema).unwrap();
let mut writer = JsonWriter::try_new(
path.clone(),
Arc::new(arrow_schema),
Some(vec!["modified".to_string()]),
None,
)
.unwrap();

let data = serde_json::json!(
{
"id" : "A",
"value": 42,
"modified": "2021-02-01"
}
);

writer.write(vec![data]).await.unwrap();
let add_actions = writer.flush().await.unwrap();
assert_eq!(add_actions.len(), 1);

let second_data = serde_json::json!(
{
"id" : 1,
"name" : "Ion"
}
);

match writer.write(vec![second_data]).await {
Ok(_) => {
assert!(false, "Should not have successfully written");
}
_ => {}
}
}

#[tokio::test]
async fn test_json_write_mismatched_schema() {
use crate::operations::create::CreateBuilder;
let table_dir = tempfile::tempdir().unwrap();
let schema = get_delta_schema();
let path = table_dir.path().to_str().unwrap().to_string();

let mut table = CreateBuilder::new()
.with_location(&path)
.with_table_name("test-table")
.with_comment("A table for running tests")
.with_columns(schema.fields().clone())
.await
.unwrap();
table.load().await.expect("Failed to load table");
assert_eq!(table.version(), 0);

let arrow_schema = <ArrowSchema as TryFrom<&StructType>>::try_from(&schema).unwrap();
let mut writer = JsonWriter::try_new(
path.clone(),
Arc::new(arrow_schema),
Some(vec!["modified".to_string()]),
None,
)
.unwrap();

let data = serde_json::json!(
{
"id" : "A",
"value": 42,
"modified": "2021-02-01"
}
);

writer.write(vec![data]).await.unwrap();
let add_actions = writer.flush().await.unwrap();
assert_eq!(add_actions.len(), 1);

let second_data = serde_json::json!(
{
"postcode" : 1,
"name" : "Ion"
}
);

// TODO This should fail because we haven't asked to evolve the schema
writer.write(vec![second_data]).await.unwrap();
writer.flush_and_commit(&mut table).await.unwrap();
assert_eq!(table.version(), 1);
}
}
}