Skip to content

Commit

Permalink
Merge pull request #127 from DLHub-Argonne/run-debug
Browse files Browse the repository at this point in the history
Send debug and options flags in `run` payload
  • Loading branch information
WardLT committed Feb 24, 2022
2 parents 9449f12 + 3dede3b commit e644191
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 30 deletions.
43 changes: 29 additions & 14 deletions dlhub_sdk/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@
import json
import os
from tempfile import mkstemp
from typing import Union, Any, Optional

from typing import Union, Any, Optional, Tuple, Dict

import requests
import globus_sdk
Expand Down Expand Up @@ -230,21 +229,33 @@ def describe_methods(self, name, method=None):
metadata = self.describe_servable(name)
return get_method_details(metadata, method)

def run(self, name, inputs,
asynchronous=False, async_wait=5,
timeout: Optional[float] = None) -> Union[Any, DLHubFuture]:
def run(self, name: str, inputs: Any, parameters: Optional[Dict[str, Any]] = None,
asynchronous: bool = False, debug: bool = False, async_wait: float = 5,
timeout: Optional[float] = None)\
-> Union[
DLHubFuture,
Tuple[Any, Dict[str, Any]],
Any
]:
"""Invoke a DLHub servable
Args:
name (string): DLHub name of the servable of the form <user>/<servable_name>
name: DLHub name of the servable of the form <user>/<servable_name>
inputs: Data to be used as input to the function. Can be a string of file paths or URLs
asynchronous (bool): Whether to return from the function immediately or
wait for the execution to finish.
async_wait (float): How many seconds to wait between checking async status
timeout (float): How long to wait for a result to return.
Only used for synchronous calls
parameters: Any optional parameters to pass to the function.
asynchronous: Whether to return from the function immediately or wait for the execution to finish.
debug: Whether to capture the standard out and error printed during execution
async_wait: How many seconds to wait between checking async status
timeout: How long to wait for a result to return. Only used for synchronous calls
Returns:
Results of running the servable. If asynchronous, then a DLHubFuture holding the result
If asynchronous, a DLHubFuture for the execution
If debug, a tuple of the output of the function and a dictionary holding the following information:
- success: Whether the code inside ran without raising an exception
- stdout/stderr: Captured standard output and error, if requested
- timing: Execution time for the segment in seconds
- exc: Captured exception object
- error_message: Exception traceback
If neither, the output of the function
"""

if name not in self.fx_cache:
Expand All @@ -253,11 +264,15 @@ def run(self, name, inputs,
self.fx_cache.update({name: serv['dlhub']['funcx_id']})

funcx_id = self.fx_cache[name]
payload = {'data': inputs}
payload = {
'inputs': inputs,
'parameters': parameters,
'debug': debug
}
task_id = self._fx_client.run(payload, endpoint_id=self.fx_endpoint, function_id=funcx_id)

# Return the result
future = DLHubFuture(self, task_id, async_wait)
future = DLHubFuture(self, task_id, async_wait, debug)
return future.result(timeout=timeout) if not asynchronous else future

def run_serial(self, servables, inputs, async_wait=5):
Expand Down
23 changes: 15 additions & 8 deletions dlhub_sdk/tests/test_dlhub_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,18 +52,25 @@ def test_get_servables(dl):


def test_run(dl):
user = "zhuozhao_uchicago"
name = "noop"
data = {"data": ["V", "Co", "Zr"]}
user = "aristana_uchicago"
name = "noop_v10"
data = True # accepts anything as input, but listed as Boolean in DLHub

# Test a synchronous request
res = dl.run("{}/{}".format(user, name), data, timeout=60)
assert res == 'Hello'
# Without debug mode, res is the model result itself
assert res == 'Hello world!'

# Do the same thing with debug mode
res = dl.run("{}/{}".format(user, name), data, timeout=60, debug=True)
# res[0] contains model results, res[1] contains event data JSON
assert res[0] == 'Hello world!'
assert isinstance(res[1], dict)

# Test an asynchronous request
task_id = dl.run("{}/{}".format(user, name), data, asynchronous=True)
assert isinstance(task_id, DLHubFuture)
assert task_id.result(timeout=60) == 'Hello'
res = dl.run("{}/{}".format(user, name), data, asynchronous=True)
assert isinstance(res, DLHubFuture)
assert res.result(timeout=60) == 'Hello world!'


@mark.skipif(not is_gha, reason='Avoid running this test except on larger-scale tests of the system')
Expand Down Expand Up @@ -213,6 +220,6 @@ def test_namespace(dl):


def test_status(dl):
future = dl.run('zhuozhao_uchicago/noop', 'test', asynchronous=True)
future = dl.run('aristana_uchicago/noop_v10', 'test', asynchronous=True)
# Need spec for Fx status returns
assert isinstance(dl.get_task_status(future.task_id), dict)
29 changes: 21 additions & 8 deletions dlhub_sdk/utils/futures.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,19 @@
class DLHubFuture(Future):
"""Utility class for simplifying asynchronous execution in DLHub"""

def __init__(self, client, task_id: str, ping_interval: float):
def __init__(self, client, task_id: str, ping_interval: float, debug: bool):
"""
Args:
client (DLHubClient): Already-initialized client, used to check
task_id (str): Set the task ID of the
ping_interval (float): How often to ping the server to check status in seconds
client: Already-initialized client, used to check the status of the task
task_id: ID of the task tracked by this future
ping_interval: How often to ping the server to check status in seconds
debug: Whether to return the run metadata
"""
super().__init__()
self.client = client
self.task_id = task_id
self.ping_interval = ping_interval
self.debug = debug

# List of pending statuses returned by funcX.
# TODO: Replace this once funcX stops raising exceptions when a task is pending.
Expand Down Expand Up @@ -50,18 +52,29 @@ def running(self):
if super().running():
# If the task isn't already completed, check if it is still running
try:
status = self.client.get_result(self.task_id, verbose=True)
results = self.client.get_result(self.task_id, verbose=True)
except Exception as e:
# Check if it is "Task pending". funcX throws an exception on pending.
if e.args[0] in self.pending_statuses:
return True

# If not, something has gone wrong and we need to throw an exception
else:
self.set_exception(e)
return False

if isinstance(status, tuple):
# TODO pass in verbose setting?
self.set_result(status[0])
# If successful, `results` now contains:
# (function_return, metadata), run_time
(return_val, metadata), _ = results

if not metadata['success']:
self.set_exception(metadata['exc'])
else:
# If debug: then return return_val and metadata
if self.debug:
self.set_result((return_val, metadata))
else:
self.set_result(return_val)
return False

return False
Expand Down

0 comments on commit e644191

Please sign in to comment.