Define TestRunner and TestPlan

In [None]:
# Standard-library and data-handling imports used by the runners and reports.
import time
import json
import pandas as pd

import warnings
# Suppress every warning for the rest of the session.
warnings.filterwarnings('ignore')

from importlib import reload
import logging
# NOTE(review): reloading the module presumably discards handlers that the
# notebook kernel already installed, so that basicConfig below takes
# effect -- confirm.
reload(logging)
# Level INFO means the logging.debug() calls in the runners are not emitted.
logging.basicConfig(format='%(asctime)s %(message)s',level=logging.INFO)

# Client libraries for the two backends being benchmarked.
from elasticsearch import Elasticsearch
from clickhouse_driver import Client

# Plotting libraries used by the report cells.
import seaborn as sns
import matplotlib.pyplot as plt
sns.set_theme(style="ticks", color_codes=True)

class TestRunner:
    """Common base for backend runners.

    Stores the runner's display name and its configuration mapping;
    subclasses override :meth:`run` to execute a query.
    """

    def __init__(self, name, config):
        """Remember *name* and *config* on the instance."""
        self.name, self.config = name, config

    def run(self, case):
        """Execute *case*; the base implementation does nothing.

        Returns:
            None.
        """
        return None

class ElasticsearchRuner(TestRunner):
    """Runner that times search requests against an Elasticsearch cluster.

    ``config`` must contain:

    * ``cluster`` -- passed to ``Elasticsearch`` as the only entry of its
      host list.
    * ``indices`` -- passed as the ``index`` argument of every search.
    """

    def __init__(self, name, config):
        """Create the Elasticsearch client from ``config``."""
        super().__init__(name, config)
        self.client = Elasticsearch([self.config['cluster']])
        self.indices = self.config['indices']

    def run(self, case):
        """Execute one search and measure how long the client call takes.

        Args:
            case: The search request body handed to ``client.search``.

        Returns:
            dict with keys ``count`` (total hits reported by the response),
            ``response`` (the raw search response) and ``elapsed``
            (seconds spent inside ``client.search``).
        """
        query = case

        logging.debug(f'{self.name} run query {case}')
        # perf_counter() is monotonic and high resolution; time.time() can
        # jump when the system clock is adjusted, corrupting a measurement.
        start = time.perf_counter()
        response = self.client.search(index=self.indices, body=query)
        end = time.perf_counter()
        logging.debug(f'{self.name} run query complete')

        # hits.total is an object ({"value": ..., "relation": ...}) on
        # Elasticsearch 7+, but a plain integer on older clusters or when
        # rest_total_hits_as_int is requested.
        total = response['hits']['total']
        count = total['value'] if isinstance(total, dict) else total

        result = {}
        result['count'] = count
        result['response'] = response
        result['elapsed'] = end - start

        logging.debug(f'query result count {result["count"]} elapsed {result["elapsed"]}')
        return result
        
class ClickhouseRuner(TestRunner):
    """Runner that times queries against a ClickHouse server.

    ``config`` must contain ``cluster``, which is passed as the sole
    argument to ``clickhouse_driver.Client``.
    """

    def __init__(self, name, config):
        """Create the ClickHouse client from ``config``."""
        super().__init__(name, config)
        self.client = Client(self.config['cluster'])

    def run(self, case):
        """Execute one query and measure how long the client call takes.

        Args:
            case: The query handed to ``client.execute``.

        Returns:
            dict with keys ``count`` (``len`` of the value returned by
            ``client.execute``), ``response`` (that value) and ``elapsed``
            (seconds spent inside ``client.execute``).
        """
        query = case
        logging.debug(f'{self.name} run query {case}')
        # perf_counter() is monotonic and high resolution; time.time() can
        # jump when the system clock is adjusted, corrupting a measurement.
        start = time.perf_counter()
        response = self.client.execute(query)
        end = time.perf_counter()
        logging.debug(f'{self.name} run query complete')

        result = {}
        result['count'] = len(response)
        result['response'] = response
        result['elapsed'] = end - start

        logging.debug(f'query result count {result["count"]} elapsed {result["elapsed"]}')
        return result
    
class TestPlan:
    """Benchmark plan loaded from a JSON file and run against its targets.

    Keys read from the plan file:

    * ``targets``   -- list of objects with ``type`` (``'elasticsearch'`` or
      ``'clickhouse'``), ``name`` and ``config``.
    * ``testcases`` -- list of 3-item entries unpacked as
      ``(name, description, cases)``; ``cases`` is paired positionally
      with ``targets``.
    * ``runs``      -- number of measured executions per query.
    """

    # Fixed column layouts so the report frames keep their shape even when
    # there are no rows (e.g. a plan without test cases).
    _SUMMARY_COLUMNS = ('name', 'description', 'target', 'elapsed_total')
    _DETAIL_COLUMNS = ('name', 'description', 'target', 'elapsed')

    def __init__(self, planfile):
        """Load *planfile* and build one runner per configured target.

        Raises:
            ValueError: if a target has a ``type`` other than
                ``'elasticsearch'`` or ``'clickhouse'``. Silently skipping
                it would shift the positional pairing of targets and
                queries performed in :meth:`run`.
        """
        with open(planfile, encoding='utf-8') as json_file:
            self.plan = json.load(json_file)

        self.targets = []
        for target in self.plan['targets']:
            kind = target['type']
            if kind == 'elasticsearch':
                runner = ElasticsearchRuner(target['name'], target['config'])
            elif kind == 'clickhouse':
                runner = ClickhouseRuner(target['name'], target['config'])
            else:
                raise ValueError(f'unsupported target type: {kind!r}')
            self.targets.append(runner)

    def run(self):
        """Execute every test case on every target.

        Each query is executed ``runs + 1`` times. All timings are kept in
        the per-target ``elapsed`` list, but the first one is treated as a
        warm-up and left out of ``elapsed_total``.

        Returns:
            Tuple ``(summary_df, detail_df)`` as built by :meth:`_summary`
            and :meth:`_detail`.
        """
        reports = []
        for testcase in self.plan['testcases']:
            name, description, cases = testcase
            if len(cases) != len(self.targets):
                # zip() below stops at the shorter sequence; make the
                # dropped entries visible instead of losing them silently.
                logging.warning(
                    f'testcase {name} has {len(cases)} cases for '
                    f'{len(self.targets)} targets; unmatched entries are skipped'
                )

            report = {'name': name, 'description': description, 'result': []}
            logging.info(f'run testcase {name} {description}')
            for runner, query in zip(self.targets, cases):
                case_text = str(query)
                # One extra iteration: the first execution is the warm-up.
                elapsed = [
                    runner.run(query)['elapsed']
                    for _ in range(self.plan['runs'] + 1)
                ]
                report['result'].append({
                    'target': runner.name,
                    'case': case_text,
                    'elapsed': elapsed,
                    'elapsed_total': sum(elapsed[1:]),
                })

            logging.info(f'run testcase {name} complete')
            reports.append(report)
        return self._summary(reports), self._detail(reports)

    def _summary(self, reports):
        """Build one row per (test case, target) with its ``elapsed_total``.

        Returns:
            DataFrame with columns ``name``, ``description``, ``target``,
            ``elapsed_total``; empty (but with those columns) when there
            are no results.
        """
        rows = [
            (report['name'], report['description'],
             result['target'], result['elapsed_total'])
            for report in reports
            for result in report['result']
        ]
        return pd.DataFrame(rows, columns=list(self._SUMMARY_COLUMNS))

    def _detail(self, reports):
        """Build one row per individual execution, warm-up run included.

        Returns:
            DataFrame with columns ``name``, ``description``, ``target``,
            ``elapsed``; empty (but with those columns) when there are no
            results.
        """
        rows = [
            (report['name'], report['description'], result['target'], elapsed)
            for report in reports
            for result in report['result']
            for elapsed in result['elapsed']
        ]
        return pd.DataFrame(rows, columns=list(self._DETAIL_COLUMNS))
        

Example: run a single test query against each backend

In [None]:
# Exercise each runner directly, without going through a TestPlan.
# run() returns a dict with 'count', 'response' and 'elapsed' keys.

# ClickHouse: the config only needs the 'cluster' entry.
clickhouse_config = { "cluster" : "host.docker.internal"} 
clickhouse_runner = ClickhouseRuner('clickhouse', clickhouse_config)

query = 'SELECT * FROM syslog'
clickhouse_runner.run(query)

# Elasticsearch: the config needs 'cluster' plus the 'indices' to search
# (given here as a comma-separated string of index names).
es_config = { "cluster" : "host.docker.internal", "indices" : "syslog-2021-02-24,syslog-2021-02-25"} 
es_runner = ElasticsearchRuner('es', es_config)

# Note: `query` is rebound from the SQL string above to a search body.
query = {
    "query": {
        "match_all": {}
    }
}
es_runner.run(query)

Run a test plan and show the test result

In [None]:
# Load the plan from testplan.json, run it, and keep both report frames
# for the plotting cells below.
testplan = TestPlan("testplan.json")
summary_report, detail_report = testplan.run()
# Show the first rows of the detail report.
detail_report.head()

Detail report

In [None]:
# Horizontal swarm plot of every individual timing: elapsed on the x axis,
# one line per target, one facet row per test-case description.
# detail_report holds all executions of each query, including the first
# one that TestPlan.run() leaves out of elapsed_total.
sns.catplot(x="elapsed", y="target", row="description", kind="swarm", 
            orient="h", height=1.5, aspect=5, 
            data=detail_report)

Summary report

In [None]:
# Bar chart of elapsed_total per test-case description, one bar per target.
sns.barplot(x="description", y="elapsed_total", hue="target", data=summary_report)