# Analysis Flow

This notebook demonstrates how a Globus Automate flow can be used to perform analysis.
The general pattern combines Globus Transfer and funcX to move data and then run analysis code on it; the flow defined in this notebook exercises only the funcX step.

In [1]:
from globus_automate_client import (create_flows_client, graphviz_format, state_colors_for_log,
                                    create_action_client)

import time
import json
import sys
import os

from funcx.sdk.client import FuncXClient

CLIENT_ID = "e6c75d97-532a-4c88-b031-8584a319fa3e"

## Create an analysis function

Use funcX to register a simple analysis function.

In [2]:
# fxc = FuncXClient()
fxc = FuncXClient(use_offprocess_checker=False)

In [3]:
import funcx
funcx.__version__

'0.2.3'

In [4]:
def file_size(pathname):
    """Placeholder for a file-size function: always returns 1."""
    # A constant return value is enough to test the funcX round trip.
    return 1
#     import os
#     return os.path.getsize(data['pathname'])

func_uuid = fxc.register_function(file_size)
print(func_uuid)

81c33e1e-1740-4f5d-a320-d4bf96163808


In [5]:
payload = {'pathname': '/etc/hostname'}
tutorial_endpoint = '4b116d3c-1703-4f8f-9f6f-39921e5864df' # Public tutorial endpoint
res = fxc.run(payload, endpoint_id=tutorial_endpoint, function_id=func_uuid)
print(res)

47ae41eb-84ff-45ef-82ca-fc309dcb8478


In [6]:
fxc.get_result(res)

1
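
The registered function returns a constant, so the result `1` above only confirms that the round trip through funcX works. A minimal sketch of a version that actually measures the file is shown below. It assumes the function receives the path as a plain string, and the name `file_size_real` is hypothetical. The `os` import sits inside the function body so that it is available wherever the function ends up running.

```python
import os
import tempfile

def file_size_real(pathname):
    """Return the size of a file in bytes."""
    import os  # imported inside the body so it travels with the function
    return os.path.getsize(pathname)

# Local check against a temporary file (no funcX endpoint involved).
with tempfile.NamedTemporaryFile(delete=False) as tmp:
    tmp.write(b"hello")
    tmp_path = tmp.name

local_size = file_size_real(tmp_path)
print(local_size)

os.remove(tmp_path)  # clean up the temporary file
```

Under this assumption the path would be passed to `fxc.run` directly as a string rather than wrapped in a dictionary.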

## Create the flow

In [7]:
flow_definition = {
  "Comment": "An analysis flow",
  "StartAt": "funcx",
  "States": {
    "funcx": {
      "Comment": "Run a funcX function",
      "Type": "Action",
      "ActionUrl": "https://automate.funcx.org",
      "ActionScope": "https://auth.globus.org/scopes/b3db7e59-a6f1-4947-95c2-59d6b7a70f8c/action_all",
      "Parameters": {
          "tasks": [{
            "endpoint.$": "$.input.fx_ep",
            "function.$": "$.input.fx_id",
            "payload": {
                "pathname.$": "$.input.pathname"
            }
        }]
      },
      "ResultPath": "$.AnalyzeResult",
      "WaitTime": 600,
      "End": True
    }
  }
}
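
In the `Parameters` block, keys ending in `.$` are references into the flow's run-time input rather than literal values. The sketch below illustrates that substitution for simple dotted paths only. It is an illustration, not the Flows service's implementation, and the helper names `lookup` and `resolve_parameters` as well as the placeholder UUID strings are assumptions.

```python
def lookup(path, state):
    """Resolve a simple dotted reference such as '$.input.fx_ep' against a dict."""
    if not path.startswith("$."):
        raise ValueError(f"unsupported path: {path}")
    value = state
    for key in path[2:].split("."):
        value = value[key]
    return value

def resolve_parameters(params, state):
    """Recursively replace 'name.$' keys with the values they reference."""
    if isinstance(params, dict):
        resolved = {}
        for key, value in params.items():
            if key.endswith(".$"):
                resolved[key[:-2]] = lookup(value, state)
            else:
                resolved[key] = resolve_parameters(value, state)
        return resolved
    if isinstance(params, list):
        return [resolve_parameters(item, state) for item in params]
    return params

# Same shape as the Parameters block in the flow definition.
parameters = {
    "tasks": [{
        "endpoint.$": "$.input.fx_ep",
        "function.$": "$.input.fx_id",
        "payload": {"pathname.$": "$.input.pathname"},
    }]
}
# Placeholder values standing in for the real UUIDs.
state = {"input": {"fx_ep": "endpoint-uuid",
                   "fx_id": "function-uuid",
                   "pathname": "/etc/hostname"}}

resolved = resolve_parameters(parameters, state)
print(resolved)
```

Running a check like this before deploying can catch a misspelled input key early, since a missing key raises a `KeyError` locally instead of failing inside the flow.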


Register the flow

In [8]:
flows_client = create_flows_client(CLIENT_ID)
flow = flows_client.deploy_flow(flow_definition, title="Stills process workflow")
flow_id = flow['id']
print(flow)
flow_scope = flow['globus_auth_scope']
print(f'Newly created flow with id:\n{flow_id}\nand scope:\n{flow_scope}')

Please log into Globus here:
----------------------------
https://auth.globus.org/v2/oauth2/authorize?client_id=e6c75d97-532a-4c88-b031-8584a319fa3e&redirect_uri=https%3A%2F%2Fauth.globus.org%2Fv2%2Fweb%2Fauth-code&scope=https%3A%2F%2Fauth.globus.org%2Fscopes%2Feec9b274-0c81-4334-bdc2-54e90e689b9a%2Fmanage_flows&state=_default&response_type=code&code_challenge=aS-krQby4JACEUMthX-7XWS53-rhkBAxuHEVRqZJXO4&code_challenge_method=S256&access_type=offline&prefill_named_grant=Globus+Automate+Command+Line+Interface+on+ripchip
----------------------------
Enter the resulting Authorization Code here: GlobusHTTPResponse({'action_url': 'https://flows.globus.org/flows/0b6c40ff-d5f4-4d98-96b8-4f63597e1e1a', 'administered_by': [], 'api_version': '1.0', 'created_at': '2021-06-02T14:46:44.168811+00:00', 'created_by': 'urn:globus:auth:identity:95278182-10a1-11e6-9c7e-7b385f033313', 'definition': {'Comment': 'An analysis flow', 'StartAt': 'funcx', 'States': {'funcx': {'ActionScope': 'https://auth.gl

In [9]:
filename = 'test.txt'

flow_input = {
    "input": {
        "fx_id": func_uuid,
        "fx_ep": tutorial_endpoint,
        "pathname": '/etc/hostname'
    }
}

Run the flow and wait for it to complete

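Note: The flow defined above contains only the funcX step, so no transfer is performed. The analysis function is invoked with the pathname "/etc/hostname" given in the flow input, and the "filename" variable is not used.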

In [10]:
flow_action = flows_client.run_flow(flow_id, flow_scope, flow_input)
# print(flow_action)
flow_action_id = flow_action['action_id']
flow_status = flow_action['status']
print(f'Flow action started with id: {flow_action_id}')
while flow_status == 'ACTIVE':
    time.sleep(5)
    flow_action = flows_client.flow_action_status(flow_id, flow_scope, flow_action_id)
    flow_status = flow_action['status']
    print(f'Flow status: {flow_status}')

Please log into Globus here:
----------------------------
https://auth.globus.org/v2/oauth2/authorize?client_id=e6c75d97-532a-4c88-b031-8584a319fa3e&redirect_uri=https%3A%2F%2Fauth.globus.org%2Fv2%2Fweb%2Fauth-code&scope=https%3A%2F%2Fauth.globus.org%2Fscopes%2F0b6c40ff-d5f4-4d98-96b8-4f63597e1e1a%2Fflow_0b6c40ff_d5f4_4d98_96b8_4f63597e1e1a_user&state=_default&response_type=code&code_challenge=wUuayzyGf14d5kZRlvINJQuHps2B_VzayLwVV9CIvt4&code_challenge_method=S256&access_type=offline&prefill_named_grant=Globus+Automate+Command+Line+Interface+on+ripchip
----------------------------
Enter the resulting Authorization Code here: Flow action started with id: 3c88e0bb-7671-41e2-a0aa-02dc7b79ee99
Flow status: ACTIVE
Flow status: ACTIVE
Flow status: ACTIVE
Flow status: ACTIVE
Flow status: ACTIVE
Flow status: ACTIVE
Flow status: ACTIVE
Flow status: ACTIVE
Flow status: SUCCEEDED
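
The polling loop above runs for as long as the flow reports `ACTIVE`. A possible variant with an upper time limit is sketched below. The helper name `wait_for_completion` and its parameters are hypothetical, and the status source is a stand-in so that the example runs without a Globus login.

```python
import time

def wait_for_completion(get_status, interval=5.0, timeout=600.0):
    """Poll get_status() until it stops returning 'ACTIVE' or the timeout expires."""
    deadline = time.monotonic() + timeout
    status = get_status()
    while status == "ACTIVE":
        if time.monotonic() >= deadline:
            raise TimeoutError(f"flow still ACTIVE after {timeout} seconds")
        time.sleep(interval)
        status = get_status()
    return status

# Stand-in for the status call: two ACTIVE responses, then SUCCEEDED.
fake_statuses = iter(["ACTIVE", "ACTIVE", "SUCCEEDED"])
final_status = wait_for_completion(lambda: next(fake_statuses), interval=0.01)
print(final_status)
```

In this notebook, `get_status` could be `lambda: flows_client.flow_action_status(flow_id, flow_scope, flow_action_id)['status']`, which is the same call the loop above makes.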


Check the result of the analysis function

In [11]:
flow_action['details']

{'output': {'AnalyzeResult': {'action_id': '98cc74ca-4ecf-4f2a-a52f-8c5bb03bb040',
   'details': '{"80fdf41a-9d7f-4c6e-affa-56cb775d07e9": {"result": 1}}',
   'display_status': 'Function Results Received',
   'state_name': 'funcx',
   'status': 'SUCCEEDED'},
  'input': {'fx_ep': '4b116d3c-1703-4f8f-9f6f-39921e5864df',
   'fx_id': '81c33e1e-1740-4f5d-a320-d4bf96163808',
   'pathname': '/etc/hostname'}}}
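
In the output above, the `details` field of `AnalyzeResult` is a JSON string rather than a dictionary, so it needs to be decoded before the function result can be read. A minimal sketch follows, using the values copied from that output; the helper name `task_results` is hypothetical.

```python
import json

# Shape and values copied from the AnalyzeResult output above.
analyze_result = {
    "action_id": "98cc74ca-4ecf-4f2a-a52f-8c5bb03bb040",
    "details": '{"80fdf41a-9d7f-4c6e-affa-56cb775d07e9": {"result": 1}}',
    "display_status": "Function Results Received",
    "state_name": "funcx",
    "status": "SUCCEEDED",
}

def task_results(action):
    """Map each funcX task id to its result, decoding 'details' if it is a JSON string."""
    details = action["details"]
    if isinstance(details, str):
        details = json.loads(details)
    return {task_id: entry["result"] for task_id, entry in details.items()}

results = task_results(analyze_result)
print(results)
```

With the live response, the same helper would be called as `task_results(flow_action['details']['output']['AnalyzeResult'])`.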