
FLINK-3179 Combiner is not injected if Reduce or GroupReduce input is explicitly partitioned (Ram) #1553

Closed
wants to merge 4 commits

Conversation

ramkrish86
Contributor

Followed the guidance given in the description in order to fix this. Is the approach correct here? I am also using this to learn the code.
Once we see that a Partition node is the input of a Reduce or GroupReduce node, we try to inject the combiner after the source node (the data source node), and the reducer node takes the actual Partition node as its input.
So now the structure would be DataSource->Combine->Partition->Reduce.
Suggestions and feedback are welcome, as I am not sure if I have covered all the cases here.

@ramkrish86
Contributor Author

Also ensured that the related test cases pass and that the WordCount program output with and without partitioning remains the same.

@fhueske
Contributor

fhueske commented Jan 28, 2016

Thanks for the PR!
I'll have a look at it and give feedback hopefully today or tomorrow.

@@ -102,36 +107,72 @@ public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) {
DriverStrategy.SORTED_GROUP_REDUCE, this.keyList);
} else {
// non forward case. all local properties are killed anyways, so we can safely plug in a combiner

The else branch will not be entered if the GroupReduce's predecessor is a Partition operator.
You need to add an if else branch to the condition.

@fhueske
Contributor

fhueske commented Jan 29, 2016

You identified the right classes and methods for the fix, but the place within the method is not correct. Let me explain the issue.

In the common case as for example in a WordCount program, the operator order looks like this:

[Map] --hash-partition--> [Reduce]

in this case, a combiner will be appended to the Map to reduce the data before it is partitioned over the network. This looks like:

[Map] --local-fwd--> [Combine] --hash-partition--> [Reduce]

In some cases, Flink knows that the data is already appropriately partitioned (e.g. after a join):

[Join] --local-fwd--> [Reduce]

in this case, the data is already local and no combiner needs to be injected. The check is based on the shipping strategy of the input channel (this is the if case in instantiate()).

In case of an explicit partition operator, the operators look as follows:

[Map] --partition--> [Partition] --local-fwd--> [Reduce]

hence, the code enters the if case, because the input shipping strategy is FORWARD.
Instead we would like to inject a combiner between Map and Partition as follows:

[Map] --local-fwd--> [Combine] --partition--> [Partition] --local-fwd--> [Reduce]

Hence, we should adapt the condition to inject a combiner if the input strategy of Reduce is FORWARD and the input operator is a PartitionNode.

We should add appropriate tests for this feature. I suggest:

  • a unit test case in GroupReduceCompilationTest
  • a unit test case in ReduceCompilationTest
  • an end-to-end integration test in javaApiOperators.GroupReduceITCase
  • an end-to-end integration test in javaApiOperators.ReduceITCase
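Why injecting a combiner in front of the partitioning is safe for a key-based reduce can be illustrated with a tiny plain-Java simulation (illustrative names, not Flink API): summing counts per key gives the same result whether or not a local pre-aggregation runs first, because the aggregation is associative and keyed.

```java
import java.util.*;

public class CombinerSketch {

    // Sum per key -- this is what both the combiner and the final reducer do
    // in a WordCount-style job (hypothetical helper, not Flink API).
    static Map<String, Integer> reduceByKey(List<Map.Entry<String, Integer>> pairs) {
        Map<String, Integer> out = new TreeMap<>();
        for (Map.Entry<String, Integer> p : pairs) {
            out.merge(p.getKey(), p.getValue(), Integer::sum);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> mapOutput = List.of(
                Map.entry("a", 1), Map.entry("b", 1), Map.entry("a", 1), Map.entry("a", 1));

        // Plan without a combiner: ship all pairs, reduce once.
        Map<String, Integer> direct = reduceByKey(mapOutput);

        // Plan with a combiner: pre-aggregate locally before shipping,
        // then reduce the (smaller) combined stream again.
        List<Map.Entry<String, Integer>> combined = new ArrayList<>(reduceByKey(mapOutput).entrySet());
        Map<String, Integer> twoPhase = reduceByKey(combined);

        // Same result either way; the combined plan just ships fewer records.
        System.out.println(direct.equals(twoPhase));
    }
}
```

This is only a sketch of the semantics; the actual injection happens in the optimizer's `instantiate()` method as discussed above.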

@ramkrish86
Contributor Author

Thank you very much for the feedback. Let me try to understand this better and update the PR soon. I will reach out here in case of any questions or doubts. Thanks a lot.

@ramkrish86
Contributor Author

I went through the code. In both cases of the WordCount program, with and without explicit partitioning:
[Map] --hash-partition--> [Reduce]
[Map] --partition--> [Partition] --local-fwd--> [Reduce]
I see that it goes to the 'else' part of the GroupReduceWithCombineProperties#instantiate() method. I debugged once again for both cases.

> hence, the code enters the if case, because the input shipping strategy is FORWARD.

So with an explicit partitioner I don't see that happening. The input shipping strategy of the Partition node seems to be PARTITION_HASH.
So what am I missing here?

@fhueske
Contributor

fhueske commented Feb 1, 2016

The GroupReduceWithCombineProperties.instantiate() method checks the shipping strategy of the input channel. In the WordCount example without an explicit partition operator, the shipping strategy is PARTITION_HASH and the else branch injects a combiner. If you add an explicit partition operator, the input shipping strategy of the Reduce operator is FORWARD, so the if branch is executed and does not add a combiner.

Hence, the logic has to go into the if branch and not into the else branch. Or, even better, add an additional condition to the if case, !(in.getSource().getOptimizerNode() instanceof PartitionNode), and add an if else branch to handle the special case of the explicit partition operator.

@tillrohrmann
Contributor

Might be a stupid question, but what if the partitioner depends on the number of elements? E.g., if you use partitionCustom with a Partitioner that internally counts the elements and assigns the output channel number based on this count. In such a case, a combiner would change the result.
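Till's concern can be made concrete with a small plain-Java sketch (hypothetical names, not Flink API): a stateful, count-based partitioner routes elements by how many it has seen, not by key, so pre-aggregating with a combiner changes which channel a key lands on.

```java
import java.util.*;

public class CountingPartitionerSketch {

    // Hypothetical partitioner in the spirit of Till's example: it ignores the
    // key and assigns channels round-robin based on an internal element count.
    static class CountingPartitioner {
        private int count = 0;
        int partition(String key, int numPartitions) {
            return (count++) % numPartitions;
        }
    }

    // Route each element to a channel using the counting partitioner.
    static List<List<String>> route(List<String> elements, int numPartitions) {
        CountingPartitioner p = new CountingPartitioner();
        List<List<String>> channels = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) channels.add(new ArrayList<>());
        for (String e : elements) channels.get(p.partition(e, numPartitions)).add(e);
        return channels;
    }

    public static void main(String[] args) {
        List<String> raw = Arrays.asList("a", "a", "b");  // before combining
        List<String> combined = Arrays.asList("a", "b");  // after a combiner merges the two "a"s

        // The same key ends up on different channels depending on whether a
        // combiner ran first, so injecting one would change the result.
        System.out.println(route(raw, 2));       // [[a, b], [a]]
        System.out.println(route(combined, 2));  // [[a], [b]]
    }
}
```

As Fabian notes below, such a partitioner does not partition by the key attribute and therefore cannot feed a Reduce or GroupReduce anyway.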

@fhueske
Contributor

fhueske commented Feb 11, 2016

If a Partitioner is implemented such that it does not partition based on the key attribute, it cannot be used for a Reduce or GroupReduce transformation anyway. Also, users should expect that a combiner is applied if a ReduceFunction or a GroupReduceFunction that implements a combine interface is used.

@ramkrish86
Contributor Author

@fhueske
I found the mistake I was making. My bad. I was not applying the partition function on the key, i.e. the String part returned from the flatMap, and hence the flow was always going to the 'else' case. Now that I understand the issue, I think I can update this PR shortly. Thanks for the patient review.

@fhueske
Contributor

fhueske commented Feb 12, 2016

@ramkrish86, no worries :-)
I guess the issue description lacked a bit of detail. Flink's optimizer checks if the partitioning produced by the explicit partitioning operator (hash, range, custom) can be reused for the Reduce. If not, the data is partitioned again, and this time the combiner can be applied, since this is the regular case.

Thanks for working on this.

@ramkrish86
Contributor Author

A new push has been submitted, FYI @fhueske.

@@ -66,7 +66,7 @@ public static void main(String[] args) throws Exception {

DataSet<Tuple2<String, Integer>> counts =
// split up the lines in pairs (2-tuples) containing: (word,1)
-text.flatMap(new Tokenizer())
+(text.flatMap(new Tokenizer()))

Unnecessary change

@fhueske
Contributor

fhueske commented Feb 17, 2016

Hi @ramkrish86, thanks for the update.
In addition to my inline comments, we also need to extend the ReduceITCase.

Also we must take care of the case where the result of the partition operator goes into more than one function. Consider the following case:

                                     /--fwd--> [Reduce]
[Input] --shuffle--> [Partitioner] -<
                                     \--fwd--> [Map]

which should be translated to:

           /--fwd--> [Combine] --shuffle--> [Partitioner] --fwd--> [Reduce]
[Input] --<
           \--shuffle--> [Partitioner] --fwd--> [Map]

Both translation tests need to be extended to cover this case.

Thanks, Fabian

@ramkrish86
Contributor Author

@fhueske
Could you give some examples for the above use case where the partition is an input to more than one function?

@fhueske
Contributor

fhueske commented Feb 29, 2016

I do not have a concrete use case in mind, but it is certainly possible to implement such a job in the DataSet API. Hence, it should be correctly translated.
You can do it, for example, like this:

DataSet<Tuple2<String, Long>> data = ...
DataSet<Tuple2<String, Long>> pData = data.partitionByHash(0);
pData.map(new SomeMapFunc())
     .output(new DiscardingOutputFormat<Tuple2<String, Integer>>());
pData.groupReduce(new SomeCombinableGReduceFunc())
     .output(new DiscardingOutputFormat<Tuple2<String, Integer>>());

@ramkrish86
Contributor Author

@fhueske
So for the above example, where the partitioned input goes to both the map and the reducer as input,
should the class AllGroupWithPartialPreGroupProperties be changed like GroupReduceWithCombineProperties? I find that both have similar code and the code flow goes there as well.

@fhueske
Contributor

fhueske commented Mar 3, 2016

Sorry, I forgot a groupBy() in my example.
It should be

DataSet<Tuple2<String, Long>> data = ...
DataSet<Tuple2<String, Long>> pData = data.partitionByHash(0);
pData.map(new SomeMapFunc())
     .output(new DiscardingOutputFormat<Tuple2<String, Integer>>());
pData.groupBy(0)
     .groupReduce(new SomeCombinableGReduceFunc())
     .output(new DiscardingOutputFormat<Tuple2<String, Integer>>());

@ramkrish86
Contributor Author

New PR submitted, @fhueske. Thanks for helping me through this code review. I was more of a beginner here, and there is a lot to learn on my side.

return new SingleInputPlanNode(node, "Reduce ("+node.getOperator().getName()+")",
toReducer, DriverStrategy.SORTED_GROUP_REDUCE, this.keyList);
}
}

private SingleInputPlanNode injectCombinerBeforPartitioner(Channel in, SingleInputNode node) {

Typo in method name injectCombinerBeforePartitioner


Please use meaningful parameter name: in -> toReducer, node -> reduceNode

@fhueske
Contributor

fhueske commented Mar 7, 2016

Hi @ramkrish86,
I just realized that the approach taken here might not work. We are modifying the plan while it is being enumerated. There might be cases where this leads to compiler errors or wrong plans. I have to check which side effects the plan modification might have.

I would suggest we put this PR on ice for a few days while I check whether it is possible to continue or if we have to find another approach.

@ramkrish86
Contributor Author

@fhueske
I was looking into the comments; I can address the refactoring by creating a new patch altogether. But seeing the last comment, I think I can hold this off for some time. I felt sad, as I wanted to bring this to closure, but you are the expert, so I will adhere to your words here. No problem. I will check if I can take up some other JIRA, or do you have any suggestions until then?

@fhueske
Contributor

fhueske commented Mar 8, 2016

Hi @ramkrish86, I totally understand that you are disappointed. I'm very sorry to raise these concerns this late after you put a lot of effort into this PR. I should have noticed this issue much earlier :-(

Touching the optimizer is always a little bit like open heart surgery and must be done very carefully with the whole picture in mind. I have not completely investigated the possible side effects yet, but will definitely let you know once I have.

Would you like to work on a different issue in the meantime?

@ramkrish86
Contributor Author

@fhueske
Everything is a learning experience; it is good that I got to know some of these flows through this issue. Yes, I am interested in taking up some other JIRA in the meantime.

@fhueske
Contributor

fhueske commented Mar 18, 2016

Hi @ramkrish86, I thought about this PR and came to the conclusion that we should not continue. The optimizer's design does not allow modifying operators in, or injecting operators into, enumerated subplans. This might cause invalid execution plans and, in the worst case, wrong results without anybody noticing.

I would simply log a WARN message that a combiner was not added if the optimizer identifies a Partition operator in front of a Reduce or combinable GroupReduce operator, and give a hint that an explicit combine function can be added with groupCombine in front of the partition operator.
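The workaround Fabian suggests could look roughly like this in the DataSet API (a sketch reusing the hypothetical function names from the examples above; untested, and the exact method is reduceGroup on a grouping):

```java
DataSet<Tuple2<String, Long>> data = ...
data.groupBy(0)
    // explicit pre-partition combine via groupCombine
    .combineGroup(new SomeCombinableGReduceFunc())
    .partitionByHash(0)
    .groupBy(0)
    .reduceGroup(new SomeCombinableGReduceFunc())
    .output(new DiscardingOutputFormat<Tuple2<String, Long>>());
```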

Sorry again @ramkrish86 that I led you into a dead end with this PR.

fhueske pushed a commit to fhueske/flink that referenced this pull request Mar 22, 2016
…ot added in front of PartitionOperator

This closes apache#1822
This closes apache#1553
@asfgit asfgit closed this in 76da442 Mar 22, 2016
@fhueske
Contributor

fhueske commented Mar 22, 2016

Closed this PR in favor of PR #1822

fijolekProjects pushed a commit to fijolekProjects/flink that referenced this pull request May 1, 2016
…ot added in front of PartitionOperator

This closes apache#1822
This closes apache#1553