Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pacific: mgr/volumes: Add config to insert delay at the beginning of the clone #42086

Merged
merged 1 commit into from
Jul 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
34 changes: 34 additions & 0 deletions qa/tasks/cephfs/test_volumes.py
Original file line number Diff line number Diff line change
Expand Up @@ -3226,6 +3226,9 @@ def test_subvolume_clone_in_progress_getpath(self):
# snapshot subvolume
self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)

# Insert delay at the beginning of snapshot clone
self.config_set('mgr', 'mgr/volumes/snapshot_clone_delay', 2)

# schedule a clone
self._fs_cmd("subvolume", "snapshot", "clone", self.volname, subvolume, snapshot, clone)

Expand Down Expand Up @@ -3272,6 +3275,9 @@ def test_subvolume_clone_in_progress_snapshot_rm(self):
# snapshot subvolume
self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)

# Insert delay at the beginning of snapshot clone
self.config_set('mgr', 'mgr/volumes/snapshot_clone_delay', 2)

# schedule a clone
self._fs_cmd("subvolume", "snapshot", "clone", self.volname, subvolume, snapshot, clone)

Expand Down Expand Up @@ -3317,6 +3323,9 @@ def test_subvolume_clone_in_progress_source(self):
# snapshot subvolume
self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)

# Insert delay at the beginning of snapshot clone
self.config_set('mgr', 'mgr/volumes/snapshot_clone_delay', 2)

# schedule a clone
self._fs_cmd("subvolume", "snapshot", "clone", self.volname, subvolume, snapshot, clone)

Expand Down Expand Up @@ -3801,6 +3810,9 @@ def test_subvolume_snapshot_clone_cancel_in_progress(self):
# snapshot subvolume
self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)

# Insert delay at the beginning of snapshot clone
self.config_set('mgr', 'mgr/volumes/snapshot_clone_delay', 2)

# schedule a clone
self._fs_cmd("subvolume", "snapshot", "clone", self.volname, subvolume, snapshot, clone)

Expand Down Expand Up @@ -4200,6 +4212,9 @@ def test_subvolume_snapshot_clone_with_upgrade(self):
# ensure metadata file is in legacy location, with required version v1
self._assert_meta_location_and_version(self.volname, subvolume, version=1, legacy=True)

# Insert delay at the beginning of snapshot clone
self.config_set('mgr', 'mgr/volumes/snapshot_clone_delay', 2)

# schedule a clone
self._fs_cmd("subvolume", "snapshot", "clone", self.volname, subvolume, snapshot, clone)

Expand Down Expand Up @@ -4249,6 +4264,25 @@ def test_subvolume_snapshot_reconf_max_concurrent_clones(self):
max_concurrent_clones = int(self.config_get('mgr', 'mgr/volumes/max_concurrent_clones'))
self.assertEqual(max_concurrent_clones, 2)

def test_subvolume_snapshot_config_snapshot_clone_delay(self):
    """
    Validate 'snapshot_clone_delay' config option

    Reads the option before touching it (expects 0), sets it to 2 and
    reads it back, then sets 'max_concurrent_clones' to 2 and reads that
    back as well.
    """
    delay_option = 'mgr/volumes/snapshot_clone_delay'
    threads_option = 'mgr/volumes/max_concurrent_clones'

    # before anything is configured the delay must read back as 0
    initial_delay = int(self.config_get('mgr', delay_option))
    self.assertEqual(initial_delay, 0)

    # insert a delay of 2 seconds at the beginning of the snapshot clone
    # and confirm the new value is what the config reports
    self.config_set('mgr', delay_option, 2)
    configured_delay = int(self.config_get('mgr', delay_option))
    self.assertEqual(configured_delay, 2)

    # set the number of cloner threads to 2 and confirm it reads back
    self.config_set('mgr', threads_option, 2)
    configured_threads = int(self.config_get('mgr', threads_option))
    self.assertEqual(configured_threads, 2)

def test_subvolume_under_group_snapshot_clone(self):
subvolume = self._generate_random_subvolume_name()
group = self._generate_random_group_name()
Expand Down
17 changes: 12 additions & 5 deletions src/pybind/mgr/volumes/fs/async_cloner.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,12 +224,15 @@ def handle_clone_complete(volume_client, volname, index, groupname, subvolname,
log.error("failed to detach clone from snapshot: {0}".format(e))
return (None, True)

def start_clone_sm(volume_client, volname, index, groupname, subvolname, state_table, should_cancel):
def start_clone_sm(volume_client, volname, index, groupname, subvolname, state_table, should_cancel, snapshot_clone_delay):
finished = False
current_state = None
try:
current_state = get_clone_state(volume_client, volname, groupname, subvolname)
log.debug("cloning ({0}, {1}, {2}) -- starting state \"{3}\"".format(volname, groupname, subvolname, current_state))
if current_state == SubvolumeStates.STATE_PENDING:
time.sleep(snapshot_clone_delay)
log.info("Delayed cloning ({0}, {1}, {2}) -- by {3} seconds".format(volname, groupname, subvolname, snapshot_clone_delay))
while not finished:
handler = state_table.get(current_state, None)
if not handler:
Expand All @@ -244,7 +247,7 @@ def start_clone_sm(volume_client, volname, index, groupname, subvolname, state_t
log.error("clone failed for ({0}, {1}, {2}) (current_state: {3}, reason: {4})".format(volname, groupname,\
subvolname, current_state, ve))

def clone(volume_client, volname, index, clone_path, state_table, should_cancel):
def clone(volume_client, volname, index, clone_path, state_table, should_cancel, snapshot_clone_delay):
log.info("cloning to subvolume path: {0}".format(clone_path))
resolved = resolve(volume_client.volspec, clone_path)

Expand All @@ -254,7 +257,7 @@ def clone(volume_client, volname, index, clone_path, state_table, should_cancel)

try:
log.info("starting clone: ({0}, {1}, {2})".format(volname, groupname, subvolname))
start_clone_sm(volume_client, volname, index, groupname, subvolname, state_table, should_cancel)
start_clone_sm(volume_client, volname, index, groupname, subvolname, state_table, should_cancel, snapshot_clone_delay)
log.info("finished clone: ({0}, {1}, {2})".format(volname, groupname, subvolname))
except VolumeException as ve:
log.error("clone failed for ({0}, {1}, {2}), reason: {3}".format(volname, groupname, subvolname, ve))
Expand All @@ -265,8 +268,9 @@ class Cloner(AsyncJobs):
this relies on a simple state machine (which mimics states from SubvolumeOpSm class) as
the driver. file types supported are directories, symbolic links and regular files.
"""
def __init__(self, volume_client, tp_size):
def __init__(self, volume_client, tp_size, snapshot_clone_delay):
self.vc = volume_client
self.snapshot_clone_delay = snapshot_clone_delay
self.state_table = {
SubvolumeStates.STATE_PENDING : handle_clone_pending,
SubvolumeStates.STATE_INPROGRESS : handle_clone_in_progress,
Expand All @@ -279,6 +283,9 @@ def __init__(self, volume_client, tp_size):
def reconfigure_max_concurrent_clones(self, tp_size):
    """
    Apply a new 'max_concurrent_clones' value.

    :param tp_size: value forwarded unchanged to the parent class's
                    reconfigure_max_async_threads()
    :return: whatever the parent class's reconfigure_max_async_threads()
             returns
    """
    # pure delegation: the resizing itself lives in the parent class
    parent = super(Cloner, self)
    return parent.reconfigure_max_async_threads(tp_size)

def reconfigure_snapshot_clone_delay(self, timeout):
    """
    Apply a new 'snapshot_clone_delay' value.

    :param timeout: new delay value; stored as-is (no validation) on
                    self.snapshot_clone_delay
    :return: None
    """
    self.snapshot_clone_delay = timeout

def is_clone_cancelable(self, clone_state):
    """
    Tell whether a clone in the given state can still be canceled.

    :param clone_state: state value to check with SubvolumeOpSm
    :return: False when SubvolumeOpSm reports the state as complete or as
             failed, True otherwise
    """
    # a completed clone has nothing left to cancel
    if SubvolumeOpSm.is_complete_state(clone_state):
        return False
    # otherwise it is cancelable unless it has already failed
    return not SubvolumeOpSm.is_failed_state(clone_state)

Expand Down Expand Up @@ -344,4 +351,4 @@ def get_next_job(self, volname, running_jobs):
return get_next_clone_entry(self.vc, volname, running_jobs)

def execute_job(self, volname, job, should_cancel):
clone(self.vc, volname, job[0].decode('utf-8'), job[1].decode('utf-8'), self.state_table, should_cancel)
clone(self.vc, volname, job[0].decode('utf-8'), job[1].decode('utf-8'), self.state_table, should_cancel, self.snapshot_clone_delay)
2 changes: 1 addition & 1 deletion src/pybind/mgr/volumes/fs/volume.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def __init__(self, mgr):
super().__init__(mgr)
# volume specification
self.volspec = VolSpec(mgr.rados.conf_get('client_snapdir'))
self.cloner = Cloner(self, self.mgr.max_concurrent_clones)
self.cloner = Cloner(self, self.mgr.max_concurrent_clones, self.mgr.snapshot_clone_delay)
self.purge_queue = ThreadPoolPurgeQueueMixin(self, 4)
# on startup, queue purge job for available volumes to kickstart
# purge for leftover subvolume entries in trash. note that, if the
Expand Down
11 changes: 9 additions & 2 deletions src/pybind/mgr/volumes/module.py
Original file line number Diff line number Diff line change
Expand Up @@ -342,14 +342,19 @@ class Module(orchestrator.OrchestratorClientMixin, MgrModule):
'max_concurrent_clones',
type='int',
default=4,
desc='Number of asynchronous cloner threads',
)
desc='Number of asynchronous cloner threads'),
Option(
'snapshot_clone_delay',
type='int',
default=0,
desc='Delay clone begin operation by snapshot_clone_delay seconds')
]

def __init__(self, *args, **kwargs):
self.inited = False
# for mypy
self.max_concurrent_clones = None
self.snapshot_clone_delay = None
self.lock = threading.Lock()
super(Module, self).__init__(*args, **kwargs)
# Initialize config option members
Expand Down Expand Up @@ -378,6 +383,8 @@ def config_notify(self):
if self.inited:
if opt['name'] == "max_concurrent_clones":
self.vc.cloner.reconfigure_max_concurrent_clones(self.max_concurrent_clones)
elif opt['name'] == "snapshot_clone_delay":
self.vc.cloner.reconfigure_snapshot_clone_delay(self.snapshot_clone_delay)

def handle_command(self, inbuf, cmd):
handler_name = "_cmd_" + cmd['prefix'].replace(" ", "_")
Expand Down