Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions doc/changelog.d/773.added.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Support OIDC with a provided Access Token
19 changes: 18 additions & 1 deletion src/ansys/openapi/common/_oidc.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,25 @@ def __init__(
set_session_kwargs(self._authorized_session, self._api_session_configuration)
logger.info("Configuration complete.")

def get_session_with_access_token(self, access_token: str) -> requests.Session:
"""Create a :class:`~requests.Session` object with provided access token.

This method configures a session with the provided access token, if the token is invalid,
or has expired, the session will be unable to authenticate.

Parameters
----------
access_token : str
Access token for the API server, typically a Base-64 encoded JSON Web Token.
"""
logger.info("Setting access token...")
if access_token is None:
raise ValueError("Must provide a value for 'access_token', not None")
self._authorized_session.headers["Authorization"] = f"Bearer {access_token}"
return self._authorized_session

def get_session_with_provided_token(self, refresh_token: str) -> requests.Session:
"""Create a :class:`OAuth2Session` object with provided tokens.
"""Create a :class:`OAuth2Session` object with provided refresh token.

This method configures a session to request an access token with the provided refresh token,
an access token will be requested immediately.
Expand Down
54 changes: 53 additions & 1 deletion src/ansys/openapi/common/_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -520,7 +520,33 @@

return self.with_token(refresh_token=refresh_token)

def with_token(self, refresh_token: str) -> ApiClientFactory:
def with_access_token(self, access_token: str) -> ApiClientFactory:
"""Use a provided access token to authenticate the session.

This method configures a session with the provided access token, if the token is invalid,
or has expired, the session will be unable to authenticate.

Parameters
----------
access_token : str
Access token.

Returns
-------
~ansys.openapi.common.ApiClientFactory
Original client factory object.

.. versionadded:: 2.2.3
"""
if self._session_factory is None:
return self._client_factory

Check warning on line 542 in src/ansys/openapi/common/_session.py

View check run for this annotation

Codecov / codecov/patch

src/ansys/openapi/common/_session.py#L542

Added line #L542 was not covered by tests
self._client_factory._session = self._session_factory.get_session_with_access_token(
access_token=access_token
)
self._client_factory._configured = True
return self._client_factory

def with_refresh_token(self, refresh_token: str) -> ApiClientFactory:
"""Use a provided refresh token to authenticate the session.

The refresh token will be used to request a new access token from the Identity Provider,
Expand All @@ -535,6 +561,8 @@
-------
~ansys.openapi.common.ApiClientFactory
Original client factory object.

.. versionadded:: 2.2.3
"""
if self._session_factory is None:
return self._client_factory
Expand All @@ -544,6 +572,30 @@
self._client_factory._configured = True
return self._client_factory

def with_token(self, refresh_token: str) -> ApiClientFactory:
"""Use a provided refresh token to authenticate the session.

The refresh token will be used to request a new access token from the Identity Provider,
this will be automatically refreshed shortly before expiration.

Parameters
----------
refresh_token : str
Refresh token.

Returns
-------
~ansys.openapi.common.ApiClientFactory
Original client factory object.

Notes
-----
The signature of this method will change in an upcoming release to allow both access and refresh
tokens to be provided. Update usages to provide the ``refresh_token`` keyword argument rather than
passing a positional argument.
"""
return self.with_refresh_token(refresh_token=refresh_token)

def authorize(self, login_timeout: int = 60) -> ApiClientFactory:
"""Authenticate the user interactively by opening a web browser and waiting for the user to log in.

Expand Down
1 change: 0 additions & 1 deletion src/ansys/openapi/common/_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

from collections import OrderedDict
import http.cookiejar
from itertools import chain
Expand Down
18 changes: 18 additions & 0 deletions tests/test_oidc.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,24 @@ def test_override_idp_configuration_with_no_headers_does_nothing():
assert response == configuration


def test_setting_access_token_with_no_token_throws():
mock_factory = Mock()
with pytest.raises(ValueError):
OIDCSessionFactory.get_session_with_access_token(mock_factory, None)


def test_setting_access_token_sets_access_token():
example_token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiYWRtaW4iOnRydWUsImlhdCI6MTUxNjIzOTAyMn0.KMUFsIDTnFmyG3nMiGM6H9FNFUROf3wh7SmqJp-QV30"
expected_header = f"Bearer {example_token}"
mock_factory = Mock()
mock_factory._authorized_session = requests.Session()
session = OIDCSessionFactory.get_session_with_access_token(
mock_factory, access_token=example_token
)

assert session.headers["Authorization"] == expected_header


def test_setting_refresh_token_with_no_token_throws():
mock_factory = Mock()
with pytest.raises(ValueError):
Expand Down
43 changes: 42 additions & 1 deletion tests/test_session_creation.py
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ def match_token_request(request):
assert resp.status_code == 200


def test_can_connect_with_oidc_using_token():
def test_can_connect_with_oidc_using_refresh_token():
redirect_uri = "https://www.example.com/login/"
authority_url = "https://www.example.com/authority/"
client_id = "b4e44bfa-6b73-4d6a-9df6-8055216a5836"
Expand Down Expand Up @@ -451,6 +451,47 @@ def match_token_request(request):
assert resp.status_code == 200


def test_can_connect_with_oidc_using_access_token():
redirect_uri = "https://www.example.com/login/"
authority_url = "https://www.example.com/authority/"
client_id = "b4e44bfa-6b73-4d6a-9df6-8055216a5836"
access_token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiYWRtaW4iOnRydWUsImlhdCI6MTUxNjIzOTAyMn0.KMUFsIDTnFmyG3nMiGM6H9FNFUROf3wh7SmqJp-QV30"
well_known_response = json.dumps(
{
"token_endpoint": f"{authority_url}token",
"authorization_endpoint": f"{authority_url}authorization",
}
)
authenticate_header = (
f'Bearer redirecturi="{redirect_uri}", authority="{authority_url}", clientid="{client_id}"'
)

with requests_mock.Mocker() as m:
m.get(
f"{authority_url}.well-known/openid-configuration",
status_code=200,
text=well_known_response,
)
m.get(
SECURE_SERVICELAYER_URL,
status_code=401,
headers={"WWW-Authenticate": authenticate_header},
)
m.get(
SECURE_SERVICELAYER_URL,
status_code=200,
request_headers={"Authorization": f"Bearer {access_token}"},
)
session = (
ApiClientFactory(SECURE_SERVICELAYER_URL)
.with_oidc()
.with_access_token(access_token=access_token)
.connect()
)
resp = session.rest_client.get(SECURE_SERVICELAYER_URL)
assert resp.status_code == 200


def test_neither_basic_nor_ntlm_throws():
with requests_mock.Mocker() as m:
m.get(SERVICELAYER_URL, status_code=401, headers={"WWW-Authenticate": "Bearer"})
Expand Down