[FEA] Optimize row based window operations for BOUNDED ranges #1860

Closed
revans2 opened this issue Mar 3, 2021 · 2 comments · Fixed by #9973

revans2 commented Mar 3, 2021

This is part of #1789 and likely a follow-on to #1856.

Window operations are typically partitioned, so in the worst case we need all of the data for a single partition in memory at once, which means keeping a lot of data in limited GPU memory. But not all queries actually need all of that data in memory at the same time. For row-based window operations, the query explicitly tells us the maximum number of rows before and after the current row that are needed to produce a correct answer for that row.

That means that for a row-based window where neither preceding nor following is unbounded, we can calculate how many rows of the batch are fully computed and can be returned, how many rows we need to keep cached so we can produce correct answers for rows in the next batch, and how many rows were kept around from the previous batch just to compute later rows and should not be output again.

If we look at the partition-by keys of the last row in a batch, we can do a lower-bound search on the batch to find the starting point of that row's window group. Subtracting the number of following rows from the last row's index tells us the cutoff for output (rows up to the cutoff have all the data needed for processing). Subtracting the number of preceding rows from the first incomplete index then tells us how much data we have to keep around for the next batch.
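Written out as a rough sketch (Scala, with illustrative names; `computeCuts` and its parameters are assumptions for exposition, not the actual spark-rapids code), the arithmetic looks something like this:

```scala
// A minimal sketch of the cut-point arithmetic, assuming a frame of
// `preceding` rows before and `following` rows after the current row.
// Assumes non-negative preceding/following; the negative offsets mentioned
// in the later comments would need additional clamping.
case class BatchCuts(rowsToOutput: Int, cacheStartIdx: Int)

def computeCuts(numRows: Int,       // rows in the current (concatenated) batch
                groupStartIdx: Int, // lower bound of the last row's partition keys
                preceding: Int,
                following: Int): BatchCuts = {
  // A row is complete if `following` rows exist after it in this batch, or
  // if it falls before the start of the last (possibly unfinished) group.
  val rowsToOutput = math.max(numRows - following, groupStartIdx)
  // Keep `preceding` rows of context for the incomplete rows, but never
  // reach back past the group start (frames are clipped at group boundaries).
  val cacheStartIdx = math.max(rowsToOutput - preceding, groupStartIdx)
  BatchCuts(rowsToOutput, cacheStartIdx)
}
```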

So for example if we are doing a query like

```sql
MAX(C) OVER (PARTITION BY A ORDER BY B ROWS BETWEEN 1 PRECEDING AND 2 FOLLOWING) AS MY_MAX_VALUE
```

with input like

Batch 1:

| A | B | C |
|---|---|---|
| 1 | 0 | 0 |
| 1 | 1 | 2 |
| 1 | 2 | 4 |
| 1 | 3 | 8 |
| 1 | 4 | 16 |

and
Batch 2:

| A | B | C |
|---|---|---|
| 1 | 5 | 17 |
| 2 | 1 | 1 |

We would then load up batch 1 and run the window function on it to get

Batch 1:

| A | B | C | MY_MAX_VALUE |
|---|---|---|--------------|
| 1 | 0 | 0 | 4 |
| 1 | 1 | 2 | 8 |
| 1 | 2 | 4 | 16 |
| 1 | 3 | 8 | 16 |
| 1 | 4 | 16 | 16 |

Next we need to figure out how much data we can output because it is completely done. Because we have the ranges in the query, we can work this out. The last row we can output needs at least 2 rows following it to be 100% complete, so we take the index of the last row (4) and subtract 2, giving index 2 as the last row we can output. Before we do that, though, we need to know whether that index is still part of the same window group, so we do a lower-bound search using the partition keys of the last row in the batch (A=1). That gives index 0. The larger of those two values tells us how much data we can output because it is complete (index 2).

Next we need to figure out how much data to keep around so we can compute correct answers for the rows that were not complete. For that we need to keep 1 preceding row for the rows we could not output. The rows we could not output start at index 3; subtracting 1 preceding row gives index 2. Again we compare that to the start of the window group (index 0) and take the larger, so the rows to keep start at index 2.
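Plugging the example's numbers into the hypothetical `computeCuts` sketch above confirms those cut points:

```scala
// Batch 1 of the example: 5 rows, a single group starting at index 0,
// ROWS BETWEEN 1 PRECEDING AND 2 FOLLOWING.
val cuts = computeCuts(numRows = 5, groupStartIdx = 0, preceding = 1, following = 2)
assert(cuts.rowsToOutput == 3)  // emit indices 0..2
assert(cuts.cacheStartIdx == 2) // keep indices 2..4 for the next batch
```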

So we end up with...

OUTPUT BATCH:

| A | B | C | MY_MAX_VALUE |
|---|---|---|--------------|
| 1 | 0 | 0 | 4 |
| 1 | 1 | 2 | 8 |
| 1 | 2 | 4 | 16 |

Keep around for the next batch:

| A | B | C |
|---|---|---|
| 1 | 2 | 4 |
| 1 | 3 | 8 |
| 1 | 4 | 16 |

We then read in the next batch, concatenate it with the kept rows, and run the result through the window function as before to get:

| A | B | C | MY_MAX_VALUE |
|---|---|---|--------------|
| 1 | 2 | 4 | 16 |
| 1 | 3 | 8 | 17 |
| 1 | 4 | 16 | 17 |
| 1 | 5 | 17 | 17 |
| 2 | 1 | 1 | 1 |

We know that one row was kept from before and was already output, so we don't want to output row 0 again; we have to slice it off before we output any data. We then go through the entire process again, finding the lower bound and applying the preceding/following rows, and we end up outputting:

| A | B | C | MY_MAX_VALUE |
|---|---|---|--------------|
| 1 | 3 | 8 | 17 |
| 1 | 4 | 16 | 17 |
| 1 | 5 | 17 | 17 |

And we save the following for the next batch to process.

| A | B | C |
|---|---|---|
| 2 | 1 | 1 |

As with #1856, it would be a really nice optimization to know whether this batch is the last batch without calling has_next (which in some cases forces us to fetch the next batch just to know whether there is more). That would let us know at the end whether we could output that last row or had to wait on it.
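Putting the whole walkthrough together, the batching loop has roughly this shape. This is a sketch only: `Batch`, `concat`, `slice`, `runWindowKernel`, and `lowerBoundOfLastRowKeys` are hypothetical stand-ins for the real GPU-side operations, and `computeCuts` is the helper sketched earlier.

```scala
// Hypothetical stand-ins for the real GPU-side types and operations:
trait Batch { def numRows: Int }
def concat(a: Batch, b: Batch): Batch = ???
def slice(b: Batch, from: Int, until: Int): Batch = ???  // keeps rows [from, until)
def runWindowKernel(b: Batch): Batch = ???               // appends the window result column
def lowerBoundOfLastRowKeys(b: Batch): Int = ???         // binary search on partition keys

def processAll(batches: Iterator[Batch], preceding: Int, following: Int): Seq[Batch] = {
  val out = Seq.newBuilder[Batch]
  var cached: Option[Batch] = None // rows carried over for window context
  var alreadyOutput = 0            // leading cached rows whose results were already emitted

  while (batches.hasNext) {
    val combined = cached match {
      case Some(c) => concat(c, batches.next())
      case None    => batches.next()
    }
    val result     = runWindowKernel(combined)
    val groupStart = lowerBoundOfLastRowKeys(combined)
    val cuts       = computeCuts(combined.numRows, groupStart, preceding, following)
    // Emit the complete rows, skipping carried-over rows emitted last time.
    out += slice(result, alreadyOutput, cuts.rowsToOutput)
    cached = Some(slice(combined, cuts.cacheStartIdx, combined.numRows))
    alreadyOutput = cuts.rowsToOutput - cuts.cacheStartIdx
  }
  // Input exhausted: the cached rows are now complete, so run the kernel one
  // last time and emit whatever was not already output. Knowing up front that
  // a batch is the last one would let us finish without this extra pass.
  cached.foreach { c =>
    out += slice(runWindowKernel(c), alreadyOutput, c.numRows)
  }
  out.result()
}
```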

@revans2 revans2 added feature request New feature or request ? - Needs Triage Need team to review and classify labels Mar 3, 2021
@sameerz sameerz removed the ? - Needs Triage Need team to review and classify label Apr 6, 2021
revans2 commented Sep 26, 2023

Be aware that we are in the process of adding negative ranges for preceding and following.

#9229

We need to make sure that we test these use cases so that we don't run into issues with them.

revans2 commented Sep 26, 2023

We also should make sure that we test some extreme cases where the number of rows in a window is larger than the number of rows in a typical batch, just to be sure that we can handle cases where there may be no rows we can output because multiple batches are needed to complete a single window. Bonus points if we can detect this up front and skip the window calculations until we know there is enough data to make them worth doing.
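A sketch of that up-front check, reusing the hypothetical helpers from the sketches above: if no new rows can be completed yet, skip the kernel, trim the context that must be kept, and wait for more input.

```scala
// Inside the batching loop, before launching the window kernel (sketch only):
val groupStart = lowerBoundOfLastRowKeys(combined)
val cuts       = computeCuts(combined.numRows, groupStart, preceding, following)
if (cuts.rowsToOutput <= alreadyOutput) {
  // Every completable row was already emitted in an earlier pass, so the
  // kernel would produce nothing new. Cache the context and move on.
  cached = Some(slice(combined, cuts.cacheStartIdx, combined.numRows))
  alreadyOutput -= cuts.cacheStartIdx
} else {
  // ... run the window kernel and emit rows as before ...
}
```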

@mythrocks mythrocks changed the title [FEA] Optimize row based window operations for non UNBOUNDED ranges [FEA] Optimize row based window operations for BOUNDED ranges Nov 16, 2023
mythrocks added a commit that referenced this issue Dec 13, 2023
Fixes #1860.

This commit adds support for batched processing of window aggregations where the window-extents are row-based and (finitely) bounded.

Example query:
```sql
SELECT 
  COUNT(1) OVER (PARTITION BY part ORDER BY ord ROWS BETWEEN 10 PRECEDING AND 20 FOLLOWING),
  MIN(col) OVER (PARTITION BY part ORDER BY ord ROWS BETWEEN 10 PRECEDING AND CURRENT ROW),
  AVG(nuther) OVER (PARTITION BY part ORDER BY ord ROWS BETWEEN CURRENT ROW AND 20 FOLLOWING)
FROM my_table;
```

The algorithm is described at length in #1860.  In brief:
1. A new exec `GpuBatchedBoundedWindowExec` is used to batch the input into chunks that fit into GPU memory.
2. Depending on the window specification, some rows towards the end of the input batch might not have the context (i.e., the "following" rows) necessary to finalize their output.  Those rows are carried over to the next batch for recomputation.
3. At every stage, enough rows from the previous batch are carried forward to provide the "preceding" context for the window computation.

Note that window bounds might be specified with negative offsets. These are also supported. As a consequence, `LEAD()` and `LAG()` are supported as well.

```sql
SELECT
  COUNT(1)  OVER (PARTITION BY part ORDER BY ord ROWS BETWEEN 5 PRECEDING AND -3 FOLLOWING),
  LAG(col, 10)  OVER (PARTITION BY part ORDER BY ord),
  LEAD(col, 5) OVER (PARTITION BY part ORDER BY ord) ...
```

This implementation falls back to unbatched processing (via `GpuWindowExec`) if a window's preceding/following bounds exceed a configurable maximum (defaulting to 100 rows in either direction).  This may be reconfigured via:
```scala
spark.conf.set("spark.rapids.sql.window.batched.bounded.row.extent", 500)
```

Signed-off-by: MithunR <mythrocks@gmail.com>

* Mostly working, but buggy. Losing some rows in the output.
* Fixed up the math. Looks to be working at 150M.
* Minor refactor/cleanup.
* Clearing cache on task completion.
* Fixed leak from trim().
* Document onTaskCompletion.
* Optimization: Skip window kernel if no output for current batch.
* Removed commented code, prints.
* Switched to exposing negative minPreceding.
* Also built safety guards to disable optimization for very large window extents.
* Removed incorrect error message.
* Tests for varying finite window combinations.
* Tests for unpartitioned cases.
* Plus, some minor reformatting.
* Fixed leak in concat.
* Test that large extents fall back to GpuWindowExec.
* Fix build break with Scala 2.13.
* Support for negative offsets.
* This now allows for `LEAD()`, `LAG()`, and regular window functions with negative values for the `preceding`/`following` window bounds.
* Removed erroneous batching.
* This commit fixes the batching.  The new exec should not have to receive batched input.
* Config changes:
  * Renamed config '.extent' to '.max'.
  * Fixed documentation for said config.
  * Removed TODOs that were already handled.
* Docs update for batched row window config.
* Fixed output column order. This fixes the empty output problem.

Signed-off-by: MithunR <mythrocks@gmail.com>