Skip to content

Commit

Permalink
Changed to use keys fixture from test library
Browse files Browse the repository at this point in the history
  • Loading branch information
helen-brown committed Sep 29, 2023
1 parent 708cf83 commit 65c61d8
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 58 deletions.
35 changes: 24 additions & 11 deletions tests/api_tests.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import asyncio
# flake8: noqaimport asyncio
from copy import deepcopy
from typing import Dict, List
from uuid import uuid4
Expand Down Expand Up @@ -55,6 +55,17 @@ def _generate_correlation_id(prefix: str) -> str:
return f'{prefix}_{uuid4()}'


def _assert_unauthorized_client_exception(exc_info):
message = str(exc_info.value).split(":", 1)
assert message[0] == "401" # response status_code
print(message[1])
# assert message[1]["error"] == "unauthorized_client"
# assert (
# message["0"]["error_description"]
# == "you have tried to request authorization but your application is not configured to use this authorization grant type"
# )


@pytest.mark.smoketest
def test_ping(service_url):
resp = requests.get(f"{service_url}/_ping")
Expand Down Expand Up @@ -124,7 +135,7 @@ def test_client_credentials_happy_path(immunisation_history_app: Dict, service_u
indirect=True,
)
async def test_immunization_no_auth_bearer_token_provided(
immunisation_history_app: Dict, service_url: str, environment: str
immunisation_history_app: Dict, service_url: str
):
await asyncio.sleep(1) # Add delay to tests to avoid 429 on service callout
correlation_id = _generate_correlation_id('test_immunization_no_auth_bearer_token_provided')
Expand Down Expand Up @@ -167,7 +178,8 @@ async def test_bad_nhs_number(immunisation_history_app: Dict, service_url: str,
"identity_proofing_level": immunisation_history_app["request_params"]["identity_proofing_level"]
}
token_response = conftest.get_token_nhs_login_token_exchange(
test_app=immunisation_history_app, environment=environment, subject_token_claims=subject_token_claims
test_app=immunisation_history_app, environment=environment, _jwt_keys=_jwt_keys,
subject_token_claims=subject_token_claims
)
correlation_id = _generate_correlation_id('test_bad_nhs_number')

Expand Down Expand Up @@ -253,7 +265,8 @@ def test_token_exchange_happy_path(immunisation_history_app: Dict, service_url:
"identity_proofing_level": immunisation_history_app["request_params"]["identity_proofing_level"]
}
token_response = conftest.get_token_nhs_login_token_exchange(
test_app=immunisation_history_app, environment=environment, subject_token_claims=subject_token_claims
test_app=immunisation_history_app, environment=environment, _jwt_keys=_jwt_keys,
subject_token_claims=subject_token_claims
)
token = token_response["access_token"]

Expand Down Expand Up @@ -300,13 +313,14 @@ def test_token_exchange_sad_path(immunisation_history_app: Dict, environment: st
subject_token_claims = {
"identity_proofing_level": immunisation_history_app["request_params"]["identity_proofing_level"]
}
conftest.check_for_unauthorised_token_exchange(
test_app=immunisation_history_app, environment=environment, subject_token_claims=subject_token_claims,
_jwt_keys=_jwt_keys
)
with pytest.raises(RuntimeError) as exc_info:
conftest.get_token_nhs_login_token_exchange(test_app=immunisation_history_app, environment=environment,
_jwt_keys=_jwt_keys,
subject_token_claims=subject_token_claims)

_assert_unauthorized_client_exception(exc_info=exc_info)


# TODO FIX
@pytest.mark.e2e
@pytest.mark.asyncio
@pytest.mark.parametrize(
Expand Down Expand Up @@ -355,7 +369,6 @@ async def test_user_restricted_access_not_permitted(test_product_and_app, servic
assert body["issue"][0]["code"] == "forbidden"


# TODO Fix
@pytest.mark.e2e
@pytest.mark.parametrize(
"test_product_and_app",
Expand All @@ -382,7 +395,7 @@ def test_token_exchange_invalid_identity_proofing_level_scope(test_product_and_a
"identity_proofing_level": test_app["request_params"]["identity_proofing_level"]
}
token_response = conftest.get_token_nhs_login_token_exchange(
test_app=test_app, environment=environment, subject_token_claims=subject_token_claims
test_app=test_app, environment=environment, _jwt_keys=_jwt_keys, subject_token_claims=subject_token_claims
)
token = token_response["access_token"]

Expand Down
62 changes: 15 additions & 47 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@
ID_TOKEN_ISSUER = "https://identity.ptl.api.platform.nhs.uk/realms/NHS-Login-mock-internal-dev"


def get_env(variable_name: str) -> str:
def get_env(variable_name: str, default: str = None) -> str:
"""Returns a environment variable"""
try:
var = os.environ[variable_name]
var = os.environ.get(variable_name, default)
if not var:
raise RuntimeError(f"Variable is null, Check {variable_name}.")
return var
Expand All @@ -42,7 +42,8 @@ def get_product_names(suffixes) -> List[str]:


def get_oath_url(environment: str) -> str:
return f"https://{environment}.api.service.nhs.uk/oauth2-mock"
default_base_oauth_url = f"https://{environment}.api.service.nhs.uk"
return f'{get_env("OAUTH_BASE_URI", default=default_base_oauth_url)}/{get_env("OAUTH_PROXY", default="oauth2-mock")}'


def _get_nhs_login_private_key() -> str:
Expand Down Expand Up @@ -140,6 +141,7 @@ def check_for_unauthorised_headers(client_app: Dict, environment: str, _jwt_keys
def get_token_nhs_login_token_exchange(
test_app,
environment: str,
_jwt_keys,
subject_token_claims: Dict = None
):
"""Call identity server to get an access token"""
Expand All @@ -155,7 +157,7 @@ def get_token_nhs_login_token_exchange(
environment=environment,
identity_service_base_url=get_oath_url(environment),
client_id=test_app["credentials"][0]["consumerKey"],
jwt_private_key=test_app["app_jwt_private_key"],
jwt_private_key=_jwt_keys["private_key_pem"],
jwt_kid="test-1",
id_token=id_token_jwt,
)
Expand All @@ -170,43 +172,6 @@ def get_token_nhs_login_token_exchange(
return token_resp


def check_for_unauthorised_token_exchange(
test_app: Dict,
environment: str,
_jwt_keys,
subject_token_claims: Dict = None
):
"""Call identity server to get an access token"""
if subject_token_claims is not None:
id_token_jwt = nhs_login_id_token(
id_token_claims=subject_token_claims
)
else:
id_token_jwt = nhs_login_id_token()

# When
config = TokenExchangeConfig(
environment=environment,
identity_service_base_url=get_oath_url(environment),
client_id=test_app["credentials"][0]["consumerKey"],
jwt_private_key=_jwt_keys["private_key_pem"],
jwt_kid="test-1",
id_token=id_token_jwt,
)

authenticator = TokenExchangeAuthenticator(config=config)
with pytest.raises(RuntimeError) as exc_info:
authenticator.get_token()

message = json.loads(str(exc_info.value))
assert message.keys()[0] == 401 # response status_code
assert message["0"]["error"] == "unauthorized_client"
assert (
message["0"]["error_description"]
== "you have tried to request authorization but your application is not configured to use this authorization grant type"
)


def _create_app(dev_apps_api: DeveloperAppsAPI, app_name: str, api_products: List[str], app_attrs: Dict,
jwt_public_key_url: str):
full_app_attrs = {
Expand All @@ -228,20 +193,22 @@ def _create_app(dev_apps_api: DeveloperAppsAPI, app_name: str, api_products: Lis
return dev_apps_api.create_app(email=APP_EMAIL, body=body)


def _create_product(product_name, products_api, proxies, request_params):
def _create_product(product_name: str, products_api: ApiProductsAPI, proxies: List, scopes: List):
attributes = [
{"name": "access", "value": "public"},
{"name": "ratelimit", "value": "10ps"}
]
body = {
"apiResources": [],
"approvalType": "auto",
"attributes": [{"name": "access", "value": "public"}, {"name": "ratelimit", "value": "10ps"}],
"description": "Autogenerated product for immunisation history api testing",
"attributes": attributes,
"displayName": product_name,
"environments": ["internal-dev"],
"name": product_name,
"proxies": proxies,
"quota": 500,
"quotaInterval": "1",
"quotaTimeUnit": "minute",
"scopes": request_params.get("scopes", [])
"scopes": scopes
}
product = products_api.post_products(body=body)
return product
Expand Down Expand Up @@ -327,7 +294,8 @@ def test_product_and_app(client: ApigeeClient, jwt_public_key_url: str, request)
product_name = f"apim-auto-{uuid4()}"

try:
product = _create_product(product_name, products_api, proxies, request_params)
product = _create_product(product_name=product_name, products_api=products_api, proxies=proxies,
scopes=request_params.get("scopes", []))
except Exception as e:
print(e)
raise RuntimeError("Problem creating product")
Expand Down

0 comments on commit 65c61d8

Please sign in to comment.