/
controller.py
1641 lines (1400 loc) · 71 KB
/
controller.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""
Contains functionality needed in every web interface
"""
import logging
import re
from paste.httpexceptions import (
HTTPBadRequest,
HTTPInternalServerError,
HTTPNotImplemented,
HTTPRequestRangeNotSatisfiable
)
from six import string_types
from sqlalchemy import true
from galaxy import (
exceptions,
model,
security,
util,
web
)
from galaxy.datatypes.interval import ChromatinInteractions
from galaxy.managers import (
api_keys,
base as managers_base,
configuration,
tags,
users,
workflows
)
from galaxy.model import (
ExtendedMetadata,
ExtendedMetadataIndex,
HistoryDatasetAssociation,
LibraryDatasetDatasetAssociation
)
from galaxy.model.item_attrs import UsesAnnotations
from galaxy.security.validate_user_input import validate_publicname
from galaxy.util.dictifiable import Dictifiable
from galaxy.util.sanitize_html import sanitize_html
from galaxy.web import (
error,
url_for
)
from galaxy.web.form_builder import (
AddressField,
CheckboxField,
PasswordField
)
from galaxy.workflow.modules import WorkflowModuleInjector
log = logging.getLogger(__name__)
# States for passing messages
SUCCESS, INFO, WARNING, ERROR = "done", "info", "warning", "error"
def _is_valid_slug(slug):
""" Returns true if slug is valid. """
VALID_SLUG_RE = re.compile("^[a-z0-9\-]+$")
return VALID_SLUG_RE.match(slug)
class BaseController(object):
"""
Base class for Galaxy web application controllers.
"""
def __init__(self, app):
"""Initialize an interface for application 'app'"""
self.app = app
self.sa_session = app.model.context
self.user_manager = users.UserManager(app)
def get_toolbox(self):
"""Returns the application toolbox"""
return self.app.toolbox
def get_class(self, class_name):
""" Returns the class object that a string denotes. Without this method, we'd have to do eval(<class_name>). """
return managers_base.get_class(class_name)
def get_object(self, trans, id, class_name, check_ownership=False, check_accessible=False, deleted=None):
"""
Convenience method to get a model object with the specified checks.
"""
return managers_base.get_object(trans, id, class_name, check_ownership=check_ownership, check_accessible=check_accessible, deleted=deleted)
# this should be here - but catching errors from sharable item controllers that *should* have SharableItemMixin
# but *don't* then becomes difficult
# def security_check( self, trans, item, check_ownership=False, check_accessible=False ):
# log.warning( 'BaseController.security_check: %s, %b, %b', str( item ), check_ownership, check_accessible )
# # meant to be overridden in SharableSecurityMixin
# return item
def get_user(self, trans, id, check_ownership=False, check_accessible=False, deleted=None):
return self.get_object(trans, id, 'User', check_ownership=False, check_accessible=False, deleted=deleted)
def get_group(self, trans, id, check_ownership=False, check_accessible=False, deleted=None):
return self.get_object(trans, id, 'Group', check_ownership=False, check_accessible=False, deleted=deleted)
def get_role(self, trans, id, check_ownership=False, check_accessible=False, deleted=None):
return self.get_object(trans, id, 'Role', check_ownership=False, check_accessible=False, deleted=deleted)
# ---- parsing query params
def decode_id(self, id):
return managers_base.decode_id(self.app, id)
def encode_all_ids(self, trans, rval, recursive=False):
"""
Encodes all integer values in the dict rval whose keys are 'id' or end with '_id'
It might be useful to turn this in to a decorator
"""
return trans.security.encode_all_ids(rval, recursive=recursive)
def parse_filter_params(self, qdict, filter_attr_key='q', filter_value_key='qv', attr_op_split_char='-'):
"""
"""
# TODO: import DEFAULT_OP from FilterParser
DEFAULT_OP = 'eq'
if filter_attr_key not in qdict:
return []
# precondition: attrs/value pairs are in-order in the qstring
attrs = qdict.get(filter_attr_key)
if not isinstance(attrs, list):
attrs = [attrs]
# ops are strings placed after the attr strings and separated by a split char (e.g. 'create_time-lt')
# ops are optional and default to 'eq'
reparsed_attrs = []
ops = []
for attr in attrs:
op = DEFAULT_OP
if attr_op_split_char in attr:
# note: only split the last (e.g. q=community-tags-in&qv=rna yields ( 'community-tags', 'in', 'rna' )
attr, op = attr.rsplit(attr_op_split_char, 1)
ops.append(op)
reparsed_attrs.append(attr)
attrs = reparsed_attrs
values = qdict.get(filter_value_key, [])
if not isinstance(values, list):
values = [values]
# TODO: it may be more helpful to the consumer if we error on incomplete 3-tuples
# (instead of relying on zip to shorten)
return list(zip(attrs, ops, values))
def parse_limit_offset(self, qdict):
"""
"""
def _parse_pos_int(i):
try:
new_val = int(i)
if new_val >= 0:
return new_val
except (TypeError, ValueError):
pass
return None
limit = _parse_pos_int(qdict.get('limit', None))
offset = _parse_pos_int(qdict.get('offset', None))
return (limit, offset)
Root = BaseController
class BaseUIController(BaseController):
def get_object(self, trans, id, class_name, check_ownership=False, check_accessible=False, deleted=None):
try:
return BaseController.get_object(self, trans, id, class_name,
check_ownership=check_ownership, check_accessible=check_accessible, deleted=deleted)
except exceptions.MessageException:
raise # handled in the caller
except Exception:
log.exception("Exception in get_object check for %s %s:", class_name, str(id))
raise Exception('Server error retrieving %s id ( %s ).' % (class_name, str(id)))
def message_exception(self, trans, message):
trans.response.status = 400
return {'err_msg': util.sanitize_text(message)}
class BaseAPIController(BaseController):
def get_object(self, trans, id, class_name, check_ownership=False, check_accessible=False, deleted=None):
try:
return BaseController.get_object(self, trans, id, class_name,
check_ownership=check_ownership, check_accessible=check_accessible, deleted=deleted)
except exceptions.ItemDeletionException as e:
raise HTTPBadRequest(detail="Invalid %s id ( %s ) specified: %s" % (class_name, str(id), str(e)))
except exceptions.MessageException as e:
raise HTTPBadRequest(detail=e.err_msg)
except Exception as e:
log.exception("Exception in get_object check for %s %s.", class_name, str(id))
raise HTTPInternalServerError(comment=str(e))
def validate_in_users_and_groups(self, trans, payload):
"""
For convenience, in_users and in_groups can be encoded IDs or emails/group names in the API.
"""
def get_id(item, model_class, column):
try:
return trans.security.decode_id(item)
except Exception:
pass # maybe an email/group name
# this will raise if the item is invalid
return trans.sa_session.query(model_class).filter(column == item).first().id
new_in_users = []
new_in_groups = []
invalid = []
for item in util.listify(payload.get('in_users', [])):
try:
new_in_users.append(get_id(item, trans.app.model.User, trans.app.model.User.table.c.email))
except Exception:
invalid.append(item)
for item in util.listify(payload.get('in_groups', [])):
try:
new_in_groups.append(get_id(item, trans.app.model.Group, trans.app.model.Group.table.c.name))
except Exception:
invalid.append(item)
if invalid:
msg = "The following value(s) for associated users and/or groups could not be parsed: %s." % ', '.join(invalid)
msg += " Valid values are email addresses of users, names of groups, or IDs of both."
raise Exception(msg)
payload['in_users'] = list(map(str, new_in_users))
payload['in_groups'] = list(map(str, new_in_groups))
def not_implemented(self, trans, **kwd):
raise HTTPNotImplemented()
def _parse_serialization_params(self, kwd, default_view):
view = kwd.get('view', None)
keys = kwd.get('keys')
if isinstance(keys, string_types):
keys = keys.split(',')
return dict(view=view, keys=keys, default_view=default_view)
class JSAppLauncher(BaseUIController):
"""
A controller that launches JavaScript web applications.
"""
#: path to js app template
JS_APP_MAKO_FILEPATH = "/js-app.mako"
#: window-scoped js function to call to start the app (will be passed options, bootstrapped)
DEFAULT_ENTRY_FN = "app"
#: keys used when serializing current user for bootstrapped data
USER_BOOTSTRAP_KEYS = ('id', 'email', 'username', 'is_admin', 'tags_used',
'total_disk_usage', 'nice_total_disk_usage', 'quota_percent', 'preferences')
def __init__(self, app):
super(JSAppLauncher, self).__init__(app)
self.user_manager = users.UserManager(app)
self.user_serializer = users.CurrentUserSerializer(app)
self.config_serializer = configuration.ConfigSerializer(app)
self.admin_config_serializer = configuration.AdminConfigSerializer(app)
@web.expose
def client(self, trans, **kwd):
"""
Endpoint for clientside routes. Currently a passthrough to index
(minus kwargs) though we can differentiate it more in the future.
Should not be used with url_for -- see
(https://github.com/galaxyproject/galaxy/issues/1878) for why.
"""
return self.index(trans)
def _get_js_options(self, trans, root=None):
"""
Return a dictionary of session/site configuration/options to jsonify
and pass onto the js app.
Defaults to `config`, `user`, and the root url. Pass kwargs to update further.
"""
root = root or web.url_for('/')
js_options = {
'root' : root,
'user' : self.user_serializer.serialize(trans.user, self.USER_BOOTSTRAP_KEYS, trans=trans),
'config' : self._get_site_configuration(trans),
'params' : dict(trans.request.params),
'session_csrf_token' : trans.session_csrf_token,
}
return js_options
def _get_site_configuration(self, trans):
"""
Return a dictionary representing Galaxy's current configuration.
"""
try:
serializer = self.config_serializer
if self.user_manager.is_admin(trans.user):
serializer = self.admin_config_serializer
return serializer.serialize_to_view(self.app.config, view='all')
except Exception as exc:
log.exception(exc)
return {}
def template(self, trans, app_name, entry_fn='app', options=None, bootstrapped_data=None, masthead=True, **additional_options):
"""
Render and return the single page mako template that starts the app.
`app_name` (string): the first portion of the webpack bundle to as the app.
`entry_fn` (string): the name of the window-scope function that starts the
app. Defaults to 'app'.
`bootstrapped_data` (dict): (optional) update containing any more data
the app may need.
`masthead` (boolean): (optional, default=True) include masthead elements in
the initial page dom.
`additional_options` (kwargs): update to the options sent to the app.
"""
options = options or self._get_js_options(trans)
options.update(additional_options)
return trans.fill_template(
self.JS_APP_MAKO_FILEPATH,
js_app_name=app_name,
js_app_entry_fn=(entry_fn or self.DEFAULT_ENTRY_FN),
options=options,
bootstrapped=(bootstrapped_data or {}),
masthead=masthead
)
class Datatype(object):
"""Used for storing in-memory list of datatypes currently in the datatypes registry."""
def __init__(self, extension, dtype, type_extension, mimetype, display_in_upload):
self.extension = extension
self.dtype = dtype
self.type_extension = type_extension
self.mimetype = mimetype
self.display_in_upload = display_in_upload
#
# -- Mixins for working with Galaxy objects. --
#
class CreatesUsersMixin:
"""
Mixin centralizing logic for user creation between web and API controller.
Web controller handles additional features such e-mail subscription, activation,
user forms, etc.... API created users are much more vanilla for the time being.
"""
def create_user(self, trans, email, username, password):
user = trans.app.model.User(email=email)
user.set_password_cleartext(password)
user.username = username
if trans.app.config.user_activation_on:
user.active = False
else:
user.active = True # Activation is off, every new user is active by default.
trans.sa_session.add(user)
trans.sa_session.flush()
trans.app.security_agent.create_private_user_role(user)
if trans.webapp.name == 'galaxy':
# We set default user permissions, before we log in and set the default history permissions
trans.app.security_agent.user_set_default_permissions(user,
default_access_private=trans.app.config.new_user_dataset_access_role_default_private)
return user
class CreatesApiKeysMixin:
"""
Mixing centralizing logic for creating API keys for user objects.
Deprecated - please use api_keys.ApiKeyManager for new development.
"""
def create_api_key(self, trans, user):
return api_keys.ApiKeyManager(trans.app).create_api_key(user)
class SharableItemSecurityMixin:
""" Mixin for handling security for sharable items. """
def security_check(self, trans, item, check_ownership=False, check_accessible=False):
""" Security checks for an item: checks if (a) user owns item or (b) item is accessible to user. """
return managers_base.security_check(trans, item, check_ownership=check_ownership, check_accessible=check_accessible)
class ExportsHistoryMixin:
def serve_ready_history_export(self, trans, jeha):
assert jeha.ready
if jeha.compressed:
trans.response.set_content_type('application/x-gzip')
else:
trans.response.set_content_type('application/x-tar')
disposition = 'attachment; filename="%s"' % jeha.export_name
trans.response.headers["Content-Disposition"] = disposition
return open(trans.app.object_store.get_filename(jeha.dataset))
def queue_history_export(self, trans, history, gzip=True, include_hidden=False, include_deleted=False):
# Convert options to booleans.
if isinstance(gzip, string_types):
gzip = (gzip in ['True', 'true', 'T', 't'])
if isinstance(include_hidden, string_types):
include_hidden = (include_hidden in ['True', 'true', 'T', 't'])
if isinstance(include_deleted, string_types):
include_deleted = (include_deleted in ['True', 'true', 'T', 't'])
# Run job to do export.
history_exp_tool = trans.app.toolbox.get_tool('__EXPORT_HISTORY__')
params = {
'history_to_export': history,
'compress': gzip,
'include_hidden': include_hidden,
'include_deleted': include_deleted
}
history_exp_tool.execute(trans, incoming=params, history=history, set_output_hid=True)
class ImportsHistoryMixin:
def queue_history_import(self, trans, archive_type, archive_source):
# Run job to do import.
history_imp_tool = trans.app.toolbox.get_tool('__IMPORT_HISTORY__')
incoming = {'__ARCHIVE_SOURCE__' : archive_source, '__ARCHIVE_TYPE__' : archive_type}
history_imp_tool.execute(trans, incoming=incoming)
class UsesLibraryMixin:
def get_library(self, trans, id, check_ownership=False, check_accessible=True):
l = self.get_object(trans, id, 'Library')
if check_accessible and not (trans.user_is_admin() or trans.app.security_agent.can_access_library(trans.get_current_user_roles(), l)):
error("LibraryFolder is not accessible to the current user")
return l
class UsesLibraryMixinItems(SharableItemSecurityMixin):
def get_library_folder(self, trans, id, check_ownership=False, check_accessible=True):
return self.get_object(trans, id, 'LibraryFolder',
check_ownership=False, check_accessible=check_accessible)
def get_library_dataset_dataset_association(self, trans, id, check_ownership=False, check_accessible=True):
# Deprecated in lieu to galaxy.managers.lddas.LDDAManager.get() but not
# reusing that exactly because of subtle differences in exception handling
# logic (API controller override get_object to be slightly different).
return self.get_object(trans, id, 'LibraryDatasetDatasetAssociation',
check_ownership=False, check_accessible=check_accessible)
def get_library_dataset(self, trans, id, check_ownership=False, check_accessible=True):
return self.get_object(trans, id, 'LibraryDataset',
check_ownership=False, check_accessible=check_accessible)
# TODO: it makes no sense that I can get roles from a user but not user.is_admin()
# def can_user_add_to_library_item( self, trans, user, item ):
# if not user: return False
# return ( ( user.is_admin() )
# or ( trans.app.security_agent.can_add_library_item( user.all_roles(), item ) ) )
def can_current_user_add_to_library_item(self, trans, item):
if not trans.user:
return False
return ((trans.user_is_admin()) or
(trans.app.security_agent.can_add_library_item(trans.get_current_user_roles(), item)))
def check_user_can_add_to_library_item(self, trans, item, check_accessible=True):
"""
Raise exception if user cannot add to the specified library item (i.e.
Folder). Can set check_accessible to False if folder was loaded with
this check.
"""
if not trans.user:
return False
current_user_roles = trans.get_current_user_roles()
if trans.user_is_admin():
return True
if check_accessible:
if not trans.app.security_agent.can_access_library_item(current_user_roles, item, trans.user):
raise exceptions.ItemAccessibilityException('You do not have access to the requested item')
if not trans.app.security_agent.can_add_library_item(trans.get_current_user_roles(), item):
# Slight misuse of ItemOwnershipException?
raise exceptions.ItemOwnershipException("User cannot add to library item.")
def _copy_hdca_to_library_folder(self, trans, hda_manager, from_hdca_id, folder_id, ldda_message=''):
"""
Fetches the collection identified by `from_hcda_id` and dispatches individual collection elements to
_copy_hda_to_library_folder
"""
hdca = trans.sa_session.query(trans.app.model.HistoryDatasetCollectionAssociation).get(from_hdca_id)
if hdca.collection.collection_type != 'list':
raise exceptions.NotImplemented('Cannot add nested collections to library. Please flatten your collection first.')
hdas = []
for element in hdca.collection.elements:
hdas.append((element.element_identifier, element.dataset_instance.id))
return [self._copy_hda_to_library_folder(trans,
hda_manager=hda_manager,
from_hda_id=hda_id,
folder_id=folder_id,
ldda_message=ldda_message,
element_identifier=element_identifier) for (element_identifier, hda_id) in hdas]
def _copy_hda_to_library_folder(self, trans, hda_manager, from_hda_id, folder_id, ldda_message='', element_identifier=None):
"""
Copies hda ``from_hda_id`` to library folder ``folder_id``, optionally
adding ``ldda_message`` to the new ldda's ``message``.
``library_contents.create`` will branch to this if called with 'from_hda_id'
in its payload.
"""
log.debug('_copy_hda_to_library_folder: %s' % (str((from_hda_id, folder_id, ldda_message))))
# PRECONDITION: folder_id has already been altered to remove the folder prefix ('F')
# TODO: allow name and other, editable ldda attrs?
if ldda_message:
ldda_message = util.sanitize_html.sanitize_html(ldda_message, 'utf-8')
# check permissions on (all three?) resources: hda, library, folder
# TODO: do we really need the library??
hda = hda_manager.get_owned(from_hda_id, trans.user, current_history=trans.history)
hda = hda_manager.error_if_uploading(hda)
folder = self.get_library_folder(trans, folder_id, check_accessible=True)
# TOOD: refactor to use check_user_can_add_to_library_item, eliminate boolean
# can_current_user_add_to_library_item.
if folder.parent_library.deleted:
raise exceptions.ObjectAttributeInvalidException('You cannot add datasets into deleted library. Undelete it first.')
if not self.can_current_user_add_to_library_item(trans, folder):
raise exceptions.InsufficientPermissionsException('You do not have proper permissions to add a dataset to this folder,')
ldda = self.copy_hda_to_library_folder(trans, hda, folder, ldda_message=ldda_message, element_identifier=element_identifier)
# I don't see a reason why hdas copied into libraries should not be visible.
# If there is, refactor `ldda.visible = True` to do this only when adding HDCAs.
ldda.visible = True
trans.sa_session.flush()
ldda_dict = ldda.to_dict()
rval = trans.security.encode_dict_ids(ldda_dict)
update_time = ldda.update_time.strftime("%Y-%m-%d %I:%M %p")
rval['update_time'] = update_time
return rval
def copy_hda_to_library_folder(self, trans, hda, library_folder, roles=None, ldda_message='', element_identifier=None):
# PRECONDITION: permissions for this action on hda and library_folder have been checked
roles = roles or []
# this code was extracted from library_common.add_history_datasets_to_library
# TODO: refactor library_common.add_history_datasets_to_library to use this for each hda to copy
# create the new ldda and apply the folder perms to it
ldda = hda.to_library_dataset_dataset_association(trans, target_folder=library_folder,
roles=roles, ldda_message=ldda_message, element_identifier=element_identifier)
self._apply_library_folder_permissions_to_ldda(trans, library_folder, ldda)
self._apply_hda_permissions_to_ldda(trans, hda, ldda)
# TODO:?? not really clear on how permissions are being traded here
# seems like hda -> ldda permissions should be set in to_library_dataset_dataset_association
# then they get reset in _apply_library_folder_permissions_to_ldda
# then finally, re-applies hda -> ldda for missing actions in _apply_hda_permissions_to_ldda??
return ldda
def _apply_library_folder_permissions_to_ldda(self, trans, library_folder, ldda):
"""
Copy actions/roles from library folder to an ldda (and its library_dataset).
"""
# PRECONDITION: permissions for this action on library_folder and ldda have been checked
security_agent = trans.app.security_agent
security_agent.copy_library_permissions(trans, library_folder, ldda)
security_agent.copy_library_permissions(trans, library_folder, ldda.library_dataset)
return security_agent.get_permissions(ldda)
def _apply_hda_permissions_to_ldda(self, trans, hda, ldda):
"""
Copy actions/roles from hda to ldda.library_dataset (and then ldda) if ldda
doesn't already have roles for the given action.
"""
# PRECONDITION: permissions for this action on hda and ldda have been checked
# Make sure to apply any defined dataset permissions, allowing the permissions inherited from the
# library_dataset to over-ride the same permissions on the dataset, if they exist.
security_agent = trans.app.security_agent
dataset_permissions_dict = security_agent.get_permissions(hda.dataset)
library_dataset = ldda.library_dataset
library_dataset_actions = [permission.action for permission in library_dataset.actions]
# except that: if DATASET_MANAGE_PERMISSIONS exists in the hda.dataset permissions,
# we need to instead apply those roles to the LIBRARY_MANAGE permission to the library dataset
dataset_manage_permissions_action = security_agent.get_action('DATASET_MANAGE_PERMISSIONS').action
library_manage_permissions_action = security_agent.get_action('LIBRARY_MANAGE').action
# TODO: test this and remove if in loop below
# TODO: doesn't handle action.action
# if dataset_manage_permissions_action in dataset_permissions_dict:
# managing_roles = dataset_permissions_dict.pop( dataset_manage_permissions_action )
# dataset_permissions_dict[ library_manage_permissions_action ] = managing_roles
flush_needed = False
for action, dataset_permissions_roles in dataset_permissions_dict.items():
if isinstance(action, security.Action):
action = action.action
# alter : DATASET_MANAGE_PERMISSIONS -> LIBRARY_MANAGE (see above)
if action == dataset_manage_permissions_action:
action = library_manage_permissions_action
# TODO: generalize to util.update_dict_without_overwrite
# add the hda actions & roles to the library_dataset
# NOTE: only apply an hda perm if it's NOT set in the library_dataset perms (don't overwrite)
if action not in library_dataset_actions:
for role in dataset_permissions_roles:
ldps = trans.model.LibraryDatasetPermissions(action, library_dataset, role)
ldps = [ldps] if not isinstance(ldps, list) else ldps
for ldp in ldps:
trans.sa_session.add(ldp)
flush_needed = True
if flush_needed:
trans.sa_session.flush()
# finally, apply the new library_dataset to its associated ldda (must be the same)
security_agent.copy_library_permissions(trans, library_dataset, ldda)
return security_agent.get_permissions(ldda)
class UsesVisualizationMixin(UsesLibraryMixinItems):
"""
Mixin for controllers that use Visualization objects.
"""
def get_visualization(self, trans, id, check_ownership=True, check_accessible=False):
"""
Get a Visualization from the database by id, verifying ownership.
"""
# Load workflow from database
try:
visualization = trans.sa_session.query(trans.model.Visualization).get(trans.security.decode_id(id))
except TypeError:
visualization = None
if not visualization:
error("Visualization not found")
else:
return self.security_check(trans, visualization, check_ownership, check_accessible)
def get_visualizations_by_user(self, trans, user, order_by=None, query_only=False):
"""
Return query or query results of visualizations filtered by a user.
Set `order_by` to a column or list of columns to change the order
returned. Defaults to `DEFAULT_ORDER_BY`.
Set `query_only` to return just the query for further filtering or
processing.
"""
# TODO: move into model (as class attr)
DEFAULT_ORDER_BY = [model.Visualization.title]
if not order_by:
order_by = DEFAULT_ORDER_BY
if not isinstance(order_by, list):
order_by = [order_by]
query = trans.sa_session.query(model.Visualization)
query = query.filter(model.Visualization.user == user)
if order_by:
query = query.order_by(*order_by)
if query_only:
return query
return query.all()
def get_visualizations_shared_with_user(self, trans, user, order_by=None, query_only=False):
"""
Return query or query results for visualizations shared with the given user.
Set `order_by` to a column or list of columns to change the order
returned. Defaults to `DEFAULT_ORDER_BY`.
Set `query_only` to return just the query for further filtering or
processing.
"""
DEFAULT_ORDER_BY = [model.Visualization.title]
if not order_by:
order_by = DEFAULT_ORDER_BY
if not isinstance(order_by, list):
order_by = [order_by]
query = trans.sa_session.query(model.Visualization).join(model.VisualizationUserShareAssociation)
query = query.filter(model.VisualizationUserShareAssociation.user_id == user.id)
# remove duplicates when a user shares with themselves?
query = query.filter(model.Visualization.user_id != user.id)
if order_by:
query = query.order_by(*order_by)
if query_only:
return query
return query.all()
def get_published_visualizations(self, trans, exclude_user=None, order_by=None, query_only=False):
"""
Return query or query results for published visualizations optionally excluding
the user in `exclude_user`.
Set `order_by` to a column or list of columns to change the order
returned. Defaults to `DEFAULT_ORDER_BY`.
Set `query_only` to return just the query for further filtering or
processing.
"""
DEFAULT_ORDER_BY = [model.Visualization.title]
if not order_by:
order_by = DEFAULT_ORDER_BY
if not isinstance(order_by, list):
order_by = [order_by]
query = trans.sa_session.query(model.Visualization)
query = query.filter(model.Visualization.published == true())
if exclude_user:
query = query.filter(model.Visualization.user != exclude_user)
if order_by:
query = query.order_by(*order_by)
if query_only:
return query
return query.all()
# TODO: move into model (to_dict)
def get_visualization_summary_dict(self, visualization):
"""
Return a set of summary attributes for a visualization in dictionary form.
NOTE: that encoding ids isn't done here should happen at the caller level.
"""
# TODO: deleted
# TODO: importable
return {
'id' : visualization.id,
'title' : visualization.title,
'type' : visualization.type,
'dbkey' : visualization.dbkey,
}
def get_visualization_dict(self, visualization):
"""
Return a set of detailed attributes for a visualization in dictionary form.
The visualization's latest_revision is returned in its own sub-dictionary.
NOTE: that encoding ids isn't done here should happen at the caller level.
"""
return {
'model_class': 'Visualization',
'id' : visualization.id,
'title' : visualization.title,
'type' : visualization.type,
'user_id' : visualization.user.id,
'dbkey' : visualization.dbkey,
'slug' : visualization.slug,
# to_dict only the latest revision (allow older to be fetched elsewhere)
'latest_revision' : self.get_visualization_revision_dict(visualization.latest_revision),
'revisions' : [r.id for r in visualization.revisions],
}
def get_visualization_revision_dict(self, revision):
"""
Return a set of detailed attributes for a visualization in dictionary form.
NOTE: that encoding ids isn't done here should happen at the caller level.
"""
return {
'model_class' : 'VisualizationRevision',
'id' : revision.id,
'visualization_id' : revision.visualization.id,
'title' : revision.title,
'dbkey' : revision.dbkey,
'config' : revision.config,
}
def import_visualization(self, trans, id, user=None):
"""
Copy the visualization with the given id and associate the copy
with the given user (defaults to trans.user).
Raises `ItemAccessibilityException` if `user` is not passed and
the current user is anonymous, and if the visualization is not `importable`.
Raises `ItemDeletionException` if the visualization has been deleted.
"""
# default to trans.user, error if anon
if not user:
if not trans.user:
raise exceptions.ItemAccessibilityException("You must be logged in to import Galaxy visualizations")
user = trans.user
# check accessibility
visualization = self.get_visualization(trans, id, check_ownership=False)
if not visualization.importable:
raise exceptions.ItemAccessibilityException("The owner of this visualization has disabled imports via this link.")
if visualization.deleted:
raise exceptions.ItemDeletionException("You can't import this visualization because it has been deleted.")
# copy vis and alter title
# TODO: need to handle custom db keys.
imported_visualization = visualization.copy(user=user, title="imported: " + visualization.title)
trans.sa_session.add(imported_visualization)
trans.sa_session.flush()
return imported_visualization
def create_visualization(self, trans, type, title="Untitled Visualization", slug=None,
dbkey=None, annotation=None, config={}, save=True):
"""
Create visualiation and first revision.
"""
visualization = self._create_visualization(trans, title, type, dbkey, slug, annotation, save)
# TODO: handle this error structure better either in _create or here
if isinstance(visualization, dict):
err_dict = visualization
raise ValueError(err_dict['title_err'] or err_dict['slug_err'])
# Create and save first visualization revision
revision = trans.model.VisualizationRevision(visualization=visualization, title=title,
config=config, dbkey=dbkey)
visualization.latest_revision = revision
if save:
session = trans.sa_session
session.add(revision)
session.flush()
return visualization
def add_visualization_revision(self, trans, visualization, config, title, dbkey):
"""
Adds a new `VisualizationRevision` to the given `visualization` with
the given parameters and set its parent visualization's `latest_revision`
to the new revision.
"""
# precondition: only add new revision on owned vis's
# TODO:?? should we default title, dbkey, config? to which: visualization or latest_revision?
revision = trans.model.VisualizationRevision(visualization, title, dbkey, config)
visualization.latest_revision = revision
# TODO:?? does this automatically add revision to visualzation.revisions?
trans.sa_session.add(revision)
trans.sa_session.flush()
return revision
def save_visualization(self, trans, config, type, id=None, title=None, dbkey=None, slug=None, annotation=None):
session = trans.sa_session
# Create/get visualization.
if not id:
# Create new visualization.
vis = self._create_visualization(trans, title, type, dbkey, slug, annotation)
else:
decoded_id = trans.security.decode_id(id)
vis = session.query(trans.model.Visualization).get(decoded_id)
# TODO: security check?
# Create new VisualizationRevision that will be attached to the viz
vis_rev = trans.model.VisualizationRevision()
vis_rev.visualization = vis
# do NOT alter the dbkey
vis_rev.dbkey = vis.dbkey
# do alter the title and config
vis_rev.title = title
# -- Validate config. --
if vis.type == 'trackster':
def unpack_track(track_dict):
""" Unpack a track from its json. """
dataset_dict = track_dict['dataset']
return {
"dataset_id": trans.security.decode_id(dataset_dict['id']),
"hda_ldda": dataset_dict.get('hda_ldda', 'hda'),
"track_type": track_dict['track_type'],
"prefs": track_dict['prefs'],
"mode": track_dict['mode'],
"filters": track_dict['filters'],
"tool_state": track_dict['tool_state']
}
def unpack_collection(collection_json):
""" Unpack a collection from its json. """
unpacked_drawables = []
drawables = collection_json['drawables']
for drawable_json in drawables:
if 'track_type' in drawable_json:
drawable = unpack_track(drawable_json)
else:
drawable = unpack_collection(drawable_json)
unpacked_drawables.append(drawable)
return {
"obj_type": collection_json['obj_type'],
"drawables": unpacked_drawables,
"prefs": collection_json.get('prefs', []),
"filters": collection_json.get('filters', None)
}
# TODO: unpack and validate bookmarks:
def unpack_bookmarks(bookmarks_json):
return bookmarks_json
# Unpack and validate view content.
view_content = unpack_collection(config['view'])
bookmarks = unpack_bookmarks(config['bookmarks'])
vis_rev.config = {"view": view_content, "bookmarks": bookmarks}
# Viewport from payload
if 'viewport' in config:
chrom = config['viewport']['chrom']
start = config['viewport']['start']
end = config['viewport']['end']
overview = config['viewport']['overview']
vis_rev.config["viewport"] = {'chrom': chrom, 'start': start, 'end': end, 'overview': overview}
else:
# Default action is to save the config as is with no validation.
vis_rev.config = config
vis.latest_revision = vis_rev
session.add(vis_rev)
session.flush()
encoded_id = trans.security.encode_id(vis.id)
return {"vis_id": encoded_id, "url": url_for(controller='visualization', action=vis.type, id=encoded_id)}
def get_tool_def(self, trans, hda):
""" Returns definition of an interactive tool for an HDA. """
# Get dataset's job.
job = None
for job_output_assoc in hda.creating_job_associations:
job = job_output_assoc.job
break
if not job:
return None
tool = trans.app.toolbox.get_tool(job.tool_id, tool_version=job.tool_version)
if not tool:
return None
# Tool must have a Trackster configuration.
if not tool.trackster_conf:
return None
# -- Get tool definition and add input values from job. --
tool_dict = tool.to_dict(trans, io_details=True)
tool_param_values = dict([(p.name, p.value) for p in job.parameters])
tool_param_values = tool.params_from_strings(tool_param_values, trans.app, ignore_errors=True)
# Only get values for simple inputs for now.
inputs_dict = [i for i in tool_dict['inputs'] if i['type'] not in ['data', 'hidden_data', 'conditional']]
for t_input in inputs_dict:
# Add value to tool.
if 'name' in t_input:
name = t_input['name']
if name in tool_param_values:
value = tool_param_values[name]
if isinstance(value, Dictifiable):
value = value.to_dict()
t_input['value'] = value
return tool_dict
def get_visualization_config(self, trans, visualization):
""" Returns a visualization's configuration. Only works for trackster visualizations right now. """
config = None
if visualization.type in ['trackster', 'genome']:
# Unpack Trackster config.
latest_revision = visualization.latest_revision
bookmarks = latest_revision.config.get('bookmarks', [])
def pack_track(track_dict):
dataset_id = track_dict['dataset_id']
hda_ldda = track_dict.get('hda_ldda', 'hda')
if hda_ldda == 'ldda':
# HACK: need to encode library dataset ID because get_hda_or_ldda
# only works for encoded datasets.
dataset_id = trans.security.encode_id(dataset_id)
dataset = self.get_hda_or_ldda(trans, hda_ldda, dataset_id)
try:
prefs = track_dict['prefs']
except KeyError:
prefs = {}
track_data_provider = trans.app.data_provider_registry.get_data_provider(trans,
original_dataset=dataset,
source='data')
return {
"track_type": dataset.datatype.track_type,
"dataset": trans.security.encode_dict_ids(dataset.to_dict()),
"prefs": prefs,
"mode": track_dict.get('mode', 'Auto'),
"filters": track_dict.get('filters', {'filters' : track_data_provider.get_filters()}),
"tool": self.get_tool_def(trans, dataset),
"tool_state": track_dict.get('tool_state', {})
}
def pack_collection(collection_dict):
drawables = []
for drawable_dict in collection_dict['drawables']:
if 'track_type' in drawable_dict:
drawables.append(pack_track(drawable_dict))
else:
drawables.append(pack_collection(drawable_dict))
return {
'obj_type': collection_dict['obj_type'],
'drawables': drawables,
'prefs': collection_dict.get('prefs', []),
'filters': collection_dict.get('filters', {})
}
def encode_dbkey(dbkey):
"""
Encodes dbkey as needed. For now, prepends user's public name
to custom dbkey keys.
"""