Merged
@@ -338,8 +338,10 @@ impl<'a> PruningStatistics for RowGroupPruningStatistics<'a> {
scalar.to_array().ok()
}

fn row_counts(&self, _column: &Column) -> Option<ArrayRef> {
None
fn row_counts(&self, column: &Column) -> Option<ArrayRef> {
let (c, _) = self.column(&column.name)?;
let scalar = ScalarValue::UInt64(Some(c.num_values() as u64));
scalar.to_array().ok()
}

fn contained(
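For context, a minimal sketch of where this per-column count comes from, assuming the `parquet` crate and a hypothetical file path (this is not part of the diff). `ColumnChunkMetaData::num_values` is required column-chunk metadata, which is why it can back `row_counts` unconditionally, as the review discussion below notes:

```rust
use std::fs::File;
use parquet::file::reader::{FileReader, SerializedFileReader};

// Print the per-column value count for every row group; this is the same
// count that the new `row_counts` implementation above wraps in a
// `ScalarValue::UInt64`.
fn print_row_counts(path: &str) {
    let reader = SerializedFileReader::new(File::open(path).unwrap()).unwrap();
    for (i, rg) in reader.metadata().row_groups().iter().enumerate() {
        for col in rg.columns() {
            println!(
                "row group {i}, column {}: {} values",
                col.column_path(),
                col.num_values()
            );
        }
    }
}
```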
@@ -1022,15 +1024,17 @@ mod tests {
column_statistics: Vec<ParquetStatistics>,
) -> RowGroupMetaData {
let mut columns = vec![];
let number_row = 1000;
@Ted-Jiang (Member Author): Previously, every unit test built each column with the default of 0 rows, which triggered the `num_rows == num_nulls` condition.

Contributor: I was wondering if this could cause problems in real files (for example, if the row counts were not included in the statistics that were written into the file). However, I double-checked the code, and it seems like `ColumnChunkMetaData::num_values` is non-nullable, so I think we are good.

for (i, s) in column_statistics.iter().enumerate() {
let column = ColumnChunkMetaData::builder(schema_descr.column(i))
.set_statistics(s.clone())
.set_num_values(number_row)
.build()
.unwrap();
columns.push(column);
}
RowGroupMetaData::builder(schema_descr.clone())
.set_num_rows(1000)
.set_num_rows(number_row)
.set_total_byte_size(2000)
.set_column_metadata(columns)
.build()
38 changes: 31 additions & 7 deletions datafusion/core/src/physical_optimizer/pruning.rs
@@ -331,6 +331,7 @@ pub trait PruningStatistics {
/// `x < 5` | `CASE WHEN x_null_count = x_row_count THEN false ELSE x_max < 5 END`
/// `x = 5 AND y = 10` | `CASE WHEN x_null_count = x_row_count THEN false ELSE x_min <= 5 AND 5 <= x_max END AND CASE WHEN y_null_count = y_row_count THEN false ELSE y_min <= 10 AND 10 <= y_max END`
/// `x IS NULL` | `x_null_count > 0`
/// `x IS NOT NULL` | `x_null_count = 0`
/// `CAST(x as int) = 5` | `CASE WHEN x_null_count = x_row_count THEN false ELSE CAST(x_min as int) <= 5 AND 5 <= CAST(x_max as int) END`
///
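As a worked example of the rewrite table above, consider a row group whose column `x` holds 1000 rows, all of them null. The following hand evaluation is a sketch of the rewritten predicates, not the `PhysicalExpr` tree DataFusion actually builds:

```rust
fn main() {
    let x_row_count: u64 = 1000;
    let x_null_count: u64 = 1000; // every value in this row group is null

    // `x IS NULL` => `x_null_count > 0` => true: the row group may match
    assert!(x_null_count > 0);

    // `x IS NOT NULL` => `x_null_count = 0` => false: the row group is pruned
    assert!(!(x_null_count == 0));

    // `x < 5` => `CASE WHEN x_null_count = x_row_count THEN false ELSE x_max < 5 END`
    // The CASE guard fires because all values are null, so the group is pruned.
    assert!(x_null_count == x_row_count);
}
```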
/// ## Predicate Evaluation
Expand Down Expand Up @@ -1235,10 +1236,15 @@ fn build_single_column_expr(
/// returns a pruning expression in terms of IsNull that will evaluate to true
/// if the column may contain null, and false if definitely does not
/// contain null.
/// If `with_not` is set to true, the logic is inverted (IsNotNull):
/// given an expression reference to `expr`, if `expr` is a column expression,
/// returns a pruning expression in terms of IsNotNull that will evaluate to
/// true if the column contains no nulls, and false if it definitely contains
/// null values.
fn build_is_null_column_expr(
expr: &Arc<dyn PhysicalExpr>,
schema: &Schema,
required_columns: &mut RequiredColumns,
with_not: bool,
) -> Option<Arc<dyn PhysicalExpr>> {
if let Some(col) = expr.as_any().downcast_ref::<phys_expr::Column>() {
let field = schema.field_with_name(col.name()).ok()?;
@@ -1247,12 +1253,21 @@ fn build_is_null_column_expr(
required_columns
.null_count_column_expr(col, expr, null_count_field)
.map(|null_count_column_expr| {
// IsNull(column) => null_count > 0
Arc::new(phys_expr::BinaryExpr::new(
null_count_column_expr,
Operator::Gt,
Arc::new(phys_expr::Literal::new(ScalarValue::UInt64(Some(0)))),
)) as _
if with_not {
// IsNotNull(column) => null_count = 0
Arc::new(phys_expr::BinaryExpr::new(
null_count_column_expr,
Operator::Eq,
Arc::new(phys_expr::Literal::new(ScalarValue::UInt64(Some(0)))),
)) as _
} else {
// IsNull(column) => null_count > 0
Arc::new(phys_expr::BinaryExpr::new(
null_count_column_expr,
Operator::Gt,
Arc::new(phys_expr::Literal::new(ScalarValue::UInt64(Some(0)))),
)) as _
}
})
.ok()
} else {
@@ -1283,9 +1298,18 @@ fn build_predicate_expression(
// predicate expression can only be a binary expression
let expr_any = expr.as_any();
if let Some(is_null) = expr_any.downcast_ref::<phys_expr::IsNullExpr>() {
return build_is_null_column_expr(is_null.arg(), schema, required_columns)
return build_is_null_column_expr(is_null.arg(), schema, required_columns, false)
.unwrap_or(unhandled);
}
if let Some(is_not_null) = expr_any.downcast_ref::<phys_expr::IsNotNullExpr>() {
return build_is_null_column_expr(
is_not_null.arg(),
schema,
required_columns,
true,
)
.unwrap_or(unhandled);
}
if let Some(col) = expr_any.downcast_ref::<phys_expr::Column>() {
return build_single_column_expr(col, schema, required_columns, false)
.unwrap_or(unhandled);
30 changes: 30 additions & 0 deletions datafusion/core/tests/parquet/mod.rs
@@ -28,6 +28,7 @@ use arrow::{
record_batch::RecordBatch,
util::pretty::pretty_format_batches,
};
use arrow_array::new_null_array;
use chrono::{Datelike, Duration, TimeDelta};
use datafusion::{
datasource::{physical_plan::ParquetExec, provider_as_source, TableProvider},
@@ -75,6 +76,7 @@ enum Scenario {
DecimalLargePrecisionBloomFilter,
ByteArray,
PeriodsInColumnNames,
WithNullValues,
}

enum Unit {
@@ -630,6 +632,27 @@ fn make_names_batch(name: &str, service_name_values: Vec<&str>) -> RecordBatch {
RecordBatch::try_new(schema, vec![Arc::new(name), Arc::new(service_name)]).unwrap()
}

/// Return a record batch with i8, i16, i32, and i64 columns whose values are all null
fn make_all_null_values() -> RecordBatch {
let schema = Arc::new(Schema::new(vec![
Field::new("i8", DataType::Int8, true),
Field::new("i16", DataType::Int16, true),
Field::new("i32", DataType::Int32, true),
Field::new("i64", DataType::Int64, true),
]));

RecordBatch::try_new(
schema,
vec![
new_null_array(&DataType::Int8, 5),
new_null_array(&DataType::Int16, 5),
new_null_array(&DataType::Int32, 5),
new_null_array(&DataType::Int64, 5),
],
)
.unwrap()
}

fn create_data_batch(scenario: Scenario) -> Vec<RecordBatch> {
match scenario {
Scenario::Timestamps => {
Expand Down Expand Up @@ -799,6 +822,13 @@ fn create_data_batch(scenario: Scenario) -> Vec<RecordBatch> {
),
]
}
Scenario::WithNullValues => {
vec![
make_all_null_values(),
make_int_batches(1, 6),
make_all_null_values(),
]
}
}
}

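The three batches above become the scenario's three row groups: all null, the values 1 through 5, then all null again. A minimal sketch of producing that layout, assuming a hypothetical output path and writer setup rather than the test harness's actual plumbing: capping `max_row_group_size` at the 5-row batch length makes the writer flush one row group per batch.

```rust
use std::fs::File;
use arrow_array::RecordBatch;
use parquet::arrow::ArrowWriter;
use parquet::file::properties::WriterProperties;

fn write_three_row_groups(batches: &[RecordBatch]) {
    let file = File::create("/tmp/with_null_values.parquet").unwrap();
    // Each 5-row batch fills a row group exactly, so the writer flushes
    // one row group per batch: all-null, 1..=5, all-null.
    let props = WriterProperties::builder()
        .set_max_row_group_size(5)
        .build();
    let mut writer =
        ArrowWriter::try_new(file, batches[0].schema(), Some(props)).unwrap();
    for batch in batches {
        writer.write(batch).unwrap();
    }
    writer.close().unwrap();
}
```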
60 changes: 60 additions & 0 deletions datafusion/core/tests/parquet/row_group_pruning.rs
@@ -1262,3 +1262,63 @@ async fn prune_periods_in_column_names() {
.test_row_group_prune()
.await;
}

#[tokio::test]
async fn test_row_group_with_null_values() {
// Three row groups:
// 1. all Null values
// 2. values from 1 to 5
// 3. all Null values

// After pruning, only row group 2 should be selected
RowGroupPruningTest::new()
.with_scenario(Scenario::WithNullValues)
.with_query("SELECT * FROM t WHERE \"i8\" <= 5")
.with_expected_errors(Some(0))
.with_matched_by_stats(Some(1))
.with_pruned_by_stats(Some(2))
.with_expected_rows(5)
.with_matched_by_bloom_filter(Some(0))
.with_pruned_by_bloom_filter(Some(0))
.test_row_group_prune()
.await;

// After pruning, only row groups 1 and 3 should be selected
RowGroupPruningTest::new()
.with_scenario(Scenario::WithNullValues)
.with_query("SELECT * FROM t WHERE \"i8\" is Null")
Contributor (@alamb): Could you also add tests for:

  1. i16 IS NOT NULL (to cover the opposite)
  2. i32 > 7 (prune via nulls and some via counts)

@Ted-Jiang (Member Author, Apr 9, 2024): @alamb thanks! Added tests in 11567d9, and added support for IsNotNull.

> Do you plan to add support in page_filter.rs as well (maybe that is why the PR is marked "Part #9961")?

For page-level pruning, I would prefer to do it in a follow-up PR to keep this PR short and clean.

.with_expected_errors(Some(0))
.with_matched_by_stats(Some(2))
.with_pruned_by_stats(Some(1))
.with_expected_rows(10)
.with_matched_by_bloom_filter(Some(0))
.with_pruned_by_bloom_filter(Some(0))
.test_row_group_prune()
.await;

// After pruning, only row group 2 should be selected
RowGroupPruningTest::new()
.with_scenario(Scenario::WithNullValues)
.with_query("SELECT * FROM t WHERE \"i16\" is Not Null")
.with_expected_errors(Some(0))
.with_matched_by_stats(Some(1))
.with_pruned_by_stats(Some(2))
.with_expected_rows(5)
.with_matched_by_bloom_filter(Some(0))
.with_pruned_by_bloom_filter(Some(0))
.test_row_group_prune()
.await;

// All row groups will be pruned: groups 1 and 3 are entirely null, and
// group 2 has max(i32) = 5, which cannot satisfy i32 > 7
RowGroupPruningTest::new()
.with_scenario(Scenario::WithNullValues)
.with_query("SELECT * FROM t WHERE \"i32\" > 7")
.with_expected_errors(Some(0))
.with_matched_by_stats(Some(0))
.with_pruned_by_stats(Some(3))
.with_expected_rows(0)
.with_matched_by_bloom_filter(Some(0))
.with_pruned_by_bloom_filter(Some(0))
.test_row_group_prune()
.await;
}
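For reference, a hand evaluation of these expectations against the scenario's per-row-group statistics, using the rewrite rules from pruning.rs (a sketch, not the test harness):

```rust
struct GroupStats {
    null_count: u64,
    row_count: u64,
    max: Option<i32>, // None when no non-null values were written
}

fn main() {
    let groups = [
        GroupStats { null_count: 5, row_count: 5, max: None },    // all null
        GroupStats { null_count: 0, row_count: 5, max: Some(5) }, // values 1..=5
        GroupStats { null_count: 5, row_count: 5, max: None },    // all null
    ];
    for (i, g) in groups.iter().enumerate() {
        // `i8 is Null`      => null_count > 0  => groups 1 and 3 match
        let is_null = g.null_count > 0;
        // `i16 is Not Null` => null_count = 0  => only group 2 matches
        let is_not_null = g.null_count == 0;
        // `i32 > 7` => CASE WHEN null_count = row_count THEN false ELSE max > 7 END
        // Groups 1 and 3 are pruned by the null check; group 2 by max = 5.
        let gt_7 = g.null_count != g.row_count && g.max.map_or(true, |m| m > 7);
        println!("group {}: is_null={is_null} is_not_null={is_not_null} gt_7={gt_7}", i + 1);
    }
}
```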