Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
157 changes: 155 additions & 2 deletions datafusion/common/src/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,19 +31,23 @@ use arrow::array::{
cast::AsArray,
};
use arrow::array::{
Datum, GenericListArray, Int32Array, Int64Array, MutableArrayData, make_array,
ArrowPrimitiveType, Datum, GenericListArray, Int32Array, Int64Array,
MutableArrayData, PrimitiveArray, make_array,
};
use arrow::array::{LargeListViewArray, ListViewArray};
use arrow::buffer::{OffsetBuffer, ScalarBuffer};
use arrow::compute::kernels::cmp::neq;
use arrow::compute::kernels::length::length;
use arrow::compute::{SortColumn, SortOptions, partition};
use arrow::datatypes::{DataType, Field, SchemaRef};
use arrow::datatypes::{
ArrowNativeType, DataType, Field, Int32Type, Int64Type, SchemaRef,
};
#[cfg(feature = "sql")]
use sqlparser::{ast::Ident, dialect::GenericDialect, parser::Parser};
use std::borrow::{Borrow, Cow};
use std::cmp::{Ordering, min};
use std::collections::HashSet;
use std::iter::repeat_n;
use std::num::NonZero;
use std::ops::Range;
use std::sync::{Arc, LazyLock};
Expand Down Expand Up @@ -1181,6 +1185,74 @@ fn truncate_list_nulls<O: OffsetSizeTrait>(
Ok(list.clone())
}

/// If `array` is a list, list-view, fixed-size list, or map, returns a new array of the
/// same length as its inner values where each value is the 0-based index of the sublist
/// (outer row) that contains it. Example:
///
/// `[[1], [2, 3], [4, 5, 6]] => [0, 1, 1, 2, 2, 2]`
///
/// Otherwise returns an error
pub fn list_values_row_number(array: &dyn Array) -> Result<ArrayRef> {
    match array.data_type() {
        // List/LargeList: sublist lengths are differences of consecutive offsets.
        DataType::List(_) => Ok(Arc::new(variable_size_list_values_row_number::<
            Int32Type,
        >(array.as_list().offsets()))),
        DataType::LargeList(_) => Ok(Arc::new(variable_size_list_values_row_number::<
            Int64Type,
        >(array.as_list().offsets()))),
        // NOTE(review): ListView/LargeListView offsets are not required to be
        // monotonically increasing or densely packed; the windows-based length
        // computation in the helper assumes they are. TODO confirm callers only
        // pass compact, in-order views here.
        DataType::ListView(_) => Ok(Arc::new(variable_size_list_values_row_number::<
            Int32Type,
        >(array.as_list_view().offsets()))),
        DataType::LargeListView(_) => {
            Ok(Arc::new(variable_size_list_values_row_number::<Int64Type>(
                array.as_list_view().offsets(),
            )))
        }
        // Fixed-size list: every row contributes exactly `value_length` values.
        DataType::FixedSizeList(_, _) => {
            let fixed_size_list = array.as_fixed_size_list();

            Ok(Arc::new(fsl_values_row_number(
                fixed_size_list.value_length(),
                fixed_size_list.len(),
            )?))
        }
        // Maps are physically a list of key/value structs, so the same
        // offsets-based computation applies.
        DataType::Map(_, _) => Ok(Arc::new(variable_size_list_values_row_number::<
            Int32Type,
        >(array.as_map().offsets()))),
        other => _exec_err!("expected list, got {other}"),
    }
}

/// Maps an offsets buffer to a per-value row-number array.
///
/// Example: offsets `[0, 2, 2, 5, 6]` -> `[0, 0, 2, 2, 2, 3]`
/// (values 0..2 belong to row 0, row 1 is empty, values 2..5 belong to row 2, ...).
///
/// Offsets are assumed to be monotonically non-decreasing, as guaranteed for
/// `List`/`LargeList`/`Map` arrays.
fn variable_size_list_values_row_number<T: ArrowPrimitiveType>(
    offsets: &[T::Native],
) -> PrimitiveArray<T> {
    // An offsets buffer normally has `rows + 1` entries. Using `first`/`last`
    // avoids the index-out-of-bounds panic that `offsets[offsets.len() - 1]`
    // would hit if an empty slice is ever passed; in that case the result is
    // simply an empty array.
    let capacity = match (offsets.first(), offsets.last()) {
        (Some(first), Some(last)) => last.as_usize().saturating_sub(first.as_usize()),
        _ => 0,
    };

    let mut row_numbers = Vec::with_capacity(capacity);

    for (row, bounds) in offsets.windows(2).enumerate() {
        // Number of values in this row.
        let len = bounds[1].as_usize() - bounds[0].as_usize();
        row_numbers.extend(repeat_n(T::Native::usize_as(row), len));
    }

    PrimitiveArray::new(row_numbers.into(), None)
}

/// Builds the row-number array for a fixed-size list: every one of the
/// `array_len` rows contributes exactly `list_size` values.
///
/// Example: `(2, 3)` -> `[0, 0, 1, 1, 2, 2]`
///
/// # Errors
/// Returns an execution error if `list_size` is negative or if a row index
/// does not fit in `i32`.
fn fsl_values_row_number(list_size: i32, array_len: usize) -> Result<Int32Array> {
    let list_size = list_size.to_usize().ok_or_else(|| {
        // Negative sizes cannot occur in a valid FixedSizeList but are
        // representable in the `i32` field, so reject them explicitly.
        _exec_datafusion_err!("fsl_values_row_number: invalid list_size {list_size}")
    })?;

    let mut row_numbers = Vec::with_capacity(list_size * array_len);

    for row in 0..array_len {
        // `try_from` instead of `as` so an out-of-range row index surfaces as
        // an error instead of silently truncating on 64-bit platforms.
        let row = i32::try_from(row).map_err(|_| {
            _exec_datafusion_err!("fsl_values_row_number: row index {row} overflows i32")
        })?;
        row_numbers.extend(repeat_n(row, list_size));
    }

    Ok(PrimitiveArray::new(row_numbers.into(), None))
}

#[cfg(test)]
mod tests {
use std::sync::Arc;
Expand Down Expand Up @@ -1617,4 +1689,85 @@ mod tests {
assert_eq!(res.values(), expected.values());
assert_eq!(res.offsets(), expected.offsets());
}

#[test]
fn test_list_array_values_row_number() {
    // Mixed lengths including an empty sublist: row 2 has no values, so the
    // output jumps straight from row 1 to row 3.
    assert_eq!(
        variable_size_list_values_row_number::<Int32Type>(
            &OffsetBuffer::from_lengths([1, 3, 0, 2,])
        ),
        Int32Array::from(vec![0, 1, 1, 1, 3, 3])
    );

    // No sublists at all -> empty output.
    assert_eq!(
        variable_size_list_values_row_number::<Int32Type>(
            &OffsetBuffer::from_lengths([])
        ),
        Int32Array::new_null(0)
    );

    // A single empty sublist -> empty output.
    assert_eq!(
        variable_size_list_values_row_number::<Int32Type>(
            &OffsetBuffer::from_lengths([0])
        ),
        Int32Array::new_null(0)
    );

    // Two empty sublists -> still empty output.
    assert_eq!(
        variable_size_list_values_row_number::<Int32Type>(
            &OffsetBuffer::from_lengths([0, 0])
        ),
        Int32Array::new_null(0)
    );

    // Single sublist with one value.
    assert_eq!(
        variable_size_list_values_row_number::<Int32Type>(
            &OffsetBuffer::from_lengths([1])
        ),
        Int32Array::from(vec![0])
    );

    // Single sublist with two values: both map to row 0.
    assert_eq!(
        variable_size_list_values_row_number::<Int32Type>(
            &OffsetBuffer::from_lengths([2])
        ),
        Int32Array::from(vec![0, 0])
    );
}

#[test]
fn test_fsl_values_row_number() {
    // Typical case: 3 rows of 2 values each.
    assert_eq!(
        fsl_values_row_number(2, 3).unwrap(),
        Int32Array::from(vec![0, 0, 1, 1, 2, 2])
    );

    // list_size of 1: output is just the row indices.
    assert_eq!(
        fsl_values_row_number(1, 3).unwrap(),
        Int32Array::from(vec![0, 1, 2])
    );

    // Single row.
    assert_eq!(
        fsl_values_row_number(2, 1).unwrap(),
        Int32Array::from(vec![0, 0])
    );

    // Zero rows -> empty output.
    assert_eq!(
        fsl_values_row_number(2, 0).unwrap(),
        Int32Array::new_null(0),
    );

    // Zero-size lists -> empty output regardless of row count.
    assert_eq!(
        fsl_values_row_number(0, 2).unwrap(),
        Int32Array::new_null(0),
    );

    assert_eq!(
        fsl_values_row_number(0, 0).unwrap(),
        Int32Array::new_null(0),
    );

    // Negative list sizes are rejected with an error.
    fsl_values_row_number(-1, 2).unwrap_err();
    fsl_values_row_number(-1, 0).unwrap_err();
}
}
24 changes: 23 additions & 1 deletion datafusion/expr/src/execution_props.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use crate::var_provider::{VarProvider, VarType};
use chrono::{DateTime, Utc};
use datafusion_common::HashMap;
use datafusion_common::ScalarValue;
use datafusion_common::TableReference;
use datafusion_common::alias::AliasGenerator;
use datafusion_common::config::ConfigOptions;
use datafusion_common::{Result, internal_err};
Expand Down Expand Up @@ -69,6 +70,10 @@ pub struct ExecutionProps {
/// Shared results container for uncorrelated scalar subquery values.
/// Populated at execution time by `ScalarSubqueryExec`.
pub subquery_results: ScalarSubqueryResults,
/// Maps each lambda variable name to its lambda qualifier generated
/// during physical planning. Populated by the physical planner for
/// each lambda before calling `create_physical_expr`.
pub lambda_variable_qualifier: HashMap<String, TableReference>,
}

impl Default for ExecutionProps {
Expand All @@ -87,6 +92,7 @@ impl ExecutionProps {
var_providers: None,
subquery_indexes: HashMap::new(),
subquery_results: ScalarSubqueryResults::default(),
lambda_variable_qualifier: HashMap::new(),
}
}

Expand Down Expand Up @@ -145,6 +151,22 @@ impl ExecutionProps {
pub fn config_options(&self) -> Option<&Arc<ConfigOptions>> {
self.config_options.as_ref()
}

/// Adds a mapping from each variable name in `variables` to the given
/// `qualifier`. Existing entries with conflicting names get shadowed
/// (overwritten with the new qualifier).
pub fn with_qualified_lambda_variables(
    mut self,
    qualifier: &TableReference,
    variables: &[String],
) -> Self {
    for var in variables {
        // `entry_ref` avoids cloning the key string when it is already present.
        self.lambda_variable_qualifier
            .entry_ref(var)
            .insert(qualifier.clone());
    }

    self
}
}

/// Index of a scalar subquery within a [`ScalarSubqueryResults`] container.
Expand Down Expand Up @@ -252,7 +274,7 @@ mod test {
fn debug() {
let props = ExecutionProps::new();
assert_eq!(
"ExecutionProps { query_execution_start_time: None, alias_generator: AliasGenerator { next_id: 1 }, config_options: None, var_providers: None, subquery_indexes: {}, subquery_results: [] }",
"ExecutionProps { query_execution_start_time: None, alias_generator: AliasGenerator { next_id: 1 }, config_options: None, var_providers: None, subquery_indexes: {}, subquery_results: [], lambda_variable_qualifier: {} }",
format!("{props:?}")
);
}
Expand Down
97 changes: 87 additions & 10 deletions datafusion/expr/src/higher_order_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use arrow::array::{ArrayRef, RecordBatch};
use arrow::datatypes::{DataType, FieldRef, Schema};
use arrow_schema::SchemaRef;
use datafusion_common::config::ConfigOptions;
use datafusion_common::{Result, ScalarValue, not_impl_err};
use datafusion_common::{Result, ScalarValue, exec_err, not_impl_err};
use datafusion_expr_common::dyn_eq::{DynEq, DynHash};
use datafusion_expr_common::signature::Volatility;
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
Expand Down Expand Up @@ -224,35 +224,112 @@ pub struct LambdaArgument {
/// per outer sublist), avoiding the per-call `Schema::new` build that
/// includes constructing the internal name -> index map.
schema: SchemaRef,
/// A RecordBatch containing the captured columns inside this lambda body, if any
///
/// For example, for `array_transform([2], v -> v + a + b)`,
/// this will be a `RecordBatch` with two columns, `a` and `b`
captures: Option<RecordBatch>,
Comment on lines +227 to +231
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not use ProjectionExpr? with all of its existing helpers

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ProjectionExprs can be used if we modify it to also handle LambdaVariable's, should we do that?
I checked all usages I could find and didn't identify none where such modification would cause problems, I'm just not sure if we should do it.
In case you note in next reads of the PR, currently indices of lambda variables of inner lambdas are also collected and would cause problems with ProjectionExprs, but that's easy fixable so please don't take that into account when thinking about it

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought this was PhysicalExpr, but because this is not and also created every evaluate call, then I don't think you would have a bug with the naming.

I do think that we should try to have LambdaArgument in the physical plan so we won't have to recreate it every batch and have that cost

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ProjectionExprs can be used if we modify it to also handle LambdaVariable's, should we do that?

Maybe try to create a different pr of supporting ProjectionExprs and how would that look like

}

impl LambdaArgument {
pub fn new(params: Vec<FieldRef>, body: Arc<dyn PhysicalExpr>) -> Self {
let schema = Arc::new(Schema::new(params.clone()));
pub fn new(
params: Vec<FieldRef>,
body: Arc<dyn PhysicalExpr>,
captures: Option<RecordBatch>,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PhysicalExpr usually don't know the schema on creation which mean that now the API is a little weird, since captures requires you to know the names which also mean any tree transformation that will rename will now no longer work since the captures are not updated.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LambdaArgument it's not a PhysicalExpr, it's just a helper to the function implementer to encapsulate the evaluation of only the requested lambda parameters and merging the outer columns with the evaluated parameters. It's created only during execution, after I believe all tree transformation should have already happened

Ok(ValueOrLambda::Lambda(LambdaArgument::new(
params,
Arc::clone(lambda.projected_body()),
if projection.is_empty() {
None
} else {
Some(batch.project(&projection)?)
},
)))

Link to the PR diff UI

) -> Self {
let fields = match &captures {
Some(batch) => batch
.schema_ref()
.fields()
.iter()
.cloned()
.chain(params.clone())
.collect(),
None => params.clone(),
};

let schema = Arc::new(Schema::new(fields));

Self {
params,
body,
schema,
captures,
}
}

/// Evaluate this lambda
/// `args` should evaluate to the value of each parameter
/// of the correspondent lambda returned in [HigherOrderUDF::lambda_parameters].
///
/// `spread_captures` is responsible for transforming the captured column arrays
/// so they align with the evaluation batch. Captures are snapshotted from the
/// outer batch at construction time, giving one value per outer row, but the
/// function may evaluate the lambda body over a batch with a different number
/// of rows. It is the function's responsibility to provide the appropriate `spread_captures`
/// closure to expand (or otherwise reshape) the captures to match.
/// Function working on lists, for example `array_transform(arr, v -> v + 1)`
/// flattens all list elements into a single batch, duplicating captured
/// values for rows with multiple elements and dropping them for empty lists.
/// If the lambda has no captures, `spread_captures` is never called.
pub fn evaluate(
&self,
args: &[&dyn Fn() -> Result<ArrayRef>],
spread_captures: impl FnOnce(&[ArrayRef]) -> Result<Vec<ArrayRef>>,
) -> Result<ColumnarValue> {
let columns = args
let spread_captures = self
.captures
.as_ref()
.map(|captures| {
let spread_columns = spread_captures(captures.columns())?;

RecordBatch::try_new(captures.schema(), spread_columns)
})
.transpose()?;

let merged = merge_captures_with_variables(
spread_captures.as_ref(),
Arc::clone(&self.schema),
&self.params,
args,
)?;

self.body.evaluate(&merged)
}
}

fn merge_captures_with_variables(
captures: Option<&RecordBatch>,
schema: SchemaRef,
params: &[FieldRef],
variables: &[&dyn Fn() -> Result<ArrayRef>],
) -> Result<RecordBatch> {
if variables.len() < params.len() {
return exec_err!(
"expected at least {} lambda arguments to merge with captures, got {}",
params.len(),
variables.len()
);
}

let columns = match captures {
Some(captures) => {
let mut columns = captures.columns().to_vec();

for arg in &variables[..params.len()] {
columns.push(arg()?);
}

columns
}
None => variables
.iter()
.take(self.params.len())
.take(params.len())
.map(|arg| arg())
.collect::<Result<_>>()?;

let batch = RecordBatch::try_new(Arc::clone(&self.schema), columns)?;
.collect::<Result<_>>()?,
};

self.body.evaluate(&batch)
}
Ok(RecordBatch::try_new(schema, columns)?)
}

/// Information about arguments passed to the function
Expand Down
Loading
Loading