Skip to content
Permalink
Browse files

[SPARK-29885][PYTHON][CORE] Improve the exception message when readin…

…g the daemon port

### What changes were proposed in this pull request?
In production environment, my PySpark application occurs an exception and it's message as below:
```
19/10/28 16:15:03 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
org.apache.spark.SparkException: No port number in pyspark.daemon's stdout
	at org.apache.spark.api.python.PythonWorkerFactory.startDaemon(PythonWorkerFactory.scala:204)
	at org.apache.spark.api.python.PythonWorkerFactory.createThroughDaemon(PythonWorkerFactory.scala:122)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:95)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:117)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:108)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:337)
	at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:335)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1182)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
	at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
```
At first, I think a physical node has many ports are occupied by a large number of processes.
But I found the total number of ports in use is only 671.
```
[yarnr1115 ~]$ netstat -a | wc -l 671
671
```
I  checked the code of PythonWorkerFactory in line 204 and found:
```
 daemon = pb.start()
 val in = new DataInputStream(daemon.getInputStream)
 try {
 daemonPort = in.readInt()
 } catch {
 case _: EOFException =>
 throw new SparkException(s"No port number in $daemonModule's stdout")
 }
```
I added some code here:
```
logError("Meet EOFException, daemon is alive: ${daemon.isAlive()}")
logError("Exit value: ${daemon.exitValue()}")
```
Then I recurrent the exception and it's message as below:
```
19/10/28 16:15:03 ERROR PythonWorkerFactory: Meet EOFException, daemon is alive: false
19/10/28 16:15:03 ERROR PythonWorkerFactory: Exit value: 139
19/10/28 16:15:03 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
org.apache.spark.SparkException: No port number in pyspark.daemon's stdout
 at org.apache.spark.api.python.PythonWorkerFactory.startDaemon(PythonWorkerFactory.scala:206)
 at org.apache.spark.api.python.PythonWorkerFactory.createThroughDaemon(PythonWorkerFactory.scala:122)
 at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:95)
 at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:117)
 at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:108)
 at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
 at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:337)
 at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:335)
 at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1182)
 at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
 at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
 at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
 at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
 at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
 at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
 at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
 at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
 at org.apache.spark.scheduler.Task.run(Task.scala:121)
 at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
 at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.java:745)
```
I think the exception message has caused me a lot of confusion.
This PR will add meaningful log for exception information.

### Why are the changes needed?
In order to clarify the exception and try three times default.

### Does this PR introduce any user-facing change?
No.

### How was this patch tested?
Exists UT.

Closes #26510 from beliefer/improve-except-message.

Authored-by: gengjiaan <gengjiaan@360.cn>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
  • Loading branch information
beliefer authored and HyukjinKwon committed Nov 21, 2019
1 parent affaefe commit 85c004d5b0303435dc207e139cdc51f0f2d3e160
Showing with 6 additions and 1 deletion.
  1. +6 −1 core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
@@ -212,8 +212,13 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
try {
daemonPort = in.readInt()
} catch {
case _: EOFException if daemon.isAlive =>
throw new SparkException("EOFException occurred while reading the port number " +
s"from $daemonModule's stdout")
case _: EOFException =>
throw new SparkException(s"No port number in $daemonModule's stdout")
throw new SparkException(
s"EOFException occurred while reading the port number from $daemonModule's" +
s" stdout and terminated with code: ${daemon.exitValue}.")
}

// test that the returned port number is within a valid range.

0 comments on commit 85c004d

Please sign in to comment.
You can’t perform that action at this time.