# Globus and funcX

This notebook uses Globus and funcX to automate uploading data to NeSI, running a job there, and copying the results back.

Requirements:

* Globus account
* Globus personal endpoint running on the machine where you are executing this notebook
* NeSI account (username, password and 2nd factor) for authenticating with the NeSI Globus endpoint and running a funcX endpoint

Authentication Steps:

1. Connect to the NeSI cluster and start a funcX endpoint there
2. Start funcX client locally
3. Request an access token from Globus
4. Connect to source Globus endpoint
5. Connect to NeSI Globus endpoint

Processing Steps:

6. Transfer input data to NeSI using Globus
7. Run the workflow using funcX
8. Copy result back using Globus

Steps 1-5 above require authentication. Once you have authenticated, you should not need to rerun those steps every time you transfer data or run a function remotely, because the credentials and tokens are cached (either indefinitely or for a limited time).

References:

* [Globus tutorial](https://globus-sdk-python.readthedocs.io/en/stable/tutorial.html)
* [funcX endpoint documentation](https://funcx.readthedocs.io/en/latest/endpoints.html)

## 1. Start funcX endpoint on NeSI

### Install and configure the funcX endpoint if you have not done so before

Connect to a Mahuika login node by SSH and run the following commands to install funcX:

```sh
ssh mahuika
module load Python
pip install --user funcx funcx_endpoint
funcx-endpoint configure
```

During the final command you will be asked to authenticate with Globus Auth so that your endpoint can be made available to funcX clients running outside of NeSI.

For more details see: https://funcx.readthedocs.io/en/latest/endpoints.html.

### Start the funcX endpoint on NeSI

A default endpoint profile is created during the configure step above, which is sufficient here. We will use funcX only to submit jobs to Slurm and to check the status of submitted jobs; no computationally expensive tasks should run directly on the endpoint itself.

```sh
# we are still on the Mahuika login node here...
funcx-endpoint start
```

Now list your endpoints, confirm that the *default* endpoint is "Active" and make a note of your endpoint ID:

```sh
funcx-endpoint list
+---------------+-------------+--------------------------------------+
| Endpoint Name |   Status    |             Endpoint ID              |
+===============+=============+======================================+
| default       | Active      | f9058e4c-3b93-46a9-a764-81810c86c2b3 |
+---------------+-------------+--------------------------------------+
```
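
If you would rather not copy the endpoint ID by hand, the table can be parsed. The following is a minimal sketch that assumes the output of `funcx-endpoint list` has exactly the three-column layout shown above; `parse_endpoint_table` is a hypothetical helper, not part of funcX.

```python
def parse_endpoint_table(text):
    """Return {endpoint name: (status, endpoint id)} from the ASCII table."""
    endpoints = {}
    for line in text.splitlines():
        line = line.strip()
        # data rows start and end with '|'; border rows start with '+'
        if not (line.startswith("|") and line.endswith("|")):
            continue
        cells = [cell.strip() for cell in line.strip("|").split("|")]
        # skip anything that is not a three-column data row, and the header row
        if len(cells) != 3 or cells[0] == "Endpoint Name":
            continue
        name, status, endpoint_id = cells
        endpoints[name] = (status, endpoint_id)
    return endpoints


# the table printed above, pasted in as a string
sample = """\
+---------------+-------------+--------------------------------------+
| Endpoint Name |   Status    |             Endpoint ID              |
+===============+=============+======================================+
| default       | Active      | f9058e4c-3b93-46a9-a764-81810c86c2b3 |
+---------------+-------------+--------------------------------------+
"""

status, endpoint_id = parse_endpoint_table(sample)["default"]
print(status, endpoint_id)
```

The table layout may differ between funcX versions, so check the parsed values before relying on them.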

In [1]:
# store your funcx endpoint id here
funcx_endpoint = "f9058e4c-3b93-46a9-a764-81810c86c2b3"

## 2. Start funcX client locally

Start the funcX client locally so we can submit jobs to the NeSI funcX endpoint we just created. This will also require authentication with Globus Auth.

In [2]:
from funcx.sdk.client import FuncXClient

fxc = FuncXClient()

## 3. Request access token from Globus

### Register an app with Globus, if you haven't done it already

Note: this appears to be a one-off step; the same client ID can be reused afterwards.

> Navigate to the [Developer Site](https://developers.globus.org/) and select “Register your app with Globus.” You will be prompted to login – do so with the account you wish to use as your app’s administrator...

In [3]:
# identifier for the app we created on globus website above, can be reused
CLIENT_ID = "6ffc9c02-cf62-4268-a695-d9d100181962"

### Get Globus access token

> Talking to Globus Services as a user requires that you authenticate to your new App and get it Tokens, credentials proving that you logged into it and gave it permission to access the service.

The standard access token appears to be valid for only about 10 minutes. Using the refresh token (see "Advanced 2" in the tutorial) should give access that lasts much longer, possibly indefinitely.

In [None]:
import globus_sdk

client = globus_sdk.NativeAppAuthClient(CLIENT_ID)
client.oauth2_start_flow(refresh_tokens=True)

authorize_url = client.oauth2_get_authorize_url()
print('Please go to this URL and login: {0}'.format(authorize_url))

auth_code = input('Please enter the code here: ').strip()
token_response = client.oauth2_exchange_code_for_tokens(auth_code)

# globus access token
globus_auth_data = token_response.by_resource_server['auth.globus.org']
globus_at = globus_auth_data['access_token']

# for Globus transfer service
globus_transfer_data = token_response.by_resource_server['transfer.api.globus.org']
# the refresh token and access token, often abbr. as RT and AT
transfer_rt = globus_transfer_data['refresh_token']
transfer_at = globus_transfer_data["access_token"]
expires_at_s = globus_transfer_data['expires_at_seconds']
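
To avoid repeating the login flow in every session, the tokens obtained above could be cached on disk. The following is a minimal sketch using only the standard library; the file name `globus_tokens.json` and the helper names are assumptions for illustration and are not part of the Globus SDK. The dictionary keys mirror the ones used in the cell above.

```python
import json
import os
import time

TOKEN_FILE = "globus_tokens.json"  # hypothetical cache file name


def save_tokens(tokens, path=TOKEN_FILE):
    """Write the token dictionary to disk.

    On POSIX systems the file is created with mode 0600, so only the
    current user can read it (the mode only applies when the file is
    first created).
    """
    fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "w") as fh:
        json.dump(tokens, fh)


def load_tokens(path=TOKEN_FILE):
    """Return the cached token dictionary, or None if no cache file exists."""
    if not os.path.exists(path):
        return None
    with open(path) as fh:
        return json.load(fh)


def access_token_expired(tokens, margin_s=60, now=None):
    """Return True if the access token expires within margin_s seconds."""
    now = time.time() if now is None else now
    return tokens["expires_at_seconds"] <= now + margin_s


# In the notebook one might call, for example:
# save_tokens({"refresh_token": transfer_rt,
#              "access_token": transfer_at,
#              "expires_at_seconds": expires_at_s})
```

A refresh token grants long-lived access, so the cache file should be treated like a password.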

### Create authorizer object and check that we can use it

The refresh-token authorizer should renew the access token automatically as needed.

In [5]:
# a GlobusAuthorizer is an auxiliary object we use to wrap the token
authorizer = globus_sdk.RefreshTokenAuthorizer(
    transfer_rt, client, access_token=transfer_at, expires_at=expires_at_s)
tc = globus_sdk.TransferClient(authorizer=authorizer)

In [None]:
print("My Globus Endpoints:")
for ep in tc.endpoint_search(filter_scope="my-endpoints"):
    print("  - [{}] {}".format(ep["id"], ep["display_name"]))

## 4. Connect to source Globus endpoint

Connect to the personal endpoint you have on your local machine (e.g. where you are running this notebook).

You should not need to do any authentication since Globus will use the token you generated above.

We list the files in the input data directory to check our access is working.

In [7]:
import os

# put your Globus endpoint id here:
#src_endpoint = "1fdfb7aa-544e-11eb-87b7-02187389bd35"
src_endpoint = "6890f1a4-3f21-11eb-b55a-02d9497ca481"

# paths to input and output
src_base_path = os.getcwd()
src_input_path = os.path.join(src_base_path, "input")
src_output_path = os.path.join(src_base_path, "output")

# activate the endpoint
res_src_ep = tc.endpoint_autoactivate(src_endpoint, if_expires_in=3600)
if res_src_ep['code'] == 'AutoActivationFailed':
    print("Endpoint activation failed!")
    print(res_src_ep)
else:
    print(f"Endpoint activation succeeded: {res_src_ep['code']}")
    
    # list the source directory
    print("Source files:")
    for entry in tc.operation_ls(src_endpoint, path=src_input_path):
        print(f'  {entry["name"]} ({entry["type"]})')

Endpoint activation succeeded: AutoActivated.GlobusOnlineCredential
Source files:
  apoa1.namd (file)
  apoa1.pdb (file)
  apoa1.psf (file)
  par_all22_popc.xplor (file)
  par_all22_prot_lipid.xplor (file)
  run.sl (file)


## 5. Connect to the NeSI Globus endpoint

Authentication to the NeSI endpoint is more complicated. In addition to the Globus token we also need to authenticate with our NeSI username, password and 2nd factor. You will be prompted to enter these below.

Authentication to the NeSI endpoint lasts for 1-2 days, i.e. you will not need to reenter your NeSI credentials every time you use the NeSI endpoint. 

In [8]:
nesi_endpoint = "3064bb28-e940-11e8-8caa-0a1d4c5c824a"  # NeSI endpoint
nesi_path = "/nesi/nobackup/nesi99999/csco212/cer_instrument_data/funcx/test-workflow"

In [None]:
# for testing authentication we have to deactivate first...
#tc.endpoint_deactivate(nesi_endpoint)

In [9]:
# activate the endpoint
res_nesi_ep = tc.endpoint_autoactivate(nesi_endpoint, if_expires_in=3600)
res_nesi_ep["message"]

'The endpoint could not be auto activated, fill in the returned activation_requirements and POST them back to /activate to perform manual activation.'

In [11]:
# if auto activation failed, try activate
if res_nesi_ep['code'] == 'AutoActivationFailed':
    print("Endpoint activation failed!")
    print(res_nesi_ep["message"])
    #print(res_nesi_ep)
    
    # try normal activate, first get username and password
    import getpass
    nesi_user = input("Enter NeSI username: ").strip()
    nesi_pass = getpass.getpass(prompt="Enter NeSI password and 2nd factor joined together: ")

    # then get requirements and fill in user/pass
    req = tc.endpoint_get_activation_requirements(nesi_endpoint)
    rd = req.data
    for i in range(rd["length"]):
        name = rd["DATA"][i]["name"]
        if name == "username":
            rd["DATA"][i]["value"] = nesi_user
        elif name == "passphrase":
            rd["DATA"][i]["value"] = nesi_pass

    # activate
    res_nesi_ep = tc.endpoint_activate(nesi_endpoint, rd, if_expires_in=3600)
    print(res_nesi_ep["code"])
    print(res_nesi_ep["message"])
    
else:
    print(f"Endpoint activation succeeded: {res_nesi_ep['code']}")

Endpoint activation failed!
The endpoint could not be auto activated, fill in the returned activation_requirements and POST them back to /activate to perform manual activation.


Enter NeSI username:  csco212
Enter NeSI password and 2nd factor joined together:  ···················


Activated.MyProxyCredential
Endpoint activated successfully using a credential fetched from a MyProxy server.
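
The step that fills in the activation requirements can be isolated into a small function, which makes it easier to check that both fields were actually found before activating. This is a sketch only: `fill_activation_requirements` is a hypothetical helper, and the sample dictionary is a made-up stand-in that copies just the `DATA`, `name` and `value` keys used in the cell above.

```python
def fill_activation_requirements(requirements, values):
    """Set 'value' on every entry of requirements['DATA'] whose 'name'
    is a key of values. Returns the list of names that were filled."""
    filled = []
    for item in requirements["DATA"]:
        if item["name"] in values:
            item["value"] = values[item["name"]]
            filled.append(item["name"])
    return filled


# made-up stand-in for the document returned by
# tc.endpoint_get_activation_requirements(nesi_endpoint).data
rd_example = {
    "DATA": [
        {"name": "hostname", "value": "myproxy.example.org"},  # hypothetical
        {"name": "username", "value": None},
        {"name": "passphrase", "value": None},
    ],
}

filled = fill_activation_requirements(
    rd_example, {"username": "someuser", "passphrase": "secret123456"}
)
print(filled)
```

If `filled` does not contain both `username` and `passphrase`, the endpoint probably expects a different activation method and the activation call is likely to fail.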


## 6. Transfer input data to NeSI

First we choose a unique directory name under which this run will be stored on NeSI, then copy the input data into that directory.

In [23]:
# make a directory for running under
from datetime import datetime

# get a unique name for this run
workdirbase = datetime.now().strftime("%Y%m%dT%H%M%S")
workdirname = workdirbase
got_dirname = False
existing_names = [item["name"] for item in tc.operation_ls(nesi_endpoint, path=nesi_path)]
count = 0
while not got_dirname:
    # check the directory does not already exist
    if workdirname in existing_names:
        count += 1
        workdirname = f"{workdirbase}.{count:06d}"
    else:
        got_dirname = True
print(f"Directory: {workdirname}")

Directory: 20210319T100025
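
The naming logic above does not depend on Globus and can be expressed as a small function that is easy to test. A minimal sketch follows; `unique_name` is a hypothetical helper that uses the same suffix format as the cell above.

```python
def unique_name(base, existing_names):
    """Return base if it is unused, otherwise base with the first free
    numeric suffix (.000001, .000002, ...)."""
    existing = set(existing_names)
    name = base
    count = 0
    while name in existing:
        count += 1
        name = f"{base}.{count:06d}"
    return name


print(unique_name("20210319T100025", ["20210319T100025"]))
```

In the notebook, `existing_names` would be the list built from `tc.operation_ls`. Note that this check is not atomic: another process could create the same directory between the listing and the transfer.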


In [24]:
# initiate the data transfer to NeSI
tdata = globus_sdk.TransferData(tc, src_endpoint,
                                    nesi_endpoint,
                                    label="Copying input data",
                                    sync_level="checksum")

# add the input files to the transfer
nesi_work_path = nesi_path + "/" + workdirname
print(f"Working directory on NeSI will be: {nesi_work_path}")
tdata.add_item(src_input_path, nesi_work_path,
               recursive=True)

# actually start the transfer
transfer_result = tc.submit_transfer(tdata)
task_id = transfer_result["task_id"]
print("task_id =", transfer_result["task_id"])

# the task id can be used to refer to this transfer
# for example, here we wait for the data transfer to complete
while not tc.task_wait(task_id, timeout=10, polling_interval=10):
    print("waiting for transfer to complete...")
print("transfer to NeSI is complete")

Working directory on NeSI will be: /nesi/nobackup/nesi99999/csco212/cer_instrument_data/funcx/test-workflow/20210319T100025
task_id = ff048c14-882c-11eb-b79f-f57b2d55370d
waiting for transfer to complete...
waiting for transfer to complete...
waiting for transfer to complete...
waiting for transfer to complete...
waiting for transfer to complete...
waiting for transfer to complete...
waiting for transfer to complete...
waiting for transfer to complete...
waiting for transfer to complete...
transfer to NeSI is complete


You can also log in to the Globus web interface and see the status of your transfer there.
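
The wait loop above runs until the transfer finishes, so it never ends if the transfer stalls. One way to bound the wait is sketched below; `wait_with_deadline` is a hypothetical helper that takes any zero-argument callable returning `True` on completion, such as a wrapper around the `tc.task_wait` call used above.

```python
import time


def wait_with_deadline(poll, max_wait_s, pause_s=0.0, sleep=time.sleep):
    """Call poll() until it returns True or max_wait_s seconds have elapsed.

    Returns True if poll() succeeded, False if the deadline passed first.
    """
    deadline = time.monotonic() + max_wait_s
    while True:
        if poll():
            return True
        if time.monotonic() >= deadline:
            return False
        sleep(pause_s)


# In the notebook one might write, for example:
# done = wait_with_deadline(
#     lambda: tc.task_wait(task_id, timeout=10, polling_interval=10),
#     max_wait_s=3600,
# )
# if not done:
#     print("transfer did not finish within an hour; check the Globus web interface")
```

Because `task_wait` already blocks for up to its own `timeout`, `pause_s` can stay at zero in that case.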

## 7. Run the processing using funcX

Two functions are called using funcX:

1. Submit job to Slurm
2. Check Slurm job status

In [14]:
print(f"FuncX endpoint id: {funcx_endpoint}")

FuncX endpoint id: f9058e4c-3b93-46a9-a764-81810c86c2b3


In [34]:
# function that submits a job to Slurm (assumes submit script and other required inputs were uploaded via Globus)
def run_processing(submit_script, work_dir=None):
    """Submit the given script to Slurm with sbatch and return sbatch's output."""
    # modules must be imported inside the function because it executes remotely
    import os
    import subprocess
    
    # change to working directory
    if work_dir is not None and os.path.isdir(work_dir):
        os.chdir(work_dir)
    
    # submit the Slurm job and return the output of sbatch (which contains the job id)
    submit_cmd = f'sbatch --priority=9999 {submit_script}'
    with open("submit_cmd.txt", "w") as fh:
        fh.write(submit_cmd + "\n")
    output = subprocess.check_output(submit_cmd, shell=True, universal_newlines=True)
    
    return output

# register the function with FuncX - we have to do this so that we can execute the function on our remote endpoint
processing_func_id = fxc.register_function(run_processing)
processing_func_id

'1551c2b7-b588-4d59-a6c0-a76058fc9275'
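
`run_processing` returns the raw text printed by `sbatch`, so the caller has to extract the job ID from it. The sketch below is one way to do that more defensively than taking the last word; it assumes the usual `Submitted batch job <id>` message, and `parse_sbatch_job_id` is a hypothetical helper.

```python
import re


def parse_sbatch_job_id(output):
    """Extract the numeric job id from sbatch's 'Submitted batch job N' message.

    Raises ValueError if the message is not found, for example when sbatch
    printed only warnings.
    """
    match = re.search(r"Submitted batch job (\d+)", output)
    if match is None:
        raise ValueError(f"could not find a job id in sbatch output: {output!r}")
    return match.group(1)


print(parse_sbatch_job_id("Submitted batch job 18621980\n"))
```

Failing loudly here avoids passing a non-numeric string on to `sacct` later.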

In [32]:
# function that checks Slurm job status
def check_status(jobid):
    """Check Slurm job status."""
    # modules must be imported inside the function because it executes remotely
    import subprocess
    
    # query Slurm accounting for the state of the job and return it
    cmd = f'sacct -j {jobid} -X -o State -n'
    output = subprocess.check_output(cmd, shell=True, universal_newlines=True)
    
    return output

# register the function with FuncX - we have to do this so that we can execute the function on our remote endpoint
status_func_id = fxc.register_function(check_status)
status_func_id

'07ecba30-237b-4dc2-843c-b844021d6b4c'

### Submit the job to Slurm

In [35]:
import time

resid = fxc.run("run.sl", work_dir=nesi_work_path, endpoint_id=funcx_endpoint, function_id=processing_func_id)
print(f"funcX result id: {resid}")

# wait for the funcX task that submits the job to finish
print("submitting job via funcX:")
while True:
    try:
        res = fxc.get_result(resid)
    except Exception as e:
        stre = str(e)
        if stre in ["waiting-for-ep", "waiting-for-launch", "running"]:  # these are "good" exceptions 
            print(stre)
            time.sleep(10)
        else: 
            raise  # other exceptions mean something went wrong
    else:
        break  # no exception means the job finished successfully

jobid = res.split()[-1]
print(f"Job submitted: {jobid}")

funcX result id: d3eb5057-84b2-471e-b4b9-ade6fde03b9f
submitting job via funcX:
waiting-for-ep
Job submitted: 18621980
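
The same polling pattern is needed for every funcX task in this notebook, so it could be factored into a helper. The following is a sketch under the same assumption as the cell above, namely that a pending task is signalled by an exception whose message is one of the three listed strings; `wait_for_result` is a hypothetical helper, not part of the funcX SDK.

```python
import time

# exception messages that mean "not finished yet" (taken from the cell above)
PENDING_MESSAGES = ("waiting-for-ep", "waiting-for-launch", "running")


def wait_for_result(get_result, pending=PENDING_MESSAGES, pause_s=10, sleep=time.sleep):
    """Call get_result() until it returns instead of raising a 'pending' error.

    Any exception whose message is not in pending is re-raised unchanged.
    """
    while True:
        try:
            return get_result()
        except Exception as exc:
            message = str(exc)
            if message not in pending:
                raise  # a real failure, not just "still waiting"
            print(message)
            sleep(pause_s)


# In the notebook one might write, for example:
# res = wait_for_result(lambda: fxc.get_result(resid))
```

Passing the call as a lambda keeps the helper independent of the funcX client, which also makes it testable without a live endpoint.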


### Wait for the job to complete

In [36]:
job_finished = False
while not job_finished:
    resid = fxc.run(jobid, endpoint_id=funcx_endpoint, function_id=status_func_id)
    print("submitting status job via funcX:")
    while True:
        try:
            res = fxc.get_result(resid)
        except Exception as e:
            stre = str(e)
            if stre in ["waiting-for-ep", "waiting-for-launch", "running"]:  # these are "good" exceptions 
                print(stre)
                time.sleep(10)
            else: 
                raise  # other exceptions mean something went wrong
        else:
            break  # no exception means the job finished successfully
    job_status = res.strip()
    print(f"Job status is: {job_status}")
    if job_status not in ("RUNNING", "PENDING"):  # TODO: check possible statuses
        job_finished = True
print("Job finished")

submitting status job via funcX:
waiting-for-ep
Job status is: RUNNING
submitting status job via funcX:
waiting-for-ep
Job status is: COMPLETED
Job finished
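
The loop above treats every state other than `RUNNING` and `PENDING` as finished, which could end the wait early for transitional states or when `sacct` returns nothing yet. Below is a sketch of a slightly more careful check. The set of active states is an assumption based on the Slurm job state codes and may not be exhaustive, and both function names are hypothetical.

```python
# states in which the job has not yet reached a final outcome
# (assumed, not exhaustive)
ACTIVE_STATES = {
    "PENDING", "RUNNING", "CONFIGURING", "COMPLETING",
    "SUSPENDED", "REQUEUED", "RESIZING",
}


def normalise_state(raw):
    """Reduce sacct's State column to a bare state name.

    Handles surrounding whitespace, 'CANCELLED by <uid>' and the
    trailing '+' that sacct adds when it truncates a field.
    """
    words = raw.split()
    if not words:
        return ""
    return words[0].rstrip("+")


def job_is_active(raw):
    """Return True if the job should still be polled."""
    state = normalise_state(raw)
    # an empty answer is treated as "not recorded yet" rather than finished
    return state == "" or state in ACTIVE_STATES


for raw in ["RUNNING\n", " COMPLETED \n", "CANCELLED by 1234\n", ""]:
    print(repr(raw), job_is_active(raw))
```

With this in place, the exit condition of the loop would become `if not job_is_active(res)`, and the final state can be inspected with `normalise_state(res)` to distinguish `COMPLETED` from failures.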


## 8. Copy result back using Globus

In [22]:
import os

from_path = nesi_work_path
to_path = os.path.join(src_output_path, workdirname)
tdata = globus_sdk.TransferData(tc, nesi_endpoint,
                                    src_endpoint,
                                    label="Copying results",
                                    sync_level="checksum")
tdata.add_item(from_path, to_path, recursive=True)
transfer_result = tc.submit_transfer(tdata)
task_id = transfer_result["task_id"]
print("task_id =", transfer_result["task_id"])

# wait for the data transfer to complete
while not tc.task_wait(task_id, timeout=10, polling_interval=10):
    print("waiting for transfer to complete...")
print(f"transfer from NeSI is complete: {to_path}")

task_id = f20b90cc-882a-11eb-85ce-8d7a7e0e97e7
waiting for transfer to complete...
waiting for transfer to complete...
waiting for transfer to complete...
waiting for transfer to complete...
transfer from NeSI is complete: /home/cdjs/DocumentsSync/work/projects/cer_instrument_data/funcx/funcx-globus-nesi-demo/output/20210319T092437
