Skip to content

Commit

Permalink
Adds lock for create from vol/snap to avoid race conditions
Browse files Browse the repository at this point in the history
This patch protects create from volume/snapshot by using a
lockfile to protect the operation from concurrent deletes of
the volume/snapshot used in the create operation.

Currently, if a volume/snapshot is deleted while a volume is
being created from it that delete may complete during the
create operation thus leaving the new volume in error or stuck
state. This lock will ensure that:

(a) if a create of VolA from snap/volB is in progress, any
    delete requests for snap/volB will wait until the create
    is complete.

(b) if a delete of snap/volA is in progress, any create from
    snap/volA will wait until snap/volA delete is complete.

Co-authored-by: Takashi Natsume <natsume.takashi@lab.ntt.co.jp>
Closes-Bug: 1251334
Change-Id: Ie4bc0af789ab232593f55aa2f6b34345eb9b9929
  • Loading branch information
dosaboy and natsumetakashi committed Dec 5, 2013
1 parent e99cd78 commit 4f6e5fc
Show file tree
Hide file tree
Showing 2 changed files with 302 additions and 4 deletions.
233 changes: 233 additions & 0 deletions cinder/tests/test_volume.py
Expand Up @@ -46,6 +46,7 @@
from cinder.openstack.common import rpc
import cinder.policy
from cinder import quota
from cinder.taskflow.patterns import linear_flow
from cinder import test
from cinder.tests.brick.fake_lvm import FakeBrickLVM
from cinder.tests import conf_fixture
Expand All @@ -62,6 +63,8 @@
from cinder.volume import rpcapi as volume_rpcapi
from cinder.volume import utils as volutils

import eventlet

QUOTAS = quota.QUOTAS

CONF = cfg.CONF
Expand Down Expand Up @@ -110,6 +113,8 @@ def setUp(self):
self.stubs.Set(brick_lvm.LVM, '_vg_exists', lambda x: True)
self.stubs.Set(os.path, 'exists', lambda x: True)
self.volume.driver.set_initialized()
# keep ordered record of what we execute
self.called = []

def tearDown(self):
try:
Expand Down Expand Up @@ -443,6 +448,234 @@ def test_create_volume_from_snapshot(self):
self.volume.delete_snapshot(self.context, snapshot_id)
self.volume.delete_volume(self.context, volume_src['id'])

def _mock_synchronized(self, name, *s_args, **s_kwargs):
def inner_sync1(f):
def inner_sync2(*args, **kwargs):
self.called.append('lock-%s' % (name))
ret = f(*args, **kwargs)
self.called.append('unlock-%s' % (name))
return ret
return inner_sync2
return inner_sync1

def test_create_volume_from_snapshot_check_locks(self):
# mock the synchroniser so we can record events
self.stubs.Set(utils, 'synchronized', self._mock_synchronized)

self.stubs.Set(self.volume.driver, 'create_volume_from_snapshot',
lambda *args, **kwargs: None)

orig_flow = linear_flow.Flow.run

def mock_flow_run(*args, **kwargs):
# ensure the lock has been taken
self.assertEqual(len(self.called), 1)
# now proceed with the flow.
ret = orig_flow(*args, **kwargs)
return ret

# create source volume
src_vol = tests_utils.create_volume(self.context, **self.volume_params)
src_vol_id = src_vol['id']

# no lock
self.volume.create_volume(self.context, src_vol_id)

snap_id = self._create_snapshot(src_vol_id)['id']
# no lock
self.volume.create_snapshot(self.context, src_vol_id, snap_id)

dst_vol = tests_utils.create_volume(self.context,
snapshot_id=snap_id,
**self.volume_params)
dst_vol_id = dst_vol['id']
admin_ctxt = context.get_admin_context()

# mock the flow runner so we can do some checks
self.stubs.Set(linear_flow.Flow, 'run', mock_flow_run)

# locked
self.volume.create_volume(self.context, volume_id=dst_vol_id,
snapshot_id=snap_id)
self.assertEqual(len(self.called), 2)
self.assertEqual(dst_vol_id, db.volume_get(admin_ctxt, dst_vol_id).id)
self.assertEqual(snap_id,
db.volume_get(admin_ctxt, dst_vol_id).snapshot_id)

# locked
self.volume.delete_volume(self.context, dst_vol_id)
self.assertEqual(len(self.called), 4)

# locked
self.volume.delete_snapshot(self.context, snap_id)
self.assertEqual(len(self.called), 6)

# locked
self.volume.delete_volume(self.context, src_vol_id)
self.assertEqual(len(self.called), 8)

self.assertEqual(self.called,
['lock-%s' % ('%s-delete_snapshot' % (snap_id)),
'unlock-%s' % ('%s-delete_snapshot' % (snap_id)),
'lock-%s' % ('%s-delete_volume' % (dst_vol_id)),
'unlock-%s' % ('%s-delete_volume' % (dst_vol_id)),
'lock-%s' % ('%s-delete_snapshot' % (snap_id)),
'unlock-%s' % ('%s-delete_snapshot' % (snap_id)),
'lock-%s' % ('%s-delete_volume' % (src_vol_id)),
'unlock-%s' % ('%s-delete_volume' % (src_vol_id))])

def test_create_volume_from_volume_check_locks(self):
# mock the synchroniser so we can record events
self.stubs.Set(utils, 'synchronized', self._mock_synchronized)

orig_flow = linear_flow.Flow.run

def mock_flow_run(*args, **kwargs):
# ensure the lock has been taken
self.assertEqual(len(self.called), 1)
# now proceed with the flow.
ret = orig_flow(*args, **kwargs)
return ret

# create source volume
src_vol = tests_utils.create_volume(self.context, **self.volume_params)
src_vol_id = src_vol['id']

# no lock
self.volume.create_volume(self.context, src_vol_id)

dst_vol = tests_utils.create_volume(self.context,
source_volid=src_vol_id,
**self.volume_params)
dst_vol_id = dst_vol['id']
admin_ctxt = context.get_admin_context()

# mock the flow runner so we can do some checks
self.stubs.Set(linear_flow.Flow, 'run', mock_flow_run)

# locked
self.volume.create_volume(self.context, volume_id=dst_vol_id,
source_volid=src_vol_id)
self.assertEqual(len(self.called), 2)
self.assertEqual(dst_vol_id, db.volume_get(admin_ctxt, dst_vol_id).id)
self.assertEqual(src_vol_id,
db.volume_get(admin_ctxt, dst_vol_id).source_volid)

# locked
self.volume.delete_volume(self.context, dst_vol_id)
self.assertEqual(len(self.called), 4)

# locked
self.volume.delete_volume(self.context, src_vol_id)
self.assertEqual(len(self.called), 6)

self.assertEqual(self.called,
['lock-%s' % ('%s-delete_volume' % (src_vol_id)),
'unlock-%s' % ('%s-delete_volume' % (src_vol_id)),
'lock-%s' % ('%s-delete_volume' % (dst_vol_id)),
'unlock-%s' % ('%s-delete_volume' % (dst_vol_id)),
'lock-%s' % ('%s-delete_volume' % (src_vol_id)),
'unlock-%s' % ('%s-delete_volume' % (src_vol_id))])

def test_create_volume_from_volume_delete_lock_taken(self):
# create source volume
src_vol = tests_utils.create_volume(self.context, **self.volume_params)
src_vol_id = src_vol['id']

# no lock
self.volume.create_volume(self.context, src_vol_id)

dst_vol = tests_utils.create_volume(self.context,
source_volid=src_vol_id,
**self.volume_params)
dst_vol_id = dst_vol['id']
admin_ctxt = context.get_admin_context()

orig_elevated = self.context.elevated

ctxt_deepcopy = self.context.deepcopy()
gthreads = []

def mock_elevated(*args, **kwargs):
# unset mock so it is only called once
self.stubs.Set(self.context, 'elevated', orig_elevated)

# we expect this to block and then fail
t = eventlet.spawn(self.volume.create_volume,
ctxt_deepcopy,
volume_id=dst_vol_id, source_volid=src_vol_id)
gthreads.append(t)

return orig_elevated(*args, **kwargs)

# mock something from early on in the delete operation and within the
# lock so that when we do the create we expect it to block.
self.stubs.Set(self.context, 'elevated', mock_elevated)

# locked
self.volume.delete_volume(self.context, src_vol_id)

# we expect the volume create to fail with the following err since the
# source volume was deleted while the create was locked. Note that the
# volume is still in the db since it was created by the test prior to
# calling manager.create_volume.
self.assertRaises(exception.VolumeNotFound, gthreads[0].wait)

def test_create_volume_from_snapshot_delete_lock_taken(self):
# create source volume
src_vol = tests_utils.create_volume(self.context, **self.volume_params)
src_vol_id = src_vol['id']

# no lock
self.volume.create_volume(self.context, src_vol_id)

# create snapshot
snap_id = self._create_snapshot(src_vol_id)['id']
# no lock
self.volume.create_snapshot(self.context, src_vol_id, snap_id)

# create vol from snapshot...
dst_vol = tests_utils.create_volume(self.context,
source_volid=src_vol_id,
**self.volume_params)
dst_vol_id = dst_vol['id']
admin_ctxt = context.get_admin_context()

orig_elevated = self.context.elevated

ctxt_deepcopy = self.context.deepcopy()
gthreads = []

def mock_elevated(*args, **kwargs):
# unset mock so it is only called once
self.stubs.Set(self.context, 'elevated', orig_elevated)

# We expect this to block and then fail
t = eventlet.spawn(self.volume.create_volume, ctxt_deepcopy,
volume_id=dst_vol_id, snapshot_id=snap_id)
gthreads.append(t)

return orig_elevated(*args, **kwargs)

# mock something from early on in the delete operation and within the
# lock so that when we do the create we expect it to block.
self.stubs.Set(self.context, 'elevated', mock_elevated)

# locked
self.volume.delete_snapshot(self.context, snap_id)

# we expect the volume create to fail with the following err since the
# snapshot was deleted while the create was locked. Note that the
# volume is still in the db since it was created by the test prior to
# calling manager.create_volume.
self.assertRaises(exception.SnapshotNotFound, gthreads[0].wait)

# locked
self.volume.delete_volume(self.context, src_vol_id)
# make sure it is gone
self.assertRaises(exception.VolumeNotFound, db.volume_get,
self.context, src_vol_id)

def test_create_volume_from_snapshot_with_encryption(self):
"""Test volume can be created from a snapshot of
an encrypted volume.
Expand Down
73 changes: 69 additions & 4 deletions cinder/volume/manager.py
Expand Up @@ -138,6 +138,49 @@
'cinder.volume.drivers.huawei.HuaweiVolumeDriver'}


def locked_volume_operation(f):
"""Lock decorator for volume operations.
Takes a named lock prior to executing the operation. The lock is named with
the operation executed and the id of the volume. This lock can then be used
by other operations to avoid operation conflicts on shared volumes.
Example use:
If a volume operation uses this decorator, it will block until the named
lock is free. This is used to protect concurrent operations on the same
volume e.g. delete VolA while create volume VolB from VolA is in progress.
"""
def lvo_inner1(inst, context, volume_id, **kwargs):
@utils.synchronized("%s-%s" % (volume_id, f.__name__), external=True)
def lvo_inner2(*_args, **_kwargs):
return f(*_args, **_kwargs)
return lvo_inner2(inst, context, volume_id, **kwargs)
return lvo_inner1


def locked_snapshot_operation(f):
"""Lock decorator for snapshot operations.
Takes a named lock prior to executing the operation. The lock is named with
the operation executed and the id of the snapshot. This lock can then be
used by other operations to avoid operation conflicts on shared snapshots.
Example use:
If a snapshot operation uses this decorator, it will block until the named
lock is free. This is used to protect concurrent operations on the same
snapshot e.g. delete SnapA while create volume VolA from SnapA is in
progress.
"""
def lso_inner1(inst, context, snapshot_id, **kwargs):
@utils.synchronized("%s-%s" % (snapshot_id, f.__name__), external=True)
def lso_inner2(*_args, **_kwargs):
return f(*_args, **_kwargs)
return lso_inner2(inst, context, snapshot_id, **kwargs)
return lso_inner1


class VolumeManager(manager.SchedulerDependentManager):
"""Manages attachable block storage devices."""

Expand Down Expand Up @@ -264,15 +307,36 @@ def create_volume(self, context, volume_id, request_spec=None,

assert flow, _('Manager volume flow not retrieved')

flow.run(context.elevated())
if flow.state != states.SUCCESS:
raise exception.CinderException(_("Failed to successfully complete"
" manager volume workflow"))
if snapshot_id is not None:
# Make sure the snapshot is not deleted until we are done with it.
locked_action = "%s-%s" % (snapshot_id, 'delete_snapshot')
elif source_volid is not None:
# Make sure the volume is not deleted until we are done with it.
locked_action = "%s-%s" % (source_volid, 'delete_volume')
else:
locked_action = None

def _run_flow():
flow.run(context.elevated())
if flow.state != states.SUCCESS:
msg = _("Failed to successfully complete manager volume "
"workflow")
raise exception.CinderException(msg)

@utils.synchronized(locked_action, external=True)
def _run_flow_locked():
_run_flow()

if locked_action is None:
_run_flow()
else:
_run_flow_locked()

self._reset_stats()
return volume_id

@utils.require_driver_initialized
@locked_volume_operation
def delete_volume(self, context, volume_id):
"""Deletes and unexports volume."""
context = context.elevated()
Expand Down Expand Up @@ -401,6 +465,7 @@ def create_snapshot(self, context, volume_id, snapshot_id):
return snapshot_id

@utils.require_driver_initialized
@locked_snapshot_operation
def delete_snapshot(self, context, snapshot_id):
"""Deletes and unexports snapshot."""
caller_context = context
Expand Down

0 comments on commit 4f6e5fc

Please sign in to comment.