# Workflows

In [None]:
from cosim.flow import Flow
#import networkx as nx

### create a new workflow

In [None]:
# a diction of tasks and their input-outputs
info = {
    "tA": {"inputs": ("x1",),          "outputs": ("y1", "y2",), }, 
    "tB": {"inputs": ("y1",),          "outputs": ("z1",),       },
    "tC": {"inputs": ("y2",),          "outputs": ("z2",),       },
    "tD": {"inputs": ("z2", "z1",),    "outputs": ("out",),      }, 
}

In [None]:
flow = Flow(**info)
flow.render() # 'multipartite_layout'

In [None]:
flow.render('multipartite_layout')

In [None]:
flow.LAYERS

In [None]:
flow.ENTRY, flow.EXIT, flow.NODES

In [None]:
flow.INFO

In [None]:
for n in flow.nodes: print(n, flow.nodes[n])
for n in flow.nodes: print(f' {list(flow.predecessors(n))} --> {n} --> {list(flow.successors(n))}')
for e in flow.edges: print(e, flow.edges[e])

# Infra

In [None]:
infra = dict()

infra['I'] = dict(
    url = "http://127.0.0.1:9800",
    xy = (0.0, 0.0),
)

infra['E'] = dict(
    url = "http://127.0.0.1:9801",
    xy = (1.0, 1.0),
)

infra['C'] = dict(
    url = "http://127.0.0.1:9802",
    xy = (2.0, 4.0),
)

actions = list(infra.keys())
for i,a in enumerate(actions):
    print(
        f"""
python -m cosim.work \\
--base=/home/ava/Server/Code/GitHub/cosim/__infra__/{a} \\
--script=/home/ava/.pyenv/versions/3.12.11/bin/python \\
--mods=/home/ava/Server/Code/GitHub/cosim/__infra__/mods \\
--host=127.0.0.1 --port=980{i} 
"""
    )

# Offload `flow` on `infra` using `decision`

In [None]:
decision = dict(
    tA = actions[0], tB = actions[1], tC = actions[2], tD = actions[2]
)

decision

In [None]:

def now(year:bool=True, month:bool=True, day:bool=True, 
        hour:bool=True, minute:bool=True, second:bool=True, mirco:bool=True, 
        start:str='', sep:str='', end:str='') -> str:
    form = []
    if year:    form.append("%Y")
    if month:   form.append("%m")
    if day:     form.append("%d")
    if hour:    form.append("%H")
    if minute:  form.append("%M")
    if second:  form.append("%S")
    if mirco:   form.append("%f")
    assert (form), 'format should not be empty!'
    import datetime
    return (start + datetime.datetime.strftime(datetime.datetime.now(), sep.join(form)) + end)

In [None]:
fid = now(start='UE28893_')
for n in flow.NODES: 
    flow.INFO[n]['name'] = n
    flow.INFO[n]['offl'] = decision[n]
    flow.INFO[n]['uid'] = f'{fid}_{n}'
    flow.INFO[n]['fid'] = f'{fid}'
    #for o in flow.INFO[n]['outputs']:
    #flow.INFO[n]['outsend'] = {flow.edges[n]['data']:n[-1] for n in flow.out_edges(n)}
    flow.INFO[n]['outsend'] = {flow.edges[n]['data']:(n[-1], decision[n[-1]], infra[decision[n[-1]]]['url']) for n in flow.out_edges(n)}

for o in flow.INFO[flow.EXIT]['outputs']:
    flow.INFO[flow.EXIT]['outsend'][o] = ('', decision[flow.ENTRY],  infra[decision[flow.ENTRY]]['url'])

flow.INFO

# offload now

In [None]:
import requests

In [None]:

# offload tA first
response = requests.post(
    url=f"{infra[decision['tA']]['url']}/add",
    json=flow.INFO['tA']
    )
response.__dict__

In [None]:

# offload tB
response = requests.post(
    url=f"{infra[decision['tB']]['url']}/add",
    json=flow.INFO['tB']
    )
response.__dict__

In [None]:

# offload tc
response = requests.post(
    url=f"{infra[decision['tC']]['url']}/add",
    json=flow.INFO['tC']
    )
response.__dict__

In [None]:

# offload tD
response = requests.post(
    url=f"{infra[decision['tD']]['url']}/add",
    json=flow.INFO['tD']
    )
response.__dict__

In [None]:
# make_inputs_at = '/home/ava/Server/Code/GitHub/cosim/__infra__/I/data/x1'
# import pickle
# with open(make_inputs_at, 'wb') as f: pickle.dump(12, f)

import pickle
with open('__input__', 'wb') as f: pickle.dump(12, f)

In [None]:
inital_node = infra[decision[flow.ENTRY]]['url']
initial_input_name = f"{flow.INFO[flow.ENTRY]['uid']}_{flow.INFO[flow.ENTRY]['inputs'][0]}"
initial_input_url = f"{inital_node}/data/{initial_input_name}"
inital_node,initial_input_name, initial_input_url

In [None]:
with open('__input__', 'rb') as f: response = requests.post(url=initial_input_url, files={initial_input_name:f})
response.__dict__

In [None]:
response = requests.post(
    url=f"{inital_node}/note",
    json={
        'uid': flow.INFO[flow.ENTRY]['uid'],
        'outputs': {
            'x1': initial_input_name,
        },
    }
)
response.__dict__

# ... wait for rsponse

In [None]:
raise ValueError

In [None]:
inital_node = infra[decision[flow.ENTRY]]['url']
final_input_name = f"{flow.INFO[flow.EXIT]['uid']}_{flow.INFO[flow.EXIT]['outputs'][0]}"
final_input_url = f"{inital_node}/data/{final_input_name}"
inital_node,final_input_name, final_input_url

In [None]:
response = requests.get(url=final_input_url,)
response

In [None]:

import os, pickle
with open('__output__', 'wb') as p: p.write(response._content)
with open('__output__', 'rb') as p: out = pickle.load(p)
print(type(out), out)

# end