diff --git a/swift/obj/replicator.py b/swift/obj/replicator.py index f5bcd62ffe..0e19744684 100644 --- a/swift/obj/replicator.py +++ b/swift/obj/replicator.py @@ -199,14 +199,16 @@ def get_hashes(partition_dir, recalculate=[], do_listdir=False, return hashed, hashes -# Hack to work around Eventlet's tpool not catching and reraising Timeouts. We -# return the Timeout, Timeout if it's raised, the caller looks for it and -# reraises it if found. def tpooled_get_hashes(*args, **kwargs): + """ + Hack to work around Eventlet's tpool not catching and reraising Timeouts. + We return the Timeout, None if it's raised, the caller looks for it + and reraises it if found. + """ try: return get_hashes(*args, **kwargs) except Timeout, err: - return err, err + return err, None class ObjectReplicator(Daemon): @@ -422,12 +424,13 @@ def update(self, job): local_hash[suffix] != remote_hash.get(suffix, -1)] if not suffixes: continue - hashed, local_hash = tpool.execute(tpooled_get_hashes, + hashed, recalc_hash = tpool.execute(tpooled_get_hashes, job['path'], recalculate=suffixes, reclaim_age=self.reclaim_age) # See tpooled_get_hashes "Hack". if isinstance(hashed, BaseException): raise hashed + local_hash = recalc_hash suffixes = [suffix for suffix in local_hash if local_hash[suffix] != remote_hash.get(suffix, -1)] self.rsync(node, job, suffixes) diff --git a/test/unit/obj/test_replicator.py b/test/unit/obj/test_replicator.py index a9f5497226..bd54ec8905 100644 --- a/test/unit/obj/test_replicator.py +++ b/test/unit/obj/test_replicator.py @@ -26,6 +26,7 @@ import tempfile from contextlib import contextmanager from eventlet.green import subprocess +from eventlet import Timeout, tpool from test.unit import FakeLogger from swift.common import utils from swift.common.utils import hash_path, mkdirs, normalize_timestamp @@ -201,7 +202,8 @@ def test_get_hashes(self): hashed, hashes = object_replicator.get_hashes(part, do_listdir=True) self.assertEquals(hashed, 0) self.assert_('a83' in hashes) - hashed, hashes = object_replicator.get_hashes(part, recalculate=['a83']) + hashed, hashes = object_replicator.get_hashes(part, + recalculate=['a83']) self.assertEquals(hashed, 1) self.assert_('a83' in hashes) @@ -364,42 +366,106 @@ def test_run_once_recover_from_failure(self): dict(swift_dir=self.testdir, devices=self.devices, mount_check='false', timeout='300', stats_interval='1')) was_connector = object_replicator.http_connect - object_replicator.http_connect = mock_http_connect(200) - # Write some files into '1' and run replicate- they should be moved - # to the other partitoins and then node should get deleted. - cur_part = '1' - df = DiskFile(self.devices, 'sda', cur_part, 'a', 'c', 'o', - FakeLogger()) - mkdirs(df.datadir) - f = open(os.path.join(df.datadir, - normalize_timestamp(time.time()) + '.data'), - 'wb') - f.write('1234567890') - f.close() - ohash = hash_path('a', 'c', 'o') - data_dir = ohash[-3:] - whole_path_from = os.path.join(self.objects, cur_part, data_dir) - process_arg_checker = [] - nodes = [node for node in - self.ring.get_part_nodes(int(cur_part)) \ - if node['ip'] not in _ips()] - for node in nodes: - rsync_mod = '[%s]::object/sda/objects/%s' % (node['ip'], cur_part) - process_arg_checker.append( - (0, '', ['rsync', whole_path_from, rsync_mod])) - self.assertTrue(os.access(os.path.join(self.objects, - '1', data_dir, ohash), - os.F_OK)) - with _mock_process(process_arg_checker): - replicator.run_once() - self.assertFalse(process_errors) - for i, result in [('0', True), ('1', False), - ('2', True), ('3', True)]: - self.assertEquals(os.access( - os.path.join(self.objects, - i, object_replicator.HASH_FILE), - os.F_OK), result) - object_replicator.http_connect = was_connector + try: + object_replicator.http_connect = mock_http_connect(200) + # Write some files into '1' and run replicate- they should be moved + # to the other partitoins and then node should get deleted. + cur_part = '1' + df = DiskFile(self.devices, 'sda', cur_part, 'a', 'c', 'o', + FakeLogger()) + mkdirs(df.datadir) + f = open(os.path.join(df.datadir, + normalize_timestamp(time.time()) + '.data'), + 'wb') + f.write('1234567890') + f.close() + ohash = hash_path('a', 'c', 'o') + data_dir = ohash[-3:] + whole_path_from = os.path.join(self.objects, cur_part, data_dir) + process_arg_checker = [] + nodes = [node for node in + self.ring.get_part_nodes(int(cur_part)) \ + if node['ip'] not in _ips()] + for node in nodes: + rsync_mod = '[%s]::object/sda/objects/%s' % (node['ip'], + cur_part) + process_arg_checker.append( + (0, '', ['rsync', whole_path_from, rsync_mod])) + self.assertTrue(os.access(os.path.join(self.objects, + '1', data_dir, ohash), + os.F_OK)) + with _mock_process(process_arg_checker): + replicator.run_once() + self.assertFalse(process_errors) + for i, result in [('0', True), ('1', False), + ('2', True), ('3', True)]: + self.assertEquals(os.access( + os.path.join(self.objects, + i, object_replicator.HASH_FILE), + os.F_OK), result) + finally: + object_replicator.http_connect = was_connector + + def test_run_once_recover_from_timeout(self): + replicator = object_replicator.ObjectReplicator( + dict(swift_dir=self.testdir, devices=self.devices, + mount_check='false', timeout='300', stats_interval='1')) + was_connector = object_replicator.http_connect + was_get_hashes = object_replicator.get_hashes + was_execute = tpool.execute + self.get_hash_count = 0 + try: + + def fake_get_hashes(*args, **kwargs): + self.get_hash_count += 1 + if self.get_hash_count == 3: + # raise timeout on last call to get hashes + raise Timeout() + return 2, {'abc': 'def'} + + def fake_exc(tester, *args, **kwargs): + if 'Error syncing partition' in args[0]: + tester.i_failed = True + + self.i_failed = False + object_replicator.http_connect = mock_http_connect(200) + object_replicator.get_hashes = fake_get_hashes + replicator.logger.exception = \ + lambda *args, **kwargs: fake_exc(self, *args, **kwargs) + # Write some files into '1' and run replicate- they should be moved + # to the other partitoins and then node should get deleted. + cur_part = '1' + df = DiskFile(self.devices, 'sda', cur_part, 'a', 'c', 'o', + FakeLogger()) + mkdirs(df.datadir) + f = open(os.path.join(df.datadir, + normalize_timestamp(time.time()) + '.data'), + 'wb') + f.write('1234567890') + f.close() + ohash = hash_path('a', 'c', 'o') + data_dir = ohash[-3:] + whole_path_from = os.path.join(self.objects, cur_part, data_dir) + process_arg_checker = [] + nodes = [node for node in + self.ring.get_part_nodes(int(cur_part)) \ + if node['ip'] not in _ips()] + for node in nodes: + rsync_mod = '[%s]::object/sda/objects/%s' % (node['ip'], + cur_part) + process_arg_checker.append( + (0, '', ['rsync', whole_path_from, rsync_mod])) + self.assertTrue(os.access(os.path.join(self.objects, + '1', data_dir, ohash), + os.F_OK)) + with _mock_process(process_arg_checker): + replicator.run_once() + self.assertFalse(process_errors) + self.assertFalse(self.i_failed) + finally: + object_replicator.http_connect = was_connector + object_replicator.get_hashes = was_get_hashes + tpool.execute = was_execute def test_run(self): with _mock_process([(0, '')] * 100):