# 3-Step Flow for Astronomy Web Prototype

This notebook contains the steps to create and deploy a Globus Flow that performs the following tasks:
* Run computation on ALCF Polaris using Globus Compute
* Ingest results metadata into a Globus Search Index (to be linked to ACDC portal)
* Move latest computation results back to the prototype server's environment

In [None]:
# Import packages
import uuid
import json
import time

# Import Globus software development kit
import globus_sdk
from globus_automate_client import FlowsClient

# Import Globus scopes
from globus_sdk.scopes import SearchScopes
from globus_sdk.scopes import TransferScopes
from globus_sdk.scopes import FlowsScopes

## Globus UUIDs

In [None]:
# Globus identifiers referenced by the flow schema defaults and by the test input.
# The "..." values are placeholders (presumably redacted) and must be real UUIDs to run.

# Collection ID of my laptop (used as the transfer destination)
laptop_collection_id = "..."  # My work laptop

# Eagle astro_portal guest collection ID (used as the transfer source)
eagle_collection_id = "..."  # astro_portal

# Globus Compute endpoint ID (where the compute function is executed)
compute_endpoint_id = "..."  # EDTB

# Globus Compute function ID (the registered function that the flow invokes)
compute_function_id = "..."  # run_gce()

# Globus Search index ID (index that receives the ingested metadata)
search_index_id = "..."  # astro_portal_index

## Auth Tokens

In [None]:
# Set Native App client
CLIENT_ID = '...'  # DataServiceDev - TransferTest
native_auth_client = globus_sdk.NativeAppAuthClient(CLIENT_ID)

# Request every scope whose token is read later in this notebook:
#   * Flows    -> tokens["flows.globus.org"] (used to build the flows client)
#   * Search   -> tokens["search.api.globus.org"]
#   * Transfer -> tokens["transfer.api.globus.org"]
# Tokens are only issued for requested scopes, so leaving Transfer out makes the
# later tokens["transfer.api.globus.org"] lookup fail with a KeyError.
# NOTE(review): confirm the `manage_flow` attribute name against the installed
# globus_sdk version (current documentation lists `manage_flows`).
auth_kwargs = {
    "requested_scopes": [
        FlowsScopes.manage_flow,
        SearchScopes.all,
        TransferScopes.all,
    ]
}
native_auth_client.oauth2_start_flow(**auth_kwargs)

# Print the login URL; visiting it yields the authorization code used in the next cell
print(f"Login Here:\n\n{native_auth_client.oauth2_get_authorize_url()}")

In [None]:
# Paste the authorization code obtained from the Globus login page
auth_code = "..."

# Exchange code for access tokens
response_token = native_auth_client.oauth2_exchange_code_for_tokens(auth_code)

# Index the token data by resource server (e.g. "flows.globus.org").
# These tokens are enough to create the flows client. Running a deployed
# flow requires an additional, flow-specific token that is obtained later,
# once the flow's own scope has been authorized.
tokens = response_token.by_resource_server

In [None]:
# Display the token data issued for the Globus Search resource server
tokens["search.api.globus.org"]

In [None]:
# Display the token data issued for the Globus Transfer resource server
# (this key exists only if a Transfer scope was requested at login)
tokens["transfer.api.globus.org"]

## Flow Client

In [None]:
# Cache of flow-specific tokens, keyed by flow scope string.
# Each newly deployed flow has its own scope that must be authorized separately
# and yields its own set of tokens. Anything stored here has already been
# authorized, so the flow can be run repeatedly without logging in again
# (for as long as this dictionary is kept in memory).
saved_flow_scopes = {}

# Callback given to the flows client: resolves a flow scope to an authorizer
# using the tokens cached in `saved_flow_scopes`
def get_flow_authorizer(flow_url, flow_scope, client_id):
    """Build an access-token authorizer for ``flow_scope``.

    The token is read from ``saved_flow_scopes[flow_scope]['access_token']``.
    ``flow_url`` and ``client_id`` are part of the callback signature but are
    not used here. A ``KeyError`` is raised if the scope has not been saved.
    """
    scope_tokens = saved_flow_scopes[flow_scope]
    flow_access_token = scope_tokens['access_token']
    return globus_sdk.AccessTokenAuthorizer(access_token=flow_access_token)

# Set up the flows client:
#   * `flows_authorizer` wraps the Flows-service access token obtained at login
#     and is used to talk to the Globus Flows service itself;
#   * `get_flow_authorizer` is registered as the callback that supplies
#     authorizers for individual flows we authorize later.
flows_authorizer = globus_sdk.AccessTokenAuthorizer(access_token=tokens['flows.globus.org']['access_token'])
flows_client = FlowsClient.new_client(CLIENT_ID, get_flow_authorizer, flows_authorizer)

## Flow Definition

See the Globus Flows hosted action providers documentation: https://docs.globus.org/api/flows/hosted-action-providers/

In [None]:
# Define the flow: three states executed in sequence,
#   compute -> TransferFiles -> IngestMetadata
# The compute result is stored under $.ComputeResults and reused by the two
# later states: result[0][0] is appended to the transfer paths and
# result[0][1] supplies the fields of the search entry.
flow_definition = {
    "Comment": "Launch calculation and transfer results to my laptop",

    # Define where the flow starts (which state/action is called first)
    "StartAt": "compute",

    # States list all individual actions
    "States": {

        # Compute action (launch GCE codes)
        # https://globus-compute.readthedocs.io/en/latest/actionprovider.html
        "compute": {
            "Comment": "Launch GCE codes",
            "Type": "Action",
            "ActionUrl": "https://compute.actions.globus.org",
            #"ActionScope": "https://auth.globus.org/scopes/b3db7e59-a6f1-4947-95c2-59d6b7a70f8c/action_all",

            # Define the endpoint and function to invoke (one task), and the
            # payload passed to that function
            "Parameters": {
                "tasks": [{
                  "endpoint.$": "$.input.compute_endpoint_id",
                  "function.$": "$.input.compute_function_id",
                  "payload": {
                      "param_list.$": "$.input.compute_input_data.param_list",
                      "n_proc.$": "$.input.compute_input_data.n_proc"
                  }
              }]
            },

            # Path in the flow state where the compute output is stored
            # It is referenced by the following actions
            "ResultPath": "$.ComputeResults",

            # Maximum amount of time to wait for the Action to complete [in seconds]
            # Action will be aborted if it takes too long
            "WaitTime": 300,

            # This calls the next action
            "Next": "TransferFiles",
        },

        # Transfer action (from Eagle to my laptop)
        # https://docs.globus.org/api/flows/hosted-action-providers/ap-transfer-transfer/
        "TransferFiles": {
            "Comment": "Transfer from Eagle to my laptop",
            "Type": "Action",
            "ActionUrl": "https://actions.automate.globus.org/transfer/transfer",

            # Define source and destination paths for the transfer
            "Parameters": {
                "source_endpoint_id.$": "$.input.source.id",
                "destination_endpoint_id.$": "$.input.destination.id",
                "transfer_items": [
                    {
                        # Note: ...details.result is always an array, so index it with [0]
                        # Note: If adding two strings, use .= for key and omit $. for value
                        # Note: If using 1 variable, use .$ for key and use $. for value
                        "source_path.=": "input.source.path + ComputeResults.details.result[0][0]",
                        "destination_path.=": "input.destination.path + ComputeResults.details.result[0][0]",
                        "recursive.$": "$.input.recursive_tx"
                    }
                ]
            },

            # Path in the flow state where the transfer result is stored
            # (not referenced by any other state in this flow)
            "ResultPath": "$.dummy",

            # Maximum amount of time to wait for the Action to complete [in seconds]
            # Action will be aborted if it takes too long
            "WaitTime": 300,

            # This calls the next action
            "Next": "IngestMetadata",
        },

        # Ingest metadata in existing Globus Search index
        "IngestMetadata": {
            "Comment": "Ingest metadata",
            "Type": "Action",
            "ActionUrl": "https://actions.globus.org/search/ingest",

            # Define metadata ingest parameters
            # The index comes from the flow input; every other field is taken
            # from the second element of the compute result
            "Parameters": {
                "search_index.$": "$.input.search_ingest_document.search_index",
                "subject.$": "$.ComputeResults.details.result[0][1].subject",
                "visible_to.$": "$.ComputeResults.details.result[0][1].visible_to",
                "content.$": "$.ComputeResults.details.result[0][1].content",
                "id.$": "$.ComputeResults.details.result[0][1].id"
            },

            # Maximum amount of time to wait for the Action to complete [in seconds]
            # Action will be aborted if it takes too long
            "WaitTime": 300,

            # This is how you end a flow
            "End": True
        }
    }
}

## Flow Schema

In [None]:
# Define the input schema used to validate user input when the flow is run.
# Field defaults are taken from the identifiers defined earlier in this notebook.
input_schema = {
    "required": [
        "input"
    ],
    "properties": {
        "input": {
            "type": "object",

            # This lists all input fields that must be provided
            # They are the ones referenced as $.input.<name> in the flow definition
            "required": [
                "compute_endpoint_id",
                "compute_function_id",
                "compute_input_data",
                "source",
                "destination",
                "recursive_tx",
                "search_ingest_document"
            ],

            # Define each individual "required" field
            "properties": {

                # Compute endpoint UUID
                "compute_endpoint_id": {
                    "type": "string",
                    "title": "Compute endpoint UUID.",
                    "description": "Endpoint at which computation will performed.",
                    "default": compute_endpoint_id
                },

                # Compute function UUID
                "compute_function_id": {
                    "type": "string",
                    "title": "Compute function UUID.",
                    "description": "Function (task) to be executed on compute endpoint.",
                    "default": compute_function_id
                },

                # Compute function input data (forwarded as the task payload)
                "compute_input_data": {
                    "type": "object",
                    "title": "Input data required by compute function.",
                    "description": "Compute function input data.",
                    "required": [
                        "param_list",
                        "n_proc"
                    ],
                    "properties": {
                        "param_list": {
                            "type": "array",
                            "description": "List of input dictionaries (list of GCE input parameters).",
                        },
                        "n_proc": {
                            "type": "integer",
                            "description": "Number of core for running task in parallel.",
                            "default": 1
                        },
                    },
                    "additionalProperties": False
                },

                # Source endpoint where data will be transferred from
                # Note: Paths MUST end with a slash (the flow appends the
                # compute result directly to the path string)
                "source": {
                    "type": "object",
                    "title": "Select source collection and path.",
                    "description": "Source collection and path.",
                    "format": "globus-collection",

                    # Required "sub" fields
                    "required": [
                        "id",
                        "path"
                    ],

                    # Define each required "sub" field
                    "properties": {
                        "id": {
                            "type": "string",
                            "format": "uuid",
                            "default": eagle_collection_id
                        },
                        "path": {
                            "type": "string"
                        }
                    },
                    "additionalProperties": False
                },

                # Destination endpoint where data will be transferred to
                # Note: Paths MUST end with a slash (the flow appends the
                # compute result directly to the path string)
                "destination": {
                    "type": "object",
                    "title": "Select destination collection and path.",
                    "description": "The destination collection and path.",
                    "format": "globus-collection",

                    # Required "sub" fields
                    "required": [
                        "id",
                        "path"
                    ],

                    # Define each required "sub" field
                    "properties": {
                        "id": {
                            "type": "string",
                            "format": "uuid",
                            "default": laptop_collection_id
                        },
                        "path": {
                            "type": "string"
                        }
                    },
                    "additionalProperties": False
                },

                # Whether transfer is recursive
                "recursive_tx": {
                    "type": "boolean",
                    "title": "Recursive transfer.",
                    "description": "Whether or not to transfer recursively, true when transferring directories.",
                    "default": True,
                },

                # Target of the metadata ingest; only the index ID is supplied
                # here, the entry fields come from the compute result
                "search_ingest_document": {
                    "type": "object",
                    "title": "Search ingest document",
                    "description": "The subject and metadata to be ingested into Globus Search",
                    "required": [
                        "search_index"
                    ],
                    "properties": {
                        "search_index": {
                            "type": "string",
                            "title": "Globus Search Index ID",
                            "description": "The UUID of the Globus Search index that will hold this metadata",
                            "default": search_index_id
                        }
                    }
                }
            },
            "additionalProperties": False
        }
    },
    "additionalProperties": False
}

## Register Flow

In [None]:
# Register the flow definition and its input schema with the Flows service
flow = flows_client.deploy_flow(
    flow_definition,
    title="3-step-astro_portal",
    input_schema=input_schema,
)

# Keep the identifiers returned by the service; later cells use them to
# authorize, run, and report on the flow
flow_id = flow["id"]
flow_scope = flow["globus_auth_scope"]
flow_name = flow["title"]

## Authorize Flow

In [None]:
# Once deployed, the flow's own scope needs to be authorized before it can be run.
# If a token for this flow scope is already saved, no new login is needed.
if flow_scope not in saved_flow_scopes:
#if True:
    
    # Do a native app authentication flow, requesting only the newly deployed flow's scope
    native_auth_client = globus_sdk.NativeAppAuthClient(CLIENT_ID)
    native_auth_client.oauth2_start_flow(requested_scopes=flow_scope)
    print(f"Login Here:\n\n{native_auth_client.oauth2_get_authorize_url()}")
    
    # Authenticate and come back with your authorization code; paste it into the prompt below.
    auth_code = input('Authorization Code: ')
    token_response = native_auth_client.oauth2_exchange_code_for_tokens(auth_code)
    
    # Save the new token data where the `get_flow_authorizer` callback looks it up.
    saved_flow_scopes[flow_scope] = token_response.by_scopes[flow_scope]
    
    # Show the saved token data for the flow
    # NOTE(review): this output includes the access token; clear the cell
    # output before sharing or committing the notebook.
    print(json.dumps(saved_flow_scopes, indent=2))

## Print Flow Details

In [None]:
# Summary of the deployed flow. The access token is printed in clear text,
# so treat this cell's output as a secret.
print(f"CLIENT_ID: {CLIENT_ID}")
print(f"Flow name: {flow_name}")
print(f"Flow access token: {saved_flow_scopes[flow_scope]['access_token']}")
print(f"Flow ID: {flow_id}")
print(f"Flow auth scope: {flow_scope}")

## Test Flow

In [None]:
# Dummy set of GCE parameters
#param_list = []
#for i in range(3):
#    param_list.append({"nb_1a_per_m":(i+1), "nb_nsm_per_m":i, "sfe":3.51e-10,
#                       "table":"yield_tables/agb_and_massive_stars_C15_N13_0_0_HNe.txt"})

# Values scanned for the two rate parameters (10 values each)
nb_1a_list = [
    0.0001, 0.00021544, 0.00046416, 0.001, 0.00215443,
    0.00464159, 0.01, 0.02154435, 0.04641589, 0.1,
]
nb_nsm_list = [
    1.0e-06, 2.15443469e-06, 4.64158883e-06, 1.0e-05, 2.15443469e-05,
    4.64158883e-05, 1.0e-04, 2.15443469e-04, 4.64158883e-04, 1.0e-03,
]

# One parameter dictionary per (nb_1a, nb_nsm) pair: 10 x 10 = 100 entries,
# with nb_1a varying slowest. All other parameters are held fixed.
param_list = [
    {
        "nb_1a_per_m": rate_1a,
        "nb_nsm_per_m": rate_nsm,
        "ns_merger_on": True,
        "nsm_dtd_power": [1e8, 1e9, -1],
        "m_ej_nsm": 2e-2,
        "t_star": 1.0,
        "sfe": 3.78e-9,
        "table": "yield_tables/agb_and_massive_stars_nugrid_MESAonly_fryer12delay.txt",
    }
    for rate_1a in nb_1a_list
    for rate_nsm in nb_nsm_list
]

# Define the flow input for the test run.
# The structure mirrors `input_schema`: everything sits under the "input" key.
flow_input = {
    "input": {

        # Compute input: where to run, what to run, and the function payload
        "compute_endpoint_id": compute_endpoint_id,
        "compute_function_id": compute_function_id,
        "compute_input_data": {
            "param_list": param_list,
            "n_proc": 32
        },

        # Transfer input
        # Paths end with a slash because the flow appends the compute result to them
        "source": {
            "id": eagle_collection_id,
            "path": "./"
        },
        "destination": {
            "id": laptop_collection_id,
            "path": "/Users/benoitcote/Desktop/Globus/gce_data/"
        },

        # False if transfer is just one file
        # True if transfer is a whole directory
        "recursive_tx": False,

        # Globus Search index that receives the ingested metadata
        "search_ingest_document": {
            "search_index": search_index_id
        }
    }
}

In [None]:
# Start a run of the deployed flow with the test input
flow_action = flows_client.run_flow(
    flow_id=flow_id,
    flow_scope=flow_scope,
    flow_input=flow_input,
    label="test 3-step flow",
)

# Remember the run's ID and initial status; the polling cell uses both
flow_action_id = flow_action["action_id"]
flow_status = flow_action["status"]
print(f"Flow can be monitored in the webapp below: \nhttps://app.globus.org/runs/{flow_action_id}")
print(f"Flow action started with ID: {flow_action_id} - Status: {flow_status}")

In [None]:
# Ask the Flows service for the run status every 5 seconds, stopping as soon
# as the run is no longer reported as ACTIVE
while True:
    if flow_status != "ACTIVE":
        break
    time.sleep(5)
    flow_action = flows_client.flow_action_status(flow_id, flow_scope, flow_action_id)
    flow_status = flow_action["status"]
    print(f"Flow status: {flow_status}")

# Dump the last response received (the run has left ACTIVE, which does not
# by itself mean it succeeded)
print(json.dumps(flow_action.data, indent=2))