[BEAM-8292] Portable Reshuffle for Go SDK #11197

Merged: 5 commits merged on Mar 27, 2020

@lostluck (Contributor) commented Mar 23, 2020

This adds a Reshuffle transform to the Go SDK.

  • In particular, it configures windowing and triggers for a GBK to allow for fusion breaks, where parallelism needs to increase or decrease due to data bundling properties.
  • Previous element windows and timestamps are preserved.
  • The SDK operations are wrapped with a higher-level reshuffle URN so runners can optimize this step better.
  • The Go Direct Runner is aware of the Reshuffle and correctly ignores it, as it's a single bundle runner.

Note: While this should work for streaming cases, it hasn't been tested with them yet, due to the current state of streaming in the Go SDK.
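
As an illustration of the mechanism described in the list above, here is a minimal, runner-free sketch of the idea behind a reshuffle: pair each element with a random key, group by that key, and hand the groups on as new bundles. The names `windowedValue`, `randomKeys`, and `reshuffle` are hypothetical and are not the SDK's types or functions; the real transform does this through a GBK executed by the runner.

```go
package main

import "math/rand"

// windowedValue is a hypothetical stand-in for an element together with the
// window and timestamp that a reshuffle must preserve.
type windowedValue struct {
	Elm       string
	Window    string
	Timestamp int64
}

// randomKeys returns a key source that picks uniformly from [0, shards).
// shards must be positive, otherwise rand.Intn panics.
func randomKeys(shards int) func() int {
	return func() int { return rand.Intn(shards) }
}

// reshuffle assigns each element a key from nextKey, groups elements by key,
// and returns one slice per key. Elements are copied through unchanged, so
// their windows and timestamps survive; only the bundling changes.
func reshuffle(in []windowedValue, nextKey func() int) [][]windowedValue {
	groups := make(map[int][]windowedValue)
	var order []int // keys in first-seen order, to avoid map iteration order
	for _, wv := range in {
		k := nextKey()
		if _, ok := groups[k]; !ok {
			order = append(order, k)
		}
		groups[k] = append(groups[k], wv)
	}
	out := make([][]windowedValue, 0, len(order))
	for _, k := range order {
		out = append(out, groups[k])
	}
	return out
}
```

Calling `reshuffle(elements, randomKeys(8))` would spread the elements over at most eight groups. Because the keys are unrelated to the data, the grouping says nothing about the elements themselves; it only gives a runner a point at which to redistribute work.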

It further adds one small optimization to the internal decoding interface: a DecodeTo method that avoids the extra heap allocations incurred by returning a *FullValue. Extra allocations can't be avoided for KV types at present, but value PCollections should have lower overhead. A subsequent PR will use the DecodeTo method at other applicable places in the decode stack.
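
The difference between the two decoding styles can be sketched as follows. This is a simplified illustration and not the SDK's code: `FullValue` is reduced to one field, and `byteDecoder` and `sliceReader` are hypothetical helpers. It assumes only what the description states, namely that `DecodeTo` fills caller-provided storage instead of returning a freshly allocated `*FullValue`.

```go
package main

import "io"

// FullValue is a simplified, hypothetical stand-in for the SDK's element wrapper.
type FullValue struct {
	Elm interface{}
}

// byteDecoder decodes one byte per element. It exists only for illustration.
type byteDecoder struct{}

// Decode creates a new FullValue for every element and returns a pointer to
// it, so each call needs fresh storage that outlives the call.
func (d byteDecoder) Decode(r io.Reader) (*FullValue, error) {
	fv := &FullValue{}
	if err := d.DecodeTo(r, fv); err != nil {
		return nil, err
	}
	return fv, nil
}

// DecodeTo writes the decoded element into storage owned by the caller,
// which lets the caller reuse one FullValue across many elements.
func (byteDecoder) DecodeTo(r io.Reader, fv *FullValue) error {
	var buf [1]byte
	if _, err := io.ReadFull(r, buf[:]); err != nil {
		return err
	}
	*fv = FullValue{Elm: buf[0]}
	return nil
}

// sliceReader is a tiny in-memory io.Reader so the sketch needs no other packages.
type sliceReader struct{ data []byte }

func (s *sliceReader) Read(p []byte) (int, error) {
	if len(s.data) == 0 {
		return 0, io.EOF
	}
	n := copy(p, s.data)
	s.data = s.data[n:]
	return n, nil
}
```

A caller that loops over a stream can declare a single `var fv FullValue` outside the loop and pass `&fv` to `DecodeTo` on every iteration, whereas `Decode` hands back a new pointer each time.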


@lostluck (Contributor Author) commented:

Run Go Postcommit

@lostluck (Contributor Author) commented:

R: @youngoli

@lostluck (Contributor Author) commented:

I'm definitely not merging this until both the PostCommit runs and someone more familiar with windowing/trigger semantics looks over the configuration I copied over from Python:
https://github.com/apache/beam/pull/11197/files#diff-ef420fdb9afbce0674282b4ed4481042R530

@lostluck (Contributor Author) commented:

Retest this please

@lostluck (Contributor Author) commented:

Run Go Postcommit

@lostluck (Contributor Author) commented:

The postcommits ran and passed, which is a good sign!

		return err
	}
	*fv = FullValue{Elm: val}
	return err

Contributor:

I know this is just preserving the existing behavior, but it seems weird to return err here instead of return nil, even if it is guaranteed to be nil at this point.

Contributor Author:

Good catch. Thanks!

sdks/go/pkg/beam/core/runtime/exec/reshuffle.go (outdated thread, resolved)

// FinishBundle propagates finish bundle to downstream nodes.
func (n *ReshuffleOutput) FinishBundle(ctx context.Context) error {
	n.b = bytes.Buffer{}

Contributor:

Should this also be clearing n.ret like ReshuffleInput does?

Contributor Author:

Yes it should. Thanks!
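
A minimal sketch of the fix agreed on here, assuming a node that keeps a scratch buffer `b` and a reusable value `ret` between elements. The struct below is a hypothetical, trimmed-down illustration; the field types and the `ProcessElement` body are assumptions and not the code in reshuffle.go.

```go
package main

import "bytes"

// FullValue is a simplified, hypothetical stand-in for the SDK's element wrapper.
type FullValue struct {
	Elm interface{}
}

// reshuffleOutput is a hypothetical node holding per-bundle scratch state.
type reshuffleOutput struct {
	b   bytes.Buffer // scratch buffer reused across elements
	ret FullValue    // last decoded value, reused across elements
}

// ProcessElement reuses the buffer and the value for each element.
func (n *reshuffleOutput) ProcessElement(data []byte) {
	n.b.Reset()
	n.b.Write(data)
	n.ret = FullValue{Elm: n.b.String()}
}

// FinishBundle drops both pieces of scratch state so nothing from this
// bundle stays referenced once the bundle is done.
func (n *reshuffleOutput) FinishBundle() error {
	n.b = bytes.Buffer{}
	n.ret = FullValue{}
	return nil
}
```

Resetting only the buffer would leave `ret` holding a reference to the last element of the finished bundle, which is the gap the review comment points out.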

func (n *ReshuffleOutput) ProcessElement(ctx context.Context, value *FullValue, values ...ReStream) error {
	// Marshal the pieces into a temporary buffer since they must be transmitted on FnAPI as a single
	// unit.
	vs, err := values[0].Open()

Contributor:

It's strange to me that values[] can have multiple elements, but this method ends up actually reading all the values from the first element of it. Could you explain why that happens? Are there sometimes multiple ReStreams representing different things?

Contributor Author:

This is the Go SDK future-proofing itself against CoGBK supporting multiple data streams from the runner. Functionally, only datasource.go would need to change in that case. You can see a comment to that effect in datasource.go, and nearly everything else deals with the value streams properly under that assumption.

In this case, we know that if this code is being used, it's coming from a single GBK, which means there's only a single stream of values, and then since we're framework side, we just handle the stream directly. In that way, it's similar to how we're handling CoGBKs presently, with synthetic inject and expand steps to get to the right number of joined streams, even though the Runner is only providing us with a single data stream for the grouped data.
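
The single-stream case described above can be sketched like this. The `Stream` and `ReStream` interfaces here are simplified, hypothetical versions written for the example, and `fixedReStream` and `drainFirst` are illustrative helpers; the point is only that a variadic `values` parameter can carry exactly one re-openable stream, which the node reads to exhaustion.

```go
package main

import (
	"errors"
	"io"
)

// FullValue is a simplified, hypothetical stand-in for the SDK's element wrapper.
type FullValue struct {
	Elm interface{}
}

// Stream yields values one at a time and returns io.EOF when exhausted.
type Stream interface {
	Read() (*FullValue, error)
	Close() error
}

// ReStream is a stream that can be opened, and therefore read, repeatedly.
type ReStream interface {
	Open() (Stream, error)
}

// fixedReStream replays the same slice of values each time it is opened.
type fixedReStream struct{ vals []FullValue }

func (f *fixedReStream) Open() (Stream, error) {
	return &fixedStream{vals: f.vals}, nil
}

type fixedStream struct {
	vals []FullValue
	next int
}

func (s *fixedStream) Read() (*FullValue, error) {
	if s.next >= len(s.vals) {
		return nil, io.EOF
	}
	v := &s.vals[s.next]
	s.next++
	return v, nil
}

func (s *fixedStream) Close() error { return nil }

// drainFirst reads every value from values[0], mirroring a node that knows
// it sits behind a single GBK and so receives exactly one value stream.
func drainFirst(values ...ReStream) ([]interface{}, error) {
	if len(values) == 0 {
		return nil, errors.New("no value streams")
	}
	vs, err := values[0].Open()
	if err != nil {
		return nil, err
	}
	defer vs.Close()
	var out []interface{}
	for {
		fv, err := vs.Read()
		if err == io.EOF {
			return out, nil
		}
		if err != nil {
			return nil, err
		}
		out = append(out, fv.Elm)
	}
}
```

Keeping the parameter variadic costs nothing today and means a later change to how streams are delivered would not have to touch every node's signature.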

gbk := &pb.PTransform{
	UniqueName: gbkID,
	Spec:       &pb.FunctionSpec{Urn: URNGBK},
	Inputs:     map[string]string{"i0": postReify},

Contributor:

Is this input supposed to be postReify? I would've expected inputID from the previous step.

Contributor Author:

Yup. postReify is the PCollection that has the random keys and the full windowed value as serialized bytes.
The input from the previous step is used on line 577.
in represents strictly inbound data (and specifically the main input), and From indicates the "Node" in the graph. In this model of the pipeline, PCollections are Nodes and Transforms are Edges.

Contributor:

Oh ok, I see it now. I was completely misinterpreting the expansion here. So if I understand correctly, it should look like this, right? (With pcollections/nodes in square brackets and transforms/edges as arrows)

[in.From] ---input---> [postReify] ---gbk---> [gbkOut] ---output---> [out.To]

Where input and output are the newly added Reshuffle transforms. That looks right to me, and rereading the code it looks consistent with that.
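
The chain in the diagram can also be written down as data. The sketch below uses a hypothetical `transform` struct with one input and one output rather than the real pipeline proto types, and takes the names `input`, `gbk`, `output`, `postReify`, and `gbkOut` from the diagram; it only shows that following PCollections (nodes) through transforms (edges) gives the order described.

```go
package main

// transform is a hypothetical, simplified edge in the pipeline graph:
// it consumes one PCollection and produces one PCollection.
type transform struct {
	Name   string
	Input  string
	Output string
}

// expandReshuffle returns the three transforms of the expansion, wired from
// the inbound PCollection inFrom to the outbound PCollection outTo.
func expandReshuffle(inFrom, outTo string) []transform {
	return []transform{
		{Name: "input", Input: inFrom, Output: "postReify"},
		{Name: "gbk", Input: "postReify", Output: "gbkOut"},
		{Name: "output", Input: "gbkOut", Output: outTo},
	}
}

// path follows the chain starting at PCollection start and returns the
// transform names in the order data flows through them. The outer loop is
// bounded by len(ts) so a malformed, cyclic input cannot loop forever.
func path(ts []transform, start string) []string {
	var names []string
	cur := start
	for step := 0; step < len(ts); step++ {
		next := -1
		for i, t := range ts {
			if t.Input == cur {
				next = i
				break
			}
		}
		if next < 0 {
			break
		}
		names = append(names, ts[next].Name)
		cur = ts[next].Output
	}
	return names
}
```

With this layout, `path(expandReshuffle("in.From", "out.To"), "in.From")` walks input, then gbk, then output, and the GBK's input is `postReify` rather than the original inbound PCollection.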

sdks/go/pkg/beam/gbk.go (two outdated threads, resolved)

@lukecwik (Member) commented:

R: @reuvenlax

@lukecwik requested a review from reuvenlax March 24, 2020 15:06
lostluck and others added 2 commits March 24, 2020 09:52
same code, but shorter

Co-Authored-By: Daniel Oliveira <younghoono@gmail.com>

@lostluck (Contributor Author) left a comment:

PTAL

@lostluck (Contributor Author) commented:

Run Go Postcommit

@youngoli (Contributor) left a comment:

Looks good to me.

WindowFn: makeWindowFn(wfn),
// ...output after every element is received...
Trigger: &pb.Trigger{
	// Should this be an Always trigger instead?

Contributor:

In answer to this comment: yes, it should be Always.

Contributor Author:

Thanks!

@lostluck (Contributor Author) commented:

Retest this please

@lostluck (Contributor Author) commented:

Run Go Postcommit

@lostluck lostluck merged commit bbc0c18 into apache:master Mar 27, 2020
@lostluck lostluck deleted the reshuffle branch March 27, 2020 23:40
lostluck added a commit to lostluck/beam that referenced this pull request Mar 28, 2020
Co-authored-by: lostluck <13907733+lostluck@users.noreply.github.com>
Co-authored-by: Daniel Oliveira <younghoono@gmail.com>