lots of refactoring for download and upload
zimingd committed Jul 22, 2017
1 parent 9d340fc commit 5452632
Showing 13 changed files with 200 additions and 205 deletions.
3 changes: 2 additions & 1 deletion .travis.yml
@@ -10,7 +10,8 @@ script:
- 'if [ "${TRAVIS_PULL_REQUEST}" = "false" ]; then nosetests -vs tests/integration; else echo "Skipping integration tests"; fi'
before_install:
- 'if [ "${TRAVIS_PULL_REQUEST}" = "false" ]; then openssl aes-256-cbc -K $encrypted_d17283647768_key -iv $encrypted_d17283647768_iv
-in test.synapseConfig.enc -out test.synapseConfig -d; mv test.synapseConfig ~/.synapseConfig; fi'
-in test.synapseConfig.enc -out test.synapseConfig -d; mv test.synapseConfig ~/.synapseConfig; openssl aes-256-cbc -K $encrypted_d17283647768_key -iv $encrypted_d17283647768_iv
-in test.awscredentials.enc -out test.awscredentials -d; mkdir -p ~/.aws && mv test.awscredentials ~/.aws/credentials;fi'
- travis_wait pip install pysftp cython pandas
addons:
ssh_known_hosts: ec2-54-212-85-156.us-west-2.compute.amazonaws.com
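The decrypted test.awscredentials ends up at ~/.aws/credentials, where the boto3.session.Session(profile_name=...) calls in remote_file_connection.py below can resolve it. As a reminder of the shared-credentials format boto3 expects — the section name and placeholder values here are illustrative assumptions, not the contents of the encrypted file:

[default]
aws_access_key_id = AKIAEXAMPLEKEYID
aws_secret_access_key = exampleSecretAccessKey

A named section (e.g. [test-profile]) would be selected by passing profile_name='test-profile'; with profile_name=None, boto3 falls back to [default] and the usual credential chain.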
184 changes: 58 additions & 126 deletions synapseclient/client.py

Large diffs are not rendered by default.

5 changes: 4 additions & 1 deletion synapseclient/concrete_types.py
@@ -6,4 +6,7 @@
SYNAPSE_S3_UPLOAD_DESTINATION = 'org.sagebionetworks.repo.model.file.S3UploadDestination'
EXTERNAL_UPLOAD_DESTINATION = 'org.sagebionetworks.repo.model.file.ExternalUploadDestination'
EXTERNAL_S3_UPLOAD_DESTINATION = 'org.sagebionetworks.repo.model.file.ExternalS3UploadDestination'
EXTERNAL_OBJECT_STORE_UPLOAD_DESTINATION = 'org.sagebionetworks.repo.model.file.ExternalObjectStoreUploadDestination'
EXTERNAL_OBJECT_STORE_UPLOAD_DESTINATION = 'org.sagebionetworks.repo.model.file.ExternalObjectStoreUploadDestination'

#Concrete types for FileHandles
EXTERNAL_OBJECT_STORE_FILE_HANDLE = "org.sagebionetworks.repo.model.file.ExternalObjectStoreFileHandle"
25 changes: 25 additions & 0 deletions synapseclient/download_functions.py
@@ -0,0 +1,25 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals

import os
from .utils import is_url, md5_for_file
from . import concrete_types
import sys
from .remote_file_connection import ClientS3Connection
from .multipart_upload import multipart_upload
try:
from urllib.parse import urlparse
from urllib.parse import urlunparse
from urllib.parse import quote
from urllib.parse import unquote
from urllib.request import urlretrieve
except ImportError:
from urlparse import urlparse
from urlparse import urlunparse
from urllib import quote
from urllib import unquote
from urllib import urlretrieve
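
So far the new module only declares its imports; the download logic itself is presumably being refactored out of client.py (whose large diff is not rendered above). As a rough sketch of how the client-authenticated S3 pieces could fit together here — the function name, file-handle field names, and cache wiring below are hypothetical, mirroring upload_client_auth_s3 in upload_functions.py:

def download_client_auth_s3(syn, file_handle, download_file_path):
    # hypothetical: look up the locally configured AWS profile for this endpoint/bucket
    profile = syn._get_client_authenticated_s3_profile(file_handle['endpointUrl'],
                                                       file_handle['bucket'])
    # delegate to the boto3-backed helper in remote_file_connection.py
    ClientS3Connection.download_file(file_handle['bucket'], file_handle['endpointUrl'],
                                     file_handle['fileKey'], download_file_path,
                                     profile_name=profile)
    syn.cache.add(file_handle['id'], download_file_path)
    return download_file_path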


72 changes: 37 additions & 35 deletions synapseclient/remote_file_connection.py
@@ -9,44 +9,38 @@
from abc import ABCMeta, abstractmethod
import os
import boto3
import time
from .utils import printTransferProgress

#TODO: RENAME
@add_metaclass(ABCMeta)
class RemoteFileConnection():

@abstractmethod
def downloadFile(self, remotePath, downloadFilePath):
pass

@abstractmethod
def uploadFile(self, remotePath, uploadFilePath):
pass

# @classmethod
# def createConnection(self, storageLocationSetting):
# class_constructor = _storageLocationSetting_type_to_Connection.get(storageLocationSetting['concreteType'], None)
# if class_constructor is None:
# raise NotImplementedError("The client delegated connection for concreteType [%s] is not currently supported." % storageLocationSetting['concreteType'])
# return class_constructor(storageLocationSetting)


#TODO: RENAME
class ClientS3Connection(RemoteFileConnection):

#TODO: perhaps use a @memoize __new__() instead of __init__ so we don't create the same connection multiple times

#TODO: implement the basic lower-level download/upload first, then look at how to integrate it into the Python client and change the object model accordingly
#TODO: accept AWS profile names
class ClientS3Connection:

@staticmethod
def downloadFile(bucket, endpoint_url, remote_file_key, download_file_path, access_key_id = None, secret_access_key = None):
def download_file(bucket, endpoint_url, remote_file_key, download_file_path, profile_name = None, show_progress=True):
# boto does not check for file existence and will overwrite the file if it already exists
if os.path.exists(download_file_path):
raise ValueError("The file: [%s] already exists", download_file_path)

s3 = boto3.resource('s3', endpoint_url=endpoint_url, aws_access_key_id=access_key_id,
aws_secret_access_key=secret_access_key)
boto_session = boto3.session.Session(profile_name=profile_name)
s3 = boto_session.resource('s3', endpoint_url=endpoint_url)

try:
s3.Bucket(bucket).download_file(remote_file_key, download_file_path)
s3_obj = s3.Object(bucket, remote_file_key)

progress_callback = None
if show_progress:
s3_obj.load()
file_size = s3_obj.content_length
# note: time.time() inside the lambda body is re-evaluated on each callback, so dt stays current; only t0 is captured once
t0 = time.time()
filename = os.path.basename(download_file_path)
print(filename)
progress_callback = lambda bytes_downloaded: printTransferProgress(bytes_downloaded, file_size,
prefix='Downloading', postfix=filename,
dt=time.time() - t0,
previouslyTransferred=0)
s3_obj.download_file(download_file_path, Callback=progress_callback)
return download_file_path
except botocore.exceptions.ClientError as e:
if e.response['Error']['Code'] == "404":
@@ -55,12 +49,20 @@ def downloadFile(bucket, endpoint_url, remote_file_key, download_file_path, acce
raise

@staticmethod
def uploadFile(bucket, endpoint_url, remote_file_key, upload_file_path, access_key_id = None, secret_access_key = None):
def upload_file(bucket, endpoint_url, remote_file_key, upload_file_path, profile_name = None, show_progress=True):
if not os.path.isfile(upload_file_path):
raise ValueError("The path: [%s] does not exist or is not a file", upload_file_path)

s3 = boto3.resource('s3', endpoint_url=endpoint_url, aws_access_key_id=access_key_id,
aws_secret_access_key=secret_access_key)
s3.Bucket(bucket).upload_file(upload_file_path, remote_file_key) #automatically determines whether to perform multi-part upload
return upload_file_path
boto_session = boto3.session.Session(profile_name=profile_name)
s3 = boto_session.resource('s3', endpoint_url=endpoint_url)

progress_callback = None
if show_progress:
file_size = os.stat(upload_file_path).st_size
filename = os.path.basename(upload_file_path)
t0 = time.time()
#note: time.time() inside the lambda body is re-evaluated on each callback; only t0 is captured once
progress_callback = lambda bytes_uploaded: printTransferProgress(bytes_uploaded, file_size, prefix='Uploading', postfix=filename, dt=time.time() - t0, previouslyTransferred=0)

s3.Bucket(bucket).upload_file(upload_file_path, remote_file_key, Callback=progress_callback) #automatically determines whether to perform multi-part upload
return upload_file_path
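
As the comments in download_file and upload_file above note, time.time() inside the progress lambda runs fresh on every callback invocation; only t0 is bound once at definition time. A quick self-contained check (plain Python, no boto3 required):

import time

t0 = time.time()
elapsed = lambda bytes_transferred: time.time() - t0  # time.time() here runs on every call
first = elapsed(0)
time.sleep(0.05)
second = elapsed(0)
assert second > first  # dt keeps growing between callbacks, so the rate display stays current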
63 changes: 34 additions & 29 deletions synapseclient/upload_functions.py
@@ -3,11 +3,7 @@
from __future__ import print_function
from __future__ import unicode_literals

from six import add_metaclass

from abc import ABCMeta, abstractmethod
import os
import mimetypes
from .utils import is_url, md5_for_file
from . import concrete_types
import sys
@@ -26,28 +22,36 @@
from urllib import unquote
from urllib import urlretrieve


# TODO: replace uploadExternallyStoringProjects; make this a factory elsewhere
#TODO: documentation
def upload_file(syn, entity_parent_id, local_state):
"""Uploads the file set in the local_state['path'] (if necessary) to a storage location based on project settings.
Returns the FileHandle to represent the stored file.
:param entity_parent_id: parent id of the entity to which we upload.
:param local_state: local state of the entity
:returns: a dict of a file handle that represents the uploaded file
"""
if '_file_handle' not in local_state:
local_state['_file_handle'] = {}

local_state_file_handle = local_state['_file_handle']

#TODO: what to do with local_state? Perhaps just use the entity instead.

# if creating an external file handle with no actual upload
if not local_state['synapseStore']:
if local_state_file_handle.get('externalURL', None):
return create_external_file_handle(syn, local_state_file_handle['externalUrl'], local_state['contentType'], )
return create_external_file_handle(syn, local_state_file_handle['externalURL'], local_state['contentType'])
elif is_url(local_state['path']):
local_state_file_handle['externalURL'] = local_state['path']
# If the URL points to a local file, compute the md5
url = urlparse(local_state['path'])
if os.path.isfile(url.path) and url.scheme == 'file':
local_state_file_handle['contentMd5'] = md5_for_file(url.path).hexdigest()
return create_external_file_handle()
return create_external_file_handle(syn, local_state['path'], local_state['contentType'])

expanded_upload_path = os.path.expandvars(os.path.expanduser(local_state['path']))

#determine the upload function based on the UploadDestination
location = syn._getDefaultUploadDestination(entity_parent_id)
upload_destination_type = location['concreteType']
# synapse managed S3
@@ -56,50 +60,51 @@ def upload_file(syn, entity_parent_id, local_state):
storageString = 'Synapse' if upload_destination_type == concrete_types.SYNAPSE_S3_UPLOAD_DESTINATION else 'your external S3'
sys.stdout.write('\n' + '#' * 50 + '\n Uploading file to ' + storageString + ' storage \n' + '#' * 50 + '\n')

return upload_synapse_s3(syn, local_state['path'], location['storageLocationId'])
return upload_synapse_s3(syn, expanded_upload_path, location['storageLocationId'])
#external file handle (sftp)
elif upload_destination_type == concrete_types.EXTERNAL_UPLOAD_DESTINATION:
if location['uploadType'] == 'SFTP':
sys.stdout.write('\n%s\n%s\nUploading to: %s\n%s\n' % ('#' * 50,
location.get('banner', ''),
urlparse(location['url']).netloc,
'#' * 50))
return upload_external_file_handle_sftp(syn, local_state['path'], location['url'])
return upload_external_file_handle_sftp(syn, expanded_upload_path, location['url'])
else:
raise NotImplementedError('Can only handle SFTP upload locations.')
#client authenticated S3
elif upload_destination_type == concrete_types.EXTERNAL_OBJECT_STORE_UPLOAD_DESTINATION:
return upload_client_auth_s3(syn, local_state['path'], location['bucket'], location['endpointUrl'], location['keyPrefixUUID'], location['storageLocationId'])
sys.stdout.write('\n%s\n%s\nUploading to endpoint: [%s] bucket: [%s]\n%s\n' % ('#' * 50,
location.get('banner', ''),
location.get('endpointUrl'),
location.get('bucket'),
'#' * 50))
return upload_client_auth_s3(syn, expanded_upload_path, location['bucket'], location['endpointUrl'], location['keyPrefixUUID'], location['storageLocationId'])

def create_external_file_handle(syn, file_path_or_url, mimetype, md5, file_size):
#does nothing on purpose because there is nothing to upload

def create_external_file_handle(syn, file_path_or_url, mimetype=None, md5=None, file_size=None):
#just creates the file handle because there is nothing to upload
return syn._create_ExternalFileHandle(file_path_or_url, mimetype=mimetype, md5=md5, fileSize=file_size)


def upload_external_file_handle_sftp(syn, file_path, sftp_url, md5, file_size):
def upload_external_file_handle_sftp(syn, file_path, sftp_url):
uploaded_url = syn._sftpUploadFile(file_path, unquote(sftp_url))

return syn._create_ExternalFileHandle(uploaded_url, md5=md5_for_file(file_path).hexdigest(), fileSize=os.stat(file_path).st_size)

def upload_synapse_s3(syn, file_path, storageLocationId, mimetype=None):

def upload_synapse_s3(syn, file_path, storageLocationId=None, mimetype=None):
file_handle_id = multipart_upload(syn, file_path, contentType=mimetype, storageLocationId=storageLocationId)

syn.cache.add(file_handle_id, file_path)
return syn._getFileHandle(file_handle_id)


def upload_client_auth_s3(syn, file_path, bucket, endpoint_url, key_prefix, storage_location_id):
file_key = key_prefix + '/' + os.path.basename(file_path)
ClientS3Connection.uploadFile(bucket, endpoint_url, file_key, file_path)

#TODO: move into helper function?
mimetype, enc = mimetypes.guess_type(file_path, strict=False)
file_handle = {'concreteType': 'org.sagebionetworks.repo.model.file.ExternalObjectStoreFileHandle',
'fileName': os.path.basename(file_path),
'contentMd5': md5_for_file(file_path).hexdigest(),
'contentSize': os.stat(file_path).st_size,
'storageLocationId': storage_location_id,
'contentType': mimetype}

file_handle = syn._POST_ExternalFileHandleInterface(file_handle)
profile = syn._get_client_authenticated_s3_profile(endpoint_url, bucket)
ClientS3Connection.upload_file(bucket, endpoint_url, file_key, file_path, profile_name=profile)

file_handle = syn._create_ExternalObjectStoreFileHandle(file_key, file_path, storage_location_id)

syn.cache.add(file_handle['id'], file_path)
return file_handle
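
The new _create_ExternalObjectStoreFileHandle helper is presumably equivalent to the inline dict this commit deletes just above; a sketch reconstructed from that removed code, assuming the new file_key argument is sent under a 'fileKey' field (the helper's actual body is not shown in this diff):

import mimetypes
import os
from synapseclient import concrete_types
from synapseclient.utils import md5_for_file

def _create_ExternalObjectStoreFileHandle(syn, s3_file_key, file_path, storage_location_id):
    # guess the content type from the filename, as the removed inline code did
    mimetype, _ = mimetypes.guess_type(file_path, strict=False)
    file_handle = {'concreteType': concrete_types.EXTERNAL_OBJECT_STORE_FILE_HANDLE,
                   'fileKey': s3_file_key,  # assumption: key field added alongside the old dict's contents
                   'fileName': os.path.basename(file_path),
                   'contentMd5': md5_for_file(file_path).hexdigest(),
                   'contentSize': os.stat(file_path).st_size,
                   'storageLocationId': storage_location_id,
                   'contentType': mimetype}
    return syn._POST_ExternalFileHandleInterface(file_handle)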
2 changes: 1 addition & 1 deletion synapseclient/utils.py
@@ -641,7 +641,7 @@ def printTransferProgress(transferred, toBeTransferred, prefix = '', postfix='',
"""
if not sys.stdout.isatty():
return
pass  # return
barLength = 20 # Modify this to change the length of the progress bar
status = ''
rate = ''
3 changes: 3 additions & 0 deletions test.awscredentials.enc
@@ -0,0 +1,3 @@
Binary file not shown.
27 changes: 24 additions & 3 deletions tests/integration/integration_test.py
@@ -222,7 +222,7 @@ def test_uploadFileEntity():
entity = {'name' : 'fooUploadFileEntity', \
'description' : 'A test file entity', \
'parentId' : project['id']}
entity = syn.uploadFile(entity, fname)
entity = syn.upload_file(entity, fname)

# Download and verify
entity = syn.downloadEntity(entity)
@@ -240,7 +240,7 @@ def test_uploadFileEntity():
schedule_for_cleanup(fname)

# Update existing FileEntity
entity = syn.uploadFile(entity, fname)
entity = syn.upload_file(entity, fname)

# Download and verify that it is the same file
entity = syn.downloadEntity(entity)
@@ -469,4 +469,25 @@ def test_getChildren():

expected_id_set = {project_file.id, folder.id}
children_id_set = { x['id'] for x in syn.getChildren(test_project.id)}
assert_equals(expected_id_set, children_id_set)
assert_equals(expected_id_set, children_id_set)

def test_ExternalObjectStore_roundtrip():
#TODO: remove the staging endpoint override once these changes are deployed to production
syn.setEndpoints(**synapseclient.client.STAGING_ENDPOINTS)

import uuid
proj = syn.store(Project(name=str(uuid.uuid4()) + "ExternalObjStoreProject"))

storage_location = syn._create_ExternalObjectStorageLocationSetting("https://s3.amazonaws.com", "test-client-auth-s3")
syn._set_container_storage_location(proj, storage_location['storageLocationId'])

file_path = utils.make_bogus_data_file()

file_entity = File(file_path, name="TestName", parent=proj)
file_entity = syn.store(file_entity)

syn.cache.purge(time.time())
file_entity_downloaded = syn.get(file_entity['id'])

assert_not_equal(utils.normalize_path(file_path), utils.normalize_path(file_entity_downloaded['path']))
assert filecmp.cmp(file_path, file_entity_downloaded['path'])
9 changes: 5 additions & 4 deletions tests/integration/integration_test_Entity.py
@@ -19,6 +19,7 @@
import synapseclient
from synapseclient import Activity, Project, Folder, File, Link, DockerRepository
from synapseclient.exceptions import *
from synapseclient.upload_functions import create_external_file_handle, upload_synapse_s3

import integration
from nose.tools import assert_false, assert_equals
@@ -150,7 +151,7 @@ def test_Entity():
# Upload a new File and verify
new_path = utils.make_bogus_data_file()
schedule_for_cleanup(new_path)
a_file = syn.uploadFile(a_file, new_path)
a_file = syn.upload_file(a_file, new_path)
a_file = syn.downloadEntity(a_file)
assert filecmp.cmp(new_path, a_file.path)
assert a_file.versionNumber == 2
@@ -522,8 +523,8 @@ def test_download_local_file_URL_path():
path = utils.make_bogus_data_file()
schedule_for_cleanup(path)

filehandle = syn._uploadToFileHandleService(path, synapseStore=False,
mimetype=None, fileSize=None)
filehandle = create_external_file_handle(syn, path,
mimetype=None, file_size=None)

localFileEntity = syn.store(File(dataFileHandleId=filehandle['id'], parent=project))
e = syn.get(localFileEntity.id)
@@ -542,7 +543,7 @@ def test_store_file_handle_update_metadata():
#create file handle to replace the old one
replacement_file_path = utils.make_bogus_data_file()
schedule_for_cleanup(replacement_file_path)
new_file_handle = syn._uploadToFileHandleService(replacement_file_path, synapseStore=True)
new_file_handle = upload_synapse_s3(syn, replacement_file_path)

entity.dataFileHandleId = new_file_handle['id']
new_entity = syn.store(entity)
3 changes: 2 additions & 1 deletion tests/integration/test_tables.py
@@ -26,6 +26,7 @@
from synapseclient.exceptions import *
from synapseclient import File, Folder, Schema, EntityViewSchema
from synapseclient.table import Column, RowSet, Row, as_table_columns, Table
from synapseclient.upload_functions import upload_synapse_s3

import integration
from integration import schedule_for_cleanup
@@ -462,7 +463,7 @@ def test_download_table_files():
path = utils.make_bogus_data_file()
original_files.append(path)
schedule_for_cleanup(path)
file_handle = syn._uploadToFileHandleService(path)
file_handle = upload_synapse_s3(syn, path)
row[4] = file_handle['id']

row_reference_set = syn.store(RowSet(columns=cols, schema=schema, rows=[Row(r) for r in data]))
3 changes: 2 additions & 1 deletion tests/integration/test_wikis.py
@@ -12,6 +12,7 @@
import synapseclient.utils as utils
from synapseclient.exceptions import *
from synapseclient import Project, File, Wiki, Activity, Evaluation
from synapseclient.upload_functions import upload_synapse_s3

import integration
from integration import schedule_for_cleanup
@@ -32,7 +33,7 @@ def test_wikiAttachment():
attachname = utils.make_bogus_data_file()
schedule_for_cleanup(filename)
schedule_for_cleanup(attachname)
fileHandle = syn._uploadToFileHandleService(filename)
fileHandle = upload_synapse_s3(syn, filename)

# Create and store a Wiki
# The constructor should accept both file handles and file paths
