-
Notifications
You must be signed in to change notification settings - Fork 966
/
libraries.py
264 lines (232 loc) · 11.4 KB
/
libraries.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
"""
Manager and Serializer for libraries.
"""
import logging
from sqlalchemy import and_, false, not_, or_, true
from sqlalchemy.orm.exc import MultipleResultsFound
from sqlalchemy.orm.exc import NoResultFound
from galaxy import exceptions
from galaxy.managers import folders
from galaxy.util import pretty_print_time_interval
log = logging.getLogger(__name__)
# =============================================================================
class LibraryManager(object):
    """
    Interface/service object for interacting with libraries.

    Every method takes the current transaction object (``trans``) and works
    through ``trans.sa_session`` (database access) and
    ``trans.app.security_agent`` (permission checks).
    """

    def __init__(self, *args, **kwargs):
        super(LibraryManager, self).__init__(*args, **kwargs)

    def get(self, trans, decoded_library_id, check_accessible=True):
        """
        Get the library from the DB.

        :param  decoded_library_id:       decoded library id
        :type   decoded_library_id:       int
        :param  check_accessible:         flag whether to check that user can access item
        :type   check_accessible:         bool

        :returns:   the requested library
        :rtype:     galaxy.model.Library

        :raises: InconsistentDatabase, RequestParameterInvalidException,
                 InternalServerError, and whatever :meth:`secure` raises
        """
        try:
            library = trans.sa_session.query(trans.app.model.Library).filter(
                trans.app.model.Library.table.c.id == decoded_library_id
            ).one()
        except MultipleResultsFound:
            raise exceptions.InconsistentDatabase('Multiple libraries found with the same id.')
        except NoResultFound:
            raise exceptions.RequestParameterInvalidException('No library found with the id provided.')
        except Exception as e:
            # Any other failure while querying is reported as a server error.
            raise exceptions.InternalServerError('Error loading from the database.' + str(e))
        return self.secure(trans, library, check_accessible)

    def create(self, trans, name, description='', synopsis=''):
        """
        Create a new library together with its root folder.

        The root folder gets the same name as the library and an empty
        description.

        :returns:   the newly created (and flushed) library
        :rtype:     galaxy.model.Library

        :raises: ItemAccessibilityException if the user is not an admin
        """
        if not trans.user_is_admin():
            raise exceptions.ItemAccessibilityException('Only administrators can create libraries.')
        library = trans.app.model.Library(name=name, description=description, synopsis=synopsis)
        root_folder = trans.app.model.LibraryFolder(name=name, description='')
        library.root_folder = root_folder
        trans.sa_session.add_all((library, root_folder))
        trans.sa_session.flush()
        return library

    def update(self, trans, library, name=None, description=None, synopsis=None):
        """
        Update the given library.

        Only the attributes passed as something other than ``None`` are
        changed; the session is flushed only when at least one was changed.

        :returns:   the given library
        :rtype:     galaxy.model.Library

        :raises: ItemAccessibilityException if the user is not an admin,
                 RequestParameterInvalidException if the library is deleted
        """
        if not trans.user_is_admin():
            raise exceptions.ItemAccessibilityException('Only administrators can update libraries.')
        if library.deleted:
            raise exceptions.RequestParameterInvalidException('You cannot modify a deleted library. Undelete it first.')
        changed = False
        if name is not None:
            library.name = name
            changed = True
            # When library is renamed the root folder has to be renamed too.
            folder_manager = folders.FolderManager()
            folder_manager.update(trans, library.root_folder, name=name)
        if description is not None:
            library.description = description
            changed = True
        if synopsis is not None:
            library.synopsis = synopsis
            changed = True
        if changed:
            trans.sa_session.add(library)
            trans.sa_session.flush()
        return library

    def delete(self, trans, library, undelete=False):
        """
        Mark given library deleted/undeleted based on the flag.

        :param  undelete:   if true the library is marked as not deleted,
                            otherwise it is marked as deleted
        :type   undelete:   bool

        :returns:   the given library
        :rtype:     galaxy.model.Library

        :raises: ItemAccessibilityException if the user is not an admin
        """
        if not trans.user_is_admin():
            raise exceptions.ItemAccessibilityException('Only administrators can delete and undelete libraries.')
        library.deleted = not undelete
        trans.sa_session.add(library)
        trans.sa_session.flush()
        return library

    def list(self, trans, deleted=False):
        """
        Return a query for the libraries visible to the current user.

        :param  deleted: for admins: if True, show only ``deleted`` libraries,
                         if False show only ``non-deleted``, if None do not
                         filter on the flag at all. Non-admins never get
                         deleted libraries, whatever the value.
        :type   deleted: boolean (optional)

        :returns: tuple of (query that will emit all accessible libraries,
                  set of library ids that have restricted access (not public))
        :rtype:   (sqlalchemy query, set)
        """
        is_admin = trans.user_is_admin()
        query = trans.sa_session.query(trans.app.model.Library)
        library_access_action = trans.app.security_agent.permitted_actions.LIBRARY_ACCESS.action
        # A library is restricted when any access permission row exists for it.
        restricted_library_ids = {lp.library_id for lp in (
            trans.sa_session.query(trans.model.LibraryPermissions).filter(
                trans.model.LibraryPermissions.table.c.action == library_access_action
            ).distinct())}
        if is_admin:
            if deleted is None:
                # Flag is not specified, do not filter on it.
                pass
            elif deleted:
                query = query.filter(trans.app.model.Library.table.c.deleted == true())
            else:
                query = query.filter(trans.app.model.Library.table.c.deleted == false())
        else:
            # Nonadmins can't see deleted libraries
            query = query.filter(trans.app.model.Library.table.c.deleted == false())
            current_user_role_ids = [role.id for role in trans.get_current_user_roles()]
            accessible_restricted_library_ids = [lp.library_id for lp in (
                trans.sa_session.query(trans.model.LibraryPermissions).filter(
                    and_(
                        trans.model.LibraryPermissions.table.c.action == library_access_action,
                        trans.model.LibraryPermissions.table.c.role_id.in_(current_user_role_ids)
                    )))]
            # Keep public libraries plus restricted ones the user has a role for.
            query = query.filter(or_(
                not_(trans.model.Library.table.c.id.in_(restricted_library_ids)),
                trans.model.Library.table.c.id.in_(accessible_restricted_library_ids)
            ))
        return query, restricted_library_ids

    def secure(self, trans, library, check_accessible=True):
        """
        Check if library is accessible to user.

        :param  library:                 library
        :type   library:                 galaxy.model.Library
        :param  check_accessible:        flag whether to check that user can access library
        :type   check_accessible:        bool

        :returns:   the original library
        :rtype:     galaxy.model.Library

        :raises: whatever :meth:`check_accessible` raises
        """
        # all libraries are accessible to an admin
        if trans.user_is_admin():
            return library
        if check_accessible:
            library = self.check_accessible(trans, library)
        return library

    def check_accessible(self, trans, library):
        """
        Check whether the library is accessible to current user.

        :returns:   the given library
        :rtype:     galaxy.model.Library

        :raises: ObjectNotFound if the security agent denies access for the
                 current user roles, or if the library is deleted
        """
        if not trans.app.security_agent.can_access_library(trans.get_current_user_roles(), library):
            raise exceptions.ObjectNotFound('Library with the id provided was not found.')
        if library.deleted:
            raise exceptions.ObjectNotFound('Library with the id provided is deleted.')
        return library

    def get_library_dict(self, trans, library, restricted_library_ids=None):
        """
        Return library data in the form of a dictionary.

        :param  library:                 library
        :type   library:                 galaxy.model.Library
        :param  restricted_library_ids:  ids of restricted libraries to speed up the
                                         detection of public libraries; when empty
                                         or not given the library is reported as public
        :type   restricted_library_ids:  list of ints

        :returns:   dict with data about the library
        :rtype:     dictionary
        """
        library_dict = library.to_dict(
            view='element',
            value_mapper={'id': trans.security.encode_id, 'root_folder_id': trans.security.encode_id})
        is_restricted = bool(restricted_library_ids) and library.id in restricted_library_ids
        library_dict['public'] = not is_restricted
        library_dict['create_time_pretty'] = pretty_print_time_interval(library.create_time, precise=True)
        if trans.user_is_admin():
            # Admins are reported as holding every permission.
            library_dict['can_user_add'] = True
            library_dict['can_user_modify'] = True
            library_dict['can_user_manage'] = True
        else:
            current_user_roles = trans.get_current_user_roles()
            security_agent = trans.app.security_agent
            library_dict['can_user_add'] = security_agent.can_add_library_item(current_user_roles, library)
            library_dict['can_user_modify'] = security_agent.can_modify_library_item(current_user_roles, library)
            library_dict['can_user_manage'] = security_agent.can_manage_library_item(current_user_roles, library)
        return library_dict

    def get_current_roles(self, trans, library):
        """
        Load all permissions currently related to the given library.

        :param  library:      the model object
        :type   library:      galaxy.model.Library

        :rtype:     dictionary
        :returns:   dict of current roles for all available permission types;
                    each value is a list of (role name, encoded role id) tuples
        """
        def as_name_id_pairs(roles):
            return [(role.name, trans.security.encode_id(role.id)) for role in roles]

        return dict(access_library_role_list=as_name_id_pairs(self.get_access_roles(trans, library)),
                    modify_library_role_list=as_name_id_pairs(self.get_modify_roles(trans, library)),
                    manage_library_role_list=as_name_id_pairs(self.get_manage_roles(trans, library)),
                    add_library_item_role_list=as_name_id_pairs(self.get_add_roles(trans, library)))

    def get_access_roles(self, trans, library):
        """
        Load access roles for all library permissions

        :rtype: set
        """
        return set(library.get_access_roles(trans))

    def get_modify_roles(self, trans, library):
        """
        Load modify roles for all library permissions

        :rtype: set
        """
        return set(trans.app.security_agent.get_roles_for_action(library, trans.app.security_agent.permitted_actions.LIBRARY_MODIFY))

    def get_manage_roles(self, trans, library):
        """
        Load manage roles for all library permissions

        :rtype: set
        """
        return set(trans.app.security_agent.get_roles_for_action(library, trans.app.security_agent.permitted_actions.LIBRARY_MANAGE))

    def get_add_roles(self, trans, library):
        """
        Load add roles for all library permissions

        :rtype: set
        """
        return set(trans.app.security_agent.get_roles_for_action(library, trans.app.security_agent.permitted_actions.LIBRARY_ADD))

    def set_permission_roles(self, trans, library, access_roles, modify_roles, manage_roles, add_roles):
        """
        Set permissions on the given library.

        NOTE: not implemented; the method has no body, changes nothing and
        returns None.
        """

    def make_public(self, trans, library):
        """
        Makes the given library public (removes all access roles)

        :returns:   the result of :meth:`is_public` after the change
        """
        trans.app.security_agent.make_library_public(library)
        return self.is_public(trans, library)

    def is_public(self, trans, library):
        """
        Return true if lib is public.

        The answer comes from the security agent's ``library_is_public``.
        """
        return trans.app.security_agent.library_is_public(library)