diff --git a/loomengine/utils/filemanager.py b/loomengine/utils/filemanager.py
index e358f81d..7d1b7ed6 100644
--- a/loomengine/utils/filemanager.py
+++ b/loomengine/utils/filemanager.py
@@ -1,4 +1,3 @@
-import abc
 import copy
 import errno
 import glob
@@ -24,6 +23,10 @@
 import apiclient.discovery
 
 
+class Md5ValidationError(Exception):
+    pass
+
+
 logger = logging.getLogger(__name__)
 
 def _urlparse(pattern):
@@ -31,92 +34,90 @@ def _urlparse(pattern):
     """
     url = urlparse.urlparse(pattern)
    if not url.scheme:
-        url = urlparse.urlparse('file://' + os.path.abspath(os.path.expanduser(pattern)))
+        url = urlparse.urlparse(
+            'file://' + os.path.abspath(os.path.expanduser(pattern)))
     return url
 
 
-def SourceSet(pattern, settings, retry=False):
-    """Factory Method that returns a set of Sources matching the given pattern.
-    Each Source represents one source file to be copied.
+def FileSet(pattern, settings, retry=False):
+    """Factory Method that returns a set of Files matching the given pattern.
     """
     url = _urlparse(pattern)
     if url.scheme == 'gs':
-        return GoogleStorageSourceSet(pattern, settings, retry=retry)
+        return GoogleStorageFileSet(pattern, settings, retry=retry)
     elif url.scheme == 'file':
         if url.hostname == 'localhost' or url.hostname is None:
-            return LocalSourceSet(pattern, settings, retry=retry)
+            return LocalFileSet(pattern, settings, retry=retry)
         else:
-            raise Exception("Cannot process file pattern %s. Remote file hosts not supported." % pattern)
+            raise Exception('Cannot process file pattern %s. '\
+                            'Remote file hosts not supported.' % pattern)
     else:
         raise Exception('Cannot recognize file scheme in "%s". '\
-                        'Make sure the pattern starts with a supported protocol like gs:// or file://'
+                        'Make sure the pattern starts with a '\
+                        'supported protocol like gs:// or file://'
                         % pattern)
 
 
-class AbstractSourceSet:
-    """Creates an iterable set of Sources that match the given pattern.
+class AbstractFileSet:
+    """Creates an iterable set of Files that match the given pattern.
     Pattern may include wildcards.
""" - __metaclass__ = abc.ABCMeta - - @abc.abstractmethod def __init__(self, pattern, settings, retry=False): - pass + raise Exception('Child class must override this method') - @abc.abstractmethod def __iter__(self): - pass + raise Exception('Child class must override this method') -class LocalSourceSet(AbstractSourceSet): - """A set of source files on local storage +class LocalFileSet(AbstractFileSet): + """A set of Files for files on local storage """ def __init__(self, pattern, settings, retry=False): # retry has no effect url = _urlparse(pattern) matches = self._get_matching_files(url.path) - self.sources = [LocalSource('file://' + path, settings, retry=retry) + self.files = [LocalFile('file://' + path, settings, retry=retry) for path in matches] def __iter__(self): - return self.sources.__iter__() + return self.files.__iter__() def _get_matching_files(self, path): all_matches = glob.glob(path) return self._remove_directories(all_matches) def _remove_directories(self, all_matches): - return filter(lambda x: os.path.isfile(x), all_matches) + return filter(lambda path: os.path.isfile(path), all_matches) -class GoogleStorageSourceSet(AbstractSourceSet): - """A set of source files on Google Storage +class GoogleStorageFileSet(AbstractFileSet): + """A set of Files for files on Google Storage """ def __init__(self, pattern, settings, retry=False): self.settings = settings - self.sources = [GoogleStorageSource(pattern, settings, retry=retry)] + self.files = [GoogleStorageFile(pattern, settings, retry=retry)] def __iter__(self): - return self.sources.__iter__() + return self.files.__iter__() # TODO support wildcards with multiple matches -def Source(url, settings, retry=False): +def File(url, settings, retry=False): """Factory method """ parsed_url = _urlparse(url) if parsed_url.scheme == 'gs': - return GoogleStorageSource(url, settings, retry=retry) + return GoogleStorageFile(url, settings, retry=retry) elif parsed_url.scheme == 'file': if parsed_url.hostname == 'localhost' or parsed_url.hostname is None: - return LocalSource(url, settings, retry=retry) + return LocalFile(url, settings, retry=retry) else: raise Exception( "Cannot process file url %s. Remote file hosts not supported." @@ -126,36 +127,63 @@ def Source(url, settings, retry=False): % (parsed_url.scheme, url)) -class AbstractSource: - """A Source represents a single file to be copied. +class AbstractFile: + """A File represents a single file that to be copied to or from, + or otherwise manipulated. """ - __metaclass__ = abc.ABCMeta - - @abc.abstractmethod def __init__(self, url, settings, retry=False): - pass + raise Exception('Child class must override this method') + def copy_to(self, destination, expected_md5=None): + if self.retry or destination.retry: + tries_remaining = 2 + else: + tries_remaining = 1 - def copy_to(self, destination): - copier = Copier(self, destination) - copier.copy() + while True: + try: + copier = Copier(self, destination) + copier.copy() + destination.verify_md5(expected_md5) + break + except Md5ValidationError as e: + logger.info('Copied file did not have the expected md5. 
+                            '%s retries remaining' % tries_remaining)
+                if tries_remaining == 0:
+                    raise
+                tries_remaining -= 1
+                destination.delete()
+
+    def verify_md5(self, expected_md5):
+        if expected_md5:
+            md5 = self.calculate_md5()
+            if md5 != expected_md5:
+                raise Md5ValidationError(
+                    'Expected md5 "%s" for file "%s", but found md5 "%s"'
+                    % (expected_md5, self.get_url(), md5))
 
-    @abc.abstractmethod
     def calculate_md5(self):
-        pass
+        raise Exception('Child class must override this method')
 
-    @abc.abstractmethod
     def get_url(self):
-        pass
+        raise Exception('Child class must override this method')
+
+    def get_path(self):
+        raise Exception('Child class must override this method')
 
-    @abc.abstractmethod
     def get_filename(self):
-        pass
+        raise Exception('Child class must override this method')
+
+    def exists(self):
+        raise Exception('Child class must override this method')
+
+    def is_dir(self):
+        raise Exception('Child class must override this method')
+
+    def read(self):
+        raise Exception('Child class must override this method')
+
+    def write(self, content):
+        raise Exception('Child class must override this method')
+
+    def delete(self):
+        raise Exception('Child class must override this method')
 
 
-class LocalSource(AbstractSource):
-    """A source file on local storage.
+class LocalFile(AbstractFile):
+    """For files saved on local storage.
     """
 
     type = 'local'
 
@@ -176,16 +204,26 @@ def get_path(self):
 
     def get_filename(self):
         return os.path.basename(self.get_path())
 
+    def exists(self):
+        return os.path.exists(self.get_path())
+
+    def is_dir(self):
+        return os.path.isdir(self.get_path())
+
     def read(self):
         with open(self.get_path()) as f:
             return f.read()
 
+    def write(self, content):
+        with open(self.get_path(), 'w') as f:
+            f.write(content)
+
     def delete(self):
         os.remove(self.get_path())
 
 
-class GoogleStorageSource(AbstractSource):
-    """A source file on Google Storage.
+class GoogleStorageFile(AbstractFile):
+    """For files saved on Google Storage.
     """
 
     type = 'google_storage'
 
@@ -194,6 +232,7 @@ class GoogleStorageSource(AbstractSource):
     CHUNK_SIZE = 1024*1024*100
 
     def __init__(self, url, settings, retry=False):
+        self.settings = settings
         self.url = _urlparse(url)
         self.retry = retry
         assert self.url.scheme == 'gs'
@@ -201,9 +240,6 @@ def __init__(self, url, settings, retry=False):
         self.blob_id = self.url.path.lstrip('/')
         if not self.bucket_id or not self.blob_id:
             raise Exception('Could not parse url "%s". Be sure to use the format "gs://bucket/blob_id".' % url)
-
-        self.settings = settings
-
         try:
             self.client = execute_with_retries(
                 lambda: google.cloud.storage.client.Client(
@@ -233,10 +269,12 @@ def __init__(self, url, settings, retry=False):
                 self.bucket = self.client.get_bucket(self.bucket_id)
                 self.blob = self.bucket.get_blob(self.blob_id)
         except HttpAccessTokenRefreshError:
-            raise Exception('Failed to access bucket "%s". Are you logged in? Try "gcloud auth login"' % self.bucket_id)
+            raise Exception(
+                'Failed to access bucket "%s". Are you logged in? '\
+                'Try "gcloud auth login"' % self.bucket_id)
 
         if self.blob is None:
-            raise Exception('Could not find file %s'
-                            % (self.url.geturl()))
+            # The blob may not exist yet, e.g. when this File is a copy
+            # destination
+            self.blob = google.cloud.storage.blob.Blob(
+                self.blob_id, self.bucket, chunk_size=self.CHUNK_SIZE)
         if self.blob.size > self.CHUNK_SIZE:
             self.blob.chunk_size = self.CHUNK_SIZE
 
@@ -251,145 +289,32 @@ def get_url(self):
 
     def get_filename(self):
         return os.path.basename(self.blob_id)
 
+    def exists(self):
+        return self.blob.exists()
+
+    def is_dir(self):
+        # No dirs in Google Storage, just blobs.
+        # Call it a dir if it ends in /
+        return self.url.geturl().endswith('/')
+
     def read(self):
         tempdir = tempfile.mkdtemp()
         dest_file = os.path.join(tempdir, self.get_filename())
-        self.copy_to(Destination(dest_file, self.settings, retry=retry))
+        self.copy_to(File(dest_file, self.settings, retry=self.retry))
         with open(dest_file) as f:
             text = f.read()
         os.remove(dest_file)
         os.rmdir(tempdir)
         return text
 
-    def delete(self):
-        self.blob.delete()
-
-def Destination(url, settings, retry=False):
-    """Factory method
-    """
-    parsed_url = _urlparse(url)
-
-    if parsed_url.scheme == 'gs':
-        return GoogleStorageDestination(url, settings, retry=retry)
-    elif parsed_url.scheme == 'file':
-        if parsed_url.hostname == 'localhost' or parsed_url.hostname is None:
-            return LocalDestination(url, settings, retry=retry)
-        else:
-            raise Exception("Cannot process file url %s. Remote file hosts not supported." % url)
-    else:
-        raise Exception('Unsupported scheme "%s" in file "%s"' % (parsed_url.scheme, url))
-
-
-class AbstractDestination:
-    """A Destination represents a path or directory to which a file will be copied
-    """
-
-    __metaclass__ = abc.ABCMeta
-
-    @abc.abstractmethod
-    def __init__(self, url, settings, retry=False):
-        pass
-
-    @abc.abstractmethod
-    def get_url(self):
-        pass
-
-    @abc.abstractmethod
-    def exists(self):
-        pass
-
-    @abc.abstractmethod
-    def is_dir(self):
-        pass
-
-    @abc.abstractmethod
-    def write(self, content):
-        pass
-
-class LocalDestination(AbstractDestination):
-
-    type = 'local'
-
-    def __init__(self, url, settings, retry=False):
-        self.url = _urlparse(url)
-        self.settings = settings
-        self.retry = retry
-
-    def get_path(self):
-        return os.path.abspath(os.path.expanduser(self.url.path))
-
-    def get_url(self):
-        return 'file://'+ self.get_path()
-
-    def exists(self):
-        return os.path.exists(self.get_path())
-
-    def is_dir(self):
-        return os.path.isdir(self.get_path())
-
-    def write(self, content):
-        with open(self.get_path(), 'w') as f:
-            f.write(content)
-
-
-class GoogleStorageDestination(AbstractDestination):
-
-    type = 'google_storage'
-
-    CHUNK_SIZE = 1024*1024*100
-
-    def __init__(self, url, settings, retry=False):
-        self.settings = settings
-        self.url = _urlparse(url)
-        self.retry = retry
-        assert self.url.scheme == 'gs'
-        self.bucket_id = self.url.hostname
-        self.blob_id = self.url.path.lstrip('/')
-        self.client = execute_with_retries(
-            lambda: google.cloud.storage.client.Client(
-                self.settings['GCE_PROJECT']),
-            (Exception,),
-            logger,
-            'Get client')
-        try:
-            if self.retry:
-                self.bucket = execute_with_retries(
-                    lambda: self.client.get_bucket(self.bucket_id),
-                    (Exception,),
-                    logger,
-                    'Get bucket')
-                self.blob = execute_with_retries(
-                    lambda: self.bucket.get_blob(self.blob_id),
-                    (Exception,),
-                    logger,
-                    'Get blob')
-            else:
-                self.bucket = self.client.get_bucket(self.bucket_id)
-                self.blob = self.bucket.get_blob(self.blob_id)
-            if self.blob:
-                self.blob.chunk_size = self.CHUNK_SIZE
-        except HttpAccessTokenRefreshError:
-            raise Exception('Failed to access bucket "%s". Are you logged in? Try "gcloud auth login"' % self.bucket_id)
-        if self.blob is None:
-            self.blob = google.cloud.storage.blob.Blob(self.blob_id, self.bucket, chunk_size=self.CHUNK_SIZE)
-
-    def get_url(self):
-        return self.url.geturl()
-
-    def exists(self):
-        return self.blob.exists()
-
-    def is_dir(self):
-        # No dirs in Google Storage, just blobs.
-        # Call it a dir if it ends in /
-        self.url.geturl().endswith('/')
 
     def write(self, content):
         with tempfile.NamedTemporaryFile('w') as f:
             f.write(content)
             f.flush()
-            Source(f.name, self.settings, retry=self.retry).copy_to(self)
+            File(f.name, self.settings, retry=self.retry).copy_to(self)
+
+    def delete(self):
+        self.blob.delete()
 
 
 def Copier(source, destination):
@@ -405,21 +330,20 @@
     elif source.type == 'google_storage' and destination.type == 'google_storage':
         return GoogleStorageCopier(source, destination)
     else:
-        raise Exception('Could not find method to copy from source "%s" to destination "%s".'
-                        % (source, destination))
+        raise Exception('Could not find method to copy from source '\
+                        '"%s" to destination "%s".' % (source, destination))
 
 
 class AbstractCopier:
-    __metaclass__ = abc.ABCMeta
 
-    def __init__(self, source, destination, retry=False):
+    def __init__(self, source, destination, retry=False, expected_md5=None):
         self.source = source
         self.destination = destination
+        self.expected_md5 = expected_md5
+        self.retry = self.source.retry or self.destination.retry
 
-    @abc.abstractmethod
     def copy(self, path):
-        pass
-
+        raise Exception('Child class must override this method')
+
 
 class LocalCopier(AbstractCopier):
@@ -438,10 +362,9 @@ def copy(self):
 class GoogleStorageCopier(AbstractCopier):
 
     def copy(self):
-        # retry has no effect
         rewrite_token = None
         while True:
-            if self.source.retry or self.destination.retry:
+            if self.retry:
                 rewrite_token, rewritten, size = execute_with_retries(
                     lambda: self.destination.blob.rewrite(
                         self.source.blob, token=rewrite_token),
@@ -460,7 +383,7 @@ def copy(self):
 class Local2GoogleStorageCopier(AbstractCopier):
 
     def copy(self):
-        if self.source.retry or self.destination.retry:
+        if self.retry:
             execute_with_retries(
                 lambda: self.destination.blob.upload_from_filename(
                     self.source.get_path()),
@@ -484,7 +407,7 @@ def copy(self):
                     'Failed to create local directory "%s": "%s"'
                     % (os.path.dirname(self.destination.get_path()), e))
 
-        if self.source.retry or self.destination.retry:
+        if self.retry:
             execute_with_retries(
                 lambda: self.source.blob.download_to_filename(
                     self.destination.get_path()),
@@ -505,7 +428,7 @@ def __init__(self, master_url):
 
     def import_from_patterns(self, patterns, comments, original_copy=False,
                              force_duplicates=False, retry=False):
-
+        files = []
         for pattern in patterns:
             files.extend(self.import_from_pattern(
@@ -516,7 +439,7 @@ def import_from_pattern(self, pattern, comments, original_copy=False,
                             force_duplicates=False, retry=False):
         files = []
-        for source in SourceSet(pattern, self.settings, retry=retry):
+        for source in FileSet(pattern, self.settings, retry=retry):
             files.append(self.import_file(
                 source.get_url(),
                 comments,
@@ -528,7 +451,7 @@ def import_file(self, source_url, comments, original_copy=False,
                     force_duplicates=False, retry=False):
-        source = Source(source_url, self.settings, retry=retry)
+        source = File(source_url, self.settings, retry=retry)
         try:
             if original_copy:
                 data_object = self._create_file_data_object_from_original_copy(
@@ -615,7 +538,7 @@ def _verify_no_file_duplicates(self, filename, md5):
 
     def import_result_file(self, task_attempt_output, source_url, retry=False):
         logger.info('Calculating md5 on file "%s"...' % source_url)
-        source = Source(source_url, self.settings, retry=retry)
+        source = File(source_url, self.settings, retry=retry)
         md5 = source.calculate_md5()
         task_attempt_output = self._create_task_attempt_output_file(
             task_attempt_output, md5, source.get_filename())
@@ -643,7 +566,7 @@ def import_result_file_list(self, task_attempt_output, source_url_list,
         filename_list = []
         for source_url in source_url_list:
             logger.info('Calculating md5 on file "%s"...' % source_url)
-            source = Source(source_url, self.settings, retry=retry)
+            source = File(source_url, self.settings, retry=retry)
             md5_list.append(source.calculate_md5())
             filename_list.append(source.get_filename())
         task_attempt_output = self._create_task_attempt_output_file_array(
@@ -651,7 +574,7 @@ def import_result_file_list(self, task_attempt_output, source_url_list,
         data_object_array = task_attempt_output['data']['contents']
         imported_data_objects = []
         for (source_url, data_object) in zip(source_url_list, data_object_array):
-            source = Source(source_url, self.settings, retry=retry)
+            source = File(source_url, self.settings, retry=retry)
             imported_data_objects.append(
                 self._execute_file_import(data_object, source, retry=retry))
         return imported_data_objects
@@ -681,7 +604,7 @@ def import_log_file(self, task_attempt, source_url, retry=False):
             task_attempt['uuid'],
             {'log_name': log_name})
         logger.info('Calculating md5 on file "%s"...' % source_url)
-        source = Source(source_url, self.settings, retry=retry)
+        source = File(source_url, self.settings, retry=retry)
         md5 = source.calculate_md5()
 
         data_object = self.\
@@ -703,7 +626,7 @@ def _execute_file_import(self, file_data_object, source, retry=False):
             logger.info(' server already has the file. Skipping upload.')
             return file_data_object
         try:
-            destination = Destination(
+            destination = File(
                 file_data_object['value']['file_url'],
                 self.settings,
                 retry=retry)
@@ -739,7 +662,7 @@ def _set_upload_status(self, file_data_object, upload_status):
     def export_files(self, file_ids, destination_url=None, retry=False):
         if destination_url is not None and len(file_ids) > 1:
             # destination must be a directory
-            if not Destination(destination_url, self.settings, retry=retry).is_dir():
+            if not File(destination_url, self.settings, retry=retry).is_dir():
                 raise Exception(
                     'Destination must be a directory if multiple files '\
                     'are exported. "%s" is not a directory.'
@@ -762,7 +685,7 @@ def export_file(self, file_id, destination_url=None,
         if not destination_url:
             destination_url = os.getcwd()
 
-        if Destination(destination_url, self.settings, retry=retry).is_dir():
+        if File(destination_url, self.settings, retry=retry).is_dir():
             # Filename not given with destination_url. We get it from inputs
             # or from the object specified by file_id
             if not destination_filename:
@@ -777,7 +700,7 @@ def export_file(self, file_id, destination_url=None,
             # Filename is included with destination_url
             destination_file_url = destination_url
 
-        destination = Destination(destination_file_url, self.settings, retry=retry)
+        destination = File(destination_file_url, self.settings, retry=retry)
         if destination.exists():
             raise FileAlreadyExistsError('File already exists at %s'
                                          % destination_url)
@@ -787,17 +710,20 @@ def export_file(self, file_id, destination_url=None,
                 destination.get_url()))
 
         # Copy from the first file location
+        file_resource = data_object.get('value')
+        md5 = file_resource.get('md5')
         source_url = data_object['value']['file_url']
-        Source(source_url, self.settings, retry=retry).copy_to(destination)
+        File(source_url, self.settings, retry=retry).copy_to(
+            destination, expected_md5=md5)
 
         logger.info('...finished exporting file')
 
     def read_file(self, url, retry=False):
-        source = Source(url, self.settings, retry=retry)
+        source = File(url, self.settings, retry=retry)
         return source.read(), source.get_url()
 
     def write_to_file(self, url, content, retry=False):
-        destination = Destination(url, self.settings, retry=retry)
+        destination = File(url, self.settings, retry=retry)
         if destination.exists():
             raise FileAlreadyExistsError('File already exists at %s' % url)
         destination.write(content)
@@ -806,7 +732,7 @@ def normalize_url(self, url):
         return _urlparse(url).geturl()
 
     def calculate_md5(self, url, retry=False):
-        return Source(url, self.settings, retry=retry).calculate_md5()
+        return File(url, self.settings, retry=retry).calculate_md5()
 
     def verify_no_template_duplicates(self, template):
         md5 = template.get('md5')
@@ -834,7 +760,7 @@ def get_destination_file_url(self, requested_destination, default_name,
     """
         if requested_destination is None:
             destination = os.path.join(os.getcwd(), default_name)
-        elif Destination(requested_destination, self.settings, retry=retry).is_dir():
+        elif File(requested_destination, self.settings, retry=retry).is_dir():
             destination = os.path.join(requested_destination, default_name)
         else:
             # Don't modify a file destination specified by the user