Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-4152] Implement session window merging in the Go direct runner #15196

Merged
merged 9 commits into from Jul 22, 2021

Conversation

jrmccluskey
Copy link
Contributor

Implements window merging for the session windowing strategy on the Go direct runner. Also re-enables the WindowSums integration test on the direct runner to verify that the implementation works.


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Choose reviewer(s) and mention them in a comment (R: @username).
  • Format the pull request title like [BEAM-XXX] Fixes bug in ApproximateQuantiles, where you replace BEAM-XXX with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

ValidatesRunner compliance status (on master branch)

Lang ULR Dataflow Flink Samza Spark Twister2
Go --- Build Status Build Status Build Status Build Status ---
Java Build Status Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
Python --- Build Status
Build Status
Build Status
Build Status
Build Status
Build Status Build Status ---
XLang Build Status Build Status Build Status --- Build Status ---

Examples testing status on various runners

Lang ULR Dataflow Flink Samza Spark Twister2
Go --- --- --- --- --- --- ---
Java --- Build Status
Build Status
Build Status
--- --- --- --- ---
Python --- --- --- --- --- --- ---
XLang --- --- --- --- --- --- ---

Post-Commit SDK/Transform Integration Tests Status (on master branch)

Go Java Python
Build Status Build Status Build Status
Build Status
Build Status

Pre-Commit Tests Status (on master branch)

--- Java Python Go Website Whitespace Typescript
Non-portable Build Status
Build Status
Build Status
Build Status
Build Status
Build Status Build Status Build Status Build Status
Portable --- Build Status Build Status --- --- ---

See .test-infra/jenkins/README for trigger phrase, status and link of all Jenkins jobs.

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests

See CI.md for more information about GitHub Actions CI.

@jrmccluskey
Copy link
Contributor Author

R: @lostluck

Copy link
Contributor

@lostluck lostluck left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pretty good start! Some style nits, and probably unnecessary duplication to clean up. I've suggested a few rewrites. Consider adding a unit or two for the window merging code in mergeWindows. It's tricky enough by itself to not rest on the rest of the repo to report a problem. Note, it doesn't need to follow the full lifecycle of a CoGBK node, just set up initial conditions properly in the fields and check for the expected results.

sort.Slice(n.wins, func(i int, j int) bool {
return n.wins[i].MaxTimestamp() < n.wins[j].MaxTimestamp()
})
n.mergeMap = make(map[typex.Window]int)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Likely won't matter for Direct runner purposes, but since mergeWindows and reprocessByWindow are both called in FinishBundle (that is, it doesn't cross a Lifecycle Method boundary), have mergeWindow return the merge map, and reprocessByWindow take it as an argument.

This avoids the CoGBK value holding onto the map with all the window information, allowing it to be garbage collected sooner, lowing memory overhead.

I'm ambivalent about whether the sorted and merged window slice is given the same treatment, as long as it gets cleared away by the end of FinishBundle (similarly for Garbage collecting purposes).

Basically, this is especially important when a pipeline can have arbitrary amounts of GBK operations. Since the Go Direct Runner will always run in a single machine's memory, it pays not to be wasteful and let things get cleaned up.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be worth a comment on mergedMap saying it's a map from the original windows to the index of the new window in the mergedWins slice.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. I figured using some memory to store the map was favorable to slowing things down by needing a binary search implementation to assign the original windows to the merged versions, but tweaking where that is kept to make GC happen sooner is a reasonable improvement.

for i := 0; i < len(n.wins); {
intWin, ok := n.wins[i].(window.IntervalWindow)
if !ok {
return errors.Errorf("tried to merge non-interval window type")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's unlikely to crop up, but consider printing the actual type of the window in question using the %T formatting directive, and including that in the error message.
See https://play.golang.org/p/8GhLJucF0lA

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

newGroups := make(map[string]*group)
for _, g := range n.m {
ws := []typex.Window{n.wins[n.mergeMap[g.key.Windows[0]]]}
var buf bytes.Buffer
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider a helper function or two to reduce the duplication between this and ProcessElement.
Ideally we simply merge the two paths (so delaying work in process element to FinishBundle), but until then, better to clean things up a little via helper functions or helper types.

In this case, for example, the main difference is one is using m.g and this one is using newGroups. A helper type for grouping type grouper map[string]*group could handle this iteration in a method, avoiding the duplication. It just needs the right window information passed in somehow, in both cases... But two or so helper methods would be good, one for the key encoding and one for inserting into the group. Since the method receiver is a map technically, it should use a value rather than a pointer, as maps, even as underlying types are reference types.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added helper functions for the key encoding and getting the group from the maps.

@jrmccluskey jrmccluskey marked this pull request as ready for review July 21, 2021 18:33
@jrmccluskey
Copy link
Contributor Author

retest this please

Copy link
Contributor

@lostluck lostluck left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. I have small nits, but nothing that's important enough to do now. We can fix them when we're adding user defined window types.

@lostluck lostluck merged commit bbe7b64 into apache:master Jul 22, 2021
@jrmccluskey jrmccluskey deleted the beam4152 branch July 26, 2021 15:18
ryanthompson591 pushed a commit to ryanthompson591/beam that referenced this pull request Aug 10, 2021
…pache#15196)

* [BEAM-4152] Enable WindowSums tests for the Go direct runner
* [BEAM-4152] Implement window merging for the Go direct runner
* [BEAM-4152] Refactor mregeWindows to return mergeMap, not keep it in CoGBK struct
* [BEAM-4152] Update error message to include window type
* [BEAM-4152] Add unit testing for mergeWindows() functionality
* [BEAM-4152] Add Apache license to gbk_test.go
* [BEAM-4152] Split out key encoding logic into helper for code reuse
* [BEAM-4152] Fix formatting
* [BEAM-4152] Refactor group map population into helper
calvinleungyk pushed a commit to calvinleungyk/beam that referenced this pull request Sep 22, 2021
…pache#15196)

* [BEAM-4152] Enable WindowSums tests for the Go direct runner
* [BEAM-4152] Implement window merging for the Go direct runner
* [BEAM-4152] Refactor mregeWindows to return mergeMap, not keep it in CoGBK struct
* [BEAM-4152] Update error message to include window type
* [BEAM-4152] Add unit testing for mergeWindows() functionality
* [BEAM-4152] Add Apache license to gbk_test.go
* [BEAM-4152] Split out key encoding logic into helper for code reuse
* [BEAM-4152] Fix formatting
* [BEAM-4152] Refactor group map population into helper
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants