Skip to content
Browse files

Back to 100% coverage under 2.x.

One remaining failure under 3.2.
  • Loading branch information...
1 parent 513d65d commit ff80e0cafbed76db5dc9293d7376743a5fba8760 @tseaver tseaver committed Mar 19, 2012
Showing with 112 additions and 215 deletions.
  1. +2 −215 repoze/who/_auth_tkt.py
  2. +110 −0 repoze/who/tests/test__auth_tkt.py
View
217 repoze/who/_auth_tkt.py
@@ -40,9 +40,7 @@
import time as time_mod
from repoze.who._compat import encodestring
-from repoze.who._compat import get_cookies
from repoze.who._compat import SimpleCookie
-from repoze.who._compat import STRING_TYPES
from repoze.who._compat import url_quote
from repoze.who._compat import url_unquote
@@ -183,7 +181,7 @@ def calculate_digest(ip, timestamp, secret, userid, tokens, user_data):
return digest
-if type(chr(1)) == type(b''):
+if type(chr(1)) == type(b''): #pragma NO COVER Python < 3.0
def ints2bytes(ints):
return b''.join(map(chr, ints))
else: #pragma NO COVER Python >= 3.0
@@ -207,215 +205,4 @@ def maybe_encode(s, encoding='utf8'):
return s
-class AuthTKTMiddleware(object):
-
- """
- Middleware that checks for signed cookies that match what
- `mod_auth_tkt <http://www.openfusion.com.au/labs/mod_auth_tkt/>`_
- looks for (if you have mod_auth_tkt installed, you don't need this
- middleware, since Apache will set the environmental variables for
- you).
-
- Arguments:
-
- ``secret``:
- A secret that should be shared by any instances of this application.
- If this app is served from more than one machine, they should all
- have the same secret.
-
- ``cookie_name``:
- The name of the cookie to read and write from. Default ``auth_tkt``.
-
- ``secure``:
- If the cookie should be set as 'secure' (only sent over SSL) and if
- the login must be over SSL. (Defaults to False)
-
- ``httponly``:
- If the cookie should be marked as HttpOnly, which means that it's
- not accessible to JavaScript. (Defaults to False)
-
- ``include_ip``:
- If the cookie should include the user's IP address. If so, then
- if they change IPs their cookie will be invalid.
-
- ``logout_path``:
- The path under this middleware that should signify a logout. The
- page will be shown as usual, but the user will also be logged out
- when they visit this page.
-
- If used with mod_auth_tkt, then these settings (except logout_path) should
- match the analogous Apache configuration settings.
-
- This also adds two functions to the request:
-
- ``environ['repoze.who._auth_tkt.set_user'](userid, tokens='',
- user_data='')``
-
- This sets a cookie that logs the user in. ``tokens`` is a
- string (comma-separated groups) or a list of strings.
- ``user_data`` is a string for your own use.
-
- ``environ['repoze.who._auth_tkt.logout_user']()``
-
- Logs out the user.
- """
-
- def __init__(self, app, secret, cookie_name='auth_tkt', secure=False,
- include_ip=True, logout_path=None, httponly=False,
- no_domain_cookie=True, current_domain_cookie=True,
- wildcard_cookie=True):
- self.app = app
- self.secret = secret
- self.cookie_name = cookie_name
- self.secure = secure
- self.httponly = httponly
- self.include_ip = include_ip
- self.logout_path = logout_path
- self.no_domain_cookie = no_domain_cookie
- self.current_domain_cookie = current_domain_cookie
- self.wildcard_cookie = wildcard_cookie
-
- def __call__(self, environ, start_response):
- #cookies = request.get_cookies(environ)
- cookies = get_cookies(environ)
- if self.cookie_name in cookies:
- cookie_value = cookies[self.cookie_name].value
- else:
- cookie_value = ''
- if cookie_value:
- if self.include_ip:
- remote_addr = environ['REMOTE_ADDR']
- else:
- # mod_auth_tkt uses this dummy value when IP is not
- # checked:
- remote_addr = '0.0.0.0'
- # @@: This should handle bad signatures better:
- # Also, timeouts should cause cookie refresh
- try:
- timestamp, userid, tokens, user_data = parse_ticket(
- self.secret, cookie_value, remote_addr)
- tokens = ','.join(tokens)
- environ['REMOTE_USER'] = userid
- if environ.get('REMOTE_USER_TOKENS'):
- # We want to add tokens/roles to what's there:
- tokens = environ['REMOTE_USER_TOKENS'] + ',' + tokens
- environ['REMOTE_USER_TOKENS'] = tokens
- environ['REMOTE_USER_DATA'] = user_data
- environ['AUTH_TYPE'] = 'cookie'
- except BadTicket:
- # bad credentials, just ignore without logging the user
- # in or anything
- pass
- set_cookies = []
-
- def set_user(userid, tokens='', user_data=''):
- set_cookies.extend(self.set_user_cookie(
- environ, userid, tokens, user_data))
-
- def logout_user():
- set_cookies.extend(self.logout_user_cookie(environ))
-
- environ['repoze.who._auth_tkt.set_user'] = set_user
- environ['repoze.who._auth_tkt.logout_user'] = logout_user
- if self.logout_path and environ.get('PATH_INFO') == self.logout_path:
- logout_user()
-
- def cookie_setting_start_response(status, headers, exc_info=None):
- headers.extend(set_cookies)
- return start_response(status, headers, exc_info)
-
- return self.app(environ, cookie_setting_start_response)
-
- def set_user_cookie(self, environ, userid, tokens, user_data):
- if not isinstance(tokens, STRING_TYPES):
- tokens = ','.join(tokens)
- if self.include_ip:
- remote_addr = environ['REMOTE_ADDR']
- else:
- remote_addr = '0.0.0.0'
- ticket = AuthTicket(
- self.secret,
- userid,
- remote_addr,
- tokens=tokens,
- user_data=user_data,
- cookie_name=self.cookie_name,
- secure=self.secure)
- # @@: Should we set REMOTE_USER etc in the current
- # environment right now as well?
- cur_domain = environ.get('HTTP_HOST', environ.get('SERVER_NAME'))
- wild_domain = '.' + cur_domain
-
- cookie_options = ""
- if self.secure:
- cookie_options += "; secure"
- if self.httponly:
- cookie_options += "; HttpOnly"
-
- cookies = []
- if self.no_domain_cookie:
- cookies.append(('Set-Cookie', '%s=%s; Path=/%s' % (
- self.cookie_name, ticket.cookie_value(), cookie_options)))
- if self.current_domain_cookie:
- cookies.append(('Set-Cookie', '%s=%s; Path=/; Domain=%s%s' % (
- self.cookie_name, ticket.cookie_value(), cur_domain,
- cookie_options)))
- if self.wildcard_cookie:
- cookies.append(('Set-Cookie', '%s=%s; Path=/; Domain=%s%s' % (
- self.cookie_name, ticket.cookie_value(), wild_domain,
- cookie_options)))
-
- return cookies
-
- def logout_user_cookie(self, environ):
- cur_domain = environ.get('HTTP_HOST', environ.get('SERVER_NAME'))
- wild_domain = '.' + cur_domain
- expires = 'Sat, 01-Jan-2000 12:00:00 GMT'
- cookies = [
- ('Set-Cookie', '%s=""; Expires="%s"; Path=/' %
- (self.cookie_name, expires)),
- ('Set-Cookie', '%s=""; Expires="%s"; Path=/; Domain=%s' %
- (self.cookie_name, expires, cur_domain)),
- ('Set-Cookie', '%s=""; Expires="%s"; Path=/; Domain=%s' %
- (self.cookie_name, expires, wild_domain)),
- ]
- return cookies
-
-
-def asbool(obj):
- # Lifted from paste.deploy.converters
- if isinstance(obj, STRING_TYPES):
- obj = obj.strip().lower()
- if obj in ['true', 'yes', 'on', 'y', 't', '1']:
- return True
- elif obj in ['false', 'no', 'off', 'n', 'f', '0']:
- return False
- else:
- raise ValueError(
- "String is not true/false: %r" % obj)
- return bool(obj)
-
-def make_auth_tkt_middleware(
- app,
- global_conf,
- secret=None,
- cookie_name='auth_tkt',
- secure=False,
- include_ip=True,
- logout_path=None,
- ):
- """
- Creates the `AuthTKTMiddleware
- <class-repoze.who._auth_tkt.AuthTKTMiddleware.html>`_.
-
- ``secret`` is required, but can be set globally or locally.
- """
- secure = asbool(secure)
- include_ip = asbool(include_ip)
- if secret is None:
- secret = global_conf.get('secret')
- if not secret:
- raise ValueError(
- "You must provide a 'secret' (in global or local configuration)")
- return AuthTKTMiddleware(
- app, secret, cookie_name, secure, include_ip, logout_path or None)
+# Original Paste AuthTktMiddleware stripped: we don't have a use for it.
View
110 repoze/who/tests/test__auth_tkt.py
@@ -89,6 +89,116 @@ def test_cookie_secure_w_tokens_and_userdata(self):
self.assertEqual(cookie['oatmeal']['secure'], 'true')
+class BadTicketTests(unittest.TestCase):
+
+ def _getTargetClass(self):
+ from .._auth_tkt import BadTicket
+ return BadTicket
+
+ def _makeOne(self, *args, **kw):
+ return self._getTargetClass()(*args, **kw)
+
+ def test_wo_expected(self):
+ exc = self._makeOne('message')
+ self.assertEqual(exc.args, ('message',))
+ self.assertEqual(exc.expected, None)
+
+ def test_w_expected(self):
+ exc = self._makeOne('message', 'foo')
+ self.assertEqual(exc.args, ('message',))
+ self.assertEqual(exc.expected, 'foo')
+
+
+class Test_parse_ticket(unittest.TestCase):
+
+ def _callFUT(self, secret='SEEKRIT', ticket=None, ip='1.2.3.4'):
+ from .._auth_tkt import parse_ticket
+ return parse_ticket(secret, ticket, ip)
+
+ def test_bad_timestamp(self):
+ from .._auth_tkt import BadTicket
+ TICKET = '12345678901234567890123456789012XXXXXXXXuserid!'
+ try:
+ self._callFUT(ticket=TICKET)
+ except BadTicket as e:
+ self.failUnless(e.args[0].startswith(
+ 'Timestamp is not a hex integer:'))
+ else:
+ self.fail('Did not raise')
+
+ def test_no_bang_after_userid(self):
+ from .._auth_tkt import BadTicket
+ TICKET = '1234567890123456789012345678901201020304userid'
+ try:
+ self._callFUT(ticket=TICKET)
+ except BadTicket as e:
+ self.assertEqual(e.args[0], 'userid is not followed by !')
+ else:
+ self.fail('Did not raise')
+
+ def test_wo_tokens_or_data_bad_digest(self):
+ from .._auth_tkt import BadTicket
+ TICKET = '1234567890123456789012345678901201020304userid!'
+ try:
+ self._callFUT(ticket=TICKET)
+ except BadTicket as e:
+ self.assertEqual(e.args[0], 'Digest signature is not correct')
+ else:
+ self.fail('Did not raise')
+
+ def test_wo_tokens_or_data_ok_digest(self):
+ from .._auth_tkt import calculate_digest
+ digest = calculate_digest('1.2.3.4', _WHEN, 'SEEKRIT', 'USERID', '', '')
+ TICKET = '%s%08xUSERID!' % (digest, _WHEN)
+ timestamp, userid, tokens, user_data = self._callFUT(ticket=TICKET)
+ self.assertEqual(timestamp, _WHEN)
+ self.assertEqual(userid, 'USERID')
+ self.assertEqual(tokens, [''])
+ self.assertEqual(user_data, '')
+
+ def test_w_tokens_and_data_ok_digest(self):
+ from .._auth_tkt import calculate_digest
+ digest = calculate_digest('1.2.3.4', _WHEN, 'SEEKRIT', 'USERID',
+ 'a,b', 'DATA')
+ TICKET = '%s%08xUSERID!a,b!DATA' % (digest, _WHEN)
+ timestamp, userid, tokens, user_data = self._callFUT(ticket=TICKET)
+ self.assertEqual(timestamp, _WHEN)
+ self.assertEqual(userid, 'USERID')
+ self.assertEqual(tokens, ['a', 'b'])
+ self.assertEqual(user_data, 'DATA')
+
+
+class Test_helpers(unittest.TestCase):
+
+ # calculate_digest is not very testable, and fully exercised throug callers.
+
+ def test_ints_to_bytes(self):
+ from struct import pack
+ from .._auth_tkt import ints2bytes
+ self.assertEqual(ints2bytes([1, 2, 3, 4]), pack('>BBBB', 1, 2, 3, 4))
+
+ def test_encode_ip_timestamp(self):
+ from struct import pack
+ from .._auth_tkt import encode_ip_timestamp
+ self.assertEqual(encode_ip_timestamp('1.2.3.4', _WHEN),
+ pack('>BBBBL', 1, 2, 3, 4, _WHEN))
+
+ def test_maybe_encode_bytes(self):
+ from .._auth_tkt import maybe_encode
+ foo = b'foo'
+ self.failUnless(maybe_encode(foo) is foo)
+
+ def test_maybe_encode_native_string(self):
+ from .._auth_tkt import maybe_encode
+ foo = 'foo'
+ self.assertEqual(maybe_encode(foo), b'foo')
+
+ def test_maybe_encode_unicode(self):
+ from .._auth_tkt import maybe_encode
+ from .._compat import u
+ foo = u('foo')
+ self.assertEqual(maybe_encode(foo), b'foo')
+
_WHEN = 1234567
class _Timemod(object):

0 comments on commit ff80e0c

Please sign in to comment.
Something went wrong with that request. Please try again.