-
Notifications
You must be signed in to change notification settings - Fork 67
/
client.py
3444 lines (2697 loc) · 148 KB
/
client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""
**************
Synapse Client
**************
The `Synapse` object encapsulates a connection to the Synapse service and is
used for building projects, uploading and retrieving data, and recording
provenance of data analysis.
~~~~~
Login
~~~~~
.. automethod:: synapseclient.client.login
~~~~~~~
Synapse
~~~~~~~
.. autoclass:: synapseclient.Synapse
:members:
~~~~~~~~~~~~~~~~
More information
~~~~~~~~~~~~~~~~
See also the `Synapse API documentation <http://rest.synapse.org>`_.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
from builtins import str
from builtins import input
try:
import configparser
except ImportError:
import ConfigParser as configparser
import collections
import math, os, sys, stat, re, json, time
import base64, hashlib, hmac
import six
import uuid
try:
from urllib.parse import urlparse
from urllib.parse import urlunparse
from urllib.parse import quote
from urllib.parse import unquote
except ImportError:
from urlparse import urlparse
from urlparse import urlunparse
from urllib import quote
from urllib import unquote
import requests, webbrowser
import shutil
import zipfile
import mimetypes
import tempfile
import warnings
import getpass
from collections import OrderedDict
import synapseclient
from . import utils
from . import cache
from . import exceptions
from .exceptions import *
from .version_check import version_check
from .utils import id_of, get_properties, KB, MB, memoize, _is_json, _extract_synapse_id_from_query, find_data_file_handle, log_error
from .annotations import from_synapse_annotations, to_synapse_annotations
from .annotations import to_submission_status_annotations, from_submission_status_annotations
from .activity import Activity
from .entity import Entity, File, Project, Folder, Link, Versionable, split_entity_namespaces, is_versionable, is_container, is_synapse_entity
from .dict_object import DictObject
from .evaluation import Evaluation, Submission, SubmissionStatus
from .table import Schema, Table, Column, RowSet, Row, TableQueryResult, CsvFileTable
from .team import UserProfile, Team, TeamMember, UserGroupHeader
from .wiki import Wiki, WikiAttachment
from .retry import _with_retry
from .multipart_upload import multipart_upload, multipart_upload_string
# Base URLs of the production Synapse services, keyed by endpoint name.
# Synapse.setEndpoints() falls back to these for any endpoint that is neither
# passed in explicitly nor present in the configuration file.
PRODUCTION_ENDPOINTS = {'repoEndpoint':'https://repo-prod.prod.sagebase.org/repo/v1',
'authEndpoint':'https://auth-prod.prod.sagebase.org/auth/v1',
'fileHandleEndpoint':'https://file-prod.prod.sagebase.org/file/v1',
'portalEndpoint':'https://www.synapse.org/'}
# Base URLs of the staging services, with the same keys as PRODUCTION_ENDPOINTS;
# intended to be passed as syn.setEndpoints(**STAGING_ENDPOINTS).
STAGING_ENDPOINTS = {'repoEndpoint':'https://repo-staging.prod.sagebase.org/repo/v1',
'authEndpoint':'https://auth-staging.prod.sagebase.org/auth/v1',
'fileHandleEndpoint':'https://file-staging.prod.sagebase.org/file/v1',
'portalEndpoint':'https://staging.synapse.org/'}
# Default path of the user's configuration file (default of Synapse(configPath=...)).
CONFIG_FILE = os.path.join(os.path.expanduser('~'), '.synapseConfig')
# Name of the JSON file, inside the cache root directory, that stores cached API keys.
SESSION_FILENAME = '.session'
# NOTE(review): the sizes and limits below are not used in this part of the file;
# presumably they tune file transfer and query paging -- confirm against their users.
FILE_BUFFER_SIZE = 2*MB
CHUNK_SIZE = 5*MB
QUERY_LIMIT = 1000
CHUNK_UPLOAD_POLL_INTERVAL = 1 # second
# NOTE(review): presumably the Synapse ID of the root entity -- confirm.
ROOT_ENTITY = 'syn4489'
PUBLIC = 273949 #PrincipalId of public "user"
# NOTE(review): presumably the principal ID of the "all authenticated users" group -- confirm.
AUTHENTICATED_USERS = 273948
# Default value of the `debug` argument of Synapse.__init__.
DEBUG_DEFAULT = False
# NOTE(review): presumably the maximum number of HTTP redirects to follow;
# not referenced in this part of the file -- confirm.
REDIRECT_LIMIT = 5
# Defines the standard retry policy applied to the rest methods.
# The retry period needs to span a minute because sending
# messages is limited to 10 per 60 seconds.
#
# Each entry of "retry_errors" is listed exactly once (the original list
# repeated "connection reset by peer").
# NOTE(review): the exact meaning of "wait", "max_wait" and "back_off" is
# defined by the retry helper that consumes this dict -- confirm there.
STANDARD_RETRY_PARAMS = {
    "retry_status_codes": [429, 500, 502, 503, 504],
    "retry_errors": ["proxy error", "slow down", "timeout", "timed out",
                     "connection reset by peer", "unknown ssl protocol error",
                     "couldn't connect to host", "slowdown", "try again"],
    "retry_exceptions": ["ConnectionError", "Timeout", "timeout", "ChunkedEncodingError"],
    "retries": 60,  # Retries for up to about 30 minutes
    "wait": 1,
    "max_wait": 30,
    "back_off": 2,
}
# Register extra extension -> MIME type mappings as non-standard
# (strict=False) entries, in the order listed below.
for _mime_type, _extensions in (
    ('text/x-r', ('.R', '.r')),
    ('text/tab-separated-values', ('.maf', '.bed5', '.bed', '.vcf', '.sam')),
    ('text/yaml', ('.yaml',)),
    ('text/x-markdown', ('.md', '.markdown')),
):
    for _extension in _extensions:
        mimetypes.add_type(_mime_type, _extension, strict=False)
# Do not leave the loop variables behind in the module namespace.
del _mime_type, _extensions, _extension
def login(*args, **kwargs):
    """
    Create a new Synapse client and log it in with a single call.

    All positional and keyword arguments are forwarded unchanged to
    :py:func:`synapseclient.Synapse.login`; see that method for their meaning.

    :returns: the Synapse object on which ``login`` was called

    Example::

        import synapseclient
        syn = synapseclient.login()
    """
    client = Synapse()
    client.login(*args, **kwargs)
    return client
def _test_import_sftp():
    """
    Verify that pysftp can be imported.

    If the import fails, installation instructions are written to stderr and
    the original ImportError is re-raised.
    """
    install_help = (
        "\n\nLibraries required for SFTP are not installed!\n"
        "The Synapse client uses pysftp in order to access SFTP storage "
        "locations. This library in turn depends on pycrypto.\n"
        "To install these libraries on Unix variants including OS X, make "
        "sure the python devel libraries are installed, then:\n"
        " (sudo) pip install pysftp\n\n"
        "For Windows systems without a C/C++ compiler, install the appropriate "
        "binary distribution of pycrypto from:\n"
        " http://www.voidspace.org.uk/python/modules.shtml#pycrypto\n\n"
        "For more information, see: http://python-docs.synapse.org/sftp.html"
        "\n\n\n")
    try:
        import pysftp
    except ImportError:
        sys.stderr.write(install_help)
        raise
class Synapse:
"""
Constructs a Python client object for the Synapse repository service
:param repoEndpoint: Location of Synapse repository
:param authEndpoint: Location of authentication service
:param fileHandleEndpoint: Location of file service
:param portalEndpoint: Location of the website
:param serviceTimeoutSeconds: Wait time before timeout (currently unused)
:param debug: Print debugging messages if True
:param skip_checks: Skip version and endpoint checks
:param configPath: Path to config File with setting for Synapse
defaults to ~/.synapseConfig
Typically, no parameters are needed::
import synapseclient
syn = synapseclient.Synapse()
See:
- :py:func:`synapseclient.Synapse.login`
- :py:func:`synapseclient.Synapse.setEndpoints`
"""
def __init__(self, repoEndpoint=None, authEndpoint=None, fileHandleEndpoint=None, portalEndpoint=None,
             debug=DEBUG_DEFAULT, skip_checks=False, configPath=CONFIG_FILE):
    """
    Set up the local cache, the service endpoints and the default client state.

    :param repoEndpoint:       Location of Synapse repository
    :param authEndpoint:       Location of authentication service
    :param fileHandleEndpoint: Location of file service
    :param portalEndpoint:     Location of the website
    :param debug:              Print debugging messages if True; forced to True when the
                               config file contains a ``[debug]`` section
    :param skip_checks:        Skip version and endpoint checks
    :param configPath:         Path to the config file; its ``[cache] location`` option,
                               when present, overrides the default cache directory
    """
    cache_location = synapseclient.cache.CACHE_ROOT_DIR

    self.configPath = configPath
    if not os.path.isfile(configPath):
        if debug:
            # Alert the user if no config is found
            sys.stderr.write("Could not find a config file (%s). Using defaults." % os.path.abspath(configPath))
    else:
        settings = self.getConfigFile(configPath)
        if settings.has_option('cache', 'location'):
            cache_location = settings.get('cache', 'location')
        if settings.has_section('debug'):
            debug = True

    self.cache = synapseclient.cache.Cache(cache_location)
    self.setEndpoints(repoEndpoint, authEndpoint, fileHandleEndpoint, portalEndpoint, skip_checks)

    self.default_headers = {'content-type': 'application/json; charset=UTF-8', 'Accept': 'application/json; charset=UTF-8'}

    # Nobody is logged in until login() succeeds
    self.username = None
    self.apiKey = None

    self.debug = debug
    self.skip_checks = skip_checks

    # NOTE(review): presumably polling parameters for table queries (initial
    # sleep, back-off factor, sleep cap, overall timeout) -- confirm where used.
    self.table_query_sleep = 2
    self.table_query_backoff = 1.1
    self.table_query_max_sleep = 20
    self.table_query_timeout = 300
def getConfigFile(self, configPath):
"""Returns a ConfigParser populated with properties from the user's configuration file."""
try:
config = configparser.ConfigParser()
config.read(configPath) # Does not fail if the file does not exist
return config
except configparser.Error:
sys.stderr.write('Error parsing Synapse config file: %s' % configPath)
raise
def setEndpoints(self, repoEndpoint=None, authEndpoint=None, fileHandleEndpoint=None, portalEndpoint=None, skip_checks=False):
"""
Sets the locations for each of the Synapse services (mostly useful for testing).
:param repoEndpoint: Location of synapse repository
:param authEndpoint: Location of authentication service
:param fileHandleEndpoint: Location of file service
:param portalEndpoint: Location of the website
:param skip_checks: Skip version and endpoint checks
To switch between staging and production endpoints::
syn.setEndpoints(**synapseclient.client.STAGING_ENDPOINTS)
syn.setEndpoints(**synapseclient.client.PRODUCTION_ENDPOINTS)
"""
endpoints = {'repoEndpoint' : repoEndpoint,
'authEndpoint' : authEndpoint,
'fileHandleEndpoint' : fileHandleEndpoint,
'portalEndpoint' : portalEndpoint}
# For unspecified endpoints, first look in the config file
config = self.getConfigFile(self.configPath)
for point in endpoints.keys():
if endpoints[point] is None and config.has_option('endpoints', point):
endpoints[point] = config.get('endpoints', point)
# Endpoints default to production
for point in endpoints.keys():
if endpoints[point] is None:
endpoints[point] = PRODUCTION_ENDPOINTS[point]
# Update endpoints if we get redirected
if not skip_checks:
response = requests.get(endpoints[point], allow_redirects=False, headers=synapseclient.USER_AGENT)
if response.status_code == 301:
endpoints[point] = response.headers['location']
self.repoEndpoint = endpoints['repoEndpoint']
self.authEndpoint = endpoints['authEndpoint']
self.fileHandleEndpoint = endpoints['fileHandleEndpoint']
self.portalEndpoint = endpoints['portalEndpoint']
def login(self, email=None, password=None, apiKey=None, sessionToken=None, rememberMe=False, silent=False, forced=False):
"""
Authenticates the user using the given credentials (in order of preference):
- supplied synapse user name (or email) and password
- supplied email and API key (base 64 encoded)
- supplied session token
- supplied email and cached API key
- most recent cached email and API key
- email in the configuration file and cached API key
- email and API key in the configuration file
- email and password in the configuraton file
- session token in the configuration file
:param email: Synapse user name (or an email address associated with a Synapse account)
:param password: password
:param apiKey: Base64 encoded Synapse API key
:param rememberMe: Whether the authentication information should be cached locally
for usage across sessions and clients.
:param silent: Defaults to False. Suppresses the "Welcome ...!" message.
:param forced: Defaults to False. Bypass the credential cache if set.
Example::
syn.login('me@somewhere.com', 'secret-password', rememberMe=True)
#> Welcome, Me!
After logging in with the *rememberMe* flag set, an API key will be cached and
used to authenticate for future logins::
syn.login()
#> Welcome, Me!
"""
# Note: the order of the logic below reflects the ordering in the docstring above.
# Check version before logging in
if not self.skip_checks: version_check(synapseclient.__version__)
# Make sure to invalidate the existing session
self.logout()
if email is not None and password is not None:
self.username = email
sessionToken = self._getSessionToken(email=self.username, password=password)
self.apiKey = self._getAPIKey(sessionToken)
elif email is not None and apiKey is not None:
self.username = email
self.apiKey = base64.b64decode(apiKey)
elif sessionToken is not None:
try:
self._getSessionToken(sessionToken=sessionToken)
self.username = self.getUserProfile(sessionToken=sessionToken)['userName']
self.apiKey = self._getAPIKey(sessionToken)
except SynapseAuthenticationError:
# Session token is invalid
pass
# If supplied arguments are not enough
# Try fetching the information from the API key cache
if self.apiKey is None and not forced:
cachedSessions = self._readSessionCache()
if email is None and "<mostRecent>" in cachedSessions:
email = cachedSessions["<mostRecent>"]
if email is not None and email in cachedSessions:
self.username = email
self.apiKey = base64.b64decode(cachedSessions[email])
# Resort to reading the configuration file
if self.apiKey is None:
# Resort to checking the config file
config = configparser.ConfigParser()
try:
config.read(self.configPath)
except configparser.Error:
sys.stderr.write('Error parsing Synapse config file: %s' % self.configPath)
raise
if config.has_option('authentication', 'username'):
self.username = config.has_option('authentication', 'username')
if self.username in cachedSessions:
self.apiKey = base64.b64decode(cachedSessions[self.username])
# Just use the configuration file
if self.apiKey is None:
if config.has_option('authentication', 'username') and config.has_option('authentication', 'apikey'):
self.username = config.get('authentication', 'username')
self.apiKey = base64.b64decode(config.get('authentication', 'apikey'))
elif config.has_option('authentication', 'username') and config.has_option('authentication', 'password'):
self.username = config.get('authentication', 'username')
password = config.get('authentication', 'password')
token = self._getSessionToken(email=self.username, password=password)
self.apiKey = self._getAPIKey(token)
elif config.has_option('authentication', 'sessiontoken'):
sessionToken = config.get('authentication', 'sessiontoken')
try:
self._getSessionToken(sessionToken=sessionToken)
self.username = self.getUserProfile(sessionToken=sessionToken)['userName']
self.apiKey = self._getAPIKey(sessionToken)
except SynapseAuthenticationError:
raise SynapseAuthenticationError("No credentials provided. Note: the session token within your configuration file has expired.")
# Final check on login success
if self.apiKey is None:
raise SynapseNoCredentialsError("No credentials provided.")
# Save the API key in the cache
if rememberMe:
cachedSessions = self._readSessionCache()
cachedSessions[self.username] = base64.b64encode(self.apiKey).decode()
# Note: make sure this key cannot conflict with usernames by using invalid username characters
cachedSessions["<mostRecent>"] = self.username
self._writeSessionCache(cachedSessions)
if not silent:
profile = self.getUserProfile(refresh=True)
## TODO-PY3: in Python2, do we need to ensure that this is encoded in utf-8
print("Welcome, %s!\n" % (profile['displayName'] if 'displayName' in profile else self.username))
def _getSessionToken(self, email=None, password=None, sessionToken=None):
"""Returns a validated session token."""
if email is not None and password is not None:
# Login normally
try:
req = {'email' : email, 'password' : password}
session = self.restPOST('/session', body=json.dumps(req), endpoint=self.authEndpoint, headers=self.default_headers)
return session['sessionToken']
except SynapseHTTPError as err:
if err.response.status_code == 403 or err.response.status_code == 404:
raise SynapseAuthenticationError("Invalid username or password.")
raise
elif sessionToken is not None:
# Validate the session token
try:
token = {'sessionToken' : sessionToken}
response = self.restPUT('/session', body=json.dumps(token), endpoint=self.authEndpoint, headers=self.default_headers)
# Success!
return sessionToken
except SynapseHTTPError as err:
if err.response.status_code == 401:
raise SynapseAuthenticationError("Supplied session token (%s) is invalid." % sessionToken)
raise
else:
raise SynapseAuthenticationError("No credentials provided.")
def _getAPIKey(self, sessionToken):
"""Uses a session token to fetch an API key."""
headers = {'sessionToken' : sessionToken, 'Accept': 'application/json'}
secret = self.restGET('/secretKey', endpoint=self.authEndpoint, headers=headers)
return base64.b64decode(secret['secretKey'])
def _readSessionCache(self):
"""Returns the JSON contents of CACHE_DIR/SESSION_FILENAME."""
sessionFile = os.path.join(self.cache.cache_root_dir, SESSION_FILENAME)
if os.path.isfile(sessionFile):
try:
file = open(sessionFile, 'r')
return json.load(file)
except: pass
return {}
def _writeSessionCache(self, data):
"""Dumps the JSON data into CACHE_DIR/SESSION_FILENAME."""
sessionFile = os.path.join(self.cache.cache_root_dir, SESSION_FILENAME)
with open(sessionFile, 'w') as file:
json.dump(data, file)
file.write('\n') # For compatibility with R's JSON parser
def _loggedIn(self):
"""Test whether the user is logged in to Synapse."""
if self.apiKey is None or self.username is None:
return False
try:
user = self.restGET('/userProfile')
if 'displayName' in user:
if user['displayName'] == 'Anonymous':
# No session token, not logged in
return False
return user['displayName']
except SynapseHTTPError as err:
if err.response.status_code == 401:
return False
raise
def logout(self, forgetMe=False):
"""
Removes authentication information from the Synapse client.
:param forgetMe: Set as True to clear any local storage of authentication information.
See the flag "rememberMe" in :py:func:`synapseclient.Synapse.login`.
"""
# Since this client does not store the session token,
# it cannot REST DELETE /session
# Delete the user's API key from the cache
if forgetMe:
cachedSessions = self._readSessionCache()
if self.username in cachedSessions:
del cachedSessions[self.username]
self._writeSessionCache(cachedSessions)
# Remove the authentication information from memory
self.username = None
self.apiKey = None
def invalidateAPIKey(self):
"""Invalidates authentication across all clients."""
# Logout globally
if self._loggedIn():
self.restDELETE('/secretKey', endpoint=self.authEndpoint)
@memoize
def getUserProfile(self, id=None, sessionToken=None, refresh=False):
    """
    Get the details about a Synapse user.
    Retrieves information on the current user if 'id' is omitted.

    :param id:           The 'userId' (aka 'ownerId') of a user, the userName, a mapping
                         containing an 'ownerId' key, or a TeamMember
    :param sessionToken: The session token to use to find the user profile
    :param refresh:      If set to True will always fetch the data from Synapse otherwise
                         will use cached information.  This argument is not read by the
                         method body; presumably it is handled by the ``@memoize``
                         decorator -- confirm there.

    :returns: JSON-object

    :raises ValueError: if a user name matches several principals but none of them exactly

    Example::

        my_profile = syn.getUserProfile()

        freds_profile = syn.getUserProfile('fredcommo')
    """
    # collections.Mapping was removed in Python 3.10; the ABC lives in
    # collections.abc on Python 3 and in collections on Python 2.
    try:
        from collections.abc import Mapping
    except ImportError:
        from collections import Mapping

    try:
        ## if id is unset or a userID, this will succeed
        id = '' if id is None else int(id)
    except (TypeError, ValueError):
        if isinstance(id, Mapping) and 'ownerId' in id:
            # Subscript access works for every mapping, attribute access only for some
            id = id['ownerId']
        elif isinstance(id, TeamMember):
            id = id.member.ownerId
        else:
            principals = self._findPrincipals(id)
            if len(principals) == 1:
                id = principals[0]['ownerId']
            else:
                for principal in principals:
                    # Groups may carry no userName; treat that as "no match"
                    if (principal.get('userName') or '').lower() == id.lower():
                        id = principal['ownerId']
                        break
                else:  # no break
                    raise ValueError('Can\'t find user "%s": ' % id)

    uri = '/userProfile/%s' % id
    return UserProfile(**self.restGET(uri, headers={'sessionToken' : sessionToken} if sessionToken else None))
def _findPrincipals(self, query_string):
    """
    Look up users or groups whose name or email starts with *query_string*.

    :param query_string: prefix to search for

    :returns: A list of userGroupHeader objects with fields displayName, email, firstName, lastName, isIndividual, ownerId

    Example::

        syn._findPrincipals('test')

        [{u'displayName': u'Synapse Test',
          u'email': u'syn...t@sagebase.org',
          u'firstName': u'Synapse',
          u'isIndividual': True,
          u'lastName': u'Test',
          u'ownerId': u'1560002'},
         {u'displayName': ... }]
    """
    ## In Python2, urllib.quote expects encoded byte-strings.
    ## ``str`` is the name imported from ``builtins`` at the top of this file.
    is_py2_unicode = six.PY2 and isinstance(query_string, unicode)
    if is_py2_unicode or isinstance(query_string, str):
        query_string = query_string.encode('utf-8')

    uri = '/userGroupHeaders?prefix=%s' % quote(query_string)

    headers = []
    for result in self._GET_paginated(uri):
        headers.append(UserGroupHeader(**result))
    return headers
def onweb(self, entity, subpageId=None):
    """
    Open the entity's web page, or one of its wiki sub-pages, in a browser.

    :param entity:    Either an Entity or a Synapse ID.  A path to an existing
                      local file is first resolved to its entity through
                      :py:func:`synapseclient.Synapse.get` (without downloading).
    :param subpageId: (Optional) ID of one of the wiki's sub-pages
    """
    if isinstance(entity, six.string_types) and os.path.isfile(entity):
        entity = self.get(entity, downloadFile=False)

    if subpageId is not None:
        url = "%s#!Wiki:%s/ENTITY/%s" % (self.portalEndpoint, id_of(entity), subpageId)
    else:
        url = "%s#!Synapse:%s" % (self.portalEndpoint, id_of(entity))
    webbrowser.open(url)
def printEntity(self, entity, ensure_ascii=True):
    """
    Pretty-print an Entity as indented JSON with sorted keys.

    :param entity:       An Entity, or a Synapse ID that is fetched first
    :param ensure_ascii: Passed through to :py:func:`json.dumps`

    Falls back to printing ``str(entity)`` when a TypeError is raised while
    rendering the entity as JSON.
    """
    if utils.is_synapse_id(entity):
        entity = self._getEntity(entity)

    try:
        rendered = json.dumps(entity, sort_keys=True, indent=2, ensure_ascii=ensure_ascii)
        print(rendered)
    except TypeError:
        print(str(entity))
############################################################
## Get / Store methods ##
############################################################
def get(self, entity, **kwargs):
    """
    Fetch a Synapse entity from the repository service.

    :param entity:           A Synapse ID, a Synapse Entity object,
                             a plain dictionary in which 'id' maps to a Synapse ID or
                             a local file that is stored in Synapse (found by hash of file)
    :param version:          The specific version to get.
                             Defaults to the most recent version.
    :param downloadFile:     Whether associated file(s) should be downloaded.
                             Defaults to True.  Forced to False when *entity* is a local file path.
    :param downloadLocation: Directory where to download the Synapse File Entity.
                             Defaults to the local cache.
    :param followLink:       Whether the link returns the target Entity.
                             Defaults to False
    :param ifcollision:      Determines how to handle file collisions.
                             May be "overwrite.local", "keep.local", or "keep.both".
                             Defaults to "keep.both".
    :param limitSearch:      a Synapse ID used to limit the search in Synapse if entity is
                             specified as a local file.  That is, if the file is stored in multiple
                             locations in Synapse only the ones in the specified folder/project will be
                             returned.

    :returns: A new Synapse Entity object of the appropriate type

    :raises SynapseFileNotFoundError:       if *entity* is a string that is neither an
                                            existing local file nor a Synapse ID
    :raises SynapseUnmetAccessRestrictions: if the entity has unmet access requirements
                                            and a download was requested (otherwise only
                                            a warning is issued)

    Example::

        ## download file into cache
        entity = syn.get('syn1906479')
        print(entity.name)
        print(entity.path)

        ## download file into current working directory
        entity = syn.get('syn1906479', downloadLocation='.')
        print(entity.name)
        print(entity.path)

        ## Determine the provenance of a locally stored file as indicated in Synapse
        entity = syn.get('/path/to/file.txt', limitSearch='syn12312')
        print(syn.getProvenance(entity))
    """
    given_as_string = isinstance(entity, six.string_types)

    if given_as_string and os.path.isfile(entity):
        # A local file: find the matching Synapse entity by content hash
        bundle = self.__getFromFile(entity, kwargs.get('limitSearch', None))
        kwargs['downloadFile'] = False
        kwargs['path'] = entity
    elif given_as_string and not utils.is_synapse_id(entity):
        raise SynapseFileNotFoundError(('The parameter %s is neither a local file path '
                                        ' or a valid entity id' %entity))
    else:
        bundle = self._getEntityBundle(entity, kwargs.get('version', None))

    # Check and warn for unmet access requirements
    if len(bundle['unmetAccessRequirements']) > 0:
        restriction_notice = ("\nWARNING: This entity has access restrictions. Please visit the "
                              "web page for this entity (syn.onweb(\"%s\")). Click the downward "
                              "pointing arrow next to the file's name to review and fulfill its "
                              "download requirement(s).\n" % id_of(entity))
        wants_download = kwargs.get('downloadFile', True)
        if wants_download:
            raise SynapseUnmetAccessRestrictions(restriction_notice)
        warnings.warn(restriction_notice)

    return self._getWithEntityBundle(entityBundle=bundle, entity=entity, **kwargs)
def __getFromFile(self, filepath, limitSearch=None):
    """
    Find the Synapse entity bundle that matches the md5 of a local file.

    The file is also registered in the local cache under the file handle of
    the entity that was found.

    See :py:func:`synapseclient.Synapse.get`.

    :param filepath:    path to local file
    :param limitSearch: Limits the places in Synapse where the file is searched for.

    :returns: the entity bundle of the first matching entity

    :raises SynapseFileNotFoundError: if no entity in Synapse matches the file
    """
    checksum = utils.md5_for_file(filepath).hexdigest()
    matches = self.restGET('/entity/md5/%s' % checksum)['results']

    if limitSearch is not None:
        # Go through and find the path of every entity found
        entity_paths = []
        for candidate in matches:
            entity_paths.append(self.restGET('/entity/%s/path' % candidate['id']))

        # Filter out all entities whose path does not contain limitSearch
        kept = []
        for candidate, entity_path in zip(matches, entity_paths):
            if utils.is_in_path(limitSearch, entity_path):
                kept.append(candidate)
        matches = kept

    match_count = len(matches)
    if match_count == 0:  # None found
        raise SynapseFileNotFoundError('File %s not found in Synapse' % (filepath,))
    if match_count > 1:
        listing = '\n'.join(['%s.%i' % (match['id'], match['versionNumber']) for match in matches])
        sys.stderr.write('\nWARNING: The file %s is associated with many files in Synapse:\n'
                         '%s\n'
                         'You can limit to files in specific project or folder by setting the '
                         'limitSearch to the synapse Id of the project or folder. \n'
                         'Will use the first one returned: \n'
                         '%s version %i\n' % (filepath, listing, matches[0]['id'], matches[0]['versionNumber']))

    chosen = matches[0]
    bundle = self._getEntityBundle(chosen, version=chosen['versionNumber'])
    self.cache.add(bundle['entity']['dataFileHandleId'], filepath)
    return bundle
def _getWithEntityBundle(self, entityBundle, entity=None, **kwargs):
"""
Creates a :py:mod:`synapseclient.Entity` from an entity bundle returned by Synapse.
An existing Entity can be supplied in case we want to refresh a stale Entity.
:param entityBundle: Uses the given dictionary as the meta information of the Entity to get
:param entity: Optional, entity whose local state will be copied into the returned entity
:param submission: Optional, access associated files through a submission rather than
through an entity.
See :py:func:`synapseclient.Synapse.get`.
See :py:func:`synapseclient.Synapse._getEntityBundle`.
See :py:mod:`synapseclient.Entity`.
"""
# Note: This version overrides the version of 'entity' (if the object is Mappable)
version = kwargs.get('version', None)
downloadFile = kwargs.get('downloadFile', True)
downloadLocation = kwargs.get('downloadLocation', None)
ifcollision = kwargs.get('ifcollision', 'keep.both')
submission = kwargs.get('submission', None)
followLink = kwargs.get('followLink',False)
#If Link, get target ID entity bundle
if entityBundle['entity']['concreteType'] == 'org.sagebionetworks.repo.model.Link' and followLink:
targetId = entityBundle['entity']['linksTo']['targetId']
targetVersion = entityBundle['entity']['linksTo'].get('targetVersionNumber')
entityBundle = self._getEntityBundle(targetId, targetVersion)
## TODO is it an error to specify both downloadFile=False and downloadLocation?
## TODO this matters if we want to return already cached files when downloadFile=False
# Make a fresh copy of the Entity
local_state = entity.local_state() if entity and isinstance(entity, Entity) else {}
if 'path' in kwargs:
local_state['path'] = kwargs['path']
properties = entityBundle['entity']
annotations = from_synapse_annotations(entityBundle['annotations'])
entity = Entity.create(properties, annotations, local_state)
if isinstance(entity, File):
fileName = entity['name']
# Fill in information about the file, even if we don't download it
# Note: fileHandles will be an empty list if there are unmet access requirements
for handle in entityBundle['fileHandles']:
if handle['id'] == entityBundle['entity']['dataFileHandleId']:
entity.md5 = handle.get('contentMd5', '')
entity.fileSize = handle.get('contentSize', None)
entity.contentType = handle.get('contentType', None)
fileName = properties['fileNameOverride'] if 'fileNameOverride' in properties else handle['fileName']
if handle['concreteType'] == 'org.sagebionetworks.repo.model.file.ExternalFileHandle':
entity['externalURL'] = handle['externalURL']
#Determine if storage location for this entity matches the url of the
#project to determine if I should synapseStore it in the future.
#This can fail with a 404 for submissions whose original entity is deleted
try:
storageLocation = self.__getStorageLocation(entity)
entity['synapseStore'] = utils.is_same_base_url(storageLocation.get('url', 'S3'), entity['externalURL'])
except SynapseHTTPError:
warnings.warn("Can't get storage location for entity %s" % entity['id'])
if not downloadFile:
return entity
# Make sure the download location is a fully resolved directory
if downloadLocation is not None:
downloadLocation = os.path.expanduser(downloadLocation)
if os.path.isfile(downloadLocation):
raise ValueError("Parameter 'downloadLocation' should be a directory, not a file.")
# Determine if the file should be downloaded
# downloadPath = None if downloadLocation is None else os.path.join(downloadLocation, fileName)
# if downloadFile:
# downloadFile = cache.local_file_has_changed(entityBundle, True, downloadPath)
# # Determine where the file should be downloaded to
# if downloadFile:
# _, localPath, _ = cache.determine_local_file_location(entityBundle)
cached_file_path = self.cache.get(entityBundle['entity']['dataFileHandleId'], downloadLocation)
# if we found a cached copy, return it
# if downloadFile
# download it
# add it to the cache
if cached_file_path is not None:
fileName = os.path.basename(cached_file_path)
if not downloadLocation:
downloadLocation = os.path.dirname(cached_file_path)
entity.path = utils.normalize_path(os.path.join(downloadLocation, fileName))
entity.files = [fileName]
entity.cacheDir = downloadLocation
else:
downloadPath = utils.normalize_path(os.path.join(downloadLocation, fileName))
if downloadPath != cached_file_path:
if not downloadFile:
## This is a strange case where downloadLocation is
## set but downloadFile=False. Copying files from a
## cached location seems like the wrong thing to do
## in this case.
entity.path = None
entity.files = []
entity.cacheDir = None
else:
## TODO apply ifcollision here
shutil.copy(cached_file_path, downloadPath)
entity.path = downloadPath
entity.files = [os.path.basename(downloadPath)]
entity.cacheDir = downloadLocation
else:
entity.path = downloadPath
entity.files = [os.path.basename(downloadPath)]
entity.cacheDir = downloadLocation
elif downloadFile:
# By default, download to the local cache
if downloadLocation is None:
downloadLocation = self.cache.get_cache_dir(entityBundle['entity']['dataFileHandleId'])
downloadPath = os.path.join(downloadLocation, fileName)
# If the file already exists but has been modified since caching
if os.path.exists(downloadPath):
if ifcollision == "overwrite.local":
pass
elif ifcollision == "keep.local":
downloadFile = False
elif ifcollision == "keep.both":
downloadPath = utils.unique_filename(downloadPath)
else:
raise ValueError('Invalid parameter: "%s" is not a valid value '
'for "ifcollision"' % ifcollision)
objectType = 'FileEntity' if submission is None else 'SubmissionAttachment'
objectId = entity['id'] if submission is None else submission
fileResult = self._getFileHandleDownload(entityBundle['entity']['dataFileHandleId'],
objectId, objectType)
entity['path'] = self._downloadFileHandle(fileResult['preSignedURL'],
downloadPath, fileResult['fileHandle'])
self.cache.add(entityBundle['entity']['dataFileHandleId'], downloadPath)
if 'path' in entity and (entity['path'] is None or not os.path.exists(entity['path'])):
entity['synapseStore'] = False
return entity
def store(self, obj, **kwargs):
"""
Creates a new Entity or updates an existing Entity,
uploading any files in the process.
:param obj: A Synapse Entity, Evaluation, or Wiki
:param used: The Entity, Synapse ID, or URL
used to create the object
:param executed: The Entity, Synapse ID, or URL
representing code executed to create the object
:param activity: Activity object specifying the user's provenance
:param activityName: Activity name to be used in conjunction with *used* and *executed*.
:param activityDescription: Activity description to be used in conjunction with *used* and *executed*.
:param createOrUpdate: Indicates whether the method should automatically perform an update if the 'obj'
conflicts with an existing Synapse object. Defaults to True.
:param forceVersion: Indicates whether the method should increment the version of the object even if
nothing has changed. Defaults to True.
:param versionLabel: Arbitrary string used to label the version.
:param isRestricted: If set to true, an email will be sent to the Synapse access control team
to start the process of adding terms-of-use
or review board approval for this entity.
You will be contacted with regard to the specific data being restricted
and the requirements of access.
:returns: A Synapse Entity, Evaluation, or Wiki
Example::
from synapseclient import Project
project = Project('My uniquely named project')
project = syn.store(project)
Adding files with `provenance <Activity.html>`_::
from synapseclient import File, Activity
## A synapse entity *syn1906480* contains data
## entity *syn1917825* contains code
activity = Activity(
'Fancy Processing',
description='No seriously, really fancy processing',
used=['syn1906480', 'http://data_r_us.com/fancy/data.txt'],
executed='syn1917825')
test_entity = File('/path/to/data/file.xyz', description='Fancy new data', parent=project)
test_entity = syn.store(test_entity, activity=activity)
"""
createOrUpdate = kwargs.get('createOrUpdate', True)
forceVersion = kwargs.get('forceVersion', True)
versionLabel = kwargs.get('versionLabel', None)
isRestricted = kwargs.get('isRestricted', False)
## _before_store hook
## give objects a chance to do something before being stored
if hasattr(obj, '_before_synapse_store'):
obj._before_synapse_store(self)
## _synapse_store hook
## for objects that know how to store themselves
if hasattr(obj, '_synapse_store'):
return obj._synapse_store(self)
# Handle all non-Entity objects
if not (isinstance(obj, Entity) or type(obj) == dict):
if isinstance(obj, Wiki):
return self._storeWiki(obj)
if 'id' in obj: # If ID is present, update
obj.update(self.restPUT(obj.putURI(), obj.json()))
return obj
try: # If no ID is present, attempt to POST the object
obj.update(self.restPOST(obj.postURI(), obj.json()))
return obj
except SynapseHTTPError as err:
# If already present and we want to update attempt to get the object content
if createOrUpdate and err.response.status_code == 409:
newObj = self.restGET(obj.getByNameURI(obj.name))
newObj.update(obj)
obj = obj.__class__(**newObj)
obj.update(self.restPUT(obj.putURI(), obj.json()))
return obj
raise
# If the input object is an Entity or a dictionary
entity = obj
properties, annotations, local_state = split_entity_namespaces(entity)
bundle = None
# Anything with a path is treated as a cache-able item
if entity.get('path', False):
if 'concreteType' not in properties:
properties['concreteType'] = File._synapse_entity_type
# Make sure the path is fully resolved
entity['path'] = os.path.expanduser(entity['path'])
# Check if the File already exists in Synapse by fetching metadata on it
bundle = self._getEntityBundle(entity)
if bundle:
# Check if the file should be uploaded
fileHandle = find_data_file_handle(bundle)
if fileHandle and fileHandle['concreteType'] == "org.sagebionetworks.repo.model.file.ExternalFileHandle":
needs_upload = (fileHandle['externalURL'] != entity['externalURL'])
else:
## Check if we need to upload a new version of an existing
## file. If the file referred to by entity['path'] has been
## modified, we want to upload the new version.
needs_upload = not self.cache.contains(bundle['entity']['dataFileHandleId'], entity['path'])
elif entity.get('dataFileHandleId',None) is not None:
needs_upload = False
else:
needs_upload = True
if needs_upload:
fileLocation, local_state = self.__uploadExternallyStoringProjects(entity, local_state)
fileHandle = self._uploadToFileHandleService(fileLocation,