Skip to content

Commit

Permalink
Better handling for process exit
Browse files Browse the repository at this point in the history
  • Loading branch information
fafhrd91 committed Feb 12, 2014
1 parent e36f443 commit 546e89f
Show file tree
Hide file tree
Showing 6 changed files with 62 additions and 20 deletions.
6 changes: 6 additions & 0 deletions CHANGES.txt
@@ -1,6 +1,12 @@
CHANGES
=======

0.6.0 (02-12-2014)
------------------

- Better handling for process exit.


0.5.0 (01-29-2014)
------------------

Expand Down
17 changes: 15 additions & 2 deletions aiohttp/server.py
Expand Up @@ -66,6 +66,19 @@ def __init__(self, *, loop=None,
self.debug = debug
self.access_log = access_log
self.access_log_format = access_log_format
self.transport = None

def closing(self):
"""Worker process is about to exit, we need cleanup everything and
stop accepting requests. It is especially important for keep-alive
connections."""
self._keep_alive = False
if self._keep_alive_handle is not None:
self._keep_alive_handle.cancel()
self._keep_alive_handle = None

if self._request_handler is None and self.transport is not None:
self.transport.close()

def connection_made(self, transport):
self.transport = transport
Expand Down Expand Up @@ -152,8 +165,8 @@ def start(self):
if self._request_handler:
if self._keep_alive and self._keep_alive_period:
self.log_debug(
'Start keep-alive timer for %s sec.',
self._keep_alive_period)
'Start keep-alive timer for %s sec.',
self._keep_alive_period)
self._keep_alive_handle = self._loop.call_later(
self._keep_alive_period, self.transport.close)
else:
Expand Down
33 changes: 20 additions & 13 deletions aiohttp/worker.py
Expand Up @@ -35,9 +35,9 @@ def run(self):

def wrap_protocol(self, proto):
proto.connection_made = _wrp(
id(proto), proto.connection_made, self.connections)
proto, proto.connection_made, self.connections)
proto.connection_lost = _wrp(
id(proto), proto.connection_lost, self.connections, False)
proto, proto.connection_lost, self.connections, False)
return proto

def factory(self, wsgi, host, port):
Expand Down Expand Up @@ -85,13 +85,18 @@ def add_server(t):
self.alive = False

# stop accepting requests
if not self.alive and self.servers:
self.log.info(
"Stopping server: %s, connections: %s",
pid, len(self.connections))
for server in self.servers:
server.close()
self.servers.clear()
if not self.alive:
if self.servers:
self.log.info(
"Stopping server: %s, connections: %s",
pid, len(self.connections))
for server in self.servers:
server.close()
self.servers.clear()

# prepare connections for closing
for conn in self.connections.values():
conn.closing()

yield from asyncio.sleep(1.0, loop=self.loop)
except KeyboardInterrupt:
Expand Down Expand Up @@ -125,16 +130,18 @@ def close(self):

class _wrp:

def __init__(self, id, meth, tracking, add=True):
self._id = id
def __init__(self, proto, meth, tracking, add=True):
self._proto = proto
self._id = id(proto)
self._meth = meth
self._tracking = tracking
self._add = add

def __call__(self, *args):
if self._add:
self._tracking[self._id] = 1
self._tracking[self._id] = self._proto
elif self._id in self._tracking:
del self._tracking[self._id]

return self._meth(*args)
conn = self._meth(*args)
return conn
2 changes: 1 addition & 1 deletion setup.py
Expand Up @@ -2,7 +2,7 @@
import sys
from setuptools import setup, find_packages

version = '0.5.0'
version = '0.6.0'

if sys.version_info >= (3,4):
install_requires = []
Expand Down
13 changes: 13 additions & 0 deletions tests/http_server_test.py
Expand Up @@ -37,6 +37,19 @@ def test_handle_request(self):
content = b''.join([c[1][0] for c in list(transport.write.mock_calls)])
self.assertTrue(content.startswith(b'HTTP/1.1 404 Not Found\r\n'))

def test_closing(self):
srv = server.ServerHttpProtocol(loop=self.loop)
srv._keep_alive = True

keep_alive_handle = unittest.mock.Mock()
srv._keep_alive_handle = keep_alive_handle
srv.transport = unittest.mock.Mock()

srv.closing()
self.assertIsNone(srv._keep_alive_handle)
self.assertTrue(srv.transport.close.called)
self.assertTrue(keep_alive_handle.cancel.called)

def test_connection_made(self):
srv = server.ServerHttpProtocol(loop=self.loop)
self.assertIsNone(srv._request_handler)
Expand Down
11 changes: 7 additions & 4 deletions tests/worker_test.py
Expand Up @@ -75,10 +75,11 @@ def test__run(self, m_asyncio):
self.assertTrue(self.worker.notify.called)

def test__run_connections(self):
conn = unittest.mock.Mock()
self.worker.ppid = 1
self.worker.alive = False
self.worker.servers = [unittest.mock.Mock()]
self.worker.connections = {1: object()}
self.worker.connections = {1: conn}
self.worker.sockets = []
self.worker.wsgi = unittest.mock.Mock()
self.worker.log = unittest.mock.Mock()
Expand All @@ -96,6 +97,7 @@ def _close_conns():
self.assertTrue(self.worker.log.info.called)
self.assertTrue(self.worker.notify.called)
self.assertFalse(self.worker.servers)
self.assertTrue(conn.closing.called)

@unittest.mock.patch('aiohttp.worker.os')
@unittest.mock.patch('aiohttp.worker.asyncio.sleep')
Expand Down Expand Up @@ -189,16 +191,17 @@ def __init__(self, wsgi):
self.assertTrue(wsgi[2].close.called)

def test_wrp(self):
conn = object()
tracking = {}
meth = unittest.mock.Mock()
wrp = worker._wrp(1, meth, tracking)
wrp = worker._wrp(conn, meth, tracking)
wrp()

self.assertIn(1, tracking)
self.assertIn(id(conn), tracking)
self.assertTrue(meth.called)

meth = unittest.mock.Mock()
wrp = worker._wrp(1, meth, tracking, False)
wrp = worker._wrp(conn, meth, tracking, False)
wrp()

self.assertNotIn(1, tracking)
Expand Down

0 comments on commit 546e89f

Please sign in to comment.