v0.13.2 (#117)
* Build(deps): Bump werkzeug from 2.1.0 to 2.2.3

* Metrics: find the last active query (with end time)

* Evaluation: skip adding loading metrics multiple times if there are several benchmarking instances for the same loading

* Metrics: split the time span for fetching into max 9000s intervals

* Benchmarker: STREAM id as parameter, optional shuffling

* Metrics: Fetch component metrics of containers other than SUT

* Evaluation: lineterminator in df_to_csv()

* Evaluation: Include loader, benchmarker and datagenerator metrics in evaluation cube

* Benchmarker: Recreate parameter default 0

* Build(deps): Bump flask from 2.1.0 to 2.3.2

* Benchmarker: Reads product and version from JDBC, if implemented - not stored anywhere

* Benchmarker: store_connectiondata() to keep connection infos

* Benchmarker: Reads product and version of driver from JDBC, if implemented

* Benchmarker: Parallel threads defined per query

* Benchmarker: starmap_async() for process pool

* Benchmarker: We cannot handle a single connection if there are multiple processes

* Benchmarker: Even with a global (single) connection, fixed query/connection settings are taken into account

* Benchmarker: Describe default behaviour

* Benchmarker: Optional parameter init_SQL per connection

* Benchmarker: Optional parameter init_SQL per connection - catch exceptions; the result is not fetched

* Removed conflicts in requirements
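
As a usage sketch (paths and values are placeholders), the new stream parameters flow from the CLI into the benchmarker constructor shown in benchmark.py below:

from dbmsbenchmarker import benchmarker

# One benchmarker instance per parallel stream; stream_id is later injected
# into the query parameters as STREAM, and stream_shuffle optionally reorders
# query execution for this stream.
experiments = benchmarker.benchmarker(
    result_path='./results',      # placeholder
    working='query',
    numProcesses=1,
    stream_id=1,
    stream_shuffle=True,
    seed=42)
experiments.getConfig('./config', 'connections.config', 'queries.config')  # placeholder files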
perdelt committed Jun 20, 2023
1 parent 5c98fac commit 2df65b8
Showing 11 changed files with 246 additions and 85 deletions.
12 changes: 10 additions & 2 deletions benchmark.py
@@ -42,9 +42,9 @@
parser.add_argument('-w', '--working', help='working per query or connection', default='query', choices=['query','connection'])
#parser.add_argument('-a', '--anonymize', help='anonymize all dbms', action='store_true', default=False)
#parser.add_argument('-u', '--unanonymize', help='unanonymize some dbms, only sensible in combination with anonymize', nargs='*', default=[])
parser.add_argument('-p', '--numProcesses', help='Number of parallel client processes. Global setting, can be overwritten by connection. If None given, half of all available processes is taken', default=None)
parser.add_argument('-p', '--numProcesses', help='Number of parallel client processes. Global setting, can be overwritten by connection. Default is 1.', default=None)
parser.add_argument('-s', '--seed', help='random seed', default=None)
parser.add_argument('-rcp', '--recreate-parameter', help='recreate parameter for randomized queries', default=None)
parser.add_argument('-rcp', '--recreate-parameter', help='recreate parameter for randomized queries', default=0)
parser.add_argument('-cs', '--copy-subfolder', help='copy subfolder of result folder', action='store_true')
parser.add_argument('-ms', '--max-subfolders', help='maximum number of subfolders of result folder', default=None)
parser.add_argument('-sl', '--sleep', help='sleep SLEEP seconds before going to work', default=0)
@@ -57,6 +57,8 @@
parser.add_argument('-pn', '--num-run', help='Parameter: Number of executions per query', default=0)
parser.add_argument('-m', '--metrics', help='collect hardware metrics per query', action='store_true', default=False)
parser.add_argument('-mps', '--metrics-per-stream', help='collect hardware metrics per stream', action='store_true', default=False)
parser.add_argument('-sid', '--stream-id', help='id of a stream in parallel execution of streams', default=None)
parser.add_argument('-ssh', '--stream-shuffle', help='shuffle query execution based on id of stream', default=None)
#parser.add_argument('-pt', '--timeout', help='Parameter: Timeout in seconds', default=0)
args = parser.parse_args()
# evaluate args
@@ -119,6 +121,10 @@
benchmarker.BENCHMARKER_VERBOSE_RESULTS = True
if args.verbose_process:
benchmarker.BENCHMARKER_VERBOSE_PROCESS = True
# handle parallel streams
stream_id = args.stream_id
stream_shuffle = args.stream_shuffle
# overwrite parameters
if int(args.num_run) > 0:
querymanagement = {
'numRun': int(args.num_run),
@@ -138,6 +144,8 @@
#anonymize=args.anonymize,
#unanonymize=args.unanonymize,
numProcesses=args.numProcesses,
stream_id=stream_id,
stream_shuffle=stream_shuffle,
seed=args.seed)
experiments.getConfig(args.config_folder, args.connection_file, args.query_file)
# switch for args.mode
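
The shuffle switch (-ssh) is meant to reorder query execution per stream; a minimal sketch of the idea, assuming a deterministic per-stream RNG (the actual ordering logic lives in benchmarker.py and may differ):

import random

def shuffled_queries(num_queries, stream_id, stream_shuffle):
    # Assumption: each stream derives its own deterministic query order
    # from its stream id, so parallel streams do not all start with query 1.
    queries = list(range(1, num_queries + 1))
    if stream_shuffle and stream_id is not None:
        random.Random(int(stream_id)).shuffle(queries)
    return queries

print(shuffled_queries(5, stream_id=2, stream_shuffle=True))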
69 changes: 56 additions & 13 deletions dbmsbenchmarker/benchmarker.py
@@ -355,7 +355,7 @@ class benchmarker():
"""
Class for running benchmarks
"""
def __init__(self, result_path=None, working='query', batch=False, fixedQuery=None, fixedConnection=None, rename_connection='', rename_alias='', fixedAlias='', anonymize=False, unanonymize=[], numProcesses=None, seed=None, code=None, subfolder=None):
def __init__(self, result_path=None, working='query', batch=False, fixedQuery=None, fixedConnection=None, rename_connection='', rename_alias='', fixedAlias='', anonymize=False, unanonymize=[], numProcesses=None, seed=None, code=None, subfolder=None, stream_id=None, stream_shuffle=None):
"""
Construct a new 'benchmarker' object.
Allocated the reporters store (always called) and printer (if reports are to be generated).
@@ -377,13 +377,20 @@ def __init__(self, result_path=None, working='query', batch=False, fixedQuery=No
if seed is not None:
random.seed(seed)
## connection management:
self.connectionmanagement = {'numProcesses': numProcesses, 'runsPerConnection': None, 'timeout': None, 'singleConnection': True}
if numProcesses is not None:
# we cannot handle a single connection if there are multiple processes
singleConnection = False
numProcesses = int(numProcesses)
else:
# default is 1 connection per stream
singleConnection = True
self.connectionmanagement = {'numProcesses': numProcesses, 'runsPerConnection': None, 'timeout': None, 'singleConnection': singleConnection}
# set number of parallel client processes
#self.connectionmanagement['numProcesses'] = numProcesses
if self.connectionmanagement['numProcesses'] is None:
self.connectionmanagement['numProcesses'] = 1#math.ceil(mp.cpu_count()/2) #If None, half of all available processes is taken
else:
self.connectionmanagement['numProcesses'] = int(self.numProcesses)
#else:
# self.connectionmanagement['numProcesses'] = int(self.numProcesses)
# connection should be renamed (because of copy to subfolder and parallel processing)
# also rename alias
self.rename_connection = rename_connection
Expand All @@ -393,6 +400,10 @@ def __init__(self, result_path=None, working='query', batch=False, fixedQuery=No
self.activeConnections = []
#self.runsPerConnection = 4
#self.timeout = 600
# number of stream, in particular for parallel streams
# None = ignore this
self.stream_id = stream_id
self.stream_shuffle = stream_shuffle
# there is no general pool
self.pool = None
# store number of cpu cores
@@ -646,6 +657,13 @@ def getConnectionsFromFile(self,filename=None):
anonymous = False
self.dbms[c['name']] = tools.dbms(c, anonymous)
#self.connectDBMSAll()
def store_connectiondata(self):
connections_content = []
for key, dbms in self.dbms.items():
connections_content.append(dbms.connectiondata)
#with open(self.path+'/connections_copy.config', "w") as connections_file:
with open(self.path+'/connections.config', "w") as connections_file:
connections_file.write(str(connections_content))
def connectDBMSAll(self):
"""
Connects to all dbms we have collected connection data of.
@@ -884,6 +902,8 @@ def getConnectionManager(self, numQuery, connectionname):
if('timeout' in connectionmanagement):# and connectionmanagement['timeout'] != 0):
# 0=unlimited
timeout = connectionmanagement['timeout']
if('singleConnection' in connectionmanagement):
singleConnection = connectionmanagement['singleConnection']
if numProcesses == 0 or numProcesses is None:
numProcesses = 1
if timeout == 0:
@@ -1006,6 +1026,8 @@ def runBenchmark(self, numQuery, connectionname):
timeout = connectionmanagement['timeout']#self.timeout
if timeout is not None:
jaydebeapi.QUERY_TIMEOUT = timeout
if self.stream_id is not None:
parameter.defaultParameters['STREAM'] = self.stream_id
singleConnection = connectionmanagement['singleConnection']
# overwrite by connection
#if 'connectionmanagement' in self.dbms[c].connectiondata:
@@ -1137,12 +1159,25 @@ def runBenchmark(self, numQuery, connectionname):
lists = [res.get(timeout=timeout) for res in multiple_results]
lists = [i for j in lists for i in j]
else:
"""
with mp.Pool(processes=numProcesses) as pool:
self.logger.info("POOL of query senders (local pool)")
#multiple_results = [pool.apply_async(singleRun, (self.dbms[c].connectiondata, inputConfig, runs[i*batchsize:(i+1)*batchsize], connectionname, numQuery, self.path, JPickler.dumps(self.activeConnections))) for i in range(numBatches)]
multiple_results = [pool.apply_async(singleRun, (self.dbms[c].connectiondata, inputConfig, runs[i*batchsize:(i+1)*batchsize], connectionname, numQuery, self.path, [], BENCHMARKER_VERBOSE_QUERIES, BENCHMARKER_VERBOSE_RESULTS, BENCHMARKER_VERBOSE_PROCESS)) for i in range(numBatches)]
lists = [res.get(timeout=timeout) for res in multiple_results]
lists = [i for j in lists for i in j]
pool.close()
"""
with mp.Pool(processes=numProcesses) as pool:
self.logger.info("POOL of query senders (local pool starmap)")
#multiple_results = [pool.apply_async(singleRun, (self.dbms[c].connectiondata, inputConfig, runs[i*batchsize:(i+1)*batchsize], connectionname, numQuery, self.path, JPickler.dumps(self.activeConnections))) for i in range(numBatches)]
args = [(self.dbms[c].connectiondata, inputConfig, runs[i*batchsize:(i+1)*batchsize], connectionname, numQuery, self.path, [], BENCHMARKER_VERBOSE_QUERIES, BENCHMARKER_VERBOSE_RESULTS, BENCHMARKER_VERBOSE_PROCESS) for i in range(numBatches)]
multiple_results = pool.starmap_async(singleRun, args)
lists = multiple_results.get(timeout=timeout)
#lists = [res.get(timeout=timeout) for res in multiple_results]
lists = [i for j in lists for i in j]
pool.close()
pool.join()
else:
# no parallel processes because the JVM does not parallelize
# time the queries and stop early if maxTime is reached
@@ -1394,10 +1429,13 @@ def runBenchmarksQuery(self):
numProcesses = 1
i = 0
#connectionname = c
print("More active connections from {} to {} for {}".format(len(self.activeConnections), numProcesses, connectionname))
self.activeConnections.append(tools.dbms(self.dbms[connectionname].connectiondata))
print("Establish global connection #"+str(i))
self.activeConnections[i].connect()
if (self.fixedQuery is not None and self.fixedQuery != numQuery) or (self.fixedConnection is not None and self.fixedConnection != connectionname):
continue
else:
print("More active connections from {} to {} for {}".format(len(self.activeConnections), numProcesses, connectionname))
self.activeConnections.append(tools.dbms(self.dbms[connectionname].connectiondata))
print("Establish global connection #"+str(i))
self.activeConnections[i].connect()
# run benchmark, current query and connection
bBenchmarkDoneForThisQuery = self.runBenchmark(numQuery, connectionname)
# close global connection
@@ -1433,10 +1471,13 @@ def runBenchmarksConnection(self):
numProcesses = 1
i = 0
#connectionname = c
print("More active connections from {} to {} for {}".format(len(self.activeConnections), numProcesses, connectionname))
self.activeConnections.append(tools.dbms(self.dbms[connectionname].connectiondata))
print("Establish global connection #"+str(i))
self.activeConnections[i].connect()
if (self.fixedConnection is not None and self.fixedConnection != connectionname):
continue
else:
print("More active connections from {} to {} for {}".format(len(self.activeConnections), numProcesses, connectionname))
self.activeConnections.append(tools.dbms(self.dbms[connectionname].connectiondata))
print("Establish global connection #"+str(i))
self.activeConnections[i].connect()
#print(self.activeConnections)
# work queries
ordered_list_of_queries = range(1, len(self.queries)+1)
@@ -1487,9 +1528,11 @@ def runBenchmarks(self):
self.logger.debug("### Time end: "+str(self.time_end))
for connectionname in sorted(self.dbms.keys()):
self.protocol['total'][connectionname]['time_end'] = self.time_end
print("DBMSBenchmarker duration: "+str(self.time_end-self.time_start))
print("DBMSBenchmarker duration: {} [s]".format(self.time_end-self.time_start))
# write protocol again
self.reporterStore.writeProtocol()
# store connection data again, it may have changed
self.store_connectiondata()
if self.bBatch:
# generate reports at the end only
self.generateReportsAll()
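
Two of the changes above - starmap_async() for the process pool and singleConnection=False whenever multiple processes are used - reduce to a pattern that can be shown in a self-contained sketch (run_batch is a stand-in for the real singleRun worker):

import multiprocessing as mp

def run_batch(connectiondata, runs):
    # Stand-in for singleRun: each worker process would open its own
    # connection here, since one global connection cannot be shared
    # across processes.
    return [run * 2 for run in runs]  # placeholder work

if __name__ == '__main__':
    batches = [[1, 2], [3, 4], [5, 6]]
    args = [({'alias': 'dbms1'}, batch) for batch in batches]
    with mp.Pool(processes=2) as pool:
        # starmap_async unpacks each tuple into run_batch's parameters and
        # a single get(timeout=...) collects the results of all batches
        results = pool.starmap_async(run_batch, args).get(timeout=600)
    lists = [item for batch in results for item in batch]
    print(lists)  # [2, 4, 6, 8, 10, 12]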
41 changes: 38 additions & 3 deletions dbmsbenchmarker/evaluator.py
@@ -218,6 +218,9 @@ def hasInitScript(c):
evaluation['dbms'][c]['hardwaremetrics'] = {}
evaluation['general']['loadingmetrics'] = {}
evaluation['general']['streamingmetrics'] = {}
evaluation['general']['loadermetrics'] = {}
evaluation['general']['benchmarkermetrics'] = {}
evaluation['general']['datageneratormetrics'] = {}
metricsReporter = monitor.metrics(self.benchmarker)
hardwareAverages = metricsReporter.computeAverages()
if c in hardwareAverages:
@@ -226,16 +229,27 @@
if 'total_gpu_power' in hardwareAverages[c]:
# basis: per second average power, total time in ms
evaluation['dbms'][c]['hardwaremetrics']['total_gpu_energy'] = hardwareAverages[c]['total_gpu_power']*times[c]/3600000
# load test metrics
for m, avg in hardwareAverages[c].items():
# load test metrics
df = metricsReporter.dfHardwareMetricsLoading(m)
df.drop_duplicates(inplace=True) # TODO: Why are there duplicates sometimes?
evaluation['general']['loadingmetrics'][m] = df.to_dict(orient='index')
# load streaming metrics
for m, avg in hardwareAverages[c].items():
# streaming metrics
df = metricsReporter.dfHardwareMetricsStreaming(m)
df.drop_duplicates(inplace=True) # TODO: Why are there duplicates sometimes?
evaluation['general']['streamingmetrics'][m] = df.to_dict(orient='index')
# loader metrics
df = metricsReporter.dfHardwareMetricsLoader(m)
df.drop_duplicates(inplace=True) # TODO: Why are there duplicates sometimes?
evaluation['general']['loadermetrics'][m] = df.to_dict(orient='index')
# benchmarker metrics
df = metricsReporter.dfHardwareMetricsBenchmarker(m)
df.drop_duplicates(inplace=True) # TODO: Why are there duplicates sometimes?
evaluation['general']['benchmarkermetrics'][m] = df.to_dict(orient='index')
# datagenerator metrics
df = metricsReporter.dfHardwareMetricsDatagenerator(m)
df.drop_duplicates(inplace=True) # TODO: Why are there duplicates sometimes?
evaluation['general']['datageneratormetrics'][m] = df.to_dict(orient='index')
# appendix start: query survey
evaluation['query'] = {}
for i in range(1, len(self.benchmarker.queries)+1):
@@ -601,3 +615,24 @@ def dfStreamingMetric(evaluation, metric):
else:
df = pd.DataFrame()
return df
def dfLoaderMetric(evaluation, metric):
if 'loadermetrics' in evaluation['general'] and metric in evaluation['general']['loadermetrics']:
df = pd.DataFrame.from_dict(evaluation['general']['loadermetrics'][metric]).transpose()
df.index.name = 'DBMS'
else:
df = pd.DataFrame()
return df
def dfBenchmarkerMetric(evaluation, metric):
if 'benchmarkermetrics' in evaluation['general'] and metric in evaluation['general']['benchmarkermetrics']:
df = pd.DataFrame.from_dict(evaluation['general']['benchmarkermetrics'][metric]).transpose()
df.index.name = 'DBMS'
else:
df = pd.DataFrame()
return df
def dfDatageneratorMetric(evaluation, metric):
if 'datageneratormetrics' in evaluation['general'] and metric in evaluation['general']['datageneratormetrics']:
df = pd.DataFrame.from_dict(evaluation['general']['datageneratormetrics'][metric]).transpose()
df.index.name = 'DBMS'
else:
df = pd.DataFrame()
return df
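
All three accessors share one pattern: a nested dict stored by the evaluator is turned into a DataFrame indexed by DBMS. A standalone illustration with made-up metric values:

import pandas as pd

# Hypothetical fragment of an evaluation dict, shaped like the evaluator output
evaluation = {'general': {'loadermetrics': {'total_cpu_memory': {
    'dbms1': {0: 512.0}, 'dbms2': {0: 640.0}}}}}

df = pd.DataFrame.from_dict(evaluation['general']['loadermetrics']['total_cpu_memory']).transpose()
df.index.name = 'DBMS'
print(df)  # one row per DBMS, one column per measurement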
6 changes: 6 additions & 0 deletions dbmsbenchmarker/inspector.py
@@ -448,6 +448,12 @@ def get_loading_metrics(self, metric):
return evaluator.dfLoadingMetric(self.e.evaluation, metric)
def get_streaming_metrics(self, metric):
return evaluator.dfStreamingMetric(self.e.evaluation, metric)
def get_loader_metrics(self, metric):
return evaluator.dfLoaderMetric(self.e.evaluation, metric)
def get_benchmarker_metrics(self, metric):
return evaluator.dfBenchmarkerMetric(self.e.evaluation, metric)
def get_datagenerator_metrics(self, metric):
return evaluator.dfDatageneratorMetric(self.e.evaluation, metric)
def get_total_resultsize_normalized(self):
return tools.dataframehelper.evaluateNormalizedResultsizeToDataFrame(self.e.evaluation).T
def get_total_resultsize(self):
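
Typical use of the new getters, assuming an inspector attached to a result folder (folder, experiment code and metric name are illustrative):

from dbmsbenchmarker import inspector

evaluate = inspector.inspector('./results')      # placeholder result folder
evaluate.load_experiment(code='1234567890')      # hypothetical experiment code
df_loader = evaluate.get_loader_metrics('total_cpu_memory')
df_benchmarker = evaluate.get_benchmarker_metrics('total_cpu_memory')
df_datagenerator = evaluate.get_datagenerator_metrics('total_cpu_memory')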
