Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-5890] [gelly] GatherSumApply broken when object reuse enabled #3515

Conversation

greghogan
Copy link
Contributor

The initial fix for this ticket is not working on larger data sets.

Reduce supports returning the left input, right input, a new object, or a locally reused object. The trouble with the initial fix was that the returned local object was reusing fields from the input tuples.

The problem is with ReduceDriver#run managing two values (reuse1 and reuse2) and with a third, local value returned by GatherSumApplyIteration.SumUDF. After the first grouping value.f1 == reuse1.f1. Following UDF calls may swap value.f1 and reuse2.f1, which causes reuse1.f1 == reuse2.f1. With an odd number of swaps the next grouping will reduce with reuse1 and reuse2 sharing a field and deserialization will overwrite stored values.

The simple fix is to only use and return the provided inputs.

The initial fix for this ticket is not working on larger data sets.

Reduce supports returning the left input, right input, a new object, or
a locally reused object. The trouble with the initial fix was that the
returned local object was reusing fields from the input tuples.

The problem is with ReduceDriver#run managing two values (reuse1 and
reuse2) and with a third, local value returned by
GatherSumApplyIteration.SumUDF. After the first grouping value.f1 ==
reuse1.f1. Following UDF calls may swap value.f1 and reuse2.f1, which
causes reuse1.f1 == reuse2.f1. With an odd number of swaps the next
grouping will reduce with reuse1 and reuse2 sharing a field and
deserialization will overwrite stored values.

The simple fix is to only use and return the provided inputs.
@greghogan
Copy link
Contributor Author

@StephanEwen deja vu FLINK-2883 / FLINK-3340.

I'm also looking to run the FLINK-4949 IT tests with object reuse both enabled and disabled which would have highlighted this issue as the unit test input was too small.

@StephanEwen
Copy link
Contributor

I think having the IT test makes sense here. You can try to artificially reduce the memory to trigger spilling situations earlier.

@greghogan
Copy link
Contributor Author

Not an issue of spilling memory but at least three elements are required to trigger two reduces and the error condition depends on which value is returned.

@StephanEwen is this a +1?

@StephanEwen
Copy link
Contributor

It is a +1 :-)

@asfgit asfgit closed this in 524b20f Mar 24, 2017
kateri1 pushed a commit to kateri1/flink that referenced this pull request Mar 26, 2017
The initial fix for this ticket is not working on larger data sets.

Reduce supports returning the left input, right input, a new object, or
a locally reused object. The trouble with the initial fix was that the
returned local object was reusing fields from the input tuples.

The problem is with ReduceDriver#run managing two values (reuse1 and
reuse2) and with a third, local value returned by
GatherSumApplyIteration.SumUDF. After the first grouping value.f1 ==
reuse1.f1. Following UDF calls may swap value.f1 and reuse2.f1, which
causes reuse1.f1 == reuse2.f1. With an odd number of swaps the next
grouping will reduce with reuse1 and reuse2 sharing a field and
deserialization will overwrite stored values.

The simple fix is to only use and return the provided inputs.

This closes apache#3515
p16i pushed a commit to p16i/flink that referenced this pull request Apr 16, 2017
The initial fix for this ticket is not working on larger data sets.

Reduce supports returning the left input, right input, a new object, or
a locally reused object. The trouble with the initial fix was that the
returned local object was reusing fields from the input tuples.

The problem is with ReduceDriver#run managing two values (reuse1 and
reuse2) and with a third, local value returned by
GatherSumApplyIteration.SumUDF. After the first grouping value.f1 ==
reuse1.f1. Following UDF calls may swap value.f1 and reuse2.f1, which
causes reuse1.f1 == reuse2.f1. With an odd number of swaps the next
grouping will reduce with reuse1 and reuse2 sharing a field and
deserialization will overwrite stored values.

The simple fix is to only use and return the provided inputs.

This closes apache#3515
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
3 participants