In [None]:
%matplotlib nbagg
import matplotlib.pyplot as plt
##
import json
import re
##
import numpy           as np
import pandas          as pd
import statsmodels.api as sm
##
from elasticsearch     import Elasticsearch, helpers
from elasticsearch_dsl import Search, A, Q

##
import thundermint.plot  as plot
import thundermint.splot as splot

In [None]:
# Credentials are read from a local JSON file with 'login' and 'password' keys.
with open("credentials.json") as f:
    credentials = json.load(f)
# Pass the credentials through http_auth instead of splicing them into the host
# string: a login or password containing ':', '@' or '/' would otherwise be
# mis-parsed as part of the URL.
es = Elasticsearch("elastic.hxr.team",
                   http_auth=(credentials['login'], credentials['password']),
                   port=443,
                   use_ssl=True,
                   )

# Global parameters for search

In [None]:
# Elasticsearch index searched by every query helper in this notebook.
# flt_time also extracts the trailing digits-digits-digits date from this name.
index = 'xenochain-2018-11-08'

# Queries

In [None]:
# -------------------------------------------------------------------
# Primitive filters

def flt_host(host) :
    """Filter by host.

    Returns a function that takes a search object and returns it narrowed
    with a 'term' filter on the `host` field.
    """
    def only_host(s):
        return s.filter('term', host=host)
    return only_host
def flt_cluster(cluster) :
    """Filter by cluster.

    Returns a function that takes a search object and returns it narrowed
    with a 'term' filter on the `env` field.
    """
    def only_cluster(s):
        return s.filter('term', env=cluster)
    return only_cluster
def flt_severity(sev) :
    """Filter by severity.

    Returns a function that takes a search object and returns it narrowed
    with a 'term' filter on the `sev` field.
    """
    def only_severity(s):
        return s.filter('term', sev=sev)
    return only_severity
def flt_namespace(ns) :
    """Filter on namespace.

    Returns a function that takes a search object and returns it narrowed
    with a 'term' filter on the `ns` field.
    """
    def only_namespace(s):
        return s.filter('term', ns=ns)
    return only_namespace

# Ready-made namespace filters for the three log namespaces used in this notebook.
flt_consensus, flt_mempool, flt_net = map(flt_namespace, ('consensus', 'mempool', 'net'))

def _time_bounds(start, delta, date=None):
    "Return the (t1, t2) timestamp strings for a window of length `delta` beginning at `start`."
    from datetime import datetime, timedelta
    (h1, m1) = start
    (dh, dm) = delta
    if date is None:
        # Raw string: "\d" in a plain literal is an invalid escape sequence.
        found = re.match(r"^.*-(\d+-\d+-\d+)", index)
        if found is None:
            raise ValueError("index name %r does not end in a date" % (index,))
        date = found.group(1)
    # Date arithmetic carries minutes into hours and hours into the next day,
    # so a window that crosses midnight still yields valid timestamps.
    begin = datetime.strptime(date, "%Y-%m-%d") + timedelta(hours=h1, minutes=m1)
    end   = begin + timedelta(hours=dh, minutes=dm)
    fmt   = "%Y-%m-%dT%H:%M:00Z"
    return (begin.strftime(fmt), end.strftime(fmt))

def flt_time(start, delta, date=None):
    """Filter time range.

    Parameters
    ----------
    start : (hour, minute) tuple
        Beginning of the window (inclusive).
    delta : (hours, minutes) tuple
        Length of the window; the end is exclusive.
    date : str, optional
        Day of the window as 'YYYY-MM-DD'. When omitted, the date is parsed
        from the trailing part of the global `index` name.

    Returns
    -------
    A function that takes a search object and returns it narrowed with a
    'range' filter on the `at` field (gte start, lt start + delta).

    Raises
    ------
    ValueError
        If `date` is omitted and the index name contains no date, or the
        date is not in year-month-day form.
    """
    (t1, t2) = _time_bounds(start, delta, date)
    return lambda s : s.filter('range', at ={"gte":t1, "lt":t2})

def make_seatch(cluster, filters) :
    """Create Search object from filters.

    Starts from a Search over the global `index` on the `es` client and
    applies each function in `filters` in order. `cluster` is accepted but
    not used here; callers that want a cluster filter must apply it themselves.
    """
    search = Search(using=es, index=index)
    for apply_filter in filters:
        search = apply_filter(search)
    return search

# -------------------------------------------------------------------
# Postprocessing

def postprocess_entries(s) :
    """Load every hit of search `s` into a DataFrame ordered by time.

    Each hit from `s.scan()` is converted with `to_dict()`; the `at` column
    is parsed to datetimes and the rows are sorted by it. The original row
    index is kept (not reset).
    """
    records = [hit.to_dict() for hit in s.scan()]
    frame = pd.DataFrame.from_records(records)
    frame['at'] = pd.to_datetime(frame['at'])
    return frame.sort_values('at')

def postprocess_mempool(r):
    """Unpack the mempool counters stored in the `data` column.

    Adds `size`, `filtered`, `added` and `discarded` columns, each read from
    the same-named key of the row's `data` value, and returns a frame without
    the `data` column. The new columns are also added to `r` itself.
    """
    payload = r['data']
    for column in ('size', 'filtered', 'added', 'discarded'):
        r[column] = payload.apply(lambda entry, key=column: entry[key])
    return r.drop('data', axis=1)

def split_on_host(df):
    """Split `df` into a dict of per-host frames keyed by the `host` value.

    Each frame gets a fresh index; the previous index is kept as an `index` column.
    """
    per_host = {}
    for host, rows in df.groupby('host'):
        per_host[host] = rows.reset_index()
    return per_host

# -------------------------------------------------------------------
# Queries

def q_agg_uniq(cluster, filters, field):
    """Aggregate by unique field.

    Builds a search from `filters`, restricts it to `cluster`, and returns
    the buckets of a 'terms' aggregation on `field`. No hits are fetched
    (the search is sliced to [0:0]).

    `cluster` may be None to aggregate across all clusters.
    """
    s = make_seatch(cluster, filters)
    # make_seatch does not use its cluster argument, so apply the filter
    # here (as q_scan does); otherwise the result spans every cluster.
    if cluster is not None:
        s = flt_cluster(cluster)(s)
    s.aggs.bucket('unique_ids', A('terms', field=field))
    return s[0:0].execute().aggregations.unique_ids.buckets

def q_scan(cluster, filters, host=None, source=['at','msg','data','host'], postprocess=[]):
    """Fetch data from elastic.

    cluster     -- value for the cluster (`env`) filter
    filters     -- filter functions applied in order when building the search
    host        -- optional host to restrict the search to
    source      -- fields requested for each hit
    postprocess -- functions applied in order to the resulting DataFrame

    Returns a single DataFrame when `host` is given, otherwise a dict of
    per-host DataFrames.
    """
    search = make_seatch(cluster, filters).source(source)
    narrowing = [flt_cluster(cluster)]
    if host is not None:
        narrowing.append(flt_host(host))
    for narrow in narrowing:
        search = narrow(search)
    frame = postprocess_entries(search)
    for step in postprocess:
        frame = step(frame)
    return frame if host is not None else split_on_host(frame)

In [None]:
def distinct_fields(field):
    """Return the buckets of a 'terms' aggregation on `field` over the current index.

    The search is sliced to [0:0], so only the aggregation is requested, no hits.
    """
    search = Search(using=es, index=index)[0:0]
    search.aggs.bucket('unique_ids', A('terms', field=field))
    response = search.execute()
    return response.aggregations.unique_ids.buckets

def query_consensus(start, delta, cluster):
    """Fetch data for logs about consensus.

    start, delta -- window start and length, passed to flt_time
    cluster      -- cluster (`env`) to query

    Returns a dict of per-host DataFrames sorted by time.
    """
    # Built from the shared helpers: the time_range function this used to
    # call is not defined in the notebook, so the old body raised NameError.
    return q_scan(cluster,
                  [flt_consensus, flt_time(start, delta)],
                  source=['at','msg','data','host'])

def query_mempool(start, delta, cluster):
    """Fetch data for logs about the mempool.

    start, delta -- window start and length, passed to flt_time
    cluster      -- cluster (`env`) to query

    Returns a dict of per-host DataFrames sorted by time, with the `data`
    column unpacked into `size`, `filtered`, `added` and `discarded`.
    """
    # Built from the shared helpers: the time_range function this used to
    # call is not defined in the notebook, so the old body raised NameError.
    # postprocess_mempool replaces the column unpacking duplicated here before.
    return q_scan(cluster,
                  [flt_mempool, flt_time(start, delta)],
                  source=['at','msg','data','host'],
                  postprocess=[postprocess_mempool])

def query_errors(cluster) :
    """Fetch all entries with severity 'Error' for `cluster`.

    Returns a dict of per-host DataFrames sorted by time.
    """
    search = Search(using=es, index=index)
    search = search.filter('term', env=cluster)
    search = search.filter('term', sev='Error')
    search = search.source(['at','msg','data','host'])
    return split_on_host(postprocess_entries(search))

# Plots

In [None]:
# Consensus and mempool logs of cluster 'atum' for the window starting at (11,0)
# with length (0,5), each as a dict of per-host frames.
logsC = q_scan('atum', [flt_consensus, flt_time((11,0), (0,5))])
logsM = q_scan(
    'atum',
    [flt_mempool, flt_time((11,0), (0,5))],
    postprocess=[postprocess_mempool],
)

In [None]:
# Trailing semicolon keeps the cell from echoing the call's return value.
plot.plot_commit_time(logsC);

In [None]:
# Trailing semicolon keeps the cell from echoing the call's return value.
plot.plot_n_tx_in_block(logsC);

In [None]:
# Pass the per-host mempool frames as a plain list.
# Trailing semicolon keeps the cell from echoing the call's return value.
plot.plot_mempool_size(list(logsM.values()));

In [None]:
# Hand the per-host consensus logs to thundermint's splot helper.
# NOTE(review): the meaning/units of w=4000 are not visible here — confirm against thundermint.splot.
splot.splot(logsC, w=4000)

# Scratchpad

In [None]:
# Consensus log frame of host 'validator5' (logsC is a dict keyed by host).
d = logsC['validator5']

In [None]:
# Derive d from logsC rather than from d itself so the cell can be re-run:
# dropping from d raises KeyError the second time, once the columns are gone.
d = logsC['validator5'].drop(['index','host'], axis=1)

In [None]:
# Window 10:05-10:25 on the day encoded in the index name, written in the same
# "<date>THH:MM:00Z" form that flt_time produces. The time_range helper called
# here before is not defined in any cell, so the call raised NameError.
_day = re.match(r"^.*-(\d+-\d+-\d+)", index).group(1)
(t1,t2) = ("%sT10:05:00Z" % _day, "%sT10:25:00Z" % _day)

In [None]:
# 'net' namespace entries of host 'validator5' on cluster 'atum' within [t1, t2).
s = (
    Search(using=es, index=index)
    .filter('term',  env='atum')
    .filter('term',  ns ='net')
    .filter('range', at ={"gte":t1, "lt":t2})
    .filter('term', host='validator5')
    .source(['at','msg','data','sev'])
)

In [None]:
# Run the scan for s and load the hits into a time-sorted frame with a fresh 0..n-1 index.
df = postprocess_entries(s).reset_index(drop=True)

In [None]:
# Keep only the 'Gossip stats' messages and renumber the rows from 0.
is_gossip_stats = df['msg'] == 'Gossip stats'
df1 = df.loc[is_gossip_stats].reset_index(drop=True)

In [None]:
# Expand each row's `data` record into columns and attach the matching timestamp.
df2 = pd.DataFrame.from_records(df1['data'].values).assign(at=df1['at'])

In [None]:
# RxPC from the gossip-stats records against the entry timestamp.
plt.figure()
plt.grid()
plt.plot(df2['at'].values, df2['RxPC'].values)
# Label the figure so it can be read on its own; the trailing semicolon
# keeps the cell from echoing the return value of the last call.
plt.xlabel('at')
plt.ylabel('RxPC')
plt.title('Gossip stats: RxPC over time');

In [None]:
# Consensus logs of cluster 'atum', split per host, for start (10,5) and length (0,20)
# (presumably (hour, minute) tuples as in flt_time — confirm).
llc = query_consensus((10,5),(0,20), 'atum')

In [None]:
# Consensus frame of 'validator5' without the redundant host column.
d = llc['validator5'].drop(columns=['host'])

In [None]:
# Show 50 rows starting at row 200.
d[200:250]