added fts-autoscaling_summary & fts-autoscaling with queries
Change-Id: Ie32e88ae2c10c0c7a3d1dbba4be3caf103abb6dd
Reviewed-on: https://review.couchbase.org/c/testrunner/+/183888
Tested-by: Sarthak Dua <sarthak.dua@couchbase.com>
Reviewed-by: Girish Benakappa <girish.benakappa@couchbase.com>
sarthak-dua27 committed Dec 14, 2022
1 parent 1015a70 commit afdca07
Showing 4 changed files with 90 additions and 25 deletions.
conf/fts/serverless/serverless_fts.conf: 2 changes (1 addition, 1 deletion)
@@ -2,7 +2,7 @@ fts.serverless.sanity.FTSElixirSanity:
test_sanity,java_sdk_client=True
test_sanity,java_sdk_client=True,num_databases=20
create_custom_map_index_and_update_defn,java_sdk_client=True,custom_map=True
run_fts_rest_based_queries,java_sdk_client=True,items=1000,custom_map=True,cm_id=0,num_queries=10,compare_es=True,num_of_docs_per_collection=1000,query_max_matches=1000
run_fts_rest_based_queries,java_sdk_client=True,items=1000,custom_map=True,cm_id=0,num_queries=10,compare_es=True,query_max_matches=1000
override_database_verify_nodes,java_sdk_client=True
create_index_multiple_collections,java_sdk_client=True,num_databases=2
delete_collections_check_index_delete,java_sdk_client=True,num_databases=3
lib/capella/utils.py: 14 changes (12 additions, 2 deletions)
@@ -355,6 +355,11 @@ def create_dataplane_wait_for_ready(self, overRide=None):
return resp.json()['dataplaneId']
else:
self.log.info(f"Timed out waiting for dataplane to be ready, Aborting...")
job_resp = self.serverless_api.get_dataplane_job_info(resp.json()['dataplaneId']).json()
if 'errors' in job_resp['clusterJobs'][0]:
self.log.info("ABORTING ERROR :-")
print(job_resp['clusterJobs'][0]['errors'])
self.delete_dataplane(resp.json()['dataplaneId'])
return None

def wait_for_dataplane_ready(self, dataplane_id):
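
The new abort path above is the notable behavior change: when readiness polling times out, the helper now surfaces the first cluster job's errors and deletes the half-provisioned dataplane instead of leaking it. A minimal standalone sketch of that pattern, assuming a client object with the same job-info and delete calls used above (the create and poll call names here are hypothetical):

import time

def provision_or_abort(api, payload, timeout_s=3600, poll_s=60):
    # Kick off provisioning; this call name is illustrative, while the
    # job-info and delete calls below mirror the ones in utils.py.
    dataplane_id = api.create_serverless_dataplane(payload).json()['dataplaneId']
    deadline = time.time() + timeout_s
    while time.time() < deadline:
        state = api.get_dataplane_deployment_status(dataplane_id).json()  # hypothetical poll
        if state.get('status', {}).get('state') == 'healthy':
            return dataplane_id
        time.sleep(poll_s)
    # Timed out: log why the cluster job failed, then clean up.
    job_resp = api.get_dataplane_job_info(dataplane_id).json()
    if 'errors' in job_resp['clusterJobs'][0]:
        print(job_resp['clusterJobs'][0]['errors'])
    api.delete_dataplane(dataplane_id)
    return None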
@@ -372,12 +377,16 @@ def override_width_and_weight(self, database_id, override):
resp = self.serverless_api.update_database(database_id, override_obj)
resp.raise_for_status()

def get_databases_id(self):
def get_databases_id(self, dataplane_id=None):
if dataplane_id is not None:
dp_id = dataplane_id
else:
dp_id = self.dataplane_id
resp = self.serverless_api.get_all_serverless_databases()
all_ids = []
if resp and isinstance(resp.json(), list):
for database in resp.json():
if 'dataplaneId' in database['config'] and database['config']['dataplaneId'] == self.dataplane_id:
if 'dataplaneId' in database['config'] and database['config']['dataplaneId'] == dp_id:
all_ids.append(database['id'])
return all_ids
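
The optional dataplane_id parameter lets callers scope the lookup to an arbitrary dataplane instead of the instance default; the tearDown change further down uses exactly this to clean up a test-provisioned dataplane. A short usage sketch (ids are illustrative; the response shape is the one the loop above iterates):

# Databases pinned to a specific dataplane vs. the default self.dataplane_id:
ids_on_new_dp = api.get_databases_id(dataplane_id="dp-1234")   # explicit dataplane
ids_on_default = api.get_databases_id()                        # falls back internally

# get_all_serverless_databases() returns entries shaped roughly like:
# {"id": "...", "config": {"dataplaneId": "dp-1234", ...}, ...}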

@@ -417,6 +426,7 @@ def get_fts_nodes_of_dataplane(self, dataplane_id):
return fts_hostname

def delete_dataplane(self, dataplane_id):
self.log.info(f"Deleting serverless dataplane : {dataplane_id}")
resp = self.serverless_api.delete_dataplane(dataplane_id)
resp.raise_for_status()

pytests/fts/serverless/sanity.py: 93 changes (73 additions, 20 deletions)
@@ -11,6 +11,7 @@
import threading
import pprint
from datetime import datetime
from prettytable import PrettyTable

class FTSElixirSanity(ServerlessBaseTestCase):
def setUp(self):
@@ -27,7 +28,8 @@ def setUp(self):
self.LWM_limit = self.input.param("LWM_limit", 0.5)
self.UWM_limit = self.input.param("UWM_limit", 0.3)
self.scaling_time = self.input.param("scaling_time", 30)
self.num_queries = self.input.param("num_queries", 1)
self.num_queries = self.input.param("num_queries", 20)
self.autoscaling_with_queries = self.input.param("autoscaling_with_queries", False)
global_vars.system_event_logs = EventHelper()
return super().setUp()
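
The two new parameters make the autoscaling test query-aware: num_queries now defaults to 20, and autoscaling_with_queries turns on query traffic while indexes build during scaling. In the conf format shown at the top of this commit, an entry exercising them might look like this (the entry name and parameter combination are illustrative, not taken from the changed conf file):

fts.serverless.sanity.FTSElixirSanity:
    autoscaling_synchronous,java_sdk_client=True,autoscaling_with_queries=True,num_queries=20,scaling_time=30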

@@ -160,7 +162,7 @@ def run_fts_rest_based_queries(self):
collection_name = f'db_{counter}_collection_{random.randint(0, 1000)}'
self.create_scope(database_obj=database, scope_name=scope_name)
self.create_collection(database_obj=database, scope_name=scope_name, collection_name=collection_name)
self.load_databases(load_all_databases=False, doc_template="Employee",
self.load_databases(load_all_databases=False, doc_template="emp",
num_of_docs=self.num_of_docs_per_collection,
database_obj=database, scope=scope_name, collection=collection_name)
self.init_input_servers(database)
@@ -548,11 +550,10 @@ def get_fts_stats(self):
pprint.pprint(stat)
return stats, fts_nodes

def verify_HWM_index_rejection(self):
def verify_HWM_index_rejection(self, stats):
"""
Returns true if HWM is actually the reason for index rejection
"""
stats, fts_nodes = self.get_fts_stats()
mem_util = int(float(self.HWM_limit) * int(stats[0]['limits:memoryBytes']))
cpu_util = int(float(self.UWM_limit) * 100)
count = 0
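
Passing stats in (rather than re-fetching inside the check) lets the caller reuse one snapshot for both this verification and the summary row it logs. The thresholds themselves are a two-line calculation; a worked example under assumed limits (HWM_limit=0.9, UWM_limit=0.3, and a 4 GB per-node memory limit):

# Assumed values; in the test they come from input params and FTS node stats.
HWM_limit, UWM_limit = 0.9, 0.3
limits_memory_bytes = 4 * 10**9                    # stats[0]['limits:memoryBytes']

mem_util = int(HWM_limit * limits_memory_bytes)    # 3600000000 bytes
cpu_util = int(UWM_limit * 100)                    # 30 percent

# Nodes whose utilization crosses these thresholds are counted; when enough
# nodes sit over the watermark, an index-create rejection is treated as
# legitimate ("resource utilization over limit(s)").
print(mem_util, cpu_util)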
@@ -579,13 +580,14 @@ def verify_scale_out(self, variables):
variables['node_count'] = new_node_count
return True

def verify_scale_in(self, variables):
def verify_scale_in(self, variables, summary):
"""
Returns True if Scale In not possible or Scale In was successful
"""
fts_stats, fts_nodes = self.get_fts_stats()
self.log.info("Verifying Scale In ...")
if len(fts_nodes) == 2:
self.generate_summary(summary, fts_stats, fts_nodes, variables, "No Scale-In Required")
self.log.info("CONDITIONAL BUG : ScaleIn verification not possible since scale out didn't happen")
return True

@@ -604,6 +606,7 @@ def verify_scale_in(self, variables):
break

if not scale_in_status:
self.generate_summary(summary, fts_stats, fts_nodes, variables, "Scale-In not possible")
self.log.info("CONDITIONAL BUG : ScaleIn verification not possible since Memory didn't decrease")
return False

@@ -613,20 +616,42 @@
fts_stats, fts_nodes = self.get_fts_stats()
if len(fts_nodes) < variables['node_count']:
self.log.info(f"Scale In passed : {fts_stats},{fts_nodes}")
self.log.info("SUCCESS : Scale In passed !!")
self.log.info("PASS : Scale In passed !!")
self.generate_summary(summary, fts_stats, fts_nodes, variables, "P : Scale-In")
return True

self.generate_summary(summary, fts_stats, fts_nodes, variables, "F : Scale-In")
return False

def generate_summary(self, summary, fts_stats, fts_nodes, variables, name="Default", time_taken=0):
mem_arr = []
for i in range(len(fts_stats)):
node_name = str(fts_nodes[i]).replace(".sandbox.nonprod-project-avengers.com", "")  # replace(), not strip(): strip() removes matching characters from the ends, not the domain suffix
mem_util = (str(int(fts_stats[i]['utilization:memoryBytes'])/pow(10, 9)) + "GB").strip()
mem_arr.append(f"{node_name} - {mem_util}")
obj = {
"Detail": name,
"Time": str(datetime.now().replace(microsecond=0)),
"Memory Status": mem_arr,
"Scale Out Count ": variables['scale_out_count'],
"Scale Out Time": time_taken,
"HWM Index Rejection": True if variables['HWM_hit'] == 1 else False
}
summary.append(obj)

def autoscaling_synchronous(self):
self.delete_all_database(True)
if self.create_dataplane and self.new_dataplane_id is None:
self.fail("Failed to provision a new dataplane, ABORTING")
self.provision_databases(self.num_databases, None, self.new_dataplane_id)

summary = []
self.provision_databases(self.num_databases, None, self.new_dataplane_id)
self.log.info(f"------------ Initial STATS ------------")
fts_stats, fts_nodes = self.get_fts_stats()

variables = {'node_count': len(fts_nodes), 'scale_out_time': 0, 'HWM_hit': 0, 'scale_out_status': False,
'scale_out_count': 0}
self.generate_summary(summary, fts_stats, fts_nodes, variables, "Initial Summary")

failout_encounter = False
index_fail_check = False
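
generate_summary appends one dict per event, and the test renders them later with PrettyTable (the Summary Table blocks below). A self-contained sketch of that rendering, with one fabricated row in the same shape:

from prettytable import PrettyTable

summary = [{
    "Detail": "Initial Summary",
    "Time": "2022-12-14 10:00:00",
    "Memory Status": ["node-1 - 0.4GB", "node-2 - 0.5GB"],
    "Scale Out Count ": 0,
    "Scale Out Time": 0,
    "HWM Index Rejection": False,
}]  # illustrative row; the real ones come from generate_summary

table = PrettyTable(summary[0].keys())   # column headers from the first row
for row in summary:
    table.add_row(row.values())          # relies on dict insertion order matching
print(table)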

@@ -637,7 +662,8 @@ def autoscaling_synchronous(self):
self.create_scope(database_obj=database, scope_name=scope_name)
self.create_collection(database_obj=database, scope_name=scope_name, collection_name=collection_name)
self.load_databases(load_all_databases=False, num_of_docs=self.num_of_docs_per_collection,
database_obj=database, scope=scope_name, collection=collection_name)
database_obj=database, scope=scope_name, collection=collection_name,
doc_template="emp")
self.init_input_servers(database)
fts_callable = FTSCallable(self.input.servers, es_validate=False, es_reset=False, scope=scope_name,
collections=collection_name, collection_index=True)
@@ -647,31 +673,39 @@
plan_params = self.construct_plan_params()
for i in range(self.num_indexes):
try:
fts_callable.create_fts_index(f"counter_{counter + 1}_idx_{i + 1}", source_type='couchbase',
index = fts_callable.create_fts_index(f"counter_{counter + 1}_idx_{i + 1}", source_type='couchbase',
source_name=database.id, index_type='fulltext-index',
index_params=None, plan_params=plan_params,
source_params=None, source_uuid=None, collection_index=True,
_type=_type, analyzer="standard",
scope=scope_name, collections=[collection_name], no_check=False)
fts_callable.wait_for_indexing_complete(self.num_of_docs_per_collection)
if self.autoscaling_with_queries:
fts_callable.run_query_and_compare(index, self.num_queries)

if variables['scale_out_count'] != 0 and not index_fail_check:
with self.subTest("Verifying Index Rejection for HWM"):
if self.verify_HWM_index_rejection():
fts_stats, fts_nodes = self.get_fts_stats()
if self.verify_HWM_index_rejection(fts_stats):
if variables['HWM_hit'] == 0:
variables['HWM_hit'] += 1
self.generate_summary(summary, fts_stats, fts_nodes, variables, "HWM Exempt HIT")
self.log.info("HWM Hit has taken place, exempting first rejection")
else:
index_fail_check = True
self.log.info("Index created even though HWM had been satisfied")
self.generate_summary(summary, fts_stats, fts_nodes, variables, "F : HWM HIT,Index Created")
self.fail("BUG : Index created even though HWM had been satisfied")
except Exception as e:
print("Here in the except block")
with self.subTest("Index Creation Check"):
if self.verify_HWM_index_rejection():
fts_stats, fts_nodes = self.get_fts_stats()
if self.verify_HWM_index_rejection(fts_stats):
# self.assertEqual(str(e), f"rest_create_index: error creating index: , err: manager_api: CreateIndex, Prepare failed, err: limitIndexDef: Cannot accommodate index request: {index_name}, resource utilization over limit(s)")
self.log.info(f"SUCCESS : HWM Index Rejection Passed : {e}")
self.generate_summary(summary, fts_stats, fts_nodes, variables, "P : HWM HIT")
self.log.info(f"PASS : HWM Index Rejection Passed : {e}")
with self.subTest("Scale In Condition Check"):
if not self.verify_scale_in(variables):
if not self.verify_scale_in(variables, summary):
self.fail("BUG : Scale In Failed")
return
else:
@@ -683,6 +717,7 @@

if not variables['scale_out_status'] and self.check_scale_out_condition(fts_stats):
self.log.info("Scale Out Satisfied")
self.generate_summary(summary, fts_stats, fts_nodes, variables, "P : Scale Out Condition Satisfied")
self.log.info(fts_stats)
variables['scale_out_time'] = datetime.now()
variables['scale_out_status'] = True
@@ -691,8 +726,10 @@
if divmod((datetime.now() - variables['scale_out_time']).total_seconds(), 60)[0] > int(self.scaling_time):
with self.subTest("Scale Out Condition Check after scaling time"):
if self.verify_scale_out(variables):
self.log.info(f"Scale Out {variables['scale_out_count']}. Passed after {divmod((datetime.now() - variables['scale_out_time']).total_seconds(), 60)[0]} minutes!")
self.log.info(f"SUCCESS : Scale Out Passed")
time_taken = divmod((datetime.now() - variables['scale_out_time']).total_seconds(), 60)[0]
self.log.info(f"Scale Out {variables['scale_out_count']}. Passed after {time_taken} minutes!")
self.generate_summary(summary, fts_stats, fts_nodes, variables, "P : Scale Out Passed", time_taken)
self.log.info(f"PASS : Scale Out Passed")
self.log.info(f"Scale Out Stats: \n {fts_stats}, {fts_nodes}")
variables['scale_out_status'] = False
variables['scale_out_count'] += 1
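
The recurring guard here, divmod((datetime.now() - scale_out_time).total_seconds(), 60)[0], is simply whole minutes elapsed since the scale-out condition was first hit, now captured once as time_taken instead of being recomputed inline. A compact sketch of the calculation with assumed values:

from datetime import datetime, timedelta

scaling_time = 30                                          # minutes (test default)
scale_out_time = datetime.now() - timedelta(minutes=31)    # assumed condition-hit time

elapsed_minutes = divmod((datetime.now() - scale_out_time).total_seconds(), 60)[0]
print(elapsed_minutes)   # 31.0 -- divmod's quotient, i.e. whole minutes as a float

# The test only accepts a node-count increase after scaling_time has elapsed;
# one observed earlier is flagged as a bug, and one still missing after
# scaling_time fails the scale-out check.
if elapsed_minutes > scaling_time:
    print("verify scale-out now")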
@@ -701,25 +738,41 @@
if not failout_encounter:
failout_encounter = True
self.log.info(f"Scale Out Failed -> {variables['scale_out_count']} : {fts_stats}, {fts_nodes}")
self.generate_summary(summary, fts_stats, fts_nodes, variables, "F : Scale Out Failed", self.scaling_time)
self.fail(f"BUG : Scale Out failed -> {variables['scale_out_count']} within {self.scaling_time} minutes")
else:
with self.subTest("Scale Out Condition Check before scaling time"):
if self.verify_scale_out(variables):
self.log.info(f"BUG : Scale Out {variables['scale_out_count']}. happened after {divmod((datetime.now() - variables['scale_out_time']).total_seconds(), 60)[0]} minutes!")
time_taken = divmod((datetime.now() - variables['scale_out_time']).total_seconds(), 60)[0]
self.log.info(f"BUG : Scale Out {variables['scale_out_count']}. happened after {time_taken} minutes!")
self.generate_summary(summary, fts_stats, fts_nodes, variables, f"F : Scale Out Passed < {self.scaling_time}", time_taken)
self.log.info(f"Scale Out Stats: \n {fts_stats}, {fts_nodes}")
variables['scale_out_status'] = False
variables['scale_out_count'] += 1
failout_encounter = False
self.fail(f"Scale Out Happened less than scaling time")

self.log.info(f"------------ FTS Stats @ Database {counter + 1} ------------")
self.get_fts_stats()
fts_stats, fts_nodes = self.get_fts_stats()
self.generate_summary(summary, fts_stats, fts_nodes, variables, f"INFO - DB-{counter+1}")

myTable = PrettyTable(summary[0].keys())
for json_obj in summary:
myTable.add_row(json_obj.values())
self.log.info("Summary Table")
print(myTable)

with self.subTest("Scale In Condition Check"):
if not self.verify_scale_in(variables):
if not self.verify_scale_in(variables, summary):
self.log.info(f"Scale In Failed : {fts_stats}, {fts_nodes}")
self.fail("BUG : Scale In Failed")

myTable = PrettyTable(summary[0].keys())
for json_obj in summary:
myTable.add_row(json_obj.values())
self.log.info("Summary Table")
print(myTable)

def test_n1ql_search(self):
self.provision_databases(self.num_databases)
for counter, database in enumerate(self.databases.values()):
pytests/serverless/serverless_basetestcase.py: 6 changes (4 additions, 2 deletions)
@@ -62,6 +62,7 @@ def setUp(self):
with open('pytests/serverless/config/dataplane_spec_config.json') as f:
overRide = json.load(f)
overRide['couchbase']['image'] = self.input.capella.get("image")
overRide['couchbase']['version'] = self.input.capella.get("server_version")
self.new_dataplane_id = self.provision_dataplane(overRide)
if self.new_dataplane_id is not None:
self.dataplanes[self.new_dataplane_id] = ServerlessDataPlane(self.new_dataplane_id)
@@ -83,6 +84,7 @@ def tearDown(self):
self.delete_all_database()
if self.new_dataplane_id is not None:
self.log.info(f"Deleting dataplane : {self.new_dataplane_id}")
self.delete_all_database(True, self.new_dataplane_id)
self.delete_dataplane(self.new_dataplane_id)
self.task_manager.shutdown(force=True)

@@ -181,10 +183,10 @@ def provision_dataplane(self, overRide=None):
self.log.info('PROVISIONING DATAPLANE ...')
return self.api.create_dataplane_wait_for_ready(overRide)

def delete_all_database(self, all_db=False):
def delete_all_database(self, all_db=False, dataplane_id=None):
databases = None
if all_db:
databases = self.api.get_databases_id()
databases = self.api.get_databases_id(dataplane_id)
if len(databases) == 0:
return
else:
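Combined with the utils.py change, tearDown can now delete just the databases living on the dataplane the test provisioned, before deleting that dataplane itself. A minimal sketch of the new cleanup path (test stands in for the base test case instance):

def teardown_new_dataplane(test):
    """Sketch of the tearDown ordering introduced by this commit."""
    if test.new_dataplane_id is not None:
        # Drop only the databases pinned to the test-provisioned dataplane;
        # delete_all_database(True, dp_id) routes through get_databases_id(dp_id).
        test.delete_all_database(True, test.new_dataplane_id)
        # Then delete the dataplane (utils.py now logs the id before the call).
        test.delete_dataplane(test.new_dataplane_id)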
