SPARK-1700: Close socket file descriptors on task completion
This will ensure that sockets do not build up over the course of a job, and that cancellation successfully cleans up sockets.

Tested in standalone mode. More file descriptors are spawned than expected (around 1,000 rather than the expected ~8), but they do not pile up between runs or climb as high as before (previously around 5,000).
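(Aside, not part of the original commit message: a minimal sketch of how one might count a JVM's open file descriptors to verify claims like the above. It assumes Linux, where /proc/self/fd lists the calling process's descriptors; the FdCount object name is hypothetical.)

// Sketch: count this JVM's open file descriptors (Linux-only, via /proc).
object FdCount extends App {
  val fdDir = new java.io.File("/proc/self/fd")
  val count = Option(fdDir.list()).map(_.length).getOrElse(-1)
  println(s"Open file descriptors: $count")
}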

Author: Aaron Davidson <aaron@databricks.com>

Closes #623 from aarondav/pyspark2 and squashes the following commits:

0ca13bb [Aaron Davidson] SPARK-1700: Close socket file descriptors on task completion
aarondav committed May 3, 2014
1 parent 2b961d8 commit 0a14421
Showing 1 changed file with 10 additions and 1 deletion.
core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala: 10 additions & 1 deletion
@@ -54,7 +54,16 @@ private[spark] class PythonRDD[T: ClassTag](
   override def compute(split: Partition, context: TaskContext): Iterator[Array[Byte]] = {
     val startTime = System.currentTimeMillis
     val env = SparkEnv.get
-    val worker = env.createPythonWorker(pythonExec, envVars.toMap)
+    val worker: Socket = env.createPythonWorker(pythonExec, envVars.toMap)
+
+    // Ensure worker socket is closed on task completion. Closing sockets is idempotent.
+    context.addOnCompleteCallback(() =>
+      try {
+        worker.close()
+      } catch {
+        case e: Exception => logWarning("Failed to close worker socket", e)
+      }
+    )

     @volatile var readerException: Exception = null

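For readers unfamiliar with this Spark 1.0-era API, the pattern is: register a cleanup callback on the TaskContext so the worker socket is closed whether the task finishes normally or is cancelled, relying on Socket.close() being safe to call more than once. Below is a minimal, self-contained sketch of that idea; ToyTaskContext and Demo are hypothetical stand-ins, not Spark's implementation.

import java.net.Socket
import scala.collection.mutable.ArrayBuffer

class ToyTaskContext {
  private val callbacks = ArrayBuffer.empty[() => Unit]

  // Mirrors the shape of the Spark 1.0-era TaskContext.addOnCompleteCallback.
  def addOnCompleteCallback(f: () => Unit): Unit = callbacks += f

  // Invoked by the runtime when the task completes or is cancelled.
  def markTaskCompleted(): Unit = callbacks.foreach { f =>
    try f() catch { case e: Exception => println(s"Cleanup failed: $e") }
  }
}

object Demo extends App {
  val context = new ToyTaskContext
  val worker = new Socket() // unconnected socket standing in for the Python worker
  context.addOnCompleteCallback(() => worker.close())

  worker.close()              // socket may already be closed on the normal path...
  context.markTaskCompleted() // ...and the callback's second close() is a harmless no-op
}

Because close() is idempotent, the callback does not need to know whether the task's normal code path already closed the socket; the try/catch in the patch only guards against unexpected I/O failures during cleanup.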
