Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/ansys/hps/client/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

from .__version__ import __ansys_apps_version__, __version__
from .auth import AuthApi
from .authenticate import authenticate
from .authenticate import authenticate, determine_auth_url
from .client import Client
from .exceptions import APIError, ClientError, HPSError
from .jms import JmsApi, ProjectApi
Expand Down
22 changes: 22 additions & 0 deletions src/ansys/hps/client/authenticate.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,28 @@ def get_discovery_data(auth_url: str, timeout: int = 10, verify: bool | str = Tr
return disco.json()


def determine_auth_url(hps_url: str, verify_ssl: bool, fallback_realm: str) -> str:
"""Determine the authentication URL for the HPS server."""
with requests.session() as session:
session.verify = verify_ssl
jms_info_url = hps_url.rstrip("/") + "/jms/api/v1"
resp = session.get(jms_info_url)
if resp.status_code != 200:
raise RuntimeError(
f"Failed to contact jms info endpoint {jms_info_url}, \
status code {resp.status_code}: {resp.content.decode()}"
)
else:
jms_data = resp.json()
if "services" in jms_data and "external_auth_url" in jms_data["services"]:
return resp.json()["services"]["external_auth_url"]
elif fallback_realm:
return f"{hps_url.rstrip('/')}/auth/realms/{fallback_realm}"
else:
log.warning("External_auth_url not found from JMS and no realm specified.")
return None


def authenticate(
auth_url: str = "https://127.0.0.1:8443/hps/auth/realms/rep",
grant_type: str = "password",
Expand Down
20 changes: 2 additions & 18 deletions src/ansys/hps/client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
from ansys.hps.data_transfer.client import Client as DataTransferClient
from ansys.hps.data_transfer.client import DataTransferApi

from .authenticate import authenticate
from .authenticate import authenticate, determine_auth_url
from .connection import create_session
from .exceptions import HPSError, raise_for_status
from .warnings import UnverifiedHTTPSRequestsWarning
Expand Down Expand Up @@ -183,23 +183,7 @@ def __init__(
self.auth_url = auth_url

if not auth_url:
with requests.session() as session:
session.verify = self.verify
jms_info_url = url.rstrip("/") + "/jms/api/v1"
resp = session.get(jms_info_url)
if resp.status_code != 200:
raise RuntimeError(
f"Failed to contact jms info endpoint {jms_info_url}, \
status code {resp.status_code}: {resp.content.decode()}"
)
else:
jms_data = resp.json()
if "services" in jms_data and "external_auth_url" in jms_data["services"]:
self.auth_url = resp.json()["services"]["external_auth_url"]
elif realm:
self.auth_url = f"{url.rstrip('/')}/auth/realms/{realm}"
else:
log.warning("External_auth_url not found from JMS and no realm specified.")
self.auth_url = determine_auth_url(url, self.verify, realm)

if access_token:
log.debug("Authenticate with access token")
Expand Down
86 changes: 77 additions & 9 deletions tests/auth/test_authenticate.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,29 +21,97 @@
# SOFTWARE.

import logging
from unittest.mock import MagicMock, patch

import pytest
import requests

from ansys.hps.client import Client, authenticate
from ansys.hps.client import authenticate, determine_auth_url

log = logging.getLogger(__name__)


def test_authenticate(url, username, password):
client = Client(url=url, username=username, password=password, verify=False)
resp = authenticate(
auth_url=client.auth_url, username=username, password=password, verify=False
)
auth_url = determine_auth_url(hps_url=url, verify_ssl=False, fallback_realm="rep")
resp = authenticate(auth_url=auth_url, username=username, password=password, verify=False)

assert "access_token" in resp
assert "refresh_token" in resp


def test_determine_auth_url_with_ssl_verification(url):
with pytest.raises(requests.exceptions.SSLError) as ex_info:
determine_auth_url(hps_url=url, verify_ssl=True, fallback_realm="rep")
assert "CERTIFICATE_VERIFY_FAILED" in str(ex_info.value)


def test_authenticate_with_ssl_verification(url, username, password):
# Doesn't matter that the auth url is wrong.... The first request will fail
auth_url = determine_auth_url(hps_url=url, verify_ssl=False, fallback_realm="rep")
with pytest.raises(requests.exceptions.SSLError) as ex_info:
_ = authenticate(
auth_url=f"{url}/auth/realms/rep", username=username, password=password, verify=True
)
_ = authenticate(auth_url=auth_url, username=username, password=password, verify=True)
assert "CERTIFICATE_VERIFY_FAILED" in str(ex_info.value)


def test_determine_auth_url_success():
hps_url = "https://example.com"
verify_ssl = False
fallback_realm = "rep"
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.json.return_value = {
"services": {"external_auth_url": "https://auth.example.com"}
}

with patch("requests.Session.get", return_value=mock_response):
auth_url = determine_auth_url(hps_url, verify_ssl, fallback_realm)
assert auth_url == "https://auth.example.com"


def test_determine_auth_url_fallback_realm():
hps_url = "https://example.com"
verify_ssl = False
fallback_realm = "rep"
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.json.return_value = {}

with patch("requests.Session.get", return_value=mock_response):
auth_url = determine_auth_url(hps_url, verify_ssl, fallback_realm)
assert auth_url == f"{hps_url.rstrip('/')}/auth/realms/{fallback_realm}"


def test_determine_auth_url_no_realm():
hps_url = "https://example.com"
verify_ssl = False
fallback_realm = None
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.json.return_value = {}

with (
patch("requests.Session.get", return_value=mock_response),
patch("logging.Logger.warning") as mock_warning,
):
auth_url = determine_auth_url(hps_url, verify_ssl, fallback_realm)
assert auth_url is None
mock_warning.assert_called_once_with(
"External_auth_url not found from JMS and no realm specified."
)


def test_determine_auth_url_failure():
hps_url = "https://example.com"
verify_ssl = False
fallback_realm = "rep"
mock_response = MagicMock()
mock_response.status_code = 500
mock_response.content.decode.return_value = "Internal Server Error"

with (
patch("requests.Session.get", return_value=mock_response),
pytest.raises(RuntimeError) as ex_info,
):
determine_auth_url(hps_url, verify_ssl, fallback_realm)
assert "Failed to contact jms info endpoint" in str(ex_info.value)
assert "status code 500" in str(ex_info.value)
assert "Internal Server Error" in str(ex_info.value)
Loading