# Table of Contents
[Prerequisites](#pre)

[Run existing otq files](#run_from_file)

[Build Graphs via the API](#building)

  * [The EventProcessor type](#basics)
  * [The Graph type](#simple)
  * [Graphs with multiple EventProcessors](#multi)
  * [The Head EP on the Graph](#head)
  * [EventProcessor pins](#pins)
  * [Labeling an EventProcessor](#labels)  
  * [Accessing EventProcessors on the Graph via labels](#graph_labels)  
  * [Outputs from  multiple EventProcessors](#multi_output)
  * [Binding symbols to an EventProcessor](#bind)
  * [Using utililty functions for common Graph patterns](#chainlet)
  * [Printing the Graph](#print)
  
[Business Cases](#business)

  * [Retrieve database snapshot as of given time](#db_snapshot)
  * [Package trade filter conditions for reuse in different queries](#package_filters)
  * [Bucketed volume for N second buckets or N ticks buckets](#bucketed_volume_N)
  * [Append running total and sliding bucketed volumes to each trade tick and calculate a ratio between the two](#running_total_to_each_tick)
  * [Compute sliding aggregation per group](#sliding_aggr_per_group)
  * [Combine multiple aggregation functions](#mult_aggr_functions)
  * [Aggregate ticks into flexible volume-based buckets](#aggr_ticks_flex_buckets)
  * [Append total daily volume to each detail tick to compute % of total](#append_tot_vol_to_each_tick)
  * [Compute Relative Strength Index](#RSI)
  * [Get trades along with prevailing quotes](#jbt_trd_qte)
  * [Compute effective spread average and generate above/below mid flag for each trade](#spread_avg_mid_flag)
  * [Compute symbol pair RETURN correlation](#return_correlation)
  * [Get top N players, or how to MERGE time series to compute totals across symbols](#top_N_players)
  * [Compute ASK/BID VWAP and Volume over N seconds before and after each trade](#vwap_volume_for_each_trd)

<a><a id="pre"></a>
# Prerequisites
Before reading this tutorial you should already know Python (see the [Python Tutorial](http://docs.python.org/tut/)) and have some familiarlity with OneTick.

If you wish to work with the examples in this tutorial you must also have Python installed and have followed the setup procedures for both of OneTick's pyomd and NumPy_OneTickQuery modules - instructions for which can be found with your distribution.


<a><a id="run_from_file"></a>
# Run existing otq files
- TODO

<a><a id="building"></a>
# Build Graphs via the API
<a><a id="basics"></a>
## The EventProcessor type

The EventProcessor (EP) is the basic type in OneTick's quersive module. It is used to perform all of the analytics in OneTick and can be combined to construct powerful graphs via an intuitive interface. All EPs inherit from a single abstract class: *quersive.EventProcessor*:


**EventProcessor.name**<br>
&nbsp;&nbsp;&nbsp;&nbsp;name of the EventProcessor

**EventProcessor.sink**<br>
&nbsp;&nbsp;&nbsp;&nbsp;add an event processor to this event processor as a sink. Returns the same Graph object

**EventProcessor.source**:<br>
&nbsp;&nbsp;&nbsp;&nbsp;add an event processor to this event processor as a source. Returns the same Graph object

**EventProcessor.\__lshift\__**:<br>
&nbsp;&nbsp;&nbsp;&nbsp;overloads the >> operator and equivalent to EventProcessor.sink

**EventProcessor.\__rshift\__**:<br>
&nbsp;&nbsp;&nbsp;&nbsp;overloads the << operator and equivalent to EventProcessor.source
    
**EventProcessor.label**<br>
&nbsp;&nbsp;&nbsp;&nbsp;label the event processor, returns self.
            
**EventProcessor.symbol**<br>
&nbsp;&nbsp;&nbsp;&nbsp;bind a symbol to the event processor   
        
**EventProcessor.pin**<br>
&nbsp;&nbsp;&nbsp;&nbsp;attach to the pin of the event processor (i.e., 'if','else')
        
**EventProcessor.output**<br>
&nbsp;&nbsp;&nbsp;&nbsp;flag the event processor as an output node
    
**EventProcessor.Descriptor**<br>
&nbsp;&nbsp;&nbsp;&nbsp;read-only object that describes an EventProcessor
    
**EventProcessor.Descriptor.label**<br>
&nbsp;&nbsp;&nbsp;&nbsp;label of the event processor
            
**EventProcessor.Descriptor.symbol**<br>
&nbsp;&nbsp;&nbsp;&nbsp;bound symbol of the event processor   
        
**EventProcessor.Descriptor.pin**<br>
&nbsp;&nbsp;&nbsp;&nbsp;input/output pin of the event processor (i.e., 'if','else')
        
**EventProcessor.Descriptor.output**<br>
&nbsp;&nbsp;&nbsp;&nbsp;flag to set event processor as an output node
    
<br>

All of OneTick's EPs are exposed as python objects in the module. For example, the AddField EP can be instantiated like:


In [47]:
import quersive as q

a = q.add_field()
print(a)
print(type(a))
isinstance(a,q.EventProcessor)

AddField(name='ADD_FIELD',ticktype='',field='',value='')
<class 'quersive.ep.AddField'>


True

The above output demonstrates that the function call to q.add_field returns an object of type 'quersive.AddField' that inherits from the abstract class quersive.EventProcessor. Each call to an EP with no parameters passed in will populate its attributes with that EP's default values, also shown above.


All of the EP's attributes are listed in its function signature. For example, OneTick's AddField EP signature is:

```python
def add_field(ticktype='', field='',value='')
```

The above signfies that, in addition to the attributes accessible via the Descriptor, each EP has it's own set of attributes as listed in its signature and can be accessed in the usual way. For example, once instantiated, the 'field' attribute of the resulting 'AddField' can be accessed and set like:


In [48]:
import quersive as q

a = q.add_field()

print(a)

#set attributes
a.ticktype='TRD'
a.field='New_Field'
a.value = 'New_Value'

print(a.ticktype)
print(a.field)
print(a.value)


try:
    a.name='New_Name'
except AttributeError as e:
    print(str(e))
    
try:
    a.descriptor = q.EventProcessor.Descriptor(label='A',symbol='B',pin='',output=True)
except AttributeError as e:
    print(str(e))
    

AddField(name='ADD_FIELD',ticktype='',field='',value='')
TRD
New_Field
New_Value
can't set attribute
can't set attribute


Note that the name attribute is read-only as per the EventProcessor class and can never be set, nor can the descriptor object, as shown above.The Descriptor class is read-only. To set a Descriptor field on an object, each EP provides the appropriate functions. For example, to set the label of the AddField object, call its 'label' method. Note that the 'label' method actually returns the same AddField object, as shown below. This is useful for 'chaining' when building graphs (more below).


In [49]:
import quersive as q

a = q.add_field()

print(a.descriptor)
b = a.label('add_field')
print(a.descriptor)
(a is b)

Descriptor(symbol='',label='',output='False',pin='')
Descriptor(symbol='',label='add_field',output='False',pin='')


True

<a><a id="simple"></a>
## The Graph type

The Graph type represents a collection of EPs and facilitates building the Graph. EPs are directly sinked or sourced to each other while the Graph maintains references to them. Several other methods allow for simple graph construction and inspection.

**Graph.sink**<br>
&nbsp;&nbsp;&nbsp;&nbsp;sink an event processor to the graph. Returns the same Graph object.

**Graph.source**:<br>
&nbsp;&nbsp;&nbsp;&nbsp;source an event processor to the graph. Returns the same Graph object.

**Graph.\__lshift\__**:<br>
&nbsp;&nbsp;&nbsp;&nbsp;overloads the >> operator and equivalent to Graph.sink. 

**Graph.\__rshift\__**:<br>
&nbsp;&nbsp;&nbsp;&nbsp;overloads the << operator and equivalent to Graph.source. 
    
**Graph.head**:<br>
&nbsp;&nbsp;&nbsp;&nbsp;property to retrieve and set the head EP on the Graph.
    
**Graph.\__getitem\__**:<br>
&nbsp;&nbsp;&nbsp;&nbsp;Overloaded [] operator to retrieve an EP by label
    
**Graph.save**:<br>
&nbsp;&nbsp;&nbsp;&nbsp;Save the Graph to an otq file or to memory.
    
**Graph.to_text**:<br>
&nbsp;&nbsp;&nbsp;&nbsp;Return a textual representation of the Graph as a collections.OrderedDict object of strings.<br> &nbsp;&nbsp;&nbsp;&nbsp;Keys represent sources and Values are sinks represented as children (sinks) of the source.
    
**Graph.chainlet**:<br>
&nbsp;&nbsp;&nbsp;&nbsp;static utility method that accepts a list of EPs and returns a linear Graph.
    
**Graph.join_chainlets**:<br>
&nbsp;&nbsp;&nbsp;&nbsp;static utility method that accepts two lists of EPs joins them by time via the join_by_time EP <br> &nbsp;&nbsp;&nbsp;&nbsp;and returns a Graph.
<br><br>

The below code creates a passthrough, instantiates a Graph object, and runs it via the quersive.run_query:


In [50]:
import quersive as q
from datetime import datetime

p = q.passthrough(ticktype='TRD')
g = q.Graph(p)
result = q.run_query(g,symbol='FULL_DEMO_L1::GS',s=datetime(2005,1,3,9,30),e=datetime(2005,1,3,16))
result

{'FULL_DEMO_L1::GS': [{'SEQ_NUM': array([1971504712011575, 1971504713008187, 1971504713508334, ...,
       1971527983664438, 1971527985164078, 1971527993164523], dtype=int64), 'TICK_STATUS': array([0, 0, 0, ..., 0, 0, 0], dtype=int32), 'Time': array(['2005-01-03T14:31:52.011000', '2005-01-03T14:31:53.008000',
       '2005-01-03T14:31:53.508000', ..., '2005-01-03T20:59:43.664000',
       '2005-01-03T20:59:45.164000', '2005-01-03T20:59:53.164000'], dtype='datetime64[us]'), 'OMDSEQ': array([0, 0, 0, ..., 0, 0, 0], dtype=int32), 'CORR': array([0, 0, 0, ..., 0, 0, 0], dtype=int32), 'COND': array([' ', ' ', ' ', ..., ' ', ' ', ' '], 
      dtype='<U1'), 'TICKER': array(['G', 'G', 'G', ..., 'G', 'G', 'G'], 
      dtype='<U1'), 'DELETED_TIME': array(['1970-01-01T00:00:00.000', '1970-01-01T00:00:00.000',
       '1970-01-01T00:00:00.000', ..., '1970-01-01T00:00:00.000',
       '1970-01-01T00:00:00.000', '1970-01-01T00:00:00.000'], dtype='datetime64[ms]'), 'SOURCE': array(['C', 'C', 'C', ..., 'C'

Note that the output of the above is a dictionary of a list of dictionaries. All the outputs of any symbol is retrievable via the 'call' operator. This allows all outputs from queries to be retrieved. Usually, only one output is produced by the graph or we are only interested in the 'last' output of the query. As a shortcut, the [] operator retrieves the 'last' output:

In [51]:
result('FULL_DEMO_L1::GS')

[{'COND': array([' ', ' ', ' ', ..., ' ', ' ', ' '], 
        dtype='<U1'),
  'CORR': array([0, 0, 0, ..., 0, 0, 0], dtype=int32),
  'DELETED_TIME': array(['1970-01-01T00:00:00.000', '1970-01-01T00:00:00.000',
         '1970-01-01T00:00:00.000', ..., '1970-01-01T00:00:00.000',
         '1970-01-01T00:00:00.000', '1970-01-01T00:00:00.000'], dtype='datetime64[ms]'),
  'EXCHANGE': array(['N', 'T', 'T', ..., 'N', 'N', 'N'], 
        dtype='<U1'),
  'OMDSEQ': array([0, 0, 0, ..., 0, 0, 0], dtype=int32),
  'PRICE': array([ 104.9 ,  104.9 ,  104.9 , ...,  104.85,  104.86,  104.86]),
  'SEQ_NUM': array([1971504712011575, 1971504713008187, 1971504713508334, ...,
         1971527983664438, 1971527985164078, 1971527993164523], dtype=int64),
  'SIZE': array([40400,  1200,  1000, ...,  1200,  4900, 10000], dtype=int32),
  'SOURCE': array(['C', 'C', 'C', ..., 'C', 'C', 'C'], 
        dtype='<U1'),
  'STOP_STOCK': array(['N', 'N', 'N', ..., 'N', 'N', 'N'], 
        dtype='<U1'),
  'TICKER': array(['G

In [52]:
result['FULL_DEMO_L1::GS']

{'COND': array([' ', ' ', ' ', ..., ' ', ' ', ' '], 
       dtype='<U1'),
 'CORR': array([0, 0, 0, ..., 0, 0, 0], dtype=int32),
 'DELETED_TIME': array(['1970-01-01T00:00:00.000', '1970-01-01T00:00:00.000',
        '1970-01-01T00:00:00.000', ..., '1970-01-01T00:00:00.000',
        '1970-01-01T00:00:00.000', '1970-01-01T00:00:00.000'], dtype='datetime64[ms]'),
 'EXCHANGE': array(['N', 'T', 'T', ..., 'N', 'N', 'N'], 
       dtype='<U1'),
 'OMDSEQ': array([0, 0, 0, ..., 0, 0, 0], dtype=int32),
 'PRICE': array([ 104.9 ,  104.9 ,  104.9 , ...,  104.85,  104.86,  104.86]),
 'SEQ_NUM': array([1971504712011575, 1971504713008187, 1971504713508334, ...,
        1971527983664438, 1971527985164078, 1971527993164523], dtype=int64),
 'SIZE': array([40400,  1200,  1000, ...,  1200,  4900, 10000], dtype=int32),
 'SOURCE': array(['C', 'C', 'C', ..., 'C', 'C', 'C'], 
       dtype='<U1'),
 'STOP_STOCK': array(['N', 'N', 'N', ..., 'N', 'N', 'N'], 
       dtype='<U1'),
 'TICKER': array(['G', 'G', 'G', ..., 

Note the latter produces a dict while the former produces a list of dicts, with each dict being keyed by OneTick's TickDescriptors with values as numpy arrays. This allows for rapid instantiation of a Pandas DataFrame object:

In [53]:
import pandas as pd

pd.DataFrame(result['FULL_DEMO_L1::GS']).head()


Unnamed: 0,COND,CORR,DELETED_TIME,EXCHANGE,OMDSEQ,PRICE,SEQ_NUM,SIZE,SOURCE,STOP_STOCK,TICKER,TICK_STATUS,Time
0,,0,1970-01-01,N,0,104.9,1971504712011575,40400,C,N,G,0,2005-01-03 14:31:52.011
1,,0,1970-01-01,T,0,104.9,1971504713008187,1200,C,N,G,0,2005-01-03 14:31:53.008
2,,0,1970-01-01,T,0,104.9,1971504713508334,1000,C,N,G,0,2005-01-03 14:31:53.508
3,,0,1970-01-01,T,0,104.9,1971504713511894,100,C,N,G,0,2005-01-03 14:31:53.511
4,,0,1970-01-01,T,0,104.9,1971504713515182,200,C,N,G,0,2005-01-03 14:31:53.515


Of course, the same could be done via the 'call' operator for any of the outputs, but more verbosely:

In [54]:
pd.DataFrame(result('FULL_DEMO_L1::GS')[-1]).head()

Unnamed: 0,COND,CORR,DELETED_TIME,EXCHANGE,OMDSEQ,PRICE,SEQ_NUM,SIZE,SOURCE,STOP_STOCK,TICKER,TICK_STATUS,Time
0,,0,1970-01-01,N,0,104.9,1971504712011575,40400,C,N,G,0,2005-01-03 14:31:52.011
1,,0,1970-01-01,T,0,104.9,1971504713008187,1200,C,N,G,0,2005-01-03 14:31:53.008
2,,0,1970-01-01,T,0,104.9,1971504713508334,1000,C,N,G,0,2005-01-03 14:31:53.508
3,,0,1970-01-01,T,0,104.9,1971504713511894,100,C,N,G,0,2005-01-03 14:31:53.511
4,,0,1970-01-01,T,0,104.9,1971504713515182,200,C,N,G,0,2005-01-03 14:31:53.515


To obtain all the outputs associated with a symbol, the call operator '()' should be used; the last output of the query is always achieved via the '[]' operator.

<a><a id="multi"></a>
## Graphs with multiple EventProcessors

Graphs can be constructed by sinking or sourcing EPs to one another. To filter the above results on PRICE, for example, sink a where_clause:

In [55]:
from datetime import datetime
import pandas as pd

w = q.where_clause(where='PRICE > 105')
root = p = q.passthrough(ticktype='TRD')
p.sink(w)
g = q.Graph(root)
result = q.run_query(g,symbol='FULL_DEMO_L1::GS',s=datetime(2005,1,3,9,30),e=datetime(2005,1,3,16))
pd.DataFrame(result['FULL_DEMO_L1::GS']).head()

Unnamed: 0,COND,CORR,DELETED_TIME,EXCHANGE,OMDSEQ,PRICE,SEQ_NUM,SIZE,SOURCE,STOP_STOCK,TICKER,TICK_STATUS,Time
0,,0,1970-01-01,N,0,105.05,1971504952659752,1500,C,N,G,0,2005-01-03 14:35:52.659
1,E,0,1970-01-01,N,0,105.05,1971504956661036,200,C,N,G,0,2005-01-03 14:35:56.661
2,E,0,1970-01-01,N,0,105.08,1971504958660452,500,C,N,G,0,2005-01-03 14:35:58.660
3,,0,1970-01-01,N,0,105.08,1971504960661857,100,C,N,G,0,2005-01-03 14:36:00.661
4,,0,1970-01-01,N,0,105.07,1971504961161477,100,C,N,G,0,2005-01-03 14:36:01.161


TODO - Replace table below with add_fields. EPs support chaining sinks and sources, so that separate calls to sink or source can occur simultatneously. For example, the above example can be chained like:

In [56]:
from datetime import datetime
import pandas as pd

w = q.where_clause(where='PRICE > 105')
root = p = q.passthrough(ticktype='TRD')
p.sink(w).sink(q.table(fields='PRICE,SIZE,EXCHANGE'))
g = q.Graph(root)
result = q.run_query(g,symbol='FULL_DEMO_L1::GS',s=datetime(2005,1,3,9,30),e=datetime(2005,1,3,16))
pd.DataFrame(result['FULL_DEMO_L1::GS']).head()

Unnamed: 0,EXCHANGE,PRICE,SIZE,Time
0,N,105.05,1500,2005-01-03 14:35:52.659
1,N,105.05,200,2005-01-03 14:35:56.661
2,N,105.08,500,2005-01-03 14:35:58.660
3,N,105.08,100,2005-01-03 14:36:00.661
4,N,105.07,100,2005-01-03 14:36:01.161


The above can be achieved also via operators in place of the 'sink' or 'source' methods:

In [57]:
import quersive as q
from datetime import datetime
import pandas as pd

root = p = q.passthrough(ticktype='TRD')
p >> q.where_clause(where='PRICE > 105') >> q.table(fields='PRICE,SIZE,EXCHANGE')
g = q.Graph(root)
result = q.run_query(g,symbol='FULL_DEMO_L1::GS',s=datetime(2005,1,3,9,30),e=datetime(2005,1,3,16))
pd.DataFrame(result['FULL_DEMO_L1::GS']).head()

Unnamed: 0,EXCHANGE,PRICE,SIZE,Time
0,N,105.05,1500,2005-01-03 14:35:52.659
1,N,105.05,200,2005-01-03 14:35:56.661
2,N,105.08,500,2005-01-03 14:35:58.660
3,N,105.08,100,2005-01-03 14:36:00.661
4,N,105.07,100,2005-01-03 14:36:01.161


Note in the above example that we first created the passthrough EP, and then we constructed the Graph object to pass to quersive.run_query. The quesiv.Graph object is NOT and EP (i.e., Graph objects cannot be sinked our sourced themselves), but it does support sinking and sourcing of event processors. Rather than returning an EP, its sink and source methods return the same graph:

In [58]:
import quersive as q
from datetime import datetime
import pandas as pd

g = q.Graph(q.passthrough(ticktype='TRD')) >> q.where_clause(where='PRICE > 105') >> q.table(fields='PRICE,SIZE,EXCHANGE')
result = q.run_query(g,symbol='FULL_DEMO_L1::GS',s=datetime(2005,1,3,9,30),e=datetime(2005,1,3,16))
pd.DataFrame(result['FULL_DEMO_L1::GS']).head()

Unnamed: 0,EXCHANGE,PRICE,SIZE,Time
0,N,105.05,1500,2005-01-03 14:35:52.659
1,N,105.05,200,2005-01-03 14:35:56.661
2,N,105.08,500,2005-01-03 14:35:58.660
3,N,105.08,100,2005-01-03 14:36:00.661
4,N,105.07,100,2005-01-03 14:36:01.161


Sourcing the where_clause can be easily achieved as well:

In [59]:
import quersive as q
from datetime import datetime
import pandas as pd

g = q.Graph(q.passthrough(ticktype='TRD')) << q.where_clause(where='PRICE > 105')
result = q.run_query(g,symbol='FULL_DEMO_L1::GS',s=datetime(2005,1,3,9,30),e=datetime(2005,1,3,16))
pd.DataFrame(result['FULL_DEMO_L1::GS']).head()

Unnamed: 0,COND,CORR,DELETED_TIME,EXCHANGE,OMDSEQ,PRICE,SEQ_NUM,SIZE,SOURCE,STOP_STOCK,TICKER,TICK_STATUS,Time
0,,0,1970-01-01,N,0,105.05,1971504952659752,1500,C,N,G,0,2005-01-03 14:35:52.659
1,E,0,1970-01-01,N,0,105.05,1971504956661036,200,C,N,G,0,2005-01-03 14:35:56.661
2,E,0,1970-01-01,N,0,105.08,1971504958660452,500,C,N,G,0,2005-01-03 14:35:58.660
3,,0,1970-01-01,N,0,105.08,1971504960661857,100,C,N,G,0,2005-01-03 14:36:00.661
4,,0,1970-01-01,N,0,105.07,1971504961161477,100,C,N,G,0,2005-01-03 14:36:01.161


<a><a id="head"></a>
## The Head EP on the Graph

The Graph object always keeps track of the last EP sourced or sinked to it via the 'head' property; whenever an EP is added to the Graph, the head of the Graph is updated and yields the EP. For example, consider the above examples:

In [60]:
import quersive as q
from datetime import datetime
import pandas as pd

p = q.passthrough(ticktype='TRD')
g = q.Graph(p)

print(p.name == g.head.name)
print(p is g.head)

w = q.where_clause(where='PRICE>105')
g >> w

print(w.name == g.head.name)
print(w is g.head)

True
True
True
True


Care must be taken when sinking or sourcing to the Graph object vs. sinking or sourcing to the underlying EP directly. If the underlying EP is accessed or is sinked-to/sourced-from directly the Graph object's head will not be updated, though the EP will still be connected to the Graph. For example, 

In [61]:
import quersive as q
from datetime import datetime
import pandas as pd

p = q.passthrough(ticktype='TRD')
g = q.Graph(p)

print(p.name == g.head.name)
print(p is g.head)

w = q.where_clause(where='PRICE>105')
p >> w

print(w.name == g.head.name)
print(w is g.head)

True
True
False
False


In the above case, the 'where_clause' EP was directly sinked to the 'passthrough' EP and was therefore done 'off the graph', so the graph object's head still points to the passthrough EP. To update the head EP on the graph, it can be re-set, as:

In [62]:
import quersive as q
from datetime import datetime
import pandas as pd

p = q.passthrough(ticktype='TRD')
g = q.Graph(p)

print(p.name == g.head.name)
print(p is g.head)

w = q.where_clause(where='PRICE>105')
p >> w

g.head = w

print(w.name == g.head.name)
print(w is g.head)

True
True
True
True


<a><a id="pins"></a>
## EventProcessor pins
In the above example, the passthrough EP was sinked or sourced to the 'IF' pin of the Where_Clause EP, by default. To attach to the 'Else' pin, the 'pin' method of the EP object is called, below. In the example below, prices are filtered by PRICE < 105 since the Passthrough EP is sourcing the 'else' pin.


In [63]:
import quersive as q
from datetime import datetime
import pandas as pd

g = q.Graph(q.passthrough(ticktype='TRD')) << q.where_clause(where='PRICE > 105').pin('else')
result = q.run_query(g,symbol='FULL_DEMO_L1::GS',s=datetime(2005,1,3,9,30),e=datetime(2005,1,3,16))
pd.DataFrame(result['FULL_DEMO_L1::GS']).head()

Unnamed: 0,COND,CORR,DELETED_TIME,EXCHANGE,OMDSEQ,PRICE,SEQ_NUM,SIZE,SOURCE,STOP_STOCK,TICKER,TICK_STATUS,Time
0,,0,1970-01-01,N,0,104.9,1971504712011575,40400,C,N,G,0,2005-01-03 14:31:52.011
1,,0,1970-01-01,T,0,104.9,1971504713008187,1200,C,N,G,0,2005-01-03 14:31:53.008
2,,0,1970-01-01,T,0,104.9,1971504713508334,1000,C,N,G,0,2005-01-03 14:31:53.508
3,,0,1970-01-01,T,0,104.9,1971504713511894,100,C,N,G,0,2005-01-03 14:31:53.511
4,,0,1970-01-01,T,0,104.9,1971504713515182,200,C,N,G,0,2005-01-03 14:31:53.515


Likewise, pins can be used when sinking from the Where_Clause EP:

In [64]:
import quersive as q
from datetime import datetime
import pandas as pd

g = q.Graph(q.where_clause(where='PRICE > 105').pin('else')) >> q.passthrough(ticktype='TRD')
result = q.run_query(g,symbol='FULL_DEMO_L1::GS',s=datetime(2005,1,3,9,30),e=datetime(2005,1,3,16))
pd.DataFrame(result['FULL_DEMO_L1::GS']).head()

Unnamed: 0,COND,CORR,DELETED_TIME,EXCHANGE,OMDSEQ,PRICE,SEQ_NUM,SIZE,SOURCE,STOP_STOCK,TICKER,TICK_STATUS,Time
0,,0,1970-01-01,N,0,104.9,1971504712011575,40400,C,N,G,0,2005-01-03 14:31:52.011
1,,0,1970-01-01,T,0,104.9,1971504713008187,1200,C,N,G,0,2005-01-03 14:31:53.008
2,,0,1970-01-01,T,0,104.9,1971504713508334,1000,C,N,G,0,2005-01-03 14:31:53.508
3,,0,1970-01-01,T,0,104.9,1971504713511894,100,C,N,G,0,2005-01-03 14:31:53.511
4,,0,1970-01-01,T,0,104.9,1971504713515182,200,C,N,G,0,2005-01-03 14:31:53.515


<a><a id="labels"></a>
## Labeling an EventProcessor

Labeling an EP is necessary when an EP needs to distinguish between mulitiple sources. For example, several of the parameters to the 'JOIN_BY_TIME' EP require specification of one or several of the input data being sourced. LEADING_SOURCES is a parameter that asks how ticks should be updated when they are streaming data into the EP. This is achieved by labeling the input EPs:

In [65]:
import quersive as q
from datetime import datetime
import pandas as pd

trds = q.passthrough(ticktype='TRD')
qtes = q.passthrough(ticktype='QTE')
nticks = q.num_ticks(is_running_aggr=True,output_field_name='TICK_ID')

g = q.Graph(trds) >> nticks.label('t') >> q.join_by_time(leading_sources='t') << nticks.copy().label('q') << qtes
data = q.run_query(g,symbol='FULL_DEMO_L1::GS',s=datetime(2005,1,3,9,30),e=datetime(2005,1,3,16))
print(pd.DataFrame(data['FULL_DEMO_L1::GS']).head())


                     Time q.ASK_EXCHANGE  q.ASK_PRICE  q.ASK_SIZE  \
0 2005-01-03 14:31:52.011              C        999.0          10   
1 2005-01-03 14:31:53.008              T        105.0          10   
2 2005-01-03 14:31:53.508              T        105.0          10   
3 2005-01-03 14:31:53.511              T        105.0          10   
4 2005-01-03 14:31:53.515              T        105.0          10   

  q.BID_EXCHANGE  q.BID_PRICE  q.BID_SIZE q.COND q.CORR q.DELETED_TIME  \
0              C         0.01          10      R            1970-01-01   
1              T       104.64           1      R      A     1970-01-01   
2              T       104.64           1      R      A     1970-01-01   
3              T       104.64           1      R      A     1970-01-01   
4              T       104.64           1      R      A     1970-01-01   

            ...           t.OMDSEQ t.PRICE         t.SEQ_NUM t.SIZE  t.SOURCE  \
0           ...                  0   104.9  197150471201157

<a><a id="graph_labels"></a>
## Accessing EventProcessors on the Graph via labels

In the previous example, trades and quotes were joined by time. To filter all prices > 105, we could make use of the head EP and build the graph like:

In [66]:
import quersive as q
from datetime import datetime
import pandas as pd

trds = q.passthrough(ticktype='TRD')
qtes = q.passthrough(ticktype='QTE')
nticks = q.num_ticks(is_running_aggr=True,output_field_name='TICK_ID')

join = q.join_by_time(leading_sources='t') 
g = q.Graph(trds) >> nticks.label('t') >> join << nticks.copy().label('q') << qtes
join >> q.where_clause(where='t.PRICE>105')
data = q.run_query(g,symbol='FULL_DEMO_L1::GS',s=datetime(2005,1,3,9,30),e=datetime(2005,1,3,16))
print(pd.DataFrame(data['FULL_DEMO_L1::GS']).head())

                     Time q.ASK_EXCHANGE  q.ASK_PRICE  q.ASK_SIZE  \
0 2005-01-03 14:35:52.659              N       105.05           1   
1 2005-01-03 14:35:56.661              N       105.08           7   
2 2005-01-03 14:35:58.660              N       105.08           1   
3 2005-01-03 14:36:00.661              T       106.64          10   
4 2005-01-03 14:36:01.161              N       105.08          21   

  q.BID_EXCHANGE  q.BID_PRICE  q.BID_SIZE q.COND q.CORR q.DELETED_TIME  \
0              N       105.00           9      R            1970-01-01   
1              N       105.05           1      R            1970-01-01   
2              N       105.05           1      R            1970-01-01   
3              T        90.85          10      R      A     1970-01-01   
4              N       105.07           1      R            1970-01-01   

            ...           t.OMDSEQ t.PRICE         t.SEQ_NUM t.SIZE  t.SOURCE  \
0           ...                  0  105.05  197150495265975

Above, the 'join_by_time' EP was explicitly set to the variable 'join' and the where_clause EP was sinked thereafter. A more intuitive approach is to label the join_by_time EP and set it directly:

In [67]:
import quersive as q
from datetime import datetime
import pandas as pd

trds = q.passthrough(ticktype='TRD')
qtes = q.passthrough(ticktype='QTE')
nticks = q.num_ticks(is_running_aggr=True,output_field_name='TICK_ID')

g = q.Graph(trds) >> nticks.label('t') >> q.join_by_time(leading_sources='t').label('join') << nticks.copy().label('q') << qtes
g['join'] >> q.where_clause(where='t.PRICE>105')
print(g.head is g['join'])
print(type(g))
print(type(g['join']))

data = q.run_query(g,symbol='FULL_DEMO_L1::GS',s=datetime(2005,1,3,9,30),e=datetime(2005,1,3,16))
print(pd.DataFrame(data['FULL_DEMO_L1::GS']).head())

False
<class 'quersive.graph.Graph'>
<class 'quersive.ep.JoinByTime'>
                     Time q.ASK_EXCHANGE  q.ASK_PRICE  q.ASK_SIZE  \
0 2005-01-03 14:35:52.659              N       105.05           1   
1 2005-01-03 14:35:56.661              N       105.08           7   
2 2005-01-03 14:35:58.660              N       105.08           1   
3 2005-01-03 14:36:00.661              T       106.64          10   
4 2005-01-03 14:36:01.161              N       105.08          21   

  q.BID_EXCHANGE  q.BID_PRICE  q.BID_SIZE q.COND q.CORR q.DELETED_TIME  \
0              N       105.00           9      R            1970-01-01   
1              N       105.05           1      R            1970-01-01   
2              N       105.05           1      R            1970-01-01   
3              T        90.85          10      R      A     1970-01-01   
4              N       105.07           1      R            1970-01-01   

            ...           t.OMDSEQ t.PRICE         t.SEQ_NUM t.SIZE  t

EPs that are labeled AND sinked/sourced to the graph can be accessed from the graph directly via the overloaded dict operator '[]'. In the above example, g['join'] refers to the underlying EP, join_by_time, and is an example of sinking an EP off-the-graph, as described above.

<a><a id="multi_output"></a>
## Outputs from multiple EventProcessors

By default, only leaf-nodes (defined as those EPs that do not have any sinks and are not sourced by any other EP) are automatically output from the graph. In all of the examples thus far, only a single leaf node is present. Any EP, however, can output data via the 'output' function on the EP. The below example outputs from both EPs:

In [68]:
import quersive as q
from datetime import datetime
import pandas as pd

g = q.Graph(q.where_clause(where='PRICE > 105').pin('else').output(True)) >> q.passthrough(ticktype='TRD')
result = q.run_query(g,symbol='FULL_DEMO_L1::GS',s=datetime(2005,1,3,9,30),e=datetime(2005,1,3,16))
for o in result('FULL_DEMO_L1::GS'):
    print(pd.DataFrame(o).head())

  COND  CORR DELETED_TIME EXCHANGE  OMDSEQ   PRICE           SEQ_NUM  SIZE  \
0          0   1970-01-01        N       0  105.05  1971504952659752  1500   
1    E     0   1970-01-01        N       0  105.05  1971504956661036   200   
2    E     0   1970-01-01        N       0  105.08  1971504958660452   500   
3          0   1970-01-01        N       0  105.08  1971504960661857   100   
4          0   1970-01-01        N       0  105.07  1971504961161477   100   

  SOURCE STOP_STOCK TICKER  TICK_STATUS                    Time  
0      C          N      G            0 2005-01-03 14:35:52.659  
1      C          N      G            0 2005-01-03 14:35:56.661  
2      C          N      G            0 2005-01-03 14:35:58.660  
3      C          N      G            0 2005-01-03 14:36:00.661  
4      C          N      G            0 2005-01-03 14:36:01.161  
  COND  CORR DELETED_TIME EXCHANGE  OMDSEQ  PRICE           SEQ_NUM   SIZE  \
0          0   1970-01-01        N       0  104.9  197150

Note that to see all of the output, the call operator '()' is used rather than the '[]' operator.

<a><a id="bind"></a>
## Binding symbols to an EventProcessor

Symbols can be directly bound to an EP via the 'Symbol' method on the EP. Several  [examples](#return_correlation) below illustrate this usage

<a><a id="chainlet"></a>
## Using utility functions for common Graph patterns

Several patterns appear very frequently when building (simple) Graphs. Chief among them is the singly-connected graph. For example:

In [69]:
import quersive as q
from datetime import datetime
import pandas as pd

g = q.Graph(q.passthrough(ticktype='TRD')) >> q.where_clause(where='PRICE>105') >> q.table(fields='PRICE')
result=q.run_query(g, symbol='FULL_DEMO_L1::GS',s=datetime(2005,1,3,9,30),e=datetime(2005,1,3,16))
print(pd.DataFrame(result['FULL_DEMO_L1::GS']).head())

    PRICE                    Time
0  105.05 2005-01-03 14:35:52.659
1  105.05 2005-01-03 14:35:56.661
2  105.08 2005-01-03 14:35:58.660
3  105.08 2005-01-03 14:36:00.661
4  105.07 2005-01-03 14:36:01.161


The above example is a linear chain where all EPs are sinked together. In fact, most of the examples in this tutorial are of this type. Since this is a recurring theme a convenience function is available to construct it. Graph.chainlet is a static function that accepts a list of EPs and builds a singly-linked graph (chainlet) and returns the graph object:

In [70]:
import quersive as q
from datetime import datetime
import pandas as pd

g = q.Graph.chainlet([q.passthrough(ticktype='TRD'),q.where_clause(where='PRICE>105'),q.table(fields='PRICE')])
result=q.run_query(g, symbol='FULL_DEMO_L1::GS',s=datetime(2005,1,3,9,30),e=datetime(2005,1,3,16))
print(pd.DataFrame(result['FULL_DEMO_L1::GS']).head())

    PRICE                    Time
0  105.05 2005-01-03 14:35:52.659
1  105.05 2005-01-03 14:35:56.661
2  105.08 2005-01-03 14:35:58.660
3  105.08 2005-01-03 14:36:00.661
4  105.07 2005-01-03 14:36:01.161


Another common usage pattern results from using the join-by-time EP. Another utility function, Graph.join_chainlets, is available to join to lists of EPs. It returns the Graph object with the head node pointing to the 'join' EP:

In [71]:
import quersive as q
from datetime import datetime
import pandas as pd

trds = q.passthrough(ticktype='TRD')
qtes = q.passthrough(ticktype='QTE')
nticks = q.num_ticks(is_running_aggr=True,output_field_name='TICK_ID')

g = q.Graph.join_chainlets(lhs_epd=[trds,nticks.label('t')],rhs_epd=[qtes,nticks.copy().label('q')])
result=q.run_query(g, symbol='FULL_DEMO_L1::GS',s=datetime(2005,1,3,9,30),e=datetime(2005,1,3,16))
print(pd.DataFrame(result['FULL_DEMO_L1::GS']).head())
print(g.head.name)

  L.COND  L.CORR L.DELETED_TIME L.EXCHANGE  L.OMDSEQ  L.PRICE  L.SEQ_NUM  \
0              0     1970-01-01                    0      0.0          0   
1              0     1970-01-01                    0      0.0          0   
2              0     1970-01-01                    0      0.0          0   
3              0     1970-01-01                    0      0.0          0   
4              0     1970-01-01                    0      0.0          0   

   L.SIZE L.SOURCE L.STOP_STOCK         ...         R.NASDAQ_BBO_IND  \
0       0                               ...                        3   
1       0                               ...                        1   
2       0                               ...                        0   
3       0                               ...                        0   
4       0                               ...                        0   

   R.NBBO_IND  R.OMDSEQ R.SEQ_NUM R.SOURCE  R.TICKER  R.TICK_ID R.TICK_STATUS  \
0           1         0     2

<a><a id="print"></a>
## Printing the Graph

Sometimes it is useful to view the relation of all the EPs on the Graph, especially for graphs with numerous EPs, without having to inspect the code. The method Graph.to_text() returns an OrderedDict of the names of all of the EPs connected in a Graph to easily see the connections. For example:

In [72]:
import quersive as q

g = q.Graph(q.passthrough(ticktype='ARCA::PRL'))
print(g.to_text())

g >> q.ob_snapshot_wide(bucket_interval=60,max_levels=3,book_delimiters='D')
print(g.to_text())

g << q.passthrough(ticktype='BATS::PRL')
print(g.to_text())
    

OrderedDict([('PASSTHROUGH|0', [])])
OrderedDict([('PASSTHROUGH|0', ['OB_SNAPSHOT_WIDE|1'])])
OrderedDict([('PASSTHROUGH|0', ['OB_SNAPSHOT_WIDE|1']), ('PASSTHROUGH|2', ['OB_SNAPSHOT_WIDE|1'])])


The above shows the connections of the graph, each with a numeric label. After the first print statement, only a Passthrough EP is present on the graph with no connections. This is represented as an OrderedDict with a single key, the EP Passthrough, and an empty list showing it has no connections. After the second print statement, the Passthrough EP is connected to the OB_SNAPSHOT_WIDE EP, as demonstrated by it being present in the list of connections of the Passthrough EP. The last print statement shows that a new Passthrough EP has been added, with a label '2', and it is connected to the same OB_SNAPSHOT_WIDE EP. Sources appear as keys and sinks appear as its list of values.

<a><a id="business"></a>
# Business Cases

The graphical representation of below use cases can be found [here]('file:///C:/OMD/one_market_data/one_tick/docs/OneTickQueryTrainingHandsOnExercises.pdf').

In [73]:
import pandas as pd
import quersive as q
from datetime import datetime

<a><a id="db_snapshot"></a>
### Retrieve database snapshot as of given time

In [89]:
symbols = ['FULL_DEMO_L1::IBM']
start_time, end_time = datetime(2006, 6, 1, 9, 30), datetime(2006, 6, 1, 9, 30)

where_clause = q.where_clause(where='ASK_PRICE>0 AND BID_PRICE>0 AND COND="R"')
qte_pass = q.passthrough(ticktype='QTE', go_back_to_first_tick='86400', max_back_ticks_to_prepend=10)
add_field = q.add_field(field='ORIG_TS msectime', value='TIMESTAMP')
graph = q.Graph.chainlet([where_clause, add_field, qte_pass])

data = q.run_query(graph, symbols, start_time, end_time)
df = pd.DataFrame(data[symbols[0]])
print(df.head())

  ASK_EXCHANGE  ASK_PRICE  ASK_SIZE BID_EXCHANGE  BID_PRICE  BID_SIZE COND  \
0            D       80.7        20            D      78.50        20    R   
1            D       80.7        20            D      78.50        20    R   
2            P       79.9         2            P      79.31         1    R   
3            D       80.5         1            D      78.50        20    R   
4            D       80.5         1            D      78.50        20    R   

  CORR DELETED_TIME EXCHANGE MMID NASDAQ_BBO_IND NBBO_IND  OMDSEQ  \
0    A   1970-01-01        D    B              2        0       0   
1    A   1970-01-01        D    N              2        0       1   
2        1970-01-01        P                   2        1       0   
3    A   1970-01-01        D    B              2        0       0   
4    A   1970-01-01        D    N              2        0       1   

                  ORIG_TS  SEQ_NUM SOURCE TICKER  TICK_STATUS  \
0 2006-06-01 13:27:47.567   115916      C      I   

<a><a id="package_filters"></a>
### Package trade filter conditions for reuse in different queries

In [75]:
symbols = ['IBM']
start_time, end_time = datetime(2006, 6, 1, 9, 30), datetime(2006, 6, 1, 16)
db = 'FULL_DEMO_L1'
conditions_to_exclude = 'N4Q'
tz = 'EST5EDT'

trd_pass = q.passthrough(ticktype='{}::TRD'.format(db))
char_present = q.character_present(discard_on_match=True, field='COND', characters=conditions_to_exclude)
where_clause = q.where_clause(where='PRICE>80 AND SIZE>0 AND MOD(DAY_OF_WEEK(TIMESTAMP,"{}"),6)!=0'.format(tz))
time_filter = q.time_filter(start_time='93000000', end_time='1600000', timezone='EST5EDT')

# create a nested query and save it to a file
nested_query = q.Graph.chainlet([trd_pass, char_present, where_clause])
nested_query.sink(event_processor=time_filter.pin('OUT'))
otq_file = nested_query.save(symbols, start_time, end_time, otq_file='C:/temp/my_sample_filter.otq', query_name='_FilterTrades')

#print the graph to view it
print(nested_query.to_text())

# create the main query
trd_pass_main = q.passthrough('{}::TRD'.format(db))
main_graph = q.Graph.chainlet([trd_pass_main, q.nested_otq(otq_name=otq_file), q.vwap(output_field_name='VWAP')])
# Add another output leg with a filter
# Make the TRD PASSTHROUGH EP node a head of the graph
main_graph.head = trd_pass_main
main_graph >> q.vwap(output_field_name='VWAP')
main_graph.save(symbols, start_time, end_time, otq_file='C:/temp/main_filtering.otq', query_name='main')

#print the main graph:
print(main_graph.to_text())

data = q.run_query(main_graph, symbols, start_time, end_time)
df_1, df_2 = pd.DataFrame(data(symbols[0])[0]), pd.DataFrame(data(symbols[0])[1])
print(df_1.head())
print(df_2.head())

OrderedDict([('PASSTHROUGH|0', ['CHARACTER_PRESENT|1']), ('CHARACTER_PRESENT|1', ['WHERE_CLAUSE|2']), ('WHERE_CLAUSE|2', ['TIME_FILTER|3'])])
OrderedDict([('PASSTHROUGH|0', ['NESTED_OTQ|1', 'VWAP|2']), ('NESTED_OTQ|1', ['VWAP|3'])])
                 Time       VWAP
0 2006-06-01 20:00:00  80.544101
                 Time       VWAP
0 2006-06-01 20:00:00  80.516017


<a><a id="bucketed_volume_N"></a>
### Bucketed volume for N second buckets or N ticks buckets

In [76]:
symbols = ['FULL_DEMO_L1::A','FULL_DEMO_L1::AA','FULL_DEMO_L1::AAA','FULL_DEMO_L1::C','FULL_DEMO_L1::CSCO']
start_time, end_time = datetime(2006, 6, 1, 9, 30), datetime(2006, 6, 1, 16)
bucket_interval_units = 'TICKS' #SECONDS, DAYS, etc.
bucket_interval = 600

trd_volume = q.sum(bucket_interval=bucket_interval,
                   bucket_interval_units=bucket_interval_units,
                   is_running_aggr=True,
                   output_field_name='VOLUME',
                   input_field_name='SIZE')

graph = q.Graph.chainlet([trd_pass, trd_volume])
data = q.run_query(graph, symbols, start_time, end_time)
for sym in symbols:
    print(sym)
    df = pd.DataFrame(data[sym])
    print(df.head(5))

FULL_DEMO_L1::A
                     Time   VOLUME
0 2006-06-01 13:31:03.619  45900.0
1 2006-06-01 13:31:04.631  46000.0
2 2006-06-01 13:31:04.666  46700.0
3 2006-06-01 13:31:04.670  47300.0
4 2006-06-01 13:31:04.673  47500.0
FULL_DEMO_L1::AA
                     Time    VOLUME
0 2006-06-01 13:31:22.200     200.0
1 2006-06-01 13:31:40.359     400.0
2 2006-06-01 13:32:40.209  142000.0
3 2006-06-01 13:32:40.723  142200.0
4 2006-06-01 13:32:40.726  142600.0
FULL_DEMO_L1::AAA
                     Time  VOLUME
0 2006-06-01 13:31:44.893   100.0
1 2006-06-01 13:35:33.108   600.0
2 2006-06-01 14:03:21.204  1500.0
3 2006-06-01 14:27:34.966  1600.0
4 2006-06-01 14:32:53.169  1700.0
FULL_DEMO_L1::C
                     Time    VOLUME
0 2006-06-01 13:30:08.733  226700.0
1 2006-06-01 13:30:09.225  226900.0
2 2006-06-01 13:30:09.247  228200.0
3 2006-06-01 13:30:09.250  229500.0
4 2006-06-01 13:30:09.264  229700.0
FULL_DEMO_L1::CSCO
                     Time  VOLUME
0 2006-06-01 13:30:00.193   407.0


<a><a id="running_total_to_each_tick"></a>
### Append running total and sliding bucketed volumes to each trade tick and calculate a ratio between the two

In [77]:
symbols = ['FULL_DEMO_L1::A','FULL_DEMO_L1::AA','FULL_DEMO_L1::AAA','FULL_DEMO_L1::C','FULL_DEMO_L1::CSCO']
start_time, end_time = datetime(2006, 6, 1, 9, 30), datetime(2006, 6, 1, 16)
bucket_interval_units = 'TICKS' #SECONDS, DAYS, etc.
bucket_interval = 600

trd_pass_ltd = q.passthrough(ticktype='TRD', fields='PRICE, SIZE, EXCHANGE')
volume = q.sum(bucket_interval=bucket_interval,
                   bucket_interval_units=bucket_interval_units,
                   is_running_aggr=True,
                   all_fields_for_sliding=True,
                   output_field_name='VOLUME',
                   input_field_name='SIZE')
total_running_volume = q.sum(is_running_aggr=True,
                              all_fields_for_sliding=True,
                              output_field_name='TOTAL_RUNNING_VOLUME',
                              input_field_name='SIZE')
add_fields = q.add_fields(fields='BUCKET_VOLUME_CHANGE double = VOLUME/VOLUME[-1], '
                                 'BUCKET_VS_RUNNING_TOTAL double = VOLUME/TOTAL_RUNNING_VOLUME')

graph = q.Graph.chainlet([trd_pass_ltd, volume, total_running_volume, add_fields])
data = q.run_query(graph, symbols, start_time, end_time)
for sym in symbols:
    print(sym)
    df = pd.DataFrame(data[sym])
    print(df.head())

FULL_DEMO_L1::A
   BUCKET_VOLUME_CHANGE  BUCKET_VS_RUNNING_TOTAL EXCHANGE  PRICE   SIZE  \
0                   NaN                      1.0        N   34.7  45900   
1              1.002179                      1.0        D   34.7    100   
2              1.015217                      1.0        D   34.7    700   
3              1.012848                      1.0        D   34.7    600   
4              1.004228                      1.0        D   34.7    200   

   TOTAL_RUNNING_VOLUME                    Time   VOLUME  
0               45900.0 2006-06-01 13:31:03.619  45900.0  
1               46000.0 2006-06-01 13:31:04.631  46000.0  
2               46700.0 2006-06-01 13:31:04.666  46700.0  
3               47300.0 2006-06-01 13:31:04.670  47300.0  
4               47500.0 2006-06-01 13:31:04.673  47500.0  
FULL_DEMO_L1::AA
   BUCKET_VOLUME_CHANGE  BUCKET_VS_RUNNING_TOTAL EXCHANGE  PRICE    SIZE  \
0                   NaN                      1.0        P  31.81     200   
1         

<a><a id="sliding_aggr_per_group"></a>
### Compute sliding aggregation per group

In [78]:
symbols = ['FULL_DEMO_L1::A','FULL_DEMO_L1::AA','FULL_DEMO_L1::AAA','FULL_DEMO_L1::C','FULL_DEMO_L1::CSCO']
start_time, end_time = datetime(2006, 6, 1, 9, 30), datetime(2006, 6, 1, 16)

trd_pass_ltd = q.passthrough(ticktype='TRD', fields='PRICE, SIZE, EXCHANGE')
running_volume_per_exch = q.sum(is_running_aggr=True,
                                all_fields_for_sliding=True,
                                output_field_name='RUNNING_VOLUME_PER_EXCHANGE',
                                group_by='EXCHANGE',
                                input_field_name='SIZE')
total_running_volume = q.sum(is_running_aggr=True,
                             all_fields_for_sliding=True,
                             output_field_name='TOTAL_RUNNING_VOLUME',
                             input_field_name='SIZE')
add_fields = q.add_fields(fields='EXCHANGE_VOLUME_VS_TOTAL double = RUNNING_VOLUME_PER_EXCHANGE/TOTAL_RUNNING_VOLUME')
last_tick = q.last_tick(group_by='EXCHANGE')

graph = q.Graph.chainlet([trd_pass_ltd, running_volume_per_exch, total_running_volume, add_fields, last_tick])

data = q.run_query(graph, symbols, start_time, end_time)
for sym in symbols:
    print(sym)
    df = pd.DataFrame(data[sym])
    print(df.head())

FULL_DEMO_L1::A
  EXCHANGE  EXCHANGE_VOLUME_VS_TOTAL  PRICE  RUNNING_VOLUME_PER_EXCHANGE  \
0        B                  0.007228  35.15                      20200.0   
1        C                  0.004515  35.02                      12200.0   
2        D                  0.055009  35.07                     161700.0   
3        M                  0.251000  34.98                     652400.0   
4        N                  0.645346  35.05                    1899900.0   

   SIZE               TICK_TIME  TOTAL_RUNNING_VOLUME                Time  
0  1000 2006-06-01 19:44:23.438             2794700.0 2006-06-01 20:00:00  
1   200 2006-06-01 19:37:02.425             2702300.0 2006-06-01 20:00:00  
2   300 2006-06-01 19:59:32.104             2939500.0 2006-06-01 20:00:00  
3   200 2006-06-01 19:23:03.831             2599200.0 2006-06-01 20:00:00  
4   100 2006-06-01 19:59:50.302             2944000.0 2006-06-01 20:00:00  
FULL_DEMO_L1::AA
  EXCHANGE  EXCHANGE_VOLUME_VS_TOTAL   PRICE  RUNNING_

<a><a id="mult_aggr_functions"></a>
### Combine multiple aggregation functions

In [79]:
symbols = ['FULL_DEMO_L1::A','FULL_DEMO_L1::AA','FULL_DEMO_L1::AAA','FULL_DEMO_L1::C','FULL_DEMO_L1::CSCO']
start_time, end_time = datetime(2006, 6, 1, 9, 30), datetime(2006, 6, 1, 16)

bucket_interval = 300
bucket_interval_units = 'SECONDS'
is_running_aggr = True
compute = q.compute(ticktype='TRD',compute='FIRST OPEN, HIGH, LOW, LAST CLOSE, VWAP, AVERAGE, STDDEV, SUM VOLUME, NUM_TICKS',
                    bucket_interval=bucket_interval,
                    bucket_interval_units=bucket_interval_units,
                    is_running_aggr=is_running_aggr,
                    append_output_field_name=False)
# PASSTHROUGH EP is unnecessary
graph = q.Graph(compute)

data = q.run_query(graph, symbols, start_time, end_time)
for sym in symbols:
    print(sym)
    df = pd.DataFrame(data[sym])
    print(df.head(), "\n")

FULL_DEMO_L1::A
   AVERAGE  CLOSE  HIGH   LOW  NUM_TICKS  OPEN  STDDEV  \
0     34.7   34.7  34.7  34.7        1.0  34.7     0.0   
1     34.7   34.7  34.7  34.7        2.0  34.7     0.0   
2     34.7   34.7  34.7  34.7        3.0  34.7     0.0   
3     34.7   34.7  34.7  34.7        4.0  34.7     0.0   
4     34.7   34.7  34.7  34.7        5.0  34.7     0.0   

                     Time   VOLUME  VWAP  
0 2006-06-01 13:31:03.619  45900.0  34.7  
1 2006-06-01 13:31:04.631  46000.0  34.7  
2 2006-06-01 13:31:04.666  46700.0  34.7  
3 2006-06-01 13:31:04.670  47300.0  34.7  
4 2006-06-01 13:31:04.673  47500.0  34.7   

FULL_DEMO_L1::AA
     AVERAGE  CLOSE   HIGH    LOW  NUM_TICKS   OPEN    STDDEV  \
0  31.810000  31.81  31.81  31.81        1.0  31.81  0.000000   
1  31.790000  31.77  31.81  31.77        2.0  31.81  0.020000   
2  31.766667  31.72  31.81  31.72        3.0  31.81  0.036818   
3  31.755000  31.72  31.81  31.72        4.0  31.81  0.037749   
4  31.748000  31.72  31.81  31.72

<a><a id="aggr_ticks_flex_buckets"></a>
### Aggregate ticks into flexible volume-based buckets

In [80]:
symbols = ['FULL_DEMO_L1::A','FULL_DEMO_L1::AA','FULL_DEMO_L1::AAA','FULL_DEMO_L1::C','FULL_DEMO_L1::CSCO']
start_time, end_time = datetime(2006, 6, 1, 9, 30), datetime(2006, 6, 1, 16)

volume_bucket_size = 1000
trd_pass = q.passthrough(ticktype='TRD', fields='PRICE,SIZE')
volume = q.sum(is_running_aggr=True, output_field_name='VOLUME')
compute = q.compute(compute='FIRST OPEN,HIGH,LOW,LAST CLOSE,VWAP,HIGH(INPUT_FIELD_NAME=SIZE) HIGH_SIZE, SUM VOLUME, NUM_TICKS',
                    bucket_interval_units='FLEXIBLE',
                    bucket_end_criteria='DIV(VOLUME,{})>DIV(VOLUME[-1],{}) AND VOLUME[-1]>0'
                    .format(volume_bucket_size, volume_bucket_size),
                    append_output_field_name=False)
graph = q.Graph.chainlet([trd_pass, volume, compute])

data = q.run_query(graph, symbols, start_time, end_time)
for sym in symbols:
    print(sym)
    df = pd.DataFrame(data[sym])
    print(df.head(), "\n")

FULL_DEMO_L1::A
   CLOSE  HIGH  HIGH_SIZE   LOW  NUM_TICKS  OPEN                    Time  \
0   34.7  34.7    45900.0  34.7        1.0  34.7 2006-06-01 13:31:04.631   
1   34.7  34.7      700.0  34.7        2.0  34.7 2006-06-01 13:31:04.670   
2   34.7  34.7      600.0  34.7        4.0  34.7 2006-06-01 13:31:04.717   
3   34.7  34.7      300.0  34.7        4.0  34.7 2006-06-01 13:31:04.730   
4   34.7  34.7      300.0  34.7        4.0  34.7 2006-06-01 13:31:04.774   

    VOLUME  VWAP  
0  45900.0  34.7  
1    800.0  34.7  
2   1000.0  34.7  
3   1100.0  34.7  
4    900.0  34.7   

FULL_DEMO_L1::AA
   CLOSE   HIGH  HIGH_SIZE    LOW  NUM_TICKS   OPEN                    Time  \
0  31.77  31.81      200.0  31.77        2.0  31.81 2006-06-01 13:32:40.209   
1  31.72  31.72   141600.0  31.72        3.0  31.72 2006-06-01 13:32:40.730   
2  31.72  31.72     1200.0  31.72        2.0  31.72 2006-06-01 13:32:41.266   
3  31.72  31.72      100.0  31.72        3.0  31.72 2006-06-01 13:32:41.307   

<a><a id="append_tot_vol_to_each_tick"></a>
### Append total daily volume to each detail tick to compute % of total

In [81]:
symbols = ['FULL_DEMO_L1::A','FULL_DEMO_L1::AA','FULL_DEMO_L1::AAA','FULL_DEMO_L1::C','FULL_DEMO_L1::CSCO']
start_time, end_time = datetime(2006, 6, 1, 9, 30), datetime(2006, 6, 1, 16)
rolling_bucket_sec = 60

#Round - robin - start from leftmost node and construct each node left-to-right.
trd_pass = q.passthrough(ticktype='TRD', fields='PRICE,SIZE')
compute = q.compute(ticktype='TRD', compute='SUM VOLUME, NUM_TICKS', bucket_time='BUCKET_START', append_output_field_name=False)
jbt = q.join_by_time(join_type='INNER', leading_sources='trd')

# Create a graph object with trd_pass
graph = q.Graph(trd_pass.label('trd'))

# Attach 'join' node. Tag previous node as 'trd' and use it as the leading source
graph.sink(jbt)

# Now remember JBT node as head (will be used later) and add the COMPUTE branch
jbt_node = graph.head
graph.source(compute.label('day'))

# Change head back to jbt_node and continue constructing the query
graph.head = jbt_node
rename_fields = q.rename_fields(rename_fields='trd.PRICE=PRICE, trd.SIZE=SIZE')
compute_2 = q.compute(compute='SUM VOLUME, NUM_TICKS', show_all_fields=True, is_running_aggr=True,append_output_field_name=False)
add_fields = q.add_fields(fields='PERCENT_VOLUME double=VOLUME/day.VOLUME,PERCENT_COUNT double = NUM_TICKS/day.NUM_TICKS')
compute_3 = q.compute(compute='SUM VOLUME_{}, NUM_TICKS NUM_TICKS_{}'.format(rolling_bucket_sec, rolling_bucket_sec),
                      show_all_fields=True,
                      bucket_interval=rolling_bucket_sec,
                      is_running_aggr=True,
                      append_output_field_name=False)
add_fields_2 = q.add_fields(fields='PERCENT_VOLUME_{} double=VOLUME_{}/day.VOLUME,'
                                   'PERCENT_COUNT_{} double=NUM_TICKS_{}/day.NUM_TICKS'
                            .format(rolling_bucket_sec, rolling_bucket_sec, rolling_bucket_sec, rolling_bucket_sec))
filter_pt = q.passthrough(fields='PERCENT_VOLUME, PERCENT_COUNT, PERCENT_VOLUME_{}, PERCENT_COUNT_{}'.
                          format(rolling_bucket_sec,rolling_bucket_sec))
# Add all EPs to the Graph object
graph >> rename_fields >> compute_2 >> add_fields >> compute_3 >> add_fields_2 >> filter_pt

data = q.run_query(graph, symbols, start_time, end_time)
for sym in symbols:
    print(sym)
    df = pd.DataFrame(data[sym])
    print(df.head(), "\n")

FULL_DEMO_L1::A
   PERCENT_COUNT  PERCENT_COUNT_60  PERCENT_VOLUME  PERCENT_VOLUME_60  \
0       0.000236          0.000236        0.015591           0.015591   
1       0.000471          0.000471        0.015625           0.015625   
2       0.000707          0.000707        0.015863           0.015863   
3       0.000943          0.000943        0.016067           0.016067   
4       0.001178          0.001178        0.016135           0.016135   

                     Time  
0 2006-06-01 13:31:03.619  
1 2006-06-01 13:31:04.631  
2 2006-06-01 13:31:04.666  
3 2006-06-01 13:31:04.670  
4 2006-06-01 13:31:04.673   

FULL_DEMO_L1::AA
   PERCENT_COUNT  PERCENT_COUNT_60  PERCENT_VOLUME  PERCENT_VOLUME_60  \
0       0.000102          0.000102        0.000029           0.000029   
1       0.000203          0.000203        0.000058           0.000058   
2       0.000305          0.000203        0.020477           0.020448   
3       0.000406          0.000203        0.020506           0.020

<a><a id="RSI"></a>
### Compute Relative Strength Index

In [82]:
symbols = ['FULL_DEMO_L1::A','FULL_DEMO_L1::AA','FULL_DEMO_L1::AAA','FULL_DEMO_L1::C','FULL_DEMO_L1::CSCO']
start_time, end_time = datetime(2006, 6, 1, 9, 30), datetime(2006, 6, 1, 16)
interval = 60
rsi_ticks = 14

compute = q.compute(ticktype='TRD', compute='FIRST,HIGH,LAST (TIME_SERIES_TYPE=STATE_TS),SUM',
                    bucket_interval=interval,
                    append_output_field_name=False)
add_fields = q.add_fields(fields='GAIN double=(LAST-LAST[-1]),LOSS double=(LAST-LAST[-1])')
update_fields = q.update_fields(set='GAIN=0,LOSS=ABS(LOSS)', else_set='LOSS=0', where='LOSS<0')
compute_2 = q.compute(compute='SUM (INPUT_FIELD_NAME=GAIN) TOTAL_GAIN,SUM (INPUT_FIELD_NAME=LOSS) TOTAL_LOSS',
                      show_all_fields=True,
                      bucket_interval=rsi_ticks,
                      bucket_interval_units='TICKS',
                      is_running_aggr=True,
                      append_output_field_name=False)
add_fields_2 = q.add_fields(fields='AVG_GAIN double=TOTAL_GAIN/{}, AVG_LOSS double=TOTAL_LOSS/{}'.format(rsi_ticks, rsi_ticks))
add_field = q.add_field(field='RSI', value='100 - ( 100/( 1 + AVG_GAIN/AVG_LOSS ) )')
num_ticks = q.num_ticks(is_running_aggr=True, output_field_name='ROWNUM', bucket_end_per_group=True)
update_field = q.update_field(field='RSI', value='NAN()', where='ROWNUM<={}'.format(rsi_ticks))
#drop_fields = q.passthrough(fields='GAIN,LOSS,ROWNUM,AVG_GAIN,AVG_LOSS,TOTAL_GAIN,TOTAL_LOSS', drop_fields=True)
drop_fields = q.passthrough(fields='\.*GAIN,\.*LOSS, ROWNUM', drop_fields=True, use_regex=True)

graph = q.Graph(compute)
graph >> add_fields >> update_fields >> compute_2 >> add_fields_2 >> add_field >> num_ticks >> update_field >> drop_fields

data = q.run_query(graph, symbols, start_time, end_time)
for sym in symbols:
    print(sym)
    df = pd.DataFrame(data[sym])
    print(df.head(), "\n")

FULL_DEMO_L1::A
   FIRST   HIGH   LAST  RSI      SUM                Time
0    NaN    NaN    NaN  NaN      0.0 2006-06-01 13:31:00
1  34.70  34.70  34.69  NaN  62000.0 2006-06-01 13:32:00
2  34.66  34.67  34.65  NaN   2100.0 2006-06-01 13:33:00
3  34.65  34.65  34.62  NaN   4600.0 2006-06-01 13:34:00
4  34.62  34.65  34.63  NaN   4400.0 2006-06-01 13:35:00 

FULL_DEMO_L1::AA
   FIRST     HIGH     LAST  RSI       SUM                Time
0    NaN      NaN      NaN  NaN       0.0 2006-06-01 13:31:00
1  31.81  31.8100  31.7700  NaN     400.0 2006-06-01 13:32:00
2  31.72  31.7200  31.7200  NaN  194200.0 2006-06-01 13:33:00
3  31.72  31.7399  31.6300  NaN   45800.0 2006-06-01 13:34:00
4  31.60  31.7000  31.6791  NaN   60200.0 2006-06-01 13:35:00 

FULL_DEMO_L1::AAA
   FIRST  HIGH  LAST  RSI    SUM                Time
0    NaN   NaN   NaN  NaN    0.0 2006-06-01 13:31:00
1   58.2  58.2  58.2  NaN  100.0 2006-06-01 13:32:00
2    NaN   NaN  58.2  NaN    0.0 2006-06-01 13:33:00
3    NaN   NaN  58.

<a><a id="jbt_trd_qte"></a>
### Get trades along with prevailing quotes

In [83]:
symbols = ['FULL_DEMO_L1::IBM']
start_time, end_time = datetime(2006, 6, 1, 9, 30), datetime(2006, 6, 1, 16)

# QTE branch: ticks are filtered; GO_BACK_TO_FIRST_TICK is enabled
trd_pass = q.passthrough('TRD', fields='PRICE')
where_clause = q.where_clause(where='ASK_PRICE>0 AND BID_PRICE>0 AND COND="R"')
add_field = q.add_field(field='TIMESTAMP_ORIG msectime', value='TIMESTAMP')
qte_pass = q.passthrough('QTE', fields='ASK_PRICE,BID_PRICE,TIMESTAMP_ORIG', go_back_to_first_tick='expr(3*86400)')
num_ticks = q.num_ticks(is_running_aggr=True, output_field_name='TICK_ID')
# Construct left/right hand side nodes
lhs_epd = [trd_pass, num_ticks]
rhs_epd = [where_clause, add_field, qte_pass, num_ticks.copy()]
graph = q.Graph.join_chainlets(lhs_epd, rhs_epd, LEADING_SOURCES='L')

data = q.run_query(graph, symbols, start_time, end_time)

for sym in symbols:
    print(sym)
    df = pd.DataFrame(data[sym])
    print(df.head())

FULL_DEMO_L1::IBM
   L.PRICE  L.TICK_ID             L.TIMESTAMP  R.ASK_PRICE  R.BID_PRICE  \
0    79.89        1.0 2006-06-01 13:30:11.366       1000.0         0.01   
1    79.89        2.0 2006-06-01 13:30:12.366         79.9        79.85   
2    79.89        3.0 2006-06-01 13:30:12.406         81.1        78.10   
3    79.89        4.0 2006-06-01 13:30:12.410         81.1        78.10   
4    79.89        5.0 2006-06-01 13:30:12.414         81.1        78.10   

   R.TICK_ID             R.TIMESTAMP        R.TIMESTAMP_ORIG  \
0        2.0 2006-06-01 13:30:02.136 2006-06-01 13:30:02.136   
1        3.0 2006-06-01 13:30:11.876 2006-06-01 13:30:11.876   
2        4.0 2006-06-01 13:30:12.386 2006-06-01 13:30:12.386   
3        4.0 2006-06-01 13:30:12.386 2006-06-01 13:30:12.386   
4        4.0 2006-06-01 13:30:12.386 2006-06-01 13:30:12.386   

                     Time  
0 2006-06-01 13:30:11.366  
1 2006-06-01 13:30:12.366  
2 2006-06-01 13:30:12.406  
3 2006-06-01 13:30:12.410  
4 2006

<a><a id="spread_avg_mid_flag"></a>
### Compute effective spread average and generate above/below mid flag for each trade

In [84]:
symbols = ['FULL_DEMO_L1::IBM']
start_time, end_time = datetime(2006, 6, 1, 9, 30), datetime(2006, 6, 1, 16)
avg_interval_sec = 60

trd_pass = q.passthrough(ticktype='TRD', fields='PRICE')
trd_where_clause = q.where_clause(where='PRICE>0')
jbt = q.join_by_time(join_type='INNER', leading_sources='t')
qte_pass = q.passthrough(ticktype='QTE', fields='ASK_PRICE, BID_PRICE', go_back_to_first_tick='expr(3*86400)')
qte_where_clause = q.where_clause(where='ASK_PRICE>0 AND BID_PRICE>0 AND COND="R"')
add_field = q.add_field(field='MID double', value='(q.ASK_PRICE+q.BID_PRICE)/2')
add_fields = q.add_fields(fields='SPREAD double = t.PRICE - MID, '
                                 'ABOVE_BELOW_MID_FLAG int = SIGN(t.PRICE - MID)')
average = q.average(bucket_interval=avg_interval_sec,
                    is_running_aggr=True,
                    all_fields_for_sliding=True,
                    output_field_name='AVERAGE_SPREAD',
                    input_field_name='SPREAD')

# Create a graph object
graph = q.Graph(trd_pass)
graph.sink(trd_where_clause.label('t'))

# Attach 'join' node. Tag previous node as 'trd' and use it as the leading source
graph.sink(jbt)

# Now remember JBT node as head (will be used later) and add the COMPUTE branch
jbt_node = graph.head
graph.source(qte_pass.label('q'))
graph.source(qte_where_clause)

# Move the graph head back to JBT node and append Event Processors
graph.head = jbt_node
graph >> add_field >> add_fields >> average

data = q.run_query(graph, symbols, start_time, end_time)

for sym in symbols:
    print(sym)
    df = pd.DataFrame(data[sym])
    print(df.head())

FULL_DEMO_L1::IBM
   ABOVE_BELOW_MID_FLAG  AVERAGE_SPREAD      MID   SPREAD  \
0                    -1     -420.115000  500.005 -420.115   
1                     1     -210.050000   79.875    0.015   
2                     1     -139.936667   79.600    0.290   
3                     1     -104.880000   79.600    0.290   
4                     1      -83.846000   79.600    0.290   

                     Time  q.ASK_PRICE  q.BID_PRICE             q.TIMESTAMP  \
0 2006-06-01 13:30:11.366       1000.0         0.01 2006-06-01 13:30:02.136   
1 2006-06-01 13:30:12.366         79.9        79.85 2006-06-01 13:30:11.876   
2 2006-06-01 13:30:12.406         81.1        78.10 2006-06-01 13:30:12.386   
3 2006-06-01 13:30:12.410         81.1        78.10 2006-06-01 13:30:12.386   
4 2006-06-01 13:30:12.414         81.1        78.10 2006-06-01 13:30:12.386   

   t.PRICE             t.TIMESTAMP  
0    79.89 2006-06-01 13:30:11.366  
1    79.89 2006-06-01 13:30:12.366  
2    79.89 2006-06-01 13:30:1

<a><a id="return_correlation"></a>
### Compute symbol pair RETURN correlation

In [88]:
sym1 = 'FULL_DEMO_L1::MSFT'
sym2 = 'FULL_DEMO_L1::CSCO'
interval = 60
compute_string='FIRST(TIME_SERIES_TYPE=STATE_TS) OPEN, LAST(TIME_SERIES_TYPE=STATE_TS) CLOSE'

# create EPs
trd_pass = q.passthrough('TRD', fields='PRICE',go_back_to_first_tick=86400).symbol(sym1)
compute = q.compute(compute=compute_string, bucket_interval=0, output_interval=interval, \
                    is_running_aggr=False, append_output_field_name=False)
add_field = q.add_field(field='RETURN double', value='(CLOSE-CLOSE[-1])/CLOSE[-1]')
symbol_pair = q.add_field(field='SYMBOL_PAIR',value='"{}-{}"'.format(sym1,sym2))

# Create two lists of the above EPs
lhs=[trd_pass, compute, add_field]
rhs=[ep.copy() for ep in lhs]
rhs[0].symbol(sym2)
graph = q.Graph.join_chainlets(lhs, rhs,\
                               source_order='L,R', match_if_identical_times=True)

# Can now just add some extra EPs via sinks to the head node, which is the join (JOIN_BY_TIME) node
graph >> q.correlation(is_running_aggr=True, input_field1_name='L.RETURN', \
                       input_field2_name='R.RETURN', output_field_name='RETURN_CORRELATION') >> symbol_pair

data = q.run_query(graph, symbol=None, s=start_time, e=end_time)
df = pd.DataFrame(data[''])
print(df.head())

   RETURN_CORRELATION                            SYMBOL_PAIR  \
0                 NaN  FULL_DEMO_L1::MSFT-FULL_DEMO_L1::CSCO   
1                 NaN  FULL_DEMO_L1::MSFT-FULL_DEMO_L1::CSCO   
2            1.000000  FULL_DEMO_L1::MSFT-FULL_DEMO_L1::CSCO   
3            0.802609  FULL_DEMO_L1::MSFT-FULL_DEMO_L1::CSCO   
4            0.375188  FULL_DEMO_L1::MSFT-FULL_DEMO_L1::CSCO   

                 Time  
0 2006-06-01 13:31:00  
1 2006-06-01 13:32:00  
2 2006-06-01 13:33:00  
3 2006-06-01 13:34:00  
4 2006-06-01 13:35:00  


<a><a id="top_N_players"></a>
### Get top N players, or how to MERGE time series to compute totals across symbols

In [86]:
symbols = ['FULL_DEMO_L1::A',
           'FULL_DEMO_L1::AA',
           'FULL_DEMO_L1::AAA',
           'FULL_DEMO_L1::C',
           'FULL_DEMO_L1::CSCO']
start_time, end_time = datetime(2006, 6, 1, 9, 30), datetime(2006, 6, 1, 16)
num_of_top_ticks = 10

# create graph with EPs
graph = q.Graph(q.passthrough(ticktype='TRD',fields='SIZE'))    \
    >> q.sum(output_field_name='VOLUME',input_field_name='SIZE')    \
    >> q.presort().symbol(symbols)   \
    >> q.merge().label('merge')    \
    >> q.high_tick(num_ticks=num_of_top_ticks,input_field_name='VOLUME').label('H')  \
    >> q.join_by_time(leading_sources='H',match_if_identical_times=True,same_timestamp_join_policy='EACH_FOR_LEADER_WITH_LATEST').label('join') \
    << q.sum(output_field_name='VOLUME',input_field_name='VOLUME').label('T')
jbt = graph['join']
graph << graph['merge']
jbt >> q.add_field(field='PERC_OF_TOTAL',value='H.VOLUME/T.VOLUME')

data = q.run_query(graph,symbol=None,s=start_time,e=end_time)
df = pd.DataFrame(data[''])
print(df.head())

        H.SYMBOL_NAME         H.TICK_TIME H.TICK_TYPE    H.VOLUME  \
0  FULL_DEMO_L1::CSCO 2006-06-01 20:00:00         TRD  51306585.0   
1     FULL_DEMO_L1::C 2006-06-01 20:00:00         TRD  12754200.0   
2    FULL_DEMO_L1::AA 2006-06-01 20:00:00         TRD   6934500.0   
3     FULL_DEMO_L1::A 2006-06-01 20:00:00         TRD   2944000.0   
4   FULL_DEMO_L1::AAA 2006-06-01 20:00:00         TRD      6400.0   

   PERC_OF_TOTAL    T.VOLUME                Time  
0       0.693842  73945685.0 2006-06-01 20:00:00  
1       0.172481  73945685.0 2006-06-01 20:00:00  
2       0.093778  73945685.0 2006-06-01 20:00:00  
3       0.039813  73945685.0 2006-06-01 20:00:00  
4       0.000087  73945685.0 2006-06-01 20:00:00  


<a><a id="vwap_volume_for_each_trd"></a>
### Compute ASK/BID VWAP and Volume over N seconds before and after each trade

In [87]:
symbols = ['FULL_DEMO_L1::IBM']
start_time, end_time = datetime(2006, 6, 1, 9, 30), datetime(2006, 6, 1, 16)

# calculate QTE stats and save query to a file
get_qte_stats = q.Graph(q.passthrough(ticktype='QTE'))
get_qte_stats >> q.compute(compute='VWAP(PRICE_FIELD_NAME=ASK_PRICE,SIZE_FIELD_NAME=ASK_SIZE) ASK_VWAP,'
                                   'VWAP(PRICE_FIELD_NAME=BID_PRICE,SIZE_FIELD_NAME=BID_SIZE) BID_VWAP,'
                                   'SUM(INPUT_FIELD_NAME=ASK_SIZE) ASK_VOLUME,'
                                   'SUM(INPUT_FIELD_NAME=ASK_SIZE) BID_VOLUME',
                          append_output_field_name=False)
get_qte_stats = get_qte_stats.save(symbols, start_time, end_time, otq_file='C:/temp/temp.otq', query_name='get_qte_stats')

# main query: use JOIN_WITH_QUERY EP
time_window_sec = 300
graph = q.Graph(q.passthrough(ticktype='TRD', fields='PRICE,SIZE,EXCHANGE'))
graph >> q.join_with_query(otq_query='"{}"'.format(get_qte_stats), symbol_name='_SYMBOL_NAME',
                           start_timestamp='TIMESTAMP-{}*1000'.format(time_window_sec),
                           end_timestamp='TIMESTAMP',
                           prefix_for_output_ticks='BEFORE_')\
      >> q.join_with_query(otq_query='"{}"'.format(get_qte_stats), symbol_name='_SYMBOL_NAME',
                           start_timestamp='TIMESTAMP',
                           end_timestamp='TIMESTAMP+{}*1000'.format(time_window_sec),
                           prefix_for_output_ticks='AFTER_')
    
data = q.run_query(graph, symbols, start_time, end_time)
for sym in symbols:
    print(sym)
    df = pd.DataFrame(data[sym])
    print(df.head())

FULL_DEMO_L1::IBM
   AFTER_ASK_VOLUME  AFTER_ASK_VWAP  AFTER_BID_VOLUME  AFTER_BID_VWAP  \
0            6486.0       82.723298            6486.0       79.369503   
1            6495.0       82.720092            6495.0       79.370549   
2            6495.0       82.720075            6495.0       79.370916   
3            6495.0       82.720075            6495.0       79.370916   
4            6495.0       82.720075            6495.0       79.370916   

          AFTER_TIMESTAMP  BEFORE_ASK_VOLUME  BEFORE_ASK_VWAP  \
0 2006-06-01 13:35:11.366              669.0        82.059193   
1 2006-06-01 13:35:12.366              671.0        82.052757   
2 2006-06-01 13:35:12.406              672.0        82.051339   
3 2006-06-01 13:35:12.410              672.0        82.051339   
4 2006-06-01 13:35:12.414              672.0        82.051339   

   BEFORE_BID_VOLUME  BEFORE_BID_VWAP        BEFORE_TIMESTAMP EXCHANGE  PRICE  \
0              669.0        78.196237 2006-06-01 13:30:11.366        N 