Commit 6ac0c85
CBQE-0: use wait_for_persistence instead of waiting many stats
Change-Id: I67bd6541e8389186a2f49359a64d1aaa0631d3f5
Reviewed-on: http://review.couchbase.org/35067
Reviewed-by: Andrei Baranouski <andrei.baranouski@gmail.com>
Tested-by: Andrei Baranouski <andrei.baranouski@gmail.com>
andreibaranouski committed Mar 29, 2014
1 parent d03759a commit 6ac0c85
Showing 7 changed files with 38 additions and 84 deletions.
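
The pattern being factored out is the same in every removed hunk below: poll ep_queue_size and ep_flusher_todo (plus ep_uncommitted_items in stats_ops.py) until each reaches zero, then assert. A minimal sketch of what a consolidated RebalanceHelper.wait_for_persistence presumably wraps; the stat names come from the removed code, while the polling loop, defaults, and get_stats wiring are assumptions rather than the real helper body:

    import time

    # Stat names taken from the per-stat waits this commit removes; the
    # rest of this helper is a hypothetical sketch, not testrunner code.
    PERSISTENCE_STATS = ("ep_queue_size", "ep_flusher_todo", "ep_uncommitted_items")

    def wait_for_persistence(get_stats, timeout=120, poll_interval=1):
        """Poll the ep_* drain stats until all hit 0 or the timeout expires.

        get_stats: callable returning a stats dict for one bucket, e.g.
        lambda: MemcachedClientHelper.direct_client(master, bucket).stats()
        (that wiring is an assumption about the surrounding framework).
        """
        deadline = time.time() + timeout
        while time.time() < deadline:
            stats = get_stats()
            if all(int(stats.get(key, 0)) == 0 for key in PERSISTENCE_STATS):
                return True  # disk write queue fully drained
            time.sleep(poll_interval)
        return False  # callers assert on this: "not all items persisted. see logs"
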
51 changes: 17 additions & 34 deletions pytests/backuptests.py
@@ -122,10 +122,8 @@ def _test_backup_add_restore_bucket_body(self,
                         msg="replication did not complete")
 
         self.log.info("Sleep {0} seconds after data load".format(delay_after_data_load))
-        ready = RebalanceHelper.wait_for_stats_on_all(self.master, bucket, 'ep_queue_size', 0)
-        self.assertTrue(ready, "wait_for ep_queue_size == 0 failed")
-        ready = RebalanceHelper.wait_for_stats_on_all(self.master, bucket, 'ep_flusher_todo', 0)
-        self.assertTrue(ready, "wait_for ep_queue_size == 0 failed")
+        ready = RebalanceHelper.wait_for_persistence(self.master, bucket)
+        self.assertTrue(ready, "not all items persisted. see logs")
         node = RestConnection(self.master).get_nodes_self()
         if not startup_flag:
             for server in self.servers:
@@ -187,10 +185,8 @@ def _test_backup_add_restore_bucket_with_expiration_key(self, replica):
                 self.fail(msg.format(key, client.vbucketId, error.status))
         client.close()
         self.log.info("inserted {0} keys with expiry set to {1}".format(len(keys), expiry))
-        ready = RebalanceHelper.wait_for_stats_on_all(self.master, bucket, 'ep_queue_size', 0)
-        self.assertTrue(ready, "wait_for ep_queue_size == 0 failed")
-        ready = RebalanceHelper.wait_for_stats_on_all(self.master, bucket, 'ep_flusher_todo', 0)
-        self.assertTrue(ready, "wait_for ep_queue_size == 0 failed")
+        ready = RebalanceHelper.wait_for_persistence(self.master, bucket)
+        self.assertTrue(ready, "not all items persisted. see logs")
         node = RestConnection(self.master).get_nodes_self()
 
         output, error = self.shell.execute_command(self.perm_command)
@@ -236,10 +232,8 @@ def _test_backup_and_restore_bucket_overwriting_body(self, overwrite_flag=True):
                 self.fail(msg.format(key, client.vbucketId, error.status))
         self.log.info("inserted {0} keys with expiry set to {1}".format(len(keys), expiry))
 
-        ready = RebalanceHelper.wait_for_stats_on_all(self.master, bucket, 'ep_queue_size', 0)
-        self.assertTrue(ready, "wait_for ep_queue_size == 0 failed")
-        ready = RebalanceHelper.wait_for_stats_on_all(self.master, bucket, 'ep_flusher_todo', 0)
-        self.assertTrue(ready, "wait_for ep_queue_size == 0 failed")
+        ready = RebalanceHelper.wait_for_persistence(self.master, bucket)
+        self.assertTrue(ready, "not all items persisted. see logs")
 
         for server in self.servers:
             shell = RemoteMachineShellConnection(server)
@@ -288,11 +282,8 @@ def _test_cluster_topology_change_body(self):
                                                                   number_of_threads=2)
 
         self.log.info("Sleep after data load")
-        ready = RebalanceHelper.wait_for_stats_on_all(self.master, bucket, 'ep_queue_size', 0)
-        self.assertTrue(ready, "wait_for ep_queue_size == 0 failed")
-        ready = RebalanceHelper.wait_for_stats_on_all(self.master, bucket, 'ep_flusher_todo', 0)
-        self.assertTrue(ready, "wait_for ep_queue_size == 0 failed")
-
+        ready = RebalanceHelper.wait_for_persistence(self.master, bucket)
+        self.assertTrue(ready, "not all items persisted. see logs")
         #let's create a unique folder in the remote location
         for server in self.servers:
             shell = RemoteMachineShellConnection(server)
@@ -347,11 +338,8 @@ def _test_delete_key_and_backup_and_restore_body(self):
 
         client.delete(keys[0])
 
-        ready = RebalanceHelper.wait_for_stats_on_all(self.master, bucket, 'ep_queue_size', 0)
-        self.assertTrue(ready, "wait_for ep_queue_size == 0 failed")
-        ready = RebalanceHelper.wait_for_stats_on_all(self.master, bucket, 'ep_flusher_todo', 0)
-        self.assertTrue(ready, "wait_for ep_queue_size == 0 failed")
-
+        ready = RebalanceHelper.wait_for_persistence(self.master, bucket)
+        self.assertTrue(ready, "not all items persisted. see logs")
         #let's create a unique folder in the remote location
         for server in self.servers:
             shell = RemoteMachineShellConnection(server)
@@ -402,11 +390,8 @@ def _test_backup_and_restore_on_different_port_body(self):
                                                                   number_of_threads=2)
 
         self.log.info("Sleep after data load")
-        ready = RebalanceHelper.wait_for_stats_on_all(self.master, bucket_before_backup, 'ep_queue_size', 0)
-        self.assertTrue(ready, "wait_for ep_queue_size == 0 failed")
-        ready = RebalanceHelper.wait_for_stats_on_all(self.master, bucket_before_backup, 'ep_flusher_todo', 0)
-        self.assertTrue(ready, "wait_for ep_queue_size == 0 failed")
-
+        ready = RebalanceHelper.wait_for_persistence(self.master, bucket_before_backup)
+        self.assertTrue(ready, "not all items persisted. see logs")
         for server in self.servers:
             shell = RemoteMachineShellConnection(server)
             output, error = shell.execute_command(self.perm_command)
@@ -448,10 +433,8 @@ def _test_backup_and_restore_from_to_different_buckets(self):
                                                                   number_of_threads=2)
 
         self.log.info("Sleep after data load")
-        ready = RebalanceHelper.wait_for_stats_on_all(self.master, bucket_before_backup, 'ep_queue_size', 0)
-        self.assertTrue(ready, "wait_for ep_queue_size == 0 failed")
-        ready = RebalanceHelper.wait_for_stats_on_all(self.master, bucket_before_backup, 'ep_flusher_todo', 0)
-        self.assertTrue(ready, "wait_for ep_queue_size == 0 failed")
+        ready = RebalanceHelper.wait_for_persistence(self.master, bucket_before_backup)
+        self.assertTrue(ready, "not all items persisted. see logs")
 
         for server in self.servers:
             shell = RemoteMachineShellConnection(server)
@@ -485,14 +468,14 @@ def test_backup_upgrade_restore_default(self):
             return
         original_set = copy.copy(self.servers)
         worker = self.servers[len(self.servers) - 1]
-        self.servers = self.servers[:len(self.servers)-1]
+        self.servers = self.servers[:len(self.servers) - 1]
         shell = RemoteMachineShellConnection(self.master)
         o, r = shell.execute_command("cat /opt/couchbase/VERSION.txt")
         fin = o[0]
         shell.disconnect()
         initial_version = self.input.param("initial_version", fin)
         final_version = self.input.param("final_version", fin)
-        if initial_version==final_version:
+        if initial_version == final_version:
             self.log.error("Same initial and final versions ..")
             return
         if not final_version.startswith('2.0'):
@@ -623,7 +606,7 @@ def test_backup_upgrade_restore_default(self):
         if len(self.servers) > 1:
             removed = helper.remove_nodes(knownNodes=[node.id for node in nodes],
                                           ejectedNodes=[node.id for node in nodes if node.id != master_id],
-                                          wait_for_rebalance=True )
+                                          wait_for_rebalance=True)
 
         shell = RemoteMachineShellConnection(worker)
         shell.remove_directory(remote_tmp)
8 changes: 3 additions & 5 deletions pytests/drainratetests.py
@@ -51,7 +51,7 @@ def _create_default_bucket(self, replica=1):
         node_ram_ratio = BucketOperationHelper.base_bucket_ratio(self.input.servers)
         info = rest.get_nodes_self()
         available_ram = info.memoryQuota * node_ram_ratio
-        if( available_ram < 256):
+        if(available_ram < 256):
            available_ram = 256
         rest.create_bucket(bucket=name, ramQuotaMB=int(available_ram), replicaNumber=replica)
         ready = BucketOperationHelper.wait_for_memcached(master, name)
@@ -68,7 +68,7 @@ def _load_data_for_buckets(self):
         for bucket in buckets:
             name = bucket.name.encode("ascii", "ignore")
             self.bucket_data[name] = {}
-            self.bucket_data[name]["inserted_keys"], self.bucket_data[name]["rejected_keys"] =\
+            self.bucket_data[name]["inserted_keys"], self.bucket_data[name]["rejected_keys"] = \
                 MemcachedClientHelper.load_bucket_and_return_the_keys(name=self.bucket,
                                                                       servers=[self.master],
                                                                       value_size_distribution=distribution,
@@ -94,9 +94,7 @@ def _monitor_drain_queue(self):
         start = time.time()
         stats = rest.get_bucket_stats(self.bucket)
         self.log.info("current ep_queue_size: {0}".format(stats["ep_queue_size"]))
-        verified = RebalanceHelper.wait_for_stats(self.master, self.bucket, 'ep_queue_size', 0, timeout_in_seconds=300, verbose=False)\
-                   and RebalanceHelper.wait_for_stats(self.master, self.bucket, 'ep_flusher_todo', 0, timeout_in_seconds=300, verbose=False)
-        self.drained = verified
+        self.drained = RebalanceHelper.wait_for_persistence(self.master, self.bucket, timeout=300)
         self.drained_in_seconds = time.time() - start
 
 
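This hunk also shows that wait_for_persistence accepts a timeout keyword (timeout=300 replaces the two timeout_in_seconds=300 waits). The drain-timing pattern from _monitor_drain_queue, restated against the sketch helper above (the get_stats callable is again an assumption):

    import time

    # mirror _monitor_drain_queue: time how long the disk write queue takes to drain
    start = time.time()
    drained = wait_for_persistence(get_stats, timeout=300)  # sketch helper from above
    drained_in_seconds = time.time() - start
    if not drained:
        print("bucket did not drain within 300s; see logs")
    else:
        print("drained in {0:.1f}s".format(drained_in_seconds))
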
24 changes: 4 additions & 20 deletions pytests/memcachedops/stats_ops.py
@@ -1,13 +1,10 @@
 import time
 from membase.api.rest_client import RestConnection, Bucket
-from couchbase.documentgenerator import DocumentGenerator
+from membase.helper.rebalance_helper import RebalanceHelper
 from memcached.helper.data_helper import MemcachedClientHelper
 from basetestcase import BaseTestCase
-from memcached.helper.kvstore import KVStore
-from mc_bin_client import MemcachedError
-from membase.helper.cluster_helper import ClusterOperationHelper
 from couchbase.documentgenerator import BlobGenerator
 from remote.remote_util import RemoteMachineShellConnection
 from threading import Thread
 
 class StatsCrashRepro(BaseTestCase):
@@ -47,17 +44,6 @@ def _load_doc_data_all_buckets(self, op_type='create', start=0, expiry=0):
                 count += 1
                 time.sleep(5)
 
-    def _wait_for_stats_all_buckets(self, servers):
-
-        for server in servers:
-            for bucket in self.buckets:
-                self.cluster.wait_for_stats([server], bucket, '',
-                                            'ep_queue_size', '==', 0)
-                self.cluster.wait_for_stats([server], bucket, '',
-                                            'ep_flusher_todo', '==', 0)
-                self.cluster.wait_for_stats([server], bucket, '',
-                                            'ep_uncommitted_items', '==', 0)
-
     def _get_stats(self, stat_str='all'):
 
         # for server in self.nodes_server:
@@ -101,7 +87,8 @@ def run_test(self):
         self.log.info("DGM state achieved!!!!")
 
         # wait for draining of data before restart and warm up
-        self._wait_for_stats_all_buckets(self.nodes_server)
+        for bucket in self.buckets:
+            RebalanceHelper.wait_for_persistence(self.nodes_server[0], bucket)
 
 
         while 1:
@@ -134,7 +121,4 @@ def run_test(self):
         del stats_reset_thread
 
         # read_data_task.result()
-        read_data_task.join()
-
-
-
+        read_data_task.join()
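
Note the scope change in run_test: the deleted _wait_for_stats_all_buckets checked three stats on every server, while the replacement waits on self.nodes_server[0] only. If per-server coverage still mattered, the loop would presumably widen as in this hypothetical variant, built on the helper sketched above with stats_for as a stand-in fetcher:

    def wait_all_servers(servers, buckets, stats_for, timeout=120):
        # hypothetical variant: check persistence on every server, not just the first
        for server in servers:
            for bucket in buckets:
                ok = wait_for_persistence(lambda: stats_for(server, bucket), timeout)
                assert ok, "not all items persisted on {0}/{1}".format(server, bucket)
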
5 changes: 2 additions & 3 deletions pytests/memcapable.py
@@ -397,7 +397,7 @@ def test_getr(self):
         prefix = str(uuid.uuid4())[:7]
 
         BucketOperationHelper.delete_all_buckets_or_assert([self.master], self)
-        BucketOperationHelper.create_bucket(self.master, name=self.default_bucket_name, replica=replica_count, port=11210, test_case=self, bucket_ram= -1, password="")
+        BucketOperationHelper.create_bucket(self.master, name=self.default_bucket_name, replica=replica_count, port=11210, test_case=self, bucket_ram=-1, password="")
 
         if rebalance == GetrTests.DURING_REBALANCE or rebalance == GetrTests.AFTER_REBALANCE:
             # leave 1 node unclustered for rebalance in
@@ -1074,8 +1074,7 @@ def _do_warmup(self, howmany, timeout_in_seconds=1800):
         time.sleep(10)
         curr_items = int(self.onenodemc.stats()["curr_items"])
         uptime = int(self.onenodemc.stats()["uptime"])
-        RebalanceHelper.wait_for_stats(self.master, "default", 'ep_queue_size', 0)
-        RebalanceHelper.wait_for_stats(self.master, "default", 'ep_flusher_todo', 0)
+        RebalanceHelper.wait_for_persistence(self.master, "default")
         self.log.info("sleeping for 10 seconds")
         time.sleep(10)
         rest = RestConnection(self.master)
16 changes: 5 additions & 11 deletions pytests/performance/perf.py
@@ -677,7 +677,7 @@ def load(self, num_items, min_value_size=None,
              doc_cache=1,
              use_direct=True,
              report=0,
-             start_at= -1,
+             start_at=-1,
              collect_server_stats=True,
              is_eperf=False,
              hot_shift=0):
@@ -733,7 +733,7 @@ def load(self, num_items, min_value_size=None,
         if is_eperf:
             collect_server_stats = self.parami("prefix", 0) == 0
             client_id = self.parami("prefix", 0)
-            sc = self.start_stats("{0}.{1}".format(self.spec_reference, phase), # stats spec e.x: testname.load
+            sc = self.start_stats("{0}.{1}".format(self.spec_reference, phase),  # stats spec e.x: testname.load
                                   test_params=cfg_params, client_id=client_id,
                                   collect_server_stats=collect_server_stats)
 
@@ -902,7 +902,7 @@ def loop(self, num_ops=None,
              doc_cache=1,
              use_direct=True,
              collect_server_stats=True,
-             start_at= -1,
+             start_at=-1,
              report=0,
              ctl=None,
              hot_shift=0,
@@ -1064,16 +1064,10 @@ def wait_until_drained(self):
 
         master = self.input.servers[0]
         bucket = self.param("bucket", "default")
 
-        RebalanceHelper.wait_for_stats_on_all(master, bucket,
-                                              'ep_queue_size', 0,
-                                              fn=RebalanceHelper.wait_for_stats_no_timeout)
-        RebalanceHelper.wait_for_stats_on_all(master, bucket,
-                                              'ep_flusher_todo', 0,
-                                              fn=RebalanceHelper.wait_for_stats_no_timeout)
+        ready = RebalanceHelper.wait_for_persistence(master, bucket)
+        self.assertTrue(ready, "not all items persisted. see logs")
 
         self.log.info("disk write queue has been drained")
 
         return time.time()
 
     def wait_until_repl(self):
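wait_until_drained returns time.time() once persistence completes, so callers can timestamp the drain without re-polling. An illustrative wrapper; the load call and method name are assumptions about the surrounding harness, not perf.py's actual API:

    def _timed_load_and_drain(self, num_items):
        # bracket a load phase with drain timing (illustrative only)
        load_start = time.time()
        self.load(num_items)                    # assumed load phase
        drain_end = self.wait_until_drained()   # returns time.time() once drained
        self.log.info("load + drain took {0:.1f}s".format(drain_end - load_start))
        return drain_end - load_start
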
15 changes: 6 additions & 9 deletions pytests/rebalancetests.py
@@ -199,7 +199,7 @@ def bucket_data_init(rest):
         return bucket_data
 
     @staticmethod
-    def load_data(master, bucket, keys_count= -1, load_ratio= -1, delete_ratio=0, expiry_ratio=0, test=None):
+    def load_data(master, bucket, keys_count=-1, load_ratio=-1, delete_ratio=0, expiry_ratio=0, test=None):
         log = logger.Logger.get_logger()
         inserted_keys, rejected_keys = \
             MemcachedClientHelper.load_bucket_and_return_the_keys(servers=[master],
@@ -222,18 +222,16 @@ def load_data(master, bucket, keys_count=-1, load_ratio=-1, delete_ratio=0, expiry_ratio=0, test=None):
     def verify_data(master, inserted_keys, bucket, test):
         log = logger.Logger.get_logger()
         log.info("Verifying data")
-        ready = RebalanceHelper.wait_for_stats_on_all(master, bucket, 'ep_queue_size', 0)
-        test.assertTrue(ready, "wait_for ep_queue_size == 0 failed")
-        ready = RebalanceHelper.wait_for_stats_on_all(master, bucket, 'ep_flusher_todo', 0)
-        test.assertTrue(ready, "wait_for ep_queue_size == 0 failed")
+        ready = RebalanceHelper.wait_for_persistence(master, bucket)
+        test.assertTrue(ready, "not all items persisted. see logs")
         BucketOperationHelper.keys_exist_or_assert_in_parallel(keys=inserted_keys, server=master, bucket_name=bucket,
                                                                test=test, concurrency=4)
 
     @staticmethod
     def tasks_for_buckets(rest, task_manager,
                           bucket_data,
                           new_doc_seed=None,
-                          new_doc_count= -1,
+                          new_doc_count=-1,
                           DELETE_RATIO=0,
                           ACCESS_RATIO=0,
                           EXPIRY_RATIO=0,
@@ -314,7 +311,7 @@ def finish_bucket_task(bucket_name_info):
 
     @staticmethod
     def load_all_buckets_task(rest, task_manager, bucket_data, ram_load_ratio,
-                              distribution=None, keys_count= -1, seed=None,
+                              distribution=None, keys_count=-1, seed=None,
                               monitor=True):
         buckets = rest.get_buckets()
         tasks = None
@@ -335,7 +332,7 @@ def load_all_buckets_task(rest, task_manager, bucket_data, ram_load_ratio,
     @staticmethod
     def load_bucket_task_helper(rest, task_manager, bucket, ram_load_ratio,
                                 kv_store=None, distribution=None,
-                                keys_count= -1, seed=None, monitor=True):
+                                keys_count=-1, seed=None, monitor=True):
         log = logger.Logger().get_logger()
         tasks = []
 
@@ -749,7 +746,7 @@ def _common_test_body(self, moxi=False):
         inserted_keys, rejected_keys = \
             MemcachedClientHelper.load_bucket_and_return_the_keys(servers=[self.servers[0]],
                                                                   name=name,
-                                                                  ram_load_ratio= -1,
+                                                                  ram_load_ratio=-1,
                                                                   number_of_items=self.keys_count,
                                                                   number_of_threads=1,
                                                                   write_only=True)
3 changes: 1 addition & 2 deletions pytests/warmupcluster.py
@@ -94,8 +94,7 @@ def do_warmup(self):
         self.servers = self.input.servers
         self._insert_data(howmany)
 
-        RebalanceHelper.wait_for_stats_on_all(self.master, "default", "ep_queue_size", 0)
-        RebalanceHelper.wait_for_stats_on_all(self.master, "default", "ep_flusher_todo", 0)
+        RebalanceHelper.wait_for_persistence(self.master, "default")
         time.sleep(5)
         rest = RestConnection(self.master)
 
