V0.6.4 (#177)
* Require current version of dbmsbenchmarker

* TPC-H: Allow to set a default script (for example Create Schema)

* Configuration: Draft yugabytedb using own get_service_sut()

* Configuration: DBMSBenchmarker does not have to wait before starting

* Docs: Sketch of workflow (parallel generate/load and benchmark)

* Configuration: Draft kinetica using own get_service_sut()

* Configuration: Draft kinetica using own create_monitoring()

* Configuration: Draft kinetica using own set_metric_of_config()

* Configuration: Jobs are synced by Redis, so they do not have to wait for a fixed period of time (see the sketch after this list)

* Configuration: Upload YCSB results to cluster

* Cluster: get_service_endpoints()

* Cluster: prometheus_config contains endpoints in new format

* Configuration: Catch if container does not have ENV

* Cluster: prometheus_config contains endpoints in new format

* YCSB: Benchmark might contain INSERT and SCAN

* Configuration: Upload YCSB results to cluster

* Configuration: connections.config should contain all connections processed so far

* YCSB: Benchmark ignores GC

* Configuration: Kinetica monitoring tests

* Cluster: Fix bug in get_service_endpoints()

* HammerDB: Upload results to evaluation server

* Cluster: endpoints_cluster for monitoring

* Metrics: Option container_name

* Metrics: Fetch loader metrics (first try)

* Metrics: Fetch loader metrics by benchmarker section

* Metrics: Fetch benchmarker metrics by benchmarker section

* Metrics: Scraping and timeout 10s

* AWS: Set context

* Experiments: Only start monitoring if benchmarks are left

* Experiments: Store pod log, remove pod, then delete job

* YCSB: Evaluator contains number of operations and works for workload E

* YCSB: Averaging percentiles of latencies (see the note after this list)

* Cluster: If monitor_cluster_active, do not deploy additional cAdvisors per SUT

* Metrics: Fetch datagenerator metrics

* Configuration: Catch if get_host_diskspace_used() returns an unexpected format

* HammerDB: Averaging metrics

* Cluster: Job status checks number of completions

* Configuration: Do not restart loading when sut pod got lost

* Cluster: Job status checks number of completions >=

* Configuration: Do not restart loading when sut pod got lost

* Cluster: Job status checks number of completions not None

* Cluster: Do not dump job status

* Benchbase: Duration of pod is part of loading result

* Evaluation: Knows natural sort

* Experiment: Option to change parameters for each run of a benchmarker instance

* Experiment: Store pod logs before job has finished

* Experiment: Store logs of job - message in debug

* Experiment: Store logs of job - up to 10 retries

* Configuration: Disk size in kilobytes

* Experiment: Deploy daemonset

* Cluster: Retry if timeout at execute_command_in_pod()

* Removed IPython dependency
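
A note on "Jobs are synced by Redis" (see the entry above): parallel loader and benchmarker pods increment a shared counter in the message queue, and the experiment continues as soon as the counter reaches the expected value, instead of sleeping for a fixed worst-case period. A minimal sketch of the idea, assuming the redis-py client and a hypothetical function name; bexhoma itself shells out to redis-cli inside the message queue pod via execute_command_in_pod():

    import time
    import redis  # redis-py client; an assumption, bexhoma uses redis-cli instead

    def wait_for_pods(queue, expected, host="localhost"):
        # Block until `expected` pods have incremented the shared counter.
        r = redis.Redis(host=host, decode_responses=True)
        while int(r.get(queue) or 0) < expected:
            time.sleep(1)  # poll instead of waiting a fixed period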
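On "Averaging percentiles of latencies": when several YCSB instances run in parallel, only per-instance percentiles are available, and exact aggregation would require the raw latency histograms. One plausible approximation, shown as a sketch only (the function name and the weighting scheme are assumptions, not the committed code):

    def average_percentile(percentiles, operation_counts):
        # Weight each instance's percentile by its number of operations.
        # This only approximates the true combined percentile.
        total = sum(operation_counts)
        return sum(p * n for p, n in zip(percentiles, operation_counts)) / total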
perdelt committed Jun 20, 2023
1 parent f5efc22 commit 321abab
Showing 6 changed files with 776 additions and 171 deletions.
98 changes: 90 additions & 8 deletions bexhoma/clusters.py
@@ -106,6 +106,7 @@ def __init__(self, clusterconfig='cluster.config', experiments_configfolder='exp
self.host = 'localhost'
self.port = self.contextdata['port']
self.monitoring_active = True
self.monitor_cluster_active = False
# k8s:
self.namespace = self.contextdata['namespace']
self.appname = self.config['credentials']['k8s']['appname']
@@ -851,11 +852,23 @@ def execute_command_in_pod(self, command, pod='', container='', params=''):
#print(stdout.decode('utf-8'), stderr.decode('utf-8'))
str_stdout = stdout.decode('utf-8')
str_stderr = stderr.decode('utf-8')
return "", str_stdout, str_stderr
if 'Error from server: error dialing backend' in str_stdout or 'Error from server: error dialing backend' in str_stderr:
print("Connection error found")
self.wait(5)
return self.execute_command_in_pod(command=command, pod=pod, container=container, params=params)
else:
return "", str_stdout, str_stderr
except Exception as e:
print(e)
print(stdout, stderr)
return "", stdout, stderr
str_stdout = stdout.decode('utf-8')
str_stderr = stderr.decode('utf-8')
if 'Error from server: error dialing backend' in str_stdout or 'Error from server: error dialing backend' in str_stderr:
print("Connection error found")
self.wait(5)
return self.execute_command_in_pod(command=command, pod=pod, container=container, params=params)
else:
return "", stdout, stderr
return "", "", ""
def check_DBMS_connection(self, ip, port):
"""
@@ -1076,11 +1089,16 @@ def get_job_status(jobname='', app='', component='', experiment='', config
jobname = jobs[0]
api_response = self.v1batches.read_namespaced_job_status(jobname, self.namespace)#, label_selector='app='+cluster.appname)
#pprint(api_response)
print("api_response.spec.completions", api_response.spec.completions)
print("api_response.status.succeeded", api_response.status.succeeded)
# returns number of completed pods (!)
#return api_response.status.succeeded
# we want status of job (!)
#self.logger.debug("api_response.status.succeeded = {}".format(api_response.status.succeeded))
#self.logger.debug("api_response.status.conditions = {}".format(api_response.status.conditions))
if api_response.status.succeeded is not None and api_response.spec.completions <= api_response.status.succeeded:
print("Number of completions reached")
return True
if api_response.status.succeeded is not None and api_response.status.succeeded > 0 and api_response.status.conditions is not None and len(api_response.status.conditions) > 0:
self.logger.debug(api_response.status.conditions[0].type)
return api_response.status.conditions[0].type == 'Complete'
@@ -1094,6 +1112,8 @@ def get_job_status(jobname='', app='', component='', experiment='', config
# try again, if not failed due to "not found"
if not e.status == 404:
return self.get_job_status(jobname=jobname, app=app, component=component, experiment=experiment, configuration=configuration, client=client)
else:
return 0
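
The new check above treats a job as complete once the number of succeeded pods reaches spec.completions, instead of relying only on the first status condition. A minimal standalone sketch against the official kubernetes Python client (the function name is illustrative):

    from kubernetes import client, config

    def job_complete(jobname, namespace):
        # A job with N requested completions is done once >= N pods succeeded.
        config.load_kube_config()
        batch = client.BatchV1Api()
        job = batch.read_namespaced_job_status(jobname, namespace)
        succeeded = job.status.succeeded or 0
        completions = job.spec.completions or 1  # None means a single completion
        return succeeded >= completions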
def delete_job(self, jobname='', app='', component='', experiment='', configuration='', client=''):
"""
Delete a job given by name or matching a set of labels (component/ experiment/ configuration)
@@ -1241,6 +1261,28 @@ def start_dashboard(self, app='', component='dashboard'):
name = self.create_dashboard_name(app, component)
self.logger.debug('testbed.start_dashboard({})'.format(deployment))
self.kubectl('create -f '+self.yamlfolder+deployment)
def start_monitoring_cluster(self, app='', component='monitoring'):
"""
Starts the monitoring component and its service.
Manifest for node exporters is expected in 'daemonsettemplate-monitoring.yml'.
:param app: app monitoring belongs to
:param component: Component name, should be 'monitoring' typically
"""
self.monitor_cluster_active = True
endpoints = self.get_service_endpoints(service_name="bexhoma-service-monitoring-default")
if len(endpoints) > 0:
# monitoring exists
self.logger.debug('testbed.start_monitoring_cluster()=exists')
return
else:
self.logger.debug('testbed.start_monitoring_cluster()=deploy')
deployment = 'daemonsettemplate-monitoring.yml'
#name = self.create_dashboard_name(app, component)
#self.logger.debug('testbed.start_monitoring_general({})'.format(deployment))
self.kubectl('create -f '+self.yamlfolder+deployment)
#deployment = 'deploymenttemplate-bexhoma-prometheus.yml'
#self.kubectl('create -f '+self.yamlfolder+deployment)
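
With cluster-wide monitoring, a single node-exporter daemonset serves all SUTs, so no additional per-SUT cAdvisors have to be deployed (see the changelog entry above). A minimal usage sketch, assuming a configured testbed instance named cluster:

    cluster.start_monitoring_cluster()           # deploys daemonsettemplate-monitoring.yml once
    node_ips = cluster.get_service_endpoints()   # one endpoint per node, via the headless service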
def start_messagequeue(self, app='', component='messagequeue'):
"""
Starts the message queue.
@@ -1472,6 +1514,32 @@ def set_pod_counter(self, queue, value=0):
self.logger.debug("I am using messagequeue {}".format(pod_messagequeue))
redisCommand = 'redis-cli set {redisQueue} {value} '.format(redisQueue=queue, value=value)
self.execute_command_in_pod(command=redisCommand, pod=pod_messagequeue)
def get_service_endpoints(self, service_name="bexhoma-service-monitoring-default"):
"""
Returns all endpoints of a service as a list.
This is particularly interesting for headless services.
It is used to find all nodes in a cluster if cluster monitoring is active.
:param service_name: Name of the service
:return: List of IPs of endpoints
"""
#kubectl get endpoints -o jsonpath="{range .items[*]}{.metadata.name},{.subsets[*].addresses[*].ip}{'\n'}{end}"
#service_name = "bexhoma-service-monitoring-default"
self.logger.debug("get_service_endpoints({})".format(service_name))
endpoints = self.kubectl("get endpoints -o jsonpath=\"{range .items[*]}{.metadata.name},{.subsets[*].addresses[*].ip}{'\\n'}{end}\"")
try:
endpoints_of_service = endpoints.split("\n")
for service in endpoints_of_service:
if service.startswith(service_name):
#print(service)
endpoints_string = service[service.find(",")+1:]
#print(endpoints_string)
endpoints_list = endpoints_string.split(" ")
self.logger.debug("endpoints: {}".format(endpoints_list))
return endpoints_list
except Exception as e:
print("Exception when calling get_service_endpoints: %s\n" % e)
return []
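
The endpoint list feeds the new Prometheus configuration format mentioned in the changelog ("prometheus_config contains endpoints in new format"). A sketch of how the IPs could become scrape targets; port 9100, the node-exporter default, is an assumption:

    def prometheus_static_config(endpoints, port=9100):
        # Build a static_configs entry for a Prometheus scrape job from
        # the node IPs returned by get_service_endpoints().
        return [{"targets": ["{}:{}".format(ip, port) for ip in endpoints]}]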



Expand Down Expand Up @@ -1555,16 +1623,30 @@ def store_pod_log(self, pod_name, container=''):
"""
Store the log of a pod in a local file in the experiment result folder.
Optionally the name of a container can be given (mandatory, if pod has multiple containers).
If the file containing the pod log is already present, we do nothing (no update).
:param pod_name: Name of the pod
:param container: Name of the container
"""
# write pod log
stdout = self.pod_log(pod_name, container)
filename_log = self.config['benchmarker']['resultfolder'].replace("\\", "/").replace("C:", "")+"/"+str(self.code)+'/'+pod_name+'.log'
f = open(filename_log, "w")
f.write(stdout)
f.close()
if len(container) > 0:
filename_log = "{path}/{code}/{pod}.{container}.log".format(path=self.config['benchmarker']['resultfolder'].replace("\\", "/").replace("C:", ""), code=self.code, pod=pod_name, container=container)
else:
filename_log = "{path}/{code}/{pod}.log".format(path=self.config['benchmarker']['resultfolder'].replace("\\", "/").replace("C:", ""), code=self.code, pod=pod_name)
# do not overwrite
if not os.path.isfile(filename_log):
# max 10 tries to receive the log (timeout might occur)
tries = 1
while tries<10:
stdout = self.pod_log(pod_name, container)
if len(stdout) > 0:
f = open(filename_log, "w")
f.write(stdout)
f.close()
return
else:
tries = tries + 1
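
For reference, the same bounded-retry idea expressed against the kubernetes Python client instead of self.pod_log() (a sketch; fetch_pod_log is illustrative, not the committed code):

    from kubernetes import client, config
    from kubernetes.client.rest import ApiException

    def fetch_pod_log(pod_name, namespace, container=None, max_tries=10):
        # A timeout can yield an empty log, so retry a bounded number of times.
        config.load_kube_config()
        core = client.CoreV1Api()
        for _ in range(max_tries):
            try:
                log = core.read_namespaced_pod_log(pod_name, namespace, container=container)
                if log:
                    return log
            except ApiException:
                pass
        return ""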




Expand Down Expand Up @@ -1606,7 +1688,7 @@ def __init__(self, clusterconfig='cluster.config', experiments_configfolder='exp
"""
self.code = code
kubernetes.__init__(self, clusterconfig=clusterconfig, experiments_configfolder=experiments_configfolder, context=context, yamlfolder=yamlfolder, code=self.code, instance=instance, volume=volume, docker=docker, script=script, queryfile=queryfile)
self.cluster = self.contextdata['cluster']
self.cluster = self.context#data['cluster']
def eksctl(self, command):
"""
Runs an eksctl command.
