[BEAM-11088] Add TestStream package to Go SDK testing capabilities #15253

Merged
merged 12 commits into from Aug 3, 2021


jrmccluskey
Contributor

Create the teststream package to provide an interface for configuring the TestStream primitive in the Go SDK. Provides a type to set up a TestStreamConfig proto and insert it into the pipeline. Currently only supported on Flink.


for _, val := range outputMap {
	ret = append(ret, val)
}
return ret
Contributor Author

This should only ever be a single PCollection, so I was thinking the return type should just be a beam.PCollection, but I didn't know whether there was a clean-looking way to do that.

Contributor

Agreed. Since implementations all seem to be single-output collections, let's punt on tagged multi-collection map support for now. A later iteration can add a CreateN or CreateMulticall which would return a map.


// CreatePayload converts the Config object into a TestStreamPayload to be sent to the runner.
func (c *Config) CreatePayload() *pipepb.TestStreamPayload {
	return &pipepb.TestStreamPayload{CoderId: "c0", Events: c.Events, Endpoint: c.Endpoint}
}
Contributor Author

Hard-coding the c0 coder is a little hacky but works. From what I've seen, the translation code always inserts a coder for the type, and its ID is c0 as long as the TestStream is the first thing inserted into the pipeline.

Contributor

Correct! That's what's expected for now. Add a comment to the "teststream.Create" method to call out this current limitation that the test stream must be the first PTransform in a pipeline.

@jrmccluskey jrmccluskey marked this pull request as ready for review July 30, 2021 15:47
@jrmccluskey
Contributor Author

R: @lostluck

@jrmccluskey
Contributor Author

Run Go PostCommit

Contributor

@lostluck lostluck left a comment

Great start! Let's iterate on the user experience a bit. Ruby on Rails got popular purely by having sensible defaults, and sensible defaults are just as key to usability now as they were then. Requiring too many manual configurations when the system can make a sensible choice for you is death by paper cuts.


// Package teststream contains code configuring the TestStream primitive for
// use in testing code that is meant to be run on streaming data sources.
// TestStream is not supported on the Go direct runner.
Contributor

Add a link to the blog post describing test stream to the package doc.

We can clarify that teststream is only supported on Flink.

It looks like it should be supported on Dataflow, so that's worth trying and calling out. It's only odd to mention because Dataflow costs money, whereas the intent here is testing...

// SetEndpoint sets a URL for a TestStreamService that will emit events instead of having them
// defined manually. Currently does not support authentication, so the TestStreamService should
// be accessed in a trusted context.
func (c *Config) SetEndpoint(url string) {
Contributor

Orthogonality is nice, but it will delay the error case until later, dislocating it from where the mistake occurred.

Alternatively, we could have a separate constructor function for Config that takes in the Endpoint and Coder, so it avoids the two calls and allows a user to inline the configuration in the teststream.Create call. Or, going further, we could just have a CreateWithEndpoint(beam.Scope, endpoint string, c coder.Coder) beam.PCollection call, which prevents users from misconfiguring something (mixing endpoints and events in a config). Config could still handle these details for serialization as an implementation detail, but only the CreateWithEndpoint method can access and modify them before serialization. Basically, we can ensure correctness by construction by preventing users from building something inadvisable.

Orthogonality is great, but unless all the options make sense with each other, it leaves confusing spaces for users to make configuration mistakes.
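
To make "correctness by construction" concrete, here is a minimal standalone sketch, not the code from this PR. The names config, payload, newEventConfig, and newEndpointConfig are hypothetical, and plain strings stand in for the real proto events. The point is only that unexported fields plus separate constructors make it impossible to mix an endpoint with manual events, and that a bad endpoint fails at the call site.

```go
package main

import (
	"errors"
	"fmt"
)

// payload is a hypothetical stand-in for the serialized TestStream
// configuration; the real SDK builds a proto message instead.
type payload struct {
	endpoint string
	events   []string
}

// config keeps its fields unexported, so callers outside the package
// can only obtain one through the constructors below.
type config struct {
	endpoint string
	events   []string
}

// newEventConfig builds a config that holds manual events and no endpoint.
func newEventConfig(events ...string) (*config, error) {
	if len(events) == 0 {
		return nil, errors.New("at least one event is required")
	}
	// Copy the slice so later changes by the caller cannot alter the config.
	return &config{events: append([]string(nil), events...)}, nil
}

// newEndpointConfig builds a config that holds an endpoint and no events.
// An empty URL is rejected here, where the mistake is made.
func newEndpointConfig(url string) (*config, error) {
	if url == "" {
		return nil, errors.New("endpoint URL must not be empty")
	}
	return &config{endpoint: url}, nil
}

// toPayload serializes the config; by construction at most one of
// endpoint and events is populated.
func (c *config) toPayload() payload {
	return payload{endpoint: c.endpoint, events: c.events}
}

func main() {
	c, err := newEndpointConfig("localhost:8080")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("%+v\n", c.toPayload())
}
```

A single CreateWithEndpoint-style function would go one step further and hide the config type from callers entirely.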

// elements.
func (c *Config) AddElements(timestamp int64, elements ...interface{}) error {
	if c.elmCoder == nil {
		elmType := reflect.TypeOf(elements[0])
Contributor

I'm increasingly annoyed that the node needs to know the coder it's got. It doesn't seem like it would need to since that information is on the subsequent PCollection. But it's also not great that we need to duplicate coder inference here either.

One could work around this by having a second DoFn that actually contains the serialized data and keeps it in the specified order. Then, instead of the real data, the TestStream is only given successive int64s with the timestamps. That means we can always pass in coder.NewVarInt(), and the second DoFn emits whatever element the returned index maps to in its cache. Basically, the second DoFn ends up as a variant on beam.Create's implementation.

Users as a rule won't be able to tell the difference, and it would resolve a small chunk of the issues we're having without moving this to the beam package with its own Graph and graphx plumbing.
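
A rough standalone sketch of that index-based idea follows, using only the standard library. The types indexedEvent and elementCache are hypothetical stand-ins: the first for what the TestStream would carry (a timestamp and an int64 index), the second for the cache held by the second DoFn. No Beam APIs are used, so this shows the lookup logic only, not the pipeline wiring.

```go
package main

import "fmt"

// indexedEvent is a hypothetical stand-in for a TestStream element event:
// the stream carries only an index and the event timestamp.
type indexedEvent struct {
	timestamp int64
	index     int64
}

// elementCache plays the role of the second DoFn: it holds the real
// elements in insertion order and maps indices back to them.
type elementCache struct {
	elements []interface{}
}

// add stores an element and returns the index event that stands in for it.
func (c *elementCache) add(timestamp int64, element interface{}) indexedEvent {
	c.elements = append(c.elements, element)
	return indexedEvent{timestamp: timestamp, index: int64(len(c.elements) - 1)}
}

// lookup maps an index received from the stream back to the cached element.
func (c *elementCache) lookup(ev indexedEvent) (interface{}, error) {
	if ev.index < 0 || ev.index >= int64(len(c.elements)) {
		return nil, fmt.Errorf("index %d out of range", ev.index)
	}
	return c.elements[ev.index], nil
}

func main() {
	cache := &elementCache{}
	events := []indexedEvent{
		cache.add(100, "a"),
		cache.add(200, "b"),
	}
	for _, ev := range events {
		v, err := cache.lookup(ev)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Println(ev.timestamp, v)
	}
}
```

Because the stream itself only ever carries int64 indices, a single fixed integer coder is enough regardless of the element type.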

Contributor

I'm open to suggestions on how we can make it easier for packages external to beam to do something like this, short of exposing everything. Essentially, the risk being managed is users mis-assuming that certain hoops need to be jumped through to get something working....

Contributor Author

Looking at it now, I think we could completely ditch keeping up with the coder and keep up with the element type passed into AddElements instead. beam.Create just gets the type of the first element and defines the encoder off of that using beam.NewElementEncoder, in the same way I have now. The end result is that we can remove any references to coders in AddElements.
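
As an illustration of tracking the element type rather than a coder, here is a small standalone sketch. The eventBuilder type and its addElements method are hypothetical and do not encode anything; they only show inferring the type from the first element with reflect.TypeOf and rejecting later elements of a different type. The guards for an empty call and an untyped nil are additions that the fragment above does not show.

```go
package main

import (
	"errors"
	"fmt"
	"reflect"
)

// eventBuilder is a hypothetical stand-in for the config type. It remembers
// the element type seen first instead of holding a coder.
type eventBuilder struct {
	elmType reflect.Type
	batches [][]interface{}
}

// addElements records a batch of elements. The type is inferred from the
// first element ever added, and every later element must match it.
func (b *eventBuilder) addElements(elements ...interface{}) error {
	if len(elements) == 0 {
		return errors.New("no elements provided")
	}
	want := b.elmType
	if want == nil {
		want = reflect.TypeOf(elements[0])
		if want == nil {
			return errors.New("cannot infer a type from an untyped nil element")
		}
	}
	for i, e := range elements {
		if got := reflect.TypeOf(e); got != want {
			return fmt.Errorf("element %d has type %v, want %v", i, got, want)
		}
	}
	// Commit the inferred type only once the whole batch has been validated.
	b.elmType = want
	b.batches = append(b.batches, elements)
	return nil
}

func main() {
	b := &eventBuilder{}
	fmt.Println(b.addElements(1, 2, 3))
	fmt.Println(b.addElements("four"))
}
```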

Contributor Author

Changed to the new, coder-free approach

jrmccluskey and others added 4 commits August 2, 2021 13:14
@jrmccluskey
Contributor Author

Run Go PostCommit

Contributor

@lostluck lostluck left a comment

LGTM after some light changes! Thank you.

Once we're able to observe Panes with Riteshe's trigger work, we could add passert test helpers as described in the test stream blog post, but this is good for now.

@jrmccluskey
Contributor Author

Run GoPortable PreCommit

@jrmccluskey
Contributor Author

Run Go PostCommit

@lostluck
Contributor

lostluck commented Aug 3, 2021

Run Go Flink ValidatesRunner

@lostluck lostluck merged commit 92ea5f5 into apache:master Aug 3, 2021
@jrmccluskey jrmccluskey deleted the beam11088 branch August 5, 2021 14:14
@franklinlindemberg

@jrmccluskey sorry to hijack this PR, but I'm trying to use TestStream with Flink and I'm not having much success.

I'm trying to use teststream.CreateWithEndpoint, but my gRPC server implementation is never called. I'm wondering if you have tested this feature with Flink and if there's any example of it.

Thanks!
