Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP

Loading…

Multiple named configurations #6

Open
wants to merge 21 commits into from

1 participant

@lmctv

This pull request is a superset of the previous one; hope you don't mind this much noise
coming from my side these days...

Thank you,

l.
lmctv added some commits
@lmctv lmctv Early getting of the searches from the registry
To avoid useless activation of the context managers both in
Connector.ldap_login_query() and Connector.user_groups()
d96fda3
@lmctv lmctv Wrap both LDAP operations in the same try: block
to correctly check exception from the first search too.
d31c9a9
@lmctv lmctv Skip server searches on empty query filter.
The API has been preserved by refactoring the caching
and connection searching into a new _LDAPQuery.execute_cache
method that gets called in turn by _LDAPQuery.execute
6f1de90
@lmctv lmctv Be nicer to the directory server
by setting a sizelimit on authentication searches.
9d17d4d
@lmctv lmctv Allow skipping/postponing the dn search
by refactoring the _LDAPQuery class:

  - slightly refactor _LDAPQuery.execute by splitting it into
    a new method _LDAPQuery.execute_cache, which does the real work
    of searching and caching, and a replacement _LDAPQuery.execute
    which will skip the call to execute_cache when filter_tmpl
    is empty.

  - directly call _LDAPQuery.execute_cache after entering the
    user-bind self.manager.connection() context manager
74f65da
@lmctv lmctv Escape login identifier before searching the entry.
This will avoid trivial DOS and ldap.FILTER_ERROR exceptions on
attempted logins by users sporting "funny" login names, like 'user*name'
or 'user(middle)name'.

This is a forward port of lmctv/pyramid_ldap@f305744
to silence merge conflicts.
2232f87
@lmctv lmctv Create context-named connections
This way, it becomes possible to add distinct ldap connectors
to the same pyramid app, and reference them by adding a context=...
discriminator to the api calls:

    ldap_setup()
    ldap_set_login_query()
    ldap_set_groups_query()
    get_ldap_connector()

and to the Connector's __init__ method.

Feature comes complete with unit tests...
d8b5b6b
@lmctv lmctv Change authenticated_user's representation
to help groupfinder discriminate between configured backend contexts
c1df1ff
@lmctv lmctv Adapt tests to the new representation. f5ba93d
@lmctv lmctv Factor-out a get_connector_name
to simplify downstream integration.
8410b3d
@lmctv lmctv Add documentation for get_ldap_connector_name. aba3228
@lmctv lmctv Merge remote-tracking branch 'upstream/master'
into multiple_named_configurations
f3cd446
@lmctv lmctv Docstring changes
Shorten one line and correct one typo.
d377b9d
@lmctv lmctv Add myself to CONTRIBUTORS 82be5c4
@lmctv lmctv Set default values for login and password 830efef
@lmctv

@mcdonc: please tell me where should I send the beer box I bought to try and bribe you into taking a look at this PR :-)

lmctv added some commits
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Commits on Jan 21, 2013
  1. @lmctv

    Early getting of the searches from the registry

    lmctv authored
    To avoid useless activation of the context managers both in
    Connector.ldap_login_query() and Connector.user_groups()
  2. @lmctv

    Wrap both LDAP operations in the same try: block

    lmctv authored
    to correctly check exception from the first search too.
  3. @lmctv

    Skip server searches on empty query filter.

    lmctv authored
    The API has been preserved by refactoring the caching
    and connection searching into a new _LDAPQuery.execute_cache
    method that gets called in turn by _LDAPQuery.execute
  4. @lmctv

    Be nicer to the directory server

    lmctv authored
    by setting a sizelimit on authentication searches.
  5. @lmctv

    Allow skipping/postponing the dn search

    lmctv authored
    by refactoring the _LDAPQuery class:
    
      - slightly refactor _LDAPQuery.execute by splitting it into
        a new method _LDAPQuery.execute_cache, which does the real work
        of searching and caching, and a replacement _LDAPQuery.execute
        which will skip the call to execute_cache when filter_tmpl
        is empty.
    
      - directly call _LDAPQuery.execute_cache after entering the
        user-bind self.manager.connection() context manager
  6. @lmctv

    Escape login identifier before searching the entry.

    lmctv authored
    This will avoid trivial DOS and ldap.FILTER_ERROR exceptions on
    attempted logins by users sporting "funny" login names, like 'user*name'
    or 'user(middle)name'.
    
    This is a forward port of lmctv/pyramid_ldap@f305744
    to silence merge conflicts.
Commits on Jan 25, 2013
  1. @lmctv

    Create context-named connections

    lmctv authored
    This way, it becomes possible to add distinct ldap connectors
    to the same pyramid app, and reference them by adding a context=...
    discriminator to the api calls:
    
        ldap_setup()
        ldap_set_login_query()
        ldap_set_groups_query()
        get_ldap_connector()
    
    and to the Connector's __init__ method.
    
    Feature comes complete with unit tests...
  2. @lmctv

    Change authenticated_user's representation

    lmctv authored
    to help groupfinder discriminate between configured backend contexts
  3. @lmctv
Commits on Mar 2, 2013
  1. @lmctv

    Factor-out a get_connector_name

    lmctv authored
    to simplify downstream integration.
  2. @lmctv
Commits on Jun 9, 2013
  1. @lmctv

    Merge remote-tracking branch 'upstream/master'

    lmctv authored
    into multiple_named_configurations
  2. @lmctv

    Docstring changes

    lmctv authored
    Shorten one line and correct one typo.
  3. @lmctv

    Add myself to CONTRIBUTORS

    lmctv authored
  4. @lmctv
Commits on Jul 1, 2014
  1. @lmctv

    Merge branch 'master' of github.com:Pylons/pyramid_ldap into multiple…

    lmctv authored
    …_named_configurations
    
    Conflicts:
    	pyramid_ldap/__init__.py
  2. @lmctv
  3. @lmctv

    Module users should know what changed

    lmctv authored
    without being forced to read the vcs log
  4. @lmctv
Commits on Jul 2, 2014
  1. @lmctv

    Rename context → realm

    lmctv authored
Commits on Jul 5, 2014
  1. @lmctv

    Document presence and usage of ``realm`` parameter

    lmctv authored
    in `ldap_setup`,  `get_ldap_connector`, `ldap_set_login_query`
    and `ldap_set_groups_query` API calls.
This page is out of date. Refresh to see the latest.
View
13 CHANGES.txt
@@ -1,6 +1,19 @@
Next release
------------
+- Support creation of multiple co-existing ldap contexts in the same
+ application. See https://github.com/Pylons/pyramid_ldap/pull/6
+
+- Add a "search_after_bind" attribute to the _LDAPQuery class, to
+ help in reading directory entry attributes when the directory server
+ is configured to hide them in the before login search phase.
+ See https://github.com/Pylons/pyramid_ldap/pull/5
+
+- Set a size limit on pre-login entry dn searches.
+
+- Escape user-provided login names, avoiding server errors or trivial
+ DOSs caused by user names like 'user*name' or 'user(middle)name'
+
- Prevent the use of zero-length password authentication. See
https://github.com/Pylons/pyramid_ldap/pull/13
View
2  CONTRIBUTORS.txt
@@ -103,4 +103,4 @@ Contributors
------------
- Chris McDonough, 2013/04/24
-
+- Lorenzo M. Catucci, 2013/06/09
View
261 pyramid_ldap/__init__.py
@@ -1,5 +1,7 @@
try:
import ldap
+ import ldap.filter
+ import ldapurl
except ImportError: # pragma: no cover
# this is for benefit of being able to build the docs on rtd.org
class ldap(object):
@@ -15,7 +17,7 @@ class ldap(object):
from pyramid.compat import bytes_
try:
- from ldappool import ConnectionManager
+ from ldappool import ConnectionManager, BackendError
except ImportError as e: # pragma: no cover
class ConnectionManager(object):
def __init__(self, *arg, **kw):
@@ -27,17 +29,20 @@ def __init__(self, *arg, **kw):
class _LDAPQuery(object):
""" Represents an LDAP query. Provides rudimentary in-RAM caching of
query results."""
- def __init__(self, base_dn, filter_tmpl, scope, cache_period):
+ def __init__(self, base_dn, filter_tmpl, scope, cache_period,
+ search_after_bind=False):
self.base_dn = base_dn
self.filter_tmpl = filter_tmpl
self.scope = scope
self.cache_period = cache_period
self.last_timeslice = 0
self.cache = {}
+ self.search_after_bind = search_after_bind
def __str__(self):
return ('base_dn=%(base_dn)s, filter_tmpl=%(filter_tmpl)s, '
- 'scope=%(scope)s, cache_period=%(cache_period)s' %
+ 'scope=%(scope)s, cache_period=%(cache_period)s '
+ 'search_after_bind=%(search_after_bind)s'%
self.__dict__)
def query_cache(self, cache_key):
@@ -57,13 +62,8 @@ def query_cache(self, cache_key):
return result
- def execute(self, conn, **kw):
- cache_key = (
- bytes_(self.base_dn % kw, 'utf-8'),
- self.scope,
- bytes_(self.filter_tmpl % kw, 'utf-8')
- )
-
+ def execute_cache(self, conn, *cache_key, **kw):
+ sizelimit = kw.get('sizelimit', 0)
logger.debug('searching for %r' % (cache_key,))
if self.cache_period:
@@ -73,27 +73,63 @@ def execute(self, conn, **kw):
(cache_key,)
)
else:
- result = conn.search_s(*cache_key)
+ result = conn.search_ext_s(*cache_key, sizelimit=sizelimit)
self.cache[cache_key] = result
else:
- result = conn.search_s(*cache_key)
+ result = conn.search_ext_s(*cache_key, sizelimit=sizelimit)
logger.debug('search result: %r' % (result,))
return result
+ def execute(self, conn, sizelimit=0, **kw):
+ """ Returns an entry set resulting from querying the connected backend
+ for entries matching parameters in kw.
+
+ Skip the real query and return an hard-coded result based on string
+ interpolation of ``base_dn`` if the ``filter_tmpl`` attribute
+ is empty"""
+ search_filter = self.filter_tmpl % kw
+ search_base = self.base_dn % kw
+ if search_filter:
+ cache_key = (
+ bytes_(search_base, 'utf-8'),
+ self.scope,
+ bytes_(search_filter, 'utf-8')
+ )
+ return self.execute_cache(conn, *cache_key, sizelimit=sizelimit)
+
+ result = [(search_base, {})]
+ logger.debug('result generated by string interpolation: %r' % result)
+ return result
+
def _timeslice(period, when=None):
if when is None: # pragma: no cover
when = time.time()
return when - (when % period)
+def _activity_identifier(base_identifier, realm=''):
+ if realm:
+ return '-'.join((base_identifier, realm))
+ else:
+ return base_identifier
+
+def _registry_identifier(base_identifier, realm=''):
+ if realm:
+ return '_'.join((base_identifier, realm))
+ else:
+ return base_identifier
+
class Connector(object):
""" Provides API methods for accessing LDAP authentication information."""
- def __init__(self, registry, manager):
+ def __init__(self, registry, manager, realm=''):
self.registry = registry
self.manager = manager
+ self.realm = realm
+ self.login_qry_identif = _registry_identifier('ldap_login_query', realm)
+ self.group_qry_identif = _registry_identifier('ldap_groups_query', realm)
- def authenticate(self, login, password):
+ def authenticate(self, login='', password=''):
""" Given a login name and a password, return a tuple of ``(dn,
attrdict)`` if the matching user if the user exists and his password
is correct. Otherwise return ``None``.
@@ -103,7 +139,7 @@ def authenticate(self, login, password):
dictionary mapping LDAP user attributes to sequences of values. The
keys and values in the dictionary values provided will be decoded
from UTF-8, recursively, where possible. The dictionary returned is
- a case-insensitive dictionary implemenation.
+ a case-insensitive dictionary implementation.
A zero length password will always be considered invalid since it
results in a request for "unauthenticated authentication" which should
@@ -117,22 +153,34 @@ def authenticate(self, login, password):
if password == '':
return None
- with self.manager.connection() as conn:
- search = getattr(self.registry, 'ldap_login_query', None)
- if search is None:
- raise ConfigurationError(
- 'ldap_set_login_query was not called during setup')
-
- result = search.execute(conn, login=login, password=password)
- if len(result) == 1:
- login_dn = result[0][0]
- else:
- return None
+ search = getattr(self.registry, self.login_qry_identif, None)
+ if search is None:
+ raise ConfigurationError(
+ 'ldap_set_login_query was not called during setup')
+
try:
+ if login:
+ escaped_login = ldap.filter.escape_filter_chars(login)
+ else:
+ escaped_login = ''
+ with self.manager.connection() as conn:
+ result = search.execute(conn, login=escaped_login, password=password, sizelimit=1)
+ if len(result) == 1:
+ login_dn = result[0][0]
+ else:
+ return None
with self.manager.connection(login_dn, password) as conn:
# must invoke the __enter__ of this thing for it to connect
- return _ldap_decode(result[0])
- except ldap.LDAPError:
+ if search.search_after_bind:
+ result = search.execute_cache(conn, login_dn,
+ ldap.SCOPE_BASE,
+ '(objectClass=*)')
+ return _ldap_tag_dn_decode(result, realm=self.realm)[0]
+ except (ldap.LDAPError, ldap.SIZELIMIT_EXCEEDED, ldap.INVALID_CREDENTIALS):
+ logger.debug('Exception in authenticate with login %r - - ' % login,
+ exc_info=True)
+ return None
+ except BackendError:
logger.debug('Exception in authenticate with login %r' % login,
exc_info=True)
return None
@@ -153,14 +201,14 @@ def user_groups(self, userdn):
called, using this function will raise an
:exc:`pyramid.exceptions.ConfiguratorError`
"""
+ search = getattr(self.registry, self.group_qry_identif, None)
+ if search is None:
+ raise ConfigurationError(
+ 'set_ldap_groups_query was not called during setup')
with self.manager.connection() as conn:
- search = getattr(self.registry, 'ldap_groups_query', None)
- if search is None:
- raise ConfigurationError(
- 'set_ldap_groups_query was not called during setup')
try:
result = search.execute(conn, userdn=userdn)
- return _ldap_decode(result)
+ return _ldap_tag_dn_decode(result)
except ldap.LDAPError:
logger.debug(
'Exception in user_groups with userdn %r' % userdn,
@@ -168,14 +216,25 @@ def user_groups(self, userdn):
return None
def ldap_set_login_query(config, base_dn, filter_tmpl,
- scope=ldap.SCOPE_ONELEVEL, cache_period=0):
- """ Configurator method to set the LDAP login search. ``base_dn`` is the
- DN at which to begin the search. ``filter_tmpl`` is a string which can
- be used as an LDAP filter: it should contain the replacement value
- ``%(login)s``. Scope is any valid LDAP scope value
- (e.g. ``ldap.SCOPE_ONELEVEL``). ``cache_period`` is the number of seconds
- to cache login search results; if it is 0, login search results will not
- be cached.
+ scope=ldap.SCOPE_ONELEVEL, cache_period=0,
+ search_after_bind=False, realm=''):
+ """ Configurator method to set the LDAP login search.
+
+ - **base_dn**: the DN at which to begin the search **[mandatory]**
+ - **filter_tmpl**: an LDAP search filter **[mandatory]**
+
+ At least one of these parameters should contain the replacement value
+ ``%(login)s``
+
+ - **scope**: A valid ldap search scope
+ **default**: ``ldap.SCOPE_ONELEVEL``
+ - **cache_period**: the number of seconds to cache login search results
+ if 0, results will not be cached
+ **default**: ``0``
+ - **search_after_bind**: do a base search on the entry itself after
+ a successful bind
+ - **realm**: A connection identifier
+ **default**: ``''``
Example::
@@ -187,29 +246,58 @@ def ldap_set_login_query(config, base_dn, filter_tmpl,
The registered search must return one and only one value to be considered
a valid login.
+
+ If the ``filter_tmpl`` is empty, the directory will not be searched, and
+ the entry dn will be assumed to be equal to the ``%(login)s``-replaced
+ ``base_dn``, and no entry's attribute will be fetched from the LDAP server,
+ leading to faster operation.
+ Both in this case, and in the case of servers configured to only allow
+ reading some needed entry's attribute only to the bound entry itself,
+ ``search_after_bind`` can be set to ``True`` if there is a need to read
+ the entry's attribute.
+
+ Example::
+
+ config.set_ldap_login_query(
+ base_dn='sAMAccountName=%(login)s,CN=Users,DC=example,DC=com',
+ filter_tmpl=''
+ scope=ldap.SCOPE_ONELEVEL,
+ search_after_bind=True
+ )
+
"""
- query = _LDAPQuery(base_dn, filter_tmpl, scope, cache_period)
+ query_identif = _registry_identifier('ldap_login_query', realm)
+ intr_identif = _registry_identifier('pyramid_ldap', realm)
+ act_identif = _activity_identifier('pyramid_ldap', realm)
+
+ query = _LDAPQuery(base_dn, filter_tmpl, scope, cache_period,
+ search_after_bind=search_after_bind)
def register():
- config.registry.ldap_login_query = query
+ setattr(config.registry, query_identif, query)
intr = config.introspectable(
- 'pyramid_ldap login query',
+ '%s login query' % intr_identif,
None,
str(query),
- 'pyramid_ldap login query'
+ 'login query'
)
- config.action('ldap-set-login-query', register, introspectables=(intr,))
+ config.action(act_identif, register, introspectables=(intr,))
def ldap_set_groups_query(config, base_dn, filter_tmpl,
- scope=ldap.SCOPE_SUBTREE, cache_period=0):
- """ Configurator method to set the LDAP groups search. ``base_dn`` is
- the DN at which to begin the search. ``filter_tmpl`` is a string which
- can be used as an LDAP filter: it should contain the replacement value
- ``%(userdn)s``. Scope is any valid LDAP scope value
- (e.g. ``ldap.SCOPE_SUBTREE``). ``cache_period`` is the number of seconds
- to cache groups search results; if it is 0, groups search results will
- not be cached.
+ scope=ldap.SCOPE_SUBTREE, cache_period=0, realm=''):
+ """ Configurator method to set the LDAP groups search.
+
+ - **base_dn**: the DN at which to begin the search **[mandatory]**
+ - **filter_tmpl**: a string which can be used as an LDAP filter:
+ it should contain the replacement value ``%(userdn)s`` **[mandatory]**
+ - **scope**: A valid ldap search scope
+ **default**: ``ldap.SCOPE_SUBTREE``
+ - **cache_period**: the number of seconds to cache login search results
+ if 0, results will not be cached
+ **default**: ``0``
+ - **realm**: A connection identifier
+ **default**: ``''``
Example::
@@ -220,19 +308,26 @@ def ldap_set_groups_query(config, base_dn, filter_tmpl,
)
"""
+ query_identif = _registry_identifier('ldap_groups_query', realm)
+ intr_identif = _registry_identifier('pyramid_ldap', realm)
+ act_identif = _activity_identifier('ldap-set-groups-query', realm)
+
query = _LDAPQuery(base_dn, filter_tmpl, scope, cache_period)
+
def register():
- config.registry.ldap_groups_query = query
+ setattr(config.registry, query_identif, query)
+
intr = config.introspectable(
- 'pyramid_ldap groups query',
+ '%s groups query' % intr_identif,
None,
str(query),
- 'pyramid_ldap groups query'
+ '%s groups query' % intr_identif
)
- config.action('ldap-set-groups-query', register, introspectables=(intr,))
+
+ config.action(act_identif, register, introspectables=(intr,))
def ldap_setup(config, uri, bind=None, passwd=None, pool_size=10, retry_max=3,
- retry_delay=.1, use_tls=False, timeout=-1, use_pool=True):
+ retry_delay=.1, use_tls=False, timeout=-1, use_pool=True, realm=''):
""" Configurator method to set up an LDAP connection pool.
- **uri**: ldap server uri **[mandatory]**
@@ -247,7 +342,13 @@ def ldap_setup(config, uri, bind=None, passwd=None, pool_size=10, retry_max=3,
- **timeout**: connector timeout. **default: -1**
- **use_pool**: activates the pool. If False, will recreate a connector
each time. **default: True**
+ - **realm**: A connection identifier
+ **default**: ``''``
"""
+ conn_identif = _registry_identifier('ldap_connector', realm)
+ intr_identif = _registry_identifier('pyramid_ldap', realm)
+ act_identif = _activity_identifier('ldap-setup', realm)
+
vals = dict(
uri=uri, bind=bind, passwd=passwd, size=pool_size,
retry_max=retry_max, retry_delay=retry_delay, use_tls=use_tls,
@@ -258,23 +359,31 @@ def ldap_setup(config, uri, bind=None, passwd=None, pool_size=10, retry_max=3,
def get_connector(request):
registry = request.registry
- return Connector(registry, manager)
+ return Connector(registry, manager, realm)
- config.set_request_property(get_connector, 'ldap_connector', reify=True)
+ config.set_request_property(get_connector, conn_identif, reify=True)
intr = config.introspectable(
- 'pyramid_ldap setup',
+ '%s setup' % intr_identif,
None,
pprint.pformat(vals),
- 'pyramid_ldap setup'
+ '%s setup' % intr_identif,
)
- config.action('ldap-setup', None, introspectables=(intr,))
-
-def get_ldap_connector(request):
- """ Return the LDAP connector attached to the request. If
- :meth:`pyramid.config.Configurator.ldap_setup` was not called, using
- this function will raise an :exc:`pyramid.exceptions.ConfigurationError`."""
- connector = getattr(request, 'ldap_connector', None)
+ config.action(act_identif, None, introspectables=(intr,))
+
+def get_ldap_connector_name(realm=''):
+ """ Return the name of the connector attached to the request
+ for the named **realm**."""
+ return _registry_identifier('ldap_connector', realm)
+
+def get_ldap_connector(request, realm=''):
+ """ Return the LDAP connector attached to the request for the connection
+ identified by the ``realm`` name.
+ If :meth:`pyramid.config.Configurator.ldap_setup` was not called for the
+ named ``realm``, using this function will raise
+ an :exc:`pyramid.exceptions.ConfigurationError`."""
+ conn_name = get_ldap_connector_name(realm)
+ connector = getattr(request, conn_name, None)
if connector is None:
raise ConfigurationError(
'You must call Configurator.ldap_setup during setup '
@@ -287,8 +396,9 @@ def groupfinder(userdn, request):
each group belonging to the user specified by ``userdn`` to as a
principal in the list of results; if the user does not exist, it returns
None."""
- connector = get_ldap_connector(request)
- group_list = connector.user_groups(userdn)
+ parsed = ldapurl.LDAPUrl(userdn)
+ connector = get_ldap_connector(request, realm=parsed.hostport)
+ group_list = connector.user_groups(parsed.dn)
if group_list is None:
return None
group_dns = []
@@ -301,6 +411,13 @@ def _ldap_decode(result):
using the utf-8 encoding """
return _Decoder().decode(result)
+def _ldap_tag_dn_decode(result, realm=''):
+
+ return [('ldap://%s/%s' % (realm, ldapurl.ldapUrlEscape(dn)),
+ attributes)
+ for dn, attributes in _ldap_decode(result)
+ ]
+
class _Decoder(object):
"""
Stolen from django-auth-ldap.
View
84 pyramid_ldap/tests.py
@@ -69,13 +69,13 @@ def _callFUT(self, dn, request):
def test_no_group_list(self):
request = testing.DummyRequest()
request.ldap_connector = DummyLDAPConnector('dn', None)
- result = self._callFUT('dn', request)
+ result = self._callFUT('ldap:///dn', request)
self.assertEqual(result, None)
def test_with_group_list(self):
request = testing.DummyRequest()
request.ldap_connector = DummyLDAPConnector('dn', [('groupdn', None)])
- result = self._callFUT('dn', request)
+ result = self._callFUT('ldap:///dn', request)
self.assertEqual(result, ['groupdn'])
class Test_get_ldap_connector(unittest.TestCase):
@@ -137,6 +137,72 @@ def test_it_defaults(self):
ldap.SCOPE_ONELEVEL)
self.assertEqual(config.registry.ldap_login_query.cache_period, 0)
+class Test_get_ldap_connector_w_realm(unittest.TestCase):
+ named_realm = 'REALM'
+ def _callFUT(self, request):
+ from pyramid_ldap import get_ldap_connector
+ return get_ldap_connector(request, realm=self.named_realm)
+
+ def test_no_connector(self):
+ request = testing.DummyRequest()
+ self.assertRaises(ConfigurationError, self._callFUT, request)
+
+ def test_with_connector(self):
+ request = testing.DummyRequest()
+ setattr(request, 'ldap_connector_%s' % self.named_realm, True)
+ result = self._callFUT(request)
+ self.assertEqual(result, True)
+
+class Test_ldap_setup_w_realm(unittest.TestCase):
+ named_realm = 'REALM'
+ def _callFUT(self, config, uri, **kw):
+ from pyramid_ldap import ldap_setup
+ return ldap_setup(config, uri, realm=self.named_realm, **kw)
+
+ def test_it_defaults(self):
+ from pyramid_ldap import Connector
+ config = DummyConfig()
+ self._callFUT(config, 'ldap://')
+ self.assertEqual(config.prop_name, 'ldap_connector_%s' % self.named_realm)
+ self.assertEqual(config.prop_reify, True)
+ request = testing.DummyRequest()
+ self.assertEqual(config.prop(request).__class__, Connector)
+
+class Test_ldap_set_groups_query_w_realm(unittest.TestCase):
+ named_realm = 'REALM'
+ def _callFUT(self, config, base_dn, filter_tmpl, **kw):
+ from pyramid_ldap import ldap_set_groups_query
+ return ldap_set_groups_query(config, base_dn, filter_tmpl, realm=self.named_realm, **kw)
+
+ def test_it_defaults(self):
+ import ldap
+ config = DummyConfig()
+ self._callFUT(config, 'dn', 'tmpl')
+ qry = getattr(config.registry, 'ldap_groups_query_%s' % self.named_realm)
+ self.assertRaises(AttributeError, lambda: config.registry.ldap_groups_query.base_dn)
+ self.assertEqual(qry.base_dn, 'dn')
+ self.assertEqual(qry.filter_tmpl, 'tmpl')
+ self.assertEqual(qry.scope,
+ ldap.SCOPE_SUBTREE)
+ self.assertEqual(qry.cache_period, 0)
+
+class Test_ldap_set_login_query_w_realm(unittest.TestCase):
+ named_realm = 'REALM'
+ def _callFUT(self, config, base_dn, filter_tmpl, **kw):
+ from pyramid_ldap import ldap_set_login_query
+ return ldap_set_login_query(config, base_dn, filter_tmpl, realm=self.named_realm, **kw)
+
+ def test_it_defaults(self):
+ import ldap
+ config = DummyConfig()
+ self._callFUT(config, 'dn', 'tmpl')
+ qry = getattr(config.registry, 'ldap_login_query_%s' % self.named_realm)
+ self.assertEqual(qry.base_dn, 'dn')
+ self.assertEqual(qry.filter_tmpl, 'tmpl')
+ self.assertEqual(qry.scope,
+ ldap.SCOPE_ONELEVEL)
+ self.assertEqual(qry.cache_period, 0)
+
class TestConnector(unittest.TestCase):
def _makeOne(self, registry, manager):
from pyramid_ldap import Connector
@@ -166,7 +232,7 @@ def test_authenticate_search_returns_one_result(self):
registry = Dummy()
registry.ldap_login_query = DummySearch([('a', 'b')])
inst = self._makeOne(registry, manager)
- self.assertEqual(inst.authenticate(None, None), ('a', 'b'))
+ self.assertEqual(inst.authenticate(None, None), ('ldap:///a', 'b'))
def test_authenticate_search_bind_raises(self):
import ldap
@@ -186,7 +252,7 @@ def test_user_groups_search_returns_result(self):
registry = Dummy()
registry.ldap_groups_query = DummySearch([('a', 'b')])
inst = self._makeOne(registry, manager)
- self.assertEqual(inst.user_groups(None), [('a', 'b')])
+ self.assertEqual(inst.user_groups(None), [('ldap:///a', 'b')])
def test_user_groups_execute_raises(self):
import ldap
@@ -278,9 +344,10 @@ def connection(self, username=None, password=None):
raise e
class DummySearch(object):
- def __init__(self, result, exc=None):
+ def __init__(self, result, exc=None, search_after_bind=False):
self.result = result
self.exc = exc
+ self.search_after_bind = search_after_bind
def execute(self, conn, **kw):
if self.exc is not None:
@@ -296,3 +363,10 @@ def search_s(self, *arg):
self.arg = arg
return self.result
+ def search_ext_s(self, *arg, **kw):
+ import ldap
+ sizelimit = kw.get('sizelimit', 0)
+ self.arg = arg
+ if sizelimit and len(self.result) > sizelimit:
+ raise ldap.SIZELIMIT_EXCEEDED
+ return self.result
Something went wrong with that request. Please try again.