Skip to content

Conversation

shaoxuan-wang
Copy link
Contributor

Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the How To Contribute guide.
In addition to going through the list, please provide a meaningful description of your changes.

  • General

    • The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
    • The pull request addresses only one issue
    • Each commit in the PR has a meaningful commit message (including the JIRA id)
  • Documentation

    • Documentation has been added for new functionality
    • Old documentation affected by the pull request has been updated
    • JavaDoc for public methods has been added
  • Tests & Build

    • Functionality added by the pull request is covered by tests
    • mvn clean verify has been executed successfully locally or a Travis build has passed

* @param accumulators the accumulators (saved in a row) which contains the current
* aggregated results
* @param output output results collected in a row
*/
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we need to extend the GeneratedAggregations interface (except for resetAccumulators())
I would rather implement another code generation function that implements the existing methods differently. This would mean to add another method to CodeGenerator that generates the GeneratedAggregations interface suitable for the DataSet aggregations.

  • setAggregationResultsWithKeyOffset -> setAggregationResults
  • setKeyToOutput -> setForwardedFields
  • accumulateWithKeyOffset -> accumulate
  • createAccumulatorsAndSetToOutput could be replaced by createAccumulators (called once to create a reusable accumulators), resetAccumulators, and setAggregationResults (if it sets the accumulators instead of calling AggFunction.getValue(), see below)
  • copyAccumulatorsToBuffer -> setAggregationResults (the accumulators are partial aggregation results). This would mean we have two behaviors, setting the final (getValue()) or the partial result (accumulator) for setAggregateResults(). A simple flag during code gen would go for either the final or the partial result.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could reuse all your code, but just put it into a different method of the CodeGenerator and make it implement the existing methods. Their interfaces are the same.

Copy link
Contributor Author

@shaoxuan-wang shaoxuan-wang Apr 18, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This sounds a very good idea. I actually have thought to merge *WithKeyOffset functions into the existing functions. It works for most functions, but DataSetFinalAggFunction and DataSetAggFunction are tricky. For accumulate and setAggregateResults, they do not need keyOffset, but for merge, they need.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, I'm not sure if we really need to implement a different code generation function. I had a look at the code generation code and think that we could just add a few more parameters to the current code gen method. Right now, the behavior of most generated methods can be exactly defined:

  • createAccumulators(): generates a Row with the accumulators for each provided AggregationFunction. Some methods to GeneratedAggregations expect a Row of accumulators with exactly this layout as one of their input parameters. In the following, this parameter is called accs.
  • accumulate(accs, row): The aggFields parameter controls which fields of row are accumulated into which accumulator. We should rename this parameter to accFields though, IMO.
  • retract(accs, row): same as for accumulate. We should add a separate parameter retractFields: Array[Int] though.
  • setForwardedFields(input, output): The fwdMapping parameter controls which field of the input row is copied to which position of the output row. We could add an optional parameter to copy the groupSetMapping to the output as well.
  • setAggregationResults(accs, output): The aggMapping parameter controls to which output fields the aggregation results are copied. If we add another parameter partialResults: Boolean, we can control whether to copy final results (AggregateFunction.getValue()) or partial results (the accumulator).
  • createOutputRow(): the outputArity parameter specfies the arity of the output row.
  • mergeAccumulatorsPair(accs, other): This is the only inflexible method. We could change the behavior of the method as follows: The method expects as first parameter (accs) a Row with the same layout as generated by createAccumulators. The second parameter can be any row with accumulators at arbitrary positions. To enable the merging, we add a parameter mergeMapping: Array[Int] to the code generating function which defines which fields of the other parameter are merged with the fields in the accs Row. The method returns a Row with the default layout (as generated by createAccumulators()).
  • resetAccumulator(accs): resets a Row of accumulators of the known layout.

I haven't checked this thoroughly, but I think with these parameters, we can control the generated code sufficiently to support all aggregation operators for DataSet and DataStream, i.e., we can generate the currently existing functions such that they behave as the more specialized ones that you added. Since all code gen parameters (accFields, retractFields, fwdMapping, groupSetMapping, aggMapping, partialResults, outputArity, mergeMapping) can be independently set for each type of operator, this should give us the flexibility for all types for operators. We only need to parameterize the code generation method appropriately.

In addition, we could make all parameters Option and generate empty methods if the parameters for a function are not set. (This could also be a follow up issue, IMO)

What do you think @shaoxuan-wang ?

accumulators(i) = aggregates(i).createAccumulator()
i += 1
}
accumulators = function.createAccumulators()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could create the accumulators once and use function.resetAccumulators() to reset and reuse the object.

function.accumulate(accumulators, record)

// check if this record is the last record
if (!iterator.hasNext) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Couldn't we use function.setForwardFields() to forward the grouping keys to the output?

function.setAggregationResults(accumulators, output)

// set grouping set flags to output
if (intermediateGKeys.isDefined) {
Copy link
Contributor

@fhueske fhueske Apr 18, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should eventually be integrated with setForwardFields() as well.

Copy link
Contributor

@fhueske fhueske left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR @shaoxuan-wang.
I think we can simplify the code generation if we add a few more parameters to the code generating method in CodeGenerator and call it with the right parameters for each operator (details in a comment below).

I also found a few things to improve the functions which not directly related to the changes of this PR, but could be improved on the way as we are touching the code anyway.

Thanks, Fabian

| public void setAggregationResults(
| org.apache.flink.types.Row accs,
| org.apache.flink.types.Row output) {
| setAggregationResultsHelper(accs, output, 0);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code generated methods should be as "flat" as possible. Calling other helper methods adds overhead compared to inlining the code.

|
| output.setField(
| ${aggMapping(i)},
| ${aggMapping(i)} + offset,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

${aggMapping(i)} + offset -> ${aggMapping(i) + offset} to add the constant offset to the mapping before generating the code.

j"""
| ${accTypes(i)} aAcc$i = (${accTypes(i)}) a.getField($i);
| ${accTypes(i)} bAcc$i = (${accTypes(i)}) b.getField($i);
| ${accTypes(i)} bAcc$i = (${accTypes(i)}) b.getField($i + offset);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

b.getField($i + offset) -> b.getField(${i + offset})

for (i <- accTypes.indices) yield
j"""
| accList$i = new java.util.ArrayList<${accTypes(i)}>(2);
| accList$i = new java.util.ArrayList<${accTypes(i)}>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not creating the ArrayList with initial capacity 2?

private val aggInFields: Array[Array[Int]],
private val aggOutMapping: Array[(Int, Int)],
private val genAggregations: GeneratedAggregationsFunction,
private val gkeyOutMapping: Array[(Int, Int)],
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be good if we could parameterize the method that generates the code such that we can do the grouping keys and grouping set copies with GeneratedAggregations.setForwardFields(). This should be possible as it is actually just setting constant boolean flags at certain positions in the output Row.

function.mergeAccumulatorsPairWithKeyOffset(accumulators, record)

// check if this record is the last record
if (!iterator.hasNext) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

move this behind the loop

function.mergeAccumulatorsPairWithKeyOffset(accumulators, record)

// trigger tumbling evaluation
if (!iterator.hasNext) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

move this behind the loop

function.mergeAccumulatorsPairWithKeyOffset(accumulators, record)

// check if this record is the last record
if (!iterator.hasNext) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

move this behind the loop

output.setField(groupingKeys.length + i, accumulator)
i += 1
}
function.createAccumulatorsAndSetToOutput(output)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

create an accumulator with function.createAccumulator() once in open(), reset it here, and copy it to output with function.setAggregationResults()?

* @param accumulators the accumulators (saved in a row) which contains the current
* aggregated results
* @param output output results collected in a row
*/
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, I'm not sure if we really need to implement a different code generation function. I had a look at the code generation code and think that we could just add a few more parameters to the current code gen method. Right now, the behavior of most generated methods can be exactly defined:

  • createAccumulators(): generates a Row with the accumulators for each provided AggregationFunction. Some methods to GeneratedAggregations expect a Row of accumulators with exactly this layout as one of their input parameters. In the following, this parameter is called accs.
  • accumulate(accs, row): The aggFields parameter controls which fields of row are accumulated into which accumulator. We should rename this parameter to accFields though, IMO.
  • retract(accs, row): same as for accumulate. We should add a separate parameter retractFields: Array[Int] though.
  • setForwardedFields(input, output): The fwdMapping parameter controls which field of the input row is copied to which position of the output row. We could add an optional parameter to copy the groupSetMapping to the output as well.
  • setAggregationResults(accs, output): The aggMapping parameter controls to which output fields the aggregation results are copied. If we add another parameter partialResults: Boolean, we can control whether to copy final results (AggregateFunction.getValue()) or partial results (the accumulator).
  • createOutputRow(): the outputArity parameter specfies the arity of the output row.
  • mergeAccumulatorsPair(accs, other): This is the only inflexible method. We could change the behavior of the method as follows: The method expects as first parameter (accs) a Row with the same layout as generated by createAccumulators. The second parameter can be any row with accumulators at arbitrary positions. To enable the merging, we add a parameter mergeMapping: Array[Int] to the code generating function which defines which fields of the other parameter are merged with the fields in the accs Row. The method returns a Row with the default layout (as generated by createAccumulators()).
  • resetAccumulator(accs): resets a Row of accumulators of the known layout.

I haven't checked this thoroughly, but I think with these parameters, we can control the generated code sufficiently to support all aggregation operators for DataSet and DataStream, i.e., we can generate the currently existing functions such that they behave as the more specialized ones that you added. Since all code gen parameters (accFields, retractFields, fwdMapping, groupSetMapping, aggMapping, partialResults, outputArity, mergeMapping) can be independently set for each type of operator, this should give us the flexibility for all types for operators. We only need to parameterize the code generation method appropriately.

In addition, we could make all parameters Option and generate empty methods if the parameters for a function are not set. (This could also be a follow up issue, IMO)

What do you think @shaoxuan-wang ?

@shaoxuan-wang
Copy link
Contributor Author

@fhueske thanks for your feedback.
Yes, we could keep GeneratedAggregations interface very clean as

abstract class GeneratedAggregations extends Function {
 def setAggregationResults(accumulators: Row, output: Row)
 def setForwardedFields(input: Row, output: Row)
 def accumulate(accumulators: Row, input: Row)
 def retract(accumulators: Row, input: Row)
 def createAccumulators(): Row
 def mergeAccumulatorsPair(a: Row, b: Row): Row
 def resetAccumulator(accumulators: Row)
}

But I feel it might be not very good to add more parameters into code generate function as caller function will usually have to construct unnecessary empty parameters. I think we can break code generate functions into 2-3 functions (these are just the interface to process code-gen parameters, the fundamental implementation of each function will be shared). Let me prototype the changes, and we can continue the discussions from there.

Regarding to your other comments. I did not look into the logic of previous implementations while just focused on the code-gen. I will take a look and optimize them.

@fhueske
Copy link
Contributor

fhueske commented Apr 19, 2017

Hi @shaoxuan-wang, I'm fine with both approaches, single method with additional parameters or multiple methods. If you think the multiple methods approach is better, let's go for it.

Thanks, Fabian

@fhueske
Copy link
Contributor

fhueske commented Apr 20, 2017

Hi @shaoxuan-wang, thanks for the PR. The changes look good.
I opened a PR against your PR branch and refactored the `CodeGenerator.generateAggregations() method a bit. Among other things, I added code-gen for grouping sets.

Let me know what you think.

Best, Fabian

@shaoxuan-wang
Copy link
Contributor Author

@fhueske , your changes look good to me, I left a few comments.

fhueske pushed a commit to fhueske/flink that referenced this pull request Apr 21, 2017
@fhueske
Copy link
Contributor

fhueske commented Apr 21, 2017

Thanks @shaoxuan-wang!
Merging

@asfgit asfgit closed this in 3b4542b Apr 21, 2017
StefanRRichter pushed a commit to StefanRRichter/flink that referenced this pull request Apr 24, 2017
PangZhi pushed a commit to PangZhi/flink that referenced this pull request May 1, 2017
StefanRRichter pushed a commit to StefanRRichter/flink that referenced this pull request May 5, 2017
fanyon pushed a commit to fanyon/flink that referenced this pull request May 11, 2017
hequn8128 pushed a commit to hequn8128/flink that referenced this pull request Jun 22, 2017
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants