# Use Globus Compute/Flows with WPS

## Setup Globus Compute Client

In [1]:
import time
import json
import datetime
import globus_sdk
import os

from globus_sdk import TimerJob
from globus_compute_sdk import Executor, Client
from globus_sdk.experimental.globus_app import UserApp

from globus_sdk.utils import slash_join

In [2]:
CLIENT_ID = "c781864e-a9c9-482e-8db8-d58ac5962a86"
my_app = UserApp("crocus-user-app", client_id=CLIENT_ID)

flows_client = globus_sdk.FlowsClient(app=my_app)

In [3]:
compute_endpoint = "66848d37-a40d-4c46-ac49-1bcc4bad1e54"

In [4]:
gce = Executor(endpoint_id=compute_endpoint)

## Configure the Endpoint and Submit the Function

In [5]:
gce = Executor(endpoint_id=compute_endpoint)

In [6]:
function_id = "35e5c49c-379e-4d6c-8430-1508caacbf56"

In [12]:
os.getcwd()

'/Users/mgrover/git_repos/esgf-crocus-globus-flows/flows/esgf'

In [56]:
# Prepare payload for ESGF ingest-wxt
esgf_wps_data = {
    "node": "DKRZ",
    "start_date":"1990-01-01",
    "end_date":"2000-01-01",
    "lat_min":0,
    "lat_max":35,
    "lon_min":65,
    "lon_max":100,
    "average_frequency":"year",
    "experiment_id":["historical"],
    "variable_id":["tas"],
    "member_id":["r1i1p1f1"],
    "table_id":["Amon"],
    "institution_id":["MIROC"],
    "odir":"/Users/mgrover/git_repos/esgf-crocus-globus-flows/data"
}

# Start the task
future = gce.submit_to_registered_function(function_id, kwargs=esgf_wps_data)

In [57]:
future.result()

['/Users/mgrover/git_repos/esgf-crocus-globus-flows/data/tas_Amon_MIROC6_historical_r1i1p1f1_gn_19900101-19990101_avg-year.nc']

In [58]:
flow_definition = {
    "Comment": "A Web Processing Services (WPS) Demonstration for ESGF Hosted Data",
    "StartAt": "TransferInput",
    "States": {
        "TransferInput": {
            "Comment": "Transfer input data",
            "Type": "Action",
            "ActionUrl": "https://transfer.actions.globus.org/transfer",
            "Parameters": {
                "source_endpoint.$": "$.input.source.id",
                "destination_endpoint.$": "$.input.destination.id",
                "DATA": [
                    {
                        "source_path.$": "$.input.source.path",
                        "destination_path.$": "$.input.destination.path",
                        "recursive": True,
                    }
                ]
            },
            "ResultPath": "$.TransferFiles",
            "WaitTime": 300,
            "Next": "ProcessESGFwps"
        },
        "ProcessESGFwps": {
            "Comment": "Process Web Processing Services for ESGF",
            "Type": "Action",
            "ActionUrl": "https://compute.actions.globus.org/",
            "Parameters": {
                "endpoint.$": "$.input.compute_endpoint",
                "function.$": "$.input.esgf_function",
                "kwargs.$": "$.input.esgf_kwargs"
            },
            "ResultPath": "$.ESGF_output",
            "WaitTime": 600,
            "End": True
        },
    }
}

In [65]:
flow_input = {
    "input": {
        "source": {
            "id": "03e6a23b-fb93-11ef-985b-0207be7ee3a1",
            "path": esgf_wps_data["odir"]
        },
        "destination": {
            "id": "03e6a23b-fb93-11ef-985b-0207be7ee3a1",
            "path": "/Users/mgrover/git_repos/esgf-crocus-globus-flows/flows/esgf/output"
        },
        "compute_endpoint": compute_endpoint,
        "esgf_kwargs": esgf_wps_data,
        "esgf_function": function_id,
    }
}

In [67]:
flow_input

{'input': {'source': {'id': '03e6a23b-fb93-11ef-985b-0207be7ee3a1',
   'path': '/Users/mgrover/git_repos/esgf-crocus-globus-flows/data'},
  'destination': {'id': '03e6a23b-fb93-11ef-985b-0207be7ee3a1',
   'path': '/Users/mgrover/git_repos/esgf-crocus-globus-flows/flows/esgf/output'},
  'compute_endpoint': '66848d37-a40d-4c46-ac49-1bcc4bad1e54',
  'esgf_kwargs': {'node': 'DKRZ',
   'start_date': '1990-01-01',
   'end_date': '2000-01-01',
   'lat_min': 0,
   'lat_max': 35,
   'lon_min': 65,
   'lon_max': 100,
   'average_frequency': 'year',
   'experiment_id': ['historical'],
   'variable_id': ['tas'],
   'member_id': ['r1i1p1f1'],
   'table_id': ['Amon'],
   'institution_id': ['MIROC'],
   'odir': '/Users/mgrover/git_repos/esgf-crocus-globus-flows/data'},
  'esgf_function': '35e5c49c-379e-4d6c-8430-1508caacbf56'}}

In [60]:
flow = flows_client.create_flow(title="ESGF Flow Local", definition=flow_definition, input_schema={})

In [61]:
flow_id = flow['id']
flow_id

'09b97ed1-e935-4f72-9ed0-858057931e0b'

In [62]:
specific_flow_client = globus_sdk.SpecificFlowClient(
    flow_id=flow_id,
    app=my_app,
)

In [68]:
run = specific_flow_client.run_flow(
  body=flow_input,
  label="ESGF WPS Example",
  tags=['ESGF', 'WPS', 'local']
)

In [69]:
# Get run details
# run = flows_client.get_run(run_id)

run_id = run['run_id']
run_status = run['status']
print("This flow can be monitored in the Web App:")
print(f"https://app.globus.org/runs/{run_id}")
print(f"Flow run started with ID: {run_id} - Status: {run_status}")

# Poll the Flow ser/vice to check on the status of the flow
while run_status == 'ACTIVE':
    time.sleep(5)
    run = flows_client.get_run(run_id)
    run_status = run['status']
    print(f'Run status: {run_status}')
    
# Run completed
print(json.dumps(run.data, indent=2))

This flow can be monitored in the Web App:
https://app.globus.org/runs/a809b54c-3ee4-4e14-863f-5bda96850a56
Flow run started with ID: a809b54c-3ee4-4e14-863f-5bda96850a56 - Status: ACTIVE
Run status: ACTIVE
Run status: ACTIVE
Run status: ACTIVE
Run status: ACTIVE
Run status: ACTIVE
Run status: ACTIVE
Run status: ACTIVE
Run status: ACTIVE
Run status: SUCCEEDED
{
  "run_id": "a809b54c-3ee4-4e14-863f-5bda96850a56",
  "action_id": "a809b54c-3ee4-4e14-863f-5bda96850a56",
  "flow_id": "09b97ed1-e935-4f72-9ed0-858057931e0b",
  "flow_title": "ESGF Flow Local",
  "flow_last_updated": "2025-04-02T14:09:49.492555+00:00",
  "start_time": "2025-04-02T14:11:53.655981+00:00",
  "completion_time": "2025-04-02T14:12:40.276000+00:00",
  "status": "SUCCEEDED",
  "display_status": "SUCCEEDED",
  "details": {
    "code": "FlowSucceeded",
    "output": {
      "input": {
        "source": {
          "id": "03e6a23b-fb93-11ef-985b-0207be7ee3a1",
          "path": "/Users/mgrover/git_repos/esgf-crocus-globus