Skip to content

Commit

Permalink
feat: omit unmodified files during merge write (#1969)
Browse files Browse the repository at this point in the history
# Description
Implements a new Datafusion node called `MergeBarrier` that determines
which files have modifications. For files that do not have modifications
a remove action is no longer created.

# Related Issue(s)
- enhances #850
  • Loading branch information
Blajda committed Dec 30, 2023
1 parent 6da3b3b commit 74f9d33
Show file tree
Hide file tree
Showing 5 changed files with 821 additions and 81 deletions.
8 changes: 8 additions & 0 deletions crates/benchmarks/src/bin/merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,14 @@ async fn benchmark_merge_tpcds(
.object_store()
.delete(&Path::parse("_delta_log/00000000000000000002.json")?)
.await?;
table
.object_store()
.delete(&Path::parse("_delta_log/00000000000000000003.json")?)
.await?;
let _ = table
.object_store()
.delete(&Path::parse("_delta_log/00000000000000000004.json")?)
.await;

Ok((duration, metrics))
}
Expand Down
16 changes: 16 additions & 0 deletions crates/deltalake-core/src/delta_datafusion/logical.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
//! Logical Operations for DataFusion

use std::collections::HashSet;

use datafusion_expr::{LogicalPlan, UserDefinedLogicalNodeCore};

// Metric Observer is used to update DataFusion metrics from a record batch.
Expand All @@ -10,6 +12,7 @@ pub(crate) struct MetricObserver {
// id is preserved during conversion to physical node
pub id: String,
pub input: LogicalPlan,
pub enable_pushdown: bool,
}

impl UserDefinedLogicalNodeCore for MetricObserver {
Expand All @@ -35,6 +38,18 @@ impl UserDefinedLogicalNodeCore for MetricObserver {
write!(f, "MetricObserver id={}", &self.id)
}

fn prevent_predicate_push_down_columns(&self) -> HashSet<String> {
if self.enable_pushdown {
HashSet::new()
} else {
self.schema()
.fields()
.iter()
.map(|f| f.name().clone())
.collect()
}
}

fn from_template(
&self,
_exprs: &[datafusion_expr::Expr],
Expand All @@ -43,6 +58,7 @@ impl UserDefinedLogicalNodeCore for MetricObserver {
MetricObserver {
id: self.id.clone(),
input: inputs[0].clone(),
enable_pushdown: self.enable_pushdown,
}
}
}
56 changes: 30 additions & 26 deletions crates/deltalake-core/src/delta_datafusion/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use arrow::datatypes::{DataType as ArrowDataType, Schema as ArrowSchema, SchemaR
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;
use arrow_array::types::UInt16Type;
use arrow_array::{Array, DictionaryArray, StringArray};
use arrow_array::{Array, DictionaryArray, StringArray, TypedDictionaryArray};
use arrow_cast::display::array_value_to_string;

use arrow_schema::Field;
Expand Down Expand Up @@ -132,6 +132,21 @@ fn get_scalar_value(value: Option<&ColumnValueStat>, field: &Arc<Field>) -> Prec
}
}

pub(crate) fn get_path_column<'a>(
batch: &'a RecordBatch,
path_column: &str,
) -> DeltaResult<TypedDictionaryArray<'a, UInt16Type, StringArray>> {
let err = || DeltaTableError::Generic("Unable to obtain Delta-rs path column".to_string());
batch
.column_by_name(path_column)
.unwrap()
.as_any()
.downcast_ref::<DictionaryArray<UInt16Type>>()
.ok_or_else(err)?
.downcast_dict::<StringArray>()
.ok_or_else(err)
}

impl DeltaTableState {
/// Provide table level statistics to Datafusion
pub fn datafusion_table_statistics(&self) -> DataFusionResult<Statistics> {
Expand Down Expand Up @@ -1362,31 +1377,20 @@ fn join_batches_with_add_actions(

let mut files = Vec::with_capacity(batches.iter().map(|batch| batch.num_rows()).sum());
for batch in batches {
let array = batch.column_by_name(path_column).ok_or_else(|| {
DeltaTableError::Generic(format!("Unable to find column {}", path_column))
})?;

let iter: Box<dyn Iterator<Item = Option<&str>>> =
if dict_array {
let array = array
.as_any()
.downcast_ref::<DictionaryArray<UInt16Type>>()
.ok_or(DeltaTableError::Generic(format!(
"Unable to downcast column {}",
path_column
)))?
.downcast_dict::<StringArray>()
.ok_or(DeltaTableError::Generic(format!(
"Unable to downcast column {}",
path_column
)))?;
Box::new(array.into_iter())
} else {
let array = array.as_any().downcast_ref::<StringArray>().ok_or(
DeltaTableError::Generic(format!("Unable to downcast column {}", path_column)),
)?;
Box::new(array.into_iter())
};
let err = || DeltaTableError::Generic("Unable to obtain Delta-rs path column".to_string());

let iter: Box<dyn Iterator<Item = Option<&str>>> = if dict_array {
let array = get_path_column(&batch, path_column)?;
Box::new(array.into_iter())
} else {
let array = batch
.column_by_name(path_column)
.ok_or_else(err)?
.as_any()
.downcast_ref::<StringArray>()
.ok_or_else(err)?;
Box::new(array.into_iter())
};

for path in iter {
let path = path.ok_or(DeltaTableError::Generic(format!(
Expand Down
Loading

0 comments on commit 74f9d33

Please sign in to comment.