FIRST accumulator gives wrong result when fed with multiple batches #6502

@mustafasrepo

Describe the bug

The FIRST accumulator, in its current implementation, does not track whether its value has already been set. Hence, when it is fed multiple batches, it returns the first value of the last batch instead of the first value of the first batch.
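
To illustrate the failure mode, here is a minimal, self-contained sketch. It does not use DataFusion's actual accumulator types: `NaiveFirst`, `GuardedFirst`, `update_batch`, and `first_values` are hypothetical names chosen for illustration, and plain `i64` slices stand in for Arrow arrays. It only shows how an accumulator that overwrites its state on every batch ends up with the first value of the last batch, and how an `is_set` flag could avoid that.

```rust
// Hypothetical stand-ins for illustration only; not DataFusion's API.

/// Mirrors the reported behaviour: every batch overwrites the stored value.
#[derive(Default)]
struct NaiveFirst {
    first: Option<i64>,
}

impl NaiveFirst {
    fn update_batch(&mut self, batch: &[i64]) {
        if let Some(v) = batch.first() {
            self.first = Some(*v);
        }
    }
}

/// Remembers whether a value was already captured and ignores later batches.
/// A separate flag is used (rather than relying on `Option`) because a real
/// accumulator may need to distinguish "not set" from "set to NULL".
#[derive(Default)]
struct GuardedFirst {
    first: Option<i64>,
    is_set: bool,
}

impl GuardedFirst {
    fn update_batch(&mut self, batch: &[i64]) {
        if self.is_set {
            return; // the first value has already been recorded
        }
        if let Some(v) = batch.first() {
            self.first = Some(*v);
            self.is_set = true;
        }
    }
}

/// Feeds the same batches to both accumulators and returns (naive, guarded).
fn first_values(batches: &[Vec<i64>]) -> (Option<i64>, Option<i64>) {
    let mut naive = NaiveFirst::default();
    let mut guarded = GuardedFirst::default();
    for batch in batches {
        naive.update_batch(batch);
        guarded.update_batch(batch);
    }
    (naive.first, guarded.first)
}

fn main() {
    let batches = vec![vec![1, 2, 3, 4], vec![5, 6, 7, 8]];
    let (naive, guarded) = first_values(&batches);
    // prints "naive: Some(5), guarded: Some(1)"
    println!("naive: {:?}, guarded: {:?}", naive, guarded);
}
```

With the same two batches as in the reproduction (`[1,2,3,4]` and `[5,6,7,8]`), the overwriting variant yields 5 while the guarded variant yields 1.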

To Reproduce

The test below reproduces the problem:

#[tokio::test]
async fn test_first_value_multi_partition() -> Result<()> {
    let config = SessionConfig::new()
        .with_target_partitions(1);
    let ctx = SessionContext::with_config(config);
    let fields = vec![Field::new("a", DataType::Int64, false)];
    let schema = Arc::new(Schema::new(fields));
    let batch1 = RecordBatch::try_new(schema.clone(), vec![Arc::new(Int64Array::from(vec![1,2,3,4])) as ArrayRef])?;
    let batch2 = RecordBatch::try_new(schema.clone(), vec![Arc::new(Int64Array::from(vec![5,6,7,8])) as ArrayRef])?;
    let partitions = vec![vec![batch1, batch2]];
    let mem_table = MemTable::try_new(schema, partitions)?;
    ctx.register_table("table1", Arc::new(mem_table))?;

    let sql = "SELECT FIRST_VALUE(a)
                    FROM table1";

    let msg = format!("Creating logical plan for '{sql}'");
    let dataframe = ctx.sql(sql).await.expect(&msg);
    let physical_plan = dataframe.create_physical_plan().await?;
    let batches = collect(physical_plan, ctx.task_ctx()).await?;
    print_batches(&batches)?;
    Ok(())
}

The result should be 1 (the first value of the first batch), whereas it currently returns 5 (the first value of the last batch).

Expected behavior

The test above should return 1.
