### Using KDB+ for Financial Tick Data

#### Install qPython

In [1]:
!python3.8 -m pip install qPython --user



#### Before running the next cell (create a q session; the last command tests the connection)

First go to the kdb folder (this is necessary so that the CSV files can be loaded):

cd path to kdb

Execute in terminal:

q -p 5000

h:hopen `:localhost:5000

h"2+2" 

In [2]:
from qpython import qconnection
import pandas as pd
import datetime 

def create_connection():
    """Open a qPython connection to the q process on localhost:5000.

    The connection is created with ``pandas=False`` and opened before it is
    handed back.

    Returns:
        The opened ``qconnection.QConnection`` instance.
    """
    connection = qconnection.QConnection(host='localhost', port=5000, pandas=False)
    # The handshake happens here, not in the constructor.
    connection.open()
    return connection

def close_sconnection(q):
    """Close the given connection by calling its ``close()`` method.

    Args:
        q: The connection object to close.
    """
    q.close()



In [3]:
import pandas as pd
import numpy as np
import os, gc
from glob import glob
import plotly.express as px
import seaborn as sns
import matplotlib.pyplot as plt


# Create database and tables


In [4]:

def create_database(q):
    """Load the price and base tick CSV files into the q session and save them.

    Prints the IPC protocol version and the connection state, then sends a
    fixed sequence of q statements through ``q.sendSync``, one at a time and
    in order.

    Args:
        q: Connection object exposing ``protocol_version``, ``is_connected()``
            and ``sendSync(statement)``.
    """
    print('IPC version: %s. Is connected: %s' % (q.protocol_version, q.is_connected()))

    statements = (
        # Load price tick file
        'tick:("SIDTFIFIFIS"; enlist"|")0:`:../data/test/raw/test_tick_price_file.csv',
        # Create enumeration for table (this is required to create a splayed
        # table and then a partitioned table)
        'tickenum: .Q.en[`:../data/test/raw/tickprice/] tick',
        # Save table
        'rsave `tickenum',
        # Load base tick file
        'base:("SSSSS"; enlist"|")0:`:../data/test/raw/test_tick_base_file.csv',
        # Create enumeration for table (same requirement as for the price table)
        'baseenum: .Q.en[`:../data/test/raw/basetick/] base',
        # Save table
        'rsave `baseenum',
        # Bind the names baseenum2 and priceenum2 in the q session
        'baseenum2:get `baseenum ',
        'priceenum2:get `tickenum',
    )
    for statement in statements:
        q.sendSync(statement)


In [5]:
import csv

def save_list_results(url, data):
    """Write a list of records to a CSV file.

    Args:
        url: Destination path handed to ``DataFrame.to_csv``.
        data: Records accepted by ``pandas.DataFrame.from_records``
            (for example a list of dicts).
    """
    print("save_list_results")
    pd.DataFrame.from_records(data).to_csv(url)
   
def save_stats(url, data):
    """Write statistics records to a CSV file.

    Args:
        url: Destination path handed to ``DataFrame.to_csv``.
        data: Records accepted by ``pandas.DataFrame.from_records``
            (for example a list of dicts).
    """
    print("save_stats")
    pd.DataFrame.from_records(data).to_csv(url)


In [6]:
!python3.8 -m pip install joblib --user



In [7]:
from joblib import Parallel, delayed
from multiprocessing.pool import Pool
import traceback
import time
# Number of parallel jobs used to run the queries (passed to joblib's
# Parallel as n_jobs with prefer="threads").
NUM_THREADS = 5
# NOTE(review): not referenced anywhere in the visible part of this notebook;
# confirm whether it is still needed.
NUM_POOLS = 10

def load_queries(path_to_queries) -> list:
    """Read every ``*.q`` file under a path prefix into one-line query strings.

    Args:
        path_to_queries: Prefix that is concatenated with ``'*.q'`` to build
            the glob pattern, so a directory must include its trailing
            separator (e.g. ``"./queries/"``).

    Returns:
        One string per matching file, with every newline replaced by ``;``.
        Files are read in sorted path order. The sort is lexicographic, so
        ``10.q`` comes before ``2.q``. The list is empty if nothing matches.
    """
    queries = []
    # glob() returns matches in arbitrary, OS-dependent order. Sorting keeps
    # "query N" bound to the same file from one run to the next.
    for query_file in sorted(glob(path_to_queries + '*.q')):
        with open(query_file, 'r') as handle:
            queries.append(handle.read().replace('\n', ';'))
    return queries

In [8]:
def run_query(q,run_id, query_number, queries, path_to_save_results, data_size, print_result=False):
    """Run one query, save its result as CSV and return timing statistics.

    Args:
        q: Callable connection; invoked as
            ``q(query, qtype=1, adjust_dtype=False)``.
        run_id: Identifier copied into the returned statistics.
        query_number: 1-based position of the query inside ``queries``.
        queries: Sequence of query strings.
        path_to_save_results: Template containing ``{size}`` and
            ``{query_number}`` placeholders; the formatted value is the CSV
            path the result is written to.
        data_size: Value substituted for ``{size}`` and shown in the log line.
        print_result: Accepted for interface compatibility; not used here.

    Returns:
        A dict with the keys ``run_id``, ``query_id``, ``start_time``,
        ``end_time``, ``elapsed_time``, ``row_count`` and ``error``. If any
        exception is raised, the traceback is printed and a dict with
        ``error=True``, ``elapsed_time=0.0`` and ``row_count=0`` is returned
        instead of propagating the exception.
    """
    print(f"Running query {query_number} for scale factor {data_size}, saving results at {path_to_save_results}")
    try:
        start = time.time()
        raw = np.asarray(q(queries[query_number-1], qtype=1, adjust_dtype=False))
        # np.nan instead of np.NaN: the capitalised alias was removed in
        # NumPy 2.0, which would send every query into the except branch.
        df = pd.DataFrame(data=raw.tolist()).replace(False, np.nan)
        count = df.shape[0]
        # The clock stops before the CSV is written, so the elapsed time
        # excludes the cost of saving the result.
        end = time.time()
        df.to_csv(path_to_save_results.format(size=data_size, query_number=query_number))
        stats = {
            "run_id": run_id,
            "query_id": query_number,
            "start_time": start,
            "end_time": end,
            "elapsed_time": end-start,
            "row_count": count,
            'error': False
        }
        print(stats)
        return stats
    except Exception:
        # Best effort: log the failure and report it through the stats record
        # so that the remaining queries still run.
        print(traceback.format_exc())
        return {
            "run_id": run_id,
            "query_id": query_number,
            "start_time": time.time(),
            "end_time": time.time(),
            "elapsed_time": 0.0,
            "row_count": 0,
            "error": True
        }

In [9]:
def run_queries(q,run_id, queries, path_to_save_results, path_to_save_stats, data_size, print_result=False):
    """Run every query through ``run_query`` in parallel and save the stats.

    Args:
        q: Connection object forwarded to each ``run_query`` call.
        run_id: Identifier forwarded to each ``run_query`` call.
        queries: Sequence of query strings; query numbers are 1-based.
        path_to_save_results: Result-path template forwarded to ``run_query``.
        path_to_save_stats: CSV path the collected statistics are written to.
        data_size: Forwarded to ``run_query``.
        print_result: Forwarded to ``run_query``.
    """
    # NOTE(review): the same connection object is handed to all worker
    # threads; confirm it is safe for concurrent use.
    tasks = (
        delayed(run_query)(q, run_id, number, queries, path_to_save_results, data_size, print_result)
        for number in range(1, len(queries) + 1)
    )
    stats = Parallel(n_jobs=NUM_THREADS, prefer="threads")(tasks)
    save_list_results(path_to_save_stats, stats)


In [10]:
import math 
def run(data_sizes,q):
    """Run the full benchmark once per entry of ``data_sizes``.

    For each entry the database is (re)created, the queries are loaded from
    ``./queries/`` and executed, and two CSV files are written under
    ``../kdb/results/``: the per-query statistics and a one-row summary
    holding the batch id, database creation time, total query time and the
    response time metric.

    Args:
        data_sizes: Iterable of size labels; each label is substituted into
            the output file names and forwarded to ``run_queries``.
        q: Connection object passed to ``create_database`` and ``run_queries``.
    """
    queries_path = "./queries/"
    result_path = "../kdb/results/result_Q{query_number}_{size}.csv"

    for batch_id, data_size in enumerate(data_sizes, start=1):
        stats_path = "../kdb/results/test_run_stats_csv_{size}.csv".format(size=data_size)

        # NOTE(review): create_database takes no size argument, so the same
        # files are loaded for every entry of data_sizes; confirm intended.
        create_db_started = time.time()
        create_database(q)
        create_db_finished = time.time()

        queries = load_queries(queries_path)

        run_started = time.time()
        run_queries(q, batch_id, queries, result_path, stats_path, data_size)
        run_finished = time.time()

        # The per-query stats are read back from the CSV just written.
        per_query_stats = pd.read_csv(stats_path)
        # NOTE(review): the square root of the product is a geometric mean
        # only when exactly two queries ran; confirm the intended metric.
        response_t = math.sqrt(per_query_stats['elapsed_time'].prod())

        # Saving the overall stats to csv file
        overall_stats_path = "../kdb/results/{size}_overall_stats.csv".format(size=data_size)
        save_stats(overall_stats_path, [{
            'batch_id': batch_id,
            'create_db_time': create_db_finished - create_db_started,
            'run_query_time': run_finished - run_started,
            'Response Time Metric': response_t
        }])

In [11]:
#data_sizes=['10', '100','1000']
q = create_connection()
try:
    run(data_sizes=[''], q=q)
finally:
    # Always release the connection, even when the benchmark run raises.
    close_sconnection(q)


IPC version: 3. Is connected: True
['select [10;>TradeCumulative] TradeCumulative:sum(TradeSize) by Id from priceenum2 where TradeDate=2022.11.03', 'a: select TimeStamp:last TimeStamp where not null TradePrice, TradePrice: last TradePrice where not null TradePrice by Id from priceenum2 where TradeDate=2022.11.03;b: select TimeStamp_final: last TimeStamp where not null TradePrice, TradePrice_final:last TradePrice where not null TradePrice by Id  from priceenum2 where TradeDate=2022.11.04;c: a^b;select [10;>(TradePrice_final-TradePrice)%TradePrice] Id, loss:(TradePrice_final-TradePrice)%TradePrice from c', 'a: select  last AskPrice where not null AskPrice, last BidPrice where not null BidPrice by Id from priceenum2 where TradeDate=2022.11.03;;select [10;>2*(AskPrice-BidPrice)%(AskPrice+BidPrice)] Id, Percentage_spread: 2*(AskPrice-BidPrice)%(AskPrice+BidPrice) from a', 'complete: ej[`Id;baseenum2;priceenum2];;select [1;>TradeCumulative] TradeCumulative:count 1 by Id from complete where S

  new_mask = arr == x
