## Example Summary

The following example shows an end-to-end framework for real-time data ingestion and persistence, orchestrated from Python. Once running, it allows:

1. Ingestion of trade and quote data in real-time from a data feed
2. Derivation of analytic insights into this data
3. Persistence of the raw and derived data at end-of-day
4. Querying across multiple real-time and historical processes via a password-protected gateway
5. Subscription to raw datasets for users requiring access to the data in real-time
    
The complete infrastructure we will build looks as follows:

<img src="./images/evolving-system/full-infra.png" alt="Drawing" style="width: 750px;"/>

For a full breakdown of streaming within PyKX, see our documentation [here](https://code.kx.com/pykx/user-guide/advanced/streaming/index.html).

#### Initialise PyKX

In [None]:
import pykx as kx
import subprocess

In [None]:
import os
# Adjust this path to point at the q installation on your machine
os.environ['QHOME'] = '/usr/local/anaconda3/envs/qenv/q'

#### Create a Historical Database

To test queries across multiple processes and database types (in-memory vs on-disk), generate a Historical Database in the cell below by running the Python script `generate_hdb.py`, which is included in the zip file for this example.

In [None]:
with kx.PyKXReimport():
    db = subprocess.Popen(
        ['python', 'generate_hdb.py',
         '--datapoints', '100000',
         '--days', '5',
         '--name', 'db'],
        stdin=subprocess.PIPE,
        stdout=None,
        stderr=None,
    )

# Wait for the script to finish, then release the stdin pipe
rc = db.wait()
db.stdin.close()
if rc != 0:
    raise RuntimeError(f'Generating HDB failed with return code {rc}')
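If you do not need to interact with the child process while it runs, a more compact pattern is `subprocess.run` with `check=True`, which waits for the script and raises `CalledProcessError` on a non-zero exit code. The sketch below wraps this in a hypothetical helper, `run_script`; it uses `sys.executable` rather than `'python'` so the child runs under the same interpreter as the notebook. To keep the notebook's behaviour you would still call it inside `kx.PyKXReimport()`, as the cell above does.

```python
import subprocess
import sys


def run_script(args):
    """Run a Python script to completion, raising on failure.

    `run_script` is a hypothetical helper; `args` is the script name followed
    by its command-line arguments, e.g. ['generate_hdb.py', '--days', '5'].
    """
    # sys.executable ensures the child uses the same interpreter as the notebook
    completed = subprocess.run([sys.executable, *args], check=True)
    return completed.returncode


# Usage (inside kx.PyKXReimport(), as in the cell above):
# run_script(['generate_hdb.py', '--datapoints', '100000',
#             '--days', '5', '--name', 'db'])
```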

#### Define Required Schemas

Data published to the real-time system arrives in two tables, `trade` and `quote`, while derived analytics are stored in an `aggregate` table.

The schemas are defined with `kx.schema.builder`; see [here](https://code.kx.com/pykx/api/schema.md#builder) for the full API definition.

In [None]:
trade = kx.schema.builder({
    'time': kx.TimespanAtom  , 'sym': kx.SymbolAtom,
    'exchange': kx.SymbolAtom, 'sz': kx.LongAtom,
    'px': kx.FloatAtom})

quote = kx.schema.builder({
    'time': kx.TimespanAtom  , 'sym': kx.SymbolAtom,
    'exchange': kx.SymbolAtom, 'bid': kx.FloatAtom,
    'ask': kx.FloatAtom      , 'bidsz': kx.LongAtom,
    'asksz': kx.LongAtom})

aggregate = kx.schema.builder({
    'time': kx.TimespanAtom, 'sym': kx.SymbolAtom,
    'trdvol': kx.FloatAtom , 'maxpx': kx.FloatAtom,
    'minpx': kx.FloatAtom  , 'maxbpx': kx.FloatAtom,
    'minapx': kx.FloatAtom , 'baspread': kx.FloatAtom})
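Each schema fixes both the column names and their order. When a row is published to a kdb+ tickerplant it is sent as a list of values in that column order, which is why the `post_processor` later in this notebook moves `time` and `sym` to the front with `xcols` before publishing. The plain-Python sketch below does not use PyKX: it models a schema as an ordered mapping of column name to Python type (with `timedelta` standing in for a q timespan) and turns a record into a correctly ordered row. `TRADE_SCHEMA` and `to_row` are illustrative names only.

```python
from datetime import timedelta

# Illustrative stand-in for the `trade` schema: column name -> Python type.
# dict preserves insertion order, so this also fixes the column order.
TRADE_SCHEMA = {
    'time': timedelta, 'sym': str, 'exchange': str,
    'sz': int, 'px': float,
}


def to_row(schema, record):
    """Return the values of `record` as a list ordered by `schema`.

    Raises KeyError for a missing column and TypeError for a wrong type.
    """
    row = []
    for column, expected in schema.items():
        value = record[column]
        if not isinstance(value, expected):
            raise TypeError(f'{column!r} expects {expected.__name__}, '
                            f'got {type(value).__name__}')
        row.append(value)
    return row


# Keys arrive in arbitrary order; the row comes out in schema order
record = {'sym': 'AAPL', 'px': 185.2, 'sz': 100, 'exchange': 'N',
          'time': timedelta(hours=9, minutes=30)}
print(to_row(TRADE_SCHEMA, record))
```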

#### Core Ingest framework

The central pillar of the ingestion framework is composed of three connected processes: a Tickerplant, a Real-Time Database (RDB) and a Historical Database (HDB). In this example all three processes are configured with a single call to the class [`kx.tick.BASIC`](https://code.kx.com/pykx/api/tick.html#BASIC). A full breakdown of these processes and how they interact can be found [here](https://code.kx.com/pykx/user-guide/advanced/streaming/basic.html).

<img src="./images/evolving-system/simple-no-feed.png" alt="Drawing" style="width: 500px;"/>

The library functions called in this cell are as follows:

- [kx.tick.BASIC](https://code.kx.com/pykx/api/tick.html#pykx.tick.BASIC)
- [simple.start](https://code.kx.com/pykx/api/tick.html#pykx.tick.BASIC.start)

In [None]:
simple = kx.tick.BASIC(
    tables={'trade': trade, 'quote': quote, 'aggregate': aggregate},
    ports={'tickerplant': 5010, 'rdb': 5012, 'hdb': 5011},
    log_directory='log',  # directory used for the tickerplant log
    database='db'         # HDB generated earlier in this notebook
)
simple.start()
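To make the roles of the three processes concrete, the sketch below models them in plain Python within a single process: a tickerplant records each update and forwards it to its subscribers, an RDB keeps the current day's data in memory, and at end-of-day the RDB's contents are written to a date-keyed historical store and cleared. The `Tickerplant` and `RDB` classes and the `end_of_day` method are hypothetical and do not correspond to the PyKX API; the real processes communicate over IPC on the ports configured above.

```python
from collections import defaultdict


class Tickerplant:
    """Hypothetical model: record every update, then fan it out to subscribers."""

    def __init__(self):
        self.log = []          # stands in for the log file in `log_directory`
        self.subscribers = []

    def subscribe(self, subscriber):
        self.subscribers.append(subscriber)

    def upd(self, table, rows):
        # Record the update so it could be replayed, e.g. by a restarted RDB
        self.log.append((table, rows))
        for subscriber in self.subscribers:
            subscriber.upd(table, rows)


class RDB:
    """Hypothetical model: hold today's data in memory, flush it at end-of-day."""

    def __init__(self, hdb):
        self.tables = defaultdict(list)
        self.hdb = hdb         # date -> {table: rows}, stands in for the on-disk HDB

    def upd(self, table, rows):
        self.tables[table].extend(rows)

    def end_of_day(self, date):
        self.hdb[date] = {name: list(rows) for name, rows in self.tables.items()}
        self.tables.clear()


hdb = {}
tp = Tickerplant()
rdb = RDB(hdb)
tp.subscribe(rdb)
tp.upd('trade', [('09:30', 'AAPL', 'N', 100, 185.2)])
rdb.end_of_day('2024.01.02')
print(hdb)
```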

---

### Add Data Feed and Python Subscriber

The following section adds a data feed, which publishes data to the `trade` and `quote` tables, and a subscriber, which confirms that the published data reaches subscribers.

<img src="./images/evolving-system/feed-sub.png" alt="Drawing" style="width: 750px;"/>

First, start a data feed which publishes trade and quote messages to the Tickerplant on port 5010.

In [None]:
with kx.PyKXReimport():
    feed = subprocess.Popen(
        ['python', 'feed.py'],
        stdin=subprocess.PIPE,
        stdout=None,
        stderr=None,
    )
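The contents of `feed.py` are not shown in this notebook. Purely as an illustration, a feed of this kind might generate random trade and quote records shaped like the schemas defined earlier. The hypothetical generator below does this in plain Python; the symbol list and value ranges are made up, and a real feed would publish each batch to the tickerplant on port 5010 rather than print it.

```python
import random
from datetime import datetime

SYMS = ['AAPL', 'MSFT', 'GOOG']     # hypothetical symbol universe
EXCHANGES = ['N', 'O']


def since_midnight(now):
    """Time since midnight as a timedelta, a stand-in for a q timespan."""
    return now - now.replace(hour=0, minute=0, second=0, microsecond=0)


def make_trade(rng, now):
    return {'time': since_midnight(now), 'sym': rng.choice(SYMS),
            'exchange': rng.choice(EXCHANGES),
            'sz': rng.randint(1, 1000), 'px': round(rng.uniform(100, 200), 2)}


def make_quote(rng, now):
    bid = round(rng.uniform(100, 200), 2)
    return {'time': since_midnight(now), 'sym': rng.choice(SYMS),
            'exchange': rng.choice(EXCHANGES),
            # Ask is always above the bid so the quote is not crossed
            'bid': bid, 'ask': round(bid + rng.uniform(0.01, 0.5), 2),
            'bidsz': rng.randint(1, 1000), 'asksz': rng.randint(1, 1000)}


rng = random.Random(42)             # seeded so the output is reproducible
now = datetime.now()
batch = [make_trade(rng, now) for _ in range(3)] + [make_quote(rng, now)]
for record in batch:
    print(record)
```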

Now that data is being published to the system, start a subscribing process which receives the latest trade data and prints the number of datapoints processed.

In [None]:
with kx.PyKXReimport():
    subscriber = subprocess.Popen(
        ['python', 'subscriber.py'],
        stdin=subprocess.PIPE,
        stdout=None,
        stderr=None,
    )
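`subscriber.py` is likewise not shown. Conceptually, a subscriber registers a callback that the tickerplant invokes with each `(table, data)` update. The hypothetical `CountingSubscriber` below keeps a running count of rows received per table, ignoring tables it did not ask for; this is the kind of figure the subscriber started above reports.

```python
from collections import Counter


class CountingSubscriber:
    """Hypothetical subscriber that tallies rows received per table."""

    def __init__(self, tables):
        self.tables = set(tables)   # tables this subscriber is interested in
        self.counts = Counter()

    def upd(self, table, rows):
        # Ignore updates for tables we did not subscribe to
        if table in self.tables:
            self.counts[table] += len(rows)

    def total(self):
        return sum(self.counts.values())


sub = CountingSubscriber(['trade'])
sub.upd('trade', [('AAPL', 100, 185.2), ('MSFT', 50, 410.1)])
sub.upd('quote', [('AAPL', 185.1, 185.3)])    # filtered out
print(f'Processed {sub.total()} trade datapoints')
```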

The subscriber started above prints real-time information about the number of messages it has processed. This output will add unwanted noise to later cells, so stop the subscriber by running the following cell:

In [None]:
subscriber.stdin.close()
subscriber.kill()
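On POSIX systems `kill()` sends `SIGKILL`, which ends the process immediately without giving it a chance to clean up. An alternative sketch is to request a graceful shutdown with `terminate()` (`SIGTERM` on POSIX) and fall back to `kill()` only if the process has not exited within a timeout. `stop_process` and its 5-second default are illustrative choices, not part of PyKX.

```python
import subprocess
import sys


def stop_process(proc, timeout=5.0):
    """Stop a Popen process, escalating from terminate() to kill()."""
    if proc.stdin is not None:
        proc.stdin.close()
    proc.terminate()                 # SIGTERM on POSIX
    try:
        return proc.wait(timeout=timeout)
    except subprocess.TimeoutExpired:
        proc.kill()                  # SIGKILL on POSIX
        return proc.wait()


# Usage: stop_process(subscriber)
```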

---

### Add Chained Tickerplant and Real-Time Event Processor

As highlighted [here](https://code.kx.com/pykx/user-guide/advanced/streaming/rta.html), applying real-time analytics to your data can introduce slow subscribers, which can potentially lead to data corruption and loss.

A common pattern to avoid this is to add a [`chained tickerplant`](https://code.kx.com/pykx/user-guide/advanced/streaming/rta.html#protecting-data-ingest). In the cells below we build a chained tickerplant which subscribes to the primary ingestion pipeline. A real-time analytic process then subscribes to the chained tickerplant and derives analytic insights by joining information from the `trade` and `quote` tables.

<img src="./images/evolving-system/analytic-addition.png" alt="Drawing" style="width: 500px;"/>

The library functions called in this cell are as follows:

- [kx.tick.TICK](https://code.kx.com/pykx/api/tick.html#pykx.tick.TICK)
- [chained_tp.start](https://code.kx.com/pykx/api/tick.html#pykx.tick.TICK.start)

In [None]:
chained_tp = kx.tick.TICK(port=5013, chained=True)
chained_tp.start({'tickerplant': 'localhost:5010'})
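The protection a chained tickerplant offers can be sketched in plain Python: the primary tickerplant hands each update to the chained tickerplant, which buffers it, so a slow analytic subscriber drains messages from that buffer at its own pace instead of holding up the primary. The `ChainedTickerplant` class and its `drain` method are hypothetical; they model only this decoupling, not the IPC behaviour of `kx.tick.TICK`.

```python
from collections import deque


class ChainedTickerplant:
    """Hypothetical relay: accept updates immediately, deliver them later."""

    def __init__(self):
        self.buffer = deque()
        self.subscribers = []

    def subscribe(self, callback):
        self.subscribers.append(callback)

    def upd(self, table, rows):
        # Called by the primary tickerplant; never blocks on subscribers
        self.buffer.append((table, rows))

    def drain(self, max_messages=None):
        """Deliver up to `max_messages` buffered updates (all if None)."""
        delivered = 0
        while self.buffer and (max_messages is None or delivered < max_messages):
            table, rows = self.buffer.popleft()
            for callback in self.subscribers:
                callback(table, rows)
            delivered += 1
        return delivered


received = []
chained = ChainedTickerplant()
chained.subscribe(lambda table, rows: received.append(table))

# The primary publishes three updates without waiting on the slow consumer
for table in ['trade', 'quote', 'trade']:
    chained.upd(table, [])

chained.drain(max_messages=1)        # the slow consumer processes one message
print(received, len(chained.buffer))
```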

With the chained tickerplant running, we can initialize a real-time processor which subscribes to `trade` and `quote` data.

The library functions called in this cell are as follows:

- [kx.tick.RTP](https://code.kx.com/pykx/api/tick.html#pykx.tick.RTP)

In [None]:
rte = kx.tick.RTP(port=5014, subscriptions = ['trade', 'quote'], vanilla=False)

The real-time processor needs to achieve three things:

1. Filter out any messages from tables other than `trade`/`quote`, if received
2. Apply a post-processing function which derives aggregate information about all `trade`/`quote` data seen in the current day, keeping market information up to date
3. Publish the aggregate data back to the primary tickerplant on port 5010 so that it is persisted
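The order of operations described in the list above can be sketched as follows: each incoming message passes through the pre-processor, which may drop it by returning `None`; surviving messages are inserted into the local table; the post-processor then runs. This plain-Python model is an assumption about the flow, and the `handle_message` function and its dict-of-lists storage are hypothetical rather than the `kx.tick.RTP` implementation.

```python
def handle_message(tables, table, message, pre_processor, post_processor):
    """Hypothetical model of one update passing through a real-time processor."""
    message = pre_processor(table, message)
    if message is None:
        return False                      # dropped by the pre-processor
    tables.setdefault(table, []).extend(message)
    post_processor(table, message)
    return True


def pre(table, message):
    # Mirrors the notebook's pre_processor: keep only trade and quote data
    return message if table in ('trade', 'quote') else None


post_calls = []


def post(table, message):
    post_calls.append(table)


tables = {}
handle_message(tables, 'trade', [('AAPL', 100, 185.2)], pre, post)
handle_message(tables, 'news', [('AAPL', 'headline')], pre, post)
print(tables, post_calls)
```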

In [None]:
# Keep only trade and quote messages; returning None drops the message
def pre_processor(table, message):
    if table in ['trade', 'quote']:
        return message
    return None

# Define a Python post-processing function which publishes back to
# the tickerplant
def post_processor(table, message):
    # Aggregate all trades seen today by symbol
    tradeagg = kx.q.qsql.select('trade',
                                 columns={'trdvol': 'sum px*sz',
                                          'maxpx': 'max px',
                                          'minpx': 'min px'},
                                 by='sym')
    # Aggregate all quotes seen today by symbol
    quoteagg = kx.q.qsql.select('quote',
                                 columns={'maxbpx': 'max bid',
                                          'minapx': 'min ask',
                                          'baspread': 'max[bid]-min[ask]'},
                                 by='sym')
    # Left-join quote aggregates onto trade aggregates and timestamp the result
    tab = tradeagg.merge(quoteagg, how='left', q_join=True).reset_index()
    tab['time'] = kx.TimespanAtom('now')
    # Reorder columns to match the `aggregate` schema before publishing
    aggregate = kx.q.xcols(['time', 'sym'], tab)
    kx.q['aggregate'] = aggregate
    # wait=False sends the update without waiting for a response
    with kx.SyncQConnection(port=5010, wait=False, no_ctx=True) as q:
        q('.u.upd', 'aggregate', aggregate._values)
    return None
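To check what the post-processor computes, the same aggregations can be reproduced in plain Python on a few rows. Per symbol, `trdvol` is the sum of `px*sz`, `maxpx`/`minpx` are the extreme trade prices, `maxbpx` is the highest bid, `minapx` is the lowest ask, and `baspread` is `maxbpx - minapx`; the quote aggregates are left-joined onto the trade aggregates by `sym`, so a symbol with trades but no quotes gets nulls (modelled here as `None`). The sample rows and the `aggregate_by_sym` helper are made up for illustration.

```python
from collections import defaultdict

# Made-up sample data: (sym, sz, px) trades and (sym, bid, ask) quotes
trades = [('AAPL', 100, 10.0), ('AAPL', 200, 11.0), ('MSFT', 50, 20.0)]
quotes = [('AAPL', 10.5, 10.6), ('AAPL', 10.8, 10.9)]


def aggregate_by_sym(trades, quotes):
    trades_by_sym = defaultdict(list)
    for sym, sz, px in trades:
        trades_by_sym[sym].append((sz, px))
    quotes_by_sym = defaultdict(list)
    for sym, bid, ask in quotes:
        quotes_by_sym[sym].append((bid, ask))

    result = {}
    for sym, rows in trades_by_sym.items():
        pxs = [px for _, px in rows]
        row = {'trdvol': sum(sz * px for sz, px in rows),
               'maxpx': max(pxs), 'minpx': min(pxs),
               # Left join: symbols without quotes keep None (nulls in q)
               'maxbpx': None, 'minapx': None, 'baspread': None}
        if sym in quotes_by_sym:
            bids = [bid for bid, _ in quotes_by_sym[sym]]
            asks = [ask for _, ask in quotes_by_sym[sym]]
            row['maxbpx'], row['minapx'] = max(bids), min(asks)
            row['baspread'] = row['maxbpx'] - row['minapx']
        result[sym] = row
    return result


for sym, row in aggregate_by_sym(trades, quotes).items():
    print(sym, row)
```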

Now that the functions are defined, we can do the following:

1. Specify that the process requires the Python library `pykx`, available as `kx`
2. Register the pre- and post-processing functions

The library functions called in this cell are as follows:

- [rte.libraries](https://code.kx.com/pykx/api/tick.html#pykx.tick.RTP.start)
- [rte.pre_processor](https://code.kx.com/pykx/api/tick.html#pykx.tick.RTP.pre_processor)
- [rte.post_processor](https://code.kx.com/pykx/api/tick.html#pykx.tick.RTP.post_processor)

In [None]:
rte.libraries({'kx': 'pykx'})
rte.pre_processor(pre_processor)
rte.post_processor(post_processor)

Finally, start the real-time processor, which listens for messages from the chained tickerplant on port 5013.

The library functions called in this cell are as follows:

- [rte.start](https://code.kx.com/pykx/api/tick.html#pykx.tick.RTP.start)

In [None]:
rte.start({'tickerplant': 'localhost:5013'})

While the steps above configure the processor one call at a time, the same setup can be completed in two steps when creating your real-time processor, as outlined [here](https://code.kx.com/pykx/user-guide/advanced/streaming/rta.html#running-all-setup-at-once).

#### Add a Query API across real-time and historical data

The following section adds query APIs to the existing real-time processor and historical database processes. In each case the API queries the `trade` table to count the number of trades for a specific symbol. How this is done varies slightly by process type:

- Real-Time Processor: Query the in-memory table using SQL
- Historical Database: Query the on-disk database using qSQL, limiting the search to the last N days

In [None]:
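def RTE_query(sym):
    # Count today's trades for `sym` held in memory by the real-time processor
    return kx.q.sql('select count(sym) from trade where sym=$1', sym)

def HDB_query(sym, n):
    # Count trades for `sym` in the date partitions from the last `n` days;
    # `within` tests an inclusive range, whereas `in` would match only the two endpoint dates
    today = kx.DateAtom('today')
    return kx.q.qsql.select('trade', {'sym': 'count sym'},
                            where=[f'date within({today-n};{today})', f'sym like "{sym}"'])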
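For a query limited to the last N days, the date filter has to cover the whole inclusive range from `today - n` to `today`. In q, `within` tests range membership while `in` tests list membership, so `date in (d1;d2)` matches only the two endpoint dates. The plain-Python sketch below illustrates the difference, using `datetime.date` in place of `kx.DateAtom`; `n_day_window`, `within` and `in_list` are hypothetical helpers named after the q keywords they imitate.

```python
from datetime import date, timedelta


def n_day_window(today, n):
    """Inclusive (start, end) date range covering the last `n` days."""
    return today - timedelta(days=n), today


def within(d, bounds):
    """Range test, analogous to q's `within`."""
    start, end = bounds
    return start <= d <= end


def in_list(d, values):
    """Membership test, analogous to q's `in`."""
    return d in values


today = date(2024, 1, 10)            # fixed date so the example is repeatable
bounds = n_day_window(today, 5)
partitions = [today - timedelta(days=k) for k in range(8)]

print([d.isoformat() for d in partitions if within(d, bounds)])   # 6 dates
print([d.isoformat() for d in partitions if in_list(d, bounds)])  # 2 dates
```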

The library functions called in the following cells are:

- [rte.register_api](https://code.kx.com/pykx/api/tick.html#pykx.tick.STREAMING.register_api)
- [simple.hdb.libraries](https://code.kx.com/pykx/api/tick.html#pykx.tick.STREAMING.libraries)
- [simple.hdb.register_api](https://code.kx.com/pykx/api/tick.html#pykx.tick.STREAMING.register_api)

In [None]:
rte.register_api('custom_rte', RTE_query)

In [None]:
simple.hdb.libraries({'kx': 'pykx'})
simple.hdb.register_api('custom_hdb', HDB_query)

### Add a gateway to allow querying across the real-time and historical datasets

As a final step, we add a gateway process. All users querying the system connect to this gateway, which allows data to be queried from both the real-time and historical datasets.

<img src="./images/evolving-system/gateway.png" alt="Drawing" style="width: 750px;"/>


The following cell defines a function which calls the `custom_rte` and `custom_hdb` APIs via the `rte` and `hdb` connections named when the gateway is initialized below. Historical data is only queried when `n_days` is greater than zero.

In [None]:
def gateway_function(sym, n_days=0):
    rte_data = gateway.call_port('rte', 'custom_rte', sym)
    if n_days > 0:
        hdb_data = gateway.call_port('hdb', 'custom_hdb', sym, n_days)
    else:
        # Zero-count table so the sum below returns the real-time count alone
        hdb_data = kx.Table(data={'sym': [0]})
    return rte_data + hdb_data
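The gateway function's logic can be exercised without any running processes by substituting a stub for `gateway.call_port`. In the hypothetical sketch below, `fake_call_port` returns made-up counts in place of the RTE and HDB responses, and plain integers stand in for the single-row tables that the real function adds together; `make_gateway_function` is an illustrative name, chosen so it does not overwrite the notebook's `gateway` or `gateway_function`.

```python
def make_gateway_function(call_port):
    """Build a plain-Python analogue of `gateway_function` around `call_port`."""
    def gateway_fn(sym, n_days=0):
        rte_count = call_port('rte', 'custom_rte', sym)
        if n_days > 0:
            hdb_count = call_port('hdb', 'custom_hdb', sym, n_days)
        else:
            hdb_count = 0            # mirrors the zero-count table used above
        return rte_count + hdb_count
    return gateway_fn


calls = []


def fake_call_port(port_name, api, *args):
    # Record each call and return a made-up trade count per connection
    calls.append((port_name, api, args))
    return {'rte': 120, 'hdb': 3000}[port_name]


gateway_fn = make_gateway_function(fake_call_port)
print(gateway_fn('AAPL'))        # real-time data only
print(gateway_fn('AAPL', 5))     # real-time plus the last 5 days
```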

Additionally, we define a function which validates users connecting to the gateway. In this example only the username is checked; the password argument is ignored.

In [None]:
def user_validation(username, password):
    if username == 'test_user':
        return True
    return False
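If you want the gateway to verify passwords as well, one option is to compare against salted PBKDF2 hashes using only the standard library. The sketch below assumes the password reaches the validator as a Python `str`; the `USERS` store, the iteration count and the `validate_with_password` name are illustrative, and in practice credentials would be loaded from a secure source rather than defined inline. A function like this could be passed as `connection_validator`, after which clients would also need to supply a matching password.

```python
import hashlib
import hmac
import os

ITERATIONS = 200_000                 # illustrative PBKDF2 work factor


def hash_password(password, salt):
    return hashlib.pbkdf2_hmac('sha256', password.encode(), salt, ITERATIONS)


# Hypothetical credential store: username -> (salt, password hash)
_salt = os.urandom(16)
USERS = {'test_user': (_salt, hash_password('s3cret', _salt))}


def validate_with_password(username, password):
    """Return True only for a known user with the correct password."""
    entry = USERS.get(username)
    if entry is None:
        return False
    salt, expected = entry
    # compare_digest runs in constant time to avoid timing side channels
    return hmac.compare_digest(hash_password(password, salt), expected)
```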

The library functions called in the following cells are:

- [kx.tick.GATEWAY](https://code.kx.com/pykx/api/tick.html#pykx.tick.GATEWAY)
- [gateway.start](https://code.kx.com/pykx/api/tick.html#pykx.tick.GATEWAY.start)

In [None]:
gateway = kx.tick.GATEWAY(
    port=5015,
    libraries = {'kx': 'pykx'},
    apis = {'gateway_function': gateway_function},
    connections={'hdb': 'localhost:5011', 'rte': 'localhost:5014'},
    connection_validator = user_validation
)
gateway.start()

We can now emulate a user querying the gateway for the number of `AAPL` trades, passing `0` for `n_days` so that only real-time data is queried:

In [None]:
with kx.SyncQConnection(port=5015, no_ctx=True, username='test_user') as q:
    data = q('gateway_function', 'AAPL', 0)

In [None]:
data

### Infrastructure shutdown

To finish this notebook, stop the data feed and each of the processes started above.

In [None]:
feed.stdin.close()
feed.kill()

The library functions called in the following cells are:

- [rte.stop](https://code.kx.com/pykx/api/tick.html#pykx.tick.STREAMING.stop)
- [chained_tp.stop](https://code.kx.com/pykx/api/tick.html#pykx.tick.STREAMING.stop)
- [gateway.stop](https://code.kx.com/pykx/api/tick.html#pykx.tick.STREAMING.stop)
- [simple.stop](https://code.kx.com/pykx/api/tick.html#pykx.tick.BASIC.stop)

In [None]:
rte.stop()
chained_tp.stop()
simple.stop()
gateway.stop()
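If one of the `stop()` calls in the cell above raises, the processes after it are left running. A more defensive pattern, sketched below under the assumption that each component exposes a `stop()` method as the objects above do, attempts every stop and reports all failures at the end. The `stop_all` helper and the `Component` stub used to demonstrate it are hypothetical.

```python
def stop_all(components):
    """Call stop() on every component, collecting rather than raising errors."""
    errors = []
    for component in components:
        try:
            component.stop()
        except Exception as exc:      # keep going so nothing is left running
            errors.append((component, exc))
    if errors:
        raise RuntimeError(f'{len(errors)} component(s) failed to stop: {errors}')


class Component:
    """Hypothetical stand-in for a streaming process object."""

    def __init__(self, name, fail=False):
        self.name, self.fail, self.stopped = name, fail, False

    def stop(self):
        if self.fail:
            raise OSError(f'{self.name} did not respond')
        self.stopped = True

    def __repr__(self):
        return f'Component({self.name!r})'


parts = [Component('gateway'), Component('rte', fail=True), Component('tp')]
try:
    stop_all(parts)
except RuntimeError as exc:
    print(exc)

# In this notebook, one reasonable order is client-facing first:
# stop_all([gateway, rte, chained_tp, simple])
```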