Permalink
Browse files

Merge pull request #7 from f1ori/master

LDAP testing environment and doctests
  • Loading branch information...
xhochy committed Oct 7, 2011
2 parents d9ec572 + b1ebae1 commit a7f9c1c35f34f8f9c79efd6c80ece9e520ce234b
Showing with 4,639 additions and 15 deletions.
  1. +1 −1 .gitignore
  2. +28 −10 README
  3. +56 −4 ldapom.py
  4. +81 −0 openldap/__init__.py
  5. +591 −0 openldap/schema/core.ldif
  6. +610 −0 openldap/schema/core.schema
  7. +200 −0 openldap/schema/cosine.ldif
  8. +2,571 −0 openldap/schema/cosine.schema
  9. +120 −0 openldap/schema/nis.ldif
  10. +237 −0 openldap/schema/nis.schema
  11. +68 −0 openldap/slapd.conf
  12. +53 −0 openldap/testdata.ldif
  13. +23 −0 tests.py
View
@@ -7,4 +7,4 @@ MANIFEST
.settings/
doxygen.log
doxygen/
-
+openldap/ldapdata
View
38 README
@@ -3,41 +3,59 @@ LDAP object mapper
This python module provides a simple LDAP object mapper
+
License
-------
MIT, so do what you want, but leave the copyright notice in the code
+
Github
---------
Fork it!
https://github.com/f1ori/ldapom
+
Usage
-----
Playing around:
>>> import ldapom
- >>> lc = ldapom.LdapConnection(uri='ldap://localhost:1389', base='dc=example,dc=com', login='cn=admin,dc=example,dc=com', password='admin')
- >>> node = lc.get_ldap_node('cn=f1ori,ou=people,dc=example,dc=com')
+ >>> lc = ldapom.LdapConnection(uri='ldap://localhost:1381', base='dc=example,dc=com', login='cn=admin,dc=example,dc=com', password='admin')
+ >>> node = lc.get_ldap_node('cn=jack,dc=example,dc=com')
>>> node # just show
- <LdapNode: cn=f1ori,ou=people,dc=example,dc=com>
- >>> node.givenName # show name
- <LdapAttribute: givenName=Richter>
- >>> node.givenName = 'Meier' # change givenname
+ <LdapNode: cn=jack,dc=example,dc=com>
+ >>> node.sn # show surname
+ <LdapAttribute: sn=O'Niel>
+ >>> node.sn = 'Meier' # change givenname
>>> node.save() # save all changes
>>> node.delete() # delete
Create a new ldap node:
- >>> node2 = lc.new_ldap_node('cn=newuser,ou=people,dc=example,dc=com')
- >>> node2.objectClass = ['class1', 'class2']
- >>> node2.givenName = 'new'
+ >>> node2 = lc.new_ldap_node('cn=newuser,dc=example,dc=com')
+ >>> node2.objectClass = ['person']
+ >>> node2.cn = 'newuser'
+ >>> node2.sn = 'newuser'
>>> node2.save()
+Search for attributes:
+
+ >>> result = lc.search('objectClass=posixAccount')
+ >>> list(result)
+ [<LdapNode: cn=daniel,dc=example,dc=com>, <LdapNode: cn=sam,dc=example,dc=com>]
+
Howto use TLS:
- >>> lc = ldapom.LdapConnection(uri='ldaps://your.server.com', base='dc=example,dc=com', login='cn=admin,dc=example,dc=com', password='admin', cert='/path/to/cert')
+ >>> lc = ldapom.LdapConnection(uri='ldaps://your.server.com', base='dc=example,dc=com', login='cn=admin,dc=example,dc=com', password='admin', cert='/path/to/cert') # doctest: +SKIP
+
+
+Testing
+-------
+
+Make sure, openldap is installed then run
+
+python2 tests.py
View
@@ -262,6 +262,11 @@ def search(self, *args, **kwargs):
def check_if_dn_exists(self, dn):
"""
Search ldap-server for dn and return a boolean
+
+ >>> ldap_connection.check_if_dn_exists('cn=jack,dc=example,dc=com')
+ True
+ >>> ldap_connection.check_if_dn_exists('cn=foobar,dc=example,dc=com')
+ False
"""
try:
res = self.query(base=dn, scope=ldap.SCOPE_BASE)
@@ -275,7 +280,15 @@ def check_if_dn_exists(self, dn):
## @return LdapNode
def get_ldap_node(self, dn):
"""
- Create LdapNode-Object linked to this connection
+ Create lazy LdapNode-Object linked to this connection
+
+ >>> ldap_connection.get_ldap_node('cn=jack,dc=example,dc=com')
+ <LdapNode: cn=jack,dc=example,dc=com>
+ >>> ldap_connection.get_ldap_node('cn=nobody,dc=example,dc=com') # this won't check, if node exists
+ <LdapNode: cn=nobody,dc=example,dc=com>
+ >>> _.cn
+ Traceback (most recent call last):
+ NO_SUCH_OBJECT: {'desc': 'No such object'}
"""
return LdapNode(self, dn)
@@ -290,6 +303,12 @@ def retrieve_ldap_node(self, dn):
instead of lazily.
It will raise ldap.NO_SUCH_OBJECT if the entry cannot be found.
+
+ >>> ldap_connection.retrieve_ldap_node('cn=jack,dc=example,dc=com')
+ <LdapNode: cn=jack,dc=example,dc=com>
+ >>> ldap_connection.retrieve_ldap_node('cn=nobody,dc=example,dc=com')
+ Traceback (most recent call last):
+ NO_SUCH_OBJECT: {'desc': 'No such object'}
"""
node = LdapNode(self, dn)
node.retrieve_attributes()
@@ -300,6 +319,17 @@ def retrieve_ldap_node(self, dn):
def new_ldap_node(self, dn):
"""
Create new LdapNode-Object linked to this connection
+
+ >>> node = ldap_connection.new_ldap_node('cn=newuser,dc=example,dc=com')
+ >>> node.objectClass = ['person']
+ >>> node.sn = 'Daniel'
+ >>> node.cn = 'newuser'
+ >>> node.save() # the object is created not until here!
+ >>> node = ldap_connection.get_ldap_node('cn=newuser,dc=example,dc=com')
+ >>> unicode(node.sn)
+ u'Daniel'
+ >>> unicode(node.cn)
+ u'newuser'
"""
return LdapNode(self, dn, new=True)
@@ -492,16 +522,38 @@ def save(self):
attr.discard_change_list()
def delete(self):
- "delete this object in ldap"
+ """
+ delete this object in ldap
+
+ >>> ldap_connection.check_if_dn_exists('cn=jack,dc=example,dc=com')
+ True
+ >>> node = ldap_connection.get_ldap_node('cn=jack,dc=example,dc=com')
+ >>> node.delete()
+ >>> ldap_connection.check_if_dn_exists('cn=jack,dc=example,dc=com')
+ False
+ """
self._conn.delete(_encode_utf8(self._dn))
self._valid = False
def check_password(self, password):
- "check password for this ldap-object"
+ """
+ check password for this ldap-object
+
+ >>> jack_node.check_password('jack')
+ True
+ >>> jack_node.check_password('wrong_pw')
+ False
+ """
return self._conn.authenticate( _encode_utf8(self._dn), _encode_utf8(password) )
def set_password(self, password):
- "set password for this ldap-object immediately"
+ """
+ set password for this ldap-object immediately
+
+ >>> jack_node.set_password('asdfä')
+ >>> jack_node.check_password('asdfä')
+ True
+ """
# Issue a LDAP Password Modify Extended Operation
self._conn.set_password(_encode_utf8(self._dn), _encode_utf8(password))
View
@@ -0,0 +1,81 @@
+# -*- coding: utf-8 -*-
+
+import getpass
+import os.path
+import os
+
+import subprocess
+from subprocess import Popen, check_call
+from time import sleep
+
+
+def get_real_path():
+ """
+ Get absolute path of this module
+ """
+ return os.path.realpath(os.path.dirname(__file__))
+
+
+def get_url_path(path=get_real_path()):
+ """
+ make path ready for url parameter
+ """
+ return path.replace('/', '%2F')
+
+
+class LdapServer(object):
+ """
+ Manager for OpenLDAP testing server
+ """
+ def __init__(self, port=1381, tls_port=1382, config_file='slapd.conf', path=get_real_path()):
+ self.server = None
+ self.port = port
+ self.tls_port = tls_port
+ self.config_file = config_file
+ self.path = path
+
+ def __del__(self):
+ """
+ destructor
+ """
+ self.stop()
+
+ def load_data(self, ldif_file='testdata.ldif'):
+ """
+ Reset ldap to ldif_file
+ """
+ check_call(['rm', '-rf', '%s/ldapdata' % self.path])
+ check_call(['mkdir', '-p', '%s/ldapdata' % self.path])
+ check_call(['ldapadd', '-H', 'ldap://localhost:%d' % self.port, '-D', 'cn=admin,dc=example,dc=com', '-w', 'admin', '-x'],
+ stdin = open('%s/%s' % (self.path, ldif_file), "r"),
+ stdout = open("/dev/null", "w"),
+ )
+
+ def start(self, clean=True):
+ """
+ start ldap server
+ """
+ conn_str = 'ldapi://%s%%2Fldapi ldap://127.0.0.1:1381' % get_url_path(self.path)
+ self.server = Popen(['slapd', '-f', self.config_file, '-h', conn_str],
+ cwd = self.path,
+ stdout = subprocess.PIPE,
+ )
+ self.server.stdout.read() # read until end -> slapd went to background
+ if clean:
+ self.load_data()
+
+ def stop(self):
+ """
+ stop ldap server
+ """
+ if self.server:
+ self.server.terminate()
+ self.server = None
+
+ def restart(self):
+ """
+ restart ldapserver without clearing the server
+ """
+ self.stop()
+ self.start(clean=False)
+
Oops, something went wrong.

0 comments on commit a7f9c1c

Please sign in to comment.