Skip to content

Commit

Permalink
Detect failed fork() calls; improve error logging.
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshRosen committed Aug 1, 2014
1 parent 282c2c4 commit b79254d
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.spark.api.python

import java.io.{DataInputStream, InputStream, OutputStreamWriter}
import java.net.{InetAddress, ServerSocket, Socket, SocketException}
import java.net._

import scala.collection.JavaConversions._

Expand Down Expand Up @@ -64,10 +64,16 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String

// Attempt to connect, restart and retry once if it fails
try {
new Socket(daemonHost, daemonPort)
val socket = new Socket(daemonHost, daemonPort)
val launchStatus = new DataInputStream(socket.getInputStream).readInt()
if (launchStatus != 0) {
logWarning("Python daemon failed to launch worker")
}
socket
} catch {
case exc: SocketException =>
logWarning("Python daemon unexpectedly quit, attempting to restart")
logWarning("Failed to open socket to Python daemon:", exc)
logWarning("Assuming that daemon unexpectedly quit, attempting to restart")
stopDaemon()
startDaemon()
new Socket(daemonHost, daemonPort)
Expand Down
28 changes: 19 additions & 9 deletions python/pyspark/daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ def waitSocketClose(sock):
outfile = os.fdopen(os.dup(sock.fileno()), "a+", 65536)
exit_code = 0
try:
write_int(0, outfile) # Acknowledge that the fork was successful
outfile.flush()
worker_main(infile, outfile)
except SystemExit as exc:
exit_code = exc.code
Expand Down Expand Up @@ -128,16 +130,24 @@ def handle_sigchld(*args):
if listen_sock in ready_fds:
sock, addr = listen_sock.accept()
# Launch a worker process
if os.fork() == 0:
listen_sock.close()
try:
worker(sock)
except:
traceback.print_exc()
os._exit(1)
try:
fork_return_code = os.fork()
if fork_return_code == 0:
listen_sock.close()
try:
worker(sock)
except:
traceback.print_exc()
os._exit(1)
else:
os._exit(0)
else:
os._exit(0)
else:
sock.close()
except OSError as e:
print >> sys.stderr, "Daemon failed to fork PySpark worker: %s" % e
outfile = os.fdopen(os.dup(sock.fileno()), "a+", 65536)
write_int(-1, outfile) # Signal that the fork failed
outfile.flush()
sock.close()
finally:
shutdown(1)
Expand Down

0 comments on commit b79254d

Please sign in to comment.