V0.5.20 (#100)
* storageConfiguration tag, so that postgres 9 and postgres 13 can share a PVC

* Prepare next release

* Scaling factor per experiment

* Demo config cleaned

* Docs: Improve config and examples

* Configuration: Dockerimage as parameter

* Config: MonetDB log file

* Masterscript: we do not test at localhost (forwarded), because there might be conflicts

* First draft of an IoT benchmark

* First draft of an IoT benchmark - create_job_maintainer similar to benchmarker

* First draft of an IoT benchmark - create_job_maintainer specific to PostgreSQL

* Maintaining draft

* Tools: Show Maintaining pods

* Tools: Stop Maintaining pods

* First draft of an IoT benchmark - maintaining settings in experiment

* Masterscript: stop sut also means stop maintaining

* Masterscript: maintainer needs no client (benchmarker) infos

* Masterscript: maintainer parallelism as parameter

* Masterscript: maintainer runs for 2h by default

* Tools: Show Maintaining pods as number only

* Masterscript: stop maintainer job removes its pods

* Masterscript: reconnect and try again if the failure is not due to "not found"

* Masterscript: more debug output about jobs and their pods

* Masterscript: more debug output about maintaining

* Masterscript: stop pods of a job explicitly

* Tools: Show more debug

* Masterscript: maintaining does not change datatransfer settings of benchmarker
perdelt committed Jul 27, 2022
1 parent 3c055ec commit 0929756
Showing 6 changed files with 310 additions and 28 deletions.
124 changes: 122 additions & 2 deletions bexhoma/configurations.py
@@ -90,6 +90,8 @@ def __init__(self, experiment, docker=None, configuration='', script=None, alias
self.dialect = dialect
self.num_worker = worker
self.monitoring_active = experiment.monitoring_active
self.maintaining_active = experiment.maintaining_active
self.parallelism = 1
self.storage_label = experiment.storage_label
self.experiment_done = False
self.dockerimage = dockerimage
@@ -242,6 +244,29 @@ def sut_is_running(self):
if status == "Running":
return True
return False
def maintaining_is_running(self):
app = self.appname
component = 'maintaining'
configuration = self.configuration
pods = self.experiment.cluster.getPods(app, component, self.experiment.code, configuration)
self.logger.debug("maintaining_is_running found {} pods".format(len(pods)))
if len(pods) > 0:
pod_sut = pods[0]
status = self.experiment.cluster.getPodStatus(pod_sut)
if status == "Running":
return True
return False
def maintaining_is_pending(self):
app = self.appname
component = 'maintaining'
configuration = self.configuration
pods = self.experiment.cluster.getPods(app, component, self.experiment.code, configuration)
if len(pods) > 0:
pod_sut = pods[0]
status = self.experiment.cluster.getPodStatus(pod_sut)
if status == "Pending":
return True
return False
def monitoring_is_running(self):
app = self.appname
component = 'monitoring'
@@ -299,6 +324,8 @@ def start_loading(self, delay=0):
forward = ['kubectl', '--context {context}'.format(context=self.experiment.cluster.context), 'port-forward', 'service/'+service] #bexhoma-service']#, '9091', '9300']#, '9400']
forward.extend(ports)
your_command = " ".join(forward)
# we do not test at localhost (forwarded), because there might be conflicts
"""
self.logger.debug('configuration.start_loading({})'.format(your_command))
subprocess.Popen(your_command, stdout=subprocess.PIPE, shell=True)
# wait for port to be connected
@@ -308,6 +335,7 @@ def start_loading(self, delay=0):
# not answering
self.experiment.cluster.stopPortforwarding()
return False
"""
#while not dbmsactive:
# self.wait(10)
# dbmsactive = self.checkDBMS(self.experiment.cluster.host, self.experiment.cluster.port)
@@ -316,8 +344,9 @@ def start_loading(self, delay=0):
if not self.loading_started:
#print("load_data")
self.load_data()
self.experiment.cluster.stopPortforwarding()
# store experiment
# we do not test at localhost (forwarded), because there might be conflicts
#self.experiment.cluster.stopPortforwarding()
# store experiment needs new format
"""
experiment = {}
experiment['delay'] = delay
@@ -344,6 +373,16 @@ def generate_component_name(self, app='', component='', experiment='', configura
else:
name = "{app}-{component}-{configuration}-{experiment}".format(app=app, component=component, configuration=configuration, experiment=experiment).lower()
return name
def start_maintaining(self, app='', component='maintaining', experiment='', configuration='', parallelism=1):
if len(app) == 0:
app = self.appname
if len(configuration) == 0:
configuration = self.configuration
if len(experiment) == 0:
experiment = self.code
job = self.create_job_maintaining(app=app, component='maintaining', experiment=experiment, configuration=configuration, parallelism=parallelism)
self.logger.debug("Deploy "+job)
self.experiment.cluster.kubectl('create -f '+job)#self.yamlfolder+deployment)
def create_monitoring(self, app='', component='monitoring', experiment='', configuration=''):
name = self.generate_component_name(app=app, component=component, experiment=experiment, configuration=configuration)
#if len(app) == 0:
@@ -448,6 +487,27 @@ def stop_monitoring(self, app='', component='monitoring', experiment='', configu
services = self.experiment.cluster.getServices(app=app, component=component, experiment=experiment, configuration=configuration)
for service in services:
self.experiment.cluster.deleteService(service)
def stop_maintaining(self, app='', component='maintaining', experiment='', configuration=''):
if len(app)==0:
app = self.appname
if len(configuration) == 0:
configuration = self.configuration
if len(experiment) == 0:
experiment = self.code
jobs = self.experiment.cluster.getJobs(app, component, experiment, configuration)
# status per job
for job in jobs:
success = self.experiment.cluster.getJobStatus(job)
print(job, success)
self.experiment.cluster.deleteJob(job)
# all pods to these jobs - automatically stopped? only if finished?
#self.experiment.cluster.getJobPods(app, component, experiment, configuration)
pods = self.experiment.cluster.getJobPods(app, component, experiment, configuration)
for p in pods:
status = self.experiment.cluster.getPodStatus(p)
print(p, status)
#if status == "Running":
self.experiment.cluster.deletePod(p)
def get_instance_from_resources(self):
resources = experiments.DictToObject(self.resources)
cpu = resources.requests.cpu
@@ -794,6 +854,8 @@ def stop_sut(self, app='', component='sut', experiment='', configuration=''):
self.experiment.cluster.deleteService(service)
if self.experiment.monitoring_active:
self.stop_monitoring()
if self.experiment.maintaining_active:
self.stop_maintaining()
if component == 'sut':
self.stop_sut(app=app, component='worker', experiment=experiment, configuration=configuration)
def checkGPUs(self):
@@ -1554,6 +1616,64 @@ def create_job(self, connection, app='', component='benchmarker', experiment='',
except yaml.YAMLError as exc:
print(exc)
return job_experiment
#def create_job_maintaining(self, app='', component='maintaining', experiment='', configuration='', client='1', parallelism=1, alias=''):
def create_job_maintaining(self, app='', component='maintaining', experiment='', configuration='', parallelism=1, alias=''):
if len(app) == 0:
app = self.appname
jobname = self.generate_component_name(app=app, component=component, experiment=experiment, configuration=configuration)
servicename = self.generate_component_name(app=app, component='sut', experiment=experiment, configuration=configuration)
#print(jobname)
self.logger.debug('configuration.create_job_maintaining({})'.format(jobname))
# determine start time
now = datetime.utcnow()
start = now + timedelta(seconds=180)
#start = datetime.strptime('2021-03-04 23:15:25', '%Y-%m-%d %H:%M:%S')
#wait = (start-now).seconds
now_string = now.strftime('%Y-%m-%d %H:%M:%S')
start_string = start.strftime('%Y-%m-%d %H:%M:%S')
#yamlfile = self.experiment.cluster.yamlfolder+"job-dbmsbenchmarker-"+code+".yml"
job_experiment = self.experiment.path+'/job-maintaining-{configuration}.yml'.format(configuration=configuration)
with open(self.experiment.cluster.yamlfolder+"jobtemplate-maintaining.yml") as stream:
try:
result=yaml.safe_load_all(stream)
result = [data for data in result]
#print(result)
except yaml.YAMLError as exc:
print(exc)
for dep in result:
if dep['kind'] == 'Job':
dep['metadata']['name'] = jobname
job = dep['metadata']['name']
dep['spec']['completions'] = parallelism
dep['spec']['parallelism'] = parallelism
dep['metadata']['labels']['app'] = app
dep['metadata']['labels']['component'] = component
dep['metadata']['labels']['configuration'] = configuration
dep['metadata']['labels']['experiment'] = str(experiment)
#dep['metadata']['labels']['client'] = str(client)
dep['metadata']['labels']['experimentRun'] = str(self.numExperimentsDone+1)
dep['spec']['template']['metadata']['labels']['app'] = app
dep['spec']['template']['metadata']['labels']['component'] = component
dep['spec']['template']['metadata']['labels']['configuration'] = configuration
dep['spec']['template']['metadata']['labels']['experiment'] = str(experiment)
#dep['spec']['template']['metadata']['labels']['client'] = str(client)
dep['spec']['template']['metadata']['labels']['experimentRun'] = str(self.numExperimentsDone+1)
envs = dep['spec']['template']['spec']['containers'][0]['env']
for i,e in enumerate(envs):
if e['name'] == 'SENSOR_DATABASE':
dep['spec']['template']['spec']['containers'][0]['env'][i]['value'] = 'postgresql://postgres:@{}:9091/postgres'.format(servicename)
if e['name'] == 'SENSOR_RATE':
dep['spec']['template']['spec']['containers'][0]['env'][i]['value'] = '0.1'
if e['name'] == 'SENSOR_NUMBER':
dep['spec']['template']['spec']['containers'][0]['env'][i]['value'] = '72000'
self.logger.debug('configuration.create_job_maintaining({})'.format(str(e)))
#print(e)
with open(job_experiment,"w+") as stream:
try:
stream.write(yaml.dump_all(result))
except yaml.YAMLError as exc:
print(exc)
return job_experiment
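
Taken together, the new methods in this file give a configuration object a small maintaining lifecycle: create_job_maintaining() renders the job manifest from a template, start_maintaining() deploys it, maintaining_is_running() and maintaining_is_pending() poll it, and stop_maintaining() removes the job together with its pods. A minimal sketch of how these calls fit together; it assumes `config` is an already constructed bexhoma configuration object (its setup is not part of this diff):

# Sketch only (not part of this commit): `config` is assumed to be a bexhoma
# configuration object created elsewhere; attribute and method names follow
# the diff above.
def run_maintaining_phase(config):
    # deploy the maintaining job once, unless it is already running or pending
    if config.maintaining_active and not config.maintaining_is_running():
        if not config.maintaining_is_pending():
            # renders job-maintaining-<configuration>.yml via create_job_maintaining()
            # and deploys it with `kubectl create -f`
            config.start_maintaining(parallelism=config.parallelism)

def finish_maintaining_phase(config):
    # stop_maintaining() deletes the job and explicitly removes its pods;
    # stop_sut() also calls it when the experiment has maintaining_active set
    config.stop_maintaining()
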



71 changes: 68 additions & 3 deletions bexhoma/experiments.py
@@ -99,6 +99,7 @@ def __init__(self,
self.namespace = self.cluster.namespace#.config['credentials']['k8s']['namespace']
self.configurations = []
self.storage_label = ''
self.maintaining_active = False
def wait(self, sec):
print("Waiting "+str(sec)+"s...", end="", flush=True)
intervals = int(sec)
@@ -430,9 +431,9 @@ def work_benchmark_list(self, intervals=30, stop=True):
#print("{} is not running".format(config.configuration))
if not config.experiment_done:
if not config.sut_is_pending():
print("{} is not running yet - ".format(config.configuration), end="", flush=True)
print("{} is not running yet - ".format(config.configuration))#, end="", flush=True)
if self.cluster.max_sut is not None:
print("{} running and {} pending pods: max is {} pods in the cluster - ".format(num_pods_running, num_pods_pending, self.cluster.max_sut), end="", flush=True)
print("{} running and {} pending pods: max is {} pods in the cluster - ".format(num_pods_running, num_pods_pending, self.cluster.max_sut))#, end="", flush=True)
if num_pods_running+num_pods_pending < self.cluster.max_sut:
print("it will start now")
config.start_sut()
@@ -474,11 +475,23 @@ def work_benchmark_list(self, intervals=30, stop=True):
config.loading_after_time = now + timedelta(seconds=delay)
print("{} will start loading but not before {} (that is in {} secs)".format(config.configuration, config.loading_after_time.strftime('%Y-%m-%d %H:%M:%S'), delay))
continue
# check if maintaining
if config.loading_finished:
if config.monitoring_active and not config.monitoring_is_running():
print("{} waits for monitoring".format(config.configuration))
continue
if config.maintaining_active:
if not config.maintaining_is_running():
print("{} is not maintained yet".format(config.configuration))
config.start_maintaining(parallelism=config.parallelism)
# benchmark if loading is done and monitoring is ready
if config.loading_finished:
if config.monitoring_active and not config.monitoring_is_running():
print("{} waits for monitoring".format(config.configuration))
continue
if config.maintaining_active and not config.maintaining_is_running():
print("{} waits for maintaining".format(config.configuration))
continue
app = self.cluster.appname
component = 'benchmarker'
configuration = ''
@@ -555,7 +568,7 @@ def work_benchmark_list(self, intervals=30, stop=True):
# status per job
for job in jobs:
success = self.cluster.getJobStatus(job)
self.cluster.logger.debug('job {} has status {}'.format(job, success))
self.cluster.logger.debug('job {} has success status {}'.format(job, success))
#print(job, success)
if success:
self.cluster.deleteJob(job)
@@ -697,3 +710,55 @@ def set_queries_full(self):
def set_queries_profiling(self):
self.set_queryfile('queries-tpch-profiling.config')

class iot(default):
def __init__(self,
cluster,
code=None,
queryfile = 'queries-iot.config',
SF = '1',
numExperiments = 1,
timeout = 7200,
detached=False):
default.__init__(self, cluster, code, numExperiments, timeout, detached)
self.set_experiment(volume='iot')
self.set_experiment(script='SF'+str(SF)+'-index')
self.cluster.set_configfolder('experiments/iot')
parameter.defaultParameters = {'SF': str(SF)}
self.set_queryfile(queryfile)
self.set_workload(
name = 'IoT Queries SF='+str(SF),
info = 'This experiment performs some IoT inspired queries.'
)
self.storage_label = 'tpch-'+str(SF)
self.maintaining_active = True
def set_queries_full(self):
self.set_queryfile('queries-iot.config')
def set_queries_profiling(self):
self.set_queryfile('queries-iot-profiling.config')
def set_querymanagement_maintaining(self,
numRun=128,
delay=5,
datatransfer=False):
self.set_querymanagement(
numWarmup = 0,
numCooldown = 0,
numRun = numRun,
delay = delay,
timer = {
'connection':
{
'active': True,
'delay': 0
},
#'datatransfer':
#{
# 'active': datatransfer,
# 'sorted': True,
# 'compare': 'result',
# 'store': [],
# 'precision': 0,
#}
})
#self.monitoring_active = True
self.maintaining_active = True
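
For context, the iot experiment class defined above is the piece that switches maintaining on for a whole experiment. A minimal, hypothetical driver sketch; the class and method names follow this diff, but the `cluster` object and its construction are assumptions and not part of this commit:

# Hypothetical driver (sketch only): `cluster` is assumed to be an already
# configured bexhoma cluster object, and the configurations (the DBMS under
# test) are assumed to have been attached to the experiment elsewhere.
from bexhoma import experiments

def run_iot_experiment(cluster):
    experiment = experiments.iot(cluster, SF=1, timeout=7200)   # sets maintaining_active = True
    experiment.set_querymanagement_maintaining(numRun=128, delay=5)
    # work_benchmark_list() starts the SUT, loads the data and, once loading
    # has finished, deploys the maintaining job before the benchmarker runs
    experiment.work_benchmark_list(intervals=30)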
