Restore old SIGPIPE handler in a proxy server test.

I think not doing this left the SIGPIPE handler set to signal.SIG_DFL
instead of the Python default of signal.SIG_IGN. This could cause other
tests, in which a client stops reading before all data "chunks" are read,
to fail harder than they should (i.e. the SIGPIPE there is benign and even
expected; the other side of the socket really did get closed early).
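
A minimal sketch of the save-and-restore pattern the fix applies (standard
signal module; the handler body and test scaffolding are illustrative):

    import signal

    def handler(signum, frame):
        pass  # test instrumentation would go here

    # signal.signal() returns whatever handler was installed before, so the
    # test can put back exactly that instead of guessing SIG_DFL afterwards.
    old_handler = signal.signal(signal.SIGPIPE, handler)
    try:
        pass  # exercise the client and server under the temporary handler
    finally:
        # On CPython this restores signal.SIG_IGN, the interpreter's startup
        # default for SIGPIPE, rather than SIG_DFL.
        signal.signal(signal.SIGPIPE, old_handler)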

Fixed leak on 499s

This fixes an issue where Request objects (and related objects) were
not getting garbage collected when a 499 (client disconnect) occurred
for responses that still would have had more than the proxy server's
client chunk size left to send.
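
A minimal sketch of the kind of cycle involved (illustrative names, not the
commit's code): the response reaches the generator through app_iter, while
the generator holds the response it was handed for cleanup. On CPython 2, a
cycle running through a generator that still has a pending finally block may
never be collected, so each such aborted response could strand its Request
graph.

    class Response(object):
        app_iter = None

    def make_app_iter(response):
        try:
            yield 'chunk'
        finally:
            # Needing the response here forces the generator to keep a
            # reference to it for its entire lifetime.
            response.app_iter = None

    res = Response()
    res.app_iter = make_app_iter(res)  # cycle: res -> generator -> res

Not passing the response into the generator, as this commit does, means
there is no back-reference, and nothing cyclic is left behind on an early
disconnect.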

Fixed bug #1055834

Change-Id: I40266a0874cd2142c90f26b9f030d534286fc6da
dbishop authored and notmyname committed Sep 25, 2012
1 parent 50d72a1 commit 7368af5
Showing 4 changed files with 92 additions and 33 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG
@@ -1,3 +1,7 @@
+swift (1.7.4)
+
+    * Fix issue where early client disconnects may have caused a memory leak
+
 swift (1.7.2)
 
     * Fix issue where memcache serialization was not properly loading
2 changes: 1 addition & 1 deletion swift/__init__.py
@@ -14,7 +14,7 @@ def pretty_version(self):
         return '%s-dev' % (self.canonical_version,)
 
 
-_version = Version('1.7.2', True)
+_version = Version('1.7.4', True)
 __version__ = _version.pretty_version
 __canonical_version__ = _version.canonical_version
 
55 changes: 25 additions & 30 deletions swift/proxy/controllers/base.py
@@ -505,45 +505,40 @@ def _make_app_iter_reader(self, node, source, queue, logger_thread_locals):
         if getattr(source, 'swift_conn', None):
             self.close_swift_conn(source)
 
-    def _make_app_iter(self, node, source, response):
+    def _make_app_iter(self, node, source):
         """
         Returns an iterator over the contents of the source (via its read
         func). There is also quite a bit of cleanup to ensure garbage
         collection works and the underlying socket of the source is closed.
 
-        :param response: The webob.Response object this iterator should be
-                         assigned to via response.app_iter.
         :param source: The httplib.Response object this iterator should read
                        from.
         :param node: The node the source is reading from, for logging purposes.
         """
         try:
-            try:
-                # Spawn reader to read from the source and place in the queue.
-                # We then drop any reference to the source or node, for garbage
-                # collection purposes.
-                queue = Queue(1)
-                spawn_n(self._make_app_iter_reader, node, source, queue,
-                        self.app.logger.thread_locals)
-                source = node = None
-                while True:
-                    chunk = queue.get(timeout=self.app.node_timeout)
-                    if isinstance(chunk, bool):  # terminator
-                        success = chunk
-                        if not success:
-                            raise Exception(_('Failed to read all data'
-                                              ' from the source'))
-                        break
-                    yield chunk
-            except Empty:
-                raise ChunkReadTimeout()
-            except (GeneratorExit, Timeout):
-                self.app.logger.warn(_('Client disconnected on read'))
-            except Exception:
-                self.app.logger.exception(_('Trying to send to client'))
-                raise
-        finally:
-            response.app_iter = None
+            # Spawn reader to read from the source and place in the queue.
+            # We then drop any reference to the source or node, for garbage
+            # collection purposes.
+            queue = Queue(1)
+            spawn_n(self._make_app_iter_reader, node, source, queue,
+                    self.app.logger.thread_locals)
+            source = node = None
+            while True:
+                chunk = queue.get(timeout=self.app.node_timeout)
+                if isinstance(chunk, bool):  # terminator
+                    success = chunk
+                    if not success:
+                        raise Exception(_('Failed to read all data'
+                                          ' from the source'))
+                    break
+                yield chunk
+        except Empty:
+            raise ChunkReadTimeout()
+        except (GeneratorExit, Timeout):
+            self.app.logger.warn(_('Client disconnected on read'))
+        except Exception:
+            self.app.logger.exception(_('Trying to send to client'))
+            raise
 
     def close_swift_conn(self, src):
         try:
@@ -656,7 +651,7 @@ def GETorHEAD_base(self, req, server_type, partition, nodes, path,
                 self.close_swift_conn(src)
 
         res = Response(request=req, conditional_response=True)
-        res.app_iter = self._make_app_iter(node, source, res)
+        res.app_iter = self._make_app_iter(node, source)
         # See NOTE: swift_conn at top of file about this.
         res.swift_conn = source.swift_conn
         update_headers(res, source.getheaders())
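
The surviving body of _make_app_iter above is a bounded producer/consumer
hand-off: a reader greenthread pushes chunks into a one-slot queue, and a
boolean terminator marks success or failure. A rough threading analogue of
the same shape (a sketch only; Swift itself uses eventlet's spawn_n and
Queue as shown):

    from Queue import Queue, Empty  # Python 2 spelling; 'queue' in Python 3
    from threading import Thread

    def reader(source, q):
        # Producer: forward each chunk, then a boolean terminator.
        try:
            for chunk in source:
                q.put(chunk)
            q.put(True)
        except Exception:
            q.put(False)

    def make_app_iter(source, timeout=10):
        q = Queue(1)  # bounded: the reader stays at most one chunk ahead
        t = Thread(target=reader, args=(source, q))
        t.daemon = True  # a stuck producer should not block interpreter exit
        t.start()
        source = None  # drop our reference, as the proxy code does above
        while True:
            chunk = q.get(timeout=timeout)  # raises Empty on timeout
            if isinstance(chunk, bool):  # terminator
                if not chunk:
                    raise Exception('failed to read all data from the source')
                break
            yield chunk

A production version also needs a way to unblock the reader when the
consumer goes away early; the sketch omits that, and the proxy's own
handling lives in _make_app_iter_reader, only partially shown here.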
64 changes: 62 additions & 2 deletions test/unit/proxy/test_server.py
@@ -58,9 +58,29 @@
 logging.getLogger().addHandler(logging.StreamHandler(sys.stdout))
 
 
+_request_instances = 0
+
+
+def request_init(self, *args, **kwargs):
+    global _request_instances
+    self._orig_init(*args, **kwargs)
+    _request_instances += 1
+
+
+def request_del(self):
+    global _request_instances
+    if self._orig_del:
+        self._orig_del()
+    _request_instances -= 1
+
+
 def setup():
     global _testdir, _test_servers, _test_sockets, \
         _orig_container_listing_limit, _test_coros
+    Request._orig_init = Request.__init__
+    Request.__init__ = request_init
+    Request._orig_del = getattr(Request, '__del__', None)
+    Request.__del__ = request_del
     monkey_patch_mimetools()
     # Since we're starting up a lot here, we're going to test more than
     # just chunked puts; we're also going to test parts of
@@ -152,6 +172,9 @@ def teardown():
     swift.proxy.controllers.obj.CONTAINER_LISTING_LIMIT = \
         _orig_container_listing_limit
     rmtree(os.path.dirname(_testdir))
+    Request.__init__ = Request._orig_init
+    if Request._orig_del:
+        Request.__del__ = Request._orig_del
 
 
 def fake_http_connect(*code_iter, **kwargs):
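
The instrumentation added in setup() above is a generic live-instance
counter: wrap __init__ to increment, install a __del__ to decrement, and
compare counts around an operation. A standalone sketch of the same idea
(hypothetical Widget class; it relies on CPython's prompt refcount-based
finalization):

    _instances = 0

    class Widget(object):
        pass

    _orig_init = Widget.__init__

    def counting_init(self, *args, **kwargs):
        global _instances
        _orig_init(self, *args, **kwargs)
        _instances += 1

    def counting_del(self):
        global _instances
        _instances -= 1

    Widget.__init__ = counting_init
    Widget.__del__ = counting_del

    w = Widget()
    assert _instances == 1
    del w  # refcount hits zero, __del__ runs immediately on CPython
    assert _instances == 0  # a leaked reference would leave this at 1

Since __del__ only fires deterministically under reference counting, a leak
check like this is CPython-specific, and restoring the original methods in
teardown() keeps the patch from bleeding into other tests.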
@@ -711,8 +734,8 @@ def test_GET_newest_large_file(self):
        def handler(_junk1, _junk2):
            calls[0] += 1
 
+        old_handler = signal.signal(signal.SIGPIPE, handler)
         try:
-            signal.signal(signal.SIGPIPE, handler)
             prolis = _test_sockets[0]
             prosrv = _test_servers[0]
             sock = connect_tcp(('localhost', prolis.getsockname()[1]))
@@ -739,7 +762,7 @@ def handler(_junk1, _junk2):
             self.assertEqual(res.body, obj)
             self.assertEqual(calls[0], 0)
         finally:
-            signal.signal(signal.SIGPIPE, signal.SIG_DFL)
+            signal.signal(signal.SIGPIPE, old_handler)
 
     def test_PUT_auto_content_type(self):
         with save_globals():
@@ -3250,6 +3273,43 @@ def fake_connect_put_node(nodes, part, path, headers,
         self.assertEquals(resp.status_int, 400)
         self.assertTrue('X-Delete-At in past' in resp.body)
 
+    def test_leak_1(self):
+        global _request_instances
+        prolis = _test_sockets[0]
+        prosrv = _test_servers[0]
+        obj_len = prosrv.client_chunk_size * 2
+        # PUT test file
+        sock = connect_tcp(('localhost', prolis.getsockname()[1]))
+        fd = sock.makefile()
+        fd.write('PUT /v1/a/c/test_leak_1 HTTP/1.1\r\n'
+                 'Host: localhost\r\n'
+                 'Connection: close\r\n'
+                 'X-Auth-Token: t\r\n'
+                 'Content-Length: %s\r\n'
+                 'Content-Type: application/octet-stream\r\n'
+                 '\r\n%s' % (obj_len, 'a' * obj_len))
+        fd.flush()
+        headers = readuntil2crlfs(fd)
+        exp = 'HTTP/1.1 201'
+        self.assertEqual(headers[:len(exp)], exp)
+        # Remember Request instance count
+        before_request_instances = _request_instances
+        # GET test file, but disconnect early
+        sock = connect_tcp(('localhost', prolis.getsockname()[1]))
+        fd = sock.makefile()
+        fd.write('GET /v1/a/c/test_leak_1 HTTP/1.1\r\n'
+                 'Host: localhost\r\n'
+                 'Connection: close\r\n'
+                 'X-Auth-Token: t\r\n'
+                 '\r\n')
+        fd.flush()
+        headers = readuntil2crlfs(fd)
+        exp = 'HTTP/1.1 200'
+        self.assertEqual(headers[:len(exp)], exp)
+        fd.read(1)
+        fd.close()
+        sock.close()
+        self.assertEquals(before_request_instances, _request_instances)
+
 class TestContainerController(unittest.TestCase):
     "Test swift.proxy_server.ContainerController"