diff --git a/docs/dev/table/streaming/match_recognize.md b/docs/dev/table/streaming/match_recognize.md
index 493f998fe535af..d02bc03b1c4a7a 100644
--- a/docs/dev/table/streaming/match_recognize.md
+++ b/docs/dev/table/streaming/match_recognize.md
@@ -211,13 +211,13 @@ If a condition is not defined for a pattern variable, a default condition will b
For a more detailed explanation about expressions that can be used in those clauses, please have a look at the [event stream navigation](#pattern-navigation) section.
-### Scalar & Aggregate functions
+### Aggregations
-One can use scalar and aggregate functions in those clauses, both [built-in]({{ site.baseurl }}/dev/table/sql.html#built-in-functions) as well as provide [user defined]({{ site.baseurl }}/dev/table/udfs.html) functions.
+Aggregations can be used in `DEFINE` and `MEASURES` clauses. Both [built-in]({{ site.baseurl }}/dev/table/sql.html#built-in-functions) and custom [user defined]({{ site.baseurl }}/dev/table/udfs.html) functions are supported.
-Aggregate functions are applied to subset of rows mapped to a match. To understand how those subsets are evaluated have a look at the [event stream navigation](#pattern-navigation) section.
+Aggregate functions are applied to each subset of rows mapped to a match. In order to understand how those subsets are evaluated have a look at the [event stream navigation](#pattern-navigation) section.
-With a task to find the longest period of time for which the average price of a ticker did not go below certain threshold, one can see how expressible `MATCH_RECOGNIZE` can become with aggregations.
+The task of the following example is to find the longest period of time for which the average price of a ticker did not go below certain threshold. It shows how expressible `MATCH_RECOGNIZE` can become with aggregations.
This task can be performed with the following query:
{% highlight sql %}
@@ -256,8 +256,8 @@ symbol rowtime price tax
'ACME' '01-Apr-11 10:00:10' 30 1
{% endhighlight %}
-The query will accumulate events as part of `A` pattern variable as long as the average price of them does not exceed 15. Which will happen at `01-Apr-11 10:00:04`. The next such period that starts then will
-exceed average price of 15 at `01-Apr-11 10:00:10`. Thus the results for said query will be:
+The query will accumulate events as part of the pattern variable `A` as long as the average price of them does not exceed `15`. For example, such a limit exceeding happens at `01-Apr-11 10:00:04`.
+The following period exceeds the average price of `15` again at `01-Apr-11 10:00:10`. Thus the results for said query will be:
{% highlight text %}
symbol start_tstamp end_tstamp avgPrice
@@ -266,10 +266,9 @@ ACME 01-APR-11 10:00:00 01-APR-11 10:00:03 14.5
ACME 01-APR-11 10:00:04 01-APR-11 10:00:09 13.5
{% endhighlight %}
-An important thing to have in mind is how aggregates behave in situation when no rows where mapped to certain pattern variable. Every aggregate, beside `COUNT` will produce `null` in those cases. `COUNT` on the other hand will
-produce 0.
+Note Aggregations can be applied to expressions, but only if they reference a single pattern variable. Thus `SUM(A.price * A.tax)` is a valid one, but `AVG(A.price * B.tax)` is not.
-Attention `DISTINCT` aggregations are not supported. Moreover the `DISTINCT` modifier will be silently dropped if specified for aggregation!
+Attention `DISTINCT` aggregations are not supported.
Defining a Pattern
------------------
diff --git a/flink-libraries/flink-table/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java b/flink-libraries/flink-table/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java
index b5e29f20031bc3..f318166e14554f 100644
--- a/flink-libraries/flink-table/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java
+++ b/flink-libraries/flink-table/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java
@@ -136,6 +136,7 @@
/*
* THIS FILE HAS BEEN COPIED FROM THE APACHE CALCITE PROJECT UNTIL CALCITE-2707 IS FIXED.
+ * (Added lines: 5937-5943)
*/
/**
@@ -5934,11 +5935,13 @@ private static class NavigationExpander extends NavigationModifier {
List operands = call.getOperandList();
List newOperands = new ArrayList<>();
+ // This code is a workaround for CALCITE-2707
if (call.getFunctionQuantifier() != null
&& call.getFunctionQuantifier().getValue() == SqlSelectKeyword.DISTINCT) {
final SqlParserPos pos = call.getParserPosition();
throw SqlUtil.newContextException(pos, Static.RESOURCE.functionQuantifierNotAllowed(call.toString()));
}
+ // This code is a workaround for CALCITE-2707
if (isLogicalNavigation(kind) || isPhysicalNavigation(kind)) {
SqlNode inner = operands.get(0);
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala
index 998aca893c7ea8..3cbc7de752b5ce 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala
@@ -584,8 +584,6 @@ class MatchCodeGenerator(
private val variableUID = newName("variable")
- private val resultRowTerm = newName(s"aggRow_$variableUID")
-
private val rowTypeTerm = "org.apache.flink.types.Row"
def generateAggAccess(aggCall: RexCall): GeneratedExpression = {
@@ -601,50 +599,52 @@ class MatchCodeGenerator(
}
}
- private def doGenerateAggAccess(call: RexCall) = {
- val singleResultTerm = newName("result")
- val singleResultNullTerm = newName("nullTerm")
- val singleResultType = FlinkTypeFactory.toTypeInfo(call.`type`)
- val primitiveSingleResultTypeTerm = primitiveTypeTermForTypeInfo(singleResultType)
- val boxedSingleResultTypeTerm = boxedTypeTermForTypeInfo(singleResultType)
+ private def doGenerateAggAccess(call: RexCall): GeneratedExpression = {
+ val singleAggResultTerm = newName("result")
+ val singleAggNullTerm = newName("nullTerm")
+ val singleAggResultType = FlinkTypeFactory.toTypeInfo(call.`type`)
+ val primitiveSingleAggResultTypeTerm = primitiveTypeTermForTypeInfo(singleAggResultType)
+ val boxedSingleAggResultTypeTerm = boxedTypeTermForTypeInfo(singleAggResultType)
- val patternName = findEventsByPatternName(variable)
+ val calculateAggFuncName = s"calculateAgg_$variableUID"
+ val allAggRowTerm = newName(s"aggRow_$variableUID")
+ val rowsForVariableCode = findEventsByPatternName(variable)
val codeForAgg =
j"""
- |$rowTypeTerm $resultRowTerm = calculateAgg_$variableUID(${patternName.resultTerm});
+ |$rowTypeTerm $allAggRowTerm = $calculateAggFuncName(${rowsForVariableCode.resultTerm});
|""".stripMargin
reusablePerRecordStatements += codeForAgg
- val defaultValue = primitiveDefaultValue(singleResultType)
+ val defaultValue = primitiveDefaultValue(singleAggResultType)
val codeForSingleAgg = if (nullCheck) {
j"""
- |boolean $singleResultNullTerm;
- |$primitiveSingleResultTypeTerm $singleResultTerm;
- |if ($resultRowTerm.getField(${aggregates.size}) != null) {
- | $singleResultTerm = ($boxedSingleResultTypeTerm) $resultRowTerm
+ |boolean $singleAggNullTerm;
+ |$primitiveSingleAggResultTypeTerm $singleAggResultTerm;
+ |if ($allAggRowTerm.getField(${aggregates.size}) != null) {
+ | $singleAggResultTerm = ($boxedSingleAggResultTypeTerm) $allAggRowTerm
| .getField(${aggregates.size});
- | $singleResultNullTerm = false;
+ | $singleAggNullTerm = false;
|} else {
- | $singleResultNullTerm = true;
- | $singleResultTerm = $defaultValue;
+ | $singleAggNullTerm = true;
+ | $singleAggResultTerm = $defaultValue;
|}
|""".stripMargin
} else {
j"""
- |boolean $singleResultNullTerm = false;
- |$primitiveSingleResultTypeTerm $singleResultTerm =
- | ($boxedSingleResultTypeTerm) $resultRowTerm.getField(${aggregates.size});
+ |boolean $singleAggNullTerm = false;
+ |$primitiveSingleAggResultTypeTerm $singleAggResultTerm =
+ | ($boxedSingleAggResultTypeTerm) $allAggRowTerm.getField(${aggregates.size});
|""".stripMargin
}
reusablePerRecordStatements += codeForSingleAgg
- GeneratedExpression(singleResultTerm, singleResultNullTerm, NO_CODE, singleResultType)
+ GeneratedExpression(singleAggResultTerm, singleAggNullTerm, NO_CODE, singleAggResultType)
}
- def generateAggFunction() : Unit = {
+ def generateAggFunction(): Unit = {
val matchAgg = extractAggregatesAndExpressions
val aggGenerator = new AggregationCodeGenerator(config, false, input, None)
@@ -677,20 +677,20 @@ class MatchCodeGenerator(
generateAggCalculation(aggFunc, transformFuncName, inputTransform)
}
- private case class LogicalMatchAggCall(
+ private case class LogicalSingleAggCall(
function: SqlAggFunction,
inputTypes: Seq[RelDataType],
exprIndices: Seq[Int]
)
- private case class MatchAggCall(
+ private case class SingleAggCall(
aggFunction: TableAggregateFunction[_, _],
inputIndices: Array[Int],
dataViews: Seq[DataViewSpec[_]]
)
private case class MatchAgg(
- aggregations: Seq[MatchAggCall],
+ aggregations: Seq[SingleAggCall],
inputExprs: Seq[RexNode]
)
@@ -708,10 +708,10 @@ class MatchCodeGenerator(
inputRows(innerCall.toString) = callWithIndex
callWithIndex
}
- }).toList
+ })
val agg = rexAggCall.getOperator.asInstanceOf[SqlAggFunction]
- LogicalMatchAggCall(agg,
+ LogicalSingleAggCall(agg,
callsWithIndices.map(_._1.getType),
callsWithIndices.map(_._2).toArray)
})
@@ -720,14 +720,14 @@ class MatchCodeGenerator(
case (agg, index) =>
val result = AggregateUtil.extractAggregateCallMetadata(
agg.function,
- isDistinct = false,
+ isDistinct = false, // TODO properly set once supported in Calcite
agg.inputTypes,
needRetraction = false,
config,
isStateBackedDataViews = false,
index)
- MatchAggCall(result.aggregateFunction, agg.exprIndices.toArray, result.accumulatorSpecs)
+ SingleAggCall(result.aggregateFunction, agg.exprIndices.toArray, result.accumulatorSpecs)
}
MatchAgg(aggs, inputRows.values.map(_._1).toSeq)
@@ -736,14 +736,14 @@ class MatchCodeGenerator(
private def generateAggCalculation(
aggFunc: GeneratedAggregationsFunction,
transformFuncName: String,
- inputTransform: String)
+ inputTransformFunc: String)
: Unit = {
val aggregatorTerm = s"aggregator_$variableUID"
val code =
j"""
|private final ${aggFunc.name} $aggregatorTerm;
|
- |$inputTransform
+ |$inputTransformFunc
|
|private $rowTypeTerm calculateAgg_$variableUID(java.util.List input)
| throws Exception {
@@ -769,17 +769,19 @@ class MatchCodeGenerator(
: String = {
isWithinAggExprState = true
val resultTerm = newName("result")
- val exprs = inputExprs.zipWithIndex.map(row => {
- val expr = generateExpression(row._1)
- s"""
- |${expr.code}
- |if (${expr.nullTerm}) {
- | $resultTerm.setField(${row._2}, null);
- |} else {
- | $resultTerm.setField(${row._2}, ${expr.resultTerm});
- |}
+ val exprs = inputExprs.zipWithIndex.map {
+ case (inputExpr, outputIndex) => {
+ val expr = generateExpression(inputExpr)
+ s"""
+ |${expr.code}
+ |if (${expr.nullTerm}) {
+ | $resultTerm.setField($outputIndex, null);
+ |} else {
+ | $resultTerm.setField($outputIndex, ${expr.resultTerm});
+ |}
""".stripMargin
- }).mkString("\n")
+ }
+ }.mkString("\n")
isWithinAggExprState = false
j"""