[BEAM-7600] borrow SDK harness management code into Spark runner #9095
R: @angoenka
bf0314b to 084e384
Run Java PreCommit
5bc6e2c to 40700bf
Run Java Spark PortableValidatesRunner Batch
Is this change ready for review? If so, gentle reminder @angoenka.
404ff8f to 3b8af43
Run Java Spark PortableValidatesRunner Batch
3b8af43 to 5fa467e
Alright, I've finally ironed out the problems with this one. However, we should merge #9410 first for best results.
Run Portable_Python PreCommit
17d6477 to e2173d2
@angoenka PTAL
e2173d2 to 7ef47e3
angoenka left a comment
Thanks for the change.
  @Override
- public FlinkExecutableStageContext get(JobInfo jobInfo) {
+ public ExecutableStageContext get(
+     JobInfo jobInfo, SerializableFunction<Object, Boolean> isReleaseSynchronous) {
isReleaseSynchronous is not used
Refactored to avoid this.
  });

- return state.getFactory().get(jobInfo);
+ return state.getFactory(isReleaseSynchronous).get(jobInfo, isReleaseSynchronous);
Same here. I think we should pass isReleaseSynchronous in only one place to avoid confusion.
Refactored to avoid this.
852b238 to bce9c9e
  @Override
- public FlinkExecutableStageContext get(JobInfo jobInfo) {
+ public ExecutableStageContext get(
+     JobInfo jobInfo, SerializableFunction<Object, Boolean> isReleaseSynchronous) {
I still feel that the get method should not take isReleaseSynchronous: its behavior becomes unpredictable if we call get at two locations for the same jobInfo but with different isReleaseSynchronous functions. Because the result is cached, only the first isReleaseSynchronous is applied, which can lead to a concurrency bug.
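To make the concern concrete, here is a minimal sketch of the pitfall, not the code in this PR. `CachingFactory`, `Context`, and the `String` job id are hypothetical stand-ins, and `java.util.function.Predicate` is used in place of `SerializableFunction<Object, Boolean>` so the example has no Beam dependency.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;

public class CachedFactoryPitfall {
  /** Hypothetical per-job context that remembers the release policy it was built with. */
  static final class Context {
    final Predicate<Object> isReleaseSynchronous;

    Context(Predicate<Object> isReleaseSynchronous) {
      this.isReleaseSynchronous = isReleaseSynchronous;
    }
  }

  /** Hypothetical factory that caches one context per job id. */
  static final class CachingFactory {
    private final ConcurrentHashMap<String, Context> cache = new ConcurrentHashMap<>();

    Context get(String jobId, Predicate<Object> isReleaseSynchronous) {
      // The mapping function runs only for the first caller of a given jobId,
      // so the policy passed by any later caller is silently dropped.
      return cache.computeIfAbsent(jobId, id -> new Context(isReleaseSynchronous));
    }
  }

  public static void main(String[] args) {
    CachingFactory factory = new CachingFactory();
    Context first = factory.get("job-1", caller -> true);
    Context second = factory.get("job-1", caller -> false);

    // Both calls return the same cached instance.
    System.out.println(first == second); // prints true
    // The second caller asked for asynchronous release but gets the first policy.
    System.out.println(second.isReleaseSynchronous.test("any caller")); // prints true
  }
}
```

The second caller has no way to tell that its argument was ignored, which is why the parameter is misleading on a cached lookup.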
As discussed offline, I made separate contexts for Flink and Spark.
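One way to picture this kind of separation is to fix the release policy when each runner's factory is constructed, so that `get` takes only the job. This is a sketch under assumptions rather than the actual refactoring: `Factory`, `Context`, `RUNNER_A`, `RUNNER_B`, and the policies assigned to them are all invented for illustration, and `Predicate` again stands in for `SerializableFunction<Object, Boolean>`.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;

public class PerRunnerContextFactory {
  /** Hypothetical per-job context. */
  static final class Context {
    final String jobId;
    final Predicate<Object> isReleaseSynchronous;

    Context(String jobId, Predicate<Object> isReleaseSynchronous) {
      this.jobId = jobId;
      this.isReleaseSynchronous = isReleaseSynchronous;
    }
  }

  /** Hypothetical factory whose release policy is bound once, at construction. */
  static final class Factory {
    private final Predicate<Object> isReleaseSynchronous;
    private final ConcurrentHashMap<String, Context> cache = new ConcurrentHashMap<>();

    Factory(Predicate<Object> isReleaseSynchronous) {
      this.isReleaseSynchronous = isReleaseSynchronous;
    }

    Context get(String jobId) {
      // Every context from this factory shares the same policy,
      // so the cached value cannot disagree with what a caller asked for.
      return cache.computeIfAbsent(jobId, id -> new Context(id, isReleaseSynchronous));
    }
  }

  // One factory per runner. The policies here are assumptions for the example only.
  static final Factory RUNNER_A = new Factory(caller -> false);
  static final Factory RUNNER_B = new Factory(caller -> true);

  public static void main(String[] args) {
    Context a = RUNNER_A.get("job-1");
    Context b = RUNNER_B.get("job-1");
    System.out.println(a.isReleaseSynchronous.test("caller")); // prints false
    System.out.println(b.isReleaseSynchronous.test("caller")); // prints true
    System.out.println(RUNNER_A.get("job-1") == a); // prints true
  }
}
```

With the policy tied to the factory, two call sites can no longer pass conflicting functions for the same job.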
9dcbd6a to 7c8a637
@angoenka I have refactored this quite a bit. Unfortunately I only managed to eliminate a small amount of the complexity as most of it seems necessary. PTAL
7c8a637 to 2954ee3
Run Java PreCommit
Run Portable_Python PreCommit
Run Python Spark ValidatesRunner
Run Java Spark PortableValidatesRunner Batch
angoenka left a comment
Some nits but looks good.
@Override
public ExecutableStageContext get(JobInfo jobInfo) {
  MultiInstanceFactory state =
Shall we call this jobFactory?
done
@Override
public ExecutableStageContext get(JobInfo jobInfo) {
  MultiInstanceFactory state =
Same
done
Now the Spark runner can reuse SDK harnesses, and multiple SDK harnesses can be used.
The latter will hopefully enable multicore processing on TFX, for example.
See .test-infra/jenkins/README for trigger phrase, status and link of all Jenkins jobs.