Skip to content

Commit

Permalink
Eventing: multiple handler and buckets tests
Browse files Browse the repository at this point in the history
Change-Id: I75a27e90e3a0a8c282cbd4431df2014cb72b7a75
Reviewed-on: http://review.couchbase.org/121792
Reviewed-by: Balakumaran G <balakumaran.gopal@couchbase.com>
Tested-by: vikas chaudhary <vikas.chaudhary@couchbase.com>
Reviewed-on: http://review.couchbase.org/122757
Tested-by: Balakumaran G <balakumaran.gopal@couchbase.com>
  • Loading branch information
vikas-getconnect authored and bkumaran committed Feb 25, 2020
1 parent a2672d6 commit 56da0af
Show file tree
Hide file tree
Showing 7 changed files with 164 additions and 6 deletions.
25 changes: 25 additions & 0 deletions conf/eventing/eventing_multihandler.conf
@@ -0,0 +1,25 @@
eventing.eventing_multihandler.EventingMultiHandler:
test_multiple_handle_multiple_buckets_preload,nodes_init=4,services_init=kv-eventing-index-n1ql,dataset=default,groups=simple,reset_services=True,skip_cleanup=True,num_src_buckets=1,num_dst_buckets=0,num_handlers=1,deploy_handler=1,worker_count=3,handler_code=handler_code/no_op.js,sequential=False
test_multiple_handle_multiple_buckets_preload,nodes_init=4,services_init=kv-eventing-index-n1ql,dataset=default,groups=simple,reset_services=True,skip_cleanup=True,num_src_buckets=5,num_dst_buckets=0,num_handlers=1,deploy_handler=1,worker_count=3,handler_code=handler_code/no_op.js,sequential=False
test_multiple_handle_multiple_buckets_preload,nodes_init=4,services_init=kv-eventing-index-n1ql,dataset=default,groups=simple,reset_services=True,skip_cleanup=True,num_src_buckets=10,num_dst_buckets=0,num_handlers=1,deploy_handler=1,worker_count=3,handler_code=handler_code/no_op.js,sequential=False
test_multiple_handle_multiple_buckets_preload,nodes_init=4,services_init=kv-eventing-index-n1ql,dataset=default,groups=simple,reset_services=True,skip_cleanup=True,num_src_buckets=30,num_dst_buckets=0,num_handlers=1,deploy_handler=1,worker_count=3,handler_code=handler_code/no_op.js,sequential=False
test_multiple_handle_multiple_buckets_preload,nodes_init=4,services_init=kv-eventing-index-n1ql,dataset=default,groups=simple,reset_services=True,skip_cleanup=True,num_src_buckets=5,num_dst_buckets=0,num_handlers=5,deploy_handler=5,worker_count=3,handler_code=handler_code/no_op.js,sequential=False
test_multiple_handle_multiple_buckets_preload,nodes_init=4,services_init=kv-eventing-index-n1ql,dataset=default,groups=simple,reset_services=True,skip_cleanup=True,num_src_buckets=10,num_dst_buckets=0,num_handlers=10,deploy_handler=10,worker_count=3,handler_code=handler_code/no_op.js,sequential=False
test_multiple_handle_multiple_buckets_preload,nodes_init=4,services_init=kv-eventing-index-n1ql,dataset=default,groups=simple,reset_services=True,skip_cleanup=True,num_src_buckets=5,num_dst_buckets=5,num_handlers=10,deploy_handler=10,worker_count=3,handler_code=handler_code/no_op.js,sequential=False
test_multiple_handle_multiple_buckets_preload,nodes_init=4,services_init=kv-eventing-index-n1ql,dataset=default,groups=simple,reset_services=True,skip_cleanup=True,num_src_buckets=5,num_dst_buckets=5,num_handlers=10,deploy_handler=10,worker_count=3,handler_code=handler_code/no_op.js,sequential=True
test_multiple_handle_multiple_buckets_preload,nodes_init=4,services_init=kv-eventing-index-n1ql,dataset=default,groups=simple,reset_services=True,skip_cleanup=True,num_src_buckets=10,num_dst_buckets=0,num_handlers=15,deploy_handler=15,worker_count=3,handler_code=handler_code/no_op.js,sequential=True
test_multiple_handle_multiple_buckets_preload,nodes_init=4,services_init=kv-eventing-index-n1ql,dataset=default,groups=simple,reset_services=True,skip_cleanup=True,num_src_buckets=5,num_dst_buckets=5,num_handlers=15,deploy_handler=10,worker_count=3,handler_code=handler_code/no_op.js,sequential=False
## create only tests no deployment
test_multiple_handle_multiple_create_only,nodes_init=4,services_init=kv-eventing-index-n1ql,dataset=default,groups=simple,reset_services=True,skip_cleanup=True,num_src_buckets=10,num_dst_buckets=0,num_handlers=30,deploy_handler=0,worker_count=1,handler_code=handler_code/no_op.js,sequential=False
test_multiple_handle_multiple_create_only,nodes_init=4,services_init=kv-eventing-index-n1ql,dataset=default,groups=simple,reset_services=True,skip_cleanup=True,num_src_buckets=5,num_dst_buckets=5,num_handlers=60,deploy_handler=0,worker_count=1,handler_code=handler_code/no_op.js,sequential=False
test_multiple_handle_multiple_create_only,nodes_init=4,services_init=kv-eventing-index-n1ql,dataset=default,groups=simple,reset_services=True,skip_cleanup=True,num_src_buckets=15,num_dst_buckets=15,num_handlers=60,deploy_handler=0,worker_count=1,handler_code=handler_code/no_op.js,sequential=False
## timers handler
test_multiple_handle_multiple_buckets_preload,nodes_init=4,services_init=kv-eventing-index-n1ql,dataset=default,groups=simple,reset_services=True,skip_cleanup=True,num_src_buckets=1,num_dst_buckets=1,num_handlers=1,deploy_handler=1,worker_count=3,handler_code=handler_code/bucket_op_with_timers.js,sequential=False
test_multiple_handle_multiple_buckets_preload,nodes_init=4,services_init=kv-eventing-index-n1ql,dataset=default,groups=simple,reset_services=True,skip_cleanup=True,num_src_buckets=5,num_dst_buckets=5,num_handlers=10,deploy_handler=10,worker_count=3,handler_code=handler_code/bucket_op_with_timers.js,sequential=True
test_multiple_handle_multiple_buckets_preload,nodes_init=4,services_init=kv-eventing-index-n1ql,dataset=default,groups=simple,reset_services=True,skip_cleanup=True,num_src_buckets=10,num_dst_buckets=0,num_handlers=15,deploy_handler=15,worker_count=3,handler_code=handler_code/bucket_op_with_timers.js,sequential=True
test_multiple_handle_multiple_buckets_preload,nodes_init=4,services_init=kv-eventing-index-n1ql,dataset=default,groups=simple,reset_services=True,skip_cleanup=True,num_src_buckets=5,num_dst_buckets=5,num_handlers=15,deploy_handler=10,worker_count=3,handler_code=handler_code/bucket_op_with_timers.js,sequential=False
## bucket op handlers
test_multiple_handle_multiple_buckets_preload,nodes_init=4,services_init=kv-eventing-index-n1ql,dataset=default,groups=simple,reset_services=True,skip_cleanup=True,num_src_buckets=1,num_dst_buckets=1,num_handlers=1,deploy_handler=1,worker_count=3,handler_code=handler_code/delete_doc_bucket_op.js,sequential=False
test_multiple_handle_multiple_buckets_preload,nodes_init=4,services_init=kv-eventing-index-n1ql,dataset=default,groups=simple,reset_services=True,skip_cleanup=True,num_src_buckets=5,num_dst_buckets=5,num_handlers=10,deploy_handler=10,worker_count=3,handler_code=handler_code/delete_doc_bucket_op.js,sequential=True
test_multiple_handle_multiple_buckets_preload,nodes_init=4,services_init=kv-eventing-index-n1ql,dataset=default,groups=simple,reset_services=True,skip_cleanup=True,num_src_buckets=10,num_dst_buckets=0,num_handlers=15,deploy_handler=15,worker_count=3,handler_code=handler_code/delete_doc_bucket_op.js,sequential=True
test_multiple_handle_multiple_buckets_preload,nodes_init=4,services_init=kv-eventing-index-n1ql,dataset=default,groups=simple,reset_services=True,skip_cleanup=True,num_src_buckets=5,num_dst_buckets=5,num_handlers=15,deploy_handler=10,worker_count=3,handler_code=handler_code/delete_doc_bucket_op.js,sequential=False
16 changes: 16 additions & 0 deletions lib/membase/api/rest_client.py
Expand Up @@ -4351,6 +4351,22 @@ def set_settings_for_function(self, name, body):
raise Exception(content)
return content

'''
deploy the Function
'''

def deploy_function_by_name(self, name):
authorization = base64.encodestring('%s:%s' % (self.username, self.password))
url = "api/v1/functions/" + name + "/settings"
body = {"deployment_status": True, "processing_status": True}
api = self.eventing_baseUrl + url
headers = {'Content-type': 'application/json', 'Authorization': 'Basic %s' % authorization}
status, content, header = self._http_request(api, 'POST', headers=headers,
params=json.dumps(body).encode("ascii", "ignore"))
if not status:
raise Exception(content)
return content

'''
undeploy the Function
'''
Expand Down
2 changes: 1 addition & 1 deletion pytests/basetestcase.py
Expand Up @@ -2542,7 +2542,7 @@ def load(self, generators_load, buckets=None, exp=0, flag=0,
for gen_load in gens_load[bucket]:
items += (gen_load.end - gen_load.start)
for bucket in buckets:
self.log.info("%s %s to %s documents..." % (op_type, items, bucket.name))
self.log.info("%s %s to %s documents..." % (op_type, items/len(buckets), bucket.name))
tasks.append(self.cluster.async_load_gen_docs(self.master, bucket.name,
gens_load[bucket],
bucket.kvs[kv_store], op_type, exp, flag,
Expand Down
21 changes: 18 additions & 3 deletions pytests/eventing/eventing_base.py
Expand Up @@ -114,7 +114,7 @@ def create_save_function_body(self, appname, appcode, description="Sample Descri
fh.close()
body['depcfg'] = {}
body['depcfg']['buckets'] = []
body['depcfg']['buckets'].append({"alias": self.dst_bucket_name, "bucket_name": self.dst_bucket_name,"access": "rw"})
body['depcfg']['buckets'].append({"alias": "dst_bucket", "bucket_name": self.dst_bucket_name,"access": "rw"})
if multi_dst_bucket:
body['depcfg']['buckets'].append({"alias": self.dst_bucket_name1, "bucket_name": self.dst_bucket_name1})
body['depcfg']['metadata_bucket'] = self.metadata_bucket_name
Expand Down Expand Up @@ -151,7 +151,7 @@ def create_save_function_body(self, appname, appcode, description="Sample Descri
body['settings']['dcp_gen_chan_size'] = self.dcp_gen_chan_size
if self.is_sbm:
del body['depcfg']['buckets'][0]
body['depcfg']['buckets'].append({"alias": self.src_bucket_name, "bucket_name": self.src_bucket_name,"access": "rw"})
body['depcfg']['buckets'].append({"alias": "src_bucket", "bucket_name": self.src_bucket_name,"access": "rw"})
body['depcfg']['curl'] = []
if self.is_curl:
body['depcfg']['curl'].append({"hostname": self.hostname, "value": "server", "auth_type": self.auth_type,
Expand Down Expand Up @@ -686,4 +686,19 @@ def change_retry_rebalance_settings(self, enabled=True,
rest.set_retry_rebalance_settings(body)
result = rest.get_retry_rebalance_settings()
self.log.info("Retry Rebalance settings changed to : {0}"
.format(json.loads(result)))
.format(json.loads(result)))

def handler_status_map(self):
m={}
result=self.rest.get_composite_eventing_status()
try:
for i in range(len(result['apps'])):
m[result['apps'][i]['name']] = result['apps'][i]['composite_status']
return m
except TypeError as e:
self.log.info("no handler is available")

def deploy_handler_by_name(self,name,wait_for_bootstrap=True):
self.rest.deploy_function_by_name(name)
if wait_for_bootstrap:
self.wait_for_handler_state(name, "deployed")
4 changes: 2 additions & 2 deletions pytests/eventing/eventing_bucket.py
Expand Up @@ -194,7 +194,7 @@ def test_eventing_where_source_bucket_is_in_dgm(self):
self.resume_function(body)
# Wait for eventing to catch up with all the update mutations and verify results
if self.is_sbm:
self.verify_eventing_results(self.function_name, self.docs_per_day * 2016 * 2, skip_stats_validation=True)
self.verify_eventing_results(self.function_name, self.docs_per_day * 2016 * 2, skip_stats_validation=True,expected_duplicate=True)
else:
self.verify_eventing_results(self.function_name, self.docs_per_day * 2016, skip_stats_validation=True)
if self.pause_resume:
Expand All @@ -206,7 +206,7 @@ def test_eventing_where_source_bucket_is_in_dgm(self):
self.resume_function(body)
# Wait for eventing to catch up with all the delete mutations and verify results
if self.is_sbm:
self.verify_eventing_results(self.function_name, self.docs_per_day * 2016, skip_stats_validation=True)
self.verify_eventing_results(self.function_name, self.docs_per_day * 2016, skip_stats_validation=True,expected_duplicate=True)
else:
self.verify_eventing_results(self.function_name, 0, skip_stats_validation=True)
self.undeploy_and_delete_function(body)
Expand Down
99 changes: 99 additions & 0 deletions pytests/eventing/eventing_multihandler.py
@@ -0,0 +1,99 @@
import json

from eventing.eventing_base import EventingBaseTest
from eventing.eventing_constants import HANDLER_CODE
from membase.api.rest_client import RestConnection
from testconstants import STANDARD_BUCKET_PORT


class EventingMultiHandler(EventingBaseTest):
def setUp(self):
super(EventingMultiHandler, self).setUp()
self.num_src_buckets=self.input.param('num_src_buckets', 1)
self.num_dst_buckets=self.input.param('num_dst_buckets', 1)
self.num_handlers=self.input.param('num_handlers', 1)
self.deploy_handler=self.input.param('deploy_handler',1)
self.sequential=self.input.param('sequential',True)
self.worker_count=self.input.param('worker_count',1)
self.handler_code=self.input.param('handler_code','handler_cod e/delete_doc_bucket_op.js')
self.gens_load = self.generate_docs(self.docs_per_day)
quota=(self.num_src_buckets+self.num_dst_buckets)*100+400
self.rest.set_service_memoryQuota(service='memoryQuota', memoryQuota=quota)
self.metadata_bucket_size = 400
bucket_params_meta = self._create_bucket_params(server=self.server, size=self.metadata_bucket_size,
replicas=self.num_replicas)
self.create_n_buckets(self.src_bucket_name,self.num_src_buckets)
self.buckets = RestConnection(self.master).get_buckets()
if self.num_dst_buckets > 0:
self.create_n_buckets(self.dst_bucket_name,self.num_dst_buckets)
self.cluster.create_standard_bucket(name=self.metadata_bucket_name, port=STANDARD_BUCKET_PORT + 1,
bucket_params=bucket_params_meta)
self.deploying=[]

def create_n_buckets(self,name,number):
self.bucket_size = 100
bucket_params = self._create_bucket_params(server=self.server, size=self.bucket_size,
replicas=self.num_replicas)
for i in range(number):
self.cluster.create_standard_bucket(name+"_"+str(i), port=STANDARD_BUCKET_PORT + 1,
bucket_params=bucket_params)

def test_multiple_handle_multiple_buckets_preload(self):
# load data
self.load(self.gens_load, buckets=self.buckets, flag=self.item_flag, verify_data=False,
batch_size=self.batch_size)
self.create_n_handler(self.num_handlers,self.num_src_buckets,self.num_dst_buckets,self.handler_code)
self.deploy_n_handler(self.deploy_handler,sequential=self.sequential)
self.wait_for_handlers_to_deployed()
self.log.info("==========================================================================")
self.log.info("handler status after the test \n {}".format(self.handler_status_map()))

def test_multiple_handle_multiple_buckets(self):
self.create_n_handler(self.num_handlers,self.num_src_buckets,self.num_dst_buckets,self.handler_code)
self.deploy_n_handler(self.deploy_handler,sequential=self.sequential)
# load data
self.load(self.gens_load, buckets=self.buckets, flag=self.item_flag, verify_data=False,
batch_size=self.batch_size)
self.wait_for_handlers_to_deployed()
self.log.info("==========================================================================")
self.log.info("handler status after the test \n {}".format(self.handler_status_map()))

def test_multiple_handle_multiple_create_only(self):
# load data
self.load(self.gens_load, buckets=self.buckets, flag=self.item_flag, verify_data=False,
batch_size=self.batch_size)
self.create_n_handler(self.num_handlers,self.num_src_buckets,self.num_dst_buckets,self.handler_code)
self.log.info("==========================================================================")
self.log.info("handler status after the test \n {}".format(self.handler_status_map()))

def create_n_handler(self,num_handler,num_src,num_dst,handler_code):
src_bucket=self.src_bucket_name
dst_bucket=self.dst_bucket_name
for i in range(num_handler):
self.src_bucket_name=src_bucket+"_"+str(i%num_src)
if num_dst > 0:
self.dst_bucket_name=dst_bucket+"_"+str(i%num_dst)
body = self.create_save_function_body(self.function_name+"_"+str(i),handler_code,
worker_count=self.worker_count,deployment_status=False,processing_status=False)
if num_dst == 0:
del body['depcfg']['buckets'][0]
self.log.info("Creating the following handler code : {0} with {1}".format(body['appname'], body['depcfg']))
self.log.info("\n{0}".format(body['appcode']))
self.rest.create_function(body["appname"],body)

def deploy_n_handler(self,num,sequential=True):
funcs = self.handler_status_map()
if num > len(funcs):
num=len(funcs)
deployed=0
for key in funcs:
if deployed == num:
break
self.log.info("Deploying the following handler code : {0}".format(key))
self.deploy_handler_by_name(key,wait_for_bootstrap=sequential)
self.deploying.append(key)
deployed=deployed+1

def wait_for_handlers_to_deployed(self):
for name in self.deploying:
self.wait_for_handler_state(name,"deployed")
3 changes: 3 additions & 0 deletions pytests/eventing/handler_code/no_op.js
@@ -0,0 +1,3 @@
function OnUpdate(doc, meta) {
log('document', doc);
}

0 comments on commit 56da0af

Please sign in to comment.