Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-11050] Duplicate accumulator if it appears in multiple windows. #13061

Merged
merged 1 commit into from
Oct 12, 2020

Conversation

lukecwik
Copy link
Member

@lukecwik lukecwik commented Oct 9, 2020

Accumulators can be mutated during merging by the combine fn so we must ensure that we use a unique instance of the accumulator per window.


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Choose reviewer(s) and mention them in a comment (R: @username).
  • Format the pull request title like [BEAM-XXX] Fixes bug in ApproximateQuantiles, where you replace BEAM-XXX with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

Post-Commit Tests Status (on master branch)

Lang SDK Dataflow Flink Samza Spark Twister2
Go Build Status --- Build Status --- Build Status ---
Java Build Status Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
Build Status Build Status
Build Status
Build Status
Build Status
Python Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
--- Build Status ---
XLang Build Status --- Build Status --- Build Status ---

Pre-Commit Tests Status (on master branch)

--- Java Python Go Website Whitespace Typescript
Non-portable Build Status Build Status
Build Status
Build Status
Build Status
Build Status Build Status Build Status Build Status
Portable --- Build Status --- --- --- ---

See .test-infra/jenkins/README for trigger phrase, status and link of all Jenkins jobs.

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests

See CI.md for more information about GitHub Actions CI.

@lukecwik
Copy link
Member Author

lukecwik commented Oct 9, 2020

R: @iemejia

Accumulators can be mutated during merging by the combine fn so we must ensure that we use a unique instance of the accumulator per window.
@lukecwik
Copy link
Member Author

Run Spark StructuredStreaming ValidatesRunner

@lukecwik
Copy link
Member Author

Run Java PreCommit

1 similar comment
@lukecwik
Copy link
Member Author

Run Java PreCommit

@iemejia
Copy link
Member

iemejia commented Oct 10, 2020

This LGTM but I prefer that @echauchot takes a look before merging because he has been optimizing this code for a while so better to make him aware of the issue and the minor performance hit of the extra encoding needed.

@lukecwik
Copy link
Member Author

I would prefer taking the fix and then further optimizing for performance as the implementation I suggested only duplicates when a value is in multiple windows which is uncommon in practice.

@iemejia
Copy link
Member

iemejia commented Oct 12, 2020

Good point, I suppose if @echauchot has a suggestion or a better way to do this we can improve it in the future, at least this fixes the breakage on tests and it produces correct results.

Copy link
Member

@iemejia iemejia left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@iemejia iemejia merged commit 996fea2 into apache:master Oct 12, 2020
@echauchot echauchot mentioned this pull request Nov 5, 2020
4 tasks
@echauchot
Copy link
Contributor

echauchot commented Nov 5, 2020

@lukecwik I don't see why this change is necessary because of 2 reasons:

  1. all the validates runner tests including multiple window (eg. sliding windows) already passed.
  2. when I wrote this code, I already took some safety mesures about the modification of the (first only) accumulator during the combineFn.mergeAccumulator by creating a new first accumulator for each merged window see initial code below and the comment in the code:
   // merge the accumulators for each mergedWindow
    ...
    for (Map.Entry<W, List<Tuple2<AccumT, Instant>>> entry :
        mergedWindowToAccumulators.entrySet()) {
       ...
      // we need to create the first accumulator because combineFn.mergerAccumulators can modify the
      // first accumulator
      AccumT first = combineFn.createAccumulator();
      Iterable<AccumT> accumulatorsToMerge =
          Iterables.concat(
              Collections.singleton(first),
              accumsAndInstantsForMergedWindow.stream()
                  .map(x -> x._1())
                  .collect(Collectors.toList()));
               ...
              combineFn.mergeAccumulators(accumulatorsToMerge),
             ...
  }

@iemejia
Copy link
Member

iemejia commented Nov 6, 2020

@echauchot The VR tests were breaking on this (I don't know why, maybe the tests were improved). That's the reason why Luke did this PR, it was needed at least for correctness. You can reproduce this by reverting this PR and running the tests:

git revert 6264b47afd51d33d95d6c04a2106b4208a89ca41
./gradlew :runners:spark:validatesStructuredStreamingRunnerBatch

produces

org.apache.beam.sdk.transforms.CombineTest$WindowingTests > testSlidingWindowsCombine FAILED
    java.lang.AssertionError at CombineTest.java:1156
        Caused by: java.lang.AssertionError at MatcherAssert.java:18

Something odd I noticed is that if you run the single test instance it passes so I am not sure if there is some interleaving issue with other tests.

The VR suite of the Structured Streaming Runner has been broken since September 10 also because of this issue and BEAM-11023 too.
http://104.154.241.245/d/8N6LVeCmk/post-commits-status-dashboard?refresh=30s&orgId=1

@echauchot
Copy link
Contributor

thanks @iemejia for the context. Strange org.apache.beam.sdk.transforms.CombineTest$WindowingTests > testSlidingWindowsCombine was passing. So I would prefer to figure out what was changed in the between in the Combine translation. https://issues.apache.org/jira/browse/BEAM-11023 refers to groupByKey so it is unrelated. But I just checked it was passing when the runner was merged to master. I guess we need to dig into the history of commits rather

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants