
[SPARK-34726][SQL][2.4] Fix collectToPython timeouts #31818

Conversation

peter-toth
Contributor

@peter-toth peter-toth commented Mar 12, 2021

What changes were proposed in this pull request?

One of our customers frequently encounters `"serve-DataFrame" java.net.SocketTimeoutException: Accept timed out` errors in PySpark because `Dataset.collectToPython()` in Spark 2.4 does the following:

  1. Collects the results
  2. Opens up a socket server that is then listening to the connection from Python side
  3. Runs the event listeners as part of `withAction` on the same thread, because SPARK-25680 is not available in Spark 2.4
  4. Returns the address of the socket server to Python
  5. The Python side connects to the socket server and fetches the data

As the customer has a custom, long-running event listener, the time between 2. and 5. frequently exceeds the default connection timeout, and increasing the connect timeout is not a good solution because we don't know how long the listeners can take.
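
For illustration, the failure mode can be reproduced outside Spark with a small stand-alone sketch (illustrative only, not Spark code; the 15s accept budget mirrors `PythonServer.setupOneConnectionServer`, while the thread wiring and the 20s sleep are assumptions):

```scala
import java.net.{InetAddress, ServerSocket, SocketTimeoutException}

object AcceptTimeoutSketch extends App {
  // Step 2: open a one-connection server with a 15s accept budget, as the serving path does.
  val server = new ServerSocket(0, 1, InetAddress.getLoopbackAddress)
  server.setSoTimeout(15000)

  val serveThread = new Thread(new Runnable {
    override def run(): Unit = {
      try server.accept() // waits for the Python side to connect
      catch {
        case e: SocketTimeoutException => println(s"serve thread failed: $e") // "Accept timed out"
      } finally server.close()
    }
  })
  serveThread.start()

  // Step 3: a long-running listener blocks the calling thread before the port reaches Python.
  Thread.sleep(20 * 1000)

  // Steps 4/5 would only happen now, but the 15s accept window has already expired.
  serveThread.join()
}
```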

Why are the changes needed?

This PR simply moves the socket server creation (2.) after running the listeners (3.). This approach has a minor side effect: errors during socket server creation are no longer reported as `onFailure` events. However, errors that happen while opening the connection from the Python side, or during data transfer from the JVM to Python, are currently not reported as events either, so IMO this is not a big change.
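
Conceptually the reordering looks like the following (a rough sketch only; the function parameters are placeholders for Spark internals, not real APIs):

```scala
object ReorderSketch {
  // Before the fix: the serve socket is opened before the listeners run.
  def beforeFix(collectResults: () => Array[Byte],
                runListeners: () => Unit,
                openServeSocket: Array[Byte] => Int): Int = {
    val rows = collectResults()
    val port = openServeSocket(rows) // (2) the accept() clock starts ticking here...
    runListeners()                   // (3) ...and a slow listener can exhaust the 15s budget
    port                             // (4) the port reaches Python too late
  }

  // After the fix: the listeners finish before the serve socket is created.
  def afterFix(collectResults: () => Array[Byte],
               runListeners: () => Unit,
               openServeSocket: Array[Byte] => Int): Int = {
    val rows = collectResults()
    runListeners()                   // (3) listeners finish first
    openServeSocket(rows)            // (2)+(4) the server is opened just before the port is returned
  }
}
```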

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Added new UT + manual test.

@peter-toth
Contributor Author

cc @HyukjinKwon

@peter-toth
Contributor Author

cc @gaborgsomogyi

@peter-toth
Contributor Author

cc @attilapiros

@gaborgsomogyi
Contributor

Hmmm, this is exactly one of the reasons why we've added this: #30389
Good to see that this has been solved in the 3.x line with a separate thread, but it would be good to solve it on the 2.x line too.
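
For reference, the 3.x behaviour being described, running the listeners off the calling thread, could be sketched like this (illustrative only, not the actual SPARK-25680 change; `withAction` here is a simplified stand-in):

```scala
import java.util.concurrent.Executors
import scala.concurrent.{ExecutionContext, Future}

// Illustrative sketch only: fire the listener callbacks asynchronously so the caller can
// open the serve socket and return the port without waiting for slow listeners.
object AsyncListenerSketch {
  private val pool = Executors.newSingleThreadExecutor()
  private implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)

  def withAction[T](body: => T)(onSuccess: T => Unit): T = {
    val result = body
    Future(onSuccess(result)) // listener work happens on the dedicated thread
    result                    // the serving path continues immediately
  }
}
```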

@gaborgsomogyi
Contributor

Since I've seen a thread about 2.4.8, it would be good to solve this there, right @HyukjinKwon @dongjoon-hyun?
I'm asking to decide whether we should go further with the in-depth review or not.

Member

@HyukjinKwon HyukjinKwon left a comment


Okay, the fix makes sense to me.

@HyukjinKwon
Member

cc @viirya FYI

@gaborgsomogyi
Contributor

I would like to torture this a bit, but I agree with the main intention too.

@gaborgsomogyi
Contributor

KafkaContinuousSinkSuite failure is unrelated.

Contributor

@gaborgsomogyi gaborgsomogyi left a comment


LGTM. Validated manually and it passed; additionally, I don't see high risk in it.

@srowen
Member

srowen commented Mar 12, 2021

Jenkins test this please

Contributor

@attilapiros attilapiros left a comment


The code looks good. But I think you can come up with a good unit test where the listener has a longer-running onSuccess method (longer than the socket timeout...).

WDYT?

@gaborgsomogyi
Contributor

You mean measuring time as the assert criterion?

@attilapiros
Contributor

You mean measuring time as the assert criterion?

No. Just have an onSuccess method with a longer sleep than the socket timeout itself.
We expect the data to still be available on the Python side, or we might not even need to leave the JVM, so the test can be in one of the existing Scala suites.

If this turns out to be too complex I can let it go. But let's check it first.
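
The kind of test listener being suggested might look roughly like this (a sketch; the `QueryExecutionListener` signatures match Spark 2.4, but the class name, timing, and suite wiring are assumptions — the actual diff context appears further down):

```scala
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.util.QueryExecutionListener

// Sketch of the suggested test listener: onSuccess sleeps for longer than the 15s accept
// timeout, so the test fails without the reordering in Dataset.collectToPython.
class SlowOnSuccessListener extends QueryExecutionListener {
  override def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = {
    Thread.sleep(20 * 1000) // longer than 15s in `PythonServer.setupOneConnectionServer`
  }
  override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {}
}
```

Registering such a listener around a collect that goes through the serve path exercises the same ordering the customer hits.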

Change-Id: If8017ce93dae2ead782567afea1eeba5438d73ba
@peter-toth
Contributor Author

You mean measuring time as the assert criterion?

No. Just have an onSuccess method with a longer sleep than the socket timeout itself.
We expect the data to still be available on the Python side, or we might not even need to leave the JVM, so the test can be in one of the existing Scala suites.

If this turns out to be too complex I can let it go. But let's check it first.

Thanks, added in 6b18cc7

override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {}

override def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = {
// Longer than 15s in `PythonServer.setupOneConnectionServer`
Contributor


Let's extract the 15000 into a `private[spark]` member var:

And add a comment above it: `// visible for testing`.
Then we can speed up this single test.
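
Something along these lines, perhaps (a sketch only; the package, object, and member names are placeholders, not the actual patch):

```scala
package org.apache.spark.sketch // placeholder package so private[spark] compiles

import java.net.{InetAddress, ServerSocket}

object PythonServerSketch {
  // visible for testing
  private[spark] var connectionTimeoutMillis: Int = 15000

  def setupOneConnectionServer(): ServerSocket = {
    val server = new ServerSocket(0, 1, InetAddress.getLoopbackAddress)
    server.setSoTimeout(connectionTimeoutMillis) // tests can lower this instead of sleeping 20s
    server
  }
}
```

A test could then shrink `connectionTimeoutMillis` for the slow-listener case and restore it afterwards, so the suite doesn't have to sleep for 20 real seconds.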

Contributor Author


Ok, added in 6b18cc7 and 8f6b811

authHelper.authToServer(socket)
Source.fromInputStream(socket.getInputStream)

spark.listenerManager.unregister(listener)
Contributor


I would put this into a `finally` block to be on the safe side.
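
i.e. roughly this shape for the test body (a sketch; `SlowOnSuccessListener` is the illustrative listener sketched earlier and `spark` is the suite's `SparkSession`):

```scala
// Unregister in a finally so a failing assertion can't leak the slow listener into other tests.
val listener = new SlowOnSuccessListener
spark.listenerManager.register(listener)
try {
  spark.range(10).collect() // stand-in for the action that exercises collectToPython
} finally {
  spark.listenerManager.unregister(listener)
}
```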

Contributor Author


All right, added in 8f6b811


override def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = {
// Longer than 15s in `PythonServer.setupOneConnectionServer`
Thread.sleep(20 * 1000)
Contributor


Just for my own understanding, does this mean the test waits 15 seconds to pass?

Contributor Author


Yes, we need to wait a bit longer than the timeout. Without the fix in Dataset.collectToPython this UT fails.

Peter Toth added 2 commits March 12, 2021 16:06
Change-Id: I61cd8a8ea5eadbc72b38a3ecbdf1cd41556d5de6
Change-Id: I3d8077ba9e65c0367e401b3fdd8590fd8a83cf73
@gaborgsomogyi
Contributor

Let's see jenkins...

@dongjoon-hyun
Member

Thank you for pinging me, @gaborgsomogyi. Yes, it's good timing to discuss this, and the decision is up to the release manager, @viirya.

Since I've seen a thread about 2.4.8, it would be good to solve this there, right @HyukjinKwon @dongjoon-hyun?
I'm asking to decide whether we should go further with the in-depth review or not.

BTW, we should be more careful because 2.4.8 is the last release.

@SparkQA

SparkQA commented Mar 12, 2021

Test build #136006 has finished for PR 31818 at commit b4b425e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

@attilapiros attilapiros left a comment


LGTM

@SparkQA

SparkQA commented Mar 12, 2021

Test build #136010 has finished for PR 31818 at commit 8f6b811.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya
Member

viirya commented Mar 12, 2021

This PR simply moves the socket server creation (2.) after running the listeners (3.). This approach has a minor side effect: errors during socket server creation are no longer reported as `onFailure` events. However, errors that happen while opening the connection from the Python side, or during data transfer from the JVM to Python, are currently not reported as events either, so IMO this is not a big change.

Hmm, I'm not sure about this. It sounds like if any people rely on onFailure events, there will be a breaking change for them. As 2.4.8 is the last release and only a maintenance release, the above seems to be a behavior change, so it is still risky to me.

@peter-toth
Contributor Author

peter-toth commented Mar 12, 2021

This PR simply moves the socket server creation (2.) after running the listeners (3.). This approach has a minor side effect: errors during socket server creation are no longer reported as `onFailure` events. However, errors that happen while opening the connection from the Python side, or during data transfer from the JVM to Python, are currently not reported as events either, so IMO this is not a big change.

Hmm, I'm not sure about this. It sounds like if any people rely on onFailure events, there will be a breaking change for them. As 2.4.8 is the last release and only a maintenance release, the above seems to be a behavior change, so it is still risky to me.

I think it is very rare that a ServerSocket throws an exception because it can't bind to some port on 127.0.0.1. Other communication errors between Java and Python are also not reported as onFailure events currently...

@viirya
Member

viirya commented Mar 12, 2021

I think it is very rare that a ServerSocket throws an exception because it can't bind to some port on 127.0.0.1. Other communication errors between Java and Python are also not reported as onFailure events currently...

Oh, I see. If only the socket server creation is not caught by the listener, then although it is still a behavior change, it seems less risky. Okay, it makes sense. I'm not against this but would like to have more eyes on this. cc @cloud-fan @maropu

@gaborgsomogyi
Contributor

@viirya you see it well and you're right, it's a tradeoff which should be discussed in depth to come up with a balanced decision.
Pro:

  • Users don't need to guess how much time the listeners will consume, and don't have to guess a value for spark.python.authenticate.socketTimeout.

Con:

  • If the ServerSocket can't bind to localhost then an event will be dropped, which is a behavior change.
  • 2.4.8 is the last 2.4 release, so sudden movements are not suggested.

A couple of heavy users complained about this, but that said, spark.python.authenticate.socketTimeout can be increased as a workaround. If we decide not to merge this, then we must mention in our docs that in such cases the timeout must be increased to a best-guess value (this is only true for the 2.4 line, because on 3.x the listeners run in a separate thread).
I've seen cases where this timeout was between 20 and 60 seconds, and the complaint was exactly this wide range.
I see their point, because configuring huge and sensitive computations in PROD based on best guesses freaks people out.

@attilapiros
Contributor

  • Users don't need to guess how much time the listeners will consume, and don't have to guess a value for spark.python.authenticate.socketTimeout.

Moreover, spark.python.authenticate.socketTimeout is only available from 3.1.0, so they cannot even use that config as a workaround here.
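
For reference, on the 3.1.0+ line where the config exists, the workaround would look something like this (illustrative command, script name, and value; a best guess, and not applicable to 2.4 without a backport):

```sh
# Illustrative only: the config exists from Spark 3.1.0; 60s is a best-guess value.
spark-submit --conf spark.python.authenticate.socketTimeout=60s my_pyspark_job.py
```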

@gaborgsomogyi
Contributor

True, so a backport would be needed in that case. I guess it wouldn't be brain surgery though.

@peter-toth
Contributor Author

@cloud-fan, @maropu could you please review this PR?

Member

@HyukjinKwon HyukjinKwon left a comment


LGTM. I'll merge in a few days if there are no more comments.

Member

@viirya viirya left a comment


I'm fine with this fix if no one raises an objection.

@viirya
Member

viirya commented Mar 22, 2021

Thanks all. Merging to 2.4.

viirya pushed a commit that referenced this pull request Mar 22, 2021

Closes #31818 from peter-toth/SPARK-34726-fix-collectToPython-timeouts-2.4.

Lead-authored-by: Peter Toth <ptoth@cloudera.com>
Co-authored-by: Peter Toth <peter.toth@gmail.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
@viirya viirya closed this Mar 22, 2021
@peter-toth
Contributor Author

Thanks all for the review.

@viirya
Member

viirya commented Mar 22, 2021

BTW, the JIRA description is empty. I just copied the description here to the JIRA.
