/
test_request_log_record.py
117 lines (85 loc) · 4.52 KB
/
test_request_log_record.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
""" Test `RequestWebRecord` """
import logging
import os
import pytest
from sap.cf_logging import defaults
from sap.cf_logging.record.request_log_record import RequestWebRecord
from sap.cf_logging.core.context import Context
from sap.cf_logging.core.framework import Framework
from sap.cf_logging.core.request_reader import RequestReader
from sap.cf_logging.core.response_reader import ResponseReader
from sap.cf_logging.core.constants import REQUEST_KEY, RESPONSE_KEY
# pylint: disable=missing-docstring, invalid-name
# Framework instance shared by the tests below; the autouse fixture
# `before_each_setup_mocks` rebinds it before every test.
FRAMEWORK = None
@pytest.fixture(autouse=True)
def before_each_setup_mocks(mocker):
    """Build a `Framework` backed by stubbed readers before every test.

    The context, request reader and response reader are real instances whose
    accessor methods are patched to return fixed values. The resulting
    framework is published through the module-level `FRAMEWORK` global.

    The `LOG_*` opt-in environment variables are cleared both before the test
    (so each test starts from the defaults) and after it (so values set by a
    test do not leak into other test modules run in the same process).
    """
    global FRAMEWORK  # pylint: disable=global-statement

    context = Context()
    mocker.patch.object(context, 'get', return_value=None)

    request_reader = RequestReader()
    # Assigned on the instance, so the lambda is NOT bound: its first
    # parameter receives whatever the caller passes first.
    # NOTE(review): presumably that is the request object -- confirm against
    # the call site in RequestWebRecord.
    request_reader.get_http_header = lambda self, key, default: 'some.host' \
        if key == 'x-forwarded-for' else 'referer'
    request_stubs = (
        ('get_remote_user', 'user'),
        ('get_protocol', 'http'),
        ('get_content_length', 0),
        ('get_remote_ip', '1.2.3.4'),
        ('get_remote_port', '1234'),
        ('get_path', '/test/path'),
        ('get_method', 'GET'),
    )
    for method_name, value in request_stubs:
        mocker.patch.object(request_reader, method_name, return_value=value)

    response_reader = ResponseReader()
    response_stubs = (
        ('get_status_code', '200'),
        ('get_response_size', 0),
        ('get_content_type', 'text/plain'),
    )
    for method_name, value in response_stubs:
        mocker.patch.object(response_reader, method_name, return_value=value)

    _clean_log_env_vars()
    FRAMEWORK = Framework('name', context, request_reader, response_reader)

    yield

    # Teardown: drop any LOG_* variable the test set so it cannot influence
    # tests outside this module.
    _clean_log_env_vars()
def test_hiding_sensitive_fields_by_default():
    """A record built without setting any LOG_* variable redacts every
    sensitive field, including the remote user and the referer."""
    extra = {REQUEST_KEY: None, RESPONSE_KEY: None}
    record = RequestWebRecord(
        extra, FRAMEWORK, 'name', logging.DEBUG, 'pathname', 1, 'msg', [], None)

    _assert_sensitive_fields_redacted(record)
    for attribute in ('remote_user', 'referer'):
        assert getattr(record, attribute) == defaults.REDACTED
def test_logging_sensitive_connection_data():
    """LOG_SENSITIVE_CONNECTION_DATA=true exposes the connection fields
    while the remote user and the referer stay redacted."""
    os.environ['LOG_SENSITIVE_CONNECTION_DATA'] = 'true'

    extra = {REQUEST_KEY: None, RESPONSE_KEY: None}
    record = RequestWebRecord(
        extra, FRAMEWORK, 'name', logging.DEBUG, 'pathname', 1, 'msg', [], None)

    # Connection data comes straight from the stubbed request reader.
    exposed = (
        ('remote_ip', '1.2.3.4'),
        ('remote_host', '1.2.3.4'),
        ('remote_port', '1234'),
        ('x_forwarded_for', 'some.host'),
    )
    for attribute, value in exposed:
        assert getattr(record, attribute) == value

    # The other opt-ins were not enabled.
    for attribute in ('remote_user', 'referer'):
        assert getattr(record, attribute) == defaults.REDACTED
def test_logging_remote_user():
    """LOG_REMOTE_USER=true exposes only the remote user; connection data
    and the referer remain redacted."""
    os.environ['LOG_REMOTE_USER'] = 'true'

    extra = {REQUEST_KEY: None, RESPONSE_KEY: None}
    record = RequestWebRecord(
        extra, FRAMEWORK, 'name', logging.DEBUG, 'pathname', 1, 'msg', [], None)

    _assert_sensitive_fields_redacted(record)
    actual = (record.remote_user, record.referer)
    assert actual == ('user', defaults.REDACTED)
def test_logging_referer():
    """LOG_REFERER=true exposes only the referer; connection data and the
    remote user remain redacted."""
    os.environ['LOG_REFERER'] = 'true'

    extra = {REQUEST_KEY: None, RESPONSE_KEY: None}
    record = RequestWebRecord(
        extra, FRAMEWORK, 'name', logging.DEBUG, 'pathname', 1, 'msg', [], None)

    _assert_sensitive_fields_redacted(record)
    actual = (record.remote_user, record.referer)
    assert actual == (defaults.REDACTED, 'referer')
def test_incorrect_env_var_value():
    """Values other than 'true' ('false', an arbitrary string, the empty
    string) leave every sensitive field redacted."""
    not_enabling = (
        ('LOG_SENSITIVE_CONNECTION_DATA', 'false'),
        ('LOG_REMOTE_USER', 'some-string'),
        ('LOG_REFERER', ''),
    )
    for variable, value in not_enabling:
        os.environ[variable] = value

    extra = {REQUEST_KEY: None, RESPONSE_KEY: None}
    record = RequestWebRecord(
        extra, FRAMEWORK, 'name', logging.DEBUG, 'pathname', 1, 'msg', [], None)

    _assert_sensitive_fields_redacted(record)
    for attribute in ('remote_user', 'referer'):
        assert getattr(record, attribute) == defaults.REDACTED
def _clean_log_env_vars():
    """Remove the LOG_* opt-in variables from the process environment.

    Variables that are not set are ignored, so the call is safe to repeat.
    """
    for key in ('LOG_SENSITIVE_CONNECTION_DATA', 'LOG_REMOTE_USER', 'LOG_REFERER'):
        # pop() removes the variable whatever its value; a truthiness check
        # on the value would leave variables set to '' behind.
        os.environ.pop(key, None)
def _assert_sensitive_fields_redacted(log_record):
    """Assert that the connection-related fields of `log_record` (remote IP,
    host, port and x-forwarded-for) all hold the redaction placeholder."""
    connection_fields = ('remote_ip', 'remote_host', 'remote_port', 'x_forwarded_for')
    for field in connection_fields:
        assert getattr(log_record, field) == defaults.REDACTED