# Flow with a Compute Function

This notebook demonstrates how to construct a Globus Flow that includes a Globus Compute function call. Our flow will **scan a folder** in a Globus collection and **ingest the list of files** into a new entry in a search index. We'll use Globus Compute to **prepare the folder listing** for ingest into the index.

The outline of the notebook is as follows.

1. Import required Python modules and specify the search index and compute endpoint to be used.
2. Authenticate with Globus and create a Globus SDK `FlowsClient` object.
3. **Define the function** and **register it** with Globus Compute (to get a UUID).
4. **Define and deploy** the Flow.
5. **Run the flow** with a Globus collection and path.

Each time the flow is run, a new entry will be created in the search index. We'll use a **simple web application** to display the contents of the index so we can see entries appear as tutorial participants run their flows!

## Important: Join the Tutorial Users group!

Before using this notebook, make sure you've joined the 
[Tutorial Users](https://app.globus.org/groups/50b6a29c-63ac-11e4-8062-22000ab68755/about) 
group! 

* Click the link above and then click "Join this Group." 

## 1. Import modules and specify Search Index and Compute Endpoint

NOTE: The `source_collection` is one of our Globus tutorial collections. Because it's a mapped collection, it requires a special scope during authentication. Feel free to change this to any other mapped collection that you have access to. The flow will also work with any guest collection (aka "shared collection") without changing this setting.

In [None]:
import sys
import os
import time
import json
import uuid
import pickle
import base64

import globus_sdk
import globus_sdk.scopes
import globus_compute_sdk

# ID of this tutorial notebook as registered with Globus Auth
CLIENT_ID = 'f794186b-f330-4595-b6c6-9c9d3e903e47'

# This search index will work for you, but feel free to replace it with your own search index
search_index_id = "2c635508-f599-4547-a0fc-536db83d1dee"   # Created and shared by tutorial instructor

# Feel free to replace the collection UUIDs below with those of your own collections
source_collection = "6c54cade-bde5-45c1-bdea-f4bd71dba2cc"  # "Globus Tutorial Collection 1"

# This compute endpoint will work for you, but feel free to replace it with your own endpoint
compute_endpoint_id = '4b116d3c-1703-4f8f-9f6f-39921e5864df' # "Globus Compute Tutorial Endpoint"

## 2. Authenticate and create FlowsClient and Globus Compute Client objects

If you're using this notebook with the Globus JupyterHub, the following code will get your login information from the JupyterHub and you won't have to log in again. Otherwise, you'll be prompted to log in to Globus.

* We'll use the `flows_client` object (created by `globus_sdk.FlowsClient()`) to deploy our flow.
* We'll use the `gcc` object (created by `globus_compute_sdk.Client()`) to register our Compute function.

In [None]:
# Create transfer scope with data_access scope dependency from the source mapped collection.
transfer_scope = globus_sdk.scopes.TransferScopes.make_mutable("all")
data_access_scope = globus_sdk.scopes.GCSCollectionScopeBuilder(source_collection).data_access
transfer_scope.add_dependency(data_access_scope)

# Get Globus Auth token data from the JupyterHub environment. If tokens already exist from logging into
# jupyter.demo.globus.org, tokens from the environment can be used instead. Otherwise, do a Native App flow.
globus_data_raw = os.getenv("GLOBUS_DATA")
if globus_data_raw:
    tokens = pickle.loads(base64.b64decode(globus_data_raw))['tokens']
else:
    # Do a native app authentication flow to get tokens that allow us to interact with the Globus Flows service
    scopes = [
        "openid",
        "profile",
        "email",
        transfer_scope,
        globus_sdk.FlowsClient.scopes.manage_flows,
        globus_sdk.FlowsClient.scopes.run_manage,
    ]
    native_auth_client = globus_sdk.NativeAppAuthClient(CLIENT_ID)
    native_auth_client.oauth2_start_flow(requested_scopes=scopes)
    print(f"Login Here:\n\n{native_auth_client.oauth2_get_authorize_url()}")
    
    # Authenticate and come back with your authorization code; paste it into the prompt below.
    auth_code = input('Authorization Code: ')
    response = native_auth_client.oauth2_exchange_code_for_tokens(auth_code)
    
    # Keep the tokens, keyed by resource server (for example 'flows.globus.org').
    tokens = response.by_resource_server
    
    # Print the tokens we just received. They are credentials, so don't share this output.
    print(json.dumps(tokens, indent=2))

# Uncomment the line below to introspect tokens
#print(json.dumps(tokens, indent=2))

# Create a variable for storing flow scope tokens. Each newly deployed flow scope needs to be authorized separately,
# and will have its own set of tokens. Save each of these tokens by scope.
saved_flow_scopes = {}

# Helper that builds an authorizer for a flow from the tokens stored in `saved_flow_scopes`
def get_flow_authorizer(flow_id):
    return globus_sdk.AccessTokenAuthorizer(access_token=saved_flow_scopes[flow_id]['access_token'])

# Set up the Flows client, using the tokens obtained above (from the JupyterHub login or the
# Native App flow) to access the Globus Flows service.
flows_authorizer = globus_sdk.AccessTokenAuthorizer(access_token=tokens['flows.globus.org']['access_token'])
flows_client = globus_sdk.FlowsClient(authorizer=flows_authorizer)

# Set up the Globus Compute client
gcc = globus_compute_sdk.Client()     # get a Globus Compute client

## 3.a Define the Compute function

We'll use the Transfer Action Provider's "ls" action to scan the contents of the folder. The output of the action looks like this:

```
    {
        "DATA": [
          {
            "DATA_TYPE": "file",
            "group": "tutorial",
            "last_modified": "2024-09-19 17:11:23+00:00",
            "link_group": null,
            "link_last_modified": null,
            "link_size": null,
            "link_target": null,
            "link_user": null,
            "name": "file1.txt",
            "permissions": "0644",
            "size": 4,
            "type": "file",
            "user": "tutorial"
          },
          {
            "DATA_TYPE": "file",
            "group": "tutorial",
            "last_modified": "2024-09-19 17:11:23+00:00",
            "link_group": null,
            "link_last_modified": null,
            "link_size": null,
            "link_target": null,
            "link_user": null,
            "name": "file2.txt",
            "permissions": "0644",
            "size": 4,
            "type": "file",
            "user": "tutorial"
          }
        ],
        "DATA_TYPE": "file_list",
        "absolute_path": "/home/share/godata/",
        "endpoint": "6c54cade-bde5-45c1-bdea-f4bd71dba2cc",
        "length": 2,
        "path": "/home/share/godata/",
        "rename_supported": true,
        "symlink_supported": false,
        "total": 2
      }
```

We want to translate this into a much simpler format so it can be added to a search index. We want the search entry to look like this (the function below also adds a `time_published` timestamp):

```
      {
        "path": "/home/share/godata/",
        "file_list": [
          "file1.txt",
          "file2.txt"
        ]
      }
```

In [None]:
def simplify_file_list(**folder_contents):
    import time    # imported inside the function because it runs on the Compute endpoint; used for the timestamp
    files = []
    for file in folder_contents["DATA"]:
        files.append(file["name"])
    entry = {
        "time_published": time.asctime(),
        "path": folder_contents["path"],
        "file_list": files
    }
    return entry
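
Before registering the function, it can help to call it locally on a hand-made listing. The snippet below is a minimal sketch: it repeats the function so the block runs on its own, and `sample_listing` is a made-up, trimmed-down stand-in for the `ls` output shown above.

```python
def simplify_file_list(**folder_contents):
    """Same logic as the notebook's function, repeated so this block is self-contained."""
    import time  # kept inside the function, as in the notebook
    files = [item["name"] for item in folder_contents["DATA"]]
    return {
        "time_published": time.asctime(),
        "path": folder_contents["path"],
        "file_list": files,
    }


# Hypothetical sample: only a few of the keys from a real listing are included.
sample_listing = {
    "DATA": [
        {"name": "file1.txt", "type": "file", "size": 4},
        {"name": "file2.txt", "type": "file", "size": 4},
    ],
    "path": "/home/share/godata/",
}

# The flow passes the listing as keyword arguments, so unpack the dict the same way.
entry = simplify_file_list(**sample_listing)
print(entry["path"], entry["file_list"])
```

Because the function takes `**folder_contents`, any extra keys in the listing are accepted and ignored.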

## 3.b Register the function with Globus Compute

Now we'll use our Globus Compute Client (`gcc`) to register the function with Globus Compute and get a UUID that we can use in our Flow definition.

NOTE: When you execute this, you'll probably be asked to log in to Globus and allow the notebook to use Globus Compute on your behalf.

In [None]:
function_uuid = gcc.register_function(simplify_file_list)
print(f"function uuid = {function_uuid}")

## 4.a Define the Flow

Now, we'll define our flow. It has three states:
1. ListFiles (Globus Transfer action provider)
2. CreateEntry (Globus Compute action provider with our function)
3. AddEntry (Globus Search action provider)

Pay attention to how the output of the `ListFiles` state gets passed into the `CreateEntry` state, and the output from `CreateEntry` gets passed to `AddEntry`. Every time the flow is run, each state adds its output to the run's state (`$`) and the following state takes what it needs for its input. When a run completes, you can inspect the inputs and outputs of each state in the Event Log tab to see the details!

In [None]:
# Define flow
flow_definition = {
    "Comment": "Add a new entry in a search index containing a summary of the contents of a folder",
    "StartAt": "ListFiles",
    "States": {
        "ListFiles": {
            "Comment": "List the contents of the folder specified by the flow inputs",
            "Type": "Action",
            "ActionUrl": "https://transfer.actions.globus.org/ls",
            "Parameters": {
                "endpoint_id.$": "$.input.source.id",
                "path.$": "$.input.source.path"
            },
            "ResultPath": "$.ls_result",
            "Next": "CreateEntry"
        },
        "CreateEntry": {
            "Comment": "Use Globus Compute to process the file listing and turn it into a search entry",
            "Type": "Action",
            "ActionUrl": "https://compute.actions.globus.org",
            "Parameters": {
                "endpoint": compute_endpoint_id,
                "function": function_uuid,
                "kwargs.$":  "$.ls_result.details"
            },
            "ResultPath": "$.CreateEntry",
            "Next": "AddEntry"
        },
        "AddEntry": {
            "Comment": "Add the new search entry to the search index",
            "Type": "Action",
            "ActionUrl": "https://actions.globus.org/search/ingest",
            "Parameters": {
               "search_index": search_index_id,
               "subject.$": "$.input.entry_subject",
               "visible_to": ["public"],
               "content.$": "$.CreateEntry.details.results[0].output"
            },
            "ResultPath": "$.AddEntry",
            "End": True
        }
    }
}
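
The way data moves between the three states can be traced by hand in plain Python. The sketch below is an illustration only: `resolve` is a hypothetical helper that handles just dotted keys and integer indexes, not the path engine used by the Flows service, and the contents of `run_state` are invented and heavily trimmed. Only the paths themselves (`$.ls_result.details`, `$.CreateEntry.details.results[0].output`) come from the flow definition.

```python
import re


def resolve(path, state):
    """Resolve a simple path such as '$.a.b[0].c' against nested dicts and lists."""
    value = state
    # Each match is either a dotted key name or a bracketed integer index.
    for key, index in re.findall(r"\.([A-Za-z_]\w*)|\[(\d+)\]", path):
        value = value[key] if key else value[int(index)]
    return value


# Made-up run state, starting with the flow input.
run_state = {
    "input": {
        "source": {"id": "collection-id", "path": "/home/share/godata/"},
        "entry_subject": "user@example.org/1234abcd",
    }
}

# ListFiles stores its result under "$.ls_result".
run_state["ls_result"] = {
    "details": {"DATA": [{"name": "file1.txt"}], "path": "/home/share/godata/"}
}

# CreateEntry reads "$.ls_result.details" as kwargs and stores its result under "$.CreateEntry".
kwargs = resolve("$.ls_result.details", run_state)
run_state["CreateEntry"] = {
    "details": {
        "results": [
            {"output": {"path": kwargs["path"],
                        "file_list": [f["name"] for f in kwargs["DATA"]]}}
        ]
    }
}

# AddEntry picks the function's return value out of the Compute result.
content = resolve("$.CreateEntry.details.results[0].output", run_state)
print(content)
```

Each state only adds a new key to the run state, so earlier values such as `$.input.entry_subject` remain available to later states.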

## 4.b Define the input schema

Every flow has an input schema, written in JSON Schema. The input schema is deployed together with the `flow_definition`.

When the flow is run, the input schema is used to:
* display an input entry form in the Globus web app, and
* validate that the flow's inputs are of the right types before proceeding.

In [None]:
# Define input schema
input_schema = {
    "required": [
        "input"
    ],
    "properties": {
        "input": {
            "type": "object",
            "required": [
                "source",
                "entry_subject"
            ],
            "properties": {
                "source": {
                    "type": "object",
                    "title": "Select source collection and path",
                    "description": "The source collection and path (path MUST end with a slash)",
                    "format": "globus-collection",
                    "required": [
                        "id",
                        "path"
                    ],
                    "properties": {
                        "id": {
                            "type": "string",
                            "format": "uuid"
                        },
                        "path": {
                            "type": "string"
                        }
                    },
                    "additionalProperties": False
                },
                "entry_subject": {
                    "type": "string",
                    "title": "Subject of the entry to be created",
                    "description": "Unique string for each entry in the search index"
                }
            },
            "additionalProperties": False
        }
    },
    "additionalProperties": False
}
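
To get a feel for the validation step, here is a rough, standard-library-only sketch. It checks just `required` and `additionalProperties`, a small subset of JSON Schema, and is not how the Flows service validates input. `check`, `mini_schema`, `good`, and `bad` are hypothetical names, and `mini_schema` is an abbreviated copy of the schema above with titles and formats dropped.

```python
def check(schema, value, where="$"):
    """Return a list of problems found in `value` (objects only)."""
    problems = []
    if not isinstance(value, dict):
        return problems  # types of leaf values are not checked in this sketch
    properties = schema.get("properties", {})
    for name in schema.get("required", []):
        if name not in value:
            problems.append(f"{where}: missing required property '{name}'")
    if schema.get("additionalProperties") is False:
        for name in value:
            if name not in properties:
                problems.append(f"{where}: unexpected property '{name}'")
    # Recurse into nested objects that are present in the input.
    for name, subschema in properties.items():
        if name in value:
            problems.extend(check(subschema, value[name], f"{where}.{name}"))
    return problems


mini_schema = {
    "required": ["input"],
    "properties": {
        "input": {
            "type": "object",
            "required": ["source", "entry_subject"],
            "properties": {
                "source": {
                    "type": "object",
                    "required": ["id", "path"],
                    "properties": {"id": {"type": "string"}, "path": {"type": "string"}},
                    "additionalProperties": False,
                },
                "entry_subject": {"type": "string"},
            },
            "additionalProperties": False,
        }
    },
    "additionalProperties": False,
}

good = {"input": {"source": {"id": "abc", "path": "/data/"}, "entry_subject": "me/1"}}
bad = {"input": {"source": {"id": "abc"}, "destination": {}}}

print(check(mini_schema, good))
for problem in check(mini_schema, bad):
    print(problem)
```

The `bad` input is missing `entry_subject` and `path` and carries an extra `destination` key, so the sketch reports three problems.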

## 4.c Deploy the flow

We'll use our `flows_client`, the `flow_definition`, and `input_schema` to deploy the flow. We'll create a random display name for the flow.

In [None]:
# Create the flow
# Set the flow's title so you can easily identify it
flow_title = f"Tutorial-List-Compute-Search-{str(uuid.uuid4())[:6]}"

# Deploy the flow!
flow = flows_client.create_flow(
    title=flow_title,
    definition=flow_definition,
    input_schema=input_schema
)
flow_id = flow['id']

# Add a dependency on the Transfer service, since we'll use it in the flow
flow_scope = globus_sdk.SpecificFlowClient(flow_id).scopes.make_mutable("user")
flow_scope.add_dependency(transfer_scope)

"""
# If you change the flow, you will need to update it.
# For example, to make this flow visible to another user
# (`identity_id` is the Globus Auth identity ID of that user):
flow = flows_client.update_flow(
    flow_id=flow_id,
    flow_viewers=[f"urn:globus:auth:identity:{identity_id}"],
)
"""

print(f"Successfully created flow: '{flow_title}'")
print(f"(ID: {flow_id})")
print(f"Flow scope: {flow_scope}\n\n")
print(f"View the flow in the Web App: https://app.globus.org/flows/{flow_id}")
print("Note: You can start your flow directly from the Web App")

## 5. Run the flow

It's time to run our flow! Remember, this flow requires three inputs: a *unique subject string* for the search entry we're going to create, and a *source collection* and *source path* to scan. We need to set those inputs up and then call the Flows API to run the flow.

## 5.a Fetch user identity and create a unique subject string

Every entry in a search index must have a unique subject string to identify it. We'll use our username and a random string to create it. We just logged in, so we can get the username from the access token for the Globus Auth API.

In [None]:
# Create an Auth client so we can look up identities
auth_authorizer = globus_sdk.AccessTokenAuthorizer(access_token=tokens['auth.globus.org']['access_token'])
ac = globus_sdk.AuthClient(authorizer=auth_authorizer)

# Get the user's primary identity
primary_identity = ac.oauth2_userinfo()
identity_id = primary_identity['sub']

# Create a unique subject for the new search entry
new_entry_subject = f"{primary_identity['preferred_username']}/{str(uuid.uuid4())[:8]}"

print(f"Entry's subject will be \"{ new_entry_subject }\".")

## 5.b Define flow input

If your flow includes parameterized input, you must provide values for those properties when running the flow. Like the flow definition, flow input is defined as a JSON document. You must provide a value for each input property in your flow. (Input properties are part of the flow's "state" and can be accessed in a flow definition by prefixing values with `$.` and providing the path to the property, as seen in the flow definition above).

For this flow, we must specify the ID of the source collection, the path of the folder to scan (used by the `ListFiles` state), and the subject string for the new search entry (used by the `AddEntry` state).

In [None]:
# We already defined the unique subject string above.
# Now, we need the folder to be scanned. We need a source_collection and source_path.

# Feel free to replace the collection UUIDs below with those of your own collections
source_collection = "6c54cade-bde5-45c1-bdea-f4bd71dba2cc"  # "Globus Tutorial Collection 1"
# The following path exists on Globus Tutorial Collection 1.
# If you change source_collection, replace it with a path that exists there (for example "/").
# The input schema asks for a trailing slash.
source_path = "/home/share/godata/"

flow_input = {
    "input": {
        # Transfer input
        "source": {
            "id": source_collection,
            "path": source_path
        },
        "entry_subject": new_entry_subject   # We just created this above, remember?
    }
}

## 5.c Authorize the flow

Once your flow has been created, you need to authorize it to interact with other services on your behalf before you can run it. The Globus Flows service generates a dedicated scope for each flow. To give consent to this flow, we need to get a properly scoped access token (see `flow_scope` above, where we deployed the flow), and then we can use this token to run the flow. Note that you will be required to consent again.

In [None]:
# If the flow scope is already saved, we don't need a new one.
if flow_id not in tokens:
    # Do a native app authentication flow and get tokens that include the newly deployed flow scope
    native_auth_client = globus_sdk.NativeAppAuthClient(CLIENT_ID)
    native_auth_client.oauth2_start_flow(requested_scopes=flow_scope)
    print(f"Login Here:\n\n{native_auth_client.oauth2_get_authorize_url()}")
    
    # Authenticate and come back with your authorization code; paste it into the prompt below.
    auth_code = input('Authorization Code: ')
    token_response = native_auth_client.oauth2_exchange_code_for_tokens(auth_code)
    
    # Save the new token in a place where the flows client can retrieve it.
    tokens[flow_id] = token_response.by_resource_server[flow_id]
    
    # Print the saved tokens, now including the one for this flow. Don't share this output.
    print(json.dumps(tokens, indent=2))

## 5.d Run the flow

We're finally ready to run the flow. You can monitor and manage your flow runs from the Globus Web App (https://app.globus.org/runs).

Note: Each run ingests an entry under the subject string created in step 5.a. To add a separate entry on a later run, re-run the cells in 5.a and 5.b first so the run uses a fresh subject.

In [None]:
# Get a client for the flow
specific_flow_authorizer = globus_sdk.AccessTokenAuthorizer(
    access_token=tokens[flow_id]['access_token'],
)
specific_flow_client = globus_sdk.SpecificFlowClient(
    flow_id=flow_id,
    authorizer=specific_flow_authorizer,
)

# Run the flow
# Set a descriptive label for this flow run
run_label = f"Scan/Compute/Search tutorial run for {primary_identity['preferred_username']}"
run = specific_flow_client.run_flow(
  body=flow_input,
  label=run_label,
  tags=['tutorial', 'scan-compute-search-flow']
)

# Get run details
run_id = run['run_id']
run_status = run['status']
print("This flow can be monitored in the Web App:")
print(f"https://app.globus.org/runs/{run_id}")
print(f"Flow run started with ID: {run_id} - Status: {run_status}")

# Poll the Flow service to check on the status of the flow
while run_status == 'ACTIVE':
    time.sleep(5)
    run = flows_client.get_run(run_id)
    run_status = run['status']
    print(f'Run status: {run_status}')
    
# Run completed
print(json.dumps(run.data, indent=2))
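
The polling loop above keeps going for as long as the run stays `ACTIVE`. A minimal sketch of the same loop with an upper bound on the wait is shown here. `wait_for_run`, its parameters, and the fake status source are hypothetical names introduced for illustration and are not part of the Globus SDK.

```python
import time


def wait_for_run(get_status, interval=5.0, timeout=300.0):
    """Poll `get_status()` until it stops returning 'ACTIVE' or `timeout` seconds pass.

    Returns the last status seen. `get_status` is any zero-argument callable.
    """
    deadline = time.monotonic() + timeout
    status = get_status()
    while status == "ACTIVE" and time.monotonic() < deadline:
        time.sleep(interval)
        status = get_status()
    return status


# Stand-in for the Flows service: reports ACTIVE twice, then SUCCEEDED.
fake_statuses = iter(["ACTIVE", "ACTIVE", "SUCCEEDED"])
final_status = wait_for_run(lambda: next(fake_statuses), interval=0.01, timeout=1.0)
print(final_status)
```

In the notebook, the callable could be `lambda: flows_client.get_run(run_id)['status']`. If the function returns `ACTIVE`, the timeout was reached and the run is still in progress.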

## 6. View entries in the search index

Our flow adds a new entry to the search index defined at the beginning of this notebook. We can search the index to see all of these entries! Here we generate a link that opens the [Argonne Community Data Co-op](https://acdc.alcf.anl.gov/) web application to view the search index and its entries.

In [None]:
print("Click here to view the tutorial search index:")
print(f"https://acdc.alcf.anl.gov/globus-tutorial/{search_index_id}/")