diff --git a/bin/nova-dhcpbridge b/bin/nova-dhcpbridge index e5ce64c1162..ae77fc7b1db 100755 --- a/bin/nova-dhcpbridge +++ b/bin/nova-dhcpbridge @@ -125,5 +125,8 @@ def main(): print init_leases(network_id) + rpc.cleanup() + + if __name__ == "__main__": main() diff --git a/bin/nova-manage b/bin/nova-manage index 1275d72d99d..b899dab40e9 100755 --- a/bin/nova-manage +++ b/bin/nova-manage @@ -2354,6 +2354,7 @@ def main(): # call the action with the remaining arguments try: fn(*fn_args, **fn_kwargs) + rpc.cleanup() sys.exit(0) except TypeError: print _("Possible wrong number of arguments supplied") diff --git a/nova/rpc/__init__.py b/nova/rpc/__init__.py index db42640b042..1fbd9aead59 100644 --- a/nova/rpc/__init__.py +++ b/nova/rpc/__init__.py @@ -139,6 +139,19 @@ def notify(context, topic, msg): return _get_impl().notify(context, topic, msg) +def cleanup(): + """Clean up resoruces in use by implementation. + + Clean up any resources that have been allocated by the RPC implementation. + This is typically open connections to a messaging service. This function + would get called before an application using this API exits to allow + connections to get torn down cleanly. + + :returns: None + """ + return _get_impl().cleanup() + + _RPCIMPL = None diff --git a/nova/rpc/amqp.py b/nova/rpc/amqp.py index 92f1478b07c..4831008061c 100644 --- a/nova/rpc/amqp.py +++ b/nova/rpc/amqp.py @@ -127,6 +127,11 @@ def __getattr__(self, key): else: raise exception.InvalidRPCConnectionReuse() + @classmethod + def empty_pool(cls): + while cls._connection_pool.free_items: + cls._connection_pool.get().close() + def msg_reply(msg_id, reply=None, failure=None, ending=False): """Sends a reply or an error on the channel signified by msg_id. @@ -353,3 +358,7 @@ def notify(context, topic, msg): pack_context(msg, context) with ConnectionContext() as conn: conn.notify_send(topic, msg) + + +def cleanup(): + ConnectionContext.empty_pool() diff --git a/nova/rpc/impl_carrot.py b/nova/rpc/impl_carrot.py index 1dbec177d0a..5750e59899e 100644 --- a/nova/rpc/impl_carrot.py +++ b/nova/rpc/impl_carrot.py @@ -635,6 +635,10 @@ def notify(context, topic, msg): publisher.close() +def cleanup(): + pass + + def generic_response(message_data, message): """Logs a result and exits.""" LOG.debug(_('response %s'), message_data) diff --git a/nova/rpc/impl_fake.py b/nova/rpc/impl_fake.py index 9d7f867a2b5..dc30522b828 100644 --- a/nova/rpc/impl_fake.py +++ b/nova/rpc/impl_fake.py @@ -136,6 +136,10 @@ def notify(context, topic, msg): pass +def cleanup(): + pass + + def fanout_cast(context, topic, msg): """Cast to all consumers of a topic""" method = msg.get('method') diff --git a/nova/rpc/impl_kombu.py b/nova/rpc/impl_kombu.py index 600da4a9f65..e2c0b9036af 100644 --- a/nova/rpc/impl_kombu.py +++ b/nova/rpc/impl_kombu.py @@ -618,3 +618,7 @@ def fanout_cast(context, topic, msg): def notify(context, topic, msg): """Sends a notification event on a topic.""" return rpc_amqp.notify(context, topic, msg) + + +def cleanup(): + return rpc_amqp.cleanup() diff --git a/nova/rpc/impl_qpid.py b/nova/rpc/impl_qpid.py index 3ea921a8c4d..f4b6b9ffaa1 100644 --- a/nova/rpc/impl_qpid.py +++ b/nova/rpc/impl_qpid.py @@ -506,3 +506,7 @@ def fanout_cast(context, topic, msg): def notify(context, topic, msg): """Sends a notification event on a topic.""" return rpc_amqp.notify(context, topic, msg) + + +def cleanup(): + return rpc_amqp.cleanup() diff --git a/nova/service.py b/nova/service.py index 88ba1ef10bf..56f706c4bff 100644 --- a/nova/service.py +++ b/nova/service.py @@ -414,3 +414,4 @@ def wait(): _launcher.wait() except KeyboardInterrupt: _launcher.stop() + rpc.cleanup()