diff --git a/bexhoma/configurations.py b/bexhoma/configurations.py
index 676768f3..bb48e878 100644
--- a/bexhoma/configurations.py
+++ b/bexhoma/configurations.py
@@ -90,6 +90,8 @@ def __init__(self, experiment, docker=None, configuration='', script=None, alias
self.dialect = dialect
self.num_worker = worker
self.monitoring_active = experiment.monitoring_active
+ self.maintaining_active = experiment.maintaining_active
+ self.parallelism = 1
self.storage_label = experiment.storage_label
self.experiment_done = False
self.dockerimage = dockerimage
@@ -242,6 +244,29 @@ def sut_is_running(self):
if status == "Running":
return True
return False
+ def maintaining_is_running(self):
+ app = self.appname
+ component = 'maintaining'
+ configuration = self.configuration
+ pods = self.experiment.cluster.getPods(app, component, self.experiment.code, configuration)
+ self.logger.debug("maintaining_is_running found {} pods".format(len(pods)))
+ if len(pods) > 0:
+ pod_sut = pods[0]
+ status = self.experiment.cluster.getPodStatus(pod_sut)
+ if status == "Running":
+ return True
+ return False
+ def maintaining_is_pending(self):
+ app = self.appname
+ component = 'maintaining'
+ configuration = self.configuration
+ pods = self.experiment.cluster.getPods(app, component, self.experiment.code, configuration)
+ if len(pods) > 0:
+ pod_sut = pods[0]
+ status = self.experiment.cluster.getPodStatus(pod_sut)
+ if status == "Pending":
+ return True
+ return False
def monitoring_is_running(self):
app = self.appname
component = 'monitoring'
@@ -299,6 +324,8 @@ def start_loading(self, delay=0):
forward = ['kubectl', '--context {context}'.format(context=self.experiment.cluster.context), 'port-forward', 'service/'+service] #bexhoma-service']#, '9091', '9300']#, '9400']
forward.extend(ports)
your_command = " ".join(forward)
+ # we do not test at localhost (forwarded), because there might be conflicts
+ """
self.logger.debug('configuration.start_loading({})'.format(your_command))
subprocess.Popen(your_command, stdout=subprocess.PIPE, shell=True)
# wait for port to be connected
@@ -308,6 +335,7 @@ def start_loading(self, delay=0):
# not answering
self.experiment.cluster.stopPortforwarding()
return False
+ """
#while not dbmsactive:
# self.wait(10)
# dbmsactive = self.checkDBMS(self.experiment.cluster.host, self.experiment.cluster.port)
@@ -316,8 +344,9 @@ def start_loading(self, delay=0):
if not self.loading_started:
#print("load_data")
self.load_data()
- self.experiment.cluster.stopPortforwarding()
- # store experiment
+ # we do not test at localhost (forwarded), because there might be conflicts
+ #self.experiment.cluster.stopPortforwarding()
+ # store experiment needs new format
"""
experiment = {}
experiment['delay'] = delay
@@ -344,6 +373,16 @@ def generate_component_name(self, app='', component='', experiment='', configura
else:
name = "{app}-{component}-{configuration}-{experiment}".format(app=app, component=component, configuration=configuration, experiment=experiment).lower()
return name
+ def start_maintaining(self, app='', component='maintaining', experiment='', configuration='', parallelism=1):
+ if len(app) == 0:
+ app = self.appname
+ if len(configuration) == 0:
+ configuration = self.configuration
+ if len(experiment) == 0:
+ experiment = self.code
+ job = self.create_job_maintaining(app=app, component='maintaining', experiment=experiment, configuration=configuration, parallelism=parallelism)
+ self.logger.debug("Deploy "+job)
+ self.experiment.cluster.kubectl('create -f '+job)#self.yamlfolder+deployment)
def create_monitoring(self, app='', component='monitoring', experiment='', configuration=''):
name = self.generate_component_name(app=app, component=component, experiment=experiment, configuration=configuration)
#if len(app) == 0:
@@ -448,6 +487,27 @@ def stop_monitoring(self, app='', component='monitoring', experiment='', configu
services = self.experiment.cluster.getServices(app=app, component=component, experiment=experiment, configuration=configuration)
for service in services:
self.experiment.cluster.deleteService(service)
+ def stop_maintaining(self, app='', component='maintaining', experiment='', configuration=''):
+ if len(app)==0:
+ app = self.appname
+ if len(configuration) == 0:
+ configuration = self.configuration
+ if len(experiment) == 0:
+ experiment = self.code
+ jobs = self.experiment.cluster.getJobs(app, component, experiment, configuration)
+ # status per job
+ for job in jobs:
+ success = self.experiment.cluster.getJobStatus(job)
+ print(job, success)
+ self.experiment.cluster.deleteJob(job)
+ # all pods to these jobs - automatically stopped? only if finished?
+ #self.experiment.cluster.getJobPods(app, component, experiment, configuration)
+ pods = self.experiment.cluster.getJobPods(app, component, experiment, configuration)
+ for p in pods:
+ status = self.experiment.cluster.getPodStatus(p)
+ print(p, status)
+ #if status == "Running":
+ self.experiment.cluster.deletePod(p)
def get_instance_from_resources(self):
resources = experiments.DictToObject(self.resources)
cpu = resources.requests.cpu
@@ -794,6 +854,8 @@ def stop_sut(self, app='', component='sut', experiment='', configuration=''):
self.experiment.cluster.deleteService(service)
if self.experiment.monitoring_active:
self.stop_monitoring()
+ if self.experiment.maintaining_active:
+ self.stop_maintaining()
if component == 'sut':
self.stop_sut(app=app, component='worker', experiment=experiment, configuration=configuration)
def checkGPUs(self):
@@ -1554,6 +1616,64 @@ def create_job(self, connection, app='', component='benchmarker', experiment='',
except yaml.YAMLError as exc:
print(exc)
return job_experiment
+ #def create_job_maintaining(self, app='', component='maintaining', experiment='', configuration='', client='1', parallelism=1, alias=''):
+ def create_job_maintaining(self, app='', component='maintaining', experiment='', configuration='', parallelism=1, alias=''):
+ if len(app) == 0:
+ app = self.appname
+ jobname = self.generate_component_name(app=app, component=component, experiment=experiment, configuration=configuration)
+ servicename = self.generate_component_name(app=app, component='sut', experiment=experiment, configuration=configuration)
+ #print(jobname)
+ self.logger.debug('configuration.create_job_maintaining({})'.format(jobname))
+ # determine start time
+ now = datetime.utcnow()
+ start = now + timedelta(seconds=180)
+ #start = datetime.strptime('2021-03-04 23:15:25', '%Y-%m-%d %H:%M:%S')
+ #wait = (start-now).seconds
+ now_string = now.strftime('%Y-%m-%d %H:%M:%S')
+ start_string = start.strftime('%Y-%m-%d %H:%M:%S')
+ #yamlfile = self.experiment.cluster.yamlfolder+"job-dbmsbenchmarker-"+code+".yml"
+ job_experiment = self.experiment.path+'/job-maintaining-{configuration}.yml'.format(configuration=configuration)
+ with open(self.experiment.cluster.yamlfolder+"jobtemplate-maintaining.yml") as stream:
+ try:
+ result=yaml.safe_load_all(stream)
+ result = [data for data in result]
+ #print(result)
+ except yaml.YAMLError as exc:
+ print(exc)
+ for dep in result:
+ if dep['kind'] == 'Job':
+ dep['metadata']['name'] = jobname
+ job = dep['metadata']['name']
+ dep['spec']['completions'] = parallelism
+ dep['spec']['parallelism'] = parallelism
+ dep['metadata']['labels']['app'] = app
+ dep['metadata']['labels']['component'] = component
+ dep['metadata']['labels']['configuration'] = configuration
+ dep['metadata']['labels']['experiment'] = str(experiment)
+ #dep['metadata']['labels']['client'] = str(client)
+ dep['metadata']['labels']['experimentRun'] = str(self.numExperimentsDone+1)
+ dep['spec']['template']['metadata']['labels']['app'] = app
+ dep['spec']['template']['metadata']['labels']['component'] = component
+ dep['spec']['template']['metadata']['labels']['configuration'] = configuration
+ dep['spec']['template']['metadata']['labels']['experiment'] = str(experiment)
+ #dep['spec']['template']['metadata']['labels']['client'] = str(client)
+ dep['spec']['template']['metadata']['labels']['experimentRun'] = str(self.numExperimentsDone+1)
+ envs = dep['spec']['template']['spec']['containers'][0]['env']
+ for i,e in enumerate(envs):
+ if e['name'] == 'SENSOR_DATABASE':
+ dep['spec']['template']['spec']['containers'][0]['env'][i]['value'] = 'postgresql://postgres:@{}:9091/postgres'.format(servicename)
+ if e['name'] == 'SENSOR_RATE':
+ dep['spec']['template']['spec']['containers'][0]['env'][i]['value'] = '0.1'
+ if e['name'] == 'SENSOR_NUMBER':
+ dep['spec']['template']['spec']['containers'][0]['env'][i]['value'] = '72000'
+ self.logger.debug('configuration.create_job_maintaining({})'.format(str(e)))
+ #print(e)
+ with open(job_experiment,"w+") as stream:
+ try:
+ stream.write(yaml.dump_all(result))
+ except yaml.YAMLError as exc:
+ print(exc)
+ return job_experiment
diff --git a/bexhoma/experiments.py b/bexhoma/experiments.py
index 83613ad4..ad0f68a0 100644
--- a/bexhoma/experiments.py
+++ b/bexhoma/experiments.py
@@ -99,6 +99,7 @@ def __init__(self,
self.namespace = self.cluster.namespace#.config['credentials']['k8s']['namespace']
self.configurations = []
self.storage_label = ''
+ self.maintaining_active = False
def wait(self, sec):
print("Waiting "+str(sec)+"s...", end="", flush=True)
intervals = int(sec)
@@ -430,9 +431,9 @@ def work_benchmark_list(self, intervals=30, stop=True):
#print("{} is not running".format(config.configuration))
if not config.experiment_done:
if not config.sut_is_pending():
- print("{} is not running yet - ".format(config.configuration), end="", flush=True)
+ print("{} is not running yet - ".format(config.configuration))#, end="", flush=True)
if self.cluster.max_sut is not None:
- print("{} running and {} pending pods: max is {} pods in the cluster - ".format(num_pods_running, num_pods_pending, self.cluster.max_sut), end="", flush=True)
+ print("{} running and {} pending pods: max is {} pods in the cluster - ".format(num_pods_running, num_pods_pending, self.cluster.max_sut))#, end="", flush=True)
if num_pods_running+num_pods_pending < self.cluster.max_sut:
print("it will start now")
config.start_sut()
@@ -474,11 +475,23 @@ def work_benchmark_list(self, intervals=30, stop=True):
config.loading_after_time = now + timedelta(seconds=delay)
print("{} will start loading but not before {} (that is in {} secs)".format(config.configuration, config.loading_after_time.strftime('%Y-%m-%d %H:%M:%S'), delay))
continue
+ # check if maintaining
+ if config.loading_finished:
+ if config.monitoring_active and not config.monitoring_is_running():
+ print("{} waits for monitoring".format(config.configuration))
+ continue
+ if config.maintaining_active:
+ if not config.maintaining_is_running():
+ print("{} is not maintained yet".format(config.configuration))
+ config.start_maintaining(parallelism=config.parallelism)
# benchmark if loading is done and monitoring is ready
if config.loading_finished:
if config.monitoring_active and not config.monitoring_is_running():
print("{} waits for monitoring".format(config.configuration))
continue
+ if config.maintaining_active and not config.maintaining_is_running():
+ print("{} waits for maintaining".format(config.configuration))
+ continue
app = self.cluster.appname
component = 'benchmarker'
configuration = ''
@@ -555,7 +568,7 @@ def work_benchmark_list(self, intervals=30, stop=True):
# status per job
for job in jobs:
success = self.cluster.getJobStatus(job)
- self.cluster.logger.debug('job {} has status {}'.format(job, success))
+ self.cluster.logger.debug('job {} has success status {}'.format(job, success))
#print(job, success)
if success:
self.cluster.deleteJob(job)
@@ -697,3 +710,55 @@ def set_queries_full(self):
def set_queries_profiling(self):
self.set_queryfile('queries-tpch-profiling.config')
+class iot(default):
+ def __init__(self,
+ cluster,
+ code=None,
+ queryfile = 'queries-iot.config',
+ SF = '1',
+ numExperiments = 1,
+ timeout = 7200,
+ detached=False):
+ default.__init__(self, cluster, code, numExperiments, timeout, detached)
+ self.set_experiment(volume='iot')
+ self.set_experiment(script='SF'+str(SF)+'-index')
+ self.cluster.set_configfolder('experiments/iot')
+ parameter.defaultParameters = {'SF': str(SF)}
+ self.set_queryfile(queryfile)
+ self.set_workload(
+ name = 'IoT Queries SF='+str(SF),
+ info = 'This experiment performs some IoT inspired queries.'
+ )
+ self.storage_label = 'tpch-'+str(SF)
+ self.maintaining_active = True
+ def set_queries_full(self):
+ self.set_queryfile('queries-iot.config')
+ def set_queries_profiling(self):
+ self.set_queryfile('queries-iot-profiling.config')
+ def set_querymanagement_maintaining(self,
+ numRun=128,
+ delay=5,
+ datatransfer=False):
+ self.set_querymanagement(
+ numWarmup = 0,
+ numCooldown = 0,
+ numRun = numRun,
+ delay = delay,
+ timer = {
+ 'connection':
+ {
+ 'active': True,
+ 'delay': 0
+ },
+ #'datatransfer':
+ #{
+ # 'active': datatransfer,
+ # 'sorted': True,
+ # 'compare': 'result',
+ # 'store': [],
+ # 'precision': 0,
+ #}
+ })
+ #self.monitoring_active = True
+ self.maintaining_active = True
+
diff --git a/bexhoma/masterK8s.py b/bexhoma/masterK8s.py
index 2be4cb9d..c9bf3326 100644
--- a/bexhoma/masterK8s.py
+++ b/bexhoma/masterK8s.py
@@ -1430,7 +1430,9 @@ def getJobs(self, app='', component='', experiment='', configuration='', client=
print("Create new access token")
self.cluster_access()
self.wait(2)
- return self.getJobs(app=app, component=component, experiment=experiment, configuration=configuration, client=client)
+ # try again, unless the call failed with "not found" (HTTP 404)
+ if not e.status == 404:
+ return self.getJobs(app=app, component=component, experiment=experiment, configuration=configuration, client=client)
def getJobStatus(self, jobname='', app='', component='', experiment='', configuration='', client=''):
#print("getJobStatus")
label = ''
@@ -1461,13 +1463,16 @@ def getJobStatus(self, jobname='', app='', component='', experiment='', configur
print("Create new access token")
self.cluster_access()
self.wait(2)
- return self.getJobStatus(jobname=jobname, app=app, component=component, experiment=experiment, configuration=configuration, client=client)
+ # try again, unless the call failed with "not found" (HTTP 404)
+ if not e.status == 404:
+ return self.getJobStatus(jobname=jobname, app=app, component=component, experiment=experiment, configuration=configuration, client=client)
def deleteJob(self, jobname='', app='', component='', experiment='', configuration='', client=''):
self.logger.debug('testdesign.deleteJob()')
try:
if len(jobname) == 0:
jobs = self.getJobs(app=app, component=component, experiment=experiment, configuration=configuration, client=client)
jobname = jobs[0]
+ self.logger.debug('testdesign.deleteJob({})'.format(jobname))
api_response = self.v1batches.delete_namespaced_job(jobname, self.namespace)#, label_selector='app='+cluster.appname)
#pprint(api_response)
#pprint(api_response.status.succeeded)
@@ -1476,7 +1481,9 @@ def deleteJob(self, jobname='', app='', component='', experiment='', configurati
print("Exception when calling BatchV1Api->delete_namespaced_job: %s\n" % e)
self.cluster_access()
self.wait(2)
- return self.deleteJob(jobname=jobname, app=app, component=component, experiment=experiment, configuration=configuration, client=client)
+ # try again, unless the call failed with "not found" (HTTP 404)
+ if not e.status == 404:
+ return self.deleteJob(jobname=jobname, app=app, component=component, experiment=experiment, configuration=configuration, client=client)
def deleteJobPod(self, jobname='', app='', component='', experiment='', configuration='', client=''):
self.logger.debug('testdesign.deleteJobPod()')
body = kubernetes.client.V1DeleteOptions()
@@ -1488,13 +1495,16 @@ def deleteJobPod(self, jobname='', app='', component='', experiment='', configur
self.deleteJobPod(jobname=pod, app=app, component=component, experiment=experiment, configuration=configuration, client=client)
return
#jobname = pods[0]
+ self.logger.debug('testdesign.deleteJobPod({})'.format(jobname))
api_response = self.v1core.delete_namespaced_pod(jobname, self.namespace, body=body)
#pprint(api_response)
except ApiException as e:
print("Exception when calling CoreV1Api->delete_namespaced_pod: %s\n" % e)
self.cluster_access()
self.wait(2)
- return self.deleteJobPod(jobname=jobname, app=app, component=component, experiment=experiment, configuration=configuration, client=client)
+ # try again, unless the call failed with "not found" (HTTP 404)
+ if not e.status == 404:
+ return self.deleteJobPod(jobname=jobname, app=app, component=component, experiment=experiment, configuration=configuration, client=client)
def getJobPods(self, app='', component='', experiment='', configuration='', client=''):
#print("getJobPods")
label = ''
@@ -1509,7 +1519,7 @@ def getJobPods(self, app='', component='', experiment='', configuration='', clie
label += ',configuration='+configuration
if len(client)>0:
label += ',client='+client
- self.logger.debug('getJobPods'+label)
+ self.logger.debug('getJobPods '+label)
try:
api_response = self.v1core.list_namespaced_pod(self.namespace, label_selector=label)#'app='+appname)
#pprint(api_response)
@@ -1523,7 +1533,9 @@ def getJobPods(self, app='', component='', experiment='', configuration='', clie
print("Create new access token")
self.cluster_access()
self.wait(2)
- return self.getJobPods(app=app, component=component, experiment=experiment, configuration=configuration, client=client)
+ # try again, unless the call failed with "not found" (HTTP 404)
+ if not e.status == 404:
+ return self.getJobPods(app=app, component=component, experiment=experiment, configuration=configuration, client=client)
def create_job(self, connection, app='', component='benchmarker', experiment='', configuration='', client='1'):
if len(app) == 0:
app = self.appname
@@ -1654,6 +1666,24 @@ def stop_dashboard(self, app='', component='dashboard'):
services = self.getServices(app=app, component=component)
for service in services:
self.deleteService(service)
+ def stop_maintaining(self, experiment='', configuration=''):
+ # all jobs of configuration - benchmarker
+ app = self.appname
+ component = 'maintaining'
+ jobs = self.getJobs(app, component, experiment, configuration)
+ # status per job
+ for job in jobs:
+ success = self.getJobStatus(job)
+ print(job, success)
+ self.deleteJob(job)
+ # all pods to these jobs - automatically stopped?
+ #self.getJobPods(app, component, experiment, configuration)
+ pods = self.getJobPods(app, component, experiment, configuration)
+ for p in pods:
+ status = self.getPodStatus(p)
+ print(p, status)
+ #if status == "Running":
+ self.deletePod(p)
def stop_monitoring(self, app='', component='monitoring', experiment='', configuration=''):
deployments = self.getDeployments(app=app, component=component, experiment=experiment, configuration=configuration)
for deployment in deployments:
diff --git a/bexhoma/scripts/experimentsmanager.py b/bexhoma/scripts/experimentsmanager.py
index 66f68c37..a648b036 100644
--- a/bexhoma/scripts/experimentsmanager.py
+++ b/bexhoma/scripts/experimentsmanager.py
@@ -37,12 +37,22 @@ def manage():
# argparse
parser = argparse.ArgumentParser(description=description)
parser.add_argument('mode', help='manage experiments: stop, get status, connect to dbms or connect to dashboard', choices=['stop','status','dashboard', 'master'])
+ parser.add_argument('-db', '--debug', help='dump debug information', action='store_true')
parser.add_argument('-e', '--experiment', help='code of experiment', default=None)
parser.add_argument('-c', '--connection', help='name of DBMS', default=None)
parser.add_argument('-v', '--verbose', help='gives more details about Kubernetes objects', action='store_true')
parser.add_argument('-cx', '--context', help='context of Kubernetes (for a multi cluster environment), default is current context', default=None)
clusterconfig = 'cluster.config'
+ # evaluate args
args = parser.parse_args()
+ if args.debug:
+ logging.basicConfig(level=logging.DEBUG)
+ #logging.basicConfig(level=logging.DEBUG)
+ if args.debug:
+ logger_bexhoma = logging.getLogger('bexhoma')
+ logger_bexhoma.setLevel(logging.DEBUG)
+ logger_loader = logging.getLogger('load_data_asynch')
+ logger_loader.setLevel(logging.DEBUG)
connection = args.connection
if args.mode == 'stop':
cluster = clusters.kubernetes(clusterconfig, context=args.context)
@@ -52,11 +62,13 @@ def manage():
connection = ''
cluster.stop_sut(configuration=connection)
cluster.stop_monitoring(configuration=connection)
+ cluster.stop_maintaining()
cluster.stop_benchmarker(configuration=connection)
else:
experiment = experiments.default(cluster=cluster, code=args.experiment)
experiment.stop_sut()
cluster.stop_monitoring()
+ cluster.stop_maintaining()
cluster.stop_benchmarker()
elif args.mode == 'dashboard':
cluster = clusters.kubernetes(clusterconfig, context=args.context)
@@ -170,6 +182,26 @@ def manage():
#print(status)
apps[configuration][component] += "{pod} ({status})".format(pod='', status=status)
############
+ component = 'maintaining'
+ apps[configuration][component] = ''
+ if args.verbose:
+ stateful_sets = cluster.getStatefulSets(app=app, component=component, experiment=experiment, configuration=configuration)
+ print("Stateful Sets", stateful_sets)
+ services = cluster.getServices(app=app, component=component, experiment=experiment, configuration=configuration)
+ print("Maintaining Services", services)
+ pods = cluster.getPods(app=app, component=component, experiment=experiment, configuration=configuration)
+ if args.verbose:
+ print("Maintaining Pods", pods)
+ num_pods = {}
+ for pod in pods:
+ status = cluster.getPodStatus(pod)
+ #print(status)
+ #apps[configuration][component] += "{pod} ({status})".format(pod='', status=status)
+ num_pods[status] = 1 if not status in num_pods else num_pods[status]+1
+ #print(num_pods)
+ for status in num_pods.keys():
+ apps[configuration][component] += "({num} {status})".format(num=num_pods[status], status=status)
+ ############
component = 'monitoring'
apps[configuration][component] = ''
if args.verbose:
diff --git a/cluster.py b/cluster.py
index fe76291e..43c91c50 100644
--- a/cluster.py
+++ b/cluster.py
@@ -1,20 +1,20 @@
"""
- This script contains some code snippets for testing the detached mode in Kubernetes
+ This script contains some code snippets for testing the detached mode in Kubernetes
- Copyright (C) 2021 Patrick Erdelt
+ Copyright (C) 2021 Patrick Erdelt
- This program is free software: you can redistribute it and/or modify
- it under the terms of the GNU Affero General Public License as
- published by the Free Software Foundation, either version 3 of the
- License, or (at your option) any later version.
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU Affero General Public License as
+ published by the Free Software Foundation, either version 3 of the
+ License, or (at your option) any later version.
- This program is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU Affero General Public License for more details.
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Affero General Public License for more details.
- You should have received a copy of the GNU Affero General Public License
- along with this program. If not, see .
+ You should have received a copy of the GNU Affero General Public License
+ along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
from bexhoma import *
from dbmsbenchmarker import *
@@ -37,30 +37,45 @@
# argparse
parser = argparse.ArgumentParser(description=description)
parser.add_argument('mode', help='profile the import or run the TPC-H queries', choices=['stop','status','dashboard', 'master'])
+ parser.add_argument('-db', '--debug', help='dump debug information', action='store_true')
parser.add_argument('-e', '--experiment', help='code of experiment', default=None)
parser.add_argument('-c', '--connection', help='name of DBMS', default=None)
parser.add_argument('-v', '--verbose', help='gives more details about Kubernetes objects', action='store_true')
parser.add_argument('-cx', '--context', help='context of Kubernetes (for a multi cluster environment), default is current context', default=None)
clusterconfig = 'cluster.config'
+ # evaluate args
args = parser.parse_args()
+ if args.debug:
+ logging.basicConfig(level=logging.DEBUG)
+ #logging.basicConfig(level=logging.DEBUG)
+ if args.debug:
+ logger_bexhoma = logging.getLogger('bexhoma')
+ logger_bexhoma.setLevel(logging.DEBUG)
+ logger_loader = logging.getLogger('load_data_asynch')
+ logger_loader.setLevel(logging.DEBUG)
+ connection = args.connection
if args.mode == 'stop':
cluster = clusters.kubernetes(clusterconfig, context=args.context)
if args.experiment is None:
experiment = experiments.default(cluster=cluster, code=cluster.code)
- cluster.stop_sut()
- cluster.stop_monitoring()
- cluster.stop_benchmarker()
+ if connection is None:
+ connection = ''
+ cluster.stop_sut(configuration=connection)
+ cluster.stop_monitoring(configuration=connection)
+ cluster.stop_maintaining()
+ cluster.stop_benchmarker(configuration=connection)
else:
experiment = experiments.default(cluster=cluster, code=args.experiment)
experiment.stop_sut()
cluster.stop_monitoring()
+ cluster.stop_maintaining()
cluster.stop_benchmarker()
elif args.mode == 'dashboard':
cluster = clusters.kubernetes(clusterconfig, context=args.context)
cluster.connect_dashboard()
elif args.mode == 'master':
cluster = clusters.kubernetes(clusterconfig, context=args.context)
- cluster.connect_master(experiment=args.experiment, configuration=args.connection)
+ cluster.connect_master(experiment=args.experiment, configuration=connection)
elif args.mode == 'status':
cluster = clusters.kubernetes(clusterconfig, context=args.context)
app = cluster.appname
@@ -167,6 +182,26 @@
#print(status)
apps[configuration][component] += "{pod} ({status})".format(pod='', status=status)
############
+ component = 'maintaining'
+ apps[configuration][component] = ''
+ if args.verbose:
+ stateful_sets = cluster.getStatefulSets(app=app, component=component, experiment=experiment, configuration=configuration)
+ print("Stateful Sets", stateful_sets)
+ services = cluster.getServices(app=app, component=component, experiment=experiment, configuration=configuration)
+ print("Maintaining Services", services)
+ pods = cluster.getPods(app=app, component=component, experiment=experiment, configuration=configuration)
+ if args.verbose:
+ print("Maintaining Pods", pods)
+ num_pods = {}
+ for pod in pods:
+ status = cluster.getPodStatus(pod)
+ #print(status)
+ #apps[configuration][component] += "{pod} ({status})".format(pod='', status=status)
+ num_pods[status] = 1 if not status in num_pods else num_pods[status]+1
+ #print(num_pods)
+ for status in num_pods.keys():
+ apps[configuration][component] += "({num} {status})".format(num=num_pods[status], status=status)
+ ############
component = 'monitoring'
apps[configuration][component] = ''
if args.verbose:
diff --git a/setup.py b/setup.py
index 542f233d..8c4969c7 100644
--- a/setup.py
+++ b/setup.py
@@ -8,7 +8,7 @@
setuptools.setup(
name="bexhoma",
- version="0.5.19",
+ version="0.5.20",
author="Patrick Erdelt",
author_email="perdelt@beuth-hochschule.de",
description="This python tools helps managing DBMS benchmarking experiments in a Kubernetes-based HPC cluster environment. It enables users to configure hardware / software setups for easily repeating tests over varying configurations.",