## docs

https://clickhouse-driver.readthedocs.io/en/latest/index.html

In [None]:
import sys
import json
import pandas as pd
from uuid import uuid4

import ipywidgets as widgets
from IPython.display import display
from IPython.core.display import Markdown
import seaborn as sns
import matplotlib.pyplot as plt

sns.set(style="whitegrid")

from clickhouse_driver import Client

import rafal

# Config loading: edit config.json to change these values.
# The context manager closes the file handle as soon as the JSON is parsed.
with open('config.json', encoding='utf-8') as config_file:
    config = json.load(config_file)
user = config['Rafal.user']
url = config['Rafal.url']
print(url)

# When config.json already holds a non-empty password, the input widget is
# disabled and only shows a 'from config' placeholder.
pwd_exists = bool(config.get('Rafal.password'))
wPwd = widgets.Password(
    value='',
    placeholder='from config' if pwd_exists else 'Enter password',
    disabled=pwd_exists,
)

display(widgets.HBox([widgets.Label('Password for Rafal API :'), wPwd]))

# Proxy parameters: keep the configured mapping only when at least one of the
# http / https entries is set; a missing or empty 'proxies' section means None.
proxy_config = config.get('proxies') or {}
proxies = proxy_config if proxy_config.get('http') or proxy_config.get('https') else None

In [None]:
from datetime import datetime, timezone
import pytz
# Reference timezone used for the benchmark timestamp.
tz = pytz.timezone('Europe/Paris')
# Take the current instant directly in that timezone. Localizing a naive
# datetime.now() would label the machine's local wall-clock time as Paris time,
# which is only correct when the notebook runs on a host set to Europe/Paris.
tst = datetime.now(tz)

In [None]:
def chk_to_dataframe(ckh_result):
    """Build a DataFrame from a ClickHouse result fetched with column types.

    `ckh_result[0]` holds the rows and `ckh_result[1]` the (name, type)
    pairs; only the names are used, as column labels.
    """
    rows = ckh_result[0]
    column_names = [column_name for column_name, _ in ckh_result[1]]
    return pd.DataFrame.from_records(rows, columns=column_names)

def benchmarkQueries(client, queryDict, nb= 5, verbose= True):
    """Run every query of `queryDict` `nb` times and collect per-run statistics.

    Args:
        client: ClickHouse driver client used to execute the queries. It is
            disconnected before returning, even when a query raises.
        queryDict: dict of name: query
            name = label of the query (any string)
            query = ClickHouse query text sent to the driver
        nb: number of iterations to run the benchmark (default = 5)
        verbose: when True, print each statistics record; otherwise print
            one '.' per executed query as a progress marker.

    Returns:
        tuple of (stats_dataframe, list_of_queries_result)
            stats_dataframe: one row per (iteration, query) with the columns
                qname, i, elapsed plus the attributes of the driver's
                profile info for that query.
            list_of_queries_result: raw results (with column types) of the
                LAST iteration only, one per query in `queryDict` order;
                an empty list when `nb` is 0.

    Note:
        Queries are executed with the module-level `settings` dict.
    """
    stats = []
    # Bound up-front so that nb == 0 returns an empty list instead of failing
    # on an unbound local at the return statement.
    dfs = []
    try:
        for i in range(nb):
            # Results are reset on every iteration: only the last one is kept.
            dfs = []
            for name, query in queryDict.items():
                dfs.append(client.execute(query, with_column_types=True, settings=settings))
                dd = dict(qname=name, i=i, elapsed=client.last_query.elapsed,
                          **client.last_query.profile_info.__dict__)
                if verbose:
                    print(dd)
                else:
                    print('.', end='')
                stats.append(dd)
    finally:
        # Release the connection whether or not a query failed.
        client.disconnect()
    return pd.DataFrame.from_records(stats), dfs

def plot_duration(stats, title= None, ax= None):
    """Draw a bar plot of the average query duration per query name.

    Args:
        stats: DataFrame with at least the columns `qname`, `elapsed` and `i`.
        title: optional heading placed above the iteration count; when None
            only the iteration count is shown.
        ax: optional axes forwarded to `sns.barplot`.

    Returns:
        The object returned by `sns.barplot`.
    """
    nb = len(stats['i'].unique())
    g = sns.barplot(x="qname", y="elapsed", data=stats, palette="muted", ax=ax)
    # The bars use seaborn's default estimator (the mean), matching the
    # "(avg)" y-label, so the subtitle says "mean" rather than "median".
    subtitle = f'(mean on {nb} iterations)'
    # title defaults to None: concatenating None with a str would raise TypeError.
    g.set_title(subtitle if title is None else f'{title}\n{subtitle}')
    g.set_ylabel("duration in sec (avg)")
    g.set_xlabel("")
    sns.despine(left=True)
    return g

def getQueryStats(client_name= 'ClickHouse python-driver', os_user= None, fromDateTime= None, query= None, limit= 300):
    """Fetch recent rows of the [distributed_query_log] table as a DataFrame.

    Uses the module-level `client` and `settings`.

    Args:
        client_name: value matched against the `client_name` column.
        os_user: when set, also filter on the `os_user` column.
        fromDateTime: when set, keep rows whose `query_start_time` is not
            earlier than this datetime; it is converted to UTC and formatted
            as 'YYYY-MM-DD HH:MM:SS' before being sent.
        query: when set, keep only the rows whose `query` text equals it.
            This filter is applied on the client side, AFTER `limit`.
        limit: maximum number of rows requested (most recent `event_time` first).

    Returns:
        DataFrame of the selected log columns. The index is reset, so the
        previous row position is available in an `index` column.
    """
    # Values are bound through the driver's parameter substitution (%(name)s)
    # so that quotes inside them are escaped instead of breaking the statement.
    conditions = ["client_name = %(client_name)s"]
    params = {'client_name': client_name}
    if os_user:
        conditions.append("os_user = %(os_user)s")
        params['os_user'] = os_user
    if fromDateTime:
        conditions.append("query_start_time >= %(from_time)s")
        params['from_time'] = fromDateTime.astimezone(timezone.utc).strftime('%Y-%m-%d %H:%M:%S')
    conditions.append("type != 'QueryStart'")
    where_clause = ' and '.join(conditions)

    # int() guarantees that only a number can reach the LIMIT clause.
    query_mon = f"""select `type`, `event_date`, `event_time`, `query_start_time`, `query_duration_ms`, `read_rows`, `read_bytes`, `written_rows`, `written_bytes`,
        `result_rows`, `result_bytes`, `memory_usage`,`query`, `is_initial_query`,`user`, `query_id` ,
        `os_user`, `client_hostname`, `Settings.Names`, `Settings.Values`
    from distributed_query_log dql
    where {where_clause}
    order by event_time desc limit {int(limit)}"""
    res = client.execute(query_mon, params, with_column_types=True, settings=settings)
    df = chk_to_dataframe(res)
    if query:
        df = df[df['query'] == query]
    return df.reset_index()

In [None]:
# Split the configured URL into host and port with the standard URL parser:
# it copes with https://, a trailing path, or a missing port, where the former
# replace('http://', '').split(':') raised or returned a polluted port.
from urllib.parse import urlsplit

# A URL without a scheme ("host:8123") needs a leading '//' so that the parser
# reads "host" as the network location rather than as the scheme.
_url_parts = urlsplit(url if '://' in url else f'//{url}')
host = _url_parts.hostname
# Kept as a string (or None when absent), as the previous parsing produced.
port = str(_url_parts.port) if _url_parts.port is not None else None
host, port

In [None]:
# Query settings shared by the Client constructor and by every execute() call
# made in benchmarkQueries and getQueryStats.
settings = {'max_block_size': 100000}

In [None]:
# Connection to ClickHouse. Port and password can be overridden from
# config.json ('ClickHouse.port' / 'ClickHouse.password'); the previous
# hard-coded values remain the fallback so existing setups keep working.
# NOTE(review): the fallback password is still stored in the notebook —
# move it to config.json and drop the literal once every user has the key.
client = Client(host=host,
                port=config.get('ClickHouse.port', 9000),
                password=config.get('ClickHouse.password', 'thisIsADevPassword'),
                settings=settings)

In [None]:
# Connectivity check: list the databases returned by the server for this client.
client.execute('SHOW DATABASES')

In [None]:
# Fetch the CREATE statement of the `mtms` table, which the benchmark queries read from.
res= client.execute('SHOW CREATE TABLE mtms')
res

## setup queries

In [None]:
# WHERE clause registered under the key `narrow` in `wheres`: combines a scenario
# sub-select on default.diffusions, a trade range, a csaId lookup in dict.csaId,
# and Date / commit filters.
where1= """((scenario GLOBAL IN 
    (
        SELECT DISTINCT scenario
        FROM default.diffusions
        WHERE ((diffusions[
        (
            SELECT indexOf(dates, toDate('2047-06-05'))
            FROM datearray
        )]) < 0.34) AND (SensId = 2)
    )) AND ((trade >= 0.) AND (trade <= 950.)) AND (toUInt64(csaId) IN 
    (
        SELECT csaId
        FROM dict.csaId
        WHERE csa IN ('BANK01')
    )) AND (Date IN ('2019-04-16'))) AND ((Date = '2019-04-16') AND (commit IN (1)))"""

In [None]:
# WHERE clause registered under the key `large` in `wheres`: only the trade range
# and the commit filter, without the scenario / csaId / Date restrictions of where1.
where2= """((trade >= 0.) AND (trade <= 950.)) AND (commit IN (1))"""

In [None]:
# Reference query template. It aggregates `mtms` per scenario with {func}ForEach,
# expands the resulting array with ARRAY JOIN and joins it to `datearray` on the
# array position. {func}, {where} and {WhereArray} are filled in with str.format.
query_ref = """
SELECT *
FROM (
SELECT    mtm, dates, scenario
FROM (
    SELECT    mtm, mtms_mtms_datearray_dates, scenario
    FROM
        (
        SELECT
            scenario,
            {func}ForEach(arrayMap(x -> x * factor, mtms)) AS mtm
        FROM
            mtms
        WHERE {where}
        GROUP BY scenario
        ) ARRAY JOIN mtm,
        arrayEnumerate(mtm) as mtms_mtms_datearray_dates ) ALL
INNER JOIN (
    SELECT
        dates,    mtms_mtms_datearray_dates
    FROM
        datearray ARRAY JOIN arrayEnumerate(dates) as mtms_mtms_datearray_dates,
        dates )
        USING mtms_mtms_datearray_dates
) WHERE {WhereArray}
"""

In [None]:
# Candidate query template. It groups `mtms` by (scenario, Date) with
# {func}StateForEach, joins `datearraywithdate` on Date, then merges the states
# per (fdate, scenario) with {func}Merge. {func}, {where} and {WhereArray} are
# filled in with str.format.
query_new= """
SELECT   {func}Merge(MtmsAggreg) AS mtm,   fdate,   scenario
FROM
(
   SELECT       MtmsAggregs,       dates AS fdates,       scenario
   FROM
   (
       SELECT           scenario,           Date,
           {func}StateForEach(arrayMap(x -> (x * factor), mtms)) AS MtmsAggregs
       FROM mtms
       WHERE {where}
       GROUP BY
           scenario,
           Date
   )
   INNER JOIN
   (
       SELECT dates, Date FROM datearraywithdate
   ) USING (Date)
)
ARRAY JOIN
   arrayResize(fdates, length(MtmsAggregs)) AS fdate,
   MtmsAggregs AS MtmsAggreg
WHERE {WhereArray}
GROUP BY
   fdate,
   scenario
"""

In [None]:
# Sum-only variant of the candidate query: uses sumForEach / sum directly instead
# of aggregate states, so it has no {func} placeholder. Only {where} and
# {WhereArray} are filled in with str.format.
query_new2= """
SELECT   sum(MtmsAggreg) AS mtm,   fdate,   scenario
FROM
(
   SELECT       MtmsAggregs,       dates AS fdates,       scenario
   FROM
   (
       SELECT           scenario,           Date,
           sumForEach(arrayMap(x -> (x * factor), mtms)) AS MtmsAggregs
       FROM mtms
       WHERE {where}
       GROUP BY
           scenario,
           Date
   )
   INNER JOIN
   (
       SELECT dates, Date FROM datearraywithdate
   ) USING (Date)
)
ARRAY JOIN
   arrayResize(fdates, length(MtmsAggregs)) AS fdate,
   MtmsAggregs AS MtmsAggreg
WHERE {WhereArray}
GROUP BY
   fdate,
   scenario
"""

In [None]:
# Number of iterations of each benchmark run.
nb = 5
# WHERE clauses to benchmark, keyed by the label used in the run names.
wheres = {'narrow': where1, 'large': where2}
# Aggregate functions substituted for {func} in the query templates.
funcs = ['sum', 'count', 'avg']

# where clause & nb of iterations
#filters= dict(narrow= (where1, 2), large= (where2, 2))
# Predicate substituted for {WhereArray} in the query templates.
WhereArray = "1=1"

In [None]:
def _make_run(func, label, where):
    """Describe one benchmark run: an aggregate function applied with one WHERE clause."""
    placeholders = dict(where=where, func=func, WhereArray=WhereArray)
    return {
        'func': func,
        'where': where,
        'name': f"{func} agg\n{label} where",
        'queries': {
            'query_ref': query_ref.format(**placeholders),
            'query_new': query_new.format(**placeholders),
        },
        'iterations': nb,
    }


# One run per (aggregate function, WHERE clause) pair, functions varying slowest.
runs = [_make_run(func, label, where)
        for func in funcs
        for label, where in wheres.items()]

# The sum-only query variant is benchmarked for the 'sum' runs only.
for run in runs:
    if run['func'] != 'sum':
        continue
    run['queries']['query_sum'] = query_new2.format(where=run['where'], WhereArray=WhereArray)

In [None]:
# Per-run containers, keyed by run name and filled by the benchmark loop.
stats0= {}  # statistics DataFrame returned by benchmarkQueries
dfs= {}  # raw query results returned by benchmarkQueries
g= {}  # plot handles; only referenced by a commented-out line in the benchmark loop

In [None]:
# Output name -> (source column, aggregate function).
# NOTE(review): not referenced by any visible cell; presumably intended for a
# pandas named aggregation — confirm before relying on it.
aggDict = {
    'duration_sec_avg': ('elapsed', 'median'),
    'rows': ('rows', 'median'),
    'blocks': ('blocks', 'median'),
    'bytes': ('bytes', 'median'),
    'nb': ('elapsed', 'count'),
}

In [None]:
# Execute every run and keep its statistics and last-iteration results by run name.
for run in runs:
    name = run['name']
    print(f"\n--> Running serie '{name}' with {run['iterations']} iterations :")
    run_stats, run_results = benchmarkQueries(client, run['queries'],
                                              nb=run['iterations'], verbose=False)
    stats0[name] = run_stats
    dfs[name] = run_results
    print('\n', f"{name} filter", 'with WHERE clause :\n', run['where'])

# Stack the per-run statistics; the run name becomes the outer index level 'serie'.
stats = pd.concat(stats0, names=['serie'])
stats

## Distributed query log

In [None]:
# Identification values for filtering the query log.
# NOTE(review): the getQueryStats call in the next cell passes only fromDateTime
# and query, so these three variables are not used by the visible cells.
client_name= 'ClickHouse python-driver'
client_hostname= 'LAPTOP-M7C54VOV'
os_user= 'chdec'

In [None]:
%%time
# request query stats from table [distributed_query_log], filtered by previous queries
# For every run and every query text of that run, fetch the log rows recorded since `tst`
# and stack them under the keys (serie, qname); the innermost index level is the row
# position produced by getQueryStats' reset_index().
dropFields= ['query', 'is_initial_query', 'client_hostname', 'query_start_time']
distQueryLog= pd.concat({run['name']: pd.concat({name: getQueryStats(fromDateTime= tst, query= query) 
                                                 for name, query in run['queries'].items()}, 
                                                names= ['qname']) 
                         for run in runs}, names= ['serie']
                     ).drop(dropFields, axis= 1)
distQueryLog.index.names = ['serie', 'qname', 'i']

# concat results with clickHouse client stats
# Both frames are aligned side by side on the (serie, qname, i) index, then the
# server-side duration is converted from milliseconds to seconds.
# NOTE(review): on the client side `i` is the iteration number, while on the log side
# it is the row position after "order by event_time desc" — i == 0 may therefore be
# the most recent execution rather than the first; confirm the alignment.
ckhqStats= pd.concat([stats.reset_index(level=1, drop= True).set_index(['qname', 'i'], append= True), 
                      distQueryLog], axis= 1
                    ).assign(query_duration_sec= lambda x: x.query_duration_ms/1000)

ckhqStats.head()

In [None]:
# plot a seaborn bar graph, with average stats
# 2 x 2 grid: one subplot per metric listed in `flds`.
fig1, axes = plt.subplots(2,2)
fig1.set_size_inches(16, 14)
data= ckhqStats.reset_index()
flds= ["elapsed", "query_duration_sec", "memory_usage", "read_bytes"]
for field, ax in zip(flds, axes.flatten()):
    # One group of bars per serie, one bar per query name.
    g = sns.barplot(x="serie", y= field, hue= "qname",data= data, palette="muted", ax= ax)
    g.set_title(field)
    g.set_ylabel(f"{field} (avg)")
    g.set_xlabel("")
    # Serie names contain a line break, so the tick labels are tilted and right-aligned.
    g.set_xticklabels(g.get_xticklabels(), 
                      rotation= 35, 
                      horizontalalignment='right',
                      fontweight='light',
                      fontsize='large')

# Figure title: target URL, iteration count and the timestamp taken at notebook start.
plt.suptitle(f"Benchmark on {url}\n{nb} iterations for each, {tst:at %Y-%m-%d %H:%M [UTC %z]}", 
             fontsize= 'x-large', y=1.03)
plt.tight_layout(pad=0.4, w_pad=0.5, h_pad=1.0)
sns.despine(left=True)

In [None]:
# Per (serie, qname): median of the numeric metrics, first event_time / query_id.
aggDict2 = dict(elapsed='median', query_duration_sec='median', read_bytes='median', result_rows='median',
                memory_usage='median', event_time='first', query_id='first')
# unstack(0) moves `serie` to the columns: column keys become (metric, serie),
# with one row per qname.
queryReport = ckhqStats.groupby(['serie', 'qname']).agg(aggDict2).unstack(0)
# Relative change of query_new versus query_ref for every numeric column
# (kept for interactive inspection; it is not displayed below).
delta = queryReport._get_numeric_data().apply(lambda df: df.loc['query_new'] / df.loc['query_ref'] - 1)
# Thousands separators for integer columns. Series.items() replaces
# Series.iteritems(), which was removed in pandas 2.0.
fmt_cols = {col: "{:,}" for col, typ in queryReport.dtypes.items() if typ == 'int64'}
# Format every memory_usage column without decimals. The columns are selected by
# metric name because the former hard-coded ('memory_usage', 'large' / 'narrow')
# keys do not match the actual serie labels (e.g. "sum agg\nnarrow where").
fmt_cols.update({col: "{:,.0f}" for col in queryReport.columns if col[0] == 'memory_usage'})
queryReport.style.format(fmt_cols)

### Cache effect?

In [None]:
# Difference, in seconds (not a percentage), between the server-side duration at
# i == 0 and the median duration over all i, for each (serie, qname).
# NOTE(review): the `i` of query_duration_sec comes from query-log rows ordered by
# event_time desc, so i == 0 may be the most recent execution, not the first — confirm.
cacheEffect= ckhqStats.xs(0, level='i').query_duration_sec - ckhqStats.groupby(level=[0,1]).query_duration_sec.median()
cacheEffect

In [None]:
# Largest cacheEffect value in seconds, shown with its (serie, qname) index label.
cacheEffect.index[cacheEffect.argmax()], cacheEffect.iloc[cacheEffect.argmax()]

In [None]:
# Server-side durations of every query and iteration of the 'sum agg / narrow where' serie.
# The parentheses do not build a tuple: .loc receives the plain serie label.
ckhqStats.loc[('sum agg\nnarrow where')].query_duration_sec#('query_ref', level='qname')