Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 51 additions & 3 deletions datafusion/physical-expr/src/aggregate/first_last.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,25 +112,35 @@ impl PartialEq<dyn Any> for FirstValue {
#[derive(Debug)]
struct FirstValueAccumulator {
first: ScalarValue,
// At the beginning, `is_set` is `false`, this means `first` is not seen yet.
// Once we see (`is_set=true`) first value, we do not update `first`.
is_set: bool,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another potential way that might let the compiler check that is_set was updated correctly would be something like:

struct FirstValueAccumulator {
  /// None until the data has been seen
  first: Option<ScalarValue>
}

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have considered this option. However, during the `evaluate` call, if `first` is `None`, we need to return NULL of the appropriate type — for example `ScalarValue::Int64(None)` when the type is `Int64`. The data type is only available during initialization, so if we wanted to use `Option` for `first`, we would also need to store the data type. Hence I opted for this approach.

}

impl FirstValueAccumulator {
/// Creates a new `FirstValueAccumulator` for the given `data_type`.
pub fn try_new(data_type: &DataType) -> Result<Self> {
ScalarValue::try_from(data_type).map(|value| Self { first: value })
ScalarValue::try_from(data_type).map(|value| Self {
first: value,
is_set: false,
})
}
}

impl Accumulator for FirstValueAccumulator {
fn state(&self) -> Result<Vec<ScalarValue>> {
Ok(vec![self.first.clone()])
Ok(vec![
self.first.clone(),
ScalarValue::Boolean(Some(self.is_set)),
])
}

fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
// If we have seen first value, we shouldn't update it
let values = &values[0];
if !values.is_empty() {
if !values.is_empty() && !self.is_set {
self.first = ScalarValue::try_from_array(values, 0)?;
self.is_set = true;
}
Ok(())
}
Expand Down Expand Up @@ -270,3 +280,41 @@ impl Accumulator for LastValueAccumulator {
std::mem::size_of_val(self) - std::mem::size_of_val(&self.last) + self.last.size()
}
}

#[cfg(test)]
mod tests {
    use crate::aggregate::first_last::{FirstValueAccumulator, LastValueAccumulator};
    use arrow_array::{ArrayRef, Int64Array};
    use arrow_schema::DataType;
    use datafusion_common::{Result, ScalarValue};
    use datafusion_expr::Accumulator;
    use std::sync::Arc;

    /// FIRST_VALUE must keep the very first value it observed across batches,
    /// while LAST_VALUE must track the most recent value of the newest batch.
    #[test]
    fn test_first_last_value_value() -> Result<()> {
        let mut first_accumulator = FirstValueAccumulator::try_new(&DataType::Int64)?;
        let mut last_accumulator = LastValueAccumulator::try_new(&DataType::Int64)?;
        // Each tuple is a half-open range [start, end); the three batches
        // built below therefore hold 0..=9, 1..=10 and 2..=12 respectively.
        let ranges: Vec<(i64, i64)> = vec![(0, 10), (1, 11), (2, 13)];
        for (start, end) in ranges {
            let batch: ArrayRef =
                Arc::new(Int64Array::from((start..end).collect::<Vec<_>>()));
            // Once the first value is set, later batches must not overwrite it.
            first_accumulator.update_batch(&[batch.clone()])?;
            // The last value, by contrast, is refreshed by every batch.
            last_accumulator.update_batch(&[batch])?;
        }
        // First value: element 0 of the first batch.
        assert_eq!(first_accumulator.evaluate()?, ScalarValue::Int64(Some(0)));
        // Last value: the final element (12) of the last batch.
        assert_eq!(last_accumulator.evaluate()?, ScalarValue::Int64(Some(12)));
        Ok(())
    }
}