# ReverseNumberFlow example

In [1]:
# Enable autoreload so edits to imported modules are picked up live,
# without restarting the kernel.
%load_ext autoreload
%autoreload 2

In [2]:
import os, json

from colink import CoLink

from aiflows.utils import serve_utils
from aiflows.utils.general_helpers import read_yaml_file
from aiflows.messages import FlowMessage
from aiflows.utils import coflows_utils, colink_utils
from aiflows.workers import run_dispatch_worker_thread

  from .autonotebook import tqdm as notebook_tqdm


In [3]:
# Base directory of the flow modules: the YAML configs are read from under it
# and it is passed to the dispatch workers as `flow_modules_base_path`.
FLOW_MODULES_PATH = "./"

# Connect to CoLink server

To connect to a real server, run the cell below instead of the one after it.

In [4]:
# Alternative to the next cell: connect to an already-running CoLink server
# using a JWT read from a file.
# NOTE(review): USER_JWT_PATH is a machine-specific absolute path; point it at
# your own JWT file before uncommenting.
# USER_JWT_PATH = "/home/staverm/workspace/coflows-dev/coflows/jwts.txt"
# user_jwt = open(USER_JWT_PATH).readline().strip()
# cl = CoLink("http://127.0.0.1:2021", user_jwt)

In [5]:
# Start a CoLink server from this notebook. `cl` is the handle passed to every
# serve/mount/unserve call below (presumably a CoLink client, confirm).
cl = colink_utils.start_colink_server()

started server on 0.0.0.0:11170
started server on 0.0.0.0:17415


# Load flow config

Every flow instance gets initialized with a flow config specified in a yaml file.

The `_target_` key specifies the fully qualified (dotted-path) name of the method used to instantiate the flow.

## Load config for ReverseNumberAtomicFlow

In [6]:
# Read the atomic flow's default config from its YAML file and pretty-print it
# as JSON for inspection.
atomic_config_path = os.path.join(
    FLOW_MODULES_PATH, "ReverseNumberFlowModule/ReverseNumberAtomicFlow.yaml"
)
reverse_number_atomic_default_config = read_yaml_file(atomic_config_path)
atomic_config_json = json.dumps(reverse_number_atomic_default_config, indent=4)
print(atomic_config_json)

{
    "name": "ReverseNumber",
    "description": "A flow that takes in a number and reverses it.",
    "_target_": "ReverseNumberFlowModule.ReverseNumberAtomicFlow.instantiate_from_default_config",
    "input_interface": {
        "_target_": "aiflows.interfaces.KeyInterface",
        "keys_to_select": [
            "number"
        ]
    },
    "output_interface": {
        "_target_": "aiflows.interfaces.KeyInterface",
        "keys_to_rename": {
            "output_number": "reversed_number"
        }
    }
}


## Load config for ReverseNumberSequentialFlow

In [7]:
# Read the sequential flow's default config from its YAML file and pretty-print
# it as JSON for inspection.
sequential_config_path = os.path.join(
    FLOW_MODULES_PATH, "ReverseNumberFlowModule/ReverseNumberSequentialFlow.yaml"
)
reverse_number_sequential_default_config = read_yaml_file(sequential_config_path)
sequential_config_json = json.dumps(reverse_number_sequential_default_config, indent=4)
print(sequential_config_json)

{
    "name": "ReverseNumberTwice",
    "description": "A sequential flow that reverses a number twice.",
    "_target_": "ReverseNumberFlowModule.ReverseNumberSequentialFlow.instantiate_from_default_config",
    "flow_type": "ReverseNumberSequentialFlow_served",
    "input_interface": [
        "number"
    ],
    "output_interface": [
        "output_number"
    ],
    "subflows_config": {
        "first_reverse_flow": {
            "_target_": "aiflows.base_flows.AtomicFlow.instantiate_from_default_config",
            "user_id": "local",
            "flow_type": "ReverseNumberAtomicFlow_served",
            "name": "ReverseNumberFirst",
            "description": "A flow that takes in a number and reverses it."
        },
        "second_reverse_flow": {
            "_target_": "aiflows.base_flows.AtomicFlow.instantiate_from_default_config",
            "user_id": "local",
            "flow_type": "ReverseNumberAtomicFlow_served",
            "name": "ReverseNumberSecond",
        

# Serve Flows

In [8]:
# Keyword arguments that are identical for both served flow types.
shared_serve_kwargs = dict(
    cl=cl,
    default_state=None,
    default_dispatch_point="coflows_dispatch",
    serve_mode="statefull",
)

# Serve the atomic flow type; the sequential flow's subflows refer to it by
# its flow type.
serve_utils.serve_flow(
    flow_type="ReverseNumberAtomicFlow_served",
    default_config=reverse_number_atomic_default_config,
    **shared_serve_kwargs,
)
# Kept as the cell's last expression so its return value is still displayed.
serve_utils.serve_flow(
    flow_type="ReverseNumberSequentialFlow_served",
    default_config=reverse_number_sequential_default_config,
    **shared_serve_kwargs,
)

Started serving at flows:ReverseNumberAtomicFlow_served.
Started serving at flows:ReverseNumberSequentialFlow_served.


True

# Start dispatch workers

In [9]:
# Start a dispatch worker for the "coflows_dispatch" dispatch point, the same
# one the flows were served with. It loads flow modules from FLOW_MODULES_PATH.
run_dispatch_worker_thread(cl, dispatch_point="coflows_dispatch", flow_modules_base_path=FLOW_MODULES_PATH)

Dispatch worker started in attached thread.


In [14]:
# NOTE(review): identical to the previous cell. Presumably this starts a second
# worker on the same dispatch point; confirm it is intentional and not a
# leftover from re-running the cell (the execution count is out of order).
run_dispatch_worker_thread(cl, dispatch_point="coflows_dispatch", flow_modules_base_path=FLOW_MODULES_PATH)

Dispatch worker started in attached thread.


# Get flow instance

In [10]:
# Mount an instance of the sequential flow for client "local", with no config
# overrides, initial state, or dispatch point override.
# The output below reports three mounts: two atomic-flow instances and the
# sequential flow instance itself.
# `flow` is the handle used later to send a message and to delete the instance.
flow = serve_utils.recursive_mount(
    cl=cl,
    client_id="local",
    flow_type="ReverseNumberSequentialFlow_served",
    config_overrides=None,
    initial_state=None,
    dispatch_point_override=None,
)

Mounted adca3e16-c97b-4f5b-a15c-0cdf4720bdaf at flows:ReverseNumberAtomicFlow_served:mounts:local:adca3e16-c97b-4f5b-a15c-0cdf4720bdaf
Mounted b0660839-9341-4cfa-b49c-10e641b5ba2f at flows:ReverseNumberAtomicFlow_served:mounts:local:b0660839-9341-4cfa-b49c-10e641b5ba2f
Mounted ea6b452b-420d-4531-a6e1-7531b1acbed4 at flows:ReverseNumberSequentialFlow_served:mounts:local:ea6b452b-420d-4531-a6e1-7531b1acbed4


In [12]:
print("Pushing to...\n", flow)

# Payload for the flow; the sequential flow's input interface lists "number".
input_data = {"id": 0, "number": 81}

input_message = FlowMessage(data=input_data, src_flow="Coflows team", dst_flow=flow)

# Send the message and show the reply's data as the cell output.
reply = flow.ask(input_message)
reply.get_data()

Pushing to...
 Name: Proxy_ReverseNumberSequentialFlow_served
Class name: AtomicFlow
Type: AtomicFlow
Description: Proxy of ReverseNumberSequentialFlow_served
Input keys: no input keys
Output keys: no output keys




~~~ Dispatch task ~~~
flow_type: ReverseNumberSequentialFlow_served
flow_id: ea6b452b-420d-4531-a6e1-7531b1acbed4
message_ids: ['5faf31f3-5a70-4919-8c52-244711990d59']

Called ReverseNumberSequential 
 state {'current_call': 'first_reverse_flow'}

~~~ Dispatch task ~~~
flow_type: ReverseNumberAtomicFlow_served
flow_id: adca3e16-c97b-4f5b-a15c-0cdf4720bdaf
message_ids: ['1d40d24d-e145-4585-8ebd-96b2c2087e3c']

Called ReverseNumberAtomic 
 state {}

~~~ Dispatch task ~~~
flow_type: ReverseNumberSequentialFlow_served
flow_id: ea6b452b-420d-4531-a6e1-7531b1acbed4
message_ids: ['913267c1-e940-4d5a-ac3d-fa688da51841']

Called ReverseNumberSequential 
 state {'current_call': 'second_reverse_flow', 'initial_message': FlowMessage(message_id='25e8782d-d59f-4bee-bf26-36ae3dfee599', created_at='2024-03-08 12:50:10.516072000', created_by='Proxy_ReverseNumberSequentialFlow_served', message_type='FlowMessage', data={'id': 0, 'number': 81}, private_keys=[])}

~~~ Dispatch task ~~~
flow_type: ReverseN

{'output_number': 81}

# Unserve flow, delete instances

In [11]:
def status(colink_client=None):
    """Print the served flows, then the live flow instances.

    Args:
        colink_client: CoLink handle to inspect. When omitted, the
            notebook-level ``cl`` is used, so plain ``status()`` calls behave
            as before.
    """
    if colink_client is None:
        # Looked up at call time, so a re-created `cl` is picked up.
        colink_client = cl
    colink_utils.print_served_flows(colink_client)
    print("\nLive flow instances:")
    colink_utils.print_flow_instances(colink_client)


status()

 ReverseNumberSequentialFlow_served/
   default_config
   mounts/
     local/
       ea6b452b-420d-4531-a6e1-7531b1acbed4/
         init


         config_overrides
   serve_mode
   default_dispatch_point
   init
 ReverseNumberAtomicFlow_served/
   init
   default_config
   default_dispatch_point
   mounts/
     local/
       b0660839-9341-4cfa-b49c-10e641b5ba2f/
         config_overrides
         init
       adca3e16-c97b-4f5b-a15c-0cdf4720bdaf/
         init
         config_overrides
   serve_mode

Live flow instances:
 adca3e16-c97b-4f5b-a15c-0cdf4720bdaf
 b0660839-9341-4cfa-b49c-10e641b5ba2f
 ea6b452b-420d-4531-a6e1-7531b1acbed4


In [14]:
# Stop serving the atomic flow type.
serve_utils.unserve_flow(cl=cl, flow_type="ReverseNumberAtomicFlow_served")

ReverseNumberAtomicFlow_served has been unserved.


In [15]:
# Check whether the atomic flow type is still served (False after unserving).
serve_utils.is_flow_served(cl, "ReverseNumberAtomicFlow_served")

False

In [16]:
# Inspect served flows and live instances after unserving the atomic flow.
status()

 ReverseNumberAtomicFlow_served/
   init
   serve_mode
   default_dispatch_point
   default_config
   mounts/
     local/
       dbb911c8-e3f2-4441-9f43-5b899905b150/
         state
         config_overrides
         init
       f9af6ba5-b5b8-4b78-a93c-f148b665f2bd/
         init
         config_overrides
         state
 ReverseNumberSequentialFlow_served/
   serve_mode
   default_config
   init
   default_dispatch_point
   mounts/
     local/
       882b3caf-e425-4218-98ad-239542f83acf/
         config_overrides
         state
         init

Live flow instances:
 dbb911c8-e3f2-4441-9f43-5b899905b150
 f9af6ba5-b5b8-4b78-a93c-f148b665f2bd
 882b3caf-e425-4218-98ad-239542f83acf


In [17]:
# Delete the served atomic flow. Per the output below, this deletes its mounted
# instances and stops serving it.
serve_utils.delete_served_flow(cl, "ReverseNumberAtomicFlow_served")

Deleted flow instance dbb911c8-e3f2-4441-9f43-5b899905b150
Deleted flow instance f9af6ba5-b5b8-4b78-a93c-f148b665f2bd
Stopped serving at flows:ReverseNumberAtomicFlow_served


In [18]:
# Inspect again: only the sequential flow instance should still be live.
status()

 ReverseNumberAtomicFlow_served/
   mounts/
     local/
       dbb911c8-e3f2-4441-9f43-5b899905b150/
       f9af6ba5-b5b8-4b78-a93c-f148b665f2bd/
 ReverseNumberSequentialFlow_served/
   serve_mode
   default_config
   init
   default_dispatch_point
   mounts/
     local/
       882b3caf-e425-4218-98ad-239542f83acf/
         config_overrides
         state
         init

Live flow instances:
 882b3caf-e425-4218-98ad-239542f83acf


In [21]:
# Delete the sequential flow instance mounted earlier, identified through the
# `flow` handle's instance id.
serve_utils.delete_flow_instance(cl, flow.get_instance_id())

Deleted flow instance 882b3caf-e425-4218-98ad-239542f83acf.


In [22]:
# Final check: the live flow instance list in the output below is empty.
status()

 ReverseNumberAtomicFlow_served/
   mounts/
     local/
       dbb911c8-e3f2-4441-9f43-5b899905b150/
       f9af6ba5-b5b8-4b78-a93c-f148b665f2bd/
 ReverseNumberSequentialFlow_served/
   serve_mode
   default_config
   init
   default_dispatch_point
   mounts/
     local/
       882b3caf-e425-4218-98ad-239542f83acf/

Live flow instances:
