# Kanzus Pipeline Example

This notebook demonstrates how the Kanzus pipeline is used to perform on-demand distributed analysis.

In [1]:
import os
import sys
import json
import time
import numpy as np


from funcx.sdk.client import FuncXClient
from gladier.client import GladierClient as GladierBaseClient
from globus_automate_client import create_flows_client, create_action_client

## Creating and using pipelines

Here we create a simple pipeline that moves data and runs an analysis function. The pipeline has only two steps, but it shows how Globus Automate and funcX can be combined into a reliable and secure distributed flow.

### Register a function to use

Start by defining a function and registering it with funcX. This function will be used within the example pipeline.

In [2]:
fxc = FuncXClient()

def file_size(data):
    """Return the size of a file"""
    import os
    return os.path.getsize(data['pathname'])

func_uuid = fxc.register_function(file_size)

Test that the function works on some data.

In [3]:
payload = {'pathname': '/etc/hostname'}
theta_ep = '8f2f2eab-90d2-45ba-a771-b96e6d530cad'
res = fxc.run(payload, endpoint_id=theta_ep, function_id=func_uuid)

In [15]:
fxc.get_result(res)

12
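
Because a funcX function is an ordinary Python function, it can also be checked locally before it is sent to a remote endpoint. The sketch below repeats `file_size` and calls it on a temporary file of known length. The temporary file and the name `local_size` are illustrative and not part of the original notebook.

```python
import os
import tempfile


def file_size(data):
    """Return the size in bytes of the file at data['pathname']."""
    import os  # imported inside the body so the function carries its own imports
    return os.path.getsize(data['pathname'])


# Write a file of known length (12 bytes) and check the function locally.
with tempfile.NamedTemporaryFile(delete=False) as tmp:
    tmp.write(b'hello world\n')
    tmp_path = tmp.name

local_size = file_size({'pathname': tmp_path})
os.remove(tmp_path)
print(local_size)
```

A local check like this catches mistakes in the payload keys before any remote task is submitted.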

### Define a flow for the function

Now define a flow to perform a Globus Transfer and then run the above function.

In [16]:
flow_definition = {
  "Comment": "An analysis flow",
  "StartAt": "Transfer",
  "States": {
    "Transfer": {
      "Comment": "Initial transfer",
      "Type": "Action",
      "ActionUrl": "https://actions.automate.globus.org/transfer/transfer",
      "Parameters": {
        "source_endpoint_id.$": "$.input.source_endpoint", 
        "destination_endpoint_id.$": "$.input.dest_endpoint",
        "transfer_items": [
          {
            "source_path.$": "$.input.source_path",
            "destination_path.$": "$.input.dest_path",
            "recursive": False
          }
        ]
      },
      "ResultPath": "$.Transfer1Result",
      "Next": "Analyze"
    },
    "Analyze": {
      "Comment": "Run a funcX function",
      "Type": "Action",
      "ActionUrl": "https://api.funcx.org/automate",
      "ActionScope": "https://auth.globus.org/scopes/facd7ccc-c5f4-42aa-916b-a0e270e2c2a9/all",
      "Parameters": {
          "tasks": [{
            "endpoint.$": "$.input.fx_ep",
            "func.$": "$.input.fx_id",
            "payload": {
                "pathname.$": "$.input.pathname"
            }
        }]
      },
      "ResultPath": "$.AnalyzeResult",
      "End": True
    }
  }
}
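
In the definition above, a parameter key ending in `.$` is a reference into the flow's input rather than a literal value. The following is a simplified illustration of that substitution, assuming only plain dotted paths such as `$.input.fx_ep`. It is not the Globus Automate implementation, and `lookup`, `resolve_parameters`, and the placeholder UUID strings are hypothetical names introduced for this sketch.

```python
def lookup(path, state):
    """Resolve a simple dotted path such as '$.input.pathname' in a nested dict."""
    if not path.startswith('$.'):
        raise ValueError(f'unsupported path: {path}')
    value = state
    for key in path[2:].split('.'):
        value = value[key]
    return value


def resolve_parameters(params, state):
    """Recursively replace 'name.$' keys with the value their path points to."""
    if isinstance(params, dict):
        resolved = {}
        for key, value in params.items():
            if key.endswith('.$'):
                resolved[key[:-2]] = lookup(value, state)
            else:
                resolved[key] = resolve_parameters(value, state)
        return resolved
    if isinstance(params, list):
        return [resolve_parameters(item, state) for item in params]
    return params


# Placeholder values stand in for real endpoint and function UUIDs.
example_state = {
    'input': {
        'fx_ep': 'endpoint-uuid',
        'fx_id': 'function-uuid',
        'pathname': '/etc/hostname',
    }
}
analyze_parameters = {
    'tasks': [{
        'endpoint.$': '$.input.fx_ep',
        'func.$': '$.input.fx_id',
        'payload': {'pathname.$': '$.input.pathname'},
    }]
}
resolved = resolve_parameters(analyze_parameters, example_state)
print(resolved)
```

Tracing the substitution by hand like this is a quick way to confirm that every path in a state's `Parameters` has a matching key in the flow input.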

Register and run the flow.

In [54]:
flows_client = create_flows_client()
flow = flows_client.deploy_flow(flow_definition, title="Stills process workflow")
flow_id = flow['id']
flow_scope = flow['globus_auth_scope']
print(f'Newly created flow with id: {flow_id}')

Newly created flow with id: 9b0317f9-42a0-4ede-8eb2-2fc8af4903d8


In [18]:
src_ep = 'ddb59aef-6d04-11e5-ba46-22000b92c6ec' # EP1
dest_ep = 'ddb59af0-6d04-11e5-ba46-22000b92c6ec' # EP2
filename = 'test.txt'

flow_input = {
    "input": {
        "source_endpoint": src_ep,
        "source_path": f"/~/{filename}",
        "dest_endpoint": dest_ep,
        "dest_path": f"/~/{filename}",
        "result_path": f"/~/out_{filename}",
        "fx_id": func_uuid,
        "fx_ep": theta_ep,
        "pathname": '/etc/hostname'
    }
}

In [56]:
flow_action = flows_client.run_flow(flow_id, flow_scope, flow_input)
flow_action_id = flow_action['action_id']
flow_status = flow_action['status']

print(f'Flow action started with id: {flow_action_id}')
while flow_status == 'ACTIVE':
    time.sleep(10)
    flow_action = flows_client.flow_action_status(flow_id, flow_scope, flow_action_id)
    flow_status = flow_action['status']
    print(f'Flow status: {flow_status}')

Please log into Globus here:
---------------------------
https://auth.globus.org/v2/oauth2/authorize?client_id=e6c75d97-532a-4c88-b031-8584a319fa3e&redirect_uri=https%3A%2F%2Fauth.globus.org%2Fv2%2Fweb%2Fauth-code&scope=https%3A%2F%2Fauth.globus.org%2Fscopes%2F9b0317f9-42a0-4ede-8eb2-2fc8af4903d8%2Fflow_9b0317f9_42a0_4ede_8eb2_2fc8af4903d8_user&state=_default&response_type=code&code_challenge=KiiENsqw9ufZyh2WUPuJEiFNs-0mv1sUwVY7w-A7Fuk&code_challenge_method=S256&access_type=offline&prefill_named_grant=Globus+Automate+Command+Line+Interface+on+ryan-laptop
---------------------------

Enter the resulting Authorization Code here: RISNnmIVEsu6uDkBOQmnZGUQWOpz7O
Flow action started with id: ebfd0f77-335e-4d76-8dea-49c51ffbdc51
Flow status: ACTIVE
Flow status: SUCCEEDED
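
The polling loop above runs for as long as the flow reports `ACTIVE`. A minimal sketch of the same loop with a timeout is shown below. The helper `wait_for_flow`, its `poll_interval` and `timeout` parameters, and the fake status sequence are hypothetical and exist only so the example runs without a Globus login.

```python
import time


def wait_for_flow(get_status, poll_interval=10, timeout=600, sleep=time.sleep):
    """Poll get_status() until the status is no longer 'ACTIVE' or the timeout expires."""
    waited = 0
    status = get_status()
    while status == 'ACTIVE':
        if waited >= timeout:
            raise TimeoutError(f'flow still ACTIVE after {waited} seconds')
        sleep(poll_interval)
        waited += poll_interval
        status = get_status()
    return status


# Stand-in for the real status call: ACTIVE twice, then SUCCEEDED.
fake_statuses = iter(['ACTIVE', 'ACTIVE', 'SUCCEEDED'])
final_status = wait_for_flow(lambda: next(fake_statuses), sleep=lambda seconds: None)
print(final_status)
```

With the client from this notebook, `get_status` could be `lambda: flows_client.flow_action_status(flow_id, flow_scope, flow_action_id)['status']`.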


In [57]:
flow_action['details']['output']['AnalyzeResult']

{'action_id': 'f798becc-b50f-4a07-a9b7-121b90dbbd98',
 'details': {'completion_t': '1620137251.372325',
  'exception': None,
  'result': 12,
  'status': 'SUCCEEDED',
  'task_id': 'f798becc-b50f-4a07-a9b7-121b90dbbd98'},
 'release_after': 'P30D',
 'state_name': 'Analyze',
 'status': 'SUCCEEDED'}
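
The function's return value sits under `details` in this record. A small helper can pull it out and fail loudly when the action did not succeed. This is a sketch based only on the fields visible in the output above; `analyze_result` is a hypothetical name, not part of funcX or Globus Automate.

```python
def analyze_result(action):
    """Return the function's return value from an action record, or raise if it failed."""
    details = action.get('details') or {}
    if action['status'] != 'SUCCEEDED' or details.get('exception') is not None:
        raise RuntimeError(
            f"action {action['action_id']} did not succeed: {details.get('exception')}"
        )
    return details['result']


# Record copied from the output above.
analyze_action = {
    'action_id': 'f798becc-b50f-4a07-a9b7-121b90dbbd98',
    'details': {
        'completion_t': '1620137251.372325',
        'exception': None,
        'result': 12,
        'status': 'SUCCEEDED',
        'task_id': 'f798becc-b50f-4a07-a9b7-121b90dbbd98',
    },
    'release_after': 'P30D',
    'state_name': 'Analyze',
    'status': 'SUCCEEDED',
}
size_in_bytes = analyze_result(analyze_action)
print(size_in_bytes)
```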

## Gladier Beam XY Search

Gladier removes much of this complexity by registering functions and flows for you and re-registering them when they change. Here we create a Gladier client and specify the tools that will be used in the flow.

In [44]:
from gladier_kanzus.flows.search_flow import flow_definition

class KanzusXYSearchClient(GladierBaseClient):
    client_id = 'e6c75d97-532a-4c88-b031-8584a319fa3e'

    gladier_tools = [
        'gladier_kanzus.tools.XYSearch',
        'gladier_kanzus.tools.CreatePhil',
        'gladier_kanzus.tools.DialsStills',
        'gladier_kanzus.tools.XYPlot',
        'gladier_kanzus.tools.SSXGatherData',
        'gladier_kanzus.tools.SSXPublish',
    ]
    flow_definition = flow_definition


search_client = KanzusXYSearchClient()
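
One way to picture re-registration on change is a checksum comparison: store a digest of the flow definition at registration time and register again only when the digest differs. The sketch below illustrates that idea under this assumption. It is not Gladier's actual implementation, and `definition_checksum`, `needs_registration`, and the two toy flow definitions are hypothetical.

```python
import hashlib
import json


def definition_checksum(definition):
    """Return a stable SHA-256 digest of a JSON-serializable flow definition."""
    canonical = json.dumps(definition, sort_keys=True)
    return hashlib.sha256(canonical.encode('utf-8')).hexdigest()


def needs_registration(definition, stored_checksum):
    """True when no checksum is stored yet or the definition has changed since."""
    return stored_checksum is None or definition_checksum(definition) != stored_checksum


# Toy definitions: the second one changes how the Transfer state ends.
flow_v1 = {'StartAt': 'Transfer', 'States': {'Transfer': {'Type': 'Action', 'End': True}}}
flow_v2 = {'StartAt': 'Transfer', 'States': {'Transfer': {'Type': 'Action', 'Next': 'Analyze'}}}

stored = definition_checksum(flow_v1)
unchanged = needs_registration(flow_v1, stored)
changed = needs_registration(flow_v2, stored)
print(unchanged, changed)
```

Serializing with `sort_keys=True` keeps the digest independent of dictionary key order, so only real changes to the definition trigger a new registration.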

Register the stills function in a container.

In [45]:
from gladier_kanzus.tools.dials_stills import funcx_stills_process as stills_cont

container = '/home/rvescovi/.funcx/containers/dials_v1.simg'
dials_cont_id = fxc.register_container(location=container, container_type='singularity')
stills_cont_fxid = fxc.register_function(stills_cont, container_uuid=dials_cont_id)

Define input to the flow. This describes the dataset that will be analyzed and the parameters for analysis.

In [46]:
conf = {'local_endpoint': '8f2f2eab-90d2-45ba-a771-b96e6d530cad',
        'queue_endpoint': '23519765-ef2e-4df2-b125-e99de9154611',
        }

data_dir = '/eagle/APSDataAnalysis/SSX/Demo/test2'
proc_dir = f'{data_dir}/xy'
upload_dir = f'{data_dir}/test2_images'

flow_input = {
    "input": {
        # Processing variables
        "proc_dir": proc_dir,
        "data_dir": data_dir,
        "upload_dir": upload_dir,

        # DIALS-specific variables
        "input_files": "Test_33_{00001..00010}.cbf", 
        "input_range": "00001..00010",
        "nproc": 10,
        "beamx": "-214.400",
        "beamy": "218.200",

        # xy search parameters
        "step": "1",

        # funcX endpoints
        "funcx_local_ep": conf['local_endpoint'],
        "funcx_queue_ep": conf['queue_endpoint'],

        # container hack for stills
        "stills_cont_fxid": stills_cont_fxid,

        # publication
        "trigger_name": f"{data_dir}/Test_33_00001.cbf"
    }
}


In [47]:
flow_input['input']

{'proc_dir': '/eagle/APSDataAnalysis/SSX/Demo/test2/xy',
 'data_dir': '/eagle/APSDataAnalysis/SSX/Demo/test2',
 'upload_dir': '/eagle/APSDataAnalysis/SSX/Demo/test2/test2_images',
 'input_files': 'Test_33_{00001..00010}.cbf',
 'input_range': '00001..00010',
 'nproc': 10,
 'beamx': '-214.400',
 'beamy': '218.200',
 'step': '1',
 'funcx_local_ep': '8f2f2eab-90d2-45ba-a771-b96e6d530cad',
 'funcx_queue_ep': '23519765-ef2e-4df2-b125-e99de9154611',
 'stills_cont_fxid': 'a7a76468-e03f-4d3e-801b-faac7270de80',
 'trigger_name': '/eagle/APSDataAnalysis/SSX/Demo/test2/Test_33_00001.cbf'}
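
The `input_files` value uses a `{start..end}` range to name a run of images. Assuming it follows shell-style brace expansion (the notebook does not state how the Kanzus tools interpret it), the pattern can be expanded as sketched below to see which files a flow run would cover. `expand_range` is a hypothetical helper written for this illustration.

```python
import re


def expand_range(pattern):
    """Expand one '{start..end}' range in pattern, preserving zero padding."""
    match = re.search(r'\{(\d+)\.\.(\d+)\}', pattern)
    if match is None:
        return [pattern]
    start, end = match.group(1), match.group(2)
    width = max(len(start), len(end))
    prefix, suffix = pattern[:match.start()], pattern[match.end():]
    return [
        f'{prefix}{number:0{width}d}{suffix}'
        for number in range(int(start), int(end) + 1)
    ]


files = expand_range('Test_33_{00001..00010}.cbf')
print(len(files), files[0], files[-1])
```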

In [48]:
search_flow = search_client.start_flow(flow_input=flow_input)

In [49]:
search_flow['action_id']

'c96d3a5d-4e58-4919-9028-4ffa2b0407f3'

In [51]:
search_client.get_status(search_flow['action_id'])

{'action_id': 'c96d3a5d-4e58-4919-9028-4ffa2b0407f3',
 'completion_time': 'None',
 'created_by': 'urn:globus:auth:identity:c4765424-d274-11e5-b894-cb4139f74ecf',
 'details': {'action_statuses': [{'action_id': '387997b5-a6b1-49a7-80df-7cd30e7ecd8e',
    'details': None,
    'release_after': 'P30D',
    'state_name': 'run_stills',
    'status': 'ACTIVE'}],
  'code': 'ActionRunning',
  'description': 'An Action is running'},
 'display_status': 'ACTIVE',
 'flow_id': 'e79bb595-4efc-4389-872c-12469ad27262',
 'flow_last_updated': '2021-04-30T22:09:21.868914+00:00',
 'label': None,
 'manage_by': ['urn:globus:auth:identity:29afd777-2ab2-4d1f-821f-6ae7b3c6c0f8',
  'urn:globus:auth:identity:d6c5cb7c-ec7c-4570-aad5-25a8844cf11d',
  'urn:globus:auth:identity:c4765424-d274-11e5-b894-cb4139f74ecf',
  'urn:globus:auth:identity:c6fe410b-266e-491b-99fd-2661081b7211',
  'urn:globus:auth:identity:cdfba89f-484b-47f2-8642-336465abeddb',
  'urn:globus:auth:identity:b3cfe5e8-7b12-4150-82cc-497cd5028439'],
 'm

### Check the results

https://petreldata.net/kanzus/projects/ssx/globus%253A%252F%252Fc7683485-3c3f-454a-94c0-74310c80b32a%252Fssx%252Ftest_images/

# Kanzus Pipeline

The full Kanzus pipeline is designed to be triggered as data are collected. It reactively moves data to ALCF, performs analysis, analyzes the PRIME results, and publishes results to the portal.

In [52]:
from gladier_kanzus.flows.tutorial_flow import flow_definition as kanzus_flow_def

class KanzusSSXGladier(GladierBaseClient):
    client_id = 'e6c75d97-532a-4c88-b031-8584a319fa3e'
    gladier_tools = [
        'gladier_kanzus.tools.CreatePhil',
        'gladier_kanzus.tools.DialsStills',
        'gladier_kanzus.tools.SSXGatherData',
        'gladier_kanzus.tools.SSXPlot',
        'gladier_kanzus.tools.SSXPublish',
    ]
    flow_definition = kanzus_flow_def
    
kanzus_client = KanzusSSXGladier()

In [54]:
data_dir = '/eagle/APSDataAnalysis/SSX/Demo/test2'
proc_dir = f'{data_dir}/test2_processing'
upload_dir = f'{data_dir}/test2_images'

flow_input = {
    "input": {
        # Processing variables
        "proc_dir": proc_dir,
        "data_dir": data_dir,
        "upload_dir": upload_dir,

        # DIALS-specific variables
        "input_files": "Test_33_{00001..00256}.cbf", 
        "input_range": "00001..00256",
        "nproc": 10,
        "beamx": "-214.400",
        "beamy": "218.200",

        # xy search parameters
        "step": "1",

        # funcX endpoints
        "funcx_local_ep": conf['local_endpoint'],
        "funcx_queue_ep": conf['queue_endpoint'],

        # container hack for stills
        "stills_cont_fxid": stills_cont_fxid,

        # publication
        "trigger_name": f"{data_dir}/Test_33_00001.cbf"
    }
}

In [55]:
kanzus_flow = kanzus_client.start_flow(flow_input=flow_input)

In [56]:
kanzus_client.get_status(kanzus_flow['action_id'])

{'action_id': '5918ca83-a7f6-410a-b191-e2fab2e5d8cd',
 'completion_time': 'None',
 'created_by': 'urn:globus:auth:identity:c4765424-d274-11e5-b894-cb4139f74ecf',
 'details': {'code': 'ActionStarted',
  'description': 'State Dials Create Phil of type Action started',
  'details': {'input': {'tasks': [{'endpoint': '8f2f2eab-90d2-45ba-a771-b96e6d530cad',
      'func': '5f775c1e-7200-4713-a4da-1496701a1c61',
      'payload': {'beamx': '-214.400',
       'beamy': '218.200',
       'data_dir': '/eagle/APSDataAnalysis/SSX/Demo/test2',
       'funcx_create_phil_funcx_id': '5f775c1e-7200-4713-a4da-1496701a1c61',
       'funcx_local_ep': '8f2f2eab-90d2-45ba-a771-b96e6d530cad',
       'funcx_queue_ep': '23519765-ef2e-4df2-b125-e99de9154611',
       'funcx_stills_process_funcx_id': 'f58fa476-20a9-4589-aaf8-2caa3f4366ec',
       'input_files': 'Test_33_{00001..00256}.cbf',
       'input_range': '00001..00256',
       'metadata': {'creators': [{'creatorName': 'Kanzus'}],
        'description': 'Auto

### Check the results

https://petreldata.net/kanzus/projects/ssx/globus%253A%252F%252Fc7683485-3c3f-454a-94c0-74310c80b32a%252Fssx%252Ftest_images/

# Virtual Beamline