Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-3301] Refactor DoFn validation & allow specifying main inputs. #10991

Merged
merged 3 commits into from
Mar 17, 2020

Conversation

youngoli
Copy link
Contributor

The current version of this validation is a bit permissive because it
doesn't require the number of main inputs to be specified. This change
allows specifying the number of main inputs, while also preserving
the existing code path of not specifying it. Along with that change,
I made some refactors to existing validation code to try to improve
readability and make it more organized.

This is filed under BEAM-3301 (SDF) because it is intended to enable
validation for SDFs which is difficult without a known number of
main inputs.


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Choose reviewer(s) and mention them in a comment (R: @username).
  • Format the pull request title like [BEAM-XXX] Fixes bug in ApproximateQuantiles, where you replace BEAM-XXX with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

Post-Commit Tests Status (on master branch)

Lang SDK Apex Dataflow Flink Gearpump Samza Spark
Go Build Status --- --- Build Status --- --- Build Status
Java Build Status Build Status Build Status
Build Status
Build Status
Build Status
Build Status
Build Status Build Status Build Status
Build Status
Build Status
Python Build Status
Build Status
Build Status
Build Status
--- Build Status
Build Status
Build Status
Build Status
--- --- Build Status
XLang --- --- --- Build Status --- --- Build Status

Pre-Commit Tests Status (on master branch)

--- Java Python Go Website
Non-portable Build Status Build Status
Build Status
Build Status Build Status
Portable --- Build Status --- ---

See .test-infra/jenkins/README for trigger phrase, status and link of all Jenkins jobs.

The current version of this validation is a bit permissive because it
doesn't require the number of main inputs to be specified. This change
allows specifying the number of main inputs, while also preserving
the existing code path of not specifying it. Along with that change,
I made some refactors to existing validation code to try to improve
readability and make it more organized.

This is filed under BEAM-3301 (SDF) because it is intended to enable
validation for SDFs which is difficult without a known number of
main inputs.
@youngoli
Copy link
Contributor Author

Run Go PostCommit

@youngoli
Copy link
Contributor Author

R: @lostluck

@lostluck lostluck self-requested a review February 28, 2020 01:23
Copy link
Contributor

@lostluck lostluck left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm generally in favour of this change! I do have one API commentary though, and I'd love to hear your perspective since you know better what's coming up WRT what SDF needs out of this.

sdks/go/pkg/beam/core/graph/fn.go Outdated Show resolved Hide resolved
sdks/go/pkg/beam/core/graph/fn.go Outdated Show resolved Hide resolved
A bit of fixup to make NewDoFn a cleaner interface. Gets rid of
NewDoFnKv and replaces that functionality with variadic options.
@@ -209,21 +209,74 @@ func (f *DoFn) RestrictionT() *reflect.Type {
// a KV or not based on the other signatures (unless we're more loose about which
// sideinputs are present). Bind should respect that.

// The following constants prefixed with "Main" represent possible numbers of
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wary about exporting these constants.

For one, they're untyped constants, so they're functionally the numbers themselves.

Otherwise the "right" go way to expose them so they have meaning would be to have an unexported type so users can't define their own, and then define the constants.

type mainInputs int32

const (
  MainUnknown mainInputs = -1
  MainSingle mainInputs = 1
  MainKV mainInputs = 2
)

Then any functional option configuration method can accept them to have type safe, pre-validated input numbers.

func NumInputs(mi mainInputs) Option {
  return func(c *config) {
     c.numMainIn = mi
  }
}

This then saves needing to have a validation error, since package users can't define their own mainInputs.

Another alternative is to do away with the exported constants altogether, keep the validation, but simply document that valid inputs are 1 and 2 for singletons and KVs respectively. Either is preferable to the current approach.

Lets not lose sight that the purpose here is to pass a hint down to make the DoFn parameters easier to analyse. Windows and EventTimes are propagated with the main input, but don't "count" since they are easily detectable by type.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I definitely like those options better. Went with the unexported constant type, since it makes the code more self-documenting as opposed to raw numbers. Also removed the validation check on that parameter, like you suggested.

// DoFn main inputs for DoFn construction and validation. Any value not defined
// here is an invalid number of main inputs.
const (
MainUnknown = -1 // The number of main inputs is unknown for DoFn validation.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider if it is necessary to have an unknown constant exported at all? Even in the unexported type version of this code, Unknown a side effect of not passing the NumMainInput hint, rather than something a user should explicitly set.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm leaving it exported only because AsDoFn is currently exported and takes one of these constants as an input. Making this unexported would make it impossible to call AsDoFn with the existing behavior (unknown num. of inputs).

Adding a type representing the constants, to prevent them from being
misused. This also makes the validation check on them obsolete.
@youngoli
Copy link
Contributor Author

youngoli commented Mar 5, 2020

Run Go PostCommit

@youngoli
Copy link
Contributor Author

youngoli commented Mar 6, 2020

The Postcommit error doesn't seem to be directly related to my change from what I can tell:

Error message from worker: java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error received from SDK harness for instruction -488: process bundle failed for instruction -488 using plan -445 : panic: Unexpected coder: CoGBK<string[string],int[varintz],int[varintz],string[string]> goroutine 87 [running]:
runtime/debug.Stack(0xc00109d970, 0xd2c5e0, 0xc00113cb00)
/usr/lib/go-1.12/src/runtime/debug/stack.go:24 +0x9d
github.com/apache/beam/sdks/go/test/vendor/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.callNoPanic.func1(0xc00109db90)
/home/jenkins/jenkins-slave/workspace/beam_PostCommit_Go_PR/src/sdks/go/test/.gogradle/project_gopath/src/github.com/apache/beam/sdks/go/test/vendor/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/util.go:40 +0x60
panic(0xd2c5e0, 0xc00113cb00)
/usr/lib/go-1.12/src/runtime/panic.go:522 +0x1b5
github.com/apache/beam/sdks/go/test/vendor/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.MakeElementEncoder(0xc00009bdb0, 0xc00114b620, 0xc000822000)
/home/jenkins/jenkins-slave/workspace/beam_PostCommit_Go_PR/src/sdks/go/test/.gogradle/project_gopath/src/github.com/apache/beam/sdks/go/test/vendor/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/coder.go:91 +0x479
github.com/apache/beam/sdks/go/test/vendor/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.(*PCollection).Up(0xc000c20fc0, 0x10018e0, 0xc000c40f00, 0x0, 0xc0010b7b50)
/home/jenkins/jenkins-slave/workspace/beam_PostCommit_Go_PR/src/sdks/go/test/.gogradle/project_gopath/src/github.com/apache/beam/sdks/go/test/vendor/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/pcollection.go:59 +0xfe
github.com/apache/beam/sdks/go/test/vendor/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.callNoPanic(0x10018e0, 0xc000c40f00, 0xc0010b7c28, 0x0, 0x0)
/home/jenkins/jenkins-slave/workspace/beam_PostCommit_Go_PR/src/sdks/go/test/.gogradle/project_gopath/src/github.com/apache/beam/sdks/go/test/vendor/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/util.go:43 +0x6c
github.com/apache/beam/sdks/go/test/vendor/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.(*Plan).Execute(0xc001222ee0, 0x10018e0, 0xc000c40f00, 0xc000d1a490, 0x4, 0xff0340, 0xc00114b440, 0xff0380, 0xc000c40f40, 0xc0010b7de0, ...)
/home/jenkins/jenkins-slave/workspace/beam_PostCommit_Go_PR/src/sdks/go/test/.gogradle/project_gopath/src/github.com/apache/beam/sdks/go/test/vendor/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/plan.go:93 +0xdf
github.com/apache/beam/sdks/go/test/vendor/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness.(*control).handleInstruction(0xc0001f4480, 0x10017a0, 0xc0001bafc0, 0xc000c40d40, 0xc0001bafc0)
/home/jenkins/jenkins-slave/workspace/beam_PostCommit_Go_PR/src/sdks/go/test/.gogradle/project_gopath/src/github.com/apache/beam/sdks/go/test/vendor/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness/harness.go:211 +0xa34
github.com/apache/beam/sdks/go/test/vendor/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness.Main.func2(0x10017a0, 0xc0001bafc0, 0xc000c40d40)
/home/jenkins/jenkins-slave/workspace/beam_PostCommit_Go_PR/src/sdks/go/test/.gogradle/project_gopath/src/github.com/apache/beam/sdks/go/test/vendor/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness/harness.go:118 +0x1cf
created by github.com/apache/beam/sdks/go/test/vendor/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness.Main
/home/jenkins/jenkins-slave/workspace/beam_PostCommit_Go_PR/src/sdks/go/test/.gogradle/project_gopath/src/github.com/apache/beam/sdks/go/test/vendor/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness/harness.go:131 +0x6e8

java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
...

@youngoli
Copy link
Contributor Author

youngoli commented Mar 6, 2020

Run Go PostCommit

@lostluck
Copy link
Contributor

lostluck commented Mar 6, 2020

No, but it looks like it's somehow related to mine. I'm going to roll it back.

@lostluck
Copy link
Contributor

lostluck commented Mar 6, 2020

Could you file a JIRA with the trace and assign it to me please? I'm in the middle of packing.
#11061 is the revert.

@youngoli
Copy link
Contributor Author

youngoli commented Mar 6, 2020

@youngoli
Copy link
Contributor Author

youngoli commented Mar 9, 2020

Run Go PostCommit

@youngoli
Copy link
Contributor Author

R: @lukecwik

Adding Luke to finish up this review since Robert (lostluck@) is on vacation for a bit.

Since this was already mostly reviewed, the main thing I'm looking for is someone to confirm that I addressed Robert's previous review comments with the latest commit. You don't need to review the full change.

Copy link
Member

@lukecwik lukecwik left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like it would make sense to perform this validation once we bind it to a transform which would make sense if AsDoFn/NewDoFn had restricted visibility and were only accessible from the few internal places where we are expanding a PTransform.

For example, we should be able to tell whether we have a KV coder and expect two inputs when applied as a ParDo. Or we should expect two inputs when we are the output of a GBK. Finally, for CoGBK we can know that there are N+1 outputs. Ditto for num side inputs and num outputs.

I'm not sure if this style of change would be too expensive of a change to implement but it would seem like it would remove all guessing as to the "purpose" and "shape" of the Fn.

Are all inputs, outputs, side inputs have to be declared as part of the function signature and these may not be optional arguments?

// one main input.
pos, num, _ := processFn.Inputs()
if numMainIn == MainUnknown && (num == 1 || processFn.Param[pos+1].Kind != funcx.FnValue) {
numMainIn = MainSingle
}

// If the ProcessElement function includes side inputs or emit functions those must also be
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not related to this PR but why?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's part of the API for start/finishBundle. I don't remember why it's done that way though. lostluck@ might be able to answer why when he gets back.

There might be room to make the side inputs/emits in start/finishBundle optional, but I believe right now it's mandatory (if we don't catch and throw an error here, it'll just break later on in translation or execution or something).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At most relaxed we'd be able to either not require them at all if none are used, or isolate them by their types. All instances of a given side input or emit with the same type would need to be listed at once, since otherwise we have no way to distinguish them except by position. Permitting Nothing to be set would be the most convenient, or permitting only the Side Inputs and not requireing the Emits.

For now though, it's better to be more strict now and relax later, since the inverse is impossible, and such variety is harder to maintain if unnecessary.

// If there is none, or it's not a FnValue type, then we can safely infer that there's only
// one main input.
pos, num, _ := processFn.Inputs()
if numMainIn == MainUnknown && (num == 1 || processFn.Param[pos+1].Kind != funcx.FnValue) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't it make sense to infer the number of inputs before validateMainInputs?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

validateMainInputs performs error checks we need to do before we can infer # of main inputs (stuff like making sure we have at least 1 input present). So moving this before validateMainInputs would just mean moving those error checks back above the inferring and nothing really changes.

@youngoli
Copy link
Contributor Author

I think doing this validation in the ParDo transform is something worth looking into, and I'd be up for it if it worked as well as you describe. I'm definitely not a fan of having to do validation without any info about the actual output/input involved. I've even entertained the idea of doing something similar, but it would be a decently large refactor (2-3 days?) and has the chance of hitting additional roadblocks, so I haven't really made time to look into it yet. Definitely something worth taking a day or two to look into after SDF is done.

@youngoli youngoli merged commit c27abd4 into apache:master Mar 17, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants