[Rust] [DataFusion] Average is not correct #25966

Closed
asfimport opened this issue Sep 8, 2020 · 3 comments

@asfimport

The current design of aggregates makes the calculation of the average incorrect.

Namely, if there are multiple input partitions, the result is the average of the averages. For example, if the input is in two batches, [1,2,3] and [4,5], DataFusion will say "average=3.25" rather than "average=3".

It also makes it impossible to compute the geometric mean, distinct sum, and other operations.
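To illustrate the distinct-sum case, here is a minimal sketch in plain Rust (the function names are hypothetical and none of this is DataFusion code). It contrasts reducing each partition to a single scalar with carrying the set of seen values as partial state:

```rust
use std::collections::BTreeSet;

/// Incorrect: reduce each partition to a scalar distinct sum, then add the scalars.
/// A value that appears in two partitions is counted twice.
fn sum_of_partial_distinct_sums(partitions: &[Vec<i64>]) -> i64 {
    partitions
        .iter()
        .map(|p| p.iter().copied().collect::<BTreeSet<i64>>().iter().sum::<i64>())
        .sum()
}

/// Correct: the partial state is the set of values seen so far.
/// The sets are unioned across partitions and summed once at the end.
fn distinct_sum(partitions: &[Vec<i64>]) -> i64 {
    let mut seen = BTreeSet::new();
    for p in partitions {
        seen.extend(p.iter().copied());
    }
    seen.iter().sum()
}
```

For the partitions [1, 2, 2] and [2, 3], the first function returns 8 while the second returns 6, because the value 2 occurs in both partitions.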

The central issue is that Accumulator returns a ScalarValue during partial aggregations via get_value, but very often a ScalarValue is not sufficient information to perform the full aggregation.

A simple example is the average of 5 numbers, x1, x2, x3, x4, x5, that are distributed in batches of 2:

{[x1, x2], [x3, x4], [x5]}

Our current calculation performs partial means,

{(x1+x2)/2, (x3+x4)/2, x5}

and then reduces them using another average, i.e.

((x1+x2)/2 + (x3+x4)/2 + x5)/3

which is not equal to (x1 + x2 + x3 + x4 + x5)/5.
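The arithmetic above can be checked with a small standalone sketch. The helper names are hypothetical and the code does not use DataFusion; it only reproduces the two formulas:

```rust
/// Mean of a non-empty slice.
fn mean(values: &[f64]) -> f64 {
    values.iter().sum::<f64>() / values.len() as f64
}

/// The behavior described in this issue: average each partition,
/// then average the partial averages.
fn mean_of_partition_means(partitions: &[Vec<f64>]) -> f64 {
    let partial: Vec<f64> = partitions.iter().map(|p| mean(p)).collect();
    mean(&partial)
}

/// The expected result: one mean over every value, ignoring partition boundaries.
fn global_mean(partitions: &[Vec<f64>]) -> f64 {
    let all: Vec<f64> = partitions.iter().flatten().copied().collect();
    mean(&all)
}
```

With x1..x5 = 1..5 split as {[1, 2], [3, 4], [5]}, the partial means are 1.5, 3.5 and 5, so the first function gives 10/3 while the second gives 3.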

I believe that our Accumulators need to pass more information from the partial aggregations to the final aggregation.

We could consider adopting an API equivalent to Spark's, i.e. have an update, a merge and an evaluate.
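A rough sketch of what such an API could look like for the average, assuming a hypothetical `PartialAggregate` trait and an extra `state` method to export the intermediate values. The trait, the struct and the `partitioned_avg` helper are illustrative only and are not DataFusion's `Accumulator`:

```rust
/// Hypothetical trait for illustration; not DataFusion's actual `Accumulator`.
trait PartialAggregate {
    /// Intermediate state passed from the partial to the final aggregation.
    type State;
    /// Fold one input value into this accumulator.
    fn update(&mut self, value: f64);
    /// Export the intermediate state.
    fn state(&self) -> Self::State;
    /// Fold the state of another accumulator into this one.
    fn merge(&mut self, other: Self::State);
    /// Produce the final result, or None if no values were seen.
    fn evaluate(&self) -> Option<f64>;
}

/// The average carries (sum, count) rather than a single scalar.
#[derive(Default)]
struct AvgAccumulator {
    sum: f64,
    count: u64,
}

impl PartialAggregate for AvgAccumulator {
    type State = (f64, u64);

    fn update(&mut self, value: f64) {
        self.sum += value;
        self.count += 1;
    }

    fn state(&self) -> (f64, u64) {
        (self.sum, self.count)
    }

    fn merge(&mut self, (sum, count): (f64, u64)) {
        self.sum += sum;
        self.count += count;
    }

    fn evaluate(&self) -> Option<f64> {
        if self.count == 0 {
            None
        } else {
            Some(self.sum / self.count as f64)
        }
    }
}

/// Partial aggregation per partition, followed by a final merge.
fn partitioned_avg(partitions: &[Vec<f64>]) -> Option<f64> {
    let mut final_acc = AvgAccumulator::default();
    for partition in partitions {
        let mut partial = AvgAccumulator::default();
        for &v in partition {
            partial.update(v);
        }
        final_acc.merge(partial.state());
    }
    final_acc.evaluate()
}
```

Because each partition ships its sum and count instead of its mean, the final result no longer depends on how the rows are split across partitions.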

Code with a failing test (src/execution/context.rs)

    #[test]
    fn simple_avg() -> Result<()> {
        let schema = Schema::new(vec![
            Field::new("a", DataType::Int32, false),
        ]);

        let batch1 = RecordBatch::try_new(
            Arc::new(schema.clone()),
            vec![
                Arc::new(Int32Array::from(vec![1, 2, 3])),
            ],
        )?;
        let batch2 = RecordBatch::try_new(
            Arc::new(schema.clone()),
            vec![
                Arc::new(Int32Array::from(vec![4, 5])),
            ],
        )?;

        let mut ctx = ExecutionContext::new();

        let provider = MemTable::new(Arc::new(schema), vec![vec![batch1], vec![batch2]])?;
        ctx.register_table("t", Box::new(provider));

        let result = collect(&mut ctx, "SELECT AVG(a) FROM t")?;

        let batch = &result[0];
        assert_eq!(1, batch.num_columns());
        assert_eq!(1, batch.num_rows());

        let values = batch
            .column(0)
            .as_any()
            .downcast_ref::<Float64Array>()
            .expect("failed to downcast to Float64Array");
        assert_eq!(values.len(), 1);
        // avg(1,2,3,4,5) = 3.0
        assert_eq!(values.value(0), 3.0_f64);
        Ok(())
    }

Reporter: Jorge Leitão / @jorgecarleitao
Assignee: Jorge Leitão / @jorgecarleitao

PRs and other links:

Note: This issue was originally created as ARROW-9937. Please see the migration documentation for further details.

@asfimport

Jorge Leitão / @jorgecarleitao:
@andygrove, I remember that you wanted to touch this. If not, let me know and I'll take a shot at it.

Looking at Ballista's source code for this, I think that we have the same issue there. :/

@asfimport

Andrew Lamb / @alamb:
This is a bad correctness bug; we should definitely fix this.

@asfimport

Andy Grove / @andygrove:
Issue resolved by pull request 8172
#8172

@asfimport added this to the 2.0.0 milestone Jan 11, 2023