-
Notifications
You must be signed in to change notification settings - Fork 2k
/
authorization_group.py
201 lines (173 loc) · 8.24 KB
/
authorization_group.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
import genshi
from sqlalchemy.orm import eagerload_all
from ckan.lib.base import *
from pylons.i18n import get_lang, _
import ckan.authz as authz
import ckan.forms
from ckan.lib.helpers import Page
from ckan.logic import NotAuthorized, check_access
class AuthorizationGroupController(BaseController):
def __init__(self):
BaseController.__init__(self)
def index(self):
from ckan.lib.helpers import Page
try:
context = {'model':model,'user': c.user or c.author}
check_access('site_read',context)
except NotAuthorized:
abort(401, _('Not authorized to see this page'))
query = ckan.authz.Authorizer().authorized_query(c.user, model.AuthorizationGroup)
query = query.options(eagerload_all('users'))
c.page = Page(
collection=query,
page=request.params.get('page', 1),
items_per_page=20
)
return render('authorization_group/index.html')
def _get_authgroup_by_name_or_id(self, id):
return model.AuthorizationGroup.by_name(id) or\
model.Session.query(model.AuthorizationGroup).get(id)
def read(self, id):
c.authorization_group = self._get_authgroup_by_name_or_id(id)
if c.authorization_group is None:
abort(404)
auth_for_read = self.authorizer.am_authorized(c, model.Action.READ,
c.authorization_group)
if not auth_for_read:
abort(401, _('Not authorized to read %s') % id.encode('utf8'))
import ckan.misc
c.authorization_group_admins = self.authorizer.get_admins(c.authorization_group)
c.page = Page(
collection=c.authorization_group.users,
page=request.params.get('page', 1),
items_per_page=50
)
return render('authorization_group/read.html')
def new(self):
record = model.AuthorizationGroup
c.error = ''
auth_for_create = self.authorizer.am_authorized(c, model.Action.AUTHZ_GROUP_CREATE, model.System())
if not auth_for_create:
abort(401, _('Unauthorized to create a group'))
is_admin = self.authorizer.is_sysadmin(c.user)
fs = ckan.forms.get_authorization_group_fieldset(is_admin=is_admin)
if request.params.has_key('save'):
# needed because request is nested
# multidict which is read only
params = dict(request.params)
c.fs = fs.bind(record, data=params or None, session=model.Session)
try:
self._update(c.fs, id, record.id)
except ValidationException, error:
fs = error.args[0]
c.form = self._render_edit_form(fs)
return render('authorization_group/edit.html')
# do not use groupname from id as may have changed
c.authzgroupname = c.fs.name.value
authorization_group = model.AuthorizationGroup.by_name(c.authzgroupname)
assert authorization_group
user = model.User.by_name(c.user)
model.setup_default_user_roles(authorization_group, [user])
users = [model.User.by_name(name) for name in \
request.params.getall('AuthorizationGroup-users-current')]
authorization_group.users = list(set(users))
usernames = request.params.getall('AuthorizationGroupUser--user_name')
for username in usernames:
if username:
usr = model.User.by_name(username)
if usr and usr not in authorization_group.users:
model.add_user_to_authorization_group(usr, authorization_group, model.Role.READER)
model.repo.commit_and_remove()
h.redirect_to(controller='authorization_group', action='read', id=c.authzgroupname)
c.form = self._render_edit_form(fs)
return render('authorization_group/new.html')
def edit(self, id=None): # allow id=None to allow posting
c.error = ''
authorization_group = self._get_authgroup_by_name_or_id(id)
if authorization_group is None:
abort(404, '404 Not Found')
am_authz = self.authorizer.am_authorized(c, model.Action.EDIT, authorization_group)
if not am_authz:
abort(401, _('User %r not authorized to edit %r') % (c.user, id))
is_admin = self.authorizer.is_sysadmin(c.user)
if not 'save' in request.params:
c.authorization_group = authorization_group
c.authorization_group_name = authorization_group.name
fs = ckan.forms.get_authorization_group_fieldset(is_admin=is_admin).bind(authorization_group)
c.form = self._render_edit_form(fs)
return render('authorization_group/edit.html')
else:
# id is the name (pre-edited state)
c.authorization_group_name = id
# needed because request is nested
# multidict which is read only
params = dict(request.params)
c.fs = ckan.forms.get_authorization_group_fieldset()\
.bind(authorization_group, data=params or None)
try:
self._update(c.fs, id, authorization_group.id)
# do not use groupname from id as may have changed
c.authorization_group = authorization_group
c.authorization_group_name = authorization_group.name
except ValidationException, error:
fs = error.args[0]
c.form = self._render_edit_form(fs)
return render('authorization_group/edit.html')
user = model.User.by_name(c.user)
users = [model.User.by_name(name) for name in \
request.params.getall('AuthorizationGroup-users-current')]
authorization_group.users = list(set(users))
usernames = request.params.getall('AuthorizationGroupUser--user_name')
for username in usernames:
if username:
usr = model.User.by_name(username)
if usr and usr not in authorization_group.users:
model.add_user_to_authorization_group(usr, authorization_group, model.Role.READER)
model.repo.commit_and_remove()
h.redirect_to(controller='authorization_group', action='read', id=c.authorization_group_name)
def authz(self, id):
authorization_group = self._get_authgroup_by_name_or_id(id)
if authorization_group is None:
abort(404, _('Group not found'))
c.authorization_group_name = authorization_group.name
c.authorization_group = authorization_group
c.authz_editable = self.authorizer.am_authorized(c, model.Action.EDIT_PERMISSIONS,
authorization_group)
if not c.authz_editable:
abort(401, gettext('User %r not authorized to edit %s authorizations') % (c.user, id))
roles = self._handle_update_of_authz(authorization_group)
self._prepare_authz_info_for_render(roles)
return render('authorization_group/authz.html')
def _render_edit_form(self, fs):
# errors arrive in c.error and fs.errors
c.fieldset = fs
c.fieldset2 = ckan.forms.get_authorization_group_user_fieldset()
return render('authorization_group/edit_form.html')
def _update(self, fs, group_name, group_id):
'''
Writes the POST data (associated with a group edit) to the database
@input c.error
'''
validation = fs.validate()
if not validation:
c.form = self._render_edit_form(fs)
raise ValidationException(fs)
try:
fs.sync()
except Exception, inst:
model.Session.rollback()
raise
else:
model.Session.commit()
def _update_authz(self, fs):
validation = fs.validate()
if not validation:
c.form = self._render_edit_form(fs)
raise ValidationException(fs)
try:
fs.sync()
except Exception, inst:
model.Session.rollback()
raise
else:
model.Session.commit()