Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
204 changes: 176 additions & 28 deletions src/query/expression/src/evaluator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,11 @@
// limitations under the License.

use std::collections::HashMap;
use std::ops::Not;

use common_arrow::arrow::bitmap;
use common_arrow::arrow::bitmap::Bitmap;
use common_arrow::arrow::bitmap::MutableBitmap;
use common_exception::ErrorCode;
use common_exception::Result;
use common_exception::Span;
Expand Down Expand Up @@ -43,6 +46,9 @@ use crate::FunctionContext;
use crate::FunctionDomain;
use crate::FunctionRegistry;

pub(crate) type EvalError = (Span, Value<AnyType>, Bitmap, String);
pub(crate) type EvalResult<T> = std::result::Result<T, EvalError>;

pub struct Evaluator<'a> {
input_columns: &'a DataBlock,
func_ctx: FunctionContext,
Expand Down Expand Up @@ -113,7 +119,15 @@ impl<'a> Evaluator<'a> {
new_evaluator.run(expr)
}

#[inline]
pub fn run(&self, expr: &Expr) -> Result<Value<AnyType>> {
match self.run_impl(expr) {
Ok(v) => Ok(v),
Err((span, _, _, msg)) => Err(ErrorCode::Internal(msg).set_span(span)),
}
}

pub fn run_impl(&self, expr: &Expr) -> EvalResult<Value<AnyType>> {
#[cfg(debug_assertions)]
self.check_expr(expr);

Expand All @@ -129,8 +143,8 @@ impl<'a> Evaluator<'a> {
} => {
let cols = args
.iter()
.map(|expr| self.run(expr))
.collect::<Result<Vec<_>>>()?;
.map(|expr| self.run_impl(expr))
.collect::<EvalResult<Vec<_>>>()?;
assert!(
cols.iter()
.filter_map(|val| match val {
Expand All @@ -146,9 +160,10 @@ impl<'a> Evaluator<'a> {
validity: None,
errors: None,
tz: self.func_ctx.tz,
already_rendered: false,
};
let result = (function.eval)(cols_ref.as_slice(), &mut ctx);
ctx.render_error(*span, &cols, &function.signature.name)?;
ctx.render_error(*span, &cols, &result, &function.signature.name)?;
Ok(result)
}
Expr::Cast {
Expand All @@ -157,13 +172,50 @@ impl<'a> Evaluator<'a> {
expr,
dest_type,
} => {
let value = self.run(expr)?;
let value = self.run_impl(expr)?;
if *is_try {
self.run_try_cast(*span, expr.data_type(), dest_type, value)
} else {
self.run_cast(*span, expr.data_type(), dest_type, value)
}
}
Expr::Catch {
expr, data_type, ..
} => Ok(match self.run_impl(expr) {
Ok(value) => match value {
Value::Scalar(scalar) => {
Value::Scalar(Scalar::Tuple(vec![scalar, Scalar::Null]))
}
Value::Column(col) => {
let len = col.len();
Value::Column(Column::Tuple {
fields: vec![col, Column::Null { len }],
len,
})
}
},
Err((_, value, bitmap, err)) => {
let num_rows = self.input_columns.num_rows();
let inner_type = data_type.as_tuple().unwrap().first().unwrap();
let value_col = value.convert_to_full_column(inner_type, num_rows);
let err_scalar = Scalar::String(err.into_bytes());
let err_col =
ColumnBuilder::repeat(&err_scalar.as_ref(), num_rows, &DataType::String)
.build();

// If the element in the bitmap is true,
// it means the corresponding row is error.
let bitmap = bitmap.not();
let err_col = Column::Nullable(Box::new(NullableColumn {
column: err_col,
validity: bitmap,
}));
Value::Column(Column::Tuple {
fields: vec![value_col, err_col],
len: num_rows,
})
}
}),
};

#[cfg(debug_assertions)]
Expand Down Expand Up @@ -203,7 +255,7 @@ impl<'a> Evaluator<'a> {
src_type: &DataType,
dest_type: &DataType,
value: Value<AnyType>,
) -> Result<Value<AnyType>> {
) -> EvalResult<Value<AnyType>> {
if src_type == dest_type {
return Ok(value);
}
Expand Down Expand Up @@ -244,17 +296,27 @@ impl<'a> Evaluator<'a> {
other => unreachable!("source: {}", other),
},
(DataType::Nullable(inner_src_ty), _) => match value {
Value::Scalar(Scalar::Null) => Err(ErrorCode::Internal(format!(
"unable to cast type `{src_type}` to type `{dest_type}`"
))
.set_span(span)),
Value::Scalar(Scalar::Null) => {
let mut bitmap = MutableBitmap::new();
bitmap.extend_constant(self.input_columns.num_rows(), false);
Err((
span,
Value::Scalar(Scalar::default_value(dest_type)),
bitmap.into(),
format!("unable to cast type `{src_type}` to type `{dest_type}`"),
))
}
Value::Scalar(_) => self.run_cast(span, inner_src_ty, dest_type, value),
Value::Column(Column::Nullable(col)) => {
if col.validity.unset_bits() > 0 {
return Err(ErrorCode::Internal(format!(
"unable to cast `NULL` to type `{dest_type}`"
))
.set_span(span));
let mut bitmap = MutableBitmap::new();
bitmap.extend_constant(self.input_columns.num_rows(), false);
return Err((
span,
Value::Scalar(Scalar::default_value(dest_type)),
bitmap.into(),
format!("unable to cast `NULL` to type `{dest_type}`"),
));
}
let column = self
.run_cast(span, inner_src_ty, dest_type, Value::Column(col.column))?
Expand Down Expand Up @@ -361,7 +423,7 @@ impl<'a> Evaluator<'a> {
self.run_cast(span, src_ty, dest_ty, Value::Scalar(field))
.map(|val| val.into_scalar().unwrap())
})
.collect::<Result<Vec<_>>>()?;
.collect::<EvalResult<Vec<_>>>()?;
Ok(Value::Scalar(Scalar::Tuple(new_fields)))
}
Value::Column(Column::Tuple { fields, len }) => {
Expand All @@ -373,7 +435,7 @@ impl<'a> Evaluator<'a> {
self.run_cast(span, src_ty, dest_ty, Value::Column(field))
.map(|val| val.into_column().unwrap())
})
.collect::<Result<_>>()?;
.collect::<EvalResult<_>>()?;
Ok(Value::Column(Column::Tuple {
fields: new_fields,
len,
Expand All @@ -383,10 +445,16 @@ impl<'a> Evaluator<'a> {
}
}

_ => Err(ErrorCode::Internal(format!(
"unable to cast type `{src_type}` to type `{dest_type}`"
))
.set_span(span)),
_ => {
let mut bitmap = MutableBitmap::new();
bitmap.extend_constant(self.input_columns.num_rows(), false);
Err((
span,
Value::Scalar(Scalar::default_value(dest_type)),
bitmap.into(),
format!("unable to cast type `{src_type}` to type `{dest_type}`"),
))
}
}
}

Expand All @@ -396,7 +464,7 @@ impl<'a> Evaluator<'a> {
src_type: &DataType,
dest_type: &DataType,
value: Value<AnyType>,
) -> Result<Value<AnyType>> {
) -> EvalResult<Value<AnyType>> {
if src_type == dest_type {
return Ok(value);
}
Expand Down Expand Up @@ -542,7 +610,7 @@ impl<'a> Evaluator<'a> {
.into_scalar()
.unwrap())
})
.collect::<Result<_>>()?;
.collect::<EvalResult<_>>()?;
Ok(Value::Scalar(Scalar::Tuple(new_fields)))
}
Value::Column(Column::Tuple { fields, len }) => {
Expand All @@ -556,7 +624,7 @@ impl<'a> Evaluator<'a> {
.into_column()
.unwrap())
})
.collect::<Result<_>>()?;
.collect::<EvalResult<_>>()?;
let new_col = Column::Tuple {
fields: new_fields,
len,
Expand All @@ -567,10 +635,16 @@ impl<'a> Evaluator<'a> {
}
}

_ => Err(ErrorCode::Internal(format!(
"unable to cast type `{src_type}` to type `{dest_type}`"
))
.set_span(span)),
_ => {
let mut bitmap = MutableBitmap::new();
bitmap.extend_constant(self.input_columns.num_rows(), false);
Err((
span,
Value::Scalar(Scalar::default_value(dest_type)),
bitmap.into(),
format!("unable to cast type `{src_type}` to type `{dest_type}`"),
))
}
}
}

Expand All @@ -581,7 +655,7 @@ impl<'a> Evaluator<'a> {
dest_type: &DataType,
value: Value<AnyType>,
cast_fn: &str,
) -> Result<Option<Value<AnyType>>> {
) -> EvalResult<Option<Value<AnyType>>> {
let expr = Expr::ColumnRef {
span,
id: 0,
Expand Down Expand Up @@ -616,7 +690,7 @@ impl<'a> Evaluator<'a> {
num_rows,
);
let evaluator = Evaluator::new(&block, self.func_ctx, self.fn_registry);
Ok(Some(evaluator.run(&cast_expr)?))
Ok(Some(evaluator.run_impl(&cast_expr)?))
}
}

Expand Down Expand Up @@ -769,6 +843,80 @@ impl<'a, Index: ColumnIndex> ConstantFolder<'a, Index> {
new_domain,
)
}
Expr::Catch {
span,
data_type,
expr,
} => {
let (inner_expr, inner_domain) = self.fold_once(expr);
let inner_type = &data_type.as_tuple().unwrap()[0];

if inner_expr.as_constant().is_some() {
return (
Expr::Catch {
span: *span,
data_type: data_type.clone(),
expr: Box::new(inner_expr),
},
inner_domain.map(|d| {
Domain::Tuple(vec![
d,
Domain::Nullable(NullableDomain {
has_null: true,
value: None,
}),
])
}),
);
}

match inner_domain {
// If the inner domain is not None,
// it means the inner expr will not throw an error.
Some(d) => match d.as_singleton() {
Some(scalar) => (
Expr::Catch {
span: *span,
data_type: data_type.clone(),
expr: Box::new(Expr::Constant {
span: *span,
scalar,
data_type: inner_type.clone(),
}),
},
Some(Domain::Tuple(vec![
d,
Domain::Nullable(NullableDomain {
has_null: true,
value: None,
}),
])),
),
None => (
Expr::Catch {
span: *span,
data_type: data_type.clone(),
expr: Box::new(inner_expr),
},
Some(Domain::Tuple(vec![
d,
Domain::Nullable(NullableDomain {
has_null: true,
value: None,
}),
])),
),
},
None => (
Expr::Catch {
span: *span,
data_type: data_type.clone(),
expr: Box::new(inner_expr),
},
None,
),
}
}
Expr::FunctionCall {
span,
id,
Expand Down
Loading