
[SPARK-4088] [PySpark] Python worker should exit after socket is closed by JVM #2941

Closed

Conversation

Contributor

@davies davies commented Oct 25, 2014

In the case of take() or an exception in Python, the Python worker may exit before the JVM has read all of the response, and the write thread may then raise a "Connection reset" exception.

Python should always wait for the JVM to close the socket first.

cc @JoshRosen This is a hotfix, otherwise the tests will be flaky; sorry about that.
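To make the fix concrete, here is a minimal Python sketch of the idea (illustrative only, not the actual worker.py patch; the function name and socket handling are assumptions): after sending its last output, the worker drains the socket until the JVM closes it, and only then exits.

```python
import socket
import sys

def exit_after_jvm_closes(sock):
    # Illustrative sketch, not the real worker.py change.
    # Tell the JVM this side has no more data to send...
    sock.shutdown(socket.SHUT_WR)
    # ...then block until the JVM closes its end of the socket.
    # recv() returning b'' means the JVM has read the full response and
    # closed the connection, so exiting now cannot race with the writer
    # thread and trigger a "Connection reset".
    while sock.recv(4096):
        pass
    sys.exit(0)
```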


SparkQA commented Oct 25, 2014

Test build #22202 has started for PR 2941 at commit 9d4d21e.

  • This patch merges cleanly.

@JoshRosen
Contributor

Ah, so there was a race between Python telling Java that it was exiting and Java realizing that Python was going to exit? It looks like this should fix that by having the Python worker wait until the JVM has acknowledged that it knows that Python is going to exit.

Contributor Author

davies commented Oct 25, 2014

The race is over which of the reader and writer threads notices first that the worker has exited. If the reader finds out first, there is no problem; but if the writer finds out first, it throws an IOException.
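For anyone trying to picture the race, here is a small self-contained Python illustration (a local socketpair standing in for the JVM-to-worker connection; this is not Spark code): once the worker's end is closed, a reader just sees end-of-stream, while a writer hits the broken-pipe/connection-reset error that shows up in the Jenkins traceback below.

```python
import socket

# Illustration only (not Spark code): a local socket pair stands in for the
# JVM <-> Python worker connection.
jvm_side, worker_side = socket.socketpair()
worker_side.close()  # the Python worker exits early, e.g. after take()

# If the reader thread touches the dead socket first, there is no error:
# recv() just returns b'' (end of stream).
print("reader saw:", jvm_side.recv(4096))

# If the writer thread touches it first instead, it raises an OSError
# (BrokenPipeError / ConnectionResetError), the analog of the
# "Connection reset" seen in the JVM writer thread.
try:
    while True:
        jvm_side.sendall(b"x" * 4096)
except OSError as exc:
    print("writer saw:", exc)
```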

Contributor Author

davies commented Oct 25, 2014

It's not easy to reproduce this failure, but it did fail on Jenkins:

======================================================================
ERROR: test_reuse_worker_after_take (__main__.WorkerTests)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "pyspark/tests.py", line 1437, in test_reuse_worker_after_take
    self.assertEqual(0, rdd.first())
  File "/home/jenkins/workspace/SparkPullRequestBuilder/python/pyspark/rdd.py", line 1109, in first
    rs = self.take(1)
  File "/home/jenkins/workspace/SparkPullRequestBuilder/python/pyspark/rdd.py", line 1091, in take
    res = self.context.runJob(self, takeUpToNumLeft, p, True)
  File "/home/jenkins/workspace/SparkPullRequestBuilder/python/pyspark/context.py", line 796, in runJob
    it = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, javaPartitions, allowLocal)
  File "/home/jenkins/workspace/SparkPullRequestBuilder/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
    self.target_id, self.name)
  File "/home/jenkins/workspace/SparkPullRequestBuilder/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
    format(target_id, '.', name), value)
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.net.SocketException: Connection reset
        java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:118)
        java.net.SocketOutputStream.write(SocketOutputStream.java:159)
        java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
        java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
        java.io.DataOutputStream.write(DataOutputStream.java:107)
        java.io.FilterOutputStream.write(FilterOutputStream.java:97)
        org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:395)
        org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:393)
        scala.collection.Iterator$class.foreach(Iterator.scala:727)
        scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:393)
        org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:243)
        org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:204)
        org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:204)
        org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1416)
        org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:203)
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1191)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1180)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1179)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1179)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:694)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:694)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:694)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1397)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1352)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
    at akka.dispatch.Mailbox.run(Mailbox.scala:220)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Contributor Author

davies commented Oct 25, 2014

Also, I cannot reproduce this without daemon.py (i.e., simulating the behavior on Windows).

@JoshRosen
Contributor

This fix looks good to me, so I'm going to merge it in to unblock the other PRs (I ran the tests locally and it doesn't look like it introduced any regressions). We can revisit this if we run into issues.


SparkQA commented Oct 25, 2014

Test build #22202 has finished for PR 2941 at commit 9d4d21e.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22202/
Test FAILed.

@asfgit asfgit closed this in e41786c Oct 25, 2014