<span style="font-family:Arial; font-size:3.5em;">Process Checks</span>
- - - -

In [88]:
from IPython.display import HTML
HTML('''<script>
code_show=true; 
function code_toggle() {
 if (code_show){
 $('div.input').hide();
 } else {
 $('div.input').show();
 }
 code_show = !code_show
} 
$( document ).ready(code_toggle);
</script>
The raw code for this IPython notebook is by default hidden for easier reading.
To toggle on/off the raw code, click <a href="javascript:code_toggle()">here</a>.''')

## Tickerplant Results

In [89]:
#importing required modules
from qpython.qconnection import QConnection as qcon
from qpython.qcollection import QDictionary as qdict
from contextlib import redirect_stdout as rd_so
from datetime import datetime
import calendar
import time
import psutil
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
%matplotlib inline

#set qcon variables
import csv
with open('credentials.csv') as csv_file:
    csv_reader = csv.reader(csv_file, delimiter=',')
    line_count = 0
    for row in csv_reader:
        if line_count == 0:
            print(f'Credentials required are {", ".join(row)}')
            line_count += 1
        else:
            host=row[0]
            un=row[1]
            pswd=row[2]
            print(f'\t host:{row[0]} username: {row[1]} password: {row[2]}.')
            line_count += 1
    print(f'Processed {line_count} lines.')         

Credentials required are host, username, password
	 host:localhost username: admin password: admin.
Processed 2 lines.


### Process Summary

Table showing process name, status, PID, Port numbers, CPU and Memory usage.
* Process status indicated by colours green (up) and red (down).
* Killtick, tpreplay1 and compression1 should usually have a down status indicated.

In [90]:
#run torq summary
torq_sum= !"${TORQHOME}"/torq.sh summary

#convert array to numpy array
summary=np.asarray(torq_sum)

#split each element of array by | character
sum_list = [i.split('|') for i in summary]
df = pd.DataFrame(sum_list)

# take first row and use as header for df
new_header = df.iloc[0]
df = df[1:]
df.columns = new_header

#trim whitespace from headers
cols=[]
for i in df.columns:
    cols.append(str.strip(i))
df.columns=cols

#trim whitespace from all objects within dataframe
data = df.select_dtypes(['object'])
df[data.columns] = df.apply(lambda x: x.str.strip())

#function to extract status of processes - takes a string argument
def stat_proc(process):
    process_info = df.loc[df['PROCESS'] == process]
    STAT = process_info['STATUS'].astype(str)
    return STAT

# function to extract the port number for each process - takes a string argument
def find_portno(process):
    process_info = df.loc[df['PROCESS'] == process]
    PORT = process_info['PORT'].astype(str).astype(int)
    return PORT

# function to extract the port number for each process - takes a string argument
def find_pid(process):
    process_info = df.loc[df['PROCESS'] == process]
    PID = process_info['PID'].astype(str).astype(int)
    return PID

#function to decode bytes to strings
def byte_decode(table,cols):
    table[cols] = table[cols].applymap(lambda x: x.decode('utf-8'))

#function to print mem and cpu stats
def print_mem_stats(pid):
    with open("tmp.txt","a") as file:
        with rd_so(file):
            return (psutil.Process(pid)).memory_percent()
def print_cpu_stats(pid):
    with open("tmp.txt","a") as file:
        with rd_so(file):
            return (psutil.Process(pid)).cpu_percent(interval =1.0)

if stat_proc("tickerplant1").tolist() == ["down"]:
    print("Please check if the following processes are all up: Tickerplant1, rdb1, hdb1")
else:
    #return PIDs
    df.replace('', np.nan, inplace=True)
    pids = df["PID"]
    pids = pids.dropna()
    pids = [int(i) for i in pids]
    
    #create array of mem and cpu stats        
    mems=[]
    cpus=[]
    
    for pid in pids:
        mems.append(print_mem_stats(pid));
        cpus.append(print_cpu_stats(pid))
        
    #insert 0 into cpu/mem for processes that are down     
    nopid = df[df['PID'].isnull()].index.tolist()
    for i in range(len(nopid)):
        mems.insert((nopid[i]-1),0)
        cpus.insert((nopid[i]-1),0)
    
    #append mem and cpu onto summary table    
    df['%MEM']=mems
    df['%CPU']=cpus
    df=df.round({'%MEM':1})
    
    
# set colour on down processes to red and up processes to green
def colour_down_red(col):
    color = 'red' if 'down' in col else 'green'
    return 'color: %s' % color
df.style.applymap(colour_down_red, subset=['STATUS'])

Unnamed: 0,TIME,PROCESS,STATUS,PID,PORT,%MEM,%CPU
1,16:40:28,discovery1,up,2916.0,9601.0,0.3,0
2,16:40:28,tickerplant1,up,3008.0,9600.0,0.1,0
3,16:40:28,rdb1,up,3098.0,9602.0,21.0,1
4,16:40:28,hdb1,up,3191.0,9603.0,4.5,0
5,16:40:29,hdb2,up,3283.0,9604.0,0.2,0
6,16:40:29,wdb1,up,3375.0,9605.0,0.3,0
7,16:40:29,sort1,up,3469.0,9606.0,0.3,0
8,16:40:29,gateway1,up,3561.0,9607.0,0.4,0
9,16:40:29,killtick,down,,,0.0,0
10,16:40:29,monitor1,up,3653.0,9609.0,0.4,1


### Count of tables in Tickerplant 

Table to show the count in each table found in the Tickerplant 
* The counts in each of these tables should be 0, as the Tickerplant should not be storing any data.
* If the counts in any of these tables is not 0, this could indicate a slow subscriber.

In [91]:
if stat_proc("tickerplant1").tolist() == ["up"]:
    with qcon(host, port=find_portno('tickerplant1'), username=un, password=pswd,timeout=3.0) as q:
        #counts tickerplant tables
        tablecounts = q("enlist tables[]!count each value each tables[]", pandas=True)
    display(tablecounts)
else:
    print('The Tickerplant process is down. Unable to count tables in Tickerplant')

Unnamed: 0,logmsg,quote,quote_iex,trade,trade_iex
0,0,0,0,0,0


### Tickerplant Log file size increasing

Checks if log messages in the log file of the tickerplant is increasing.
   * If log messages are increasing, the tickerplant is receiving data.
   * If log messages are not increasing, the tickerplant may not be recieving data. 


In [92]:
if stat_proc("tickerplant1").tolist() == ["up"]:
    with qcon(host, port=find_portno('tickerplant1'), username=un, password=pswd,timeout=3.0) as q:
        #counts log file size is increasing 
        log1=(q("hcount .u.L"))
        time.sleep(2)
        log2=(q("hcount .u.L"))
        print ("Log file sizes are increasing: ", log1<log2)
else:
    print('Tickerplant process is down. Unable to check if Tickerplant log files are increasing.')

Log file sizes are increasing:  True


### Process handles connected to Tickerplant

Table to show the handles of processes connected to the tickerplant and if there are any slow subscribers.
  *  A process may be a slow subscriber if there is a number value in the output queue.

In [93]:
if stat_proc("tickerplant1").tolist() == ["up"]:
    with qcon(host, port=find_portno('tickerplant1'), username=un, password=pswd,timeout=3.0) as q:
    # create pandas dataframe for handles connected to tickerplant
        zW=q(".z.W[]", pandas=True)
        hprocesses=q("asc select w,u from .clients.clients", pandas=True)
        byte_decode(hprocesses, ['u'])
    # assign columns with new names
        keys=pd.DataFrame(zW.keys, columns=['handles'])
        values=pd.DataFrame(zW.values, columns=['output queue'])
        hprocesses=hprocesses.rename(columns={'u':'processes'})
    # apply new names
        zW2=keys.join(values)
    # join zW2 and hprocesses
        zW2=zW2.join(hprocesses.processes)
        zW2.set_index('handles', inplace=True)
    # Show only table with slow subscribers 
        display(zW2.loc[zW2['output queue'] > 0])
else:
    print('The Tickerplant process is down. Unable to check process handles connected to Tickerplant.')

Unnamed: 0_level_0,output queue,processes
handles,Unnamed: 1_level_1,Unnamed: 2_level_1


# RDB Results

In [94]:
if stat_proc("rdb1").tolist() == ["up"]:
    with qcon(host, port=find_portno('rdb1'), username=un, password=pswd,timeout=3.0) as q:
        #Check tables in rdb are same as tables in tickerplant 
        tables = q('all 1_tables[] in ((exec w from .servers.SERVERS where proctype=`tickerplant)0)("tables[]")')
        #Check count of tables in rdb - data is being sent from the tickerplant 
        tptordb = q('enlist tables[]!count each value each tables[]', pandas=True)
        time.sleep(3)
        tptordb2 = q('enlist tables[]!count each value each tables[]', pandas=True)
        #Check that data in table can be queried
        rdbtquery=q('5#select from last tables[]', pandas=True)
        byte_decode(rdbtquery,['sym'])
        tabname=q('last tables[]')
        #Check that only data from today is present in the rdb tables
        onedatet = q('enlist tables[]!"N"^{exec distinct time.date from x} each tables[]', pandas=True)
else:
    print('The RDB process is down.')

### RDB tables

Checks if the tables in the rdb are the same as the tables in the Tickerplant.

In [95]:
if stat_proc("rdb1").tolist() == ["up"]:
    print ("RDB tables are same as Tickerplant tables : ", tables)
else:
    print('Unable to check if tables in RDB are the same as Tickerplant tables.')

RDB tables are same as Tickerplant tables :  True


### RDB table counts

Checks to see if data is being sent from the Tickerplant to the RDB
* Indicates whether counts in RDB tables are increasing over time (3 second period)

In [96]:
if stat_proc("rdb1").tolist() == ["up"]:
    tptordb.set_index(tptordb.columns.tolist())
    tptordb2.set_index(tptordb2.columns.tolist())
    rdb_comp = tptordb < tptordb2
    # set colour; True = Green; False = Red
    def rdbtable_colour(val):
        color = 'IndianRed' if val ==False else 'DarkSeaGreen'
        return 'background-color: %s' % color
    display(rdb_comp.style.applymap(rdbtable_colour))
else:
    print('Unable to count RDB tables.')

Unnamed: 0,heartbeat,logmsg,quote,quote_iex,trade,trade_iex
0,False,False,True,False,True,False


### RDB Query
Checks to see if tables in the RDB can be queried 

In [97]:
if stat_proc("rdb1").tolist() == ["up"]:
    print ('Table to be queried:')
    name = tabname.decode('UTF-8')
    print (name)
    display(rdbtquery)
else:
    print('Unable to query RDB tables.')

Table to be queried:
trade_iex


Unnamed: 0,time,sym,price,size,stop,cond,ex,srctime
0,NaT,,,,False,,,NaT
1,NaT,,,,False,,,NaT
2,NaT,,,,False,,,NaT
3,NaT,,,,False,,,NaT
4,NaT,,,,False,,,NaT


### RDB Date Checks
Check to see what date is in RDB tables

In [103]:
if stat_proc("rdb1").tolist() == ["up"]:
    display(onedatet)
    display(type(onedatet.heartbeat))
else:
    print('Unable to check date in RDB tables.')

Unnamed: 0,heartbeat,logmsg,quote,quote_iex,trade,trade_iex
0,"Series([], dtype: datetime64[ns])","Series([], dtype: datetime64[ns])",0 2019-12-11 dtype: datetime64[ns],"Series([], dtype: datetime64[ns])",0 2019-12-11 dtype: datetime64[ns],"Series([], dtype: datetime64[ns])"


# HDB Results


In [99]:
if stat_proc("hdb1").tolist() == ["up"]:
    with qcon(host, port=find_portno('hdb1'), username=un, password=pswd,timeout=10.0) as q:
        #Check hdb table counts excl. eod_summary and eod_summary_iex
        hdbtablecount=q('raze{0!select table:x,cnt:count i by date from x where date >=.z.d-5}each tables[] except `heartbeat`logmsg', pandas=True)
else:
    print('The HDB process is down.')

### HDB Table Counts
* Counts for heartbeat and logmsg should be 0
* If counts for other tables are 0, no data has been recieved for the day before

In [100]:
if stat_proc("hdb1").tolist() == ["up"]:
    if isinstance(hdbtablecount, pd.DataFrame):
        byte_decode(hdbtablecount,['table'])
        display(hdbtablecount.set_index(hdbtablecount.columns.tolist()))
    else:
        print('There is no data in the HDB')
else:
    print('Unable to count HDB tables.')

date,table,cnt
2019-12-10,quote,2730846
2019-12-10,trade,564746
