From 5d5d673e7587eaf49f6903b3ed610883f24ef464 Mon Sep 17 00:00:00 2001 From: jatin Date: Sat, 12 Oct 2024 17:15:10 +0530 Subject: [PATCH 1/8] wip: combining the logic of rank, dense_rank and percent_rank udwf --- datafusion/functions-window/src/lib.rs | 15 +-- datafusion/functions-window/src/rank.rs | 150 +++++++++++++++++++----- 2 files changed, 126 insertions(+), 39 deletions(-) diff --git a/datafusion/functions-window/src/lib.rs b/datafusion/functions-window/src/lib.rs index b72780990841..0c04d81bcc8b 100644 --- a/datafusion/functions-window/src/lib.rs +++ b/datafusion/functions-window/src/lib.rs @@ -31,16 +31,17 @@ use datafusion_expr::WindowUDF; #[macro_use] pub mod macros; -pub mod dense_rank; -pub mod percent_rank; +// pub mod dense_rank; +// pub mod percent_rank; pub mod rank; pub mod row_number; /// Fluent-style API for creating `Expr`s pub mod expr_fn { - pub use super::dense_rank::dense_rank; - pub use super::percent_rank::percent_rank; - pub use super::rank::rank; + pub use super::rank::{ + // dense_rank, percent_rank, + rank, + }; pub use super::row_number::row_number; } @@ -49,8 +50,8 @@ pub fn all_default_window_functions() -> Vec> { vec![ row_number::row_number_udwf(), rank::rank_udwf(), - dense_rank::dense_rank_udwf(), - percent_rank::percent_rank_udwf(), + // rank::dense_rank_udwf(), + // rank::percent_rank_udwf(), ] } /// Registers all enabled packages with a [`FunctionRegistry`] diff --git a/datafusion/functions-window/src/rank.rs b/datafusion/functions-window/src/rank.rs index c52dec9061ba..4299cf3b4a12 100644 --- a/datafusion/functions-window/src/rank.rs +++ b/datafusion/functions-window/src/rank.rs @@ -25,12 +25,12 @@ use std::sync::Arc; use crate::define_udwf_and_expr; use datafusion_common::arrow::array::ArrayRef; -use datafusion_common::arrow::array::UInt64Array; +use datafusion_common::arrow::array::{Float64Array, UInt64Array}; use datafusion_common::arrow::compute::SortOptions; use datafusion_common::arrow::datatypes::DataType; use 
datafusion_common::arrow::datatypes::Field; use datafusion_common::utils::get_row_at_idx; -use datafusion_common::{Result, ScalarValue}; +use datafusion_common::{exec_err, Result, ScalarValue}; use datafusion_expr::{PartitionEvaluator, Signature, Volatility, WindowUDFImpl}; use datafusion_functions_window_common::field; use datafusion_functions_window_common::partition::PartitionEvaluatorArgs; @@ -39,37 +39,76 @@ use field::WindowUDFFieldArgs; define_udwf_and_expr!( Rank, rank, - "Returns rank of the current row with gaps. Same as `row_number` of its first peer" + "Returns rank of the current row with gaps. Same as `row_number` of its first peer", + Rank::basic ); +// define_udwf_and_expr!( +// Rank, +// percent_rank, +// "Returns the relative rank of the current row: (rank - 1) / (total rows - 1)", +// Rank::percent_rank +// ); +// +// define_udwf_and_expr!( +// Rank, +// dense_rank, +// "Returns rank of the current row without gaps. This function counts peer groups", +// Rank::dense_rank +// ); + /// rank expression #[derive(Debug)] pub struct Rank { + name: String, signature: Signature, + rank_type: RankType, + /// output data type + data_type: DataType, } impl Rank { - /// Create a new `rank` function - pub fn new() -> Self { + pub fn new(name: String, rank_type: RankType) -> Self { Self { + name, signature: Signature::any(0, Volatility::Immutable), + rank_type, + data_type: DataType::UInt64, } } -} -impl Default for Rank { - fn default() -> Self { - Self::new() + pub fn basic() -> Self { + Rank::new("rank".to_string(), RankType::Basic) + } + + pub fn dense_rank() -> Self { + Rank::new("dense_rank".to_string(), RankType::Dense) + } + + pub fn percent_rank() -> Self { + Rank::new("percent_rank".to_string(), RankType::Percent) + } + + /// Get rank_type of the rank in window function with order by + pub fn get_type(&self) -> RankType { + self.rank_type } } +#[derive(Debug, Copy, Clone)] +pub enum RankType { + Basic, + Dense, + Percent, +} + impl WindowUDFImpl for 
Rank { fn as_any(&self) -> &dyn Any { self } fn name(&self) -> &str { - "rank" + self.name.as_str() } fn signature(&self) -> &Signature { @@ -80,11 +119,21 @@ impl WindowUDFImpl for Rank { &self, _partition_evaluator_args: PartitionEvaluatorArgs, ) -> Result> { - Ok(Box::::default()) + Ok(Box::new(RankEvaluator { + state: RankState::default(), + rank_type: self.rank_type, + })) } fn field(&self, field_args: WindowUDFFieldArgs) -> Result { - Ok(Field::new(field_args.name(), DataType::UInt64, false)) + match self.rank_type { + RankType::Basic | RankType::Dense => { + Ok(Field::new(field_args.name(), DataType::UInt64, false)) + } + RankType::Percent => { + Ok(Field::new(field_args.name(), DataType::Float64, false)) + } + } } fn sort_options(&self) -> Option { @@ -109,15 +158,15 @@ pub struct RankState { } /// State for the `rank` built-in window function. -#[derive(Debug, Default)] +#[derive(Debug)] struct RankEvaluator { state: RankState, + rank_type: RankType, } impl PartitionEvaluator for RankEvaluator { fn is_causal(&self) -> bool { - // The rank function doesn't need "future" values to emit results: - true + matches!(self.rank_type, RankType::Basic | RankType::Dense) } fn evaluate( @@ -147,33 +196,70 @@ impl PartitionEvaluator for RankEvaluator { self.state.current_group_count += 1; } - Ok(ScalarValue::UInt64(Some( - self.state.last_rank_boundary as u64 + 1, - ))) + match self.rank_type { + RankType::Basic => Ok(ScalarValue::UInt64(Some( + self.state.last_rank_boundary as u64 + 1, + ))), + RankType::Dense => Ok(ScalarValue::UInt64(Some(self.state.n_rank as u64))), + RankType::Percent => { + exec_err!("Can not execute PERCENT_RANK in a streaming fashion") + } + } } fn evaluate_all_with_rank( &self, - _num_rows: usize, + num_rows: usize, ranks_in_partition: &[Range], ) -> Result { - let result = Arc::new(UInt64Array::from_iter_values( - ranks_in_partition - .iter() - .scan(1_u64, |acc, range| { - let len = range.end - range.start; - let result = 
iter::repeat(*acc).take(len); - *acc += len as u64; - Some(result) - }) - .flatten(), - )); + let result: ArrayRef = match self.rank_type { + // rank + RankType::Basic => Arc::new(UInt64Array::from_iter_values( + ranks_in_partition + .iter() + .scan(1_u64, |acc, range| { + let len = range.end - range.start; + let result = iter::repeat(*acc).take(len); + *acc += len as u64; + Some(result) + }) + .flatten(), + )), + + // dense_rank + RankType::Dense => Arc::new(UInt64Array::from_iter_values( + ranks_in_partition + .iter() + .zip(1u64..) + .flat_map(|(range, rank)| { + let len = range.end - range.start; + iter::repeat(rank).take(len) + }), + )), + + RankType::Percent => { + let denominator = num_rows as f64; + + Arc::new(Float64Array::from_iter_values( + ranks_in_partition + .iter() + .scan(0_u64, |acc, range| { + let len = range.end - range.start; + let value = (*acc as f64) / (denominator - 1.0).max(1.0); + let result = iter::repeat(value).take(len); + *acc += len as u64; + Some(result) + }) + .flatten(), + )) + } + }; Ok(result) } fn supports_bounded_execution(&self) -> bool { - true + matches!(self.rank_type, RankType::Basic | RankType::Dense) } fn include_rank(&self) -> bool { @@ -212,7 +298,7 @@ mod tests { #[test] fn test_rank() -> Result<()> { - let r = Rank::default(); + let r = Rank::basic(); test_without_rank(&r, vec![1; 8])?; test_with_rank(&r, vec![1, 1, 3, 4, 4, 4, 7, 8])?; Ok(()) From 9486489e1acc527ea85182b7ee0d7089bbfa7055 Mon Sep 17 00:00:00 2001 From: jatin Date: Sat, 12 Oct 2024 17:48:32 +0530 Subject: [PATCH 2/8] added test for dense_rank and percent_rank --- datafusion/functions-window/src/rank.rs | 64 +++++++++++++++++++++---- 1 file changed, 56 insertions(+), 8 deletions(-) diff --git a/datafusion/functions-window/src/rank.rs b/datafusion/functions-window/src/rank.rs index 4299cf3b4a12..f88ed57e264e 100644 --- a/datafusion/functions-window/src/rank.rs +++ b/datafusion/functions-window/src/rank.rs @@ -43,13 +43,6 @@ define_udwf_and_expr!( 
Rank::basic ); -// define_udwf_and_expr!( -// Rank, -// percent_rank, -// "Returns the relative rank of the current row: (rank - 1) / (total rows - 1)", -// Rank::percent_rank -// ); -// // define_udwf_and_expr!( // Rank, // dense_rank, @@ -57,6 +50,13 @@ define_udwf_and_expr!( // Rank::dense_rank // ); +// define_udwf_and_expr!( +// Rank, +// percent_rank, +// "Returns the relative rank of the current row: (rank - 1) / (total rows - 1)", +// Rank::percent_rank +// ); + /// rank expression #[derive(Debug)] pub struct Rank { @@ -270,7 +270,7 @@ impl PartitionEvaluator for RankEvaluator { #[cfg(test)] mod tests { use super::*; - use datafusion_common::cast::as_uint64_array; + use datafusion_common::cast::{as_float64_array, as_uint64_array}; fn test_with_rank(expr: &Rank, expected: Vec) -> Result<()> { test_i32_result(expr, vec![0..2, 2..3, 3..6, 6..7, 7..8], expected) @@ -296,6 +296,22 @@ mod tests { Ok(()) } + fn test_f64_result( + expr: &Rank, + num_rows: usize, + ranks: Vec>, + expected: Vec, + ) -> Result<()> { + let args = PartitionEvaluatorArgs::default(); + let result = expr + .partition_evaluator(args)? 
+ .evaluate_all_with_rank(num_rows, &ranks)?; + let result = as_float64_array(&result)?; + let result = result.values(); + assert_eq!(expected, *result); + Ok(()) + } + #[test] fn test_rank() -> Result<()> { let r = Rank::basic(); @@ -303,4 +319,36 @@ mod tests { test_with_rank(&r, vec![1, 1, 3, 4, 4, 4, 7, 8])?; Ok(()) } + + #[test] + fn test_dense_rank() -> Result<()> { + let r = Rank::dense_rank(); + test_without_rank(&r, vec![1; 8])?; + test_with_rank(&r, vec![1, 1, 2, 3, 3, 3, 4, 5])?; + Ok(()) + } + + #[test] + #[allow(clippy::single_range_in_vec_init)] + fn test_percent_rank() -> Result<()> { + let r = Rank::percent_rank(); + + // empty case + let expected = vec![0.0; 0]; + test_f64_result(&r, 0, vec![0..0; 0], expected)?; + + // singleton case + let expected = vec![0.0]; + test_f64_result(&r, 1, vec![0..1], expected)?; + + // uniform case + let expected = vec![0.0; 7]; + test_f64_result(&r, 7, vec![0..7], expected)?; + + // non-trivial case + let expected = vec![0.0, 0.0, 0.0, 0.5, 0.5, 0.5, 0.5]; + test_f64_result(&r, 7, vec![0..3, 3..7], expected)?; + + Ok(()) + } } From aaa62bb2004a448d7647527cc11f015dde14cf86 Mon Sep 17 00:00:00 2001 From: jatin Date: Sat, 12 Oct 2024 18:27:28 +0530 Subject: [PATCH 3/8] removed unnecessary files and fixed issue for percent_rank and dense_rank udwf --- datafusion/functions-window/src/dense_rank.rs | 205 ------------------ datafusion/functions-window/src/lib.rs | 9 +- .../functions-window/src/percent_rank.rs | 192 ---------------- datafusion/functions-window/src/rank.rs | 34 +-- 4 files changed, 23 insertions(+), 417 deletions(-) delete mode 100644 datafusion/functions-window/src/dense_rank.rs delete mode 100644 datafusion/functions-window/src/percent_rank.rs diff --git a/datafusion/functions-window/src/dense_rank.rs b/datafusion/functions-window/src/dense_rank.rs deleted file mode 100644 index c969a7c46fc6..000000000000 --- a/datafusion/functions-window/src/dense_rank.rs +++ /dev/null @@ -1,205 +0,0 @@ -// Licensed to 
the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! `dense_rank` window function implementation - -use std::any::Any; -use std::fmt::Debug; -use std::iter; -use std::ops::Range; -use std::sync::Arc; - -use crate::define_udwf_and_expr; -use crate::rank::RankState; -use datafusion_common::arrow::array::ArrayRef; -use datafusion_common::arrow::array::UInt64Array; -use datafusion_common::arrow::compute::SortOptions; -use datafusion_common::arrow::datatypes::DataType; -use datafusion_common::arrow::datatypes::Field; -use datafusion_common::utils::get_row_at_idx; -use datafusion_common::{Result, ScalarValue}; -use datafusion_expr::{PartitionEvaluator, Signature, Volatility, WindowUDFImpl}; -use datafusion_functions_window_common::field; -use datafusion_functions_window_common::partition::PartitionEvaluatorArgs; -use field::WindowUDFFieldArgs; - -define_udwf_and_expr!( - DenseRank, - dense_rank, - "Returns rank of the current row without gaps. 
This function counts peer groups" -); - -/// dense_rank expression -#[derive(Debug)] -pub struct DenseRank { - signature: Signature, -} - -impl DenseRank { - /// Create a new `dense_rank` function - pub fn new() -> Self { - Self { - signature: Signature::any(0, Volatility::Immutable), - } - } -} - -impl Default for DenseRank { - fn default() -> Self { - Self::new() - } -} - -impl WindowUDFImpl for DenseRank { - fn as_any(&self) -> &dyn Any { - self - } - - fn name(&self) -> &str { - "dense_rank" - } - - fn signature(&self) -> &Signature { - &self.signature - } - - fn partition_evaluator( - &self, - _partition_evaluator_args: PartitionEvaluatorArgs, - ) -> Result> { - Ok(Box::::default()) - } - - fn field(&self, field_args: WindowUDFFieldArgs) -> Result { - Ok(Field::new(field_args.name(), DataType::UInt64, false)) - } - - fn sort_options(&self) -> Option { - Some(SortOptions { - descending: false, - nulls_first: false, - }) - } -} - -/// State for the `dense_rank` built-in window function. 
-#[derive(Debug, Default)] -struct DenseRankEvaluator { - state: RankState, -} - -impl PartitionEvaluator for DenseRankEvaluator { - fn is_causal(&self) -> bool { - // The dense_rank function doesn't need "future" values to emit results: - true - } - - fn evaluate( - &mut self, - values: &[ArrayRef], - range: &Range, - ) -> Result { - let row_idx = range.start; - // There is no argument, values are order by column values (where rank is calculated) - let range_columns = values; - let last_rank_data = get_row_at_idx(range_columns, row_idx)?; - let new_rank_encountered = - if let Some(state_last_rank_data) = &self.state.last_rank_data { - // if rank data changes, new rank is encountered - state_last_rank_data != &last_rank_data - } else { - // First rank seen - true - }; - - if new_rank_encountered { - self.state.last_rank_data = Some(last_rank_data); - self.state.last_rank_boundary += self.state.current_group_count; - self.state.current_group_count = 1; - self.state.n_rank += 1; - } else { - // data is still in the same rank - self.state.current_group_count += 1; - } - - Ok(ScalarValue::UInt64(Some(self.state.n_rank as u64))) - } - - fn evaluate_all_with_rank( - &self, - _num_rows: usize, - ranks_in_partition: &[Range], - ) -> Result { - let result = Arc::new(UInt64Array::from_iter_values( - ranks_in_partition - .iter() - .zip(1u64..) 
- .flat_map(|(range, rank)| { - let len = range.end - range.start; - iter::repeat(rank).take(len) - }), - )); - - Ok(result) - } - - fn supports_bounded_execution(&self) -> bool { - true - } - - fn include_rank(&self) -> bool { - true - } -} - -#[cfg(test)] -mod tests { - use super::*; - use datafusion_common::cast::as_uint64_array; - - fn test_with_rank(expr: &DenseRank, expected: Vec) -> Result<()> { - test_i32_result(expr, vec![0..2, 2..3, 3..6, 6..7, 7..8], expected) - } - - #[allow(clippy::single_range_in_vec_init)] - fn test_without_rank(expr: &DenseRank, expected: Vec) -> Result<()> { - test_i32_result(expr, vec![0..8], expected) - } - - fn test_i32_result( - expr: &DenseRank, - ranks: Vec>, - expected: Vec, - ) -> Result<()> { - let args = PartitionEvaluatorArgs::default(); - let result = expr - .partition_evaluator(args)? - .evaluate_all_with_rank(8, &ranks)?; - let result = as_uint64_array(&result)?; - let result = result.values(); - assert_eq!(expected, *result); - Ok(()) - } - - #[test] - fn test_dense_rank() -> Result<()> { - let r = DenseRank::default(); - test_without_rank(&r, vec![1; 8])?; - test_with_rank(&r, vec![1, 1, 2, 3, 3, 3, 4, 5])?; - Ok(()) - } -} diff --git a/datafusion/functions-window/src/lib.rs b/datafusion/functions-window/src/lib.rs index 0c04d81bcc8b..477d0e498e1c 100644 --- a/datafusion/functions-window/src/lib.rs +++ b/datafusion/functions-window/src/lib.rs @@ -38,10 +38,7 @@ pub mod row_number; /// Fluent-style API for creating `Expr`s pub mod expr_fn { - pub use super::rank::{ - // dense_rank, percent_rank, - rank, - }; + pub use super::rank::{dense_rank, percent_rank, rank}; pub use super::row_number::row_number; } @@ -50,8 +47,8 @@ pub fn all_default_window_functions() -> Vec> { vec![ row_number::row_number_udwf(), rank::rank_udwf(), - // rank::dense_rank_udwf(), - // rank::percent_rank_udwf(), + rank::dense_rank_udwf(), + rank::percent_rank_udwf(), ] } /// Registers all enabled packages with a [`FunctionRegistry`] diff --git 
a/datafusion/functions-window/src/percent_rank.rs b/datafusion/functions-window/src/percent_rank.rs deleted file mode 100644 index 147959f69be9..000000000000 --- a/datafusion/functions-window/src/percent_rank.rs +++ /dev/null @@ -1,192 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! 
`percent_rank` window function implementation - -use std::any::Any; -use std::fmt::Debug; -use std::iter; -use std::ops::Range; -use std::sync::Arc; - -use crate::define_udwf_and_expr; -use datafusion_common::arrow::array::ArrayRef; -use datafusion_common::arrow::array::Float64Array; -use datafusion_common::arrow::compute::SortOptions; -use datafusion_common::arrow::datatypes::DataType; -use datafusion_common::arrow::datatypes::Field; -use datafusion_common::{exec_err, Result, ScalarValue}; -use datafusion_expr::{PartitionEvaluator, Signature, Volatility, WindowUDFImpl}; -use datafusion_functions_window_common::field; -use datafusion_functions_window_common::partition::PartitionEvaluatorArgs; -use field::WindowUDFFieldArgs; - -define_udwf_and_expr!( - PercentRank, - percent_rank, - "Returns the relative rank of the current row: (rank - 1) / (total rows - 1)" -); - -/// percent_rank expression -#[derive(Debug)] -pub struct PercentRank { - signature: Signature, -} - -impl PercentRank { - /// Create a new `percent_rank` function - pub fn new() -> Self { - Self { - signature: Signature::any(0, Volatility::Immutable), - } - } -} - -impl Default for PercentRank { - fn default() -> Self { - Self::new() - } -} - -impl WindowUDFImpl for PercentRank { - fn as_any(&self) -> &dyn Any { - self - } - - fn name(&self) -> &str { - "percent_rank" - } - - fn signature(&self) -> &Signature { - &self.signature - } - - fn partition_evaluator( - &self, - _partition_evaluator_args: PartitionEvaluatorArgs, - ) -> Result> { - Ok(Box::::default()) - } - - fn field(&self, field_args: WindowUDFFieldArgs) -> Result { - Ok(Field::new(field_args.name(), DataType::Float64, false)) - } - - fn sort_options(&self) -> Option { - Some(SortOptions { - descending: false, - nulls_first: false, - }) - } -} - -/// State for the `percent_rank` built-in window function. 
-#[derive(Debug, Default)] -struct PercentRankEvaluator {} - -impl PartitionEvaluator for PercentRankEvaluator { - fn is_causal(&self) -> bool { - // The percent_rank function doesn't need "future" values to emit results: - false - } - - fn evaluate( - &mut self, - _values: &[ArrayRef], - _range: &Range, - ) -> Result { - exec_err!("Can not execute PERCENT_RANK in a streaming fashion") - } - - fn evaluate_all_with_rank( - &self, - num_rows: usize, - ranks_in_partition: &[Range], - ) -> Result { - let denominator = num_rows as f64; - let result = - // Returns the relative rank of the current row, that is (rank - 1) / (total partition rows - 1). The value thus ranges from 0 to 1 inclusive. - Arc::new(Float64Array::from_iter_values( - ranks_in_partition - .iter() - .scan(0_u64, |acc, range| { - let len = range.end - range.start; - let value = (*acc as f64) / (denominator - 1.0).max(1.0); - let result = iter::repeat(value).take(len); - *acc += len as u64; - Some(result) - }) - .flatten(), - )); - - Ok(result) - } - - fn supports_bounded_execution(&self) -> bool { - false - } - - fn include_rank(&self) -> bool { - true - } -} - -#[cfg(test)] -mod tests { - use super::*; - use datafusion_common::cast::as_float64_array; - - fn test_f64_result( - expr: &PercentRank, - num_rows: usize, - ranks: Vec>, - expected: Vec, - ) -> Result<()> { - let args = PartitionEvaluatorArgs::default(); - let result = expr - .partition_evaluator(args)? 
- .evaluate_all_with_rank(num_rows, &ranks)?; - let result = as_float64_array(&result)?; - let result = result.values(); - assert_eq!(expected, *result); - Ok(()) - } - - #[test] - #[allow(clippy::single_range_in_vec_init)] - fn test_percent_rank() -> Result<()> { - let r = PercentRank::default(); - - // empty case - let expected = vec![0.0; 0]; - test_f64_result(&r, 0, vec![0..0; 0], expected)?; - - // singleton case - let expected = vec![0.0]; - test_f64_result(&r, 1, vec![0..1], expected)?; - - // uniform case - let expected = vec![0.0; 7]; - test_f64_result(&r, 7, vec![0..7], expected)?; - - // non-trivial case - let expected = vec![0.0, 0.0, 0.0, 0.5, 0.5, 0.5, 0.5]; - test_f64_result(&r, 7, vec![0..3, 3..7], expected)?; - - Ok(()) - } -} diff --git a/datafusion/functions-window/src/rank.rs b/datafusion/functions-window/src/rank.rs index f88ed57e264e..93c0eb2a4612 100644 --- a/datafusion/functions-window/src/rank.rs +++ b/datafusion/functions-window/src/rank.rs @@ -43,19 +43,19 @@ define_udwf_and_expr!( Rank::basic ); -// define_udwf_and_expr!( -// Rank, -// dense_rank, -// "Returns rank of the current row without gaps. This function counts peer groups", -// Rank::dense_rank -// ); - -// define_udwf_and_expr!( -// Rank, -// percent_rank, -// "Returns the relative rank of the current row: (rank - 1) / (total rows - 1)", -// Rank::percent_rank -// ); +define_udwf_and_expr!( + DenseRank, + dense_rank, + "Returns rank of the current row without gaps. 
This function counts peer groups", + Rank::dense_rank +); + +define_udwf_and_expr!( + PercentRank, + percent_rank, + "Returns the relative rank of the current row: (rank - 1) / (total rows - 1)", + Rank::percent_rank +); /// rank expression #[derive(Debug)] @@ -69,11 +69,17 @@ pub struct Rank { impl Rank { pub fn new(name: String, rank_type: RankType) -> Self { + let data_type = if matches!(rank_type, RankType::Percent) { + DataType::Float64 + } else { + DataType::UInt64 + }; + Self { name, signature: Signature::any(0, Volatility::Immutable), rank_type, - data_type: DataType::UInt64, + data_type, } } From 4421df65a728938ac1b394a598b28beeb0e7ffc1 Mon Sep 17 00:00:00 2001 From: jatin Date: Sat, 12 Oct 2024 18:54:08 +0530 Subject: [PATCH 4/8] updated the module imports for the percent_rank and dense_rank udwfs --- datafusion/core/tests/fuzz_cases/window_fuzz.rs | 3 +-- datafusion/functions-window/src/lib.rs | 2 -- datafusion/functions-window/src/rank.rs | 16 ++++++---------- .../proto/tests/cases/roundtrip_logical_plan.rs | 4 +--- 4 files changed, 8 insertions(+), 17 deletions(-) diff --git a/datafusion/core/tests/fuzz_cases/window_fuzz.rs b/datafusion/core/tests/fuzz_cases/window_fuzz.rs index feffb11bf700..4a33334770a0 100644 --- a/datafusion/core/tests/fuzz_cases/window_fuzz.rs +++ b/datafusion/core/tests/fuzz_cases/window_fuzz.rs @@ -45,8 +45,7 @@ use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr}; use test_utils::add_empty_batches; use datafusion::functions_window::row_number::row_number_udwf; -use datafusion_functions_window::dense_rank::dense_rank_udwf; -use datafusion_functions_window::rank::rank_udwf; +use datafusion_functions_window::rank::{dense_rank_udwf, rank_udwf}; use hashbrown::HashMap; use rand::distributions::Alphanumeric; use rand::rngs::StdRng; diff --git a/datafusion/functions-window/src/lib.rs b/datafusion/functions-window/src/lib.rs index 477d0e498e1c..ef624e13e61c 100644 --- a/datafusion/functions-window/src/lib.rs +++ 
b/datafusion/functions-window/src/lib.rs @@ -31,8 +31,6 @@ use datafusion_expr::WindowUDF; #[macro_use] pub mod macros; -// pub mod dense_rank; -// pub mod percent_rank; pub mod rank; pub mod row_number; diff --git a/datafusion/functions-window/src/rank.rs b/datafusion/functions-window/src/rank.rs index 93c0eb2a4612..aeaedf0fb4a3 100644 --- a/datafusion/functions-window/src/rank.rs +++ b/datafusion/functions-window/src/rank.rs @@ -132,14 +132,12 @@ impl WindowUDFImpl for Rank { } fn field(&self, field_args: WindowUDFFieldArgs) -> Result { - match self.rank_type { - RankType::Basic | RankType::Dense => { - Ok(Field::new(field_args.name(), DataType::UInt64, false)) - } - RankType::Percent => { - Ok(Field::new(field_args.name(), DataType::Float64, false)) - } - } + let nullable = false; + Ok(Field::new( + field_args.name(), + self.data_type.clone(), + nullable, + )) } fn sort_options(&self) -> Option { @@ -219,7 +217,6 @@ impl PartitionEvaluator for RankEvaluator { ranks_in_partition: &[Range], ) -> Result { let result: ArrayRef = match self.rank_type { - // rank RankType::Basic => Arc::new(UInt64Array::from_iter_values( ranks_in_partition .iter() @@ -232,7 +229,6 @@ impl PartitionEvaluator for RankEvaluator { .flatten(), )), - // dense_rank RankType::Dense => Arc::new(UInt64Array::from_iter_values( ranks_in_partition .iter() diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs index 8c9b368598de..ffa8fc1eefe9 100644 --- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs @@ -47,9 +47,7 @@ use datafusion::functions_aggregate::expr_fn::{ }; use datafusion::functions_aggregate::min_max::max_udaf; use datafusion::functions_nested::map::map; -use datafusion::functions_window::dense_rank::dense_rank; -use datafusion::functions_window::percent_rank::percent_rank; -use datafusion::functions_window::rank::{rank, rank_udwf}; +use 
datafusion::functions_window::rank::{dense_rank, percent_rank, rank, rank_udwf}; use datafusion::functions_window::row_number::row_number; use datafusion::prelude::*; use datafusion::test_util::{TestTableFactory, TestTableProvider}; From 0c4b521c42e16e5631e3e68e5038abef79d1cced Mon Sep 17 00:00:00 2001 From: jatin Date: Sat, 12 Oct 2024 19:51:04 +0530 Subject: [PATCH 5/8] removed data_type field from then rank struct --- datafusion/functions-window/src/rank.rs | 22 +++++++--------------- 1 file changed, 7 insertions(+), 15 deletions(-) diff --git a/datafusion/functions-window/src/rank.rs b/datafusion/functions-window/src/rank.rs index aeaedf0fb4a3..7052850c37ca 100644 --- a/datafusion/functions-window/src/rank.rs +++ b/datafusion/functions-window/src/rank.rs @@ -63,23 +63,14 @@ pub struct Rank { name: String, signature: Signature, rank_type: RankType, - /// output data type - data_type: DataType, } impl Rank { pub fn new(name: String, rank_type: RankType) -> Self { - let data_type = if matches!(rank_type, RankType::Percent) { - DataType::Float64 - } else { - DataType::UInt64 - }; - Self { name, signature: Signature::any(0, Volatility::Immutable), rank_type, - data_type, } } @@ -114,7 +105,7 @@ impl WindowUDFImpl for Rank { } fn name(&self) -> &str { - self.name.as_str() + &self.name } fn signature(&self) -> &Signature { @@ -132,12 +123,13 @@ impl WindowUDFImpl for Rank { } fn field(&self, field_args: WindowUDFFieldArgs) -> Result { + let return_type = match self.rank_type { + RankType::Basic | RankType::Dense => DataType::UInt64, + RankType::Percent => DataType::Float64, + }; + let nullable = false; - Ok(Field::new( - field_args.name(), - self.data_type.clone(), - nullable, - )) + Ok(Field::new(field_args.name(), return_type, nullable)) } fn sort_options(&self) -> Option { From b5922ebb64d68093a2ec08b964700a187c2ed6d1 Mon Sep 17 00:00:00 2001 From: jatin Date: Sat, 12 Oct 2024 21:15:50 +0530 Subject: [PATCH 6/8] fixed function-window macros --- 
datafusion/functions-window/src/macros.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/functions-window/src/macros.rs b/datafusion/functions-window/src/macros.rs index e934f883b101..2905ccf4c204 100644 --- a/datafusion/functions-window/src/macros.rs +++ b/datafusion/functions-window/src/macros.rs @@ -303,7 +303,7 @@ macro_rules! create_udwf_expr { ($UDWF:ident, $OUT_FN_NAME:ident, $DOC:expr) => { paste::paste! { #[doc = " Create a [`WindowFunction`](datafusion_expr::Expr::WindowFunction) expression for"] - #[doc = concat!(" [`", stringify!($UDWF), "`] user-defined window function.")] + #[doc = concat!(" `", stringify!($UDWF), "` user-defined window function.")] #[doc = ""] #[doc = concat!(" ", $DOC)] pub fn $OUT_FN_NAME() -> datafusion_expr::Expr { @@ -316,7 +316,7 @@ macro_rules! create_udwf_expr { ($UDWF:ident, $OUT_FN_NAME:ident, [$($PARAM:ident),+], $DOC:expr) => { paste::paste! { #[doc = " Create a [`WindowFunction`](datafusion_expr::Expr::WindowFunction) expression for"] - #[doc = concat!(" [`", stringify!($UDWF), "`] user-defined window function.")] + #[doc = concat!(" `", stringify!($UDWF), "` user-defined window function.")] #[doc = ""] #[doc = concat!(" ", $DOC)] pub fn $OUT_FN_NAME( From 5c58ca17ed0a79d6f4823a4c05a0e12c0e027d8b Mon Sep 17 00:00:00 2001 From: jatin Date: Sat, 12 Oct 2024 22:52:03 +0530 Subject: [PATCH 7/8] removed unused function --- datafusion/functions-window/src/rank.rs | 5 ----- 1 file changed, 5 deletions(-) diff --git a/datafusion/functions-window/src/rank.rs b/datafusion/functions-window/src/rank.rs index 7052850c37ca..0c62353e4db9 100644 --- a/datafusion/functions-window/src/rank.rs +++ b/datafusion/functions-window/src/rank.rs @@ -85,11 +85,6 @@ impl Rank { pub fn percent_rank() -> Self { Rank::new("percent_rank".to_string(), RankType::Percent) } - - /// Get rank_type of the rank in window function with order by - pub fn get_type(&self) -> RankType { - self.rank_type - } } #[derive(Debug, Copy, 
Clone)] From a99ac4680b91bad73480268024acd3b400f8fb04 Mon Sep 17 00:00:00 2001 From: jatin Date: Wed, 16 Oct 2024 00:25:35 +0530 Subject: [PATCH 8/8] module doc updated for rank.rs --- datafusion/functions-window/src/rank.rs | 74 ++++++++++++++++++- .../user-guide/sql/window_functions_new.md | 27 +++++++ 2 files changed, 97 insertions(+), 4 deletions(-) diff --git a/datafusion/functions-window/src/rank.rs b/datafusion/functions-window/src/rank.rs index 0c62353e4db9..06c3f49055a5 100644 --- a/datafusion/functions-window/src/rank.rs +++ b/datafusion/functions-window/src/rank.rs @@ -15,13 +15,14 @@ // specific language governing permissions and limitations // under the License. -//! `rank` window function implementation +//! Implementation of `rank`, `dense_rank`, and `percent_rank` window functions, +//! which can be evaluated at runtime during query execution. use std::any::Any; use std::fmt::Debug; use std::iter; use std::ops::Range; -use std::sync::Arc; +use std::sync::{Arc, OnceLock}; use crate::define_udwf_and_expr; use datafusion_common::arrow::array::ArrayRef; @@ -31,7 +32,10 @@ use datafusion_common::arrow::datatypes::DataType; use datafusion_common::arrow::datatypes::Field; use datafusion_common::utils::get_row_at_idx; use datafusion_common::{exec_err, Result, ScalarValue}; -use datafusion_expr::{PartitionEvaluator, Signature, Volatility, WindowUDFImpl}; +use datafusion_expr::window_doc_sections::DOC_SECTION_RANKING; +use datafusion_expr::{ + Documentation, PartitionEvaluator, Signature, Volatility, WindowUDFImpl, +}; use datafusion_functions_window_common::field; use datafusion_functions_window_common::partition::PartitionEvaluatorArgs; use field::WindowUDFFieldArgs; @@ -57,7 +61,7 @@ define_udwf_and_expr!( Rank::percent_rank ); -/// rank expression +/// Rank calculates the rank in the window function with order by #[derive(Debug)] pub struct Rank { name: String, @@ -66,6 +70,7 @@ pub struct Rank { } impl Rank { + /// Create a new `rank` function with the 
specified name and rank type pub fn new(name: String, rank_type: RankType) -> Self { Self { name, @@ -74,14 +79,17 @@ impl Rank { } } + /// Create a `rank` window function pub fn basic() -> Self { Rank::new("rank".to_string(), RankType::Basic) } + /// Create a `dense_rank` window function pub fn dense_rank() -> Self { Rank::new("dense_rank".to_string(), RankType::Dense) } + /// Create a `percent_rank` window function pub fn percent_rank() -> Self { Rank::new("percent_rank".to_string(), RankType::Percent) } @@ -94,6 +102,56 @@ pub enum RankType { Percent, } +static RANK_DOCUMENTATION: OnceLock<Documentation> = OnceLock::new(); + +fn get_rank_doc() -> &'static Documentation { + RANK_DOCUMENTATION.get_or_init(|| { + Documentation::builder() + .with_doc_section(DOC_SECTION_RANKING) + .with_description( + "Returns the rank of the current row within its partition, allowing \ + gaps between ranks. This function provides a ranking similar to `row_number`, but \ + skips ranks for identical values.", + ) + .with_syntax_example("rank()") + .build() + .unwrap() + }) +} + +static DENSE_RANK_DOCUMENTATION: OnceLock<Documentation> = OnceLock::new(); + +fn get_dense_rank_doc() -> &'static Documentation { + DENSE_RANK_DOCUMENTATION.get_or_init(|| { + Documentation::builder() + .with_doc_section(DOC_SECTION_RANKING) + .with_description( + "Returns the rank of the current row without gaps. This function ranks \ + rows in a dense manner, meaning consecutive ranks are assigned even for identical \ + values.", + ) + .with_syntax_example("dense_rank()") + .build() + .unwrap() + }) +} + +static PERCENT_RANK_DOCUMENTATION: OnceLock<Documentation> = OnceLock::new(); + +fn get_percent_rank_doc() -> &'static Documentation { + PERCENT_RANK_DOCUMENTATION.get_or_init(|| { + Documentation::builder() + .with_doc_section(DOC_SECTION_RANKING) + .with_description( + "Returns the percentage rank of the current row within its partition. 
\ + The value ranges from 0 to 1 and is computed as `(rank - 1) / (total_rows - 1)`.", + ) + .with_syntax_example("percent_rank()") + .build() + .unwrap() + }) +} + impl WindowUDFImpl for Rank { fn as_any(&self) -> &dyn Any { self @@ -133,6 +191,14 @@ impl WindowUDFImpl for Rank { nulls_first: false, }) } + + fn documentation(&self) -> Option<&Documentation> { + match self.rank_type { + RankType::Basic => Some(get_rank_doc()), + RankType::Dense => Some(get_dense_rank_doc()), + RankType::Percent => Some(get_percent_rank_doc()), + } + } } /// State for the RANK(rank) built-in window function. diff --git a/docs/source/user-guide/sql/window_functions_new.md b/docs/source/user-guide/sql/window_functions_new.md index ee981911f5cb..462fc900d139 100644 --- a/docs/source/user-guide/sql/window_functions_new.md +++ b/docs/source/user-guide/sql/window_functions_new.md @@ -157,8 +157,35 @@ All [aggregate functions](aggregate_functions.md) can be used as window function ## Ranking Functions +- [dense_rank](#dense_rank) +- [percent_rank](#percent_rank) +- [rank](#rank) - [row_number](#row_number) +### `dense_rank` + +Returns the rank of the current row without gaps. This function ranks rows in a dense manner, meaning consecutive ranks are assigned even for identical values. + +``` +dense_rank() +``` + +### `percent_rank` + +Returns the percentage rank of the current row within its partition. The value ranges from 0 to 1 and is computed as `(rank - 1) / (total_rows - 1)`. + +``` +percent_rank() +``` + +### `rank` + +Returns the rank of the current row within its partition, allowing gaps between ranks. This function provides a ranking similar to `row_number`, but skips ranks for identical values. + +``` +rank() +``` + ### `row_number` Number of the current row within its partition, counting from 1.