diff --git a/datafusion/src/physical_plan/windows.rs b/datafusion/src/physical_plan/windows.rs
deleted file mode 100644
index a1f4b7ace530..000000000000
--- a/datafusion/src/physical_plan/windows.rs
+++ /dev/null
@@ -1,717 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! Execution plan for window functions
-
-use crate::error::{DataFusionError, Result};
-use crate::logical_plan::window_frames::{WindowFrame, WindowFrameUnits};
-use crate::physical_plan::{
-    aggregates, common,
-    expressions::{
-        dense_rank, lag, lead, rank, Literal, NthValue, PhysicalSortExpr, RowNumber,
-    },
-    type_coercion::coerce,
-    window_functions::{
-        signature_for_built_in, BuiltInWindowFunction, BuiltInWindowFunctionExpr,
-        WindowFunction,
-    },
-    Accumulator, AggregateExpr, Distribution, ExecutionPlan, Partitioning, PhysicalExpr,
-    RecordBatchStream, SendableRecordBatchStream, WindowExpr,
-};
-use crate::scalar::ScalarValue;
-use arrow::compute::concat;
-use arrow::{
-    array::ArrayRef,
-    datatypes::{Field, Schema, SchemaRef},
-    error::{ArrowError, Result as ArrowResult},
-    record_batch::RecordBatch,
-};
-use async_trait::async_trait;
-use futures::stream::Stream;
-use futures::Future;
-use pin_project_lite::pin_project;
-use std::any::Any;
-use std::convert::TryInto;
-use std::ops::Range;
-use std::pin::Pin;
-use std::sync::Arc;
-use std::task::{Context, Poll};
-
-/// Window execution plan
-#[derive(Debug)]
-pub struct WindowAggExec {
-    /// Input plan
-    input: Arc<dyn ExecutionPlan>,
-    /// Window function expression
-    window_expr: Vec<Arc<dyn WindowExpr>>,
-    /// Schema after the window is run
-    schema: SchemaRef,
-    /// Schema before the window
-    input_schema: SchemaRef,
-}
-
-/// Create a physical expression for a window function
-pub fn create_window_expr(
-    fun: &WindowFunction,
-    name: String,
-    args: &[Arc<dyn PhysicalExpr>],
-    partition_by: &[Arc<dyn PhysicalExpr>],
-    order_by: &[PhysicalSortExpr],
-    window_frame: Option<WindowFrame>,
-    input_schema: &Schema,
-) -> Result<Arc<dyn WindowExpr>> {
-    Ok(match fun {
-        WindowFunction::AggregateFunction(fun) => Arc::new(AggregateWindowExpr {
-            aggregate: aggregates::create_aggregate_expr(
-                fun,
-                false,
-                args,
-                input_schema,
-                name,
-            )?,
-            partition_by: partition_by.to_vec(),
-            order_by: order_by.to_vec(),
-            window_frame,
-        }),
-        WindowFunction::BuiltInWindowFunction(fun) => Arc::new(BuiltInWindowExpr {
-            fun: fun.clone(),
-            expr: create_built_in_window_expr(fun, args, input_schema, name)?,
-            partition_by: partition_by.to_vec(),
-            order_by: order_by.to_vec(),
-            window_frame,
-        }),
-    })
-}
-
-fn get_scalar_value_from_args(
-    args: &[Arc<dyn PhysicalExpr>],
-    index: usize,
-) -> Option<ScalarValue> {
-    args.get(index).map(|v| {
-        v.as_any()
-            .downcast_ref::<Literal>()
-            .unwrap()
-            .value()
-            .clone()
-    })
-}
-
-fn create_built_in_window_expr(
-    fun: &BuiltInWindowFunction,
-    args: &[Arc<dyn PhysicalExpr>],
-    input_schema: &Schema,
-    name: String,
-) -> Result<Arc<dyn BuiltInWindowFunctionExpr>> {
-    Ok(match fun {
-        BuiltInWindowFunction::RowNumber => Arc::new(RowNumber::new(name)),
-        BuiltInWindowFunction::Rank => Arc::new(rank(name)),
-        BuiltInWindowFunction::DenseRank => Arc::new(dense_rank(name)),
-        BuiltInWindowFunction::Lag => {
-            let coerced_args = coerce(args, input_schema, &signature_for_built_in(fun))?;
-            let arg = coerced_args[0].clone();
-            let data_type = args[0].data_type(input_schema)?;
-            let shift_offset = get_scalar_value_from_args(&coerced_args, 1)
-                .map(|v| v.try_into())
-                .and_then(|v| v.ok());
-            let default_value = get_scalar_value_from_args(&coerced_args, 2);
-            Arc::new(lag(name, data_type, arg, shift_offset, default_value))
-        }
-        BuiltInWindowFunction::Lead => {
-            let coerced_args = coerce(args, input_schema, &signature_for_built_in(fun))?;
-            let arg = coerced_args[0].clone();
-            let data_type = args[0].data_type(input_schema)?;
-            let shift_offset = get_scalar_value_from_args(&coerced_args, 1)
-                .map(|v| v.try_into())
-                .and_then(|v| v.ok());
-            let default_value = get_scalar_value_from_args(&coerced_args, 2);
-            Arc::new(lead(name, data_type, arg, shift_offset, default_value))
-        }
-        BuiltInWindowFunction::NthValue => {
-            let coerced_args = coerce(args, input_schema, &signature_for_built_in(fun))?;
-            let arg = coerced_args[0].clone();
-            let n = coerced_args[1]
-                .as_any()
-                .downcast_ref::<Literal>()
-                .unwrap()
-                .value();
-            let n: i64 = n
-                .clone()
-                .try_into()
-                .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?;
-            let n: u32 = n as u32;
-            let data_type = args[0].data_type(input_schema)?;
-            Arc::new(NthValue::nth_value(name, arg, data_type, n)?)
-        }
-        BuiltInWindowFunction::FirstValue => {
-            let arg =
-                coerce(args, input_schema, &signature_for_built_in(fun))?[0].clone();
-            let data_type = args[0].data_type(input_schema)?;
-            Arc::new(NthValue::first_value(name, arg, data_type))
-        }
-        BuiltInWindowFunction::LastValue => {
-            let arg =
-                coerce(args, input_schema, &signature_for_built_in(fun))?[0].clone();
-            let data_type = args[0].data_type(input_schema)?;
-            Arc::new(NthValue::last_value(name, arg, data_type))
-        }
-        _ => {
-            return Err(DataFusionError::NotImplemented(format!(
-                "Window function with {:?} not yet implemented",
-                fun
-            )))
-        }
-    })
-}
-
-/// A window expr that takes the form of a built-in window function
-#[derive(Debug)]
-pub struct BuiltInWindowExpr {
-    fun: BuiltInWindowFunction,
-    expr: Arc<dyn BuiltInWindowFunctionExpr>,
-    partition_by: Vec<Arc<dyn PhysicalExpr>>,
-    order_by: Vec<PhysicalSortExpr>,
-    window_frame: Option<WindowFrame>,
-}
-
-impl WindowExpr for BuiltInWindowExpr {
-    /// Return a reference to Any that can be used for downcasting
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-
-    fn name(&self) -> &str {
-        self.expr.name()
-    }
-
-    fn field(&self) -> Result<Field> {
-        self.expr.field()
-    }
-
-    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
-        self.expr.expressions()
-    }
-
-    fn partition_by(&self) -> &[Arc<dyn PhysicalExpr>] {
-        &self.partition_by
-    }
-
-    fn order_by(&self) -> &[PhysicalSortExpr] {
-        &self.order_by
-    }
-
-    fn evaluate(&self, batch: &RecordBatch) -> Result<ArrayRef> {
-        let evaluator = self.expr.create_evaluator(batch)?;
-        let num_rows = batch.num_rows();
-        let partition_points =
-            self.evaluate_partition_points(num_rows, &self.partition_columns(batch)?)?;
-        let results = if evaluator.include_rank() {
-            let sort_partition_points =
-                self.evaluate_partition_points(num_rows, &self.sort_columns(batch)?)?;
-            evaluator.evaluate_with_rank(partition_points, sort_partition_points)?
-        } else {
-            evaluator.evaluate(partition_points)?
-        };
-        let results = results.iter().map(|i| i.as_ref()).collect::<Vec<_>>();
-        concat(&results).map_err(DataFusionError::ArrowError)
-    }
-}
-
-/// Given a partition range and the full list of sort partition points, and given that the
-/// sort partition points are sorted using [partition columns..., order columns...], the split
-/// boundaries will align (whatever is sorted on [partition columns...] is necessarily also
-/// sorted on the finer columns), so this uses binary search to find the ranges that fall
-/// within the partition range and returns that slice.
-pub(crate) fn find_ranges_in_range<'a>(
-    partition_range: &Range<usize>,
-    sort_partition_points: &'a [Range<usize>],
-) -> &'a [Range<usize>] {
-    let start_idx = sort_partition_points
-        .partition_point(|sort_range| sort_range.start < partition_range.start);
-    let end_idx = start_idx
-        + sort_partition_points[start_idx..]
-            .partition_point(|sort_range| sort_range.end <= partition_range.end);
-    &sort_partition_points[start_idx..end_idx]
-}
-
-/// A window expr that takes the form of an aggregate function
-#[derive(Debug)]
-pub struct AggregateWindowExpr {
-    aggregate: Arc<dyn AggregateExpr>,
-    partition_by: Vec<Arc<dyn PhysicalExpr>>,
-    order_by: Vec<PhysicalSortExpr>,
-    window_frame: Option<WindowFrame>,
-}
-
-impl AggregateWindowExpr {
-    /// the aggregate window function operates on the window frame; by default the mode is
-    /// "range"
-    fn evaluation_mode(&self) -> WindowFrameUnits {
-        self.window_frame.unwrap_or_default().units
-    }
-
-    /// create a new accumulator based on the underlying aggregation function
-    fn create_accumulator(&self) -> Result<AggregateWindowAccumulator> {
-        let accumulator = self.aggregate.create_accumulator()?;
-        Ok(AggregateWindowAccumulator { accumulator })
-    }
-
-    /// peer-based evaluation, relying on the fact that the batch is pre-sorted on the sort
-    /// columns; per partition point we evaluate the peer group (e.g. SUM or MAX gives the
-    /// same result for all peers) and concatenate the results.
-    fn peer_based_evaluate(&self, batch: &RecordBatch) -> Result<ArrayRef> {
-        let num_rows = batch.num_rows();
-        let partition_points =
-            self.evaluate_partition_points(num_rows, &self.partition_columns(batch)?)?;
-        let sort_partition_points =
-            self.evaluate_partition_points(num_rows, &self.sort_columns(batch)?)?;
-        let values = self.evaluate_args(batch)?;
-        let results = partition_points
-            .iter()
-            .map(|partition_range| {
-                let sort_partition_points =
-                    find_ranges_in_range(partition_range, &sort_partition_points);
-                let mut window_accumulators = self.create_accumulator()?;
-                sort_partition_points
-                    .iter()
-                    .map(|range| window_accumulators.scan_peers(&values, range))
-                    .collect::<Result<Vec<ArrayRef>>>()
-            })
-            .collect::<Result<Vec<Vec<ArrayRef>>>>()?
-            .into_iter()
-            .flatten()
-            .collect::<Vec<ArrayRef>>();
-        let results = results.iter().map(|i| i.as_ref()).collect::<Vec<_>>();
-        concat(&results).map_err(DataFusionError::ArrowError)
-    }
-
-    fn group_based_evaluate(&self, _batch: &RecordBatch) -> Result<ArrayRef> {
-        Err(DataFusionError::NotImplemented(format!(
-            "Group based evaluation for {} is not yet implemented",
-            self.name()
-        )))
-    }
-
-    fn row_based_evaluate(&self, _batch: &RecordBatch) -> Result<ArrayRef> {
-        Err(DataFusionError::NotImplemented(format!(
-            "Row based evaluation for {} is not yet implemented",
-            self.name()
-        )))
-    }
-}
-
-impl WindowExpr for AggregateWindowExpr {
-    /// Return a reference to Any that can be used for downcasting
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-
-    fn name(&self) -> &str {
-        self.aggregate.name()
-    }
-
-    fn field(&self) -> Result<Field> {
-        self.aggregate.field()
-    }
-
-    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
-        self.aggregate.expressions()
-    }
-
-    fn partition_by(&self) -> &[Arc<dyn PhysicalExpr>] {
-        &self.partition_by
-    }
-
-    fn order_by(&self) -> &[PhysicalSortExpr] {
-        &self.order_by
-    }
-
-    /// evaluate the window function values against the batch
-    fn evaluate(&self, batch: &RecordBatch) -> Result<ArrayRef> {
-        match self.evaluation_mode() {
-            WindowFrameUnits::Range => self.peer_based_evaluate(batch),
-            WindowFrameUnits::Rows => self.row_based_evaluate(batch),
-            WindowFrameUnits::Groups => self.group_based_evaluate(batch),
-        }
-    }
-}
-
-/// The aggregate window accumulator utilizes the accumulator from aggregation and performs a
-/// cumulative evaluation across the window's arguments based on peer equivalences.
-#[derive(Debug)]
-struct AggregateWindowAccumulator {
-    accumulator: Box<dyn Accumulator>,
-}
-
-impl AggregateWindowAccumulator {
-    /// scan one peer group of values (as arguments to the window function) given by value_range
-    /// and return an evaluation result with the same number of rows.
-    fn scan_peers(
-        &mut self,
-        values: &[ArrayRef],
-        value_range: &Range<usize>,
-    ) -> Result<ArrayRef> {
-        if value_range.is_empty() {
-            return Err(DataFusionError::Internal(
-                "Value range cannot be empty".to_owned(),
-            ));
-        }
-        let len = value_range.end - value_range.start;
-        let values = values
-            .iter()
-            .map(|v| v.slice(value_range.start, len))
-            .collect::<Vec<_>>();
-        self.accumulator.update_batch(&values)?;
-        let value = self.accumulator.evaluate()?;
-        Ok(value.to_array_of_size(len))
-    }
-}
-
-fn create_schema(
-    input_schema: &Schema,
-    window_expr: &[Arc<dyn WindowExpr>],
-) -> Result<Schema> {
-    let mut fields = Vec::with_capacity(input_schema.fields().len() + window_expr.len());
-    for expr in window_expr {
-        fields.push(expr.field()?);
-    }
-    fields.extend_from_slice(input_schema.fields());
-    Ok(Schema::new(fields))
-}
-
-impl WindowAggExec {
-    /// Create a new execution plan for window aggregates
-    pub fn try_new(
-        window_expr: Vec<Arc<dyn WindowExpr>>,
-        input: Arc<dyn ExecutionPlan>,
-        input_schema: SchemaRef,
-    ) -> Result<Self> {
-        let schema = create_schema(&input_schema, &window_expr)?;
-        let schema = Arc::new(schema);
-        Ok(WindowAggExec {
-            input,
-            window_expr,
-            schema,
-            input_schema,
-        })
-    }
-
-    /// Window expressions
-    pub fn window_expr(&self) -> &[Arc<dyn WindowExpr>] {
-        &self.window_expr
-    }
-
-    /// Input plan
-    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
-        &self.input
-    }
-
-    /// Get the input schema before any window functions are applied
-    pub fn input_schema(&self) -> SchemaRef {
-        self.input_schema.clone()
-    }
-}
-
-#[async_trait]
-impl ExecutionPlan for WindowAggExec {
-    /// Return a reference to Any that can be used for downcasting
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-
-    fn schema(&self) -> SchemaRef {
-        self.schema.clone()
-    }
-
-    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
-        vec![self.input.clone()]
-    }
-
-    /// Get the output partitioning of this plan
-    fn output_partitioning(&self) -> Partitioning {
-        // because we can have repartitioning using the partition keys
-        // this would be either 1 or more than 1 depending on the presence of
-        // repartitioning
-        self.input.output_partitioning()
-    }
-
-    fn required_child_distribution(&self) -> Distribution {
-        if self
-            .window_expr()
-            .iter()
-            .all(|expr| expr.partition_by().is_empty())
-        {
-            Distribution::SinglePartition
-        } else {
-            Distribution::UnspecifiedDistribution
-        }
-    }
-
-    fn with_new_children(
-        &self,
-        children: Vec<Arc<dyn ExecutionPlan>>,
-    ) -> Result<Arc<dyn ExecutionPlan>> {
-        match children.len() {
-            1 => Ok(Arc::new(WindowAggExec::try_new(
-                self.window_expr.clone(),
-                children[0].clone(),
-                self.input_schema.clone(),
-            )?)),
-            _ => Err(DataFusionError::Internal(
-                "WindowAggExec wrong number of children".to_owned(),
-            )),
-        }
-    }
-
-    async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> {
-        let input = self.input.execute(partition).await?;
-        let stream = Box::pin(WindowAggStream::new(
-            self.schema.clone(),
-            self.window_expr.clone(),
-            input,
-        ));
-        Ok(stream)
-    }
-}
-
-pin_project! {
-    /// stream for window aggregation plan
-    pub struct WindowAggStream {
-        schema: SchemaRef,
-        #[pin]
-        output: futures::channel::oneshot::Receiver<ArrowResult<RecordBatch>>,
-        finished: bool,
-    }
-}
-
-/// Compute the window aggregate columns
-fn compute_window_aggregates(
-    window_expr: Vec<Arc<dyn WindowExpr>>,
-    batch: &RecordBatch,
-) -> Result<Vec<ArrayRef>> {
-    window_expr
-        .iter()
-        .map(|window_expr| window_expr.evaluate(batch))
-        .collect()
-}
-
-impl WindowAggStream {
-    /// Create a new WindowAggStream
-    pub fn new(
-        schema: SchemaRef,
-        window_expr: Vec<Arc<dyn WindowExpr>>,
-        input: SendableRecordBatchStream,
-    ) -> Self {
-        let (tx, rx) = futures::channel::oneshot::channel();
-        let schema_clone = schema.clone();
-        tokio::spawn(async move {
-            let schema = schema_clone.clone();
-            let result = WindowAggStream::process(input, window_expr, schema).await;
-            tx.send(result)
-        });
-
-        Self {
-            output: rx,
-            finished: false,
-            schema,
-        }
-    }
-
-    async fn process(
-        input: SendableRecordBatchStream,
-        window_expr: Vec<Arc<dyn WindowExpr>>,
-        schema: SchemaRef,
-    ) -> ArrowResult<RecordBatch> {
-        let input_schema = input.schema();
-        let batches = common::collect(input)
-            .await
-            .map_err(DataFusionError::into_arrow_external_error)?;
-        let batch = common::combine_batches(&batches, input_schema.clone())?;
-        if let Some(batch) = batch {
-            // calculate window cols
-            let mut columns = compute_window_aggregates(window_expr, &batch)
-                .map_err(DataFusionError::into_arrow_external_error)?;
-            // combine with the original cols
-            // note that window aggregates are set up so that the newly calculated
-            // window expressions are always prepended to the columns
-            columns.extend_from_slice(batch.columns());
-            RecordBatch::try_new(schema, columns)
-        } else {
-            Ok(RecordBatch::new_empty(schema))
-        }
-    }
-}
-
-impl Stream for WindowAggStream {
-    type Item = ArrowResult<RecordBatch>;
-
-    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
-        if self.finished {
-            return Poll::Ready(None);
-        }
-
-        // is the output ready?
-        let this = self.project();
-        let output_poll = this.output.poll(cx);
-
-        match output_poll {
-            Poll::Ready(result) => {
-                *this.finished = true;
-                // check for error in receiving channel and unwrap actual result
-                let result = match result {
-                    Err(e) => Some(Err(ArrowError::ExternalError(Box::new(e)))), // error receiving
-                    Ok(result) => Some(result),
-                };
-                Poll::Ready(result)
-            }
-            Poll::Pending => Poll::Pending,
-        }
-    }
-}
-
-impl RecordBatchStream for WindowAggStream {
-    /// Get the schema
-    fn schema(&self) -> SchemaRef {
-        self.schema.clone()
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-    use crate::physical_plan::aggregates::AggregateFunction;
-    use crate::physical_plan::collect;
-    use crate::physical_plan::csv::{CsvExec, CsvReadOptions};
-    use crate::physical_plan::expressions::col;
-    use crate::test;
-    use arrow::array::*;
-
-    fn create_test_schema(partitions: usize) -> Result<(Arc<CsvExec>, SchemaRef)> {
-        let schema = test::aggr_test_schema();
-        let path = test::create_partitioned_csv("aggregate_test_100.csv", partitions)?;
-        let csv = CsvExec::try_new(
-            &path,
-            CsvReadOptions::new().schema(&schema),
-            None,
-            1024,
-            None,
-        )?;
-
-        let input = Arc::new(csv);
-        Ok((input, schema))
-    }
-
-    #[test]
-    fn test_create_window_exp_lead_no_args() -> Result<()> {
-        let (_, schema) = create_test_schema(1)?;
-
-        let expr = create_window_expr(
-            &WindowFunction::BuiltInWindowFunction(BuiltInWindowFunction::Lead),
-            "prev".to_owned(),
-            &[col("c2", &schema)?],
-            &[],
-            &[],
-            Some(WindowFrame::default()),
-            schema.as_ref(),
-        )?;
-
-        assert_eq!(expr.name(), "prev");
-
-        Ok(())
-    }
-
-    #[test]
-    fn test_create_window_exp_lead_with_args() -> Result<()> {
-        let (_, schema) = create_test_schema(1)?;
-
-        let expr = create_window_expr(
-            &WindowFunction::BuiltInWindowFunction(BuiltInWindowFunction::Lead),
-            "prev".to_owned(),
-            &[
-                col("c2", &schema)?,
-                Arc::new(Literal::new(ScalarValue::Int64(Some(1)))),
-            ],
-            &[],
-            &[],
-            Some(WindowFrame::default()),
-            schema.as_ref(),
-        )?;
-
-        assert_eq!(expr.name(), "prev");
-
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn window_function() -> Result<()> {
-        let (input, schema) = create_test_schema(1)?;
-
-        let window_exec = Arc::new(WindowAggExec::try_new(
-            vec![
-                create_window_expr(
-                    &WindowFunction::AggregateFunction(AggregateFunction::Count),
-                    "count".to_owned(),
-                    &[col("c3", &schema)?],
-                    &[],
-                    &[],
-                    Some(WindowFrame::default()),
-                    schema.as_ref(),
-                )?,
-                create_window_expr(
-                    &WindowFunction::AggregateFunction(AggregateFunction::Max),
-                    "max".to_owned(),
-                    &[col("c3", &schema)?],
-                    &[],
-                    &[],
-                    Some(WindowFrame::default()),
-                    schema.as_ref(),
-                )?,
-                create_window_expr(
-                    &WindowFunction::AggregateFunction(AggregateFunction::Min),
-                    "min".to_owned(),
-                    &[col("c3", &schema)?],
-                    &[],
-                    &[],
-                    Some(WindowFrame::default()),
-                    schema.as_ref(),
-                )?,
-            ],
-            input,
-            schema.clone(),
-        )?);
-
-        let result: Vec<RecordBatch> = collect(window_exec).await?;
-        assert_eq!(result.len(), 1);
-
-        let columns = result[0].columns();
-
-        // c3 is small int
-
-        let count: &UInt64Array = as_primitive_array(&columns[0]);
-        assert_eq!(count.value(0), 100);
-        assert_eq!(count.value(99), 100);
-
-        let max: &Int8Array = as_primitive_array(&columns[1]);
-        assert_eq!(max.value(0), 125);
-        assert_eq!(max.value(99), 125);
-
-        let min: &Int8Array = as_primitive_array(&columns[2]);
-        assert_eq!(min.value(0), -117);
-        assert_eq!(min.value(99), -117);
-
-        Ok(())
-    }
-}
diff --git a/datafusion/src/physical_plan/windows/aggregate.rs b/datafusion/src/physical_plan/windows/aggregate.rs
new file mode 100644
index 000000000000..f7c29ba6aff7
--- /dev/null
+++ b/datafusion/src/physical_plan/windows/aggregate.rs
@@ -0,0 +1,180 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Physical exec for aggregate window function expressions.
+
+use crate::error::{DataFusionError, Result};
+use crate::logical_plan::window_frames::{WindowFrame, WindowFrameUnits};
+use crate::physical_plan::windows::find_ranges_in_range;
+use crate::physical_plan::{
+    expressions::PhysicalSortExpr, Accumulator, AggregateExpr, PhysicalExpr, WindowExpr,
+};
+use arrow::compute::concat;
+use arrow::record_batch::RecordBatch;
+use arrow::{array::ArrayRef, datatypes::Field};
+use std::any::Any;
+use std::iter::IntoIterator;
+use std::ops::Range;
+use std::sync::Arc;
+
+/// A window expr that takes the form of an aggregate function
+#[derive(Debug)]
+pub struct AggregateWindowExpr {
+    aggregate: Arc<dyn AggregateExpr>,
+    partition_by: Vec<Arc<dyn PhysicalExpr>>,
+    order_by: Vec<PhysicalSortExpr>,
+    window_frame: Option<WindowFrame>,
+}
+
+impl AggregateWindowExpr {
+    /// create a new aggregate window function expression
+    pub(super) fn new(
+        aggregate: Arc<dyn AggregateExpr>,
+        partition_by: &[Arc<dyn PhysicalExpr>],
+        order_by: &[PhysicalSortExpr],
+        window_frame: Option<WindowFrame>,
+    ) -> Self {
+        Self {
+            aggregate,
+            partition_by: partition_by.to_vec(),
+            order_by: order_by.to_vec(),
+            window_frame,
+        }
+    }
+
+    /// the aggregate window function operates on the window frame; by default the mode is
+    /// "range"
+    fn evaluation_mode(&self) -> WindowFrameUnits {
+        self.window_frame.unwrap_or_default().units
+    }
+
+    /// create a new accumulator based on the underlying aggregation function
+    fn create_accumulator(&self) -> Result<AggregateWindowAccumulator> {
+        let accumulator = self.aggregate.create_accumulator()?;
+        Ok(AggregateWindowAccumulator { accumulator })
+    }
+
+    /// peer-based evaluation, relying on the fact that the batch is pre-sorted on the sort
+    /// columns; per partition point we evaluate the peer group (e.g. SUM or MAX gives the
+    /// same result for all peers) and concatenate the results.
+    fn peer_based_evaluate(&self, batch: &RecordBatch) -> Result<ArrayRef> {
+        let num_rows = batch.num_rows();
+        let partition_points =
+            self.evaluate_partition_points(num_rows, &self.partition_columns(batch)?)?;
+        let sort_partition_points =
+            self.evaluate_partition_points(num_rows, &self.sort_columns(batch)?)?;
+        let values = self.evaluate_args(batch)?;
+        let results = partition_points
+            .iter()
+            .map(|partition_range| {
+                let sort_partition_points =
+                    find_ranges_in_range(partition_range, &sort_partition_points);
+                let mut window_accumulators = self.create_accumulator()?;
+                sort_partition_points
+                    .iter()
+                    .map(|range| window_accumulators.scan_peers(&values, range))
+                    .collect::<Result<Vec<ArrayRef>>>()
+            })
+            .collect::<Result<Vec<Vec<ArrayRef>>>>()?
+            .into_iter()
+            .flatten()
+            .collect::<Vec<ArrayRef>>();
+        let results = results.iter().map(|i| i.as_ref()).collect::<Vec<_>>();
+        concat(&results).map_err(DataFusionError::ArrowError)
+    }
+
+    fn group_based_evaluate(&self, _batch: &RecordBatch) -> Result<ArrayRef> {
+        Err(DataFusionError::NotImplemented(format!(
+            "Group based evaluation for {} is not yet implemented",
+            self.name()
+        )))
+    }
+
+    fn row_based_evaluate(&self, _batch: &RecordBatch) -> Result<ArrayRef> {
+        Err(DataFusionError::NotImplemented(format!(
+            "Row based evaluation for {} is not yet implemented",
+            self.name()
+        )))
+    }
+}
+
+impl WindowExpr for AggregateWindowExpr {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn name(&self) -> &str {
+        self.aggregate.name()
+    }
+
+    fn field(&self) -> Result<Field> {
+        self.aggregate.field()
+    }
+
+    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+        self.aggregate.expressions()
+    }
+
+    fn partition_by(&self) -> &[Arc<dyn PhysicalExpr>] {
+        &self.partition_by
+    }
+
+    fn order_by(&self) -> &[PhysicalSortExpr] {
+        &self.order_by
+    }
+
+    /// evaluate the window function values against the batch
+    fn evaluate(&self, batch: &RecordBatch) -> Result<ArrayRef> {
+        match self.evaluation_mode() {
+            WindowFrameUnits::Range => self.peer_based_evaluate(batch),
+            WindowFrameUnits::Rows => self.row_based_evaluate(batch),
+            WindowFrameUnits::Groups => self.group_based_evaluate(batch),
+        }
+    }
+}
+
+/// The aggregate window accumulator utilizes the accumulator from aggregation and performs a
+/// cumulative evaluation across the window's arguments based on peer equivalences.
+#[derive(Debug)]
+struct AggregateWindowAccumulator {
+    accumulator: Box<dyn Accumulator>,
+}
+
+impl AggregateWindowAccumulator {
+    /// scan one peer group of values (as arguments to the window function) given by value_range
+    /// and return an evaluation result with the same number of rows.
+    fn scan_peers(
+        &mut self,
+        values: &[ArrayRef],
+        value_range: &Range<usize>,
+    ) -> Result<ArrayRef> {
+        if value_range.is_empty() {
+            return Err(DataFusionError::Internal(
+                "Value range cannot be empty".to_owned(),
+            ));
+        }
+        let len = value_range.end - value_range.start;
+        let values = values
+            .iter()
+            .map(|v| v.slice(value_range.start, len))
+            .collect::<Vec<_>>();
+        self.accumulator.update_batch(&values)?;
+        let value = self.accumulator.evaluate()?;
+        Ok(value.to_array_of_size(len))
+    }
+}
diff --git a/datafusion/src/physical_plan/windows/built_in.rs b/datafusion/src/physical_plan/windows/built_in.rs
new file mode 100644
index 000000000000..82040de6ef5c
--- /dev/null
+++ b/datafusion/src/physical_plan/windows/built_in.rs
@@ -0,0 +1,103 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Physical exec for built-in window function expressions.
+
+use crate::error::{DataFusionError, Result};
+use crate::logical_plan::window_frames::WindowFrame;
+use crate::physical_plan::{
+    expressions::PhysicalSortExpr,
+    window_functions::{BuiltInWindowFunction, BuiltInWindowFunctionExpr},
+    PhysicalExpr, WindowExpr,
+};
+use arrow::compute::concat;
+use arrow::record_batch::RecordBatch;
+use arrow::{array::ArrayRef, datatypes::Field};
+use std::any::Any;
+use std::sync::Arc;
+
+/// A window expr that takes the form of a built-in window function
+#[derive(Debug)]
+pub struct BuiltInWindowExpr {
+    fun: BuiltInWindowFunction,
+    expr: Arc<dyn BuiltInWindowFunctionExpr>,
+    partition_by: Vec<Arc<dyn PhysicalExpr>>,
+    order_by: Vec<PhysicalSortExpr>,
+    window_frame: Option<WindowFrame>,
+}
+
+impl BuiltInWindowExpr {
+    /// create a new built-in window function expression
+    pub(super) fn new(
+        fun: BuiltInWindowFunction,
+        expr: Arc<dyn BuiltInWindowFunctionExpr>,
+        partition_by: &[Arc<dyn PhysicalExpr>],
+        order_by: &[PhysicalSortExpr],
+        window_frame: Option<WindowFrame>,
+    ) -> Self {
+        Self {
+            fun,
+            expr,
+            partition_by: partition_by.to_vec(),
+            order_by: order_by.to_vec(),
+            window_frame,
+        }
+    }
+}
+
+impl WindowExpr for BuiltInWindowExpr {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn name(&self) -> &str {
+        self.expr.name()
+    }
+
+    fn field(&self) -> Result<Field> {
+        self.expr.field()
+    }
+
+    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+        self.expr.expressions()
+    }
+
+    fn partition_by(&self) -> &[Arc<dyn PhysicalExpr>] {
+        &self.partition_by
+    }
+
+    fn order_by(&self) -> &[PhysicalSortExpr] {
+        &self.order_by
+    }
+
+    fn evaluate(&self, batch: &RecordBatch) -> Result<ArrayRef> {
+        let evaluator = self.expr.create_evaluator(batch)?;
+        let num_rows = batch.num_rows();
+        let partition_points =
+            self.evaluate_partition_points(num_rows, &self.partition_columns(batch)?)?;
+        let results = if evaluator.include_rank() {
+            let sort_partition_points =
+                self.evaluate_partition_points(num_rows, &self.sort_columns(batch)?)?;
+            evaluator.evaluate_with_rank(partition_points, sort_partition_points)?
+        } else {
+            evaluator.evaluate(partition_points)?
+        };
+        let results = results.iter().map(|i| i.as_ref()).collect::<Vec<_>>();
+        concat(&results).map_err(DataFusionError::ArrowError)
+    }
+}
diff --git a/datafusion/src/physical_plan/windows/mod.rs b/datafusion/src/physical_plan/windows/mod.rs
new file mode 100644
index 000000000000..194aa8de5bb5
--- /dev/null
+++ b/datafusion/src/physical_plan/windows/mod.rs
@@ -0,0 +1,258 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Physical expressions for window functions
+
+use crate::error::{DataFusionError, Result};
+use crate::logical_plan::window_frames::WindowFrame;
+use crate::physical_plan::{
+    aggregates,
+    expressions::{
+        dense_rank, lag, lead, rank, Literal, NthValue, PhysicalSortExpr, RowNumber,
+    },
+    type_coercion::coerce,
+    window_functions::{
+        signature_for_built_in, BuiltInWindowFunction, BuiltInWindowFunctionExpr,
+        WindowFunction,
+    },
+    PhysicalExpr, WindowExpr,
+};
+use crate::scalar::ScalarValue;
+use arrow::datatypes::Schema;
+use std::convert::TryInto;
+use std::ops::Range;
+use std::sync::Arc;
+
+mod aggregate;
+mod built_in;
+mod window_agg_exec;
+
+pub use aggregate::AggregateWindowExpr;
+pub use built_in::BuiltInWindowExpr;
+pub use window_agg_exec::WindowAggExec;
+
+/// Create a physical expression for a window function
+pub fn create_window_expr(
+    fun: &WindowFunction,
+    name: String,
+    args: &[Arc<dyn PhysicalExpr>],
+    partition_by: &[Arc<dyn PhysicalExpr>],
+    order_by: &[PhysicalSortExpr],
+    window_frame: Option<WindowFrame>,
+    input_schema: &Schema,
+) -> Result<Arc<dyn WindowExpr>> {
+    Ok(match fun {
+        WindowFunction::AggregateFunction(fun) => Arc::new(AggregateWindowExpr::new(
+            aggregates::create_aggregate_expr(fun, false, args, input_schema, name)?,
+            partition_by,
+            order_by,
+            window_frame,
+        )),
+        WindowFunction::BuiltInWindowFunction(fun) => Arc::new(BuiltInWindowExpr::new(
+            fun.clone(),
+            create_built_in_window_expr(fun, args, input_schema, name)?,
+            partition_by,
+            order_by,
+            window_frame,
+        )),
+    })
+}
+
+fn get_scalar_value_from_args(
+    args: &[Arc<dyn PhysicalExpr>],
+    index: usize,
+) -> Option<ScalarValue> {
+    args.get(index).map(|v| {
+        v.as_any()
+            .downcast_ref::<Literal>()
+            .unwrap()
+            .value()
+            .clone()
+    })
+}
+
+fn create_built_in_window_expr(
+    fun: &BuiltInWindowFunction,
+    args: &[Arc<dyn PhysicalExpr>],
+    input_schema: &Schema,
+    name: String,
+) -> Result<Arc<dyn BuiltInWindowFunctionExpr>> {
+    Ok(match fun {
+        BuiltInWindowFunction::RowNumber => Arc::new(RowNumber::new(name)),
+        BuiltInWindowFunction::Rank => Arc::new(rank(name)),
+        BuiltInWindowFunction::DenseRank => Arc::new(dense_rank(name)),
+        BuiltInWindowFunction::Lag => {
+            let coerced_args = coerce(args, input_schema, &signature_for_built_in(fun))?;
+            let arg = coerced_args[0].clone();
+            let data_type = args[0].data_type(input_schema)?;
+            let shift_offset = get_scalar_value_from_args(&coerced_args, 1)
+                .map(|v| v.try_into())
+                .and_then(|v| v.ok());
+            let default_value = get_scalar_value_from_args(&coerced_args, 2);
+            Arc::new(lag(name, data_type, arg, shift_offset, default_value))
+        }
+        BuiltInWindowFunction::Lead => {
+            let coerced_args = coerce(args, input_schema, &signature_for_built_in(fun))?;
+            let arg = coerced_args[0].clone();
+            let data_type = args[0].data_type(input_schema)?;
+            let shift_offset = get_scalar_value_from_args(&coerced_args, 1)
+                .map(|v| v.try_into())
+                .and_then(|v| v.ok());
+            let default_value = get_scalar_value_from_args(&coerced_args, 2);
+            Arc::new(lead(name, data_type, arg, shift_offset, default_value))
+        }
+        BuiltInWindowFunction::NthValue => {
+            let coerced_args = coerce(args, input_schema, &signature_for_built_in(fun))?;
+            let arg = coerced_args[0].clone();
+            let n = coerced_args[1]
+                .as_any()
+                .downcast_ref::<Literal>()
+                .unwrap()
+                .value();
+            let n: i64 = n
+                .clone()
+                .try_into()
+                .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?;
+            let n: u32 = n as u32;
+            let data_type = args[0].data_type(input_schema)?;
+            Arc::new(NthValue::nth_value(name, arg, data_type, n)?)
+        }
+        BuiltInWindowFunction::FirstValue => {
+            let arg =
+                coerce(args, input_schema, &signature_for_built_in(fun))?[0].clone();
+            let data_type = args[0].data_type(input_schema)?;
+            Arc::new(NthValue::first_value(name, arg, data_type))
+        }
+        BuiltInWindowFunction::LastValue => {
+            let arg =
+                coerce(args, input_schema, &signature_for_built_in(fun))?[0].clone();
+            let data_type = args[0].data_type(input_schema)?;
+            Arc::new(NthValue::last_value(name, arg, data_type))
+        }
+        _ => {
+            return Err(DataFusionError::NotImplemented(format!(
+                "Window function with {:?} not yet implemented",
+                fun
+            )))
+        }
+    })
+}
+
+/// Given a partition range and the full list of sort partition points, and given that the
+/// sort partition points are sorted using [partition columns..., order columns...], the split
+/// boundaries will align (whatever is sorted on [partition columns...] is necessarily also
+/// sorted on the finer columns), so this uses binary search to find the ranges that fall
+/// within the partition range and returns that slice.
+pub(crate) fn find_ranges_in_range<'a>(
+    partition_range: &Range<usize>,
+    sort_partition_points: &'a [Range<usize>],
+) -> &'a [Range<usize>] {
+    let start_idx = sort_partition_points
+        .partition_point(|sort_range| sort_range.start < partition_range.start);
+    let end_idx = start_idx
+        + sort_partition_points[start_idx..]
+            .partition_point(|sort_range| sort_range.end <= partition_range.end);
+    &sort_partition_points[start_idx..end_idx]
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::physical_plan::aggregates::AggregateFunction;
+    use crate::physical_plan::collect;
+    use crate::physical_plan::csv::{CsvExec, CsvReadOptions};
+    use crate::physical_plan::expressions::col;
+    use crate::test;
+    use arrow::array::*;
+    use arrow::datatypes::SchemaRef;
+    use arrow::record_batch::RecordBatch;
+
+    fn create_test_schema(partitions: usize) -> Result<(Arc<CsvExec>, SchemaRef)> {
+        let schema = test::aggr_test_schema();
+        let path = test::create_partitioned_csv("aggregate_test_100.csv", partitions)?;
+        let csv = CsvExec::try_new(
+            &path,
+            CsvReadOptions::new().schema(&schema),
+            None,
+            1024,
+            None,
+        )?;
+
+        let input = Arc::new(csv);
+        Ok((input, schema))
+    }
+
+    #[tokio::test]
+    async fn window_function() -> Result<()> {
+        let (input, schema) = create_test_schema(1)?;
+
+        let window_exec = Arc::new(WindowAggExec::try_new(
+            vec![
+                create_window_expr(
+                    &WindowFunction::AggregateFunction(AggregateFunction::Count),
+                    "count".to_owned(),
+                    &[col("c3", &schema)?],
+                    &[],
+                    &[],
+                    Some(WindowFrame::default()),
+                    schema.as_ref(),
+                )?,
+                create_window_expr(
+                    &WindowFunction::AggregateFunction(AggregateFunction::Max),
+                    "max".to_owned(),
+                    &[col("c3", &schema)?],
+                    &[],
+                    &[],
+                    Some(WindowFrame::default()),
+                    schema.as_ref(),
+                )?,
+                create_window_expr(
+                    &WindowFunction::AggregateFunction(AggregateFunction::Min),
+                    "min".to_owned(),
+                    &[col("c3", &schema)?],
+                    &[],
+                    &[],
+                    Some(WindowFrame::default()),
+                    schema.as_ref(),
+                )?,
+            ],
+            input,
+            schema.clone(),
+        )?);
+
+        let result: Vec<RecordBatch> = collect(window_exec).await?;
+        assert_eq!(result.len(), 1);
+
+        let columns = result[0].columns();
+
+        // c3 is small int
+
+        let count: &UInt64Array = as_primitive_array(&columns[0]);
+        assert_eq!(count.value(0), 100);
+        assert_eq!(count.value(99), 100);
+
+        let max: &Int8Array = as_primitive_array(&columns[1]);
+        assert_eq!(max.value(0), 125);
+        assert_eq!(max.value(99), 125);
+
+        let min: &Int8Array = as_primitive_array(&columns[2]);
+        assert_eq!(min.value(0), -117);
+        assert_eq!(min.value(99), -117);
+
+        Ok(())
+    }
+}
diff --git a/datafusion/src/physical_plan/windows/window_agg_exec.rs b/datafusion/src/physical_plan/windows/window_agg_exec.rs
new file mode 100644
index 000000000000..2ff1f34ce4c7
--- /dev/null
+++ b/datafusion/src/physical_plan/windows/window_agg_exec.rs
@@ -0,0 +1,260 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Stream and channel implementations for window function expressions.
+
+use crate::error::{DataFusionError, Result};
+use crate::physical_plan::{
+    common, Distribution, ExecutionPlan, Partitioning, RecordBatchStream,
+    SendableRecordBatchStream, WindowExpr,
+};
+use arrow::{
+    array::ArrayRef,
+    datatypes::{Schema, SchemaRef},
+    error::{ArrowError, Result as ArrowResult},
+    record_batch::RecordBatch,
+};
+use async_trait::async_trait;
+use futures::stream::Stream;
+use futures::Future;
+use pin_project_lite::pin_project;
+use std::any::Any;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+/// Window execution plan
+#[derive(Debug)]
+pub struct WindowAggExec {
+    /// Input plan
+    input: Arc<dyn ExecutionPlan>,
+    /// Window function expression
+    window_expr: Vec<Arc<dyn WindowExpr>>,
+    /// Schema after the window is run
+    schema: SchemaRef,
+    /// Schema before the window
+    input_schema: SchemaRef,
+}
+
+impl WindowAggExec {
+    /// Create a new execution plan for window aggregates
+    pub fn try_new(
+        window_expr: Vec<Arc<dyn WindowExpr>>,
+        input: Arc<dyn ExecutionPlan>,
+        input_schema: SchemaRef,
+    ) -> Result<Self> {
+        let schema = create_schema(&input_schema, &window_expr)?;
+        let schema = Arc::new(schema);
+        Ok(WindowAggExec {
+            input,
+            window_expr,
+            schema,
+            input_schema,
+        })
+    }
+
+    /// Window expressions
+    pub fn window_expr(&self) -> &[Arc<dyn WindowExpr>] {
+        &self.window_expr
+    }
+
+    /// Input plan
+    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
+        &self.input
+    }
+
+    /// Get the input schema before any window functions are applied
+    pub fn input_schema(&self) -> SchemaRef {
+        self.input_schema.clone()
+    }
+}
+
+#[async_trait]
+impl ExecutionPlan for WindowAggExec {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+
+    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+        vec![self.input.clone()]
+    }
+
+    /// Get the output partitioning of this plan
+    fn output_partitioning(&self) -> Partitioning {
+        // because we can have repartitioning using the partition keys
+        // this would be either 1 or more than 1 depending on the presence of
+        // repartitioning
+        self.input.output_partitioning()
+    }
+
+    fn required_child_distribution(&self) -> Distribution {
+        if self
+            .window_expr()
+            .iter()
+            .all(|expr| expr.partition_by().is_empty())
+        {
+            Distribution::SinglePartition
+        } else {
+            Distribution::UnspecifiedDistribution
+        }
+    }
+
+    fn with_new_children(
+        &self,
+        children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        match children.len() {
+            1 => Ok(Arc::new(WindowAggExec::try_new(
+                self.window_expr.clone(),
+                children[0].clone(),
+                self.input_schema.clone(),
+            )?)),
+            _ => Err(DataFusionError::Internal(
+                "WindowAggExec wrong number of children".to_owned(),
+            )),
+        }
+    }
+
+    async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> {
+        let input = self.input.execute(partition).await?;
+        let stream = Box::pin(WindowAggStream::new(
+            self.schema.clone(),
+            self.window_expr.clone(),
+            input,
+        ));
+        Ok(stream)
+    }
+}
+
+fn create_schema(
+    input_schema: &Schema,
+    window_expr: &[Arc<dyn WindowExpr>],
+) -> Result<Schema> {
+    let mut fields = Vec::with_capacity(input_schema.fields().len() + window_expr.len());
+    for expr in window_expr {
+        fields.push(expr.field()?);
+    }
+    fields.extend_from_slice(input_schema.fields());
+    Ok(Schema::new(fields))
+}
+
+/// Compute the window aggregate columns
+fn compute_window_aggregates(
+    window_expr: Vec<Arc<dyn WindowExpr>>,
+    batch: &RecordBatch,
+) -> Result<Vec<ArrayRef>> {
+    window_expr
+        .iter()
+        .map(|window_expr| window_expr.evaluate(batch))
+        .collect()
+}
+
+pin_project! {
+    /// stream for window aggregation plan
+    pub struct WindowAggStream {
+        schema: SchemaRef,
+        #[pin]
+        output: futures::channel::oneshot::Receiver<ArrowResult<RecordBatch>>,
+        finished: bool,
+    }
+}
+
+impl WindowAggStream {
+    /// Create a new WindowAggStream
+    pub fn new(
+        schema: SchemaRef,
+        window_expr: Vec<Arc<dyn WindowExpr>>,
+        input: SendableRecordBatchStream,
+    ) -> Self {
+        let (tx, rx) = futures::channel::oneshot::channel();
+        let schema_clone = schema.clone();
+        tokio::spawn(async move {
+            let schema = schema_clone.clone();
+            let result = WindowAggStream::process(input, window_expr, schema).await;
+            tx.send(result)
+        });
+
+        Self {
+            output: rx,
+            finished: false,
+            schema,
+        }
+    }
+
+    async fn process(
+        input: SendableRecordBatchStream,
+        window_expr: Vec<Arc<dyn WindowExpr>>,
+        schema: SchemaRef,
+    ) -> ArrowResult<RecordBatch> {
+        let input_schema = input.schema();
+        let batches = common::collect(input)
+            .await
+            .map_err(DataFusionError::into_arrow_external_error)?;
+        let batch = common::combine_batches(&batches, input_schema.clone())?;
+        if let Some(batch) = batch {
+            // calculate window cols
+            let mut columns = compute_window_aggregates(window_expr, &batch)
+                .map_err(DataFusionError::into_arrow_external_error)?;
+            // combine with the original cols
+            // note that window aggregates are set up so that the newly calculated
+            // window expressions are always prepended to the columns
+            columns.extend_from_slice(batch.columns());
+            RecordBatch::try_new(schema, columns)
+        } else {
+            Ok(RecordBatch::new_empty(schema))
+        }
+    }
+}
+
+impl Stream for WindowAggStream {
+    type Item = ArrowResult<RecordBatch>;
+
+    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        if self.finished {
+            return Poll::Ready(None);
+        }
+
+        // is the output ready?
+        let this = self.project();
+        let output_poll = this.output.poll(cx);
+
+        match output_poll {
+            Poll::Ready(result) => {
+                *this.finished = true;
+                // check for error in receiving channel and unwrap actual result
+                let result = match result {
+                    Err(e) => Some(Err(ArrowError::ExternalError(Box::new(e)))), // error receiving
+                    Ok(result) => Some(result),
+                };
+                Poll::Ready(result)
+            }
+            Poll::Pending => Poll::Pending,
+        }
+    }
+}
+
+impl RecordBatchStream for WindowAggStream {
+    /// Get the schema
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+}
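
Reviewer note: a small self-contained sketch of the `find_ranges_in_range` contract, since both `windows/mod.rs` and `windows/aggregate.rs` above depend on it. The function body is copied verbatim from the patch; the `main` wrapper and the example ranges are illustrative assumptions, not part of the patch.

use std::ops::Range;

// Copied from windows/mod.rs above: two binary searches select exactly the
// sort-partition ranges that nest inside `partition_range`.
fn find_ranges_in_range<'a>(
    partition_range: &Range<usize>,
    sort_partition_points: &'a [Range<usize>],
) -> &'a [Range<usize>] {
    let start_idx = sort_partition_points
        .partition_point(|sort_range| sort_range.start < partition_range.start);
    let end_idx = start_idx
        + sort_partition_points[start_idx..]
            .partition_point(|sort_range| sort_range.end <= partition_range.end);
    &sort_partition_points[start_idx..end_idx]
}

fn main() {
    // Hypothetical 10-row batch: peer groups from sorting on
    // [partition columns..., order columns...]...
    let sort_partition_points = vec![0..2, 2..5, 5..8, 8..10];
    // ...and one partition, rows 2..8, from sorting on the partition columns alone.
    let partition_range = 2..8;
    // Peer boundaries refine partition boundaries, so the result is exactly
    // the peer groups contained in this partition.
    let expected: Vec<Range<usize>> = vec![2..5, 5..8];
    assert_eq!(
        find_ranges_in_range(&partition_range, &sort_partition_points),
        expected.as_slice()
    );
}

This is why `AggregateWindowExpr::peer_based_evaluate` can create one fresh accumulator per partition and feed it one peer group at a time: each `scan_peers` call sees a contiguous row range, every output array has exactly that range's length, and the flattened, concatenated outputs line up one-to-one with the input rows.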