From 912c3668dce8ffc827201e14336f7d09570c7e31 Mon Sep 17 00:00:00 2001 From: gengjh Date: Tue, 9 Apr 2013 22:13:31 +0800 Subject: [PATCH] Replace password to "***" in the debug message Use regex pattern to replace password to "***" for both env vars and request body output Also includes a minor refactor to move the code up in the file as suggested by termie and henry regarding the review comments in https://review.openstack.org/#/c/26487/ (Original Change-Id: I890415c755dd383749f2d4382f53d0b3a6badc6c) Fix bug 1166697 Change-Id: I671ea25cca78b4dea1fbf2e63c89b82912279f2d --- keystone/common/wsgi.py | 57 +++++++++++++++++++++++++++++++---------- tests/test_wsgi.py | 44 ++++++++++++++++++++++++++++++- 2 files changed, 87 insertions(+), 14 deletions(-) diff --git a/keystone/common/wsgi.py b/keystone/common/wsgi.py index ac91a6a111..bc9cc092b4 100644 --- a/keystone/common/wsgi.py +++ b/keystone/common/wsgi.py @@ -20,6 +20,7 @@ """Utility methods for working with WSGI servers.""" +import re import socket import sys @@ -48,6 +49,34 @@ PARAMS_ENV = 'openstack.params' +_RE_PASS = re.compile(r'([\'"].*?password[\'"]\s*:\s*u?[\'"]).*?([\'"])', + re.DOTALL) + + +def mask_password(message, is_unicode=False, secret="***"): + """Replace password with 'secret' in message. + + :param message: The string which include security information. + :param is_unicode: Is unicode string ? + :param secret: substitution string default to "***". + :returns: The string + + For example: + >>> mask_password('"password" : "aaaaa"') + '"password" : "***"' + >>> mask_password("'original_password' : 'aaaaa'") + "'original_password' : '***'" + >>> mask_password("u'original_password' : u'aaaaa'") + "u'original_password' : u'***'" + """ + if is_unicode: + message = unicode(message) + # Match the group 1,2 and replace all others with 'secret' + secret = r"\g<1>" + secret + r"\g<2>" + result = _RE_PASS.sub(secret, message) + return result + + class WritableLogger(object): """A thin wrapper that responds to `write` and logs.""" @@ -379,21 +408,23 @@ class Debug(Middleware): @webob.dec.wsgify(RequestClass=Request) def __call__(self, req): - LOG.debug('%s %s %s', ('*' * 20), 'REQUEST ENVIRON', ('*' * 20)) - for key, value in req.environ.items(): - LOG.debug('%s = %s', key, value) - LOG.debug('') - LOG.debug('%s %s %s', ('*' * 20), 'REQUEST BODY', ('*' * 20)) - for line in req.body_file: - LOG.debug(line) - LOG.debug('') + if LOG.isEnabledFor(logging.DEBUG): + LOG.debug('%s %s %s', ('*' * 20), 'REQUEST ENVIRON', ('*' * 20)) + for key, value in req.environ.items(): + LOG.debug('%s = %s', key, mask_password(value, + is_unicode=True)) + LOG.debug('') + LOG.debug('%s %s %s', ('*' * 20), 'REQUEST BODY', ('*' * 20)) + for line in req.body_file: + LOG.debug(mask_password(line)) + LOG.debug('') resp = req.get_response(self.application) - - LOG.debug('%s %s %s', ('*' * 20), 'RESPONSE HEADERS', ('*' * 20)) - for (key, value) in resp.headers.iteritems(): - LOG.debug('%s = %s', key, value) - LOG.debug('') + if LOG.isEnabledFor(logging.DEBUG): + LOG.debug('%s %s %s', ('*' * 20), 'RESPONSE HEADERS', ('*' * 20)) + for (key, value) in resp.headers.iteritems(): + LOG.debug('%s = %s', key, value) + LOG.debug('') resp.app_iter = self.print_generator(resp.app_iter) diff --git a/tests/test_wsgi.py b/tests/test_wsgi.py index 3c9eb2af15..5e607ec45d 100644 --- a/tests/test_wsgi.py +++ b/tests/test_wsgi.py @@ -21,13 +21,55 @@ from keystone import test -class ApplicationTest(test.TestCase): +class FakeApp(wsgi.Application): + def index(self, context): + return {'a': 'b'} + + +class BaseWSGITest(test.TestCase): + def setUp(self): + self.app = FakeApp() + super(BaseWSGITest, self).setUp() + def _make_request(self, url='/'): req = webob.Request.blank(url) args = {'action': 'index', 'controller': None} req.environ['wsgiorg.routing_args'] = [None, args] return req + def test_mask_password(self): + message = ("test = 'password': 'aaaaaa', 'param1': 'value1', " + "\"new_password\": 'bbbbbb'") + self.assertEqual(wsgi.mask_password(message, True), + u"test = 'password': '***', 'param1': 'value1', " + "\"new_password\": '***'") + + message = "test = 'password' : 'aaaaaa'" + self.assertEqual(wsgi.mask_password(message, False, '111'), + "test = 'password' : '111'") + + message = u"test = u'password' : u'aaaaaa'" + self.assertEqual(wsgi.mask_password(message, True), + u"test = u'password' : u'***'") + + message = 'test = "password" : "aaaaaaaaa"' + self.assertEqual(wsgi.mask_password(message), + 'test = "password" : "***"') + + message = 'test = "original_password" : "aaaaaaaaa"' + self.assertEqual(wsgi.mask_password(message), + 'test = "original_password" : "***"') + + message = 'test = "original_password" : ""' + self.assertEqual(wsgi.mask_password(message), + 'test = "original_password" : "***"') + + message = 'test = "param1" : "value"' + self.assertEqual(wsgi.mask_password(message), + 'test = "param1" : "value"') + + +class ApplicationTest(BaseWSGITest): def test_response_content_type(self): class FakeApp(wsgi.Application): def index(self, context):