# Automate XPCS

In [1]:
from globus_automate_client import (create_flows_client, graphviz_format, state_colors_for_log,
                                    get_access_token_for_scope, create_action_client, 
                                    create_flows_client)
from IPython.display import display, display_svg, clear_output
from ipywidgets import widgets
import time
import json
import sys
import os

from funcx.sdk.client import FuncXClient
CLIENT_ID = "e6c75d97-532a-4c88-b031-8584a319fa3e"
clutch_globus_ep = 'b0e921df-6d04-11e5-ba46-22000b92c6ec'
theta_globus_ep = '08925f04-569f-11e7-bef8-22000b9a448b'
petreldata_xpcs_globus_ep = 'e55b4eab-6d04-11e5-ba46-22000b92c6ec'

## funcX

This is an example of using funcX to perform work on Theta. This defines a simple function that will sleep for some period of time (determined by the input given to it), registers it with funcX, then calls it to run on an endpoint I have active on Theta.



In [2]:
fxc = FuncXClient()
login_funcx_ep = 'eb932339-a5b9-4124-9779-9aa4eae6dca0'
theta_funcx_ep = '1199c811-2075-447c-bbcc-2cd15663b342'

In [3]:
def check_dependencies_on_funcx_endpoint(event):
    import h5py
    import matplotlib
    import pprint
    # Install on the theta login node with: 
    # pip install -e git+https://github.com/globus-labs/globus-automation#egg=globus-automation
    from XPCS.process.xpcs_metadata import gather
    from XPCS.process.xpcs_plots import make_plots
    from XPCS.process.xpcs_qc import check_hdf_dataset
    # pip install -e git+https://github.com/globusonline/pilot1-tools@dev#egg=pilot1-tools
    from pilot.client import PilotClient
    pc = PilotClient()
    assert pc.is_logged_in(), 'Not logged into client! Please run `pilot login --no-local-server` on theta login node.'
    context = pc.context.get_context()
    projects = pc.project.load_all()
    
    assert context['app_name'] == 'xpcs app', 'Wrong context set! Please run `pilot context set xpcs`'
    pc.project.current = 'nick-testing'
    return {'context': context, 'projects': projects}
    

check_dep_func = fxc.register_function(check_dependencies_on_funcx_endpoint)

In [4]:
check_dep_task = fxc.run([], endpoint_id=login_funcx_ep, function_id=check_dep_func)
print(f'Running Funcx Task on Login Node {check_dep_task}\n', end='')
while True:
    try:
        print(json.dumps(fxc.get_result(check_dep_task), indent=4))
        break
    except Exception as e:
        if str(e) == 'Task pending':
            print('.', end='')
            time.sleep(1)
            continue
        raise

Running Funcx Task on Login Node 40b96555-3dbb-4f46-8ec1-e04a546e6ae9
................................................{
    "context": {
        "app_name": "xpcs app",
        "client_id": "e4d82438-00df-4dbd-ab90-b6258933c335",
        "manifest_index": "6871e83e-866b-41bc-8430-e3cf83b43bdc",
        "manifest_subject": "globus://project-manifest.json",
        "projects_base_path": "/XPCSDATA",
        "projects_cache_timeout": "86400",
        "projects_default_resource_server": "petrel_https_server",
        "projects_default_search_index": "6871e83e-866b-41bc-8430-e3cf83b43bdc",
        "projects_endpoint": "e55b4eab-6d04-11e5-ba46-22000b92c6ec",
        "projects_group": "368beb47-c9c5-11e9-b455-0efb3ba9a670",
        "projects_portal_url": "https://petreldata.net/xpcs/projects/{project}/{subject}/",
        "scopes": [
            "profile",
            "openid",
            "urn:globus:auth:scope:search.api.globus.org:all",
            "urn:globus:auth:scope:transfer.api.globu

In [5]:
check_dep_task = fxc.run([], endpoint_id=theta_funcx_ep, function_id=check_dep_func)
print(f'Running Funcx Task on Theta Node {check_dep_task}\n', end='')
while True:
    try:
        print(json.dumps(fxc.get_result(check_dep_task), indent=4))
        break
    except Exception as e:
        if str(e) == 'Task pending':
            print('.', end='')
            time.sleep(2)
            continue
        raise

Running Funcx Task on Theta Node e13ac239-2a6b-4e09-b32b-8fe7edfc9aa0
...................................{
    "context": {
        "app_name": "xpcs app",
        "client_id": "e4d82438-00df-4dbd-ab90-b6258933c335",
        "manifest_index": "6871e83e-866b-41bc-8430-e3cf83b43bdc",
        "manifest_subject": "globus://project-manifest.json",
        "projects_base_path": "/XPCSDATA",
        "projects_cache_timeout": "86400",
        "projects_default_resource_server": "petrel_https_server",
        "projects_default_search_index": "6871e83e-866b-41bc-8430-e3cf83b43bdc",
        "projects_endpoint": "e55b4eab-6d04-11e5-ba46-22000b92c6ec",
        "projects_group": "368beb47-c9c5-11e9-b455-0efb3ba9a670",
        "projects_portal_url": "https://petreldata.net/xpcs/projects/{project}/{subject}/",
        "scopes": [
            "profile",
            "openid",
            "urn:globus:auth:scope:search.api.globus.org:all",
            "urn:globus:auth:scope:transfer.api.globus.org:all"
  

### Now create a funcX function to run XPCS Corr

Now we can use the same principle to define a function to run the Corr code. The function requires an hdf and imm input to be given then uses subprocess to call the corr tool on the data.

In [6]:
from XPCS.process.funcx_functions import process_corr, process_hdf
corr_func_id = fxc.register_function(process_corr, description="An XPCS corr function.")
hdf_func_id = fxc.register_function(process_corr, description="Plot images, generate metadata for petreldata.net")


FileNotFoundError: [Errno 2] No such file or directory: 'flows/clutch_theta_petreldata.json'

In [8]:
# This flow does not return the result to APS.

flow_definition = {
  "Comment": "Automate XPCS",
  "StartAt": "Transfer1",
  "States": {
    "Transfer1": {
      "Comment": "Initial Transfer from APS to ALCF",
      "Type": "Action",
      "ActionUrl": "https://actions.automate.globus.org/transfer/transfer",
      "ActionScope": "https://auth.globus.org/scopes/actions.globus.org/transfer/transfer",
      "InputPath": "$.Transfer1Input",
      "ResultPath": "$.Transfer1Result",
      "WaitTime": 6000,
      "Next": "Transfer2"
    },
    "Transfer2": {
      "Comment": "Second Transfer from APS to ALCF",
      "Type": "Action",
      "ActionUrl": "https://actions.automate.globus.org/transfer/transfer",
      "ActionScope": "https://auth.globus.org/scopes/actions.globus.org/transfer/transfer",
      "InputPath": "$.Transfer2Input",
      "ResultPath": "$.Transfer2Result",
      "WaitTime": 6000,
      "Next": "ExecCorr"
    },
    "ExecCorr": {
      "Comment": "Use corr to process the data",
      "Type": "Action",
      "ActionUrl": "https://dev.funcx.org/automate",
      "ActionScope": "https://auth.globus.org/scopes/facd7ccc-c5f4-42aa-916b-a0e270e2c2a9/automate2",
      "InputPath": "$.Exec1Input",
      "ResultPath": "$.Exec1Result",
      "WaitTime": 12000,
      "Next": "ExecPlots"
    },
    "ExecPlots": {
      "Comment": "Generate plots from the data",
      "Type": "Action",
      "ActionUrl": "https://dev.funcx.org/automate",
      "ActionScope": "https://auth.globus.org/scopes/facd7ccc-c5f4-42aa-916b-a0e270e2c2a9/automate2",
      "InputPath": "$.Exec2Input",
      "ResultPath": "$.Exec2Result",
      "WaitTime": 12000,
      "Next": "ExecPilot"
    },
    "ExecPilot": {
      "Comment": "Generate plots from the data",
      "Type": "Action",
      "ActionUrl": "https://funcx.org/automate",
      "ActionScope": "https://auth.globus.org/scopes/facd7ccc-c5f4-42aa-916b-a0e270e2c2a9/automate2",
      "InputPath": "$.Exec3Input",
      "ResultPath": "$.Exec3Result",
      "WaitTime": 12000,
      "End": True
    }
  }
}

## Automate XPCS

Alright, now we have a function that can run corr on Theta given an input hdf and imm file. Now we can combine using the function with Automate to first transfer data to Theta, use funcX, then return the result.

This example outlines a flow that does a transfer, runs funcX, then does another transfer. We can parameterize these steps with whatever commands and endpoints we like. We use longer timeouts to account for transfer and compute delays.

In [7]:
# Create the client, and login
flows_client = create_flows_client(CLIENT_ID)
# Deploy the new flow
flow = flows_client.deploy_flow(xpcs_flow_definition, title="XPCS")
flow_id = flow.data['id']
flow_scope = flow['globus_auth_scope']
print(f'Newly created flow with id:\n{flow_id}\nand scope:\n{flow_scope}')

# Display the visualization for the new flow
get_resp = flows_client.get_flow(flow_id)
flow_def = get_resp.data['definition']
flow_graph = graphviz_format(flow_def)
display(flow_graph)

NameError: name 'xpcs_flow_definition' is not defined

In [None]:
flow_input = {
  "Transfer1Input": {
    "source_endpoint_id": clutch_globus_ep,
    "destination_endpoint_id": theta_globus_ep,
    "transfer_items": [
      {
        "source_path": "/data/xpcs8/2019-1/comm201901/cluster_results/A001_Aerogel_1mm_att6_Lq0_001_0001-1000.hdf",
        "destination_path": "/projects/APSDataAnalysis/Automate/1568234886/A001_Aerogel_1mm_att6_Lq0_001_0001-1000.hdf",
        "recursive": False
      }
    ],
    "sync_level": "exists"
  },
  "Transfer2Input": {
    "source_endpoint_id": clutch_globus_ep,
    "destination_endpoint_id": theta_globus_ep,
    "transfer_items": [
      {
        "source_path": "/data/xpcs8/2019-1/comm201901/A001_Aerogel_1mm_att6_Lq0_001/A001_Aerogel_1mm_att6_Lq0_001_00001-01000.imm",
        "destination_path": "/projects/APSDataAnalysis/Automate/1568234886/A001_Aerogel_1mm_att6_Lq0_001_00001-01000.imm",
        "recursive": False
      }
    ],
    "sync_level": "exists"

  },
  "Exec1Input": {
    "endpoint": theta_funcx_ep,
    "func": corr_func_id,
    "data": {
      "hdf": "/projects/APSDataAnalysis/Automate/1568234886/A001_Aerogel_1mm_att6_Lq0_001_0001-1000.hdf",
      "imm": "/projects/APSDataAnalysis/Automate/1568234886/A001_Aerogel_1mm_att6_Lq0_001_00001-01000.imm"
    }
  },
  "Exec2Input": {
    "endpoint": theta_funcx_ep,
    "func": hdf_func_id,
    "data": {
      "hdf": "/projects/APSDataAnalysis/Automate/1568234886/A001_Aerogel_1mm_att6_Lq0_001_0001-1000.hdf",
      "custom_metadata": {
        "description": f"funcx: Automated data processing.",
        "creators": [{"creatorName": "8-ID"}],
        "publisher": "Automate",
        "title": "A001_Aerogel_1mm_att6_Lq0_001_0001-1000",
        "subjects": [{"subject": "XPCS"}, {"subject": "8-ID"}],
        "publicationYear": "2019",
        "resourceType": {
            "resourceType": "Dataset",
            "resourceTypeGeneral": "Dataset"
        }
      }
    }
  },
  "Transfer3Input": {
    "source_endpoint_id": theta_globus_ep,
    "destination_endpoint_id": petreldata_xpcs_globus_ep,
    "transfer_items": [
      {
        "source_path": "/projects/APSDataAnalysis/Automate/1568234886/A001_Aerogel_1mm_att6_Lq0_001_0001-1000",
        "destination_path": "/XPCSDATA/test/nick-testing",
        "recursive": False
      }
    ]
  },
}


In [None]:
flow_action = flows_client.run_flow(flow_id, flow_scope, flow_input)
print(flow_action)
flow_action_id = flow_action['action_id']
flow_status = flow_action['status']
print(f'Flow action started with id: {flow_action_id}')
while flow_status == 'ACTIVE':
    time.sleep(10)
    flow_action = flows_client.flow_action_status(flow_id, flow_scope, flow_action_id)
    flow_status = flow_action['status']
    flow_description = flow_action['details'].get('description')
    print(f'Flow status: {flow_status}: {flow_description}')