More GROUP BY enhancements #2111
Conversation
…r encapsulate functionality.
Amazing, @big-andy-coates. Are the docs being updated to reflect the greater flexibility? These would be great examples to put in our docs, vs leaving them languishing in PRs. cc @JimGalasyn, can you take that on?
@apurvam Absolutely, I've opened a JIRA to track!
Thanks @JimGalasyn!
To be honest I didn't update the docs, as the docs don't currently call out what is not supported by GROUP BY, and most users will just assume GROUP BY supports all the things a normal DB would... which it now does. There's probably scope to add more advanced examples that use the improved GROUP BY functionality, and this should undoubtedly be called out in the release notes. Other than that, I don't think the docs need to call out standard SQL functionality. But that's just my opinion.
Conflict on: ksql-engine/src/main/java/io/confluent/ksql/planner/plan/AggregateNode.java
@big-andy-coates thanks for the PR. A couple of comments on the semantics before I start the review:
Even for strings we will have the ambiguity. Consider ('test1', 'test2') and ('tes', 't1test2'). These both will result in the same group-by expression value, 'test1test2'.
SQL supports HAVING clauses that use non-aggregate columns. I don't see a good reason why we wouldn't support them, even though you might as well just put those conditions in the WHERE clause.
Agree that we need to decide the semantics for the case @hjafarpour pointed out. FWIW, different SQL engines seem to treat this case differently. Postgres rejects the statement, and MySQL uses an arbitrary row to fill the non-aggregate SELECT columns not present in the group by:
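A sketch of the ambiguity being discussed (hypothetical table `t` with columns `x` and `y`; not taken from the original comment):

```sql
-- Two rows can share the same x but carry different y values, so which y should the
-- single output row for that group contain?
-- Postgres rejects this statement; MySQL (without ONLY_FULL_GROUP_BY) fills y from an
-- arbitrary row of the group.
SELECT x, y, COUNT(*) FROM t GROUP BY x;
```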
I've been doing a lot of reviews recently. Will get back to this asap. Thanks for the input. Current plan is to reject such statements, but I'll know more once I start having a look.
Here's the issue with tables illustrated with an example: suppose we have a table aggregation from T1 into T2. Let's look at what happens when KSQL sees a series of records in T1. Assuming our semantic for select fields is that they take the value of the latest seen record, this should cause a corresponding series of updates to T2. Internally, streams tries to let us do this by emitting records (including UNDO records for the old values) prior to the aggregation.
Then, when we get the UNDO record, it's up to our aggregator to undo the effect of that table update. I can't think of a way to implement this (or even the relaxed semantic that X and Y take the values of some row from the table) without buffering all the previously seen values. We could relax the semantic to just say that X and Y are some previously seen value for that grouping, but at that point what's the point of even supporting it?
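A rough sketch of the kind of scenario being described (statements and values are assumed for illustration, based on the T1/T2 example used later in the thread):

```sql
CREATE TABLE T1 (K INTEGER, X INTEGER, Y INTEGER) WITH (..., KEY='K');
CREATE TABLE T2 AS SELECT X, Y, COUNT(*) AS C FROM T1 GROUP BY X + Y;
```

```
T1 update:  K1 -> (X=1, Y=2), later changed to K1 -> (X=4, Y=5)
Conceptually, before the aggregation streams emits one record per affected group:
  group 3: UNDO (X=1, Y=2)   -- retract K1's old contribution
  group 9: ADD  (X=4, Y=5)   -- add K1's new contribution
```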
@blueedgenick and @rmoff your feedback on this one is much appreciated! :)
I'm not sure I see where there is a problem. When streams emits the UNDO record, I would have thought KSQL will first repartition the record based on the GROUP BY expression, and then the aggregator will undo this record for the old grouping. Or am I missing something?
Maybe we could support this using a 'LAST' or 'LATEST' UDAF...? This would make it more SQL-like.
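For illustration, the syntax might look something like this (purely hypothetical; no such UDAF exists today, and `t`, `x`, `y` are made-up names):

```sql
SELECT x, LATEST(y) AS last_y, COUNT(*) AS c FROM t GROUP BY x;
```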
cc @mjsax, see Rohan's comment above: #2111 (comment). This goes back to our KStreams-related discussions on aggregation semantics (and properties like associative / commutative / ... for functions).
I agree with @hjafarpour that those statements should be rejected as invalid. For constants in the SELECT, …

The argument for timestamps is also not completely applicable IMHO: semantically, the timestamp of the output records is computed as an aggregation over the input records. We want it to be the maximum over all of them (atm, this is not implemented in Streams, and KSQL does also not need to reimplement it, but just "copy" the timestamp). But this is just a gap between the implementation and the concept. Conceptually, it's an aggregation. The nature of the timestamp aggregation is also important, because it makes the computation deterministic. I think KSQL semantics should be built on event-time order, not offset order -- which is a problem for the suggestion to use the latest-seen row.

Btw: MySQL should not be the gold standard here... it takes too many shortcuts -- it's like browsers that render all kinds of ill-formatted HTML to let people "successfully" do their homepage -- MySQL seems to be similar and accepts mis-formed SQL statements to be accessible to a broader user base -- we should not follow this example IMHO -- education before shortcuts!
@mjsax I'm not advocating for supporting it. My position is that if we can support it as syntactic sugar for a LAST/LATEST/REDUCE UDAF then I don't see a problem. But I think there are two gaps we'd need to bridge before doing so:
Thanks for the clarification @rodesai. I don't think it's a good idea to auto-magically insert a LATEST aggregation. I don't see any reason why LATEST should be special (ie, why do you pick this one as a default) compared to other UDAFs, and auto-magical behavior is usually not a good idea from my experience -- or maybe that's just my personal taste? Anyway.

How to implement a LATEST function is a different beast. I agree that you would need to preserve the complete history of all updates (because there is no time window involved), and this makes it impossible to implement, as this history is unbounded. For a Stream input it would be simpler, because there is no retraction and it's sufficient to maintain the value for the record with the largest timestamp seen. The question now is, do we want to allow LATEST for streams but not for tables? Or should it not be added at all (to provide the same functionality for both)?
Thanks for everyone's input on this! I think we've kind of agreed on using explicit syntax for this, e.g. something like a LATEST(x) call. Though I'm not sure 'LATEST' is the right name, as I don't think it should attempt to get the value from the latest record, i.e. the record with the highest timestamp. Rather, the functionality I'm thinking about is the ability to copy a field from the source row, i.e. the row that's currently being processed and is updating the aggregate, whatever that row may be. I guess something like FROM_ROW(x) might be a better name. The idea I'm throwing about is probably more of a 'copy this field from the current row' function than a true aggregation.

Yeah, this would be offset ordered, but if that doesn't cause issues with the use case the user is trying to solve, then is there still a place for this, caveats and all? I can't help thinking that it may be useful and help solve some use case that wouldn't be possible with KSQL without it. I just can't think of one, which may in itself prove something...

To get this PR moving again, my plan is to only allow matching select clauses, so the following will be valid:
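(Illustrative statements only, with an assumed schema of `x`, `y`, and `name` columns; they follow the exact-match rule described above rather than reproducing the original examples.)

```sql
-- the SELECT contains only the grouping expressions themselves, plus aggregates
SELECT x + y, COUNT(*) FROM t GROUP BY x + y;
SELECT SUBSTRING(name, 0, 5), COUNT(*) FROM t GROUP BY SUBSTRING(name, 0, 5);
```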
But the following, unfortunately, won't:
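(Again illustrative, same assumed schema:)

```sql
-- x and y individually don't match the grouping expression x + y
SELECT x, y, COUNT(*) FROM t GROUP BY x + y;
-- the SUBSTRING parameters differ between the SELECT and the GROUP BY
SELECT SUBSTRING(name, 0, 2), COUNT(*) FROM t GROUP BY SUBSTRING(name, 0, 5);
```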
I modified one of your "valid" examples to align with @rodesai's original example at #2111 (comment).

```sql
CREATE TABLE T1 (K INTEGER, X INTEGER, Y INTEGER) WITH (..., KEY='K');
CREATE TABLE T2 AS SELECT x + y AS XY, COUNT(*) AS CNT FROM T1 GROUP BY x + y;
```

Let's look at what happens when KSQL sees the following series of records in T1 (A -> B denotes a record with key A and value B):
I think Andy is suggesting that this would result in the following series of updates to T2:
However, I think we still have the same problem. The second input record for key …
If I understand correctly, then the value of output key …
And we're back to the original problem.
Agreed :) For what is allowed and what is not, I basically agree. However, for the example above we could actually be more sophisticated, because we know the semantics of the `+` expression. Does this make sense?

@miguno What you say makes sense, however, there is no problem, because for the example you describe, we know that the value associated with each group does not change, ie, in your example we know that XY is the same (3) for every record that lands in group 3.
@mjsax: I think the root issue is that we are doing a table->table operation. Perhaps this is the shortest way I can describe the issue I am seeing: with tables, we normally recommend log compaction, so an original, non-compacted input could be log-compacted down to just the latest record per key:
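(Hypothetical records, chosen only to make the compaction point concrete; they use the T1/T2 example above rather than the records from the original comment.)

```
Non-compacted input to T1:
  K1 -> (X=1, Y=2)    -- x + y = 3
  K2 -> (X=2, Y=1)    -- x + y = 3
  K1 -> (X=4, Y=5)    -- K1 moves from group 3 to group 9

After log compaction (only the latest record per key survives):
  K2 -> (X=2, Y=1)
  K1 -> (X=4, Y=5)
```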
If this happens, then Andy's "valid" variant will cause the table->table operation to produce different results because, in his variant, the idea is that we would not be sending a "correction" record when the second record for a key arrives. For the non-compacted input, the output would be (note: the output itself is subject to compaction because the output type is a table):
For the compacted input, the output would be:
The problem is not that …
I agree that we need to produce the same result, and that we need to send a correction/subtraction record for the non-compacted input. However, why do you think this would not happen? Kafka Streams will generate this record correctly from my understanding. Can you elaborate on why Kafka Streams would compute an incorrect subtraction record (or would not emit it at all)? From my understanding, the output will be:
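(Sticking with the hypothetical records sketched above, the updates to T2 for the non-compacted input would look roughly like this; the third update is the subtraction emitted when K1 leaves group 3.)

```
3 -> (XY=3, CNT=1)
3 -> (XY=3, CNT=2)
3 -> (XY=3, CNT=1)   -- subtraction: K1 left group 3
9 -> (XY=9, CNT=1)   -- addition: K1 joined group 9
```

The final state (group 3 with CNT=1, group 9 with CNT=1) matches what the compacted input would produce directly.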
Yeah, I've thought along the same lines, but it's not a simple undertaking to be able to figure this out. It's not too bad if it's the same function, but what about if it starts to go across functions, but is still 'contained'?

```sql
SELECT MIN(MAX(x, 5), 10), COUNT() FROM TEST GROUP BY Max(x, 5);
```

Even just handling parameter order in functions where it doesn't matter doesn't come for free:

```sql
-- won't work as it stands as the parser doesn't know MIN(5, x) === MIN(x, 5)
SELECT MIN(5, x), COUNT() FROM TEST GROUP BY MIN(x, 5);

-- contrived, but should also be allowed, but won't be.
SELECT MIN(x, CAST(CAST(5, VARCHAR), INT)), COUNT() FROM TEST GROUP BY MIN(x, 5);
```

That's why I was thinking the FROM_ROW style might be easier:

```sql
SELECT MIN(MAX(FROM_ROW(x), 5), 10), COUNT() FROM TEST GROUP BY Max(x, 5);
SELECT MIN(5, FROM_ROW(x)), COUNT() FROM TEST GROUP BY MIN(x, 5);
SELECT SUBSTRING(FROM_ROW(x), 0, 2), COUNT(*) FROM TEST GROUP BY SUBSTRING(x, 0, 10);
```

Though I guess this is not preferable to the parser knowing these things are allowed; it's just easier to implement.
@big-andy-coates Totally agreed. It's a very challenging problem... Just wanted to mention it for the sake of completeness of the discussion.
Yes, agree here. My concern was more about a LATEST UDAF and tables.
I synced offline with @big-andy-coates. This helped to clear up a few things. :-)

First, I see now why Andy's following suggestion would indeed work, unlike what I wrote in my two last comments above (#2111 (comment) and #2111 (comment)):

```sql
-- This would work indeed.
CREATE TABLE T2 AS SELECT x + y AS XY, COUNT(*) AS CNT FROM T1 GROUP BY x + y;
```

To clarify for other readers, I try to summarize again what the problem is that we are talking about. DDL for the example:

```sql
CREATE TABLE T1 (K INTEGER, X INTEGER, Y INTEGER) WITH (..., KEY='K');
CREATE TABLE T2 AS SELECT X, Y, COUNT(*) AS C FROM T1 GROUP BY x + y;
```

Updates for table T1:
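(The concrete records aren't reproduced here; the values below are assumed, chosen to be consistent with the references to record 1, record 4, and update #d further down.)

```
1. K1 -> (X=1, Y=2)    -- x + y = 3
2. K2 -> (X=2, Y=1)    -- x + y = 3
3. K3 -> (X=5, Y=6)    -- x + y = 11
4. K2 -> (X=7, Y=8)    -- K2 moves from group 3 to group 15
```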
This results in the following updates to table T2:
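(Again assumed values, continuing the sketch above; the update for record 4's new group, key 15, is omitted as it isn't the interesting one.)

```
#a  3  -> (X=1, Y=2, C=1)
#b  3  -> (X=2, Y=1, C=2)
#c  11 -> (X=5, Y=6, C=1)
#d  3  -> (X=?, Y=?, C=1)   -- record 4 removed K2 from group 3; what should X and Y be?
```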
The problem is what X and Y are in record #d (it's not about C), and this is why e.g. we talked about issues with "LATEST" behavior for tables being problematic. Record #d (and thus the state of T2) should be the same as #a (X=1, Y=2, C=1). But for tables (which have and need UNDOs, unlike streams), as Rohan pointed out above in #2111 (comment), in a realistic scenario this would require us to retain the full, ordered history of past records (in a simple example such as the one above we could fix the example by having access to just the 1-2 previous records, but that doesn't work in general; roughly speaking: the more UNDOs, the more prior history you need). However, the only information for X and Y we'd have access to in the current KStreams implementation (correct me if I'm wrong) is essentially the record currently being processed and the current contents of the aggregate, neither of which are the values for X and Y that -- in my opinion -- one would intuitively expect in this scenario.

This intuition is what the "LATEST" discussion above was about, where (in this example, after record 4 was processed) the now-latest update in T2 for key 3 is determined by record 1, i.e. the row for key 3 should go back to (X=1, Y=2, C=1).

@big-andy-coates's suggestion now is to limit what you can put into the SELECT clause when you use an expression in GROUP BY:

```sql
CREATE TABLE T2 AS SELECT x + y AS XY, COUNT(*) AS C FROM T1 GROUP BY x + y;
```

Here, we are effectively ensuring that all fields in the T2 update records are either (1) a constant across all records (e.g. X+Y aka XY == 3 for all records for the downstream key 3) or (2) the result of an aggregation (like COUNT(*) aka C), which we know how to update correctly. Is that a fair summary?
Yep, and I agree that can't work. Even a LATEST UDAF would hit the same problem for tables.
@big-andy-coates I'm +1 for this approach. I see the point from @mjsax about supporting more than an exact match, but it not only makes the implementation more complex, it may also result in a lot of confusion for users. We would have to explain when a non-exact match is permissible and when it is not. Requiring an exact match makes it much easier to describe the query behavior and results in much less ambiguity.
I agree with @hjafarpour that we shouldn't get blocked by supporting more than an exact match for the initial implementation.
@blueedgenick: You also had some comments to share if I recall correctly?
#### Conflicting files

`ksql-engine/src/main/java/io/confluent/ksql/QueryEngine.java`
`ksql-engine/src/main/java/io/confluent/ksql/analyzer/QueryAnalyzer.java`
`ksql-engine/src/main/java/io/confluent/ksql/planner/plan/AggregateNode.java`
`ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKStream.java`
`ksql-engine/src/test/java/io/confluent/ksql/analyzer/AnalyzerTest.java`
`ksql-engine/src/test/java/io/confluent/ksql/analyzer/QueryAnalyzerTest.java`
`ksql-engine/src/test/java/io/confluent/ksql/physical/PhysicalPlanBuilderTest.java`
`ksql-engine/src/test/java/io/confluent/ksql/planner/PlanSourceExtractorVisitorTest.java`
`ksql-engine/src/test/java/io/confluent/ksql/planner/plan/AggregateNodeTest.java`
`ksql-engine/src/test/java/io/confluent/ksql/planner/plan/JoinNodeTest.java`
`ksql-engine/src/test/java/io/confluent/ksql/planner/plan/KsqlBareOutputNodeTest.java`
`ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKGroupedTableTest.java`
`ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKStreamTest.java`
`ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKTableTest.java`
`ksql-engine/src/test/java/io/confluent/ksql/structured/SelectValueMapperTest.java`
`ksql-engine/src/test/java/io/confluent/ksql/structured/SqlPredicateTest.java`
`ksql-engine/src/test/java/io/confluent/ksql/testutils/AnalysisTestUtil.java`
`ksql-engine/src/test/resources/query-validation-tests/group-by.json`
Ack, 1337 files changed, browser OOM, LGTM!
HFS, this PR brought my machine to its knees! ><
…sts will not pass)
- must have aggregate column
- disallows UDFs in projection with different params to GROUP BY
- disallows fields in projection that are params to UDFs in GROUP BY
- disallows string concat in projection that differs from GROUP BY
- disallows fields in projection that are involved in string concat in GROUP BY
- disallows arithmetic in projection that differs from GROUP BY
- does now allow constant HAVING clause.
@JimGalasyn investigating - PR shouldn't be that large. Something has gone wrong.
Closing in favour of #2472 |
Description
Following on from my recent PR on improving `GROUP BY`, this PR looks to do a little refactoring to tidy things up and add support for more weird and wonderful `GROUP BY` clauses. Also fixes: #2455

You can now:
Use non-aggregate functions in the select statement, e.g. the use of `SUBSTRING` here:
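(An illustrative statement; the table and column names are made up, not taken from the original PR text.)

```sql
SELECT SUBSTRING(name, 0, 5), COUNT(*) FROM t GROUP BY SUBSTRING(name, 0, 5);
```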
GROUP BY arithmetic or string concat using '+':
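For example, something along these lines (assumed column names again):

```sql
SELECT a + b, COUNT(*) FROM t GROUP BY a + b;                                      -- arithmetic
SELECT first_name + last_name, COUNT(*) FROM t GROUP BY first_name + last_name;    -- string concat
```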
GROUP BY with constants in the SELECT expression
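A sketch of what this looks like (illustrative only):

```sql
SELECT 'my constant', x, COUNT(*) FROM t GROUP BY x;
```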
HAVING clause that is an aggregate function:
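For example (illustrative):

```sql
SELECT x, COUNT(*) FROM t GROUP BY x HAVING COUNT(*) > 2;
```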
Multiple HAVING clauses: not allowed, though possible.
HAVING clause that is constant (allowed, though contentious):
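For example, something like this (illustrative, assuming a literal always-true condition is what is meant by a constant clause):

```sql
SELECT x, COUNT(*) FROM t GROUP BY x HAVING 1 = 1;
```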
GROUP BY functions where the parameters differ, e.g. note the different parameters to `SUBSTRING` below:
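For example (an illustrative statement mirroring the one discussed later in the thread):

```sql
SELECT SUBSTRING(x, 0, 2), COUNT(*) FROM TEST GROUP BY SUBSTRING(x, 0, 10);
```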
GROUP BY functions where the select directly uses parameters from the GROUP BY:
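For example (illustrative; `x` is a parameter to the `SUBSTRING` in the GROUP BY):

```sql
SELECT x, COUNT(*) FROM TEST GROUP BY SUBSTRING(x, 0, 10);
```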
The argument to allow non-aggregate non-group-by select columns
When we're doing `GROUP BY` operations, a normal db implementation would fail if you have a column in the select that isn't part of the group by, e.g.

```sql
SELECT x, y, COUNT() FROM t GROUP BY x
```

The db would complain about `y`. (Or most do, anyway). This is because the db is building a single output row, (the aggregate), from a set of source rows.

KSQL, on the other hand, is processing row by row. Each input row updates the aggregate and outputs a new row. So KSQL has access to both the aggregate and the input row. This means we could support copying arbitrary fields from the source row to the output row!
We already copy `ROWTIME` from the source row to the output row. This is basically an `updated_at` for the aggregate. But another use-case we don't currently support would be wanting to know the `updated_by` for the aggregate. We could choose to support something like:

| input (x, user) | output (x, count, updated_by) |
| --- | --- |
| 0, bob | 0, 1, bob |
| 1, fred | 1, 1, fred |
| 0, peter | 0, 2, peter |

etc.
I'd be interested to hear what people think!
Testing done
Added unit, functional and `QueryTranslationTest`s.

Reviewer checklist