Skip to content

Commit

Permalink
Clear out RPC connection pool before exit.
Browse files Browse the repository at this point in the history
Fixes bug 767984.

This patch ensures that pooled connections to a messaging system get
cleaned up before a process that has used the RPC API exits.

Change-Id: I56eca54334075378534a7a5d3434c420319672b4
  • Loading branch information
russellb committed Jan 31, 2012
1 parent 59c0a72 commit bd32abf
Show file tree
Hide file tree
Showing 9 changed files with 43 additions and 0 deletions.
3 changes: 3 additions & 0 deletions bin/nova-dhcpbridge
Expand Up @@ -125,5 +125,8 @@ def main():

print init_leases(network_id)

rpc.cleanup()


if __name__ == "__main__":
main()
1 change: 1 addition & 0 deletions bin/nova-manage
Expand Up @@ -2354,6 +2354,7 @@ def main():
# call the action with the remaining arguments
try:
fn(*fn_args, **fn_kwargs)
rpc.cleanup()
sys.exit(0)
except TypeError:
print _("Possible wrong number of arguments supplied")
Expand Down
13 changes: 13 additions & 0 deletions nova/rpc/__init__.py
Expand Up @@ -139,6 +139,19 @@ def notify(context, topic, msg):
return _get_impl().notify(context, topic, msg)


def cleanup():
"""Clean up resoruces in use by implementation.
Clean up any resources that have been allocated by the RPC implementation.
This is typically open connections to a messaging service. This function
would get called before an application using this API exits to allow
connections to get torn down cleanly.
:returns: None
"""
return _get_impl().cleanup()


_RPCIMPL = None


Expand Down
9 changes: 9 additions & 0 deletions nova/rpc/amqp.py
Expand Up @@ -127,6 +127,11 @@ def __getattr__(self, key):
else:
raise exception.InvalidRPCConnectionReuse()

@classmethod
def empty_pool(cls):
while cls._connection_pool.free_items:
cls._connection_pool.get().close()


def msg_reply(msg_id, reply=None, failure=None, ending=False):
"""Sends a reply or an error on the channel signified by msg_id.
Expand Down Expand Up @@ -353,3 +358,7 @@ def notify(context, topic, msg):
pack_context(msg, context)
with ConnectionContext() as conn:
conn.notify_send(topic, msg)


def cleanup():
ConnectionContext.empty_pool()
4 changes: 4 additions & 0 deletions nova/rpc/impl_carrot.py
Expand Up @@ -635,6 +635,10 @@ def notify(context, topic, msg):
publisher.close()


def cleanup():
pass


def generic_response(message_data, message):
"""Logs a result and exits."""
LOG.debug(_('response %s'), message_data)
Expand Down
4 changes: 4 additions & 0 deletions nova/rpc/impl_fake.py
Expand Up @@ -136,6 +136,10 @@ def notify(context, topic, msg):
pass


def cleanup():
pass


def fanout_cast(context, topic, msg):
"""Cast to all consumers of a topic"""
method = msg.get('method')
Expand Down
4 changes: 4 additions & 0 deletions nova/rpc/impl_kombu.py
Expand Up @@ -618,3 +618,7 @@ def fanout_cast(context, topic, msg):
def notify(context, topic, msg):
"""Sends a notification event on a topic."""
return rpc_amqp.notify(context, topic, msg)


def cleanup():
return rpc_amqp.cleanup()
4 changes: 4 additions & 0 deletions nova/rpc/impl_qpid.py
Expand Up @@ -506,3 +506,7 @@ def fanout_cast(context, topic, msg):
def notify(context, topic, msg):
"""Sends a notification event on a topic."""
return rpc_amqp.notify(context, topic, msg)


def cleanup():
return rpc_amqp.cleanup()
1 change: 1 addition & 0 deletions nova/service.py
Expand Up @@ -414,3 +414,4 @@ def wait():
_launcher.wait()
except KeyboardInterrupt:
_launcher.stop()
rpc.cleanup()

0 comments on commit bd32abf

Please sign in to comment.