Merged
49 changes: 46 additions & 3 deletions ds3/ds3Helpers.py
@@ -10,6 +10,7 @@
# specific language governing permissions and limitations under the License.
import concurrent.futures
import hashlib
import logging
import time
import zlib
from os import walk, path
@@ -124,6 +125,14 @@ def calculate_checksum_header(object_data_stream, checksum_type: str, length: in
return {header_key: encoded_checksum}


def done_callback(future: concurrent.futures.Future):
try:
result = future.result()
logging.info(f'Finished transferring blob name={result[0]}, offset={result[1]}')
except Exception as ex:
logging.error(f'{ex}')


class Helper(object):
"""A class that moves data to and from a Black Pearl"""

@@ -186,6 +195,7 @@ def put_objects(self, put_objects: List[HelperPutObject], bucket: str, max_threa
PutBulkJobSpectraS3Request(bucket_name=bucket, object_list=ds3_put_objects, name=job_name))

job_id = bulk_put.result['JobId']
logging.info(f'Created put job {job_id} with {len(put_objects)} objects.')

blob_set: Set[Blob] = set()
for chunk in bulk_put.result['ObjectsList']:
@@ -197,6 +207,7 @@ def put_objects(self, put_objects: List[HelperPutObject], bucket: str, max_threa
blob_set.add(cur_blob)

# send until all blobs have been transferred
error_count: int = 0
while len(blob_set) > 0:
available_chunks = self.client.get_job_chunks_ready_for_client_processing_spectra_s3(
GetJobChunksReadyForClientProcessingSpectraS3Request(job_id))
@@ -208,6 +219,7 @@ def put_objects(self, put_objects: List[HelperPutObject], bucket: str, max_threa
continue

# retrieve all available blobs concurrently
futures: List[concurrent.futures.Future] = list()
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
for chunk in chunks:
for blob in chunk['ObjectList']:
@@ -220,8 +232,21 @@ def put_objects(self, put_objects: List[HelperPutObject], bucket: str, max_threa
blob_set.remove(cur_blob)
put_object = put_objects_map[cur_blob.name]

executor.submit(self.put_blob, bucket, put_object, cur_blob.length, cur_blob.offset, job_id,
checksum_type)
future = executor.submit(self.put_blob, bucket, put_object, cur_blob.length,
cur_blob.offset, job_id, checksum_type)
future.add_done_callback(done_callback)
futures.append(future)

# Wait for all futures to finish
concurrent.futures.wait(futures, return_when=concurrent.futures.ALL_COMPLETED)
for future in futures:
if future.exception() is not None:
error_count += 1

if error_count > 0:
logging.warning(f'Completed job {job_id} with {error_count} errors.')
else:
logging.info(f'Completed job {job_id} with no errors.')

return job_id
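
A hedged sketch (again outside the diff) of the wait-and-tally pattern this hunk adds around the submitted futures: `concurrent.futures.wait` blocks until every future completes, and `Future.exception()` returns the raised exception (or `None`) without re-raising, so failures can be counted and summarized once per job. The `flaky_transfer` worker below is invented for illustration.

```python
import concurrent.futures
import logging

logging.basicConfig(level=logging.INFO)


def flaky_transfer(i: int) -> int:
    # Hypothetical stand-in for put_blob/get_blob; fails on odd inputs.
    if i % 2:
        raise IOError(f'blob {i} failed')
    return i


futures = []
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    for i in range(6):
        futures.append(executor.submit(flaky_transfer, i))

# Block until every future has settled, then tally failures without re-raising:
# Future.exception() returns the worker's exception (or None) instead of throwing.
concurrent.futures.wait(futures, return_when=concurrent.futures.ALL_COMPLETED)
error_count = sum(1 for f in futures if f.exception() is not None)

if error_count > 0:
    logging.warning(f'Completed with {error_count} errors.')
else:
    logging.info('Completed with no errors.')
```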

@@ -244,6 +269,7 @@ def put_blob(self, bucket: str, put_object: HelperPutObject, length: int, offset
job=job_id,
headers=headers))
stream.close()
return put_object.object_name, offset

def put_all_objects_in_directory(self, source_dir: str, bucket: str, objects_per_bp_job: int = 1000,
max_threads: int = 5, calculate_checksum: bool = False,
@@ -329,6 +355,7 @@ def get_objects(self, get_objects: List[HelperGetObject], bucket: str, max_threa
name=job_name))

job_id = bulk_get.result['JobId']
logging.info(f'Created get job {job_id} with {len(get_objects)} objects.')

blob_set: Set[Blob] = set()
for chunk in bulk_get.result['ObjectsList']:
@@ -340,6 +367,7 @@ def get_objects(self, get_objects: List[HelperGetObject], bucket: str, max_threa
blob_set.add(cur_blob)

# retrieve until all blobs have been transferred
error_count: int = 0
while len(blob_set) > 0:
available_chunks = self.client.get_job_chunks_ready_for_client_processing_spectra_s3(
GetJobChunksReadyForClientProcessingSpectraS3Request(job_id))
@@ -351,6 +379,7 @@ def get_objects(self, get_objects: List[HelperGetObject], bucket: str, max_threa
continue

# retrieve all available blobs concurrently
futures: List[concurrent.futures.Future] = list()
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
for chunk in chunks:
for blob in chunk['ObjectList']:
@@ -363,7 +392,20 @@ def get_objects(self, get_objects: List[HelperGetObject], bucket: str, max_threa
blob_set.remove(cur_blob)
get_object = get_objects_map[cur_blob.name]

executor.submit(self.get_blob, bucket, get_object, offset, job_id)
future = executor.submit(self.get_blob, bucket, get_object, offset, job_id)
future.add_done_callback(done_callback)
futures.append(future)

# Wait for all futures to finish
concurrent.futures.wait(futures, return_when=concurrent.futures.ALL_COMPLETED)
for future in futures:
if future.exception() is not None:
error_count += 1

if error_count > 0:
logging.warning(f'Completed job {job_id} with {error_count} errors.')
else:
logging.info(f'Completed job {job_id} with no errors.')

return job_id

@@ -376,6 +418,7 @@ def get_blob(self, bucket: str, get_object: HelperGetObject, offset: int, job_id
job=job_id,
version_id=get_object.version_id))
stream.close()
return get_object.object_name, offset

def get_all_files_in_bucket(self, destination_dir: str, bucket: str, objects_per_bp_job: int = 1000,
max_threads: int = 5, job_name: str = None) -> List[str]:
5 changes: 3 additions & 2 deletions tests/helpersTests.py
@@ -8,7 +8,7 @@
# This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
# CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.

import logging
import unittest
import os
import tempfile
@@ -21,6 +21,8 @@

import xml.etree.ElementTree as xmlDom

logging.basicConfig(level=logging.INFO)


def create_files_in_directory(directory: str, num_files: int, root_dir: str,
include_dirs: bool = True) -> List[ds3Helpers.HelperPutObject]:
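
A brief aside on why the `logging.basicConfig` line above matters (a sketch, not part of the diff): Python's root logger defaults to the WARNING level, so the `logging.info` calls added in ds3Helpers would otherwise be dropped when running the tests. Something like the following (the format string is just an illustrative choice) makes them visible:

```python
import logging

# Without a basicConfig call, the helper's logging.info messages are discarded
# because the root logger defaults to WARNING.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s %(levelname)s %(message)s')
```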
@@ -253,7 +255,6 @@ def put_all_objects_in_directory_with_checksum(self, checksum_type: str):
# fetch existing storage domain
storage_domain = client.get_storage_domains_spectra_s3(ds3.GetStorageDomainsSpectraS3Request())
storage_domain_id = storage_domain.result['StorageDomainList'][0]['Id']
print("test")

data_persistence_rule = client.put_data_persistence_rule_spectra_s3(
ds3.PutDataPersistenceRuleSpectraS3Request(data_policy_id=data_policy_id, isolation_level='standard',