diff --git a/doc/changelog.d/773.added.md b/doc/changelog.d/773.added.md new file mode 100644 index 00000000..c4edd5b5 --- /dev/null +++ b/doc/changelog.d/773.added.md @@ -0,0 +1 @@ +Support OIDC with a provided Access Token \ No newline at end of file diff --git a/src/ansys/openapi/common/_oidc.py b/src/ansys/openapi/common/_oidc.py index c7309f80..5ae5b67f 100644 --- a/src/ansys/openapi/common/_oidc.py +++ b/src/ansys/openapi/common/_oidc.py @@ -128,8 +128,25 @@ def __init__( set_session_kwargs(self._authorized_session, self._api_session_configuration) logger.info("Configuration complete.") + def get_session_with_access_token(self, access_token: str) -> requests.Session: + """Create a :class:`~requests.Session` object with provided access token. + + This method configures a session with the provided access token, if the token is invalid, + or has expired, the session will be unable to authenticate. + + Parameters + ---------- + access_token : str + Access token for the API server, typically a Base-64 encoded JSON Web Token. + """ + logger.info("Setting access token...") + if access_token is None: + raise ValueError("Must provide a value for 'access_token', not None") + self._authorized_session.headers["Authorization"] = f"Bearer {access_token}" + return self._authorized_session + def get_session_with_provided_token(self, refresh_token: str) -> requests.Session: - """Create a :class:`OAuth2Session` object with provided tokens. + """Create a :class:`OAuth2Session` object with provided refresh token. This method configures a session to request an access token with the provided refresh token, an access token will be requested immediately. diff --git a/src/ansys/openapi/common/_session.py b/src/ansys/openapi/common/_session.py index 0758c75b..802c7bde 100644 --- a/src/ansys/openapi/common/_session.py +++ b/src/ansys/openapi/common/_session.py @@ -520,7 +520,33 @@ def with_stored_token(self, token_name: str = "ansys-openapi-common-oidc") -> Ap return self.with_token(refresh_token=refresh_token) - def with_token(self, refresh_token: str) -> ApiClientFactory: + def with_access_token(self, access_token: str) -> ApiClientFactory: + """Use a provided access token to authenticate the session. + + This method configures a session with the provided access token, if the token is invalid, + or has expired, the session will be unable to authenticate. + + Parameters + ---------- + access_token : str + Access token. + + Returns + ------- + ~ansys.openapi.common.ApiClientFactory + Original client factory object. + + .. versionadded:: 2.2.3 + """ + if self._session_factory is None: + return self._client_factory + self._client_factory._session = self._session_factory.get_session_with_access_token( + access_token=access_token + ) + self._client_factory._configured = True + return self._client_factory + + def with_refresh_token(self, refresh_token: str) -> ApiClientFactory: """Use a provided refresh token to authenticate the session. The refresh token will be used to request a new access token from the Identity Provider, @@ -535,6 +561,8 @@ def with_token(self, refresh_token: str) -> ApiClientFactory: ------- ~ansys.openapi.common.ApiClientFactory Original client factory object. + + .. versionadded:: 2.2.3 """ if self._session_factory is None: return self._client_factory @@ -544,6 +572,30 @@ def with_token(self, refresh_token: str) -> ApiClientFactory: self._client_factory._configured = True return self._client_factory + def with_token(self, refresh_token: str) -> ApiClientFactory: + """Use a provided refresh token to authenticate the session. + + The refresh token will be used to request a new access token from the Identity Provider, + this will be automatically refreshed shortly before expiration. + + Parameters + ---------- + refresh_token : str + Refresh token. + + Returns + ------- + ~ansys.openapi.common.ApiClientFactory + Original client factory object. + + Notes + ----- + The signature of this method will change in an upcoming release to allow both access and refresh + tokens to be provided. Update usages to provide the ``refresh_token`` keyword argument rather than + passing a positional argument. + """ + return self.with_refresh_token(refresh_token=refresh_token) + def authorize(self, login_timeout: int = 60) -> ApiClientFactory: """Authenticate the user interactively by opening a web browser and waiting for the user to log in. diff --git a/src/ansys/openapi/common/_util.py b/src/ansys/openapi/common/_util.py index c084f670..ff07cf32 100644 --- a/src/ansys/openapi/common/_util.py +++ b/src/ansys/openapi/common/_util.py @@ -19,7 +19,6 @@ # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. - from collections import OrderedDict import http.cookiejar from itertools import chain diff --git a/tests/test_oidc.py b/tests/test_oidc.py index 263bd1f6..44b4706d 100644 --- a/tests/test_oidc.py +++ b/tests/test_oidc.py @@ -197,6 +197,24 @@ def test_override_idp_configuration_with_no_headers_does_nothing(): assert response == configuration +def test_setting_access_token_with_no_token_throws(): + mock_factory = Mock() + with pytest.raises(ValueError): + OIDCSessionFactory.get_session_with_access_token(mock_factory, None) + + +def test_setting_access_token_sets_access_token(): + example_token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiYWRtaW4iOnRydWUsImlhdCI6MTUxNjIzOTAyMn0.KMUFsIDTnFmyG3nMiGM6H9FNFUROf3wh7SmqJp-QV30" + expected_header = f"Bearer {example_token}" + mock_factory = Mock() + mock_factory._authorized_session = requests.Session() + session = OIDCSessionFactory.get_session_with_access_token( + mock_factory, access_token=example_token + ) + + assert session.headers["Authorization"] == expected_header + + def test_setting_refresh_token_with_no_token_throws(): mock_factory = Mock() with pytest.raises(ValueError): diff --git a/tests/test_session_creation.py b/tests/test_session_creation.py index 4f07e8a5..9588dacd 100644 --- a/tests/test_session_creation.py +++ b/tests/test_session_creation.py @@ -387,7 +387,7 @@ def match_token_request(request): assert resp.status_code == 200 -def test_can_connect_with_oidc_using_token(): +def test_can_connect_with_oidc_using_refresh_token(): redirect_uri = "https://www.example.com/login/" authority_url = "https://www.example.com/authority/" client_id = "b4e44bfa-6b73-4d6a-9df6-8055216a5836" @@ -451,6 +451,47 @@ def match_token_request(request): assert resp.status_code == 200 +def test_can_connect_with_oidc_using_access_token(): + redirect_uri = "https://www.example.com/login/" + authority_url = "https://www.example.com/authority/" + client_id = "b4e44bfa-6b73-4d6a-9df6-8055216a5836" + access_token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiYWRtaW4iOnRydWUsImlhdCI6MTUxNjIzOTAyMn0.KMUFsIDTnFmyG3nMiGM6H9FNFUROf3wh7SmqJp-QV30" + well_known_response = json.dumps( + { + "token_endpoint": f"{authority_url}token", + "authorization_endpoint": f"{authority_url}authorization", + } + ) + authenticate_header = ( + f'Bearer redirecturi="{redirect_uri}", authority="{authority_url}", clientid="{client_id}"' + ) + + with requests_mock.Mocker() as m: + m.get( + f"{authority_url}.well-known/openid-configuration", + status_code=200, + text=well_known_response, + ) + m.get( + SECURE_SERVICELAYER_URL, + status_code=401, + headers={"WWW-Authenticate": authenticate_header}, + ) + m.get( + SECURE_SERVICELAYER_URL, + status_code=200, + request_headers={"Authorization": f"Bearer {access_token}"}, + ) + session = ( + ApiClientFactory(SECURE_SERVICELAYER_URL) + .with_oidc() + .with_access_token(access_token=access_token) + .connect() + ) + resp = session.rest_client.get(SECURE_SERVICELAYER_URL) + assert resp.status_code == 200 + + def test_neither_basic_nor_ntlm_throws(): with requests_mock.Mocker() as m: m.get(SERVICELAYER_URL, status_code=401, headers={"WWW-Authenticate": "Bearer"})