# Setting up a DAG for your calculation process

There are multiple frameworks for setting up a calculation tree as a directed acyclic graph (DAG):

* mdf (https://github.com/man-group/mdf) 
* tributary (https://github.com/timkpaine/tributary)
* streamz (https://streamz.readthedocs.io/en/latest/)

In the financial world, Goldman's SecDB was the father of all graph-based calculations, followed by JP Morgan's Athena and, more recently, BoA's Quartz; all three are well known. There is also a commercial package available from Beacon, set up by Kirat Singh, and I wholeheartedly recommend watching https://www.youtube.com/watch?v=lTOP_shhVBQ to understand the inspiration for pyg.

We think the approach we took within pyg is very intuitive and, most importantly, very easy to debug and track. The inspiration actually comes from Excel: think of each cell in Excel as a "micro-service". This means that the cell is a self-managed unit and is stored in the Mongo database as such. Each cell contains:

* its own address and location in the database
* its inputs and references to its inputs
* its function
* its outputs
* its calculation schedule
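
As a rough illustration of the list above, a cell can be pictured as a small record bundling those five pieces. This is a plain-Python sketch and not pyg's actual class: `SketchCell`, its field names and the `add` helper are all hypothetical.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, Optional


@dataclass
class SketchCell:
    """Hypothetical stand-in for a cell; not pyg's implementation."""
    address: Dict[str, Any]                                  # where the cell lives, e.g. {'key': 'a'}
    function: Callable[..., Any]                             # what the cell computes
    inputs: Dict[str, Any] = field(default_factory=dict)     # plain values or other cells
    data: Optional[Any] = None                               # the output, once calculated
    schedule: Optional[str] = None                           # when to recalculate (free-form here)

    def calculate(self) -> "SketchCell":
        # A cell input contributes its own output; anything else is used as-is.
        args = {name: (value.calculate().data if isinstance(value, SketchCell) else value)
                for name, value in self.inputs.items()}
        self.data = self.function(**args)
        return self


def add(a, b):
    return a + b


a = SketchCell({'key': 'a'}, add, {'a': 1, 'b': 2})
b = SketchCell({'key': 'b'}, add, {'a': a, 'b': a})
result = b.calculate().data
```

Because each record carries its own function and inputs, it can be inspected or recalculated in isolation, which is the property the "micro-service" analogy is after.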

The framework supports multiple forms of DAG:

* in-memory cells & graph
* in-memory async graph
* mongodb-persistent graph
* mongodb-persistent async graph


In [1]:
from pyg import *
from pyg import add_, mul_ # we will be using add_(a,b) and mul_(a,b) as a simple example for setting up a network

## In memory network

The base cell is in-memory, and its calculation policy is not to recalculate once it has an output.

In [2]:
a = cell(add_, a = 1, b = 2)
b = cell(add_, a = a, b = a)
c = cell(add_, a = a, b = b)

a

cell
{'a': 1, 'b': 2, 'function': <function add_ at 0x0000011CA70AB1F0>}

In [3]:
a = a()
assert a.data == 3
a

2021-11-22 11:20:40,236 - pyg - INFO - None


cell
a:
    1
b:
    2
function:
    <function add_ at 0x0000011CA70AB1F0>
join:
    ij
method:
    None
columns:
    ij
data:
    3
updated:
    2021-11-22 11:20:40.251842

In [4]:
b = b()
assert b().data == 6

2021-11-22 11:20:40,272 - pyg - INFO - None
2021-11-22 11:20:40,278 - pyg - INFO - None
2021-11-22 11:20:40,283 - pyg - INFO - None


In [5]:
b.run()

False

Once b has data, it will not want to run again (b.run() being False). However... let us remove the output:

In [6]:
b = b - 'data'
b.run()

True
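
The policy seen above, where a cell declines to rerun once it holds an output, can be mimicked in a few lines. The sketch below is an assumption about the behaviour rather than pyg's code: `needs_run` and `calculate` are hypothetical helpers that treat a cell as a plain dict and check whether its `'data'` key is missing.

```python
def needs_run(cell: dict) -> bool:
    """Hypothetical policy: run only if no output is stored yet."""
    return 'data' not in cell


def calculate(cell: dict) -> dict:
    """Return the cell with 'data' filled in, but only if the policy asks for it."""
    if not needs_run(cell):
        return cell
    out = dict(cell)
    out['data'] = cell['function'](cell['a'], cell['b'])
    return out


cell_b = calculate({'function': lambda a, b: a + b, 'a': 3, 'b': 3})
has_output = needs_run(cell_b)                                   # data is present, so no rerun
stripped = {k: v for k, v in cell_b.items() if k != 'data'}      # analogous to b - 'data'
after_strip = needs_run(stripped)                                # data was removed, so it runs again
```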

The child c can force b to recalculate... and more importantly, we can force all of c's parents to calculate too.

In [7]:
c = c.go() # force me to calculate
c = c.go(1) # force me and 1 generation up to calculate
c = c.go(-1) # force me and ALL my parents to calculate

2021-11-22 11:20:40,333 - pyg - INFO - None
2021-11-22 11:20:40,335 - pyg - INFO - None
2021-11-22 11:20:40,338 - pyg - INFO - None
2021-11-22 11:20:40,340 - pyg - INFO - None
2021-11-22 11:20:40,343 - pyg - INFO - None
2021-11-22 11:20:40,349 - pyg - INFO - None
2021-11-22 11:20:40,355 - pyg - INFO - None
2021-11-22 11:20:40,360 - pyg - INFO - None
2021-11-22 11:20:40,363 - pyg - INFO - None
2021-11-22 11:20:40,365 - pyg - INFO - None
2021-11-22 11:20:40,368 - pyg - INFO - None
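
To make the three `go` variants concrete, here is a minimal sketch of one way a "generations" argument could be interpreted. It is written from the comments in the cell above, not from pyg's source; `Node`, `calls` and `add` are hypothetical names.

```python
class Node:
    """Hypothetical in-memory node; mimics the go(n) idea, not pyg's code."""

    def __init__(self, function, **inputs):
        self.function = function
        self.inputs = inputs
        self.data = None
        self.calls = 0          # how many times this node was evaluated

    def go(self, generations=0):
        """Force this node to calculate.

        generations = 0  -> only this node is forced
        generations = n  -> this node and n generations of parents are forced
        generations = -1 -> this node and all of its ancestors are forced
        """
        args = {}
        for name, value in self.inputs.items():
            if isinstance(value, Node):
                if generations != 0:
                    value.go(generations - 1)   # a negative count never reaches 0
                elif value.data is None:
                    value.go(0)                 # parent has no output yet, so it must run
                args[name] = value.data
            else:
                args[name] = value
        self.data = self.function(**args)
        self.calls += 1
        return self


def add(a, b):
    return a + b


a = Node(add, a=1, b=2)
b = Node(add, a=a, b=a)
c = Node(add, a=a, b=b)

c.go()      # a and b run once because they have no output yet
c.go()      # only c is re-evaluated; a and b are left alone
c.go(1)     # c plus one generation up: a and b are re-evaluated as well
```

In this sketch a shared parent may be evaluated more than once when it is reachable through several paths; a production graph would typically deduplicate such visits.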


## In-memory with address

If we want to start building a graph, we need to specify which primary keys (the pk parameter) will be used for storage, and we need to provide values for these keys:

In [8]:
a = cell(add_, a = 1, b = 2, pk = 'key', key = 'a')
b = cell(add_, a = a, b = a, pk = 'key', key = 'b')
c = cell(add_, a = a, b = b, pk = 'key', key = 'c')
a

cell
a:
    1
b:
    2
pk:
    key
key:
    a
function:
    <function add_ at 0x0000011CA70AB1F0>

In [9]:
c = c()

2021-11-22 11:20:40,409 - pyg - INFO - get_cell(key = 'c')()
2021-11-22 11:20:40,413 - pyg - INFO - get_cell(key = 'a')()
2021-11-22 11:20:40,416 - pyg - INFO - get_cell(key = 'b')()


Both the cells and their data then become available for access:

In [10]:
get_data(key = 'c')

9

Since we built up a graph, a parent can force a calculation downstream too:

In [11]:
a.a = 5
a = a.push()
get_data(key = 'c')

2021-11-22 11:20:40,454 - pyg - INFO - get_cell(key = 'a')()
2021-11-22 11:20:40,457 - pyg - INFO - get_cell(key = 'b')()
2021-11-22 11:20:40,463 - pyg - INFO - get_cell(key = 'c')()


21
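
Pushing downstream amounts to recalculating a node and then every descendant, parents before children. The sketch below shows one way to do that with a key-to-node registry and a depth-first traversal; it is an illustration under assumed names (`GRAPH`, `KeyedNode`, `add`), not pyg's implementation.

```python
GRAPH = {}   # hypothetical in-memory registry: key -> node


class KeyedNode:
    def __init__(self, key, function, **inputs):
        self.key, self.function, self.inputs = key, function, inputs
        self.data = None
        GRAPH[key] = self

    def calculate(self):
        args = {name: (value.data if isinstance(value, KeyedNode) else value)
                for name, value in self.inputs.items()}
        self.data = self.function(**args)
        return self

    def children(self):
        # Any registered node that lists this node among its inputs.
        return [node for node in GRAPH.values()
                if any(value is self for value in node.inputs.values())]

    def push(self):
        """Recalculate this node, then all descendants in dependency order."""
        order, seen = [], set()

        def visit(node):
            # Depth-first post-order; reversing it yields a topological order.
            if node.key in seen:
                return
            seen.add(node.key)
            for child in node.children():
                visit(child)
            order.append(node)

        visit(self)
        for node in reversed(order):
            node.calculate()
        return self


def add(a, b):
    return a + b


a = KeyedNode('a', add, a=1, b=2)
b = KeyedNode('b', add, a=a, b=a)
c = KeyedNode('c', add, a=a, b=b)
for node in (a, b, c):
    node.calculate()
before = GRAPH['c'].data

a.inputs['a'] = 5
a.push()
after = GRAPH['c'].data
```

With the same inputs as the cells above, `before` and `after` match the values 9 and 21 returned by `get_data(key = 'c')`.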

## In-memory, async

We replace cell with the async cell (acell); otherwise, most things remain the same.

In [12]:
a = acell(add_, a = 3, b = 2, pk = 'key', key = 'a')
b = acell(add_, a = a, b = a, pk = 'key', key = 'b')
c = acell(add_, a = a, b = b, pk = 'key', key = 'c')

c = await c.go(-1)

2021-11-22 11:20:40,475 - pyg - INFO - get_cell(key = 'c')()
2021-11-22 11:20:40,481 - pyg - INFO - get_cell(key = 'a')()
2021-11-22 11:20:40,483 - pyg - INFO - get_cell(key = 'b')()
2021-11-22 11:20:40,489 - pyg - INFO - get_cell(key = 'a')()
2021-11-22 11:20:40,490 - pyg - INFO - get_cell(key = 'a')()


And we can push data down...

In [13]:
a.a = 4
a = await a.push()
get_data(key = 'c')

2021-11-22 11:20:40,508 - pyg - INFO - get_cell(key = 'a')()
2021-11-22 11:20:40,515 - pyg - INFO - get_cell(key = 'b')()
2021-11-22 11:20:40,521 - pyg - INFO - get_cell(key = 'c')()


18
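
For readers new to asynchronous graphs, the following sketch shows the general idea with nothing but `asyncio`: a node awaits its parents concurrently and then awaits its own function. The names `AsyncNode` and `add_async` are hypothetical and the code is not taken from pyg. Outside a notebook, top-level `await` is unavailable, so the example uses `asyncio.run`.

```python
import asyncio


class AsyncNode:
    """Hypothetical async node; illustrates the idea only."""

    def __init__(self, function, **inputs):
        self.function, self.inputs = function, inputs
        self.data = None

    async def go(self):
        """Recalculate all parents concurrently, then this node."""
        parents = [v for v in self.inputs.values() if isinstance(v, AsyncNode)]
        await asyncio.gather(*(parent.go() for parent in parents))
        args = {name: (value.data if isinstance(value, AsyncNode) else value)
                for name, value in self.inputs.items()}
        self.data = await self.function(**args)
        return self


async def add_async(a, b):
    await asyncio.sleep(0)      # yield to the event loop, as real I/O would
    return a + b


async def main():
    a = AsyncNode(add_async, a=3, b=2)
    b = AsyncNode(add_async, a=a, b=a)
    c = AsyncNode(add_async, a=a, b=b)
    await c.go()
    return a.data, b.data, c.data


values = asyncio.run(main())
```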

## Adding database persistence  

In [14]:
db = partial(mongo_table, db = 'db', table = 'table', pk = 'key')
db().reset.drop()

2021-11-22 11:20:40,766 - pyg - INFO - INFO: deleting 113 documents based on M{}


<class 'pyg.mongo._cursor.mongo_cursor'> for Collection(Database(MongoClient(host=['localhost:27017'], document_class=dict, tz_aware=False, connect=True), 'db'), 'table') 
M{} None
documents count: 0
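
Note that `db` above is a `functools.partial`, so it holds the address of the table and only produces the table object when called, as in `db()`. The sketch below shows the same pattern against a dict-backed stand-in; `TABLES` and `dict_table` are hypothetical and merely take the place of `mongo_table` so that the example runs without a database.

```python
from functools import partial

TABLES = {}   # hypothetical stand-in for a database server


def dict_table(db, table, pk):
    """Hypothetical factory: returns the same dict-backed 'table' for the same address."""
    return TABLES.setdefault((db, table), {'pk': pk, 'docs': {}})


# The partial stores only the address; nothing is opened until it is called.
db = partial(dict_table, db='db', table='table', pk='key')

db()['docs']['a'] = {'key': 'a', 'data': 3}     # write through a freshly resolved table
stored = db()['docs']['a']['data']              # read it back through another call
same_table = db() is db()                       # both calls resolve to the same table
```

One plausible benefit of passing a callable rather than an open table is that a cell can carry a lightweight description of where it lives and resolve it on demand.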

In [15]:
a = db_cell(add_, a = 1, b = 2, pk = 'key', key = 'a', db = db)
b = db_cell(add_, a = a, b = a, pk = 'key', key = 'b', db = db)
c = db_cell(add_, a = a, b = b, pk = 'key', key = 'c', db = db)
c = c()

2021-11-22 11:20:40,930 - pyg - INFO - get_cell(url = 'localhost:27017', db = 'db', table = 'table', key = 'c')()
2021-11-22 11:20:40,995 - pyg - INFO - get_cell(url = 'localhost:27017', db = 'db', table = 'table', key = 'a')()
2021-11-22 11:20:41,053 - pyg - INFO - get_cell(url = 'localhost:27017', db = 'db', table = 'table', key = 'b')()


In [16]:
get_data('table', 'db', key = 'a') # grabbing data from MongoDB

3

In [17]:
loaded_a = get_cell('table', 'db', key = 'a') # we save the entire object, not just the values, in the database

In [18]:
loaded_a

db_cell
db:
    functools.partial(<function mongo_table at 0x0000011CA75D3DC0>, db='db', table='table', pk='key')
_id:
    619b7d09823a957ab56a9522
a:
    1
b:
    2
columns:
    ij
data:
    3
join:
    ij
key:
    a
method:
    None
pk:
    ['key']
updated:
    2021-11-22 11:20:40.997000
function:
    <function add_ at 0x0000011CA70AB1F0>

In [19]:
a.a = 6
a = a.push()

2021-11-22 11:20:41,575 - pyg - INFO - get_cell(url = 'localhost:27017', db = 'db', table = 'table', key = 'a')()
2021-11-22 11:20:41,625 - pyg - INFO - get_cell(url = 'localhost:27017', db = 'db', table = 'table', key = 'b')()
2021-11-22 11:20:41,883 - pyg - INFO - get_cell(url = 'localhost:27017', db = 'db', table = 'table', key = 'c')()


In [20]:
get_data('table', 'db', key = 'a')

8

## Database persistence, asynchronously

The Motor library provides an asynchronous API to MongoDB. We use this to replicate the same simple functionality.  

In [21]:
a = db_acell(add_, a = 5, b = 2, pk = 'key', key = 'a', db = db)
b = db_acell(add_, a = a, b = a, pk = 'key', key = 'b', db = db)
c = db_acell(add_, a = a, b = b, pk = 'key', key = 'c', db = db)
await c.go(-1)
assert get_data('table', 'db', key = 'c') == 21

2021-11-22 11:20:42,308 - pyg - INFO - get_cell(url = 'localhost:27017', db = 'db', table = 'table', key = 'c')()
2021-11-22 11:20:42,338 - pyg - INFO - get_cell(url = 'localhost:27017', db = 'db', table = 'table', key = 'a')()
2021-11-22 11:20:42,344 - pyg - INFO - get_cell(url = 'localhost:27017', db = 'db', table = 'table', key = 'b')()
2021-11-22 11:20:42,376 - pyg - INFO - get_cell(url = 'localhost:27017', db = 'db', table = 'table', key = 'a')()
2021-11-22 11:20:42,388 - pyg - INFO - get_cell(url = 'localhost:27017', db = 'db', table = 'table', key = 'a')()


In [22]:
a = db_acell(add_, a = 5, b = 3, pk = 'key', key = 'a', db = db)
a = await a.push()
assert get_data('table', 'db', key = 'c') == 24

2021-11-22 11:20:42,688 - pyg - INFO - get_cell(url = 'localhost:27017', db = 'db', table = 'table', key = 'a')()
2021-11-22 11:20:42,742 - pyg - INFO - get_cell(url = 'localhost:27017', db = 'db', table = 'table', key = 'b')()
2021-11-22 11:20:42,903 - pyg - INFO - get_cell(url = 'localhost:27017', db = 'db', table = 'table', key = 'c')()
