Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-3304] Go triggering support #15239

Merged
merged 36 commits into from
Aug 19, 2021
Merged

Conversation

riteshghorse
Copy link
Contributor

Trigger support for Go SDK


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Choose reviewer(s) and mention them in a comment (R: @username).
  • Format the pull request title like [BEAM-XXX] Fixes bug in ApproximateQuantiles, where you replace BEAM-XXX with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

ValidatesRunner compliance status (on master branch)

Lang ULR Dataflow Flink Samza Spark Twister2
Go --- Build Status Build Status Build Status Build Status ---
Java Build Status Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
Python --- Build Status
Build Status
Build Status
Build Status
Build Status
Build Status Build Status ---
XLang Build Status Build Status Build Status Build Status Build Status ---

Examples testing status on various runners

Lang ULR Dataflow Flink Samza Spark Twister2
Go --- --- --- --- --- --- ---
Java --- Build Status
Build Status
Build Status
--- --- --- --- ---
Python --- --- --- --- --- --- ---
XLang --- --- --- --- --- --- ---

Post-Commit SDK/Transform Integration Tests Status (on master branch)

Go Java Python
Build Status Build Status Build Status
Build Status
Build Status

Pre-Commit Tests Status (on master branch)

--- Java Python Go Website Whitespace Typescript
Non-portable Build Status
Build Status
Build Status
Build Status
Build Status
Build Status Build Status Build Status Build Status
Portable --- Build Status Build Status --- --- ---

See .test-infra/jenkins/README for trigger phrase, status and link of all Jenkins jobs.

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests

See CI.md for more information about GitHub Actions CI.

@riteshghorse riteshghorse changed the title [BEAM-3304] added struct field for trigger [BEAM-3304] Go triggering support Jul 28, 2021
@riteshghorse
Copy link
Contributor Author

R: @lostluck

Copy link
Contributor

@lostluck lostluck left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you're missing a few files, or it's out of date? This a good start for validating and getting a test working though. As written I can't see how you'd get to a pane encoding error however, hence guessing files are missing.

Feel free to add a new test and pipeline functions to the windowinto.go files, in order to test out triggers. It will certainly fail on the Go direct runner, so also add a filter to prevent it from running on the direct runner during the integration tests here:

var directFilters = []string{

// TODO(BEAM-3304): trigger support
Trigger *pipepb.Trigger
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As a rule, we avoid having users have access to, or deal with, the proto directly, including via any of the built up structs. The protos are an implementation detail, not an interface for users to use.

This largely means building up SDK specific mechanisms that are then translated to the proto in graphx/translate.go. Essentially, the graph/** directories shouldn't be depending on the protos.

That said, this is fine during development as Pane plumbing and such is being developed.

@riteshghorse
Copy link
Contributor Author

riteshghorse commented Jul 29, 2021

I changed parts in marshalWindowingStrategy in translate.go to try out triggers. But did undo all before committing. I'll write a separate test doing this.

@@ -49,7 +50,11 @@ func (p PCollection) IsValid() bool {
}

// TODO(herohde) 5/30/2017: add name for PCollections? Java supports it.

// TODO(herohde) 5/30/2017: add windowing strategy and documentation.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this becomes part of the public API for PCollection, it should have a real doc comment

// WindowingStrategy returns the windowing strategy for the PCollection

Comment on lines 1001 to 1002
}
func makeTrigger(t window.TriggerType) (*pipepb.Trigger, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
}
func makeTrigger(t window.TriggerType) (*pipepb.Trigger, error) {
}
func makeTrigger(t window.TriggerType) (*pipepb.Trigger, error) {

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably need to run go fmt over your changes, as for some reason newlines between function declarations seem to be going missing.


// change below statement to: windowed.WindowingStrategy().SetAlways()
// to get the decoding error.
windowed.WindowingStrategy().SetDefault()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK. It pains me to suggest this, but this user experience leaves a little to be desired and we have a short window before the Go SDK exits experimental where we can make it better.

I propose we make a beam.WindowIntoOption interface type, and change beam.WindowInto to take a variadic parameter of those to configure the trigger and other windowing strategy properties. Then we can type assert and apply the options to the windowing strategy without much concern.

This would be similar to how beam.ParDo has an option type https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/option.go#L25
https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/pardo.go#L356

My recommendation would be to make that into a separate change by itself, not defining any of the concrete beam.WindowIntoOption types and handling, and just changing the signature to take in ...options beam.WindowIntoOption as the final parameter (variadic parameters must be the final one in a function signature).

While this change would not be 100% backwards compatible, it would be source compatible for all reasonable uses of beam.WindowInto. The main place it would break is if a user were doing something like:

var myWindowInto func(s Scope, ws *window.Fn, col PCollection) PCollection = beam.WindowInto

which is unlikely and weird to begin with (it wouldn't serve anything beyond ensuring the signature never changes).

Then we can rebase and get back to here and finish handling the implementation.

@@ -527,8 +527,8 @@ func NewImpulse(g *Graph, s *Scope, value []byte) *MultiEdge {
}

// NewWindowInto inserts a new WindowInto edge into the graph.
func NewWindowInto(g *Graph, s *Scope, wfn *window.Fn, in *Node) *MultiEdge {
n := g.NewNode(in.Type(), &window.WindowingStrategy{Fn: wfn}, in.Bounded())
func NewWindowInto(g *Graph, s *Scope, wfn *window.Fn, tr window.TriggerType, in *Node) *MultiEdge {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NewWindowInto should just change to take in a *window.WindowingStrategy instead of adding more and more parameters to it. There are other modes to handle in the future and by populating the windowing strategy in the Beam package before it gets here, will save time and effort when adding features later.

Copy link
Contributor Author

@riteshghorse riteshghorse Aug 2, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense.

@@ -21,21 +21,47 @@ import (
"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
)

type WindowIntoOption interface {
private()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"private" isn't a good name for this blocking function, because beam.Option is already using it. You can test this out yourself by seeing that you can pass WindowIntoOptions as an Option into beam.ParDo.

A better unexported method name would be windowIntoOption() to avoid this conflation.

sdks/go/pkg/beam/windowing.go Show resolved Hide resolved

func (t WindowTrigger) windowIntoOption() {}

func (t WindowTrigger) GetName() window.TriggerType {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's no need for a GetName function. As a rule, don't add "Get" to a method that just returns a field.
See https://golang.org/doc/effective_go#Getters

for _, opt := range opts {
switch opt.(type) {
case WindowTrigger:
edge = graph.NewWindowInto(s.real, s.scope, &window.WindowingStrategy{Fn: ws, Trigger: opt.(WindowTrigger).GetName()}, col.n)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider accessing the trigger kind directly with .Name without a method instead, and remove the method.

Copy link
Contributor

@lostluck lostluck left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Slowly but surely this is shaping up! I'll stop peaking in at this point and wait for you to ask me to Please Take Another Look (PTAL) before my next pass.

Comment on lines 52 to 57
// TODO(herohde) 5/30/2017: add windowing strategy and documentation.

// WindowingStrategy returns the WindowingStrategy of PCollection.
func (p PCollection) WindowingStrategy() *window.WindowingStrategy {
return p.n.WindowingStrategy()
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can probably leave out this method for now. It's easier to add something like this later than to remove it later.
I'd still remove the TODO. We don't know what herohde's goal was with the TODO, and we've gone in our own design direction.

// TriggerWindowSums, much like WindowSums described above has an addition of configuring
// a trigger here. SetDefault works fine. Other triggers such as SetAlways throws
// pane decoding error.
func TriggerWindowSums(s beam.Scope, sumPerKey func(beam.Scope, beam.PCollection) beam.PCollection) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider re-writing this test to make it easier to validate triggers rather than windows. Certainly re-use what you can, but you are allowed to write new code.


package window

type TriggerType string
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider how we'd implement the more complicated triggers that have sub triggers. For example, AfterAll and AfterAny are missing their sub trigger configurations.

Default TriggerType = "Trigger_Default_"
Always TriggerType = "Trigger_Always_"
AfterAny TriggerType = "Trigger_AfterAny_"
AfterAll TriggerType = "Trigger_AfterAny_"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typo: duplicated as AfterAny instead of AfterAll.

@riteshghorse
Copy link
Contributor Author

PTAL.

Added pane helpers and changed all affecting function signatures/receivers. The error of source decode failed cause by ____ seems to have gone. I tested this the old way (TestTriggerWindowSums_GBK). Next step would be writing isolated trigger tests.

Copy link
Contributor

@lostluck lostluck left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have many comments, but this is excellent progress. You've found many of the correct places to change without additional guidance.

We'll need maybe one or two more passes after these changes to sort out user side things and further initial testing, but this is great.

sdks/go/pkg/beam/core/graph/coder/panes.go Outdated Show resolved Hide resolved
sdks/go/pkg/beam/core/typex/special.go Outdated Show resolved Hide resolved
sdks/go/pkg/beam/core/typex/special.go Outdated Show resolved Hide resolved
sdks/go/pkg/beam/core/typex/special.go Outdated Show resolved Hide resolved
sdks/go/pkg/beam/runners/dataflow/dataflowlib/translate.go Outdated Show resolved Hide resolved
sdks/go/test/integration/primitives/windowinto.go Outdated Show resolved Hide resolved
sdks/go/pkg/beam/core/graph/coder/panes.go Outdated Show resolved Hide resolved
sdks/go/pkg/beam/core/graph/coder/panes.go Outdated Show resolved Hide resolved
sdks/go/pkg/beam/core/graph/coder/panes.go Outdated Show resolved Hide resolved
sdks/go/pkg/beam/core/graph/coder/panes.go Outdated Show resolved Hide resolved
@riteshghorse
Copy link
Contributor Author

PTAL. Basic triggers. Will add more tests in windowinto as they are similar to what we currently have.

Copy link
Contributor

@lostluck lostluck left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is the home stretch.

sdks/go/pkg/beam/core/graph/coder/coder.go Outdated Show resolved Hide resolved
sdks/go/pkg/beam/core/graph/coder/panes.go Outdated Show resolved Hide resolved
sdks/go/pkg/beam/core/graph/coder/panes.go Outdated Show resolved Hide resolved
sdks/go/pkg/beam/core/graph/coder/panes.go Outdated Show resolved Hide resolved
sdks/go/pkg/beam/core/graph/window/strategy.go Outdated Show resolved Hide resolved
sdks/go/pkg/beam/core/typex/special.go Outdated Show resolved Hide resolved
sdks/go/test/integration/primitives/windowinto.go Outdated Show resolved Hide resolved
sdks/go/test/integration/primitives/windowinto.go Outdated Show resolved Hide resolved
@riteshghorse riteshghorse marked this pull request as ready for review August 12, 2021 17:42
@riteshghorse
Copy link
Contributor Author

Resolved the comments and added a test for Repeat Trigger.
PTAL.

@riteshghorse
Copy link
Contributor Author

PTAL. AfterAll and AfterAny Triggers are throwing Illegal Argument Exception. Working on it.

Copy link
Contributor

@lostluck lostluck left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is good for now once the last small nits are worked out. Then we can work out the leftover details in smaller PRs.

sdks/go/pkg/beam/core/runtime/exec/datasource.go Outdated Show resolved Hide resolved
sdks/go/pkg/beam/core/runtime/graphx/translate.go Outdated Show resolved Hide resolved
sdks/go/pkg/beam/windowing.go Outdated Show resolved Hide resolved
@riteshghorse
Copy link
Contributor Author

Run Go PostCommit

@lostluck
Copy link
Contributor

Run Go Flink ValidatesRunner

@lostluck
Copy link
Contributor

Run Go PostCommit

@lostluck
Copy link
Contributor

It's not clear what's causing the failure with flink in this case. It's almost certainly the same TestStream issues. Lets disable the tests on flink entirely for now (like they are for the other runners) and we'll investigate it properly from there.

@riteshghorse
Copy link
Contributor Author

Run Go Flink ValidatesRunner

@riteshghorse
Copy link
Contributor Author

Run Go PostCommit

@lostluck lostluck merged commit b0e9f26 into apache:master Aug 19, 2021
@riteshghorse riteshghorse deleted the beam3304 branch August 31, 2021 17:50
calvinleungyk pushed a commit to calvinleungyk/beam that referenced this pull request Sep 22, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants