Skip to content

Commit

Permalink
Merge pull request #64 from DLHub-Argonne/async_client
Browse files Browse the repository at this point in the history
Add Asynchronous Operations to Client
  • Loading branch information
WardLT committed Apr 19, 2019
2 parents 7a6d2cb + 40185ad commit a569d4b
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 4 deletions.
18 changes: 14 additions & 4 deletions dlhub_sdk/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@

import jsonpickle
import requests
from typing import Union, Any
from globus_sdk.base import BaseClient, slash_join
from mdf_toolbox import login, logout
from mdf_toolbox.search_helper import SEARCH_LIMIT

from dlhub_sdk.config import DLHUB_SERVICE_ADDRESS, CLIENT_ID
from dlhub_sdk.utils.futures import DLHubFuture
from dlhub_sdk.utils.schemas import validate_against_dlhub_schema
from dlhub_sdk.utils.search import DLHubSearchHelper, get_method_details, filter_latest

Expand Down Expand Up @@ -164,7 +166,8 @@ def describe_methods(self, name, method=None):
metadata = self.describe_servable(name)
return get_method_details(metadata, method)

def run(self, name, inputs, input_type='python'):
def run(self, name, inputs, input_type='python',
asynchronous=False, async_wait=5) -> Union[Any, DLHubFuture]:
"""Invoke a DLHub servable
Args:
Expand All @@ -173,8 +176,11 @@ def run(self, name, inputs, input_type='python'):
input_type (string): How to send the data to DLHub. Can be "python" (which pickles
the data), "json" (which uses JSON to serialize the data), or "files" (which
sends the data as files).
asynchronous (bool): Whether to return from the function immediately or
wait for the execution to finish.
async_wait (float): How many sections wait between checking async status
Returns:
Results of running the servable
Results of running the servable. If asynchronous, then the task ID
"""
servable_path = 'servables/{name}/run'.format(name=name)

Expand All @@ -189,13 +195,17 @@ def run(self, name, inputs, input_type='python'):
else:
raise ValueError('Input type not recognized: {}'.format(input_type))

# Set the asynchronous option
data['asynchronous'] = asynchronous

# Send the data to DLHub
r = self.post(servable_path, json_body=data)
if r.http_status != 200:
if (not asynchronous and r.http_status != 200) \
or (asynchronous and r.http_status != 202):
raise Exception(r)

# Return the result
return r.data
return DLHubFuture(self, r.data['task_id'], async_wait) if asynchronous else r.data

def publish_servable(self, model):
"""Submit a servable to DLHub
Expand Down
6 changes: 6 additions & 0 deletions dlhub_sdk/tests/test_dlhub_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import os

from dlhub_sdk.models.servables.python import PythonStaticMethodModel
from dlhub_sdk.utils.futures import DLHubFuture
from dlhub_sdk.client import DLHubClient


Expand Down Expand Up @@ -49,6 +50,11 @@ def test_run(self):
res = self.dl.run("{}/{}".format(user, name), data, input_type='python')
self.assertEqual({}, res)

# Test an asynchronous request
task_future = self.dl.run("{}/{}".format(user, name), data, asynchronous=True)
self.assertIsInstance(task_future, DLHubFuture)
self.assertEqual({}, task_future.result(timeout=60))

@skipUnless(is_travis, 'Publish test only runs on Travis')
def test_submit(self):
# Make an example function
Expand Down
59 changes: 59 additions & 0 deletions dlhub_sdk/utils/futures.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
"""Tools for dealing with asynchronous execution"""
from globus_sdk import GlobusAPIError
from concurrent.futures import Future
from threading import Thread
from time import sleep
import json


class DLHubFuture(Future):
"""Utility class for simplifying asynchronous execution in DLHub"""

def __init__(self, client, task_id: str, ping_interval: float):
"""
Args:
client (DLHubClient): Already-initialized client, used to check
task_id (str): Set the task ID of the
ping_interval (float): How often to ping the server to check status in seconds
"""
super().__init__()
self.client = client
self.task_id = task_id
self.ping_interval = ping_interval

# Once you create this, the task has already started
self.set_running_or_notify_cancel()

# Forcing the ping interval to be no less than 1s
if ping_interval < 1:
assert AttributeError('Ping interval must be at least 1 second')

# Start a thread that polls status
self._checker_thread = Thread(target=DLHubFuture._ping_server, args=(self,))
self._checker_thread.start()

def _ping_server(self):
while True:
sleep(self.ping_interval)
try:
if not self.running():
break
except GlobusAPIError:
# Keep pinging even if the results fail
continue

def running(self):
if super().running():
# If the task isn't already completed, check if it is still running
status = self.client.get_task_status(self.task_id)
# TODO (lw): What if the task fails on the server end? Do we have a "FAILURE" status?
if status['status'] == 'COMPLETED':
self.set_result(json.loads(status['result']))
return False
return True
return False

def stop(self):
"""Stop the execution of the function"""
# TODO (lw): Should be attempt to cancel the execution of the task on DLHub?
self.set_exception(Exception('Cancelled by user'))
16 changes: 16 additions & 0 deletions dlhub_sdk/utils/tests/test_futures.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from dlhub_sdk.utils.futures import DLHubFuture
from dlhub_sdk.client import DLHubClient
from unittest import TestCase

# ID of a task that has completed in DLHub
completed_task = 'b8e51bc6-4081-4ec9-9e3b-b0e52198c08d'


class TestFutures(TestCase):

def test_future(self):
client = DLHubClient()
future = DLHubFuture(client, completed_task, 1)
self.assertFalse(future.running())
self.assertTrue(future.done())
self.assertEquals({}, future.result())

0 comments on commit a569d4b

Please sign in to comment.