diff --git a/qa/tasks/cephfs/mount.py b/qa/tasks/cephfs/mount.py
index bcc9aefd89543..1fffaa3ffea4e 100644
--- a/qa/tasks/cephfs/mount.py
+++ b/qa/tasks/cephfs/mount.py
@@ -208,6 +208,22 @@ def open_background(self, basename="background_file"):
 
         return rproc
 
+    def wait_for_dir_empty(self, dirname, timeout=30):
+        i = 0
+        dirpath = os.path.join(self.mountpoint, dirname)
+        while i < timeout:
+            nr_entries = int(self.getfattr(dirpath, "ceph.dir.entries"))
+            if nr_entries == 0:
+                log.debug("Directory {0} seen empty from {1} after {2}s".format(
+                    dirname, self.client_id, i))
+                return
+            else:
+                time.sleep(1)
+                i += 1
+
+        raise RuntimeError("Timed out after {0}s waiting for {1} to become empty from {2}".format(
+            i, dirname, self.client_id))
+
     def wait_for_visible(self, basename="background_file", timeout=30):
         i = 0
         while i < timeout:
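The helper above polls `ceph.dir.entries`, a CephFS virtual xattr that reports a directory's entry count without listing it. Outside the test harness the same check can be made with plain `getfattr`; a minimal sketch, assuming a kernel or FUSE CephFS mount at `/mnt/cephfs` (path is illustrative):

```python
import subprocess

def dir_entry_count(path):
    # ceph.dir.entries is served by the MDS, so this is cheap even for
    # directories with many entries
    out = subprocess.check_output(
        ["getfattr", "-n", "ceph.dir.entries", "--only-values", path])
    return int(out)

print(dir_entry_count("/mnt/cephfs/volumes/_deleting"))  # 0 once purge completes
```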
diff --git a/qa/tasks/cephfs/test_volumes.py b/qa/tasks/cephfs/test_volumes.py
index 11857a75a3a6b..30756e99506c7 100644
--- a/qa/tasks/cephfs/test_volumes.py
+++ b/qa/tasks/cephfs/test_volumes.py
@@ -14,6 +14,14 @@ class TestVolumes(CephFSTestCase):
     TEST_SUBVOLUME_PREFIX="subvolume"
     TEST_GROUP_PREFIX="group"
     TEST_SNAPSHOT_PREFIX="snapshot"
+    TEST_FILE_NAME_PREFIX="subvolume_file"
+
+    # for filling subvolume with data
+    CLIENTS_REQUIRED = 1
+
+    # io defaults
+    DEFAULT_FILE_SIZE = 1 # MB
+    DEFAULT_NUMBER_OF_FILES = 1024
 
     def _fs_cmd(self, *args):
         return self.mgr_cluster.mon_manager.raw_cluster_cmd("fs", *args)
@@ -51,6 +59,24 @@ def _get_subvolume_path(self, vol_name, subvol_name, group_name=None):
     def _delete_test_volume(self):
         self._fs_cmd("volume", "rm", self.volname)
 
+    def _do_subvolume_io(self, subvolume, number_of_files=DEFAULT_NUMBER_OF_FILES,
+                         file_size=DEFAULT_FILE_SIZE):
+        # get subvolume path for IO
+        subvolpath = self._fs_cmd("subvolume", "getpath", self.volname, subvolume)
+        self.assertNotEqual(subvolpath, None)
+        subvolpath = subvolpath[1:].rstrip() # remove "/" prefix and any trailing newline
+
+        log.debug("filling subvolume {0} with {1} files each {2}MB size".format(subvolume, number_of_files, file_size))
+        for i in range(number_of_files):
+            filename = "{0}.{1}".format(TestVolumes.TEST_FILE_NAME_PREFIX, i)
+            self.mount_a.write_n_mb(os.path.join(subvolpath, filename), file_size)
+
+    def _wait_for_trash_empty(self, timeout=30):
+        # XXX: construct the trash dir path (note that there is no mgr
+        # [sub]volume interface for this).
+        trashdir = os.path.join("./", "volumes", "_deleting")
+        self.mount_a.wait_for_dir_empty(trashdir, timeout=timeout)
+
     def setUp(self):
         super(TestVolumes, self).setUp()
         self.volname = None
@@ -83,6 +109,9 @@ def test_subvolume_create_and_rm(self):
             if ce.exitstatus != errno.ENOENT:
                 raise
 
+        # verify trash dir is clean
+        self._wait_for_trash_empty()
+
     def test_subvolume_create_idempotence(self):
         # create subvolume
         subvolume = self._generate_random_subvolume_name()
@@ -94,6 +123,9 @@ def test_subvolume_create_idempotence(self):
         # remove subvolume
         self._fs_cmd("subvolume", "rm", self.volname, subvolume)
 
+        # verify trash dir is clean
+        self._wait_for_trash_empty()
+
     def test_nonexistent_subvolume_rm(self):
         # remove non-existing subvolume
         subvolume = "non_existent_subvolume"
@@ -134,6 +166,9 @@ def test_subvolume_create_and_rm_in_group(self):
         # remove subvolume
         self._fs_cmd("subvolume", "rm", self.volname, subvolume, group)
 
+        # verify trash dir is clean
+        self._wait_for_trash_empty()
+
         # remove group
         self._fs_cmd("subvolumegroup", "rm", self.volname, group)
 
@@ -250,8 +285,9 @@ def test_subvolume_create_with_desired_mode_in_group(self):
         self.assertEqual(actual_mode2, expected_mode2)
         self.assertEqual(actual_mode3, expected_mode2)
 
-        self._fs_cmd("subvolume", "rm", self.volname, subvol2, group)
         self._fs_cmd("subvolume", "rm", self.volname, subvol1, group)
+        self._fs_cmd("subvolume", "rm", self.volname, subvol2, group)
+        self._fs_cmd("subvolume", "rm", self.volname, subvol3, group)
         self._fs_cmd("subvolumegroup", "rm", self.volname, group)
 
     def test_nonexistent_subvolme_group_rm(self):
@@ -285,6 +321,9 @@ def test_subvolume_snapshot_create_and_rm(self):
         # remove subvolume
         self._fs_cmd("subvolume", "rm", self.volname, subvolume)
 
+        # verify trash dir is clean
+        self._wait_for_trash_empty()
+
     def test_subvolume_snapshot_create_idempotence(self):
         subvolume = self._generate_random_subvolume_name()
         snapshot = self._generate_random_snapshot_name()
@@ -304,6 +343,9 @@ def test_subvolume_snapshot_create_idempotence(self):
         # remove subvolume
         self._fs_cmd("subvolume", "rm", self.volname, subvolume)
 
+        # verify trash dir is clean
+        self._wait_for_trash_empty()
+
     def test_nonexistent_subvolume_snapshot_rm(self):
         subvolume = self._generate_random_subvolume_name()
         snapshot = self._generate_random_snapshot_name()
@@ -330,6 +372,9 @@ def test_nonexistent_subvolume_snapshot_rm(self):
         # remove subvolume
         self._fs_cmd("subvolume", "rm", self.volname, subvolume)
 
+        # verify trash dir is clean
+        self._wait_for_trash_empty()
+
     def test_subvolume_snapshot_in_group(self):
         subvolume = self._generate_random_subvolume_name()
         group = self._generate_random_group_name()
@@ -350,6 +395,9 @@ def test_subvolume_snapshot_in_group(self):
         # remove subvolume
         self._fs_cmd("subvolume", "rm", self.volname, subvolume, group)
 
+        # verify trash dir is clean
+        self._wait_for_trash_empty()
+
         # remove group
         self._fs_cmd("subvolumegroup", "rm", self.volname, group)
 
@@ -373,6 +421,9 @@ def test_subvolume_group_snapshot_create_and_rm(self):
         # remove subvolume
         self._fs_cmd("subvolume", "rm", self.volname, subvolume, group)
 
+        # verify trash dir is clean
+        self._wait_for_trash_empty()
+
         # remove group
         self._fs_cmd("subvolumegroup", "rm", self.volname, group)
 
@@ -399,6 +450,9 @@ def test_subvolume_group_snapshot_idempotence(self):
         # remove subvolume
         self._fs_cmd("subvolume", "rm", self.volname, subvolume, group)
 
+        # verify trash dir is clean
+        self._wait_for_trash_empty()
+
         # remove group
         self._fs_cmd("subvolumegroup", "rm", self.volname, group)
 
@@ -429,5 +483,27 @@ def test_nonexistent_subvolume_group_snapshot_rm(self):
         # remove subvolume
         self._fs_cmd("subvolume", "rm", self.volname, subvolume, group)
 
+        # verify trash dir is clean
+        self._wait_for_trash_empty()
+
         # remove group
         self._fs_cmd("subvolumegroup", "rm", self.volname, group)
+
+    def test_async_subvolume_rm(self):
+        subvolume = self._generate_random_subvolume_name()
+
+        # create subvolume
+        self._fs_cmd("subvolume", "create", self.volname, subvolume)
+
+        # fill subvolume w/ some data
+        self._do_subvolume_io(subvolume)
+
+        self.mount_a.umount_wait()
+
+        # remove subvolume
+        self._fs_cmd("subvolume", "rm", self.volname, subvolume)
+
+        self.mount_a.mount()
+
+        # verify trash dir is clean
+        self._wait_for_trash_empty()
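The `Client.cc` hunk below removes the early `-EEXIST` return from `Client::mkdirs()`, so creating a path whose components all already exist now succeeds instead of failing; the mgr plugin relies on this to replace its hand-rolled `_mkdir_p()` with plain `fs.mkdirs()` while keeping subvolume and group creation idempotent. The semantics match this Python sketch (illustrative only, not the PR's code):

```python
import errno
import os

def mkdirs_idempotent(path, mode=0o755):
    # succeed whether the path is absent, partially present, or fully present
    try:
        os.makedirs(path, mode)
    except OSError as e:
        if e.errno != errno.EEXIST:
            raise

mkdirs_idempotent("/tmp/volumes/group/subvol")
mkdirs_idempotent("/tmp/volumes/group/subvol")  # no longer an error
```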
diff --git a/src/client/Client.cc b/src/client/Client.cc
index 5f5e04446677f..0e79823c5980a 100644
--- a/src/client/Client.cc
+++ b/src/client/Client.cc
@@ -6652,8 +6652,6 @@ int Client::mkdirs(const char *relpath, mode_t mode, const UserPerm& perms)
       break;
     cur.swap(next);
   }
-  //check that we have work left to do
-  if (i==path.depth()) return -EEXIST;
   if (r!=-ENOENT) return r;
   ldout(cct, 20) << __func__ << " got through " << i << " directories on path " << relpath << dendl;
   //make new directory at each level
diff --git a/src/pybind/mgr/volumes/fs/purge_queue.py b/src/pybind/mgr/volumes/fs/purge_queue.py
new file mode 100644
index 0000000000000..8a7429aa6497d
--- /dev/null
+++ b/src/pybind/mgr/volumes/fs/purge_queue.py
@@ -0,0 +1,198 @@
+import time
+import logging
+import threading
+import traceback
+from collections import deque
+
+log = logging.getLogger(__name__)
+
+class PurgeQueueBase(object):
+    """
+    Base class for implementing purge queue strategies.
+    """
+    class PurgeThread(threading.Thread):
+        def __init__(self, name, purge_fn):
+            self.purge_fn = purge_fn
+            # event object to cancel ongoing purge
+            self.cancel_event = threading.Event()
+            threading.Thread.__init__(self, name=name)
+
+        def run(self):
+            try:
+                self.purge_fn()
+            except Exception as e:
+                trace = "".join(traceback.format_exception(None, e, e.__traceback__))
+                log.error("purge queue thread encountered fatal error:\n"+trace)
+
+        def cancel_job(self):
+            self.cancel_event.set()
+
+        def should_cancel(self):
+            return self.cancel_event.isSet()
+
+        def reset_cancel(self):
+            self.cancel_event.clear()
+
+    def __init__(self, volume_client):
+        self.vc = volume_client
+        # volumes whose subvolumes need to be purged
+        self.q = deque()
+        # job tracking
+        self.jobs = {}
+        # lock, cv for kickstarting purge
+        self.lock = threading.Lock()
+        self.cv = threading.Condition(self.lock)
+        # lock, cv for purge cancellation
+        self.waiting = False
+        self.c_lock = threading.Lock()
+        self.c_cv = threading.Condition(self.c_lock)
+
+    def queue_purge_job(self, volname):
+        with self.lock:
+            if not self.q.count(volname):
+                self.q.append(volname)
+                self.jobs[volname] = []
+            self.cv.notifyAll()
+
+    def cancel_purge_job(self, volname):
+        log.info("cancelling purge jobs for volume '{0}'".format(volname))
+        self.lock.acquire()
+        unlock = True
+        try:
+            if not self.q.count(volname):
+                return
+            self.q.remove(volname)
+            if not self.jobs.get(volname, []):
+                return
+            # cancel in-progress purge operation and wait until complete
+            for j in self.jobs[volname]:
+                j[1].cancel_job()
+            # wait for cancellation to complete
+            with self.c_lock:
+                unlock = False
+                self.waiting = True
+                self.lock.release()
+                while self.waiting:
+                    log.debug("waiting for {0} in-progress purge jobs for volume '{1}' to " \
+                              "cancel".format(len(self.jobs[volname]), volname))
+                    self.c_cv.wait()
+        finally:
+            if unlock:
+                self.lock.release()
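Cancellation above is a two-lock handshake: `cancel_purge_job()` sets each worker thread's `threading.Event`, drops the queue lock, and then sleeps on a second condition variable until the last in-flight job acknowledges (see `unregister_job()` just below). A stripped-down sketch of the same pattern, with hypothetical names and a single worker:

```python
import threading
import time

cancel = threading.Event()
done_cv = threading.Condition()
busy = True

def worker():
    global busy
    while not cancel.is_set():   # cooperative cancellation point
        time.sleep(0.1)          # stands in for one unit of purge work
    with done_cv:
        busy = False
        done_cv.notify_all()     # wake the canceller

threading.Thread(target=worker).start()
cancel.set()                     # request cancellation...
with done_cv:
    while busy:
        done_cv.wait()           # ...and block until the worker acknowledges
```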
+
+    def register_job(self, volname, purge_dir):
+        log.debug("registering purge job: {0}.{1}".format(volname, purge_dir))
+
+        thread_id = threading.currentThread()
+        self.jobs[volname].append((purge_dir, thread_id))
+
+    def unregister_job(self, volname, purge_dir):
+        log.debug("unregistering purge job: {0}.{1}".format(volname, purge_dir))
+
+        thread_id = threading.currentThread()
+        self.jobs[volname].remove((purge_dir, thread_id))
+
+        cancelled = thread_id.should_cancel()
+        thread_id.reset_cancel()
+
+        # wake up cancellation waiters if needed
+        if not self.jobs[volname] and cancelled:
+            log.info("waking up cancellation waiters")
+            self.jobs.pop(volname)
+            with self.c_lock:
+                self.waiting = False
+                self.c_cv.notifyAll()
+
+    def get_trash_entry_for_volume(self, volname):
+        log.debug("fetching trash entry for volume '{0}'".format(volname))
+
+        exclude_entries = [v[0] for v in self.jobs[volname]]
+        ret = self.vc.get_subvolume_trash_entry(
+            None, vol_name=volname, exclude_entries=exclude_entries)
+        if ret[0] != 0:
+            log.error("error fetching trash entry for volume '{0}': {1}".format(volname, ret[0]))
+            return ret[0], None
+        return 0, ret[1]
+
+    def purge_trash_entry_for_volume(self, volname, purge_dir):
+        log.debug("purging trash entry '{0}' for volume '{1}'".format(purge_dir, volname))
+
+        thread_id = threading.currentThread()
+        ret = self.vc.purge_subvolume_trash_entry(
+            None, vol_name=volname, purge_dir=purge_dir, should_cancel=lambda: thread_id.should_cancel())
+        return ret[0]
+
+class ThreadPoolPurgeQueueMixin(PurgeQueueBase):
+    """
+    Purge queue mixin class maintaining a pool of threads for purging trash entries.
+    Subvolumes are chosen from volumes in a round-robin fashion. If some of the purge
+    entries (belonging to a set of volumes) have huge directory trees (such as lots
+    of small files in a directory w/ deep directory trees), this model may lead to
+    _all_ threads purging entries for one volume (starving other volumes).
+    """
+    def __init__(self, volume_client, tp_size):
+        super(ThreadPoolPurgeQueueMixin, self).__init__(volume_client)
+        self.threads = []
+        for i in range(tp_size):
+            self.threads.append(
+                PurgeQueueBase.PurgeThread(name="purgejob.{}".format(i), purge_fn=self.run))
+            self.threads[-1].start()
+
+    def pick_purge_dir_from_volume(self):
+        log.debug("processing {0} purge job entries".format(len(self.q)))
+        nr_vols = len(self.q)
+        to_remove = []
+        to_purge = None, None
+        while nr_vols > 0:
+            volname = self.q[0]
+            # rotate now so that another thread picks up the trash entry
+            # for the next volume.
+            self.q.rotate(1)
+            ret, purge_dir = self.get_trash_entry_for_volume(volname)
+            if purge_dir:
+                to_purge = volname, purge_dir
+                break
+            # this is an optimization: when a given volume has no more
+            # entries in trash and no purge operations are in progress, we
+            # remove the volume from the tracking list so as to:
+            #
+            # a. not query the filesystem for trash entries over and over again
+            # b. keep the filesystem connection idle so that it can be freed
+            #    from the connection pool
+            #
+            # if there are subsequent subvolume deletes, the volume gets added
+            # again to the tracking list and the purge operation kickstarts.
+            # note that we do not iterate the volume list fully if there is a
+            # purge entry to process (that will take place eventually).
+            if ret == 0 and not purge_dir and not self.jobs[volname]:
+                to_remove.append(volname)
+            nr_vols -= 1
+        for vol in to_remove:
+            log.debug("auto removing volume '{0}' from purge job".format(vol))
+            self.q.remove(vol)
+            self.jobs.pop(vol)
+        return to_purge
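`pick_purge_dir_from_volume()` gets its fairness from `deque.rotate(1)`: the head volume is read and the deque rotated immediately, so another worker entering concurrently starts its scan at a different volume. The scheduling idea in isolation:

```python
from collections import deque

volumes = deque(["a", "b", "c"])

def pick():
    # inspect the current head, then rotate so the next pick sees another volume
    vol = volumes[0]
    volumes.rotate(1)
    return vol

print([pick() for _ in range(6)])  # ['a', 'c', 'b', 'a', 'c', 'b'] -- every volume gets a turn
```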
+
+    def get_next_trash_entry(self):
+        while True:
+            # wait till there's a purge job
+            while not self.q:
+                log.debug("purge job list empty, waiting...")
+                self.cv.wait()
+            volname, purge_dir = self.pick_purge_dir_from_volume()
+            if purge_dir:
+                return volname, purge_dir
+            log.debug("no purge jobs available, waiting...")
+            self.cv.wait()
+
+    def run(self):
+        while True:
+            with self.lock:
+                volname, purge_dir = self.get_next_trash_entry()
+                self.register_job(volname, purge_dir)
+            ret = self.purge_trash_entry_for_volume(volname, purge_dir)
+            if ret != 0:
+                log.warn("failed to purge {0}.{1}".format(volname, purge_dir))
+            with self.lock:
+                self.unregister_job(volname, purge_dir)
+            time.sleep(1)
diff --git a/src/pybind/mgr/volumes/fs/subvolspec.py b/src/pybind/mgr/volumes/fs/subvolspec.py
index 6efb24ca8679f..60fb6d06af009 100644
--- a/src/pybind/mgr/volumes/fs/subvolspec.py
+++ b/src/pybind/mgr/volumes/fs/subvolspec.py
@@ -1,4 +1,5 @@
 import os
+import uuid
 
 class SubvolumeSpec(object):
     """
@@ -48,42 +49,42 @@ def subvolume_path(self):
         """
         return the subvolume path from subvolume specification
         """
-        return os.path.join(self.subvolume_prefix, self.groupid, self.subvolumeid)
+        return os.path.join(self.group_path, self.subvolumeid.encode('utf-8'))
 
     @property
     def group_path(self):
         """
         return the group path from subvolume specification
         """
-        return os.path.join(self.subvolume_prefix, self.groupid)
+        return os.path.join(self.subvolume_prefix.encode('utf-8'), self.groupid.encode('utf-8'))
 
     @property
     def trash_path(self):
         """
        return the trash path from subvolume specification
         """
-        return os.path.join(self.subvolume_prefix, "_deleting", self.subvolumeid)
+        return os.path.join(self.subvolume_prefix.encode('utf-8'), b"_deleting", self.subvolumeid.encode('utf-8'))
 
     @property
-    def fs_namespace(self):
+    def unique_trash_path(self):
         """
-        return a filesystem namespace by stashing pool namespace prefix and subvolume-id
+        return a unique trash directory entry path
         """
-        return "{0}{1}".format(self.pool_ns_prefix, self.subvolumeid)
+        return os.path.join(self.subvolume_prefix.encode('utf-8'), b"_deleting", str(uuid.uuid4()).encode('utf-8'))
 
     @property
-    def group_dir(self):
+    def fs_namespace(self):
         """
-        return the group directory path
+        return a filesystem namespace by stashing pool namespace prefix and subvolume-id
         """
-        return self.subvolume_prefix
+        return "{0}{1}".format(self.pool_ns_prefix, self.subvolumeid)
 
     @property
     def trash_dir(self):
         """
         return the trash directory path
         """
-        return os.path.join(self.subvolume_prefix, "_deleting")
+        return os.path.join(self.subvolume_prefix.encode('utf-8'), b"_deleting")
 
     def make_subvol_snap_path(self, snapdir, snapname):
         """
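`unique_trash_path` (above) is what makes `subvolume rm` repeatable: each removal renames the subvolume directory to a fresh UUID under `volumes/_deleting`, so create/delete cycles of the same subvolume name never collide with a trash entry that is still being purged. The pattern on a local filesystem, as a hedged stand-in for the libcephfs calls:

```python
import os
import uuid

def move_to_trash(subvol_path, trash_dir=b"/tmp/volumes/_deleting"):
    # a same-named subvolume can be deleted again while the previous
    # incarnation is still being purged, because the trash name is unique
    if not os.path.isdir(trash_dir):
        os.makedirs(trash_dir)
    unique = os.path.join(trash_dir, str(uuid.uuid4()).encode('utf-8'))
    os.rename(subvol_path, unique)
    return unique
```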
diff --git a/src/pybind/mgr/volumes/fs/subvolume.py b/src/pybind/mgr/volumes/fs/subvolume.py
index dba8201419acb..6d0232c56e1fe 100644
--- a/src/pybind/mgr/volumes/fs/subvolume.py
+++ b/src/pybind/mgr/volumes/fs/subvolume.py
@@ -33,31 +33,27 @@ class SubVolume(object):
     """
 
-    def __init__(self, mgr, fs_name=None):
-        self.fs = None
-        self.fs_name = fs_name
-        self.connected = False
-
+    def __init__(self, mgr, fs_handle):
+        self.fs = fs_handle
         self.rados = mgr.rados
 
-    def _mkdir_p(self, path, mode=0o755):
+    def _get_single_dir_entry(self, dir_path, exclude=[]):
+        """
+        Return a directory entry in a given directory excluding passed
+        in entries.
+        """
+        exclude = list(exclude) + [b".", b".."]  # don't mutate the caller's list (or the default)
         try:
-            self.fs.stat(path)
-        except cephfs.ObjectNotFound:
-            pass
-        else:
-            return
+            with self.fs.opendir(dir_path) as d:
+                entry = self.fs.readdir(d)
+                while entry:
+                    if entry.d_name not in exclude and entry.is_dir():
+                        return entry.d_name
+                    entry = self.fs.readdir(d)
+            return None
+        except cephfs.Error as e:
+            raise VolumeException(-e.args[0], e.args[1])
 
-        parts = path.split(os.path.sep)
-
-        for i in range(1, len(parts) + 1):
-            subpath = os.path.join(*parts[0:i])
-            try:
-                self.fs.stat(subpath)
-            except cephfs.ObjectNotFound:
-                self.fs.mkdir(subpath, mode)
-            except cephfs.Error as e:
-                raise VolumeException(e.args[0], e.args[1])
 
     ### basic subvolume operations
 
@@ -77,7 +73,7 @@ def create_subvolume(self, spec, size=None, namespace_isolated=True, mode=0o755,
         subvolpath = spec.subvolume_path
         log.info("creating subvolume with path: {0}".format(subvolpath))
 
-        self._mkdir_p(subvolpath, mode)
+        self.fs.mkdirs(subvolpath, mode)
 
         if size is not None:
             self.fs.setxattr(subvolpath, 'ceph.quota.max_bytes', str(size).encode('utf-8'), 0)
@@ -102,8 +98,8 @@ def create_subvolume(self, spec, size=None, namespace_isolated=True, mode=0o755,
     def remove_subvolume(self, spec, force):
         """
         Make a subvolume inaccessible to guests. This function is idempotent.
-        This is the fast part of tearing down a subvolume: you must also later
-        call purge_subvolume, which is the slow part.
+        This is the fast part of tearing down a subvolume. The subvolume will
+        get purged in the background.
 
         :param spec: subvolume path specification
         :param force: flag to ignore non-existent path (never raise exception)
@@ -115,9 +111,12 @@ def remove_subvolume(self, spec, force):
 
         # Create the trash directory if it doesn't already exist
         trashdir = spec.trash_dir
-        self._mkdir_p(trashdir)
+        self.fs.mkdirs(trashdir, 0o700)
 
-        trashpath = spec.trash_path
+        # mangle the trash directory entry to a random string so that subsequent
+        # subvolume create and delete with the same name moves the subvolume directory
+        # to a unique trash dir (else, rename() could fail if the trash dir exists).
+        trashpath = spec.unique_trash_path
         try:
             self.fs.rename(subvolpath, trashpath)
         except cephfs.ObjectNotFound:
@@ -125,12 +124,11 @@ def remove_subvolume(self, spec, force):
                 raise VolumeException(
                     -errno.ENOENT, "Subvolume '{0}' not found, cannot remove it".format(spec.subvolume_id))
         except cephfs.Error as e:
-            raise VolumeException(e.args[0], e.args[1])
+            raise VolumeException(-e.args[0], e.args[1])
 
-    def purge_subvolume(self, spec):
+    def purge_subvolume(self, spec, should_cancel):
         """
-        Finish clearing up a subvolume that was previously passed to delete_subvolume. This
-        function is idempotent.
+        Finish clearing up a subvolume from the trash directory.
""" def rmtree(root_path): @@ -140,15 +138,11 @@ def rmtree(root_path): except cephfs.ObjectNotFound: return except cephfs.Error as e: - raise VolumeException(e.args[0], e.args[1]) + raise VolumeException(-e.args[0], e.args[1]) d = self.fs.readdir(dir_handle) - while d: - d_name = d.d_name.decode('utf-8') - if d_name not in [".", ".."]: - # Do not use os.path.join because it is sensitive - # to string encoding, we just pass through dnames - # as byte arrays - d_full = "{0}/{1}".format(root_path, d_name) + while d and not should_cancel(): + if d.d_name not in (b".", b".."): + d_full = os.path.join(root_path, d.d_name) if d.is_dir(): rmtree(d_full) else: @@ -156,10 +150,17 @@ def rmtree(root_path): d = self.fs.readdir(dir_handle) self.fs.closedir(dir_handle) - self.fs.rmdir(root_path) + # remove the directory only if we were not asked to cancel + # (else we would fail to remove this anyway) + if not should_cancel(): + self.fs.rmdir(root_path) trashpath = spec.trash_path - rmtree(trashpath) + # catch any unlink errors + try: + rmtree(trashpath) + except cephfs.Error as e: + raise VolumeException(-e.args[0], e.args[1]) def get_subvolume_path(self, spec): path = spec.subvolume_path @@ -168,14 +169,14 @@ def get_subvolume_path(self, spec): except cephfs.ObjectNotFound: return None except cephfs.Error as e: - raise VolumeException(e.args[0], e.args[1]) + raise VolumeException(-e.args[0], e.args[1]) return path ### group operations def create_group(self, spec, mode=0o755, pool=None): path = spec.group_path - self._mkdir_p(path, mode) + self.fs.mkdirs(path, mode) if not pool: pool = self._get_ancestor_xattr(path, "ceph.dir.layout.pool") self.fs.setxattr(path, 'ceph.dir.layout.pool', pool.encode('utf-8'), 0) @@ -188,7 +189,7 @@ def remove_group(self, spec, force): if not force: raise VolumeException(-errno.ENOENT, "Subvolume group '{0}' not found".format(spec.group_id)) except cephfs.Error as e: - raise VolumeException(e.args[0], e.args[1]) + raise VolumeException(-e.args[0], e.args[1]) def get_group_path(self, spec): path = spec.group_path @@ -222,7 +223,7 @@ def _snapshot_create(self, snappath, mode=0o755): except cephfs.ObjectNotFound: self.fs.mkdir(snappath, mode) except cephfs.Error as e: - raise VolumeException(e.args[0], e.args[1]) + raise VolumeException(-e.args[0], e.args[1]) else: log.warn("Snapshot '{0}' already exists".format(snappath)) @@ -237,7 +238,7 @@ def _snapshot_delete(self, snappath, force): if not force: raise VolumeException(-errno.ENOENT, "Snapshot '{0}' not found, cannot remove it".format(snappath)) except cephfs.Error as e: - raise VolumeException(e.args[0], e.args[1]) + raise VolumeException(-e.args[0], e.args[1]) def create_subvolume_snapshot(self, spec, snapname, mode=0o755): snappath = spec.make_subvol_snap_path(self.rados.conf_get('client_snapdir'), snapname) @@ -255,31 +256,20 @@ def remove_group_snapshot(self, spec, snapname, force): snappath = spec.make_group_snap_path(self.rados.conf_get('client_snapdir'), snapname) return self._snapshot_delete(snappath, force) - ### context manager routines + def get_trash_entry(self, spec, exclude): + try: + trashdir = spec.trash_dir + return self._get_single_dir_entry(trashdir, exclude) + except VolumeException as ve: + if ve.errno == -errno.ENOENT: + # trash dir does not exist yet, signal success + return None + raise - def connect(self): - log.debug("Connecting to cephfs...") - self.fs = cephfs.LibCephFS(rados_inst=self.rados) - log.debug("CephFS initializing...") - self.fs.init() - log.debug("CephFS mounting...") - 
diff --git a/src/pybind/mgr/volumes/fs/volume.py b/src/pybind/mgr/volumes/fs/volume.py
index 8cc7ef26fa058..e3e4501c25287 100644
--- a/src/pybind/mgr/volumes/fs/volume.py
+++ b/src/pybind/mgr/volumes/fs/volume.py
@@ -1,6 +1,14 @@
 import json
+import time
 import errno
 import logging
+from threading import Lock
+try:
+    # py2
+    from threading import _Timer as Timer
+except ImportError:
+    # py3
+    from threading import Timer
 
 import cephfs
 import orchestrator
@@ -8,12 +16,161 @@
 from .subvolspec import SubvolumeSpec
 from .subvolume import SubVolume
 from .exception import VolumeException
+from .purge_queue import ThreadPoolPurgeQueueMixin
 
 log = logging.getLogger(__name__)
 
+class ConnectionPool(object):
+    class Connection(object):
+        def __init__(self, mgr, fs_name):
+            self.fs = None
+            self.mgr = mgr
+            self.fs_name = fs_name
+            self.ops_in_progress = 0
+            self.last_used = time.time()
+            self.fs_id = self.get_fs_id()
+
+        def get_fs_id(self):
+            fs_map = self.mgr.get('fs_map')
+            for fs in fs_map['filesystems']:
+                if fs['mdsmap']['fs_name'] == self.fs_name:
+                    return fs['id']
+            raise VolumeException(
+                -errno.ENOENT, "Volume '{0}' not found".format(self.fs_name))
+
+        def get_fs_handle(self):
+            self.last_used = time.time()
+            self.ops_in_progress += 1
+            return self.fs
+
+        def put_fs_handle(self):
+            assert self.ops_in_progress > 0
+            self.ops_in_progress -= 1
+
+        def del_fs_handle(self):
+            if self.is_connection_valid():
+                self.disconnect()
+            else:
+                self.abort()
+
+        def is_connection_valid(self):
+            fs_id = None
+            try:
+                fs_id = self.get_fs_id()
+            except VolumeException:
+                # the filesystem does not exist now -- connection is not valid.
+                pass
+            return self.fs_id == fs_id
+
+        def is_connection_idle(self, timeout):
+            return (self.ops_in_progress == 0 and ((time.time() - self.last_used) >= timeout))
+
+        def connect(self):
+            assert self.ops_in_progress == 0
+            log.debug("Connecting to cephfs '{0}'".format(self.fs_name))
+            self.fs = cephfs.LibCephFS(rados_inst=self.mgr.rados)
+            log.debug("CephFS initializing...")
+            self.fs.init()
+            log.debug("CephFS mounting...")
+            self.fs.mount(filesystem_name=self.fs_name.encode('utf-8'))
+            log.debug("Connection to cephfs '{0}' complete".format(self.fs_name))
+
+        def disconnect(self):
+            assert self.ops_in_progress == 0
+            log.info("disconnecting from cephfs '{0}'".format(self.fs_name))
+            self.fs.shutdown()
+            self.fs = None
+
+        def abort(self):
+            assert self.ops_in_progress == 0
+            log.info("aborting connection from cephfs '{0}'".format(self.fs_name))
+            self.fs.abort_conn()
+            self.fs = None
+
+    class RTimer(Timer):
+        """
+        recurring timer variant of Timer
+        """
+        def run(self):
+            while not self.finished.is_set():
+                self.finished.wait(self.interval)
+                self.function(*self.args, **self.kwargs)
+            self.finished.set()
+
+    # TODO: make this configurable
+    TIMER_TASK_RUN_INTERVAL = 30.0  # seconds
+    CONNECTION_IDLE_INTERVAL = 60.0 # seconds
+
+    def __init__(self, mgr):
+        self.mgr = mgr
+        self.connections = {}
+        self.lock = Lock()
+        self.timer_task = ConnectionPool.RTimer(ConnectionPool.TIMER_TASK_RUN_INTERVAL,
+                                                self.cleanup_connections)
+        self.timer_task.start()
+
+    def cleanup_connections(self):
+        with self.lock:
+            log.info("scanning for idle connections...")
+            idle_fs = [fs_name for fs_name, conn in self.connections.items()
+                       if conn.is_connection_idle(ConnectionPool.CONNECTION_IDLE_INTERVAL)]
+            for fs_name in idle_fs:
+                log.info("cleaning up connection for '{}'".format(fs_name))
+                self._del_fs_handle(fs_name)
+
+    def get_fs_handle(self, fs_name):
+        with self.lock:
+            conn = None
+            try:
+                conn = self.connections.get(fs_name, None)
+                if conn:
+                    if conn.is_connection_valid():
+                        return conn.get_fs_handle()
+                    else:
+                        # filesystem id changed beneath us (or the filesystem does not exist).
+                        # this is possible if the filesystem got removed (and recreated with
+                        # the same name) via the "ceph fs rm/new" mon commands.
+                        log.warning("filesystem id changed for volume '{0}', reconnecting...".format(fs_name))
+                        self._del_fs_handle(fs_name)
+                conn = ConnectionPool.Connection(self.mgr, fs_name)
+                conn.connect()
+            except cephfs.Error as e:
+                # try to provide a better error string if possible
+                if e.args[0] == errno.ENOENT:
+                    raise VolumeException(
+                        -errno.ENOENT, "Volume '{0}' not found".format(fs_name))
+                raise VolumeException(-e.args[0], e.args[1])
+            self.connections[fs_name] = conn
+            return conn.get_fs_handle()
+
+    def put_fs_handle(self, fs_name):
+        with self.lock:
+            conn = self.connections.get(fs_name, None)
+            if conn:
+                conn.put_fs_handle()
+
+    def _del_fs_handle(self, fs_name):
+        conn = self.connections.pop(fs_name, None)
+        if conn:
+            conn.del_fs_handle()
+
+    def del_fs_handle(self, fs_name):
+        with self.lock:
+            self._del_fs_handle(fs_name)
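`ConnectionPool` hands out one shared libcephfs mount per volume, refcounted via `ops_in_progress`; the recurring `RTimer` task evicts mounts idle for longer than `CONNECTION_IDLE_INTERVAL`, and a changed `fs_id` marks a handle stale when a filesystem is removed and recreated under the same name. Direct use looks roughly like this (hypothetical snippet; in the PR the `connection_pool_wrap` decorator below does this bookkeeping):

```python
pool = ConnectionPool(mgr)           # mgr: the active MgrModule instance
fs_handle = pool.get_fs_handle("a")  # connects on first use, reused afterwards
try:
    fs_handle.stat(b"/volumes")
finally:
    pool.put_fs_handle("a")          # drop the op count so idle cleanup may evict
```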
note that, if the + # trash directory does not exist or if there are no purge entries + # available for a volume, the volume is removed from the purge + # job list. + fs_map = self.mgr.get('fs_map') + for fs in fs_map['filesystems']: + self.purge_queue.queue_purge_job(fs['mdsmap']['fs_name']) def gen_pool_names(self, volname): """ @@ -70,6 +227,10 @@ def create_filesystem(self, fs_name, metadata_pool, data_pool): return self.mgr.mon_command(command) def remove_filesystem(self, fs_name): + command = {'prefix': 'fs fail', 'fs_name': fs_name} + r, outb, outs = self.mgr.mon_command(command) + if r != 0: + return r, outb, outs command = {'prefix': 'fs rm', 'fs_name': fs_name, 'yes_i_really_mean_it': True} return self.mgr.mon_command(command) @@ -89,18 +250,6 @@ def create_mds(self, fs_name): return -errno.EINVAL, "", str(e) return 0, "", "" - def set_mds_down(self, fs_name): - command = {'prefix': 'fs set', 'fs_name': fs_name, 'var': 'cluster_down', 'val': 'true'} - r, outb, outs = self.mgr.mon_command(command) - if r != 0: - return r, outb, outs - for mds in self.get_mds_names(fs_name): - command = {'prefix': 'mds fail', 'role_or_gid': mds} - r, outb, outs = self.mgr.mon_command(command) - if r != 0: - return r, outb, outs - return 0, "", "" - ### volume operations -- create, rm, ls def create_volume(self, volname, size=None): @@ -127,6 +276,8 @@ def delete_volume(self, volname): """ delete the given module (tear down mds, remove filesystem) """ + self.purge_queue.cancel_purge_job(volname) + self.connection_pool.del_fs_handle(volname) # Tear down MDS daemons try: completion = self.mgr.remove_stateless_service("mds", volname) @@ -143,9 +294,6 @@ def delete_volume(self, volname): # In case orchestrator didn't tear down MDS daemons cleanly, or # there was no orchestrator, we force the daemons down. if self.volume_exists(volname): - r, outb, outs = self.set_mds_down(volname) - if r != 0: - return r, outb, outs r, outb, outs = self.remove_filesystem(volname) if r != 0: return r, outb, outs @@ -176,16 +324,51 @@ def octal_str_to_decimal_int(mode): except ValueError: raise VolumeException(-errno.EINVAL, "Invalid mode '{0}'".format(mode)) + def connection_pool_wrap(func): + """ + decorator that wraps subvolume calls by fetching filesystem handle + from the connection pool when fs_handle argument is empty, otherwise + just invoke func with the passed in filesystem handle. Also handles + call made to non-existent volumes (only when fs_handle is empty). 
+ """ + def conn_wrapper(self, fs_handle, **kwargs): + fs_h = fs_handle + fs_name = kwargs['vol_name'] + # note that force arg is available for remove type commands + force = kwargs.get('force', False) + + # fetch the connection from the pool + if not fs_handle: + try: + fs_h = self.connection_pool.get_fs_handle(fs_name) + except VolumeException as ve: + if not force: + return self.volume_exception_to_retval(ve) + return 0, "", "" + + # invoke the actual routine w/ fs handle + result = func(self, fs_h, **kwargs) + + # hand over the connection back to the pool + if fs_h: + self.connection_pool.put_fs_handle(fs_name) + return result + return conn_wrapper + ### subvolume operations - def create_subvolume(self, volname, subvolname, groupname, size, mode='755', pool=None): - ret = 0, "", "" + @connection_pool_wrap + def create_subvolume(self, fs_handle, **kwargs): + ret = 0, "", "" + volname = kwargs['vol_name'] + subvolname = kwargs['sub_name'] + groupname = kwargs['group_name'] + size = kwargs['size'] + pool = kwargs['pool_layout'] + mode = kwargs['mode'] + try: - if not self.volume_exists(volname): - raise VolumeException( - -errno.ENOENT, "Volume '{0}' not found, create it with `ceph fs " \ - "volume create` before trying to create subvolumes".format(volname)) - with SubVolume(self.mgr, fs_name=volname) as sv: + with SubVolume(self.mgr, fs_handle) as sv: spec = SubvolumeSpec(subvolname, groupname) if not self.group_exists(sv, spec): raise VolumeException( @@ -196,36 +379,35 @@ def create_subvolume(self, volname, subvolname, groupname, size, mode='755', poo ret = self.volume_exception_to_retval(ve) return ret - def remove_subvolume(self, volname, subvolname, groupname, force): - ret = 0, "", "" + @connection_pool_wrap + def remove_subvolume(self, fs_handle, **kwargs): + ret = 0, "", "" + volname = kwargs['vol_name'] + subvolname = kwargs['sub_name'] + groupname = kwargs['group_name'] + force = kwargs['force'] try: - fs = self.get_fs(volname) - if fs: - with SubVolume(self.mgr, fs_name=volname) as sv: - spec = SubvolumeSpec(subvolname, groupname) - if self.group_exists(sv, spec): - sv.remove_subvolume(spec, force) - sv.purge_subvolume(spec) - elif not force: - raise VolumeException( - -errno.ENOENT, "Subvolume group '{0}' not found, cannot remove " \ - "subvolume '{1}'".format(groupname, subvolname)) - elif not force: - raise VolumeException( - -errno.ENOENT, "Volume '{0}' not found, cannot remove subvolume " \ - "'{1}'".format(volname, subvolname)) + with SubVolume(self.mgr, fs_handle) as sv: + spec = SubvolumeSpec(subvolname, groupname) + if self.group_exists(sv, spec): + sv.remove_subvolume(spec, force) + self.purge_queue.queue_purge_job(volname) + elif not force: + raise VolumeException( + -errno.ENOENT, "Subvolume group '{0}' not found, cannot remove " \ + "subvolume '{1}'".format(groupname, subvolname)) except VolumeException as ve: ret = self.volume_exception_to_retval(ve) return ret - def subvolume_getpath(self, volname, subvolname, groupname): - ret = None + @connection_pool_wrap + def subvolume_getpath(self, fs_handle, **kwargs): + ret = None + volname = kwargs['vol_name'] + subvolname = kwargs['sub_name'] + groupname = kwargs['group_name'] try: - if not self.volume_exists(volname): - raise VolumeException( - -errno.ENOENT, "Volume '{0}' not found".format(volname)) - - with SubVolume(self.mgr, fs_name=volname) as sv: + with SubVolume(self.mgr, fs_handle) as sv: spec = SubvolumeSpec(subvolname, groupname) if not self.group_exists(sv, spec): raise VolumeException( @@ -241,15 +423,16 
@@ -241,15 +423,16 @@ def subvolume_getpath(self, volname, subvolname, groupname):
 
     ### subvolume snapshot
 
-    def create_subvolume_snapshot(self, volname, subvolname, snapname, groupname):
-        ret = 0, "", ""
-        try:
-            if not self.volume_exists(volname):
-                raise VolumeException(
-                    -errno.ENOENT, "Volume '{0}' not found, cannot create snapshot " \
-                    "'{1}'".format(volname, snapname))
+    @connection_pool_wrap
+    def create_subvolume_snapshot(self, fs_handle, **kwargs):
+        ret = 0, "", ""
+        volname = kwargs['vol_name']
+        subvolname = kwargs['sub_name']
+        snapname = kwargs['snap_name']
+        groupname = kwargs['group_name']
 
-        with SubVolume(self.mgr, fs_name=volname) as sv:
+        try:
+            with SubVolume(self.mgr, fs_handle) as sv:
                 spec = SubvolumeSpec(subvolname, groupname)
                 if not self.group_exists(sv, spec):
                     raise VolumeException(
@@ -264,76 +447,76 @@ def create_subvolume_snapshot(self, volname, subvolname, snapname, groupname):
             ret = self.volume_exception_to_retval(ve)
         return ret
 
-    def remove_subvolume_snapshot(self, volname, subvolname, snapname, groupname, force):
-        ret = 0, "", ""
+    @connection_pool_wrap
+    def remove_subvolume_snapshot(self, fs_handle, **kwargs):
+        ret = 0, "", ""
+        volname = kwargs['vol_name']
+        subvolname = kwargs['sub_name']
+        snapname = kwargs['snap_name']
+        groupname = kwargs['group_name']
+        force = kwargs['force']
         try:
-            if self.volume_exists(volname):
-                with SubVolume(self.mgr, fs_name=volname) as sv:
-                    spec = SubvolumeSpec(subvolname, groupname)
-                    if self.group_exists(sv, spec):
-                        if sv.get_subvolume_path(spec):
-                            sv.remove_subvolume_snapshot(spec, snapname, force)
-                        elif not force:
-                            raise VolumeException(
-                                -errno.ENOENT, "Subvolume '{0}' not found, cannot remove " \
-                                "subvolume snapshot '{1}'".format(subvolname, snapname))
+            with SubVolume(self.mgr, fs_handle) as sv:
+                spec = SubvolumeSpec(subvolname, groupname)
+                if self.group_exists(sv, spec):
+                    if sv.get_subvolume_path(spec):
+                        sv.remove_subvolume_snapshot(spec, snapname, force)
                     elif not force:
                         raise VolumeException(
-                            -errno.ENOENT, "Subvolume group '{0}' already removed, cannot " \
-                            "remove subvolume snapshot '{1}'".format(groupname, snapname))
-            elif not force:
-                raise VolumeException(
-                    -errno.ENOENT, "Volume '{0}' not found, cannot remove subvolumegroup " \
-                    "snapshot '{1}'".format(volname, snapname))
+                            -errno.ENOENT, "Subvolume '{0}' not found, cannot remove " \
+                            "subvolume snapshot '{1}'".format(subvolname, snapname))
+                elif not force:
+                    raise VolumeException(
+                        -errno.ENOENT, "Subvolume group '{0}' already removed, cannot " \
+                        "remove subvolume snapshot '{1}'".format(groupname, snapname))
         except VolumeException as ve:
             ret = self.volume_exception_to_retval(ve)
         return ret
 
     ### group operations
 
-    def create_subvolume_group(self, volname, groupname, mode='755', pool=None):
-        ret = 0, "", ""
-        try:
-            if not self.volume_exists(volname):
-                raise VolumeException(
-                    -errno.ENOENT, "Volume '{0}' not found, create it with `ceph fs " \
-                    "volume create` before trying to create subvolume groups".format(volname))
+    @connection_pool_wrap
+    def create_subvolume_group(self, fs_handle, **kwargs):
+        ret = 0, "", ""
+        volname = kwargs['vol_name']
+        groupname = kwargs['group_name']
+        pool = kwargs['pool_layout']
+        mode = kwargs['mode']
 
+        try:
             # TODO: validate that subvol size fits in volume size
-            with SubVolume(self.mgr, fs_name=volname) as sv:
+            with SubVolume(self.mgr, fs_handle) as sv:
                 spec = SubvolumeSpec("", groupname)
                 sv.create_group(spec, pool=pool, mode=self.octal_str_to_decimal_int(mode))
         except VolumeException as ve:
            ret = self.volume_exception_to_retval(ve)
         return ret
 
-    def remove_subvolume_group(self, volname, groupname, force):
-        ret = 0, "", ""
+    @connection_pool_wrap
+    def remove_subvolume_group(self, fs_handle, **kwargs):
+        ret = 0, "", ""
+        volname = kwargs['vol_name']
+        groupname = kwargs['group_name']
+        force = kwargs['force']
         try:
-            if self.volume_exists(volname):
-                with SubVolume(self.mgr, fs_name=volname) as sv:
-                    # TODO: check whether there are no subvolumes in the group
-                    spec = SubvolumeSpec("", groupname)
-                    sv.remove_group(spec, force)
-            elif not force:
-                raise VolumeException(
-                    -errno.ENOENT, "Volume '{0}' not found, cannot remove subvolume " \
-                    "group '{0}'".format(volname, groupname))
+            with SubVolume(self.mgr, fs_handle) as sv:
+                # TODO: check whether there are no subvolumes in the group
+                spec = SubvolumeSpec("", groupname)
+                sv.remove_group(spec, force)
         except VolumeException as ve:
             ret = self.volume_exception_to_retval(ve)
         return ret
 
     ### group snapshot
 
-    def create_subvolume_group_snapshot(self, volname, groupname, snapname):
-        ret = 0, "", ""
+    @connection_pool_wrap
+    def create_subvolume_group_snapshot(self, fs_handle, **kwargs):
+        ret = 0, "", ""
+        volname = kwargs['vol_name']
+        groupname = kwargs['group_name']
+        snapname = kwargs['snap_name']
         try:
-            if not self.volume_exists(volname):
-                raise VolumeException(
-                    -errno.ENOENT, "Volume '{0}' not found, cannot create snapshot " \
-                    "'{1}'".format(volname, snapname))
-
-            with SubVolume(self.mgr, fs_name=volname) as sv:
+            with SubVolume(self.mgr, fs_handle) as sv:
                 spec = SubvolumeSpec("", groupname)
                 if not self.group_exists(sv, spec):
                     raise VolumeException(
@@ -344,22 +527,52 @@ def create_subvolume_group_snapshot(self, volname, groupname, snapname):
             ret = self.volume_exception_to_retval(ve)
         return ret
 
-    def remove_subvolume_group_snapshot(self, volname, groupname, snapname, force):
+    @connection_pool_wrap
+    def remove_subvolume_group_snapshot(self, fs_handle, **kwargs):
+        ret = 0, "", ""
+        volname = kwargs['vol_name']
+        groupname = kwargs['group_name']
+        snapname = kwargs['snap_name']
+        force = kwargs['force']
+        try:
+            with SubVolume(self.mgr, fs_handle) as sv:
+                spec = SubvolumeSpec("", groupname)
+                if self.group_exists(sv, spec):
+                    sv.remove_group_snapshot(spec, snapname, force)
+                elif not force:
+                    raise VolumeException(
+                        -errno.ENOENT, "Subvolume group '{0}' not found, cannot " \
+                        "remove it".format(groupname))
+        except VolumeException as ve:
+            ret = self.volume_exception_to_retval(ve)
+        return ret
+
+    @connection_pool_wrap
+    def get_subvolume_trash_entry(self, fs_handle, **kwargs):
+        ret = None
+        volname = kwargs['vol_name']
+        exclude = kwargs.get('exclude_entries', [])
+
+        try:
+            with SubVolume(self.mgr, fs_handle) as sv:
+                spec = SubvolumeSpec("", "")
+                path = sv.get_trash_entry(spec, exclude)
+                ret = 0, path, ""
+        except VolumeException as ve:
+            ret = self.volume_exception_to_retval(ve)
+        return ret
+
-errno.ENOENT, "Volume '{0}' not found, cannot remove subvolumegroup " \ - "snapshot '{1}'".format(volname, snapname)) + with SubVolume(self.mgr, fs_handle) as sv: + spec = SubvolumeSpec(purge_dir.decode('utf-8'), "") + sv.purge_subvolume(spec, should_cancel) except VolumeException as ve: ret = self.volume_exception_to_retval(ve) return ret diff --git a/src/pybind/mgr/volumes/module.py b/src/pybind/mgr/volumes/module.py index 6548cc0c0c844..167b7e0044b17 100644 --- a/src/pybind/mgr/volumes/module.py +++ b/src/pybind/mgr/volumes/module.py @@ -6,7 +6,6 @@ except ImportError: import Queue -import cephfs from mgr_module import MgrModule import orchestrator @@ -200,82 +199,63 @@ def _cmd_fs_subvolumegroup_create(self, inbuf, cmd): """ :return: a 3-tuple of return code(int), empty string(str), error message (str) """ - vol_name = cmd['vol_name'] - group_name = cmd['group_name'] - pool_layout = cmd.get('pool_layout', None) - mode = cmd.get('mode', '755') - - return self.vc.create_subvolume_group(vol_name, group_name, mode=mode, pool=pool_layout) + return self.vc.create_subvolume_group( + None, vol_name=cmd['vol_name'], group_name=cmd['group_name'], + pool_layout=cmd.get('pool_layout', None), mode=cmd.get('mode', '755')) def _cmd_fs_subvolumegroup_rm(self, inbuf, cmd): """ :return: a 3-tuple of return code(int), empty string(str), error message (str) """ - vol_name = cmd['vol_name'] - group_name = cmd['group_name'] - force = cmd.get('force', False) - - return self.vc.remove_subvolume_group(vol_name, group_name, force) + return self.vc.remove_subvolume_group(None, vol_name=cmd['vol_name'], + group_name=cmd['group_name'], + force=cmd.get('force', False)) def _cmd_fs_subvolume_create(self, inbuf, cmd): """ :return: a 3-tuple of return code(int), empty string(str), error message (str) """ - vol_name = cmd['vol_name'] - sub_name = cmd['sub_name'] - size = cmd.get('size', None) - group_name = cmd.get('group_name', None) - pool_layout = cmd.get('pool_layout', None) - mode = cmd.get('mode', '755') - - return self.vc.create_subvolume(vol_name, sub_name, group_name, size, mode=mode, pool=pool_layout) + return self.vc.create_subvolume(None, vol_name=cmd['vol_name'], + sub_name=cmd['sub_name'], + group_name=cmd.get('group_name', None), + size=cmd.get('size', None), + pool_layout=cmd.get('pool_layout', None), + mode=cmd.get('mode', '755')) def _cmd_fs_subvolume_rm(self, inbuf, cmd): """ :return: a 3-tuple of return code(int), empty string(str), error message (str) """ - vol_name = cmd['vol_name'] - sub_name = cmd['sub_name'] - force = cmd.get('force', False) - group_name = cmd.get('group_name', None) - - return self.vc.remove_subvolume(vol_name, sub_name, group_name, force) + return self.vc.remove_subvolume(None, vol_name=cmd['vol_name'], + sub_name=cmd['sub_name'], + group_name=cmd.get('group_name', None), + force=cmd.get('force', False)) def _cmd_fs_subvolume_getpath(self, inbuf, cmd): - vol_name = cmd['vol_name'] - sub_name = cmd['sub_name'] - group_name = cmd.get('group_name', None) - - return self.vc.subvolume_getpath(vol_name, sub_name, group_name) + return self.vc.subvolume_getpath(None, vol_name=cmd['vol_name'], + sub_name=cmd['sub_name'], + group_name=cmd.get('group_name', None)) def _cmd_fs_subvolumegroup_snapshot_create(self, inbuf, cmd): - vol_name = cmd['vol_name'] - group_name = cmd['group_name'] - snap_name = cmd['snap_name'] - - return self.vc.create_subvolume_group_snapshot(vol_name, group_name, snap_name) + return self.vc.create_subvolume_group_snapshot(None, vol_name=cmd['vol_name'], + 
     def _cmd_fs_subvolumegroup_snapshot_create(self, inbuf, cmd):
-        vol_name = cmd['vol_name']
-        group_name = cmd['group_name']
-        snap_name = cmd['snap_name']
-
-        return self.vc.create_subvolume_group_snapshot(vol_name, group_name, snap_name)
+        return self.vc.create_subvolume_group_snapshot(None, vol_name=cmd['vol_name'],
+                                                       group_name=cmd['group_name'],
+                                                       snap_name=cmd['snap_name'])
 
     def _cmd_fs_subvolumegroup_snapshot_rm(self, inbuf, cmd):
-        vol_name = cmd['vol_name']
-        group_name = cmd['group_name']
-        snap_name = cmd['snap_name']
-        force = cmd.get('force', False)
-
-        return self.vc.remove_subvolume_group_snapshot(vol_name, group_name, snap_name, force)
+        return self.vc.remove_subvolume_group_snapshot(None, vol_name=cmd['vol_name'],
+                                                       group_name=cmd['group_name'],
+                                                       snap_name=cmd['snap_name'],
+                                                       force=cmd.get('force', False))
 
     def _cmd_fs_subvolume_snapshot_create(self, inbuf, cmd):
-        vol_name = cmd['vol_name']
-        sub_name = cmd['sub_name']
-        snap_name = cmd['snap_name']
-        group_name = cmd.get('group_name', None)
-
-        return self.vc.create_subvolume_snapshot(vol_name, sub_name, snap_name, group_name)
+        return self.vc.create_subvolume_snapshot(None, vol_name=cmd['vol_name'],
+                                                 sub_name=cmd['sub_name'],
+                                                 snap_name=cmd['snap_name'],
+                                                 group_name=cmd.get('group_name', None))
 
     def _cmd_fs_subvolume_snapshot_rm(self, inbuf, cmd):
-        vol_name = cmd['vol_name']
-        sub_name = cmd['sub_name']
-        snap_name = cmd['snap_name']
-        force = cmd.get('force', False)
-        group_name = cmd.get('group_name', None)
-
-        return self.vc.remove_subvolume_snapshot(vol_name, sub_name, snap_name, group_name, force)
+        return self.vc.remove_subvolume_snapshot(None, vol_name=cmd['vol_name'],
+                                                 sub_name=cmd['sub_name'],
+                                                 snap_name=cmd['snap_name'],
+                                                 group_name=cmd.get('group_name', None),
+                                                 force=cmd.get('force', False))