[FLINK-4113] [runtime] Always copy first value in ChainedAllReduceDriver#2156
[FLINK-4113] [runtime] Always copy first value in ChainedAllReduceDriver#2156greghogan wants to merge 2 commits intoapache:masterfrom
Conversation
| base = objectReuseEnabled ? record : serializer.copy(record); | ||
| base = serializer.copy(record); | ||
| } else { | ||
| base = objectReuseEnabled ? reducer.reduce(base, record) : serializer.copy(reducer.reduce(base, record)); |
There was a problem hiding this comment.
What if we return the record as the result of reducer.reduce (with objectReuseEnabled == true)? That would also be problematic, right?
There was a problem hiding this comment.
We fixed this in FLINK-3340 for non-chained reduce drivers (where the driver chooses the object to deserialize into) but for chained drivers we cannot prevent one UDF from overwriting an object from a previous UDF. If you look in OverwriteObjects.java you will see testReduce fail.
There was a problem hiding this comment.
Alright. I guess users then have to live with that if they enable object reuse.
|
Good fix @greghogan. We should definitely include it in the upcoming release. Would it be possible to write a test to guard the fix from future changes? |
f5f162b to
e9ee10e
Compare
|
@tillrohrmann just added a test for |
|
@greghogan the tests looks good. Thanks for your work :-) The failing test cases are unrelated. Will merge this PR. |
Guard test for ChainedAllReduceDriver This closes #2156.
No description provided.