
Make it easier to create WindowFunctions with the Expr API #6747

Open · alamb opened this issue Jun 22, 2023 · 17 comments · May be fixed by #6746
Labels: enhancement (New feature or request), help wanted (Extra attention is needed)
@alamb (Contributor) commented Jun 22, 2023

Is your feature request related to a problem or challenge?

Follow on to #5781

There are at least three things named WindowFunction in DataFusion -- Expr::WindowFunction, window_function::WindowFunction and expr::WindowFunction

https://docs.rs/datafusion-expr/26.0.0/datafusion_expr/index.html?search=WindowFunction

Constructing an Expr::WindowFunction to pass to LogicalPlanBuilder::window is quite challenging

Describe the solution you'd like

I would like to make this process easier with a builder style:

For example, for lead(foo) OVER (PARTITION BY bar):

let expr = lead(col("foo"))
  .with_partition_by(col("bar"));
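As a standalone sketch of the shape this builder could take (toy types and SQL-text rendering stand in for the real Expr machinery; none of these names are the actual DataFusion API):

```rust
// A toy sketch of the proposed builder style (hypothetical types and
// method names, not the actual DataFusion API). The builder renders the
// expression as SQL text just to make the end result visible.
#[derive(Default)]
struct WindowExprBuilder {
    func: String,
    arg: String,
    partition_by: Vec<String>,
}

// Hypothetical free function mirroring the proposed `lead(col("foo"))`.
fn lead(arg: &str) -> WindowExprBuilder {
    WindowExprBuilder {
        func: "lead".to_string(),
        arg: arg.to_string(),
        ..Default::default()
    }
}

impl WindowExprBuilder {
    // Consumes and returns the builder so calls can be chained.
    fn with_partition_by(mut self, col: &str) -> Self {
        self.partition_by.push(col.to_string());
        self
    }

    // Renders the final expression as SQL text.
    fn build(self) -> String {
        if self.partition_by.is_empty() {
            format!("{}({}) OVER ()", self.func, self.arg)
        } else {
            format!(
                "{}({}) OVER (PARTITION BY {})",
                self.func,
                self.arg,
                self.partition_by.join(", ")
            )
        }
    }
}

fn main() {
    let expr = lead("foo").with_partition_by("bar").build();
    assert_eq!(expr, "lead(foo) OVER (PARTITION BY bar)");
    println!("{expr}");
}
```

The key design point is that each method consumes and returns the builder, so optional OVER-clause pieces can be chained in any combination before a final build().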

Describe alternatives you've considered

No response

Additional context

No response

@alamb (Contributor, Author) commented May 2, 2024

Here is another example from #10345 / @timsaucer showing how hard it is to create a window function via the Expr API:

use datafusion::{logical_expr::{expr::WindowFunction, BuiltInWindowFunction, WindowFrame, WindowFunctionDefinition}, prelude::*};

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {

    let ctx = SessionContext::new();
    let mut df = ctx.read_csv("/Users/tsaucer/working/testing_ballista/lead_lag/example.csv", CsvReadOptions::default()).await?;

    df = df.with_column("array_col", make_array(vec![col("a"), col("b"), col("c")]))?;

    df.clone().show().await?;

    let lag_expr = Expr::WindowFunction(WindowFunction::new(
        WindowFunctionDefinition::BuiltInWindowFunction(
            BuiltInWindowFunction::Lead,
        ),
        vec![col("array_col")],
        vec![],
        vec![],
        WindowFrame::new(None),
        None,
    ));

    df = df.select(vec![col("a"), col("b"), col("c"), col("array_col"), lag_expr.alias("lagged")])?;

    df.show().await?;

    Ok(())
}

It would be great if instead of

    let lag_expr = Expr::WindowFunction(WindowFunction::new(
        WindowFunctionDefinition::BuiltInWindowFunction(
            BuiltInWindowFunction::Lead,
        ),
        vec![col("array_col")],
        vec![],
        vec![],
        WindowFrame::new(None),
        None,
    ));

it looked more like:

    let lag_expr = lead(
        vec![col("array_col")],
        vec![],
        vec![],
        WindowFrame::new(None),
        None,
    );

Maybe even better would be a builder style:

    let lag_expr = lead(col("array_col")).build();

which would permit adding the various OVER clauses, like:

    let lag_expr = lead(col("array_col"))
      .partition_by(vec![])
      .order_by(vec![])
      .build()

Maybe there are some inspirations in the polars API too: https://docs.pola.rs/user-guide/expressions/window/#group-by-aggregations-in-selection

@alamb added the "help wanted" label on May 2, 2024
@alamb (Contributor, Author) commented May 2, 2024

🤔 It seems like Spark's API looks like:

count("dt").over(w).alias("count").show()

https://stackoverflow.com/questions/32769328/how-to-use-window-functions-in-pyspark-using-dataframes

So maybe for DataFusion it could look like

   let w = Window::new()
     .partition_by(col("id"))
     .order_by(col("dt"));

    let lag_expr = lag(col("array_col"))
       .over(w)
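The Spark-like shape above can be sketched end to end with toy types (hypothetical names, not DataFusion's actual API; rendering to SQL text stands in for building a real Expr):

```rust
// A toy sketch of the Spark-like shape: a reusable Window spec that a
// window function is combined with via .over(). All names here are
// illustrative, not the actual DataFusion API.
#[derive(Default, Clone)]
struct Window {
    partition_by: Vec<String>,
    order_by: Vec<String>,
}

impl Window {
    fn new() -> Self {
        Self::default()
    }
    fn partition_by(mut self, col: &str) -> Self {
        self.partition_by.push(col.to_string());
        self
    }
    fn order_by(mut self, col: &str) -> Self {
        self.order_by.push(col.to_string());
        self
    }
}

struct WindowFn {
    name: String,
    arg: String,
}

// Hypothetical free function mirroring the proposed `lag(col("array_col"))`.
fn lag(arg: &str) -> WindowFn {
    WindowFn {
        name: "lag".to_string(),
        arg: arg.to_string(),
    }
}

impl WindowFn {
    // Combines the function with the window spec, Spark-style, and
    // renders the result as SQL text.
    fn over(self, w: Window) -> String {
        format!(
            "{}({}) OVER (PARTITION BY {} ORDER BY {})",
            self.name,
            self.arg,
            w.partition_by.join(", "),
            w.order_by.join(", ")
        )
    }
}

fn main() {
    let w = Window::new().partition_by("id").order_by("dt");
    let expr = lag("array_col").over(w);
    assert_eq!(expr, "lag(array_col) OVER (PARTITION BY id ORDER BY dt)");
    println!("{expr}");
}
```

One nice property of this shape is that the Window spec is a separate value, so the same spec can be reused across several window function expressions.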

@alamb (Contributor, Author) commented May 2, 2024

Note: I have some code in #6746 that has part of this implemented (along with an example).

@shanretoo (Contributor):

I am willing to help with this task.

@timsaucer (Contributor):

Great! I've rebased @alamb's branch and added the changes I suggested. I was about to start testing the code, and then I was going to write the unit tests. My work in progress is here: https://github.com/timsaucer/datafusion/tree/feature/easier_window_funcs There were a few changes I needed to make around the null_options. I got distracted by a task in the datafusion-python repo, but I was hoping to tackle this very soon.

@shanretoo (Contributor):

Thanks for your update! I'll work on the tests.

@shanretoo (Contributor):

FYI, my work is in: https://github.com/shanretoo/datafusion/tree/feat-window-fn

@shanretoo (Contributor):

@timsaucer I have updated the calls of expr::WindowFunction to match the changes and added tests for those window functions in dataframe_functions.rs.
Let me know if I missed anything.

@timsaucer (Contributor):

Oh, great. Have you been able to run the example code above using the new easy interface?

@shanretoo (Contributor):

You can check it in the unit test: test_fn_lead.

@timsaucer (Contributor):

Thank you. I pulled your branch, and many of the tests are failing for me even though the functions return correct values when I add additional debug statements. I think what's happening here is that because we have the partition_by, there is no guarantee of the order the results come back in. On my machine the unit tests return the partitions on column C in the order 10 then 1. I'm guessing on yours it was the opposite.

There are a couple of things I think we can do to resolve this. One would be to make a new macro for testing these partitioned functions. I could do something like:

macro_rules! assert_sorted_fn_batches {
    ($EXPR:expr, $EXPECTED: expr, $SORTBY: expr) => {
        let df = create_test_table().await?;
        let df = df.select($EXPR)?.sort($SORTBY)?.limit(0, Some(10))?;
        let batches = df.collect().await?;

        assert_batches_eq!($EXPECTED, &batches);
    };
}

And then the lead function test would become:


#[tokio::test]
async fn test_fn_lead() -> Result<()> {

    let expr = lead(col("b"), Some(1), Some(ScalarValue::Int32(Some(-1))))
        .with_partition_by(vec![col("c")])
        .with_order_by(vec![col("b").sort(true, true)])
        .build()
        .alias("lead_b");

    let expected = [
        "+----+--------+",
        "| c  | lead_b |",
        "+----+--------+",
        "| 1  | 10     |",
        "| 1  | 10     |",
        "| 1  | -1     |",
        "| 10 | -1     |",
        "+----+--------+",
    ];

    let select_expr = vec![col("c"), expr];
    let sort_by = vec![col("c").sort(true, true)];

    assert_sorted_fn_batches!(select_expr, expected, sort_by);

    Ok(())
}

I've added an alias just because I think it makes the test more readable. If we wanted to be really explicit, we could also output column A and sort by columns A and C; then correctness would be guaranteed because each row would be unique.

@timsaucer (Contributor):

One thing I think we're missing is the other variants for these functions. I don't think they are covered in any other unit tests I can find. For lead, for example, we would want to validate:

  • that setting a different shift offset works as expected
  • that setting no shift offset works via the default (this can be the basic test)
  • that setting no default value gives nulls
  • with and without partition_by
  • with and without order_by (required by some window functions, so only test with)
  • with and without null treatment (I'm not sure which of the functions this impacts)
  • with and without window frames

What do you think? I might try to write a macro around all these variants.

I'm now unblocked on the other task I was working on, so I can pick it up if you'd like or I'm happy to work on other things. Please let me know.

@shanretoo (Contributor):

Sorry, my fault. I hadn't taken the ordering issue into account. Maybe we could add the following match arm to the macro to omit the order_by parameter, and add output column A to ensure correctness. What do you think?

macro_rules! assert_sorted_fn_batches {
    ($EXPR:expr, $EXPECTED: expr) => {
        let sort_by = $EXPR
            .iter()
            .map(|e| {
                let alias = e.name_for_alias().expect("failed to get an alias");
                col(alias).sort(true, true)
            })
            .collect::<Vec<_>>();
        assert_sorted_fn_batches!($EXPR, $EXPECTED, sort_by);
    };
}

@shanretoo (Contributor):

Have you checked the tests in sqllogictest?
If we want to make sure all the variants work as expected, I think we should add those tests in sqllogictest.
For the unit tests here, we can just check the situations that might produce different results, for example, the default values we set in the builder functions.
What do you think?

You can take over this and I'm happy to help when needed.

@timsaucer (Contributor):

I think you're doing a great job, and good point on the sqllogictest. TBH I find those tests harder to wrap my head around than the Rust tests, but that's personal preference.

About the test function, I realize we can probably make it simpler:

macro_rules! assert_unordered_fn_batches {
    ($EXPRS:expr, $EXPECTED: expr) => {
        let df = create_test_table().await?;
        let df = df.select($EXPRS)?.limit(0, Some(10))?;
        let batches = df.collect().await?;

        assert_batches_sorted_eq!($EXPECTED, &batches);
    };
}
#[tokio::test]
async fn test_fn_lead() -> Result<()> {

    let expr = lead(col("b"), Some(1), Some(ScalarValue::Int32(Some(-1))))
        .with_partition_by(vec![col("c")])
        .with_order_by(vec![col("b").sort(true, true)])
        .build()
        .alias("lead_b");

    let expected = [
        "+-----------+----+--------+",
        "| a         | c  | lead_b |",
        "+-----------+----+--------+",
        "| 123AbcDef | 10 | -1     |",
        "| CBAdef    | 1  | -1     |",
        "| abc123    | 1  | 10     |",
        "| abcDEF    | 1  | 10     |",
        "+-----------+----+--------+",
    ];

    let select_expr = vec![col("a"), col("c"), expr];

    assert_unordered_fn_batches!(select_expr, expected);

    Ok(())
}

What do you think?

@shanretoo (Contributor):

Looks good. The results are easier to understand this way.

@alamb (Contributor, Author) commented Jun 8, 2024

An update: @jayzhan211 and I have been working on a similar API for creating aggregate exprs on #10560. I am quite pleased with how it worked out. Perhaps we can follow a similar model for the window functions.
