-
Notifications
You must be signed in to change notification settings - Fork 13.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE #7177
Conversation
8e3215f
to
134abbc
Compare
e208911
to
000c2ad
Compare
000c2ad
to
05710d0
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I just reviewed the refactoring of the aggregate util. I like the refactoring but I added some comments to further improve the maintainability in the future.
(aggFieldIndexes, aggregates, isDistinctAggs, accTypes, accSpecs) | ||
} | ||
|
||
private def createFlinkAggFunction( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you add a comment to every method in this class? This class is quite big every comment helps in understanding what is going on. For example, Converts Calcite's [[SqlAggFunction]] to a Flink UDF [[TableAggregationFunction]]
.
|
||
(aggregate, accumulatorType, accSpecs) | ||
} | ||
|
||
private def transformToAggregateFunctions( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We definitely need more documentation for this function. Can you create case classes for the return type of this function for proper naming because Array[Int]
or Array[Boolean]
is not very helpful.
|
||
// create aggregate function instances by function type and aggregate field data type. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Restore this and the following line?
...ries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
Outdated
Show resolved
Hide resolved
...ries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
Outdated
Show resolved
Hide resolved
...ries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you @dawidwys. I added some comments. Maybe I also found a bug in MatchCodeGenerator.AggBuilder#generateAggAccess
.
...nk-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/MatchRecognizeITCase.scala
Outdated
Show resolved
Hide resolved
} | ||
} | ||
|
||
class PatternVariableFinder extends RexDefaultVisitor[Option[String]] { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not moving this to MatchUtil
and perform the validation in the DataStreamRelNode
instead?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This class has one additional function other than just validation. It also extracts the variable from aggregate RexCall
, which I need for grouping aggregates by it. I could add the validation at an earlier stage, but effectively I would need to duplicate/reuse the same code. If you feel it's worth doing, I will update it accordingly.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it is ok to duplicate this class. Because we are just talking about 25 lines. We should try to separate validation and translation. I know we are also not doing it at other places but we should aim for it.
...libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala
Outdated
Show resolved
Hide resolved
...es/flink-table/src/test/scala/org/apache/flink/table/match/MatchOperatorValidationTest.scala
Show resolved
Hide resolved
...es/flink-table/src/test/scala/org/apache/flink/table/match/MatchOperatorValidationTest.scala
Show resolved
Hide resolved
...libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala
Outdated
Show resolved
Hide resolved
...libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala
Outdated
Show resolved
Hide resolved
...libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala
Outdated
Show resolved
Hide resolved
...libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala
Outdated
Show resolved
Hide resolved
...libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added some feedback for the docs changes.
ACME 01-APR-11 10:00:04 01-APR-11 10:00:09 13.5 | ||
{% endhighlight %} | ||
|
||
An important thing to have in mind is how aggregates behave in situation when no rows where mapped to certain pattern variable. Every aggregate, beside `COUNT` will produce `null` in those cases. `COUNT` on the other hand will |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
An important thing to have in mind is how aggregates behave in situation when no rows were mapped to a certain pattern variable. Every aggregate (except for `COUNT`) will produce `null` in those cases. `COUNT` on the other hand will produce 0.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But is this statement true? Doesn't the empty case depend on the aggregate function that you implemented? If you are using a UDF it must not be null right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You are right, didn't think of UDAGs. Do you think it is worth adding such disclaimer at all. I've added it in the first place, because there was something similar in the SQL standard, but there are no UDAGs there.
flink-libraries/flink-table/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@dawidwys thanks a lot for the PR. Just left a few minor comments.
...libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala
Show resolved
Hide resolved
} | ||
} | ||
|
||
private def generateAggAccess(call: RexCall) = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What about renaming to generateAggregation?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It actually generates only the access. It does not generate computing the aggregate. That name might not be perfect, but actually I would like to differentiate it somehow.
...libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala
Outdated
Show resolved
Hide resolved
...libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala
Outdated
Show resolved
Hide resolved
...libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala
Outdated
Show resolved
Hide resolved
...libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala
Outdated
Show resolved
Hide resolved
...libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala
Outdated
Show resolved
Hide resolved
} | ||
|
||
private def doGenerateAggAccess(call: RexCall) = { | ||
val singleResultTerm = newName("result") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What about removing the prefix single
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it makes sense here, as it is a term for a single aggregation from a set of all calculated for a pattern variable. It makes even more sense as I moved a variable that holds a term name for result containing all aggregates into this function.
...libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala
Outdated
Show resolved
Hide resolved
...libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala
Outdated
Show resolved
Hide resolved
322a43a
to
e5dda66
Compare
Hi @dianfu, @twalthr . Thank you for your review. I've updated the PR according to your comments. There are still two main discussion points from @twalthr review:
I would appreciate if you had another look. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the update @dawidwys. I added my last couple of comments. Can you prepare the PR for merging with proper commits? I will have a final look before merging.
namedAggregates.map(_.getKey), | ||
physicalInputRowType, | ||
needRetract, | ||
tableConfig) | ||
|
||
val aggMapping = aggregates.indices.map(_ + groupings.length).toArray | ||
val aggMapping = (0 until aggregateMetadata.getAggregateCallsCount).map(_ + groupings.length) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This occurs quite often (I counted 10 times). Maybe create helper function in AggregateMetadata
that takes the grouping length such as getAdjustedMapping()
?
* | ||
* } | ||
* }}} | ||
* |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Awesome documentation. Very helpful.
val exp: GeneratedExpression = generateAggAccess(call) | ||
aggregates += call | ||
reusableInputUnboxingExprs((call.toString, 0)) = exp | ||
val exp: GeneratedExpression = doGenerateAggAccess(aggCall) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The naming is still not perfect here. What is the difference between generateAggAccess
and doGenerateAggAccess
? How about generateDeduplicatedAggAccess
and generateAggAccess
?
| $singleResultNullTerm = false; | ||
|} else { | ||
| $singleResultNullTerm = true; | ||
| $singleResultTerm = $defaultValue; | ||
|} | ||
|""".stripMargin | ||
} else { | ||
j""" | ||
|boolean $singleResultNullTerm = false; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This line can be dropped. The null term is not evaluated if nullCheck is disabled.
transformFuncName) | ||
|
||
generateAggCalculation(aggFunc, transformFuncName, inputTransform) | ||
} | ||
|
||
private case class LogicalMatchAggCall( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: move all classes to the bottom to logically separate them from methods.
} | ||
} | ||
|
||
class PatternVariableFinder extends RexDefaultVisitor[Option[String]] { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it is ok to duplicate this class. Because we are just talking about 25 lines. We should try to separate validation and translation. I know we are also not doing it at other places but we should aim for it.
@@ -691,12 +795,14 @@ class MatchCodeGenerator( | |||
|
|||
class PatternVariableFinder extends RexDefaultVisitor[Option[String]] { | |||
|
|||
val ALL_PATTERN_VARIABLE = "*" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We could even move this constant definition to MatchUtil
?
a29aa68
to
779df14
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you @dawidwys. I had one tiny comment. Otherwise the PR looks good to me. +1 to merge
val outputArity = groupings.length + aggregates.length | ||
|
||
val aggregationStateType: RowTypeInfo = new RowTypeInfo(accTypes: _*) | ||
val aggMapping = getAdjustedMapping(aggregateMetadata.getAggregateCallsCount, groupings.length) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually I thought that this function can be part of AggregateMetadata
? Because you always apply it on the calls count.
|
||
<span class="label label-info">Note</span> Aggregations can be applied to expressions, but only if they reference a single pattern variable. Thus `SUM(A.price * A.tax)` is a valid one, but `AVG(A.price * B.tax)` is not. | ||
|
||
<span class="label label-danger">Attention</span> `DISTINCT` aggregations are not supported. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you open a follow up issue for this if it does not exist yet?
PATTERN (A+ B) | ||
DEFINE | ||
A AS AVG(A.price) < 15 | ||
) MR; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@dawidwys what is MR
actually doing? Do we have to call SELECT MR.start_tstamp
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is optional, but it is needed if we want to e.g. join the result of MR with other table. For example in a query:
SELECT MR.aid as mId, T.id as tId
FROM MyTable
MATCH_RECOGNIZE (
ORDER BY proctime
MEASURES
A.id AS aid,
A.name as aName
PATTERN (A)
DEFINE
A as A.id > 0
) MR, MyTable T
WHERE MR.aName = T.name
779df14
to
1a92abc
Compare
Thank you very much @twalthr for your help and review! Will merge it once travis gives green (as I had to rebase) |
What is the purpose of the change
This change allows using aggregates in MEASURES and DEFINE clauses of MATCH_RECOGNIZE in sql.
Based on #7189
Brief change log
AggregateUtil#transformToAggregateFunctions
open/close
methods in code generatedPatternSelectFunction
andIterativeCondition
org.apache.flink.table.codegen.MatchCodeGenerator.AggBuilder
org.apache.flink.table.codegen.PatternVariableFinder
that checks for operands in udagsStill missing:
Verifying this change
Added tests:
org.apache.flink.table.match.MatchOperatorValidationTest#testAggregatesOnMultiplePatternVariablesNotSupported
org.apache.flink.table.runtime.stream.sql.MatchRecognizeITCase#testCepAggregates
org.apache.flink.table.
match.MatchOperatorValidationTest#testAggregatesOnMultiplePatternVariablesNotSupportedInUDAGs
Does this pull request potentially affect one of the following parts:
@Public(Evolving)
: (yes / no)Documentation