Permalink
Browse files

Update src/baruwa/auth/ad.py

* Adds the ability to authenticate against multiple AD environments, with unique search dn's, security group settings, etc. per mail domain.
* Auth against multiple domains requires the submitted changes to models.py, too.
* Adds the ability to authenticate users when their primaryGroup is configured as a Baruwa User/Admin group
* Added a conditional statement for result.has_key('memberOf') in check_group. It was previously throwing exception errors in the log.
  • Loading branch information...
1 parent b9573e8 commit 2696c633b98958d628abebd980adcf28f84e9d04 @liveaverage liveaverage committed Dec 14, 2012
Showing with 67 additions and 17 deletions.
  1. +67 −17 src/baruwa/auth/ad.py
View
@@ -21,6 +21,7 @@
#
import ldap
+import struct
import logging
from django.contrib.auth.models import User
@@ -29,10 +30,10 @@
from baruwa.utils.misc import get_exc_str
from baruwa.config.models import MailAuthHost
+from baruwa.config.models import MailADAuthHost
from baruwa.accounts.models import UserProfile
from baruwa.accounts.models import UserAddresses
-
logger = logging.getLogger()
fhandle = logging.FileHandler(settings.AD_LOG_FILE)
fhandle.setLevel(logging.DEBUG)
@@ -42,28 +43,35 @@
class ADUser(object):
ldap_connection = None
- AD_SEARCH_FIELDS = settings.AD_SEARCH_FIELDS
+ ad_search_fields = settings.AD_SEARCH_FIELDS
+ ad_ldap_scheme = settings.AD_LDAP_SCHEME
def get_ldap_url(self):
"""return ldap url"""
- return '%s%s:%s' % (settings.AD_LDAP_SCHEME,
+ return '%s%s:%s' % (self.ad_ldap_scheme,
self.ad_host,
self.ad_port)
- def __init__(self, username, host=None, port=None):
+ def __init__(self, username, host=None, port=None, ad_search_dn=None, ad_admin_group=None, ad_user_group=None, ad_auth_domain=None):
"""initialization"""
+
self.username = username
self.uname = username
self.ad_host = host if host else settings.AD_HOST_NAME
self.ad_port = port if port else settings.AD_LDAP_PORT
+ # Added for multi-domain authentication with AD parameters stored in auth_domain
+ self.ad_search_dn = ad_search_dn if ad_search_dn else settings.AD_SEARCH_DN
+ self.ad_admin_group = ad_admin_group if ad_admin_group else settings.AD_ADMIN_GROUP
+ self.ad_user_group = ad_user_group if ad_user_group else settings.AD_USER_GROUP
+
try:
- self.domain = self.username.split('@')[1]
+ self.domain = ad_auth_domain if ad_auth_domain else settings.AD_AUTH_DOMAIN
self.uname = self.username.split('@')[0]
self.username = self.username.split('@')[0]
self.user_bind_name = "%s@%s" % (self.username, self.domain)
except IndexError:
- self.domain = settings.AD_AUTH_DOMAIN
+ self.domain = ad_auth_domain if ad_auth_domain else settings.AD_AUTH_DOMAIN
self.user_bind_name = "%s@%s" % (self.username, self.domain)
self.is_bound = False
@@ -115,6 +123,14 @@ def disconnect(self):
self.ldap_connection.unbind_s()
self.is_bound = False
+ def sid2str(self, sid):
+ srl = ord(sid[0])
+ number_sub_id = ord(sid[1])
+ iav = struct.unpack('!Q','\x00\x00'+sid[2:8])[0]
+ sub_ids = [struct.unpack('<I',sid[8+4*i:12+4*i])[0] for i in range(number_sub_id)]
+
+ return 'S-%d-%d-%s' % (srl, iav, '-'.join([str(s) for s in sub_ids]))
+
def check_group(self, obj, group):
"""Check if user is in AD group"""
found = False
@@ -124,34 +140,56 @@ def check_group(self, obj, group):
res2 = self.ldap_connection.search_ext_s(obj,
ldap.SCOPE_BASE,
"(objectClass=*)",
- self.AD_SEARCH_FIELDS)
+ self.ad_search_fields)
+
if not res2:
return False
assert len(res2) >= 1, "Result should contain at least one element: %s\n" % res2
result = res2[0][1]
+ if result.has_key('primaryGroupID'):
+ pri_grp_rid = result['primaryGroupID'][0]
+ logger.debug("check_group primaryGroupID:%s\n" % pri_grp_rid)
+ domain_sid = self.ldap_connection.search_s(self.ad_search_dn, ldap.SCOPE_BASE)[0][1]['objectSid'][0]
+ logger.debug("check_group domain_sid:%s\nn" % (str(domain_sid)))
+ domain_sid_s = self.sid2str(domain_sid)
+ logger.debug("check_group domain_sid_s:%s\n" % domain_sid_s)
+ obj_sid = domain_sid_s + '-' + pri_grp_rid
+ pri_grp_cn = self.ldap_connection.search_s(self.ad_search_dn, ldap.SCOPE_SUBTREE, "objectSid=%s" % obj_sid, ['cn'])
+ logger.debug("check_group objectSid:%s" % obj_sid)
+ logger.debug("check_group pri_grp_cn:%s" % pri_grp_cn)
+ logger.debug("check_group pri_grp_cn MEM:%s" % pri_grp_cn[0][0])
+ if self.check_group (pri_grp_cn[0][0], group):
+ return True
if result.has_key('sAMAccountName'):
+ logger.debug("check_group sAMAccountName:%s\n" % (str(result['sAMAccountName'])))
if result['sAMAccountName'][0] == group:
return True
- for group2 in result['memberOf']:
- if self.check_group (group2, group):
- return True
+ if result.has_key('memberOf'):
+ logger.debug("check_group memberOf:%s\n" % (str(result['memberOf'])))
+ for group2 in result['memberOf']:
+ if self.check_group (group2, group):
+ return True
+
except Exception, exp:
logger.debug("AD auth backend error by fetching"
" ldap data: %s (%s)\n" % (str(exp), get_exc_str()))
+ logger.debug("Result2 is: %s\n" % (str(res2[0][1])))
+ logger.debug("Full Result is: %s\n" % (str(res2)))
return found
def get_data(self):
"""Get the user data from AD"""
try:
- res = self.ldap_connection.search_ext_s(settings.AD_SEARCH_DN,
+ res = self.ldap_connection.search_ext_s(self.ad_search_dn,
ldap.SCOPE_SUBTREE,
"sAMAccountName=%s" % self.uname,
- self.AD_SEARCH_FIELDS)
+ self.ad_search_fields)
+
if not res:
logger.error("b) AD auth ldap backend error by "
- "searching %s. No result.\n" % settings.AD_SEARCH_DN)
+ "searching %s. No result.\n" % self.ad_search_dn)
return False
assert len(res) >= 1, "c) Result should contain at least one element: %s\n" % res
result = res[0][1]
@@ -181,9 +219,14 @@ def get_data(self):
logger.error("Adding Address: %s\n", mail1)
basedn = res[0][0]
- if self.check_group(basedn, settings.AD_ADMIN_GROUP):
+
+ logger.debug("User basedn: %s\n" % basedn)
+ logger.debug("Get_data self.admin_group: %s\n" % self.ad_admin_group)
+ logger.debug("Get_data self.user_group: %s\n" % self.ad_user_group)
+
+ if self.check_group(basedn, self.ad_admin_group):
self.is_superuser = True
- elif self.check_group(basedn, settings.AD_USER_GROUP):
+ elif self.check_group(basedn, self.ad_user_group):
self.is_superuser = False
else:
logger.error("User %s not in group", self.username)
@@ -235,15 +278,22 @@ def authenticate(self, username=None, password=None):
logger.warning("No AD servers found for %s\n" % domain)
return None
+ adset = None
+
for host in hosts:
# process all hosts
- aduser = ADUser(username, host.address, host.port)
+
+ # Query each host for configured AD settings:
+ adset = MailADAuthHost.objects.get(ad_host=host)
+
+ aduser = ADUser(username, host.address, host.port, adset.ad_search_dn, adset.ad_admin_group, adset.ad_user_group, adset.ad_auth_domain)
if not aduser.connect(password):
logger.warning("AD bind failed for %s\n" % username)
continue
user = None
+
try:
user = User.objects.get(username=username)
except User.DoesNotExist:
@@ -255,7 +305,7 @@ def authenticate(self, username=None, password=None):
if not aduser.get_data():
logger.warning("AD auth backend failed when reading data for"
- " %s. No Group information available.\n" % username)
+ " %s. No Group information available.\nAD_Auth_Domain:%s\tAD_Admin_Group:%s\tAD_User_Group:%s" % (username,adset.ad_auth_domain,adset.ad_admin_group,adset.ad_user_group))
user = None
continue
else:

0 comments on commit 2696c63

Please sign in to comment.