# Tomopy Reconstruction Demo for ALS at ALCF

This notebook demonstrates how to perform tomography reconstructions using Tomopy for ALS data on Polaris at ALCF using [Globus Flows](https://www.globus.org/globus-flows-service).  In this example, Globus Flows will launch the application on Polaris and then transfer the results off the Eagle filesystem.

This notebook can be run from anywhere.  It only requires a local installation of the Globus software (described below) and access to a Globus Compute Endpoint, set up by the user on Polaris, that has access to tomopy (also described below).

This demo uses Globus Flows and Globus Compute.  Globus Flows is a reliable and secure platform for orchestrating and performing research data management and analysis tasks. A flow is often needed to manage data coming from instruments, e.g., image files can be moved from local storage attached to a microscope to a high-performance storage system where they may be accessed by all members of the research project.  Globus Compute is a remote executor for tasks expressed as python functions that are sent to remote machines following a fire-and-forget model.

In this notebook we will first describe necessary setup tasks for the local environment and on Polaris; second, we will describe how to create and test a Globus Compute function that can remotely launch a tomopy task on Polaris compute nodes; and third, we will describe how to incorporate this function with a Globus Flow that coordinates the execution of the tomopy task with a data transfer step.

More examples of creating and running Globus Flows can be found on Globus' [demo instance](https://jupyter.demo.globus.org/hub/).


## Tomopy on Polaris

Tomopy has been installed in a conda environment on Polaris at `/eagle/IRIBeta/als/env/tomopy`.  This path is accessible to members of the IRIBeta allocation.

## Local Setup

This notebook can be run from anywhere.  The only requirement is a local environment, such as a conda environment, that has python 3.11 installed along with the Globus packages `globus_compute_sdk` and `globus_cli`, plus `python-dotenv`, which the notebook uses to load settings from a `.env` file.  If you have a local installation of conda you can set up an environment that can run this notebook with these steps:

```bash
conda create -n globus_env python==3.11
conda activate globus_env
pip install globus_compute_sdk globus_cli python-dotenv
```

Note that the tomopy environment on Polaris uses python 3.11.  Globus Compute serializes the function in your local environment and deserializes it on the endpoint, so the python version on your local machine should match the remote version as closely as possible.
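
A quick way to confirm that the local interpreter lines up with the remote one is to compare major and minor versions.  The sketch below assumes the remote version is 3.11, as stated above; the helper name `python_matches` is illustrative and not part of any Globus package.

```python
import sys

# Assumption: the remote tomopy environment uses Python 3.11, as stated above.
REMOTE_PYTHON = (3, 11)


def python_matches(local=None, remote=REMOTE_PYTHON):
    """Return True when the major and minor versions agree."""
    local = tuple(sys.version_info[:2]) if local is None else tuple(local[:2])
    return local == tuple(remote)


if not python_matches():
    print(
        f"Warning: local Python {sys.version_info.major}.{sys.version_info.minor} "
        f"differs from remote {REMOTE_PYTHON[0]}.{REMOTE_PYTHON[1]}"
    )
```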

## Create a Globus Compute Endpoint on Polaris

The first step for a user to execute applications on Polaris through the Globus service is to create a Globus compute endpoint on Polaris.  This requires the user to do a one-time setup task to configure the endpoint.

In a shell separate from this notebook, log into Polaris.  Copy the file included with this notebook called `template_config.yaml` to the Polaris filesystem (the location does not matter).  Inside `template_config.yaml` you should see options setting your project name (`IRIBeta`), the queue you will use (`debug`), and commands that activate a pre-made conda environment on Polaris that can run tomopy.

In your shell on Polaris, execute the following commands:

```bash
module use /soft/modulefiles
module load conda
conda activate /eagle/IRIBeta/als/env/tomopy
globus-compute-endpoint configure --endpoint-config template_config.yaml als_endpoint
globus-compute-endpoint start als_endpoint
globus-compute-endpoint list
```
This will create an endpoint and display its status, which should be listed as `running`.  A unique Endpoint ID in the form of a UUID will also be displayed.  Copy the file `.env.example` to `.env`, then edit `.env` and paste the Endpoint ID as the value for `GLOBUS_FLOW_ENDPOINT_ID`.  `.env` is listed in `.gitignore`, so it won't be accidentally pushed to GitHub.
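
Because a mistyped or missing Endpoint ID only shows up later as a failed submission, it can help to validate the value as soon as it is read.  The following is a minimal sketch using only the standard library; it assumes the variable has already been loaded into the process environment (for example by `dotenv.load_dotenv()`), and the helper name `read_endpoint_id` is hypothetical.

```python
import os
import uuid


def read_endpoint_id(var_name="GLOBUS_FLOW_ENDPOINT_ID", environ=None):
    """Return the endpoint ID stored in var_name, checking that it parses as a UUID."""
    environ = os.environ if environ is None else environ
    value = environ.get(var_name, "").strip()
    if not value:
        raise ValueError(f"{var_name} is not set; add it to your .env file")
    try:
        # uuid.UUID raises ValueError for malformed strings.
        return str(uuid.UUID(value))
    except ValueError:
        raise ValueError(f"{var_name}={value!r} is not a valid UUID") from None


# Example with a placeholder UUID passed in explicitly instead of a real endpoint.
print(read_endpoint_id(environ={"GLOBUS_FLOW_ENDPOINT_ID": "12345678-1234-5678-1234-567812345678"}))
```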

In [1]:
# reload accompanying python code
%load_ext autoreload
%autoreload 2


Your endpoint is now active as a daemon process running on the Polaris login node.  It is communicating with the Globus service and waiting for work.  If you ever want to stop the process you can run:
```bash
globus-compute-endpoint stop als_endpoint
```
Your process may need to be periodically restarted, for example after Polaris comes back from a maintenance period.

If you ever need to make changes to your endpoint configuration, you can find the settings file in `~/.globus_compute/als_endpoint/config.yaml`.  Edit this file and then restart the endpoint with `globus-compute-endpoint restart als_endpoint` to make the changes active.

This endpoint will submit work to the `debug` queue since this demo is for learning purposes.  In production, ALS will be able to submit work to the [demand queue](https://docs.alcf.anl.gov/polaris/running-jobs/#queues) which will give immediate access to Polaris compute nodes.

## Create a Function

We first need to create a python function that wraps the application call.  We will call it `reconstruction_wrapper`.  It takes two inputs: `rundir`, the directory on the eagle file system where the input data are located, and `parametersfile`, the name of the input parameters file.

In [8]:
def reconstruction_wrapper(rundir, parametersfile="inputOneSliceOfEach.txt"):
    import os
    import time
    import subprocess

    start = time.time()

    # Move to directory where data are located
    os.chdir(rundir)

    # Run reconstruction.py
    command = f"python /eagle/IRIBeta/als/example/reconstruction.py {parametersfile}"
    res = subprocess.run(command.split(" "), stdout=subprocess.PIPE, stderr=subprocess.PIPE)

    end = time.time()
    
    return f"Reconstructed data specified in {parametersfile} in {end-start} seconds;\n {res}"
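
The wrapper above returns the raw `CompletedProcess`, so stdout arrives as one long byte string.  As an optional variation, the sketch below captures output as text and keeps only the return code and the last few lines.  It runs a stand-in command through the local interpreter rather than `reconstruction.py`, and the helper name `run_and_summarize` is illustrative.

```python
import subprocess
import sys


def run_and_summarize(args, tail_lines=5):
    """Run a command and return (returncode, last lines of stdout, stderr) as text."""
    res = subprocess.run(args, capture_output=True, text=True)
    stdout_tail = "\n".join(res.stdout.splitlines()[-tail_lines:])
    return res.returncode, stdout_tail, res.stderr.strip()


# Stand-in command; in the wrapper above this would be the reconstruction.py call.
code, out, err = run_and_summarize(
    [sys.executable, "-c", "print('Read user input:'); print('done')"]
)
print(code, repr(out), repr(err))
```

Checking `returncode` before reporting success would also let the wrapper raise an error when the reconstruction fails, instead of returning a message that looks like a success.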

## Authenticate Client and Test Function

We will now instantiate a Globus Compute client to test the function.  Globus will prompt the user for their credentials if running for the first time.  The user should have a Globus account through their ALCF account and should validate with their ALCF credentials.

In [2]:
import os

import dotenv
from globus_compute_sdk import Client, Executor

dotenv.load_dotenv()

gc = Client()
# polaris_endpoint_id = os.getenv("GLOBUS_FLOW_ENDPOINT_ID")
polaris_endpoint_id = "b33b3f5f-eb14-4bbd-89ed-9333a6ece6b6"
gce = Executor(endpoint_id=polaris_endpoint_id)

print(os.getenv("GLOBUS_CLIENT_ID"))

c27ef6c0-b404-441e-9b76-102bf35b2974


In [9]:
future = gce.submit(reconstruction_wrapper, "/eagle/IRIBeta/als/example")
print(future.result())

(worker SDK version: 2.12.0; worker OS: Linux-5.14.21-150500.55.49-default-x86_64-with-glibc2.31)


Reconstructed data specified in inputOneSliceOfEach.txt in 17.838907718658447 seconds;
 CompletedProcess(args=['python', '/eagle/IRIBeta/als/example/reconstruction.py', 'inputOneSliceOfEach.txt'], returncode=0, stdout=b"Read user input:\n{'filename': '20220923_154543_ethan_robin_SiCircuit.h5', 'sinoused': [-1, 1, 1]}\nRead user input:\n{'filename': '20220923_160531_ethan_robin_climbing-vine_x00y00.h5', 'sinoused': [-1, 1, 1]}\nRead user input:\n{'filename': '20220923_160531_ethan_robin_climbing-vine_x00y01.h5', 'sinoused': [-1, 1, 1]}\nRead user input:\n{'filename': '20220923_160531_ethan_robin_climbing-vine_x00y02.h5', 'sinoused': [-1, 1, 1]}\nRead user input:\n{'filename': '20220923_160531_ethan_robin_climbing-vine_x00y03.h5', 'sinoused': [-1, 1, 1]}\nRead user input:\n{'filename': '20220923_160531_ethan_robin_climbing-vine_x00y04.h5', 'sinoused': [-1, 1, 1]}\nRead user input:\n{'filename': '20220923_160531_ethan_robin_climbing-vine_x00y05.h5', 'sinoused': [-1, 1, 1]}\nRead user inpu

## Register Function

Now that the function has been tested and works, register the function with the Globus service.  This will allow the user to call the function from within a flow.

In [10]:
reconstruction_func = gc.register_function(reconstruction_wrapper)

print(reconstruction_func)

9916e300-fb19-4033-be3b-9c6ab12e0432


In [None]:
future = gce.submit_to_registered_function(args=["/eagle/IRIBeta/als/example"], function_id=reconstruction_func)
future.result()

'Reconstructed data specified in inputOneSliceOfEach.txt in 17.712284088134766 seconds;\n CompletedProcess(args=[\'python\', \'/eagle/IRIBeta/als/example/reconstruction.py\', \'inputOneSliceOfEach.txt\'], returncode=0, stdout=b"Read user input:\\n{\'filename\': \'20220923_154543_ethan_robin_SiCircuit.h5\', \'sinoused\': [-1, 1, 1]}\\nRead user input:\\n{\'filename\': \'20220923_160531_ethan_robin_climbing-vine_x00y00.h5\', \'sinoused\': [-1, 1, 1]}\\nRead user input:\\n{\'filename\': \'20220923_160531_ethan_robin_climbing-vine_x00y01.h5\', \'sinoused\': [-1, 1, 1]}\\nRead user input:\\n{\'filename\': \'20220923_160531_ethan_robin_climbing-vine_x00y02.h5\', \'sinoused\': [-1, 1, 1]}\\nRead user input:\\n{\'filename\': \'20220923_160531_ethan_robin_climbing-vine_x00y03.h5\', \'sinoused\': [-1, 1, 1]}\\nRead user input:\\n{\'filename\': \'20220923_160531_ethan_robin_climbing-vine_x00y04.h5\', \'sinoused\': [-1, 1, 1]}\\nRead user input:\\n{\'filename\': \'20220923_160531_ethan_robin_climb

## Incorporate Function into a Flow

Now we will incorporate the Tomopy function into an example flow to run Tomopy on Polaris in coordination with other tasks.

This example simply includes two steps:
1. Run Tomopy via Globus Compute
2. Transfer results from the eagle file system to the home file system.

This can easily be extended to include other steps to import data, perform postprocessing, or publish and catalog results.

This is the flow definition for this two-step flow.

In [None]:
flow_definition = {
    "Comment": "Run Reconstruction and transfer results",
    "StartAt": "Reconstruction",
    "States": {
        "Reconstruction": {
            "Comment": "Reconstruction with Tomopy",
            "Type": "Action",
            "ActionUrl": "https://compute.actions.globus.org/fxap",
            "Parameters": {
                "endpoint.$": "$.input.compute_endpoint_id",
                "function.$": "$.input.compute_function_id",
                "kwargs.$": "$.input.compute_function_kwargs"
            },
            "ResultPath": "$.ReconstructionOutput",
            "WaitTime": 3600,
            "Next": "Transfer_Out"
        },
        "Transfer_Out": {
            "Comment": "Transfer files",
            "Type": "Action",
            "ActionUrl": "https://actions.automate.globus.org/transfer/transfer",
            "Parameters": {
                "source_endpoint_id.$": "$.input.source.id",
                "destination_endpoint_id.$": "$.input.destination.id",
                "transfer_items": [
                    {
                        "source_path.$": "$.input.source.path",
                        "destination_path.$": "$.input.destination.path",
                        "recursive.$": "$.input.recursive_tx"
                    }
                ]
            },
            "ResultPath": "$.TransferFiles",
            "WaitTime": 300,
            "End": True
        },
    }
}
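
Before creating the flow, it can be useful to check locally that every state named in `StartAt` or `Next` exists and that the chain reaches a state marked `End`.  The sketch below is a hypothetical helper, not part of the Globus SDK, and it only handles linear flows like the two-step definition above.

```python
def walk_flow(definition):
    """Follow StartAt/Next links and return state names in execution order.

    Only handles linear flows (each state has either "Next" or "End").
    """
    states = definition["States"]
    order, name = [], definition["StartAt"]
    while True:
        if name not in states:
            raise KeyError(f"State {name!r} is referenced but not defined")
        if name in order:
            raise ValueError(f"State {name!r} is visited twice (cycle)")
        order.append(name)
        state = states[name]
        if state.get("End"):
            return order
        if "Next" not in state:
            raise ValueError(f"State {name!r} has neither 'Next' nor 'End'")
        name = state["Next"]


# Trimmed copy of the definition above (Parameters omitted for brevity).
example = {
    "StartAt": "Reconstruction",
    "States": {
        "Reconstruction": {"Type": "Action", "Next": "Transfer_Out"},
        "Transfer_Out": {"Type": "Action", "End": True},
    },
}
print(walk_flow(example))  # ['Reconstruction', 'Transfer_Out']
```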

Next, we need to provide a flows client id to run the flow.  For now we will use the demo client id, but a project should create a client for their work.

In [32]:
import os
import time
import globus_sdk

from utils import get_flows_client, get_specific_flow_client

# Tutorial client ID
# We recommend replacing this with your own client for any production use-cases
# Create your own at developers.globus.org
# CLIENT_ID = "61338d24-54d5-408f-a10d-66c06b59f6d2"

Now get an instance of the flows client.  You will be asked to validate credentials with the Globus service.  

In [34]:
fc = get_flows_client()

!!!!!!!!!


(worker SDK version: 2.12.0; worker OS: Linux-5.14.21-150500.55.49-default-x86_64-with-glibc2.31)


AuthAPIError: ('POST', 'https://auth.globus.org/v2/oauth2/token', 'Basic', 401, 'UNAUTHORIZED', 'Basic auth failed', 'ec293f4b-7926-41e3-84be-ea88730c21b4')

Next, create a flow.  You will again be asked to validate credentials with the Globus service.

In [9]:
flow = fc.create_flow(definition=flow_definition, title="Reconstruction flow", input_schema={})
flow_id = flow['id']
print(flow)
flow_scope = flow['globus_auth_scope']
print(f'Newly created flow with id:\n{flow_id}\nand scope:\n{flow_scope}')

{
  "id": "91ce59ba-cbf1-45e3-b4f3-c86aeaacda42",
  "title": "Reconstruction flow",
  "subtitle": "",
  "description": "",
  "definition": {
    "StartAt": "Reconstruction",
    "States": {
      "Reconstruction": {
        "Parameters": {
          "endpoint.$": "$.input.compute_endpoint_id",
          "function.$": "$.input.compute_function_id",
          "kwargs.$": "$.input.compute_function_kwargs"
        },
        "Type": "Action",
        "Comment": "Reconstruction with Tomopy",
        "Next": "Transfer_Out",
        "ActionUrl": "https://compute.actions.globus.org/fxap",
        "ResultPath": "$.ReconstructionOutput",
        "WaitTime": 3600
      },
      "Transfer_Out": {
        "Parameters": {
          "source_endpoint_id.$": "$.input.source.id",
          "destination_endpoint_id.$": "$.input.destination.id",
          "transfer_items": [
            {
              "source_path.$": "$.input.source.path",
              "destination_path.$": "$.input.destination.path",


## Run the Flow

Now we need to create a set of inputs for the flow, following the JSON structure shown below.  The key elements are:
1. The Globus compute endpoint id
2. The Globus compute function id
3. Inputs to the Globus compute function
4. The endpoint and path from which to transfer data
5. The endpoint and path to which to transfer data

The user should choose a destination endpoint and path that they have access to.  One option is the ALCF /home endpoint: to use it, set a path in your home space with the leading `/home` removed, so that it looks like `/csimpson/als_example`.  The cell below, as written, sends results to a NERSC transfer endpoint instead and keeps the ALCF home endpoint ID as a commented-out alternative.
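
Dropping the mount prefix by hand is easy to get wrong, so a small helper can do the conversion.  This is a sketch under the assumption, taken from the notes in this notebook, that the transfer endpoint paths are the filesystem paths with the leading `/home` or `/eagle` removed; the function name `to_collection_path` is hypothetical.

```python
from pathlib import PurePosixPath


def to_collection_path(filesystem_path, mount_prefix):
    """Strip a leading mount prefix such as /eagle or /home from a POSIX path."""
    path = PurePosixPath(filesystem_path)
    prefix = PurePosixPath(mount_prefix)
    try:
        relative = path.relative_to(prefix)
    except ValueError:
        raise ValueError(f"{filesystem_path!r} is not under {mount_prefix!r}") from None
    # Re-anchor the remainder at the root of the transfer endpoint.
    return str(PurePosixPath("/") / relative)


print(to_collection_path("/home/csimpson/als_example", "/home"))
print(to_collection_path("/eagle/IRIBeta/als/example", "/eagle"))
```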

In [6]:
# lcfhome_transfer_endpoint_id = "9032dd3a-e841-4687-a163-2720da731b5b"
nersc_transfer_endpoint_id = "6bdc7956-fc0f-4ad2-989c-7aa5ee643a79"
destination_path_on_alcfhome = "/global/cfs/cdirs/als/data_mover/iribeta/" # Note that paths for transfers on the home endpoint should remove the leading /home

The example problem is set up in a directory on the Eagle filesystem at ALCF, so that endpoint and path are used as the source of the data.  All the endpoints and functions are added to a `flow_input`.

In [12]:
eagle_transfer_endpoint_id = "05d2c76a-e867-4f67-aa57-76edeb0beda0"
source_path_on_eagle = "/IRIBeta/als/example" # Note that paths for transfers on the eagle endpoint should remove the leading /eagle

function_inputs = {"rundir": "/eagle/IRIBeta/als/example"}

flow_input = {
    "input": {
      "source": {
        "id": eagle_transfer_endpoint_id,
        "path": source_path_on_eagle
      },
      "destination": {
        "id": nersc_transfer_endpoint_id,
        "path": destination_path_on_alcfhome
      },
      "recursive_tx": True,
      "compute_endpoint_id": polaris_endpoint_id,
      "compute_function_id": reconstruction_func,
      "compute_function_kwargs": function_inputs
    }
}
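
To see how `flow_input` lines up with the flow definition: keys in `Parameters` that end in `.$` hold a reference such as `$.input.source.path`, which is looked up in the run input.  The toy resolver below only illustrates that mapping for simple dotted paths; it is not how the Globus Flows service is implemented, and the name `resolve_parameters` is hypothetical.

```python
def resolve_parameters(parameters, run_input):
    """Resolve keys ending in '.$' by looking up simple '$.a.b.c' paths in run_input."""
    resolved = {}
    for key, value in parameters.items():
        if key.endswith(".$"):
            if not (isinstance(value, str) and value.startswith("$.")):
                raise ValueError(f"Unsupported reference for {key!r}: {value!r}")
            node = run_input
            for part in value[2:].split("."):
                node = node[part]
            resolved[key[:-2]] = node  # drop the '.$' suffix from the key
        elif isinstance(value, dict):
            resolved[key] = resolve_parameters(value, run_input)
        elif isinstance(value, list):
            resolved[key] = [
                resolve_parameters(item, run_input) if isinstance(item, dict) else item
                for item in value
            ]
        else:
            resolved[key] = value
    return resolved


# Placeholder values stand in for the real endpoint ID and inputs.
parameters = {
    "endpoint.$": "$.input.compute_endpoint_id",
    "kwargs.$": "$.input.compute_function_kwargs",
}
run_input = {
    "input": {
        "compute_endpoint_id": "<endpoint-uuid>",
        "compute_function_kwargs": {"rundir": "/eagle/IRIBeta/als/example"},
    }
}
print(resolve_parameters(parameters, run_input))
```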

In [15]:
# collection_ids should contain all the transfer endpoint ids involved in the flow
collection_ids = [flow_input["input"]["source"]["id"], flow_input["input"]["destination"]["id"]]
from utils import get_specific_flow_client
run_client = get_specific_flow_client("91ce59ba-cbf1-45e3-b4f3-c86aeaacda42", collection_ids=collection_ids)

In [17]:
flow_action = run_client.run_flow(flow_input, label="ALS run", tags=["demo", "als", "tomopy"])
flow_run_id = flow_action['action_id']

print(f'Flow action started with id: {flow_run_id}')

print(f"Monitor your flow here: https://app.globus.org/runs/{flow_run_id}")
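import time

# Poll every 10 seconds until the run leaves the ACTIVE/INACTIVE states.
# `fc` is the flows client returned by get_flows_client() in an earlier cell;
# if that cell did not succeed in this session, this loop raises a NameError.
flow_status = flow_action['status']
while flow_status in ['ACTIVE', 'INACTIVE']:
    time.sleep(10)
    flow_action = fc.get_run(flow_run_id)
    flow_status = flow_action['status']
    print(f'Flow status: {flow_status}')
print(f'Final status: {flow_status}')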

Flow action started with id: da9e769b-c686-4823-8646-eeaa7f7db6b8
Monitor your flow here: https://app.globus.org/runs/da9e769b-c686-4823-8646-eeaa7f7db6b8


NameError: name 'fc' is not defined
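
The polling loop above runs until the status changes, with no upper bound.  One possible refinement is to wrap it in a function with a timeout that takes the status lookup as a callable, so it can be exercised without a live client.  This is a sketch; `wait_for_run` is a hypothetical helper and the fake status sequence is made up for the example.

```python
import time


def wait_for_run(get_status, poll_seconds=10, timeout_seconds=3600, sleep=time.sleep):
    """Poll get_status() until it leaves ACTIVE/INACTIVE or the timeout elapses."""
    waited = 0
    status = get_status()
    while status in ("ACTIVE", "INACTIVE"):
        if waited >= timeout_seconds:
            raise TimeoutError(f"Run still {status} after {waited} seconds")
        sleep(poll_seconds)
        waited += poll_seconds
        status = get_status()
    return status


# Stand-in status source; with a working flows client this might be
# lambda: fc.get_run(flow_run_id)["status"]
fake_statuses = iter(["ACTIVE", "ACTIVE", "SUCCEEDED"])
print(wait_for_run(lambda: next(fake_statuses), sleep=lambda s: None))
```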

## Next Steps

Additional steps to the flow can be included if needed (for example an initial transfer step before tomopy is run).  The flow can also be adapted to be executed through the Globus web UI by adding a schema.  More information on flows can be found [here](https://docs.globus.org/guides/tutorials/flow-automation/create-a-flow/).

The Globus compute endpoint configuration can be adapted to the user's needs.  Multiple instances of tomopy can be run in parallel on each node by adapting the `config.yaml` file.

The content in this notebook can also be adapted to run as a python script or a bash script.  More information can be found in the documentation for the [Globus Python SDK](https://globus-sdk-python.readthedocs.io/en/stable/services/flows.html) or the documentation for the [Globus CLI](https://docs.globus.org/cli/reference/#globus_flows_commands).