Skip to content

Commit

Permalink
refactor: replace null_if with distinct in skipping predicate
Browse files Browse the repository at this point in the history
  • Loading branch information
roeap committed Feb 1, 2024
1 parent aa44f36 commit 5f4ba2f
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 39 deletions.
2 changes: 1 addition & 1 deletion kernel/examples/dump-table/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ fn main() {
let scan = ScanBuilder::new(snapshot).build();

let schema = scan.schema();
let header_names = schema.fields.iter().map(|field| {
let header_names = schema.fields().map(|field| {
let cell = Cell::new(field.name());
if cli.ascii {
cell
Expand Down
4 changes: 2 additions & 2 deletions kernel/src/client/conversion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ impl TryFrom<&StructType> for ArrowSchema {
fn try_from(s: &StructType) -> Result<Self, ArrowError> {
let fields = s
.fields()
.map(|f| <ArrowField as TryFrom<&StructField>>::try_from(f))
.map(TryInto::try_into)
.collect::<Result<Vec<ArrowField>, ArrowError>>()?;

Ok(ArrowSchema::new(fields))
Expand Down Expand Up @@ -125,7 +125,7 @@ impl TryFrom<&DataType> for ArrowDataType {
}
DataType::Struct(s) => Ok(ArrowDataType::Struct(
s.fields()
.map(|f| <ArrowField as TryFrom<&StructField>>::try_from(f))
.map(TryInto::try_into)
.collect::<Result<Vec<ArrowField>, ArrowError>>()?
.into(),
)),
Expand Down
41 changes: 27 additions & 14 deletions kernel/src/client/expression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,8 @@ use arrow_array::{
Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, RecordBatch, StringArray,
StructArray, TimestampMicrosecondArray,
};
use arrow_ord::cmp::{eq, gt, gt_eq, lt, lt_eq, neq};
use arrow_ord::cmp::{distinct, eq, gt, gt_eq, lt, lt_eq, neq};
use arrow_schema::{ArrowError, Schema as ArrowSchema};
use arrow_select::nullif::nullif;

use crate::error::{DeltaResult, Error};
use crate::expressions::{BinaryOperator, Expression, Scalar, UnaryOperator, VariadicOperator};
Expand Down Expand Up @@ -216,17 +215,22 @@ fn evaluate_expression(
})
}
(VariadicOperation { .. }, _) => {
// NOTE: this panics as it would be a bug in our code if we get here. However it does swallow
// NOTE: If we get here, it would be a bug in our code. However, this catch-all arm does swallow
// the error message from the compiler if we add variants to the enum and forget to add them here.
unreachable!("We unly support variadic operations for boolean expressions right now.")
Err(Error::Generic(format!(
"Current variadic expressions are expected to return boolean results, got {:?}",
result_type
)))
}
(NullIf { expr, if_expr }, _) => {
let expr_arr = evaluate_expression(expr.as_ref(), batch, None)?;
let if_expr_arr =
evaluate_expression(if_expr.as_ref(), batch, Some(&DataType::BOOLEAN))?;
let if_expr_arr = downcast_to_bool(&if_expr_arr)?;
Ok(nullif(&expr_arr, if_expr_arr)?)
(Distinct { lhs, rhs }, Some(&DataType::BOOLEAN)) => {
let lhs_arr = evaluate_expression(lhs.as_ref(), batch, None)?;
let rhs_arr = evaluate_expression(rhs.as_ref(), batch, None)?;
Ok(distinct(&lhs_arr, &rhs_arr).map(wrap_comparison_result)?)
}
(Distinct { .. }, _) => Err(Error::Generic(format!(
"Distinct will always return boolean result, got {:?}",
result_type
))),
}
}

Expand Down Expand Up @@ -272,6 +276,7 @@ impl ExpressionEvaluator for DefaultExpressionEvaluator {

#[cfg(test)]
mod tests {

use super::*;
use arrow_array::Int32Array;
use arrow_schema::{DataType, Field, Fields, Schema};
Expand Down Expand Up @@ -426,7 +431,9 @@ mod tests {
let column_b = Expression::column("b");

let expression = Box::new(column_a.clone().and(column_b.clone()));
let results = evaluate_expression(&expression, &batch, None).unwrap();
let results =
evaluate_expression(&expression, &batch, Some(&crate::schema::DataType::BOOLEAN))
.unwrap();
let expected = Arc::new(BooleanArray::from(vec![false, false]));
assert_eq!(results.as_ref(), expected.as_ref());

Expand All @@ -435,12 +442,16 @@ mod tests {
.clone()
.and(Expression::literal(Scalar::Boolean(true))),
);
let results = evaluate_expression(&expression, &batch, None).unwrap();
let results =
evaluate_expression(&expression, &batch, Some(&crate::schema::DataType::BOOLEAN))
.unwrap();
let expected = Arc::new(BooleanArray::from(vec![true, false]));
assert_eq!(results.as_ref(), expected.as_ref());

let expression = Box::new(column_a.clone().or(column_b));
let results = evaluate_expression(&expression, &batch, None).unwrap();
let results =
evaluate_expression(&expression, &batch, Some(&crate::schema::DataType::BOOLEAN))
.unwrap();
let expected = Arc::new(BooleanArray::from(vec![true, true]));
assert_eq!(results.as_ref(), expected.as_ref());

Expand All @@ -449,7 +460,9 @@ mod tests {
.clone()
.or(Expression::literal(Scalar::Boolean(false))),
);
let results = evaluate_expression(&expression, &batch, None).unwrap();
let results =
evaluate_expression(&expression, &batch, Some(&crate::schema::DataType::BOOLEAN))
.unwrap();
let expected = Arc::new(BooleanArray::from(vec![true, false]));
assert_eq!(results.as_ref(), expected.as_ref());
}
Expand Down
31 changes: 14 additions & 17 deletions kernel/src/expressions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,6 @@ pub enum VariadicOperator {
impl Display for BinaryOperator {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
// Self::And => write!(f, "AND"),
// Self::Or => write!(f, "OR"),
Self::Plus => write!(f, "+"),
Self::Minus => write!(f, "-"),
Self::Multiply => write!(f, "*"),
Expand Down Expand Up @@ -101,12 +99,11 @@ pub enum Expression {
/// The expressions.
exprs: Vec<Expression>,
},
// TODO how to model required functions?
NullIf {
/// The expression to evaluate.
expr: Box<Expression>,
/// The expression to compare against.
if_expr: Box<Expression>,
Distinct {
/// Left-hand side of the distinct comparison.
lhs: Box<Expression>,
/// Right-hand side of the distinct comparison.
rhs: Box<Expression>,
},
// TODO: support more expressions, such as IS IN, LIKE, etc.
}
Expand Down Expand Up @@ -148,7 +145,7 @@ impl Display for Expression {
)
}
},
Self::NullIf { expr, if_expr } => write!(f, "NULLIF({}, {})", expr, if_expr),
Self::Distinct { lhs, rhs } => write!(f, "DISTINCT({}, {})", lhs, rhs),
}
}
}
Expand Down Expand Up @@ -274,11 +271,11 @@ impl Expression {
Self::or_from([self, other])
}

/// Create a new expression `NULLIF(self, other)`
pub fn null_if(self, other: Self) -> Self {
Self::NullIf {
expr: Box::new(self),
if_expr: Box::new(other),
/// Create a new expression `DISTINCT(self, other)`
pub fn distinct(self, other: Self) -> Self {
Self::Distinct {
lhs: Box::new(self),
rhs: Box::new(other),
}
}

Expand All @@ -302,9 +299,9 @@ impl Expression {
Self::VariadicOperation { exprs, .. } => {
stack.extend(exprs.iter());
}
Self::NullIf { expr, if_expr } => {
stack.push(expr);
stack.push(if_expr);
Self::Distinct { lhs, rhs } => {
stack.push(lhs);
stack.push(rhs);
}
}
Some(expr)
Expand Down
5 changes: 1 addition & 4 deletions kernel/src/scan/data_skipping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,10 +115,7 @@ impl DataSkippingFilter {
static ref PREDICATE_SCHEMA: DataType = StructType::new(vec![
StructField::new("predicate", DataType::BOOLEAN, true),
]).into();
static ref FILTER_EXPR: Expr = Expr::is_null(Expr::null_if(
Expr::column("predicate"),
Expr::column("predicate"),
));
static ref FILTER_EXPR: Expr = Expr::column("predicate").distinct(Expr::literal(false));
static ref STATS_EXPR: Expr = Expr::column("add.stats");
);

Expand Down
1 change: 0 additions & 1 deletion kernel/src/scan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,6 @@ impl Scan {
let read_schema = Arc::new(StructType::new(
self.schema()
.fields()
.into_iter()
.filter(|f| {
!self
.snapshot
Expand Down

0 comments on commit 5f4ba2f

Please sign in to comment.