Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mgr / volumes: background purge queue for subvolumes #28003

Merged
merged 14 commits into from Jul 16, 2019
Merged
Changes from 1 commit
Commits
File filter...
Filter file types
Jump to…
Jump to file or symbol
Failed to load files and symbols.

Always

Just for now

mgr / volumes: purge queue for async subvolume delete

Support asynchronous subvolume deletes by handing off the delete
operation to a dedicated set of threads. A subvolume delete operation
renames the subvolume (subdirectory) to a unique trash path entry
and signals the set of worker threads to pick up entries from the
trash directory for background removal.

This commit implements a `thread pool` strategy as a class mixin.

Signed-off-by: Venky Shankar <vshankar@redhat.com>
  • Loading branch information
vshankar committed Jun 17, 2019
commit 483a2141fe8c9a58bc25a544412cdf5b047ad772
@@ -0,0 +1,193 @@
import time
import logging
import threading
from collections import deque

log = logging.getLogger(__name__)

class PurgeQueueBase(object):
    """
    Base class for implementing purge queue strategies.

    Tracks the set of volumes whose trash directories need purging
    (self.q), the purge jobs currently in flight per volume (self.jobs),
    and the locks/condition-variables used to kick off and cancel purges.
    """
    class PurgeThread(threading.Thread):
        """
        Worker thread that runs a purge function and supports
        cooperative cancellation of an in-flight purge via an Event.
        """
        def __init__(self, name, purge_fn):
            self.purge_fn = purge_fn
            # event object to cancel ongoing purge
            self.cancel_event = threading.Event()
            threading.Thread.__init__(self, name=name)

        def run(self):
            self.purge_fn()

        def cancel_job(self):
            self.cancel_event.set()

        def should_cancel(self):
            return self.cancel_event.isSet()

        def reset_cancel(self):
            self.cancel_event.clear()

    def __init__(self, volume_client):
        self.vc = volume_client
        # volumes whose subvolumes need to be purged
        self.q = deque()
        # job tracking: volname -> list of (purge_dir, purge_thread) tuples
        self.jobs = {}
        # lock, cv for kickstarting purge
        self.lock = threading.Lock()
        self.cv = threading.Condition(self.lock)
        # lock, cv for purge cancellation
        self.waiting = False
        self.c_lock = threading.Lock()
        self.c_cv = threading.Condition(self.c_lock)

    def queue_purge_job(self, volname):
        """
        Queue a volume for background purge and wake up worker threads.
        Idempotent: a volume that is already queued is not queued twice.
        """
        with self.lock:
            if not self.q.count(volname):
                self.q.append(volname)
                self.jobs[volname] = []
            self.cv.notifyAll()

    def cancel_purge_job(self, volname):
        """
        Cancel queued and in-progress purge jobs for a volume, blocking
        until in-flight purges acknowledge the cancellation.

        NOTE(review): the waiting/c_cv handshake makes this interface
        unsafe for reentrant/concurrent cancellation of the same volume.
        """
        log.info("cancelling purge jobs for volume '{0}'".format(volname))
        self.lock.acquire()
        unlock = True
        try:
            if not self.q.count(volname):
                return
            self.q.remove(volname)
            if not self.jobs.get(volname, []):
                return
            # cancel in-progress purge operation and wait until complete
            for j in self.jobs[volname]:
                j[1].cancel_job()
            # wait for cancellation to complete
            with self.c_lock:
                unlock = False
                self.waiting = True
                # drop the main lock before blocking so that workers can
                # make progress and call unregister_job()
                self.lock.release()
                while self.waiting:
                    log.debug("waiting for {0} in-progress purge jobs for volume '{1}' to " \
                              "cancel".format(len(self.jobs[volname]), volname))
                    self.c_cv.wait()
        finally:
            if unlock:
                self.lock.release()

    def register_job(self, volname, purge_dir):
        """
        Record that the calling worker thread has started purging
        `purge_dir` for `volname`. Caller must hold self.lock.
        """
        log.debug("registering purge job: {0}.{1}".format(volname, purge_dir))

        thread_id = threading.currentThread()
        self.jobs[volname].append((purge_dir, thread_id))

    def unregister_job(self, volname, purge_dir):
        """
        Remove a finished (or cancelled) purge job for the calling worker
        thread and wake up any cancellation waiters. Caller must hold
        self.lock.
        """
        log.debug("unregistering purge job: {0}.{1}".format(volname, purge_dir))

        thread_id = threading.currentThread()
        self.jobs[volname].remove((purge_dir, thread_id))

        cancelled = thread_id.should_cancel()
        thread_id.reset_cancel()

        # wake up cancellation waiters if needed
        if not self.jobs[volname] and cancelled:
            # fix: use the module logger, not the root logger
            log.info("waking up cancellation waiters")
            self.jobs.pop(volname)
            with self.c_lock:
                self.waiting = False
                self.c_cv.notifyAll()

    def get_trash_entry_for_volume(self, volname):
        """
        Fetch one trash directory entry for a volume, excluding entries
        that are already being purged.

        :return: (0, entry) on success (entry may be None when the trash
                 is empty), (error-code, None) on failure.
        """
        log.debug("fetching trash entry for volume '{0}'".format(volname))

        exclude_entries = [v[0] for v in self.jobs[volname]]
        ret = self.vc.get_subvolume_trash_entry(
            None, vol_name=volname, exclude_entries=exclude_entries)
        if not ret[0] == 0:
            # fix: ret[0] must be an argument to format() -- it was passed
            # outside the call, so '{1}' raised IndexError on this path
            log.error("error fetching trash entry for volume '{0}': {1}".format(volname, ret[0]))
            return ret[0], None
        return 0, ret[1]

    def purge_trash_entry_for_volume(self, volname, purge_dir):
        """
        Purge a single trash entry for a volume, passing a cancellation
        predicate bound to the calling worker thread.

        :return: 0 on success, negative error code otherwise.
        """
        log.debug("purging trash entry '{0}' for volume '{1}'".format(purge_dir, volname))

        thread_id = threading.currentThread()
        ret = self.vc.purge_subvolume_trash_entry(
            None, vol_name=volname, purge_dir=purge_dir,
            should_cancel=lambda: thread_id.should_cancel())
        return ret[0]

class ThreadPoolPurgeQueueMixin(PurgeQueueBase):
    """
    Purge queue mixin class maintaining a pool of threads for purging trash entries.

    Subvolumes are chosen from volumes in a round robin fashion. If some of the purge
    entries (belonging to a set of volumes) have huge directory trees (such as, lots
    of small files in a directory w/ deep directory trees), this model may lead to
    _all_ threads purging entries for one volume (starving other volumes).
    """
    def __init__(self, volume_client, tp_size):
        super(ThreadPoolPurgeQueueMixin, self).__init__(volume_client)
        self.threads = []
        for i in range(tp_size):
            self.threads.append(
                PurgeQueueBase.PurgeThread(name="purgejob.{}".format(i), purge_fn=self.run))
            self.threads[-1].start()

    def pick_purge_dir_from_volume(self):
        """
        Select the next (volname, purge_dir) pair to purge, cycling over
        queued volumes. Returns (None, None) when no volume currently has
        a pending trash entry. Caller must hold self.lock.
        """
        log.debug("processing {0} purge job entries".format(len(self.q)))
        nr_vols = len(self.q)
        to_remove = []
        to_purge = None, None
        while nr_vols > 0:
            volname = self.q[0]
            # do this now so that the other thread picks up trash entry
            # for next volume.
            self.q.rotate(1)
            ret, purge_dir = self.get_trash_entry_for_volume(volname)
            if purge_dir:
                to_purge = volname, purge_dir
                break
            # this is an optimization when for a given volume there are no more
            # entries in trash and no purge operations are in progress. in such
            # a case we remove the volume from the tracking list so as to:
            #
            # a. not query the filesystem for trash entries over and over again
            # b. keep the filesystem connection idle so that it can be freed
            #    from the connection pool
            #
            # if at all there are subvolume deletes, the volume gets added again
            # to the tracking list and the purge operations kickstarts.
            # note that, we do not iterate the volume list fully if there is a
            # purge entry to process (that will take place eventually).
            if ret == 0 and not purge_dir and not self.jobs[volname]:
                to_remove.append(volname)
            nr_vols -= 1
        for vol in to_remove:
            log.debug("auto removing volume '{0}' from purge job".format(vol))
            self.q.remove(vol)
            self.jobs.pop(vol)
        return to_purge

    def get_next_trash_entry(self):
        """
        Block until a trash entry is available and return it as
        (volname, purge_dir). Caller must hold self.lock; the wait on
        self.cv releases it while sleeping.
        """
        while True:
            # wait till there's a purge job
            while not self.q:
                log.debug("purge job list empty, waiting...")
                self.cv.wait()
            volname, purge_dir = self.pick_purge_dir_from_volume()
            if purge_dir:
                return volname, purge_dir
            log.debug("no purge jobs available, waiting...")
            self.cv.wait()

    def run(self):
        """
        Worker thread main loop: fetch the next trash entry, register the
        job, purge it (outside the lock), then unregister and pause briefly.
        """
        while True:
            with self.lock:
                volname, purge_dir = self.get_next_trash_entry()
                self.register_job(volname, purge_dir)
            ret = self.purge_trash_entry_for_volume(volname, purge_dir)
            if ret != 0:
                # fix: Logger.warn is a deprecated alias of Logger.warning
                log.warning("failed to purge {0}.{1}".format(volname, purge_dir))
            with self.lock:
                self.unregister_job(volname, purge_dir)
            time.sleep(1)
@@ -1,4 +1,5 @@
import os
import uuid

class SubvolumeSpec(object):
"""
@@ -64,6 +65,13 @@ def trash_path(self):
"""
return os.path.join(self.subvolume_prefix, "_deleting", self.subvolumeid)

@property
def unique_trash_path(self):
    """
    Return a trash directory entry path with a freshly generated,
    unique (uuid4-based) final component.
    """
    entry_name = str(uuid.uuid4())
    return os.path.join(self.subvolume_prefix, "_deleting", entry_name)

@property
def fs_namespace(self):
"""
@@ -56,6 +56,28 @@ def _mkdir_p(self, path, mode=0o755):
except cephfs.Error as e:
raise VolumeException(-e.args[0], e.args[1])

def _get_single_dir_entry(self, dir_path, exclude=[]):
"""
Return a directory entry in a given directory exclusing passed
in entries.
"""
try:
dir_handle = self.fs.opendir(dir_path)
except cephfs.Error as e:
raise VolumeException(-e.args[0], e.args[1])

exclude.extend([".", ".."])
This conversation was marked as resolved by batrick

This comment has been minimized.

Copy link
@batrick

batrick Jul 10, 2019

Member

b".", b".."


d = self.fs.readdir(dir_handle)
d_name = None
while d:
if not d.d_name.decode('utf-8') in exclude and d.is_dir():
This conversation was marked as resolved by batrick

This comment has been minimized.

Copy link
@batrick

batrick Jul 10, 2019

Member

We should avoid decoding the dentries except when presenting them externally in an error/output.

d_name = d.d_name.decode('utf-8')
break
d = self.fs.readdir(dir_handle)
self.fs.closedir(dir_handle)
return d_name

### basic subvolume operations

def create_subvolume(self, spec, size=None, namespace_isolated=True, mode=0o755, pool=None):
@@ -99,8 +121,8 @@ def create_subvolume(self, spec, size=None, namespace_isolated=True, mode=0o755,
def remove_subvolume(self, spec, force):
"""
Make a subvolume inaccessible to guests. This function is idempotent.
This is the fast part of tearing down a subvolume: you must also later
call purge_subvolume, which is the slow part.
This is the fast part of tearing down a subvolume. The subvolume will
get purged in the background.
:param spec: subvolume path specification
:param force: flag to ignore non-existent path (never raise exception)
@@ -114,7 +136,10 @@ def remove_subvolume(self, spec, force):
trashdir = spec.trash_dir
self._mkdir_p(trashdir)

trashpath = spec.trash_path
# mangle the trash directory entry to a random string so that subsequent
# subvolume create and delete with same name moves the subvolume directory
# to a unique trash dir (else, rename() could fail if the trash dir exists).
trashpath = spec.unique_trash_path
try:
self.fs.rename(subvolpath, trashpath)
except cephfs.ObjectNotFound:
@@ -124,10 +149,9 @@ def remove_subvolume(self, spec, force):
except cephfs.Error as e:
raise VolumeException(-e.args[0], e.args[1])

def purge_subvolume(self, spec):
def purge_subvolume(self, spec, should_cancel):
"""
Finish clearing up a subvolume that was previously passed to delete_subvolume. This
function is idempotent.
Finish clearing up a subvolume from the trash directory.
"""

def rmtree(root_path):
@@ -139,7 +163,7 @@ def rmtree(root_path):
except cephfs.Error as e:
raise VolumeException(-e.args[0], e.args[1])
d = self.fs.readdir(dir_handle)
while d:
while d and not should_cancel():
d_name = d.d_name.decode('utf-8')
if d_name not in [".", ".."]:
# Do not use os.path.join because it is sensitive
@@ -153,7 +177,10 @@ def rmtree(root_path):

d = self.fs.readdir(dir_handle)
self.fs.closedir(dir_handle)
self.fs.rmdir(root_path)
# remove the directory only if we were not asked to cancel
# (else we would fail to remove this anyway)
if not should_cancel():
self.fs.rmdir(root_path)

trashpath = spec.trash_path
# catch any unlink errors
@@ -256,6 +283,16 @@ def remove_group_snapshot(self, spec, snapname, force):
snappath = spec.make_group_snap_path(self.rados.conf_get('client_snapdir'), snapname)
return self._snapshot_delete(snappath, force)

def get_trash_entry(self, spec, exclude):
    """
    Fetch one entry from the trash directory of the given spec,
    skipping the names listed in `exclude`.

    Returns None when the trash directory does not exist yet (nothing
    has been queued for purge so far); any other error propagates.
    """
    try:
        return self._get_single_dir_entry(spec.trash_dir, exclude)
    except VolumeException as ve:
        # a missing trash dir simply means there is nothing to purge
        if ve.errno == -errno.ENOENT:
            return None
        raise

### context manager routines

def __enter__(self):
This conversation was marked as resolved by ajarr

This comment has been minimized.

Copy link
@ajarr

ajarr Jun 16, 2019

Contributor

Do we need to implement the enter and exit methods anymore? Does the class Subvolume need to be a context manager?

This comment has been minimized.

Copy link
@vshankar

vshankar Jun 17, 2019

Author Contributor

It need not be -- I still kept it as it for future cases (maybe we still require some auto-fu method which would make sense via context managers).

@@ -16,6 +16,7 @@
from .subvolspec import SubvolumeSpec
from .subvolume import SubVolume
from .exception import VolumeException
from .purge_queue import ThreadPoolPurgeQueueMixin

log = logging.getLogger(__name__)

@@ -160,6 +161,8 @@ class VolumeClient(object):
def __init__(self, mgr):
    self.mgr = mgr
    self.connection_pool = ConnectionPool(self.mgr)
    # background queue handling async subvolume deletes (4 worker threads)
    # TODO: make thread pool size configurable
    self.purge_queue = ThreadPoolPurgeQueueMixin(self, 4)

def gen_pool_names(self, volname):
"""
@@ -273,6 +276,7 @@ def delete_volume(self, volname):
"""
delete the given volume (tear down mds, remove filesystem)
"""
self.purge_queue.cancel_purge_job(volname)
self.connection_pool.del_fs_handle(volname)
# Tear down MDS daemons
try:
@@ -390,7 +394,7 @@ def remove_subvolume(self, fs_handle, **kwargs):
spec = SubvolumeSpec(subvolname, groupname)
if self.group_exists(sv, spec):
sv.remove_subvolume(spec, force)
sv.purge_subvolume(spec)
self.purge_queue.queue_purge_job(volname)
elif not force:
raise VolumeException(
-errno.ENOENT, "Subvolume group '{0}' not found, cannot remove " \
@@ -545,3 +549,33 @@ def remove_subvolume_group_snapshot(self, fs_handle, **kwargs):
except VolumeException as ve:
ret = self.volume_exception_to_retval(ve)
return ret

@connection_pool_wrap
def get_subvolume_trash_entry(self, fs_handle, **kwargs):
    """
    Fetch one trash directory entry for a volume.

    Expects 'vol_name' in kwargs (also consumed by connection_pool_wrap)
    and an optional 'exclude_entries' list. Returns a (rc, path, msg)
    tuple: (0, entry-or-None, "") on success, or the retval derived from
    a VolumeException on failure.
    """
    result = None
    volname = kwargs['vol_name']  # required; raises KeyError if absent
    exclude_list = kwargs.get('exclude_entries', [])

    try:
        with SubVolume(self.mgr, fs_handle) as sv:
            dummy_spec = SubvolumeSpec("", "")
            entry = sv.get_trash_entry(dummy_spec, exclude_list)
            result = (0, entry, "")
    except VolumeException as ve:
        result = self.volume_exception_to_retval(ve)
    return result

@connection_pool_wrap
def purge_subvolume_trash_entry(self, fs_handle, **kwargs):
    """
    Purge a single trash directory entry for a volume.

    Expects 'vol_name' and 'purge_dir' in kwargs, plus an optional
    'should_cancel' predicate polled during the purge. Returns a
    (rc, out, msg) tuple: (0, "", "") on success, or the retval derived
    from a VolumeException on failure.
    """
    result = (0, "", "")
    volname = kwargs['vol_name']  # required; raises KeyError if absent
    purge_dir = kwargs['purge_dir']
    should_cancel = kwargs.get('should_cancel', lambda: False)

    try:
        with SubVolume(self.mgr, fs_handle) as sv:
            sv.purge_subvolume(SubvolumeSpec(purge_dir, ""), should_cancel)
    except VolumeException as ve:
        result = self.volume_exception_to_retval(ve)
    return result
ProTip! Use n and p to navigate between commits in a pull request.
You can’t perform that action at this time.