[BEAM-9273] Explicitly disable @RequiresTimeSortedInput on unsupported runners #10816
je-ik merged 2 commits into apache:master
Conversation
Run Flink ValidatesRunner
Run Spark ValidatesRunner
Run Spark StructuredStreaming ValidatesRunner
Run Apex ValidatesRunner
Run Dataflow ValidatesRunner
Run Samza ValidatesRunner
Run Flink ValidatesRunner
Run Dataflow ValidatesRunner
Run Spark StructuredStreaming ValidatesRunner
Run Apex ValidatesRunner
Run Spark ValidatesRunner
Run Samza ValidatesRunner
Run Gearpump ValidatesRunner
The failure does not seem to be related to this PR. The first failed post-commit seems to be this one.
kennknowles
left a comment
Meta-comment: switching the whole world to use DoFnFeatures would be best as a separate commit, since it does not have to do with RequiresTimeSortedInput. It is OK to leave as one commit since it isn't that big a deal.
import org.apache.beam.sdk.values.TypeDescriptor;

/**
 * Features a {@link DoFn} can posses. Each runner might implement a different (sub)set of this
There was a problem hiding this comment.
The DoFnSignature is precisely the list of features already, no? It is OK to have helper methods anyhow.
Style point: for a given class, utility static methods usually go in a companion class like DoFnSignature (class) & DoFnSignatures (utility methods)
There was a problem hiding this comment.
Agree. The reason I created this class is that I wanted to demonstrate the approach of declaring pipeline requirements and validating that the runner supports all of them.
That would go as follows:
Set<Features> required = PipelineFeatures.extract(pipeline);
Runner.validateAllFeaturesSupported(required);
That way, adding a new feature to a pipeline would not require any change to runner core, but the runner would reject the pipeline.
I then realized that this change would be too big and not really related to the annotation, so I went back to the original approach, where runners explicitly reject features they do not support (and adding an unsupported feature requires modifying the runners' code). I will rename the class as you suggest.
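The declare-and-validate approach described above could look roughly like the following. This is only a minimal sketch: `Feature`, `unsupported`, and `validateAllFeaturesSupported` are hypothetical names invented for illustration and are not part of Beam or of this PR.

```java
import java.util.EnumSet;
import java.util.Set;

/** Hypothetical sketch; none of these names exist in Beam. */
final class FeatureValidationSketch {

  /** Hypothetical features that a pipeline might require. */
  enum Feature {
    STATE,
    TIMERS,
    REQUIRES_TIME_SORTED_INPUT
  }

  /** Returns the required features that are missing from the supported set. */
  static Set<Feature> unsupported(Set<Feature> required, Set<Feature> supported) {
    Set<Feature> missing = EnumSet.noneOf(Feature.class);
    missing.addAll(required);
    missing.removeAll(supported);
    return missing;
  }

  /** Rejects the pipeline if any required feature is not supported by the runner. */
  static void validateAllFeaturesSupported(Set<Feature> required, Set<Feature> supported) {
    Set<Feature> missing = unsupported(required, supported);
    if (!missing.isEmpty()) {
      throw new UnsupportedOperationException(
          "Runner does not support required features: " + missing);
    }
  }
}
```

With a check of this shape, a runner would only declare its supported set; a newly introduced feature is rejected by default until the runner adds it to that set.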
There was a problem hiding this comment.
Hmmm ... DoFnSignatures already exists. It does something different (creates DoFnSignature from DoFn). Feels weird to mix these two helper functionalities.
There was a problem hiding this comment.
@kennknowles would you have any suggestions about naming the class? I think this code really should not go to DoFnSignatures, can we agree on some alternative name?
There was a problem hiding this comment.
I quite strongly believe it belongs in DoFnSignatures and that what you describe in the rest belongs in Pipelines. Static-method-only utility classes tend to be disorganized and undiscoverable. It is better to attach them to the thing that they are most related to.
There was a problem hiding this comment.
Moved the code there. The reason I didn't like it is that DoFnSignatures was probably created to build signatures from a DoFn, while the code I added makes no use of DoFnSignature itself. But there seems to be more code like that already, so I'm fine with that.
...struction-java/src/test/java/org/apache/beam/runners/core/construction/DoFnFeaturesTest.java
Outdated
DoFn<KV<K, InputT>, OutputT> fn = originalParDo.getFn();
verifyFnIsStateful(fn);
DataflowRunner.verifyStateSupported(fn);
DataflowRunner.verifyDoFnSupported(fn, false);
There was a problem hiding this comment.
Passing a raw bool into a function call is not very readable. I suggest splitting into verifyDoFnSupportedForStreaming and verifyDoFnSupportedForBatch. These can each call the common code.
There was a problem hiding this comment.
I added both versions, although the second one verifyDoFnSupportedStreaming is not used. I added that so that the methods are not imbalanced. The streaming case is called from DataflowRunner.verifyDoFnSupported(fn, context.getPipelineOptions().isStreaming()), where it would be weird to do if (context..isStream()) verifyStreaming() else ...
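The suggested split can be sketched as two named entry points that delegate to one shared check. This is an illustration only: `FnInfo` is a hypothetical stand-in for a DoFn, and the rule that time-sorted input is rejected in streaming mode is an assumption made for the example, not a statement about what the Dataflow runner actually enforces.

```java
/** Hypothetical sketch of replacing a raw boolean argument with named methods. */
final class VerifySketch {

  /** Hypothetical stand-in for the DoFn properties the check inspects. */
  static final class FnInfo {
    final boolean requiresTimeSortedInput;

    FnInfo(boolean requiresTimeSortedInput) {
      this.requiresTimeSortedInput = requiresTimeSortedInput;
    }
  }

  /** Named entry point for batch pipelines; the call site no longer passes a bare boolean. */
  static void verifyDoFnSupportedForBatch(FnInfo fn) {
    verifyDoFnSupported(fn, false);
  }

  /** Named entry point for streaming pipelines. */
  static void verifyDoFnSupportedForStreaming(FnInfo fn) {
    verifyDoFnSupported(fn, true);
  }

  /** Shared logic. The rejection rule below is assumed purely for illustration. */
  private static void verifyDoFnSupported(FnInfo fn, boolean streaming) {
    if (streaming && fn.requiresTimeSortedInput) {
      throw new UnsupportedOperationException(
          "Time-sorted input is not supported in streaming mode (assumed rule)");
    }
  }
}
```

A caller that already holds a boolean, such as the result of an `isStreaming()` check, would still need a branch or a boolean overload, which is the trade-off raised in the reply above.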
...dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
Outdated
je-ik
left a comment
Addressed the comments from the code review. I'll squash the commits after approval.
@Test
public void testAllDoFnFeatures() {
tests.forEach(FeatureTest::test);
There was a problem hiding this comment.
Nit: there's a lot of logic in this test class. See https://testing.googleblog.com/2014/07/testing-on-toilet-dont-put-logic-in.html
It would be better to simply write out the boilerplate for each test, so each test can be read in a straight line with no context needed.
There was a problem hiding this comment.
I wanted to keep the declaration of a class and its features encapsulated in a single class. I agree that this doesn't fit well with the rest of DoFnSignaturesTest. But because there is an effort toward superseding all this with "pipeline features", I think it is fine to keep it as is for now and drop it as part of that effort later.
kennknowles
left a comment
I now see that you were raising it so that the feature checks take a DoFn as an argument instead of a DoFnSignature. So, yea, the methods would then belong in DoFns.java or since it is a class you can put static methods right on DoFn. The reason this isn't great is it creates a circular dependency. So IMO it is fine as-is.
Thanks @kennknowles
Fixes [BEAM-9273]