Skip to content

Commit

Permalink
Correctly decode utf-8 digest authorization header
Browse files Browse the repository at this point in the history
Fixes #1717
  • Loading branch information
webknjaz committed Jun 17, 2018
1 parent f04168d commit 8ea08c6
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 23 deletions.
17 changes: 4 additions & 13 deletions cherrypy/lib/auth_digest.py
Expand Up @@ -24,7 +24,6 @@
import functools
from hashlib import md5

from six.moves.urllib.parse import unquote_to_bytes
from six.moves.urllib.request import parse_http_list, parse_keqv_list

import cherrypy
Expand Down Expand Up @@ -142,15 +141,12 @@ def H(s):
return md5_hex(s)


def _try_decode_map_values(param_map, charset):
def _try_decode_header(header, charset):
global FALLBACK_CHARSET

for enc in (charset, FALLBACK_CHARSET):
try:
return {
k: tonative(v, enc)
for k, v in param_map.items()
}
return tonative(ntob(header, 'latin1'), enc)
except ValueError as ve:
last_err = ve
else:
Expand Down Expand Up @@ -183,18 +179,13 @@ def __init__(
if not self.matches(auth_header):
raise ValueError('Authorization scheme is not "Digest"')

scheme, params = auth_header.split(' ', 1)
self.auth_header = _try_decode_header(auth_header, accept_charset)

self.auth_header = auth_header
scheme, params = self.auth_header.split(' ', 1)

# make a dict of the params
items = parse_http_list(params)
paramsd = parse_keqv_list(items)
paramsd = {
k: unquote_to_bytes(v) if k != "uri" else v
for k, v in paramsd.items()
}
paramsd = _try_decode_map_values(paramsd, accept_charset)

self.realm = paramsd.get('realm')
self.username = paramsd.get('username')
Expand Down
20 changes: 10 additions & 10 deletions cherrypy/test/test_auth_digest.py
Expand Up @@ -9,8 +9,6 @@

from cherrypy.test import helper

from six.moves.urllib.parse import quote as urlencode


def _fetch_users():
return {'test': 'test', 'йюзер': 'їпароль'}
Expand All @@ -32,7 +30,7 @@ def index(self):
class DigestProtected:

@cherrypy.expose
def index(self):
def index(self, *args, **kwargs):
return "Hello %s, you've been authorized." % (
cherrypy.request.login)

Expand All @@ -54,7 +52,9 @@ def testPublic(self):
assert self.body == b'This is public.'

def _test_parametric_digest(self, username, realm):
self.getPage('/digest/')
test_uri = '/digest/?@/=%2F%40&%f0%9f%99%88=path'

self.getPage(test_uri)
assert self.status_code == 401

msg = 'Digest authentification scheme was not found'
Expand Down Expand Up @@ -82,27 +82,27 @@ def _test_parametric_digest(self, username, realm):
base_auth = ('Digest username="%s", '
'realm="%s", '
'nonce="%s", '
'uri="/digest/", '
'uri="%s", '
'algorithm=MD5, '
'response="%s", '
'qop=auth, '
'nc=%s, '
'cnonce="1522e61005789929"')

encoded_user = urlencode(username, 'utf-8')
encoded_user = username.encode('utf-8').decode('latin1')
auth_header = base_auth % (
encoded_user, realm, nonce,
encoded_user, realm, nonce, test_uri,
'11111111111111111111111111111111', '00000001',
)
auth = auth_digest.HttpDigestAuthorization(auth_header, 'GET')
# calculate the response digest
ha1 = get_ha1(auth.realm, auth.username)
response = auth.request_digest(ha1)
auth_header = base_auth % (
encoded_user, realm,
nonce, response, '00000001',
encoded_user, realm, nonce, test_uri,
response, '00000001',
)
self.getPage('/digest/', [('Authorization', auth_header)])
self.getPage(test_uri, [('Authorization', auth_header)])

def test_wrong_realm(self):
# send response with correct response digest, but wrong realm
Expand Down

0 comments on commit 8ea08c6

Please sign in to comment.