From ec6bd6e46f8308102c9bcef4afda5534ab9d3f11 Mon Sep 17 00:00:00 2001
From: Patrick Erdelt
Date: Tue, 18 Oct 2022 14:18:00 +0200
Subject: [PATCH] V0.6.0 Introduced Cloud-Orchestration and Scalable Loading and Maintaining Components (#102)

* Prepare next release
* Masterscript: maintaining does not change timer settings of benchmarker
* Masterscript: reconnect and try again, if not failed due to "not found"
* Masterscript: improved output about workflow
* Masterscript: aws example nodegroup scale
* Masterscript: aws example nodegroup get size
* Masterscript: aws example nodegroup wait for size
* Masterscript: aws example nodegroup show size
* Masterscript: aws example nodegroup show and check size
* Masterscript: aws example nodegroup name and type
* Masterscript: aws example dict of nodegroups
* Masterscript: aws example nodegroup name necessary for scaling
* Masterscript: aws example nodegroup name and type
* Masterscript: maintaining duration default 4h
* Masterscript: maintaining parameters and nodeSelector
* Masterscript: nodeSelector for sut, monitoring and benchmarker
* Masterscript: maintaining is accepted as running also when num_maintaining=0
* Masterscript: request resources from command line
* Masterscript: prepare max_sut per cluster and per experiment
* Masterscript: catch json exception in getNode()
* Masterscript: maintaining example TSBS as experiment setup
* Masterscript: jobtemplate_maintaining per experiment
* Masterscript: initContainers in maintaining
* Masterscript: maintaining also watches succeeded pods
* Masterscript: maintaining also respects (long) pending pods
* Masterscript: loading pods controlled by redis queue
* Masterscript: loading pods controlled by redis queue, including params
* Masterscript: initContainers parameters set correctly
* Masterscript: Also stop loading jobs and pods
* Masterscript: Number of parallel loaders
* Masterscript: Empty schema before loading pods
* Masterscript: Also stop loading jobs and pods when putting sut down
* Masterscript: Loading only finished when outside and inside cluster are done
* Masterscript: Also stop loading jobs and pods - in all configurations
* Masterscript: Also stop loading jobs and pods - in all configurations (config, experiment, cluster)
* Masterscript: Check status of parallel loading
* Masterscript: Job status explained
* Masterscript: Job status returns true iff all pods are completed
* Masterscript: Job status more output
* Masterscript: Job status returns true iff all pods are completed
* Masterscript: Job status returns true iff all pods are completed, then delete all loading pods
* Masterscript: Job status returns true iff all pods are completed, copy loading pod logs
* Masterscript: Copy logs of all containers of loading pods
* Masterscript: Mark SUT as loaded as soon as all pods have status success - include this as timeLoading
* Masterscript: Use maintaining structure for setting loading parameters
* Masterscript: Mark SUT as loaded
* Masterscript: Mark SUT as loaded, read old labels at first
* Masterscript: Mark SUT as loaded, read old labels at first and convert to float
* Masterscript: Mark SUT as loaded, read old labels at first and convert to float, debug output
* Masterscript: Mark SUT as loaded, read old labels at first and convert to int
* Masterscript: Mark SUT as loaded, read old labels at first and convert to int, cleaned
---
 bexhoma/clusters.py                   |  75 +++++
 bexhoma/configurations.py             | 425 ++++++++++++++++++++++----
 bexhoma/experiments.py                | 133 ++++++--
 bexhoma/masterK8s.py                  |  77 ++++-
 bexhoma/scripts/experimentsmanager.py |  28 +-
 cluster.py                            |  28 +-
 setup.py                              |   2 +-
 7 files changed, 676 insertions(+), 92 deletions(-)

diff --git a/bexhoma/clusters.py b/bexhoma/clusters.py
index 0c4a90ca..e35c8942 100644
--- a/bexhoma/clusters.py
+++ b/bexhoma/clusters.py
@@ -65,3 +65,78 @@ def store_pod_log(self, pod_name, container=''):
+
+class aws(kubernetes):
+    def __init__(self, clusterconfig='cluster.config', configfolder='experiments/', yamlfolder='k8s/', context=None, code=None, instance=None, volume=None, docker=None, script=None, queryfile=None):
+        self.code = code
+        kubernetes.__init__(self, clusterconfig=clusterconfig, configfolder=configfolder, context=context, yamlfolder=yamlfolder, code=self.code, instance=instance, volume=volume, docker=docker, script=script, queryfile=queryfile)
+        self.cluster = self.contextdata['cluster']
+    def eksctl(self, command):
+        #fullcommand = 'eksctl --context {context} {command}'.format(context=self.context, command=command)
+        fullcommand = 'eksctl {command}'.format(command=command)
+        self.logger.debug('aws.eksctl({})'.format(fullcommand))
+        #print(fullcommand)
+        return os.popen(fullcommand).read()# os.system(fullcommand)
+    def getNodes(self, app='', nodegroup_type='', nodegroup_name=''):
+        self.logger.debug('aws.getNodes()')
+        label = ''
+        if len(app)==0:
+            app = self.appname
+        label += 'app='+app
+        if len(nodegroup_type)>0:
+            label += ',type='+nodegroup_type
+        if len(nodegroup_name)>0:
+            label += ',alpha.eksctl.io/nodegroup-name='+nodegroup_name
+        try:
+            api_response = self.v1core.list_node(label_selector=label)
+            #pprint(api_response)
+            if len(api_response.items) > 0:
+                return api_response.items
+            else:
+                return []
+        except ApiException as e:
+            print("Exception when calling CoreV1Api->list_node for getNodes: %s\n" % e)
+            print("Create new access token")
+            self.cluster_access()
+            self.wait(2)
+            return self.getNodes(app=app, nodegroup_type=nodegroup_type, nodegroup_name=nodegroup_name)
+    def scale_nodegroups(self, nodegroup_names, size=None):
+        print("aws.scale_nodegroups({nodegroup_names}, {size})".format(nodegroup_names=nodegroup_names, size=size))
+        for nodegroup_name, size_default in nodegroup_names.items():
+            if size is not None:
+                size_default = size
+            self.scale_nodegroup(nodegroup_name, size_default)
+    def scale_nodegroup(self, nodegroup_name, size):
+        print("aws.scale_nodegroup({nodegroup_name}, {size})".format(nodegroup_name=nodegroup_name, size=size))
+        if not self.check_nodegroup(nodegroup_name=nodegroup_name, num_nodes_aux_planned=size):
+            #fullcommand = "eksctl scale nodegroup --cluster=Test-2 --nodes=0 --nodes-min=0 --name=Kleine_Gruppe"
+            command = "scale nodegroup --cluster={cluster} --nodes={size} --name={nodegroup_name}".format(cluster=self.cluster, size=size, nodegroup_name=nodegroup_name)
+            return self.eksctl(command)
+        #if not self.check_nodegroup(nodegroup_type, num_nodes_aux_planned):
+        #    command = "scale nodegroup --cluster={cluster} --nodes={size} --name={nodegroup}".format(cluster=self.cluster, size=size, nodegroup=nodegroup)
+        #    return self.eksctl(command)
+        #else:
+        #    return ""
+    def get_nodegroup_size(self, nodegroup_type='', nodegroup_name=''):
+        resp = self.getNodes(nodegroup_type=nodegroup_type, nodegroup_name=nodegroup_name)
+        num_nodes_aux_actual = len(resp)
+        self.logger.debug('aws.get_nodegroup_size({},{}) = {}'.format(nodegroup_type, nodegroup_name, num_nodes_aux_actual))
+        return num_nodes_aux_actual
+    def check_nodegroup(self, nodegroup_type='', nodegroup_name='', num_nodes_aux_planned=0):
num_nodes_aux_actual = self.get_nodegroup_size(nodegroup_type=nodegroup_type, nodegroup_name=nodegroup_name) + self.logger.debug('aws.check_nodegroup({}, {}, {}) = {}'.format(nodegroup_type, nodegroup_name, num_nodes_aux_planned, num_nodes_aux_actual)) + return num_nodes_aux_planned == num_nodes_aux_actual + def wait_for_nodegroups(self, nodegroup_names, size=None): + print("aws.wait_for_nodegroups({nodegroup_names})".format(nodegroup_names=nodegroup_names)) + for nodegroup_name, size_default in nodegroup_names.items(): + if size is not None: + size_default = size + self.wait_for_nodegroup(nodegroup_name=nodegroup_name, num_nodes_aux_planned=size_default) + def wait_for_nodegroup(self, nodegroup_type='', nodegroup_name='', num_nodes_aux_planned=0): + while (not self.check_nodegroup(nodegroup_type=nodegroup_type, nodegroup_name=nodegroup_name, num_nodes_aux_planned=num_nodes_aux_planned)): + self.wait(30) + print("Nodegroup {},{} ready".format(nodegroup_type, nodegroup_name)) + return True + + + diff --git a/bexhoma/configurations.py b/bexhoma/configurations.py index bb48e878..37df5bc9 100644 --- a/bexhoma/configurations.py +++ b/bexhoma/configurations.py @@ -86,11 +86,16 @@ def __init__(self, experiment, docker=None, configuration='', script=None, alias self.set_eval_parameters(**self.experiment.eval_parameters) self.set_connectionmanagement(**self.experiment.connectionmanagement) self.set_storage(**self.experiment.storage) + self.set_nodes(**self.experiment.nodes) + self.set_maintaining_parameters(**self.experiment.maintaining_parameters) self.experiment.add_configuration(self) self.dialect = dialect self.num_worker = worker + self.num_loading = 0 + self.num_maintaining = 0 self.monitoring_active = experiment.monitoring_active self.maintaining_active = experiment.maintaining_active + self.loading_active = experiment.loading_active self.parallelism = 1 self.storage_label = experiment.storage_label self.experiment_done = False @@ -147,6 +152,10 @@ def set_ddl_parameters(self, **kwargs): self.ddl_parameters = kwargs def set_eval_parameters(self, **kwargs): self.eval_parameters = kwargs + def set_maintaining_parameters(self, **kwargs): + self.maintaining_parameters = kwargs + def set_nodes(self, **kwargs): + self.nodes = kwargs def set_experiment(self, instance=None, volume=None, docker=None, script=None): """ Read experiment details from cluster config""" #self.bChangeInstance = True @@ -248,24 +257,26 @@ def maintaining_is_running(self): app = self.appname component = 'maintaining' configuration = self.configuration - pods = self.experiment.cluster.getPods(app, component, self.experiment.code, configuration) - self.logger.debug("maintaining_is_running found {} pods".format(len(pods))) - if len(pods) > 0: - pod_sut = pods[0] - status = self.experiment.cluster.getPodStatus(pod_sut) - if status == "Running": - return True - return False + pods_running = self.experiment.cluster.getPods(app, component, self.experiment.code, configuration, status="Running") + pods_succeeded = self.experiment.cluster.getPods(app, component, self.experiment.code, configuration, status="Succeeded") + self.logger.debug("maintaining_is_running found {} running and {} succeeded pods".format(len(pods_running), len(pods_succeeded))) + return len(pods_running) + len(pods_succeeded) == self.num_maintaining + #if len(pods) > 0: + # pod_sut = pods[0] + # status = self.experiment.cluster.getPodStatus(pod_sut) + # if status == "Running": + # return True + #return False def maintaining_is_pending(self): app = self.appname 
component = 'maintaining' configuration = self.configuration - pods = self.experiment.cluster.getPods(app, component, self.experiment.code, configuration) + pods = self.experiment.cluster.getPods(app, component, self.experiment.code, configuration, status="Pending") if len(pods) > 0: pod_sut = pods[0] - status = self.experiment.cluster.getPodStatus(pod_sut) - if status == "Pending": - return True + #status = self.experiment.cluster.getPodStatus(pod_sut) + #if status == "Pending": + return True return False def monitoring_is_running(self): app = self.appname @@ -300,6 +311,16 @@ def sut_is_pending(self): if status == "Pending": return True return False + def start_loading_pod(self, app='', component='loading', experiment='', configuration='', parallelism=1): + if len(app) == 0: + app = self.appname + if len(configuration) == 0: + configuration = self.configuration + if len(experiment) == 0: + experiment = self.code + job = self.create_job_loading(app=app, component='loading', experiment=experiment, configuration=configuration, parallelism=parallelism) + self.logger.debug("Deploy "+job) + self.experiment.cluster.kubectl('create -f '+job)#self.yamlfolder+deployment) def start_loading(self, delay=0): """ Per config: Load Data """ app = self.appname @@ -465,6 +486,13 @@ def start_monitoring(self, app='', component='monitoring', experiment='', config dep['spec']['template']['spec']['containers'][0]['env'][i]['value'] = prometheus_config #print(e) self.logger.debug('configuration.start_monitoring({})'.format(str(e))) + # set nodeSelector + if 'monitoring' in self.nodes: + if not 'nodeSelector' in dep['spec']['template']['spec']: + dep['spec']['template']['spec']['nodeSelector'] = dict() + if dep['spec']['template']['spec']['nodeSelector'] is None: + dep['spec']['template']['spec']['nodeSelector'] = dict() + dep['spec']['template']['spec']['nodeSelector']['type'] = self.nodes['monitoring'] except yaml.YAMLError as exc: print(exc) with open(deployment_experiment,"w+") as stream: @@ -508,6 +536,27 @@ def stop_maintaining(self, app='', component='maintaining', experiment='', confi print(p, status) #if status == "Running": self.experiment.cluster.deletePod(p) + def stop_loading(self, app='', component='loading', experiment='', configuration=''): + if len(app)==0: + app = self.appname + if len(configuration) == 0: + configuration = self.configuration + if len(experiment) == 0: + experiment = self.code + jobs = self.experiment.cluster.getJobs(app, component, experiment, configuration) + # status per job + for job in jobs: + success = self.experiment.cluster.getJobStatus(job) + print(job, success) + self.experiment.cluster.deleteJob(job) + # all pods to these jobs - automatically stopped? only if finished? + #self.experiment.cluster.getJobPods(app, component, experiment, configuration) + pods = self.experiment.cluster.getJobPods(app, component, experiment, configuration) + for p in pods: + status = self.experiment.cluster.getPodStatus(p) + print(p, status) + #if status == "Running": + self.experiment.cluster.deletePod(p) def get_instance_from_resources(self): resources = experiments.DictToObject(self.resources) cpu = resources.requests.cpu @@ -743,10 +792,10 @@ def start_sut(self, app='', component='sut', experiment='', configuration=''): node_cpu = '' node_gpu = node # should be overwritten by resources dict? 
- #if 'requests' in self.resources and 'cpu' in self.resources['requests']: - # req_cpu = self.resources['requests']['cpu'] - #if 'requests' in self.resources and 'memory' in self.resources['requests']: - # req_mem = self.resources['requests']['memory'] + if 'requests' in self.resources and 'cpu' in self.resources['requests']: + req_cpu = self.resources['requests']['cpu'] + if 'requests' in self.resources and 'memory' in self.resources['requests']: + req_mem = self.resources['requests']['memory'] if 'limits' in self.resources and 'cpu' in self.resources['limits']: limit_cpu = self.resources['limits']['cpu'] if 'limits' in self.resources and 'memory' in self.resources['limits']: @@ -812,6 +861,13 @@ def start_sut(self, app='', component='sut', experiment='', configuration=''): else: dep['spec']['template']['spec']['nodeSelector'][nodeSelector] = value self.resources['nodeSelector'][nodeSelector] = value + # set nodeSelector + if 'sut' in self.nodes: + if not 'nodeSelector' in dep['spec']['template']['spec']: + dep['spec']['template']['spec']['nodeSelector'] = dict() + if dep['spec']['template']['spec']['nodeSelector'] is None: + dep['spec']['template']['spec']['nodeSelector'] = dict() + dep['spec']['template']['spec']['nodeSelector']['type'] = self.nodes['sut'] #print('nodeSelector', dep['spec']['template']['spec']['nodeSelector']) if dep['kind'] == 'Service': service = dep['metadata']['name'] @@ -856,6 +912,8 @@ def stop_sut(self, app='', component='sut', experiment='', configuration=''): self.stop_monitoring() if self.experiment.maintaining_active: self.stop_maintaining() + if self.experiment.loading_active: + self.stop_loading() if component == 'sut': self.stop_sut(app=app, component='worker', experiment=experiment, configuration=configuration) def checkGPUs(self): @@ -927,12 +985,15 @@ def getNode(self): #fullcommand = 'kubectl get pods/'+self.pod_sut+' -o=json' result = self.experiment.cluster.kubectl('get pods/'+self.pod_sut+' -o=json')#self.yamlfolder+deployment) #result = os.popen(fullcommand).read() - datastore = json.loads(result) - #print(datastore) - if self.appname == datastore['metadata']['labels']['app']: - if self.pod_sut == datastore['metadata']['name']: - node = datastore['spec']['nodeName'] - return node + try: + datastore = json.loads(result) + #print(datastore) + if self.appname == datastore['metadata']['labels']['app']: + if self.pod_sut == datastore['metadata']['name']: + node = datastore['spec']['nodeName'] + return node + except Exception as e: + return "" return "" def getGPUs(self): self.logger.debug('configuration.getGPUs()') @@ -1449,28 +1510,110 @@ def check_sut(self): else: return False def check_load_data(self): - # check if loading is done - pod_labels = self.experiment.cluster.getPodsLabels(app=self.appname, component='sut', experiment=self.experiment.code, configuration=self.configuration) - #print(pod_labels) - if len(pod_labels) > 0: - pod = next(iter(pod_labels.keys())) - if 'loaded' in pod_labels[pod]: - self.loading_started = True - if pod_labels[pod]['loaded'] == 'True': - self.loading_finished = True + loading_pods_active = True + # check if asynch loading inside cluster is done + if self.loading_active: + # success of job + app = self.experiment.cluster.appname + component = 'loading' + experiment = self.code + configuration = self.configuration + success = self.experiment.cluster.getJobStatus(app=app, component=component, experiment=experiment, configuration=configuration) + jobs = self.experiment.cluster.getJobs(app, component, self.code, 
configuration) + # status per job + for job in jobs: + success = self.experiment.cluster.getJobStatus(job) + self.experiment.cluster.logger.debug('job {} has success status {}'.format(job, success)) + #print(job, success) + if success: + self.experiment.cluster.logger.debug('job {} will be suspended and parallel loading will be considered finished'.format(job, success)) + # get labels (start) of sut + pod_labels = self.experiment.cluster.getPodsLabels(app=app, component='sut', experiment=experiment, configuration=configuration) + #print(pod_labels) + if len(pod_labels) > 0: + pod = next(iter(pod_labels.keys())) + if 'timeLoadingStart' in pod_labels[pod]: + self.timeLoadingStart = float(pod_labels[pod]['timeLoadingStart']) + if 'timeLoadingEnd' in pod_labels[pod]: + self.timeLoadingEnd = float(pod_labels[pod]['timeLoadingEnd']) + if 'timeLoading' in pod_labels[pod]: + self.timeLoading = float(pod_labels[pod]['timeLoading']) + # mark pod with new end time and duration + pods_sut = self.experiment.cluster.getPods(app=app, component='sut', experiment=experiment, configuration=configuration) + if len(pods_sut) > 0: + pod_sut = pods_sut[0] + #self.timeLoadingEnd = default_timer() + #self.timeLoading = float(self.timeLoadingEnd) - float(self.timeLoadingStart) + #self.experiment.cluster.logger.debug("LOADING LABELS") + #self.experiment.cluster.logger.debug(self.timeLoading) + #self.experiment.cluster.logger.debug(float(self.timeLoadingEnd)) + #self.experiment.cluster.logger.debug(float(self.timeLoadingStart)) + #self.timeLoading = float(self.timeLoading) + float(timeLoading) + now = datetime.utcnow() + now_string = now.strftime('%Y-%m-%d %H:%M:%S') + time_now = str(datetime.now()) + time_now_int = int(datetime.timestamp(datetime.strptime(time_now,'%Y-%m-%d %H:%M:%S.%f'))) + self.timeLoadingEnd = int(time_now_int) + self.timeLoading = int(self.timeLoadingEnd) - int(self.timeLoadingStart) + self.timeLoading + self.experiment.cluster.logger.debug("LOADING LABELS") + self.experiment.cluster.logger.debug(self.timeLoadingStart) + self.experiment.cluster.logger.debug(self.timeLoadingEnd) + self.experiment.cluster.logger.debug(self.timeLoading) + fullcommand = 'label pods '+pod_sut+' --overwrite loaded=True timeLoadingEnd="{}" timeLoading={}'.format(time_now_int, self.timeLoading) + #print(fullcommand) + self.experiment.cluster.kubectl(fullcommand) + # TODO: Also mark volume + # delete job and all its pods + self.experiment.cluster.deleteJob(job) + pods = self.experiment.cluster.getJobPods(app=app, component=component, experiment=experiment, configuration=configuration) + for pod in pods: + status = self.experiment.cluster.getPodStatus(pod) + print(pod, status) + #if status == "Running": + # TODO: Find names of containers dynamically + container = 'datagenerator' + stdout = self.experiment.cluster.pod_log(pod=pod, container=container) + #stdin, stdout, stderr = self.pod_log(client_pod_name) + filename_log = self.experiment.cluster.config['benchmarker']['resultfolder'].replace("\\", "/").replace("C:", "")+"/"+str(self.code)+'/'+pod+'.'+container+'.log' + f = open(filename_log, "w") + f.write(stdout) + f.close() + # + container = 'sensor' + stdout = self.experiment.cluster.pod_log(pod=pod, container='sensor') + #stdin, stdout, stderr = self.pod_log(client_pod_name) + filename_log = self.experiment.cluster.config['benchmarker']['resultfolder'].replace("\\", "/").replace("C:", "")+"/"+str(self.code)+'/'+pod+'.'+container+'.log' + f = open(filename_log, "w") + f.write(stdout) + f.close() + 
self.experiment.cluster.deletePod(pod) + loading_pods_active = False + else: + loading_pods_active = False + # check if asynch loading outside cluster is done + # only if inside cluster if done + if not loading_pods_active: + pod_labels = self.experiment.cluster.getPodsLabels(app=self.appname, component='sut', experiment=self.experiment.code, configuration=self.configuration) + #print(pod_labels) + if len(pod_labels) > 0: + pod = next(iter(pod_labels.keys())) + if 'loaded' in pod_labels[pod]: + self.loading_started = True + if pod_labels[pod]['loaded'] == 'True': + self.loading_finished = True + else: + self.loading_finished = False else: - self.loading_finished = False + self.loading_started = False + if 'timeLoadingStart' in pod_labels[pod]: + self.timeLoadingStart = pod_labels[pod]['timeLoadingStart'] + if 'timeLoadingEnd' in pod_labels[pod]: + self.timeLoadingEnd = pod_labels[pod]['timeLoadingEnd'] + if 'timeLoading' in pod_labels[pod]: + self.timeLoading = float(pod_labels[pod]['timeLoading']) else: self.loading_started = False - if 'timeLoadingStart' in pod_labels[pod]: - self.timeLoadingStart = pod_labels[pod]['timeLoadingStart'] - if 'timeLoadingEnd' in pod_labels[pod]: - self.timeLoadingEnd = pod_labels[pod]['timeLoadingEnd'] - if 'timeLoading' in pod_labels[pod]: - self.timeLoading = float(pod_labels[pod]['timeLoading']) - else: - self.loading_started = False - self.loading_finished = False + self.loading_finished = False def load_data(self): self.logger.debug('configuration.load_data()') self.loading_started = True @@ -1501,6 +1644,7 @@ def load_data(self): #loop.run_until_complete(asyncio.gather(*pending)) #print(result) return + """ self.timeLoadingStart = default_timer() # mark pod fullcommand = 'label pods '+self.pod_sut+' --overwrite loaded=False timeLoadingStart="{}"'.format(self.timeLoadingStart) @@ -1548,6 +1692,7 @@ def load_data(self): self.experiment.cluster.kubectl(fullcommand) #proc = subprocess.Popen(fullcommand, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) #stdout, stderr = proc.communicate() + """ def create_job(self, connection, app='', component='benchmarker', experiment='', configuration='', client='1', parallelism=1, alias=''): if len(app) == 0: app = self.appname @@ -1608,6 +1753,13 @@ def create_job(self, connection, app='', component='benchmarker', experiment='', dep['spec']['template']['spec']['containers'][0]['env'].append(e) e = {'name': 'DBMSBENCHMARKER_START', 'value': start_string} dep['spec']['template']['spec']['containers'][0]['env'].append(e) + # set nodeSelector + if 'benchmarking' in self.nodes: + if not 'nodeSelector' in dep['spec']['template']['spec']: + dep['spec']['template']['spec']['nodeSelector'] = dict() + if dep['spec']['template']['spec']['nodeSelector'] is None: + dep['spec']['template']['spec']['nodeSelector'] = dict() + dep['spec']['template']['spec']['nodeSelector']['type'] = self.nodes['benchmarking'] #if not path.isdir(self.path): # makedirs(self.path) with open(job_experiment,"w+") as stream: @@ -1633,7 +1785,10 @@ def create_job_maintaining(self, app='', component='maintaining', experiment='', start_string = start.strftime('%Y-%m-%d %H:%M:%S') #yamlfile = self.experiment.cluster.yamlfolder+"job-dbmsbenchmarker-"+code+".yml" job_experiment = self.experiment.path+'/job-maintaining-{configuration}.yml'.format(configuration=configuration) - with open(self.experiment.cluster.yamlfolder+"jobtemplate-maintaining.yml") as stream: + jobtemplate = 
self.experiment.cluster.yamlfolder+"jobtemplate-maintaining.yml" + if len(self.experiment.jobtemplate_maintaining) > 0: + jobtemplate = self.experiment.cluster.yamlfolder+self.experiment.jobtemplate_maintaining + with open(jobtemplate) as stream: try: result=yaml.safe_load_all(stream) result = [data for data in result] @@ -1658,16 +1813,182 @@ def create_job_maintaining(self, app='', component='maintaining', experiment='', dep['spec']['template']['metadata']['labels']['experiment'] = str(experiment) #dep['spec']['template']['metadata']['labels']['client'] = str(client) dep['spec']['template']['metadata']['labels']['experimentRun'] = str(self.numExperimentsDone+1) - envs = dep['spec']['template']['spec']['containers'][0]['env'] - for i,e in enumerate(envs): - if e['name'] == 'SENSOR_DATABASE': - dep['spec']['template']['spec']['containers'][0]['env'][i]['value'] = 'postgresql://postgres:@{}:9091/postgres'.format(servicename) - if e['name'] == 'SENSOR_RATE': - dep['spec']['template']['spec']['containers'][0]['env'][i]['value'] = '0.1' - if e['name'] == 'SENSOR_NUMBER': - dep['spec']['template']['spec']['containers'][0]['env'][i]['value'] = '72000' - self.logger.debug('configuration.create_job_maintaining({})'.format(str(e))) - #print(e) + # set nodeSelector + if 'maintaining' in self.nodes: + if not 'nodeSelector' in dep['spec']['template']['spec']: + dep['spec']['template']['spec']['nodeSelector'] = dict() + if dep['spec']['template']['spec']['nodeSelector'] is None: + dep['spec']['template']['spec']['nodeSelector'] = dict() + dep['spec']['template']['spec']['nodeSelector']['type'] = self.nodes['maintaining'] + # set ENV variables - defaults + env_default = {} + if 'SENSOR_RATE' in self.maintaining_parameters: + env_default['SENSOR_RATE'] = self.maintaining_parameters['SENSOR_RATE'] + else: + env_default['SENSOR_RATE'] = '0.1' + if 'SENSOR_NUMBER' in self.maintaining_parameters: + env_default['SENSOR_NUMBER'] = self.maintaining_parameters['SENSOR_NUMBER'] + else: + env_default['SENSOR_NUMBER'] = '144000' + # set ENV variables - in YAML + # all init containers + if 'initContainers' in dep['spec']['template']['spec']: + for num_container, container in enumerate(dep['spec']['template']['spec']['initContainers']): + envs = dep['spec']['template']['spec']['containers'][num_container]['env'] + for i,e in enumerate(envs): + if e['name'] == 'BEXHOMA_HOST': + dep['spec']['template']['spec']['initContainers'][num_container]['env'][i]['value'] = servicename + if e['name'] == 'SENSOR_DATABASE': + dep['spec']['template']['spec']['initContainers'][num_container]['env'][i]['value'] = 'postgresql://postgres:@{}:9091/postgres'.format(servicename) + if e['name'] == 'SENSOR_RATE': + dep['spec']['template']['spec']['initContainers'][num_container]['env'][i]['value'] = str(env_default['SENSOR_RATE']) + if e['name'] == 'SENSOR_NUMBER': + dep['spec']['template']['spec']['initContainers'][num_container]['env'][i]['value'] = str(env_default['SENSOR_NUMBER']) + self.logger.debug('configuration.create_job_maintaining({})'.format(str(e))) + #print(e) + # all containers + for num_container, container in enumerate(dep['spec']['template']['spec']['containers']): + envs = dep['spec']['template']['spec']['containers'][num_container]['env'] + for i,e in enumerate(envs): + if e['name'] == 'BEXHOMA_HOST': + dep['spec']['template']['spec']['containers'][num_container]['env'][i]['value'] = servicename + if e['name'] == 'SENSOR_DATABASE': + dep['spec']['template']['spec']['containers'][num_container]['env'][i]['value'] = 
'postgresql://postgres:@{}:9091/postgres'.format(servicename) + if e['name'] == 'SENSOR_RATE': + dep['spec']['template']['spec']['containers'][num_container]['env'][i]['value'] = str(env_default['SENSOR_RATE']) + if e['name'] == 'SENSOR_NUMBER': + dep['spec']['template']['spec']['containers'][num_container]['env'][i]['value'] = str(env_default['SENSOR_NUMBER']) + self.logger.debug('configuration.create_job_maintaining({})'.format(str(e))) + #print(e) + with open(job_experiment,"w+") as stream: + try: + stream.write(yaml.dump_all(result)) + except yaml.YAMLError as exc: + print(exc) + return job_experiment + def create_job_loading(self, app='', component='loading', experiment='', configuration='', parallelism=1, alias=''): + if len(app) == 0: + app = self.appname + if len(configuration) == 0: + configuration = self.configuration + if len(experiment) == 0: + experiment = self.code + jobname = self.generate_component_name(app=app, component=component, experiment=experiment, configuration=configuration) + servicename = self.generate_component_name(app=app, component='sut', experiment=experiment, configuration=configuration) + #print(jobname) + self.logger.debug('configuration.create_job_loading({})'.format(jobname)) + # determine start time + now = datetime.utcnow() + start = now + timedelta(seconds=180) + #start = datetime.strptime('2021-03-04 23:15:25', '%Y-%m-%d %H:%M:%S') + #wait = (start-now).seconds + now_string = now.strftime('%Y-%m-%d %H:%M:%S') + start_string = start.strftime('%Y-%m-%d %H:%M:%S') + #yamlfile = self.experiment.cluster.yamlfolder+"job-dbmsbenchmarker-"+code+".yml" + job_experiment = self.experiment.path+'/job-loading-{configuration}.yml'.format(configuration=configuration) + jobtemplate = self.experiment.cluster.yamlfolder+"jobtemplate-loading.yml" + if len(self.experiment.jobtemplate_loading) > 0: + jobtemplate = self.experiment.cluster.yamlfolder+self.experiment.jobtemplate_loading + with open(jobtemplate) as stream: + try: + result=yaml.safe_load_all(stream) + result = [data for data in result] + #print(result) + except yaml.YAMLError as exc: + print(exc) + for dep in result: + if dep['kind'] == 'Job': + dep['metadata']['name'] = jobname + job = dep['metadata']['name'] + dep['spec']['completions'] = parallelism + dep['spec']['parallelism'] = parallelism + dep['metadata']['labels']['app'] = app + dep['metadata']['labels']['component'] = component + dep['metadata']['labels']['configuration'] = configuration + dep['metadata']['labels']['experiment'] = str(experiment) + #dep['metadata']['labels']['client'] = str(client) + dep['metadata']['labels']['experimentRun'] = str(self.numExperimentsDone+1) + dep['spec']['template']['metadata']['labels']['app'] = app + dep['spec']['template']['metadata']['labels']['component'] = component + dep['spec']['template']['metadata']['labels']['configuration'] = configuration + dep['spec']['template']['metadata']['labels']['experiment'] = str(experiment) + #dep['spec']['template']['metadata']['labels']['client'] = str(client) + dep['spec']['template']['metadata']['labels']['experimentRun'] = str(self.numExperimentsDone+1) + # set nodeSelector + if 'loading' in self.nodes: + if not 'nodeSelector' in dep['spec']['template']['spec']: + dep['spec']['template']['spec']['nodeSelector'] = dict() + if dep['spec']['template']['spec']['nodeSelector'] is None: + dep['spec']['template']['spec']['nodeSelector'] = dict() + dep['spec']['template']['spec']['nodeSelector']['type'] = self.nodes['loading'] + # set ENV variables - defaults + env_default = {} 
+ if 'PARALLEL' in self.maintaining_parameters: + env_default['PARALLEL'] = self.maintaining_parameters['PARALLEL'] + else: + env_default['PARALLEL'] = '24' + if 'CHILD' in self.maintaining_parameters: + env_default['CHILD'] = self.maintaining_parameters['1'] + else: + env_default['CHILD'] = '1' + if 'RNGSEED' in self.maintaining_parameters: + env_default['RNGSEED'] = self.maintaining_parameters['RNGSEED'] + else: + env_default['RNGSEED'] = '123' + if 'SF' in self.maintaining_parameters: + env_default['SF'] = self.maintaining_parameters['SF'] + else: + env_default['SF'] = '10' + # set ENV variables - in YAML + # all init containers + if 'initContainers' in dep['spec']['template']['spec']: + for num_container, container in enumerate(dep['spec']['template']['spec']['initContainers']): + envs = dep['spec']['template']['spec']['containers'][num_container]['env'] + for i,e in enumerate(envs): + if e['name'] == 'BEXHOMA_HOST': + dep['spec']['template']['spec']['initContainers'][num_container]['env'][i]['value'] = servicename + if e['name'] == 'BEXHOMA_CLIENT': + dep['spec']['template']['spec']['initContainers'][num_container]['env'][i]['value'] = str(parallelism) + if e['name'] == 'BEXHOMA_EXPERIMENT': + dep['spec']['template']['spec']['initContainers'][num_container]['env'][i]['value'] = experiment + if e['name'] == 'BEXHOMA_CONNECTION': + dep['spec']['template']['spec']['initContainers'][num_container]['env'][i]['value'] = configuration + if e['name'] == 'BEXHOMA_SLEEP': + dep['spec']['template']['spec']['initContainers'][num_container]['env'][i]['value'] = '60' + if e['name'] == 'PARALLEL': + dep['spec']['template']['spec']['initContainers'][num_container]['env'][i]['value'] = str(env_default['PARALLEL']) + if e['name'] == 'CHILD': + dep['spec']['template']['spec']['initContainers'][num_container]['env'][i]['value'] = str(env_default['CHILD']) + if e['name'] == 'RNGSEED': + dep['spec']['template']['spec']['initContainers'][num_container]['env'][i]['value'] = str(env_default['RNGSEED']) + if e['name'] == 'SF': + dep['spec']['template']['spec']['initContainers'][num_container]['env'][i]['value'] = str(env_default['SF']) + self.logger.debug('configuration.create_job_loading({})'.format(str(e))) + #print(e) + # all containers + for num_container, container in enumerate(dep['spec']['template']['spec']['containers']): + envs = dep['spec']['template']['spec']['containers'][num_container]['env'] + for i,e in enumerate(envs): + if e['name'] == 'BEXHOMA_HOST': + dep['spec']['template']['spec']['containers'][num_container]['env'][i]['value'] = servicename + if e['name'] == 'BEXHOMA_CLIENT': + dep['spec']['template']['spec']['containers'][num_container]['env'][i]['value'] = str(parallelism) + if e['name'] == 'BEXHOMA_EXPERIMENT': + dep['spec']['template']['spec']['containers'][num_container]['env'][i]['value'] = experiment + if e['name'] == 'BEXHOMA_CONNECTION': + dep['spec']['template']['spec']['containers'][num_container]['env'][i]['value'] = configuration + if e['name'] == 'BEXHOMA_SLEEP': + dep['spec']['template']['spec']['containers'][num_container]['env'][i]['value'] = '60' + if e['name'] == 'PARALLEL': + dep['spec']['template']['spec']['containers'][num_container]['env'][i]['value'] = str(env_default['PARALLEL']) + if e['name'] == 'CHILD': + dep['spec']['template']['spec']['containers'][num_container]['env'][i]['value'] = str(env_default['CHILD']) + if e['name'] == 'RNGSEED': + dep['spec']['template']['spec']['containers'][num_container]['env'][i]['value'] = str(env_default['RNGSEED']) + if 
e['name'] == 'SF': + dep['spec']['template']['spec']['containers'][num_container]['env'][i]['value'] = str(env_default['SF']) + self.logger.debug('configuration.create_job_maintaining({})'.format(str(e))) + #print(e) with open(job_experiment,"w+") as stream: try: stream.write(yaml.dump_all(result)) diff --git a/bexhoma/experiments.py b/bexhoma/experiments.py index ad0f68a0..4ee3f901 100644 --- a/bexhoma/experiments.py +++ b/bexhoma/experiments.py @@ -81,12 +81,17 @@ def __init__(self, timeout = timeout, singleConnection = True) self.numExperiments = numExperiments + self.max_sut = None self.cluster.add_experiment(self) self.appname = self.cluster.appname self.resources = {} self.ddl_parameters = {} self.eval_parameters = {} self.storage = {} + self.nodes = {} + self.maintaining_parameters = {} + self.jobtemplate_maintaining = "" + self.jobtemplate_loading = "" #self.connectionmanagement = {} #self.connectionmanagement['numProcesses'] = None #self.connectionmanagement['runsPerConnection'] = None @@ -95,6 +100,7 @@ def __init__(self, self.querymanagement = {} self.workload = {} self.monitoring_active = True + self.loading_active = False # k8s: self.namespace = self.cluster.namespace#.config['credentials']['k8s']['namespace'] self.configurations = [] @@ -146,6 +152,10 @@ def set_eval_parameters(self, **kwargs): self.eval_parameters = kwargs def set_storage(self, **kwargs): self.storage = kwargs + def set_nodes(self, **kwargs): + self.nodes = kwargs + def set_maintaining_parameters(self, **kwargs): + self.maintaining_parameters = kwargs def add_configuration(self, configuration): self.configurations.append(configuration) def __set_queryfile(self, queryfile): @@ -380,6 +390,28 @@ def evaluate_results(self, pod_dashboard=''): #print(fullcommand) #proc = subprocess.Popen(fullcommand, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) #stdout, stderr = proc.communicate() + def stop_maintaining(self): + if len(self.configurations) > 0: + for config in self.configurations: + config.stop_maintaining() + else: + app = self.cluster.appname + component = 'maintaining' + configuration = '' + jobs = self.cluster.getJobs(app=app, component=component, experiment=self.code, configuration=configuration) + for job in jobs: + self.cluster.deleteJob(job) + def stop_loading(self): + if len(self.configurations) > 0: + for config in self.configurations: + config.stop_loading() + else: + app = self.cluster.appname + component = 'loading' + configuration = '' + jobs = self.cluster.getJobs(app=app, component=component, experiment=self.code, configuration=configuration) + for job in jobs: + self.cluster.deleteJob(job) def stop_monitoring(self): if len(self.configurations) > 0: for config in self.configurations: @@ -423,8 +455,10 @@ def work_benchmark_list(self, intervals=30, stop=True): #time.sleep(intervals) self.wait(intervals) # count number of running and pending pods - num_pods_running = len(self.cluster.getPods(app = self.appname, component = 'sut', experiment=self.code, status = 'Running')) - num_pods_pending = len(self.cluster.getPods(app = self.appname, component = 'sut', experiment=self.code, status = 'Pending')) + num_pods_running_experiment = len(self.cluster.getPods(app = self.appname, component = 'sut', experiment=self.code, status = 'Running')) + num_pods_pending_experiment = len(self.cluster.getPods(app = self.appname, component = 'sut', experiment=self.code, status = 'Pending')) + num_pods_running_cluster = len(self.cluster.getPods(app = self.appname, component = 'sut', 
status = 'Running')) + num_pods_pending_cluster = len(self.cluster.getPods(app = self.appname, component = 'sut', status = 'Pending')) for config in self.configurations: # check if sut is running if not config.sut_is_running(): @@ -432,19 +466,28 @@ def work_benchmark_list(self, intervals=30, stop=True): if not config.experiment_done: if not config.sut_is_pending(): print("{} is not running yet - ".format(config.configuration))#, end="", flush=True) - if self.cluster.max_sut is not None: - print("{} running and {} pending pods: max is {} pods in the cluster - ".format(num_pods_running, num_pods_pending, self.cluster.max_sut))#, end="", flush=True) - if num_pods_running+num_pods_pending < self.cluster.max_sut: - print("it will start now") + if self.cluster.max_sut is not None or self.max_sut is not None: + we_can_start_new_sut = True + if self.max_sut is not None: + print("In experiment: {} running and {} pending pods: max is {} pods)".format(num_pods_running_experiment, num_pods_pending_experiment, self.max_sut))#, end="", flush=True) + if num_pods_running_experiment+num_pods_pending_experiment >= self.max_sut: + print("{} has to wait".format(config.configuration)) + we_can_start_new_sut = False + if self.cluster.max_sut is not None: + print("In cluster: {} running and {} pending pods: max is {} pods".format(num_pods_running_cluster, num_pods_pending_cluster, self.cluster.max_sut))#, end="", flush=True) + if num_pods_running_cluster+num_pods_pending_cluster >= self.cluster.max_sut: + print("{} has to wait".format(config.configuration)) + we_can_start_new_sut = False + if we_can_start_new_sut: + print("{} will start now".format(config.configuration)) config.start_sut() - num_pods_pending = num_pods_pending + 1 - #self.wait(10) - else: - print("it has to wait") + num_pods_pending_experiment = num_pods_pending_experiment + 1 + num_pods_pending_cluster = num_pods_pending_cluster + 1 else: - print("it will start now") + print("{} will start now".format(config.configuration)) config.start_sut() - num_pods_pending = num_pods_pending + 1 + num_pods_pending_experiment = num_pods_pending_experiment + 1 + num_pods_pending_cluster = num_pods_pending_cluster + 1 #self.wait(10) else: print("{} is pending".format(config.configuration)) @@ -463,7 +506,11 @@ def work_benchmark_list(self, intervals=30, stop=True): now = datetime.utcnow() if config.loading_after_time is not None: if now >= config.loading_after_time: - config.start_loading() + if config.loading_active: + config.start_loading() + config.start_loading_pod(parallelism=config.num_loading) + else: + config.start_loading() else: print("{} will start loading but not before {}".format(config.configuration, config.loading_after_time.strftime('%Y-%m-%d %H:%M:%S'))) continue @@ -483,7 +530,10 @@ def work_benchmark_list(self, intervals=30, stop=True): if config.maintaining_active: if not config.maintaining_is_running(): print("{} is not maintained yet".format(config.configuration)) - config.start_maintaining(parallelism=config.parallelism) + if not config.maintaining_is_pending(): + config.start_maintaining(parallelism=config.num_maintaining) + else: + print("{} has pending maintaining".format(config.configuration)) # benchmark if loading is done and monitoring is ready if config.loading_finished: if config.monitoring_active and not config.monitoring_is_running(): @@ -744,21 +794,46 @@ def set_querymanagement_maintaining(self, numCooldown = 0, numRun = numRun, delay = delay, - timer = { - 'connection': - { - 'active': True, - 'delay': 0 - }, - 
#'datatransfer': - #{ - # 'active': datatransfer, - # 'sorted': True, - # 'compare': 'result', - # 'store': [], - # 'precision': 0, - #} - }) + ) + #self.monitoring_active = True + self.maintaining_active = True + +class tsbs(default): + def __init__(self, + cluster, + code=None, + queryfile = 'queries-tsbs.config', + SF = '1', + numExperiments = 1, + timeout = 7200, + detached=False): + default.__init__(self, cluster, code, numExperiments, timeout, detached) + self.set_experiment(volume='tsbs') + self.set_experiment(script='SF'+str(SF)+'-index') + self.cluster.set_configfolder('experiments/tsbs') + parameter.defaultParameters = {'SF': str(SF)} + self.set_queryfile(queryfile) + self.set_workload( + name = 'TSBS Queries SF='+str(SF), + info = 'This experiment performs some TSBS inspired queries.' + ) + self.storage_label = 'tsbs-'+str(SF) + self.maintaining_active = True + self.jobtemplate_maintaining = "jobtemplate-maintaining-tsbs.yml" + def set_queries_full(self): + self.set_queryfile('queries-tsbs.config') + def set_queries_profiling(self): + self.set_queryfile('queries-tsbs-profiling.config') + def set_querymanagement_maintaining(self, + numRun=128, + delay=5, + datatransfer=False): + self.set_querymanagement( + numWarmup = 0, + numCooldown = 0, + numRun = numRun, + delay = delay, + ) #self.monitoring_active = True self.maintaining_active = True diff --git a/bexhoma/masterK8s.py b/bexhoma/masterK8s.py index c9bf3326..15df1999 100644 --- a/bexhoma/masterK8s.py +++ b/bexhoma/masterK8s.py @@ -522,6 +522,29 @@ def getStatefulSets(self, app='', component='', experiment='', configuration='') self.cluster_access() self.wait(2) return self.getStatefulSets(app=app, component=component, experiment=experiment, configuration=configuration) + def getNodes(self, app='', nodegroup_type='', nodegroup_name=''): + self.logger.debug('testdesign.getNodes()') + label = '' + if len(app)==0: + app = self.appname + label += 'app='+app + if len(nodegroup_type)>0: + label += ',type='+nodegroup_type + if len(nodegroup_name)>0: + label += ',name='+nodegroup_name + try: + api_response = self.v1core.list_node(label_selector=label) + #pprint(api_response) + if len(api_response.items) > 0: + return api_response.items + else: + return [] + except ApiException as e: + print("Exception when calling CoreV1Api->list_node for getNodes: %s\n" % e) + print("Create new access token") + self.cluster_access() + self.wait(2) + return self.getNodes(app=app, nodegroup_type=nodegroup_type, nodegroup_name=nodegroup_name) def getPodStatus(self, pod, appname=''): self.logger.debug('testdesign.getPodStatus()') try: @@ -748,7 +771,9 @@ def deletePod(self, name): print("Exception when calling CoreV1Api->delete_namespaced_pod: %s\n" % e) self.cluster_access() self.wait(2) - return self.deletePod(name=name) + # try again, if not failed due to "not found" + if not e.status == 404: + return self.deletePod(name=name) def deletePVC(self, name): self.logger.debug('testdesign.deletePVC({})'.format(name)) body = kubernetes.client.V1DeleteOptions() @@ -1456,8 +1481,16 @@ def getJobStatus(self, jobname='', app='', component='', experiment='', configur jobname = jobs[0] api_response = self.v1batches.read_namespaced_job_status(jobname, self.namespace)#, label_selector='app='+cluster.appname) #pprint(api_response) - #pprint(api_response.status.succeeded) - return api_response.status.succeeded + # returns number of completed pods (!) + #return api_response.status.succeeded + # we want status of job (!) 
+ #self.logger.debug("api_response.status.succeeded = {}".format(api_response.status.succeeded)) + #self.logger.debug("api_response.status.conditions = {}".format(api_response.status.conditions)) + if api_response.status.succeeded is not None and api_response.status.succeeded > 0 and api_response.status.conditions is not None and len(api_response.status.conditions) > 0: + self.logger.debug(api_response.status.conditions[0].type) + return api_response.status.conditions[0].type == 'Complete' + else: + return 0 except ApiException as e: print("Exception when calling BatchV1Api->read_namespaced_job_status: %s\n" % e) print("Create new access token") @@ -1684,6 +1717,24 @@ def stop_maintaining(self, experiment='', configuration=''): print(p, status) #if status == "Running": self.deletePod(p) + def stop_loading(self, experiment='', configuration=''): + # all jobs of configuration - benchmarker + app = self.appname + component = 'loading' + jobs = self.getJobs(app, component, experiment, configuration) + # status per job + for job in jobs: + success = self.getJobStatus(job) + print(job, success) + self.deleteJob(job) + # all pods to these jobs - automatically stopped? + #self.getJobPods(app, component, experiment, configuration) + pods = self.getJobPods(app, component, experiment, configuration) + for p in pods: + status = self.getPodStatus(p) + print(p, status) + #if status == "Running": + self.deletePod(p) def stop_monitoring(self, app='', component='monitoring', experiment='', configuration=''): deployments = self.getDeployments(app=app, component=component, experiment=experiment, configuration=configuration) for deployment in deployments: @@ -1792,4 +1843,22 @@ def connect_master(self, experiment='', configuration=''): -# kubectl delete pvc,pods,services,deployments,jobs -l app=bexhoma-client \ No newline at end of file +# kubectl delete pvc,pods,services,deployments,jobs -l app=bexhoma-client + + +""" +class aws(): + def __init__(self, clusterconfig='cluster.config', configfolder='experiments/', yamlfolder='k8s/', context=None, code=None, instance=None, volume=None, docker=None, script=None, queryfile=None): + super().__init__(clusterconfig, configfolder, yamlfolder, context, code, instance, volume, docker, script, queryfile) + self.cluster = self.contextdata['cluster'] + def eksctl(self, command): + #fullcommand = 'eksctl --context {context} {command}'.format(context=self.context, command=command) + fullcommand = 'eksctl {command}'.format(command=command) + self.logger.debug('aws.eksctl({})'.format(fullcommand)) + #print(fullcommand) + return os.popen(fullcommand).read()# os.system(fullcommand) + def scale_nodegroup(self, nodegroup, size): + #fullcommand = "eksctl scale nodegroup --cluster=Test-2 --nodes=0 --nodes-min=0 --name=Kleine_Gruppe" + command = "scale nodegroup --cluster={cluster} --nodes={size} --name={nodegroup}".format(cluster=self.cluster, size=size, nodegroup=nodegroup) + return self.eksctl(command) +""" diff --git a/bexhoma/scripts/experimentsmanager.py b/bexhoma/scripts/experimentsmanager.py index a648b036..827c9f20 100644 --- a/bexhoma/scripts/experimentsmanager.py +++ b/bexhoma/scripts/experimentsmanager.py @@ -63,13 +63,15 @@ def manage(): cluster.stop_sut(configuration=connection) cluster.stop_monitoring(configuration=connection) cluster.stop_maintaining() + cluster.stop_loading() cluster.stop_benchmarker(configuration=connection) else: experiment = experiments.default(cluster=cluster, code=args.experiment) experiment.stop_sut() - cluster.stop_monitoring() - 
cluster.stop_maintaining() - cluster.stop_benchmarker() + experiment.stop_monitoring() + experiment.stop_maintaining() + experiment.stop_loading() + experiment.stop_benchmarker() elif args.mode == 'dashboard': cluster = clusters.kubernetes(clusterconfig, context=args.context) cluster.connect_dashboard() @@ -202,6 +204,26 @@ def manage(): for status in num_pods.keys(): apps[configuration][component] += "({num} {status})".format(num=num_pods[status], status=status) ############ + component = 'loading' + apps[configuration][component] = '' + if args.verbose: + stateful_sets = cluster.getStatefulSets(app=app, component=component, experiment=experiment, configuration=configuration) + print("Stateful Sets", stateful_sets) + services = cluster.getServices(app=app, component=component, experiment=experiment, configuration=configuration) + print("Loading Services", services) + pods = cluster.getPods(app=app, component=component, experiment=experiment, configuration=configuration) + if args.verbose: + print("Loading Pods", pods) + num_pods = {} + for pod in pods: + status = cluster.getPodStatus(pod) + #print(status) + #apps[configuration][component] += "{pod} ({status})".format(pod='', status=status) + num_pods[status] = 1 if not status in num_pods else num_pods[status]+1 + #print(num_pods) + for status in num_pods.keys(): + apps[configuration][component] += "({num} {status})".format(num=num_pods[status], status=status) + ############ component = 'monitoring' apps[configuration][component] = '' if args.verbose: diff --git a/cluster.py b/cluster.py index 43c91c50..eaeb57cc 100644 --- a/cluster.py +++ b/cluster.py @@ -63,13 +63,15 @@ cluster.stop_sut(configuration=connection) cluster.stop_monitoring(configuration=connection) cluster.stop_maintaining() + cluster.stop_loading() cluster.stop_benchmarker(configuration=connection) else: experiment = experiments.default(cluster=cluster, code=args.experiment) experiment.stop_sut() - cluster.stop_monitoring() - cluster.stop_maintaining() - cluster.stop_benchmarker() + experiment.stop_monitoring() + experiment.stop_maintaining() + experiment.stop_loading() + experiment.stop_benchmarker() elif args.mode == 'dashboard': cluster = clusters.kubernetes(clusterconfig, context=args.context) cluster.connect_dashboard() @@ -202,6 +204,26 @@ for status in num_pods.keys(): apps[configuration][component] += "({num} {status})".format(num=num_pods[status], status=status) ############ + component = 'loading' + apps[configuration][component] = '' + if args.verbose: + stateful_sets = cluster.getStatefulSets(app=app, component=component, experiment=experiment, configuration=configuration) + print("Stateful Sets", stateful_sets) + services = cluster.getServices(app=app, component=component, experiment=experiment, configuration=configuration) + print("Loading Services", services) + pods = cluster.getPods(app=app, component=component, experiment=experiment, configuration=configuration) + if args.verbose: + print("Loading Pods", pods) + num_pods = {} + for pod in pods: + status = cluster.getPodStatus(pod) + #print(status) + #apps[configuration][component] += "{pod} ({status})".format(pod='', status=status) + num_pods[status] = 1 if not status in num_pods else num_pods[status]+1 + #print(num_pods) + for status in num_pods.keys(): + apps[configuration][component] += "({num} {status})".format(num=num_pods[status], status=status) + ############ component = 'monitoring' apps[configuration][component] = '' if args.verbose: diff --git a/setup.py b/setup.py index 8c4969c7..c482f0b7 100644 
--- a/setup.py
+++ b/setup.py
@@ -8,7 +8,7 @@ setuptools.setup(
     name="bexhoma",
-    version="0.5.20",
+    version="0.6.0",
     author="Patrick Erdelt",
     author_email="perdelt@beuth-hochschule.de",
     description="This python tools helps managing DBMS benchmarking experiments in a Kubernetes-based HPC cluster environment. It enables users to configure hardware / software setups for easily repeating tests over varying configurations.",
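
Not part of the patch: below is a minimal usage sketch of how the pieces introduced above (the aws cluster class with node group scaling, node placement via set_nodes, the TSBS maintaining parameters, and the per-experiment max_sut limit) might be driven together. It only calls methods added in this patch; the context name, node group names and sizes, and node label values are placeholder assumptions, not values taken from the patch.

# Hypothetical driver script; cluster context, node group names/sizes and
# node label ("type") values below are placeholders.
from bexhoma import clusters, experiments

cluster = clusters.aws(clusterconfig='cluster.config', context='my-eks-context')
nodegroups = {'group_sut': 1, 'group_auxiliary': 2}   # assumed node group names and target sizes
cluster.scale_nodegroups(nodegroups)                   # issues eksctl scale nodegroup per group
cluster.wait_for_nodegroups(nodegroups)                # polls getNodes() until each size is reached

experiment = experiments.tsbs(cluster=cluster, SF=1)   # TSBS experiment class added in this patch
experiment.set_nodes(sut='hpc', loading='auxiliary', maintaining='auxiliary', benchmarking='auxiliary')
experiment.set_maintaining_parameters(SENSOR_RATE='0.1', SENSOR_NUMBER='144000')
experiment.max_sut = 1                                 # at most one SUT running or pending per experiment
experiment.work_benchmark_list()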