
[FLINK-5653] Add processing time OVER ROWS BETWEEN x PRECEDING aggregation to SQL #3547

Closed
wants to merge 36 commits

Conversation

huawei-flink

Thanks for contributing to Apache Flink. Before you open your pull request, please take the following checklist into consideration.
If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the How To Contribute guide.
In addition to going through the list, please provide a meaningful description of your changes.

  • [x] General

    • The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
    • The pull request addresses only one issue
    • Each commit in the PR has a meaningful commit message (including the JIRA id)
  • [x] Documentation

    • Documentation has been added for new functionality
    • Old documentation affected by the pull request has been updated
    • JavaDoc for public methods has been added
  • [x] Tests & Build

    • Functionality added by the pull request is covered by tests
    • mvn clean verify has been executed successfully locally or a Travis build has passed

FLINK-5653b

# Conflicts:
#	flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
#	flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
@fhueske
Contributor

fhueske commented Mar 15, 2017

Thanks for the new PR @huawei-flink. I'll have a look soon.

Member

@sunjincheng121 sunjincheng121 left a comment


Hi @huawei-flink, thanks for the PR. I am very interested in this PR, but I have only made a fast pass over the changes and left some comments. I also have some suggestions (see the sketch after this list):

  1. I recommend implementing this JIRA with a ProcessFunction.
  2. In the ProcessFunction, use state to manage the data and compute the result incrementally.
  3. Use the AggregateFunction#retract method to remove retired rows from the incremental result and compute the current window result.
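For illustration, a minimal sketch of such a ProcessFunction; the state layout and all names here are illustrative assumptions, not code from this PR:

import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.types.Row
import org.apache.flink.util.Collector

// Hypothetical sketch of the suggested approach for ROWS BETWEEN x PRECEDING
// AND CURRENT ROW over processing time.
class BoundedProcTimeRowsOverFunction(precedingRows: Int)
  extends ProcessFunction[Row, Row] {

  // open() would initialize keyed state via getRuntimeContext.getState(...):
  // a buffer of the most recent rows plus one accumulator per aggregate.

  override def processElement(
    input: Row,
    ctx: ProcessFunction[Row, Row]#Context,
    out: Collector[Row]): Unit = {
    // 1. accumulate the incoming row into the accumulators
    // 2. if the buffer now holds more than precedingRows + 1 rows,
    //    retract the oldest row (AggregateFunction#retract) and drop it
    // 3. emit the input row extended with the current aggregate values
  }
}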

The above suggestions are for reference only. What do you think? @huawei-flink @fhueske
Thanks,
SunJincheng

@@ -37,8 +35,8 @@ class LogicalWindowAggregate(
child: RelNode,
indicator: Boolean,
groupSet: ImmutableBitSet,
groupSets: util.List[ImmutableBitSet],
aggCalls: util.List[AggregateCall])
groupSets: java.util.List[ImmutableBitSet],
Member


Can we keep the original way? util.List[ImmutableBitSet]

Author


For some reason it does not build in Eclipse the original way, and honestly I struggle to understand the "half package name" usage in Scala. Is there any practical reason for it? And does using the complete package name break some convention? I am asking with honest curiosity and no polemical intention. Thanks for clarifying.

@@ -18,8 +18,6 @@

package org.apache.flink.table.plan.logical.rel

import java.util
Member


I think importing the package name is the better way. What do you think?

Author


In principle I agree, but it caused some build problems. Is there a practical reason for not importing the List class directly rather than creating "half references"?

Member


Your current package is org.apache.flink.table.plan.logical.rel. In this package there is an org.apache.flink.table.plan.logical.rel.util, which conflicts with the java.util package. You can try one of the following two solutions:
solution 1:

import java.util.List
groupSets: List[ImmutableBitSet],
aggCalls: List[AggregateCall])

solution 2:

import java.{util => JUtil}
groupSets: JUtil.List[ImmutableBitSet],
aggCalls: JUtil.List[AggregateCall])

Author


Thanks, both solutions look better than the half package import. I will apply one of those.

extends SingleRel(cluster, traitSet, inputNode)
with OverAggregate
with DataStreamRel {
logicWindow: Window,
Member


Indent 4 spaces.

inputNode: RelNode,
rowRelDataType: RelDataType,
inputType: RelDataType)
extends SingleRel(cluster, traitSet, inputNode)
Member


Indent 2 spaces.

.asInstanceOf[DataStream[Row]]
} // global non-partitioned aggregation
else {
throw TableException(
Member


Can you support the non-partitioned case in this JIRA?

Author


If needed, I can do it.

import org.apache.calcite.sql.`type`.SqlTypeName.FLOAT
import org.apache.calcite.sql.`type`.SqlTypeName.INTEGER
import org.apache.calcite.sql.`type`.SqlTypeName.SMALLINT
import org.apache.calcite.sql.`type`.SqlTypeName.TINYINT
Member


Can you remove those unused imports?

Author


sure

Author


After an inspection, I realized that the imports you mentioned are actually used. I think there is no unused import at this moment. Am I missing something?

Member


Yes, I have cloned your code. Maybe in this case we can use ._,
e.g.: import org.apache.flink.table.functions.aggfunctions._. This way, we can reduce 53 lines to 1 line. What do you think?

Author


I see your point, although I thought that wildcard imports were not a best practice. It seems that the Java and Scala implementations follow different conventions. I have no problem with it in principle.

import org.apache.flink.table.functions.Accumulator
import java.lang.Iterable

class BoundedProcessingOverWindowFunction[W <: Window](
Member


I suggest using a ProcessFunction to implement this feature, so that you can manage the data flexibly and use incremental calculation.

Author


@sunjincheng121 thanks for the suggestion. I decided to use Window because it is convenient in the row-bounded case. Within the window I apply the incremental aggregation in the same way.

It is not clear to me what the flexibility advantages are in this specific case. Can you be more explicit?

Member


I am glad to explain. IMO there are several advantages to using a ProcessFunction:

  1. It keeps the OVER window implementations consistent, e.g. FLINK-5803, FLINK-5804, FLINK-5658 (see the design @fhueske proposed in [FLINK-5658][table] support unbounded eventtime over window #3386).
  2. Because a ProcessFunction manages its own state, including the window's data collection, you can customize how the window is triggered, so it will be very convenient to support FOLLOWING in the future.
  3. For incremental aggregation, the window mechanism requires an additional ReduceFunction implementation, while a ProcessFunction supports it naturally as part of its state management.

What do you think?

Author


Thanks a lot for the clarification. I am really willing to do it right, but at the same time I need to understand. So, please be patient. :-)

When we started discussing the issue with @fhueske (https://issues.apache.org/jira/browse/FLINK-5654?filter=-1) there was a decision to use a window, not a process function.
Code consistency is pretty much the same, just extending a different interface. I understand that a ProcessFunction can manage its own state, but window checkpointing would replay all events in case of failure, so we would have consistent processing even without managing this level of granularity in the state. With procTime semantics we can neglect retraction, and a window can customize its triggering function anyway.

I don't understand the third point.

The main argument I see for this specific case is that ProcessFunction supports granular state management, besides the alleged code consistency.

Member


Hi @huawei-flink, regarding this discussion I think we need @fhueske's opinion.
Best,
SunJincheng

Author


I just thought about a case where one wants to do a COUNT DISTINCT type of aggregation. How does a ProcessFunction work for that?

Contributor

@fhueske fhueske Mar 16, 2017


As @huawei-flink pointed out, I agreed to use a count window for the bounded OVER ROW case in our previous discussions. Although it would not be consistent with the other OVER windows, I thought this would be an easier solution.

However, the way it is implemented right now with a WindowFunction does not perform eager aggregation but collects all records and applies the WindowFunction at the end. If we want to use count windows, we should implement an org.apache.flink.api.common.functions.AggregateFunction (not to be confused with the Table API interface for aggregation functions) for preaggregation and a WindowFunction for the final assembly of the result row. You can have a look at the AggregateAggFunction and the DataStreamAggregate classes for how an AggregateFunction is defined and used.
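As a rough illustration of that split (a hedged sketch, not code from this PR; the field index, types, and class name are assumptions, and the AggregateFunction method signatures may differ slightly between Flink versions):

import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.types.Row

// Eagerly maintains a running sum per count window, so the window state
// stays O(1); a WindowFunction would then assemble the final result row.
class SumPreAggregator extends AggregateFunction[Row, Long, Long] {
  override def createAccumulator(): Long = 0L
  override def add(value: Row, acc: Long): Long =
    acc + value.getField(0).asInstanceOf[Long]
  override def getResult(acc: Long): Long = acc
  override def merge(a: Long, b: Long): Long = a + b
}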

Regarding the aggregation of COUNT DISTINCT, we could implement a custom aggregation function, similar to the retractable min or max aggregation function.
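A hedged sketch of such a retractable COUNT DISTINCT aggregate (illustrative only; the actual Table API aggregate interfaces may look different):

// The accumulator tracks per-value counts so that retract() can decrement
// a count and drop the value once it reaches zero.
class CountDistinctAccumulator {
  val counts = new java.util.HashMap[Any, Long]()
  var distinct: Long = 0L
}

class CountDistinct {
  def accumulate(acc: CountDistinctAccumulator, value: Any): Unit = {
    val prev = acc.counts.getOrDefault(value, 0L)
    if (prev == 0L) acc.distinct += 1
    acc.counts.put(value, prev + 1)
  }

  def retract(acc: CountDistinctAccumulator, value: Any): Unit = {
    val prev = acc.counts.getOrDefault(value, 0L)
    if (prev == 1L) {
      acc.counts.remove(value)
      acc.distinct -= 1
    } else if (prev > 1L) {
      acc.counts.put(value, prev - 1)
    }
  }

  def getValue(acc: CountDistinctAccumulator): Long = acc.distinct
}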

Member


Hi @huawei-flink @fhueske, in the bounded OVER ROW case I still recommend using a ProcessFunction. But we can just agree to disagree for now, because as FLIP-11 progresses we may change our view.
Best,
SunJincheng

@sunjincheng121
Member

Hi @fhueske, I'm sorry, I had not seen your comments until I passed over this PR. If I made any mistakes, please correct them.
Best,
SunJincheng

@fhueske
Contributor

fhueske commented Mar 15, 2017

No worries @sunjincheng121!
Everybody is welcome and encouraged to review pull requests.
Thanks for doing this!

import org.junit.Ignore;
import org.junit.Test;

public class ProcTimeRowStreamAggregationSqlITCase extends StreamingMultipleProgramsTestBase {
Member


Please implement the test in Scala.

Author


Why should the test also be implemented in Scala?

expected.add("5,21");
expected.add("5,23");
expected.add("5,26");
expected.add("5,27");
Member


This result does not conform to the semantics of OVER; that is, for this SQL we must get the following result:

1,0
2,1
2,3
3,3
3,7
3,12
4,6
4,13
4,21
4,24
5,10
5,21
5,33
5,37
5,39

I left comments in DataStreamOverAggregate.scala explaining the reason for the error.

Author


Besides the fix you suggested, which is trivial (and thanks! :-)), it sounds a little odd to define a range with excluded boundaries. Could you please point out the document in which this is discussed?

Author


It makes sense anyway; I fixed it according to your suggestion.

inputType)

inputDS
.countWindowAll(lowerbound,1)
Member


lowerbound -> (lowerbound+1)

Author


So, the semantics of BETWEEN 2 PRECEDING AND CURRENT ROW mean that the bound of 2 preceding rows does not include the current row, and I should count 3 elements?

Contributor


From the MS SQL Server documentation:

For example, ROWS BETWEEN 2 PRECEDING AND CURRENT ROW means that the window of rows that the function operates on is three rows in size, starting with 2 rows preceding until and including the current row.

So, since lowerBound is already AggregateUtil.getLowerBoundary(...) + 1, we should be good.
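Under that definition, the intended translation would look roughly like this (a sketch with illustrative names; precedingOffset stands for the raw PRECEDING value extracted from the OVER clause, and windowFunction for the window function built from the aggregates):

// ROWS BETWEEN 2 PRECEDING AND CURRENT ROW spans precedingOffset + 1 = 3
// rows, and the window slides by one row per input record.
inputDS
  .keyBy(partitionKeys: _*)
  .countWindow(precedingOffset + 1, 1)
  .apply(windowFunction)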

inputType)
inputDS
.keyBy(partitionKeys: _*)
.countWindow(lowerbound,1)
Member


lowerbound -> (lowerbound+1)


expected.add("5,21,10");
expected.add("5,23,11");
expected.add("5,26,12");
expected.add("5,27,13");
Member


This result is incorrect. I left comments in DataStreamOverAggregate.scala explaining the reason for the error.

expected.add("5,10,3");
expected.add("5,11,3");
expected.add("5,13,2");
expected.add("5,13,2");
Member


This result is incorrect. I left comments in DataStreamOverAggregate.scala explaining the reason for the error.

expected.add("5,10,GHI");
expected.add("5,10,HIJ");
expected.add("5,11,IJK");
expected.add("5,13,JKL");
Member


This result is incorrect. I left comments in DataStreamOverAggregate.scala explaining the reason for the error.

expected.add("21");
expected.add("23");
expected.add("26");
expected.add("27");
Member


This result is incorrect. I left comments in DataStreamOverAggregate.scala explaining the reason for the error.

expected.add("5,10");
expected.add("5,10");
expected.add("5,11");
expected.add("5,12");
Member


This result is incorrect. I left comments in DataStreamOverAggregate.scala explaining the reason for the error.

Contributor

@fhueske fhueske left a comment


Thanks for the PR @huawei-flink!
I made a few comments.

Thanks, Fabian

@@ -17,34 +17,41 @@
*/
package org.apache.flink.table.plan.nodes.datastream

import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
import org.apache.calcite.plan.{ RelOptCluster, RelTraitSet }
Contributor


This PR has several changes that reformat the code but do not change the logic.
In general, we try to limit PRs to the scope of the issue and avoid reformatting.

  1. It makes the PR harder to review because every reformatting change needs to be checked (was something changed or not?). This is especially tiresome when indentation is changed.
  2. It makes it more difficult to track changes in the history.
  3. Other users do not necessarily agree with your (or your IDE's) formatting and will format it back. So it might get back and forth.

There is nothing wrong with adding a space between arguments here and there, but please keep reformatting changes to a minimum.

Author


@fhueske sorry about that. Will be more careful in the next one.

Contributor


No worries. I know that IDEs tend to reformat code but it really makes reviews harder.
Thanks!

@@ -17,35 +17,62 @@
*/
package org.apache.flink.table.runtime.aggregate

import java.util
import scala.collection.JavaConversions.asScalaBuffer
Contributor


Can we keep the imports as they are?

Author


I need that import; without it, the code does not build.

@@ -89,6 +116,58 @@ object AggregateUtil {
aggregationStateType)
}
}

private[flink] def CreateBoundedProcessingOverWindowFunction(
namedAggregates: Seq[CalcitePair[AggregateCall, String]],
Contributor


Please align the arguments as the other functions do.


}

private[flink] def CreateBoundedProcessingOverGlobalWindowFunction(
Contributor


indent -2

inputType)
inputDS
.keyBy(partitionKeys: _*)
.countWindow(lowerbound,1)
Contributor


.countWindow(lowerbound, 1) (add a space)

def getLowerBoundary(
constants: ImmutableList[RexLiteral],
lowerBound: RexWindowBound,
input: RelNode):Int = {
Contributor


input: RelNode): Int = { (add a space)

inputType,
needRetraction = false)

val aggregationStateType: RowTypeInfo =
Contributor


not used

var i = 0
// setting the accumulators for each aggregation
while (i < aggregates.length) {
accumulators.setField(i, aggregates(i).createAccumulator())
Contributor


Accumulators can be reset with aggregate.resetAccumulator(acc), so we can initialize them once and reuse them, as sketched below.
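A hedged sketch of that suggestion applied to the loop above (the surrounding names come from the PR; resetAccumulator is taken from this comment, and the exact cast may differ):

// Reuse the accumulators created during initialization instead of
// recreating them for every evaluation.
var i = 0
while (i < aggregates.length) {
  aggregates(i).resetAccumulator(
    accumulators.getField(i).asInstanceOf[Accumulator])
  i += 1
}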

import scala.collection.mutable
import org.apache.flink.types.Row

class ProcTimeRowBoundedAggregationTest extends StreamingWithStateTestBase {
Contributor


This class is an integration test because it starts a Flink mini cluster. Hence its name should end with ITCase and not Test, which would indicate a lightweight unit test.

Contributor


Each ITCase class adds significantly to the build time of the project because it starts a mini cluster which takes some time. Therefore, we try to keep the number of ITCase classes low and aim to test as much as possible with faster unit tests.

Please add unit test methods that test the translation of queries to org.apache.flink.table.api.scala.stream.sql.WindowAggregateTest.

Please add the following integration tests to org.apache.flink.table.api.scala.stream.sql.SqlITCase

  • multiple aggregation functions with partitioning
  • multiple aggregation functions without partitioning

These test classes already contain a few test methods that you can use as a starting point.

Thank you

Author


@fhueske Should I add all the unit tests under the SqlITCase class?

@@ -106,9 +113,14 @@ class DataStreamOverAggregate(
if (overWindow.lowerBound.isUnbounded &&
overWindow.upperBound.isCurrentRow) {
createUnboundedAndCurrentRowProcessingTimeOverWindow(inputDS)
} // lowerbound is a BasicType and upperbound is PRECEEDING or CURRENT ROW
else if (overWindow.lowerBound.getOffset.getType.isInstanceOf[BasicSqlType]
Contributor

@fhueske fhueske Mar 16, 2017


What is the check overWindow.lowerBound.getOffset.getType.isInstanceOf[BasicSqlType] for and why do you check if overWindow.upperBound.isPreceding?

Should the condition rather be like this:

else if (overWindow.lowerBound.isPreceding() && !overWindow.lowerBound.isUnbounded() && // bounded preceding
  overWindow.upperBound.isCurrentRow() && // until current row
  overWindow.isRows) // is rows window

Author


I guess that is also a way to do it. The check allows me to distinguish between time-bounded and row-bounded windows. I have no particular attachment to my solution; it just worked. I will apply and test yours as well.

Contributor


Yes, I realized when looking at PR #3550 that isInstanceOf[BasicSqlType] and isInstanceOf[IntervalSqlType] distinguish ROW from RANGE windows. I think using .isRows() is clearer and might also be safer because it is a more public API than the type of the offset.

@fhueske
Contributor

fhueske commented Mar 16, 2017

One more thing. The PR includes a merge commit which makes squashing the commits and merging the PR a lot more difficult (see also the contribution guidelines).

You can avoid merge commits if you branch from the master and just put commits on top. If you want to include somebody else's work, you can use git cherry-pick to pull over a commit. Use git rebase to rebase your commits on the latest master.

Best, Fabian

stefanobortoli and others added 13 commits March 20, 2017 15:25
Modifying Dockerfile to build from local flink-dist as well as release URLs.
Logging to stdout.
Adding scripts to deploy seamlessly on Docker Swarm.
Updating Docker Compose scripts to work correctly.
Parameterizing things so these Docker scripts are more generally useful.
docker-entrypoint.sh should error on invalid args

Improve docker build.sh cleanup

Dockerfile improvements per review

Remove unnecessary Dockerfile build steps

Now that docker-entrypoint.sh uses 'start-foreground', munging
flink-daemon.sh and overwriting the log4j config are no longer
necessary.

Improve Dockerfile and docker-entrypoint.sh

Clean up Dockerfile and improve docker-entrypoint.sh to support '--help'
and pass through non-(jobmanager|taskmanager) commands.

This closes apache#3205.
This closes apache#3494.
The hostname used for the BlobServer was set to the akka address which is
invalid for this use. Instead, this adds the hostname to the RpcGateway /
AkkaInvocationHandler so that this information is available to the TaskExecutor.

This closes apache#3551.
The CEP operator now cleans the registered state for a
key. This happens:
1) for the priority queue, when the queue is empty.
2) for the NFA, when its shared buffer is empty.
3) finally the key is removed from the watermark
   callback service if both the above are empty.
jinmingjian and others added 18 commits March 20, 2017 15:25
This is likely only used by Gelly and we have a more featureful
implementation allowing for multiple outputs and setting the job name.
Deprecation will allow this to be removed in Flink 2.0.

This closes apache#3516
- added missing serialVersionUID
- PythonPlan* classes no longer implement serializable
- marked several fields as transient
- replaced some fields by local variables
- input/output files are now being deleted
- removed some unnecessary casts
- renamed several ignored exceptions
- close socket in PythonPlanStreamer
- simplified PyProcess health check
- eliminated several uses of raw types
- removed some unthrown exception declarations
…r pushing it (1)

fix filterable test

rebase and trying fix rexnode parsing

create wrapper and update rules
…r pushing it (2)

This closes apache#3520.

fix compilation failure

fix compilation failure again.

1. Deep copy TableSource when we copy TableSourceScan
2. unify push project into scan rule for both batch and stream

address comments.

expand project list before creating new RexProgram

update tests
aggregation and relative window function.
@huawei-flink
Author

Sorry about the mess. I don't understand what the rebase does with Eclipse... I will close this PR and open another one including all the changes and comments.
