# Tomo funcX Flow

This example creates a flow to use tomopy on Theta.

In [1]:
import os
import sys
import time
import json
import shortuuid

from funcx.sdk.client import FuncXClient
from fair_research_login import NativeClient
from globus_automate_client import (create_flows_client, graphviz_format, 
                                    state_colors_for_log, create_action_client, 
                                    create_flows_client)
from globus_automate_client.token_management import CLIENT_ID

# Authenticate

Auth with the funcX and Automate clients.

Note: You will still need to grant access to the flow later on in this notebook.

In [2]:
fxc = FuncXClient()

In [3]:
flows_client = create_flows_client(CLIENT_ID)

Get an auth token to HTTPS download from petrel#globuslabs

In [4]:
client = NativeClient(client_id='7414f0b4-7d05-4bb6-bb00-076fa3f17cf5')
tokens = client.login(requested_scopes=['https://auth.globus.org/scopes/56ceac29-e98a-440a-a594-b41e7a084b62/all'])
auth_token = tokens["petrel_https_server"]['access_token']
headers = {'Authorization': f'Bearer {auth_token}'}
print(headers)

{'Authorization': 'Bearer AgX5Wxx888jkOJgz6PdQBzggXGvdlBXzK3wPbV0xYEvm31v3qVfwCQPvBOKJv7yyD1poJXY4powP9rT4x9krDF87yK'}


# funcX setup
Register the containers and functions for the flow.

In [6]:
# An initial container to use
init_cont_id = fxc.register_container(location="/home/rchard/.funcx/containers/test.simg", 
                                 container_type='singularity')
# And the container we will download during the flow
tomo_cont_id = fxc.register_container(location="/home/rchard/.funcx/containers/tomo.simg", 
                                 container_type='singularity')

Register the download function

In [7]:
def download_cont(data):
    """Download the container and dataset"""
    import os
    import requests

    headers = data['headers']
    container = data.get('container', "")
    
    # Download the container
    cont = container.split("/")[-1]
    cont_out = f"/home/rchard/.funcx/containers/{cont}"
    if not os.path.isfile(cont_out):
        r = requests.get(container, headers=headers)
        open(cont_out , 'wb').write(r.content)
    
    return 'done'

download_cont_fxid = fxc.register_function(download_cont, container_uuid=init_cont_id)

In [8]:
def download_data(data):
    """Download the container and dataset"""
    import os
    import requests

    proc_dir = data['proc_dir']
    headers = data['headers']
    
    dataset = data.get('dataset', "")
    
    if not os.path.exists(proc_dir):
        os.makedirs(proc_dir)
    os.chdir(proc_dir)
    
    # Download the data
    r = requests.get(dataset, headers=headers)
    open(dataset.split("/")[-1] , 'wb').write(r.content)
    
    return 'done'

download_data_fxid = fxc.register_function(download_data, container_uuid=init_cont_id)

Register the tomo function

In [9]:
def tomo_recon(data):
    import os
    import subprocess

    proc_dir = data['proc_dir']
    recon_type = data.get("recon_type", "full")
    dataset = data.get('dataset', "").split("/")[-1]

    os.chdir(proc_dir)    
    
    cmd = f"tomopy recon --file-name {dataset} --output-folder {proc_dir} --reconstruction-type {recon_type}"
    result = subprocess.run(cmd.split(" "), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    return result.stdout

recon_fxid = fxc.register_function(tomo_recon, container_uuid=tomo_cont_id)

# Create the flow

In [10]:
flow_definition = flow_definition = {
  "Comment": "Tomo Reconstruction",
  "StartAt": "Download",
  "States": {
    "Download": {
      "Comment": "Download the data and container",
      "Type": "Action",
      "ActionUrl": "https://api.funcx.org/automate",
      "ActionScope": "https://auth.globus.org/scopes/facd7ccc-c5f4-42aa-916b-a0e270e2c2a9/all",
      "Parameters": {
          "tasks": [{
            "endpoint.$": "$.input.funcx_local_ep",
            "func.$": "$.input.download_cont_fxid",
            "payload.$": "$.input"
        }]
      },
      "ResultPath": "$.Exec1Result",
      "WaitTime": 600,
      "Next": "DownloadData"
    },
    "DownloadData": {
      "Comment": "Download the data and container",
      "Type": "Action",
      "ActionUrl": "https://api.funcx.org/automate",
      "ActionScope": "https://auth.globus.org/scopes/facd7ccc-c5f4-42aa-916b-a0e270e2c2a9/all",
      "Parameters": {
          "tasks": [{
            "endpoint.$": "$.input.funcx_local_ep",
            "func.$": "$.input.download_data_fxid",
            "payload.$": "$.input"
        }]
      },
      "ResultPath": "$.Exec1Result",
      "WaitTime": 600,
      "Next": "TomopyRecon"
    },
    "TomopyRecon": {
      "Comment": "Reconstruct full tomogram",
      "Type": "Action",
      "ActionUrl": "https://api.funcx.org/automate",
      "ActionScope": "https://auth.globus.org/scopes/facd7ccc-c5f4-42aa-916b-a0e270e2c2a9/all",
      "Parameters": {
          "tasks": [{
            "endpoint.$": "$.input.funcx_ep",
            "func.$": "$.input.recon_fxid",
            "payload.$": "$.input"
        }]
      },
      "ResultPath": "$.Exec2Result",
      "WaitTime": 3600,
      "End": True
    }
  }
}

In [11]:
flow = flows_client.deploy_flow(flow_definition, title="Tomo flow")
flow_id = flow['id']
flow_scope = flow['globus_auth_scope']
print(f'Newly created flow with id:\n{flow_id}')

Newly created flow with id:
71d56ad4-cee1-4e83-9cfe-47aedd634b83


# Define input for the flow

The input to the flow needs to specify what data to process, where it is located, and where to put it for analysis. The flow also requires the funcX function endpoint ids to use.

In [12]:
run_name = shortuuid.uuid()

theta_conf = {'endpoint': 'c4326f1e-d0ef-4ce1-83d3-9a4b88d0d67d',
              'local_endpoint': 'c4326f1e-d0ef-4ce1-83d3-9a4b88d0d67d',
              'proc_dir': f'/projects/APSDataAnalysis/Braid/process/{run_name}'}
midway_conf = {'endpoint': 'd86e31f7-71b2-4e42-8a54-7bc9d5e79df9',
               'local_endpoint': 'de6a8104-e53e-4dbd-82f1-2e9a09462a31',
               'proc_dir': f'/project2/chard/ryan/Braid/process/{run_name}'}

conf = midway_conf

flow_input = {
    "input": {
        "recon_type": "full",
        "proc_dir": conf['proc_dir'],
        "dataset": "https://45a53408-c797-11e6-9c33-22000a1e3b52.e.globus.org/Braid/data/tomo_example/data/tooth.h5",
        "container": "https://45a53408-c797-11e6-9c33-22000a1e3b52.e.globus.org/Braid/containers/tomo.simg",
        "headers": headers,
        # funcX functions
        "download_cont_fxid": download_cont_fxid,
        "download_data_fxid": download_data_fxid,
        "recon_fxid": recon_fxid,
        # funcX endpoints
        "funcx_ep": conf['endpoint'],
        "funcx_local_ep": conf['local_endpoint'],
    }
}

# Run the flow

This will require you to authenticate and grant access to the flow to use Transfer and funcX on your behalf.

The flow should take a couple of minutes.

In [13]:
flow_action = flows_client.run_flow(flow_id, flow_scope, flow_input)
print(flow_action)
flow_action_id = flow_action['action_id']
flow_status = flow_action['status']
print(f'Flow action started with id: {flow_action_id}')
while flow_status == 'ACTIVE':
    time.sleep(10)
    flow_action = flows_client.flow_action_status(flow_id, flow_scope, flow_action_id)
    flow_status = flow_action['status']
    print(f'Flow status: {flow_status}')

Starting login with Globus Auth, press ^C to cancel.
GlobusHTTPResponse({'action_id': '854f727d-bdf5-40a6-9801-6cda50e4db55', 'completion_time': 'None', 'created_by': 'urn:globus:auth:identity:c4765424-d274-11e5-b894-cb4139f74ecf', 'details': {'code': 'ActionStarted', 'description': 'State Download of type Action started', 'details': {'input': {'tasks': [{'endpoint': 'de6a8104-e53e-4dbd-82f1-2e9a09462a31', 'func': 'f3096f1e-ead3-42e8-bc4d-bbf19261be69', 'payload': {'container': 'https://45a53408-c797-11e6-9c33-22000a1e3b52.e.globus.org/Braid/containers/tomo.simg', 'dataset': 'https://45a53408-c797-11e6-9c33-22000a1e3b52.e.globus.org/Braid/data/tomo_example/data/tooth.h5', 'download_cont_fxid': 'f3096f1e-ead3-42e8-bc4d-bbf19261be69', 'download_data_fxid': '9dbaae0a-1cc1-46f3-bf40-8957610a42b7', 'funcx_ep': 'd86e31f7-71b2-4e42-8a54-7bc9d5e79df9', 'funcx_local_ep': 'de6a8104-e53e-4dbd-82f1-2e9a09462a31', 'headers': {'Authorization': 'Bearer AgX5Wxx888jkOJgz6PdQBzggXGvdlBXzK3wPbV0xYEvm31v3

In [14]:
flow_action

GlobusHTTPResponse({'action_id': '854f727d-bdf5-40a6-9801-6cda50e4db55', 'completion_time': '2021-02-24T22:11:03.558000+00:00', 'created_by': 'urn:globus:auth:identity:c4765424-d274-11e5-b894-cb4139f74ecf', 'details': {'output': {'Exec1Result': {'action_id': '4134086b-7385-41ff-80ff-d01462bcbc7e', 'details': {'completion_t': '1614204635.692847', 'exception': None, 'result': 'done', 'status': 'SUCCEEDED', 'task_id': '4134086b-7385-41ff-80ff-d01462bcbc7e'}, 'release_after': 'P30D', 'state_name': 'DownloadData', 'status': 'SUCCEEDED'}, 'Exec2Result': {'action_id': 'cc11c94b-b463-4a0f-9b33-0bc1e4190794', 'details': {'completion_t': '1614204660.17867', 'exception': None, 'result': None, 'status': 'SUCCEEDED', 'task_id': 'cc11c94b-b463-4a0f-9b33-0bc1e4190794'}, 'release_after': 'P30D', 'state_name': 'TomopyRecon', 'status': 'SUCCEEDED'}, 'input': {'container': 'https://45a53408-c797-11e6-9c33-22000a1e3b52.e.globus.org/Braid/containers/tomo.simg', 'dataset': 'https://45a53408-c797-11e6-9c33-2