Skip to content

Commit

Permalink
V0.6.3 Unified benchmarking / evaluation components, refined loading …
Browse files Browse the repository at this point in the history
…and indexing components (#156)

* Masterscript: Towards unifying collections of loading and benchmarking components

* Masterscript: Unification of run_benchmarker_pod()

* Evaluator: First sketch of implementation

* Evaluator: Exit code

* Masterscript: Currently, only benchmarking component fetches loading metrics

* Evaluator: First sketch of implementation

* Evaluator: Debugging

* Masterscript: Unification of run_benchmarker_pod()

* Masterscript: Unification of run_benchmarker_pod() - copy split connection files

* Masterscript: Test benchbase result

* Masterscript: get_workflow_list() to test results for completeness

* Masterscript: Dump output of loading metrics pod

* Masterscript: Reconstruct workflow need benchmarking df

* Masterscript: Compare reconstructed workflow with benchmarker lists

* Evaluator: HammerDB test result for formal correctness

* Masterscript: use_distributed_datasource, default False

* Masterscript: Loading metrics dumps debug infos

* Masterscript: Monitoring set to 5s scraping interval

* Masterscript: Monitoring set to 15s scraping interval

* Masterscript: Monitoring scraping interval as parameter of experiment and configuration

* HammerDB: Download results

* Masterscript: More job labels to identify connection and time

* Masterscript: More job labels to identify connection and time

* Masterscript: Monitor stream first tests

* Masterscript: Monitor stream for all benchmarking components

* Masterscript: Monitoring transformed

* Masterscript: Monitoring loading uses connection's specific config

* Masterscript: Monitoring stream uses connection's specific config, show errors

* Masterscript: Monitoring changes connection.config, overwrite it with correct data again

* Masterscript: Remove old methods and set path more consistently

* Masterscript: Upload connection.config

* Masterscript: Docs

* Masterscript: Loading time from time spans of pods

* Masterscript: Loading time from time spans of pods - more debug output

* Masterscript: Loading time from time spans of pods if exists, total time span otherwise

* Masterscript: Also add timeGenerator as info

* Masterscript: Remove old evaluation methods

* Masterscript: Indexing reuses schema script methods

* Masterscript: time_ingested at sut and pvc

* Masterscript: No indexing per default

* Masterscript: Also store time markers per pod and container of jobs (loading and benchmarking)

* Masterscript: Use message queue for benchmarker

* Masterscript: Also store time markers per pod and container of jobs (loading and benchmarking)

* Masterscript: Also store time markers per pod and container of jobs (loading and benchmarking) in connection.config

* Masterscript: Sketch storing index time, dbmsbenchmarker uses benchmarking_parameters

* DBMSBenchmarker: Time synch 4 min in future

* Masterscript: Copy configs to result folder before starting benchmarker component's job

* Masterscript: Only delete job pods when job is completed

* Masterscript: patch loading

* Masterscript: store timeIngesting

* Require: patch loading

* Masterscript: patch loading

* Masterscript: store timeIndex

* Masterscript: set_pod_counter() to 0 before loading starts

* Masterscript: read timeLoading after indexing (again)

* Masterscript: store timeSchema

* Masterscript: store all script_times

* Masterscript: store all script_times as float

* Masterscript: Log scripting times for debugging

* Masterscript: Verify loading times

* Masterscript: set_additional_labels()

* Masterscript: set_additional_labels() SF for TPC-H
  • Loading branch information
perdelt committed Mar 3, 2023
1 parent bbb3e01 commit 9f91370
Show file tree
Hide file tree
Showing 6 changed files with 1,818 additions and 1,083 deletions.
2 changes: 1 addition & 1 deletion bexhoma/__init__.py
@@ -1,4 +1,4 @@
"""
The clustermanager module
"""
__all__ = ["clusters", "experiments", "configurations"]
__all__ = ["evaluators", "clusters", "experiments", "configurations"]
59 changes: 58 additions & 1 deletion bexhoma/clusters.py
Expand Up @@ -89,7 +89,7 @@ def __init__(self, clusterconfig='cluster.config', experiments_configfolder='exp
configfile=f.read()
self.config = eval(configfile)
self.experiments_configfolder = experiments_configfolder
self.resultfolder = self.config['benchmarker']['resultfolder']
self.resultfolder = self.config['benchmarker']['resultfolder'].replace("\\", "/").replace("C:", "")
self.queryfile = queryfile
self.clusterconfig = clusterconfig
self.timeLoading = 0
Expand Down Expand Up @@ -1001,6 +1001,48 @@ def get_jobs(self, app='', component='', experiment='', configuration='', client
# try again, if not failed due to "not found"
if not e.status == 404:
return self.get_jobs(app=app, component=component, experiment=experiment, configuration=configuration, client=client)
def get_jobs_labels(self, app='', component='', experiment='', configuration='', client=''):
"""
Return all jobs matching a set of labels (component/ experiment/ configuration)
:param app: app the job belongs to
:param component: Component, for example sut or monitoring
:param experiment: Unique identifier of the experiment
:param configuration: Name of the dbms configuration
:param client: DEPRECATED?
"""
#print("getJobs")
label = ''
if len(app)==0:
app = self.appname
label += 'app='+app
if len(component)>0:
label += ',component='+component
if len(experiment)>0:
label += ',experiment='+experiment
if len(configuration)>0:
label += ',configuration='+configuration
if len(client)>0:
label += ',client='+client
self.logger.debug('get_jobs_labels '+label)
job_labels = {}
try:
api_response = self.v1batches.list_namespaced_job(self.namespace, label_selector=label)#'app='+appname)
#pprint(api_response)
if len(api_response.items) > 0:
for item in api_response.items:
job_labels[item.metadata.name] = item.metadata.labels
return job_labels
else:
return []
except ApiException as e:
print("Exception when calling BatchV1Api->list_namespaced_job: %s\n" % e)
print("Create new access token")
self.cluster_access()
self.wait(2)
# try again, if not failed due to "not found"
if not e.status == 404:
return self.get_jobs_labels(app=app, component=component, experiment=experiment, configuration=configuration, client=client)
def get_job_status(self, jobname='', app='', component='', experiment='', configuration='', client=''):
"""
Return status of a jobs given by name or matching a set of labels (component/ experiment/ configuration)
Expand Down Expand Up @@ -1415,6 +1457,21 @@ def add_to_messagequeue(self, queue, data):
self.logger.debug("I am using messagequeue {}".format(pod_messagequeue))
redisCommand = 'redis-cli rpush {redisQueue} {data} '.format(redisQueue=queue, data=data)
self.execute_command_in_pod(command=redisCommand, pod=pod_messagequeue)
def set_pod_counter(self, queue, value=0):
"""
Add data to (Redis) message queue.
:param queue: Name of the queue
:param data: Data to be added to queue
"""
pods_messagequeue = self.get_pods(component='messagequeue')
if len(pods_messagequeue) > 0:
pod_messagequeue = pods_messagequeue[0]
else:
pod_messagequeue = 'bexhoma-messagequeue-5ff94984ff-mv9zn'
self.logger.debug("I am using messagequeue {}".format(pod_messagequeue))
redisCommand = 'redis-cli set {redisQueue} {value} '.format(redisQueue=queue, value=value)
self.execute_command_in_pod(command=redisCommand, pod=pod_messagequeue)



Expand Down

0 comments on commit 9f91370

Please sign in to comment.