Skip to content
Browse files

MB-7189: add wait_for_replication in swap test

backfill_completed is not tracked now

Change-Id: I3c13c78071b61cba311a0ff71d277f6864b19f08
Reviewed-on: http://review.couchbase.org/22564
Tested-by: buildbot <build@couchbase.com>
Reviewed-by: Andrei Baranouski <andrei.baranouski@gmail.com>
  • Loading branch information...
1 parent 9059d0c commit 83ffc7754ca65a0e87b4d4d97c7e0eaa97d856da @andreibaranouski andreibaranouski committed Nov 15, 2012
Showing with 55 additions and 58 deletions.
  1. +16 −1 lib/membase/helper/rebalance_helper.py
  2. +39 −57 pytests/swaprebalance.py
View
17 lib/membase/helper/rebalance_helper.py
@@ -74,6 +74,21 @@ def wait_for_mc_stats_all_nodes(master, bucket, stat_key, stat_value, timeout_in
return verified
@staticmethod
+ def wait_for_replication(servers, cluster_helper, timeout=600):
+ tasks = []
+ rest = RestConnection(servers[0])
+ buckets = rest.get_buckets()
+ for server in servers:
+ for bucket in buckets:
+ for server_repl in list(set(servers) - set([server])):
+ tasks.append(cluster_helper.async_wait_for_stats([server], bucket, 'tap',
+ 'eq_tapq:replication_ns_1@' + server_repl.ip + ':idle', '==', 'true'))
+ tasks.append(cluster_helper.async_wait_for_stats([server], bucket, 'tap',
+ 'eq_tapq:replication_ns_1@' + server_repl.ip + ':backfill_completed', '==', 'true'))
+ for task in tasks:
+ task.result(timeout)
+
+ @staticmethod
#bucket is a json object that contains name,port,password
def wait_for_stats(master, bucket, stat_key, stat_value, timeout_in_seconds=120, verbose=True):
log.info("waiting for bucket {0} stat : {1} to match {2} on {3}".format(bucket, stat_key, \
@@ -292,7 +307,7 @@ def print_taps_from_all_nodes(rest, bucket='default'):
@staticmethod
def log_interesting_taps(node, tap_stats, logger):
interesting_stats = ['ack_log_size', 'ack_seqno', 'ack_window_full', 'has_item', 'has_queued_item',
- 'idle', 'paused', 'pending_backfill', 'pending_disk_backfill', 'recv_ack_seqno',
+ 'idle', 'paused', 'backfill_completed', 'pending_backfill', 'pending_disk_backfill', 'recv_ack_seqno',
'ep_num_new_']
for name in tap_stats:
for interesting_stat in interesting_stats:
View
96 pytests/swaprebalance.py
@@ -3,6 +3,7 @@
import unittest
from TestInput import TestInputSingleton
import logger
+from couchbase.cluster import Cluster
from membase.api.rest_client import RestConnection, RestHelper
from membase.helper.bucket_helper import BucketOperationHelper
from membase.helper.cluster_helper import ClusterOperationHelper as ClusterHelper, ClusterOperationHelper
@@ -32,7 +33,7 @@ def common_setup(self):
self.log.info("============== SwapRebalanceBase setup was started for test #{0} {1}=============="\
.format(self.case_number, self._testMethodName))
SwapRebalanceBase.reset(self)
-
+ self.cluster_helper = Cluster()
# Initialize test params
self.replica = self.input.param("replica", 1)
self.keys_count = self.input.param("keys-count", 100000)
@@ -63,14 +64,17 @@ def common_setup(self):
@staticmethod
def common_tearDown(self):
+ self.cluster_helper.shutdown()
if (hasattr(self, '_resultForDoCleanups') and len(self._resultForDoCleanups.failures) > 0 \
and TestInputSingleton.input.param("stop-on-failure", False))\
or self.skip_cleanup:
self.log.warn("CLEANUP WAS SKIPPED")
+
else:
SwapRebalanceBase.reset(self)
SwapRebalanceBase._log_finish(self)
+
@staticmethod
def reset(self):
self.log.info("============== SwapRebalanceBase cleanup was started for test #{0} {1} =============="\
@@ -140,7 +144,7 @@ def _create_multiple_buckets(self, replica=1):
# Used for items verification active vs. replica
@staticmethod
- def items_verification(master, test):
+ def items_verification(test, master):
rest = RestConnection(master)
#Verify items count across all node
timeout = 600
@@ -205,6 +209,23 @@ def create_buckets(self):
SwapRebalanceBase._create_multiple_buckets(self, replica=self.replica)
@staticmethod
+ def verification_phase(test, master):
+ # Stop loaders
+ SwapRebalanceBase.stop_load(test.loaders)
+ test.log.info("DONE DATA ACCESS PHASE")
+
+ test.log.info("VERIFICATION PHASE")
+ rest = RestConnection(master)
+ servers_in_cluster = []
+ nodes = rest.get_nodes()
+ for server in test.servers:
+ for node in nodes:
+ if node.ip == server.ip:
+ servers_in_cluster.append(server)
+ RebalanceHelper.wait_for_replication(servers_in_cluster, test.cluster_helper)
+ SwapRebalanceBase.items_verification(test, master)
+
+ @staticmethod
def _common_test_body_swap_rebalance(self, do_stop_start=False):
master = self.servers[0]
rest = RestConnection(master)
@@ -220,10 +241,10 @@ def _common_test_body_swap_rebalance(self, do_stop_start=False):
RebalanceHelper.rebalance_in(intial_severs, len(intial_severs) - 1)
self.log.info("DATA LOAD PHASE")
- loaders = SwapRebalanceBase.start_load_phase(self, master)
+ self.loaders = SwapRebalanceBase.start_load_phase(self, master)
# Wait till load phase is over
- SwapRebalanceBase.stop_load(loaders, do_stop=False)
+ SwapRebalanceBase.stop_load(self.loaders, do_stop=False)
self.log.info("DONE LOAD PHASE")
# Start the swap rebalance
@@ -256,7 +277,7 @@ def _common_test_body_swap_rebalance(self, do_stop_start=False):
if self.do_access:
self.log.info("DATA ACCESS PHASE")
- loaders = SwapRebalanceBase.start_access_phase(self, master)
+ self.loaders = SwapRebalanceBase.start_access_phase(self, master)
self.log.info("SWAP REBALANCE PHASE")
rest.rebalance(otpNodes=[node.id for node in rest.node_statuses()],
@@ -292,17 +313,8 @@ def _common_test_body_swap_rebalance(self, do_stop_start=False):
self.assertTrue(rest.monitorRebalance(),
msg="rebalance operation failed after adding node {0}".format(optNodesIds))
- # Stop loaders
- SwapRebalanceBase.stop_load(loaders)
-
- self.log.info("DONE DATA ACCESS PHASE")
- #for bucket in rest.get_buckets():
- # SwapRebalanceBase.verify_data(new_swap_servers[0], bucket_data[bucket.name].get('inserted_keys'),\
- # bucket.name, self)
- #RebalanceHelper.wait_for_persistence(master, bucket.name)
+ SwapRebalanceBase.verification_phase(self, master)
- self.log.info("VERIFICATION PHASE")
- SwapRebalanceBase.items_verification(master, self)
@staticmethod
def _common_test_body_failed_swap_rebalance(self):
@@ -320,10 +332,10 @@ def _common_test_body_failed_swap_rebalance(self):
RebalanceHelper.rebalance_in(intial_severs, len(intial_severs) - 1)
self.log.info("DATA LOAD PHASE")
- loaders = SwapRebalanceBase.start_load_phase(self, master)
+ self.loaders = SwapRebalanceBase.start_load_phase(self, master)
# Wait till load phase is over
- SwapRebalanceBase.stop_load(loaders, do_stop=False)
+ SwapRebalanceBase.stop_load(self.loaders, do_stop=False)
self.log.info("DONE LOAD PHASE")
# Start the swap rebalance
@@ -355,7 +367,7 @@ def _common_test_body_failed_swap_rebalance(self):
master = new_swap_servers[0]
self.log.info("DATA ACCESS PHASE")
- loaders = SwapRebalanceBase.start_access_phase(self, master)
+ self.loaders = SwapRebalanceBase.start_access_phase(self, master)
self.log.info("SWAP REBALANCE PHASE")
rest.rebalance(otpNodes=[node.id for node in rest.node_statuses()],
@@ -410,17 +422,7 @@ def _common_test_body_failed_swap_rebalance(self):
self.assertTrue(rest.monitorRebalance(),
msg="rebalance operation failed after adding node {0}".format(toBeEjectedNodes))
- # Stop loaders
- SwapRebalanceBase.stop_load(loaders)
-
- self.log.info("DONE DATA ACCESS PHASE")
- #for bucket in rest.get_buckets():
- # SwapRebalanceBase.verify_data(new_swap_servers[0], bucket_data[bucket.name].get('inserted_keys'),\
- # bucket.name, self)
- # RebalanceHelper.wait_for_persistence(master, bucket.name)
-
- self.log.info("VERIFICATION PHASE")
- SwapRebalanceBase.items_verification(master, self)
+ SwapRebalanceBase.verification_phase(self, master)
@staticmethod
def _add_back_failed_node(self, do_node_cleanup=False):
@@ -436,10 +438,10 @@ def _add_back_failed_node(self, do_node_cleanup=False):
RebalanceHelper.rebalance_in(self.servers, len(self.servers) - 1)
self.log.info("DATA LOAD PHASE")
- loaders = SwapRebalanceBase.start_load_phase(self, master)
+ self.loaders = SwapRebalanceBase.start_load_phase(self, master)
# Wait till load phase is over
- SwapRebalanceBase.stop_load(loaders, do_stop=False)
+ SwapRebalanceBase.stop_load(self.loaders, do_stop=False)
self.log.info("DONE LOAD PHASE")
# Start the swap rebalance
@@ -467,7 +469,7 @@ def _add_back_failed_node(self, do_node_cleanup=False):
master = not_failed_over[-1]
self.log.info("DATA ACCESS PHASE")
- loaders = SwapRebalanceBase.start_access_phase(self, master)
+ self.loaders = SwapRebalanceBase.start_access_phase(self, master)
#Failover selected nodes
for node in optNodesIds:
@@ -511,17 +513,7 @@ def _add_back_failed_node(self, do_node_cleanup=False):
self.assertTrue(rest.monitorRebalance(),
msg="rebalance operation failed after adding node {0}".format(add_back_servers))
- # Stop loaders
- SwapRebalanceBase.stop_load(loaders)
-
- self.log.info("DONE DATA ACCESS PHASE")
- #for bucket in rest.get_buckets():
- # SwapRebalanceBase.verify_data(new_swap_servers[0], bucket_data[bucket.name].get('inserted_keys'),\
- # bucket.name, self)
- # RebalanceHelper.wait_for_persistence(master, bucket.name)
-
- self.log.info("VERIFICATION PHASE")
- SwapRebalanceBase.items_verification(master, self)
+ SwapRebalanceBase.verification_phase(self, master)
@staticmethod
def _failover_swap_rebalance(self):
@@ -539,10 +531,10 @@ def _failover_swap_rebalance(self):
RebalanceHelper.rebalance_in(intial_severs, len(intial_severs) - 1)
self.log.info("DATA LOAD PHASE")
- loaders = SwapRebalanceBase.start_load_phase(self, master)
+ self.loaders = SwapRebalanceBase.start_load_phase(self, master)
# Wait till load phase is over
- SwapRebalanceBase.stop_load(loaders, do_stop=False)
+ SwapRebalanceBase.stop_load(self.loaders, do_stop=False)
self.log.info("DONE LOAD PHASE")
# Start the swap rebalance
@@ -572,25 +564,15 @@ def _failover_swap_rebalance(self):
master = new_swap_servers[0]
self.log.info("DATA ACCESS PHASE")
- loaders = SwapRebalanceBase.start_access_phase(self, master)
+ self.loaders = SwapRebalanceBase.start_access_phase(self, master)
rest.rebalance(otpNodes=[node.id for node in rest.node_statuses()], \
ejectedNodes=optNodesIds)
self.assertTrue(rest.monitorRebalance(),
msg="rebalance operation failed after adding node {0}".format(new_swap_servers))
- # Stop loaders
- SwapRebalanceBase.stop_load(loaders)
-
- self.log.info("DONE DATA ACCESS PHASE")
- #for bucket in rest.get_buckets():
- # SwapRebalanceBase.verify_data(new_swap_servers[0], bucket_data[bucket.name].get('inserted_keys'),\
- # bucket.name, self)
- # RebalanceHelper.wait_for_persistence(master, bucket.name)
-
- self.log.info("VERIFICATION PHASE")
- SwapRebalanceBase.items_verification(master, self)
+ SwapRebalanceBase.verification_phase(self, master)
class SwapRebalanceBasicTests(unittest.TestCase):

0 comments on commit 83ffc77

Please sign in to comment.
Something went wrong with that request. Please try again.