Skip to content

Commit

Permalink
[#4796] [#4796] Fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
amercader committed Jul 26, 2019
1 parent 199024f commit 5957c3a
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 78 deletions.
30 changes: 15 additions & 15 deletions ckan/lib/repoze_plugins/auth_tkt.py
Expand Up @@ -19,24 +19,24 @@ def __init__(self, httponly, *args, **kwargs):
self.httponly = httponly

def _get_cookies(self, *args, **kwargs):
'''
u'''
Override method in superclass to ensure HttpOnly is set appropriately.
'''
super_cookies = super(CkanAuthTktCookiePlugin, self). \
_get_cookies(*args, **kwargs)

cookies = []
for k, v in super_cookies:
replace_with = '; HttpOnly' if self.httponly else ''
v = v.replace('; HttpOnly', '') + replace_with
replace_with = u'; HttpOnly' if self.httponly else u''
v = v.replace(u'; HttpOnly', u'') + replace_with
cookies.append((k, v))

return cookies


def make_plugin(secret=None,
secretfile=None,
cookie_name='auth_tkt',
cookie_name=u'auth_tkt',
secure=False,
include_ip=False,
timeout=None,
Expand All @@ -46,29 +46,29 @@ def make_plugin(secret=None,

# ckan specifics:
# Get secret from beaker setting if necessary
if secret is None or secret == 'somesecret':
secret = config['beaker.session.secret']
if secret is None or secret == u'somesecret':
secret = config[u'beaker.session.secret']
# Session timeout and reissue time for auth cookie
if timeout is None and config.get('who.timeout'):
timeout = config.get('who.timeout')
if reissue_time is None and config.get('who.reissue_time'):
reissue_time = config.get('who.reissue_time')
if timeout is None and config.get(u'who.timeout'):
timeout = config.get(u'who.timeout')
if reissue_time is None and config.get(u'who.reissue_time'):
reissue_time = config.get(u'who.reissue_time')
if timeout is not None and reissue_time is None:
reissue_time = int(math.ceil(int(timeout) * 0.1))
# Set httponly based on config value. Default is True
httponly = config.get('who.httponly', True)
httponly = config.get(u'who.httponly', True)
# Set secure based on config value. Default is False
secure = config.get('who.secure', False)
secure = config.get(u'who.secure', False)

# back to repoze boilerplate
if (secret is None and secretfile is None):
raise ValueError("One of 'secret' or 'secretfile' must not be None.")
raise ValueError(u"One of 'secret' or 'secretfile' must not be None.")
if (secret is not None and secretfile is not None):
raise ValueError("Specify only one of 'secret' or 'secretfile'.")
raise ValueError(u"Specify only one of 'secret' or 'secretfile'.")
if secretfile:
secretfile = os.path.abspath(os.path.expanduser(secretfile))
if not os.path.exists(secretfile):
raise ValueError("No such 'secretfile': %s" % secretfile)
raise ValueError(u"No such 'secretfile': %s" % secretfile)
secret = open(secretfile).read().strip()
if timeout:
timeout = int(timeout)
Expand Down
124 changes: 62 additions & 62 deletions ckan/lib/repoze_plugins/friendly_form.py
Expand Up @@ -24,7 +24,7 @@
#
##############################################################################

"""Collection of :mod:`repoze.who` friendly forms"""
u'''Collection of :mod:`repoze.who` friendly forms'''

from six.moves.urllib.parse import urlparse, urlunparse, urlencode, parse_qs

Expand All @@ -34,15 +34,15 @@

from repoze.who.interfaces import IChallenger, IIdentifier

__all__ = ['FriendlyFormPlugin']
__all__ = [u'FriendlyFormPlugin']


def construct_url(environ):
return Request(environ).url


class FriendlyFormPlugin(object):
"""
u'''
:class:`RedirectingFormPlugin
<repoze.who.plugins.form.RedirectingFormPlugin>`-like form plugin with
more features.
Expand All @@ -60,23 +60,23 @@ class FriendlyFormPlugin(object):
You should keep in mind that if you're using a post-login or a post-logout
page, that page will receive the referrer URL as a query string variable
whose name is "came_from".
whose name is 'came_from'.
Forms can be submitted with any encoding (non-ASCII credentials are
supported) and ISO-8859-1 (aka "Latin-1") is the default one.
supported) and ISO-8859-1 (aka 'Latin-1') is the default one.
"""
'''
implements(IChallenger, IIdentifier)

classifications = {
IIdentifier: ["browser"],
IChallenger: ["browser"],
IIdentifier: [u'browser'],
IChallenger: [u'browser'],
}

def __init__(self, login_form_url, login_handler_path, post_login_url,
logout_handler_path, post_logout_url, rememberer_name,
login_counter_name=None, charset="iso-8859-1"):
"""
login_counter_name=None, charset=u'iso-8859-1'):
u'''
:param login_form_url: The URL/path where the login form is located.
:type login_form_url: str
Expand Down Expand Up @@ -107,7 +107,7 @@ def __init__(self, login_form_url, login_handler_path, post_login_url,
.. versionchanged:: 1.0.1
Added the ``charset`` argument.
"""
'''
self.login_form_url = login_form_url
self.login_handler_path = login_handler_path
self.post_login_url = post_login_url
Expand All @@ -116,66 +116,66 @@ def __init__(self, login_form_url, login_handler_path, post_login_url,
self.rememberer_name = rememberer_name
self.login_counter_name = login_counter_name
if not login_counter_name:
self.login_counter_name = '__logins'
self.login_counter_name = u'__logins'
self.charset = charset

# IIdentifier
def identify(self, environ):
"""
u'''
Override the parent's identifier to introduce a login counter
(possibly along with a post-login page) and load the login counter into
the ``environ``.
"""
'''
request = Request(environ, charset=self.charset)

path_info = environ['PATH_INFO']
script_name = environ.get('SCRIPT_NAME') or '/'
path_info = environ[u'PATH_INFO']
script_name = environ.get(u'SCRIPT_NAME') or u'/'
query = request.GET

if path_info == self.login_handler_path:
# We are on the URL where repoze.who processes authentication. #
# Let's append the login counter to the query string of the
# "came_from" URL. It will be used by the challenge below if
# 'came_from' URL. It will be used by the challenge below if
# authorization is denied for this request.
form = dict(request.POST)
form.update(query)
try:
login = form['login']
password = form['password']
login = form[u'login']
password = form[u'password']
except KeyError:
credentials = None
else:
if request.charset == "us-ascii":
if request.charset == u'us-ascii':
credentials = {
'login': str(login),
'password': str(password),
u'login': str(login),
u'password': str(password),
}
else:
credentials = {'login': login, 'password': password}
credentials = {u'login': login, u'password': password}

try:
credentials['max_age'] = form['remember']
credentials[u'max_age'] = form[u'remember']
except KeyError:
pass

referer = environ.get('HTTP_REFERER', script_name)
destination = form.get('came_from', referer)
referer = environ.get(u'HTTP_REFERER', script_name)
destination = form.get(u'came_from', referer)

if self.post_login_url:
# There's a post-login page, so we have to replace the
# destination with it.
destination = self._get_full_path(self.post_login_url,
environ)
if 'came_from' in query:
if u'came_from' in query:
# There's a referrer URL defined, so we have to pass it to
# the post-login page as a GET variable.
destination = self._insert_qs_variable(destination,
'came_from',
query['came_from'])
u'came_from',
query[u'came_from'])
failed_logins = self._get_logins(environ, True)
new_dest = self._set_logins_in_url(destination, failed_logins)
environ['repoze.who.application'] = HTTPFound(location=new_dest)
environ[u'repoze.who.application'] = HTTPFound(location=new_dest)
return credentials

elif path_info == self.logout_handler_path:
Expand All @@ -184,11 +184,11 @@ def identify(self, environ):
params = dict(list(r.GET.items()) + list(r.POST.items()))
form = UnicodeMultiDict(params)
form.update(query)
referer = environ.get('HTTP_REFERER', script_name)
came_from = form.get('came_from', referer)
referer = environ.get(u'HTTP_REFERER', script_name)
came_from = form.get(u'came_from', referer)
# set in environ for self.challenge() to find later
environ['came_from'] = came_from
environ['repoze.who.application'] = HTTPUnauthorized()
environ[u'came_from'] = came_from
environ[u'repoze.who.application'] = HTTPUnauthorized()
return None

elif path_info == self.login_form_url or self._get_logins(environ):
Expand All @@ -197,57 +197,57 @@ def identify(self, environ):
# So let's load the counter into the environ and then hide it from
# the query string (it will cause problems in frameworks like TG2,
# where this unexpected variable would be passed to the controller)
environ['repoze.who.logins'] = self._get_logins(environ, True)
environ[u'repoze.who.logins'] = self._get_logins(environ, True)
# Hiding the GET variable in the environ:
if self.login_counter_name in query:
del query[self.login_counter_name]
environ['QUERY_STRING'] = urlencode(query, doseq=True)
environ[u'QUERY_STRING'] = urlencode(query, doseq=True)

# IChallenger
def challenge(self, environ, status, app_headers, forget_headers):
"""
u'''
Override the parent's challenge to avoid challenging the user on
logout, introduce a post-logout page and/or pass the login counter
to the login form.
"""
'''
url_parts = list(urlparse(self.login_form_url))
query = url_parts[4]
query_elements = parse_qs(query)
came_from = environ.get('came_from', construct_url(environ))
query_elements['came_from'] = came_from
came_from = environ.get(u'came_from', construct_url(environ))
query_elements[u'came_from'] = came_from
url_parts[4] = urlencode(query_elements, doseq=True)
login_form_url = urlunparse(url_parts)
login_form_url = self._get_full_path(login_form_url, environ)
destination = login_form_url
# Configuring the headers to be set:
cookies = [
(h, v) for (h, v) in app_headers if h.lower() == 'set-cookie'
(h, v) for (h, v) in app_headers if h.lower() == u'set-cookie'
]
headers = forget_headers + cookies

if environ['PATH_INFO'] == self.logout_handler_path:
if environ[u'PATH_INFO'] == self.logout_handler_path:
# Let's log the user out without challenging.
came_from = environ.get('came_from')
came_from = environ.get(u'came_from')
if self.post_logout_url:
# Redirect to a predefined "post logout" URL.
# Redirect to a predefined u'post logout' URL.
destination = self._get_full_path(self.post_logout_url,
environ)
if came_from:
destination = self._insert_qs_variable(
destination, 'came_from', came_from)
destination, u'came_from', came_from)
else:
# Redirect to the referrer URL.
script_name = environ.get('SCRIPT_NAME', '')
destination = came_from or script_name or '/'
script_name = environ.get(u'SCRIPT_NAME', u'')
destination = came_from or script_name or u'/'

elif 'repoze.who.logins' in environ:
elif u'repoze.who.logins' in environ:
# Login failed! Let's redirect to the login form and include
# the login counter in the query string
environ['repoze.who.logins'] += 1
environ[u'repoze.who.logins'] += 1
# Re-building the URL:
destination = self._set_logins_in_url(destination,
environ['repoze.who.logins'])
environ[u'repoze.who.logins'])

return HTTPFound(location=destination, headers=headers)

Expand All @@ -262,29 +262,29 @@ def forget(self, environ, identity):
return rememberer.forget(environ, identity)

def _get_rememberer(self, environ):
rememberer = environ['repoze.who.plugins'][self.rememberer_name]
rememberer = environ[u'repoze.who.plugins'][self.rememberer_name]
return rememberer

def _get_full_path(self, path, environ):
"""
u'''
Return the full path to ``path`` by prepending the SCRIPT_NAME.
If ``path`` is a URL, do nothing.
"""
if path.startswith('/'):
path = environ.get('SCRIPT_NAME', '') + path
'''
if path.startswith(u'/'):
path = environ.get(u'SCRIPT_NAME', u'') + path
return path

def _get_logins(self, environ, force_typecast=False):
"""
u'''
Return the login counter from the query string in the ``environ``.
If it's not possible to convert it into an integer and
``force_typecast`` is ``True``, it will be set to zero (int(0)).
Otherwise, it will be ``None`` or an string.
"""
'''
variables = Request(environ).queryvars
failed_logins = variables.get(self.login_counter_name)
if force_typecast:
Expand All @@ -295,24 +295,24 @@ def _get_logins(self, environ, force_typecast=False):
return failed_logins

def _set_logins_in_url(self, url, logins):
"""
u'''
Insert the login counter variable with the ``logins`` value into
``url`` and return the new URL.
"""
'''
return self._insert_qs_variable(url, self.login_counter_name, logins)

def _insert_qs_variable(self, url, var_name, var_value):
"""
u'''
Insert the variable ``var_name`` with value ``var_value`` in the query
string of ``url`` and return the new URL.
"""
'''
url_parts = list(urlparse(url))
query_parts = parse_qs(url_parts[4])
query_parts[var_name] = var_value
url_parts[4] = urlencode(query_parts, doseq=True)
return urlunparse(url_parts)

def __repr__(self):
return '<%s %s>' % (self.__class__.__name__, id(self))
return u'<%s %s>' % (self.__class__.__name__, id(self))
2 changes: 1 addition & 1 deletion ckan/tests/lib/test_auth_tkt.py
Expand Up @@ -3,7 +3,7 @@
from nose import tools as nose_tools

from ckan.tests import helpers
from ckan.lib.auth_tkt import make_plugin
from ckan.lib.repoze_plugins.auth_tkt import make_plugin


class TestCkanAuthTktCookiePlugin(helpers.FunctionalTestBase):
Expand Down

0 comments on commit 5957c3a

Please sign in to comment.