From 94e15952f69decc1586b1272d4d85421ccbd9897 Mon Sep 17 00:00:00 2001 From: Jon Novak Date: Wed, 26 Mar 2025 10:42:42 -0500 Subject: [PATCH 1/2] fix test to use proper url --- tests/auth/test_api.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/auth/test_api.py b/tests/auth/test_api.py index 55bf8efa9..9c57b2a19 100644 --- a/tests/auth/test_api.py +++ b/tests/auth/test_api.py @@ -132,10 +132,10 @@ def test_impersonate_user(url, keycloak_client): r = None try: r = authenticate( - url=url, + auth_url=client.auth_url, client_id=rep_impersonation_client["clientId"], client_secret=rep_impersonation_client["secret"], - scope="opendid offline_access", + scope="openid offline_access", grant_type="urn:ietf:params:oauth:grant-type:token-exchange", subject_token=client.access_token, requested_token_type="urn:ietf:params:oauth:token-type:refresh_token", From 5e0a578ca6c7f1a05d8cf7db55f56cd1f97122e1 Mon Sep 17 00:00:00 2001 From: Jon Novak Date: Wed, 26 Mar 2025 11:52:09 -0500 Subject: [PATCH 2/2] pull out auth_url into its own method --- src/ansys/hps/client/__init__.py | 2 +- src/ansys/hps/client/authenticate.py | 22 +++++++ src/ansys/hps/client/client.py | 20 +------ tests/auth/test_authenticate.py | 86 +++++++++++++++++++++++++--- 4 files changed, 102 insertions(+), 28 deletions(-) diff --git a/src/ansys/hps/client/__init__.py b/src/ansys/hps/client/__init__.py index e91cb2a26..7a52afe44 100644 --- a/src/ansys/hps/client/__init__.py +++ b/src/ansys/hps/client/__init__.py @@ -23,7 +23,7 @@ from .__version__ import __ansys_apps_version__, __version__ from .auth import AuthApi -from .authenticate import authenticate +from .authenticate import authenticate, determine_auth_url from .client import Client from .exceptions import APIError, ClientError, HPSError from .jms import JmsApi, ProjectApi diff --git a/src/ansys/hps/client/authenticate.py b/src/ansys/hps/client/authenticate.py index 1b7a056f5..ce5d1f3be 100644 --- a/src/ansys/hps/client/authenticate.py +++ b/src/ansys/hps/client/authenticate.py @@ -48,6 +48,28 @@ def get_discovery_data(auth_url: str, timeout: int = 10, verify: bool | str = Tr return disco.json() +def determine_auth_url(hps_url: str, verify_ssl: bool, fallback_realm: str) -> str: + """Determine the authentication URL for the HPS server.""" + with requests.session() as session: + session.verify = verify_ssl + jms_info_url = hps_url.rstrip("/") + "/jms/api/v1" + resp = session.get(jms_info_url) + if resp.status_code != 200: + raise RuntimeError( + f"Failed to contact jms info endpoint {jms_info_url}, \ + status code {resp.status_code}: {resp.content.decode()}" + ) + else: + jms_data = resp.json() + if "services" in jms_data and "external_auth_url" in jms_data["services"]: + return resp.json()["services"]["external_auth_url"] + elif fallback_realm: + return f"{hps_url.rstrip('/')}/auth/realms/{fallback_realm}" + else: + log.warning("External_auth_url not found from JMS and no realm specified.") + return None + + def authenticate( auth_url: str = "https://127.0.0.1:8443/hps/auth/realms/rep", grant_type: str = "password", diff --git a/src/ansys/hps/client/client.py b/src/ansys/hps/client/client.py index 27177d3e1..7be7789ec 100644 --- a/src/ansys/hps/client/client.py +++ b/src/ansys/hps/client/client.py @@ -34,7 +34,7 @@ from ansys.hps.data_transfer.client import Client as DataTransferClient from ansys.hps.data_transfer.client import DataTransferApi -from .authenticate import authenticate +from .authenticate import authenticate, determine_auth_url from .connection import create_session from .exceptions import HPSError, raise_for_status from .warnings import UnverifiedHTTPSRequestsWarning @@ -183,23 +183,7 @@ def __init__( self.auth_url = auth_url if not auth_url: - with requests.session() as session: - session.verify = self.verify - jms_info_url = url.rstrip("/") + "/jms/api/v1" - resp = session.get(jms_info_url) - if resp.status_code != 200: - raise RuntimeError( - f"Failed to contact jms info endpoint {jms_info_url}, \ - status code {resp.status_code}: {resp.content.decode()}" - ) - else: - jms_data = resp.json() - if "services" in jms_data and "external_auth_url" in jms_data["services"]: - self.auth_url = resp.json()["services"]["external_auth_url"] - elif realm: - self.auth_url = f"{url.rstrip('/')}/auth/realms/{realm}" - else: - log.warning("External_auth_url not found from JMS and no realm specified.") + self.auth_url = determine_auth_url(url, self.verify, realm) if access_token: log.debug("Authenticate with access token") diff --git a/tests/auth/test_authenticate.py b/tests/auth/test_authenticate.py index 727fc297e..2531bdaa1 100644 --- a/tests/auth/test_authenticate.py +++ b/tests/auth/test_authenticate.py @@ -21,29 +21,97 @@ # SOFTWARE. import logging +from unittest.mock import MagicMock, patch import pytest import requests -from ansys.hps.client import Client, authenticate +from ansys.hps.client import authenticate, determine_auth_url log = logging.getLogger(__name__) def test_authenticate(url, username, password): - client = Client(url=url, username=username, password=password, verify=False) - resp = authenticate( - auth_url=client.auth_url, username=username, password=password, verify=False - ) + auth_url = determine_auth_url(hps_url=url, verify_ssl=False, fallback_realm="rep") + resp = authenticate(auth_url=auth_url, username=username, password=password, verify=False) assert "access_token" in resp assert "refresh_token" in resp +def test_determine_auth_url_with_ssl_verification(url): + with pytest.raises(requests.exceptions.SSLError) as ex_info: + determine_auth_url(hps_url=url, verify_ssl=True, fallback_realm="rep") + assert "CERTIFICATE_VERIFY_FAILED" in str(ex_info.value) + + def test_authenticate_with_ssl_verification(url, username, password): - # Doesn't matter that the auth url is wrong.... The first request will fail + auth_url = determine_auth_url(hps_url=url, verify_ssl=False, fallback_realm="rep") with pytest.raises(requests.exceptions.SSLError) as ex_info: - _ = authenticate( - auth_url=f"{url}/auth/realms/rep", username=username, password=password, verify=True - ) + _ = authenticate(auth_url=auth_url, username=username, password=password, verify=True) assert "CERTIFICATE_VERIFY_FAILED" in str(ex_info.value) + + +def test_determine_auth_url_success(): + hps_url = "https://example.com" + verify_ssl = False + fallback_realm = "rep" + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = { + "services": {"external_auth_url": "https://auth.example.com"} + } + + with patch("requests.Session.get", return_value=mock_response): + auth_url = determine_auth_url(hps_url, verify_ssl, fallback_realm) + assert auth_url == "https://auth.example.com" + + +def test_determine_auth_url_fallback_realm(): + hps_url = "https://example.com" + verify_ssl = False + fallback_realm = "rep" + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = {} + + with patch("requests.Session.get", return_value=mock_response): + auth_url = determine_auth_url(hps_url, verify_ssl, fallback_realm) + assert auth_url == f"{hps_url.rstrip('/')}/auth/realms/{fallback_realm}" + + +def test_determine_auth_url_no_realm(): + hps_url = "https://example.com" + verify_ssl = False + fallback_realm = None + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = {} + + with ( + patch("requests.Session.get", return_value=mock_response), + patch("logging.Logger.warning") as mock_warning, + ): + auth_url = determine_auth_url(hps_url, verify_ssl, fallback_realm) + assert auth_url is None + mock_warning.assert_called_once_with( + "External_auth_url not found from JMS and no realm specified." + ) + + +def test_determine_auth_url_failure(): + hps_url = "https://example.com" + verify_ssl = False + fallback_realm = "rep" + mock_response = MagicMock() + mock_response.status_code = 500 + mock_response.content.decode.return_value = "Internal Server Error" + + with ( + patch("requests.Session.get", return_value=mock_response), + pytest.raises(RuntimeError) as ex_info, + ): + determine_auth_url(hps_url, verify_ssl, fallback_realm) + assert "Failed to contact jms info endpoint" in str(ex_info.value) + assert "status code 500" in str(ex_info.value) + assert "Internal Server Error" in str(ex_info.value)