Skip to content

Commit

Permalink
Merge pull request #898 from elexpander/dev
Browse files Browse the repository at this point in the history
Support MS Active Directory persistent search
  • Loading branch information
cannatag committed Dec 26, 2020
2 parents b2fea9d + 9627cb7 commit 3775511
Show file tree
Hide file tree
Showing 4 changed files with 268 additions and 2 deletions.
132 changes: 130 additions & 2 deletions docs/manual/source/microsoft.rst
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,133 @@ Extended Microsoft operations
Microsoft extended operations are intended for Active Directory::

extend.microsoft
extend.microsoft.dir_sync(sync_base, sync_filter, attributes, cookie, object_security, ancestors_first, public_data_only, incremental_values, max_length, hex_guid)
extend.microsoft.modify_password(user, new_password, old_password=None)
extend.microsoft.dir_sync(
sync_base,
sync_filter,
attributes,
cookie,
object_security,
ancestors_first,
public_data_only,
incremental_values,
max_length,
hex_guid
)
extend.microsoft.modify_password(
user,
new_password,
old_password=None
)
extend.microsoft.persistent_search(
search_base,
search_scope,
attributes,
streaming,
callback
)


Microsoft persistent search is similar to the :doc:`standard persistent search</standard>` but not the same. It does not tell about object
creation or deletion, it simply notifies when an object has been modified and sends all the attributes that were requested. This means it
doesn't tell what changed. The control used is **LDAP_SERVER_NOTIFICATION_OID** (1.2.840.113556.1.4.528).

This search needs the *AsyncStream* strategy to work properly. This strategy sends each received packet to an external thread where it can
be processed as soon as it is received.
As the standard persistent_search, it never sends the *SearchDone* packet, the :doc:`abandon operation</abandon>` may be used to tell AD
to cancel the persistent search.

The persistent_search() method has limited parameters compared a :doc:`standard search</searches>`. However, it accepts some additional parameters specific
to the persistent search::

def persistent_search(self,
search_base='',
search_scope=SUBTREE,
attributes=ALL_ATTRIBUTES,
streaming=True,
callback=None
):

If you don't pass any parameters the search should be globally applied in your LDAP server and all object attributes are returned.

The only permitted search filter is ``(objectclass = *)`` so it has been fixed within the function.

To enable Persistent Searches to get any object modification as they happen (for logging purpose)::

from ldap3 import Server, Connection, ASYNC_STREAM

s = Server('myserver')
c = Connection(s, 'cn=admin,o=resources', 'password', client_strategy=ASYNC_STREAM)

c.stream = open('myfile.log', 'w+')
p = c.extend.microsoft.persistent_search(base, scope, ['objectClass', 'sn'])

now the persistent search is running in an internal thread. Each modification is recorded in the log in LDIF-CHANGE format, with the event type,
event time and the modified dn and changelog number (if available) as comments.

For example an output from my test suite is the following::

# 2020-12-23T15:41:40.578021
dn: cn=dn-1,ou=test,dc=domain,dc=local
objectClass: User
objectClass: organizationalPerson
objectClass: Person
objectClass: Top
sn: dn-1

# 2020-12-23T15:41:40.579555
dn: cn=dn-1,ou=test,dc=domain,dc=local
objectClass: User
objectClass: organizationalPerson
objectClass: Person
objectClass: Top
sn: dn-1

# 2020-12-23T15:41:45.349306
dn: cn=dn-2,ou=test,dc=domain,dc=local
objectClass: User
objectClass: organizationalPerson
objectClass: Person
objectClass: Top
sn: dn-2

There's no sign of what happened to the object. All we know is that there was a modification.

If you want to temporary stop the persistent search you can use ``p.stop()``. Use ``p.start()`` to start it again.


If you call the ``persistent_search()`` method with ``streaming=False`` you can get the modified entries with the ``p.next()`` method.
Each call to ``p.next(block=False, timeout=None)`` returns one event, with the extended control already decoded (as dict values) if
available::

from ldap3 import Server, Connection, ASYNC_STREAM

s = Server('myserver')
c = Connection(s, 'cn=admin,o=resources', 'password', client_strategy=ASYNC_STREAM, auto_bind=True)

p = c.extend.microsoft.persistent_search(streaming=False)
p.start()
while True:
print(p.next(block=True))

When using ``next(block=False)`` or ``next(block=True, timeout=10)`` the method returns `None` if nothing is received from the server.

Alternatively you may use the ``funnel`` method to iterate over the received changes. It is a generator::

for result_entry in p.funnel(block=True):
print(result_entry['dn'])

If you call the ``persistent_search()`` method with ``callback=myfunction`` (where `myfunction` is a callable, including lambda, accepting
a dict as parameter) your function will be called for each event received in the persistent search.
The function will be called in the same thread of the persistent search, so it should not block::

from ldap3 import Server, Connection, ASYNC_STREAM, ALL_ATTRIBUTES

def change_detected(result_entry):
print(result_entry['dn'])
print(result_entry['attributes'])
s = Server('myserver')
c = Connection(s, 'cn=admin,o=resources', 'password', client_strategy=ASYNC_STREAM, auto_bind=True)
p = c.extend.microsoft.persistent_search(base, scope, ALL_ATTRIBUTES, callback=change_detected)


18 changes: 18 additions & 0 deletions ldap3/extend/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from .microsoft.unlockAccount import ad_unlock_account
from .microsoft.addMembersToGroups import ad_add_members_to_groups
from .microsoft.removeMembersFromGroups import ad_remove_members_from_groups
from .microsoft.persistentSearch import ADPersistentSearch
from .novell.partition_entry_count import PartitionEntryCount
from .novell.replicaInfo import ReplicaInfo
from .novell.listReplicas import ListReplicas
Expand Down Expand Up @@ -307,6 +308,23 @@ def remove_members_from_groups(self, members, groups, fix=True):
groups_dn=groups,
fix=fix)

def persistent_search(self,
search_base='',
search_scope=SUBTREE,
attributes=ALL_ATTRIBUTES,
streaming=True,
callback=None
):

if callback:
streaming = False
return ADPersistentSearch(self._connection,
search_base,
search_scope,
attributes,
streaming,
callback)


class ExtendedOperationsRoot(ExtendedOperationContainer):
def __init__(self, connection):
Expand Down
117 changes: 117 additions & 0 deletions ldap3/extend/microsoft/persistentSearch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
"""
"""

# Created on 2016.07.08
#
# Author: Giovanni Cannata
#
# Copyright 2016 - 2020 Giovanni Cannata
#
# This file is part of ldap3.
#
# ldap3 is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# ldap3 is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with ldap3 in the COPYING and COPYING.LESSER files.
# If not, see <http://www.gnu.org/licenses/>.

try:
from queue import Empty
except ImportError: # Python 2
# noinspection PyUnresolvedReferences
from Queue import Empty

from ...core.exceptions import LDAPExtensionError
from ...utils.dn import safe_dn
from ...protocol.microsoft import persistent_search_control


class ADPersistentSearch(object):
def __init__(self,
connection,
search_base,
search_scope,
attributes,
streaming,
callback
):
if connection.strategy.sync:
raise LDAPExtensionError('Persistent Search needs an asynchronous streaming connection')

if connection.check_names and search_base:
search_base = safe_dn(search_base)

self.connection = connection
self.message_id = None
self.base = search_base
self.scope = search_scope
self.attributes = attributes
self.controls = [persistent_search_control()]

# this is the only filter permitted by AD persistent search
self.filter = '(objectClass=*)'

self.connection.strategy.streaming = streaming
if callback and callable(callback):
self.connection.strategy.callback = callback
elif callback:
raise LDAPExtensionError('callback is not callable')

self.start()

def start(self):
if self.message_id: # persistent search already started
return

if not self.connection.bound:
self.connection.bind()

with self.connection.strategy.async_lock:
self.message_id = self.connection.search(search_base=self.base,
search_filter=self.filter,
search_scope=self.scope,
attributes=self.attributes,
controls=self.controls)
self.connection.strategy.persistent_search_message_id = self.message_id

def stop(self, unbind=True):
self.connection.abandon(self.message_id)
if unbind:
self.connection.unbind()
if self.message_id in self.connection.strategy._responses:
del self.connection.strategy._responses[self.message_id]
if hasattr(self.connection.strategy, '_requests') and self.message_id in self.connection.strategy._requests: # asynchronous strategy has a dict of request that could be returned by get_response()
del self.connection.strategy._requests[self.message_id]
self.connection.strategy.persistent_search_message_id = None
self.message_id = None

def next(self, block=False, timeout=None):
if not self.connection.strategy.streaming and not self.connection.strategy.callback:
try:
return self.connection.strategy.events.get(block, timeout)
except Empty:
return None

raise LDAPExtensionError('Persistent search is not accumulating events in queue')

def funnel(self, block=False, timeout=None):
done = False
while not done:
try:
entry = self.connection.strategy.events.get(block, timeout)
except Empty:
yield None
if entry['type'] == 'searchResEntry':
yield entry
else:
done = True

yield entry
3 changes: 3 additions & 0 deletions ldap3/protocol/microsoft.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,3 +137,6 @@ def security_descriptor_control(criticality=False, sdflags=0x0F):
sdcontrol = SdFlags()
sdcontrol.setComponentByName('Flags', sdflags)
return [build_control('1.2.840.113556.1.4.801', criticality, sdcontrol)]

def persistent_search_control(criticality=False):
return build_control('1.2.840.113556.1.4.528', criticality, value=None)

0 comments on commit 3775511

Please sign in to comment.