-
Notifications
You must be signed in to change notification settings - Fork 0
FEAT 222 Add client credential flow #228
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
891d2f2
3fe806d
291da3b
89487f7
251cd43
f58daf6
4ad2fb2
659148c
f693cfb
757384d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,8 @@ | ||
import os | ||
import warnings | ||
from typing import Tuple, Union, Container, Optional, Mapping, TypeVar, Any | ||
from typing import Tuple, Union, Optional, Mapping, TypeVar, Any, Callable | ||
from functools import wraps | ||
from copy import copy | ||
|
||
import requests | ||
from urllib3.util.retry import Retry | ||
|
@@ -31,7 +33,7 @@ | |
# noinspection PyUnresolvedReferences | ||
import requests_auth # type: ignore[import] | ||
import keyring | ||
from ._oidc import OIDCSessionFactory | ||
from ._oidc import OIDCSessionFactory, get_client_credential_auth | ||
except ImportError: | ||
_oidc_enabled = False | ||
|
||
|
@@ -51,6 +53,29 @@ | |
|
||
_platform_windows = False | ||
|
||
Return_Type = TypeVar("Return_Type") | ||
|
||
|
||
def require_oidc(func: Callable[..., Return_Type]) -> Callable[..., Return_Type]: | ||
"""Enforce that OIDC features are enabled before executing the wrapped function/method. | ||
|
||
Raises | ||
------ | ||
ImportError | ||
If the OIDC features have not been installed. | ||
""" | ||
|
||
@wraps(func) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is nice |
||
def wrapper(*args: Any, **kwargs: Any) -> Return_Type: | ||
if not _oidc_enabled: | ||
raise ImportError( | ||
"OpenID Connect features are not enabled. To use them, run `pip install ansys-openapi-common[oidc]`." | ||
) | ||
return func(*args, **kwargs) | ||
|
||
return wrapper | ||
|
||
|
||
# Required to allow the ApiClientFactory to be subclassed. This ensures that Pylance | ||
# understands that the subclass is returned by the builder methods instead of the base class | ||
Api_Client_Factory = TypeVar("Api_Client_Factory", bound="ApiClientFactory") | ||
|
@@ -261,11 +286,52 @@ def with_autologon(self: Api_Client_Factory) -> Api_Client_Factory: | |
return self | ||
raise ConnectionError("Unable to connect with autologon.") | ||
|
||
def with_oidc( | ||
@require_oidc | ||
def with_oidc_client_credentials_flow( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I feel like it would make more sense to take the token_url here, and leave the api_url as the actual service_url. We could always fall back to the api_url as the token_url if it isn't specified, but it just seems a bit clumsy right now. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should get all the information we need in the initial 401 response from the API url and from the well known endpoint at the identity provider. I'll check this branch out and have a look at what it's doing today |
||
self: Api_Client_Factory, | ||
client_id: str, | ||
client_secret: str, | ||
scope: Optional[str] = "", | ||
) -> Api_Client_Factory: | ||
"""Set up client authentication for use with OpenID Connect using the Client Credentials flow. | ||
|
||
Parameters | ||
---------- | ||
client_id : :class:`str` | ||
Resource owner username. Provided by the Identity provider. | ||
client_secret : :class:`str` | ||
Resource owner password. Provided by the Identity provider. | ||
scope : Union[:class:`str`, :class:`list`[:class:`str`]], optional | ||
Single scope or list of scopes required by the application. | ||
|
||
Returns | ||
------- | ||
:class:`~ansys.openapi.common.ApiClientFactory` | ||
Current client factory object. | ||
|
||
Notes | ||
----- | ||
OIDC Authentication requires the ``[oidc]`` extra to be installed. | ||
""" | ||
|
||
auth = get_client_credential_auth( | ||
token_url=self._api_url, | ||
client_id=client_id, | ||
client_secret=client_secret, | ||
scope=scope, | ||
session=copy(self._session), | ||
) | ||
self._session.auth = auth | ||
self._configured = True | ||
return self | ||
|
||
@require_oidc | ||
def with_oidc_authorization_flow( | ||
self, | ||
idp_session_configuration: Optional[SessionConfiguration] = None, | ||
) -> "OIDCSessionBuilder": | ||
"""Set up client authentication for use with OpenID Connect. | ||
) -> "AuthorizationSessionBuilder": | ||
"""Set up client authentication for use with OpenID Connect using the authorization flow. Currently | ||
only authorization flow with PKCE is supported. | ||
|
||
Parameters | ||
---------- | ||
|
@@ -274,20 +340,17 @@ def with_oidc( | |
|
||
Returns | ||
------- | ||
:class:`~ansys.openapi.common.OIDCSessionBuilder` | ||
:class:`~ansys.openapi.common.AuthorizationSessionBuilder` | ||
Builder object to authenticate via OIDC. | ||
|
||
Notes | ||
----- | ||
OIDC Authentication requires the ``[oidc]`` extra to be installed. | ||
""" | ||
if not _oidc_enabled: | ||
raise ImportError( | ||
"OpenID Connect features are not enabled. To use them, run `pip install ansys-openapi-common[oidc]`." | ||
) | ||
|
||
initial_response = self._session.get(self._api_url) | ||
if self.__handle_initial_response(initial_response): | ||
return OIDCSessionBuilder(self) | ||
return AuthorizationSessionBuilder(self) | ||
|
||
session_factory = OIDCSessionFactory( | ||
self._session, | ||
|
@@ -296,7 +359,7 @@ def with_oidc( | |
idp_session_configuration, | ||
) | ||
|
||
return OIDCSessionBuilder(self, session_factory) | ||
return AuthorizationSessionBuilder(self, session_factory) | ||
|
||
def __test_connection(self) -> bool: | ||
"""Attempt to connect to the API server. If this returns a 2XX status code, the method returns | ||
|
@@ -380,9 +443,9 @@ def __get_authenticate_header( | |
return parse_authenticate(response.headers["www-authenticate"]) | ||
|
||
|
||
class OIDCSessionBuilder: | ||
"""Helps create OpenID Connect sessions from different types of input and provides OIDC-specific | ||
configuration options. | ||
class AuthorizationSessionBuilder: | ||
"""Helps create OpenID Connect Authorize Flow sessions from different types of input and provides | ||
configuration options specific to the Authorization Flow. | ||
|
||
Parameters | ||
---------- | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How do we provide multiple scopes here, using a comma separated list?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point. In re-reading the requests-auth docs, the docstring there says "Scope parameter sent to token URL as body. Can also be a list of scopes." - my interpretation of this is that it can be
str
orlist[str]
. https://colin-b.github.io/requests_auth/#client-credentials-flow