From 2ae57516144591b10d09b73cb76784add2d68613 Mon Sep 17 00:00:00 2001 From: sundy-li <543950155@qq.com> Date: Fri, 13 Aug 2021 17:59:27 +0800 Subject: [PATCH 1/7] [docs] add how-to-write-aggregate-functions --- website/datafuse/mkdocs.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/website/datafuse/mkdocs.yml b/website/datafuse/mkdocs.yml index 075b4438f2bbe..98307546af9c7 100755 --- a/website/datafuse/mkdocs.yml +++ b/website/datafuse/mkdocs.yml @@ -189,6 +189,7 @@ nav: - Tracing: development/tracing.md - Profling: development/profiling.md - Roadmap: development/roadmap.md + - How to write aggregate functions: development/how-to-write-aggregate-functions.md - RFCs: - DatafuseQuery Join: rfcs/query/0001-join-framework-design.md - DatafuseQuery Expression: rfcs/query/0002-plan-expression.md From 8048aac932a95417969b0392d6790eb6762389ad Mon Sep 17 00:00:00 2001 From: sundy-li <543950155@qq.com> Date: Fri, 13 Aug 2021 17:59:32 +0800 Subject: [PATCH 2/7] [docs] add how-to-write-aggregate-functions --- .../how-to-write-aggregate-functions.md | 89 +++++++++++++++++++ 1 file changed, 89 insertions(+) create mode 100644 website/datafuse/docs/development/how-to-write-aggregate-functions.md diff --git a/website/datafuse/docs/development/how-to-write-aggregate-functions.md b/website/datafuse/docs/development/how-to-write-aggregate-functions.md new file mode 100644 index 0000000000000..8de8d0a5722ca --- /dev/null +++ b/website/datafuse/docs/development/how-to-write-aggregate-functions.md @@ -0,0 +1,89 @@ +--- +id: how-to-write-aggregate-functions.md +title: How to write aggregate functions +--- + +## How to write aggregate functions + +Datafuse allows you to write custom aggregate functions through rust codes. +It's not an easy way because you need to be rustacean first. Datafuse has a plan to support writing UDAF in other languages(like js, web assembly) in the future. + +In this section we will talk about how to write aggregate functions in datatfuse. 
+ + +## AggregateFunction trait introduction + +All aggregate functions implement `AggregateFunction` trait, and we register them into a global static factory named `FactoryFuncRef`, the factory is just an index map and the keys are names of aggregate functions, noted that the function's name in datafuse are all case insensitive. + + +``` rust +pub trait AggregateFunction: fmt::Display + Sync + Send { + fn name(&self) -> &str; + fn return_type(&self) -> Result; + fn nullable(&self, _input_schema: &DataSchema) -> Result; + + fn init_state(&self, place: StateAddr); + fn state_layout(&self) -> Layout; + + fn accumulate(&self, _place: StateAddr, _arrays: &[Series], _input_rows: usize) -> Result<()>; + + fn accumulate_keys( + &self, + _places: &[StateAddr], + _offset: usize, + _arrays: &[Series], + _input_rows: usize, + ) -> Result<()>; + + fn serialize(&self, _place: StateAddr, _writer: &mut BytesMut) -> Result<()>; + + fn deserialize(&self, _place: StateAddr, _reader: &mut &[u8]) -> Result<()>; + + fn merge(&self, _place: StateAddr, _rhs: StateAddr) -> Result<()>; + + fn merge_result(&self, _place: StateAddr) -> Result; +} +``` + +### Understand the functions + +- The function `name` indicates the name of this function, such as `sum`, `min` +- The function `return_type` indicates the return type of the function, it may vary with different arguments, such as `sum(int8)` -> `int64`, `sum(uint8)` -> `uint64`, `sum(float64)` -> `float64`. +- The function `nullable` indicates whether the `return_type` is nullable. + +Before we start to introduce function `init_state`, let's ask a question: + + > what's aggregate function state? + +It indicates the temporary result of the aggregate function. Because aggregate function accumulates block by block and there will results from different query nodes after the accumulations. The state must be mergeable, serializable. 
+ +For example, in the `avg` aggregate function, we can represent the state like: + +``` +struct AggregateAvgState { + pub value: T, + pub count: u64, +} +``` + +- The function `init_state` wants us to initialize the aggregate function state, we ensure the memory is already allocated, we just need to initial the state with the initial value. +- The function `state_layout` indicates the memory layout of the state. +- The function `accumulate`, this function is used in aggregation with single batch, which means the whole block can be aggregated in a single state, no more other keys. A SQL query which applys aggregation without group-by columns will hit this function. + +Noted that the argument `_arrays` is the function arguments, we can safely get the array by index without index bound check because we must validate the argument numbers and types in function constructor. + + The `_input_rows` is the rows of current block, it maybe useful when the `_arrays` is empty, eg: `count()` function. + + +- The function `accumulate_keys`, similar to `accumulate` but we must take into consideration of the `keys` and `offset`, each key reprsents an unique memory address named `place`. +- The function `serialize`, we used to serialize state into binary. +- The function `deserialize`, we used to deserialize state from binary. +- The function `merge`, can be used to merge other state into current state. +- The function `merge_result`, can be used to represent the aggregate function state into one-row field. + +## Refer to other examples +As you see, adding a new aggregate function in datafuse is not as hard as you think. +Before we start to add one, please refer to other aggregate function examples, such like `min`, `count`, `sum`, `avg`. + +## Summary +Welcome all community users to contribute more powerful functions into datafuse. If you found any problems, feel free to open an issue in Github, we will try our best to help you. 
From f65ba80fb64ea1a11f31fb0fb22b4153a8dde779 Mon Sep 17 00:00:00 2001 From: sundy-li <543950155@qq.com> Date: Fri, 13 Aug 2021 18:14:19 +0800 Subject: [PATCH 3/7] [docs] add how-to-write-aggregate-functions --- .../docs/development/how-to-write-aggregate-functions.md | 2 +- website/datafuse/mkdocs.yml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/website/datafuse/docs/development/how-to-write-aggregate-functions.md b/website/datafuse/docs/development/how-to-write-aggregate-functions.md index 8de8d0a5722ca..3f12dadb964e3 100644 --- a/website/datafuse/docs/development/how-to-write-aggregate-functions.md +++ b/website/datafuse/docs/development/how-to-write-aggregate-functions.md @@ -72,7 +72,7 @@ struct AggregateAvgState { Noted that the argument `_arrays` is the function arguments, we can safely get the array by index without index bound check because we must validate the argument numbers and types in function constructor. - The `_input_rows` is the rows of current block, it maybe useful when the `_arrays` is empty, eg: `count()` function. + The `_input_rows` is the rows of current block, it may be useful when the `_arrays` is empty, eg: `count()` function. - The function `accumulate_keys`, similar to `accumulate` but we must take into consideration of the `keys` and `offset`, each key reprsents an unique memory address named `place`. 
diff --git a/website/datafuse/mkdocs.yml b/website/datafuse/mkdocs.yml index 98307546af9c7..ce7cb6c025497 100755 --- a/website/datafuse/mkdocs.yml +++ b/website/datafuse/mkdocs.yml @@ -186,10 +186,10 @@ nav: - Development: - Contributing: development/contributing.md - Coding Guideline: development/coding-guidelines.md + - How to write aggregate functions: development/how-to-write-aggregate-functions.md - Tracing: development/tracing.md - Profling: development/profiling.md - Roadmap: development/roadmap.md - - How to write aggregate functions: development/how-to-write-aggregate-functions.md - RFCs: - DatafuseQuery Join: rfcs/query/0001-join-framework-design.md - DatafuseQuery Expression: rfcs/query/0002-plan-expression.md From e6b90ce070ae93624fa59da0fe60af0fe4f9b2d1 Mon Sep 17 00:00:00 2001 From: sundy-li <543950155@qq.com> Date: Fri, 13 Aug 2021 20:43:10 +0800 Subject: [PATCH 4/7] [docs] apply suggestions --- .../how-to-write-aggregate-functions.md | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/website/datafuse/docs/development/how-to-write-aggregate-functions.md b/website/datafuse/docs/development/how-to-write-aggregate-functions.md index 3f12dadb964e3..77b14ca6ca1a7 100644 --- a/website/datafuse/docs/development/how-to-write-aggregate-functions.md +++ b/website/datafuse/docs/development/how-to-write-aggregate-functions.md @@ -5,15 +5,15 @@ title: How to write aggregate functions ## How to write aggregate functions -Datafuse allows you to write custom aggregate functions through rust codes. -It's not an easy way because you need to be rustacean first. Datafuse has a plan to support writing UDAF in other languages(like js, web assembly) in the future. +Datafuse allows you to write custom aggregate functions through rust code. +It's not an easy way because you need to be a rustacean first. Datafuse has a plan to support writing UDAFs in other languages(like js, web assembly) in the future. 
In this section we will talk about how to write aggregate functions in datatfuse. ## AggregateFunction trait introduction -All aggregate functions implement `AggregateFunction` trait, and we register them into a global static factory named `FactoryFuncRef`, the factory is just an index map and the keys are names of aggregate functions, noted that the function's name in datafuse are all case insensitive. +All aggregate functions implement `AggregateFunction` trait, and we register them into a global static factory named `FactoryFuncRef`, the factory is just an index map and the keys are names of aggregate functions, noted that a function's name in datafuse is case-insensitive. ``` rust @@ -51,11 +51,11 @@ pub trait AggregateFunction: fmt::Display + Sync + Send { - The function `return_type` indicates the return type of the function, it may vary with different arguments, such as `sum(int8)` -> `int64`, `sum(uint8)` -> `uint64`, `sum(float64)` -> `float64`. - The function `nullable` indicates whether the `return_type` is nullable. -Before we start to introduce function `init_state`, let's ask a question: +Before we start to introduce the function `init_state`, let's ask a question: > what's aggregate function state? -It indicates the temporary result of the aggregate function. Because aggregate function accumulates block by block and there will results from different query nodes after the accumulations. The state must be mergeable, serializable. +It indicates the temporary results of an aggregate function. Because an aggregate function accumulates data in columns block by block and there will be results after the aggregation. Therefore, the state must be mergeable, serializable. 
For example, in the `avg` aggregate function, we can represent the state like: @@ -66,16 +66,16 @@ struct AggregateAvgState { } ``` -- The function `init_state` wants us to initialize the aggregate function state, we ensure the memory is already allocated, we just need to initial the state with the initial value. +- The function `init_state` initializes the aggregate function state. The memory is already allocated, so we only need to set the state to its initial value. - The function `state_layout` indicates the memory layout of the state. -- The function `accumulate`, this function is used in aggregation with single batch, which means the whole block can be aggregated in a single state, no more other keys. A SQL query which applys aggregation without group-by columns will hit this function. +- The function `accumulate` is used in aggregation with a single batch, which means the whole block can be aggregated in a single state, with no other keys. A SQL query that applies aggregation without group-by columns will hit this function. Noted that the argument `_arrays` is the function arguments, we can safely get the array by index without index bound check because we must validate the argument numbers and types in function constructor. - The `_input_rows` is the rows of current block, it may be useful when the `_arrays` is empty, eg: `count()` function. + The `_input_rows` is the number of rows in the current block, and it may be useful when `_arrays` is empty, e.g., for the `count()` function. -- The function `accumulate_keys`, similar to `accumulate` but we must take into consideration of the `keys` and `offset`, each key reprsents an unique memory address named `place`. +- The function `accumulate_keys` is similar to accumulate, but we must take into consideration of the keys and offsets, for which each key represents a unique memory address named place. - The function `serialize`, we used to serialize state into binary. 
- The function `deserialize`, we used to deserialize state from binary. - The function `merge`, can be used to merge other state into current state. @@ -83,7 +83,7 @@ Noted that the argument `_arrays` is the function arguments, we can safely get t ## Refer to other examples As you see, adding a new aggregate function in datafuse is not as hard as you think. -Before we start to add one, please refer to other aggregate function examples, such like `min`, `count`, `sum`, `avg`. +Before you start to add one, please refer to other aggregate function examples, such as `min`, `count`, `sum`, `avg`. ## Summary -Welcome all community users to contribute more powerful functions into datafuse. If you found any problems, feel free to open an issue in Github, we will try our best to help you. +We welcome all community users to contribute more powerful functions to datafuse. If you find any problems, feel free to open an issue in Github, we will use our best efforts to help you. From 9499ae5d67c071a750f2ab8e11cf0e670489d9fb Mon Sep 17 00:00:00 2001 From: sundy-li <543950155@qq.com> Date: Fri, 13 Aug 2021 20:44:27 +0800 Subject: [PATCH 5/7] [docs] apply suggestions --- .../docs/development/how-to-write-aggregate-functions.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/website/datafuse/docs/development/how-to-write-aggregate-functions.md b/website/datafuse/docs/development/how-to-write-aggregate-functions.md index 77b14ca6ca1a7..f03cd5c96d7d7 100644 --- a/website/datafuse/docs/development/how-to-write-aggregate-functions.md +++ b/website/datafuse/docs/development/how-to-write-aggregate-functions.md @@ -76,8 +76,8 @@ Noted that the argument `_arrays` is the function arguments, we can safely get t - The function `accumulate_keys` is similar to accumulate, but we must take into consideration of the keys and offsets, for which each key represents a unique memory address named place. -- The function `serialize`, we used to serialize state into binary. 
-- The function `deserialize`, we used to deserialize state from binary. +- The function `serialize` serializes state into binary. +- The function `deserialize` deserializes state from binary. - The function `merge`, can be used to merge other state into current state. - The function `merge_result`, can be used to represent the aggregate function state into one-row field. From befc2b35b86c331c699be50f97e9944cd29e2f66 Mon Sep 17 00:00:00 2001 From: sundy-li <543950155@qq.com> Date: Fri, 13 Aug 2021 20:45:01 +0800 Subject: [PATCH 6/7] [docs] apply suggestions --- .../docs/development/how-to-write-aggregate-functions.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/website/datafuse/docs/development/how-to-write-aggregate-functions.md b/website/datafuse/docs/development/how-to-write-aggregate-functions.md index f03cd5c96d7d7..4b4ec88d0eba6 100644 --- a/website/datafuse/docs/development/how-to-write-aggregate-functions.md +++ b/website/datafuse/docs/development/how-to-write-aggregate-functions.md @@ -5,7 +5,7 @@ title: How to write aggregate functions ## How to write aggregate functions -Datafuse allows you to write custom aggregate functions through rust code. +Datafuse allows us to write custom aggregate functions through rust code. It's not an easy way because you need to be a rustacean first. Datafuse has a plan to support writing UDAFs in other languages(like js, web assembly) in the future. In this section we will talk about how to write aggregate functions in datatfuse. 
From 4b2062774396ce1f996056467f273adbee983ca7 Mon Sep 17 00:00:00 2001 From: sundy-li <543950155@qq.com> Date: Sat, 14 Aug 2021 07:30:30 +0800 Subject: [PATCH 7/7] [docs] add example --- .../how-to-write-aggregate-functions.md | 30 ++++++++++++++++++- 1 file changed, 29 insertions(+), 1 deletion(-) diff --git a/website/datafuse/docs/development/how-to-write-aggregate-functions.md b/website/datafuse/docs/development/how-to-write-aggregate-functions.md index 4b4ec88d0eba6..479bba47e1941 100644 --- a/website/datafuse/docs/development/how-to-write-aggregate-functions.md +++ b/website/datafuse/docs/development/how-to-write-aggregate-functions.md @@ -81,9 +81,37 @@ Noted that the argument `_arrays` is the function arguments, we can safely get t - The function `merge`, can be used to merge other state into current state. - The function `merge_result`, can be used to represent the aggregate function state into one-row field. + +## Example +Let's take the aggregate function `sum` as an example. + +It's declared as `AggregateSumFunction<T, SumT>` because it can accept varying integer types like `UInt8Type` and `Int8Type`. `T` and `SumT` are logical types that implement `DFNumericType`; e.g., if `T` is `UInt8Type`, `SumT` must be `UInt64Type`. + +We can dispatch it using macros by matching the types of the arguments. Take a look at the `dispatch_numeric_types` to understand the dispatch macros. + +The `AggregateSumState` will be +``` +struct AggregateSumState<T> { + pub value: Option<T>, +} +``` + +The generic `T` is from `SumT::Native`. The `Option<T>` lets the function return `null` if nothing is passed into it. + +Let's look at the function `accumulate_keys`, because it is the only function that is a little hard to understand in this case. + +`places[row]` is the memory address of the first state in this row, so we can get the address of `AggregateSumState` using `places[row] + offset`, and then use `place.get::<AggregateSumState<SumT::Native>>()` to get the corresponding state. 
+ +Since we already know the array type of this function, we can safely cast it to arrow's `PrimitiveArray`. Here we split the work into two branches, `null` and `no_null`, to reduce branch mispredictions on the CPU. In the `no_null` case, we just iterate over the array and apply the `sum`, which makes it easy for the compiler to vectorize the loop. + +OK, this example is pretty easy. If you have read this far, you should be able to write a new function. + ## Refer to other examples As you see, adding a new aggregate function in datafuse is not as hard as you think. -Before you start to add one, please refer to other aggregate function examples, such as `min`, `count`, `sum`, `avg`. +Before you start to add one, please refer to other aggregate function examples, such as `min`, `count`, `max`, `avg`. + +## Testing +To be a good engineer, don't forget to test your code. Please add unit tests and stateless tests after you finish the new aggregate function. ## Summary We welcome all community users to contribute more powerful functions to datafuse. If you find any problems, feel free to open an issue in Github, we will use our best efforts to help you.
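
To make the state lifecycle described in this patch concrete, here is a minimal, self-contained sketch of a `sum`-style state using only the Rust standard library. It is not Datafuse's implementation: `SumState`, the `validity` slice, and the index-based `places` are hypothetical stand-ins for `AggregateSumState`, arrow's null bitmap, and `StateAddr`, and the byte layout used by `serialize` is an assumption chosen for illustration.

```rust
/// Hypothetical stand-in for the `sum` state; `None` means "no input seen yet".
#[derive(Debug, Default, Clone, PartialEq)]
struct SumState {
    value: Option<u64>,
}

impl SumState {
    /// Fold one non-null input value into the state.
    fn add(&mut self, v: u64) {
        self.value = Some(self.value.unwrap_or(0) + v);
    }

    /// Accumulate a whole block into a single state (no group-by keys).
    /// `validity` is `None` when the block contains no nulls.
    fn accumulate(&mut self, values: &[u8], validity: Option<&[bool]>) {
        match validity {
            // no_null branch: a tight loop that is easy to vectorize.
            None => {
                if !values.is_empty() {
                    let total: u64 = values.iter().map(|&v| v as u64).sum();
                    self.add(total);
                }
            }
            // null branch: skip rows whose validity flag is false.
            Some(valid) => {
                for (&v, &ok) in values.iter().zip(valid) {
                    if ok {
                        self.add(v as u64);
                    }
                }
            }
        }
    }

    /// Merge a partial state (from another block or node) into this one.
    fn merge(&mut self, rhs: &SumState) {
        if let Some(v) = rhs.value {
            self.add(v);
        }
    }

    /// Serialize as a 1-byte presence flag plus a little-endian u64.
    fn serialize(&self, writer: &mut Vec<u8>) {
        match self.value {
            Some(v) => {
                writer.push(1);
                writer.extend_from_slice(&v.to_le_bytes());
            }
            None => writer.push(0),
        }
    }

    /// Inverse of `serialize`; returns `None` on malformed input.
    fn deserialize(reader: &[u8]) -> Option<SumState> {
        match reader {
            [0] => Some(SumState { value: None }),
            [1, rest @ ..] => {
                if rest.len() != 8 {
                    return None;
                }
                let mut bytes = [0u8; 8];
                bytes.copy_from_slice(rest);
                Some(SumState { value: Some(u64::from_le_bytes(bytes)) })
            }
            _ => None,
        }
    }
}

/// Grouped accumulation: `places[row]` selects the state that row belongs to.
/// Here a place is an index into `states`, not a raw memory address.
fn accumulate_keys(states: &mut [SumState], places: &[usize], values: &[u8]) {
    for (&place, &v) in places.iter().zip(values) {
        states[place].add(v as u64);
    }
}
```

Keeping the value in an `Option` means an empty input stays `None`, which mirrors the `null` result described for `sum`, and `merge` plus `serialize`/`deserialize` are what allow partial states from different blocks to be combined.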