-
Notifications
You must be signed in to change notification settings - Fork 2k
/
admin.py
364 lines (314 loc) · 15.9 KB
/
admin.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
from pylons import g
import ckan.lib.base as base
import ckan.lib.helpers as h
import ckan.lib.app_globals as app_globals
import ckan.authz
import ckan.lib.authztool
import ckan.model as model
from ckan.model.authz import Role
roles = Role.get_all()
role_tuples = [(x, x) for x in roles]
c = base.c
request = base.request
_ = base._
def get_sysadmins():
q = model.Session.query(model.SystemRole).filter_by(role=model.Role.ADMIN)
return [uor.user for uor in q.all() if uor.user]
class AdminController(base.BaseController):
def __before__(self, action, **params):
super(AdminController, self).__before__(action, **params)
if not ckan.authz.Authorizer().is_sysadmin(unicode(c.user)):
base.abort(401, _('Need to be system administrator to administer'))
c.revision_change_state_allowed = (
c.user and self.authorizer.is_authorized(c.user,
model.Action.CHANGE_STATE,
model.Revision))
def config(self):
data = request.POST
if 'save' in data:
# update config from form
style = data.get('style')
app_globals.set_global('ckan.main_css', style)
app_globals.set_main_css(style)
site_title = data.get('title')
app_globals.set_global('ckan.site_title', site_title)
tag_line = data.get('tagline')
app_globals.set_global('ckan.site_description', tag_line)
about = data.get('about')
app_globals.set_global('ckan.site_about', about)
if 'reset' in data:
# reset to values in config
app_globals.reset()
# Styles for use in the form.select() macro.
styles = [{'text': 'Default', 'value': '/base/css/main.css'},
{'text': 'Red', 'value': '/base/css/red.css'},
{'text': 'Green', 'value': '/base/css/green.css'},
{'text': 'Fuchsia', 'value': '/base/css/fuchsia.css'}]
data = {}
data['title'] = g.site_title
data['style'] = g.main_css
data['tagline'] = g.site_description
data['about'] = g.site_about
vars = {'data': data, 'errors': {}, 'styles': styles}
return base.render('admin/config.html',
extra_vars = vars)
def index(self):
#now pass the list of sysadmins
c.sysadmins = [a.name for a in get_sysadmins()]
return base.render('admin/index.html')
def authz(self):
def action_save_form(users_or_authz_groups):
# The permissions grid has been saved
# which is a grid of checkboxes named user$role
rpi = request.params.items()
# The grid passes us a list of the users/roles that were displayed
submitted = [a for (a, b) in rpi if (b == u'submitted')]
# and also those which were checked
checked = [a for (a, b) in rpi if (b == u'on')]
# from which we can deduce true/false for each user/role
# combination that was displayed in the form
table_dict = {}
for a in submitted:
table_dict[a] = False
for a in checked:
table_dict[a] = True
# now we'll split up the user$role strings to make a dictionary
# from (user,role) to True/False, which tells us what we need to
# do.
new_user_role_dict = {}
for (ur, val) in table_dict.items():
u, r = ur.split('$')
new_user_role_dict[(u, r)] = val
# we get the current user/role assignments
# and make a dictionary of them
current_uors = model.Session.query(model.SystemRole).all()
if users_or_authz_groups == 'users':
current_users_roles = [(uor.user.name, uor.role)
for uor in current_uors
if uor.user]
elif users_or_authz_groups == 'authz_groups':
current_users_roles = [(uor.authorized_group.name, uor.role)
for uor in current_uors
if uor.authorized_group]
else:
assert False, "shouldn't be here"
current_user_role_dict = {}
for (u, r) in current_users_roles:
current_user_role_dict[(u, r)] = True
# and now we can loop through our dictionary of desired states
# checking whether a change needs to be made, and if so making it
# WORRY: Here it seems that we have to check whether someone is
# already assigned a role, in order to avoid assigning it twice,
# or attempting to delete it when it doesn't exist. Otherwise
# problems occur. However this doesn't affect the index page,
# which would seem to be prone to suffer the same effect. Why
# the difference?
if users_or_authz_groups == 'users':
for ((u, r), val) in new_user_role_dict.items():
if val:
if not ((u, r) in current_user_role_dict):
model.add_user_to_role(
model.User.by_name(u), r,
model.System())
else:
if ((u, r) in current_user_role_dict):
model.remove_user_from_role(
model.User.by_name(u), r,
model.System())
elif users_or_authz_groups == 'authz_groups':
for ((u, r), val) in new_user_role_dict.items():
if val:
if not ((u, r) in current_user_role_dict):
model.add_authorization_group_to_role(
model.AuthorizationGroup.by_name(u), r,
model.System())
else:
if ((u, r) in current_user_role_dict):
model.remove_authorization_group_from_role(
model.AuthorizationGroup.by_name(u), r,
model.System())
else:
assert False, "shouldn't be here"
# finally commit the change to the database
model.Session.commit()
h.flash_success(_("Changes Saved"))
if ('save' in request.POST):
action_save_form('users')
if ('authz_save' in request.POST):
action_save_form('authz_groups')
def action_add_form(users_or_authz_groups):
# The user is attempting to set new roles for a named user
new_user = request.params.get('new_user_name')
# this is the list of roles whose boxes were ticked
checked_roles = [a for (a, b) in request.params.items()
if (b == u'on')]
# this is the list of all the roles that were in the submitted
# form
submitted_roles = [a for (a, b) in request.params.items()
if (b == u'submitted')]
# from this we can make a dictionary of the desired states
# i.e. true for the ticked boxes, false for the unticked
desired_roles = {}
for r in submitted_roles:
desired_roles[r] = False
for r in checked_roles:
desired_roles[r] = True
# again, in order to avoid either creating a role twice or
# deleting one which is non-existent, we need to get the users'
# current roles (if any)
current_uors = model.Session.query(model.SystemRole).all()
if users_or_authz_groups == 'users':
current_roles = [uor.role for uor in current_uors
if (uor.user and uor.user.name == new_user)]
user_object = model.User.by_name(new_user)
if user_object is None:
# The submitted user does not exist. Bail with flash
# message
h.flash_error(_('unknown user:') + str(new_user))
else:
# Whenever our desired state is different from our
# current state, change it.
for (r, val) in desired_roles.items():
if val:
if (r not in current_roles):
model.add_user_to_role(user_object, r,
model.System())
else:
if (r in current_roles):
model.remove_user_from_role(user_object, r,
model.System())
h.flash_success(_("User Added"))
elif users_or_authz_groups == 'authz_groups':
current_roles = [uor.role for uor in current_uors
if (uor.authorized_group and
uor.authorized_group.name == new_user)]
user_object = model.AuthorizationGroup.by_name(new_user)
if user_object is None:
# The submitted user does not exist. Bail with flash
# message
h.flash_error(_('unknown authorization group:') +
str(new_user))
else:
# Whenever our desired state is different from our
# current state, change it.
for (r, val) in desired_roles.items():
if val:
if (r not in current_roles):
model.add_authorization_group_to_role(
user_object, r, model.System())
else:
if (r in current_roles):
model.remove_authorization_group_from_role(
user_object, r, model.System())
h.flash_success(_("Authorization Group Added"))
else:
assert False, "shouldn't be here"
# and finally commit all these changes to the database
model.Session.commit()
if 'add' in request.POST:
action_add_form('users')
if 'authz_add' in request.POST:
action_add_form('authz_groups')
# =================
# Display the page
# Find out all the possible roles. For the system object that's just
# all of them.
possible_roles = Role.get_all()
# get the list of users who have roles on the System, with their roles
uors = model.Session.query(model.SystemRole).all()
# uniquify and sort
users = sorted(list(set([uor.user.name for uor in uors if uor.user])))
authz_groups = sorted(list(set([uor.authorized_group.name
for uor in uors if uor.authorized_group])))
# make a dictionary from (user, role) to True, False
users_roles = [(uor.user.name, uor.role) for uor in uors if uor.user]
user_role_dict = {}
for u in users:
for r in possible_roles:
if (u, r) in users_roles:
user_role_dict[(u, r)] = True
else:
user_role_dict[(u, r)] = False
# and similarly make a dictionary from (authz_group, role) to
# True, False
authz_groups_roles = [(uor.authorized_group.name, uor.role)
for uor in uors if uor.authorized_group]
authz_groups_role_dict = {}
for u in authz_groups:
for r in possible_roles:
if (u, r) in authz_groups_roles:
authz_groups_role_dict[(u, r)] = True
else:
authz_groups_role_dict[(u, r)] = False
# pass these variables to the template for rendering
c.roles = possible_roles
c.users = users
c.user_role_dict = user_role_dict
c.authz_groups = authz_groups
c.authz_groups_role_dict = authz_groups_role_dict
count = model.Session.query(model.AuthorizationGroup).count()
c.are_any_authz_groups = bool(count)
return base.render('admin/authz.html')
def trash(self):
c.deleted_revisions = model.Session.query(
model.Revision).filter_by(state=model.State.DELETED)
c.deleted_packages = model.Session.query(
model.Package).filter_by(state=model.State.DELETED)
if not request.params or (len(request.params) == 1 and '__no_cache__'
in request.params):
return base.render('admin/trash.html')
else:
# NB: we repeat retrieval of of revisions
# this is obviously inefficient (but probably not *that* bad)
# but has to be done to avoid (odd) sqlalchemy errors (when doing
# purge packages) of form: "this object already exists in the
# session"
msgs = []
if ('purge-packages' in request.params) or ('purge-revisions' in
request.params):
if 'purge-packages' in request.params:
revs_to_purge = []
for pkg in c.deleted_packages:
revisions = [x[0] for x in pkg.all_related_revisions]
# ensure no accidental purging of other(non-deleted)
# packages initially just avoided purging revisions
# where non-deleted packages were affected
# however this lead to confusing outcomes e.g.
# we succesfully deleted revision in which package
# was deleted (so package now active again) but no
# other revisions
problem = False
for r in revisions:
affected_pkgs = set(r.packages).\
difference(set(c.deleted_packages))
if affected_pkgs:
msg = _('Cannot purge package %s as '
'associated revision %s includes '
'non-deleted packages %s')
msg = msg % (pkg.id, r.id, [pkg.id for r
in affected_pkgs])
msgs.append(msg)
problem = True
break
if not problem:
revs_to_purge += [r.id for r in revisions]
model.Session.remove()
else:
revs_to_purge = [rev.id for rev in c.deleted_revisions]
revs_to_purge = list(set(revs_to_purge))
for id in revs_to_purge:
revision = model.Session.query(model.Revision).get(id)
try:
# TODO deleting the head revision corrupts the edit
# page Ensure that whatever 'head' pointer is used
# gets moved down to the next revision
model.repo.purge_revision(revision, leave_record=False)
except Exception, inst:
msg = _('Problem purging revision %s: %s') % (id, inst)
msgs.append(msg)
h.flash_success(_('Purge complete'))
else:
msgs.append(_('Action not implemented.'))
for msg in msgs:
h.flash_error(msg)
h.redirect_to(h.url_for('ckanadmin', action='trash'))