mgr / volumes: background purge queue for subvolumes #28003

Merged · 14 commits · Jul 16, 2019
16 changes: 16 additions & 0 deletions qa/tasks/cephfs/mount.py
@@ -247,6 +247,22 @@ def open_background(self, basename="background_file", write=True):

return rproc

def wait_for_dir_empty(self, dirname, timeout=30):
i = 0
dirpath = os.path.join(self.mountpoint, dirname)
while i < timeout:
nr_entries = int(self.getfattr(dirpath, "ceph.dir.entries"))
if nr_entries == 0:
log.debug("Directory {0} seen empty from {1} after {2}s ".format(
dirname, self.client_id, i))
return
else:
time.sleep(1)
i += 1

raise RuntimeError("Timed out after {0}s waiting for {1} to become empty from {2}".format(
i, dirname, self.client_id))

def wait_for_visible(self, basename="background_file", timeout=30):
i = 0
while i < timeout:
78 changes: 77 additions & 1 deletion qa/tasks/cephfs/test_volumes.py
@@ -14,6 +14,14 @@ class TestVolumes(CephFSTestCase):
TEST_SUBVOLUME_PREFIX="subvolume"
TEST_GROUP_PREFIX="group"
TEST_SNAPSHOT_PREFIX="snapshot"
TEST_FILE_NAME_PREFIX="subvolume_file"

# for filling subvolume with data
CLIENTS_REQUIRED = 1

# io defaults
DEFAULT_FILE_SIZE = 1 # MB
DEFAULT_NUMBER_OF_FILES = 1024

def _fs_cmd(self, *args):
return self.mgr_cluster.mon_manager.raw_cluster_cmd("fs", *args)
@@ -51,6 +59,24 @@ def _get_subvolume_path(self, vol_name, subvol_name, group_name=None):
def _delete_test_volume(self):
self._fs_cmd("volume", "rm", self.volname)

def _do_subvolume_io(self, subvolume, number_of_files=DEFAULT_NUMBER_OF_FILES,
file_size=DEFAULT_FILE_SIZE):
# get subvolume path for IO
subvolpath = self._fs_cmd("subvolume", "getpath", self.volname, subvolume)
self.assertNotEqual(subvolpath, None)
subvolpath = subvolpath[1:].rstrip() # remove "/" prefix and any trailing newline

log.debug("filling subvolume {0} with {1} files each {2}MB size".format(subvolume, number_of_files, file_size))
for i in range(number_of_files):
filename = "{0}.{1}".format(TestVolumes.TEST_FILE_NAME_PREFIX, i)
self.mount_a.write_n_mb(os.path.join(subvolpath, filename), file_size)

def _wait_for_trash_empty(self, timeout=30):
# XXX: construct the trash dir path (note that there is no mgr
# [sub]volume interface for this).
trashdir = os.path.join("./", "volumes", "_deleting")
self.mount_a.wait_for_dir_empty(trashdir, timeout=timeout)

def setUp(self):
super(TestVolumes, self).setUp()
self.volname = None
@@ -83,6 +109,9 @@ def test_subvolume_create_and_rm(self):
if ce.exitstatus != errno.ENOENT:
raise

# verify trash dir is clean
self._wait_for_trash_empty()

def test_subvolume_create_idempotence(self):
# create subvolume
subvolume = self._generate_random_subvolume_name()
@@ -94,6 +123,9 @@ def test_subvolume_create_idempotence(self):
# remove subvolume
self._fs_cmd("subvolume", "rm", self.volname, subvolume)

# verify trash dir is clean
self._wait_for_trash_empty()

def test_nonexistent_subvolume_rm(self):
# remove non-existing subvolume
subvolume = "non_existent_subvolume"
@@ -134,6 +166,9 @@ def test_subvolume_create_and_rm_in_group(self):
# remove subvolume
self._fs_cmd("subvolume", "rm", self.volname, subvolume, group)

# verify trash dir is clean
self._wait_for_trash_empty()

# remove group
self._fs_cmd("subvolumegroup", "rm", self.volname, group)

@@ -250,8 +285,9 @@ def test_subvolume_create_with_desired_mode_in_group(self):
self.assertEqual(actual_mode2, expected_mode2)
self.assertEqual(actual_mode3, expected_mode2)

self._fs_cmd("subvolume", "rm", self.volname, subvol2, group)
self._fs_cmd("subvolume", "rm", self.volname, subvol1, group)
self._fs_cmd("subvolume", "rm", self.volname, subvol2, group)
self._fs_cmd("subvolume", "rm", self.volname, subvol3, group)
self._fs_cmd("subvolumegroup", "rm", self.volname, group)

def test_nonexistent_subvolme_group_rm(self):
@@ -285,6 +321,9 @@ def test_subvolume_snapshot_create_and_rm(self):
# remove subvolume
self._fs_cmd("subvolume", "rm", self.volname, subvolume)

# verify trash dir is clean
self._wait_for_trash_empty()

def test_subvolume_snapshot_create_idempotence(self):
subvolume = self._generate_random_subvolume_name()
snapshot = self._generate_random_snapshot_name()
@@ -304,6 +343,9 @@ def test_subvolume_snapshot_create_idempotence(self):
# remove subvolume
self._fs_cmd("subvolume", "rm", self.volname, subvolume)

# verify trash dir is clean
self._wait_for_trash_empty()

def test_nonexistent_subvolume_snapshot_rm(self):
subvolume = self._generate_random_subvolume_name()
snapshot = self._generate_random_snapshot_name()
@@ -330,6 +372,9 @@ def test_nonexistent_subvolume_snapshot_rm(self):
# remove subvolume
self._fs_cmd("subvolume", "rm", self.volname, subvolume)

# verify trash dir is clean
self._wait_for_trash_empty()

def test_subvolume_snapshot_in_group(self):
subvolume = self._generate_random_subvolume_name()
group = self._generate_random_group_name()
@@ -350,6 +395,9 @@ def test_subvolume_snapshot_in_group(self):
# remove subvolume
self._fs_cmd("subvolume", "rm", self.volname, subvolume, group)

# verify trash dir is clean
self._wait_for_trash_empty()

# remove group
self._fs_cmd("subvolumegroup", "rm", self.volname, group)

@@ -373,6 +421,9 @@ def test_subvolume_group_snapshot_create_and_rm(self):
# remove subvolume
self._fs_cmd("subvolume", "rm", self.volname, subvolume, group)

# verify trash dir is clean
self._wait_for_trash_empty()

# remove group
self._fs_cmd("subvolumegroup", "rm", self.volname, group)

@@ -399,6 +450,9 @@ def test_subvolume_group_snapshot_idempotence(self):
# remove subvolume
self._fs_cmd("subvolume", "rm", self.volname, subvolume, group)

# verify trash dir is clean
self._wait_for_trash_empty()

# remove group
self._fs_cmd("subvolumegroup", "rm", self.volname, group)

@@ -429,5 +483,27 @@ def test_nonexistent_subvolume_group_snapshot_rm(self):
# remove subvolume
self._fs_cmd("subvolume", "rm", self.volname, subvolume, group)

# verify trash dir is clean
self._wait_for_trash_empty()
Contributor:

When I run the tests locally (with vstart_runner on a vstart cluster), this test times out:

2019-06-21 23:57:14,760.760 INFO:__main__:Running ['./bin/ceph', 'fs', 'flag', 'set', 'enable_multiple', 'true', '--yes-i-really-mean-it']
2019-06-21 23:57:15,912.912 INFO:__main__:Running ['./bin/ceph', 'fs', 'volume', 'ls']
2019-06-21 23:57:16,258.258 INFO:__main__:Running ['./bin/ceph', 'fs', 'subvolumegroup', 'create', u'cephfs', 'group_31']
2019-06-21 23:57:16,665.665 INFO:__main__:Running ['./bin/ceph', 'fs', 'subvolume', 'create', u'cephfs', 'subvolume_9913', '--group_name', 'group_31']
2019-06-21 23:57:17,061.061 INFO:__main__:Running ['./bin/ceph', 'fs', 'subvolumegroup', 'snapshot', 'create', u'cephfs', 'group_31', 'snapshot_9']
2019-06-21 23:57:17,454.454 INFO:__main__:Running ['./bin/ceph', 'fs', 'subvolumegroup', 'snapshot', 'rm', u'cephfs', 'group_31', 'snapshot_9']
2019-06-21 23:57:17,856.856 INFO:__main__:Running ['./bin/ceph', 'fs', 'subvolumegroup', 'snapshot', 'rm', u'cephfs', 'group_31', 'snapshot_9']
2019-06-21 23:57:17.984 7f67000ff700 -1 WARNING: all dangerous and experimental features are enabled.
2019-06-21 23:57:18.053 7f67000ff700 -1 WARNING: all dangerous and experimental features are enabled.
Error ENOENT: Snapshot '/volumes/group_31/.snap/snapshot_9' not found, cannot remove it
2019-06-21 23:57:18,220.220 INFO:__main__:Running ['./bin/ceph', 'fs', 'subvolume', 'rm', u'cephfs', 'subvolume_9913', 'group_31']
2019-06-21 23:57:18,590.590 INFO:__main__:Running ['getfattr', '--only-values', '-n', 'ceph.dir.entries', '/tmp/tmpuh6UYV/mnt.0/./volumes/_deleting']
2019-06-21 23:57:19,622.622 INFO:__main__:Running ['getfattr', '--only-values', '-n', 'ceph.dir.entries', '/tmp/tmpuh6UYV/mnt.0/./volumes/_deleting']
2019-06-21 23:57:20,668.668 INFO:__main__:Running ['getfattr', '--only-values', '-n', 'ceph.dir.entries', '/tmp/tmpuh6UYV/mnt.0/./volumes/_deleting']
2019-06-21 23:57:21,693.693 INFO:__main__:Running ['getfattr', '--only-values', '-n', 'ceph.dir.entries', '/tmp/tmpuh6UYV/mnt.0/./volumes/_deleting']
2019-06-21 23:57:22,719.719 INFO:__main__:Running ['getfattr', '--only-values', '-n', 'ceph.dir.entries', '/tmp/tmpuh6UYV/mnt.0/./volumes/_deleting']
2019-06-21 23:57:23,760.760 INFO:__main__:Running ['getfattr', '--only-values', '-n', 'ceph.dir.entries', '/tmp/tmpuh6UYV/mnt.0/./volumes/_deleting']
2019-06-21 23:57:24,807.807 INFO:__main__:Running ['getfattr', '--only-values', '-n', 'ceph.dir.entries', '/tmp/tmpuh6UYV/mnt.0/./volumes/_deleting']
2019-06-21 23:57:25,855.855 INFO:__main__:Running ['getfattr', '--only-values', '-n', 'ceph.dir.entries', '/tmp/tmpuh6UYV/mnt.0/./volumes/_deleting']
2019-06-21 23:57:26,903.903 INFO:__main__:Running ['getfattr', '--only-values', '-n', 'ceph.dir.entries', '/tmp/tmpuh6UYV/mnt.0/./volumes/_deleting']
2019-06-21 23:57:27,926.926 INFO:__main__:Running ['getfattr', '--only-values', '-n', 'ceph.dir.entries', '/tmp/tmpuh6UYV/mnt.0/./volumes/_deleting']
2019-06-21 23:57:28,971.971 INFO:__main__:Running ['getfattr', '--only-values', '-n', 'ceph.dir.entries', '/tmp/tmpuh6UYV/mnt.0/./volumes/_deleting']
2019-06-21 23:57:30,015.015 INFO:__main__:Running ['getfattr', '--only-values', '-n', 'ceph.dir.entries', '/tmp/tmpuh6UYV/mnt.0/./volumes/_deleting']
2019-06-21 23:57:31,060.060 INFO:__main__:Running ['getfattr', '--only-values', '-n', 'ceph.dir.entries', '/tmp/tmpuh6UYV/mnt.0/./volumes/_deleting']
2019-06-21 23:57:32,107.107 INFO:__main__:Running ['getfattr', '--only-values', '-n', 'ceph.dir.entries', '/tmp/tmpuh6UYV/mnt.0/./volumes/_deleting']
2019-06-21 23:57:33,152.152 INFO:__main__:Running ['getfattr', '--only-values', '-n', 'ceph.dir.entries', '/tmp/tmpuh6UYV/mnt.0/./volumes/_deleting']
2019-06-21 23:57:34,198.198 INFO:__main__:Running ['getfattr', '--only-values', '-n', 'ceph.dir.entries', '/tmp/tmpuh6UYV/mnt.0/./volumes/_deleting']
2019-06-21 23:57:35,222.222 INFO:__main__:Running ['getfattr', '--only-values', '-n', 'ceph.dir.entries', '/tmp/tmpuh6UYV/mnt.0/./volumes/_deleting']
2019-06-21 23:57:36,242.242 INFO:__main__:Running ['getfattr', '--only-values', '-n', 'ceph.dir.entries', '/tmp/tmpuh6UYV/mnt.0/./volumes/_deleting']
2019-06-21 23:57:37,261.261 INFO:__main__:Running ['getfattr', '--only-values', '-n', 'ceph.dir.entries', '/tmp/tmpuh6UYV/mnt.0/./volumes/_deleting']
2019-06-21 23:57:38,280.280 INFO:__main__:Running ['getfattr', '--only-values', '-n', 'ceph.dir.entries', '/tmp/tmpuh6UYV/mnt.0/./volumes/_deleting']
2019-06-21 23:57:39,306.306 INFO:__main__:Running ['getfattr', '--only-values', '-n', 'ceph.dir.entries', '/tmp/tmpuh6UYV/mnt.0/./volumes/_deleting']
2019-06-21 23:57:40,347.347 INFO:__main__:Running ['getfattr', '--only-values', '-n', 'ceph.dir.entries', '/tmp/tmpuh6UYV/mnt.0/./volumes/_deleting']
2019-06-21 23:57:41,363.363 INFO:__main__:Running ['getfattr', '--only-values', '-n', 'ceph.dir.entries', '/tmp/tmpuh6UYV/mnt.0/./volumes/_deleting']
2019-06-21 23:57:42,406.406 INFO:__main__:Running ['getfattr', '--only-values', '-n', 'ceph.dir.entries', '/tmp/tmpuh6UYV/mnt.0/./volumes/_deleting']
2019-06-21 23:57:43,451.451 INFO:__main__:Running ['getfattr', '--only-values', '-n', 'ceph.dir.entries', '/tmp/tmpuh6UYV/mnt.0/./volumes/_deleting']
2019-06-21 23:57:44,497.497 INFO:__main__:Running ['getfattr', '--only-values', '-n', 'ceph.dir.entries', '/tmp/tmpuh6UYV/mnt.0/./volumes/_deleting']
2019-06-21 23:57:45,526.526 INFO:__main__:Running ['getfattr', '--only-values', '-n', 'ceph.dir.entries', '/tmp/tmpuh6UYV/mnt.0/./volumes/_deleting']
2019-06-21 23:57:46,573.573 INFO:__main__:Running ['getfattr', '--only-values', '-n', 'ceph.dir.entries', '/tmp/tmpuh6UYV/mnt.0/./volumes/_deleting']
2019-06-21 23:57:47,590.590 INFO:__main__:Running ['getfattr', '--only-values', '-n', 'ceph.dir.entries', '/tmp/tmpuh6UYV/mnt.0/./volumes/_deleting']
2019-06-21 23:57:48,637.637 INFO:__main__:Running ['getfattr', '--only-values', '-n', 'ceph.dir.entries', '/tmp/tmpuh6UYV/mnt.0/./volumes/_deleting']
2019-06-21 23:57:49,681.681 INFO:__main__:test_nonexistent_subvolume_group_snapshot_rm (tasks.cephfs.test_volumes.TestVolumes) ... ERROR
2019-06-21 23:57:49,682.682 ERROR:__main__:Traceback (most recent call last):
  File "/home/rraja/git/ceph/qa/tasks/cephfs/test_volumes.py", line 360, in test_nonexistent_subvolume_group_snapshot_rm
    self._wait_for_trash_empty()
  File "/home/rraja/git/ceph/qa/tasks/cephfs/test_volumes.py", line 69, in _wait_for_trash_empty
    self.mount_a.wait_for_dir_empty(trashdir)
  File "/home/rraja/git/ceph/qa/tasks/cephfs/mount.py", line 255, in wait_for_dir_empty
    i, dirname, self.client_id))
RuntimeError: Timed out after 30s waiting for ./volumes/_deleting to become empty from 0

2019-06-21 23:57:49,682.682 ERROR:__main__:Error in test 'test_nonexistent_subvolume_group_snapshot_rm (tasks.cephfs.test_volumes.TestVolumes)', going interactive


I checked the contents of the filesystem with a manual fuse-mount; the trash folder wasn't empty. It never got purged.

Contributor:

I manually populated a subvolume with data, and removed the subvolume. The corresponding trash folder didn't get purged.

Contributor Author:

> I manually populated a subvolume with data, and removed the subvolume. The corresponding trash folder didn't get purged.

What was the uid/gid of the populated data? Did you perform a plain cp? Did you observe any error messages in the manager log?

Contributor Author:

Note that the purge thread will fail to remove the directory (and its entries) from trash if it does not have permission to remove them based on uid/gid. Were the files/dirs created by root?
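
To make the uid/gid point concrete, here is a minimal standalone sketch of that failure mode, using plain os/shutil calls rather than the mgr/libcephfs interface (the helper name and paths are made up): purging a trash entry means emptying its directories, and if those were created by another uid (e.g. root) with restrictive modes, an unprivileged purge process gets EACCES/EPERM and the entry stays under volumes/_deleting.

import errno
import os
import shutil
import tempfile

def try_purge_trash_entry(trash_entry):
    """Recursively remove one trash entry; return 0 on success or a negative errno."""
    try:
        shutil.rmtree(trash_entry)
        return 0
    except OSError as e:
        if e.errno in (errno.EACCES, errno.EPERM):
            # e.g. the tree was created by root while this process runs unprivileged
            return -e.errno
        raise

if __name__ == "__main__":
    # build a throwaway "trash entry" we are allowed to remove, then purge it
    entry = tempfile.mkdtemp(prefix="trash_entry_")
    with open(os.path.join(entry, "data.bin"), "wb") as f:
        f.write(b"\0" * 1024)
    print(try_purge_trash_entry(entry))   # prints 0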

Contributor Author:

@ajarr ping?

Contributor Author:

@ajarr -- pinging back on this? If this is not an issue, then I can post the updated PR for review (minus the fix for the assertion).

Contributor:

> I manually populated a subvolume with data, and removed the subvolume. The corresponding trash folder didn't get purged.

> What was the uid/gid of the populated data? Did you perform a plain cp? Did you observe any error messages in the manager log?

The uid/gid of the populated data was the same as that of the ceph-mgr process.

ajarr (Contributor), Jun 30, 2019:

I added more logging and the d_name.decode() fix as follows,

diff --git a/src/pybind/mgr/volumes/fs/purge_queue.py b/src/pybind/mgr/volumes/fs/purge_queue.py
index e1e9dc1dae..81aa4bb436 100644
--- a/src/pybind/mgr/volumes/fs/purge_queue.py
+++ b/src/pybind/mgr/volumes/fs/purge_queue.py
@@ -80,6 +80,8 @@ class PurgeQueueBase(object):
 
         thread_id = threading.currentThread()
         self.jobs[volname].append((purge_dir, thread_id))
+        log.debug("In register_job: job for volume {0} is {1}".format(volname, self.jobs[volname]))
+
 
     def unregister_job(self, volname, purge_dir):
         log.debug("unregistering purge job: {0}.{1}".format(volname, purge_dir))
@@ -109,6 +111,7 @@ class PurgeQueueBase(object):
 
     def purge_trash_entry_for_volume(self, volname, purge_dir):
         thread_id = threading.currentThread()
+        log.debug("In purge_trash_entry_for_volume: vol name '(0}' and purge dir '{1}'".format(volname, purge_dir))
         ret = self.vc.purge_subvolume_trash_entry(
             None, vol_name=volname, purge_dir=purge_dir, should_cancel=lambda: thread_id.should_cancel())
         return ret[0]
@@ -140,6 +143,7 @@ class ThreadPoolPurgeQueueMixin(PurgeQueueBase):
             # for next volume.
             self.q.rotate(1)
             ret, purge_dir = self.get_trash_entry_for_volume(volname)
+            log.debug("In pick_purge_dir_from_volume: purge dir is '{0}'".format(purge_dir))
             if purge_dir:
                 to_purge = volname, purge_dir
                 break
diff --git a/src/pybind/mgr/volumes/fs/subvolume.py b/src/pybind/mgr/volumes/fs/subvolume.py
index 9b86a510ac..cd349f5046 100644
--- a/src/pybind/mgr/volumes/fs/subvolume.py
+++ b/src/pybind/mgr/volumes/fs/subvolume.py
@@ -71,8 +71,8 @@ class SubVolume(object):
         d = self.fs.readdir(dir_handle)
         d_name = None
         while d:
-            if not d.d_name in exclude and d.is_dir():
-                d_name = d.d_name
+            if not d.d_name.decode('utf-8') in exclude and d.is_dir():
+                d_name = d.d_name.decode('utf-8')
                 break
             d = self.fs.readdir(dir_handle)
         self.fs.closedir(dir_handle)
diff --git a/src/pybind/mgr/volumes/fs/volume.py b/src/pybind/mgr/volumes/fs/volume.py
index e69d35ad4f..64f0690026 100644
--- a/src/pybind/mgr/volumes/fs/volume.py
+++ b/src/pybind/mgr/volumes/fs/volume.py
@@ -550,6 +550,7 @@ class VolumeClient(object):
             with SubVolume(self.mgr, fs_handle) as sv:
                 spec = SubvolumeSpec("", "")
                 path = sv.get_trash_entry(spec, exclude)
+                log.debug("In get_subvolume_trash_entry: trash path is '{0}'".format(path))
                 ret = 0, path, ""
         except VolumeException as ve:
             ret = self.volume_exception_to_retval(ve)

Even though a thread picks up the trashdir and registers it, the thread doesn't seem to purge it, i.e., execute purge_trash_entry_for_volume. See the mgr log snippet after fs subvolume rm of a subvolume is called: https://pastebin.com/0MWqCJG7. I didn't see the debug log message of being in purge_trash_entry_for_volume in the mgr log.
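
For readers unfamiliar with why the decode() hunk matters: with the mgr module running under Python 3, a readdir-style API that hands names back as bytes will never match a str exclude list, and the bytes value also trips up later str path handling. A tiny standalone illustration (plain Python, no cephfs bindings, made-up names):

exclude = ["subvol_1", "subvol_2"]        # str entries built by the caller
d_name = b"subvol_1"                      # bytes, as a readdir-style API may return in Python 3

print(d_name in exclude)                  # False: bytes never compares equal to str
print(d_name.decode('utf-8') in exclude)  # True: compare like with like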

Contributor Author:

> Even though a thread picks up the trashdir and registers it, the thread doesn't seem to purge it, i.e., execute purge_trash_entry_for_volume. See the mgr log snippet after fs subvolume rm of a subvolume is called: https://pastebin.com/0MWqCJG7. I didn't see the debug log message of being in purge_trash_entry_for_volume in the mgr log.

That may be due to buffered logging (I've seen that before in an mgr module). If an exception is raised in purge_trash_entry_for_volume(), the string is never logged.
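
As a small aside on that point, a standalone sketch (illustrative names only) of why the worker has to capture and log the traceback itself, which is what PurgeThread.run() in this PR does around purge_fn():

import logging
import traceback

logging.basicConfig(level=logging.DEBUG)
log = logging.getLogger(__name__)

def purge_entry(volname, purge_dir):
    # hypothetical worker body that fails before reaching its own log.debug()
    raise RuntimeError("boom before any logging")

def run_worker(volname, purge_dir):
    try:
        purge_entry(volname, purge_dir)
    except Exception as e:
        # without this, the exception dies with the thread and nothing is logged
        trace = "".join(traceback.format_exception(None, e, e.__traceback__))
        log.error("purge worker failed for %s/%s:\n%s", volname, purge_dir, trace)

run_worker("cephfs", "volumes/_deleting/subvol_x")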


# remove group
self._fs_cmd("subvolumegroup", "rm", self.volname, group)

def test_async_subvolume_rm(self):
subvolume = self._generate_random_subvolume_name()

# create subvolume
self._fs_cmd("subvolume", "create", self.volname, subvolume)

# fill subvolume w/ some data
self._do_subvolume_io(subvolume)

self.mount_a.umount_wait()

# remove subvolume
self._fs_cmd("subvolume", "rm", self.volname, subvolume)

self.mount_a.mount()

# verify trash dir is clean
self._wait_for_trash_empty()
2 changes: 0 additions & 2 deletions src/client/Client.cc
@@ -6655,8 +6655,6 @@ int Client::mkdirs(const char *relpath, mode_t mode, const UserPerm& perms)
break;
cur.swap(next);
}
//check that we have work left to do
if (i==path.depth()) return -EEXIST;
if (r!=-ENOENT) return r;
ldout(cct, 20) << __func__ << " got through " << i << " directories on path " << relpath << dendl;
//make new directory at each level
198 changes: 198 additions & 0 deletions src/pybind/mgr/volumes/fs/purge_queue.py
@@ -0,0 +1,198 @@
import time
import logging
import threading
import traceback
from collections import deque

log = logging.getLogger(__name__)

class PurgeQueueBase(object):
"""
Base class for implementing purge queue strategies.
"""
class PurgeThread(threading.Thread):
def __init__(self, name, purge_fn):
self.purge_fn = purge_fn
# event object to cancel ongoing purge
self.cancel_event = threading.Event()
threading.Thread.__init__(self, name=name)

def run(self):
try:
self.purge_fn()
except Exception as e:
trace = "".join(traceback.format_exception(None, e, e.__traceback__))
log.error("purge queue thread encountered fatal error:\n"+trace)

def cancel_job(self):
self.cancel_event.set()

def should_cancel(self):
return self.cancel_event.isSet()

def reset_cancel(self):
self.cancel_event.clear()

def __init__(self, volume_client):
self.vc = volume_client
# volumes whose subvolumes need to be purged
self.q = deque()
# job tracking
self.jobs = {}
# lock, cv for kickstarting purge
self.lock = threading.Lock()
self.cv = threading.Condition(self.lock)
# lock, cv for purge cancellation
self.waiting = False
self.c_lock = threading.Lock()
self.c_cv = threading.Condition(self.c_lock)

def queue_purge_job(self, volname):
with self.lock:
if not self.q.count(volname):
self.q.append(volname)
self.jobs[volname] = []
self.cv.notifyAll()

def cancel_purge_job(self, volname):
log.info("cancelling purge jobs for volume '{0}'".format(volname))
self.lock.acquire()
unlock = True
try:
if not self.q.count(volname):
return
self.q.remove(volname)
if not self.jobs.get(volname, []):
return
# cancel in-progress purge operation and wait until complete
for j in self.jobs[volname]:
j[1].cancel_job()
# wait for cancellation to complete
with self.c_lock:
unlock = False
self.waiting = True
vshankar (Contributor Author), Jun 18, 2019:
@ajarr -- note that this essentially makes this interface unsafe for reentrancy. so, my question is (and I think Patrick mentioned about this somewhere): is there a need for the cancel interface to be reentrant safe?

self.lock.release()
while self.waiting:
log.debug("waiting for {0} in-progress purge jobs for volume '{1}' to " \
"cancel".format(len(self.jobs[volname]), volname))
self.c_cv.wait()
finally:
if unlock:
self.lock.release()

def register_job(self, volname, purge_dir):
log.debug("registering purge job: {0}.{1}".format(volname, purge_dir))

thread_id = threading.currentThread()
self.jobs[volname].append((purge_dir, thread_id))

def unregister_job(self, volname, purge_dir):
log.debug("unregistering purge job: {0}.{1}".format(volname, purge_dir))

thread_id = threading.currentThread()
self.jobs[volname].remove((purge_dir, thread_id))

cancelled = thread_id.should_cancel()
thread_id.reset_cancel()

# wake up cancellation waiters if needed
if not self.jobs[volname] and cancelled:
logging.info("waking up cancellation waiters")
self.jobs.pop(volname)
with self.c_lock:
self.waiting = False
self.c_cv.notifyAll()

def get_trash_entry_for_volume(self, volname):
log.debug("fetching trash entry for volume '{0}'".format(volname))

exclude_entries = [v[0] for v in self.jobs[volname]]
ret = self.vc.get_subvolume_trash_entry(
None, vol_name=volname, exclude_entries=exclude_entries)
if not ret[0] == 0:
log.error("error fetching trash entry for volume '{0}': {1}".format(volname), ret[0])
return ret[0], None
return 0, ret[1]

def purge_trash_entry_for_volume(self, volname, purge_dir):
log.debug("purging trash entry '{0}' for volume '{1}'".format(purge_dir, volname))

thread_id = threading.currentThread()
ret = self.vc.purge_subvolume_trash_entry(
None, vol_name=volname, purge_dir=purge_dir, should_cancel=lambda: thread_id.should_cancel())
return ret[0]

class ThreadPoolPurgeQueueMixin(PurgeQueueBase):
"""
Purge queue mixin class maintaining a pool of threads for purging trash entries.
Subvolumes are chosen from volumes in a round robin fashion. If some of the purge
entries (belonging to a set of volumes) have huge directory tree's (such as, lots
of small files in a directory w/ deep directory trees), this model may lead to
_all_ threads purging entries for one volume (starving other volumes).
"""
def __init__(self, volume_client, tp_size):
super(ThreadPoolPurgeQueueMixin, self).__init__(volume_client)
self.threads = []
for i in range(tp_size):
self.threads.append(
PurgeQueueBase.PurgeThread(name="purgejob.{}".format(i), purge_fn=self.run))
self.threads[-1].start()

def pick_purge_dir_from_volume(self):
log.debug("processing {0} purge job entries".format(len(self.q)))
nr_vols = len(self.q)
to_remove = []
to_purge = None, None
while nr_vols > 0:
volname = self.q[0]
# do this now so that the other thread picks up trash entry
# for next volume.
self.q.rotate(1)
ret, purge_dir = self.get_trash_entry_for_volume(volname)
if purge_dir:
to_purge = volname, purge_dir
break
# this is an optimization when for a given volume there are no more
# entries in trash and no purge operations are in progress. in such
# a case we remove the volume from the tracking list so as to:
#
# a. not query the filesystem for trash entries over and over again
# b. keep the filesystem connection idle so that it can be freed
# from the connection pool
#
# if at all there are subvolume deletes, the volume gets added again
# to the tracking list and the purge operations kickstarts.
# note that, we do not iterate the volume list fully if there is a
# purge entry to process (that will take place eventually).
if ret == 0 and not purge_dir and not self.jobs[volname]:
to_remove.append(volname)
nr_vols -= 1
for vol in to_remove:
log.debug("auto removing volume '{0}' from purge job".format(vol))
self.q.remove(vol)
self.jobs.pop(vol)
return to_purge

def get_next_trash_entry(self):
while True:
# wait till there's a purge job
while not self.q:
log.debug("purge job list empty, waiting...")
self.cv.wait()
volname, purge_dir = self.pick_purge_dir_from_volume()
if purge_dir:
return volname, purge_dir
log.debug("no purge jobs available, waiting...")
self.cv.wait()

def run(self):
while True:
with self.lock:
volname, purge_dir = self.get_next_trash_entry()
self.register_job(volname, purge_dir)
ret = self.purge_trash_entry_for_volume(volname, purge_dir)
if ret != 0:
log.warn("failed to purge {0}.{1}".format(volname, purge_dir))
with self.lock:
self.unregister_job(volname, purge_dir)
time.sleep(1)
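
As a closing note on the design, a toy snippet (independent of the PR, with made-up volume names) showing the effect of the deque.rotate(1) call in pick_purge_dir_from_volume(): each scan starts from a different volume, which is what spreads purge work across volumes in a round-robin fashion instead of always servicing the same one first.

from collections import deque

q = deque(["vol_a", "vol_b", "vol_c"])

def next_volume():
    volname = q[0]
    q.rotate(1)   # rotate so the next call inspects a different volume first
    return volname

print([next_volume() for _ in range(6)])
# ['vol_a', 'vol_c', 'vol_b', 'vol_a', 'vol_c', 'vol_b']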