Root cause of "CallbackServer shutdown prevents Python program from exiting" #117

Closed
jason-ni opened this Issue Feb 5, 2013 · 5 comments

jason-ni commented Feb 5, 2013

I think I found the root cause of issue #47. I was testing the callback feature of py4j today and found that the process does not exit. After some debugging, I observed that the callback server thread is blocked on "self.server_socket.accept()".

I modified two places in the CallbackServer code:

  1. In the "run" method, I use select to detect socket state changes. The idea is from http://stackoverflow.com/questions/5308080/python-socket-accept-nonblocking
  2. In the "shutdown" method, "self.server_socket.shutdown(socket.SHUT_RDWR)" raises an exception because the Java process has already shut down and the client socket has already been closed. So we need to catch it and continue on to close the socket.
class CallbackServer(object):

    #...

    def run(self):
        """Starts listening and accepting connection requests.

           This method is called when invoking `CallbackServer.start()`. A
           CallbackServer instance is created and started automatically when
           a :class:`JavaGateway <py4j.java_gateway.JavaGateway>` instance is
           created.
        """
        try:
            with self.lock:
                self.is_shutdown = False
            logger.info('Callback Server Starting')
            self.server_socket.listen(5)
            logger.info('Socket listening on {0}'.
                    format(smart_decode(self.server_socket.getsockname())))

            read_list = [self.server_socket]
            while not self.is_shutdown:
                readable, writable, errored = select.select(read_list, [], [])
                if self.is_shutdown:
                    break
                for s in readable:
                    socket_instance, _ = self.server_socket.accept()
                    input = socket_instance.makefile('rb', 0)
                    connection = CallbackConnection(self.pool, input,
                            socket_instance,
                            self.gateway_client)
                    with self.lock:
                        if not self.is_shutdown:
                            self.connections.append(connection)
                            connection.start()
                        else:
                            connection.socket.shutdown(socket.SHUT_RDWR)
                            connection.socket.close()
        except Exception:
            if self.is_shutdown:
                logger.info('Error while waiting for a connection.')
            else:
                logger.exception('Error while waiting for a connection.')

    def shutdown(self):
        """Stops listening and accepting connection requests. All live
           connections are closed.

           This method can safely be called by another thread.
        """
        logger.info('Callback Server Shutting Down')
        with self.lock:
            self.is_shutdown = True
            try:
                try:
                    self.server_socket.shutdown(socket.SHUT_RDWR)
                except Exception:
                    # Socket error 10057 (socket not connected) observed.
                    # Continue so that the socket is still closed below.
                    pass
                self.server_socket.close()
                self.server_socket = None
            except Exception:
                pass

            for connection in self.connections:
                try:
                    connection.socket.shutdown(socket.SHUT_RDWR)
                    connection.socket.close()
                except Exception:
                    pass

            self.pool.clear()
        self.thread.join()
        self.thread = None
Owner

bartdag commented Feb 6, 2013

Hi Jason, thanks for your help!

select() is supposed to block if we don't provide a timeout, so we probably need a timeout (something configurable by the user I guess).

Thanks for the catch about the except: the "except Exception: pass" was there for that purpose, but I had missed that server_socket.close() was not executed.
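
The point about close() being skipped can be illustrated with a minimal sketch. It assumes Python 3, and `close_listening_socket` is a hypothetical helper name, not part of py4j. Whether shutdown() raises on a listening socket depends on the platform, so the sketch only guarantees that close() runs either way.

```python
import socket


def close_listening_socket(sock):
    """Shut down and close `sock`, closing it even if shutdown() fails.

    Hypothetical helper for illustration; not part of py4j.
    """
    try:
        # May raise (for example "socket not connected"), depending on
        # the platform and on the state of the socket.
        sock.shutdown(socket.SHUT_RDWR)
    except OSError:
        pass  # Ignore the error; the finally clause still closes the socket.
    finally:
        sock.close()


server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(("127.0.0.1", 0))  # Port 0 lets the OS pick a free port.
server.listen(5)
close_listening_socket(server)
closed_fileno = server.fileno()  # A closed socket reports -1.
```

Putting close() in a `finally` clause keeps the two calls independent, so a failure in shutdown() cannot prevent the descriptor from being released.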

Did you try that code on your end? Did it work? I haven't encountered issue #47 for a while so I just want to be sure that this really fixes the problem before moving on.

jason-ni commented Feb 7, 2013

The reason to use select() is that socket.accept() is not interrupted when socket.close() is invoked, so the callback server thread blocks there and prevents the main process from exiting. Another solution is to create a socket and connect to server_socket in the "shutdown" method, after the call to server_socket.close(). That also wakes up the blocked "accept" call. I tested both solutions and both work.
I found issue #47 when I ran the example code for the callback feature in the documentation. It can be reproduced every time.
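
The second approach (waking up accept() by connecting to the server's own socket) could look roughly like the sketch below. It assumes Python 3, and `WakeableAcceptor` is a hypothetical stand-in, not py4j's CallbackServer. As a deliberate variation on the ordering described above, this sketch connects before closing the listening socket, which avoids depending on how a platform treats a socket that is closed while another thread is blocked on it.

```python
import socket
import threading


class WakeableAcceptor(object):
    """Hypothetical stand-in for a callback server; not py4j code."""

    def __init__(self):
        self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.server_socket.bind(("127.0.0.1", 0))  # OS picks a free port.
        self.server_socket.listen(5)
        self.address = self.server_socket.getsockname()
        self.is_shutdown = False
        self.thread = threading.Thread(target=self.run)

    def start(self):
        self.thread.start()

    def run(self):
        while not self.is_shutdown:
            # Blocks until some client connects.
            conn, _ = self.server_socket.accept()
            # A real server would hand the connection off here unless
            # it is shutting down; this sketch simply closes it.
            conn.close()

    def shutdown(self):
        self.is_shutdown = True
        # Connect to ourselves so the blocked accept() returns and the
        # loop gets a chance to see the shutdown flag.
        wakeup = socket.create_connection(self.address, timeout=5)
        wakeup.close()
        self.thread.join(timeout=5)
        self.server_socket.close()


server = WakeableAcceptor()
server.start()
server.shutdown()
thread_stopped = not server.thread.is_alive()
```

Because the flag is set before the wake-up connection is made, the loop exits whether the thread was already blocked in accept() or had not reached it yet.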

Owner

bartdag commented Feb 7, 2013

Hi Jason,

I understand the problem with the current code, and I agree that select seems a better alternative than the others (e.g., using thread.daemon = True). My only concern is that calling select without a timeout value is a blocking operation. From the Python documentation:

"When the timeout argument is omitted the function blocks until at least one file descriptor is ready. A time-out value of zero specifies a poll and never blocks."

But thanks for the clarification. If it works for you, I'll implement it, but I'll add a timeout to the select().
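
A select() loop with a timeout might be sketched as follows. This assumes Python 3; `accept_loop`, `stop_event`, and the 0.1 second default are hypothetical choices for illustration, not what py4j actually uses.

```python
import select
import socket
import threading


def accept_loop(server_socket, stop_event, timeout=0.1):
    """Accept connections until `stop_event` is set.

    Hypothetical helper; `timeout` is how long each select() call waits
    before the loop re-checks the stop flag.
    """
    accepted = 0
    while not stop_event.is_set():
        # Returns empty lists if nothing is ready within `timeout` seconds.
        readable, _, _ = select.select([server_socket], [], [], timeout)
        for sock in readable:
            conn, _ = sock.accept()
            conn.close()  # A real server would hand this connection off.
            accepted += 1
    return accepted


server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(("127.0.0.1", 0))  # Port 0 lets the OS pick a free port.
server.listen(5)

stop = threading.Event()
worker = threading.Thread(target=accept_loop, args=(server, stop))
worker.start()

stop.set()  # Request shutdown; the loop notices within one timeout.
worker.join(timeout=5)
loop_exited = not worker.is_alive()
server.close()
```

The trade-off is latency against idle wake-ups: a shorter timeout makes shutdown faster but wakes the thread more often while nothing is happening.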

@bartdag bartdag pushed a commit that referenced this issue Feb 10, 2013

Barthelemy Dagenais more robust python close and shutdown. refs #117 e83f8e6
Owner

bartdag commented Feb 10, 2013

Ok, I found out why select() works without specifying a timeout. Essentially, when the socket is closed in shutdown(), select raises this exception:

Traceback (most recent call last):
  File "/Users/barthelemy/projects/py4j/py4j-python/src/py4j/java_gateway.py", line 1021, in run
    readable, writable, errored = select.select(read_list, [], [])
error: (9, 'Bad file descriptor')

Thanks for your patch! I'm running the tests to see if everything is green and I will push the change.

@bartdag bartdag pushed a commit that referenced this issue Feb 10, 2013

Barthelemy Dagenais Use select() before accept() to prevent infinite blocking. refs #117 34ca26a

@ghost ghost assigned bartdag Feb 10, 2013

@bartdag bartdag pushed a commit that referenced this issue Feb 10, 2013

Barthelemy Dagenais Added default timeout for linux (macosx and presumably windows do not…
… need this). refs #117
fa8495b
Owner

bartdag commented Feb 10, 2013

Alright, on MacOSX (BSD?) and presumably Windows, select() raises an exception if the underlying socket is shut down/closed, but on Linux, it waits indefinitely. Therefore, I added a timeout, which makes the tests slow on Linux. Please reopen the issue if the problem is still present.

@bartdag bartdag closed this Feb 10, 2013

@bartdag bartdag pushed a commit that referenced this issue May 27, 2016

Barthelemy Dagenais more robust python close and shutdown. refs #117 c2c57ce

@bartdag bartdag pushed a commit that referenced this issue May 27, 2016

Barthelemy Dagenais Use select() before accept() to prevent infinite blocking. refs #117 cdf2b6f

@bartdag bartdag pushed a commit that referenced this issue May 27, 2016

Barthelemy Dagenais Added default timeout for linux (macosx and presumably windows do not…
… need this). refs #117
6d1c518