Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 55 additions & 0 deletions datafusion/physical-expr-common/src/physical_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -556,6 +556,24 @@ pub mod proto_encode {
) -> Result<PhysicalExprNode> {
self.encoder.encode(expr)
}

/// Encode a sequence of child expressions, preserving order.
///
/// Convenience wrapper over [`Self::encode_child`] for expressions
/// holding a `repeated` proto field (e.g. the `list` of an `InList`).
/// The first encode error short-circuits.
pub fn encode_children_expressions<'b, I>(
&self,
exprs: I,
) -> Result<Vec<PhysicalExprNode>>
where
I: IntoIterator<Item = &'b Arc<dyn PhysicalExpr>>,
{
exprs
.into_iter()
.map(|expr| self.encode_child(expr))
.collect()
}
}

/// Internal dispatch trait. Implementors live in `datafusion-proto` and
Expand Down Expand Up @@ -636,6 +654,43 @@ pub mod proto_decode {
pub fn decode(&self, node: &PhysicalExprNode) -> Result<Arc<dyn PhysicalExpr>> {
self.decoder.decode(node, self.schema)
}

/// Decode a required child node, erroring if it is absent.
///
/// Proto child expressions are encoded as `Option<Box<PhysicalExprNode>>`;
/// pass the field directly (e.g. `node.expr.as_deref()`). `expr_name`
/// is the expression being decoded (e.g. `"InListExpr"`) and `field`
/// the proto field (e.g. `"expr"`); both are woven into the error so
/// it names *where* the missing field is, without each author
/// hand-rolling the string.
pub fn decode_required_expression(
&self,
node: Option<&PhysicalExprNode>,
expr_name: &str,
field: &str,
) -> Result<Arc<dyn PhysicalExpr>> {
let node = node.ok_or_else(|| {
datafusion_common::internal_datafusion_err!(
"{expr_name} is missing required field '{field}'"
)
})?;
self.decode(node)
}

/// Decode a sequence of child nodes, preserving order.
///
/// Convenience wrapper over [`Self::decode`] for expressions holding a
/// `repeated` proto field (e.g. the `list` of an `InList`). The first
/// decode error short-circuits.
pub fn decode_children_expressions<'b, I>(
&self,
nodes: I,
) -> Result<Vec<Arc<dyn PhysicalExpr>>>
where
I: IntoIterator<Item = &'b PhysicalExprNode>,
{
nodes.into_iter().map(|node| self.decode(node)).collect()
}
}

/// Internal dispatch trait. Implementors live in `datafusion-proto`.
Expand Down
22 changes: 5 additions & 17 deletions datafusion/physical-expr/src/expressions/in_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,17 +263,9 @@ impl InListExpr {
}
};

let expr = ctx.decode(node.expr.as_deref().ok_or_else(|| {
datafusion_common::DataFusionError::Internal(
"InList is missing required field 'expr'".to_string(),
)
})?)?;

let list = node
.list
.iter()
.map(|e| ctx.decode(e))
.collect::<Result<Vec<_>>>()?;
let expr =
ctx.decode_required_expression(node.expr.as_deref(), "InListExpr", "expr")?;
let list = ctx.decode_children_expressions(&node.list)?;

Ok(Arc::new(InListExpr::try_new(
expr,
Expand Down Expand Up @@ -491,11 +483,7 @@ impl PhysicalExpr for InListExpr {
expr_type: Some(protobuf::physical_expr_node::ExprType::InList(Box::new(
protobuf::PhysicalInListNode {
expr: Some(Box::new(ctx.encode_child(&self.expr)?)),
list: self
.list
.iter()
.map(|e| ctx.encode_child(e))
.collect::<Result<Vec<_>>>()?,
list: ctx.encode_children_expressions(&self.list)?,
negated: self.negated,
},
))),
Expand Down Expand Up @@ -4007,7 +3995,7 @@ mod proto_tests {
let err = InListExpr::try_from_proto(&node, &ctx).unwrap_err();
assert!(matches!(
err,
DataFusionError::Internal(msg) if msg.contains("InList is missing required field 'expr'")
DataFusionError::Internal(msg) if msg.contains("InListExpr is missing required field 'expr'")
));
}

Expand Down
27 changes: 13 additions & 14 deletions datafusion/physical-expr/src/expressions/like.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,22 +190,19 @@ impl LikeExpr {
_ => return internal_err!("PhysicalExprNode is not a LikeExpr"),
};

let expr = like_expr.expr.as_deref().ok_or_else(|| {
datafusion_common::DataFusionError::Internal(
"LikeExpr is missing required field 'expr'".to_string(),
)
})?;
let pattern = like_expr.pattern.as_deref().ok_or_else(|| {
datafusion_common::DataFusionError::Internal(
"LikeExpr is missing required field 'pattern'".to_string(),
)
})?;

Ok(Arc::new(LikeExpr::new(
like_expr.negated,
like_expr.case_insensitive,
ctx.decode(expr)?,
ctx.decode(pattern)?,
ctx.decode_required_expression(
like_expr.expr.as_deref(),
"LikeExpr",
"expr",
)?,
ctx.decode_required_expression(
like_expr.pattern.as_deref(),
"LikeExpr",
"pattern",
)?,
)))
}
}
Expand Down Expand Up @@ -491,7 +488,9 @@ mod proto_tests {
fn try_from_proto_rejects_missing_pattern() {
let node = like_node(false, false, Some(Box::new(column_node("a"))), None);
let schema = Schema::empty();
let decoder = UnreachableDecoder;
// `expr` is present, so it is decoded before the missing-`pattern`
// check fires; use a decoder that succeeds for that first child.
let decoder = StubDecoder::ok();
let ctx = PhysicalExprDecodeCtx::new(&schema, &decoder);
let err = LikeExpr::try_from_proto(&node, &ctx).unwrap_err();
assert!(matches!(
Expand Down
Loading