Commit

V0.11.22 (#89)
* Prepare next release

* Docs: Reference to rendered example

* Inspector: Correct "DataFrame is highly fragmented"

* Dependencies: Jinja2 == 2.11.3 because of dash

* Current version of click

* Cleaned code - comments and marked DEPRECATED_

* Cleaned code - Prometheus without time extend possible

* Cleaned code - metrics collector

* Cleaned code - mp.cpu_count()

* Cleaned code - stop logging multiprocessing at end of benchmark

* Cleaned code

* JOSS: More details

* "Size of processed result list retrieved" only if there is a result set

* JOSS: Acknowledgements

* JOSS: More details
perdelt committed Jun 17, 2022
1 parent eda33d5 commit 39b63aa
Showing 11 changed files with 283 additions and 262 deletions.
10 changes: 8 additions & 2 deletions dbmsbenchmarker/benchmarker.py
@@ -50,6 +50,8 @@
BENCHMARKER_VERBOSE_RESULTS = False
BENCHMARKER_VERBOSE_PROCESS = False

logger = mp.log_to_stderr(logging.INFO)

class singleRunInput:
"""
Class for collecting info about a benchmark run
@@ -280,7 +282,7 @@ def singleResult(connectiondata, inputConfig, numRuns, connectionname, numQuery,
logger.debug(workername+"Begin sorting")
data = sorted(data, key=itemgetter(*list(range(0,len(data[0])))))
logger.debug(workername+"Finished sorting")
-        logger.info(workername+"Size of processed result list retrieved: "+str(sys.getsizeof(data))+" bytes")
+            logger.info(workername+"Size of processed result list retrieved: "+str(sys.getsizeof(data))+" bytes")
# convert to dataframe
#columnnames = [[i[0].upper() for i in connection.cursor.description]]
df = pd.DataFrame.from_records(data=data, coerce_float=True)
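
The sort above orders every result row by all of its columns before the rows are compared. A minimal sketch of that itemgetter pattern, with illustrative sample rows rather than benchmark data:

from operator import itemgetter

data = [(2, 'b'), (1, 'c'), (1, 'a')]
# itemgetter(0, 1, ...) yields a tuple of all columns per row, so sorted()
# orders rows column by column, like ORDER BY over the full column list.
data = sorted(data, key=itemgetter(*range(len(data[0]))))
print(data)  # [(1, 'a'), (1, 'c'), (2, 'b')]
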
@@ -386,6 +388,8 @@ def __init__(self, result_path=None, working='query', batch=False, fixedQuery=No
#self.timeout = 600
# there is no general pool
self.pool = None
# store number of cpu cores
self.num_cpu = mp.cpu_count()
# printer is first and fixed reporter
self.reporter = [reporter.printer(self)]
# store is fixed reporter and cannot be removed
@@ -1171,7 +1175,7 @@ def runBenchmark(self, numQuery, connectionname):
inputConfig.append(singleResultInput(i, l_data[i], l_columnnames[i], self.queries[numQuery-1]))
#print(inputConfig)
lists = []
-        numProcesses_cpu = mp.cpu_count()
+        numProcesses_cpu = self.num_cpu # mp.cpu_count()
batchsize_data = 1
numBatches_data = math.ceil(query.numRun/batchsize_data)
runs_data = list(range(0,query.numRun))
@@ -1428,6 +1432,8 @@ def runBenchmarks(self):
if self.bBatch:
# generate reports at the end only
self.generateReportsAll()
# stop logging multiprocessing
mp.log_to_stderr(logging.ERROR)
def readResultfolder(self):
"""
Reads data of previous benchmark from folder.
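
Taken together, the benchmarker.py changes follow one pattern: create a single multiprocessing logger at import time, cache the core count once in the constructor, and raise the log level when all runs are done. A condensed sketch of that pattern (class and method names are shortened here, not the package's full API):

import logging
import multiprocessing as mp

# One shared stderr logger for all worker processes.
logger = mp.log_to_stderr(logging.INFO)

class Benchmarker:
    def __init__(self):
        # Query the core count once; later pools reuse self.num_cpu.
        self.num_cpu = mp.cpu_count()

    def run_benchmarks(self):
        pool = mp.Pool(self.num_cpu)
        # ... dispatch query runs to the pool ...
        pool.close()
        pool.join()
        # Stop multiprocessing INFO chatter once the benchmark ends.
        mp.log_to_stderr(logging.ERROR)
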
2 changes: 1 addition & 1 deletion dbmsbenchmarker/evaluator.py
@@ -1,5 +1,5 @@
"""
-Classes for generating reports for benchmarks of the Python Package DBMS Benchmarker
+Classes for generating evaluation cubes for benchmarks of the Python Package DBMS Benchmarker
Copyright (C) 2020 Patrick Erdelt
This program is free software: you can redistribute it and/or modify
42 changes: 40 additions & 2 deletions dbmsbenchmarker/inspector.py
@@ -50,14 +50,48 @@
]

def getIntersection(df1, df2):
"""
Intersection of two dataframes.
:param df1: First dataframe
:param df2: Second dataframe
:return: Intersection
"""
return pd.merge(df1, df2, how='inner')
def getUnion(df1, df2):
"""
Union of two dataframes.
:param df1: First dataframe
:param df2: Second dataframe
:return: Union
"""
return pd.concat([df1, df2])
def getDifference12(df1, df2):
"""
Difference of the two dataframes: rows of df1 that do not appear in df2.
:param df1: First dataframe
:param df2: Second dataframe
:return: Difference df1 - df2
"""
return pd.concat([df1, df2, df2]).drop_duplicates(keep=False)
def completeSort(df):
"""
Sort dataframe by all columns.
:param df: Dataframe
:return: Sorted dataframe
"""
    return df.sort_values(by=[df.columns[i] for i in range(0,len(df.columns))], ascending=True)
def list_intersection(lst1, lst2):
"""
Intersection of two lists.
:param lst1: First list
:param lst2: Second list
:return: Intersection
"""
lst3 = [value for value in lst1 if value in lst2]
return lst3

@@ -322,7 +356,9 @@ def get_aggregated_experiment_statistics(self, type='timer', name='run', dbms_fi
#print(df_stat[total_aggregate])
df_result.insert(loc=len(df_result.columns), column=i, value=df_stat[total_aggregate])
#print(df_result)
-        return df_result
+        #return df_result
+        newframe = df_result.copy()
+        return newframe
def get_aggregated_by_connection(self, dataframe, list_connections=[], connection_aggregate='Mean'):
"""
Calculate the connection aggregate
@@ -343,7 +379,9 @@ def get_aggregated_by_connection(self, dataframe, list_connections=[], connectio
else:
df_stats = evaluator.addStatistics(dataframe.T, drop_measures=True)
df_stats = pd.DataFrame(df_stats[connection_aggregate])
-        return df_stats.T
+        #return df_stats.T
+        newframe = df_stats.copy().T
+        return newframe
def get_error(self, numQuery, connection=None):
# error message of connection at query
return self.benchmarks.getError(numQuery, connection)
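
The copy-before-return changes in inspector.py address the pandas warning named in the commit message: a frame grown by repeated insert calls becomes internally fragmented, and .copy() consolidates it. A minimal reproduction, assuming a recent pandas version (values are illustrative):

import pandas as pd

df = pd.DataFrame(index=range(3))
for i in range(200):
    # Each insert adds another internal block; enough of them trigger
    # "PerformanceWarning: DataFrame is highly fragmented".
    df.insert(loc=len(df.columns), column=i, value=[1, 2, 3])
# Returning a consolidated copy, as done above, avoids passing the
# fragmented frame on to callers.
df = df.copy()
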
39 changes: 35 additions & 4 deletions dbmsbenchmarker/monitor.py
@@ -104,6 +104,7 @@ def __init__(self, benchmarks):
self.benchmarker = benchmarks
@staticmethod
def getMetrics(url, metric, time_start, time_end, step=1):
logging.debug("getMetrics from "+url)
query = 'query_range'#?query='+metric['query']+'&start='+str(time_start)+'&end='+str(time_end)+'&step='+str(self.step)
params = {
'query': metric['query'],
@@ -197,6 +198,8 @@ def fetchMetric(query, metric_code, connection, connectiondata, time_start, time
# is there a custom query for this metric and dbms?
if 'metrics' in connectiondata['monitoring'] and metric_code in connectiondata['monitoring']['metrics']:
metric = connectiondata['monitoring']['metrics'][metric_code].copy()
else:
metric = metrics.metrics[metric_code]
#print(metric)
# this yields seconds
# is there a global timeshift
@@ -208,7 +211,11 @@ def fetchMetric(query, metric_code, connection, connectiondata, time_start, time
time_shift = 0
time_start = time_start + time_shift
time_end = time_end + time_shift
-        add_interval = int(connectiondata['monitoring']['extend'])
+        if 'extend' in connectiondata['monitoring']:
+            add_interval = int(connectiondata['monitoring']['extend'])
+        else:
+            add_interval = 0
+        #add_interval = int(connectiondata['monitoring']['extend'])
#add_interval = int(connectiondata['monitoring']['grafanaextend'])
time_start = time_start - add_interval
time_end = time_end + add_interval
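
fetchMetric shifts and widens the [time_start, time_end] window, then hands it to getMetrics, which queries Prometheus's query_range endpoint. The request it assembles looks roughly like this (endpoint URL, PromQL expression, and timestamps are illustrative, not taken from a real configuration):

import requests

url = 'http://localhost:9090/api/v1/'
params = {
    'query': 'sum(rate(container_cpu_usage_seconds_total[1m]))',
    'start': 1655459000,   # unix seconds, after time shift and extension
    'end': 1655459060,
    'step': 1,             # one sample per second
}
# query_range returns one value per step inside [start, end].
response = requests.get(url + 'query_range', params=params)
print(response.json()['data']['result'])
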
@@ -228,7 +235,23 @@ def fetchMetric(query, metric_code, connection, connectiondata, time_start, time
return df
df = pd.DataFrame()
return df
-    def generatePlotForQuery(self, query):
+    def fetchMetricPerQuery(self, query):
+        times = self.benchmarker.protocol['query'][str(query)]
+        for m, metric in metrics.metrics.items():
+            intervals = {}
+            logging.debug("Metric "+m)
+            for c,t in times["starts"].items():
+                time_start = int(datetime.timestamp(datetime.strptime(times["starts"][c],'%Y-%m-%d %H:%M:%S.%f')))
+                time_end = int(datetime.timestamp(datetime.strptime(times["ends"][c],'%Y-%m-%d %H:%M:%S.%f')))
+                #if 'extend' in self.benchmarker.dbms[c].connectiondata['monitoring']:
+                #    add_interval = int(self.benchmarker.dbms[c].connectiondata['monitoring']['extend'])
+                #else:
+                #    add_interval = 0
+                #intervals[c] = time_end-time_start #+1# because of ceil()
+                logging.debug("Start={} End={}".format(time_start, time_end))
+                df = self.fetchMetric(query, m, c, self.benchmarker.dbms[c].connectiondata, time_start, time_end, self.benchmarker.path)
+                print(df)
+    def DEPRECATED_generatePlotForQuery(self, query):
times = self.benchmarker.protocol['query'][str(query)]
for m, metric in metrics.metrics.items():
intervals = {}
@@ -237,7 +260,11 @@ def generatePlotForQuery(self, query):
for c,t in times["starts"].items():
time_start = int(datetime.timestamp(datetime.strptime(times["starts"][c],'%Y-%m-%d %H:%M:%S.%f')))
time_end = int(datetime.timestamp(datetime.strptime(times["ends"][c],'%Y-%m-%d %H:%M:%S.%f')))
-            add_interval = int(self.benchmarker.dbms[c].connectiondata['monitoring']['extend'])
+            if 'extend' in self.benchmarker.dbms[c].connectiondata['monitoring']:
+                add_interval = int(self.benchmarker.dbms[c].connectiondata['monitoring']['extend'])
+            else:
+                add_interval = 0
+            #add_interval = int(self.benchmarker.dbms[c].connectiondata['monitoring']['extend'])
intervals[c] = time_end-time_start #+1# because of ceil()
df = metrics.fetchMetric(query, m, c, self.benchmarker.dbms[c].connectiondata, time_start, time_end, self.benchmarker.path)
if df.empty or len(df.index)==1:
@@ -320,7 +347,11 @@ def computeAverages(self):
m_n[c][m] = 0
m_sum[c][m] = 0
#logging.debug("Connection "+c)
-            add_interval = int(self.benchmarker.dbms[c].connectiondata['monitoring']['extend'])
+            if 'extend' in self.benchmarker.dbms[c].connectiondata['monitoring']:
+                add_interval = int(self.benchmarker.dbms[c].connectiondata['monitoring']['extend'])
+            else:
+                add_interval = 0
+            #add_interval = int(self.benchmarker.dbms[c].connectiondata['monitoring']['extend'])
csvfile = self.benchmarker.path+'/query_'+str(query)+'_metric_'+str(m)+'_'+c+'.csv'
if os.path.isfile(csvfile):
#print(csvfile)
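
All three guards in monitor.py make the monitoring key 'extend' optional, defaulting to 0 seconds. The same effect can be written more compactly with dict.get; the connection dictionary below is a hypothetical sketch, and only the 'monitoring' and 'extend' keys come from the code above:

connectiondata = {
    'monitoring': {
        # 'extend': 30,   # optional: widen the metrics window by 30 s per side
    }
}
add_interval = int(connectiondata['monitoring'].get('extend', 0))
time_start = 1655459000 - add_interval   # illustrative timestamps
time_end = 1655459060 + add_interval
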
