From 092975698894ff4019057e4130c72e75bdeb8fbd Mon Sep 17 00:00:00 2001 From: Patrick Erdelt Date: Wed, 27 Jul 2022 11:48:43 +0200 Subject: [PATCH] V0.5.20 (#100) * storageConfiguration tag, so that postgres=9 and =13 can share a pvc * Prepare next release * Scaling factor per experiment * Demo config cleaned * Docs: Improve config and examples * Configuration: Dockerimage as parameter * Config: MonetDB log file * Masterscript: we do not test at localhost (forwarded), because there might be conflicts * First draft of an IoT benchmark * First draft of an IoT benchmark - create_job_maintainer similar to benchmarker * First draft of an IoT benchmark - create_job_maintainer specific to PostgreSQL * Maintaining draft * Tools: Show Maintaining pods * Tools: Stop Maintaining pods * First draft of an IoT benchmark - maintaining settings in experiment * Masterscript: stop sut also means stop maintaining * Masterscript: maintainer needs no client (benchmarker) infos * Masterscript: maintainer parallelism as parameter * Masterscript: maintainer runs for 2h per default * Tools: Show Maintaining pods as number only * Masterscript: stop maintainer job removes its pods * Masterscript: reconnect and try again, if not failed due to "not found" * Masterscript: more debug output about jobs and their pods * Masterscript: more debug output about maintaining * Masterscript: stop pods of a job explicitly * Tools: Show more debug * Masterscript: maintaining does not change datatransfer settings of benchmarker --- bexhoma/configurations.py | 124 +++++++++++++++++++++++++- bexhoma/experiments.py | 71 ++++++++++++++- bexhoma/masterK8s.py | 42 +++++++-- bexhoma/scripts/experimentsmanager.py | 32 +++++++ cluster.py | 67 ++++++++++---- setup.py | 2 +- 6 files changed, 310 insertions(+), 28 deletions(-) diff --git a/bexhoma/configurations.py b/bexhoma/configurations.py index 676768f3..bb48e878 100644 --- a/bexhoma/configurations.py +++ b/bexhoma/configurations.py @@ -90,6 +90,8 @@ def __init__(self, experiment, docker=None, configuration='', script=None, alias self.dialect = dialect self.num_worker = worker self.monitoring_active = experiment.monitoring_active + self.maintaining_active = experiment.maintaining_active + self.parallelism = 1 self.storage_label = experiment.storage_label self.experiment_done = False self.dockerimage = dockerimage @@ -242,6 +244,29 @@ def sut_is_running(self): if status == "Running": return True return False + def maintaining_is_running(self): + app = self.appname + component = 'maintaining' + configuration = self.configuration + pods = self.experiment.cluster.getPods(app, component, self.experiment.code, configuration) + self.logger.debug("maintaining_is_running found {} pods".format(len(pods))) + if len(pods) > 0: + pod_sut = pods[0] + status = self.experiment.cluster.getPodStatus(pod_sut) + if status == "Running": + return True + return False + def maintaining_is_pending(self): + app = self.appname + component = 'maintaining' + configuration = self.configuration + pods = self.experiment.cluster.getPods(app, component, self.experiment.code, configuration) + if len(pods) > 0: + pod_sut = pods[0] + status = self.experiment.cluster.getPodStatus(pod_sut) + if status == "Pending": + return True + return False def monitoring_is_running(self): app = self.appname component = 'monitoring' @@ -299,6 +324,8 @@ def start_loading(self, delay=0): forward = ['kubectl', '--context {context}'.format(context=self.experiment.cluster.context), 'port-forward', 'service/'+service] #bexhoma-service']#, '9091', 
'9300']#, '9400'] forward.extend(ports) your_command = " ".join(forward) + # we do not test at localhost (forwarded), because there might be conflicts + """ self.logger.debug('configuration.start_loading({})'.format(your_command)) subprocess.Popen(your_command, stdout=subprocess.PIPE, shell=True) # wait for port to be connected @@ -308,6 +335,7 @@ def start_loading(self, delay=0): # not answering self.experiment.cluster.stopPortforwarding() return False + """ #while not dbmsactive: # self.wait(10) # dbmsactive = self.checkDBMS(self.experiment.cluster.host, self.experiment.cluster.port) @@ -316,8 +344,9 @@ def start_loading(self, delay=0): if not self.loading_started: #print("load_data") self.load_data() - self.experiment.cluster.stopPortforwarding() - # store experiment + # we do not test at localhost (forwarded), because there might be conflicts + #self.experiment.cluster.stopPortforwarding() + # store experiment needs new format """ experiment = {} experiment['delay'] = delay @@ -344,6 +373,16 @@ def generate_component_name(self, app='', component='', experiment='', configura else: name = "{app}-{component}-{configuration}-{experiment}".format(app=app, component=component, configuration=configuration, experiment=experiment).lower() return name + def start_maintaining(self, app='', component='maintaining', experiment='', configuration='', parallelism=1): + if len(app) == 0: + app = self.appname + if len(configuration) == 0: + configuration = self.configuration + if len(experiment) == 0: + experiment = self.code + job = self.create_job_maintaining(app=app, component='maintaining', experiment=experiment, configuration=configuration, parallelism=parallelism) + self.logger.debug("Deploy "+job) + self.experiment.cluster.kubectl('create -f '+job)#self.yamlfolder+deployment) def create_monitoring(self, app='', component='monitoring', experiment='', configuration=''): name = self.generate_component_name(app=app, component=component, experiment=experiment, configuration=configuration) #if len(app) == 0: @@ -448,6 +487,27 @@ def stop_monitoring(self, app='', component='monitoring', experiment='', configu services = self.experiment.cluster.getServices(app=app, component=component, experiment=experiment, configuration=configuration) for service in services: self.experiment.cluster.deleteService(service) + def stop_maintaining(self, app='', component='maintaining', experiment='', configuration=''): + if len(app)==0: + app = self.appname + if len(configuration) == 0: + configuration = self.configuration + if len(experiment) == 0: + experiment = self.code + jobs = self.experiment.cluster.getJobs(app, component, experiment, configuration) + # status per job + for job in jobs: + success = self.experiment.cluster.getJobStatus(job) + print(job, success) + self.experiment.cluster.deleteJob(job) + # all pods to these jobs - automatically stopped? only if finished? 
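+        # Answer (hedged): deleting a Job through the API does not necessarily
+        # cascade to its pods (with the default propagation policy they may be
+        # orphaned), so the pods are removed explicitly below.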
+ #self.experiment.cluster.getJobPods(app, component, experiment, configuration) + pods = self.experiment.cluster.getJobPods(app, component, experiment, configuration) + for p in pods: + status = self.experiment.cluster.getPodStatus(p) + print(p, status) + #if status == "Running": + self.experiment.cluster.deletePod(p) def get_instance_from_resources(self): resources = experiments.DictToObject(self.resources) cpu = resources.requests.cpu @@ -794,6 +854,8 @@ def stop_sut(self, app='', component='sut', experiment='', configuration=''): self.experiment.cluster.deleteService(service) if self.experiment.monitoring_active: self.stop_monitoring() + if self.experiment.maintaining_active: + self.stop_maintaining() if component == 'sut': self.stop_sut(app=app, component='worker', experiment=experiment, configuration=configuration) def checkGPUs(self): @@ -1554,6 +1616,64 @@ def create_job(self, connection, app='', component='benchmarker', experiment='', except yaml.YAMLError as exc: print(exc) return job_experiment + #def create_job_maintaining(self, app='', component='maintaining', experiment='', configuration='', client='1', parallelism=1, alias=''): + def create_job_maintaining(self, app='', component='maintaining', experiment='', configuration='', parallelism=1, alias=''): + if len(app) == 0: + app = self.appname + jobname = self.generate_component_name(app=app, component=component, experiment=experiment, configuration=configuration) + servicename = self.generate_component_name(app=app, component='sut', experiment=experiment, configuration=configuration) + #print(jobname) + self.logger.debug('configuration.create_job_maintainer({})'.format(jobname)) + # determine start time + now = datetime.utcnow() + start = now + timedelta(seconds=180) + #start = datetime.strptime('2021-03-04 23:15:25', '%Y-%m-%d %H:%M:%S') + #wait = (start-now).seconds + now_string = now.strftime('%Y-%m-%d %H:%M:%S') + start_string = start.strftime('%Y-%m-%d %H:%M:%S') + #yamlfile = self.experiment.cluster.yamlfolder+"job-dbmsbenchmarker-"+code+".yml" + job_experiment = self.experiment.path+'/job-maintaining-{configuration}.yml'.format(configuration=configuration) + with open(self.experiment.cluster.yamlfolder+"jobtemplate-maintaining.yml") as stream: + try: + result=yaml.safe_load_all(stream) + result = [data for data in result] + #print(result) + except yaml.YAMLError as exc: + print(exc) + for dep in result: + if dep['kind'] == 'Job': + dep['metadata']['name'] = jobname + job = dep['metadata']['name'] + dep['spec']['completions'] = parallelism + dep['spec']['parallelism'] = parallelism + dep['metadata']['labels']['app'] = app + dep['metadata']['labels']['component'] = component + dep['metadata']['labels']['configuration'] = configuration + dep['metadata']['labels']['experiment'] = str(experiment) + #dep['metadata']['labels']['client'] = str(client) + dep['metadata']['labels']['experimentRun'] = str(self.numExperimentsDone+1) + dep['spec']['template']['metadata']['labels']['app'] = app + dep['spec']['template']['metadata']['labels']['component'] = component + dep['spec']['template']['metadata']['labels']['configuration'] = configuration + dep['spec']['template']['metadata']['labels']['experiment'] = str(experiment) + #dep['spec']['template']['metadata']['labels']['client'] = str(client) + dep['spec']['template']['metadata']['labels']['experimentRun'] = str(self.numExperimentsDone+1) + envs = dep['spec']['template']['spec']['containers'][0]['env'] + for i,e in enumerate(envs): + if e['name'] == 'SENSOR_DATABASE': + 
dep['spec']['template']['spec']['containers'][0]['env'][i]['value'] = 'postgresql://postgres:@{}:9091/postgres'.format(servicename) + if e['name'] == 'SENSOR_RATE': + dep['spec']['template']['spec']['containers'][0]['env'][i]['value'] = '0.1' + if e['name'] == 'SENSOR_NUMBER': + dep['spec']['template']['spec']['containers'][0]['env'][i]['value'] = '72000' + self.logger.debug('configuration.create_job_maintaining({})'.format(str(e))) + #print(e) + with open(job_experiment,"w+") as stream: + try: + stream.write(yaml.dump_all(result)) + except yaml.YAMLError as exc: + print(exc) + return job_experiment diff --git a/bexhoma/experiments.py b/bexhoma/experiments.py index 83613ad4..ad0f68a0 100644 --- a/bexhoma/experiments.py +++ b/bexhoma/experiments.py @@ -99,6 +99,7 @@ def __init__(self, self.namespace = self.cluster.namespace#.config['credentials']['k8s']['namespace'] self.configurations = [] self.storage_label = '' + self.maintaining_active = False def wait(self, sec): print("Waiting "+str(sec)+"s...", end="", flush=True) intervals = int(sec) @@ -430,9 +431,9 @@ def work_benchmark_list(self, intervals=30, stop=True): #print("{} is not running".format(config.configuration)) if not config.experiment_done: if not config.sut_is_pending(): - print("{} is not running yet - ".format(config.configuration), end="", flush=True) + print("{} is not running yet - ".format(config.configuration))#, end="", flush=True) if self.cluster.max_sut is not None: - print("{} running and {} pending pods: max is {} pods in the cluster - ".format(num_pods_running, num_pods_pending, self.cluster.max_sut), end="", flush=True) + print("{} running and {} pending pods: max is {} pods in the cluster - ".format(num_pods_running, num_pods_pending, self.cluster.max_sut))#, end="", flush=True) if num_pods_running+num_pods_pending < self.cluster.max_sut: print("it will start now") config.start_sut() @@ -474,11 +475,23 @@ def work_benchmark_list(self, intervals=30, stop=True): config.loading_after_time = now + timedelta(seconds=delay) print("{} will start loading but not before {} (that is in {} secs)".format(config.configuration, config.loading_after_time.strftime('%Y-%m-%d %H:%M:%S'), delay)) continue + # check if maintaining + if config.loading_finished: + if config.monitoring_active and not config.monitoring_is_running(): + print("{} waits for monitoring".format(config.configuration)) + continue + if config.maintaining_active: + if not config.maintaining_is_running(): + print("{} is not maintained yet".format(config.configuration)) + config.start_maintaining(parallelism=config.parallelism) # benchmark if loading is done and monitoring is ready if config.loading_finished: if config.monitoring_active and not config.monitoring_is_running(): print("{} waits for monitoring".format(config.configuration)) continue + if config.maintaining_active and not config.maintaining_is_running(): + print("{} waits for maintaining".format(config.configuration)) + continue app = self.cluster.appname component = 'benchmarker' configuration = '' @@ -555,7 +568,7 @@ def work_benchmark_list(self, intervals=30, stop=True): # status per job for job in jobs: success = self.cluster.getJobStatus(job) - self.cluster.logger.debug('job {} has status {}'.format(job, success)) + self.cluster.logger.debug('job {} has success status {}'.format(job, success)) #print(job, success) if success: self.cluster.deleteJob(job) @@ -697,3 +710,55 @@ def set_queries_full(self): def set_queries_profiling(self): self.set_queryfile('queries-tpch-profiling.config') +class 
iot(default): + def __init__(self, + cluster, + code=None, + queryfile = 'queries-iot.config', + SF = '1', + numExperiments = 1, + timeout = 7200, + detached=False): + default.__init__(self, cluster, code, numExperiments, timeout, detached) + self.set_experiment(volume='iot') + self.set_experiment(script='SF'+str(SF)+'-index') + self.cluster.set_configfolder('experiments/iot') + parameter.defaultParameters = {'SF': str(SF)} + self.set_queryfile(queryfile) + self.set_workload( + name = 'IoT Queries SF='+str(SF), + info = 'This experiment performs some IoT inspired queries.' + ) + self.storage_label = 'tpch-'+str(SF) + self.maintaining_active = True + def set_queries_full(self): + self.set_queryfile('queries-iot.config') + def set_queries_profiling(self): + self.set_queryfile('queries-iot-profiling.config') + def set_querymanagement_maintaining(self, + numRun=128, + delay=5, + datatransfer=False): + self.set_querymanagement( + numWarmup = 0, + numCooldown = 0, + numRun = numRun, + delay = delay, + timer = { + 'connection': + { + 'active': True, + 'delay': 0 + }, + #'datatransfer': + #{ + # 'active': datatransfer, + # 'sorted': True, + # 'compare': 'result', + # 'store': [], + # 'precision': 0, + #} + }) + #self.monitoring_active = True + self.maintaining_active = True + diff --git a/bexhoma/masterK8s.py b/bexhoma/masterK8s.py index 2be4cb9d..c9bf3326 100644 --- a/bexhoma/masterK8s.py +++ b/bexhoma/masterK8s.py @@ -1430,7 +1430,9 @@ def getJobs(self, app='', component='', experiment='', configuration='', client= print("Create new access token") self.cluster_access() self.wait(2) - return self.getJobs(app=app, component=component, experiment=experiment, configuration=configuration, client=client) + # try again, if not failed due to "not found" + if not e.status == 404: + return self.getJobs(app=app, component=component, experiment=experiment, configuration=configuration, client=client) def getJobStatus(self, jobname='', app='', component='', experiment='', configuration='', client=''): #print("getJobStatus") label = '' @@ -1461,13 +1463,16 @@ def getJobStatus(self, jobname='', app='', component='', experiment='', configur print("Create new access token") self.cluster_access() self.wait(2) - return self.getJobStatus(jobname=jobname, app=app, component=component, experiment=experiment, configuration=configuration, client=client) + # try again, if not failed due to "not found" + if not e.status == 404: + return self.getJobStatus(jobname=jobname, app=app, component=component, experiment=experiment, configuration=configuration, client=client) def deleteJob(self, jobname='', app='', component='', experiment='', configuration='', client=''): self.logger.debug('testdesign.deleteJob()') try: if len(jobname) == 0: jobs = self.getJobs(app=app, component=component, experiment=experiment, configuration=configuration, client=client) jobname = jobs[0] + self.logger.debug('testdesign.deleteJob({})'.format(jobname)) api_response = self.v1batches.delete_namespaced_job(jobname, self.namespace)#, label_selector='app='+cluster.appname) #pprint(api_response) #pprint(api_response.status.succeeded) @@ -1476,7 +1481,9 @@ def deleteJob(self, jobname='', app='', component='', experiment='', configurati print("Exception when calling BatchV1Api->delete_namespaced_job: %s\n" % e) self.cluster_access() self.wait(2) - return self.deleteJob(jobname=jobname, app=app, component=component, experiment=experiment, configuration=configuration, client=client) + # try again, if not failed due to "not found" + if not e.status == 404: 
+ return self.deleteJob(jobname=jobname, app=app, component=component, experiment=experiment, configuration=configuration, client=client) def deleteJobPod(self, jobname='', app='', component='', experiment='', configuration='', client=''): self.logger.debug('testdesign.deleteJobPod()') body = kubernetes.client.V1DeleteOptions() @@ -1488,13 +1495,16 @@ def deleteJobPod(self, jobname='', app='', component='', experiment='', configur self.deleteJobPod(jobname=pod, app=app, component=component, experiment=experiment, configuration=configuration, client=client) return #jobname = pods[0] + self.logger.debug('testdesign.deleteJobPod({})'.format(jobname)) api_response = self.v1core.delete_namespaced_pod(jobname, self.namespace, body=body) #pprint(api_response) except ApiException as e: print("Exception when calling CoreV1Api->delete_namespaced_pod: %s\n" % e) self.cluster_access() self.wait(2) - return self.deleteJobPod(jobname=jobname, app=app, component=component, experiment=experiment, configuration=configuration, client=client) + # try again, if not failed due to "not found" + if not e.status == 404: + return self.deleteJobPod(jobname=jobname, app=app, component=component, experiment=experiment, configuration=configuration, client=client) def getJobPods(self, app='', component='', experiment='', configuration='', client=''): #print("getJobPods") label = '' @@ -1509,7 +1519,7 @@ def getJobPods(self, app='', component='', experiment='', configuration='', clie label += ',configuration='+configuration if len(client)>0: label += ',client='+client - self.logger.debug('getJobPods'+label) + self.logger.debug('getJobPods '+label) try: api_response = self.v1core.list_namespaced_pod(self.namespace, label_selector=label)#'app='+appname) #pprint(api_response) @@ -1523,7 +1533,9 @@ def getJobPods(self, app='', component='', experiment='', configuration='', clie print("Create new access token") self.cluster_access() self.wait(2) - return self.getJobPods(app=app, component=component, experiment=experiment, configuration=configuration, client=client) + # try again, if not failed due to "not found" + if not e.status == 404: + return self.getJobPods(app=app, component=component, experiment=experiment, configuration=configuration, client=client) def create_job(self, connection, app='', component='benchmarker', experiment='', configuration='', client='1'): if len(app) == 0: app = self.appname @@ -1654,6 +1666,24 @@ def stop_dashboard(self, app='', component='dashboard'): services = self.getServices(app=app, component=component) for service in services: self.deleteService(service) + def stop_maintaining(self, experiment='', configuration=''): + # all jobs of configuration - benchmarker + app = self.appname + component = 'maintaining' + jobs = self.getJobs(app, component, experiment, configuration) + # status per job + for job in jobs: + success = self.getJobStatus(job) + print(job, success) + self.deleteJob(job) + # all pods to these jobs - automatically stopped? 
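+        # Pods of a deleted job are not guaranteed to be garbage-collected;
+        # passing kubernetes.client.V1DeleteOptions(propagation_policy='Foreground')
+        # to delete_namespaced_job would be an alternative to the explicit
+        # pod cleanup below.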
+ #self.getJobPods(app, component, experiment, configuration) + pods = self.getJobPods(app, component, experiment, configuration) + for p in pods: + status = self.getPodStatus(p) + print(p, status) + #if status == "Running": + self.deletePod(p) def stop_monitoring(self, app='', component='monitoring', experiment='', configuration=''): deployments = self.getDeployments(app=app, component=component, experiment=experiment, configuration=configuration) for deployment in deployments: diff --git a/bexhoma/scripts/experimentsmanager.py b/bexhoma/scripts/experimentsmanager.py index 66f68c37..a648b036 100644 --- a/bexhoma/scripts/experimentsmanager.py +++ b/bexhoma/scripts/experimentsmanager.py @@ -37,12 +37,22 @@ def manage(): # argparse parser = argparse.ArgumentParser(description=description) parser.add_argument('mode', help='manage experiments: stop, get status, connect to dbms or connect to dashboard', choices=['stop','status','dashboard', 'master']) + parser.add_argument('-db', '--debug', help='dump debug informations', action='store_true') parser.add_argument('-e', '--experiment', help='code of experiment', default=None) parser.add_argument('-c', '--connection', help='name of DBMS', default=None) parser.add_argument('-v', '--verbose', help='gives more details about Kubernetes objects', action='store_true') parser.add_argument('-cx', '--context', help='context of Kubernetes (for a multi cluster environment), default is current context', default=None) clusterconfig = 'cluster.config' + # evaluate args args = parser.parse_args() + if args.debug: + logging.basicConfig(level=logging.DEBUG) + #logging.basicConfig(level=logging.DEBUG) + if args.debug: + logger_bexhoma = logging.getLogger('bexhoma') + logger_bexhoma.setLevel(logging.DEBUG) + logger_loader = logging.getLogger('load_data_asynch') + logger_loader.setLevel(logging.DEBUG) connection = args.connection if args.mode == 'stop': cluster = clusters.kubernetes(clusterconfig, context=args.context) @@ -52,11 +62,13 @@ def manage(): connection = '' cluster.stop_sut(configuration=connection) cluster.stop_monitoring(configuration=connection) + cluster.stop_maintaining() cluster.stop_benchmarker(configuration=connection) else: experiment = experiments.default(cluster=cluster, code=args.experiment) experiment.stop_sut() cluster.stop_monitoring() + cluster.stop_maintaining() cluster.stop_benchmarker() elif args.mode == 'dashboard': cluster = clusters.kubernetes(clusterconfig, context=args.context) @@ -170,6 +182,26 @@ def manage(): #print(status) apps[configuration][component] += "{pod} ({status})".format(pod='', status=status) ############ + component = 'maintaining' + apps[configuration][component] = '' + if args.verbose: + stateful_sets = cluster.getStatefulSets(app=app, component=component, experiment=experiment, configuration=configuration) + print("Stateful Sets", stateful_sets) + services = cluster.getServices(app=app, component=component, experiment=experiment, configuration=configuration) + print("Maintaining Services", services) + pods = cluster.getPods(app=app, component=component, experiment=experiment, configuration=configuration) + if args.verbose: + print("Maintaining Pods", pods) + num_pods = {} + for pod in pods: + status = cluster.getPodStatus(pod) + #print(status) + #apps[configuration][component] += "{pod} ({status})".format(pod='', status=status) + num_pods[status] = 1 if not status in num_pods else num_pods[status]+1 + #print(num_pods) + for status in num_pods.keys(): + apps[configuration][component] += "({num} 
{status})".format(num=num_pods[status], status=status) + ############ component = 'monitoring' apps[configuration][component] = '' if args.verbose: diff --git a/cluster.py b/cluster.py index fe76291e..43c91c50 100644 --- a/cluster.py +++ b/cluster.py @@ -1,20 +1,20 @@ """ - This script contains some code snippets for testing the detached mode in Kubernetes + This script contains some code snippets for testing the detached mode in Kubernetes - Copyright (C) 2021 Patrick Erdelt + Copyright (C) 2021 Patrick Erdelt - This program is free software: you can redistribute it and/or modify - it under the terms of the GNU Affero General Public License as - published by the Free Software Foundation, either version 3 of the - License, or (at your option) any later version. + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU Affero General Public License as + published by the Free Software Foundation, either version 3 of the + License, or (at your option) any later version. - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU Affero General Public License for more details. + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Affero General Public License for more details. - You should have received a copy of the GNU Affero General Public License - along with this program. If not, see . + You should have received a copy of the GNU Affero General Public License + along with this program. If not, see . """ from bexhoma import * from dbmsbenchmarker import * @@ -37,30 +37,45 @@ # argparse parser = argparse.ArgumentParser(description=description) parser.add_argument('mode', help='profile the import or run the TPC-H queries', choices=['stop','status','dashboard', 'master']) + parser.add_argument('-db', '--debug', help='dump debug informations', action='store_true') parser.add_argument('-e', '--experiment', help='code of experiment', default=None) parser.add_argument('-c', '--connection', help='name of DBMS', default=None) parser.add_argument('-v', '--verbose', help='gives more details about Kubernetes objects', action='store_true') parser.add_argument('-cx', '--context', help='context of Kubernetes (for a multi cluster environment), default is current context', default=None) clusterconfig = 'cluster.config' + # evaluate args args = parser.parse_args() + if args.debug: + logging.basicConfig(level=logging.DEBUG) + #logging.basicConfig(level=logging.DEBUG) + if args.debug: + logger_bexhoma = logging.getLogger('bexhoma') + logger_bexhoma.setLevel(logging.DEBUG) + logger_loader = logging.getLogger('load_data_asynch') + logger_loader.setLevel(logging.DEBUG) + connection = args.connection if args.mode == 'stop': cluster = clusters.kubernetes(clusterconfig, context=args.context) if args.experiment is None: experiment = experiments.default(cluster=cluster, code=cluster.code) - cluster.stop_sut() - cluster.stop_monitoring() - cluster.stop_benchmarker() + if connection is None: + connection = '' + cluster.stop_sut(configuration=connection) + cluster.stop_monitoring(configuration=connection) + cluster.stop_maintaining() + cluster.stop_benchmarker(configuration=connection) else: experiment = experiments.default(cluster=cluster, code=args.experiment) experiment.stop_sut() 
cluster.stop_monitoring() + cluster.stop_maintaining() cluster.stop_benchmarker() elif args.mode == 'dashboard': cluster = clusters.kubernetes(clusterconfig, context=args.context) cluster.connect_dashboard() elif args.mode == 'master': cluster = clusters.kubernetes(clusterconfig, context=args.context) - cluster.connect_master(experiment=args.experiment, configuration=args.connection) + cluster.connect_master(experiment=args.experiment, configuration=connection) elif args.mode == 'status': cluster = clusters.kubernetes(clusterconfig, context=args.context) app = cluster.appname @@ -167,6 +182,26 @@ #print(status) apps[configuration][component] += "{pod} ({status})".format(pod='', status=status) ############ + component = 'maintaining' + apps[configuration][component] = '' + if args.verbose: + stateful_sets = cluster.getStatefulSets(app=app, component=component, experiment=experiment, configuration=configuration) + print("Stateful Sets", stateful_sets) + services = cluster.getServices(app=app, component=component, experiment=experiment, configuration=configuration) + print("Maintaining Services", services) + pods = cluster.getPods(app=app, component=component, experiment=experiment, configuration=configuration) + if args.verbose: + print("Maintaining Pods", pods) + num_pods = {} + for pod in pods: + status = cluster.getPodStatus(pod) + #print(status) + #apps[configuration][component] += "{pod} ({status})".format(pod='', status=status) + num_pods[status] = 1 if not status in num_pods else num_pods[status]+1 + #print(num_pods) + for status in num_pods.keys(): + apps[configuration][component] += "({num} {status})".format(num=num_pods[status], status=status) + ############ component = 'monitoring' apps[configuration][component] = '' if args.verbose: diff --git a/setup.py b/setup.py index 542f233d..8c4969c7 100644 --- a/setup.py +++ b/setup.py @@ -8,7 +8,7 @@ setuptools.setup( name="bexhoma", - version="0.5.19", + version="0.5.20", author="Patrick Erdelt", author_email="perdelt@beuth-hochschule.de", description="This python tools helps managing DBMS benchmarking experiments in a Kubernetes-based HPC cluster environment. It enables users to configure hardware / software setups for easily repeating tests over varying configurations.",
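
Note on the reconnect-and-retry guard: getJobs, getJobStatus, deleteJob, deleteJobPod and getJobPods now refresh cluster access and retry after an ApiException, but skip the retry when the failure was a 404 ("not found"), since a missing object will not reappear. A minimal sketch of that shared idiom, assuming the official kubernetes Python client; the helper name and the reconnect/wait callables are illustrative and not part of the patch:

    from kubernetes.client.rest import ApiException

    def retry_unless_not_found(call, reconnect, wait, retries=3):
        # Run a Kubernetes API call; on failure refresh access and retry,
        # but give up immediately if the object does not exist (HTTP 404).
        for _ in range(retries):
            try:
                return call()
            except ApiException as e:
                if e.status == 404:
                    return None      # object is gone, nothing to retry
                reconnect()          # e.g. self.cluster_access()
                wait(2)              # give the refreshed token a moment
        return None

Unlike the recursive in-place version, this bounds the number of retries.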