mgr/volumes: support to reject CephFS clones if cloner threads are not available

CephFS clone creation has a default limit of four parallel clones at a time; any further clone create requests are queued. This makes CephFS cloning very slow when a large number of clones is being created. After this patch, clone requests are no longer accepted once the number of pending clones reaches the `max_concurrent_clones` config value (see the sketch below).

Fixes:  https://tracker.ceph.com/issues/59714
Signed-off-by: Neeraj Pratap Singh <neesingh@redhat.com>
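
For illustration, here is a minimal, standalone sketch of the admission check this commit introduces. The option names (`snapshot_clone_no_wait`, `max_concurrent_clones`) and the error string are taken from the diff below; the `VolumeException` stand-in and the helper name `reject_clone_if_busy` are illustrative only, not part of the patch.

import errno


class VolumeException(Exception):
    """Stand-in for mgr/volumes' VolumeException; used only for this sketch."""
    def __init__(self, error_code, error_str):
        self.errno = error_code
        self.error_str = error_str
        super().__init__(error_str)


def reject_clone_if_busy(snapshot_clone_no_wait, pending_clones_cnt, max_concurrent_clones):
    # With snapshot_clone_no_wait enabled (the default), a new clone request fails
    # fast with EAGAIN once the pending clone count reaches the concurrency limit,
    # instead of being queued behind the busy cloner threads.
    if snapshot_clone_no_wait and pending_clones_cnt >= max_concurrent_clones:
        raise VolumeException(-errno.EAGAIN,
                              "all cloner threads are busy, please try again later")

Operators who prefer the old queueing behaviour can disable the check; for a ceph-mgr module option this is typically done with `ceph config set mgr mgr/volumes/snapshot_clone_no_wait false`.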
neeraj pratap singh committed Feb 8, 2024
1 parent f9d771f commit 079f722
Showing 4 changed files with 50 additions and 6 deletions.
6 changes: 5 additions & 1 deletion src/pybind/mgr/volumes/fs/async_cloner.py
@@ -337,9 +337,10 @@ class Cloner(AsyncJobs):
    this relies on a simple state machine (which mimics states from SubvolumeOpSm class) as
    the driver. file types supported are directories, symbolic links and regular files.
    """
    def __init__(self, volume_client, tp_size, snapshot_clone_delay):
    def __init__(self, volume_client, tp_size, snapshot_clone_delay, clone_no_wait):
        self.vc = volume_client
        self.snapshot_clone_delay = snapshot_clone_delay
        self.snapshot_clone_no_wait = clone_no_wait
        self.state_table = {
            SubvolumeStates.STATE_PENDING : handle_clone_pending,
            SubvolumeStates.STATE_INPROGRESS : handle_clone_in_progress,
@@ -355,6 +356,9 @@ def reconfigure_max_concurrent_clones(self, tp_size):
    def reconfigure_snapshot_clone_delay(self, timeout):
        self.snapshot_clone_delay = timeout

    def reconfigure_reject_clones(self, clone_no_wait):
        self.snapshot_clone_no_wait = clone_no_wait

    def is_clone_cancelable(self, clone_state):
        return not (SubvolumeOpSm.is_complete_state(clone_state) or SubvolumeOpSm.is_failed_state(clone_state))

27 changes: 26 additions & 1 deletion src/pybind/mgr/volumes/fs/operations/volume.py
@@ -9,11 +9,12 @@
import orchestrator

from .lock import GlobalLock
from ..exception import VolumeException
from ..exception import VolumeException, IndexException
from ..fs_util import create_pool, remove_pool, rename_pool, create_filesystem, \
    remove_filesystem, rename_filesystem, create_mds, volume_exists, listdir
from .trash import Trash
from mgr_util import open_filesystem, CephfsConnectionException
from .clone_index import open_clone_index

log = logging.getLogger(__name__)

@@ -260,6 +261,30 @@ def get_pending_subvol_deletions_count(fs, path):
    return {'pending_subvolume_deletions': num_pending_subvol_del}


def get_all_pending_clones_count(self, mgr, vol_spec):
    pending_clones_cnt = 0
    index_path = ""
    fs_map = mgr.get('fs_map')
    for fs in fs_map['filesystems']:
        volname = fs['mdsmap']['fs_name']
        try:
            with open_volume(self, volname) as fs_handle:
                with open_clone_index(fs_handle, vol_spec) as index:
                    index_path = index.path.decode('utf-8')
                    pending_clones_cnt = pending_clones_cnt \
                        + len(listdir(fs_handle, index_path,
                                      filter_entries=None, filter_files=False))
        except IndexException as e:
            if e.errno == -errno.ENOENT:
                continue
            raise VolumeException(-e.args[0], e.args[1])
        except VolumeException as ve:
            log.error("error fetching clone entry for volume '{0}' ({1})".format(volname, ve))
            raise ve

    return pending_clones_cnt


@contextmanager
def open_volume(vc, volname):
"""
13 changes: 10 additions & 3 deletions src/pybind/mgr/volumes/fs/volume.py
@@ -13,12 +13,14 @@
from .operations.group import open_group, create_group, remove_group, \
    open_group_unique, set_group_attrs
from .operations.volume import create_volume, delete_volume, rename_volume, \
    list_volumes, open_volume, get_pool_names, get_pool_ids, get_pending_subvol_deletions_count
    list_volumes, open_volume, get_pool_names, get_pool_ids, \
    get_pending_subvol_deletions_count, get_all_pending_clones_count
from .operations.subvolume import open_subvol, create_subvol, remove_subvol, \
    create_clone

from .vol_spec import VolSpec
from .exception import VolumeException, ClusterError, ClusterTimeout, EvictionError
from .exception import VolumeException, ClusterError, ClusterTimeout, \
    EvictionError, IndexException
from .async_cloner import Cloner
from .purge_queue import ThreadPoolPurgeQueueMixin
from .operations.template import SubvolumeOpType
@@ -53,7 +55,8 @@ def __init__(self, mgr):
        super().__init__(mgr)
        # volume specification
        self.volspec = VolSpec(mgr.rados.conf_get('client_snapdir'))
        self.cloner = Cloner(self, self.mgr.max_concurrent_clones, self.mgr.snapshot_clone_delay)
        self.cloner = Cloner(self, self.mgr.max_concurrent_clones, self.mgr.snapshot_clone_delay,
                             self.mgr.snapshot_clone_no_wait)
        self.purge_queue = ThreadPoolPurgeQueueMixin(self, 4)
        # on startup, queue purge job for available volumes to kickstart
        # purge for leftover subvolume entries in trash. note that, if the
@@ -764,6 +767,10 @@ def clone_subvolume_snapshot(self, **kwargs):
        s_groupname = kwargs['group_name']

        try:
            if self.mgr.snapshot_clone_no_wait and \
                    get_all_pending_clones_count(self, self.mgr, self.volspec) >= self.mgr.max_concurrent_clones:
                raise(VolumeException(-errno.EAGAIN, "all cloner threads are busy, please try again later"))

            with open_volume(self, volname) as fs_handle:
                with open_group(fs_handle, self.volspec, s_groupname) as s_group:
                    with open_subvol(self.mgr, fs_handle, self.volspec, s_group, s_subvolname, SubvolumeOpType.CLONE_SOURCE) as s_subvolume:
10 changes: 9 additions & 1 deletion src/pybind/mgr/volumes/module.py
@@ -489,7 +489,12 @@ class Module(orchestrator.OrchestratorClientMixin, MgrModule):
            'periodic_async_work',
            type='bool',
            default=False,
            desc='Periodically check for async work')
            desc='Periodically check for async work'),
        Option(
            'snapshot_clone_no_wait',
            type='bool',
            default=True,
            desc='Reject subvolume clone request when cloner threads are busy')
    ]

def __init__(self, *args, **kwargs):
@@ -498,6 +503,7 @@ def __init__(self, *args, **kwargs):
        self.max_concurrent_clones = None
        self.snapshot_clone_delay = None
        self.periodic_async_work = False
        self.snapshot_clone_no_wait = None
        self.lock = threading.Lock()
        super(Module, self).__init__(*args, **kwargs)
        # Initialize config option members
@@ -532,6 +538,8 @@ def config_notify(self):
                    else:
                        self.vc.cloner.unset_wakeup_timeout()
                        self.vc.purge_queue.unset_wakeup_timeout()
                elif opt['name'] == "snapshot_clone_no_wait":
                    self.vc.cloner.reconfigure_reject_clones(self.snapshot_clone_no_wait)

def handle_command(self, inbuf, cmd):
handler_name = "_cmd_" + cmd['prefix'].replace(" ", "_")
