Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions datafusion/src/physical_plan/expressions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ mod negative;
mod not;
mod nth_value;
mod nullif;
mod rank;
mod row_number;
mod sum;
mod try_cast;
Expand All @@ -63,6 +64,7 @@ pub use negative::{negative, NegativeExpr};
pub use not::{not, NotExpr};
pub use nth_value::NthValue;
pub use nullif::{nullif_func, SUPPORTED_NULLIF_TYPES};
pub use rank::{dense_rank, rank};
pub use row_number::RowNumber;
pub use sum::{sum_return_type, Sum};
pub use try_cast::{try_cast, TryCastExpr};
Expand Down
59 changes: 36 additions & 23 deletions datafusion/src/physical_plan/expressions/nth_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,14 @@
//! Defines physical expressions that can be evaluated at runtime during query execution

use crate::error::{DataFusionError, Result};
use crate::physical_plan::window_functions::PartitionEvaluator;
use crate::physical_plan::{window_functions::BuiltInWindowFunctionExpr, PhysicalExpr};
use crate::scalar::ScalarValue;
use arrow::array::{new_empty_array, new_null_array, ArrayRef};
use arrow::array::{new_null_array, ArrayRef};
use arrow::datatypes::{DataType, Field};
use arrow::record_batch::RecordBatch;
use std::any::Any;
use std::ops::Range;
use std::sync::Arc;

/// nth_value kind
Expand Down Expand Up @@ -111,25 +114,34 @@ impl BuiltInWindowFunctionExpr for NthValue {
&self.name
}

fn evaluate(&self, num_rows: usize, values: &[ArrayRef]) -> Result<ArrayRef> {
if values.is_empty() {
return Err(DataFusionError::Execution(format!(
"No arguments supplied to {}",
self.name()
)));
}
let value = &values[0];
if value.len() != num_rows {
return Err(DataFusionError::Execution(format!(
"Invalid data supplied to {}, expect {} rows, got {} rows",
self.name(),
num_rows,
value.len()
)));
}
if num_rows == 0 {
return Ok(new_empty_array(value.data_type()));
}
/// Builds the partition evaluator for this `nth_value` expression.
///
/// Every argument expression is evaluated against `batch` and converted
/// with `into_array(batch.num_rows())`; the first evaluation error aborts
/// the construction and is returned to the caller.
fn create_evaluator(
    &self,
    batch: &RecordBatch,
) -> Result<Box<dyn PartitionEvaluator>> {
    let mut values = Vec::new();
    for expr in self.expressions().iter() {
        let evaluated = expr.evaluate(batch)?;
        values.push(evaluated.into_array(batch.num_rows()));
    }
    let evaluator = NthValueEvaluator {
        kind: self.kind,
        values,
    };
    Ok(Box::new(evaluator))
}
}

/// Value evaluator for nth_value functions
///
/// Built by `NthValue::create_evaluator`, which evaluates the argument
/// expressions once per batch and stores the resulting arrays here.
pub(crate) struct NthValueEvaluator {
/// Which nth_value variant to compute (copied from the owning `NthValue`)
kind: NthValueKind,
/// Arrays produced by evaluating the argument expressions over the batch;
/// `evaluate_partition` reads the first entry
values: Vec<ArrayRef>,
}

impl PartitionEvaluator for NthValueEvaluator {
fn evaluate_partition(&self, partition: Range<usize>) -> Result<ArrayRef> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This interface makes sense (passing in the range of rows), though it may make more sense to explicitly pass in `values: Vec<ArrayRef>` rather than assume that whatever implements the evaluator was constructed in a way that lets the values be found.

let value = &self.values[0];
let num_rows = partition.end - partition.start;
let value = value.slice(partition.start, num_rows);
let index: usize = match self.kind {
NthValueKind::First => 0,
NthValueKind::Last => (num_rows as usize) - 1,
Expand All @@ -138,7 +150,7 @@ impl BuiltInWindowFunctionExpr for NthValue {
Ok(if index >= num_rows {
new_null_array(value.data_type(), num_rows)
} else {
let value = ScalarValue::try_from_array(value, index)?;
let value = ScalarValue::try_from_array(&value, index)?;
value.to_array_of_size(num_rows)
})
}
Expand All @@ -157,8 +169,9 @@ mod tests {
let values = vec![arr];
let schema = Schema::new(vec![Field::new("arr", DataType::Int32, false)]);
let batch = RecordBatch::try_new(Arc::new(schema), values.clone())?;
let result = expr.evaluate(batch.num_rows(), &values)?;
let result = result.as_any().downcast_ref::<Int32Array>().unwrap();
let result = expr.create_evaluator(&batch)?.evaluate(vec![0..8])?;
assert_eq!(1, result.len());
let result = result[0].as_any().downcast_ref::<Int32Array>().unwrap();
let result = result.values();
assert_eq!(expected, result);
Ok(())
Expand Down
172 changes: 172 additions & 0 deletions datafusion/src/physical_plan/expressions/rank.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Defines physical expressions that can be evaluated at runtime during query execution

use crate::error::Result;
use crate::physical_plan::window_functions::PartitionEvaluator;
use crate::physical_plan::{window_functions::BuiltInWindowFunctionExpr, PhysicalExpr};
use arrow::array::ArrayRef;
use arrow::array::UInt64Array;
use arrow::datatypes::{DataType, Field};
use arrow::record_batch::RecordBatch;
use std::any::Any;
use std::iter;
use std::ops::Range;
use std::sync::Arc;

/// Window function expression that calculates the rank of each row within
/// an ordered (`ORDER BY`) window; covers both `rank` and `dense_rank`,
/// selected by the `dense` flag
#[derive(Debug)]
pub struct Rank {
/// Name of the expression; also used as the name of the output field
name: String,
/// `true` when created through `dense_rank`, `false` when created through `rank`
dense: bool,
}

/// Create a `rank` window function expression with the given name
/// (the non-dense variant: `dense` is set to `false`)
pub fn rank(name: String) -> Rank {
    let dense = false;
    Rank { name, dense }
}

/// Create a `dense_rank` window function expression with the given name
/// (the dense variant: `dense` is set to `true`)
pub fn dense_rank(name: String) -> Rank {
    let dense = true;
    Rank { name, dense }
}

impl BuiltInWindowFunctionExpr for Rank {
    /// Return a reference to Any that can be used for downcasting
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Output field: a non-nullable `UInt64` column named after this expression
    fn field(&self) -> Result<Field> {
        Ok(Field::new(self.name(), DataType::UInt64, false))
    }

    /// Rank takes no argument expressions, so the list is always empty
    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
        Vec::new()
    }

    /// Name given to this expression at construction time
    fn name(&self) -> &str {
        self.name.as_str()
    }

    /// Create the evaluator for this expression; the batch is not consulted,
    /// only the `dense` flag is carried over
    fn create_evaluator(
        &self,
        _batch: &RecordBatch,
    ) -> Result<Box<dyn PartitionEvaluator>> {
        let evaluator = RankEvaluator { dense: self.dense };
        Ok(Box::new(evaluator))
    }
}

/// Partition evaluator for the `rank` and `dense_rank` window functions
pub(crate) struct RankEvaluator {
/// `true` to number consecutive rank groups 1, 2, 3, ...;
/// `false` to advance the rank by the size of each preceding group
dense: bool,
}

impl PartitionEvaluator for RankEvaluator {
    /// Rank evaluation always asks for rank information
    fn include_rank(&self) -> bool {
        true
    }

    /// Not supported for rank: this evaluator must be driven through
    /// `evaluate_partition_with_rank`, so reaching this method panics
    fn evaluate_partition(&self, _partition: Range<usize>) -> Result<ArrayRef> {
        unreachable!("rank evaluation must be called with evaluate_partition_with_rank")
    }

    /// Produce one `u64` rank per row covered by `ranks_in_partition`.
    ///
    /// Every range in `ranks_in_partition` is a run of rows that share one
    /// rank. The first run gets rank 1; after each run the rank advances by 1
    /// when `dense` is set, otherwise by the number of rows in that run.
    fn evaluate_partition_with_rank(
        &self,
        _partition: Range<usize>,
        ranks_in_partition: &[Range<usize>],
    ) -> Result<ArrayRef> {
        let mut current_rank = 1_u64;
        let mut ranks = Vec::new();
        for group in ranks_in_partition {
            let group_len = group.end - group.start;
            ranks.extend(iter::repeat(current_rank).take(group_len));
            // dense_rank leaves no gaps; rank skips over the rows just emitted
            current_rank += if self.dense { 1 } else { group_len as u64 };
        }
        Ok(Arc::new(UInt64Array::from_iter_values(ranks.into_iter())))
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use arrow::{array::*, datatypes::*};

    /// Sorted input column shared by all test cases
    const INPUT: [i32; 8] = [-2, -2, 1, 3, 3, 3, 7, 8];

    /// Evaluate `expr` with one rank range per run of equal input values
    fn test_with_rank(expr: &Rank, expected: Vec<u64>) -> Result<()> {
        let peer_groups = vec![0..2, 2..3, 3..6, 6..7, 7..8];
        test_i32_result(expr, INPUT.to_vec(), peer_groups, expected)
    }

    /// Evaluate `expr` with a single rank range spanning the whole partition
    fn test_without_rank(expr: &Rank, expected: Vec<u64>) -> Result<()> {
        let single_group = vec![0..8];
        test_i32_result(expr, INPUT.to_vec(), single_group, expected)
    }

    /// Run `expr` over one 8-row partition built from `data`, using `ranks`
    /// as the rank ranges, and compare the output against `expected`
    fn test_i32_result(
        expr: &Rank,
        data: Vec<i32>,
        ranks: Vec<Range<usize>>,
        expected: Vec<u64>,
    ) -> Result<()> {
        let column: ArrayRef = Arc::new(Int32Array::from(data));
        let schema = Arc::new(Schema::new(vec![Field::new(
            "arr",
            DataType::Int32,
            false,
        )]));
        let batch = RecordBatch::try_new(schema, vec![column])?;
        let evaluator = expr.create_evaluator(&batch)?;
        let result = evaluator.evaluate_with_rank(vec![0..8], ranks)?;
        assert_eq!(1, result.len());
        let result = result[0].as_any().downcast_ref::<UInt64Array>().unwrap();
        let result = result.values();
        assert_eq!(expected, result);
        Ok(())
    }

    #[test]
    fn test_dense_rank() -> Result<()> {
        let expr = dense_rank("arr".into());
        test_without_rank(&expr, vec![1; 8])?;
        test_with_rank(&expr, vec![1, 1, 2, 3, 3, 3, 4, 5])?;
        Ok(())
    }

    #[test]
    fn test_rank() -> Result<()> {
        let expr = rank("arr".into());
        test_without_rank(&expr, vec![1; 8])?;
        test_with_rank(&expr, vec![1, 1, 3, 4, 4, 4, 7, 8])?;
        Ok(())
    }
}
32 changes: 25 additions & 7 deletions datafusion/src/physical_plan/expressions/row_number.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@
//! Defines physical expression for `row_number` that can be evaluated at runtime during query execution

use crate::error::Result;
use crate::physical_plan::window_functions::PartitionEvaluator;
use crate::physical_plan::{window_functions::BuiltInWindowFunctionExpr, PhysicalExpr};
use arrow::array::{ArrayRef, UInt64Array};
use arrow::datatypes::{DataType, Field};
use arrow::record_batch::RecordBatch;
use std::any::Any;
use std::ops::Range;
use std::sync::Arc;

/// row_number expression
Expand Down Expand Up @@ -54,12 +57,25 @@ impl BuiltInWindowFunctionExpr for RowNumber {
}

fn name(&self) -> &str {
self.name.as_str()
&self.name
}

fn evaluate(&self, num_rows: usize, _values: &[ArrayRef]) -> Result<ArrayRef> {
/// Create the evaluator for `row_number`; the batch is not consulted
/// because the evaluator carries no state
fn create_evaluator(
    &self,
    _batch: &RecordBatch,
) -> Result<Box<dyn PartitionEvaluator>> {
    let evaluator = NumRowsEvaluator::default();
    Ok(Box::new(evaluator))
}
}

/// Partition evaluator returned by `RowNumber::create_evaluator`;
/// it has no fields, so `Default` yields the only possible value
#[derive(Default)]
pub(crate) struct NumRowsEvaluator {}

impl PartitionEvaluator for NumRowsEvaluator {
fn evaluate_partition(&self, partition: Range<usize>) -> Result<ArrayRef> {
let num_rows = partition.end - partition.start;
Ok(Arc::new(UInt64Array::from_iter_values(
(1..num_rows + 1).map(|i| i as u64),
1..(num_rows as u64) + 1,
)))
}
}
Expand All @@ -79,8 +95,9 @@ mod tests {
let schema = Schema::new(vec![Field::new("arr", DataType::Boolean, false)]);
let batch = RecordBatch::try_new(Arc::new(schema), vec![arr])?;
let row_number = RowNumber::new("row_number".to_owned());
let result = row_number.evaluate(batch.num_rows(), &[])?;
let result = result.as_any().downcast_ref::<UInt64Array>().unwrap();
let result = row_number.create_evaluator(&batch)?.evaluate(vec![0..8])?;
assert_eq!(1, result.len());
let result = result[0].as_any().downcast_ref::<UInt64Array>().unwrap();
let result = result.values();
assert_eq!(vec![1, 2, 3, 4, 5, 6, 7, 8], result);
Ok(())
Expand All @@ -94,8 +111,9 @@ mod tests {
let schema = Schema::new(vec![Field::new("arr", DataType::Boolean, false)]);
let batch = RecordBatch::try_new(Arc::new(schema), vec![arr])?;
let row_number = RowNumber::new("row_number".to_owned());
let result = row_number.evaluate(batch.num_rows(), &[])?;
let result = result.as_any().downcast_ref::<UInt64Array>().unwrap();
let result = row_number.create_evaluator(&batch)?.evaluate(vec![0..8])?;
assert_eq!(1, result.len());
let result = result[0].as_any().downcast_ref::<UInt64Array>().unwrap();
let result = result.values();
assert_eq!(vec![1, 2, 3, 4, 5, 6, 7, 8], result);
Ok(())
Expand Down
Loading