Bexhoma: Default summary for DBMSBenchmarker follows TPC-H
perdelt committed May 2, 2024
1 parent eb70352 commit fe77357
Showing 3 changed files with 430 additions and 110 deletions.
217 changes: 107 additions & 110 deletions bexhoma/experiments.py
@@ -1211,7 +1211,113 @@ def end_loading(self, jobname):
self.evaluator.end_loading(jobname)
def show_summary(self):
self.cluster.logger.debug('default.show_summary()')
pass
print("\n## Show Summary")
pd.set_option("display.max_rows", None)
pd.set_option('display.max_colwidth', None)
pd.set_option('display.max_rows', 500)
pd.set_option('display.max_columns', 500)
pd.set_option('display.width', 1000)
resultfolder = self.cluster.config['benchmarker']['resultfolder']
code = self.code
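        # The workload description (queries.config) is stored as a Python dict
        # literal, so it can be read back with ast.literal_eval.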
with open(resultfolder+"/"+code+"/queries.config",'r') as inp:
workload_properties = ast.literal_eval(inp.read())
print("\n### Workload\n "+workload_properties['name'])
print(" "+workload_properties['intro'])
print(" "+workload_properties['info'])
print("\n### Connections")
with open(resultfolder+"/"+code+"/connections.config",'r') as inf:
connections = ast.literal_eval(inf.read())
pretty_connections = json.dumps(connections, indent=2)
#print(pretty_connections)
connections_sorted = sorted(connections, key=lambda c: c['name'])
for c in connections_sorted:
print(c['name'],
"uses docker image",
c['parameter']['dockerimage'])
            infos = [" {}:{}".format(key, info)
                     for key, info in c['hostsystem'].items()
                     if 'timespan' not in key and info != "" and str(info) != "0" and info != []]
for info in infos:
print(info)
evaluate = inspector.inspector(resultfolder)
evaluate.load_experiment(code=code, silent=True)
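        # The inspector loads all result files of this experiment (identified
        # by its code) from the result folder for evaluation.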
#####################
print("\n### Errors")
print(evaluate.get_total_errors().T)
#####################
print("\n### Warnings")
print(evaluate.get_total_warnings().T)
#####################
print("\n### Latency of Timer Execution [ms]")
df = evaluate.get_aggregated_query_statistics(type='latency', name='execution', query_aggregate='Mean')
        if df is not None:
print(df.sort_index().T.round(2))
#####################
print("\n### Loading [s]")
times = {}
        for c, connection in evaluate.benchmarks.dbms.items():
            times[c] = {}
            for key in ['timeGenerate', 'timeIngesting', 'timeSchema', 'timeIndex', 'timeLoad']:
                if key in connection.connectiondata:
                    times[c][key] = connection.connectiondata[key]
df = pd.DataFrame(times)
df = df.reindex(sorted(df.columns), axis=1)
print(df.round(2).T)
#####################
print("\n### Geometric Mean of Medians of Timer Run [s]")
df = evaluate.get_aggregated_experiment_statistics(type='timer', name='run', query_aggregate='Median', total_aggregate='Geo')
df = (df/1000.0).sort_index()
df.columns = ['Geo Times [s]']
print(df.round(2))
#####################
print("\n### Power@Size")
df = evaluate.get_aggregated_experiment_statistics(type='timer', name='execution', query_aggregate='Median', total_aggregate='Geo')
df = (df/1000.0).sort_index().astype('float')
df = float(parameter.defaultParameters['SF'])*3600./df
df.columns = ['Power@Size [~Q/h]']
print(df.round(2))
#####################
# aggregate time and throughput for parallel pods
print("\n### Throughput@Size")
df_merged_time = pd.DataFrame()
for connection_nr, connection in evaluate.benchmarks.dbms.items():
df_time = pd.DataFrame()
c = connection.connectiondata
connection_name = c['name']
orig_name = c['orig_name']
eva = evaluate.get_experiment_connection_properties(c['name'])
df_time.index = [connection_name]
#df_time['SF'] = int(SF)
#print(c)
df_time['orig_name'] = orig_name
df_time['SF'] = int(c['parameter']['connection_parameter']['loading_parameters']['SF'])
df_time['pods'] = int(c['parameter']['connection_parameter']['loading_parameters']['PODS_PARALLEL'])
#df_time['threads'] = int(c['parameter']['connection_parameter']['loading_parameters']['MYSQL_LOADING_THREADS'])
df_time['num_experiment'] = int(c['parameter']['numExperiment'])
df_time['num_client'] = int(c['parameter']['client'])
df_time['benchmark_start'] = eva['times']['total'][c['name']]['time_start']
df_time['benchmark_end'] = eva['times']['total'][c['name']]['time_end']
df_merged_time = pd.concat([df_merged_time, df_time])
df_time = df_merged_time.sort_index()
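        # Group the parallel client pods of one configuration and take the
        # earliest start and latest end to obtain the wall-clock span of the
        # whole benchmark run.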
        grouped = df_time.groupby(['orig_name', 'SF', 'num_experiment', 'num_client'])
        benchmark_start = grouped['benchmark_start'].min()
        benchmark_end = grouped['benchmark_end'].max()
        df_benchmark = pd.DataFrame(benchmark_end - benchmark_start)
        df_benchmark.columns = ['time [s]']
        df_benchmark['count'] = grouped['benchmark_end'].count()
        df_benchmark['SF'] = df_benchmark.index.get_level_values('SF')
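        # Each of the 'count' parallel streams runs the 22 TPC-H queries, so
        # 22 * 3600 * count / elapsed seconds gives queries per hour; scaling
        # by SF (roughly the database size in GB) yields the "~GB/h" estimate.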
df_benchmark['Throughput@Size [~GB/h]'] = (22*3600*df_benchmark['count']/df_benchmark['time [s]']*df_benchmark['SF']).round(2)
index_names = list(df_benchmark.index.names)
index_names[0] = "DBMS"
df_benchmark.rename_axis(index_names, inplace=True)
print(df_benchmark)
#####################
self.show_summary_monitoring()
def show_summary_monitoring_table(self, evaluate, component):
df_monitoring = list()
##########
@@ -1378,115 +1484,6 @@ def set_queries_full(self):
self.set_queryfile('queries-tpch.config')
def set_queries_profiling(self):
self.set_queryfile('queries-tpch-profiling.config')
def show_summary(self):
self.cluster.logger.debug('tpch.show_summary()')
print("\n## Show Summary")
pd.set_option("display.max_rows", None)
pd.set_option('display.max_colwidth', None)
pd.set_option('display.max_rows', 500)
pd.set_option('display.max_columns', 500)
pd.set_option('display.width', 1000)
resultfolder = self.cluster.config['benchmarker']['resultfolder']
code = self.code
with open(resultfolder+"/"+code+"/queries.config",'r') as inp:
workload_properties = ast.literal_eval(inp.read())
print("\n### Workload\n "+workload_properties['name'])
print(" "+workload_properties['intro'])
print(" "+workload_properties['info'])
print("\n### Connections")
with open(resultfolder+"/"+code+"/connections.config",'r') as inf:
connections = ast.literal_eval(inf.read())
pretty_connections = json.dumps(connections, indent=2)
#print(pretty_connections)
connections_sorted = sorted(connections, key=lambda c: c['name'])
for c in connections_sorted:
print(c['name'],
"uses docker image",
c['parameter']['dockerimage'])
infos = [" {}:{}".format(key,info) for key, info in c['hostsystem'].items() if not 'timespan' in key and not info=="" and not str(info)=="0" and not info==[]]
for info in infos:
print(info)
evaluate = inspector.inspector(resultfolder)
evaluate.load_experiment(code=code, silent=True)
#####################
print("\n### Errors")
print(evaluate.get_total_errors().T)
#####################
print("\n### Warnings")
print(evaluate.get_total_warnings().T)
#####################
print("\n### Latency of Timer Execution [ms]")
df = evaluate.get_aggregated_query_statistics(type='latency', name='execution', query_aggregate='Mean')
if not df is None:
print(df.sort_index().T.round(2))
#####################
print("\n### Loading [s]")
times = {}
for c, connection in evaluate.benchmarks.dbms.items():
times[c]={}
if 'timeGenerate' in connection.connectiondata:
times[c]['timeGenerate'] = connection.connectiondata['timeGenerate']
if 'timeIngesting' in connection.connectiondata:
times[c]['timeIngesting'] = connection.connectiondata['timeIngesting']
if 'timeSchema' in connection.connectiondata:
times[c]['timeSchema'] = connection.connectiondata['timeSchema']
if 'timeIndex' in connection.connectiondata:
times[c]['timeIndex'] = connection.connectiondata['timeIndex']
if 'timeLoad' in connection.connectiondata:
times[c]['timeLoad'] = connection.connectiondata['timeLoad']
df = pd.DataFrame(times)
df = df.reindex(sorted(df.columns), axis=1)
print(df.round(2).T)
#####################
print("\n### Geometric Mean of Medians of Timer Run [s]")
df = evaluate.get_aggregated_experiment_statistics(type='timer', name='run', query_aggregate='Median', total_aggregate='Geo')
df = (df/1000.0).sort_index()
df.columns = ['Geo Times [s]']
print(df.round(2))
#####################
print("\n### TPC-H Power@Size")
df = evaluate.get_aggregated_experiment_statistics(type='timer', name='execution', query_aggregate='Median', total_aggregate='Geo')
df = (df/1000.0).sort_index().astype('float')
df = float(parameter.defaultParameters['SF'])*3600./df
df.columns = ['Power@Size [~Q/h]']
print(df.round(2))
#####################
# aggregate time and throughput for parallel pods
print("\n### TPC-H Throughput@Size")
df_merged_time = pd.DataFrame()
for connection_nr, connection in evaluate.benchmarks.dbms.items():
df_time = pd.DataFrame()
c = connection.connectiondata
connection_name = c['name']
orig_name = c['orig_name']
eva = evaluate.get_experiment_connection_properties(c['name'])
df_time.index = [connection_name]
#df_time['SF'] = int(SF)
#print(c)
df_time['orig_name'] = orig_name
df_time['SF'] = int(c['parameter']['connection_parameter']['loading_parameters']['SF'])
df_time['pods'] = int(c['parameter']['connection_parameter']['loading_parameters']['PODS_PARALLEL'])
#df_time['threads'] = int(c['parameter']['connection_parameter']['loading_parameters']['MYSQL_LOADING_THREADS'])
df_time['num_experiment'] = int(c['parameter']['numExperiment'])
df_time['num_client'] = int(c['parameter']['client'])
df_time['benchmark_start'] = eva['times']['total'][c['name']]['time_start']
df_time['benchmark_end'] = eva['times']['total'][c['name']]['time_end']
df_merged_time = pd.concat([df_merged_time, df_time])
df_time = df_merged_time.sort_index()
benchmark_start = df_time.groupby(['orig_name', 'SF', 'num_experiment', 'num_client']).min('benchmark_start')
benchmark_end = df_time.groupby(['orig_name', 'SF', 'num_experiment', 'num_client']).max('benchmark_end')
df_benchmark = pd.DataFrame(benchmark_end['benchmark_end'] - benchmark_start['benchmark_start'])
df_benchmark.columns = ['time [s]']
benchmark_count = df_time.groupby(['orig_name', 'SF', 'num_experiment', 'num_client']).count()
df_benchmark['count'] = benchmark_count['benchmark_end']
df_benchmark['SF'] = df_benchmark.index.map(lambda x: x[1])
df_benchmark['Throughput@Size [~GB/h]'] = (22*3600*df_benchmark['count']/df_benchmark['time [s]']*df_benchmark['SF']).round(2)
index_names = list(df_benchmark.index.names)
index_names[0] = "DBMS"
df_benchmark.rename_axis(index_names, inplace=True)
print(df_benchmark)
#####################
self.show_summary_monitoring()



2 changes: 2 additions & 0 deletions bexhoma/scripts/experimentsmanager.py
@@ -26,6 +26,8 @@
import pandas as pd
from tabulate import tabulate
from datetime import datetime
from prettytable import PrettyTable, ALL

urllib3.disable_warnings()
logging.basicConfig(level=logging.ERROR)
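
PrettyTable is only imported in the hunk shown above; as a minimal sketch (illustrative only, with invented field names and values, not taken from this commit), a summary table with full grid lines could be rendered like this:

from prettytable import PrettyTable, ALL

# Build a small summary table with a horizontal rule between every row.
table = PrettyTable()
table.field_names = ["DBMS", "Power@Size [~Q/h]", "Throughput@Size [~GB/h]"]
table.hrules = ALL
table.add_row(["PostgreSQL", 1234.56, 789.01])
table.add_row(["MySQL", 987.65, 432.10])
print(table)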

