---
title: "Develop DAG for things;"
draft: true
---

In [1]:
from fztools import StageManager

stage1 = StageManager("stage1")
stage2 = StageManager("stage2")
@stage1.register("A")
def plus_one(a):
    return a + 1

@stage1.register("B")
def power_two(b):
    return b * b

@stage2.register("C", ["A", "B"])
def sum_all(a, b):
    return a + b


chain = stage1 >> stage2
chain.input = {"A": 1, "B": 2}
chain.invoke()
chain.output

chain.edge_table


Unnamed: 0,source_id,source_ele,target_id,target_ele
0,0,<function plus_one at 0x108955ee0>,0,A
1,0,<function power_two at 0x108956520>,0,B
2,1,<function sum_all at 0x108956f20>,1,C
0,-1,A,0,<function plus_one at 0x108955ee0>
1,-1,B,0,<function power_two at 0x108956520>
2,0,A,1,<function sum_all at 0x108956f20>
2,0,B,1,<function sum_all at 0x108956f20>


In [2]:
chain.as_table()

Unnamed: 0,stage_id,stage_name,output,prev_stage_id,inputs,func
0,0,stage,A,-1,[A],<function plus_one at 0x108955ee0>
1,0,stage,B,-1,[B],<function power_two at 0x108956520>
2,1,stage,C,0,"[A, B]",<function sum_all at 0x108956f20>


In [3]:
stgs = chain.stages
for stg in stgs:
    print(stg.funcs)
    print(stg.funcs_args)

{'A': <function plus_one at 0x108955ee0>, 'B': <function power_two at 0x108956520>}
{'A': ['A'], 'B': ['B']}
{'C': <function sum_all at 0x108956f20>}
{'C': ['A', 'B']}


There are a few problem with this method; both because variable without a function registered will pass on as it is...
But from `as_table` we can attempt to parase assign type;

In [4]:
import asyncio
def make_async_stage(stage):
    funcs = stage.funcs
    funcs_args = stage.funcs_args

    async_stage_funcs = {}
    for key, func in stage.funcs.items():
        async def async_func(*args,**kwargs):
            return func(*args,**kwargs)
        async_stage_funcs[key] = async_func
    return async_stage_funcs

input_dict = {"A": 1, "B": 2}


result = {}
async_stage_funcs = make_async_stage(stage1)
async with asyncio.TaskGroup() as tg:
    for key, async_func in async_stage_funcs.items():
        print(type(async_func))
        arg = input_dict[key]
        task = tg.create_task(async_func(arg))
        d = await task
        result[key] = d

result


<class 'function'>
<class 'function'>


{'A': 1, 'B': 4}

Now the problem became, how to wrap an unevaulated expectation? (Future)

In [5]:
from nest_asyncio import apply
apply()
async def set_after(fut, delay, value):
    # Sleep for *delay* seconds.
    await asyncio.sleep(delay)

    # Set *value* as a result of *fut* Future.
    fut.set_result(value)

async def main():
    # Get the current event loop.
    loop = asyncio.get_running_loop()

    # Create a new Future object.
    fut = loop.create_future()

    # Run "set_after()" coroutine in a parallel Task.
    # We are using the low-level "loop.create_task()" API here because
    # we already have a reference to the event loop at hand.
    # Otherwise we could have just used "asyncio.create_task()".
    loop.create_task(
        set_after(fut, 1, '... world') )

    print('hello ...')

    # Wait until *fut* has a result (1 second) and print it.
    print(await fut)
    

asyncio.run(main())

hello ...
... world


Finally I have found a menimum example of establish coroutine based on something is done or not;
This technical is essential dfs search all at the time;



In [6]:

from random import randrange
# Helper function for the creation of simple sample coroutine
def make_sample_coro(n):

    async def coro():
        print(f"Start of task {n} ...")
        await asyncio.sleep(randrange(1, 5))
        print(f"... End of task {n}")

    return coro


async def main():
    # Simple graph in standard representation (node => neighbours)
    graph = {1: {2, 5}, 2: {3}, 3: {4}, 4: set(), 5: {4}}
    tasks = {n: make_sample_coro(n) for n in graph}
    tasks_done = set()
    
    async def execute_task(ID):
        print(f"Trying to execute task {ID} ...")
        predecessors = {n for n, ns in graph.items() if ID in ns}
        
        while not predecessors <= tasks_done:  # Check if task can be started
            await asyncio.sleep(0.1) # halt task before they are done;
        await tasks[ID]()
        tasks_done.add(ID)
    
    await asyncio.gather(*[execute_task(n) for n in graph])
    print("... Finished")
await main()



Trying to execute task 1 ...
Start of task 1 ...
Trying to execute task 2 ...
Trying to execute task 3 ...
Trying to execute task 4 ...
Trying to execute task 5 ...
... End of task 1
Start of task 2 ...
Start of task 5 ...
... End of task 2
... End of task 5
Start of task 3 ...
... End of task 3
Start of task 4 ...
... End of task 4
... Finished


Note that this is not a chain; because each node are run at the same time, so actually you only need to search the dependency up one level!

It does not explicitly identify the graphical root; The root 1 would not have a dependency;



In [7]:
# to esplain the set operator;
assert {1} <= {1,2,3}
assert {1,2,3} <= {1,2,3}
assert ({1,2,3,4} <= {1,2,3}) == False

In [8]:
from fztools import StageManager

stage1 = StageManager(name="stage1")
stage2 = StageManager(name="stage2")

input_dict = {
    "A": 1,
    "B": 2,
}

@stage1.register("A")
def plus_one(a):
    return a + 1

@stage1.register("B")
def power_two(b):
    return b ** 2

@stage2.register("C", ["A", "B"])
def sum_all(a, b):
    return a + b
chain = stage1 >> stage2

In [9]:
complete_ns = set()
for i, stage in enumerate(chain.stages):
    for func in stage.funcs:
        complete_ns.add((i, func))

complete_ns



{(0, 'A'), (0, 'B'), (1, 'C')}

In [10]:
chain.to_mermaid()