<img src="img/automation_using_flows_header.png">
<img src="img/lightsources.png">

# Tomography Reconstructions

This notebook shows how [Tomopy](https://tomopy.readthedocs.io/en/latest/) reconstructions can be automated using [Globus Flows](https://www.globus.org/globus-flows-service). The notebook aims to demonstrate how an individual flow can be used to integrate different light sources and compute facilities.



### Using the Flows GUI

The Globus Flows web app can generate a `start` page to easily invoke flows. To create this page you need to attach an input schema to the flow. This is shown at the end of the notebook.

<img src="img/input1.png">

### Monitoring the flow
Once the flow has been started you can monitor the run. The events tab shows each step of the flow and provides details regarding the input and output of each action.

Here you can see the input to the Tomopy step of the flow, showing the input values that are passed into the function to execute.

<img src="img/running.png">

# Creating a Tomopy Flow

Here we explain how the flow is defined. Running these steps will register a flow of your own that you can then run.

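To run these steps you will need to install the Globus SDK and the Globus Compute SDK (the latter is used later in the notebook to register the Tomopy function):

`pip install -U globus-sdk globus-compute-sdk`

Here we create a Globus `FlowsClient` to securely interact with the Flows service. This will prompt you to log in and paste a token into the notebook.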

In [41]:
import os
import json
import time
import globus_sdk

from globus_sdk.experimental.globus_app import UserApp
from globus_sdk.scopes import GCSCollectionScopeBuilder

# Tutorial client ID
# We recommend replacing this with your own client for any production use-cases
# Create your own at developers.globus.org
CLIENT_ID = "7ca21f4a-11de-4d97-8f84-cb66f7459981"

In [35]:
my_app = UserApp("tomography-user-app", client_id=CLIENT_ID)

flows_client = globus_sdk.FlowsClient(app=my_app)

Specify the flow definition. This JSON definition is derived from the Amazon States Language used by AWS Step Functions. States of the flow are chained together by specifying the `Next` field to construct a pipeline of operations. This flow consists of three steps:

1. TransferInput
2. Tomopy
3. TransferResults

The first step, TransferInput, uses the Globus Transfer action provider. It reads its source and destination from the `transfer_in` section of the flow input and is given a 300s `WaitTime`. Static values can be used here to simplify user input.

The second step, Tomopy, uses the Globus Compute action provider. Input is dynamically passed in as `kwargs`, which are then passed to the function to be executed. The step is given a 600s `WaitTime`.

The third step, TransferResults, uses the Globus Transfer action provider again. Its source path is taken from the output of the Tomopy step and its destination from the `transfer_out` section of the flow input. The step is given a 300s `WaitTime` and is the conclusion of the flow.
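
The chaining through `Next` can be illustrated with a small sketch. The helper name `state_order` and the reduced `example_definition` are hypothetical and only mirror the state names used in this notebook; the helper handles linear flows only, not branching states.

```python
def state_order(definition):
    """Return state names in execution order by following `Next` links."""
    order = []
    states = definition["States"]
    name = definition["StartAt"]
    while name is not None:
        if name in order:
            raise ValueError(f"cycle detected at state {name!r}")
        order.append(name)
        state = states[name]
        # A state either names its successor with `Next` or terminates with `End`.
        name = None if state.get("End") else state.get("Next")
    return order


# Reduced definition (assumption: only the chaining fields are kept)
example_definition = {
    "StartAt": "TransferInput",
    "States": {
        "TransferInput": {"Type": "Action", "Next": "Tomopy"},
        "Tomopy": {"Type": "Action", "Next": "TransferResults"},
        "TransferResults": {"Type": "Action", "End": True},
    },
}

print(state_order(example_definition))
```

Running the same helper on a full definition is a quick way to confirm that every `Next` points at a state that exists before registering the flow.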

In [66]:
flow_definition = {
    "Comment": "Transfer and run Tomopy",
    "StartAt": "TransferInput",
    "States": {
        "TransferInput": {
            "Comment": "Transfer input file",
            "Type": "Action",
            "ActionUrl": "https://actions.automate.globus.org/transfer/transfer",
            "Parameters": {
                "source_endpoint_id.$": "$.input.transfer_in.source.id",
                "destination_endpoint_id.$": "$.input.transfer_in.destination.id",
                "transfer_items": [
                    {
                        "source_path.$": "$.input.transfer_in.source.path",
                        "destination_path.$": "$.input.transfer_in.destination.path",
                        "recursive": False
                    }
                ]
            },
            "ResultPath": "$.TransferInput",
            "WaitTime": 300,
            "Next": "Tomopy"
        },
        "Tomopy": {
            "Comment": "Perform Tomopy analysis",
            "Type": "Action",
            "ActionUrl": "https://compute.actions.globus.org/fxap",
            "Parameters": {
                "endpoint.$": "$.input.compute_endpoint_id",
                "function.$": "$.input.compute_function_id",
                "kwargs.$": "$.input.compute_function_kwargs"
            },
            "ResultPath": "$.TomopyOutput",
            "WaitTime": 600,
            "Next": "TransferResults"
        },
        "TransferResults": {
            "Comment": "Transfer results",
            "Type": "Action",
            "ActionUrl": "https://actions.automate.globus.org/transfer/transfer",
            "Parameters": {
                "source_endpoint_id.$": "$.input.transfer_out.source.id",
                "destination_endpoint_id.$": "$.input.transfer_out.destination.id",
                "transfer_items": [
                    {
                        "source_path.$": "$.TomopyOutput.details.result[0][1]",
                        "destination_path.$": "$.input.transfer_out.destination.path",
                        "recursive": False
                    }
                ]
            },
            "ResultPath": "$.TransferFiles",
            "WaitTime": 300,
            "End": True
        },
    }
}
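
Keys ending in `.$` hold a JSONPath reference that is resolved against the run's state when the step starts. The following is a simplified sketch of that substitution, not the Flows service's implementation: `resolve_path` and `resolve_parameters` are hypothetical helpers that support only dotted keys and integer indices, and the file paths in `run_state` are placeholders.

```python
import re


def resolve_path(path, state):
    """Resolve a simple JSONPath such as `$.a.b[0][1]` against nested dicts/lists."""
    if not path.startswith("$"):
        raise ValueError(f"not a JSONPath reference: {path!r}")
    value = state
    # Each token is either a `.name` key or a `[n]` list index.
    for key, index in re.findall(r"\.([A-Za-z_][\w-]*)|\[(\d+)\]", path[1:]):
        value = value[key] if key else value[int(index)]
    return value


def resolve_parameters(params, state):
    """Replace every `key.$` entry with the value its path points to."""
    if isinstance(params, list):
        return [resolve_parameters(item, state) for item in params]
    if not isinstance(params, dict):
        return params
    resolved = {}
    for key, value in params.items():
        if key.endswith(".$"):
            resolved[key[:-2]] = resolve_path(value, state)
        else:
            resolved[key] = resolve_parameters(value, state)
    return resolved


# Assumed state shape: the Compute step stores the function's return tuple
# as the first element of `details.result`.
run_state = {
    "input": {"transfer_out": {"destination": {"path": "/~/file2.txt"}}},
    "TomopyOutput": {
        "details": {"result": [["/outputs/recon.tiff", "/share/file1.txt", 0]]}
    },
}
params = {
    "transfer_items": [
        {
            "source_path.$": "$.TomopyOutput.details.result[0][1]",
            "destination_path.$": "$.input.transfer_out.destination.path",
            "recursive": False,
        }
    ]
}
resolved = resolve_parameters(params, run_state)
print(resolved)
```

In this sketch `result[0][1]` selects the second element of the function's return value, which is why the TransferResults step depends on the order of the values returned by the Tomopy function.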

Register the flow. We leave the input schema blank and will later update it to support the web interface.

In [7]:
flow = flows_client.create_flow(definition=flow_definition, title="Tomopy IRI Flow", input_schema={})
flow_id = flow['id']
print(flow)
flow_scope = flow['globus_auth_scope']
print(f'Newly created flow with id:\n{flow_id}\nand scope:\n{flow_scope}')

{
  "id": "3224aea6-2ab1-495b-8b76-89fd71d838c1",
  "title": "Tomopy IRI Flow",
  "subtitle": "",
  "description": "",
  "definition": {
    "StartAt": "TransferInput",
    "States": {
      "TransferInput": {
        "Parameters": {
          "source_endpoint_id.$": "$.input.transfer_in.source.id",
          "destination_endpoint_id.$": "$.input.transfer_in.destination.id",
          "transfer_items": [
            {
              "source_path.$": "$.input.transfer_in.source.path",
              "destination_path.$": "$.input.transfer_in.destination.path",
              "recursive": false
            }
          ]
        },
        "Type": "Action",
        "Comment": "Transfer input file",
        "Next": "Tomopy",
        "ActionUrl": "https://actions.automate.globus.org/transfer/transfer",
        "ResultPath": "$.TransferInput",
        "WaitTime": 300
      },
      "Tomopy": {
        "Parameters": {
          "endpoint.$": "$.input.compute_endpoint_id",
          "function.$":

## Creating a Tomopy Function

To run Tomopy in the flow we need to define a Tomopy function and register it with Globus Compute.

**Note, this uses my personal compute endpoint, not a service account endpoint. Access to this endpoint is restricted.**

In [76]:
import globus_compute_sdk

polaris_ep = '0f44e1f7-baa5-4dfd-a2f6-73051004cfaa'
gce = globus_compute_sdk.Executor(endpoint_id=polaris_ep)

Define a function to call Tomopy.

This is a simple wrapper function that uses `subprocess` to invoke a templated command that runs the Tomopy CLI.

In [83]:
def tomopy_wrapper(filename, command="recon", reconstruction_type="slice", 
                     save_folder="_rec", collection_path="/home/share/godata/"):
    import subprocess
    import glob
    import os
    
    cmd = f"tomopy {command} --file-name {filename} --reconstruction-type {reconstruction_type} --save-folder={save_folder}"
    res = subprocess.run(cmd.split(" "), stdout=subprocess.PIPE, stderr=subprocess.PIPE)

    # Get the most recent reconstruction file
    list_of_files = glob.glob(f'{save_folder}/slice_rec/*')
    recon_file = max(list_of_files, key=os.path.getctime)

    result_path = os.path.join(collection_path, "file1.txt")
    
    return recon_file, result_path, res.returncode, res.stdout.decode("utf-8"), res.stderr.decode("utf-8")
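
A possible variant of the command templating is sketched below. It builds the argument list directly instead of splitting a formatted string, so a file name containing spaces stays a single argument. The helper name `build_tomopy_command` and the example paths are hypothetical; the flags are the ones used in the wrapper above.

```python
import shlex


def build_tomopy_command(filename, command="recon",
                         reconstruction_type="slice", save_folder="_rec"):
    """Build the argv list for the tomopy CLI call used by the wrapper."""
    return [
        "tomopy", command,
        "--file-name", filename,
        "--reconstruction-type", reconstruction_type,
        f"--save-folder={save_folder}",
    ]


# Placeholder path with a space in it, to show it survives as one argument
argv = build_tomopy_command("/data/my scan.h5", save_folder="/data/out")

# shlex.join renders the list as a shell-quoted string for logging
print(shlex.join(argv))
```

The resulting list can be passed straight to `subprocess.run` in place of `cmd.split(" ")`.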

Register the function with Compute. 

In [84]:
tomo_func = gce.client.register_function(tomopy_wrapper)

In [85]:
tomo_func

'5e9ecadd-7da7-468a-a715-9c71a28ee1a9'

### Test the function

Test the function by running directly with Globus Compute. 

In [86]:
fn = "/home/rchard/src/IRI/Tomography/phantom_projections.h5"

In [87]:
future = gce.submit_to_registered_function(args=[fn], kwargs={'save_folder': '/home/rchard/src/IRI/Tomography/outputs'}, function_id=tomo_func)

In [91]:
future.result()[0]

'/home/rchard/src/IRI/Tomography/outputs/slice_rec/recon_phantom_projections-10.tiff'

## Running the flow

We can now specify input and start the flow.

In [92]:
source_transfer_endpoint = '6c54cade-bde5-45c1-bdea-f4bd71dba2cc'
dest_transfer_endpoint = '31ce9ba0-176d-45a5-add3-f37d233ba47d'

In [93]:
flow_input = {
    "input": {
        "transfer_in": {
            "source": {
                "id": source_transfer_endpoint,
                "path": "/home/share/godata/file1.txt"
            },
            "destination": {
                "id": dest_transfer_endpoint,
                "path": "/~/file1.txt"
            }
        },
        "transfer_out": {
            "source": {
                "id": source_transfer_endpoint,
                "path": "/home/share/godata/file2.txt"
            },
            "destination": {
                "id": dest_transfer_endpoint,
                "path": "/~/file2.txt"
            }
        },
        "compute_endpoint_id": polaris_ep,
        "compute_function_id": tomo_func,
        "compute_function_kwargs": {
        "command": "recon",
        "reconstruction_type": "slice",
        "save_folder": "/home/rchard/src/IRI/Tomography/outputs",
        "filename": "/home/rchard/src/IRI/Tomography/phantom_projections.h5"
        }
    }
}

Add scopes to the flow to use Mapped Collections. This is necessary if you cannot use a Guest Collection at the facility.

In [73]:
specific_flow_client = globus_sdk.SpecificFlowClient(
    flow_id=flow_id,
    app=my_app,
)

In [57]:
# Get the data_access scopes for the mapped collections
source_access_scope = GCSCollectionScopeBuilder(source_transfer_endpoint).make_mutable("data_access", optional=True)
dest_access_scope = GCSCollectionScopeBuilder(dest_transfer_endpoint).make_mutable("data_access", optional=True)

transfer_scope = globus_sdk.scopes.TransferScopes.make_mutable("all")
transfer_scope.add_dependency(source_access_scope)
transfer_scope.add_dependency(dest_access_scope)

# Add the data access scopes as dependencies to the flow
# (specific_flow_client must already exist, so it is created in the cell above)
flow_scope = specific_flow_client.scopes.make_mutable("user")
flow_scope.add_dependency(transfer_scope)

my_app.add_scope_requirements({'flow': [flow_scope]})

In [94]:
run = specific_flow_client.run_flow(
  body=flow_input,
  label="Tomopy run",
  tags=['Tomopy', 'IRI', 'example']
)

In [95]:
run_id = run['run_id']
run_status = run['status']
print("This flow can be monitored in the Web App:")
print(f"https://app.globus.org/runs/{run_id}")
print(f"Flow run started with ID: {run_id} - Status: {run_status}")

# Poll the Flow service to check on the status of the flow
while run_status == 'ACTIVE':
    time.sleep(5)
    run = flows_client.get_run(run_id)
    run_status = run['status']
    print(f'Run status: {run_status}')
    
# Run completed
# print(json.dumps(run.data, indent=2))

This flow can be monitored in the Web App:
https://app.globus.org/runs/75c55289-20f4-44e6-8d01-cdea39bde5ea
Flow run started with ID: 75c55289-20f4-44e6-8d01-cdea39bde5ea - Status: ACTIVE
Run status: ACTIVE
Run status: ACTIVE
Run status: ACTIVE
Run status: ACTIVE
Run status: ACTIVE
Run status: ACTIVE
Run status: SUCCEEDED
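
The polling loop above runs until the status changes, however long that takes. A minimal sketch of the same loop with an upper time bound follows. The helper `wait_for_run` and its parameters are hypothetical, and the status source is a stand-in so the example runs without contacting the Flows service.

```python
import time


def wait_for_run(get_status, poll_interval=5.0, timeout=900.0, sleep=time.sleep):
    """Poll `get_status()` until it stops returning 'ACTIVE' or `timeout` seconds pass."""
    deadline = time.monotonic() + timeout
    status = get_status()
    while status == "ACTIVE":
        if time.monotonic() >= deadline:
            raise TimeoutError(f"run still ACTIVE after {timeout} seconds")
        sleep(poll_interval)
        status = get_status()
    return status


# Stand-in for the service: reports ACTIVE twice, then SUCCEEDED
fake_statuses = iter(["ACTIVE", "ACTIVE", "SUCCEEDED"])
final_status = wait_for_run(lambda: next(fake_statuses), sleep=lambda seconds: None)
print(final_status)
```

With the clients from this notebook, `get_status` could be `lambda: flows_client.get_run(run_id)['status']`.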


In [96]:
run['details']['output']['TomopyOutput']['details']['result'][0][0]

'/home/rchard/src/IRI/Tomography/outputs/slice_rec/recon_phantom_projections-11.tiff'

# Make it IRIy

In [100]:
site_configs = {
    "lightsource": {
        "aps": {
            "transfer_endpoint": "6c54cade-bde5-45c1-bdea-f4bd71dba2cc",
            "source_path": "/home/share/godata/file1.txt",
            "result_path": "/~/file1.txt",
        },
        "als": {},
        "nsls2": {},
    },
    "compute": {
        "alcf": {
            "compute_endpoint": "0f44e1f7-baa5-4dfd-a2f6-73051004cfaa",
            "transfer_endpoint": "31ce9ba0-176d-45a5-add3-f37d233ba47d",
            "staging_path": "/~/file1.txt",
        },
        "nersc": {},
        "olcf": {},
    }
}

In [101]:
def create_payload(lightsource, compute):
    flow_input = {
        "input": {
            "transfer_in": {
                "source": {
                    "id": site_configs['lightsource'][lightsource]['transfer_endpoint'],
                    "path": site_configs['lightsource'][lightsource]['source_path']
                },
                "destination": {
                    "id": site_configs['compute'][compute]['transfer_endpoint'],
                    "path": site_configs['compute'][compute]['staging_path']
                }
            },
            "transfer_out": {
                "source": {
                    "id": site_configs['compute'][compute]['transfer_endpoint'],
                },
                "destination": {
                    "id": site_configs['lightsource'][lightsource]['transfer_endpoint'],
                    "path": site_configs['lightsource'][lightsource]['result_path'],
                }
            },
            "compute_endpoint_id": site_configs['compute'][compute]['compute_endpoint'],
            "compute_function_id": tomo_func,
            "compute_function_kwargs": {
            "command": "recon",
            "reconstruction_type": "slice",
            "save_folder": "/home/rchard/src/IRI/Tomography/outputs",
            "filename": "/home/rchard/src/IRI/Tomography/phantom_projections.h5"
            }
        }
    }
    return flow_input
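
Several sites in `site_configs` are still empty dictionaries, so calling `create_payload` for them would raise a `KeyError`. A small pre-check along these lines could report what is missing first. `REQUIRED_KEYS`, `missing_site_keys`, and the placeholder values in `example_configs` are hypothetical; the key names are the ones `create_payload` reads.

```python
# Keys that create_payload looks up for each kind of site
REQUIRED_KEYS = {
    "lightsource": ("transfer_endpoint", "source_path", "result_path"),
    "compute": ("compute_endpoint", "transfer_endpoint", "staging_path"),
}


def missing_site_keys(configs, kind, site):
    """Return the required keys that configs[kind][site] does not define."""
    entry = configs.get(kind, {}).get(site)
    if entry is None:
        return list(REQUIRED_KEYS[kind])
    return [key for key in REQUIRED_KEYS[kind] if key not in entry]


# Placeholder configuration with one complete and one empty light source
example_configs = {
    "lightsource": {
        "aps": {
            "transfer_endpoint": "<collection-uuid>",
            "source_path": "/in/file1.txt",
            "result_path": "/~/file1.txt",
        },
        "als": {},
    },
    "compute": {"alcf": {"compute_endpoint": "<endpoint-uuid>"}},
}

print(missing_site_keys(example_configs, "lightsource", "aps"))
print(missing_site_keys(example_configs, "lightsource", "als"))
print(missing_site_keys(example_configs, "compute", "alcf"))
```

An empty list means the site has everything `create_payload` needs.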

In [102]:
lightsource = "aps"
compute = "alcf"

run = specific_flow_client.run_flow(
  body=create_payload(lightsource, compute),
  label="Tomopy run",
  tags=['Tomopy', 'IRI', 'example', f"lightsource:{lightsource}", f"compute:{compute}"]
)

In [103]:
run_id = run['run_id']
run_status = run['status']
print("This flow can be monitored in the Web App:")
print(f"https://app.globus.org/runs/{run_id}")
print(f"Flow run started with ID: {run_id} - Status: {run_status}")

# Poll the Flow service to check on the status of the flow
while run_status == 'ACTIVE':
    time.sleep(5)
    run = flows_client.get_run(run_id)
    run_status = run['status']
    print(f'Run status: {run_status}')

This flow can be monitored in the Web App:
https://app.globus.org/runs/de3eb00a-b2f0-4706-97e6-fe81d582b131
Flow run started with ID: de3eb00a-b2f0-4706-97e6-fe81d582b131 - Status: ACTIVE
Run status: ACTIVE
Run status: ACTIVE
Run status: ACTIVE
Run status: ACTIVE
Run status: ACTIVE
Run status: ACTIVE
Run status: SUCCEEDED


## Attaching an input schema

We can use a JSON input schema to both generate the Web interface and provide additional handrails when starting the flow. Here we define the schema and update the flow to include it.

Example schemas can be found here: https://github.com/globus/globus-flows-trigger-examples

In [None]:
schema = {
    "required": [
        "input"
    ],
    "properties": {
        "input": {
            "type": "object",
            "required": [
                "source",
                "destination",
                "recursive_tx",
                "compute_endpoint_id",
                "compute_function_id",
                "compute_function_kwargs"
            ],
            "properties": {
                "source": {
                    "type": "object",
                    "title": "Select source collection and path",
                    "description": "The source collection and path (path MUST end with a slash)",
                    "format": "globus-collection",
                    "required": [
                        "id",
                        "path"
                    ],
                    "properties": {
                        "id": {
                            "type": "string",
                            "format": "uuid"
                        },
                        "path": {
                            "type": "string"
                        }
                    },
                    "additionalProperties": False
                },
                "destination": {
                    "type": "object",
                    "title": "Select destination collection and path",
                    "description": "The destination collection and path (path MUST end with a slash); default collection is 'Globus Tutorials on ALCF Eagle'",
                    "format": "globus-collection",
                    "required": [
                        "id",
                        "path"
                    ],
                    "properties": {
                        "id": {
                            "type": "string",
                            "format": "uuid"
                        },
                        "path": {
                            "type": "string"
                        }
                    },
                    "additionalProperties": False
                },
                "recursive_tx": {
                    "type": "boolean",
                    "title": "Recursive transfer",
                    "description": "Whether or not to transfer recursively, must be true when transferring a directory.",
                    "default": True,
                    "additionalProperties": False
                },
                "compute_endpoint_id": {
                    "type": "string",
                    "format": "uuid",                        
                    "title": "Globus Compute Endpoint ID",
                    "default": polaris_ep,
                    "description": "The UUID of the Globus Compute endpoint where Tomocupy will run",
                    "additionalProperties": False
                },
                "compute_function_id": {
                    "type": "string",
                    "format": "uuid",                        
                    "title": "Globus Compute Function ID",
                    "default": tomo_func,
                    "description": "The UUID of the function to invoke; must be registered with the Globus Compute service",
                    "additionalProperties": False
                },
                "compute_function_kwargs": {
                    "type": "object",
                    "title": "Function Inputs",
                    "description": "Inputs to pass to the function",
                    "required": [
                        "filename",
                        "command",
                        "reconstruction_type",
                        "rotation_axis",
                        "nsino_per_chunk"
                    ],
                    "properties": {
                        "filename": {
                            "type": "string",
                        },
                        "command": {
                            "type" : "string",
                            "description": "Reconstruction command: recon, recon_steps",
                            "default": "recon",
                            "enum" : [
                                "recon",
                                "recon_steps"
                            ]
                        },
                        "reconstruction_type": {
                            "type": "string",
                            "description": "Reconstruction type: full, try",
                            "default": "full",
                            "enum": [
                                "full", 
                                "try"
                            ]
                        },
                        "rotation_axis": {
                            "type": "string",
                            "default": "782.5"
                        },
                        "nsino_per_chunk": {
                            "type": "integer",
                            "default": 4
                        },
                    },
                    "additionalProperties": False
                }
            },
            "additionalProperties": False
        },    
    },
    "additionalProperties": False
}
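
Before attaching a schema it can help to check that the input you plan to submit satisfies its `required` lists. The sketch below is a stdlib-only approximation, not full JSON Schema validation: `missing_required` is a hypothetical helper that ignores types, formats, and `additionalProperties`, and `example_schema` and `example_body` are reduced placeholders.

```python
def missing_required(schema, value, path="$"):
    """Recursively list required keys in `schema` that are absent from `value`."""
    missing = []
    if not isinstance(value, dict):
        return missing
    for key in schema.get("required", []):
        if key not in value:
            missing.append(f"{path}.{key}")
    # Descend only into properties that are actually present in the value
    for key, subschema in schema.get("properties", {}).items():
        if key in value:
            missing.extend(missing_required(subschema, value[key], f"{path}.{key}"))
    return missing


example_schema = {
    "required": ["input"],
    "properties": {
        "input": {
            "type": "object",
            "required": ["source", "compute_endpoint_id"],
            "properties": {
                "source": {"type": "object", "required": ["id", "path"]},
            },
        }
    },
}
example_body = {"input": {"source": {"id": "<collection-uuid>"}}}

print(missing_required(example_schema, example_body))
```

Applied to the `schema` above and the `flow_input` defined earlier in this notebook, the same check would report `source`, `destination`, and `recursive_tx` as missing, because `flow_input` uses `transfer_in` and `transfer_out` instead.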

In [67]:
flows_client.update_flow(flow_id, definition=flow_definition, input_schema={})

GlobusHTTPResponse({"id": "3224aea6-2ab1-495b-8b76-89fd71d838c1", "definition": {"Comment": "Transfer and run Tomopy", "StartAt": "TransferInput", "States": {"TransferInput": {"Comment": "Transfer input file", "Type": "Action", "ActionUrl": "https://actions.automate.globus.org/transfer/transfer", "Parameters": {"source_endpoint_id.$": "$.input.transfer_in.source.id", "destination_endpoint_id.$": "$.input.transfer_in.destination.id", "transfer_items": [{"source_path.$": "$.input.transfer_in.source.path", "destination_path.$": "$.input.transfer_in.destination.path", "recursive": false}]}, "ResultPath": "$.TransferInput", "WaitTime": 300, "Next": "Tomopy"}, "Tomopy": {"Comment": "Perform Tomopy analysis", "Type": "Action", "ActionUrl": "https://compute.actions.globus.org/fxap", "Parameters": {"endpoint.$": "$.input.compute_endpoint_id", "function.$": "$.input.compute_function_id", "kwargs.$": "$.input.compute_function_kwargs"}, "ResultPath": "$.TomopyOutput", "WaitTime": 600, "Next": "Tra

## Try it!

In [None]:
print(f'https://app.globus.org/flows/{flow_id}')