Skip to content

Commit

Permalink
[FLINK-5890] [gelly] GatherSumApply broken when object reuse enabled
Browse files Browse the repository at this point in the history
The initial fix for this ticket is not working on larger data sets.

Reduce supports returning the left input, right input, a new object, or
a locally reused object. The trouble with the initial fix was that the
returned local object was reusing fields from the input tuples.

The problem is with ReduceDriver#run managing two values (reuse1 and
reuse2) and with a third, local value returned by
GatherSumApplyIteration.SumUDF. After the first grouping value.f1 ==
reuse1.f1. Following UDF calls may swap value.f1 and reuse2.f1, which
causes reuse1.f1 == reuse2.f1. With an odd number of swaps the next
grouping will reduce with reuse1 and reuse2 sharing a field and
deserialization will overwrite stored values.

The simple fix is to only use and return the provided inputs.

This closes #3515
  • Loading branch information
greghogan committed Mar 24, 2017
1 parent 4b19e27 commit 524b20f
Showing 1 changed file with 3 additions and 2 deletions.
Expand Up @@ -330,7 +330,6 @@ private SumUdf(SumFunction<VV, EV, M> sumFunction, TypeInformation<Tuple2<K, M>>

@Override
public Tuple2<K, M> reduce(Tuple2<K, M> arg0, Tuple2<K, M> arg1) throws Exception {
K key = arg0.f0;
M result = this.sumFunction.sum(arg0.f1, arg1.f1);

// if the user returns value from the right argument then swap as
Expand All @@ -339,9 +338,11 @@ public Tuple2<K, M> reduce(Tuple2<K, M> arg0, Tuple2<K, M> arg1) throws Exceptio
M tmp = arg1.f1;
arg1.f1 = arg0.f1;
arg0.f1 = tmp;
} else {
arg0.f1 = result;
}

return new Tuple2<>(key, result);
return arg0;
}

@Override
Expand Down

0 comments on commit 524b20f

Please sign in to comment.