Fixed exception handling for rsync and improved cleanup
- Refactored the code for 'all' workunits / non-'all' workunits execution
- Handle the exception raised by rsync (files vanished and could not be transferred) during removal of the kernel build; see the sketch below
- Clean up the rsync dir when the workunit task is finished
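
The vanished-files handling added in qa/tasks/rsync.py boils down to the pattern below. This is a condensed sketch rather than the literal diff: rsync_snapshot is a hypothetical helper name, and mount stands in for the task's CephFSMount object (self.my_mnt).

import logging
import socket

from teuthology.exceptions import CommandFailedError

log = logging.getLogger(__name__)

def rsync_snapshot(mount, snap_shot, dest="rsyncdir/dir1/"):
    # Copy a snapshot with rsync, tolerating files that vanish while the
    # workunit keeps mutating the tree, and always remove the snapshot.
    try:
        mount.run_shell(["rsync", "-azvh", snap_shot, dest])
    except CommandFailedError as e:
        # rsync exits with status 24 when source files vanished before they
        # could be transferred; that is expected here and not an error.
        if e.exitstatus == 24:
            log.info("Some files vanished before they could be transferred")
        else:
            raise
    except socket.timeout:
        log.info("IO timeout between worker and observer")
    finally:
        # Delete the snapshot even if rsync failed.
        mount.run_shell(["rmdir", snap_shot])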

Signed-off-by: Sidharth Anupkrishnan <sanupkri@redhat.com>
sidharthanup committed Feb 18, 2021
1 parent 4861387 commit 0dc1bc2
Showing 4 changed files with 97 additions and 77 deletions.
4 changes: 4 additions & 0 deletions qa/tasks/cephfs/filesystem.py
@@ -595,6 +595,10 @@ def set_allow_new_snaps(self, yes):
def required_client_features(self, *args, **kwargs):
c = ["fs", "required_client_features", self.name, *args]
return self.mon_manager.run_cluster_cmd(args=c, **kwargs)

def create_snapshot(self, path, snapshot_name):
snap_shot = path + ".snap/" + snapshot_name
self.mon_manager.raw_cluster_cmd("mkdir", snap_shot)

# In Octopus+, the PG count can be omitted to use the default. We keep the
# hard-coded value for deployments of Mimic/Nautilus.
5 changes: 5 additions & 0 deletions qa/tasks/cephfs/mount.py
@@ -657,6 +657,11 @@ def create_destroy(self):
'sudo', 'rm', '-f', os.path.join(self.hostfs_mntpt, filename)
])

def create_snapshot(self, path, snapshot_name):
assert(self.is_mounted())
snap_shot = path + ".snap/" + snapshot_name
self.run_shell(["mkdir", snap_shot])

def _run_python(self, pyscript, py_version='python3'):
return self.client_remote.run(
args=['sudo', 'adjust-ulimits', 'daemon-helper', 'kill',
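Both create_snapshot() helpers above (on Filesystem and on the mount object) build the snapshot path as path + ".snap/" + snapshot_name, so callers are expected to pass a directory path with a trailing slash. A minimal usage sketch, assuming mount is a mounted CephFS client object as used elsewhere in the qa tests (illustration only, not part of this commit):

mount.run_shell(["mkdir", "dir1"])              # directory to snapshot
mount.create_snapshot("dir1/", "snap0")         # creates dir1/.snap/snap0
mount.run_shell(["rmdir", "dir1/.snap/snap0"])  # CephFS snapshots are removed with rmdir
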
116 changes: 68 additions & 48 deletions qa/tasks/rsync.py
@@ -1,7 +1,9 @@
import logging
import contextlib
import time
import socket
from teuthology import misc
from teuthology.exceptions import CommandFailedError
from gevent.greenlet import Greenlet
from gevent.event import Event
from tasks.cephfs.filesystem import Filesystem
@@ -98,30 +100,28 @@ def __init__(self, ctx, config, logger):
self.wait_time = self.config.get('waittime', 5)
self.mount_point = self.config.get('mountpoint')

self.fs = Filesystem(self.ctx)

self.snap_enable = bool(self.config.get('snapenable', False))

self.data_dir = self.config.get('data_dir')

#Get CephFS mount client and mount object
if len(self.ctx.mounts.items()):
flag = False
for i, j in sorted(self.ctx.mounts.items()):
tmp = int(i)
if tmp == self.mount_point:
self.mount_point = tmp
self.my_mnt = j
if (not self.mount_point) and (not flag):
self.mount_point = tmp
self.my_mnt = j
flag = True
else:
assert len(self.ctx.mounts.items()) > 0, 'No mount available asserting rsync'
assert len(self.ctx.mounts.items()) > 0, 'No mount available asserting rsync'

if not self.mount_point:
mounts = list(self.ctx.mounts.items())
i, j = mounts[0]
self.mount_point = i
self.my_mnt = j
else:
for i, j in list(sorted(self.ctx.mounts.items())):
if self.mount_point == i:
self.my_mnt = j
break
#Set source directory for rsync
if self.config.get('data_dir'):
if self.data_dir:
self.work_unit = True
self.source_dir = misc.get_testdir(self.ctx) + '/mnt.{}'.format(self.mount_point) + \
'/{}/'.format(self.config.get('data_dir'))
'/{}/'.format(self.data_dir)
else:
self.data_dir = misc.get_testdir(self.ctx) + '/mnt.{}/'.format(self.mount_point) + 'source'
self.source_dir = self.data_dir + '/subdir'
@@ -148,69 +148,79 @@ def check_if_dir_exists(self, path):
try:
self.my_mnt.stat(path)
return True
except Exception, e :
except Exception as e:
logging.error(e)
return False

def do_rsync(self):

self.fs = Filesystem(self.ctx)

iteration = 0
finished = False
should_stop = False

# Create destination directory
self.my_mnt.run_shell(["mkdir", "rsyncdir"])

if self.snap_enable:
# Enable snapshots
self.fs.mon_manager.raw_cluster_cmd("mds", "set", "allow_new_snaps", "true", "--yes-i-really-mean-it")

if not self.work_unit:
# Create a data directory, sub directory and rsync directory
self.my_mnt.run_shell(["mkdir", "{}".format(self.data_dir)])
self.my_mnt.run_shell(["mkdir", "{}".format(self.source_dir)])
should_stop = False

#Check for source directory exists
while not self.check_if_dir_exists(self.source_dir):
time.sleep(5) # if source dorectory not exists wait for 5s and poll
time.sleep(5) # if the source directory does not exist, wait 5s and poll
iteration += 1
if iteration > 5:
assert self.check_if_dir_exists(self.source_dir), 'assert, source Directory doesnot exists'
assert self.check_if_dir_exists(self.source_dir), 'source directory does not exist'

# Start observing the event started by workunit task.
if self.work_unit:
should_stop = self.ctx.workunit_state.start_observing()

iteration = 0

while not (should_stop or self.stopping.is_set()):

# rsync data from snapshot. snap is created using workunit IO data
if self.work_unit and self.snap_enable:

snap_shot = self.source_dir + '.snap/snap' + '{}'.format(iteration)
snapshot_name = 'snap' + '{}'.format(iteration)

# Create Snapshot
self.my_mnt.run_shell(["mkdir", "{}".format(snap_shot)])
self.my_mnt.create_snapshot(self.source_dir, snapshot_name)
iteration += 1

self.my_mnt.run_shell(["rsync", "-azvh", "{}".format(snap_shot), "rsyncdir/dir1/"])

# Delete snapshot
self.my_mnt.run_shell(["rmdir", "{}".format(snap_shot)])

# Check for even handler stop message
finished = self.ctx.workunit_state.observer_should_stop()
snap_shot = self.source_dir + '.snap/' + snapshot_name

try:
self.my_mnt.run_shell(["rsync", "-azvh", snap_shot, "rsyncdir/dir1/"])
except CommandFailedError as e:
if e.exitstatus == 24:
log.info("Some files vanished before they could be transferred")
else:
raise
except socket.timeout:
log.info("IO timeout between worker and observer")
finally:
# Delete snapshot
self.my_mnt.run_shell(["rmdir", "{}".format(snap_shot)])

# Check for event handler stop message
should_stop = self.ctx.workunit_state.observer_should_stop()

# rsync data from snapshot, snap is created using written pattern data
elif self.snap_enable:
# Create file and add data to the file
self.my_mnt.write_test_pattern("{}/file_a".format(self.source_dir), self.file_size * 1024 * 1024)
snap_shot = self.data_dir + '/.snap/snap' + '{}'.format(iteration)

snapshot_name = 'snap' + '{}'.format(iteration)

# Create Snapshot
self.my_mnt.run_shell(["mkdir", "{}".format(snap_shot)])
# Create Snapshot
self.my_mnt.create_snapshot(self.data_dir, snapshot_name)
iteration += 1

self.my_mnt.run_shell(["rsync", "-azvh", "{}".format(snap_shot), "rsyncdir/dir{}/".format(iteration)])
snap_shot = self.data_dir + '.snap/' + snapshot_name

# Delete snapshot
self.my_mnt.run_shell(["rmdir", "{}".format(snap_shot)])
@@ -220,9 +230,18 @@ def do_rsync(self):

# rsync data from workunit IO data
elif self.work_unit:
self.my_mnt.run_shell(["rsync", "-azvh", "{}".format(self.source_dir), "rsyncdir/dir1/"])
# Check for event handler stop message
finished = self.ctx.workunit_state.observer_should_stop()
try:
self.my_mnt.run_shell(["rsync", "-azvh", "{}".format(self.source_dir), "rsyncdir/dir1/"])
except CommandFailedError as e:
if e.exitstatus == 24:
log.info("Some files vanished before they could be transferred")
else:
raise
except socket.timeout:
log.info("IO timeout between worker and observer")
finally:
# Check for event handler stop message
should_stop = self.ctx.workunit_state.observer_should_stop()

# rsync data from written pattern data
else:
@@ -236,7 +255,10 @@ def do_rsync(self):
self.my_mnt.run_shell(["rm", "-f", "{}/file_a".format(self.source_dir)])

# Send back stop request to event handler in workunit task.
if finished:
if should_stop:
log.debug("Workunit finished; cleaning up rsync directories")
self.my_mnt.run_shell(["rm", "-rf", "rsyncdir/"])
self.my_mnt.run_shell(["rm", "-rf", "{}".format(self.source_dir)])
self.ctx.workunit_state.stop_observing()

time.sleep(self.wait_time)
@@ -253,18 +275,16 @@ def task(ctx, config):

run_time = config.get('runtime', 0)

log.info("Create object and start the gevent thread")
start_rsync = RSync(ctx, config, logger=log.getChild('rsync'))
start_rsync.start()
start_rsync_thread = start_rsync

try:
log.debug('Yielding')
yield
time.sleep(run_time)
finally:
log.info('joining rsync thread')
start_rsync_thread.stop()
start_rsync_thread.get()
start_rsync_thread.join()
start_rsync.stop()
start_rsync.get()
start_rsync.join()
log.info("Done joining")
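
Before the workunit.py changes below, it helps to see the observer side of the WorkunitState handshake that do_rsync() now follows (documented in the WorkunitState docstring further down). A minimal sketch, where do_one_rsync_pass() is a placeholder for the snapshot/rsync work above:

should_stop = ctx.workunit_state.start_observing()   # blocks until the workunit has started
while not should_stop:
    do_one_rsync_pass()                               # placeholder for one rsync iteration
    should_stop = ctx.workunit_state.observer_should_stop()
# Tell the workunit task it may now remove its directory.
ctx.workunit_state.stop_observing()
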
49 changes: 20 additions & 29 deletions qa/tasks/workunit.py
@@ -9,6 +9,7 @@

from tasks.util import get_remote_for_role
from tasks.util.workunit import get_refspec_after_overrides
from gevent.event import Event

from teuthology import misc
from teuthology.config import config as teuth_config
@@ -23,25 +24,21 @@ class WorkunitState(object):
Shared state between the workunit task and any other task that
might want to observe and/or interact with the data being created,
such as creating snapshots of it or copying it out (like the rsync task).
Usage from workunit:
ctx.workunit_state = WorkunitState()
# ... Create my directory
ctx.workunit_state.started()
# ... Do some long running activity
ctx.workunit_state.finished()
# ... Remove my directory
Usage from observer:
# ... wait until hasattr(ctx, "workunit_state") is true...
should_stop = ctx.workunit_state.start_observing()
while not should_stop:
# ... do some work on the workunit directory, it is
# guaranteed to exist until we call stop_observing...
finished = ctx.workunit_state.observer_should_stop()
should_stop = ctx.workunit_state.observer_should_stop()
ctx.workunit_state.stop_observing()
"""
def __init__(self):
# Whether some other task may be acting on this
@@ -86,15 +83,14 @@ def finished(self):
def start_observing(self):
"""
Call this from the observer thread to start observing.
:return: True if the workunit already finished (observer should
not touch the workunit dir)
False if the workunit has not finished and the observer
is free to proceed as normal.
"""
assert not self.observed
self.workunit_started.wait()
finished = self.workunit_finished.is_set():
finished = self.workunit_finished.is_set()
if not finished:
self.observed = True
self.workunit_finished.is_set()
@@ -122,7 +118,6 @@ def observer_should_stop(self):
return self.workunit_finished.is_set()



def task(ctx, config):
"""
Run ceph on all workunits found under the specified path.
@@ -227,35 +222,31 @@ def task(ctx, config):

ctx.workunit_state = WorkunitState()

# Execute any non-all workunits
log.info("timeout={}".format(timeout))
log.info("cleanup={}".format(cleanup))
ctx.workunit_state.started()
with parallel() as p:
for role, tests in clients.items():
if role != "all":
if 'all' in clients:
# Execute any 'all' workunits
all_tasks = clients["all"]
_spawn_on_all_clients(ctx, refspec, all_tasks, config.get('env'),
config.get('basedir', 'qa/workunits'),
config.get('subdir'), timeout=timeout,
cleanup=cleanup)
else:
# Execute any non-all workunits
ctx.workunit_state.started()
log.info("timeout={}".format(timeout))
log.info("cleanup={}".format(cleanup))
with parallel() as p:
for role, tests in clients.items():
p.spawn(_run_tests, ctx, refspec, role, tests,
config.get('env'),
basedir=config.get('basedir','qa/workunits'),
timeout=timeout,
cleanup=cleanup,
coverage_and_limits=not config.get('no_coverage_and_limits', None))
ctx.workunit_state.finished()

timeout=timeout,cleanup=cleanup)
ctx.workunit_state.finished()

if cleanup:
# Clean up dirs from any non-all workunits
for role, created in created_mountpoint.items():
_delete_dir(ctx, role, created)

# Execute any 'all' workunits
if 'all' in clients:
all_tasks = clients["all"]
_spawn_on_all_clients(ctx, refspec, all_tasks, config.get('env'),
config.get('basedir', 'qa/workunits'),
config.get('subdir'), timeout=timeout,
cleanup=cleanup)


def _client_mountpoint(ctx, cluster, id_):
"""
Returns the path to the expected mountpoint for workunits running
