Skip to content

Commit

Permalink
Merge PR #36554 into master
Browse files Browse the repository at this point in the history
* refs/pull/36554/head:
	mgr/volumes: Make number of cloner threads configurable

Reviewed-by: Venky Shankar <vshankar@redhat.com>
Reviewed-by: Shyamsundar R <srangana@redhat.com>
  • Loading branch information
batrick committed Oct 8, 2020
2 parents 5c9f77c + 83c4442 commit 2cb09bd
Show file tree
Hide file tree
Showing 7 changed files with 91 additions and 5 deletions.
1 change: 1 addition & 0 deletions .githubmap
Original file line number Diff line number Diff line change
Expand Up @@ -126,3 +126,4 @@ kotreshhr Kotresh Hiremath Ravishankar <khiremat@redhat.com>
vkmc Victoria Martinez de la Cruz <vkmc@redhat.com>
gouthampacha Goutham Pacha Ravi <gouthamr@redhat.com>
zdover23 Zac Dover <zac.dover@gmail.com>
ShyamsundarR Shyamsundar R <srangana@redhat.com>
4 changes: 4 additions & 0 deletions doc/cephfs/fs-volumes.rst
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,10 @@ Similar to specifying a pool layout when creating a subvolume, pool layout can b

$ ceph fs subvolume snapshot clone <vol_name> <subvol_name> <snap_name> <target_subvol_name> --pool_layout <pool_layout>

Configure the maximum number of concurrent clone operations. The default is set to 4::

$ ceph config set mgr mgr/volumes/max_concurrent_clones <value>

To check the status of a clone operation use::

$ ceph fs clone status <vol_name> <clone_name> [--group_name <group_name>]
Expand Down
19 changes: 19 additions & 0 deletions qa/tasks/cephfs/test_volumes.py
Original file line number Diff line number Diff line change
Expand Up @@ -2767,6 +2767,25 @@ def test_subvolume_snapshot_clone(self):
# verify trash dir is clean
self._wait_for_trash_empty()

def test_subvolume_snapshot_reconf_max_concurrent_clones(self):
"""
Validate 'max_concurrent_clones' config option
"""

# get the default number of cloner threads
default_max_concurrent_clones = int(self.config_get('mgr', 'mgr/volumes/max_concurrent_clones'))
self.assertEqual(default_max_concurrent_clones, 4)

# Increase number of cloner threads
self.config_set('mgr', 'mgr/volumes/max_concurrent_clones', 6)
max_concurrent_clones = int(self.config_get('mgr', 'mgr/volumes/max_concurrent_clones'))
self.assertEqual(max_concurrent_clones, 6)

# Decrease number of cloner threads
self.config_set('mgr', 'mgr/volumes/max_concurrent_clones', 2)
max_concurrent_clones = int(self.config_get('mgr', 'mgr/volumes/max_concurrent_clones'))
self.assertEqual(max_concurrent_clones, 2)

def test_subvolume_snapshot_clone_pool_layout(self):
subvolume = self._generate_random_subvolume_name()
snapshot = self._generate_random_snapshot_name()
Expand Down
3 changes: 3 additions & 0 deletions src/pybind/mgr/volumes/fs/async_cloner.py
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,9 @@ def __init__(self, volume_client, tp_size):
}
super(Cloner, self).__init__(volume_client, "cloner", tp_size)

def reconfigure_max_concurrent_clones(self, tp_size):
    """
    Resize the cloner thread pool to ``tp_size`` worker threads.

    Delegates to the AsyncJobs base class, fixing the name prefix to
    "cloner" so any newly spawned worker threads are named consistently
    with the pool created in __init__.
    """
    super(Cloner, self).reconfigure_max_concurrent_clones("cloner", tp_size)

def is_clone_cancelable(self, clone_state):
    """Return True unless the clone has already completed or failed."""
    finished = SubvolumeOpSm.is_complete_state(clone_state)
    failed = SubvolumeOpSm.is_failed_state(clone_state)
    return not finished and not failed

Expand Down
27 changes: 27 additions & 0 deletions src/pybind/mgr/volumes/fs/async_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,18 @@ def run(self):
thread_id = threading.currentThread()
assert isinstance(thread_id, JobThread)
thread_name = thread_id.getName()
log.debug("thread [{0}] starting".format(thread_name))

while retries < JobThread.MAX_RETRIES_ON_EXCEPTION:
vol_job = None
try:
# fetch next job to execute
with self.async_job.lock:
while True:
if self.should_reconfigure_num_threads():
log.info("thread [{0}] terminating due to reconfigure".format(thread_name))
self.async_job.threads.remove(self)
return
vol_job = self.async_job.get_job()
if vol_job:
break
Expand Down Expand Up @@ -62,6 +67,12 @@ def run(self):
time.sleep(1)
log.error("thread [{0}] reached exception limit, bailing out...".format(thread_name))
self.vc.cluster_log("thread {0} bailing out due to exception".format(thread_name))
with self.async_job.lock:
self.async_job.threads.remove(self)

def should_reconfigure_num_threads(self):
    """
    True when the configured pool size (max_concurrent_clones) has been
    reduced below the number of live worker threads, i.e. this thread
    should terminate to shrink the pool.
    """
    pool = self.async_job
    return pool.nr_concurrent_jobs < len(pool.threads)

def cancel_job(self):
    # Request cancellation of this thread's current job by setting its
    # cancellation event (presumably polled by the long-running clone
    # operation; the consumer is outside this view — confirm).
    self.cancel_event.set()
Expand Down Expand Up @@ -103,12 +114,28 @@ def __init__(self, volume_client, name_pfx, nr_concurrent_jobs):
# cv for job cancelation
self.waiting = False
self.cancel_cv = threading.Condition(self.lock)
self.nr_concurrent_jobs = nr_concurrent_jobs

self.threads = []
for i in range(nr_concurrent_jobs):
self.threads.append(JobThread(self, volume_client, name="{0}.{1}".format(name_pfx, i)))
self.threads[-1].start()

def reconfigure_max_concurrent_clones(self, name_pfx, nr_concurrent_jobs):
    """
    Reconfigure the number of cloner threads.

    :param name_pfx: prefix used when naming newly spawned threads
    :param nr_concurrent_jobs: new target size of the thread pool
    """
    with self.lock:
        self.nr_concurrent_jobs = nr_concurrent_jobs
        if len(self.threads) > nr_concurrent_jobs:
            # Decrease in concurrency: wake idle threads so each can
            # notice the smaller target (should_reconfigure_num_threads)
            # and terminate itself. notify_all() is the non-deprecated
            # spelling of the camelCase notifyAll() alias.
            self.cv.notify_all()
        elif len(self.threads) < nr_concurrent_jobs:
            # Increase in concurrency: spawn the missing workers. The
            # timestamp keeps names unique across repeated reconfigures.
            for i in range(len(self.threads), nr_concurrent_jobs):
                self.threads.append(JobThread(self, self.vc, name="{0}.{1}.{2}".format(name_pfx, time.time(), i)))
                self.threads[-1].start()

def get_job(self):
log.debug("processing {0} volume entries".format(len(self.q)))
nr_vols = len(self.q)
Expand Down
3 changes: 1 addition & 2 deletions src/pybind/mgr/volumes/fs/volume.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,7 @@ def __init__(self, mgr):
super().__init__(mgr)
# volume specification
self.volspec = VolSpec(mgr.rados.conf_get('client_snapdir'))
# TODO: make thread pool size configurable
self.cloner = Cloner(self, 4)
self.cloner = Cloner(self, self.mgr.max_concurrent_clones)
self.purge_queue = ThreadPoolPurgeQueueMixin(self, 4)
# on startup, queue purge job for available volumes to kickstart
# purge for leftover subvolume entries in trash. note that, if the
Expand Down
39 changes: 36 additions & 3 deletions src/pybind/mgr/volumes/module.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import json
import logging
import traceback
import threading

from mgr_module import MgrModule
import orchestrator
Expand Down Expand Up @@ -375,18 +376,50 @@ class Module(orchestrator.OrchestratorClientMixin, MgrModule):
# volume in the lifetime of this module instance.
]

# Module config options, settable via
# `ceph config set mgr mgr/volumes/<name> <value>`; changes are
# delivered to the module through config_notify().
MODULE_OPTIONS = [
    {
        'name': 'max_concurrent_clones',
        'type': 'int',
        'default': 4,
        'desc': 'Number of asynchronous cloner threads',
    }
]

def __init__(self, *args, **kwargs):
    # gate for config_notify(): reconfigure is only propagated to the
    # cloner once the clients below have been constructed
    self.inited = False
    # for mypy
    self.max_concurrent_clones = None
    # serializes construction against concurrent config_notify() callbacks
    self.lock = threading.Lock()
    super(Module, self).__init__(*args, **kwargs)
    # NOTE(review): vc/fs_export/nfs appear to be assigned twice — once
    # here and again under the lock below. Looks like an artifact of the
    # merged diff (un-locked construction from the old version); confirm
    # which assignment is intended to survive.
    self.vc = VolumeClient(self)
    self.fs_export = FSExport(self)
    self.nfs = NFSCluster(self)
    # Initialize config option members
    self.config_notify()
    with self.lock:
        self.vc = VolumeClient(self)
        self.fs_export = FSExport(self)
        self.nfs = NFSCluster(self)
        # from here on config_notify() forwards max_concurrent_clones
        # changes to self.vc.cloner
        self.inited = True

def __del__(self):
    # Best-effort cleanup if the module object is garbage collected
    # without an explicit shutdown() call.
    self.vc.shutdown()

def shutdown(self):
    # mgr hook invoked when the module is being stopped; tear down the
    # volume client and its worker threads.
    self.vc.shutdown()

def config_notify(self):
    """
    This method is called whenever one of our config options is changed.
    """
    with self.lock:
        for opt in self.MODULE_OPTIONS:
            name = opt['name']  # type: ignore
            setattr(self, name, self.get_module_option(name))  # type: ignore
            self.log.debug(' mgr option %s = %s',
                           name, getattr(self, name))  # type: ignore
            # propagate the new pool size only once __init__ has
            # finished wiring up self.vc
            if self.inited and name == "max_concurrent_clones":
                self.vc.cloner.reconfigure_max_concurrent_clones(self.max_concurrent_clones)

def handle_command(self, inbuf, cmd):
handler_name = "_cmd_" + cmd['prefix'].replace(" ", "_")
try:
Expand Down

0 comments on commit 2cb09bd

Please sign in to comment.