Skip to content

Commit

Permalink
Fix concurrency of XenAPI sessions
Browse files Browse the repository at this point in the history
Fixes bug 879044

Nova currently does not serialize access to the XenAPI session which can
result in multiple (green)threads trying to use the same HTTP connection.
This will typically only affect Python 2.7 which has updated xmlrpclib to
try to use one HTTP connection for multiple requests.

Change-Id: I101d63b822c8bf8c28674a836e4b54aa8259e1a8
  • Loading branch information
Johannes Erdfelt committed Oct 24, 2011
1 parent 86b1011 commit 97cfccc
Show file tree
Hide file tree
Showing 6 changed files with 118 additions and 97 deletions.
7 changes: 5 additions & 2 deletions nova/tests/test_xenapi.py
Expand Up @@ -1072,8 +1072,11 @@ def wait_for_task(self, *args):
'free-computed': 40}
return json.dumps({'host_memory': vm})

def get_xenapi(self):
return FakeXenApi()
def call_xenapi(self, method, *args):
f = FakeXenApi()
for m in method.split('.'):
f = getattr(f, m)
return f(*args)


class HostStateTestCase(test.TestCase):
Expand Down
4 changes: 2 additions & 2 deletions nova/tests/xenapi/stubs.py
Expand Up @@ -38,7 +38,7 @@ def fake_fetch_image(cls, context, session, instance, image, user,
sr_ref = "fakesr"
vdi_ref = create_vdi(name_label=name_label, read_only=False,
sr_ref=sr_ref, sharable=False)
vdi_rec = session.get_xenapi().VDI.get_record(vdi_ref)
vdi_rec = session.call_xenapi("VDI.get_record", vdi_ref)
vdi_uuid = vdi_rec['uuid']
return [dict(vdi_type='os', vdi_uuid=vdi_uuid)]

Expand Down Expand Up @@ -307,7 +307,7 @@ def fake_create_snapshot(self, instance):
def fake_get_vdi(cls, session, vm_ref):
vdi_ref = fake.create_vdi(name_label='derp', read_only=False,
sr_ref='herp', sharable=False)
vdi_rec = session.get_xenapi().VDI.get_record(vdi_ref)
vdi_rec = session.call_xenapi("VDI.get_record", vdi_ref)
return vdi_ref, {'uuid': vdi_rec['uuid'], }

def fake_shutdown(self, inst, vm, hard=True):
Expand Down
80 changes: 40 additions & 40 deletions nova/virt/xenapi/vm_utils.py
Expand Up @@ -205,8 +205,8 @@ def ensure_free_mem(cls, session, instance):
mem = long(instance_type['memory_mb']) * 1024 * 1024
#get free memory from host
host = session.get_xenapi_host()
host_free_mem = long(session.get_xenapi().host.
compute_free_memory(host))
host_free_mem = long(session.call_xenapi("host.compute_free_memory",
host))
return host_free_mem >= mem

@classmethod
Expand Down Expand Up @@ -260,11 +260,11 @@ def create_cd_vbd(cls, session, vm_ref, vdi_ref, userdevice, bootable):
@classmethod
def find_vbd_by_number(cls, session, vm_ref, number):
"""Get the VBD reference from the device number"""
vbd_refs = session.get_xenapi().VM.get_VBDs(vm_ref)
vbd_refs = session.call_xenapi("VM.get_VBDs", vm_ref)
if vbd_refs:
for vbd_ref in vbd_refs:
try:
vbd_rec = session.get_xenapi().VBD.get_record(vbd_ref)
vbd_rec = session.call_xenapi("VBD.get_record", vbd_ref)
if vbd_rec['userdevice'] == str(number):
return vbd_ref
except cls.XenAPI.Failure, exc:
Expand Down Expand Up @@ -303,7 +303,7 @@ def destroy_vdi(cls, session, vdi_ref):
@classmethod
def create_vdi(cls, session, sr_ref, name_label, virtual_size, read_only):
"""Create a VDI record and returns its reference."""
vdi_ref = session.get_xenapi().VDI.create(
vdi_ref = session.call_xenapi("VDI.create",
{'name_label': name_label,
'name_description': '',
'SR': sr_ref,
Expand All @@ -322,18 +322,18 @@ def create_vdi(cls, session, sr_ref, name_label, virtual_size, read_only):

@classmethod
def set_vdi_name_label(cls, session, vdi_uuid, name_label):
vdi_ref = session.get_xenapi().VDI.get_by_uuid(vdi_uuid)
session.get_xenapi().VDI.set_name_label(vdi_ref, name_label)
vdi_ref = session.call_xenapi("VDI.get_by_uuid", vdi_uuid)
session.call_xenapi("VDI.set_name_label", vdi_ref, name_label)

@classmethod
def get_vdi_for_vm_safely(cls, session, vm_ref):
"""Retrieves the primary VDI for a VM"""
vbd_refs = session.get_xenapi().VM.get_VBDs(vm_ref)
vbd_refs = session.call_xenapi("VM.get_VBDs", vm_ref)
for vbd in vbd_refs:
vbd_rec = session.get_xenapi().VBD.get_record(vbd)
vbd_rec = session.call_xenapi("VBD.get_record", vbd)
# Convention dictates the primary VDI will be userdevice 0
if vbd_rec['userdevice'] == '0':
vdi_rec = session.get_xenapi().VDI.get_record(vbd_rec['VDI'])
vdi_rec = session.call_xenapi("VDI.get_record", vbd_rec['VDI'])
return vbd_rec['VDI'], vdi_rec
raise exception.Error(_("No primary VDI found for"
"%(vm_ref)s") % locals())
Expand Down Expand Up @@ -377,7 +377,7 @@ def get_sr_path(cls, session):
snapshots or by restoring an image in the DISK_VHD format.
"""
sr_ref = safe_find_sr(session)
sr_rec = session.get_xenapi().SR.get_record(sr_ref)
sr_rec = session.call_xenapi("SR.get_record", sr_ref)
sr_uuid = sr_rec["uuid"]
return os.path.join(FLAGS.xenapi_sr_base_path, sr_uuid)

Expand Down Expand Up @@ -602,9 +602,9 @@ def _fetch_image_glance_vhd(cls, context, session, instance, image,
os_vdi_uuid = vdis[0]['vdi_uuid']

# Set the name-label to ease debugging
vdi_ref = session.get_xenapi().VDI.get_by_uuid(os_vdi_uuid)
vdi_ref = session.call_xenapi("VDI.get_by_uuid", os_vdi_uuid)
primary_name_label = instance.name
session.get_xenapi().VDI.set_name_label(vdi_ref, primary_name_label)
session.call_xenapi("VDI.set_name_label", vdi_ref, primary_name_label)

cls._check_vdi_size(context, session, instance, os_vdi_uuid)
return vdis
Expand Down Expand Up @@ -696,7 +696,7 @@ def _fetch_image_glance_disk(cls, context, session, instance, image,
# If anything goes wrong, we need to remember its uuid.
try:
filename = None
vdi_uuid = session.get_xenapi().VDI.get_uuid(vdi_ref)
vdi_uuid = session.call_xenapi("VDI.get_uuid", vdi_ref)

with vdi_attached_here(session, vdi_ref, read_only=False) as dev:
_stream_disk(dev, image_type, virtual_size, image_file)
Expand All @@ -713,7 +713,7 @@ def _fetch_image_glance_disk(cls, context, session, instance, image,
task = session.async_call_plugin('glance', fn, args)
filename = session.wait_for_task(task, instance_id)
# Remove the VDI as it is not needed anymore.
session.get_xenapi().VDI.destroy(vdi_ref)
session.call_xenapi("VDI.destroy", vdi_ref)
LOG.debug(_("Kernel/Ramdisk VDI %s destroyed"), vdi_ref)
return [dict(vdi_type=ImageType.to_string(image_type),
vdi_uuid=None,
Expand Down Expand Up @@ -828,12 +828,12 @@ def determine_is_pv(cls, session, instance_id, vdi_ref, disk_image_type,

@classmethod
def set_vm_name_label(cls, session, vm_ref, name_label):
session.get_xenapi().VM.set_name_label(vm_ref, name_label)
session.call_xenapi("VM.set_name_label", vm_ref, name_label)

@classmethod
def lookup(cls, session, name_label):
"""Look the instance up and return it if available"""
vm_refs = session.get_xenapi().VM.get_by_name_label(name_label)
vm_refs = session.call_xenapi("VM.get_by_name_label", name_label)
n = len(vm_refs)
if n == 0:
return None
Expand All @@ -847,14 +847,14 @@ def lookup_vm_vdis(cls, session, vm_ref):
"""Look for the VDIs that are attached to the VM"""
# Firstly we get the VBDs, then the VDIs.
# TODO(Armando): do we leave the read-only devices?
vbd_refs = session.get_xenapi().VM.get_VBDs(vm_ref)
vbd_refs = session.call_xenapi("VM.get_VBDs", vm_ref)
vdi_refs = []
if vbd_refs:
for vbd_ref in vbd_refs:
try:
vdi_ref = session.get_xenapi().VBD.get_VDI(vbd_ref)
vdi_ref = session.call_xenapi("VBD.get_VDI", vbd_ref)
# Test valid VDI
record = session.get_xenapi().VDI.get_record(vdi_ref)
record = session.call_xenapi("VDI.get_record", vdi_ref)
LOG.debug(_('VDI %s is still available'), record['uuid'])
except cls.XenAPI.Failure, exc:
LOG.exception(exc)
Expand Down Expand Up @@ -884,7 +884,7 @@ def preconfigure_instance(cls, session, instance, vdi_ref, network_info):

@classmethod
def lookup_kernel_ramdisk(cls, session, vm):
vm_rec = session.get_xenapi().VM.get_record(vm)
vm_rec = session.call_xenapi("VM.get_record", vm)
if 'PV_kernel' in vm_rec and 'PV_ramdisk' in vm_rec:
return (vm_rec['PV_kernel'], vm_rec['PV_ramdisk'])
else:
Expand All @@ -908,7 +908,7 @@ def compile_diagnostics(cls, session, record):
"""Compile VM diagnostics data"""
try:
host = session.get_xenapi_host()
host_ip = session.get_xenapi().host.get_record(host)["address"]
host_ip = session.call_xenapi("host.get_record", host)["address"]
except (cls.XenAPI.Failure, KeyError) as e:
return {"Unable to retrieve diagnostics": e}

Expand Down Expand Up @@ -936,7 +936,7 @@ def compile_metrics(cls, session, start_time, stop_time=None):
start_time = int(start_time)
try:
host = session.get_xenapi_host()
host_ip = session.get_xenapi().host.get_record(host)["address"]
host_ip = session.call_xenapi("host.get_record", host)["address"]
except (cls.XenAPI.Failure, KeyError) as e:
raise exception.CouldNotFetchMetrics()

Expand Down Expand Up @@ -1066,8 +1066,8 @@ def get_vhd_parent(session, vdi_rec):
"""
if 'vhd-parent' in vdi_rec['sm_config']:
parent_uuid = vdi_rec['sm_config']['vhd-parent']
parent_ref = session.get_xenapi().VDI.get_by_uuid(parent_uuid)
parent_rec = session.get_xenapi().VDI.get_record(parent_ref)
parent_ref = session.call_xenapi("VDI.get_by_uuid", parent_uuid)
parent_rec = session.call_xenapi("VDI.get_record", parent_ref)
vdi_uuid = vdi_rec['uuid']
LOG.debug(_("VHD %(vdi_uuid)s has parent %(parent_ref)s") % locals())
return parent_ref, parent_rec
Expand All @@ -1076,7 +1076,7 @@ def get_vhd_parent(session, vdi_rec):


def get_vhd_parent_uuid(session, vdi_ref):
vdi_rec = session.get_xenapi().VDI.get_record(vdi_ref)
vdi_rec = session.call_xenapi("VDI.get_record", vdi_ref)
ret = get_vhd_parent(session, vdi_rec)
if ret:
parent_ref, parent_rec = ret
Expand All @@ -1089,8 +1089,8 @@ def walk_vdi_chain(session, vdi_uuid):
"""Yield vdi_recs for each element in a VDI chain"""
# TODO(jk0): perhaps make get_vhd_parent use this
while True:
vdi_ref = session.get_xenapi().VDI.get_by_uuid(vdi_uuid)
vdi_rec = session.get_xenapi().VDI.get_record(vdi_ref)
vdi_ref = session.call_xenapi("VDI.get_by_uuid", vdi_uuid)
vdi_rec = session.call_xenapi("VDI.get_record", vdi_ref)
yield vdi_rec

parent_uuid = vdi_rec['sm_config'].get('vhd-parent')
Expand Down Expand Up @@ -1153,14 +1153,14 @@ def safe_find_sr(session):
def find_sr(session):
"""Return the storage repository to hold VM images"""
host = session.get_xenapi_host()
sr_refs = session.get_xenapi().SR.get_all()
sr_refs = session.call_xenapi("SR.get_all")
for sr_ref in sr_refs:
sr_rec = session.get_xenapi().SR.get_record(sr_ref)
sr_rec = session.call_xenapi("SR.get_record", sr_ref)
if not ('i18n-key' in sr_rec['other_config'] and
sr_rec['other_config']['i18n-key'] == 'local-storage'):
continue
for pbd_ref in sr_rec['PBDs']:
pbd_rec = session.get_xenapi().PBD.get_record(pbd_ref)
pbd_rec = session.call_xenapi("PBD.get_record", pbd_ref)
if pbd_rec['host'] == host:
return sr_ref
return None
Expand All @@ -1179,9 +1179,9 @@ def safe_find_iso_sr(session):
def find_iso_sr(session):
"""Return the storage repository to hold ISO images"""
host = session.get_xenapi_host()
sr_refs = session.get_xenapi().SR.get_all()
sr_refs = session.call_xenapi("SR.get_all")
for sr_ref in sr_refs:
sr_rec = session.get_xenapi().SR.get_record(sr_ref)
sr_rec = session.call_xenapi("SR.get_record", sr_ref)

LOG.debug(_("ISO: looking at SR %(sr_rec)s") % locals())
if not sr_rec['content_type'] == 'iso':
Expand All @@ -1198,7 +1198,7 @@ def find_iso_sr(session):
LOG.debug(_("ISO: SR MATCHing our criteria"))
for pbd_ref in sr_rec['PBDs']:
LOG.debug(_("ISO: ISO, looking to see if it is host local"))
pbd_rec = session.get_xenapi().PBD.get_record(pbd_ref)
pbd_rec = session.call_xenapi("PBD.get_record", pbd_ref)
pbd_rec_host = pbd_rec['host']
LOG.debug(_("ISO: PBD matching, want %(pbd_rec)s, have %(host)s") %
locals())
Expand Down Expand Up @@ -1257,14 +1257,14 @@ def vdi_attached_here(session, vdi_ref, read_only=False):
vbd_rec['qos_algorithm_params'] = {}
vbd_rec['qos_supported_algorithms'] = []
LOG.debug(_('Creating VBD for VDI %s ... '), vdi_ref)
vbd_ref = session.get_xenapi().VBD.create(vbd_rec)
vbd_ref = session.call_xenapi("VBD.create", vbd_rec)
LOG.debug(_('Creating VBD for VDI %s done.'), vdi_ref)
try:
LOG.debug(_('Plugging VBD %s ... '), vbd_ref)
session.get_xenapi().VBD.plug(vbd_ref)
session.call_xenapi("VBD.plug", vbd_ref)
try:
LOG.debug(_('Plugging VBD %s done.'), vbd_ref)
orig_dev = session.get_xenapi().VBD.get_device(vbd_ref)
orig_dev = session.call_xenapi("VBD.get_device", vbd_ref)
LOG.debug(_('VBD %(vbd_ref)s plugged as %(orig_dev)s') % locals())
dev = remap_vbd_dev(orig_dev)
if dev != orig_dev:
Expand All @@ -1280,7 +1280,7 @@ def vdi_attached_here(session, vdi_ref, read_only=False):
LOG.debug(_('Destroying VBD for VDI %s ... '), vdi_ref)
vbd_unplug_with_retry(session, vbd_ref)
finally:
ignore_failure(session.get_xenapi().VBD.destroy, vbd_ref)
ignore_failure(session.call_xenapi, "VBD.destroy", vbd_ref)
LOG.debug(_('Destroying VBD for VDI %s done.'), vdi_ref)


Expand All @@ -1292,7 +1292,7 @@ def vbd_unplug_with_retry(session, vbd_ref):
# FIXME(sirp): We can use LoopingCall here w/o blocking sleep()
while True:
try:
session.get_xenapi().VBD.unplug(vbd_ref)
session.call_xenapi("VBD.unplug", vbd_ref)
LOG.debug(_('VBD.unplug successful first time.'))
return
except VMHelper.XenAPI.Failure, e:
Expand Down Expand Up @@ -1325,7 +1325,7 @@ def get_this_vm_uuid():


def get_this_vm_ref(session):
return session.get_xenapi().VM.get_by_uuid(get_this_vm_uuid())
return session.call_xenapi("VM.get_by_uuid", get_this_vm_uuid())


def _is_vdi_pv(dev):
Expand Down

0 comments on commit 97cfccc

Please sign in to comment.