From 6ac0c852660187d98a036b0541a87d994f262b39 Mon Sep 17 00:00:00 2001
From: andreibara
Date: Sat, 29 Mar 2014 14:57:46 +0300
Subject: [PATCH] CBQE-0: use wait_for_persistence instead of waiting many stats

Change-Id: I67bd6541e8389186a2f49359a64d1aaa0631d3f5
Reviewed-on: http://review.couchbase.org/35067
Reviewed-by: Andrei Baranouski
Tested-by: Andrei Baranouski
---
 pytests/backuptests.py            | 51 +++++++++++--------------------
 pytests/drainratetests.py         |  8 ++---
 pytests/memcachedops/stats_ops.py | 24 +++------------
 pytests/memcapable.py             |  5 ++-
 pytests/performance/perf.py       | 16 +++-------
 pytests/rebalancetests.py         | 15 ++++-----
 pytests/warmupcluster.py          |  3 +-
 7 files changed, 38 insertions(+), 84 deletions(-)

diff --git a/pytests/backuptests.py b/pytests/backuptests.py
index b2540cbdc8..1d487d7ad5 100644
--- a/pytests/backuptests.py
+++ b/pytests/backuptests.py
@@ -122,10 +122,8 @@ def _test_backup_add_restore_bucket_body(self,
                                           msg="replication did not complete")
 
         self.log.info("Sleep {0} seconds after data load".format(delay_after_data_load))
-        ready = RebalanceHelper.wait_for_stats_on_all(self.master, bucket, 'ep_queue_size', 0)
-        self.assertTrue(ready, "wait_for ep_queue_size == 0 failed")
-        ready = RebalanceHelper.wait_for_stats_on_all(self.master, bucket, 'ep_flusher_todo', 0)
-        self.assertTrue(ready, "wait_for ep_queue_size == 0 failed")
+        ready = RebalanceHelper.wait_for_persistence(self.master, bucket)
+        self.assertTrue(ready, "not all items persisted. see logs")
         node = RestConnection(self.master).get_nodes_self()
         if not startup_flag:
             for server in self.servers:
@@ -187,10 +185,8 @@ def _test_backup_add_restore_bucket_with_expiration_key(self, replica):
                 self.fail(msg.format(key, client.vbucketId, error.status))
         client.close()
         self.log.info("inserted {0} keys with expiry set to {1}".format(len(keys), expiry))
-        ready = RebalanceHelper.wait_for_stats_on_all(self.master, bucket, 'ep_queue_size', 0)
-        self.assertTrue(ready, "wait_for ep_queue_size == 0 failed")
-        ready = RebalanceHelper.wait_for_stats_on_all(self.master, bucket, 'ep_flusher_todo', 0)
-        self.assertTrue(ready, "wait_for ep_queue_size == 0 failed")
+        ready = RebalanceHelper.wait_for_persistence(self.master, bucket)
+        self.assertTrue(ready, "not all items persisted. see logs")
         node = RestConnection(self.master).get_nodes_self()
 
         output, error = self.shell.execute_command(self.perm_command)
@@ -236,10 +232,8 @@ def _test_backup_and_restore_bucket_overwriting_body(self, overwrite_flag=True):
                 self.fail(msg.format(key, client.vbucketId, error.status))
 
         self.log.info("inserted {0} keys with expiry set to {1}".format(len(keys), expiry))
-        ready = RebalanceHelper.wait_for_stats_on_all(self.master, bucket, 'ep_queue_size', 0)
-        self.assertTrue(ready, "wait_for ep_queue_size == 0 failed")
-        ready = RebalanceHelper.wait_for_stats_on_all(self.master, bucket, 'ep_flusher_todo', 0)
-        self.assertTrue(ready, "wait_for ep_queue_size == 0 failed")
+        ready = RebalanceHelper.wait_for_persistence(self.master, bucket)
+        self.assertTrue(ready, "not all items persisted. see logs")
 
         for server in self.servers:
             shell = RemoteMachineShellConnection(server)
@@ -288,11 +282,8 @@ def _test_cluster_topology_change_body(self):
                                                               number_of_threads=2)
 
         self.log.info("Sleep after data load")
-        ready = RebalanceHelper.wait_for_stats_on_all(self.master, bucket, 'ep_queue_size', 0)
-        self.assertTrue(ready, "wait_for ep_queue_size == 0 failed")
-        ready = RebalanceHelper.wait_for_stats_on_all(self.master, bucket, 'ep_flusher_todo', 0)
-        self.assertTrue(ready, "wait_for ep_queue_size == 0 failed")
-
+        ready = RebalanceHelper.wait_for_persistence(self.master, bucket)
+        self.assertTrue(ready, "not all items persisted. see logs")
         #let's create a unique folder in the remote location
         for server in self.servers:
             shell = RemoteMachineShellConnection(server)
@@ -347,11 +338,8 @@ def _test_delete_key_and_backup_and_restore_body(self):
 
         client.delete(keys[0])
 
-        ready = RebalanceHelper.wait_for_stats_on_all(self.master, bucket, 'ep_queue_size', 0)
-        self.assertTrue(ready, "wait_for ep_queue_size == 0 failed")
-        ready = RebalanceHelper.wait_for_stats_on_all(self.master, bucket, 'ep_flusher_todo', 0)
-        self.assertTrue(ready, "wait_for ep_queue_size == 0 failed")
-
+        ready = RebalanceHelper.wait_for_persistence(self.master, bucket)
+        self.assertTrue(ready, "not all items persisted. see logs")
         #let's create a unique folder in the remote location
         for server in self.servers:
             shell = RemoteMachineShellConnection(server)
@@ -402,11 +390,8 @@ def _test_backup_and_restore_on_different_port_body(self):
                                                               number_of_threads=2)
 
         self.log.info("Sleep after data load")
-        ready = RebalanceHelper.wait_for_stats_on_all(self.master, bucket_before_backup, 'ep_queue_size', 0)
-        self.assertTrue(ready, "wait_for ep_queue_size == 0 failed")
-        ready = RebalanceHelper.wait_for_stats_on_all(self.master, bucket_before_backup, 'ep_flusher_todo', 0)
-        self.assertTrue(ready, "wait_for ep_queue_size == 0 failed")
-
+        ready = RebalanceHelper.wait_for_persistence(self.master, bucket_before_backup)
+        self.assertTrue(ready, "not all items persisted. see logs")
         for server in self.servers:
             shell = RemoteMachineShellConnection(server)
             output, error = shell.execute_command(self.perm_command)
@@ -448,10 +433,8 @@ def _test_backup_and_restore_from_to_different_buckets(self):
                                                               number_of_threads=2)
 
         self.log.info("Sleep after data load")
-        ready = RebalanceHelper.wait_for_stats_on_all(self.master, bucket_before_backup, 'ep_queue_size', 0)
-        self.assertTrue(ready, "wait_for ep_queue_size == 0 failed")
-        ready = RebalanceHelper.wait_for_stats_on_all(self.master, bucket_before_backup, 'ep_flusher_todo', 0)
-        self.assertTrue(ready, "wait_for ep_queue_size == 0 failed")
+        ready = RebalanceHelper.wait_for_persistence(self.master, bucket_before_backup)
+        self.assertTrue(ready, "not all items persisted. see logs")
see logs") for server in self.servers: shell = RemoteMachineShellConnection(server) @@ -485,14 +468,14 @@ def test_backup_upgrade_restore_default(self): return original_set = copy.copy(self.servers) worker = self.servers[len(self.servers) - 1] - self.servers = self.servers[:len(self.servers)-1] + self.servers = self.servers[:len(self.servers) - 1] shell = RemoteMachineShellConnection(self.master) o, r = shell.execute_command("cat /opt/couchbase/VERSION.txt") fin = o[0] shell.disconnect() initial_version = self.input.param("initial_version", fin) final_version = self.input.param("final_version", fin) - if initial_version==final_version: + if initial_version == final_version: self.log.error("Same initial and final versions ..") return if not final_version.startswith('2.0'): @@ -623,7 +606,7 @@ def test_backup_upgrade_restore_default(self): if len(self.servers) > 1: removed = helper.remove_nodes(knownNodes=[node.id for node in nodes], ejectedNodes=[node.id for node in nodes if node.id != master_id], - wait_for_rebalance=True ) + wait_for_rebalance=True) shell = RemoteMachineShellConnection(worker) shell.remove_directory(remote_tmp) diff --git a/pytests/drainratetests.py b/pytests/drainratetests.py index 066e9a63c2..2e350511f5 100644 --- a/pytests/drainratetests.py +++ b/pytests/drainratetests.py @@ -51,7 +51,7 @@ def _create_default_bucket(self, replica=1): node_ram_ratio = BucketOperationHelper.base_bucket_ratio(self.input.servers) info = rest.get_nodes_self() available_ram = info.memoryQuota * node_ram_ratio - if( available_ram < 256): + if(available_ram < 256): available_ram = 256 rest.create_bucket(bucket=name, ramQuotaMB=int(available_ram), replicaNumber=replica) ready = BucketOperationHelper.wait_for_memcached(master, name) @@ -68,7 +68,7 @@ def _load_data_for_buckets(self): for bucket in buckets: name = bucket.name.encode("ascii", "ignore") self.bucket_data[name] = {} - self.bucket_data[name]["inserted_keys"], self.bucket_data[name]["rejected_keys"] =\ + self.bucket_data[name]["inserted_keys"], self.bucket_data[name]["rejected_keys"] = \ MemcachedClientHelper.load_bucket_and_return_the_keys(name=self.bucket, servers=[self.master], value_size_distribution=distribution, @@ -94,9 +94,7 @@ def _monitor_drain_queue(self): start = time.time() stats = rest.get_bucket_stats(self.bucket) self.log.info("current ep_queue_size: {0}".format(stats["ep_queue_size"])) - verified = RebalanceHelper.wait_for_stats(self.master, self.bucket, 'ep_queue_size', 0, timeout_in_seconds=300, verbose=False)\ - and RebalanceHelper.wait_for_stats(self.master, self.bucket, 'ep_flusher_todo', 0, timeout_in_seconds=300, verbose=False) - self.drained = verified + self.drained = RebalanceHelper.wait_for_persistence(self.master, self.bucket, timeout=300) self.drained_in_seconds = time.time() - start diff --git a/pytests/memcachedops/stats_ops.py b/pytests/memcachedops/stats_ops.py index 3eac973759..20686b9cfe 100644 --- a/pytests/memcachedops/stats_ops.py +++ b/pytests/memcachedops/stats_ops.py @@ -1,13 +1,10 @@ import time from membase.api.rest_client import RestConnection, Bucket -from couchbase.documentgenerator import DocumentGenerator +from membase.helper.rebalance_helper import RebalanceHelper from memcached.helper.data_helper import MemcachedClientHelper from basetestcase import BaseTestCase -from memcached.helper.kvstore import KVStore from mc_bin_client import MemcachedError -from membase.helper.cluster_helper import ClusterOperationHelper from couchbase.documentgenerator import BlobGenerator -from 
remote.remote_util import RemoteMachineShellConnection from threading import Thread class StatsCrashRepro(BaseTestCase): @@ -47,17 +44,6 @@ def _load_doc_data_all_buckets(self, op_type='create', start=0, expiry=0): count += 1 time.sleep(5) - def _wait_for_stats_all_buckets(self, servers): - - for server in servers: - for bucket in self.buckets: - self.cluster.wait_for_stats([server], bucket, '', - 'ep_queue_size', '==', 0) - self.cluster.wait_for_stats([server], bucket, '', - 'ep_flusher_todo', '==', 0) - self.cluster.wait_for_stats([server], bucket, '', - 'ep_uncommitted_items', '==', 0) - def _get_stats(self, stat_str='all'): # for server in self.nodes_server: @@ -101,7 +87,8 @@ def run_test(self): self.log.info("DGM state achieved!!!!") # wait for draining of data before restart and warm up - self._wait_for_stats_all_buckets(self.nodes_server) + for bucket in self.buckets: + RebalanceHelper.wait_for_persistence(self.nodes_server[0], bucket) while 1: @@ -134,7 +121,4 @@ def run_test(self): del stats_reset_thread # read_data_task.result() - read_data_task.join() - - - + read_data_task.join() \ No newline at end of file diff --git a/pytests/memcapable.py b/pytests/memcapable.py index 7bdba8703c..9c33eb9c6e 100644 --- a/pytests/memcapable.py +++ b/pytests/memcapable.py @@ -397,7 +397,7 @@ def test_getr(self): prefix = str(uuid.uuid4())[:7] BucketOperationHelper.delete_all_buckets_or_assert([self.master], self) - BucketOperationHelper.create_bucket(self.master, name=self.default_bucket_name, replica=replica_count, port=11210, test_case=self, bucket_ram= -1, password="") + BucketOperationHelper.create_bucket(self.master, name=self.default_bucket_name, replica=replica_count, port=11210, test_case=self, bucket_ram=-1, password="") if rebalance == GetrTests.DURING_REBALANCE or rebalance == GetrTests.AFTER_REBALANCE: # leave 1 node unclustered for rebalance in @@ -1074,8 +1074,7 @@ def _do_warmup(self, howmany, timeout_in_seconds=1800): time.sleep(10) curr_items = int(self.onenodemc.stats()["curr_items"]) uptime = int(self.onenodemc.stats()["uptime"]) - RebalanceHelper.wait_for_stats(self.master, "default", 'ep_queue_size', 0) - RebalanceHelper.wait_for_stats(self.master, "default", 'ep_flusher_todo', 0) + RebalanceHelper.wait_for_persistence(self.master, "default") self.log.info("sleeping for 10 seconds") time.sleep(10) rest = RestConnection(self.master) diff --git a/pytests/performance/perf.py b/pytests/performance/perf.py index 2ea7118554..5856652a14 100644 --- a/pytests/performance/perf.py +++ b/pytests/performance/perf.py @@ -677,7 +677,7 @@ def load(self, num_items, min_value_size=None, doc_cache=1, use_direct=True, report=0, - start_at= -1, + start_at=-1, collect_server_stats=True, is_eperf=False, hot_shift=0): @@ -733,7 +733,7 @@ def load(self, num_items, min_value_size=None, if is_eperf: collect_server_stats = self.parami("prefix", 0) == 0 client_id = self.parami("prefix", 0) - sc = self.start_stats("{0}.{1}".format(self.spec_reference, phase), # stats spec e.x: testname.load + sc = self.start_stats("{0}.{1}".format(self.spec_reference, phase), # stats spec e.x: testname.load test_params=cfg_params, client_id=client_id, collect_server_stats=collect_server_stats) @@ -902,7 +902,7 @@ def loop(self, num_ops=None, doc_cache=1, use_direct=True, collect_server_stats=True, - start_at= -1, + start_at=-1, report=0, ctl=None, hot_shift=0, @@ -1064,16 +1064,10 @@ def wait_until_drained(self): master = self.input.servers[0] bucket = self.param("bucket", "default") - - 
-        RebalanceHelper.wait_for_stats_on_all(master, bucket,
-                                              'ep_queue_size', 0,
-                                              fn=RebalanceHelper.wait_for_stats_no_timeout)
-        RebalanceHelper.wait_for_stats_on_all(master, bucket,
-                                              'ep_flusher_todo', 0,
-                                              fn=RebalanceHelper.wait_for_stats_no_timeout)
+        ready = RebalanceHelper.wait_for_persistence(master, bucket)
+        self.assertTrue(ready, "not all items persisted. see logs")
 
         self.log.info("disk write queue has been drained")
-
         return time.time()
 
     def wait_until_repl(self):
diff --git a/pytests/rebalancetests.py b/pytests/rebalancetests.py
index e88a87f8fa..9a6bbb6e0c 100644
--- a/pytests/rebalancetests.py
+++ b/pytests/rebalancetests.py
@@ -199,7 +199,7 @@ def bucket_data_init(rest):
         return bucket_data
 
     @staticmethod
-    def load_data(master, bucket, keys_count= -1, load_ratio= -1, delete_ratio=0, expiry_ratio=0, test=None):
+    def load_data(master, bucket, keys_count=-1, load_ratio=-1, delete_ratio=0, expiry_ratio=0, test=None):
         log = logger.Logger.get_logger()
         inserted_keys, rejected_keys = \
             MemcachedClientHelper.load_bucket_and_return_the_keys(servers=[master],
@@ -222,10 +222,7 @@ def load_data(master, bucket, keys_count=-1, load_ratio=-1, delete_ratio=0, ex
     def verify_data(master, inserted_keys, bucket, test):
         log = logger.Logger.get_logger()
         log.info("Verifying data")
-        ready = RebalanceHelper.wait_for_stats_on_all(master, bucket, 'ep_queue_size', 0)
-        test.assertTrue(ready, "wait_for ep_queue_size == 0 failed")
-        ready = RebalanceHelper.wait_for_stats_on_all(master, bucket, 'ep_flusher_todo', 0)
-        test.assertTrue(ready, "wait_for ep_queue_size == 0 failed")
+        ready = RebalanceHelper.wait_for_persistence(master, bucket)
         BucketOperationHelper.keys_exist_or_assert_in_parallel(keys=inserted_keys, server=master, bucket_name=bucket,
                                                                test=test, concurrency=4)
 
@@ -233,7 +230,7 @@ def verify_data(master, inserted_keys, bucket, test):
     def tasks_for_buckets(rest, task_manager,
                           bucket_data,
                           new_doc_seed=None,
-                          new_doc_count= -1,
+                          new_doc_count=-1,
                           DELETE_RATIO=0,
                           ACCESS_RATIO=0,
                           EXPIRY_RATIO=0,
@@ -314,7 +311,7 @@ def finish_bucket_task(bucket_name_info):
 
     @staticmethod
     def load_all_buckets_task(rest, task_manager, bucket_data, ram_load_ratio,
-                              distribution=None, keys_count= -1, seed=None,
+                              distribution=None, keys_count=-1, seed=None,
                               monitor=True):
         buckets = rest.get_buckets()
         tasks = None
@@ -335,7 +332,7 @@ def load_all_buckets_task(rest, task_manager, bucket_data, ram_load_ratio,
     @staticmethod
     def load_bucket_task_helper(rest, task_manager, bucket, ram_load_ratio,
                                 kv_store=None, distribution=None,
-                                keys_count= -1, seed=None, monitor=True):
+                                keys_count=-1, seed=None, monitor=True):
         log = logger.Logger().get_logger()
         tasks = []
 
@@ -749,7 +746,7 @@ def _common_test_body(self, moxi=False):
             inserted_keys, rejected_keys = \
                 MemcachedClientHelper.load_bucket_and_return_the_keys(servers=[self.servers[0]],
                                                                       name=name,
-                                                                      ram_load_ratio= -1,
+                                                                      ram_load_ratio=-1,
                                                                       number_of_items=self.keys_count,
                                                                       number_of_threads=1,
                                                                       write_only=True)
diff --git a/pytests/warmupcluster.py b/pytests/warmupcluster.py
index 1d0e6aa113..f34a8e33f2 100644
--- a/pytests/warmupcluster.py
+++ b/pytests/warmupcluster.py
@@ -94,8 +94,7 @@ def do_warmup(self):
         self.servers = self.input.servers
         self._insert_data(howmany)
 
-        RebalanceHelper.wait_for_stats_on_all(self.master, "default", "ep_queue_size", 0)
-        RebalanceHelper.wait_for_stats_on_all(self.master, "default", "ep_flusher_todo", 0)
+        RebalanceHelper.wait_for_persistence(self.master, "default")
 
         time.sleep(5)
         rest = RestConnection(self.master)
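
Note: this patch assumes RebalanceHelper.wait_for_persistence(master, bucket, timeout=...) already exists in membase/helper/rebalance_helper.py (the drainratetests.py hunk shows its timeout keyword). A minimal sketch of what such a helper plausibly does, built only on the wait_for_stats call signature visible in the removed lines, is below; the default timeout value and the exact stat list are assumptions, with ep_uncommitted_items suggested by the _wait_for_stats_all_buckets helper that the stats_ops.py hunk deletes.

    # Hypothetical sketch, not the repo's verbatim implementation.
    @staticmethod
    def wait_for_persistence(master, bucket, timeout=120):
        # Treat a bucket as fully persisted once the disk write queue,
        # the flusher backlog and (assumed) the uncommitted-items counter
        # have all drained to zero, so callers no longer poll each stat.
        drained = True
        for stat in ("ep_queue_size", "ep_flusher_todo", "ep_uncommitted_items"):
            drained = drained and RebalanceHelper.wait_for_stats(
                master, bucket, stat, 0,
                timeout_in_seconds=timeout, verbose=False)
        return drained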