In [7]:
import datetime
import os
import sys
import time
# Let Python import modules from the current working directory ("." == os.curdir).
sys.path.append(os.curdir)
# Point JUPYTER_PATH at the current working directory as well.
os.environ["JUPYTER_PATH"] = os.curdir
# Client ID handed to the Globus Automate client helpers in the cells below.
CLIENT_ID = "e6c75d97-532a-4c88-b031-8584a319fa3e"

from globus.automate.client import (get_access_token_for_scope, create_action_client, 
                                    create_flows_client)

# Globus Automate: Flows and Actions

## Flow Definition

* Flows are composed of *Action* invocations
* Each Action invocation reads from and contributes back to the *Flow State*

In [2]:
# Both states invoke the same Transfer Action, so its URL and Globus Auth scope
# are spelled out once and shared by the two state entries below.
TRANSFER_ACTION_URL = "https://actions.automate.globus.org/Transfer"
TRANSFER_ACTION_SCOPE = (
    "https://auth.globus.org/scopes/"
    "helloworld.actions.automate.globus.org/globus_transfer_action_all"
)

flow_definition = {
    "Comment": "Two step transfer",
    "StartAt": "Transfer1",
    "States": {
        # Step 1: reads its input from the "Transfer1Input" entry of the flow input.
        "Transfer1": {
            "Comment": "Initial Transfer from Campus to DMZ",
            "Type": "Action",
            "ActionUrl": TRANSFER_ACTION_URL,
            "ActionScope": TRANSFER_ACTION_SCOPE,
            "InputPath": "$.Transfer1Input",
            "ResultPath": "$.Transfer1Result",
            "Next": "Transfer2",
        },
        # Step 2: reads its input from "Transfer2Input" and ends the flow.
        "Transfer2": {
            "Comment": "Transfer from DMZ to dataset repository",
            "Type": "Action",
            "ActionUrl": TRANSFER_ACTION_URL,
            "ActionScope": TRANSFER_ACTION_SCOPE,
            "InputPath": "$.Transfer2Input",
            "ResultPath": "$.Transfer2Result",
            "End": True,
        },
    },
}


* This flow composes two transfers into a single logical operation
  * Suitable, for example, for doing a two stage transfer between a local campus endpoint, a DMZ data transfer endpoint, and a dataset repository.
  * Each step in the Flow uses the same Action: Transfer which is referenced by URL
  * Globus Auth Scope information is required to authenticate operations to the Action.
  * Each Transfer state reads its input (source, destination and items to transfer) from the Flow State at `InputPath` and writes the Action's result back to the Flow State at `ResultPath`
    * Format of the input is Action dependent


In [4]:
# Build a Flows client for this notebook's client ID.
flows_client = create_flows_client(CLIENT_ID)
# Deploy the flow definition built earlier in the notebook.
flow = flows_client.deploy_flow(flow_definition)
# Keep the new flow's id and scope string: both are passed to run_flow() and
# flow_action_status() when the flow is invoked later on.
flow_id = flow['id']
flow_scope = flow['scope_string']
print(f'Newly created flow with id:\n{flow_id}\nand scope:\n{flow_scope}')

Newly created flow with id:
2376fac3-dd1a-4dc3-b55d-b376835433bd
and scope:
https://auth.globus.org/scopes/eec9b274-0c81-4334-bdc2-54e90e689b9a/flow_9503a86a_1ea6_4a78_bf75_73541c01ab76


* The newly created flow has an id, and a scope which will be used when invoking the flow as follows:

In [8]:
# Initial flow input: one entry per Transfer state, keyed by the name that the
# state's "InputPath" refers to ("$.Transfer1Input" / "$.Transfer2Input").
flow_input = {
    # First hop: campus source directory -> temporary DMZ directory.
    "Transfer1Input": {
        "source_endpoint_id": "go#ep1",
        "destination_endpoint_id": "go#ep2",
        "transfer_items": [
            {
                "source_path": "/~/campus_source/dataset1/",
                "destination_path": "/~/dmz_temp/dataset1",
                "recursive": True,
            },
        ],
    },
    # Second hop: temporary DMZ directory -> dataset repository directory.
    "Transfer2Input": {
        "source_endpoint_id": "go#ep2",
        "destination_endpoint_id": "go#ep1",
        "transfer_items": [
            {
                "source_path": "/~/dmz_temp/dataset1/",
                "destination_path": "/~/dataset_repository/dataset1",
                "recursive": True,
            },
        ],
    },
}

# Start one run of the deployed flow, authorised by the flow's own scope.
flow_action = flows_client.run_flow(flow_id, flow_scope, flow_input)
# The run's id is needed for every later status request.
flow_action_id = flow_action['id']
flow_status = flow_action['status']
print(f'Flow action started with id: {flow_action_id}')
# Poll every two seconds for as long as the reported status is 'RUNNING'; any
# other status ends the loop.
# NOTE(review): there is no timeout, so this waits for as long as the service
# keeps answering 'RUNNING'.
while flow_status == 'RUNNING':
    time.sleep(2)
    flow_action = flows_client.flow_action_status(flow_id, flow_scope, flow_action_id)
    flow_status = flow_action['status']
    print(f'Flow status: {flow_status}')

Flow action started with id: 89fba79c-ac4b-4c8a-bf02-1c4b50edb980
Flow status: RUNNING
Flow status: RUNNING
Flow status: RUNNING
Flow status: SUCCEEDED


## All Actions are invoked with the same pattern via REST

1. run
2. status
3. release
4. (cancel)

### Each step is governed by authentication.

* We can also run Actions directly to take advantage of their standard pattern

In [13]:
# Globus Auth scope needed to call the Identifiers Action directly.
IDENTIFIERS_SCOPE = "https://auth.globus.org/scopes/5fac2e64-c734-4e6b-90ea-ff12ddbf9653/identifiers_run_status_release"

# Request body sent to the Identifiers Action's run operation.
identifiers_request = {
    "namespace_id": "A63CZStxzei6",
    "location": ["https://app.globus.org/file-manager?origin_id=ddb59aef-6d04-11e5-ba46-22000b92c6ec"],
    "visible_to": ["public"],
    "metadata": {
        "creator": "Sally User",
        "title": "An Important Dataset",
    },
}

# Obtain an access token for the Action's scope and build a client around it.
identifiers_token = get_access_token_for_scope(IDENTIFIERS_SCOPE, CLIENT_ID)
# The client is created with an access token, so use https like the other
# Action URLs in this notebook rather than plain http.
identifiers_client = create_action_client(
    "https://actions.automate.globus.org/Identifiers", identifiers_token)

# Optional: ask the Action to describe itself before running it.
# identifiers_action_description = identifiers_client.introspect()
# print(identifiers_action_description)

# 1. run
identifiers_action_status = identifiers_client.run(identifiers_request)
identifiers_action_id = identifiers_action_status["action_id"]
print(f'ActionId: {identifiers_action_id} Status: {identifiers_action_status["status"]}')

# 2. status -- poll every two seconds until one of the two final states is reported.
while identifiers_action_status["status"] not in ("SUCCEEDED", "FAILED"):
    time.sleep(2)
    identifiers_action_status = identifiers_client.status(identifiers_action_id)
    print(f'ActionId: {identifiers_action_id} Status: {identifiers_action_status["status"]}')

# 3. release -- left disabled in this demo.
# identifiers_action_status = identifiers_client.release(identifiers_action_id)
# print(f"Final Complete Status: {identifiers_action_status.data}")

# The loop above also ends on FAILED; only read the landing page when the
# action actually succeeded instead of assuming the details are present.
if identifiers_action_status["status"] == "SUCCEEDED":
    print(f"Link to new Identifier: {identifiers_action_status.data['details']['landing_page']}")
else:
    print(f"Identifiers action {identifiers_action_id} ended with status: {identifiers_action_status['status']}")


ActionId: WdYx7XiXLHPN Status: SUCCEEDED
Link to new Identifier: https://identifiers.globus.org/globus:u6dqNR04o7e


## Action Implementation Helper in Automate SDK

In [None]:
from globus.automate.common import AbstractActionProvider, ActionInstance, AuthState


class HelloWorldActionProvider(AbstractActionProvider):
    """Example Action Provider that answers "Hello" to an input value.

    The request may carry an ``echo_string`` that is copied into the action's
    details, and a ``sleep_time`` (seconds) that keeps the action ``ACTIVE``
    until that much time has passed since ``action.start_time``.
    """

    # Declare the format for inputs to this Action for use in Action Introspection
    # and for validation of input by the service
    request_body_schema = {
        "type": "object",
        "properties": {
            "echo_string": {"type": "string"},
            "sleep_time": {"type": "integer"},
        },
        "additionalProperties": False,
    }

    def __init__(self, *args, **kwargs):
        """Configure URL, Globus Auth and introspection settings.

        Every setting read from ``kwargs`` falls back to a default when the
        keyword is absent. ``args`` and ``kwargs`` are forwarded unchanged to
        the base class initialiser.
        """
        # Set the Actions REST API URL location on the server
        self.url_prefix = kwargs.get("url_prefix", "HelloWorld")

        # Define properties needed to do Authentication of requests via Globus Auth
        self.globus_auth_client_id = kwargs.get(
            "globus_auth_client_id", "5fac2e64-c734-4e6b-90ea-ff12ddbf9653"
        )
        self.globus_auth_client_name = kwargs.get(
            "globus_auth_client_name", "hello_world_action_provider"
        )
        # No default: the secret is None unless the caller supplies one.
        self.globus_auth_client_secret = kwargs.get("globus_auth_client_secret")
        self.globus_auth_scope = kwargs.get(
            "globus_auth_scope",
            "https://auth.globus.org/scopes/helloworld.actions.automate.globus.org/all",
        )

        # Set properties for Action Introspection
        self.title = "Hello World"
        self.subtitle = "An Action responding Hello to an input value"
        self.visible_to = kwargs.get("visible_to", ["public"])
        self.administered_by = kwargs.get("administered_by", ["foo@bar.com"])
        self.admin_contact = kwargs.get("admin_contact", "support@globus.org")
        self.synchronous = False
        self.log_supported = False
        self.runnable_by = kwargs.get("runnable_by", ["public"])
        self.input_schema = HelloWorldActionProvider.request_body_schema
        super().__init__(*args, **kwargs)

    def _action_done(self, action: ActionInstance) -> bool:
        """Return True once a request is complete.

        A request without ``sleep_time`` is complete immediately. Otherwise it
        is complete when more than ``sleep_time`` seconds have elapsed since
        ``action.start_time``.
        """
        if "sleep_time" not in action.request_body:
            return True
        # assumes action.start_time is a naive datetime comparable with
        # datetime.now() -- TODO confirm against ActionInstance
        elapsed_seconds = (datetime.datetime.now() - action.start_time).total_seconds()
        return elapsed_seconds > int(action.request_body["sleep_time"])

    def _refresh_status(self, action: ActionInstance) -> ActionInstance:
        """Set the action's status (and completion time when done) and return it."""
        if self._action_done(action):
            action.status = "SUCCEEDED"
            # NOTE(review): stamped on every call that finds the action done, so
            # repeated status checks move the completion time forward.
            action.completion_time = datetime.datetime.now()
        else:
            action.status = "ACTIVE"
        return action

    def run_action(self, action: ActionInstance, auth_state: AuthState) -> ActionInstance:
        """Callback for starting a new Action.

        Assigns a fresh action id, fills in the details and sets the initial
        status. ``auth_state`` is not used by this provider.
        """
        action.action_id = self.generate_actionid()
        action.details = {"Hello": "World"}
        if "echo_string" in action.request_body:
            # Stored under lowercase "hello", which is a separate key from the
            # "Hello" entry above -- NOTE(review): confirm both are intended.
            action.details["hello"] = action.request_body["echo_string"]
        return self._refresh_status(action)

    def check_status(self, action: ActionInstance, auth_state: AuthState) -> ActionInstance:
        """Callback for a user status check; re-evaluates whether the action is done."""
        return self._refresh_status(action)

    def cancel_action(self, action: ActionInstance, auth_state: AuthState) -> ActionInstance:
        """Callback for cancel; marks the action FAILED with the current time."""
        action.status = 'FAILED'
        action.completion_time = datetime.datetime.now()
        return action

# Create the flask app, sqlalchemy engine and configure the action on the flask app
# NOTE(review): `Flask`, `sqlalchemy` and `db_uri` are not imported or defined in
# any cell shown here -- confirm they are provided before this cell is run.
flask = Flask(__name__)
# Engine created from the database URI held in `db_uri`.
db_engine = sqlalchemy.create_engine(db_uri)
# No keyword arguments are passed, so the provider uses all of its defaults.
hello_world_provider = HelloWorldActionProvider()
# Hand the app and the engine to the provider (presumably this registers the
# Action's REST routes on the app -- verify against AbstractActionProvider).
hello_world_provider.set_flask_routes(flask, db_engine)