Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-7371] [table] Add support for constant parameters in OVER aggregate #4736

Closed
wants to merge 3 commits into from

Conversation

twalthr
Copy link
Contributor

@twalthr twalthr commented Sep 27, 2017

What is the purpose of the change

This PR allows to pass constants to OVER window aggregates. E.g. .select('c, weightAvgFun('a, 42, 'b, "2") over 'w as 'wAvg).

Brief change log

Until now the constants where simply ignored. I added code generation for the literals in AggregationCodeGenerator.

Verifying this change

I add a ITCase for it. I might add more tests if I have time. In general, we need to rework the logic there a little bit, because I think we also do not support DATE, TIME etc. right now.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no

Documentation

  • Does this pull request introduce a new feature? no
  • If yes, how is the feature documented? not applicable

Copy link
Contributor

@fhueske fhueske left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR @twalthr.
I made a few suggestions.

Best, Fabian


val field =
s"""
|transient $boxedType $fieldTerm;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why transient? Couldn't this be final?

.window(
Over partitionBy 'c orderBy 'proctime preceding UNBOUNDED_ROW as 'w)
.select('c, weightAvgFun('a, 42, 'b, "2") over 'w as 'wAvg)
.select('c, 'wAvg)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can be removed

@@ -86,6 +86,13 @@ public Long getValue(WeightedAvgAccum accumulator) {
}

// overloaded accumulate method
// dummy to test constants
public void accumulate(WeightedAvgAccum accumulator, long iValue, int iWeight, int x, String string) {
accumulator.sum += iWeight + Integer.parseInt(string);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

change the method to

accumulator.sum += (iValue + Integer.parseInt(string)) * iWeight;
accumulator.count += iWeight;

to have some influence of the value of string in the result?

@twalthr
Copy link
Contributor Author

twalthr commented Oct 16, 2017

Thanks @fhueske. I addressed you feedback. Will merge this now...

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
4 participants