diff --git a/ds3/ds3Helpers.py b/ds3/ds3Helpers.py
index e2165c9..fd1023a 100644
--- a/ds3/ds3Helpers.py
+++ b/ds3/ds3Helpers.py
@@ -150,7 +150,7 @@ def get_checksum_type(self, bucket_name: str) -> str:
         return policy_response.result['ChecksumType']
 
     def put_objects(self, put_objects: List[HelperPutObject], bucket: str, max_threads: int = 5,
-                    calculate_checksum: bool = False) -> str:
+                    calculate_checksum: bool = False, job_name: str = None) -> str:
         """
         Puts a list of objects to a Black Pearl bucket.
 
@@ -168,6 +168,8 @@ def put_objects(self, put_objects: List[HelperPutObject], bucket: str, max_threa
             if the client and BP checksums do not match. Note that calculating the checksum is processor intensive,
             and it also requires two reads of the object (first to calculate checksum, and secondly to send the data).
             The type of checksum calculated is determined by the data policy associated with the bucket.
+        job_name : str
+            The name to give the BP put job.
         """
         # If calculating checksum, then determine the checksum type from the data policy
         checksum_type = None
@@ -181,7 +183,7 @@ def put_objects(self, put_objects: List[HelperPutObject], bucket: str, max_threa
             put_objects_map[entry.object_name] = entry
 
         bulk_put = self.client.put_bulk_job_spectra_s3(
-            PutBulkJobSpectraS3Request(bucket_name=bucket, object_list=ds3_put_objects))
+            PutBulkJobSpectraS3Request(bucket_name=bucket, object_list=ds3_put_objects, name=job_name))
 
         job_id = bulk_put.result['JobId']
 
@@ -244,7 +246,8 @@ def put_blob(self, bucket: str, put_object: HelperPutObject, length: int, offset
         stream.close()
 
     def put_all_objects_in_directory(self, source_dir: str, bucket: str, objects_per_bp_job: int = 1000,
-                                     max_threads: int = 5, calculate_checksum: bool = False) -> List[str]:
+                                     max_threads: int = 5, calculate_checksum: bool = False,
+                                     job_name: str = None) -> List[str]:
         """
         Puts all files and subdirectories to a Black Pearl bucket.
 
@@ -267,6 +270,8 @@ def put_all_objects_in_directory(self, source_dir: str, bucket: str, objects_per
             and BP checksums do not match. Note that calculating the checksum is processor intensive, and it also
             requires two reads of the object (first to calculate checksum, and secondly to send the data).
             The type of checksum calculated is determined by the data policy associated with the bucket.
+        job_name : str
+            The name to give the BP put jobs. All BP jobs that are created will have the same name.
         """
         obj_list: List[HelperPutObject] = list()
         job_list: List[str] = list()
@@ -277,8 +282,8 @@ def put_all_objects_in_directory(self, source_dir: str, bucket: str, objects_per
                 size = os.path.getsize(obj_path)
                 obj_list.append(HelperPutObject(object_name=obj_name, file_path=obj_path, size=size))
                 if len(obj_list) >= objects_per_bp_job:
-                    job_list.append(self.put_objects(
-                        obj_list, bucket, max_threads=max_threads, calculate_checksum=calculate_checksum))
+                    job_list.append(self.put_objects(obj_list, bucket, max_threads=max_threads,
+                                                     calculate_checksum=calculate_checksum, job_name=job_name))
                     obj_list = []
 
             for name in dirs:
@@ -287,17 +292,18 @@ def put_all_objects_in_directory(self, source_dir: str, bucket: str, objects_per
                     path.join(path.normpath(path.relpath(path=dir_path, start=source_dir)), ""))
                 obj_list.append(HelperPutObject(object_name=dir_name, file_path=dir_path, size=0))
                 if len(obj_list) >= objects_per_bp_job:
-                    job_list.append(self.put_objects(
-                        obj_list, bucket, max_threads=max_threads, calculate_checksum=calculate_checksum))
+                    job_list.append(self.put_objects(obj_list, bucket, max_threads=max_threads,
+                                                     calculate_checksum=calculate_checksum, job_name=job_name))
                     obj_list = []
 
         if len(obj_list) > 0:
             job_list.append(self.put_objects(
-                obj_list, bucket, max_threads=max_threads, calculate_checksum=calculate_checksum))
+                obj_list, bucket, max_threads=max_threads, calculate_checksum=calculate_checksum, job_name=job_name))
 
         return job_list
 
-    def get_objects(self, get_objects: List[HelperGetObject], bucket: str, max_threads: int = 5) -> str:
+    def get_objects(self, get_objects: List[HelperGetObject], bucket: str, max_threads: int = 5,
+                    job_name: str = None) -> str:
         """
         Retrieves a list of objects from a Black Pearl bucket.
 
@@ -309,6 +315,8 @@ def get_objects(self, get_objects: List[HelperGetObject], bucket: str, max_threa
             The name of the bucket where the objects are being retrieved from.
         max_threads : int
             The number of concurrent objects being transferred at once (default 5).
+        job_name : str
+            The name to give the BP get job.
         """
         ds3_get_objects: List[Ds3GetObject] = []
         get_objects_map: Dict[str, HelperGetObject] = dict()
@@ -317,7 +325,8 @@ def get_objects(self, get_objects: List[HelperGetObject], bucket: str, max_threa
             get_objects_map[entry.object_name] = entry
 
         bulk_get = self.client.get_bulk_job_spectra_s3(GetBulkJobSpectraS3Request(bucket_name=bucket,
-                                                                                  object_list=ds3_get_objects))
+                                                                                  object_list=ds3_get_objects,
+                                                                                  name=job_name))
 
         job_id = bulk_get.result['JobId']
 
@@ -369,7 +378,7 @@ def get_blob(self, bucket: str, get_object: HelperGetObject, offset: int, job_id
         stream.close()
 
     def get_all_files_in_bucket(self, destination_dir: str, bucket: str, objects_per_bp_job: int = 1000,
-                                max_threads: int = 5) -> List[str]:
+                                max_threads: int = 5, job_name: str = None) -> List[str]:
         """
         Retrieves all objects from a Black Pearl bucket.
 
@@ -385,6 +394,8 @@ def get_all_files_in_bucket(self, destination_dir: str, bucket: str, objects_per
             This determines how many objects to bundle per BP job.
         max_threads : int
             The number of concurrent objects being transferred at once (default 5).
+        job_name : str
+            The name to give the BP get jobs. All BP jobs that are created will have the same name.
         """
         truncated: str = 'true'
         marker = ""
@@ -423,7 +434,8 @@ def get_all_files_in_bucket(self, destination_dir: str, bucket: str, objects_per
                     get_objects.append(HelperGetObject(object_name=object_name, destination_path=object_destination))
 
             if len(get_objects) > 0:
-                job_id = self.get_objects(get_objects=get_objects, bucket=bucket, max_threads=max_threads)
+                job_id = self.get_objects(get_objects=get_objects, bucket=bucket, max_threads=max_threads,
+                                          job_name=job_name)
                 job_ids.append(job_id)
 
             truncated = list_bucket.result['IsTruncated']
""" truncated: str = 'true' marker = "" @@ -423,7 +434,8 @@ def get_all_files_in_bucket(self, destination_dir: str, bucket: str, objects_per get_objects.append(HelperGetObject(object_name=object_name, destination_path=object_destination)) if len(get_objects) > 0: - job_id = self.get_objects(get_objects=get_objects, bucket=bucket, max_threads=max_threads) + job_id = self.get_objects(get_objects=get_objects, bucket=bucket, max_threads=max_threads, + job_name=job_name) job_ids.append(job_id) truncated = list_bucket.result['IsTruncated'] diff --git a/tests/helpersTests.py b/tests/helpersTests.py index f0b788f..400f87b 100644 --- a/tests/helpersTests.py +++ b/tests/helpersTests.py @@ -108,11 +108,12 @@ def test_put_and_get_objects(self): include_dirs=False) # create the BP helper and perform the put all objects call + job_name = "python test job" client = ds3.createClientFromEnv() client.put_bucket_spectra_s3(ds3.PutBucketSpectraS3Request(name=bucket)) helpers = ds3Helpers.Helper(client=client) - job_id = helpers.put_objects(bucket=bucket, put_objects=put_objects) + job_id = helpers.put_objects(bucket=bucket, put_objects=put_objects, job_name=job_name) self.assertNotEqual(job_id, "", "job id was returned") # verify all the files and directories are on the BP @@ -123,6 +124,10 @@ def test_put_and_get_objects(self): head_obj = client.head_object(ds3.HeadObjectRequest(bucket_name=bucket, object_name=put_object.object_name)) self.assertNotEqual(head_obj.result, "DOESNTEXIST") + # verify that the job was created with the desired name + get_job = client.get_job_spectra_s3(ds3.GetJobSpectraS3Request(job_id=job_id)) + self.assertEqual(get_job.result['Name'], job_name) + # retrieve the files from the BP destination = tempfile.TemporaryDirectory(prefix="ds3-python3-sdk-dst-") get_objects: List[ds3Helpers.HelperGetObject] = [] @@ -134,7 +139,7 @@ def test_put_and_get_objects(self): object_name_to_source[put_object.object_name] = put_object.file_path # perform the get objects call - job_id = helpers.get_objects(bucket=bucket, get_objects=get_objects) + job_id = helpers.get_objects(bucket=bucket, get_objects=get_objects, job_name=job_name) self.assertNotEqual(job_id, "", "job id was returned") for get_object in get_objects: @@ -147,6 +152,10 @@ def test_put_and_get_objects(self): original_file.close() retrieved_file.close() + # verify that the job was created with the desired name + get_job = client.get_job_spectra_s3(ds3.GetJobSpectraS3Request(job_id=job_id)) + self.assertEqual(get_job.result['Name'], job_name) + # cleanup source.cleanup() destination.cleanup() @@ -154,6 +163,7 @@ def test_put_and_get_objects(self): def test_put_and_get_all_objects_in_directory(self): bucket = f'ds3-python3-sdk-test-{uuid.uuid1()}' + job_name = "python test job" # create temporary directory with some files and subdirectories source = tempfile.TemporaryDirectory(prefix="ds3-python3-sdk-src-") @@ -173,7 +183,8 @@ def test_put_and_get_all_objects_in_directory(self): client.put_bucket(ds3.PutBucketRequest(bucket_name=bucket)) helpers = ds3Helpers.Helper(client=client) - job_ids = helpers.put_all_objects_in_directory(source_dir=source.name, bucket=bucket, objects_per_bp_job=10) + job_ids = helpers.put_all_objects_in_directory(source_dir=source.name, bucket=bucket, objects_per_bp_job=10, + job_name=job_name) self.assertGreaterEqual(len(job_ids), 1, "received at least one job id") # verify all the files and directories are on the BP @@ -181,11 +192,17 @@ def test_put_and_get_all_objects_in_directory(self): head_obj = 
@@ -154,6 +163,7 @@ def test_put_and_get_objects(self):
 
     def test_put_and_get_all_objects_in_directory(self):
         bucket = f'ds3-python3-sdk-test-{uuid.uuid1()}'
+        job_name = "python test job"
 
         # create temporary directory with some files and subdirectories
         source = tempfile.TemporaryDirectory(prefix="ds3-python3-sdk-src-")
@@ -173,7 +183,8 @@ def test_put_and_get_all_objects_in_directory(self):
         client.put_bucket(ds3.PutBucketRequest(bucket_name=bucket))
         helpers = ds3Helpers.Helper(client=client)
 
-        job_ids = helpers.put_all_objects_in_directory(source_dir=source.name, bucket=bucket, objects_per_bp_job=10)
+        job_ids = helpers.put_all_objects_in_directory(source_dir=source.name, bucket=bucket, objects_per_bp_job=10,
+                                                       job_name=job_name)
         self.assertGreaterEqual(len(job_ids), 1, "received at least one job id")
 
         # verify all the files and directories are on the BP
@@ -181,11 +192,17 @@ def test_put_and_get_all_objects_in_directory(self):
             head_obj = \
                 client.head_object(ds3.HeadObjectRequest(bucket_name=bucket, object_name=put_object.object_name))
             self.assertNotEqual(head_obj.result, "DOESNTEXIST")
 
+        # verify that all the jobs were created with the desired name
+        for job_id in job_ids:
+            get_job = client.get_job_spectra_s3(ds3.GetJobSpectraS3Request(job_id=job_id))
+            self.assertEqual(get_job.result['Name'], job_name)
+
         # retrieve the objects from the BP
         destination = tempfile.TemporaryDirectory(prefix="ds3-python3-sdk-dst-")
         job_ids = helpers.get_all_files_in_bucket(destination_dir=destination.name, bucket=bucket,
-                                                  objects_per_bp_job=10)
+                                                  objects_per_bp_job=10,
+                                                  job_name=job_name)
 
         self.assertGreaterEqual(len(job_ids), 2, "multiple job ids returned")
 
@@ -199,6 +216,11 @@ def test_put_and_get_all_objects_in_directory(self):
             self.assertTrue(os.path.isfile(obj_destination), f'expected path to be file: {obj_destination}')
             self.assertEqual(put_object.size, os.path.getsize(obj_destination), 'file size')
 
+        # verify that all the jobs were created with the desired name
+        for job_id in job_ids:
+            get_job = client.get_job_spectra_s3(ds3.GetJobSpectraS3Request(job_id=job_id))
+            self.assertEqual(get_job.result['Name'], job_name)
+
         # cleanup
         source.cleanup()
         destination.cleanup()
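Outside the test suite, the same read-back used in the assertions above can confirm that a name was applied. A minimal sketch, assuming job_id came from one of the helper calls and that the expected name (hypothetical here) matches what was passed in:

    from ds3 import ds3

    client = ds3.createClientFromEnv()

    # fetch the job by id and read back the name it was created with
    get_job = client.get_job_spectra_s3(ds3.GetJobSpectraS3Request(job_id=job_id))
    assert get_job.result['Name'] == "nightly archive"

Because job_name defaults to None on every helper, existing callers that omit it should keep the previous unnamed-job behavior.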