nautilus: mgr/volumes: background purge queue for subvolumes #29079

Merged 15 commits on Jul 18, 2019
16 changes: 16 additions & 0 deletions qa/tasks/cephfs/mount.py
@@ -208,6 +208,22 @@ def open_background(self, basename="background_file"):

return rproc

def wait_for_dir_empty(self, dirname, timeout=30):
i = 0
dirpath = os.path.join(self.mountpoint, dirname)
while i < timeout:
nr_entries = int(self.getfattr(dirpath, "ceph.dir.entries"))
if nr_entries == 0:
log.debug("Directory {0} seen empty from {1} after {2}s ".format(
dirname, self.client_id, i))
return
else:
time.sleep(1)
i += 1

raise RuntimeError("Timed out after {0}s waiting for {1} to become empty from {2}".format(
i, dirname, self.client_id))

def wait_for_visible(self, basename="background_file", timeout=30):
i = 0
while i < timeout:
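For context: wait_for_dir_empty polls the ceph.dir.entries virtual xattr, which CephFS maintains on every directory as a live count of the entries directly under it. A minimal standalone sketch of the same polling loop, assuming a CephFS client mounted at /mnt/cephfs (the mount path is illustrative, and os.getxattr is Linux-only):

import os
import time

def wait_for_dir_empty(dirpath, timeout=30):
    # ceph.dir.entries is a CephFS virtual xattr holding the number of
    # entries directly under a directory
    for _ in range(timeout):
        if int(os.getxattr(dirpath, "ceph.dir.entries")) == 0:
            return
        time.sleep(1)
    raise RuntimeError("timed out waiting for {0} to become empty".format(dirpath))

wait_for_dir_empty("/mnt/cephfs/volumes/_deleting")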
78 changes: 77 additions & 1 deletion qa/tasks/cephfs/test_volumes.py
@@ -14,6 +14,14 @@ class TestVolumes(CephFSTestCase):
TEST_SUBVOLUME_PREFIX="subvolume"
TEST_GROUP_PREFIX="group"
TEST_SNAPSHOT_PREFIX="snapshot"
TEST_FILE_NAME_PREFIX="subvolume_file"

# for filling subvolume with data
CLIENTS_REQUIRED = 1

# io defaults
DEFAULT_FILE_SIZE = 1 # MB
DEFAULT_NUMBER_OF_FILES = 1024

def _fs_cmd(self, *args):
return self.mgr_cluster.mon_manager.raw_cluster_cmd("fs", *args)
@@ -51,6 +59,24 @@ def _get_subvolume_path(self, vol_name, subvol_name, group_name=None):
def _delete_test_volume(self):
self._fs_cmd("volume", "rm", self.volname)

def _do_subvolume_io(self, subvolume, number_of_files=DEFAULT_NUMBER_OF_FILES,
file_size=DEFAULT_FILE_SIZE):
# get subvolume path for IO
subvolpath = self._fs_cmd("subvolume", "getpath", self.volname, subvolume)
self.assertNotEqual(subvolpath, None)
subvolpath = subvolpath[1:].rstrip() # remove "/" prefix and any trailing newline

log.debug("filling subvolume {0} with {1} files each {2}MB size".format(subvolume, number_of_files, file_size))
for i in range(number_of_files):
filename = "{0}.{1}".format(TestVolumes.TEST_FILE_NAME_PREFIX, i)
self.mount_a.write_n_mb(os.path.join(subvolpath, filename), file_size)

def _wait_for_trash_empty(self, timeout=30):
# XXX: construct the trash dir path (note that there is no mgr
# [sub]volume interface for this).
trashdir = os.path.join("./", "volumes", "_deleting")
        self.mount_a.wait_for_dir_empty(trashdir, timeout=timeout)

def setUp(self):
super(TestVolumes, self).setUp()
self.volname = None
@@ -83,6 +109,9 @@ def test_subvolume_create_and_rm(self):
if ce.exitstatus != errno.ENOENT:
raise

# verify trash dir is clean
self._wait_for_trash_empty()

def test_subvolume_create_idempotence(self):
# create subvolume
subvolume = self._generate_random_subvolume_name()
@@ -94,6 +123,9 @@ def test_subvolume_create_idempotence(self):
# remove subvolume
self._fs_cmd("subvolume", "rm", self.volname, subvolume)

# verify trash dir is clean
self._wait_for_trash_empty()

def test_nonexistent_subvolume_rm(self):
# remove non-existing subvolume
subvolume = "non_existent_subvolume"
@@ -134,6 +166,9 @@ def test_subvolume_create_and_rm_in_group(self):
# remove subvolume
self._fs_cmd("subvolume", "rm", self.volname, subvolume, group)

# verify trash dir is clean
self._wait_for_trash_empty()

# remove group
self._fs_cmd("subvolumegroup", "rm", self.volname, group)

@@ -250,8 +285,9 @@ def test_subvolume_create_with_desired_mode_in_group(self):
self.assertEqual(actual_mode2, expected_mode2)
self.assertEqual(actual_mode3, expected_mode2)

self._fs_cmd("subvolume", "rm", self.volname, subvol2, group)
self._fs_cmd("subvolume", "rm", self.volname, subvol1, group)
self._fs_cmd("subvolume", "rm", self.volname, subvol2, group)
self._fs_cmd("subvolume", "rm", self.volname, subvol3, group)
self._fs_cmd("subvolumegroup", "rm", self.volname, group)

    def test_nonexistent_subvolume_group_rm(self):
@@ -285,6 +321,9 @@ def test_subvolume_snapshot_create_and_rm(self):
# remove subvolume
self._fs_cmd("subvolume", "rm", self.volname, subvolume)

# verify trash dir is clean
self._wait_for_trash_empty()

def test_subvolume_snapshot_create_idempotence(self):
subvolume = self._generate_random_subvolume_name()
snapshot = self._generate_random_snapshot_name()
@@ -304,6 +343,9 @@ def test_subvolume_snapshot_create_idempotence(self):
# remove subvolume
self._fs_cmd("subvolume", "rm", self.volname, subvolume)

# verify trash dir is clean
self._wait_for_trash_empty()

def test_nonexistent_subvolume_snapshot_rm(self):
subvolume = self._generate_random_subvolume_name()
snapshot = self._generate_random_snapshot_name()
@@ -330,6 +372,9 @@ def test_nonexistent_subvolume_snapshot_rm(self):
# remove subvolume
self._fs_cmd("subvolume", "rm", self.volname, subvolume)

# verify trash dir is clean
self._wait_for_trash_empty()

def test_subvolume_snapshot_in_group(self):
subvolume = self._generate_random_subvolume_name()
group = self._generate_random_group_name()
@@ -350,6 +395,9 @@ def test_subvolume_snapshot_in_group(self):
# remove subvolume
self._fs_cmd("subvolume", "rm", self.volname, subvolume, group)

# verify trash dir is clean
self._wait_for_trash_empty()

# remove group
self._fs_cmd("subvolumegroup", "rm", self.volname, group)

@@ -373,6 +421,9 @@ def test_subvolume_group_snapshot_create_and_rm(self):
# remove subvolume
self._fs_cmd("subvolume", "rm", self.volname, subvolume, group)

# verify trash dir is clean
self._wait_for_trash_empty()

# remove group
self._fs_cmd("subvolumegroup", "rm", self.volname, group)

@@ -399,6 +450,9 @@ def test_subvolume_group_snapshot_idempotence(self):
# remove subvolume
self._fs_cmd("subvolume", "rm", self.volname, subvolume, group)

# verify trash dir is clean
self._wait_for_trash_empty()

# remove group
self._fs_cmd("subvolumegroup", "rm", self.volname, group)

@@ -429,5 +483,27 @@ def test_nonexistent_subvolume_group_snapshot_rm(self):
# remove subvolume
self._fs_cmd("subvolume", "rm", self.volname, subvolume, group)

# verify trash dir is clean
self._wait_for_trash_empty()

# remove group
self._fs_cmd("subvolumegroup", "rm", self.volname, group)

def test_async_subvolume_rm(self):
subvolume = self._generate_random_subvolume_name()

# create subvolume
self._fs_cmd("subvolume", "create", self.volname, subvolume)

# fill subvolume w/ some data
self._do_subvolume_io(subvolume)

self.mount_a.umount_wait()

# remove subvolume
self._fs_cmd("subvolume", "rm", self.volname, subvolume)

self.mount_a.mount()

# verify trash dir is clean
self._wait_for_trash_empty()
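
Outside the test harness, the flow in test_async_subvolume_rm maps onto plain ceph CLI calls: create a subvolume, fill it through a client mount, remove it, and watch <mount>/volumes/_deleting drain in the background. A rough sketch driving the same sequence via subprocess (the volume name "a" and subvolume name "subvol0" are illustrative, and assume a configured ceph CLI and an existing volume):

import subprocess

def ceph_fs(*args):
    # thin wrapper around the `ceph fs ...` CLI
    return subprocess.check_output(("ceph", "fs") + args).decode().strip()

ceph_fs("subvolume", "create", "a", "subvol0")
path = ceph_fs("subvolume", "getpath", "a", "subvol0")
# ... write data under `path` through a CephFS client mount ...
ceph_fs("subvolume", "rm", "a", "subvol0")  # returns without waiting; the
# purge threads empty <mount>/volumes/_deleting asynchronously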
2 changes: 0 additions & 2 deletions src/client/Client.cc
@@ -6652,8 +6652,6 @@ int Client::mkdirs(const char *relpath, mode_t mode, const UserPerm& perms)
break;
cur.swap(next);
}
//check that we have work left to do
if (i==path.depth()) return -EEXIST;
if (r!=-ENOENT) return r;
ldout(cct, 20) << __func__ << " got through " << i << " directories on path " << relpath << dendl;
//make new directory at each level
198 changes: 198 additions & 0 deletions src/pybind/mgr/volumes/fs/purge_queue.py
@@ -0,0 +1,198 @@
import time
import logging
import threading
import traceback
from collections import deque

log = logging.getLogger(__name__)

class PurgeQueueBase(object):
"""
Base class for implementing purge queue strategies.
"""
class PurgeThread(threading.Thread):
def __init__(self, name, purge_fn):
self.purge_fn = purge_fn
# event object to cancel ongoing purge
self.cancel_event = threading.Event()
threading.Thread.__init__(self, name=name)

def run(self):
try:
self.purge_fn()
except Exception as e:
trace = "".join(traceback.format_exception(None, e, e.__traceback__))
log.error("purge queue thread encountered fatal error:\n"+trace)

def cancel_job(self):
self.cancel_event.set()

def should_cancel(self):
return self.cancel_event.isSet()

def reset_cancel(self):
self.cancel_event.clear()

def __init__(self, volume_client):
self.vc = volume_client
# volumes whose subvolumes need to be purged
self.q = deque()
# job tracking
self.jobs = {}
# lock, cv for kickstarting purge
self.lock = threading.Lock()
self.cv = threading.Condition(self.lock)
# lock, cv for purge cancellation
self.waiting = False
self.c_lock = threading.Lock()
self.c_cv = threading.Condition(self.c_lock)

def queue_purge_job(self, volname):
with self.lock:
if not self.q.count(volname):
self.q.append(volname)
self.jobs[volname] = []
self.cv.notifyAll()

def cancel_purge_job(self, volname):
log.info("cancelling purge jobs for volume '{0}'".format(volname))
self.lock.acquire()
unlock = True
try:
if not self.q.count(volname):
return
self.q.remove(volname)
if not self.jobs.get(volname, []):
return
# cancel in-progress purge operation and wait until complete
for j in self.jobs[volname]:
j[1].cancel_job()
# wait for cancellation to complete
with self.c_lock:
unlock = False
self.waiting = True
self.lock.release()
while self.waiting:
log.debug("waiting for {0} in-progress purge jobs for volume '{1}' to " \
"cancel".format(len(self.jobs[volname]), volname))
self.c_cv.wait()
finally:
if unlock:
self.lock.release()

def register_job(self, volname, purge_dir):
log.debug("registering purge job: {0}.{1}".format(volname, purge_dir))

thread_id = threading.currentThread()
self.jobs[volname].append((purge_dir, thread_id))

def unregister_job(self, volname, purge_dir):
log.debug("unregistering purge job: {0}.{1}".format(volname, purge_dir))

thread_id = threading.currentThread()
self.jobs[volname].remove((purge_dir, thread_id))

cancelled = thread_id.should_cancel()
thread_id.reset_cancel()

# wake up cancellation waiters if needed
if not self.jobs[volname] and cancelled:
logging.info("waking up cancellation waiters")
self.jobs.pop(volname)
with self.c_lock:
self.waiting = False
self.c_cv.notifyAll()

def get_trash_entry_for_volume(self, volname):
log.debug("fetching trash entry for volume '{0}'".format(volname))

exclude_entries = [v[0] for v in self.jobs[volname]]
ret = self.vc.get_subvolume_trash_entry(
None, vol_name=volname, exclude_entries=exclude_entries)
        if ret[0] != 0:
            log.error("error fetching trash entry for volume '{0}': {1}".format(volname, ret[0]))
return ret[0], None
return 0, ret[1]

def purge_trash_entry_for_volume(self, volname, purge_dir):
log.debug("purging trash entry '{0}' for volume '{1}'".format(purge_dir, volname))

thread_id = threading.currentThread()
ret = self.vc.purge_subvolume_trash_entry(
None, vol_name=volname, purge_dir=purge_dir, should_cancel=lambda: thread_id.should_cancel())
return ret[0]

class ThreadPoolPurgeQueueMixin(PurgeQueueBase):
"""
Purge queue mixin class maintaining a pool of threads for purging trash entries.
    Subvolumes are chosen from volumes in a round-robin fashion. If some of the purge
    entries (belonging to a set of volumes) have huge directory trees (such as lots
    of small files spread across deep directory trees), this model may lead to
_all_ threads purging entries for one volume (starving other volumes).
"""
def __init__(self, volume_client, tp_size):
super(ThreadPoolPurgeQueueMixin, self).__init__(volume_client)
self.threads = []
for i in range(tp_size):
self.threads.append(
PurgeQueueBase.PurgeThread(name="purgejob.{}".format(i), purge_fn=self.run))
self.threads[-1].start()

def pick_purge_dir_from_volume(self):
log.debug("processing {0} purge job entries".format(len(self.q)))
nr_vols = len(self.q)
to_remove = []
to_purge = None, None
while nr_vols > 0:
volname = self.q[0]
            # rotate now so that another thread picks up the trash entry
            # for the next volume.
self.q.rotate(1)
ret, purge_dir = self.get_trash_entry_for_volume(volname)
if purge_dir:
to_purge = volname, purge_dir
break
            # this is an optimization: when a given volume has no more
            # entries in trash and no purge operations in progress, we
            # remove the volume from the tracking list so as to:
            #
            # a. not query the filesystem for trash entries over and over again
            # b. keep the filesystem connection idle so that it can be freed
            #    from the connection pool
            #
            # if subvolumes get deleted later, the volume is added back to
            # the tracking list and purge operations kickstart again.
            # note that we do not iterate the volume list fully if there is a
            # purge entry to process (that will happen eventually).
if ret == 0 and not purge_dir and not self.jobs[volname]:
to_remove.append(volname)
nr_vols -= 1
for vol in to_remove:
log.debug("auto removing volume '{0}' from purge job".format(vol))
self.q.remove(vol)
self.jobs.pop(vol)
return to_purge

def get_next_trash_entry(self):
while True:
# wait till there's a purge job
while not self.q:
log.debug("purge job list empty, waiting...")
self.cv.wait()
volname, purge_dir = self.pick_purge_dir_from_volume()
if purge_dir:
return volname, purge_dir
log.debug("no purge jobs available, waiting...")
self.cv.wait()

def run(self):
while True:
with self.lock:
volname, purge_dir = self.get_next_trash_entry()
self.register_job(volname, purge_dir)
ret = self.purge_trash_entry_for_volume(volname, purge_dir)
if ret != 0:
log.warn("failed to purge {0}.{1}".format(volname, purge_dir))
with self.lock:
self.unregister_job(volname, purge_dir)
time.sleep(1)
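
The scheduling policy described in the ThreadPoolPurgeQueueMixin docstring reduces to pick_purge_dir_from_volume rotating the deque of volumes, so successive calls start from different volumes. A stripped-down sketch of just that round-robin selection, with the trash lookup stubbed out (all names are illustrative):

from collections import deque

def pick(q, get_trash_entry):
    # examine each volume at most once per call, rotating the queue so
    # the next caller starts from a different volume
    for _ in range(len(q)):
        volname = q[0]
        q.rotate(1)
        entry = get_trash_entry(volname)
        if entry is not None:
            return volname, entry
    return None, None

q = deque(["vol1", "vol2"])
trash = {"vol1": [], "vol2": ["purge.0"]}
print(pick(q, lambda v: trash[v].pop() if trash[v] else None))  # -> ('vol2', 'purge.0')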