
V0.5.18 (#97)
* Prepare next release

* Docs: Intro image

* K8s: Removed ExtensionsV1beta1Api (the beta API no longer exists)

* Started improving debug messages

* Requirements: Downgraded kubernetes client (kubernetes-client/python#1718)

* Loading: More info about intended delay

* Improving debug messages

* Ignore spyproject

* Docker image of DBMS as parameter

* TPC-DS: Dialect of MonetDB for Q72 and Q23
perdelt committed May 25, 2022
1 parent f9cd3d6 commit cee4b02
Showing 11 changed files with 314 additions and 184 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -8,3 +8,4 @@ jars/*
build/*
bexhoma.egg-info/*
dist/*
.spyproject/*
2 changes: 1 addition & 1 deletion README.md
@@ -9,7 +9,7 @@ It serves as the **orchestrator** [2] for distributed parallel benchmarking expe
This has been tested on Amazon Web Services, Google Cloud, Microsoft Azure, IBM Cloud and Oracle Cloud, and on Minikube installations.

<p align="center">
<img src="https://raw.githubusercontent.com/Beuth-Erdelt/Benchmark-Experiment-Host-Manager/v0.5.6/docs/experiment-with-orchestrator.png" width="800">
<img src="https://raw.githubusercontent.com/Beuth-Erdelt/Benchmark-Experiment-Host-Manager/master/docs/experiment-with-orchestrator.png" width="800">
</p>

The basic workflow is [1,2]: start a virtual machine, install monitoring software and a database management system, import data, run benchmarks (external tool) and shut down everything with a single command.
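As a rough illustration of that single-command idea, a driver script built on the classes touched in this commit might look like the following sketch. The methods add_experiment(), work_benchmark_list(), evaluate_results() and stop_benchmarker() appear in the hunks below; the class names clusters.kubernetes and experiments.tpcds and their constructor arguments are assumptions, not a documented API.

```python
# Hypothetical driver script -- a sketch only.
# Assumed: the class names clusters.kubernetes and experiments.tpcds and their constructors.
# Grounded in this commit: add_experiment(), work_benchmark_list(), evaluate_results(), stop_benchmarker().
from bexhoma import clusters, experiments

cluster = clusters.kubernetes(clusterconfig='cluster.config',    # cluster connection settings
                              configfolder='experiments/tpcds')  # experiment definitions
experiment = experiments.tpcds(cluster=cluster)   # assumed experiment class for the TPC-DS workload
cluster.add_experiment(experiment)                # register the experiment with the cluster
experiment.work_benchmark_list(intervals=30)      # deploy SUTs, load data, run benchmark jobs
experiment.evaluate_results()                     # merge results and build the evaluation cube
experiment.stop_benchmarker()                     # clean up remaining benchmarker jobs
```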
16 changes: 6 additions & 10 deletions bexhoma/clusters.py
@@ -1,5 +1,9 @@
"""
Class for managing experiments in a Kubernetes cluster
:Date: 2022-05-01
:Version: 0.5
:Authors: Patrick Erdelt
Class to managing experiments in a cluster
Copyright (C) 2020 Patrick Erdelt
This program is free software: you can redistribute it and/or modify
@@ -23,7 +27,7 @@
import subprocess
import os
import time
from timeit import default_timer #as timer
from timeit import default_timer
import psutil
import logging
import socket
@@ -48,14 +52,6 @@ def __init__(self, clusterconfig='cluster.config', configfolder='experiments/',
self.code = code
masterK8s.testdesign.__init__(self, clusterconfig=clusterconfig, configfolder=configfolder, context=context, yamlfolder=yamlfolder, code=self.code, instance=instance, volume=volume, docker=docker, script=script, queryfile=queryfile)
self.max_sut = None
"""
self.code = code
if self.code is None:
self.code = str(round(time.time()))
self.path = self.resultfolder+"/"+self.code
if not path.isdir(self.path):
makedirs(self.path)
"""
self.experiments = []
def add_experiment(self, experiment):
self.experiments.append(experiment)
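The new max_sut attribute (initialized to None above) is used later in this commit, in experiments.py, to cap how many systems under test may be deployed at the same time. Continuing the driver-script sketch from the README section above; setting the attribute directly like this is an assumption, not a documented interface:

```python
# Sketch only: max_sut and add_experiment() are visible in the hunk above;
# assigning the attribute from a driver script is an assumption.
cluster.max_sut = 2                  # at most two SUT pods (running or pending) at once
cluster.add_experiment(experiment)   # experiments are collected in cluster.experiments
```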
173 changes: 105 additions & 68 deletions bexhoma/configurations.py

Large diffs are not rendered by default.

97 changes: 63 additions & 34 deletions bexhoma/experiments.py
@@ -1,21 +1,29 @@
"""
:Date: 2018-08-22
:Version: 0.1
:Date: 2022-05-01
:Version: 0.5
:Authors: Patrick Erdelt
Demo of TPC-DS in a K8s Cluster.
Class for managing an experiment.
This is plugged into a cluster object.
It collects some configuration objects.
# Compare 4 DBMS on different HW
# 256 runs
# no delay
# Compare result sets
# 2x each DBMS
# MemSQL, OmniSci, MonetDB, PostgreSQL, maybe add MySQL, MariaDB, Kinetica?
# Limit 4 CPUs
Two examples included, dealing with TPC-H and TPC-DS tests.
This deals with the TPC-DS tests.
Copyright (C) 2020 Patrick Erdelt
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
#from bexhoma import *
from dbmsbenchmarker import parameter, tools, inspector
import logging
import urllib3
@@ -315,7 +323,7 @@ def zip(self):
# include sub directories
#cmd['zip_results'] = 'cd /results;zip -r {code}.zip {code}'.format(code=self.code)
#fullcommand = 'kubectl exec '+pod_dashboard+' -- bash -c "'+cmd['zip_results'].replace('"','\\"')+'"'
self.cluster.executeCTL(command=cmd['zip_results'], pod=pod_dashboard)#self.yamlfolder+deployment)
self.cluster.executeCTL(command=cmd['zip_results'], pod=pod_dashboard, container="dashboard")#self.yamlfolder+deployment)
#print(fullcommand)
#proc = subprocess.Popen(fullcommand, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
#stdout, stderr = proc.communicate()
@@ -340,26 +348,33 @@ def evaluate_results(self, pod_dashboard=''):
pods = self.cluster.getPods(component='dashboard')
pod_dashboard = pods[0]
# copy logs and yamls to result folder
print("Copy configuration and logs", end="", flush=True)
directory = os.fsencode(self.path)
for file in os.listdir(directory):
filename = os.fsdecode(file)
if filename.endswith(".log") or filename.endswith(".yml") or filename.endswith(".error"):
self.cluster.kubectl('cp '+self.path+"/"+filename+' '+pod_dashboard+':/results/'+str(self.code)+'/'+filename)
filename = os.fsdecode(file)
if filename.endswith(".log") or filename.endswith(".yml") or filename.endswith(".error"):
self.cluster.kubectl('cp '+self.path+"/"+filename+' '+pod_dashboard+':/results/'+str(self.code)+'/'+filename+' -c dashboard')
print(".", end="", flush=True)
print("done!")
cmd = {}
cmd['update_dbmsbenchmarker'] = 'git pull'#/'+str(self.code)
#fullcommand = 'kubectl exec '+pod_dashboard+' -- bash -c "'+cmd['update_dbmsbenchmarker'].replace('"','\\"')+'"'
self.cluster.executeCTL(command=cmd['update_dbmsbenchmarker'], pod=pod_dashboard)
self.cluster.executeCTL(command=cmd['update_dbmsbenchmarker'], pod=pod_dashboard, container="dashboard")
#print(fullcommand)
#proc = subprocess.Popen(fullcommand, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
#stdout, stderr = proc.communicate()
print("Join results ", end="", flush=True)
cmd['merge_results'] = 'python merge.py -r /results/ -c '+str(self.code)
self.cluster.executeCTL(command=cmd['merge_results'], pod=pod_dashboard)
self.cluster.executeCTL(command=cmd['merge_results'], pod=pod_dashboard, container="dashboard")
print("done!")
#fullcommand = 'kubectl exec '+pod_dashboard+' -- bash -c "'+cmd['merge_results'].replace('"','\\"')+'"'
#print(fullcommand)
#proc = subprocess.Popen(fullcommand, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
#stdout, stderr = proc.communicate()
print("Build evaluation cube ", end="", flush=True)
cmd['evaluate_results'] = 'python benchmark.py read -e yes -r /results/'+str(self.code)
self.cluster.executeCTL(command=cmd['evaluate_results'], pod=pod_dashboard)
self.cluster.executeCTL(command=cmd['evaluate_results'], pod=pod_dashboard, container="dashboard")
print("done!")
#fullcommand = 'kubectl exec '+pod_dashboard+' -- bash -c "'+cmd['evaluate_results'].replace('"','\\"')+'"'
#print(fullcommand)
#proc = subprocess.Popen(fullcommand, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
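Taken together, evaluate_results() now copies the local logs and YAML files into the dashboard container (note the added -c dashboard) and runs the update, merge and read steps there. A standalone sketch of the same sequence issued through plain kubectl; the pod name, local result path and experiment code are placeholders, while the commands themselves are the ones in this hunk:

```python
# Sketch of the evaluation sequence from this hunk, issued via plain kubectl.
# Placeholders: pod name, local result path, experiment code.
import os
import subprocess

pod, path, code = "bexhoma-dashboard-xxxxx", "experiments/tpcds/1234567890", "1234567890"

def exec_dashboard(cmd):
    # roughly what cluster.executeCTL(command=cmd, pod=pod, container="dashboard") is expected to do
    full = 'kubectl exec {p} -c dashboard -- bash -c "{c}"'.format(p=pod, c=cmd.replace('"', '\\"'))
    subprocess.run(full, shell=True, check=True)

# copy logs, YAML files and error reports into the dashboard container
for filename in os.listdir(path):
    if filename.endswith((".log", ".yml", ".error")):
        subprocess.run('kubectl cp {src} {p}:/results/{code}/{f} -c dashboard'.format(
            src=os.path.join(path, filename), p=pod, code=code, f=filename),
            shell=True, check=True)

exec_dashboard('git pull')                                              # update dbmsbenchmarker
exec_dashboard('python merge.py -r /results/ -c ' + code)               # join results
exec_dashboard('python benchmark.py read -e yes -r /results/' + code)   # build evaluation cube
```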
@@ -404,30 +419,41 @@ def add_benchmark_list(self, list_clients):
def work_benchmark_list(self, intervals=30, stop=True):
do = True
while do:
time.sleep(intervals)
#time.sleep(intervals)
self.wait(intervals)
# count number of running and pending pods
num_pods_running = len(self.cluster.getPods(app = self.appname, component = 'sut', status = 'Running'))
num_pods_pending = len(self.cluster.getPods(app = self.appname, component = 'sut', status = 'Pending'))
for config in self.configurations:
# count number of running and pending pods
num_pods_running = len(self.cluster.getPods(app = self.appname, component = 'sut', status = 'Running'))
num_pods_pending = len(self.cluster.getPods(app = self.appname, component = 'sut', status = 'Pending'))
# check if sut is running
if not config.sut_is_running():
print("{} is not running".format(config.configuration))
#print("{} is not running".format(config.configuration))
if not config.experiment_done:
if not config.sut_is_pending():
print("{} is not running yet - ".format(config.configuration), end="", flush=True)
if self.cluster.max_sut is not None:
print("{} running and {} pending pods".format(num_pods_running, num_pods_pending))
print("{} running and {} pending pods: max is {} pods in the cluster - ".format(num_pods_running, num_pods_pending, self.cluster.max_sut), end="", flush=True)
if num_pods_running+num_pods_pending < self.cluster.max_sut:
print("it will start now")
config.start_sut()
self.wait(10)
num_pods_pending = num_pods_pending + 1
#self.wait(10)
else:
print("it has to wait")
else:
print("it will start now")
config.start_sut()
self.wait(10)
num_pods_pending = num_pods_pending + 1
#self.wait(10)
else:
print("{} is pending".format(config.configuration))
continue
# check if loading is done
config.check_load_data()
# start loading
if not config.loading_started:
print("{} is not loaded".format(config.configuration))
if config.sut_is_running():
print("{} is not loaded yet".format(config.configuration))
if config.monitoring_active and not config.monitoring_is_running():
print("{} waits for monitoring".format(config.configuration))
if not config.monitoring_is_pending():
Expand All @@ -446,7 +472,7 @@ def work_benchmark_list(self, intervals=30, stop=True):
# config demands other delay
delay = config.dockertemplate['delay_prepare']
config.loading_after_time = now + timedelta(seconds=delay)
print("{} will start loading but not before {}".format(config.configuration, config.loading_after_time.strftime('%Y-%m-%d %H:%M:%S')))
print("{} will start loading but not before {} (that is in {} secs)".format(config.configuration, config.loading_after_time.strftime('%Y-%m-%d %H:%M:%S'), delay))
continue
# benchmark if loading is done and monitoring is ready
if config.loading_finished:
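The reworked loop gates new SUT deployments on the cluster-wide max_sut limit and immediately counts a freshly started SUT as pending, so several configurations handled in the same pass cannot overshoot the limit. Restated as a standalone predicate (a paraphrase of this hunk, not the actual method):

```python
# Paraphrase of the admission check in work_benchmark_list(): a new SUT may be
# started only while running + pending SUT pods stay below cluster.max_sut
# (no limit when max_sut is None).
from typing import Optional

def may_start_sut(num_running: int, num_pending: int, max_sut: Optional[int]) -> bool:
    if max_sut is None:
        return True
    return num_running + num_pending < max_sut

assert may_start_sut(1, 0, 2)        # one slot free: "it will start now"
assert not may_start_sut(1, 1, 2)    # limit reached: "it has to wait"
```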
Expand Down Expand Up @@ -510,7 +536,8 @@ def work_benchmark_list(self, intervals=30, stop=True):
# status per pod
for p in pods:
status = self.cluster.getPodStatus(p)
print(p,status)
self.cluster.logger.debug('job-pod {} has status {}'.format(p, status))
#print(p,status)
if status == 'Succeeded':
#if status != 'Running':
self.cluster.store_pod_log(p)
@@ -528,21 +555,22 @@ def work_benchmark_list(self, intervals=30, stop=True):
# status per job
for job in jobs:
success = self.cluster.getJobStatus(job)
print(job, success)
self.cluster.logger.debug('job {} has status {}'.format(job, success))
#print(job, success)
if success:
self.cluster.deleteJob(job)
if len(pods) == 0 and len(jobs) == 0:
do = False
for config in self.configurations:
#if config.sut_is_pending() or config.loading_started or len(config.benchmark_list) > 0:
if config.sut_is_pending():
print("{} pending".format(config.configuration))
self.cluster.logger.debug("{} pending".format(config.configuration))
do = True
if not config.loading_started:
print("{} not loaded".format(config.configuration))
self.cluster.logger.debug("{} not loaded".format(config.configuration))
do = True
if len(config.benchmark_list) > 0:
print("{} still benchmarks to run".format(config.configuration))
self.cluster.logger.debug("{} still benchmarks to run".format(config.configuration))
do = True
def benchmark_list(self, list_clients):
for i, parallelism in enumerate(list_clients):
@@ -596,6 +624,7 @@ def benchmark_list(self, list_clients):
break
def stop_benchmarker(self, configuration=''):
# all jobs of configuration - benchmarker
self.cluster.logger.debug("experiment.stop_benchmarker({})".format(configuration))
app = self.appname
component = 'benchmarker'
jobs = self.cluster.getJobs(app, component, self.code, configuration)
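Several progress print() calls are demoted to logger.debug() in this file (job and pod status, pending/not-loaded notices, and the new stop_benchmarker message). These come through the standard Python logging module, so a driver script can surface them with an ordinary logging configuration; this is plain logging setup, not a bexhoma-specific switch:

```python
# With level=DEBUG, the new logger.debug() messages (e.g. "job-pod ... has status ...")
# become visible on the console.
import logging

logging.basicConfig(level=logging.DEBUG,
                    format="%(asctime)s %(name)s %(levelname)s: %(message)s")
```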
