Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions datafusion/core/src/physical_plan/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,8 +278,7 @@ fn check_not_null_contraits(
batch: RecordBatch,
column_indices: &Vec<usize>,
) -> Result<RecordBatch> {
for i in column_indices {
let index = *i;
for &index in column_indices {
if batch.num_columns() <= index {
return Err(DataFusionError::Execution(format!(
"Invalid batch column count {} expected > {}",
Expand Down
64 changes: 61 additions & 3 deletions datafusion/expr/src/expr_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,8 +173,30 @@ impl ExprSchemable for Expr {
Expr::Alias(expr, _)
| Expr::Not(expr)
| Expr::Negative(expr)
| Expr::Sort(Sort { expr, .. })
| Expr::InList(InList { expr, .. }) => expr.nullable(input_schema),
| Expr::Sort(Sort { expr, .. }) => expr.nullable(input_schema),

Expr::InList(InList { expr, list, .. }) => {
// Avoid inspecting too many expressions.
const MAX_INSPECT_LIMIT: usize = 6;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I want to know why use this number? Is it the practice from other systems?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

spark handle it simplified.

override def nullable: Boolean = children.exists(_.nullable)

Copy link
Member Author

@jonahgao jonahgao Jun 30, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I want to know why use this number? Is it the practice from other systems?

@jackwener No. The nullable function may be called multiple times during the optimization phase,
so I think adding a limitation would be preferable in order to prevent it from being excessively slow.
But I'm not quite sure what would be an appropriate number.

spark handle it simplified.

override def nullable: Boolean = children.exists(_.nullable)

This seems to be a cache style.
We can implement this by precomputing nullable in the InList::new() function.
But the disadvantages are:

  • We need a new field for the InList struct
  • The precomputed nullable may not be used.
  • Calculating nullable requires input_schema

@jackwener Which solution do you prefer?

Update: It seems challenging to accomplish. I need to take a closer look.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As @jonahgao hints at , trying to add some sort of cache / memoization is going to be challenging given Rust and how DataFusion is structured

I think the current PR solution is good:

  1. It is well tested
  2. It is conservative (it might say an InList is nullable that isn't, but I think that will not generate wrong results, only potentially less optimal plans)

Thus I think we should merge this PR as is and then as a follow on PR we can remove the limit or increase it, etc.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks @jonahgao @alamb

// Stop if a nullable expression is found or an error occurs.
let has_nullable = std::iter::once(expr.as_ref())
.chain(list)
.take(MAX_INSPECT_LIMIT)
.find_map(|e| {
e.nullable(input_schema)
.map(|nullable| if nullable { Some(()) } else { None })
.transpose()
})
.transpose()?;
Ok(match has_nullable {
// If a nullable subexpression is found, the result may also be nullable.
Some(_) => true,
// If the list is too long, we assume it is nullable.
None if list.len() + 1 > MAX_INSPECT_LIMIT => true,
// All the subexpressions are non-nullable, so the result must be non-nullable.
_ => false,
})
}

Expr::Between(Between {
expr, low, high, ..
Expand Down Expand Up @@ -390,6 +412,31 @@ mod tests {
assert!(expr.nullable(&get_schema(false)).unwrap());
}

#[test]
fn test_inlist_nullability() {
    // Mock schema whose reported column nullability is configurable.
    let get_schema = |nullable| {
        MockExprSchema::new()
            .with_data_type(DataType::Int32)
            .with_nullable(nullable)
    };

    // Five non-null literals: within the inspection limit.
    let short_list = col("foo").in_list(vec![lit(1); 5], false);
    // Non-nullable column + non-null literals => expression is not nullable.
    assert!(!short_list.nullable(&get_schema(false)).unwrap());
    // A nullable column makes the whole expression nullable.
    assert!(short_list.nullable(&get_schema(true)).unwrap());
    // Errors raised while computing nullability must propagate to the caller.
    assert!(short_list
        .nullable(&get_schema(false).with_error_on_nullable(true))
        .is_err());

    // A NULL literal anywhere in the list makes the expression nullable.
    let with_null =
        col("foo").in_list(vec![lit(ScalarValue::Int32(None)), lit(1)], false);
    assert!(with_null.nullable(&get_schema(false)).unwrap());

    // Lists longer than the inspection limit are conservatively reported
    // as nullable even when every element is non-nullable.
    let long_list = col("foo").in_list(vec![lit(1); 6], false);
    assert!(long_list.nullable(&get_schema(false)).unwrap());
}

#[test]
fn expr_schema_data_type() {
let expr = col("foo");
Expand All @@ -404,13 +451,15 @@ mod tests {
/// Mock implementation of `ExprSchema` that reports the same
/// configurable nullability and data type for every column lookup.
struct MockExprSchema {
    // Nullability reported for any column.
    nullable: bool,
    // Data type reported for any column.
    data_type: DataType,
    // When true, `nullable()` returns an error instead of a value
    // (used to test error propagation through `Expr::nullable`).
    error_on_nullable: bool,
}

impl MockExprSchema {
/// Creates a schema that reports every column as a non-nullable
/// `DataType::Null` and never errors on nullability lookups.
fn new() -> Self {
    Self {
        error_on_nullable: false,
        nullable: false,
        data_type: DataType::Null,
    }
}

Expand All @@ -423,11 +472,20 @@ mod tests {
self.data_type = data_type;
self
}

fn with_error_on_nullable(mut self, error_on_nullable: bool) -> Self {
self.error_on_nullable = error_on_nullable;
self
}
}

impl ExprSchema for MockExprSchema {
/// Returns the configured nullability for any column, or an error when
/// `error_on_nullable` is set (used to test error propagation).
///
/// Note: the scraped diff left the pre-change line `Ok(self.nullable)`
/// interleaved before the replacement `if`, which is not valid Rust; this
/// is the reconstructed post-change body.
fn nullable(&self, _col: &Column) -> Result<bool> {
    if self.error_on_nullable {
        Err(DataFusionError::Internal("nullable error".into()))
    } else {
        Ok(self.nullable)
    }
}

fn data_type(&self, _col: &Column) -> Result<&DataType> {
Expand Down