From b4ec11d1eb0aa212d6fb20d361842bdd070933df Mon Sep 17 00:00:00 2001
From: Rachel Tucker
Date: Fri, 26 Mar 2021 15:59:20 -0600
Subject: [PATCH] PYTHONSDK-97: Add functionality to specify a directory in put
 data call. Implemented basic streaming strategy helpers. Fixed miscellaneous
 errors, including incorrectly performing a file path normalization on object
 names in bulk put and get request payloads.

---
 ds3/ds3.py            |   7 +-
 ds3/ds3Helpers.py     | 285 ++++++++++++++++++++++++++++++++++++++++++
 ds3/ds3network.py     |   6 +-
 tests/helpersTests.py | 209 +++++++++++++++++++++++++++
 4 files changed, 501 insertions(+), 6 deletions(-)
 create mode 100644 ds3/ds3Helpers.py
 create mode 100644 tests/helpersTests.py

diff --git a/ds3/ds3.py b/ds3/ds3.py
index f630399..c264e53 100755
--- a/ds3/ds3.py
+++ b/ds3/ds3.py
@@ -15,7 +15,6 @@
 import os
 
 from abc import ABCMeta
-import posixpath
 from .ds3network import *
 
 
@@ -58,7 +57,7 @@ def __init__(self, name, size):
 
     def to_xml(self):
         xml_object = xmldom.Element('Object')
-        xml_object.set('Name', posixpath.normpath(self.name))
+        xml_object.set('Name', self.name)
         xml_object.set('Size', str(self.size))
         return xml_object
 
@@ -72,7 +71,7 @@ def __init__(self, name, length=None, offset=None, version_id=None):
 
     def to_xml(self):
         xml_object = xmldom.Element('Object')
-        xml_object.set('Name', posixpath.normpath(self.name))
+        xml_object.set('Name', self.name)
         if self.length is not None:
             xml_object.set('Length', str(self.length))
         if self.offset is not None:
@@ -2489,7 +2488,7 @@ def __init__(self):
 
 
 def parseModel(root, model):
-    if root.tag is 'Data':
+    if root.tag == 'Data':
         children = list(root.iter())
         if not children:
             return None
diff --git a/ds3/ds3Helpers.py b/ds3/ds3Helpers.py
new file mode 100644
index 0000000..eed9f2b
--- /dev/null
+++ b/ds3/ds3Helpers.py
@@ -0,0 +1,285 @@
+# Copyright 2021 Spectra Logic Corporation. All Rights Reserved.
+# Licensed under the Apache License, Version 2.0 (the "License"). You may not use
+# this file except in compliance with the License. A copy of the License is located at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# or in the "license" file accompanying this file.
+# This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+# CONDITIONS OF ANY KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations under the License.
+
+import time
+import concurrent.futures
+from .ds3 import *
+from os import walk, path
+from typing import List, Set, Dict
+
+from platform import system
+
+
+class EmptyReader(object):
+    @staticmethod
+    def read(_):
+        return None
+
+    @staticmethod
+    def close():
+        return
+
+
+class Blob(object):
+    def __init__(self, name: str, length: int, offset: int):
+        self.name = name
+        self.length = length
+        self.offset = offset
+
+    def __eq__(self, other):
+        if self.name == other.name and self.length == other.length and self.offset == other.offset:
+            return True
+        else:
+            return False
+
+    def __hash__(self):
+        return hash((self.name, self.length, self.offset))
+
+
+class HelperPutObject(object):
+    def __init__(self, object_name: str, file_path: str, size: int):
+        self.object_name = object_name
+        self.file_path = file_path
+        self.size = size
+
+    def get_data_stream(self, offset: int):
+        if self.size == 0:
+            return EmptyReader()
+        data_stream = open(self.file_path, "rb")
+        data_stream.seek(offset, 0)
+        return data_stream
+
+
+class HelperGetObject(object):
+    def __init__(self, object_name: str, destination_path: str, version_id: str = None):
+        self.object_name = object_name
+        self.destination_path = destination_path
+        self.version_id = version_id
+
+    def get_data_stream(self, offset: int):
+        landing_dir = os.path.dirname(self.destination_path)
+        if not os.path.exists(landing_dir):
+            os.makedirs(name=landing_dir, exist_ok=True)
+
+        fd = os.open(self.destination_path, os.O_CREAT | os.O_WRONLY)
+        data_stream = os.fdopen(fd, 'wb')
+        data_stream.seek(offset, 0)
+        return data_stream
+
+
+def file_path_to_object_store_name(file_path: str) -> str:
+    if system().lower() == "windows":
+        return file_path.replace('\\', '/')
+    return file_path
+
+
+def object_name_to_file_path(object_name: str) -> str:
+    if system().lower() == "windows":
+        return object_name.replace('/', '\\')
+    return object_name
+
+
+class Helper(object):
+    def __init__(self, client: Client):
+        self.client = client
+
+    def put_objects(self, put_objects: List[HelperPutObject], bucket: str, max_threads: int = 5) -> str:
+        ds3_put_objects: List[Ds3PutObject] = []
+        put_objects_map: Dict[str, HelperPutObject] = dict()
+        for entry in put_objects:
+            ds3_put_objects.append(Ds3PutObject(name=entry.object_name, size=entry.size))
+            put_objects_map[entry.object_name] = entry
+
+        bulk_put = self.client.put_bulk_job_spectra_s3(
+            PutBulkJobSpectraS3Request(bucket_name=bucket, object_list=ds3_put_objects))
+
+        job_id = bulk_put.result['JobId']
+
+        blob_set: Set[Blob] = set()
+        for chunk in bulk_put.result['ObjectsList']:
+            for blob in chunk['ObjectList']:
+                name: str = blob['Name']
+                length: int = int(blob['Length'])
+                offset: int = int(blob['Offset'])
+                cur_blob = Blob(name=name, length=length, offset=offset)
+                blob_set.add(cur_blob)
+
+        # send until all blobs have been transferred
+        while len(blob_set) > 0:
+            available_chunks = self.client.get_job_chunks_ready_for_client_processing_spectra_s3(
+                GetJobChunksReadyForClientProcessingSpectraS3Request(job_id))
+
+            chunks = available_chunks.result['ObjectsList']
+
+            if len(chunks) <= 0:
+                time.sleep(available_chunks.retryAfter)
+                continue
+
+            # send all available blobs concurrently
+            with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
+                for chunk in chunks:
+                    for blob in chunk['ObjectList']:
+                        name: str = blob['Name']
+                        length: int = int(blob['Length'])
+                        offset: int = int(blob['Offset'])
+                        cur_blob = Blob(name=name, length=length, offset=offset)
+
+                        if cur_blob in blob_set:
+                            blob_set.remove(cur_blob)
+                            put_object = put_objects_map[cur_blob.name]
+
+                            executor.submit(self.put_blob, bucket, put_object, cur_blob.length, cur_blob.offset, job_id)
+
+        return job_id
+
+    def put_blob(self, bucket: str, put_object: HelperPutObject, length: int, offset: int, job_id: str):
+        stream = put_object.get_data_stream(offset)
+        self.client.put_object(PutObjectRequest(bucket_name=bucket,
+                                                object_name=put_object.object_name,
+                                                length=length,
+                                                stream=stream,
+                                                offset=offset,
+                                                job=job_id))
+        stream.close()
+
+    def put_all_objects_in_directory(self, source_dir: str, bucket: str, objects_per_bp_job: int = 1000,
+                                     max_threads: int = 5) -> List[str]:
+        obj_list: List[HelperPutObject] = list()
+        job_list: List[str] = list()
+        for root, dirs, files in walk(top=source_dir):
+            for name in files:
+                obj_path = path.join(root, name)
+                obj_name = file_path_to_object_store_name(path.normpath(path.relpath(path=obj_path, start=source_dir)))
+                size = os.path.getsize(obj_path)
+                obj_list.append(HelperPutObject(object_name=obj_name, file_path=obj_path, size=size))
+                if len(obj_list) >= objects_per_bp_job:
+                    job_list.append(self.put_objects(obj_list, bucket, max_threads=max_threads))
+                    obj_list = []
+
+            for name in dirs:
+                dir_path = path.join(root, name)
+                dir_name = file_path_to_object_store_name(
+                    path.join(path.normpath(path.relpath(path=dir_path, start=source_dir)), ""))
+                obj_list.append(HelperPutObject(object_name=dir_name, file_path=dir_path, size=0))
+                if len(obj_list) >= objects_per_bp_job:
+                    job_list.append(self.put_objects(obj_list, bucket, max_threads=max_threads))
+                    obj_list = []
+
+        if len(obj_list) > 0:
+            job_list.append(self.put_objects(obj_list, bucket, max_threads=max_threads))
+
+        return job_list
+
+    def get_objects(self, get_objects: List[HelperGetObject], bucket: str, max_threads: int = 5) -> str:
+        ds3_get_objects: List[Ds3GetObject] = []
+        get_objects_map: Dict[str, HelperGetObject] = dict()
+        for entry in get_objects:
+            ds3_get_objects.append(Ds3GetObject(name=entry.object_name, version_id=entry.version_id))
+            get_objects_map[entry.object_name] = entry
+
+        bulk_get = self.client.get_bulk_job_spectra_s3(GetBulkJobSpectraS3Request(bucket_name=bucket,
+                                                                                  object_list=ds3_get_objects))
+
+        job_id = bulk_get.result['JobId']
+
+        blob_set: Set[Blob] = set()
+        for chunk in bulk_get.result['ObjectsList']:
+            for blob in chunk['ObjectList']:
+                name: str = blob['Name']
+                length: int = int(blob['Length'])
+                offset: int = int(blob['Offset'])
+                cur_blob = Blob(name=name, length=length, offset=offset)
+                blob_set.add(cur_blob)
+
+        # retrieve until all blobs have been transferred
+        while len(blob_set) > 0:
+            available_chunks = self.client.get_job_chunks_ready_for_client_processing_spectra_s3(
+                GetJobChunksReadyForClientProcessingSpectraS3Request(job_id))
+
+            chunks = available_chunks.result['ObjectsList']
+
+            if len(chunks) <= 0:
+                time.sleep(available_chunks.retryAfter)
+                continue
+
+            # retrieve all available blobs concurrently
+            with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
+                for chunk in chunks:
+                    for blob in chunk['ObjectList']:
+                        name: str = blob['Name']
+                        length: int = int(blob['Length'])
+                        offset: int = int(blob['Offset'])
+                        cur_blob = Blob(name=name, length=length, offset=offset)
+
+                        if cur_blob in blob_set:
+                            blob_set.remove(cur_blob)
+                            get_object = get_objects_map[cur_blob.name]
+
+                            executor.submit(self.get_blob, bucket, get_object, offset, job_id)
+
+        return job_id
+
+    def get_blob(self, bucket: str, get_object: HelperGetObject, offset: int,
+                 job_id: str):
+        stream = get_object.get_data_stream(offset)
+        self.client.get_object(GetObjectRequest(bucket_name=bucket,
+                                                object_name=get_object.object_name,
+                                                stream=stream,
+                                                offset=offset,
+                                                job=job_id,
+                                                version_id=get_object.version_id))
+        stream.close()
+
+    def get_all_files_in_bucket(self, destination_dir: str, bucket: str, objects_per_bp_job: int = 1000,
+                                max_threads: int = 5) -> List[str]:
+        truncated: str = 'true'
+        marker = ""
+        job_ids: List[str] = []
+        while truncated.lower() == 'true':
+            list_bucket = self.client.get_bucket(GetBucketRequest(bucket_name=bucket,
+                                                                  max_keys=objects_per_bp_job,
+                                                                  versions=False,
+                                                                  marker=marker))
+
+            get_objects: List[HelperGetObject] = []
+            for bp_object in list_bucket.result['ContentsList']:
+                is_latest: str = bp_object['IsLatest']
+                if is_latest.lower() != 'true':
+                    # only retrieve the latest version of objects
+                    continue
+
+                object_name: str = bp_object["Key"]
+                object_destination = os.path.join(destination_dir, object_name_to_file_path(object_name))
+                if object_name.endswith('/'):
+                    os.makedirs(object_destination, exist_ok=True)
+                else:
+                    get_objects.append(HelperGetObject(object_name=object_name, destination_path=object_destination))
+
+            for bp_object in list_bucket.result['VersionList']:
+                is_latest: str = bp_object['IsLatest']
+                if is_latest.lower() != 'true':
+                    # only retrieve the latest version of objects
+                    continue
+
+                object_name: str = bp_object["Key"]
+                object_destination = os.path.join(destination_dir, object_name_to_file_path(object_name))
+                if object_name.endswith('/'):
+                    os.makedirs(object_destination, exist_ok=True)
+                else:
+                    get_objects.append(HelperGetObject(object_name=object_name, destination_path=object_destination))
+
+            if len(get_objects) > 0:
+                job_id = self.get_objects(get_objects=get_objects, bucket=bucket, max_threads=max_threads)
+                job_ids.append(job_id)
+
+            truncated = list_bucket.result['IsTruncated']
+            marker = list_bucket.result['NextMarker']
+
+        return job_ids
diff --git a/ds3/ds3network.py b/ds3/ds3network.py
index addb292..1c8d2f5 100644
--- a/ds3/ds3network.py
+++ b/ds3/ds3network.py
@@ -217,7 +217,7 @@ def send_request(self, request):
 
         headers.update(amz_headers)
 
-        if request.body is not None and request.body is not "":
+        if request.body is not None and request.body != "":
             canonicalized_amz_header = self.canonicalized_amz_headers(amz_headers)
             headers['Content-Type'] = 'application/octet-stream'
             headers['Authorization'] = self.build_authorization(verb=request.http_verb,
@@ -261,6 +261,8 @@ def canonicalize_path(self, request_path, query_params):
             path += '?delete'
         if 'versioning' in query_params:
             path += '?versioning=' + str(query_params['versioning'])
+        if 'versions' in query_params:
+            path += '?versions=' + str(query_params['versions'])
         if 'uploads' in query_params:
             path += '?uploads'
             if query_params['uploads'] is not None:
@@ -289,7 +291,7 @@ def sign(self, key, contents):
         signer = hmac.new(key.encode('utf-8'), digestmod=sha1)
         signer.update(contents.encode('utf-8'))
         digest = signer.digest()
-        return base64.encodestring(digest).strip().decode('utf-8')
+        return base64.encodebytes(digest).strip().decode('utf-8')
 
     def normalize_string(self, url):
         return urllib.parse.quote(url)
diff --git a/tests/helpersTests.py b/tests/helpersTests.py
new file mode 100644
index 0000000..6dff2fd
--- /dev/null
+++ b/tests/helpersTests.py
@@ -0,0 +1,209 @@
+# Copyright 2021 Spectra Logic Corporation. All Rights Reserved.
+# Licensed under the Apache License, Version 2.0 (the "License"). You may not use
+# this file except in compliance with the License. A copy of the License is located at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# or in the "license" file accompanying this file.
+# This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+# CONDITIONS OF ANY KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations under the License.
+
+import unittest
+import os
+import tempfile
+import uuid
+import concurrent.futures
+
+from ds3 import ds3
+from ds3 import ds3Helpers
+from typing import List, Dict
+
+import xml.etree.ElementTree as xmlDom
+
+
+def create_files_in_directory(directory: str, num_files: int, root_dir: str,
+                              include_dirs: bool = True) -> List[ds3Helpers.HelperPutObject]:
+    put_objects = []
+    # create the directory if it doesn't exist
+    if not os.path.exists(directory):
+        os.mkdir(path=directory)
+        if include_dirs:
+            obj_name = ds3Helpers.file_path_to_object_store_name(os.path.join(os.path.relpath(directory, root_dir), ""))
+            put_objects.append(ds3Helpers.HelperPutObject(object_name=obj_name, file_path=directory, size=0))
+
+    # create an empty sub directory
+    if include_dirs:
+        dir_path = os.path.join(directory, 'empty-dir')
+        os.mkdir(path=dir_path)
+        obj_name = ds3Helpers.file_path_to_object_store_name(os.path.join(os.path.relpath(dir_path, root_dir), ""))
+        put_objects.append(ds3Helpers.HelperPutObject(object_name=obj_name, file_path=dir_path, size=0))
+
+    # create some files
+    for i in range(num_files):
+        file_path = os.path.join(directory, f'file-{i}.txt')
+        f = open(file_path, "a")
+        f.write(f'I am file number {i}')
+        f.close()
+
+        obj_name = ds3Helpers.file_path_to_object_store_name(os.path.relpath(file_path, root_dir))
+        size = os.path.getsize(file_path)
+        put_objects.append(ds3Helpers.HelperPutObject(object_name=obj_name, file_path=file_path, size=size))
+
+    return put_objects
+
+
+class Ds3HelpersTestCase(unittest.TestCase):
+    def test_file_path_to_object_store_name(self):
+        self.assertEqual(ds3Helpers.file_path_to_object_store_name(os.path.join("some", "dir", "")), 'some/dir/')
+        self.assertEqual(ds3Helpers.file_path_to_object_store_name(os.path.join("some", "file")), 'some/file')
+
+    def test_marshaling_put_object_list(self):
+        dir_obj = ds3.Ds3PutObject(name="dir-0/", size=0)
+        object_list: List[ds3.Ds3PutObject] = [dir_obj]
+        xml_object_list = ds3.Ds3PutObjectList(object_list)
+        to_xml = xml_object_list.to_xml()
+        result = xmlDom.tostring(to_xml)
+        self.assertEqual(result, b'<Objects><Object Name="dir-0/" Size="0" /></Objects>')
+
+    @staticmethod
+    def write_to_stream(i: int, char: str, get_object: ds3Helpers.HelperGetObject):
+        offset = i * 10
+        content = ''
+        for j in range(10):
+            content += char
+        stream = get_object.get_data_stream(offset)
+        stream.write(bytes(content, 'utf-8'))
+        stream.close()
+
+    def test_get_object_data_stream(self):
+        directory = tempfile.TemporaryDirectory(prefix="ds3-python3-sdk-")
+        file_path = os.path.join(directory.name, "sub-dir", "file.txt")
+
+        get_object = ds3Helpers.HelperGetObject(object_name="file.txt", destination_path=file_path)
+
+        inputs = ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i']
+        expected: str = ''
+        with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
+            for i in range(len(inputs)):
+                for j in range(10):
+                    expected += inputs[i]
+                executor.submit(self.write_to_stream, i, inputs[i], get_object)
+
+        file = open(file_path)
+        content = file.read()
+        self.assertEqual(expected, content)
+        file.close()
+
+        directory.cleanup()
+
+    def test_put_and_get_objects(self):
+        bucket = f'ds3-python3-sdk-test-{uuid.uuid1()}'
+
+        # create temporary directory with some files
+        source = tempfile.TemporaryDirectory(prefix="ds3-python3-sdk-src-")
+        put_objects = create_files_in_directory(directory=source.name,
+                                                num_files=10,
+                                                root_dir=source.name,
+                                                include_dirs=False)
+
+        # create the BP helper and perform the put all objects call
+        client = ds3.createClientFromEnv()
+        client.put_bucket_spectra_s3(ds3.PutBucketSpectraS3Request(name=bucket))
+
+        helpers = ds3Helpers.Helper(client=client)
+        job_id = helpers.put_objects(bucket=bucket, put_objects=put_objects)
+        self.assertNotEqual(job_id, "", "job id was returned")
+
+        # verify all the files and directories are on the BP
+        head_obj = client.head_object(ds3.HeadObjectRequest(bucket_name=bucket, object_name="does-not-exist"))
+        self.assertEqual(head_obj.result, "DOESNTEXIST")
+
+        for put_object in put_objects:
+            head_obj = client.head_object(ds3.HeadObjectRequest(bucket_name=bucket, object_name=put_object.object_name))
+            self.assertNotEqual(head_obj.result, "DOESNTEXIST")
+
+        # retrieve the files from the BP
+        destination = tempfile.TemporaryDirectory(prefix="ds3-python3-sdk-dst-")
+        get_objects: List[ds3Helpers.HelperGetObject] = []
+        object_name_to_source: Dict[str, str] = dict()
+        for put_object in put_objects:
+            destination_path = os.path.join(destination.name, put_object.object_name)
+            get_objects.append(
+                ds3Helpers.HelperGetObject(object_name=put_object.object_name, destination_path=destination_path))
+            object_name_to_source[put_object.object_name] = put_object.file_path
+
+        # perform the get objects call
+        job_id = helpers.get_objects(bucket=bucket, get_objects=get_objects)
+        self.assertNotEqual(job_id, "", "job id was returned")
+
+        for get_object in get_objects:
+            original_file = open(object_name_to_source[get_object.object_name], 'rb')
+            retrieved_file = open(get_object.destination_path, 'rb')
+
+            original_content = original_file.read()
+            retrieved_content = retrieved_file.read()
+            self.assertEqual(original_content, retrieved_content)
+            original_file.close()
+            retrieved_file.close()
+
+        # cleanup
+        source.cleanup()
+        destination.cleanup()
+        client.delete_bucket_spectra_s3(ds3.DeleteBucketSpectraS3Request(bucket_name=bucket, force=True))
+
+    def test_put_and_get_all_objects_in_directory(self):
+        bucket = f'ds3-python3-sdk-test-{uuid.uuid1()}'
+
+        # create temporary directory with some files and sub directories
+        source = tempfile.TemporaryDirectory(prefix="ds3-python3-sdk-src-")
+
+        put_objects = create_files_in_directory(directory=source.name, num_files=5, root_dir=source.name)
+        for i in range(2):
+            sub_dir_path = os.path.join(source.name, f'dir-{i}')
+            put_objects += create_files_in_directory(directory=sub_dir_path, num_files=2, root_dir=source.name)
+            for j in range(2):
+                sub_sub_dir_path = os.path.join(sub_dir_path, f'sub-dir-{j}')
+                put_objects += create_files_in_directory(directory=sub_sub_dir_path,
+                                                         num_files=2,
+                                                         root_dir=source.name)
+
+        # create the BP helper and perform the put all objects call
+        client = ds3.createClientFromEnv()
+        client.put_bucket(ds3.PutBucketRequest(bucket_name=bucket))
+
+        helpers = ds3Helpers.Helper(client=client)
+        job_ids = helpers.put_all_objects_in_directory(source_dir=source.name, bucket=bucket, objects_per_bp_job=10)
+        self.assertGreaterEqual(len(job_ids), 1, "received at least one job id")
+
+        # verify all the files and directories are on the BP
+        for put_object in put_objects:
+            head_obj = client.head_object(ds3.HeadObjectRequest(bucket_name=bucket, object_name=put_object.object_name))
+            self.assertNotEqual(head_obj.result, "DOESNTEXIST")
+
+        # retrieve the objects from the BP
+        destination = tempfile.TemporaryDirectory(prefix="ds3-python3-sdk-dst-")
+        job_ids = helpers.get_all_files_in_bucket(destination_dir=destination.name,
+                                                  bucket=bucket,
+                                                  objects_per_bp_job=10)
+
+        self.assertGreaterEqual(len(job_ids), 2, "multiple job ids returned")
+
+        # verify all the files and directories were retrieved
+        for put_object in put_objects:
+            obj_destination = os.path.join(destination.name,
+                                           ds3Helpers.object_name_to_file_path(put_object.object_name))
+            if put_object.object_name.endswith('/'):
+                self.assertTrue(os.path.isdir(obj_destination), f'expected path to be directory: {obj_destination}')
+            else:
+                self.assertTrue(os.path.isfile(obj_destination), f'expected path to be file: {obj_destination}')
+                self.assertEqual(put_object.size, os.path.getsize(obj_destination), 'file size')
+
+        # cleanup
+        source.cleanup()
+        destination.cleanup()
+        client.delete_bucket_spectra_s3(ds3.DeleteBucketSpectraS3Request(bucket_name=bucket, force=True))
+
+
+if __name__ == '__main__':
+    unittest.main()
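
For reference, a minimal usage sketch of the new ds3Helpers API added by this patch. The bucket name and local paths below are placeholders, and the client is assumed to be configured from the environment via createClientFromEnv, as in the tests above.

    from ds3 import ds3
    from ds3 import ds3Helpers

    # Build a client from the environment, as the tests above do, and wrap it in the helper.
    client = ds3.createClientFromEnv()
    helpers = ds3Helpers.Helper(client=client)

    # Archive a local directory tree; the helper batches objects into one or more
    # BlackPearl jobs (objects_per_bp_job per job) and returns their job ids.
    put_job_ids = helpers.put_all_objects_in_directory(source_dir='/tmp/to-archive', bucket='example-bucket')

    # Restore the latest version of every object in the bucket to a local directory.
    get_job_ids = helpers.get_all_files_in_bucket(destination_dir='/tmp/restored', bucket='example-bucket')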