Samza runner support for non unique stateId across multiple ParDos #24276
Minor style/readability comments, thanks for the diff!
...s/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaPipelineTranslator.java
runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java
runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigContext.java
runners/samza/src/main/java/org/apache/beam/runners/samza/translation/TranslationContext.java
Two main issues:
- Do not rewrite the configs; instead, prescan the pipeline and build the set of non-unique state ids.
- The changes in SamzaStoreStateInternals are not needed if the stores map keeps the same definition of {stateId -> store}. The state internals are generated per PTransform, so there will be no duplicates here.
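The prescan suggested above could look roughly like the following. This is a minimal sketch, not the code from this PR: the class name, the method name, and the plain map used in place of a real Beam pipeline traversal are all hypothetical stand-ins.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch of a prescan; not the actual SamzaRunner implementation. */
final class NonUniqueStateIdPrescan {

  /**
   * Returns the stateIds declared by more than one transform.
   *
   * @param stateIdsByTransform transform name -> stateIds declared by its DoFn
   *     (an assumed stand-in for what a pipeline visitor would collect)
   */
  static Set<String> findNonUniqueStateIds(Map<String, List<String>> stateIdsByTransform) {
    Set<String> seen = new LinkedHashSet<>();
    Set<String> nonUnique = new LinkedHashSet<>();
    for (List<String> stateIds : stateIdsByTransform.values()) {
      // De-duplicate within one transform so only reuse across transforms counts.
      for (String stateId : new LinkedHashSet<>(stateIds)) {
        if (!seen.add(stateId)) {
          nonUnique.add(stateId);
        }
      }
    }
    return nonUnique;
  }

  public static void main(String[] args) {
    Map<String, List<String>> pipeline = new LinkedHashMap<>();
    pipeline.put("First stateful ParDo", Arrays.asList("foo", "bar"));
    pipeline.put("Second stateful ParDo", Arrays.asList("foo"));
    System.out.println(findNonUniqueStateIds(pipeline)); // prints [foo]
  }
}
```

Because the result is computed once before translation, nothing downstream has to rewrite configs after the fact; it only needs to consult the set.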
runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java
runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigContext.java
...samza/src/main/java/org/apache/beam/runners/samza/translation/ParDoBoundMultiTranslator.java
...s/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaPipelineTranslator.java
runners/samza/src/main/java/org/apache/beam/runners/samza/translation/TranslationContext.java
runners/samza/src/main/java/org/apache/beam/runners/samza/util/StoreIdUtils.java
…or Samza Runner - Add unit tests - Re-enable ParDoTest with support for the same state id across DoFns
b4f6ba2 to 95540c1
* Perform prescan of a pipeline to identify non-unique state ids
c1cda56 to 76e8fe6
Thank you so much for taking the time to review, for the suggestions on the prescan and better naming, and for correcting my understanding of SamzaStoreStateInternals. I have addressed all the comments; please have another look.
Thanks for fixing the problem for #1 in my previous comments. However, it seems some of the necessary changes for #2 were also reverted. I expected a mapping {stateId -> storeId} (currently it is a set of stateIds) to be passed to the
Thank you @xinyuiscool for reviewing again. The SamzaStoreStateInternals logic has been included, and I verified the changes with all tests passing under ./gradlew :runners:samza:build
Overall looks great! Have some minor comments about consolidating some duplicate logic into a single class.
...samza/src/main/java/org/apache/beam/runners/samza/translation/ParDoBoundMultiTranslator.java
...samza/src/main/java/org/apache/beam/runners/samza/translation/ParDoBoundMultiTranslator.java
runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigContext.java
runners/samza/src/main/java/org/apache/beam/runners/samza/translation/TranslationContext.java
runners/samza/src/main/java/org/apache/beam/runners/samza/util/StoreIdUtils.java
…ndMultiTranslator within ConfigContext/TranslationContext
@xinyuiscool Awesome, I've addressed all the comments above. Please review when you are available. Thank you.
LGTM!
Run Java PreCommit
* Samza runner support for non unique stateId across multiple ParDos (apache#24276) * Update version
Samza runner support for non-unique stateIds across multiple ParDos
(1) Leave the existing RocksDB store ID unchanged when a stateId is used by only one ParDo.
(2) Add prescan logic in SamzaRunner that traverses the Beam pipeline topologically and identifies non-unique stateIds.
(3) Using the set of non-unique stateIds, introduce a new mapping from user-provided stateIds to store IDs:
// A stateId used in a single ParDo continues to use the stateId itself as the store ID
.apply("First stateful ParDo", ParDo.of(fn));
→ "foo"
// A stateId used in multiple ParDos maps to a new store ID that includes the PTransform name
.apply("Second stateful ParDo", ParDo.of(fn2));
→ "foo-Second_stateful_ParDo"
(4) Re-enable ParDoTest in the :runners:samza build.gradle.
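The mapping in step (3) can be sketched as below. This is only an illustration under stated assumptions: the class and method names are hypothetical, and the escaping rule (every character other than an ASCII letter or digit becomes an underscore) is inferred from the "foo-Second_stateful_ParDo" example rather than taken from the PR's code.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

/** Hypothetical sketch of the stateId -> store ID mapping; not the PR's StoreIdUtils. */
final class StoreIdMappingSketch {

  /**
   * Maps a user-provided stateId to a store ID.
   *
   * @param stateId the stateId declared in the DoFn
   * @param transformName the name of the enclosing PTransform
   * @param nonUniqueStateIds stateIds found in more than one ParDo by the prescan
   */
  static String toStoreId(String stateId, String transformName, Set<String> nonUniqueStateIds) {
    if (!nonUniqueStateIds.contains(stateId)) {
      // Unique stateIds keep their existing store ID, so existing stores are untouched.
      return stateId;
    }
    // Assumed escaping: replace anything that is not an ASCII letter or digit with '_'.
    String escapedName = transformName.replaceAll("[^A-Za-z0-9]", "_");
    return stateId + "-" + escapedName;
  }

  public static void main(String[] args) {
    Set<String> nonUnique = new HashSet<>(Arrays.asList("foo"));
    // "bar" is used by one ParDo only, so it maps to itself.
    System.out.println(toStoreId("bar", "First stateful ParDo", nonUnique)); // prints bar
    // "foo" is shared, so the store ID includes the escaped PTransform name.
    System.out.println(toStoreId("foo", "Second stateful ParDo", nonUnique)); // prints foo-Second_stateful_ParDo
  }
}
```

Keeping unique stateIds unchanged is what makes the change backward compatible for pipelines that never reused a stateId.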