-
Notifications
You must be signed in to change notification settings - Fork 15
/
Copy pathsession.py
122 lines (108 loc) · 4.44 KB
/
session.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
# -*- coding: utf-8 -*-
from node.ext.ldap import BASE
from node.ext.ldap import LDAPCommunicator
from node.ext.ldap import LDAPConnector
from node.ext.ldap import testLDAPConnectivity
import ldap
class LDAPSession(object):
"""LDAP Session binds always.
"""
def __init__(self, props):
self._props = props
connector = LDAPConnector(props=props)
self._communicator = LDAPCommunicator(connector)
def checkServerProperties(self):
"""Test if connection can be established.
"""
res = testLDAPConnectivity(props=self._props)
if res == 'success':
return (True, 'OK')
else:
return (False, res)
@property
def baseDN(self):
baseDN = self._communicator.baseDN
return baseDN
@baseDN.setter
def baseDN(self, baseDN):
self._communicator.baseDN = baseDN
def search(self, queryFilter='(objectClass=*)', scope=BASE, baseDN=None,
force_reload=False, attrlist=None, attrsonly=0,
page_size=None, cookie=None):
if not queryFilter:
# It makes no sense to really pass these to LDAP, therefore, we
# interpret them as "don't filter" which in LDAP terms is
# '(objectClass=*)'
queryFilter = '(objectClass=*)'
res = self._communicator.search(
queryFilter,
scope,
baseDN,
force_reload,
attrlist,
attrsonly,
page_size,
cookie
)
if page_size:
res, cookie = res
# ActiveDirectory returns entries with dn None, which can be ignored
res = [x for x in res if x[0] is not None]
if page_size:
return res, cookie
return res
def add(self, dn, data):
self._communicator.add(dn, data)
def authenticate(self, dn, pw):
"""Verify credentials, but don't rebind the session to that user
"""
# Let's bypass connector/communicator until they are sorted out
if self._props.ignore_cert: # pragma: no cover
ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_NEVER)
elif self._props.tls_cacertfile: # pragma: no cover
ldap.set_option(ldap.OPT_X_TLS_CACERTFILE, self._props.tls_cacertfile)
elif self._props.tls_cacertdir: # pragma: no cover
ldap.set_option(ldap.OPT_X_TLS_CACERTDIR, self._props.tls_cacertdir)
if self._props.tls_clcertfile and self._props.tls_clkeyfile: # pragma: no cover
ldap.set_option(ldap.OPT_X_TLS_CERTFILE, self._props.tls_clcertfile)
ldap.set_option(ldap.OPT_X_TLS_KEYFILE, self._props.tls_clkeyfile)
elif self._props.tls_clcertfile or self._props.tls_clkeyfile: # pragma: no cover
logger.exception("Only client certificate or key have been provided.")
con = ldap.initialize(
self._props.uri,
bytes_mode=False,
bytes_strictness='silent'
)
# Turning referrals off since they cause problems with MS Active
# Directory More info: https://www.python-ldap.org/faq.html#usage
con.set_option(ldap.OPT_REFERRALS, 0)
try:
if self._props.start_tls:
con.start_tls_s()
con.simple_bind_s(dn, pw)
except (ldap.INVALID_CREDENTIALS, ldap.UNWILLING_TO_PERFORM):
# The UNWILLING_TO_PERFORM event might be thrown, if you query a
# local user named ``admin``, but the LDAP server is configured to
# deny such queries. Instead of raising an exception, just ignore
# this.
return False
else:
return True
def modify(self, dn, data, replace=False):
"""Modify an existing entry in the directory.
:param dn: Modification DN
:param data: Either list of 3 tuples (look at
``node.ext.ldap.base.LDAPCommunicator.modify`` for details), or a
dictionary representing the entry or parts of the entry.
XXX: dicts not yet
:param replace: If set to True, replace entry at DN entirely with data.
"""
result = self._communicator.modify(dn, data)
return result
def delete(self, dn):
self._communicator.delete(dn)
def passwd(self, userdn, oldpw, newpw):
result = self._communicator.passwd(userdn, oldpw, newpw)
return result
def unbind(self):
self._communicator.unbind()