Skip to content

Commit

Permalink
Merge pull request #86 from DLHub-Argonne/fx_auth_login
Browse files Browse the repository at this point in the history
Adds necessary authorizers for funcx
  • Loading branch information
ZhuozhaoLi committed Oct 21, 2020
2 parents e1ebf8e + 8ef3d9b commit 99e3a89
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 15 deletions.
40 changes: 27 additions & 13 deletions dlhub_sdk/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@
from tempfile import mkstemp

import requests
import globus_sdk

from typing import Union, Any, Optional
from globus_sdk.base import BaseClient, slash_join
from mdf_toolbox import login, logout
from mdf_toolbox.search_helper import SEARCH_LIMIT

from funcx.sdk.client import FuncXClient
from funcx.serialize import FuncXSerializer

from dlhub_sdk.config import DLHUB_SERVICE_ADDRESS, CLIENT_ID
from dlhub_sdk.utils.futures import DLHubFuture
Expand All @@ -36,7 +37,7 @@ class DLHubClient(BaseClient):
and providing that authorizer to the initializer (e.g., ``DLHubClient(auth)``)"""

def __init__(self, dlh_authorizer=None, search_client=None, http_timeout=None,
force_login=False, fx_authorizer=None, **kwargs):
force_login=False, fx_authorizer=None, openid_authorizer=None, **kwargs):
"""Initialize the client
Args:
Expand All @@ -60,17 +61,22 @@ def __init__(self, dlh_authorizer=None, search_client=None, http_timeout=None,
<globus_sdk.authorizers.base.GlobusAuthorizer>`):
An authorizer instance used to communicate with funcX.
If ``None``, will be created.
openid_authorizer (:class:`GlobusAuthorizer
<globus_sdk.authorizers.base.GlobusAuthorizer>`):
An authorizer instance used to communicate with OpenID.
If ``None``, will be created.
no_browser (bool): Do not automatically open the browser for the Globus Auth URL.
Display the URL instead and let the user navigate to that location manually.
**Default**: ``True``.
Keyword arguments are the same as for BaseClient.
"""
if force_login or not dlh_authorizer or not search_client or not fx_authorizer:

if force_login or not dlh_authorizer or not search_client \
or not fx_authorizer or not openid_authorizer:
fx_scope = "https://auth.globus.org/scopes/facd7ccc-c5f4-42aa-916b-a0e270e2c2a9/all"
auth_res = login(services=["search", "dlhub",
fx_scope, "openid"],
app_name="DLHub_Client",
make_clients=False,
client_id=CLIENT_ID,
clear_old_tokens=force_login,
token_dir=_token_dir,
Expand All @@ -79,20 +85,28 @@ def __init__(self, dlh_authorizer=None, search_client=None, http_timeout=None,
# openid_authorizer = auth_res["openid"]
dlh_authorizer = auth_res["dlhub"]
fx_authorizer = auth_res[fx_scope]
self._search_client = auth_res["search"]

self._fx_client = FuncXClient(force_login=force_login,
no_local_server=kwargs.get("no_local_server", True),
no_browser=kwargs.get("no_browser", True),
funcx_service_address='https://api.funcx.org/v1',)
openid_authorizer = auth_res['openid']
search_authorizer = auth_res['search']
self._fx_client = FuncXClient(force_login=force_login,
fx_authorizer=fx_authorizer,
search_authorizer=search_authorizer,
openid_authorizer=openid_authorizer,
no_local_server=kwargs.get("no_local_server", True),
no_browser=kwargs.get("no_browser", True),
funcx_service_address='https://api.funcx.org/v1')
self._search_client = globus_sdk.SearchClient(authorizer=search_authorizer,
http_timeout=5 * 60)

else:
self._search_client = search_client
# funcX endpoint to use
self.fx_endpoint = '86a47061-f3d9-44f0-90dc-56ddc642c000'
# self.fx_endpoint = '2c92a06a-015d-4bfa-924c-b3d0c36bdad7'
self.fx_serializer = FuncXSerializer()
self.fx_cache = {}
super(DLHubClient, self).__init__("DLHub", environment='dlhub', authorizer=dlh_authorizer,
http_timeout=http_timeout, base_url=DLHUB_SERVICE_ADDRESS,
super(DLHubClient, self).__init__("DLHub", environment='dlhub',
authorizer=dlh_authorizer,
http_timeout=http_timeout,
base_url=DLHUB_SERVICE_ADDRESS,
**kwargs)

def logout(self):
Expand Down
7 changes: 6 additions & 1 deletion dlhub_sdk/utils/futures.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ def __init__(self, client, task_id: str, ping_interval: float):
self.task_id = task_id
self.ping_interval = ping_interval

# List of pending statuses returned by funcX.
# TODO: Replace this once funcX stops raising exceptions when a task is pending.
self.pending_statuses = ["received", "waiting-for-ep", "waiting-for-nodes",
"waiting-for-launch", "running"]

# Once you create this, the task has already started
self.set_running_or_notify_cancel()

Expand Down Expand Up @@ -48,7 +53,7 @@ def running(self):
status = self.client.get_result(self.task_id, verbose=True)
except Exception as e:
# Check if it is "Task pending". funcX throws an exception on pending.
if e.args[0] == "Task pending":
if e.args[0] in self.pending_statuses:
return True
else:
self.set_exception(e)
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@ globus-sdk>=1.9.0
requests>=2.24.0
mdf_toolbox>=0.5.4
jsonschema>=3.2.0
funcx>=0.0.2a0
funcx>=0.0.5

0 comments on commit 99e3a89

Please sign in to comment.