Merge pull request #76 from DLHub-Argonne/funcx_updates_#66
Funcx updates #66
ryanchard committed Mar 12, 2020
2 parents 7f220c7 + 013f088 commit fe9bf72
Showing 3 changed files with 105 additions and 39 deletions.
124 changes: 90 additions & 34 deletions dlhub_sdk/client.py
@@ -9,6 +9,9 @@
from mdf_toolbox import login, logout
from mdf_toolbox.search_helper import SEARCH_LIMIT

from funcx.sdk.client import FuncXClient
from funcx.serialize import FuncXSerializer

from dlhub_sdk.config import DLHUB_SERVICE_ADDRESS, CLIENT_ID
from dlhub_sdk.utils.futures import DLHubFuture
from dlhub_sdk.utils.schemas import validate_against_dlhub_schema
@@ -18,7 +21,6 @@
# Directory for authentication tokens
_token_dir = os.path.expanduser("~/.dlhub/credentials")


class DLHubClient(BaseClient):
"""Main class for interacting with the DLHub service
@@ -34,7 +36,7 @@ class DLHubClient(BaseClient):
and providing that authorizer to the initializer (e.g., ``DLHubClient(auth)``)"""

def __init__(self, dlh_authorizer=None, search_client=None, http_timeout=None,
force_login=False, **kwargs):
force_login=False, fx_authorizer=None, **kwargs):
"""Initialize the client
Args:
@@ -49,16 +51,40 @@ def __init__(self, dlh_authorizer=None, search_client=None, http_timeout=None,
force_login (bool): Whether to force a login to get new credentials.
A login will always occur if ``dlh_authorizer`` or ``search_client``
are not provided.
no_local_server (bool): Disable spinning up a local server to automatically
copy-paste the auth code. THIS IS REQUIRED if you are on a remote server.
When used locally with no_local_server=False, the domain is localhost with
a randomly chosen open port number.
**Default**: ``True``.
fx_authorizer (:class:`GlobusAuthorizer
<globus_sdk.authorizers.base.GlobusAuthorizer>`):
An authorizer instance used to communicate with funcX.
If ``None``, will be created.
no_browser (bool): Do not automatically open the browser for the Globus Auth URL.
Display the URL instead and let the user navigate to that location manually.
**Default**: ``True``.
Keyword arguments are the same as for BaseClient.
"""
if force_login or not dlh_authorizer or not search_client:
if force_login or not dlh_authorizer or not search_client or not fx_authorizer:

auth_res = login(services=["search", "dlhub"], app_name="DLHub_Client",
fx_scope = "https://auth.globus.org/scopes/facd7ccc-c5f4-42aa-916b-a0e270e2c2a9/all"
auth_res = login(services=["search", "dlhub",
fx_scope],
app_name="DLHub_Client",
client_id=CLIENT_ID, clear_old_tokens=force_login,
token_dir=_token_dir)
token_dir=_token_dir, no_local_server=kwargs.get("no_local_server", True),
no_browser=kwargs.get("no_browser", True))
dlh_authorizer = auth_res["dlhub"]
fx_authorizer = auth_res[fx_scope]
self._search_client = auth_res["search"]

self._fx_client = FuncXClient(force_login=True, fx_authorizer=fx_authorizer,
funcx_service_address='https://funcx.org/api/v1')

# funcX endpoint to use
self.fx_endpoint = '86a47061-f3d9-44f0-90dc-56ddc642c000'
# self.fx_endpoint = '2c92a06a-015d-4bfa-924c-b3d0c36bdad7'
self.fx_serializer = FuncXSerializer()
self.fx_cache = {}
super(DLHubClient, self).__init__("DLHub", environment='dlhub', authorizer=dlh_authorizer,
http_timeout=http_timeout, base_url=DLHUB_SERVICE_ADDRESS,
**kwargs)
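For context, a minimal usage sketch of the updated constructor (the login-flow behavior is as wired in this diff; nothing else is assumed):

from dlhub_sdk.client import DLHubClient

# With the defaults set above (no_local_server=True, no_browser=True), the login
# flow prints a Globus Auth URL for the user to open manually and now requests
# the funcX scope alongside the "search" and "dlhub" services.
client = DLHubClient()

# force_login=True clears cached tokens and repeats the login.
# client = DLHubClient(force_login=True)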
@@ -106,6 +132,10 @@ def get_servables(self, only_latest_version=True):
output.append(r)
results = output

# Add these to the cache
for r in results:
self.fx_cache[r['dlhub']['shorthand_name']] = r['dlhub']['funcx_id']

return results

def list_servables(self):
@@ -127,8 +157,8 @@ def get_task_status(self, task_id):
dict: status block containing "status" key.
"""

r = self.get("{task_id}/status".format(task_id=task_id))
return r.data
r = self._fx_client.get_task_status(task_id)
return r
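A small sketch of the status call after this change, given a client and a task id from an asynchronous run (both illustrative):

# get_task_status now proxies to FuncXClient.get_task_status, so the returned
# dict follows funcX's status schema rather than the old DLHub REST response.
status = client.get_task_status(task_id)
print(status.get('status'))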

def describe_servable(self, name):
"""Get the description for a certain servable
@@ -166,46 +196,50 @@ def describe_methods(self, name, method=None):
metadata = self.describe_servable(name)
return get_method_details(metadata, method)

def run(self, name, inputs, input_type='python',
def run(self, name, inputs,
asynchronous=False, async_wait=5) -> Union[Any, DLHubFuture]:
"""Invoke a DLHub servable
Args:
name (string): DLHub name of the servable of the form <user>/<servable_name>
inputs: Data to be used as input to the function. Can be a string of file paths or URLs
input_type (string): How to send the data to DLHub. Can be "python" (which pickles
the data), "json" (which uses JSON to serialize the data), or "files" (which
sends the data as files).
asynchronous (bool): Whether to return from the function immediately or
wait for the execution to finish.
async_wait (float): How many sections wait between checking async status
async_wait (float): How many seconds to wait between checking async status
Returns:
Results of running the servable. If asynchronous, then the task ID
"""
servable_path = 'servables/{name}/run'.format(name=name)

# Prepare the data to be sent to DLHub
if input_type == 'python':
# data = {'python': codecs.encode(pkl.dumps(inputs), 'base64').decode()}
data = {'python': jsonpickle.encode(inputs)}
elif input_type == 'json':
data = {'data': inputs}
elif input_type == 'files':
raise NotImplementedError('Files support is not yet implemented')
else:
raise ValueError('Input type not recognized: {}'.format(input_type))

# Set the asynchronous option
data['asynchronous'] = asynchronous
if name not in self.fx_cache:
# Look it up and add it to the cache; this raises an exception if the servable is not found.
serv = self.describe_servable(name)
self.fx_cache.update({name: serv['dlhub']['funcx_id']})

# Send the data to DLHub
r = self.post(servable_path, json_body=data)
if (not asynchronous and r.http_status != 200) \
or (asynchronous and r.http_status != 202):
raise Exception(r)
funcx_id = self.fx_cache[name]
payload = {'data': inputs}
task_id = self._fx_client.run(payload, endpoint_id=self.fx_endpoint, function_id=funcx_id)
#r = self.post(servable_path, json_body=data)
# if (not asynchronous and r.http_status != 200) \
# or (asynchronous and r.http_status != 202):
# raise Exception(r)

# Return the result
return DLHubFuture(self, r.data['task_id'], async_wait) if asynchronous else r.data
return DLHubFuture(self, task_id, async_wait).result() if not asynchronous else task_id
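A hedged sketch of calling the funcX-backed run(); the servable name and inputs are placeholders, and the expected input format depends on the servable being invoked:

# Synchronous call: blocks on DLHubFuture(...).result() until the servable returns.
prediction = client.run('username/servable_name', inputs=[[1.0, 2.0, 3.0]])

# Asynchronous call: with this change, run() returns the funcX task id directly,
# which can then be polled with get_task_status() or get_result().
task_id = client.run('username/servable_name', inputs=[[1.0, 2.0, 3.0]], asynchronous=True)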

def get_result(self, task_id, verbose=False):
"""Get the result of a task_id
Args:
task_id (str): The task's uuid
verbose (bool): Whether or not to return the full DLHub response
Returns:
Result of running the servable
"""

result = self._fx_client.get_result(task_id)
if isinstance(result, tuple) and not verbose:
result = result[0]
return result
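A short sketch of the new get_result() unwrapping, assuming funcX hands back a (result, metadata)-style tuple as the code above expects:

output = client.get_result(task_id)             # unwrapped servable output
raw = client.get_result(task_id, verbose=True)  # full funcX response, left as a tuple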

def publish_servable(self, model):
"""Submit a servable to DLHub
Expand All @@ -230,6 +264,9 @@ def publish_servable(self, model):
# Validate against the servable schema
validate_against_dlhub_schema(metadata, 'servable')

# Wipe the fx cache so we don't keep reusing an old servable
self.clear_funcx_cache()

# Get the data to be submitted as a ZIP file
fp, zip_filename = mkstemp('.zip')
os.close(fp)
@@ -270,6 +307,10 @@ def publish_repository(self, repository):

# Publish to DLHub
metadata = {"repository": repository}

# Wipe the fx cache so we don't keep reusing an old servable
self.clear_funcx_cache()

response = self.post('publish_repo', json_body=metadata)

task_id = response.data['task_id']
@@ -368,7 +409,7 @@ def search_by_authors(self, authors, match_all=True, limit=None, only_latest=Tru
def search_by_related_doi(self, doi, limit=None, only_latest=True):
"""Get all of the servables associated with a certain publication
Return:
Args:
doi (string): DOI of related paper
limit (int): Maximum number of results to return
only_latest (bool): Whether to return only the most recent version of the model
Expand All @@ -378,3 +419,18 @@ def search_by_related_doi(self, doi, limit=None, only_latest=True):

results = self.query.match_doi(doi).search(limit=limit)
return filter_latest(results) if only_latest else results

def clear_funcx_cache(self, servable=None):
"""Remove functions from the cache. Either remove a specific servable or wipe the whole cache.
Args:
servable (str): The name of the servable to remove. Default: None
"""

if servable:
del(self.fx_cache[servable])
else:
self.fx_cache = {}

return self.fx_cache
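A brief sketch of the cache helper added above; the servable name is illustrative:

client.clear_funcx_cache('username/servable_name')  # drop one cached funcX function id
client.clear_funcx_cache()                           # wipe the whole name -> funcx_id mapping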
19 changes: 14 additions & 5 deletions dlhub_sdk/utils/futures.py
@@ -45,12 +45,21 @@ def _ping_server(self):
def running(self):
if super().running():
# If the task isn't already completed, check if it is still running
status = self.client.get_task_status(self.task_id)
# TODO (lw): What if the task fails on the server end? Do we have a "FAILURE" status?
if status['status'] == 'COMPLETED':
self.set_result(json.loads(status['result']))
try:
status = self.client.get_result(self.task_id, verbose=True)
except Exception as e:
# Check if it is "Task pending". funcX throws an exception on pending.
if e.args[0] == "Task pending":
return True
else:
self.set_exception(e)
return False

if isinstance(status, tuple):
# TODO pass in verbose setting?
self.set_result(status[0])
return False
return True

return False

def stop(self):
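A hedged sketch of driving DLHubFuture by hand against the new polling logic; the positional arguments mirror the DLHubFuture(self, task_id, async_wait) call used in client.run() above:

from dlhub_sdk.utils.futures import DLHubFuture

task_id = client.run('username/servable_name', inputs=[[1.0, 2.0, 3.0]], asynchronous=True)
future = DLHubFuture(client, task_id, 5)  # check every 5 seconds via client.get_result(..., verbose=True)
print(future.running())                   # True while funcX still reports "Task pending"
print(future.result())                    # blocks until the servable's output is set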
1 change: 1 addition & 0 deletions requirements.txt
@@ -2,4 +2,5 @@ globus-sdk>=1.7.0
jsonpickle>=1.0
requests>=2.20.0
mdf_toolbox>=0.4.0
funcx
jsonschema>=3.0.0
