Skip to content

Commit

Permalink
CBQE-7515: Remove dead code
Browse files Browse the repository at this point in the history
Change-Id: Ic5f0e1e03473d207d76ad97ff4a0d0b8c09a9b23
Reviewed-on: https://review.couchbase.org/c/testrunner/+/172567
Tested-by: Couchbase QE
Reviewed-by: Chanabasappa Ghali <chanabasappa.ghali@couchbase.com>
  • Loading branch information
Sujay2611 committed Mar 21, 2022
1 parent 751cd44 commit 07a5467
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 115 deletions.
136 changes: 27 additions & 109 deletions lib/membase/api/rest_client.py
Expand Up @@ -323,7 +323,6 @@ def __init__(self, serverInfo):
new_services=fts-kv-index-n1ql """
self.services_node_init = self.input.param("new_services", None)
self.debug_logs = self.input.param("debug-logs", False)
self.eventing_role = self.input.param('eventing_role', False)

if CbServer.use_https:
self.port = CbServer.ssl_port_map.get(str(self.port),
Expand Down Expand Up @@ -5183,10 +5182,7 @@ def change_password_builtin_user(self, user_id, password):
'''

def lifecycle_operation(self, name, operation, function_scope=None, username="Administrator", password="password"):
if self.eventing_role:
authorization = self.get_authorization("eventing_admin", "password")
else:
authorization = self.get_authorization(username, password)
authorization = self.get_authorization(username, password)
url = "api/v1/functions/" + name +"/"+ operation
if function_scope is not None:
url += "?bucket={0}&scope={1}".format(function_scope["bucket"],
Expand Down Expand Up @@ -5238,10 +5234,7 @@ def deploy_function(self, name, body, function_scope=None):
GET all the Functions
'''
def get_all_functions(self, username="Administrator", password="password"):
if self.eventing_role:
authorization = self.get_authorization("eventing_admin", "password")
else:
authorization = self.get_authorization(username, password)
authorization = self.get_authorization(username, password)
url = "api/v1/functions"
api = self.eventing_baseUrl + url
headers = {'Content-type': 'application/json', 'Authorization': 'Basic %s' % authorization}
Expand All @@ -5254,10 +5247,7 @@ def get_all_functions(self, username="Administrator", password="password"):
Undeploy the Function
'''
def set_settings_for_function(self, name, body, function_scope=None, username="Administrator", password="password"):
if self.eventing_role:
authorization = self.get_authorization("eventing_admin", "password")
else:
authorization = self.get_authorization(username, password)
authorization = self.get_authorization(username, password)
url = "api/v1/functions/" + name +"/settings"
if function_scope is not None:
url += "?bucket={0}&scope={1}".format(function_scope["bucket"],
Expand All @@ -5276,10 +5266,7 @@ def set_settings_for_function(self, name, body, function_scope=None, username="A
'''

def deploy_function_by_name(self, name, function_scope=None, username="Administrator", password="password"):
if self.eventing_role:
authorization = self.get_authorization("eventing_admin", "password")
else:
authorization = self.get_authorization(username, password)
authorization = self.get_authorization(username, password)
url = "api/v1/functions/" + name + "/settings"
if function_scope is not None:
url += "?bucket={0}&scope={1}".format(function_scope["bucket"],
Expand All @@ -5298,10 +5285,7 @@ def deploy_function_by_name(self, name, function_scope=None, username="Administr
'''

def pause_function_by_name(self, name, function_scope=None, username="Administrator", password="password"):
if self.eventing_role:
authorization = self.get_authorization("eventing_admin", "password")
else:
authorization = self.get_authorization(username, password)
authorization = self.get_authorization(username, password)
url = "api/v1/functions/" + name + "/settings"
if function_scope is not None:
url += "?bucket={0}&scope={1}".format(function_scope["bucket"],
Expand All @@ -5319,10 +5303,7 @@ def pause_function_by_name(self, name, function_scope=None, username="Administra
undeploy the Function
'''
def undeploy_function(self, name, function_scope=None, username="Administrator", password="password"):
if self.eventing_role:
authorization = self.get_authorization("eventing_admin", "password")
else:
authorization = self.get_authorization(username, password)
authorization = self.get_authorization(username, password)
url = "api/v1/functions/" + name +"/settings"
if function_scope is not None:
url += "?bucket={0}&scope={1}".format(function_scope["bucket"],
Expand All @@ -5340,10 +5321,7 @@ def undeploy_function(self, name, function_scope=None, username="Administrator",
Delete all the functions
'''
def delete_all_function(self, username="Administrator", password="password"):
if self.eventing_role:
authorization = self.get_authorization("eventing_admin", "password")
else:
authorization = self.get_authorization(username, password)
authorization = self.get_authorization(username, password)
url = "api/v1/functions"
api = self.eventing_baseUrl + url
headers = {'Content-type': 'application/json', 'Authorization': 'Basic %s' % authorization}
Expand All @@ -5356,10 +5334,7 @@ def delete_all_function(self, username="Administrator", password="password"):
Delete single function
'''
def delete_single_function(self, name, function_scope=None, username="Administrator", password="password"):
if self.eventing_role:
authorization = self.get_authorization("eventing_admin", "password")
else:
authorization = self.get_authorization(username, password)
authorization = self.get_authorization(username, password)
url = "api/v1/functions/" + name
if function_scope is not None:
url += "?bucket={0}&scope={1}".format(function_scope["bucket"],
Expand Down Expand Up @@ -5408,10 +5383,7 @@ def delete_function(self, name, function_scope=None):
'''
def export_function(self, name, function_scope=None, username="Administrator", password="password"):
export_map = {}
if self.eventing_role:
authorization = self.get_authorization("eventing_admin", "password")
else:
authorization = self.get_authorization(username, password)
authorization = self.get_authorization(username, password)
url = "api/v1/export/" + name
if function_scope is not None:
url += "?bucket={0}&scope={1}".format(function_scope["bucket"],
Expand All @@ -5436,10 +5408,7 @@ def export_function(self, name, function_scope=None, username="Administrator", p
'''

def import_function(self, body, username="Administrator", password="password"):
if self.eventing_role:
authorization = self.get_authorization("eventing_admin", "password")
else:
authorization = self.get_authorization(username, password)
authorization = self.get_authorization(username, password)
url = "api/v1/import"
api = self.eventing_baseUrl + url
headers = {'Content-type': 'application/json', 'Authorization': 'Basic %s' % authorization}
Expand All @@ -5454,10 +5423,7 @@ def import_function(self, body, username="Administrator", password="password"):
Ensure that the eventing node is out of bootstrap node
'''
def get_deployed_eventing_apps(self, username="Administrator", password="password"):
if self.eventing_role:
authorization = self.get_authorization("eventing_admin", "password")
else:
authorization = self.get_authorization(username, password)
authorization = self.get_authorization(username, password)
url = "getDeployedApps"
api = self.eventing_baseUrl + url
headers = {'Content-type': 'application/json', 'Authorization': 'Basic %s' % authorization}
Expand All @@ -5470,10 +5436,7 @@ def get_deployed_eventing_apps(self, username="Administrator", password="passwor
return list of eventing functions
'''
def get_list_of_eventing_functions(self):
if self.eventing_role:
authorization = self.get_authorization("eventing_admin", "password")
else:
authorization = self.get_authorization(self.username, self.password)
authorization = self.get_authorization(self.username, self.password)
url = "api/v1/list/functions"
api = self.eventing_baseUrl + url
headers = {'Content-type': 'application/json',
Expand All @@ -5489,10 +5452,7 @@ def get_list_of_eventing_functions(self):
'''

def get_running_eventing_apps(self, username="Administrator", password="password"):
if self.eventing_role:
authorization = self.get_authorization("eventing_admin", "password")
else:
authorization = self.get_authorization(username, password)
authorization = self.get_authorization(username, password)
url = "getRunningApps"
api = self.eventing_baseUrl + url
headers = {'Content-type': 'application/json', 'Authorization': 'Basic %s' % authorization}
Expand All @@ -5505,10 +5465,7 @@ def get_running_eventing_apps(self, username="Administrator", password="password
composite status of a handler
'''
def get_composite_eventing_status(self, username="Administrator", password="password"):
if self.eventing_role:
authorization = self.get_authorization("eventing_admin", "password")
else:
authorization = self.get_authorization(username, password)
authorization = self.get_authorization(username, password)
url = "api/v1/status"
api = self.eventing_baseUrl + url
headers = {'Content-type': 'application/json', 'Authorization': 'Basic %s' % authorization}
Expand All @@ -5523,10 +5480,7 @@ def get_composite_eventing_status(self, username="Administrator", password="pass
def get_event_processing_stats(self, name, function_scope=None, eventing_map=None, username="Administrator", password="password"):
if eventing_map is None:
eventing_map = {}
if self.eventing_role:
authorization = self.get_authorization("eventing_admin", "password")
else:
authorization = self.get_authorization(username, password)
authorization = self.get_authorization(username, password)
url = "getEventProcessingStats?name=" + name
if function_scope is not None:
url += "&bucket={0}&scope={1}".format(function_scope["bucket"],
Expand All @@ -5550,10 +5504,7 @@ def get_event_processing_stats(self, name, function_scope=None, eventing_map=Non
def get_aggregate_event_processing_stats(self, name, function_scope=None, eventing_map=None, username="Administrator", password="password"):
if eventing_map is None:
eventing_map = {}
if self.eventing_role:
authorization = self.get_authorization("eventing_admin", "password")
else:
authorization = self.get_authorization(username, password)
authorization = self.get_authorization(username, password)
url = "getAggEventProcessingStats?name=" + name
if function_scope is not None:
url += "&bucket={0}&scope={1}".format(function_scope["bucket"],
Expand All @@ -5577,10 +5528,7 @@ def get_aggregate_event_processing_stats(self, name, function_scope=None, eventi
def get_event_execution_stats(self, name, function_scope=None, eventing_map=None, username="Administrator", password="password"):
if eventing_map is None:
eventing_map = {}
if self.eventing_role:
authorization = self.get_authorization("eventing_admin", "password")
else:
authorization = self.get_authorization(username, password)
authorization = self.get_authorization(username, password)
url = "getExecutionStats?name=" + name
if function_scope is not None:
url += "&bucket={0}&scope={1}".format(function_scope["bucket"],
Expand All @@ -5604,10 +5552,7 @@ def get_event_execution_stats(self, name, function_scope=None, eventing_map=None
def get_event_failure_stats(self, name, function_scope=None, eventing_map=None, username="Administrator", password="password"):
if eventing_map is None:
eventing_map = {}
if self.eventing_role:
authorization = self.get_authorization("eventing_admin", "password")
else:
authorization = self.get_authorization(username, password)
authorization = self.get_authorization(username, password)
url = "getFailureStats?name=" + name
if function_scope is not None:
url += "&bucket={0}&scope={1}".format(function_scope["bucket"],
Expand All @@ -5631,10 +5576,7 @@ def get_event_failure_stats(self, name, function_scope=None, eventing_map=None,
def get_all_eventing_stats(self, seqs_processed=False, eventing_map=None, username="Administrator", password="password"):
if eventing_map is None:
eventing_map = {}
if self.eventing_role:
authorization = self.get_authorization("eventing_admin", "password")
else:
authorization = self.get_authorization(username, password)
authorization = self.get_authorization(username, password)
if seqs_processed:
url = "api/v1/stats?type=full"
else:
Expand All @@ -5650,10 +5592,7 @@ def get_all_eventing_stats(self, seqs_processed=False, eventing_map=None, userna
Cleanup eventing
'''
def cleanup_eventing(self):
if self.eventing_role:
authorization = self.get_authorization("eventing_admin", "password")
else:
authorization = self.get_authorization(self.username, self.password)
authorization = self.get_authorization(self.username, self.password)
url = "cleanupEventing"
api = self.eventing_baseUrl + url
headers = {'Content-type': 'application/json', 'Authorization': 'Basic %s' % authorization}
Expand Down Expand Up @@ -5852,10 +5791,7 @@ def get_function_appcode(self, name, function_scope=None):
Get eventing rebalance status
'''
def get_eventing_rebalance_status(self, username="Administrator", password="password"):
if self.eventing_role:
authorization = self.get_authorization("eventing_admin", "password")
else:
authorization = self.get_authorization(username, password)
authorization = self.get_authorization(username, password)
url = "getAggRebalanceStatus"
api = self.eventing_baseUrl + url
headers = {'Content-type': 'application/json', 'Authorization': 'Basic %s' % authorization}
Expand All @@ -5867,10 +5803,7 @@ def get_eventing_rebalance_status(self, username="Administrator", password="pass
Get application logs
'''
def get_app_logs(self,handler_name, function_scope=None, username="Administrator", password="password"):
if self.eventing_role:
authorization = self.get_authorization("eventing_admin", "password")
else:
authorization = self.get_authorization(username, password)
authorization = self.get_authorization(username, password)
url = "getAppLog?aggregate=true&name=" + handler_name
if function_scope is not None:
url += "&bucket={0}&scope={1}".format(function_scope["bucket"],
Expand All @@ -5882,10 +5815,7 @@ def get_app_logs(self,handler_name, function_scope=None, username="Administrator
return content

def create_function(self, name, body, function_scope=None, username="Administrator", password="password"):
if self.eventing_role:
authorization = self.get_authorization("eventing_admin", "password")
else:
authorization = self.get_authorization(username, password)
authorization = self.get_authorization(username, password)
url = "api/v1/functions/" + name
if function_scope is not None:
url += "?bucket={0}&scope={1}".format(function_scope["bucket"],
Expand All @@ -5899,10 +5829,7 @@ def create_function(self, name, body, function_scope=None, username="Administrat
return content

def update_function(self, name, body, function_scope=None, username="Administrator", password="password"):
if self.eventing_role:
authorization = self.get_authorization("eventing_admin", "password")
else:
authorization = self.get_authorization(username, password)
authorization = self.get_authorization(username, password)
url = "api/v1/functions/" + name
if function_scope is not None:
url += "?bucket={0}&scope={1}".format(function_scope["bucket"],
Expand All @@ -5917,10 +5844,7 @@ def update_function(self, name, body, function_scope=None, username="Administrat
return content

def get_function_details(self, name, function_scope=None, username="Administrator", password="password"):
if self.eventing_role:
authorization = self.get_authorization("eventing_admin", "password")
else:
authorization = self.get_authorization(username, password)
authorization = self.get_authorization(username, password)
url = "api/v1/functions/" + name
if function_scope is not None:
url += "?bucket={0}&scope={1}".format(function_scope["bucket"],
Expand All @@ -5933,10 +5857,7 @@ def get_function_details(self, name, function_scope=None, username="Administrato
return content

def get_eventing_go_routine_dumps(self):
if self.eventing_role:
authorization = self.get_authorization("eventing_admin", "password")
else:
authorization = self.get_authorization(self.username, self.password)
authorization = self.get_authorization(self.username, self.password)
url = "debug/pprof/goroutine?debug=1"
api = self.eventing_baseUrl + url
headers = {'Content-type': 'application/json', 'Authorization': 'Basic %s' % authorization}
Expand All @@ -5946,10 +5867,7 @@ def get_eventing_go_routine_dumps(self):
return content

def set_eventing_retry(self, name, body, function_scope=None):
if self.eventing_role:
authorization = self.get_authorization("eventing_admin", "password")
else:
authorization = self.get_authorization(self.username, self.password)
authorization = self.get_authorization(self.username, self.password)
url = "api/v1/functions/" + name + "/retry"
if function_scope is not None:
url += "?bucket={0}&scope={1}".format(function_scope["bucket"],
Expand Down
6 changes: 0 additions & 6 deletions pytests/eventing/eventing_base.py
Expand Up @@ -125,12 +125,6 @@ def setUp(self):
self.function_scope = {"bucket": self.src_bucket_name, "scope": "_default"}
self.num_docs=2016
self.is_binary=self.input.param('binary_doc',False)
self.eventing_role=self.input.param('eventing_role', False)
if self.eventing_role:
self.eventing_role_username="eventing_admin"
self.eventing_role_password="password"
payload="name="+self.eventing_role_username+"&roles=eventing_admin"+"&password="+self.eventing_role_password
RestConnection(self.master).add_set_builtin_user(self.eventing_role_username,payload)
log.info("============== EventingBaseTest setup has completed ==============")

def tearDown(self):
Expand Down

0 comments on commit 07a5467

Please sign in to comment.