-
Notifications
You must be signed in to change notification settings - Fork 81
/
acceptance_test.py
651 lines (539 loc) · 23.9 KB
/
acceptance_test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
# --- BEGIN COPYRIGHT BLOCK ---
# Copyright (C) 2017 Red Hat, Inc.
# All rights reserved.
#
# License: GPL (version 3 or any later version).
# See LICENSE for details.
# --- END COPYRIGHT BLOCK ---
#
import pytest
import logging
from lib389.replica import Replicas
from lib389.tasks import *
from lib389.utils import *
from lib389.topologies import topology_m4 as topo_m4
from lib389.topologies import topology_m2 as topo_m2
from . import get_repl_entries
from lib389.idm.user import UserAccount
from lib389.replica import ReplicationManager
from lib389._constants import *
pytestmark = pytest.mark.tier0
TEST_ENTRY_NAME = 'mmrepl_test'
TEST_ENTRY_DN = 'uid={},{}'.format(TEST_ENTRY_NAME, DEFAULT_SUFFIX)
NEW_SUFFIX_NAME = 'test_repl'
NEW_SUFFIX = 'o={}'.format(NEW_SUFFIX_NAME)
NEW_BACKEND = 'repl_base'
DEBUGGING = os.getenv("DEBUGGING", default=False)
if DEBUGGING:
logging.getLogger(__name__).setLevel(logging.DEBUG)
else:
logging.getLogger(__name__).setLevel(logging.INFO)
log = logging.getLogger(__name__)
@pytest.fixture(scope="function")
def create_entry(topo_m4, request):
"""Add test entry to master1"""
log.info('Adding entry {}'.format(TEST_ENTRY_DN))
test_user = UserAccount(topo_m4.ms["master1"], TEST_ENTRY_DN)
if test_user.exists():
log.info('Deleting entry {}'.format(TEST_ENTRY_DN))
test_user.delete()
test_user.create(properties={
'uid': TEST_ENTRY_NAME,
'cn': TEST_ENTRY_NAME,
'sn': TEST_ENTRY_NAME,
'userPassword': TEST_ENTRY_NAME,
'uidNumber' : '1000',
'gidNumber' : '2000',
'homeDirectory' : '/home/mmrepl_test',
})
@pytest.fixture(scope="function")
def new_suffix(topo_m4, request):
"""Add a new suffix and enable a replication on it"""
for num in range(1, 5):
log.info('Adding suffix:{} and backend: {} to master{}'.format(NEW_SUFFIX, NEW_BACKEND, num))
topo_m4.ms["master{}".format(num)].backend.create(NEW_SUFFIX, {BACKEND_NAME: NEW_BACKEND})
topo_m4.ms["master{}".format(num)].mappingtree.create(NEW_SUFFIX, NEW_BACKEND)
try:
topo_m4.ms["master{}".format(num)].add_s(Entry((NEW_SUFFIX, {
'objectclass': 'top',
'objectclass': 'organization',
'o': NEW_SUFFIX_NAME,
'description': NEW_SUFFIX_NAME
})))
except ldap.LDAPError as e:
log.error('Failed to add suffix ({}): error ({})'.format(NEW_SUFFIX, e.message['desc']))
raise
def fin():
for num in range(1, 5):
log.info('Deleting suffix:{} and backend: {} from master{}'.format(NEW_SUFFIX, NEW_BACKEND, num))
topo_m4.ms["master{}".format(num)].mappingtree.delete(NEW_SUFFIX)
topo_m4.ms["master{}".format(num)].backend.delete(NEW_SUFFIX)
request.addfinalizer(fin)
def test_add_entry(topo_m4, create_entry):
"""Check that entries are replicated after add operation
:id: 024250f1-5f7e-4f3b-a9f5-27741e6fd405
:setup: Four masters replication setup, an entry
:steps:
1. Check entry on all other masters
:expectedresults:
1. The entry should be replicated to all masters
"""
entries = get_repl_entries(topo_m4, TEST_ENTRY_NAME, ["uid"])
assert all(entries), "Entry {} wasn't replicated successfully".format(TEST_ENTRY_DN)
def test_modify_entry(topo_m4, create_entry):
"""Check that entries are replicated after modify operation
:id: 36764053-622c-43c2-a132-d7a3ab7d9aaa
:setup: Four masters replication setup, an entry
:steps:
1. Modify the entry on master1 - add attribute
2. Wait for replication to happen
3. Check entry on all other masters
4. Modify the entry on master1 - replace attribute
5. Wait for replication to happen
6. Check entry on all other masters
7. Modify the entry on master1 - delete attribute
8. Wait for replication to happen
9. Check entry on all other masters
:expectedresults:
1. Attribute should be successfully added
2. Some time should pass
3. The change should be present on all masters
4. Attribute should be successfully replaced
5. Some time should pass
6. The change should be present on all masters
4. Attribute should be successfully deleted
8. Some time should pass
9. The change should be present on all masters
"""
log.info('Modifying entry {} - add operation'.format(TEST_ENTRY_DN))
test_user = UserAccount(topo_m4.ms["master1"], TEST_ENTRY_DN)
test_user.add('mail', '{}@redhat.com'.format(TEST_ENTRY_NAME))
time.sleep(1)
all_user = topo_m4.all_get_dsldapobject(TEST_ENTRY_DN, UserAccount)
for u in all_user:
assert "{}@redhat.com".format(TEST_ENTRY_NAME) in u.get_attr_vals_utf8('mail')
log.info('Modifying entry {} - replace operation'.format(TEST_ENTRY_DN))
test_user.replace('mail', '{}@greenhat.com'.format(TEST_ENTRY_NAME))
time.sleep(1)
all_user = topo_m4.all_get_dsldapobject(TEST_ENTRY_DN, UserAccount)
for u in all_user:
assert "{}@greenhat.com".format(TEST_ENTRY_NAME) in u.get_attr_vals_utf8('mail')
log.info('Modifying entry {} - delete operation'.format(TEST_ENTRY_DN))
test_user.remove('mail', '{}@greenhat.com'.format(TEST_ENTRY_NAME))
time.sleep(1)
all_user = topo_m4.all_get_dsldapobject(TEST_ENTRY_DN, UserAccount)
for u in all_user:
assert "{}@greenhat.com".format(TEST_ENTRY_NAME) not in u.get_attr_vals_utf8('mail')
def test_delete_entry(topo_m4, create_entry):
"""Check that entry deletion is replicated after delete operation
:id: 18437262-9d6a-4b98-a47a-6182501ab9bc
:setup: Four masters replication setup, an entry
:steps:
1. Delete the entry from master1
2. Check entry on all other masters
:expectedresults:
1. The entry should be deleted
2. The change should be present on all masters
"""
log.info('Deleting entry {} during the test'.format(TEST_ENTRY_DN))
topo_m4.ms["master1"].delete_s(TEST_ENTRY_DN)
entries = get_repl_entries(topo_m4, TEST_ENTRY_NAME, ["uid"])
assert not entries, "Entry deletion {} wasn't replicated successfully".format(TEST_ENTRY_DN)
@pytest.mark.parametrize("delold", [0, 1])
def test_modrdn_entry(topo_m4, create_entry, delold):
"""Check that entries are replicated after modrdn operation
:id: 02558e6d-a745-45ae-8d88-34fe9b16adc9
:parametrized: yes
:setup: Four masters replication setup, an entry
:steps:
1. Make modrdn operation on entry on master1 with both delold 1 and 0
2. Check entry on all other masters
:expectedresults:
1. Modrdn operation should be successful
2. The change should be present on all masters
"""
newrdn_name = 'newrdn'
newrdn_dn = 'uid={},{}'.format(newrdn_name, DEFAULT_SUFFIX)
log.info('Modify entry RDN {}'.format(TEST_ENTRY_DN))
try:
topo_m4.ms["master1"].modrdn_s(TEST_ENTRY_DN, 'uid={}'.format(newrdn_name), delold)
except ldap.LDAPError as e:
log.error('Failed to modrdn entry (%s): error (%s)' % (TEST_ENTRY_DN,
e.message['desc']))
raise e
try:
entries_new = get_repl_entries(topo_m4, newrdn_name, ["uid"])
assert all(entries_new), "Entry {} wasn't replicated successfully".format(newrdn_name)
if delold == 0:
entries_old = get_repl_entries(topo_m4, TEST_ENTRY_NAME, ["uid"])
assert all(entries_old), "Entry with old rdn {} wasn't replicated successfully".format(TEST_ENTRY_DN)
else:
entries_old = get_repl_entries(topo_m4, TEST_ENTRY_NAME, ["uid"])
assert not entries_old, "Entry with old rdn {} wasn't removed in replicas successfully".format(
TEST_ENTRY_DN)
finally:
log.info('Remove entry with new RDN {}'.format(newrdn_dn))
topo_m4.ms["master1"].delete_s(newrdn_dn)
def test_modrdn_after_pause(topo_m4):
"""Check that changes are properly replicated after replica pause
:id: 6271dc9c-a993-4a9e-9c6d-05650cdab282
:setup: Four masters replication setup, an entry
:steps:
1. Pause all replicas
2. Make modrdn operation on entry on master1
3. Resume all replicas
4. Wait for replication to happen
5. Check entry on all other masters
:expectedresults:
1. Replicas should be paused
2. Modrdn operation should be successful
3. Replicas should be resumed
4. Some time should pass
5. The change should be present on all masters
"""
newrdn_name = 'newrdn'
newrdn_dn = 'uid={},{}'.format(newrdn_name, DEFAULT_SUFFIX)
log.info('Adding entry {}'.format(TEST_ENTRY_DN))
try:
topo_m4.ms["master1"].add_s(Entry((TEST_ENTRY_DN, {
'objectclass': 'top person'.split(),
'objectclass': 'organizationalPerson',
'objectclass': 'inetorgperson',
'cn': TEST_ENTRY_NAME,
'sn': TEST_ENTRY_NAME,
'uid': TEST_ENTRY_NAME
})))
except ldap.LDAPError as e:
log.error('Failed to add entry (%s): error (%s)' % (TEST_ENTRY_DN,
e.message['desc']))
raise e
log.info('Pause all replicas')
topo_m4.pause_all_replicas()
log.info('Modify entry RDN {}'.format(TEST_ENTRY_DN))
try:
topo_m4.ms["master1"].modrdn_s(TEST_ENTRY_DN, 'uid={}'.format(newrdn_name))
except ldap.LDAPError as e:
log.error('Failed to modrdn entry (%s): error (%s)' % (TEST_ENTRY_DN,
e.message['desc']))
raise e
log.info('Resume all replicas')
topo_m4.resume_all_replicas()
log.info('Wait for replication to happen')
time.sleep(3)
try:
entries_new = get_repl_entries(topo_m4, newrdn_name, ["uid"])
assert all(entries_new), "Entry {} wasn't replicated successfully".format(newrdn_name)
finally:
log.info('Remove entry with new RDN {}'.format(newrdn_dn))
topo_m4.ms["master1"].delete_s(newrdn_dn)
@pytest.mark.bz842441
def test_modify_stripattrs(topo_m4):
"""Check that we can modify nsds5replicastripattrs
:id: f36abed8-e262-4f35-98aa-71ae55611aaa
:setup: Four masters replication setup
:steps:
1. Modify nsds5replicastripattrs attribute on any agreement
2. Search for the modified attribute
:expectedresults: It should be contain the value
1. nsds5replicastripattrs should be successfully set
2. The modified attribute should be the one we set
"""
m1 = topo_m4.ms["master1"]
agreement = m1.agreement.list(suffix=DEFAULT_SUFFIX)[0].dn
attr_value = b'modifiersname modifytimestamp'
log.info('Modify nsds5replicastripattrs with {}'.format(attr_value))
m1.modify_s(agreement, [(ldap.MOD_REPLACE, 'nsds5replicastripattrs', [attr_value])])
log.info('Check nsds5replicastripattrs for {}'.format(attr_value))
entries = m1.search_s(agreement, ldap.SCOPE_BASE, "objectclass=*", ['nsds5replicastripattrs'])
assert attr_value in entries[0].data['nsds5replicastripattrs']
def test_new_suffix(topo_m4, new_suffix):
"""Check that we can enable replication on a new suffix
:id: d44a9ed4-26b0-4189-b0d0-b2b336ddccbd
:setup: Four masters replication setup, a new suffix
:steps:
1. Enable replication on the new suffix
2. Check if replication works
3. Disable replication on the new suffix
:expectedresults:
1. Replication on the new suffix should be enabled
2. Replication should work
3. Replication on the new suffix should be disabled
"""
m1 = topo_m4.ms["master1"]
m2 = topo_m4.ms["master2"]
repl = ReplicationManager(NEW_SUFFIX)
repl.create_first_master(m1)
repl.join_master(m1, m2)
repl.test_replication(m1, m2)
repl.test_replication(m2, m1)
repl.remove_master(m1)
repl.remove_master(m2)
def test_many_attrs(topo_m4, create_entry):
"""Check a replication with many attributes (add and delete)
:id: d540b358-f67a-43c6-8df5-7c74b3cb7523
:setup: Four masters replication setup, a test entry
:steps:
1. Add 10 new attributes to the entry
2. Delete few attributes: one from the beginning,
two from the middle and one from the end
3. Check that the changes were replicated in the right order
:expectedresults:
1. The attributes should be successfully added
2. Delete operations should be successful
3. The changes should be replicated in the right order
"""
m1 = topo_m4.ms["master1"]
add_list = ensure_list_bytes(map(lambda x: "test{}".format(x), range(10)))
delete_list = ensure_list_bytes(map(lambda x: "test{}".format(x), [0, 4, 7, 9]))
test_user = UserAccount(topo_m4.ms["master1"], TEST_ENTRY_DN)
log.info('Modifying entry {} - 10 add operations'.format(TEST_ENTRY_DN))
for add_name in add_list:
test_user.add('description', add_name)
log.info('Check that everything was properly replicated after an add operation')
entries = get_repl_entries(topo_m4, TEST_ENTRY_NAME, ["description"])
for entry in entries:
assert all(entry.getValues("description")[i] == add_name for i, add_name in enumerate(add_list))
log.info('Modifying entry {} - 4 delete operations for {}'.format(TEST_ENTRY_DN, str(delete_list)))
for delete_name in delete_list:
test_user.remove('description', delete_name)
log.info('Check that everything was properly replicated after a delete operation')
entries = get_repl_entries(topo_m4, TEST_ENTRY_NAME, ["description"])
for entry in entries:
for i, value in enumerate(entry.getValues("description")):
assert value == [name for name in add_list if name not in delete_list][i]
assert value not in delete_list
def test_double_delete(topo_m4, create_entry):
"""Check that double delete of the entry doesn't crash server
:id: 5b85a5af-df29-42c7-b6cb-965ec5aa478e
:feature: Multi master replication
:setup: Four masters replication setup, a test entry
:steps: 1. Delete the entry
2. Delete the entry on the second master
3. Check that server is alive
:expectedresults: Server hasn't crash
"""
log.info('Deleting entry {} from master1'.format(TEST_ENTRY_DN))
topo_m4.ms["master1"].delete_s(TEST_ENTRY_DN)
log.info('Deleting entry {} from master2'.format(TEST_ENTRY_DN))
try:
topo_m4.ms["master2"].delete_s(TEST_ENTRY_DN)
except ldap.NO_SUCH_OBJECT:
log.info("Entry {} wasn't found master2. It is expected.".format(TEST_ENTRY_DN))
log.info('Make searches to check if server is alive')
entries = get_repl_entries(topo_m4, TEST_ENTRY_NAME, ["uid"])
assert not entries, "Entry deletion {} wasn't replicated successfully".format(TEST_ENTRY_DN)
def test_password_repl_error(topo_m4, create_entry):
"""Check that error about userpassword replication is properly logged
:id: d4f12dc0-cd2c-4b92-9b8d-d764a60f0698
:feature: Multi master replication
:setup: Four masters replication setup, a test entry
:steps: 1. Change userpassword on master 1
2. Restart the servers to flush the logs
3. Check the error log for an replication error
:expectedresults: We don't have a replication error in the error log
"""
m1 = topo_m4.ms["master1"]
m2 = topo_m4.ms["master2"]
TEST_ENTRY_NEW_PASS = 'new_{}'.format(TEST_ENTRY_NAME)
log.info('Clean the error log')
m2.deleteErrorLogs()
log.info('Set replication loglevel')
m2.config.loglevel((ErrorLog.REPLICA,))
log.info('Modifying entry {} - change userpassword on master 2'.format(TEST_ENTRY_DN))
test_user_m1 = UserAccount(topo_m4.ms["master1"], TEST_ENTRY_DN)
test_user_m2 = UserAccount(topo_m4.ms["master2"], TEST_ENTRY_DN)
test_user_m3 = UserAccount(topo_m4.ms["master3"], TEST_ENTRY_DN)
test_user_m4 = UserAccount(topo_m4.ms["master4"], TEST_ENTRY_DN)
test_user_m1.set('userpassword', TEST_ENTRY_NEW_PASS)
log.info('Restart the servers to flush the logs')
for num in range(1, 5):
topo_m4.ms["master{}".format(num)].restart(timeout=10)
m1_conn = test_user_m1.bind(TEST_ENTRY_NEW_PASS)
m2_conn = test_user_m2.bind(TEST_ENTRY_NEW_PASS)
m3_conn = test_user_m3.bind(TEST_ENTRY_NEW_PASS)
m4_conn = test_user_m4.bind(TEST_ENTRY_NEW_PASS)
log.info('Check the error log for the error with {}'.format(TEST_ENTRY_DN))
assert not m2.ds_error_log.match('.*can.t add a change for uid={}.*'.format(TEST_ENTRY_NAME))
def test_invalid_agmt(topo_m4):
"""Test adding that an invalid agreement is properly rejected and does not crash the server
:id: 92f10f46-1be1-49ca-9358-784359397bc2
:setup: MMR with four masters
:steps:
1. Add invalid agreement (nsds5ReplicaEnabled set to invalid value)
2. Verify the server is still running
:expectedresults:
1. Invalid repl agreement should be rejected
2. Server should be still running
"""
m1 = topo_m4.ms["master1"]
# Add invalid agreement (nsds5ReplicaEnabled set to invalid value)
AGMT_DN = 'cn=whatever,cn=replica,cn="dc=example,dc=com",cn=mapping tree,cn=config'
try:
invalid_props = {RA_ENABLED: 'True', # Invalid value
RA_SCHEDULE: '0001-2359 0123456'}
m1.agreement.create(suffix=DEFAULT_SUFFIX, host='localhost', port=389, properties=invalid_props)
except ldap.UNWILLING_TO_PERFORM:
m1.log.info('Invalid repl agreement correctly rejected')
except ldap.LDAPError as e:
m1.log.fatal('Got unexpected error adding invalid agreement: ' + str(e))
assert False
else:
m1.log.fatal('Invalid agreement was incorrectly accepted by the server')
assert False
# Verify the server is still running
try:
m1.simple_bind_s(DN_DM, PASSWORD)
except ldap.LDAPError as e:
m1.log.fatal('Failed to bind: ' + str(e))
assert False
def test_warining_for_invalid_replica(topo_m4):
"""Testing logs to indicate the inconsistency when configuration is performed.
:id: dd689d03-69b8-4bf9-a06e-2acd19d5e2c8
:setup: MMR with four masters
:steps:
1. Setup nsds5ReplicaBackoffMin to 20
2. Setup nsds5ReplicaBackoffMax to 10
:expectedresults:
1. nsds5ReplicaBackoffMin should set to 20
2. An error should be generated and also logged in the error logs.
"""
replicas = Replicas(topo_m4.ms["master1"])
replica = replicas.list()[0]
log.info('Set nsds5ReplicaBackoffMin to 20')
replica.set('nsds5ReplicaBackoffMin', '20')
with pytest.raises(ldap.UNWILLING_TO_PERFORM):
log.info('Set nsds5ReplicaBackoffMax to 10')
replica.set('nsds5ReplicaBackoffMax', '10')
log.info('Resetting configuration: nsds5ReplicaBackoffMin')
replica.remove_all('nsds5ReplicaBackoffMin')
log.info('Check the error log for the error')
assert topo_m4.ms["master1"].ds_error_log.match('.*nsds5ReplicaBackoffMax.*10.*invalid.*')
@pytest.mark.ds51082
def test_csnpurge_large_valueset(topo_m2):
"""Test csn generator test
:id: 63e2bdb2-0a8f-4660-9465-7b80a9f72a74
:setup: MMR with 2 masters
:steps:
1. Create a test_user
2. add a large set of values (more than 10)
3. delete all the values (more than 10)
4. configure the replica to purge those values (purgedelay=5s)
5. Waiting for 6 second
6. do a series of update
:expectedresults:
1. Should succeeds
2. Should succeeds
3. Should succeeds
4. Should succeeds
5. Should succeeds
6. Should not crash
"""
m1 = topo_m2.ms["master2"]
test_user = UserAccount(m1, TEST_ENTRY_DN)
if test_user.exists():
log.info('Deleting entry {}'.format(TEST_ENTRY_DN))
test_user.delete()
test_user.create(properties={
'uid': TEST_ENTRY_NAME,
'cn': TEST_ENTRY_NAME,
'sn': TEST_ENTRY_NAME,
'userPassword': TEST_ENTRY_NAME,
'uidNumber' : '1000',
'gidNumber' : '2000',
'homeDirectory' : '/home/mmrepl_test',
})
# create a large value set so that it is sorted
for i in range(1,20):
test_user.add('description', 'value {}'.format(str(i)))
# delete all values of the valueset
for i in range(1,20):
test_user.remove('description', 'value {}'.format(str(i)))
# set purging delay to 5 second and wait more that 5second
replicas = Replicas(m1)
replica = replicas.list()[0]
log.info('nsds5ReplicaPurgeDelay to 5')
replica.set('nsds5ReplicaPurgeDelay', '5')
time.sleep(6)
# add some new values to the valueset containing entries that should be purged
for i in range(21,25):
test_user.add('description', 'value {}'.format(str(i)))
@pytest.mark.ds51244
def test_urp_trigger_substring_search(topo_m2):
"""Test that a ADD of a entry with a '*' in its DN, triggers
an internal search with a escaped DN
:id: 9869bb39-419f-42c3-a44b-c93eb0b77667
:setup: MMR with 2 masters
:steps:
1. enable internal operation loggging for plugins
2. Create on M1 a test_user with a '*' in its DN
3. Check the test_user is replicated
4. Check in access logs that the internal search does not contain '*'
:expectedresults:
1. Should succeeds
2. Should succeeds
3. Should succeeds
4. Should succeeds
"""
m1 = topo_m2.ms["master1"]
m2 = topo_m2.ms["master2"]
# Enable loggging of internal operation logging to capture URP intop
log.info('Set nsslapd-plugin-logging to on')
for inst in (m1, m2):
inst.config.loglevel([AccessLog.DEFAULT, AccessLog.INTERNAL], service='access')
inst.config.set('nsslapd-plugin-logging', 'on')
inst.restart()
# add a user with a DN containing '*'
test_asterisk_uid = 'asterisk_*_in_value'
test_asterisk_dn = 'uid={},{}'.format(test_asterisk_uid, DEFAULT_SUFFIX)
test_user = UserAccount(m1, test_asterisk_dn)
if test_user.exists():
log.info('Deleting entry {}'.format(test_asterisk_dn))
test_user.delete()
test_user.create(properties={
'uid': test_asterisk_uid,
'cn': test_asterisk_uid,
'sn': test_asterisk_uid,
'userPassword': test_asterisk_uid,
'uidNumber' : '1000',
'gidNumber' : '2000',
'homeDirectory' : '/home/asterisk',
})
# check that the ADD was replicated on M2
test_user_m2 = UserAccount(m2, test_asterisk_dn)
for i in range(1,5):
if test_user_m2.exists():
break
else:
log.info('Entry not yet replicated on M2, wait a bit')
time.sleep(2)
# check that M2 access logs does not "(&(objectclass=nstombstone)(nscpentrydn=uid=asterisk_*_in_value,dc=example,dc=com))"
log.info('Check that on M2, URP as not triggered such internal search')
pattern = ".*\(Internal\).*SRCH.*\(&\(objectclass=nstombstone\)\(nscpentrydn=uid=asterisk_\*_in_value,dc=example,dc=com.*"
found = m2.ds_access_log.match(pattern)
log.info("found line: %s" % found)
assert not found
@pytest.mark.skipif(ds_is_older('1.4.4'), reason="Not implemented")
def test_csngen_task(topo_m2):
"""Test csn generator test
:id: b976849f-dbed-447e-91a7-c877d5d71fd0
:setup: MMR with 2 masters
:steps:
1. Create a csngen_test task
2. Check that debug messages "_csngen_gen_tester_main" are in errors logs
:expectedresults:
1. Should succeeds
2. Should succeeds
"""
m1 = topo_m2.ms["master1"]
csngen_task = csngenTestTask(m1)
csngen_task.create(properties={
'ttl': '300'
})
time.sleep(10)
log.info('Check the error log contains strings showing csn generator is tested')
assert m1.searchErrorsLog("_csngen_gen_tester_main")
if __name__ == '__main__':
# Run isolated
# -s for DEBUG mode
CURRENT_FILE = os.path.realpath(__file__)
pytest.main("-s %s" % CURRENT_FILE)