Starting on Window Functions #13458
Conversation
This is an initial take on how to use Processors for Window Processing. A Processor is an interface that transforms RowsAndColumns objects. RowsAndColumns objects are essentially combinations of rows and columns. The intention is that these Processors are the start of a set of operators that more closely resemble what DB engineers would be accustomed to seeing.
can run them end-to-end. This code can be used to actually run a query, so yay!
This introduces the SQL bindings for the parsing and planning of window queries. There are limitations: only a single partitioning can be planned, and there is a baked-in assumption that the data is pre-sorted. Both of these are pretty sharp edges, so the functionality is still not ready for general consumption, but this is a start that works.
Added wikipedia data to the indexes available to the SQL queries and tests validating the windowing functionality as it exists now.
This pull request introduces 2 alerts and fixes 1 when merging 851acb8 into 50963ed - view on LGTM.com.
Heads-up: LGTM.com's PR analysis will be disabled on the 5th of December, and LGTM.com will be shut down completely on the 16th of December 2022. Please enable GitHub code scanning, which uses the same CodeQL engine that powers LGTM.com. For more information, please check out our post on the GitHub blog.
@@ -0,0 +1,1019 @@
type: "operatorValidation"

sql: "
Try `sql: |` for sweet multi line strings in yaml!
This pull request introduces 2 alerts and fixes 1 when merging 299d735 into 138a6de - view on LGTM.com.
This pull request introduces 2 alerts and fixes 1 when merging 935864c into 9177419 - view on LGTM.com.
I'm looking forward to seeing where we can take this, and especially to seeing a general OperatorQuery & integrating with frames and MSQ. It looks like there are pathways to do that in the future, so that's good. It's also good that the SQL functionality is off by default and not documented, since people might otherwise stumble into it when trying to run window functions. Having it be obscure in that way will help us evolve it quickly.
Let's get this out there so we can start testing and evolving it. I left a few relatively minor line comments asking for various clarifications and also asking for the SQL planning stuff to be even more switched-off-by-default than it currently is. With those changes it would look good to me.
 * Get the number of cells
 * @return the number of cells
 */
int numCells();
I get why this is called a cell, but it would make more sense to me if it was called a row.
import java.util.Map;
import java.util.Objects;

public class WindowOperatorQuery extends BaseQuery<RowsAndColumns>
Is this meant to be supplanted by an operator query at some point? Would both live side by side?
I hope that eventually we have a "pure" `OperatorQuery` that we can just switch this to extend and then "be done". I'm keeping it named `WindowOperator` for now to make it easier to create a new `OperatorQuery` if it turns out that this is done wrong.
@JsonCreator
public WindowOperatorQuery(
    @JsonProperty("dataSource") DataSource dataSource,
    @JsonProperty("intervals") QuerySegmentSpec querySegmentSpec,
Is it too wild to consider combining DataSource and QuerySegmentSpec as part of this effort? Once I really internalized what they each mean it feels like it's weird that they're separate.
I am reminded that the equivalent MSQ concept here is `InputSpec`, where the `TableInputSpec` combines datasource, intervals, and filter. Wonder if it would make sense to use that here.
I'm sympathetic to your ask, but I think it's a bit too much for this PR at this point in time. That said, I think I can remove `intervals` entirely from this query spec...
import java.util.Map;
import java.util.Objects;

public class WindowOperatorQuery extends BaseQuery<RowsAndColumns>
What are the preconditions for the data from the `dataSource` of this query? Would make good javadoc.
 * This naive partitioning operator assumes that its child operator always gives it RowsAndColumns objects that are
 * a superset of the partitions that it needs to provide. It will never attempt to make a partition larger than a
 * single RowsAndColumns object that it is given from its child Operator. A different operator should be used
 * if that is an important bit of functionality to have.
Any preconditions? Do things need to be sorted any particular way?
Windowing.fromCalciteStuff(
    partialQuery,
    plannerContext,
    sourceRowSignature, // TODO(gianm): window can only apply to the source data, because SCAN -> WINDOW
I think this would be fine as a regular comment. It's saying that it's ok to use the `sourceRowSignature` here, because PartialDruidQuery (which enforces ordering of operators) only allows WINDOW to apply on top of SCAN. And SCAN is where the `sourceRowSignature` comes from.
@@ -870,6 +911,12 @@ public Query<?> getQuery()
 */
private Query<?> computeQuery()
{
  // TODO(gianm): structure
I think with this comment I was thinking that the code could be structured better. But that was a while ago. I'm over it.
WHERE_FILTER,
SELECT_PROJECT,

// AGGREGATE, HAING_FILTER, AGGREGATE_PROJECT can only be present on aggregating queries.
// AGGREGATE, HAING_FILTER, AGGREGATE_PROJECT can only be present on non-WINDOW aggregating queries.
HAVING_FILTER (spelling)
@@ -86,10 +87,16 @@ public static List<RelOptRule> rules(PlannerContext plannerContext)
    PartialDruidQuery.Stage.SORT_PROJECT,
    PartialDruidQuery::withSortProject
),
new DruidQueryRule<>(
Could you adjust things so this rule isn't added unless `windowsAreForClosers: true`? Will limit the impact if it turns out that something is wrong with the rule.
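A sketch of the kind of gating being asked for (the method and rule names here are illustrative, not the real Calcite rule setup): the experimental rule is only registered when the feature flag is on, so it cannot affect default planning.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only: add the experimental WINDOW rule solely when the
// feature flag is enabled, limiting the blast radius if the rule is buggy.
public class RuleGatingSketch {
    static List<String> rules(boolean windowsAreForClosers) {
        final List<String> retVal = new ArrayList<>();
        retVal.add("SORT_PROJECT_RULE"); // always-on rules go here unconditionally
        if (windowsAreForClosers) {
            retVal.add("WINDOW_RULE"); // experimental, off by default
        }
        return retVal;
    }

    public static void main(String[] args) {
        if (rules(false).contains("WINDOW_RULE")) {
            throw new AssertionError("WINDOW rule must be off by default");
        }
        if (!rules(true).contains("WINDOW_RULE")) {
            throw new AssertionError("WINDOW rule should appear when the flag is set");
        }
        System.out.println("ok");
    }
}
```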
DruidOuterQueryRule.AGGREGATE,
DruidOuterQueryRule.WHERE_FILTER,
DruidOuterQueryRule.SELECT_PROJECT,
DruidOuterQueryRule.SORT,
DruidOuterQueryRule.WINDOW,
Same with this one.
Thanks a lot for the feature Eric! :)
Posting a partial review for now - will do the rest soon. Thanks for the patience!
import org.apache.druid.query.rowsandcols.column.Column;

/**
 * A RowsAndColumns that supposed appending columns. This interface is particularly useful because even if there is
- * A RowsAndColumns that supposed appending columns. This interface is particularly useful because even if there is
+ * A RowsAndColumns that supports appending columns. This interface is particularly useful because even if there is
import java.util.Optional;
import java.util.function.Function;

public class ArrayListRowsAndColumns<RowType> implements RowsAndColumns
This is very similar to `RowBasedStorageAdapter` - but I guess we can't reuse it since there are a lot of differences in the interface.
Yes, it was basically copied from that, but created specifically to assume that it had an ArrayList (and therefore be randomly accessible)
import java.util.List;

@SuppressWarnings("unused")
public class DefaultGroupPartitioner implements GroupPartitioner
Currently this and the default sorted partitioner are the same, right? Are you thinking that in the future, the `GroupPartitioner` may also allow hash-based partitioning?
Yes, with the one exception that this one isn't used anymore. It's really just sitting around in the code for posterity rather than any specific usage. Which probably means it's worth deleting. I'd prefer to do that in a subsequent PR though.
int currGroup = 0;
int prevGroupVal = 0;
for (int i = 1; i < retVal.length; ++i) {
  if (retVal[i] == prevGroupVal) {
this seems incorrect, should we always just do `accessor.compareCells(i - 1, i)` and then check the result of that comparison? I think that `retVal[i]` will always be 0 in this check
`retVal[i]` will always be 0 for the first column that is walked. For the second column, it will only be all-zero if all of the values of the first column were the same. Basically, the algorithm is computing the groupings one column at a time, and the value of retVal up until then is the grouping of all of the previous columns. This check is validating that we are still in an equivalency group based on all of the previous columns (if row `i` and row `i-1` are different based on the previous columns, then the value of our new column doesn't matter, row `i` and row `i-1` will be part of different groupings)
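The column-at-a-time grouping being discussed can be sketched in isolation. This is a hypothetical, self-contained toy that operates on plain `int` columns (the real code goes through ColumnAccessors); the class and method names are made up for illustration.

```java
import java.util.Arrays;

// Toy version of the one-column-at-a-time grouping algorithm: retVal carries the
// grouping implied by all previously walked columns; each pass refines it.
public class GroupingSketch {
    // columns[c][r] is the value of column c at row r; rows are assumed pre-sorted.
    static int[] computeGroupings(int[][] columns, int numRows) {
        final int[] retVal = new int[numRows]; // everything starts in group 0
        for (int[] column : columns) {
            int currGroup = 0;
            int prevGroupVal = 0;
            for (int i = 1; i < numRows; ++i) {
                if (retVal[i] == prevGroupVal) {
                    // rows i-1 and i agreed on all previous columns,
                    // so the current column decides whether to split here
                    if (column[i - 1] != column[i]) {
                        currGroup = i;
                    }
                } else {
                    // previous columns already split here: new group regardless
                    prevGroupVal = retVal[i];
                    currGroup = i;
                }
                retVal[i] = currGroup;
            }
        }
        return retVal;
    }

    public static void main(String[] args) {
        int[][] cols = {
            {1, 1, 1, 2, 2},
            {7, 7, 8, 8, 8}
        };
        // groups: rows {0,1} = (1,7), {2} = (1,8), {3,4} = (2,8)
        System.out.println(Arrays.toString(computeGroupings(cols, 5))); // [0, 0, 2, 3, 3]
    }
}
```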
for (Aggregator agg : aggs) {
  retVal.add(agg.get());
}
return retVal;
we should close the aggregators before returning
Yes, you are right. I realized that in another PR and have that change coming in the follow-on PR.
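The fix being agreed on here might look roughly like this (a hypothetical sketch, not the follow-on PR's actual change): read each aggregator's value, then close all aggregators in a `finally` so resources are released even if a `get()` throws.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch: collect results, then close every aggregator.
public class CloseAggsSketch {
    // Minimal stand-in for an aggregator that owns releasable resources.
    interface Aggregator {
        Object get();
        void close();
    }

    static int closedCount = 0; // instrumentation for the demo below

    static List<Object> finishAndClose(Aggregator[] aggs) {
        final List<Object> retVal = new ArrayList<>();
        try {
            for (Aggregator agg : aggs) {
                retVal.add(agg.get());
            }
        } finally {
            for (Aggregator agg : aggs) {
                agg.close(); // release pooled/off-heap resources
            }
        }
        return retVal;
    }

    public static void main(String[] args) {
        Aggregator agg = new Aggregator() {
            @Override public Object get() { return 42; }
            @Override public void close() { closedCount++; }
        };
        List<Object> out = finishAndClose(new Aggregator[]{agg, agg});
        System.out.println(out + " closed=" + closedCount); // [42, 42] closed=2
    }
}
```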
int rowId = currRow.get();
while (rowId < numRows) {
  for (int i = 0; i < aggs.length; ++i) {
    aggs[i].aggregate();
maybe we could add a new method which returns the current state upon calling `aggregate` - the default impl would do aggregate + `get`. We shouldn't do it if there seems to be no future use-case for it.
Adding methods to `AggregatorFactory` must be done with care; that's the interface that is most likely to be implemented externally. Either way, the aggregate-then-get algorithm implemented here is technically incorrect (sketch aggs will update the object that they return from `get()`), so I have a follow-on PR that does window framing properly and actually implements things correctly.
rowId = currRow.incrementAndGet();
}

return retVal;
we should close the aggregators before returning
follow-on PR
 * @param aggFactories definition of aggregations to be done
 * @return a list of objects, one per AggregatorFactory. That is, the length of the return list should be equal to
 * the length of the aggFactories list passed as an argument, while the length of the internal {@code Object[]} will
 * be equivalent to the number of rows
can we transpose the dimensions (aggs, rows) of this 2D structure to (rows, aggs)? that could also potentially allow for returning a streaming structure from `aggregateCumulative` to lessen memory reqs. The stream could be consumed by the higher operators.
In a follow-on PR, I've changed this interface to return a `RowsAndColumns`. The input is also a `RowsAndColumns`; there is an assumption that we are returning batches of rows, rather than returning row-at-a-time. We can still stream batches, just not rows. I think it should be fine/good to assume a stream of batches.
 * @return the groupings, rows with the same int value are in the same group. There is no sort-order implied by the
 * int values.
 */
int[] computeGroupings(List<String> columns);
just as a reminder, that `int[]` would limit the processing to INT_MAX values even if we have memory left. if we want to support partitioning more values than that, we could consider using BigArrays from fastutil or something like that.
Yeah... this interface is probably gonna die anyway, it's not actually used in this PR anyway.
{
  private final RowsAndColumns rac;

  public DefaultOnHeapAggregatable(
I'm not sure what the correct name would be, but this name seems a bit off. The methods of this class independently build aggregate tables on the `RowsAndColumns` object. I mean that this class is mostly stateless wrt the aggregate table.
If you're gonna complain about naming, you gotta give an alternative name that you like better :P.
I'm fine with changing names and can do it on the subsequent PR that adds window framing, but, what's to say that I change the name and it's still not loved? ;)
This pull request introduces 2 alerts when merging 8cc8235 into 658a9c2 - view on LGTM.com.
rad 🤘
nothing blocking, so i'm going to leave a checkmark; played around with this in the debugger and everything seems to be working well enough
I agree with @gianm about 'cell' terminology; while correct, it is a bit jarring since we don't really use that elsewhere, and I am afraid it would be a bit of extra cognitive load unless we plan to standardize other areas of the code to use cell instead of row. It looks like you have removed a lot of it, but there are still a handful of references in method names and javadocs, `compareCells`, `getActualCell`, etc. I don't think it needs to change right now though
🚀
new ArrayList<>()
);
}
could you add a json serde test (unless i missed it somewhere) in follow-up PR?
AppendableRowsAndColumns retVal = RowsAndColumns.expectAppendable(inputPartition);

if (aggregations != null) {
  OnHeapAggregatable aggregatable = inputPartition.as(OnHeapAggregatable.class);
is this a stub? (none of the `RowsAndColumns` implementations seem to implement this...)
This is a common pattern for all of the "semantic interfaces" actually. For all of them, there is a default implementation that can be fallen back to (the default is generally implemented in terms of "find necessary columns, use their ColumnAccessor and do logic), but we always ask the RowsAndColumns first so that it can offer a more optimal implementation if it wants. This PR doesn't yet have more optimal implementations, but I expect that I will have to create some to truly make the SortOperator that I will need work in any sort of meaningful manner. So, until that PR, this will look very barren and then suddenly it will either become super magical or make sense or both.
ah sorry for the confusion, the pattern makes perfect sense to me, I guess I was just wondering if you had any concrete plans or if this was more of a placeholder at this time which you've clarified, thanks 👍
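The "semantic interface" pattern described in this exchange can be sketched as follows. This is a hypothetical, self-contained toy (the interface and class names are made up, and the real `RowsAndColumns.as()` contract may differ): the caller asks the batch for a specialized implementation first and falls back to a generic default when none is offered.

```java
// Illustrative sketch of the ask-then-fall-back pattern for semantic interfaces.
public class SemanticInterfaceSketch {
    // Stand-in for the minimalist RowsAndColumns "menu of functionality".
    interface RowsAndColumns {
        <T> T as(Class<T> clazz); // null when no specialized implementation exists
    }

    // A semantic interface a batch may (or may not) choose to implement.
    interface RowCounter {
        int numRows();
    }

    static class ArrayBacked implements RowsAndColumns, RowCounter {
        private final int rows;

        ArrayBacked(int rows) {
            this.rows = rows;
        }

        @SuppressWarnings("unchecked")
        @Override
        public <T> T as(Class<T> clazz) {
            // This implementation happens to offer an optimized RowCounter.
            return clazz.isInstance(this) ? (T) this : null;
        }

        @Override
        public int numRows() {
            return rows;
        }
    }

    // Caller pattern: ask for the specialized form first, fall back otherwise.
    static int countRows(RowsAndColumns rac) {
        RowCounter counter = rac.as(RowCounter.class);
        if (counter != null) {
            return counter.numRows(); // optimized path offered by the batch
        }
        return -1; // a naive default would walk a ColumnAccessor here
    }

    public static void main(String[] args) {
        if (countRows(new ArrayBacked(3)) != 3) {
            throw new AssertionError();
        }
        System.out.println("ok");
    }
}
```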
}

if (cumulativeAggregations != null) {
  OnHeapCumulativeAggregatable cummulativeAgg = inputPartition.as(OnHeapCumulativeAggregatable.class);
same comment about stub
{
  final AppendableRowsAndColumns retVal = RowsAndColumns.expectAppendable(incomingPartition);

  SortedGroupPartitioner groupPartitioner = incomingPartition.as(SortedGroupPartitioner.class);
same question re: stub
 * @param rowNum the cell id, 0-indexed
 * @return the primitive {@code int} representation of the cell. Returns {@code 0} if {@link #isNull} is true.
 */
int getInt(int rowNum);
what are the plans for this? when should it be used? or is it just for the row number thing?
Looks good, although please fix the comments in `PartialDruidQuery#canAccept`. Looks like they got mangled somehow.
// Cannot do post-sort stages without a sort.
return false;
} else {
} else // Cannot do post-sort stages without a sort.
Something is weird with the comments here… automated rewrite gone wrong?
Yeah, looks like it re-wrote the final one to actually just return instead of be a branch and then had a hard time with the comments...
Thanks for the response on the review. LGTM given that the needed changes from the review will be posted in the follow-up!
Thanks a lot for adding the support! 🚀
This pull request introduces 2 alerts when merging edb06d8 into 136322d - view on LGTM.com.
Description
This commit adds support for Window functions to Druid. The intent is to support window functions similarly to what postgresql does (https://www.postgresql.org/docs/current/functions-window.html) as well as Drill (https://drill.apache.org/docs/sql-window-functions-introduction/)
The commit is large and there is more work to do, but this lays the foundation of supporting window functions. This PR is best read from a few entry points:
- `Processor`s. Looking at this interface will give an idea of how the actual window functions are implemented. The biggest notion here is that `Processor`s deal with a `RowsAndColumns` object that represents a full partition. Generally speaking, they believe that something else has prepared the `RowsAndColumns` for them, which proves to be a nice simplifying assumption for their implementation.
- `RowsAndColumns` is an interface that represents a set of... rows and columns. If you look at the interface, it is rather minimalist. The idea here is to lean into the idea of `.as()` as has existed on Segments for a long time. We use it to effectively build a "menu" of common, generic functionality that can be done on a batch of data, and then Operators/Processors can be written in terms of that common, generic functionality. The implementation in this PR provides naive implementations of this functionality; as we get deeper in future PRs, this functionality will get fleshed out and specialized more to avoid object copies and megamorphism while further offering love for vectorized processing.
- `Operator` is an interface introduced here. `WindowOperatorQuery` is an operator-defined query for running Window operations on top of the results of a sub-query. All of the logic for the operators is handled in `WindowOperatorQueryQueryToolChest.merge` right now. Essentially, this PR has been co-opted to get Operators introduced into the Druid code base. This means we are using window queries as an initial jumping-off point for `Operator`s to be introduced into the code flow. We expect more and more iteration on this to expand the capabilities deeper and perhaps make a future world where Operator-only queries are a thing that Druid supports.
- `CalciteWindowQueryTest` leverages the recent changes that brought us the `QueryTestBuilder` to have fully file-driven tests. For the window functions, we needed more data, so I've moved the wikipedia dataset that was checked in as part of `environment` to be in the `resources` of the test jar. We then index that and reference it in the tests. The tests sit in `calcite/tests/window` of resources; each file is a SQL query, the window Operator structure that we expect to be built from it, and the expected results. Hopefully, this will simplify the addition of test cases, making it easier for someone to add a test without necessarily knowing how to work with the code and fix it.

That said, this PR introduces various things, but it is not complete yet. This is a first step and there are still sharp edges/unimplemented functionality. That said, what exists here does work for a subset of use cases and is a meaningful milestone that can be committed while we iterate on fleshing out and finishing up the functionality. As such, I'd like to get this reviewed and committed before making it even larger. All interfaces introduced in this PR are experimental and the "windowOperator" query is also intended to be experimental. Given that this is still experimental, I am intending to merge this PR as an undocumented feature, which we will document better as we get the sharp edges resolved.
Sharp Edges
- Window framing is not yet implemented (`ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING` style clauses).
- There is no difference between `RANGE` and `ROWS` when evaluating peers. (The built-in functions are all implemented with the correctly defined semantics, similar to what the postgresql document says above.)
- The data is assumed to be pre-sorted; no sort is applied for `PARTITION BY X` clauses.

These sharp edges are in the weeds enough that this support should still be considered experimental and it should exist as an undocumented feature. Subsequent PRs will smooth out these sharp edges.
Release note
None as the intent is for this to be an undocumented, experimental addition.
This PR has: