Skip to content

Commit

Permalink
make libvirt driver get_connection thread-safe
Browse files Browse the repository at this point in the history
libvirt driver's get_connection is not thread safe in the
presence of a libvirtd restart during concurrent incoming
requests.

With existing code each will in turn call get_connection,
find the connection is broken, try to create new one, block
for a while and yield to the next thread to do the same.
You get as many connections as there are incoming requests
and only the last one is used finally. If enough are incoming
these connections can exhaust the client pool configured
for libvirtd.
One fix is to hold a lock while creating the connection.
Note that has_min_version calls _conn which calls get_connection
and thus the direct call to _has_min_version()

Also added the exception text if it fails to register an event
handler for lifecycle events.

Change-Id: I090765802bfe443440f16722bc7c43b6280fe56a
Fixes: bug #1240905
  • Loading branch information
Tom Hancock committed Oct 23, 2013
1 parent eda3900 commit b2e64e3
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 20 deletions.
48 changes: 48 additions & 0 deletions nova/tests/virt/libvirt/test_libvirt.py
Expand Up @@ -5424,6 +5424,54 @@ def getLibVersion(self):
self.assertEqual('foo', conn.get_hypervisor_hostname())
self.assertEqual('foo', conn.get_hypervisor_hostname())

def test_get_connection_serial(self):

def get_conn_currency(driver):
driver._conn.getLibVersion()

def connect_with_block(*a, **k):
# enough to allow another connect to run
eventlet.sleep(0)
self.calls += 1
return self.conn

self.calls = 0
self.stubs.Set(libvirt_driver.LibvirtDriver,
'_connect', connect_with_block)
driver = libvirt_driver.LibvirtDriver(fake.FakeVirtAPI(), True)

# call serially
get_conn_currency(driver)
get_conn_currency(driver)
self.assertEqual(self.calls, 1)

def test_get_connection_concurrency(self):

def get_conn_currency(driver):
driver._conn.getLibVersion()

def connect_with_block(*a, **k):
# enough to allow another connect to run
eventlet.sleep(0)
self.calls += 1
return self.conn

self.calls = 0
self.stubs.Set(libvirt_driver.LibvirtDriver,
'_connect', connect_with_block)
driver = libvirt_driver.LibvirtDriver(fake.FakeVirtAPI(), True)

# call concurrently
thr1 = eventlet.spawn(get_conn_currency, driver=driver)
thr2 = eventlet.spawn(get_conn_currency, driver=driver)

# let threads run
eventlet.sleep(0)

thr1.wait()
thr2.wait()
self.assertEqual(self.calls, 1)


class HostStateTestCase(test.TestCase):

Expand Down
46 changes: 26 additions & 20 deletions nova/virt/libvirt/driver.py
Expand Up @@ -398,27 +398,31 @@ def set_cache_mode(self, conf):
driver_cache)
conf.driver_cache = cache_mode

def has_min_version(self, lv_ver=None, hv_ver=None, hv_type=None):
@staticmethod
def _has_min_version(conn, lv_ver=None, hv_ver=None, hv_type=None):
try:
if lv_ver is not None:
libvirt_version = self._conn.getLibVersion()
libvirt_version = conn.getLibVersion()
if libvirt_version < utils.convert_version_to_int(lv_ver):
return False

if hv_ver is not None:
hypervisor_version = self._conn.getVersion()
hypervisor_version = conn.getVersion()
if hypervisor_version < utils.convert_version_to_int(hv_ver):
return False

if hv_type is not None:
hypervisor_type = self._conn.getType()
hypervisor_type = conn.getType()
if hypervisor_type != hv_type:
return False

return True
except Exception:
return False

def has_min_version(self, lv_ver=None, hv_ver=None, hv_type=None):
return self._has_min_version(self._conn, lv_ver, hv_ver, hv_type)

def _native_thread(self):
"""Receives async events coming in from libvirtd.
Expand Down Expand Up @@ -577,18 +581,18 @@ def init_host(self, host):
self._init_events()

def _get_connection(self):
# multiple concurrent connections are protected by _wrapped_conn_lock
with self._wrapped_conn_lock:
wrapped_conn = self._wrapped_conn

if not wrapped_conn or not self._test_connection(wrapped_conn):
LOG.debug(_('Connecting to libvirt: %s'), self.uri())
if not CONF.libvirt_nonblocking:
wrapped_conn = self._connect(self.uri(), self.read_only)
else:
wrapped_conn = tpool.proxy_call(
(libvirt.virDomain, libvirt.virConnect),
self._connect, self.uri(), self.read_only)
with self._wrapped_conn_lock:
if not wrapped_conn or not self._test_connection(wrapped_conn):
LOG.debug(_('Connecting to libvirt: %s'), self.uri())
if not CONF.libvirt_nonblocking:
wrapped_conn = self._connect(self.uri(), self.read_only)
else:
wrapped_conn = tpool.proxy_call(
(libvirt.virDomain, libvirt.virConnect),
self._connect, self.uri(), self.read_only)
self._wrapped_conn = wrapped_conn

try:
Expand All @@ -599,19 +603,21 @@ def _get_connection(self):
libvirt.VIR_DOMAIN_EVENT_ID_LIFECYCLE,
self._event_lifecycle_callback,
self)
except Exception:
LOG.warn(_("URI %s does not support events"),
self.uri())
except Exception as e:
LOG.warn(_("URI %(uri)s does not support events: %(error)s"),
{'uri': self.uri(), 'error': e})

if self.has_min_version(MIN_LIBVIRT_CLOSE_CALLBACK_VERSION):
if self._has_min_version(wrapped_conn,
MIN_LIBVIRT_CLOSE_CALLBACK_VERSION):
try:
LOG.debug(_("Registering for connection events: %s") %
str(self))
wrapped_conn.registerCloseCallback(
self._close_callback, None)
except libvirt.libvirtError:
LOG.debug(_("URI %s does not support connection events"),
self.uri())
except libvirt.libvirtError as e:
LOG.warn(_("URI %(uri)s does not support connection"
" events: %(error)s"),
{'uri': self.uri(), 'error': e})

return wrapped_conn

Expand Down

0 comments on commit b2e64e3

Please sign in to comment.