[FLINK-8690][table]Support group window distinct aggregation on DataStream #5940
@@ -82,7 +82,7 @@ private class FlinkLogicalAggregateConverter
case _ => true
}

- !agg.containsDistinctCall() && supported
+ supported
I don't think we need this extra local variable here after your change.
@@ -49,11 +49,16 @@ trait CommonAggregate {

val aggs = namedAggregates.map(_.getKey)
val aggStrings = aggs.map( a => s"${a.getAggregation}(${
if (a.getArgList.size() > 0) {
val prefix = if (a.isDistinct) {
I think one line should be fine here, slightly more compact IMO.
val prefix = if (a.isDistinct) "DISTINCT " else ""
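To illustrate the compact form suggested above, here is a minimal, self-contained sketch of rendering an aggregate call as a string with an optional DISTINCT prefix. The AggCall case class is a hypothetical stand-in for the real aggregate call type, and rendering an empty argument list as * is an assumption of this sketch, not something taken from the PR.

```scala
object AggStringSketch {
  // Hypothetical stand-in for the planner's aggregate call; only the fields used here.
  final case class AggCall(name: String, isDistinct: Boolean, args: Seq[String])

  def render(a: AggCall): String = {
    // The one-line form proposed in the review comment.
    val prefix = if (a.isDistinct) "DISTINCT " else ""
    // Assumption of this sketch: an empty argument list is shown as "*".
    val body = if (a.args.nonEmpty) prefix + a.args.mkString(", ") else "*"
    s"${a.name}($body)"
  }
}
```

For example, AggStringSketch.render(AggStringSketch.AggCall("COUNT", isDistinct = true, Seq("num"))) yields COUNT(DISTINCT num), while the same call with isDistinct = false yields COUNT(num).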
Thanks for the PR @walterddr!
I noticed that some of the changes are based on @haohui's PR #3764. I think it would be good to put your changes on top of his commit.
I left a few comments that should be easy to address.
FYI, I also created a JIRA issue to support distinct aggregates also in non-windowed grouping queries.
Given the runtime support, this should be very easy to achieve and basically just require a few tests for validation.
Thanks, Fabian
@@ -641,7 +641,8 @@ class AggregationCodeGenerator(
| java.util.Map.Entry entry = (java.util.Map.Entry) mergeIt$i.next();
| Object k = entry.getKey();
| Long v = (Long) entry.getValue();
- | if (aDistinctAcc$i.add(k, v)) {
+ | if (aDistinctAcc$i.add(
The key in the entry is a Row already.
@@ -641,7 +641,8 @@ class AggregationCodeGenerator(
| java.util.Map.Entry entry = (java.util.Map.Entry) mergeIt$i.next();
| Object k = entry.getKey();
Change this line to ${classOf[Row].getCanonicalName} k = (${classOf[Row].getCanonicalName}) entry.getKey();
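The suggestion above relies on splicing a class's canonical name into the generated Java source so that the key is declared and cast with its concrete type. A rough sketch of that pattern follows. Here java.util.ArrayList is only a stand-in for Row so the snippet runs without Flink on the classpath, and the iterator name it is hypothetical.

```scala
object KeyCastSketch {
  // Stand-in for classOf[Row]: any class with a stable canonical name works for the demo.
  private val keyType: String = classOf[java.util.ArrayList[_]].getCanonicalName

  // Generated Java source: the key is declared and cast with the concrete type
  // instead of being left as a plain Object.
  val generated: String =
    s"""
       |java.util.Map.Entry entry = (java.util.Map.Entry) it.next();
       |$keyType k = ($keyType) entry.getKey();
       |Long v = (Long) entry.getValue();
       |""".stripMargin
}
```

Deriving the name from classOf rather than hard-coding a string keeps the generated code in sync if the class is ever moved or renamed.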
}

@Test
def testDistinctAggregateWithNonDistinctAndGrouping(): Unit = {
I think this test can be removed
def testDistinctAggWithMergeOnEventTimeSessionGroupWindow(): Unit = {
// create a watermark with 10ms offset to delay the window emission by 10ms to verify merge
val sessionWindowTestdata = List(
(1L, 1, "Hello"),
The test is not checking for DISTINCT semantics since all aggregated values are distinct. We could do COUNT(DISTINCT num) (int has to be renamed to num because it's a SQL keyword).
To check the correct merge behavior, we need two windows that aggregate the same value, which is then deduplicated in the merge.
Some data like:
(1L, 2, "Hello"), // 1. Hello window
(2L, 2, "Hello"), // 1. Hello window, deduped
(8L, 2, "Hello"), // 2. Hello window, deduped during merge
(10L, 3, "Hello"), // 2. Hello window, forwarded during merge
(9L, 9, "Hello World"), // 1. Hello World window
(4L, 1, "Hello"), // 1. Hello window, triggering merge of 1. and 2. Hello windows
(16L, 16, "Hello")) // 3. Hello window (not merged)
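As a sanity check on what this data should produce, the sketch below recomputes COUNT(DISTINCT num) per session in plain Scala. It is a simplified batch model, not Flink's merging-window or accumulator logic: events are sorted by timestamp and split into sessions whenever two consecutive timestamps are at least one gap apart. The 5 ms gap is borrowed from the INTERVAL '0.005' SECOND used elsewhere in this review, and the names Event, Result, and countDistinctPerSession are hypothetical.

```scala
object SessionDistinctSketch {
  final case class Event(ts: Long, num: Int, key: String)
  final case class Result(key: String, windowEnd: Long, distinctCount: Int)

  // Simplified model: per key, sort by timestamp and start a new session
  // whenever the distance to the previous event reaches the gap.
  def countDistinctPerSession(events: Seq[Event], gap: Long): Seq[Result] =
    events.groupBy(_.key).toSeq.flatMap { case (key, evs) =>
      val sessions = evs.sortBy(_.ts).foldLeft(Vector.empty[Vector[Event]]) { (acc, e) =>
        acc.lastOption match {
          case Some(s) if e.ts - s.last.ts < gap => acc.init :+ (s :+ e)
          case _                                 => acc :+ Vector(e)
        }
      }
      // Session end is modeled as last timestamp + gap; duplicates are dropped per session.
      sessions.map(s => Result(key, s.last.ts + gap, s.map(_.num).distinct.size))
    }.sortBy(r => (r.key, r.windowEnd))

  // The data proposed in the review comment, in arrival order.
  val data: Seq[Event] = Seq(
    Event(1L, 2, "Hello"), Event(2L, 2, "Hello"), Event(8L, 2, "Hello"),
    Event(10L, 3, "Hello"), Event(9L, 9, "Hello World"),
    Event(4L, 1, "Hello"), Event(16L, 16, "Hello"))
}
```

Under these assumptions, countDistinctPerSession(data, 5L) gives three rows: ("Hello", 15, 3), ("Hello", 21, 1), and ("Hello World", 14, 1). The first Hello session contains five events but only three distinct values, so a plain COUNT would return 5 there, which is exactly the difference the test needs to expose.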
tEnv.registerTable("MyTable", table)

val sqlQuery = "SELECT string, " +
" COUNT(DISTINCT long) " +
It would be good to add the end timestamp of the windows (SESSION_END(rowtime, INTERVAL '0.005' SECOND)) to make it easier to eyeball the expected test results.
Thanks @suez1224 @fhueske for the comments. I will change them accordingly. Yes, I copied a lot of test cases from @haohui's PR for my own testing. I can definitely put my changes on top, given that the runtime support is already merged in #5555. A procedural question: should I rebase onto his commit, add my changes on top, and then attach the result to this PR? I am not sure whether there is a clever way to both preserve the discussion in this thread and rebase on top of his change.
Hmmm, good point. The discussion would be lost.
LOL. I think I found a way:
:-)
…ES, and enabled distinct aggregate support for window aggregate over datastream
Thanks for the update @walterddr.
merging
…eaming tables. This closes apache#5940.
What is the purpose of the change
Brief change log
AggregateExpandDistinctAggregatesRule.JOIN to DATASET_NORM_RULES
DataStreamGroupWindowAggregate to support distinct agg while maintaining unsupported for [DataStream/DataSet]GroupAggregate.
UNION ALL.
Verifying this change
DistinctAggregateTest
Does this pull request potentially affect one of the following parts:
@Public(Evolving): no
Documentation
Aggregate section or Group Window section? Inputs are highly appreciated. Also, distinct over aggregate is bug-fixed in FLINK-8689 but not documented.