# Our big picture

![The Big Picture](figure/big-picture.png)

# The Workflow implementation so far

![title](figure/workflow-demo.jpeg)

In [3]:
import time, json, sys, os
from globus_automate_client import create_flows_client
from funcx.sdk.client import FuncXClient
from DNNTFlow import t3flow_def

In [4]:
fxc = FuncXClient()

## Step 0, a sanitary check of the funcX service on the AI system
We run a simple funcX function to check the funcX service on AI system

In [5]:
def hello_world():
    import platform
    return platform.uname()

func_uuid = fxc.register_function(hello_world)
print('function UUID is: %s' % func_uuid)

function UUID is: 3b5d7542-d8b3-4e89-b1b6-e15bf7b4f5a5


In [6]:
# Note, this endpoint ID is obtained after initialize your own funcx-endpoint on the system 
# This is an example, it must be replaced with your own as this is not a shared endpoint
aisys_funcx_ep = '367491b0-41dc-4fa7-9421-414a0b34e2e7'
res = fxc.run(endpoint_id=aisys_funcx_ep, function_id=func_uuid)

In [7]:
while True:
    try:
        x = fxc.get_result(res)
        print(x)
        break
    except Exception as e:
        print("Exception: {}".format(e))
        time.sleep(3)

Exception: Task is pending due to waiting-for-ep
uname_result(system='Linux', node='lambda8', release='5.4.0-80-generic', version='#90-Ubuntu SMP Fri Jul 9 22:49:44 UTC 2021', machine='x86_64', processor='x86_64')


## Step 1, Create a funcX function to launch the training via system call

In [8]:
def trun_mdl_train(args):
    cmd_aug = args['cmd'].split('#')
    import subprocess, os
    os.chdir('/data/shared/zliu/BraggNN/') # switch to the working directory, example!
    result = subprocess.run(cmd_aug, stdout=subprocess.PIPE)
    return result.stdout.decode('utf-8')
    
# register the function to funcX service and obtain the function ID as workflow argument
shell_funcX = fxc.register_function(trun_mdl_train)

## Step 2, Build and deploy the workflow to Globus Automate service, once built, it can be run as many times as needed with different data and configuration/argument

In [9]:
flows_client = create_flows_client()
flow = flows_client.deploy_flow(t3flow_def, input_schema={}, title="BraggNN training Workflow using remote AI system")
flow_id = flow['id']
flow_scope = flow['globus_auth_scope']

## Step 3, Define a set of auguments to run an instance of the workflow

In [10]:
## Note that all values here are examples, they must be supplied based on your own setup

# these must to a point of your own
data_endpoint = 'aa4fcdaf-6d04-11e5-ba46-22000b92c6ec' # Globus endpoint ID that host the training dataset, e.g., SLAC
comp_endpoint = '5b66ac62-042a-11eb-8930-0a5521ff3f4b' # Globus endpoint ID for the AI system, e.g., an AI system at ALCF
dest_endpoint = '302abb02-af25-11e9-9393-02ff96a5aa76' # Globus endpoint ID for model destination, e.g., the edge host machine at SLAC

data_path = '/zliu/hedm' # the path of the training dataset
comp_path = '/data/shared/zliu/BraggNN/dataset'    # a temp place on AI system to save data
mdl_path  = '/data/shared/zliu/BraggNN/'           # a temp place on AI system to save model
dest_path = '/Users/zliu/Documents/gcp-ep/BraggNN/model' # a place to save the model

# a sample shell command splitted using "#"
train_cmd = 'horovodrun#-np#8#python#main-hvd.py'

# supply all arguments to the workflow
flow_input = {
    "input": {
        "data_endpoint": data_endpoint,
        "data_path": data_path,
        "comp_endpoint":comp_endpoint,
        "comp_path":comp_path,
        "mdl_path":mdl_path,
        "dest_endpoint": dest_endpoint,
        "dest_path": dest_path,
        "result_path": '/tmp/zliu',
        "fx_ep": aisys_funcx_ep,
        "fx_id": shell_funcX,
        "params": {'cmd': train_cmd,}
    }
}

## Step 4, run the workflow, fire and forget

In [10]:
flow_action = flows_client.run_flow(flow_id, flow_scope, flow_input)

print('Flow action started with id: %s' % flow_action['action_id'])

flow_status = flow_action['status']
while flow_status == 'ACTIVE':
    time.sleep(5)
    flow_action = flows_client.flow_action_status(flow_id, flow_scope, flow_action['action_id'])
    flow_status = flow_action['status']
    print(f'[{time.time()}] Flow status: {flow_status}')

Starting login with Globus Auth, press ^C to cancel.
Flow action started with id: d386b63d-26d2-41b7-bdb7-61f54c60127e
[1617402976.939321] Flow status: ACTIVE
[1617402982.945178] Flow status: ACTIVE
[1617402988.783828] Flow status: ACTIVE
[1617402994.574173] Flow status: ACTIVE
[1617403000.558295] Flow status: ACTIVE
[1617403006.499208] Flow status: ACTIVE
[1617403012.8425422] Flow status: ACTIVE
[1617403018.5992918] Flow status: ACTIVE
[1617403024.568869] Flow status: ACTIVE
[1617403030.753729] Flow status: ACTIVE
[1617403036.788411] Flow status: ACTIVE
[1617403042.6457229] Flow status: ACTIVE
[1617403048.692362] Flow status: ACTIVE
[1617403054.47262] Flow status: ACTIVE
[1617403060.286417] Flow status: ACTIVE
[1617403066.408673] Flow status: ACTIVE
[1617403072.239617] Flow status: ACTIVE
[1617403078.178251] Flow status: ACTIVE
[1617403084.2199051] Flow status: ACTIVE
[1617403090.277707] Flow status: ACTIVE
[1617403096.407212] Flow status: ACTIVE
[1617403102.345149] Flow status: ACTIV