[BEAM-7412] shut down executor service in fn harness #8722
@@ -203,7 +204,9 @@ public static void main(
     JvmInitializers.runBeforeProcessing(options);

     LOG.info("Entering instruction processing loop");
-    control.processInstructionRequests(options.as(GcsOptions.class).getExecutorService());
+    ExecutorService executorService = options.as(GcsOptions.class).getExecutorService();
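For readers following the change, here is a minimal sketch of the general pattern the PR title describes: hold on to the executor, run the processing loop, and shut the pool down afterwards so its threads do not outlive the work. This is not the PR's actual code; `ShutdownSketch`, `processInstructions`, and `runAndShutDown` are hypothetical names, and the cached thread pool and 5-second timeout are assumptions made for illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ShutdownSketch {
  /** Hypothetical stand-in for the harness's instruction processing loop. */
  static void processInstructions(ExecutorService executor) throws Exception {
    // Submit one task and wait for it, so the pool actually creates a thread.
    executor.submit(() -> System.out.println("processing")).get();
  }

  /**
   * Runs the loop and always shuts the pool down afterwards.
   * Returns true if the pool terminated within the (assumed) timeout.
   */
  static boolean runAndShutDown(ExecutorService executor) throws Exception {
    try {
      processInstructions(executor);
    } finally {
      // Stop accepting new tasks; already submitted tasks still finish.
      executor.shutdown();
    }
    return executor.awaitTermination(5, TimeUnit.SECONDS);
  }

  public static void main(String[] args) throws Exception {
    ExecutorService executor = Executors.newCachedThreadPool();
    System.out.println("terminated: " + runAndShutDown(executor));
  }
}
```

Putting `shutdown()` in a `finally` block means the pool is released even if the loop throws, which matters when the harness runs inside a longer-lived process rather than a container that exits on completion.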
This does not come from this PR, but shouldn't this be in a more general options type than GcsOptions? I mean, SparkPipelineOptions does not have this, so I'm not sure whether this will have an effect.
Also, shouldn't a change in the harness trigger other tests to validate that we don't break portability (Flink PVR, for example)? (Note: this is also unrelated to this PR, just thinking about things that can be done.)
Agreed on both points. I will have to run the Spark and Flink PVR tests manually.
LGTM modulo the options comment. I would prefer that @angoenka takes a look before merging too.
Run Java Flink PortableValidatesRunner Batch
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
Run JavaPortabilityApi PreCommit
Thanks!
I was somewhat surprised to find that the unbounded thread growth I encountered when running portable validates runner tests on local Spark was actually caused by GcsOptions.java. I'm guessing this hasn't caused problems before because the SDK harness was always run in a separately managed process/container/whatever that terminated when work was complete.

Incidentally, this was a pain to track down because thousands of threads used for any number of purposes were all uniformly named pool-N-thread-M (as they all used Java's defaultThreadFactory). Going forward, I might look into naming Beam threads so it's easier to debug and analyze performance.

R: @angoenka
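As an illustration of the thread-naming idea mentioned above, a small custom `ThreadFactory` can give each pool a recognizable prefix in place of the default pool-N-thread-M names. This is only a sketch and not something Beam ships: the `NamedThreadFactory` class and the "beam-harness" prefix are hypothetical, chosen for this example.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class NamedThreadFactory implements ThreadFactory {
  private final String prefix;
  // Per-factory counter so each thread gets a unique, ordered suffix.
  private final AtomicInteger counter = new AtomicInteger(1);

  public NamedThreadFactory(String prefix) {
    this.prefix = prefix;
  }

  @Override
  public Thread newThread(Runnable r) {
    // Produces names such as "beam-harness-1", "beam-harness-2", ...
    return new Thread(r, prefix + "-" + counter.getAndIncrement());
  }

  public static void main(String[] args) throws Exception {
    // "beam-harness" is an assumed prefix used only for this demonstration.
    ExecutorService executor =
        Executors.newCachedThreadPool(new NamedThreadFactory("beam-harness"));
    executor.submit(() -> System.out.println(Thread.currentThread().getName())).get();
    executor.shutdown();
  }
}
```

With a prefix per purpose, a thread dump groups threads by the component that created them, which makes a leak like the one described here much quicker to attribute.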