Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
149 changes: 128 additions & 21 deletions tests/test_splunk_logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,65 @@ def _calculate_hmac_sha512(content: str) -> str:

return base64.b64encode(signature.digest()).decode("utf-8")

@pytest.mark.nhsd_apim_authorization(
access="healthcare_worker",
level="aal3",
login_form={"username": "656005750104"},
force_new_token=True,
)
def test_splunk_payload_schema(
self,
nhsd_apim_auth_headers,
nhsd_apim_proxy_url,
trace,
):
session_name = str(uuid4())
header_filters = {"trace_id": session_name}
trace.post_debugsession(session=session_name, header_filters=header_filters)

requests.get(
url=f"{nhsd_apim_proxy_url}/splunk-test",
headers={**nhsd_apim_auth_headers, **header_filters},
)

payload = json.loads(
get_variable_from_trace(trace, session_name, "splunkCalloutRequest.content")
)

trace.delete_debugsession_by_name(session_name)

with open("tests/utils/splunk_logging_schema.json") as f:
schema = json.load(f)

# If no exception is raised by validate(), the instance is valid.
validate(instance=payload, schema=schema)

def test_splunk_payload_schema_open_access(
self,
nhsd_apim_proxy_url,
trace,
):
session_name = str(uuid4())
header_filters = {"trace_id": session_name}
trace.post_debugsession(session=session_name, header_filters=header_filters)

requests.get(
url=f"{nhsd_apim_proxy_url}/open-access",
headers=header_filters,
)

payload = json.loads(
get_variable_from_trace(trace, session_name, "splunkCalloutRequest.content")
)

trace.delete_debugsession_by_name(session_name)

with open("tests/utils/splunk_logging_schema.json") as f:
schema = json.load(f)

# If no exception is raised by validate(), the instance is valid.
validate(instance=payload, schema=schema)

@pytest.mark.parametrize(
"expected_attr",
[
Expand Down Expand Up @@ -298,24 +357,21 @@ def test_splunk_auth_attributes_invalid_token(
assert meta["product"] == ""

@pytest.mark.nhsd_apim_authorization(
access="healthcare_worker",
level="aal3",
login_form={"username": "656005750104"},
force_new_token=True,
access="application", level="level3", force_new_token=True
)
def test_splunk_payload_schema(
self,
nhsd_apim_auth_headers,
nhsd_apim_proxy_url,
trace,
@pytest.mark.parametrize("message_type", ["request", "response"])
def test_splunk_deny_list_headers_not_logged(
self, _nhsd_apim_auth_token_data, nhsd_apim_proxy_url, trace, message_type
):
access_token = _nhsd_apim_auth_token_data["access_token"]

session_name = str(uuid4())
header_filters = {"trace_id": session_name}
trace.post_debugsession(session=session_name, header_filters=header_filters)

requests.get(
url=f"{nhsd_apim_proxy_url}/splunk-test",
headers={**nhsd_apim_auth_headers, **header_filters},
headers={"Authorization": f"Bearer {access_token}", **header_filters},
)

payload = json.loads(
Expand All @@ -324,24 +380,72 @@ def test_splunk_payload_schema(

trace.delete_debugsession_by_name(session_name)

with open("tests/utils/splunk_logging_schema.json") as f:
schema = json.load(f)
content = payload[message_type]
assert content["headers"]

# If no exception is raised by validate(), the instance is valid.
validate(instance=payload, schema=schema)
headers = json.loads(content["headers"])

def test_splunk_payload_schema_open_access(
deny_list = ["Authorization", "Cookie", "User-Agent"]
for denied_header in deny_list:
assert denied_header not in headers

@pytest.mark.nhsd_apim_authorization(
access="application", level="level3", force_new_token=True
)
@pytest.mark.parametrize(
"message_type,headers_already_logged",
[
(
"request",
[
{"header_name": "Content-Type", "splunk_key": "content_type"},
{
"header_name": "Content-Encoding",
"splunk_key": "content_encoding",
},
{
"header_name": "Content-Length",
"splunk_key": "content_length",
},
{"header_name": "X-Request-ID", "splunk_key": "requestID"},
{"header_name": "X-Correlation-ID", "splunk_key": "correlationID"},
{"header_name": "Host", "splunk_key": "host"},
{"header_name": "X-Forwarded-Port", "splunk_key": "port"},
],
),
(
"response",
[
{"header_name": "Content-Type", "splunk_key": "content_type"},
{
"header_name": "Content-Encoding",
"splunk_key": "content_encoding",
},
{
"header_name": "Content-Length",
"splunk_key": "content_length",
},
],
),
],
)
def test_splunk_headers_already_logged_not_duplicated(
self,
_nhsd_apim_auth_token_data,
nhsd_apim_proxy_url,
trace,
message_type,
headers_already_logged,
):
access_token = _nhsd_apim_auth_token_data["access_token"]

session_name = str(uuid4())
header_filters = {"trace_id": session_name}
trace.post_debugsession(session=session_name, header_filters=header_filters)

requests.get(
url=f"{nhsd_apim_proxy_url}/open-access",
headers=header_filters,
url=f"{nhsd_apim_proxy_url}/splunk-test",
headers={"Authorization": f"Bearer {access_token}", **header_filters},
)

payload = json.loads(
Expand All @@ -350,8 +454,11 @@ def test_splunk_payload_schema_open_access(

trace.delete_debugsession_by_name(session_name)

with open("tests/utils/splunk_logging_schema.json") as f:
schema = json.load(f)
content = payload[message_type]
assert content["headers"]

# If no exception is raised by validate(), the instance is valid.
validate(instance=payload, schema=schema)
headers = json.loads(content["headers"])

for logged_header in headers_already_logged:
assert logged_header["header_name"] not in headers
assert content[logged_header["splunk_key"]] is not None
8 changes: 8 additions & 0 deletions tests/utils/splunk_logging_schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@
"request": {
"type": "object",
"properties": {
"headers": {
"type": "string"
},
"uri": {
"type": "string"
},
Expand Down Expand Up @@ -81,6 +84,7 @@
}
},
"required": [
"headers",
"uri",
"verb",
"content_type",
Expand Down Expand Up @@ -335,6 +339,9 @@
"response": {
"type": "object",
"properties": {
"headers": {
"type": "string"
},
"status_code": {
"type": "string",
"pattern": "^[1-5][0-9][0-9]$"
Expand All @@ -353,6 +360,7 @@
}
},
"required": [
"headers",
"status_code",
"content_type",
"content_length",
Expand Down