/
app.py
487 lines (385 loc) · 15 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
from quart import request
from quart.exceptions import HTTPException
from ldap.modlist import addModlist, modifyModlist
import asyncio, base64, quart, functools, io, ldap, ldif, sys, types
from typing import *
# The web application; its configuration is read from the `settings` object
app = quart.Quart(__name__)
app.config.from_object('settings')

# Attribute list used to include technical attributes in LDAP search results
WITH_OPERATIONAL_ATTRS = ('*', '+')

# Headers sent with HTTP 401 responses
UNAUTHORIZED = {'WWW-Authenticate': 'Basic realm="Login Required"'}

# Attribute that is left out of JSON entry updates
PHOTO = 'jpegPhoto'

# Syntax OID whose values are probed for being text or binary
OCTET_STRING = '1.3.6.1.4.1.1466.115.121.1.40'
def authenticated(view: Callable):
    ''' Require authentication for a view.

        Looks up the DN of the user named in the HTTP credentials,
        authenticates against the directory with a simple bind and,
        on success, calls `view` while the connection is available
        as `request.ldap`. The connection is released afterwards,
        whether or not the request succeeded.

        Responds with HTTP 401 if credentials are missing, incomplete
        or rejected. Other LDAP errors are raised as HTTP 500.
    '''
    @functools.wraps(view)
    async def wrapped_view(**values):
        # Imported here so this block does not depend on `import ldap`
        # having loaded the filter submodule
        from ldap.filter import escape_filter_chars

        auth = request.authorization
        # Refuse empty passwords up front: a simple bind without a
        # password may be treated by the server as an anonymous bind
        # that succeeds (RFC 4513, section 5.1.2)
        if not auth or not auth.username or not auth.password:
            return quart.Response('Please log in', 401, UNAUTHORIZED)

        connection = None
        try:
            # Set up the LDAP connection
            connection = request.ldap = ldap.initialize(app.config['LDAP_URL'])

            # Search the user named in the HTTP headers. The login name
            # is untrusted input and must be escaped inside the filter.
            login_filter = '(%s=%s)' % (
                app.config['LOGIN_ATTR'], escape_filter_chars(auth.username))
            try:
                dn, _attrs = await unique(connection.search(
                    app.config['BASE_DN'], ldap.SCOPE_SUBTREE, login_filter))
            except ValueError:
                # No match or an ambiguous match
                raise ldap.INVALID_CREDENTIALS({
                    'desc': 'Invalid user',
                    'info': "User '%s' unknown" % auth.username})

            # Try authenticating
            await empty(connection.simple_bind(dn, auth.password))

            # On success, call the view function
            return await view(**values)

        except ldap.INVALID_CREDENTIALS:
            return quart.Response('Please log in', 401, UNAUTHORIZED)

        except ldap.LDAPError as err:
            # python-ldap usually passes a dict with 'desc' and 'info'
            args = err.args[0] if err.args and isinstance(err.args[0], dict) else {}
            raise HTTPException(500, args.get('info', ''),
                                args.get('desc', '')) from err

        finally:
            # Release the connection on every path, not only on success
            if connection is not None:
                try:
                    connection.unbind_s()
                except ldap.LDAPError:
                    pass  # best-effort cleanup, nothing left to do

    return wrapped_view
def api(view: Callable) -> Callable[..., Awaitable['quart.Response']]:
    ''' View decorator for JSON endpoints.
        Forces authentication.

        Responses produced by the view or by the authentication layer
        are passed through unchanged, any other return value is
        converted with `quart.jsonify`.
    '''
    # Wrap once at decoration time instead of on every request
    secured = authenticated(view)

    @functools.wraps(view)
    async def wrapped_view(**values) -> 'quart.Response':
        data = await secured(**values)
        # isinstance also recognizes subclasses of quart.Response
        if isinstance(data, quart.Response):
            return data
        return quart.jsonify(data)

    return wrapped_view
def no_cache(view: Callable) -> Callable[..., Awaitable['quart.Response']]:
    ''' View decorator to prevent browser caching. Must precede @api.

        The wrapped view has to return a response object, because the
        caching headers are set directly on its `headers` mapping.
    '''
    @functools.wraps(view)
    async def wrapped_view(**values) -> 'quart.Response':
        resp = await view(**values)
        resp.headers['Cache-Control'] = 'no-cache, no-store, must-revalidate'
        resp.headers['Pragma'] = 'no-cache'
        resp.headers['Expires'] = '0'
        return resp

    return wrapped_view
async def result(msgid: int) -> AsyncGenerator[Tuple[str, Dict[str, List[bytes]]], None]:
    ''' Yield the results of the asynchronous LDAP operation `msgid`
        one at a time, as they become available on `request.ldap`.

        The connection is polled without blocking. While nothing is
        pending, control is handed back to the event loop for 10 ms.
        Iteration stops when an empty result list is received.
    '''
    while True:
        r_type, r_data = request.ldap.result(msgid=msgid, all=0, timeout=0)
        if r_type is None:
            # Nothing available yet, let other tasks run
            await asyncio.sleep(0.01)
            continue
        if r_data == []:
            return
        yield r_data[0]
async def unique(msgid: int) -> Tuple[str, Dict[str, List[bytes]]]:
    ''' Wait for the operation `msgid` and return its single result.

        Raises ValueError if there is no result at all. If a second
        result shows up, the operation is abandoned and ValueError
        is raised as well.
    '''
    found = []
    async for item in result(msgid):
        if found:
            # More than one result: stop the operation
            request.ldap.abandon(msgid)
            raise ValueError("Expected unique result")
        found.append(item)

    if not found:
        raise ValueError("Expected unique result")
    return found[0]
async def empty(msgid: int) -> None:
    ''' Wait for the operation `msgid`, which must not deliver results.

        If a result arrives anyway, the operation is abandoned and
        ValueError is raised.
    '''
    stream = result(msgid)
    try:
        await stream.__anext__()
    except StopAsyncIteration:
        # Finished without results, as expected
        return
    request.ldap.abandon(msgid)
    raise ValueError("Unexpected result")
@app.route('/')
async def index() -> quart.Response:
    ''' Serve the main page, i.e. the static file `index.html` '''
    page = await static_file('index.html')
    return page
@app.route('/<path:filename>')
async def static_file(filename: str) -> quart.Response:
    ''' Serve the static asset `filename` via the application '''
    asset = await app.send_static_file(filename)
    return asset
@app.route('/api/whoami')
@no_cache
@api
async def whoami() -> str:
    ''' DN of the current user.

        The server's "who am I" answer is returned without its
        leading `dn:` marker.
    '''
    authz_id = request.ldap.whoami_s()
    prefix = 'dn:'
    # Strip the marker at the start only. A blanket replace() would
    # also mangle a DN that happens to contain "dn:" further on.
    if authz_id.startswith(prefix):
        return authz_id[len(prefix):]
    return authz_id
@app.route('/api/tree/<basedn>')
@no_cache
@api
async def tree(basedn: str) -> List[Dict[str, Any]]:
    ''' List directory entries.

        For the special value `base` the configured BASE_DN itself is
        returned, otherwise the entries one level below `basedn`.
        Each item carries the DN, the structural object class and a
        flag telling whether the entry has subordinates.
    '''
    if basedn == 'base':
        scope, basedn = ldap.SCOPE_BASE, app.config['BASE_DN']
    else:
        scope = ldap.SCOPE_ONELEVEL

    msgid = request.ldap.search(basedn, scope, attrlist=WITH_OPERATIONAL_ATTRS)
    nodes = []
    async for dn, attrs in result(msgid):
        nodes.append({
            'dn': dn,
            'structuralObjectClass': attrs['structuralObjectClass'][0].decode(),
            'hasSubordinates': b'TRUE' == attrs['hasSubordinates'][0],
        })
    return nodes
def _entry(res: Tuple[str, Any]) -> Dict[str, Any]:
    ''' Prepare an LDAP entry for transmission.

        `res` is a `(dn, attrs)` pair as delivered by a search. The
        result has two parts: `attrs` with all values as text (binary
        values are base64 encoded) and `meta` with the DN, the names of
        the required attributes, the auxiliary object classes that
        could still be added and the names of the binary attributes.

        Requires the schema to be loaded into `app.schema`.
    '''
    dn, attrs = res
    models = ldap.schema.models
    ocs = {oc.decode() for oc in attrs['objectClass']}
    must_attrs, _may_attrs = app.schema.attribute_types(ocs)

    # Structural object classes (kind 0) of the entry. Classes missing
    # from the schema are skipped instead of failing on None.
    soc = [oc.names[0]
           for oc in (app.schema.get_obj(models.ObjectClass, o) for o in ocs)
           if oc is not None and oc.kind == 0]

    # Auxiliary classes applicable to the structural class, if there is one
    aux = set()
    if soc:
        aux = {app.schema.get_obj(models.ObjectClass, a).names[0]
               for a in app.schema.get_applicable_aux_classes(soc[0])}

    # Find the binary attributes
    binary = set()
    for attr, values in attrs.items():
        obj = app.schema.get_obj(models.AttributeType, attr)
        syntax = None
        if obj is not None and obj.syntax and obj.syntax != OCTET_STRING:
            syntax = app.schema.get_obj(models.LDAPSyntax, obj.syntax)

        if syntax is None:
            # Unknown attribute or syntax, or an octet string. Octet
            # strings are not used consistently, so try to decode as
            # text and treat the attribute as binary on failure.
            try:
                for val in values:
                    val.decode()
            except UnicodeError:
                binary.add(attr)
        elif syntax.not_human_readable:
            # Human-readable flag from the schema
            binary.add(attr)

    return {
        'attrs': {k: [base64.b64encode(val).decode()
                      if k in binary else val.decode()
                      for val in values]
                  for k, values in attrs.items()},
        'meta': {
            'dn': dn,
            'required': [app.schema.get_obj(models.AttributeType, a).names[0]
                         for a in must_attrs],
            'aux': sorted(aux - ocs),
            'binary': sorted(binary),
        }
    }
@app.route('/api/entry/<path:dn>', methods=('GET', 'POST', 'DELETE', 'PUT'))
@no_cache
@api
async def entry(dn: str) -> Optional[dict]:
    ''' Edit directory entries.

        GET returns the entry (or None if there is no unique match),
        POST modifies an existing entry, PUT creates a new entry and
        DELETE removes it. POST and PUT expect a JSON object mapping
        attribute names to lists of values and are answered with
        HTTP 400 otherwise.
    '''
    method = request.method

    if method == 'GET':
        try:
            return _entry(await unique(request.ldap.search(dn, ldap.SCOPE_BASE)))
        except ValueError:
            return None

    if method == 'DELETE':
        await empty(request.ldap.delete(dn))
        return None

    if method in ('POST', 'PUT'):
        # Without a JSON object there is nothing to write. Reject the
        # request instead of failing later on an undefined payload.
        json = await request.get_json() if request.is_json else None
        if not isinstance(json, dict):
            quart.abort(400)

        # Copy JSON payload into a dictionary of non-empty byte strings
        req = {k: [s.encode() for s in filter(None, v)]
               for k, v in json.items()
               if k != PHOTO}

        if method == 'POST':
            # Get previous values from directory
            res = await unique(request.ldap.search(dn, ldap.SCOPE_BASE))
            mods = {k: v for k, v in res[1].items() if k in req}
            modlist = modifyModlist(mods, req)
            if modlist:  # Apply changes and send changed keys back
                await empty(request.ldap.modify(dn, modlist))
            return {'changed': sorted(set(m[1] for m in modlist))}

        # PUT: create new object
        modlist = addModlist(req)
        if modlist:
            await empty(request.ldap.add(dn, modlist))
        return {'changed': ['dn']}  # Dummy

    return None  # any other method, e.g. HEAD
@app.route('/api/blob/<attr>/<int:index>/<path:dn>', methods=('GET', 'DELETE', 'PUT'))
@no_cache
@api
async def blob(attr: str, index: int, dn: str):
    ''' Handle a single binary value of an entry.

        GET downloads value number `index` of attribute `attr`,
        PUT prepends the uploaded file `blob` to the values of `attr`
        and DELETE removes value number `index`.

        Answers with HTTP 404 if the entry or the value does not exist
        and with HTTP 400 if a PUT request carries no file `blob`.
    '''
    try:
        _dn, attrs = await unique(request.ldap.search(dn, ldap.SCOPE_BASE))
    except ValueError:
        quart.abort(404)

    values = attrs.get(attr, [])

    if request.method == 'GET':
        if index >= len(values):
            quart.abort(404)
        resp = quart.Response(values[index],
                              content_type='application/octet-stream')
        resp.headers['Content-Disposition'] = \
            'attachment; filename="%s-%d.bin"' % (attr, index)
        return resp

    elif request.method == 'PUT':
        files = await request.files
        if 'blob' not in files:
            quart.abort(400)
        data = [files['blob'].read()]
        if values:
            # Replace all values within one request
            await empty(request.ldap.modify(dn, [
                (ldap.MOD_DELETE, attr, None),
                (ldap.MOD_ADD, attr, data + values)]))
        else:
            await empty(request.ldap.modify(dn, [(ldap.MOD_ADD, attr, data)]))

    elif request.method == 'DELETE':
        if index >= len(values):
            quart.abort(404)
        remaining = values[:index] + values[index + 1:]
        # Remove everything and re-add the remaining values in a single
        # modify request, so a failure cannot leave the entry with all
        # values deleted but none restored.
        mods = [(ldap.MOD_DELETE, attr, None)]
        if remaining:
            mods.append((ldap.MOD_ADD, attr, remaining))
        await empty(request.ldap.modify(dn, mods))

    return {'changed': [attr]}  # dummy
@app.route('/api/ldif/<path:dn>')
@no_cache
@authenticated
async def ldifDump(dn: str) -> quart.Response:
    ''' Dump an entry as LDIF.

        The subtree below `dn` is written to a plain text download.
        The file is named after the value of the first RDN of `dn`.
    '''
    out = io.StringIO()
    writer = ldif.LDIFWriter(out)
    # The loop variable must not be called `dn`: it would overwrite the
    # requested DN, and the file would be named after the last entry.
    async for entry_dn, attrs in result(request.ldap.search(dn, ldap.SCOPE_SUBTREE)):
        writer.unparse(entry_dn, attrs)

    rdn = dn.split(',')[0]
    # Fall back to the whole RDN if it contains no '='
    name = rdn.split('=')[1] if '=' in rdn else rdn

    resp = quart.Response(out.getvalue(), content_type='text/plain')
    resp.headers['Content-Disposition'] = \
        'attachment; filename="%s.ldif"' % name
    return resp
class LDIFReader(ldif.LDIFParser):
    ''' LDIF parser that adds every parsed entry to the directory
        through the connection `con` and counts the added entries
        in `count`.
    '''

    def __init__(self, input, con):
        super().__init__(io.BytesIO(input))
        self.con = con
        self.count = 0

    def handle(self, dn, entry):
        ''' Add one parsed entry to the directory '''
        modlist = addModlist(entry)
        self.con.add_s(dn, modlist)
        self.count += 1
@app.route('/api/ldif', methods=('POST',))
@no_cache
@api
async def ldifUpload() -> int:
    ''' Import LDIF.

        The request body is parsed as LDIF and every entry is added to
        the directory. Returns the number of entries added. Malformed
        LDIF is answered with HTTP 400. Entries added before the
        error was detected are not rolled back.
    '''
    reader = LDIFReader(await request.data, request.ldap)
    try:
        reader.parse()
    except ValueError:
        # The parser reports malformed input as ValueError: this is
        # a client error, not a server failure
        quart.abort(400)
    return reader.count
@app.route('/api/rename/<newrdn>/<path:dn>')
@no_cache
@api
async def rename(dn: str, newrdn: str) -> None:
    ''' Rename the entry `dn` to the new RDN `newrdn`.
        The rename is requested with `delold=0`.
    '''
    msgid = request.ldap.rename(dn, newrdn, delold=0)
    await empty(msgid)
def _ename(entry: dict) -> Optional[str]:
    ''' Try to extract a CN.

        Returns the first `cn` value decoded as text, or None if the
        entry has no `cn` attribute or no value for it.
    '''
    # Not every entry has a cn, so avoid a KeyError here
    values = entry.get('cn')
    return values[0].decode() if values else None
@app.route('/api/entry/password/<path:dn>', methods=('POST',))
@no_cache
@api
async def passwd(dn: str) -> Optional[bool]:
    ''' Check or change the password of an entry.

        A JSON body with `check` verifies that password by binding as
        `dn` on a separate connection and returns True or False.
        A JSON body with `new1` (and `old`) changes the password and
        returns None, as does any other request.
    '''
    if request.is_json:
        args = await request.get_json()
        if 'check' in args:
            # A bind with an empty password may succeed as an anonymous
            # bind and would confirm a password that was never checked
            if not args['check']:
                return False
            con = ldap.initialize(app.config['LDAP_URL'])
            try:
                con.simple_bind_s(dn, args['check'])
                return True
            except ldap.INVALID_CREDENTIALS:
                return False
            finally:
                # Release the connection on failure, too
                con.unbind_s()

        elif 'new1' in args:
            await empty(request.ldap.passwd(dn, args['old'], args['new1']))

    return None  # mypy
@app.route('/api/search/<path:query>')
@no_cache
@api
async def search(query: str) -> List[dict]:
    ''' Search the directory.

        A query of the form `attr=text` searches for values of `attr`
        starting with `text`. Any other query is inserted into the
        configured SEARCH_PATTERNS. Queries shorter than
        SEARCH_QUERY_MIN give an empty result and at most SEARCH_MAX
        results are returned, each with the keys `dn` and `name`.
    '''
    # Imported here so this block does not depend on `import ldap`
    # having loaded the filter submodule
    from ldap.filter import escape_filter_chars

    def escaped(term: str) -> str:
        'Escape filter metacharacters, but keep `*` usable as wildcard'
        return '*'.join(escape_filter_chars(part) for part in term.split('*'))

    q = query
    patterns = app.config['SEARCH_PATTERNS']

    # Search for an attribute prefix
    if '=' in query:
        attr, q = query.split('=', 1)
        # The attribute name is user input as well. Doubling '%'
        # keeps it literal in the formatting step below.
        patterns = ['(%s=%%s*)' % escape_filter_chars(attr).replace('%', '%%')]

    # Build query
    res: List[dict] = []
    if len(q) < app.config['SEARCH_QUERY_MIN']:
        return res
    term = escaped(q)
    ldap_filter = '(|%s)' % ''.join(pattern % term for pattern in patterns)

    # Collect results
    async for dn, attrs in result(request.ldap.search(
            app.config['BASE_DN'], ldap.SCOPE_SUBTREE, ldap_filter)):
        res.append({'dn': dn, 'name': _ename(attrs) or dn})
        if len(res) == app.config['SEARCH_MAX']:
            break
    return res
### LDAP Schema ###

# No schema loaded yet
app.schema = None


def _schema(schema_class):
    ''' Iterate over the schema objects of type `schema_class`.
        Obsolete objects are left out, except for LDAP syntaxes,
        which are always yielded.
    '''
    is_syntax = schema_class is ldap.schema.models.LDAPSyntax
    for oid in app.schema.listall(schema_class):
        element = app.schema.get_obj(schema_class, oid)
        if is_syntax or not element.obsolete:
            yield element
def _el(obj) -> dict:
    ''' Basic information about a schema element: OID, names,
        description, obsolete flag and sorted superiors. The primary
        name is the first name with its first letter in lower case.
    '''
    primary = obj.names[0]
    return dict(
        oid=obj.oid,
        name=primary[:1].lower() + primary[1:],
        names=obj.names,
        desc=obj.desc,
        obsolete=bool(obj.obsolete),
        sup=sorted(obj.sup),
    )
# Names of the object class kinds, by numeric `kind`
SCHEMA_OC_KIND = {
    0: 'structural',
    1: 'abstract',
    2: 'auxiliary',
}


def _oc(obj) -> dict:
    ''' Information about an object class: the basic element data plus
        the sorted optional and required attributes and the kind name.
    '''
    info = _el(obj)
    info['may'] = sorted(obj.may)
    info['must'] = sorted(obj.must)
    info['kind'] = SCHEMA_OC_KIND[obj.kind]
    return info
# Names of the attribute usages, by numeric `usage`
SCHEMA_ATTR_USAGE = {
    0: 'userApplications',
    1: 'directoryOperation',
    2: 'distributedOperation',
    3: 'dSAOperation',
}


def _at(obj) -> dict:
    ''' Information about an attribute type: the basic element data
        plus flags, matching rules, syntax and the usage name.
    '''
    info = _el(obj)
    info['single_value'] = bool(obj.single_value)
    info['no_user_mod'] = bool(obj.no_user_mod)
    info['equality'] = obj.equality
    info['syntax'] = obj.syntax
    info['substr'] = obj.substr
    info['ordering'] = obj.ordering
    info['usage'] = SCHEMA_ATTR_USAGE[obj.usage]
    return info
def _syntax(obj) -> dict:
    ''' Information about an attribute syntax: OID, description
        and the flag for values that are not human-readable.
    '''
    binary_flag = bool(obj.not_human_readable)
    return dict(oid=obj.oid, desc=obj.desc, not_human_readable=binary_flag)
def _dict(key: str, items) -> dict:
    ''' Index `items` by the lower-cased value each item holds
        under `key`. Later items replace earlier ones with the
        same index.
    '''
    index = {}
    for obj in items:
        index[obj[key].lower()] = obj
    return index
@app.route('/api/schema')
@no_cache
@api
async def schema() -> dict:
    ''' Dump the schema.

        On first use the subschema entry is read from SCHEMA_DN and
        kept in `app.schema`. The result lists attribute types and
        object classes by lower-cased name and syntaxes by OID.
    '''
    # Load schema into the app
    if app.schema is None:
        # See: https://hub.packtpub.com/python-ldap-applications-part-4-ldap-schema/
        msgid = request.ldap.search(
            app.config['SCHEMA_DN'], ldap.SCOPE_BASE,
            attrlist=WITH_OPERATIONAL_ATTRS)
        _dn, subschema_entry = await unique(msgid)
        # See: https://www.python-ldap.org/en/latest/reference/ldap-schema.html
        app.schema = ldap.schema.SubSchema(subschema_entry, check_uniqueness=2)

    models = ldap.schema.models
    return {
        'attributes': _dict('name', map(_at, _schema(models.AttributeType))),
        'objectClasses': _dict('name', map(_oc, _schema(models.ObjectClass))),
        'syntaxes': _dict('oid', map(_syntax, _schema(models.LDAPSyntax))),
    }