# Fusion Analysis at ALCF with Globus Flows

This notebook shows how [Globus Flows](https://www.globus.org/globus-flows-service) can be used to perform analysis using [ionorb](https://github.com/GAmfe/ionorbgpu).

Globus Flows is a reliable and secure platform for orchestrating and performing research data management and analysis tasks. A flow is often needed to manage data coming from instruments. For example, image files can be moved from local storage attached to a microscope to a high-performance storage system, where they can be accessed by all members of the research project.

In this notebook we show how Flows can be used to launch ionorb tasks at ALCF. We then walk through the process of creating a simple flow and describe potential extensions to make it applicable in real use cases.

More examples of creating and running flows can be found on our [demo instance](https://jupyter.demo.globus.org/hub/).

To run this notebook you will need to install:

`pip install -U globus_automate_client globus_compute_sdk`

### A note on Auth
A flow manages authentication by seamlessly passing tokens between different services. When the flow is started, it first acquires tokens for each service used within the flow so that it can perform actions on the user's behalf. Provided the user has access to the Globus Transfer and Compute endpoints, the flow will be able to move data and perform analysis as the user.

While this example notebook employs user tokens to run, production deployments can be automated using an ALCF Service account and Globus Client credential. A service account provides access to ALCF for a specific resource (e.g., a beamline). Following this model, a Globus Compute endpoint can be configured on behalf of the service account and the flow can be granted access. This allows flows to be launched without a human in the loop, making it ideal for automated analysis and publication pipelines.

## Create Compute endpoint

To run this notebook, you will first have to create a Globus Compute endpoint on Polaris. Refer to the Globus Compute docs or the README.md file for details. The endpoint will have a UUID that you can look up with `globus-compute-endpoint list`.

## Testing the Compute endpoint

It is important to check whether the compute endpoint is active. The endpoint will need restarting following ALCF maintenance periods.

In [1]:
compute_endpoint = "317e156c-7712-4600-8cc2-3f5025b1c69a"

In [2]:
from globus_compute_sdk import Executor, Client
gce = Executor(endpoint_id=compute_endpoint)
gc = Client()

In [4]:
ep_status = gc.get_endpoint_status(compute_endpoint)
print(ep_status['status'])

online


Alternatively, you can send it a simple task:

In [5]:
def test():
    return "hello"

fut = gce.submit(test)

print(fut)

<ComputeFuture at 0x107bd28b0 state=pending>


In [6]:
fut.result()

'hello'

## Creating a simple flow

Here we explain how the flow is defined. Running these steps will register a flow of your own that you can then run.

These steps use the `globus_automate_client` package installed at the top of this notebook.

In [7]:
from globus_automate_client import create_flows_client

# Create a flows client to register and run the flow
fc = create_flows_client()

Specify the flow definition. This JSON definition is based on the Amazon States Language used by AWS Step Functions. States of the flow are chained together by specifying the `Next` field to construct a pipeline of operations. This simple flow consists of two steps:

1. TransferFiles
2. Fusion (runs the ionorb application)

The first step, TransferFiles, uses the Globus Transfer action provider. The step is given a 300 s `WaitTime`, and all of its input (source, destination, recursive) must be passed in when the flow is run. Static values can be used here instead to simplify user input.

The second step, Fusion, uses the Globus Compute action provider. Input is dynamically passed in as `kwargs`, which are then forwarded to the function being executed. The step is given a 3600 s `WaitTime` (to accommodate long queue times) and concludes the flow.

In [21]:
flow_definition = {
    "Comment": "Transfer and run Fusion ionorb",
    "StartAt": "TransferFiles",
    "States": {
        "TransferFiles": {
            "Comment": "Transfer files",
            "Type": "Action",
            "ActionUrl": "https://actions.automate.globus.org/transfer/transfer",
            "Parameters": {
                "source_endpoint_id.$": "$.input.source.id",
                "destination_endpoint_id.$": "$.input.destination.id",
                "transfer_items": [
                    {
                        "source_path.$": "$.input.source.path",
                        "destination_path.$": "$.input.destination.path",
                        "recursive.$": "$.input.recursive_tx"
                    }
                ]
            },
            "ResultPath": "$.TransferFiles",
            "WaitTime": 300,
            "Next": "Fusion"
        },
        "Fusion": {
            "Comment": "Fusion",
            "Type": "Action",
            "ActionUrl": "https://compute.actions.globus.org/",
            "Parameters": {
                "endpoint.$": "$.input.compute_endpoint_id",
                "function.$": "$.input.compute_function_id",
                "kwargs.$": "$.input.compute_function_kwargs"
            },
            "ResultPath": "$.FusionOutput",
            "WaitTime": 3600,
            "End": True
        }
    }
}
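The chaining through `Next` described above can be checked with a short helper. The sketch below is illustrative only: `state_order` is a hypothetical helper name, the definition is a trimmed-down copy of the two-state layout, and it assumes a strictly linear flow (no branching states).

```python
def state_order(definition):
    """Return state names in execution order by following `Next` links."""
    order = []
    states = definition["States"]
    name = definition["StartAt"]
    while name is not None:
        if name in order:
            raise ValueError(f"cycle detected at state {name!r}")
        order.append(name)
        state = states[name]
        # A state either ends the flow ("End": True) or names its successor.
        name = None if state.get("End") else state.get("Next")
    return order


# Trimmed-down copy of the two-state layout used in this notebook.
example_definition = {
    "StartAt": "TransferFiles",
    "States": {
        "TransferFiles": {"Type": "Action", "Next": "Fusion"},
        "Fusion": {"Type": "Action", "End": True},
    },
}

print(state_order(example_definition))
```

Running the same helper on the full `flow_definition` is a quick way to confirm that every `Next` points at a state that exists before registering the flow.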

Register the flow. We leave the input schema blank and can later update it to support the web interface.

In [22]:
flow = fc.deploy_flow(flow_definition, title="Fusion tutorial example flow", input_schema={})
flow_id = flow['id']
print(flow)
flow_scope = flow['globus_auth_scope']
print(f'Newly created flow with id:\n{flow_id}\nand scope:\n{flow_scope}')

{
  "id": "b969db26-c94b-47dc-adea-4daa6a2e7924",
  "title": "Fusion tutorial example flow",
  "subtitle": "",
  "description": "",
  "definition": {
    "StartAt": "TransferFiles",
    "States": {
      "TransferFiles": {
        "Parameters": {
          "source_endpoint_id.$": "$.input.source.id",
          "destination_endpoint_id.$": "$.input.destination.id",
          "transfer_items": [
            {
              "source_path.$": "$.input.source.path",
              "destination_path.$": "$.input.destination.path",
              "recursive.$": "$.input.recursive_tx"
            }
          ]
        },
        "Type": "Action",
        "Comment": "Transfer files",
        "Next": "Fusion",
        "ActionUrl": "https://actions.automate.globus.org/transfer/transfer",
        "ResultPath": "$.TransferFiles",
        "WaitTime": 300
      },
      "Fusion": {
        "Parameters": {
          "endpoint.$": "$.input.compute_endpoint_id",
          "function.$": "$.input.compute_fun

## Defining an ionorb function

To run ionorb via the flow, we need to register a function with Globus Compute and pass the function's UUID into the flow as input. Here we define and register this function.

Define a function to call ionorb. 

This is a simple wrapper function that uses `subprocess` to run the ionorb executable from a templated command in the given run directory.

* NOTE: The function returns the return code, stdout, stderr, and runtime of the command, and raises an exception if the return code is non-zero.

Further postprocessing can be applied within this function. Any JSON returned here can be used within the flow in subsequent steps.

In [12]:
def fusion_wrapper(run_directory, config_path="ionorb_stl2d_boris.config"):
    # Imports live inside the function because it is serialized and run on the remote endpoint
    import subprocess, os, time

    start = time.time()
    os.chdir(run_directory)
    command = f"/eagle/IRIBeta/bin/ionorb_stl_boris2d {config_path}"
    res = subprocess.run(command.split(" "), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    end = time.time()
    runtime = end - start

    if res.returncode != 0:
        # Include stdout/stderr in the error so that failures can be diagnosed
        raise Exception(f"Application failed with non-zero return code: {res.returncode} stdout='{res.stdout.decode('utf-8')}' stderr='{res.stderr.decode('utf-8')}' runtime={runtime}")
    else:
        return res.returncode, res.stdout.decode("utf-8"), res.stderr.decode("utf-8"), runtime
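The wrapper above returns a positional tuple. Because any JSON returned by the function can be used in later flow states, a dictionary with named keys may be easier to reference. The following is a minimal sketch under that assumption: `run_and_summarize` is a hypothetical name, and a harmless Python one-liner stands in for the ionorb command so that the example runs anywhere.

```python
import sys


def run_and_summarize(argv, cwd=None):
    """Run a command and return a JSON-serializable summary dict."""
    # Imports are kept inside the function, as in the wrapper above,
    # so the function stays self-contained when shipped to an endpoint.
    import subprocess
    import time

    start = time.time()
    res = subprocess.run(
        argv,
        cwd=cwd,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    return {
        "returncode": res.returncode,
        "stdout": res.stdout.decode("utf-8"),
        "stderr": res.stderr.decode("utf-8"),
        "runtime": time.time() - start,
    }


# Stand-in command (an assumption for illustration, not the ionorb binary).
summary = run_and_summarize([sys.executable, "-c", "print(42)"])
print(summary["returncode"], summary["stdout"].strip())
```

Passing the command as a list also avoids splitting on spaces, which matters if a path or argument ever contains one.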

Confirm the function works as expected

In [13]:
fut = gce.submit(fusion_wrapper, run_directory="/eagle/IRIBeta/test_runs/test")

In [14]:
fut.result()

(0,
 ' Version = 3f5b9155894e68aa397e10e636c7c4c58ba649a7(*)\n Built on Fri 01 Sep 2023 09:52:53 AM CDT\n Run date 09/12/2023 11:04:49.774\n Configuration parameters used:\n   dT       =   1.0000000000000001E-009\n   report   =         2000\n   n_reports=          110\n   B_fname  =B_164476.B.txt\n   birth_fn =birth_164476.birth.txt\n   stl_fn   =\n DIII-D_Model_26OCT2016_withHPH_And2xLocatingTriangles_BINARY_meters_noPositiveSpace.STL\n   nels      =       150000\n   m         =    2.000000000000000       (   3.3452438000000000E-027 )\n   q         =    1.000000000000000       (   1.6021766000000000E-019 )\n   B res.         =            65 x           65\n   B_ripple       =   F\n   Interp.   =  bilinear\n   stl_triangles =        479888\n   dPhi          =     0.000000    \n   PhiSignChange =   F\n   Saving only hits\n \n Calibrating...\n   Fraction of particles on GPU  67%\n \n Starting...\n[                       3] time=  0.0000E+00 step=           1 nels=      150000\n[         

Register the function explicitly with Compute to get a UUID that can be passed into the flow. 

In [15]:
fusion_func_uuid = gc.register_function(fusion_wrapper)

In [16]:
fusion_func_uuid

'8d4fd67d-887f-475e-bb09-e13921c5d72e'

## Running the flow

We can now specify input and start the flow.

First we construct an input payload to pass to the flow. This defines the transfer inputs (source, destination, recursive_tx), the compute endpoint and function id, and the compute payload kwargs.

Note that the transfer is performed between the two public Globus tutorial transfer endpoints.

In [23]:
flow_input = {
    "input": {
      "source": {
        "id": "ddb59aef-6d04-11e5-ba46-22000b92c6ec",
        "path": "/~/"
      },
      "destination": {
        "id": "ddb59af0-6d04-11e5-ba46-22000b92c6ec",
        "path": "/~/"
      },
      "recursive_tx": True,
      "compute_endpoint_id": compute_endpoint,
      "compute_function_id": fusion_func_uuid,
      "compute_function_kwargs": {
        "run_directory": "/eagle/IRIBeta/test_runs/test",
      }
    }
}
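To see how this payload lines up with the `Parameters` blocks in the flow definition, the sketch below resolves keys ending in `.$` against an input document. It is a conceptual illustration only: the Flows service performs this substitution itself, the resolver handles just simple dotted paths, and `resolve_path`, `resolve_parameters`, and the placeholder values are hypothetical.

```python
def resolve_path(document, path):
    """Look up a simple dotted reference such as "$.input.source.id"."""
    if not path.startswith("$."):
        raise ValueError(f"unsupported path: {path!r}")
    value = document
    for key in path[2:].split("."):
        value = value[key]
    return value


def resolve_parameters(parameters, document):
    """Replace every "name.$" entry with the value its path points to."""
    resolved = {}
    for key, value in parameters.items():
        if key.endswith(".$"):
            resolved[key[:-2]] = resolve_path(document, value)
        elif isinstance(value, dict):
            resolved[key] = resolve_parameters(value, document)
        elif isinstance(value, list):
            resolved[key] = [
                resolve_parameters(v, document) if isinstance(v, dict) else v
                for v in value
            ]
        else:
            resolved[key] = value
    return resolved


# Placeholder values; a real run would use the UUIDs from this notebook.
example_input = {
    "input": {
        "compute_endpoint_id": "endpoint-uuid",
        "compute_function_id": "function-uuid",
        "compute_function_kwargs": {"run_directory": "/path/to/run"},
    }
}
fusion_parameters = {
    "endpoint.$": "$.input.compute_endpoint_id",
    "function.$": "$.input.compute_function_id",
    "kwargs.$": "$.input.compute_function_kwargs",
}

print(resolve_parameters(fusion_parameters, example_input))
```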

Submit the flow for execution. You will be asked to grant the flow permission to perform the transfer and compute steps on your behalf. After that, you can monitor the flow either here or in the Globus web app (a link is printed below).

In [24]:
import time

flow_action = fc.run_flow(flow_id, flow_scope, flow_input, 
                          label="Tutorial Fusion Run", 
                          tags=['ionorb', 'Fusion', 'Demo'])

flow_action_id = flow_action['action_id']
flow_status = flow_action['status']

print(f'Follow your flow here: https://app.globus.org/runs/{flow_action_id}')

while flow_status == 'ACTIVE':
    time.sleep(10)
    flow_action = fc.flow_action_status(flow_id, flow_scope, flow_action_id)
    flow_status = flow_action['status']
    print(f'Flow status: {flow_status}')
    
print(f'Check the event log here: https://app.globus.org/runs/{flow_action_id}/logs')

Please log into Globus here:
----------------------------
https://auth.globus.org/v2/oauth2/authorize?client_id=e6c75d97-532a-4c88-b031-8584a319fa3e&redirect_uri=https%3A%2F%2Fauth.globus.org%2Fv2%2Fweb%2Fauth-code&scope=https%3A%2F%2Fauth.globus.org%2Fscopes%2Fb969db26-c94b-47dc-adea-4daa6a2e7924%2Fflow_b969db26_c94b_47dc_adea_4daa6a2e7924_user&state=_default&response_type=code&code_challenge=KuSL6upnRZ4CrTvy2-PdHCrB5LnxX6ZwETRrfjPmMJA&code_challenge_method=S256&access_type=offline&prefill_named_grant=Globus+Automate+Command+Line+Interface+on+oberon.local
----------------------------
Enter the resulting Authorization Code here:

Follow your flow here: https://app.globus.org/runs/64b9f08c-2c77-4e57-b5b4-ace7e6ab32a6
Flow status: ACTIVE
Flow status: ACTIVE
Flow status: ACTIVE
Flow status: ACTIVE
Flow status: SUCCEEDED
Check the event log here: https://app.globus.org/runs/64b9f08c-2c77-4e57-b5b4-ace7e6ab32a6/logs
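The polling loop above runs until the status leaves `ACTIVE`, with no upper bound on how long it waits. A minimal sketch of a bounded variant is shown below. `wait_for_completion` is a hypothetical helper, and canned statuses stand in for a real Flows client call so that the example is self-contained.

```python
import time


def wait_for_completion(get_status, poll_interval=10.0, timeout=3600.0):
    """Poll `get_status()` until it stops returning 'ACTIVE' or `timeout` elapses."""
    deadline = time.monotonic() + timeout
    status = get_status()
    while status == "ACTIVE":
        if time.monotonic() >= deadline:
            raise TimeoutError(f"flow still ACTIVE after {timeout} seconds")
        time.sleep(poll_interval)
        status = get_status()
    return status


# Canned statuses in place of a real status query (illustration only).
canned = iter(["ACTIVE", "ACTIVE", "SUCCEEDED"])
final_status = wait_for_completion(lambda: next(canned), poll_interval=0.01)
print(final_status)
```

In this notebook, the callable could wrap the status call already used in the loop, for example `lambda: fc.flow_action_status(flow_id, flow_scope, flow_action_id)['status']`.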


You can also check the output of the flow directly here.

In [25]:
flow_action['details']['output']['FusionOutput']['details']['result']

[[0,
  ' Version = 3f5b9155894e68aa397e10e636c7c4c58ba649a7(*)\n Built on Fri 01 Sep 2023 09:52:53 AM CDT\n Run date 09/12/2023 12:37:05.776\n Configuration parameters used:\n   dT       =   1.0000000000000001E-009\n   report   =         2000\n   n_reports=          110\n   B_fname  =B_164476.B.txt\n   birth_fn =birth_164476.birth.txt\n   stl_fn   =\n DIII-D_Model_26OCT2016_withHPH_And2xLocatingTriangles_BINARY_meters_noPositiveSpace.STL\n   nels      =       150000\n   m         =    2.000000000000000       (   3.3452438000000000E-027 )\n   q         =    1.000000000000000       (   1.6021766000000000E-019 )\n   B res.         =            65 x           65\n   B_ripple       =   F\n   Interp.   =  bilinear\n   stl_triangles =        479888\n   dPhi          =     0.000000    \n   PhiSignChange =   F\n   Saving only hits\n \n Calibrating...\n   Fraction of particles on GPU  67%\n \n Starting...\n[                       1] time=  0.0000E+00 step=           1 nels=      150000\n[       

## Attaching an input schema

We can use a JSON input schema to both generate the Web interface and provide additional handrails when starting the flow. Here we define the schema and update the flow to include it.

Example schemas can be found here: https://github.com/globus/globus-flows-trigger-examples

In [26]:
schema = {
    "required": [
        "input"
    ],
    "properties": {
        "input": {
            "type": "object",
            "required": [
                "source",
                "destination",
                "recursive_tx",
                "compute_endpoint_id",
                "compute_function_id",
                "compute_function_kwargs"
            ],
            "properties": {
                "source": {
                    "type": "object",
                    "title": "Select source collection and path",
                    "description": "The source collection and path (path MUST end with a slash)",
                    "format": "globus-collection",
                    "required": [
                        "id",
                        "path"
                    ],
                    "properties": {
                        "id": {
                            "type": "string",
                            "format": "uuid"
                        },
                        "path": {
                            "type": "string"
                        }
                    },
                    "additionalProperties": False
                },
                "destination": {
                    "type": "object",
                    "title": "Select destination collection and path",
                    "description": "The destination collection and path (path MUST end with a slash); default collection is 'Globus Tutorials on ALCF Eagle'",
                    "format": "globus-collection",
                    "required": [
                        "id",
                        "path"
                    ],
                    "properties": {
                        "id": {
                            "type": "string",
                            "format": "uuid"
                        },
                        "path": {
                            "type": "string"
                        }
                    },
                    "additionalProperties": False
                },
                "recursive_tx": {
                    "type": "boolean",
                    "title": "Recursive transfer",
                    "description": "Whether or not to transfer recursively, must be true when transferring a directory.",
                    "default": True,
                    "additionalProperties": False
                },
                "compute_endpoint_id": {
                    "type": "string",
                    "format": "uuid",                        
                    "title": "Globus Compute Endpoint ID",
                    "default": compute_endpoint,
                    "description": "The UUID of the Globus Compute endpoint where ionorb will run",
                    "additionalProperties": False
                },
                "compute_function_id": {
                    "type": "string",
                    "format": "uuid",                        
                    "title": "Globus Compute Function ID",
                    "default": fusion_func_uuid,
                    "description": "The UUID of the function to invoke; must be registered with the Globus Compute service",
                    "additionalProperties": False
                },
                "compute_function_kwargs": {
                    "type": "object",
                    "title": "Function Inputs",
                    "description": "Inputs to pass to the function",
                    "required": [
                        "run_directory",
                        "config_path",
                    ],
                    "properties": {
                        "run_directory": {
                            "type": "string",
                        },
                        "config_path": {
                            "type" : "string",
                            "description": "Path to input config",
                            "default": "ionorb_stl2d_boris.config"
                        },
                    },
                    "additionalProperties": False
                }
            },
            "additionalProperties": False
        },    
    },
    "additionalProperties": False
}
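Before submitting a run, it can help to check a payload against the `required` lists in a schema like this one. The sketch below is a simplified stand-in for a full JSON Schema validator: it checks only `required` keys, and `missing_required` and the cut-down schema are hypothetical. Note that the schema above lists `config_path` as required, whereas the payload used earlier supplied only `run_directory`, so depending on how defaults are applied that key may need to be added.

```python
def missing_required(schema, instance, path="$"):
    """Return paths of keys listed under "required" that are absent from `instance`."""
    missing = []
    for key in schema.get("required", []):
        if key not in instance:
            missing.append(f"{path}.{key}")
    # Recurse into nested objects that have their own sub-schema.
    for key, subschema in schema.get("properties", {}).items():
        if key in instance and isinstance(instance[key], dict):
            missing.extend(missing_required(subschema, instance[key], f"{path}.{key}"))
    return missing


# Cut-down schema mirroring the kwargs section above (illustration only).
example_schema = {
    "required": ["input"],
    "properties": {
        "input": {
            "type": "object",
            "required": ["compute_function_kwargs"],
            "properties": {
                "compute_function_kwargs": {
                    "type": "object",
                    "required": ["run_directory", "config_path"],
                }
            },
        }
    },
}
example_input = {"input": {"compute_function_kwargs": {"run_directory": "/path/to/run"}}}

print(missing_required(example_schema, example_input))
```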

In [27]:
fc.update_flow(flow_id, flow_definition=flow_definition, input_schema=schema)

GlobusHTTPResponse({"id":"b969db26-c94b-47dc-adea-4daa6a2e7924","definition":{"Comment":"Transfer and run Fusion ionorb","StartAt":"TransferFiles","States":{"TransferFiles":{"Comment":"Transfer files","Type":"Action","ActionUrl":"https://actions.automate.globus.org/transfer/transfer","Parameters":{"source_endpoint_id.$":"$.input.source.id","destination_endpoint_id.$":"$.input.destination.id","transfer_items":[{"source_path.$":"$.input.source.path","destination_path.$":"$.input.destination.path","recursive.$":"$.input.recursive_tx"}]},"ResultPath":"$.TransferFiles","WaitTime":300,"Next":"Fusion"},"Fusion":{"Comment":"Fusion","Type":"Action","ActionUrl":"https://compute.actions.globus.org/","Parameters":{"endpoint.$":"$.input.compute_endpoint_id","function.$":"$.input.compute_function_id","kwargs.$":"$.input.compute_function_kwargs"},"ResultPath":"$.FusionOutput","WaitTime":3600,"End":true}}},"input_schema":{"required":["input"],"properties":{"input":{"type":"object","required":["source"

## Try it!

In [28]:
print(f'https://app.globus.org/flows/{flow_id}/start')

https://app.globus.org/flows/b969db26-c94b-47dc-adea-4daa6a2e7924/start

