Skip to content

Conversation

@huawei-flink
Copy link

@huawei-flink huawei-flink commented Apr 26, 2017

Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the How To Contribute guide.
In addition to going through the list, please provide a meaningful description of your changes.

  • General

    • The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
    • The pull request addresses only one issue
    • Each commit in the PR has a meaningful commit message (including the JIRA id)
  • Documentation

    • Documentation has been added for new functionality
    • Old documentation affected by the pull request has been updated
    • JavaDoc for public methods has been added
  • Tests & Build

    • Functionality added by the pull request is covered by tests
    • mvn clean verify has been executed successfully locally or a Travis build has passed

@huawei-flink
Copy link
Author

@fhueske please have a look at this PR, it contains just the code generation part with optional distinct.

@zentol
Copy link
Contributor

zentol commented Apr 26, 2017

Please fix the PR title, you are referencing the wrong JIRA.

@huawei-flink huawei-flink changed the title [FLINK-6338] Add support for DISTINCT into Code Generated Aggregations [FLINK-6388] Add support for DISTINCT into Code Generated Aggregations Apr 27, 2017
@fhueske
Copy link
Contributor

fhueske commented Apr 27, 2017

Thanks for this PR @huawei-flink!

I think I made a mistake when I suggested to use the code-gen'd functions with registered MapState to compute distinct window aggregations. Originally, I thought it would be possible to register state (i.e., the MapState for the distinct values) in an AggregateFunction (which is used for the grouped window aggregates). However, that's unfortunately not possible as I learned today. All state of an AggregateFunction must be contained in the accumulator.

What does this mean? We cannot use the current approach of registering MapState in the code-gen'd function for group windowed aggregates. So we would need another approach for that.

However, we can still use your code for distinct over windows (ProcessFunction can obviously register state) once the API supports to define DISTINCT aggregates.

I'll try to have a closer look at this PR soon.

Best, Fabian

@rtudoran
Copy link
Contributor

@fhueske @stefanobortoli
I recently fixed in Calcite the problem of porting distinct flag to the the over. This was merged in the master. Hence it is a matter of when flink will get the new calcite version. We can also consider the temporary solution IMHO until then

@rtudoran
Copy link
Contributor

@fhueske @stefanobortoli
Regarding the options for solving the distinct. From my point of view the previous approach worked:

  • we keep in the processFunctions a state for each field that is used in a distinct aggregator
  • we count the occurrence of each value (meant for a distinct aggregate) that we observed
  • when a value is seen for the first time we accumulate it
  • when a value is retracted we decrease the corresponding count.
    -if count is 0 we retract the value from accumulator

Based on how things are implemented now - this would involved to have a separate list of aggregatefunctions for the distinct. In order to be able to control when to accumulate to these values.
What do you think? Do you see any disadvantage to this?

@huawei-flink
Copy link
Author

@rtudoran @fhueske the first implementation I made was with the state in the ProcessFunction without code generated aggregation function. Second, I pushed a branch with the state in the process function using the code generated process function. Then, third I moved the state within the code generated function.

It is not clear to me why the state cannot be within the code generated function. Could you please clarify so that we can understand whether it is worth working around it. This feature is quite important for us.

Anyway, you could have a look at the branch that uses the state in the process function and uses the code generated aggregation functions. Basically, rather than generate one code generated function for all the aggregations, I create one class for each, and then I call the corresponding accumulate/retract using the distinct logic when marked in the process function.

@fhueske
Copy link
Contributor

fhueske commented Apr 28, 2017

Hi @huawei-flink, I did not say that we need to move the state out of the code-gen'd function. We can and should leave the PR as it is.

However, we cannot use this code for distinct group window aggregates but only for distinct over aggregates once they are supported in the API.

@rtudoran
Copy link
Contributor

@fhueske thanks for the clarification. I think it is good also to have the solution for the over windows :)

I also wanted to ask you about the calcite and DISTINCT/DIST syntax. What do you think should be the right plan to proceed? Do we push it with DIST and sync with Calcite community of when they will have the next release and than create a pull request to upgrade the calcite version used?

@huawei-flink
Copy link
Author

@fhueske , this is the PR with the code generated distinct aggregation for OVER. You mentioned that the value of the aggregation should be a Row, but what is kept in the distinct state is just the event value, not its "aggregation value state". Perhaps you can try to explain it better to me so that I can complete this PR and we can move on. What do you think?

Copy link
Contributor

@fhueske fhueske left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @huawei-flink, thanks for the PR and sorry for the long time it took me to review your code. The overall logic looks good, but I think we need to add a bit more to it:

  • We need support for user-defined aggregate functions with multiple parameters.
  • A deduplication of the distinct attributes would be good to reduce the size of the state.
  • The generated function should be tested with a unit tests. the HarnessTestBase provides the infrastructure for state related tests (RuntimeContext, access to the number of state objects).

What do you think?

Best, Fabian

val initDist: String = if( distinctAggsFlags.isDefined ) {
val statePackage = "org.apache.flink.api.common.state"
val distAggsFlags = distinctAggsFlags.get
for(i <- distAggsFlags.indices) yield
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please add a space after for, if, while, etc.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be good to share distinct state across aggregations if they are on the same fields, i.e., make distinct state distinct ;-). I would do this in a preprocessing step in the generateAggregations method.

It would also change the way how we access the state, because we may increment / decrement a key only once per record (call of accumulate if state is shared.

val distAggsFlags = distinctAggsFlags.get
for(i <- distAggsFlags.indices) yield
if(distAggsFlags(i)) {
val typeString = javaTypes(aggFields(i)(0))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

UDAGGs can have more than a single parameter.

Since the key can only be a single object, we have to put all arguments into a TupleX (I think the limitation of 25 fields is reasonable and we can throw an exception if we observe a DISTINCT UDAGG with more than 25 fields).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sounds good

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually, why shouldn't we use directly rows? is there any specific reason to prefer tuple?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's actually a very good point! We have to use Row because Tuple doesn't support null values.

}.mkString("\n")
if (distinctAggsFlags.isDefined && distAggsFlags(i)){
j"""
| Long distValCount$i = (Long) distStateList[$i].get(${parameters(i)});
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to put the parameters into a tuple before accessing the map state. The tuple should be reused.

}
}
if(existDistinct){
val initReusMember = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

initReusMember -> initReuseMember

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The code for the distinct deduplication and tuple reusage for multi-argument aggregate function could go here.

def initialize(ctx: RuntimeContext)

/**
* Sets the results of the aggregations (partial or final) to the output row.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why would you like to change this comment?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it was some formatting issue

@stefanobortoli
Copy link
Contributor

@fhueske thanks for the comments. Did we include the latest calcite already?

@fhueske
Copy link
Contributor

fhueske commented Jul 3, 2017

Not yet, but AFAIK is @twalthr working on that.

@fhueske
Copy link
Contributor

fhueske commented Apr 26, 2018

Hi @huawei-flink ,

I'll close this PR later today when merging #5555.
#5555 adds runtime support for distinct aggregation and follows a similar approach as this PR. However, it leverages the MapView feature and is therefore a bit more generic.
Thanks for working on this. This PR led us in the right direction.

Best, Fabian

@asfgit asfgit closed this in 6aef014 Apr 26, 2018
sampathBhat pushed a commit to sampathBhat/flink that referenced this pull request Jul 26, 2018
- distinct values are stored in a MapView, either on the heap or in a StateBackend depending on the context.

This closes apache#5555.
This closes apache#3783 (PR was superseded by apache#5555)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

6 participants