-
Notifications
You must be signed in to change notification settings - Fork 28k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-45245][PYTHON][CONNECT] PythonWorkerFactory: Timeout if worker does not connect back. #43023
Conversation
cc: @HyukjinKwon, @WweiL |
@@ -184,10 +185,20 @@ private[spark] class PythonWorkerFactory( | |||
redirectStreamsToStderr(workerProcess.getInputStream, workerProcess.getErrorStream) | |||
|
|||
// Wait for it to connect to our socket, and validate the auth secret. | |||
serverSocketChannel.socket().setSoTimeout(10000) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was supposed to be 10 second timeout. But this call does not seem to affect serverSocketChannel.accept()
.
This set might only take effect if we did serverSocketChannel.socket().accept()
, but hat returns a socket, not a channel.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cc @ueshin
// return (accept() used to be blocking), the test doesn't hang for a long time. | ||
val createFuture = Future { | ||
val ex = intercept[SparkException] { | ||
workerFactory.createSimpleWorker(blockingMode = true) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
May I know if this blockingMode=true would affect serverSocketChannel.configureBlocking(false)
above?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It doesn't. It is for client socket. Shall I remove it?
Seems the tests are failing because of format, maybe try
|
@HyukjinKwon the test failures seem unrelated. I tried multiple times. different types of tests are failing. Do you think we can merge this? |
Yeah I am merging it to master but I think we should switch this to use the daemonized worker instead of simple workers soon. Merged to master. |
I see. I have been looking for reasons doing so. This seems to be one of them. |
import org.apache.spark.util.ThreadUtils | ||
|
||
// Tests for PythonWorkerFactory. | ||
class PythonWorkerFactorySuite extends SparkFunSuite with Matchers with SharedSparkContext { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this suite have to extend Matchers
? @HyukjinKwon can you double check?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks. Made a followup PR: #45459
…it in the test ### What changes were proposed in this pull request? This PR is a followup of #43023 that addresses a post-review comment. ### Why are the changes needed? It is unnecessary. It also matters with Scala compatibility so should better remove if unused. ### Does this PR introduce _any_ user-facing change? No, test-only. ### How was this patch tested? Manually. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #45459 from HyukjinKwon/SPARK-45245-folllowup. Lead-authored-by: Hyukjin Kwon <gurwls223@apache.org> Co-authored-by: Hyukjin Kwon <gurwls223@gmail.com> Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
…it in the test ### What changes were proposed in this pull request? This PR is a followup of apache#43023 that addresses a post-review comment. ### Why are the changes needed? It is unnecessary. It also matters with Scala compatibility so should better remove if unused. ### Does this PR introduce _any_ user-facing change? No, test-only. ### How was this patch tested? Manually. ### Was this patch authored or co-authored using generative AI tooling? No. Closes apache#45459 from HyukjinKwon/SPARK-45245-folllowup. Lead-authored-by: Hyukjin Kwon <gurwls223@apache.org> Co-authored-by: Hyukjin Kwon <gurwls223@gmail.com> Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
…it in the test ### What changes were proposed in this pull request? This PR is a followup of apache#43023 that addresses a post-review comment. ### Why are the changes needed? It is unnecessary. It also matters with Scala compatibility so should better remove if unused. ### Does this PR introduce _any_ user-facing change? No, test-only. ### How was this patch tested? Manually. ### Was this patch authored or co-authored using generative AI tooling? No. Closes apache#45459 from HyukjinKwon/SPARK-45245-folllowup. Lead-authored-by: Hyukjin Kwon <gurwls223@apache.org> Co-authored-by: Hyukjin Kwon <gurwls223@gmail.com> Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
What changes were proposed in this pull request?
createSimpleWorker()
method inPythonWorkerFactory
waits forever if the worker fails to connect back to the server.This is because it calls accept() without a timeout. If the worker does not connect back, accept() waits forever. There is supposed to be 10 seconds timeout, but it was not implemented correctly.
This PR adds a 10 second timeout.
Why are the changes needed?
Otherwise create method could be stuck forever.
Does this PR introduce any user-facing change?
No
How was this patch tested?
Was this patch authored or co-authored using generative AI tooling?
Generated-by: ChatGPT 4.0
Asked ChatGPT to generate sample code to do non-blocking accept() on a socket channel in Java.