Skip to content

Commit

Permalink
Merge pull request #116 from DLHub-Argonne/test-client
Browse files Browse the repository at this point in the history
Overhaul how we use clients
  • Loading branch information
WardLT committed Sep 17, 2021
2 parents cfaa62e + 5adf4e1 commit 32c8ccf
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 288 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/CI.yml
Original file line number Diff line number Diff line change
Expand Up @@ -89,4 +89,4 @@ jobs:
- name: Test with pytest
run: |
# run all testing documents, except the local tests
pytest ./dlhub_sdk -k "not test_dlhub_client"
pytest ./dlhub_sdk
112 changes: 62 additions & 50 deletions dlhub_sdk/client.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,26 @@
import logging
import json
import os
from tempfile import mkstemp
from typing import Union, Any, Optional


import requests
import globus_sdk

from typing import Union, Any, Optional
from globus_sdk.authorizers import GlobusAuthorizer
from globus_sdk.base import BaseClient, slash_join
from mdf_toolbox import login, logout
from mdf_toolbox.globus_search.search_helper import SEARCH_LIMIT

from funcx.sdk.client import FuncXClient

from dlhub_sdk.config import DLHUB_SERVICE_ADDRESS, CLIENT_ID
from dlhub_sdk.utils.futures import DLHubFuture
from dlhub_sdk.utils.schemas import validate_against_dlhub_schema
from dlhub_sdk.utils.search import DLHubSearchHelper, get_method_details, filter_latest


# Directory for authentication tokens
_token_dir = os.path.expanduser("~/.dlhub/credentials")
logger = logging.getLogger(__name__)


class DLHubClient(BaseClient):
Expand All @@ -28,26 +29,28 @@ class DLHubClient(BaseClient):
Holds helper operations for performing common tasks with the DLHub service. For example,
`get_servables` produces a list of all servables registered with DLHub.
For most cases, we recommend creating a new DLHubClient by calling ``DLHubClient.login``.
This operation will check if you have saved any credentials to disk before using the CLI or SDK
and, if not, get new credentials and save them for later use.
For cases where disk access is unacceptable, you can create the client by creating an authorizer
following the
`tutorial for the Globus SDK <https://globus-sdk-python.readthedocs.io/en/stable/tutorial/>`_
and providing that authorizer to the initializer (e.g., ``DLHubClient(auth)``)"""

def __init__(self, dlh_authorizer=None, search_client=None, http_timeout=None,
force_login=False, fx_authorizer=None, openid_authorizer=None, **kwargs):
The initializer offers several routes for authenticating with the services:
1. **Token-based authentication**. The default route for login is to log in with your own
Globus-associated account. Provided no arguments, the DLHub client will ask you
to authenticate with Globus Auth then store your login tokens on your computer.
The DLHub SDK will re-use these tokens unless you later call :meth:`.logout`
or initialize the client with ``force_login=True`.
2. **Pre-defined authorizers**. The alternate route is to create authorizers
`using the Globus SDK directly <https://globus-sdk-python.readthedocs.io/en/stable/tutorial/>`_
and providing that authorizer to the initializer (e.g., ``DLHubClient(dlhub_authorizer=auth)``).
You must provide authorizers DLHub for all sub-services: Globus Search, FuncX, and OpenID.
"""

def __init__(self, dlh_authorizer: Optional[GlobusAuthorizer] = None,
search_authorizer: Optional[GlobusAuthorizer] = None,
fx_authorizer: Optional[GlobusAuthorizer] = None,
openid_authorizer: Optional[GlobusAuthorizer] = None,
http_timeout: Optional[int] = None,
force_login: bool = False, **kwargs):
"""Initialize the client
Args:
dlh_authorizer (:class:`GlobusAuthorizer
<globus_sdk.authorizers.base.GlobusAuthorizer>`):
An authorizer instance used to communicate with DLHub.
If ``None``, will be created.
search_client (:class:`SearchClient <globus_sdk.SearchClient>`):
An authenticated SearchClient to communicate with Globus Search.
If ``None``, will be created.
http_timeout (int): Timeout for any call to service in seconds. (default is no timeout)
force_login (bool): Whether to force a login to get new credentials.
A login will always occur if ``dlh_authorizer`` or ``search_client``
Expand All @@ -57,21 +60,34 @@ def __init__(self, dlh_authorizer=None, search_client=None, http_timeout=None,
When used locally with no_local_server=False, the domain is localhost with
a randomly chosen open port number.
**Default**: ``True``.
no_browser (bool): Do not automatically open the browser for the Globus Auth URL.
Display the URL instead and let the user navigate to that location manually.
**Default**: ``True``.
dlh_authorizer (:class:`GlobusAuthorizer <globus_sdk.authorizers.base.GlobusAuthorizer>`):
An authorizer instance used to communicate with DLHub.
If ``None``, will be created from your account's credentials.
search_authorizer (:class:`GlobusAuthorizer <globus_sdk.authorizers.base.GlobusAuthorizer>`):
An authenticated SearchClient to communicate with Globus Search.
If ``None``, will be created from your account's credentials.
fx_authorizer (:class:`GlobusAuthorizer
<globus_sdk.authorizers.base.GlobusAuthorizer>`):
An authorizer instance used to communicate with funcX.
If ``None``, will be created.
If ``None``, will be created from your account's credentials.
openid_authorizer (:class:`GlobusAuthorizer
<globus_sdk.authorizers.base.GlobusAuthorizer>`):
An authorizer instance used to communicate with OpenID.
If ``None``, will be created.
no_browser (bool): Do not automatically open the browser for the Globus Auth URL.
Display the URL instead and let the user navigate to that location manually.
**Default**: ``True``.
Keyword arguments are the same as for BaseClient.
If ``None``, will be created from your account's credentials.
Keyword arguments are the same as for :class:`BaseClient <globus_sdk.base.BaseClient>`.
"""
if force_login or not dlh_authorizer or not search_client \
or not fx_authorizer or not openid_authorizer:

authorizers = [dlh_authorizer, search_authorizer, openid_authorizer, fx_authorizer]
# Get authorizers through Globus login if any are not provided
if not all(a is not None for a in authorizers):
# If some but not all were provided, warn the user they could be making a mistake
if any(a is not None for a in authorizers):
logger.warning('You have defined some of the authorizers but not all. DLHub is falling back to login. '
'You must provide authorizers for DLHub, Search, OpenID, FuncX.')

fx_scope = "https://auth.globus.org/scopes/facd7ccc-c5f4-42aa-916b-a0e270e2c2a9/all"
auth_res = login(services=["search", "dlhub",
fx_scope, "openid"],
Expand All @@ -82,26 +98,24 @@ def __init__(self, dlh_authorizer=None, search_client=None, http_timeout=None,
token_dir=_token_dir,
no_local_server=kwargs.get("no_local_server", True),
no_browser=kwargs.get("no_browser", True))
# openid_authorizer = auth_res["openid"]

# Unpack the authorizers
dlh_authorizer = auth_res["dlhub"]
fx_authorizer = auth_res[fx_scope]
openid_authorizer = auth_res['openid']
search_authorizer = auth_res['search']
self._fx_client = FuncXClient(force_login=force_login,
fx_authorizer=fx_authorizer,
search_authorizer=search_authorizer,
openid_authorizer=openid_authorizer,
no_local_server=kwargs.get("no_local_server", True),
no_browser=kwargs.get("no_browser", True),
)
self._search_client = globus_sdk.SearchClient(authorizer=search_authorizer,
http_timeout=5 * 60)

else:
self._search_client = search_client
# Define the subclients needed by the service
self._fx_client = FuncXClient(fx_authorizer=fx_authorizer,
search_authorizer=search_authorizer,
openid_authorizer=openid_authorizer,
no_local_server=kwargs.get("no_local_server", True),
no_browser=kwargs.get("no_browser", True))
self._search_client = globus_sdk.SearchClient(authorizer=search_authorizer,
http_timeout=5 * 60)

# funcX endpoint to use
self.fx_endpoint = '86a47061-f3d9-44f0-90dc-56ddc642c000'
# self.fx_endpoint = '2c92a06a-015d-4bfa-924c-b3d0c36bdad7'
self.fx_cache = {}
super(DLHubClient, self).__init__("DLHub", environment='dlhub',
authorizer=dlh_authorizer,
Expand Down Expand Up @@ -134,8 +148,8 @@ def get_servables(self, only_latest_version=True):
"""

# Get all of the servables
results, info = self.query.match_field('dlhub.type', 'servable')\
.add_sort('dlhub.owner', ascending=True).add_sort('dlhub.name', ascending=False)\
results, info = self.query.match_field('dlhub.type', 'servable') \
.add_sort('dlhub.owner', ascending=True).add_sort('dlhub.name', ascending=False) \
.add_sort('dlhub.publication_date', ascending=False).search(info=True)
if info['total_query_matches'] > SEARCH_LIMIT:
raise RuntimeError('DLHub contains more servables than we can return in one entry. '
Expand Down Expand Up @@ -193,8 +207,8 @@ def describe_servable(self, name):
raise AttributeError('Please enter name in the form <user>/<servable_name>')

# Create a query for a single servable
query = self.query.match_servable('/'.join(split_name[1:]))\
.match_owner(split_name[0]).add_sort("dlhub.publication_date", False)\
query = self.query.match_servable('/'.join(split_name[1:])) \
.match_owner(split_name[0]).add_sort("dlhub.publication_date", False) \
.search(limit=1)

# Raise error if servable is not found
Expand Down Expand Up @@ -409,7 +423,7 @@ def search_by_servable(self, servable_name=None, owner=None, version=None,
# Perform the query
results, info = (self.query.match_servable(servable_name=servable_name, owner=owner,
publication_date=version)
.search(limit=limit, info=True))
.search(limit=limit, info=True))

# Filter out the latest models
if only_latest:
Expand Down Expand Up @@ -469,10 +483,8 @@ def clear_funcx_cache(self, servable=None):
"""

if servable:
del(self.fx_cache[servable])
del (self.fx_cache[servable])
else:
self.fx_cache = {}

return self.fx_cache


0 comments on commit 32c8ccf

Please sign in to comment.