# test_admin.py
# Functional tests for the 'ckanadmin' pages (index, authz and trash).
import ckan.model as model
from ckan.tests import url_for, CreateTestData, WsgiAppCase
class TestAdminController(WsgiAppCase):
    """Access tests for the ``ckanadmin`` index page."""

    @classmethod
    def setup_class(cls):
        """Create the shared test data, including the testsysadmin user."""
        CreateTestData.create()

    @classmethod
    def teardown_class(cls):
        """Rebuild the database once every test in this class has run."""
        model.repo.rebuild_db()

    def test_index(self):
        """Check that only sysadmins can access the /ckan-admin page."""
        url = url_for('ckanadmin', action='index')

        # No REMOTE_USER at all: redirect as not authorized.
        self.app.get(url, status=[302])

        # Random username: expect a 401.
        self.app.get(url, status=[401],
                     extra_environ={'REMOTE_USER': 'my-random-user-name'})

        # Now test real access as the sysadmin.
        username = u'testsysadmin'.encode('utf8')
        response = self.app.get(url,
                                extra_environ={'REMOTE_USER': username})
        assert 'Administration' in response, response
class TestAdminAuthzController(WsgiAppCase):
    """Tests for the ``ckanadmin`` authz page, which edits roles on the System."""

    @classmethod
    def setup_class(cls):
        """Create test data plus two authorization groups, one with a System role."""
        # setup test data including testsysadmin user
        CreateTestData.create()

        # Creating a couple of authorization groups, which are enough to
        # break some things just by their existence.
        for ag_name in [u'anauthzgroup', u'anotherauthzgroup']:
            ag = model.AuthorizationGroup.by_name(ag_name)
            if not ag:  # may already exist, if not create
                ag = model.AuthorizationGroup(name=ag_name)
                model.Session.add(ag)
        model.Session.commit()

        # They are especially dangerous if they have a role on the System.
        ag = model.AuthorizationGroup.by_name(u'anauthzgroup')
        model.add_authorization_group_to_role(ag, u'editor', model.System())
        model.Session.commit()

    @classmethod
    def teardown_class(cls):
        """Rebuild the database once every test in this class has run."""
        model.repo.rebuild_db()

    def test_role_table(self):
        """Drive the authz page forms and check SystemRole rows after each step.

        Covers: ticking and unticking a single user checkbox, doing the same
        for an authorization group, changing several checkboxes in one
        submit, and granting a role to a user / authorization group by name
        via the "add" forms.
        """
        # logged in as testsysadmin for all actions
        as_testsysadmin = {'REMOTE_USER': 'testsysadmin'}

        # ---- helpers: reading role state straight from the database ----

        def get_system_user_roles():
            """Return sorted (user name, role) pairs for SystemRoles with a user."""
            sys_query = model.Session.query(model.SystemRole)
            return sorted([(x.user.name, x.role)
                           for x in sys_query.all() if x.user])

        def get_system_authzgroup_roles():
            """Return sorted (group name, role) pairs for SystemRoles with a group."""
            sys_query = model.Session.query(model.SystemRole)
            return sorted([(x.authorized_group.name, x.role)
                           for x in sys_query.all() if x.authorized_group])

        def get_roles_by_name(user=None, group=None):
            """Return the list of system roles held by one user or one group."""
            if user:
                return [role for (name, role) in get_system_user_roles()
                        if name == user]
            elif group:
                return [role for (name, role) in get_system_authzgroup_roles()
                        if name == group]
            else:
                assert False, 'miscalled'

        # ---- helpers: fetching and submitting the page's forms ----

        def get_response():
            """Fetch the authz page as testsysadmin and sanity-check its title."""
            response = self.app.get(
                url_for('ckanadmin', action='authz'),
                extra_environ=as_testsysadmin)
            assert 'Administration - Authorization' in response, response
            return response

        def get_user_form():
            return get_response().forms['theform']

        def get_authzgroup_form():
            return get_response().forms['authzgroup_form']

        def check_and_set_checkbox(theform, user, role, should_be, set_to):
            """Assert the user/role checkbox equals should_be, then set it to set_to.

            Returns the same form so the call can be nested inside submit().
            """
            user_role_string = '%s$%s' % (user, role)
            checkboxes = [x for x in theform.fields[user_role_string]
                          if x.__class__.__name__ == 'Checkbox']
            assert len(checkboxes) == 1, \
                "there should only be one checkbox for %s/%s" % (user, role)
            checkbox = checkboxes[0]
            # the checkbox must start in the state the caller expects
            assert checkbox.checked == should_be, \
                "%s/%s checkbox in unexpected state" % (user, role)
            # tick or untick the box; submitting is left to the caller
            checkbox.checked = set_to
            return theform

        def submit(form):
            return form.submit('save', extra_environ=as_testsysadmin)

        def authz_submit(form):
            return form.submit('authz_save', extra_environ=as_testsysadmin)

        # get and store the starting state of the system roles
        original_user_roles = get_system_user_roles()
        original_authzgroup_roles = get_system_authzgroup_roles()

        # also keep a copy that we can update as the tests go on
        expected_user_roles = get_system_user_roles()
        expected_authzgroup_roles = get_system_authzgroup_roles()

        # before we start changing things, check that the roles on the
        # system are as expected
        assert original_user_roles == [
            (u'logged_in', u'editor'),
            (u'testsysadmin', u'admin'),
            (u'visitor', u'anon_editor'),
        ], "original user roles not as expected " + str(original_user_roles)
        assert original_authzgroup_roles == [(u'anauthzgroup', u'editor')], \
            "original authzgroup roles not as expected" + str(original_authzgroup_roles)

        # visitor is not an admin. check that his admin box is unticked,
        # tick it, and submit
        submit(check_and_set_checkbox(
            get_user_form(), u'visitor', u'admin', False, True))

        # update expected state to reflect the change we should just have made
        expected_user_roles.append((u'visitor', u'admin'))
        expected_user_roles.sort()

        # and check that's the state in the database now
        assert get_system_user_roles() == expected_user_roles
        assert get_system_authzgroup_roles() == expected_authzgroup_roles

        # try again, this time we expect the box to be ticked already
        submit(check_and_set_checkbox(
            get_user_form(), u'visitor', u'admin', True, True))

        # performing the action twice shouldn't have changed anything
        assert get_system_user_roles() == expected_user_roles
        assert get_system_authzgroup_roles() == expected_authzgroup_roles

        # now let's make the authzgroup which already has a system role an admin
        authz_submit(check_and_set_checkbox(
            get_authzgroup_form(), u'anauthzgroup', u'admin', False, True))

        # update expected state to reflect the change we should just have made
        expected_authzgroup_roles.append((u'anauthzgroup', u'admin'))
        expected_authzgroup_roles.sort()

        # check that's happened
        assert get_system_user_roles() == expected_user_roles
        assert get_system_authzgroup_roles() == expected_authzgroup_roles

        # put it back how it was
        submit(check_and_set_checkbox(
            get_user_form(), u'visitor', u'admin', True, False))
        authz_submit(check_and_set_checkbox(
            get_authzgroup_form(), u'anauthzgroup', u'admin', True, False))

        # should be back to our starting state
        assert original_user_roles == get_system_user_roles()
        assert original_authzgroup_roles == get_system_authzgroup_roles()

        # now test making multiple changes in a single submit
        form = get_user_form()
        check_and_set_checkbox(form, u'visitor', u'editor', False, True)
        check_and_set_checkbox(form, u'visitor', u'reader', False, False)
        check_and_set_checkbox(form, u'logged_in', u'editor', True, False)
        check_and_set_checkbox(form, u'logged_in', u'reader', False, True)
        submit(form)

        # and assert that they've actually changed
        roles = get_system_user_roles()
        assert ((u'visitor', u'editor') in roles and
                (u'logged_in', u'editor') not in roles and
                (u'logged_in', u'reader') in roles and
                (u'visitor', u'reader') not in roles), \
            "visitor and logged_in roles seem not to have reversed"

        # now we test the box for giving roles to an arbitrary user

        # check that tester doesn't have a system role
        assert len(get_roles_by_name(user=u'tester')) == 0, \
            "tester should not have roles"

        # put tester in the username box
        form = get_response().forms['addform']
        form.fields['new_user_name'][0].value = 'tester'

        # get the admin checkbox
        checkbox = [x for x in form.fields['admin']
                    if x.__class__.__name__ == 'Checkbox'][0]

        # check it's currently unticked
        assert not checkbox.checked

        # tick it and submit
        checkbox.checked = True
        response = form.submit('add', extra_environ=as_testsysadmin)
        assert "User Added" in response, "don't see flash message"

        assert get_roles_by_name(user=u'tester') == ['admin'], \
            "tester should be an admin now"

        # and similarly for an arbitrary authz group
        assert get_roles_by_name(group=u'anotherauthzgroup') == [], \
            "should not have roles"

        form = get_response().forms['authzgroup_addform']
        form.fields['new_user_name'][0].value = 'anotherauthzgroup'
        checkbox = [x for x in form.fields['reader']
                    if x.__class__.__name__ == 'Checkbox'][0]
        assert not checkbox.checked
        checkbox.checked = True

        response = form.submit('authz_add', extra_environ=as_testsysadmin)
        assert "Authorization Group Added" in response, "don't see flash message"

        assert get_roles_by_name(group=u'anotherauthzgroup') == [u'reader'], \
            "should be a reader now"
class TestAdminTrashController(WsgiAppCase):
    """Tests for the ``ckanadmin`` trash page: purging and undeleting."""

    def setup(self):
        """Create fresh test data."""
        CreateTestData.create()

    def teardown(self):
        """Rebuild the database."""
        model.repo.rebuild_db()

    def test_purge_revision(self):
        """A deleted revision is listed on the trash page and can be purged."""
        as_testsysadmin = {'REMOTE_USER': 'testsysadmin'}

        # Put a revision in deleted state
        rev = model.repo.youngest_revision()
        revid = rev.id
        rev.state = model.State.DELETED
        model.Session.commit()

        # check it shows up on trash page
        url = url_for('ckanadmin', action='trash')
        response = self.app.get(url, extra_environ=as_testsysadmin)
        assert revid in response, response

        # check it can be successfully purged
        form = response.forms['form-purge-revisions']
        res = form.submit('purge-revisions', status=[302],
                          extra_environ=as_testsysadmin)
        res = res.follow(extra_environ=as_testsysadmin)
        assert revid not in res, res
        rev = model.Session.query(model.Revision).filter_by(id=revid).first()
        assert rev is None, rev

    def test_purge_package(self):
        """Purging packages first reports an error, then succeeds.

        With only warandpeace deleted the purge is expected to answer
        'Cannot purge package'; once annakarenina is deleted as well, the
        purge is expected to leave no packages in the database.
        """
        as_testsysadmin = {'REMOTE_USER': 'testsysadmin'}

        # Put packages in deleted state
        model.repo.new_revision()
        pkg = model.Package.by_name(u'warandpeace')
        pkg.state = model.State.DELETED
        model.repo.commit_and_remove()

        # Check shows up on trash page
        url = url_for('ckanadmin', action='trash')
        response = self.app.get(url, extra_environ=as_testsysadmin)
        assert 'dataset/warandpeace' in response, response

        # Check we get correct error message on attempted purge
        form = response.forms['form-purge-packages']
        response = form.submit('purge-packages', status=[302],
                               extra_environ=as_testsysadmin)
        response = response.follow(extra_environ=as_testsysadmin)
        assert 'Cannot purge package' in response, response
        assert 'dataset/warandpeace' in response

        # now check we really can purge when things are ok
        model.repo.new_revision()
        pkg = model.Package.by_name(u'annakarenina')
        pkg.state = model.State.DELETED
        model.repo.commit_and_remove()

        response = self.app.get(url, extra_environ=as_testsysadmin)
        assert 'dataset/warandpeace' in response, response
        assert 'dataset/annakarenina' in response, response

        form = response.forms['form-purge-packages']
        res = form.submit('purge-packages', status=[302],
                          extra_environ=as_testsysadmin)
        # Follow the redirect; the resulting page itself is not inspected.
        res.follow(extra_environ=as_testsysadmin)

        pkgs = model.Session.query(model.Package).all()
        assert len(pkgs) == 0, pkgs

    def test_purge_youngest_revision(self):
        """After purging the youngest revision the edit page still loads."""
        as_testsysadmin = {'REMOTE_USER': 'testsysadmin'}
        pkg_name = u'warandpeace'
        log_message = 'test_1234'
        edit_url = url_for(controller='package', action='edit', id=pkg_name)

        # Manually create a revision
        res = self.app.get(edit_url)
        fv = res.forms['dataset-edit']
        fv['title'] = 'RevisedTitle'
        fv['log_message'] = log_message
        fv.submit('save')

        # Delete that revision
        rev = model.repo.youngest_revision()
        assert rev.message == log_message
        rev.state = model.State.DELETED
        model.Session.commit()

        # Run a purge
        url = url_for('ckanadmin', action='trash')
        res = self.app.get(url, extra_environ=as_testsysadmin)
        form = res.forms['form-purge-revisions']
        res = form.submit('purge-revisions', status=[302],
                          extra_environ=as_testsysadmin)
        res.follow(extra_environ=as_testsysadmin)

        # Verify the edit page can be loaded (ie. does not 404)
        self.app.get(edit_url)

    def test_undelete(self):
        """Undeleting from the trash page sets the revision back to ACTIVE."""
        as_testsysadmin = {'REMOTE_USER': 'testsysadmin'}

        rev = model.repo.youngest_revision()
        # Capture the id before committing and use it from here on.
        rev_id = rev.id
        rev.state = model.State.DELETED
        model.Session.commit()

        # Click undelete
        url = url_for('ckanadmin', action='trash')
        res = self.app.get(url, extra_environ=as_testsysadmin)
        form = res.forms['undelete-' + rev_id]
        res = form.submit('submit', status=[302],
                          extra_environ=as_testsysadmin)
        res = res.follow(extra_environ=as_testsysadmin)

        assert 'Revision updated' in res
        assert 'DELETED' not in res

        rev = model.repo.youngest_revision()
        assert rev.id == rev_id
        assert rev.state == model.State.ACTIVE