New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[BEAM-7044] portable Spark: support stateful dofns #8812
Conversation
Run Java Spark PortableValidatesRunner Batch |
Run Java Spark PortableValidatesRunner Batch |
Run Python Spark ValidatesRunner |
Run Java PreCommit |
#8802 is merged now, please rebase it. Java precommit is breaking because checkstyle is not happy about some javadoc issue. |
@@ -109,8 +109,7 @@ def portableValidatesRunnerTask(String name) { | |||
excludeCategories 'org.apache.beam.sdk.testing.UsesMapState' |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What about these?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like map state isn't supported yet by the fn api.
https://github.com/apache/beam/blob/master/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/FnApiStateAccessor.java#L302
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep, there is only bagstate for now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice to see this come along. Some comments inline.
|
||
// Need to discard the old key's state | ||
if (bagUserStateHandlerFactory != null) { | ||
bagUserStateHandlerFactory.resetForNewKey(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here you are resetting, but further down you create a new instance of bagUserStateHandlerFactory
.
...rk/src/main/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunction.java
Outdated
Show resolved
Hide resolved
60afe63
to
b3d4746
Compare
Run Java Spark PortableValidatesRunner Batch |
Run Python Spark ValidatesRunner |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
@iemejia Rebased and fixed Javadoc. Please take a look at the Spark stuff |
Depends on #8802.
We need a new custom group by key function so that we can run the executable stage function separately for each key, while keeping the original key/value pairs.
Something I'm not too proud of here is re-using helper functions within typing constraints set by guava
Iterables.transform
. These are all pretty trivial functions I could've just as easily copied into lambdas or something, so let me know what you think.R: @iemejia @angoenka @mxm
Post-Commit Tests Status (on master branch)
Pre-Commit Tests Status (on master branch)
See .test-infra/jenkins/README for trigger phrase, status and link of all Jenkins jobs.