DataFrame API: allow aggregate functions in select() #17874

@Jefffrey

Is your feature request related to a problem or challenge?

I would like to be able to run something like this, with aggregate functions passed directly to select():

use datafusion::error::Result;
use datafusion::functions_aggregate::expr_fn::approx_distinct;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();

    let testdata = datafusion::test_util::arrow_test_data();
    ctx.register_csv(
        "aggregate_test_100",
        format!("{testdata}/csv/aggregate_test_100.csv"),
        CsvReadOptions::default().has_header(true),
    )
    .await?;

    ctx.table("aggregate_test_100")
        .await?
        // Run aggregate in select() without needing to do via aggregate()
        .select(vec![
            approx_distinct(col("c9")).alias("count_c9"),
            approx_distinct(cast(col("c9"), arrow_schema::DataType::Utf8View))
                .alias("count_c9_str"),
        ])?
        .show()
        .await?;

    Ok(())
}

This currently fails with:

Error: NotImplemented("Physical plan does not support logical expression AggregateFunction(AggregateFunction { func: AggregateUDF { inner: ApproxDistinct { name: \"approx_distinct\", signature: Signature { type_signature: Any(1), volatility: Immutable } } }, params: AggregateFunctionParams { args: [Column(Column { relation: Some(Bare { table: \"aggregate_test_100\" }), name: \"c9\" })], distinct: false, filter: None, order_by: [], null_treatment: None } })")

Describe the solution you'd like

If aggregate functions are passed to select(), assume there is no group by, i.e. treat the call like aggregate() with an empty list of group expressions.
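
To make the requested behavior concrete, here is a minimal sketch of the rewrite rule using a toy expression model. The `Expr` and `Plan` types and the `plan_select` function are hypothetical and are not DataFusion's types or API. The rule that rejects a mix of aggregate and non-aggregate expressions is also an assumption, borrowed from how SQL treats a SELECT without GROUP BY.

```rust
// Hypothetical, simplified model for illustration only.
// These are NOT DataFusion's `Expr` or `LogicalPlan` types.
#[allow(dead_code)]
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Column(String),
    Cast(Box<Expr>, String),
    Aggregate { func: String, arg: Box<Expr> },
    Alias(Box<Expr>, String),
}

#[derive(Debug, PartialEq)]
enum Plan {
    // Plain row-by-row projection.
    Projection(Vec<Expr>),
    // Aggregation; an empty `group_by` means one global group.
    Aggregate { group_by: Vec<Expr>, aggr: Vec<Expr> },
}

// Returns true if the expression tree contains an aggregate call anywhere.
fn contains_aggregate(expr: &Expr) -> bool {
    match expr {
        Expr::Column(_) => false,
        Expr::Aggregate { .. } => true,
        Expr::Cast(inner, _) | Expr::Alias(inner, _) => contains_aggregate(inner),
    }
}

// Sketch of the proposed select() behavior:
// - no aggregates: ordinary projection
// - only aggregates: aggregation with no group by
// - a mix: rejected (assumption, mirroring SQL without GROUP BY)
fn plan_select(exprs: Vec<Expr>) -> Result<Plan, String> {
    let n_agg = exprs.iter().filter(|e| contains_aggregate(e)).count();
    if n_agg == 0 {
        Ok(Plan::Projection(exprs))
    } else if n_agg == exprs.len() {
        Ok(Plan::Aggregate { group_by: vec![], aggr: exprs })
    } else {
        Err("cannot mix aggregate and non-aggregate expressions without a group by".to_string())
    }
}
```

Under this sketch, a select() containing only aliased aggregate calls, as in the example above, would be planned as an aggregation with an empty group-by list instead of a projection that the physical planner cannot execute.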

Describe alternatives you've considered

No response

Additional context

No response

Labels

enhancement