diff --git a/samples/gettingData.py b/samples/gettingData.py
index 33ea638..6cebede 100644
--- a/samples/gettingData.py
+++ b/samples/gettingData.py
@@ -15,59 +15,97 @@
 from ds3 import ds3
 
-client = ds3.createClientFromEnv()
+# This example retrieves all objects in the specified bucket and lands them in the specified destination.
+# By default it looks for objects in bucket 'books' and lands them in the temporary directory.
+# At the end of the run, those files are removed from the local system for testing purposes.
+#
+# This example assumes that a bucket named "books" containing some objects exists on the server.
 
-bucketName = "books"
-# this example assumes that a bucket named "books" containing some objects exist on the server
+bucketName = "books"  # modify this value to match the BP bucket you wish to retrieve objects from
+destination = tempfile.gettempdir()  # modify this value to match where the objects should be landed on your system
+
+client = ds3.createClientFromEnv()
+
+# retrieves a list of all objects in the bucket
 bucketContents = client.get_bucket(ds3.GetBucketRequest(bucketName))
 
+# Convert that list of objects into a list of objects to retrieve.
+# If you want to retrieve a subset of objects, or already know their names, then just make a list of ds3.Ds3GetObject
+# where each item describes one object you wish to retrieve from the BP.
 objectList = list([ds3.Ds3GetObject(obj['Key']) for obj in bucketContents.result['ContentsList']])
 
+# Create a dictionary to map the BP object name to the destination where you're landing the object.
+# In this example, we are landing all objects at the path described in the destination variable.
+# Also, if the object name contains paths, this will normalize them for your OS and land that object
+# in a sub-folder of the destination.
+objectNameToDestinationPathMap = {}
+for obj in objectList:
+    objectDestination = os.path.join(destination, os.path.normpath(obj.name))
+    # Make sure any sub-folders implied by the object name exist before we open files there.
+    os.makedirs(os.path.dirname(objectDestination), exist_ok=True)
+    objectNameToDestinationPathMap[obj.name] = objectDestination
+
+# Create a bulk get job on the BP. This tells the BP what objects you're going to retrieve.
+# This triggers the BP to start staging the objects in cache.
+# Large objects may have been broken up into several pieces, i.e. blobs.
+# The BP breaks up your retrieval job into "chunks".
+# These chunks represent bundles of data that are ready to be retrieved.
+# Each chunk will contain one or more pieces of your files (blobs).
+# How the job will be broken up (chunked) is determined when you create the bulk get job.
 bulkGetResult = client.get_bulk_job_spectra_s3(ds3.GetBulkJobSpectraS3Request(bucketName, objectList))
 
-# create a set of the chunk ids which will be used to track
-# what chunks have not been retrieved
+# Create a set of the chunk ids that describe all units of work that make up the get job.
+# This will be used to track which chunks we still need to process.
 chunkIds = set([x['ChunkId'] for x in bulkGetResult.result['ObjectsList']])
 
-# create a dictionary to map our retrieved objects to temporary files
-# if you want to keep the retreived files on disk, this is not necessary
-tempFiles = {}
-
-# while we still have chunks to retrieve
+# Attempt to retrieve data from the BP while there are still chunks that need to be processed.
 while len(chunkIds) > 0:
-    # get a list of the available chunks that we can get
+    # Get a list of chunks for this job that are ready to be retrieved.
     availableChunks = client.get_job_chunks_ready_for_client_processing_spectra_s3(
         ds3.GetJobChunksReadyForClientProcessingSpectraS3Request(bulkGetResult.result['JobId']))
 
     chunks = availableChunks.result['ObjectsList']
 
-    # check to make sure we got some chunks, if we did not
-    # sleep and retry. This could mean that the cache is full
+    # Check to make sure we got some chunks; if we did not, sleep and retry.
+    # Having no chunks ready may indicate that the BP cache is currently full.
     if len(chunks) == 0:
         time.sleep(availableChunks.retryAfter)
         continue
 
-    # for each chunk that is available, check to make sure
-    # we have not gotten it, and if not, get that object
+    # For each chunk that is available, check to make sure we haven't processed it already.
+    # If we have not processed this chunk yet, then retrieve all its objects.
     for chunk in chunks:
         if not chunk['ChunkId'] in chunkIds:
+            # This chunk has already been processed
             continue
-        chunkIds.remove(chunk['ChunkId'])
+
+        # For each blob within this chunk, retrieve the data and land it on the destination.
         for obj in chunk['ObjectList']:
-            # if we haven't create a temporary file for this object yet, create one
-            if obj['Name'] not in list(tempFiles.keys()):
-                tempFiles[obj['Name']] = tempfile.mkstemp()
+            # Open the destination file and seek to the offset corresponding with this blob.
+            # Re-open in "r+b" once the file exists so a later blob does not truncate
+            # data already written by an earlier blob of the same object.
+            destinationPath = objectNameToDestinationPathMap[obj['Name']]
+            objectStream = open(destinationPath, "r+b" if os.path.exists(destinationPath) else "wb")
+            objectStream.seek(int(obj['Offset']))
 
-            # get the object
-            objectStream = open(tempFiles[obj['Name']][1], "wb")
+            # Get the blob for the current object and write it to the destination.
             client.get_object(ds3.GetObjectRequest(bucketName,
                                                    obj['Name'],
                                                    objectStream,
                                                    offset=int(obj['Offset']),
                                                    job=bulkGetResult.result['JobId']))
 
-# iterate over the temporary files, printing out their names, then closing and and removing them
-for objName in list(tempFiles.keys()):
-    print(objName)
-    os.close(tempFiles[objName][0])
-    os.remove(tempFiles[objName][1])
+            # Close the file handle.
+            objectStream.close()
+
+        # We've finished processing this chunk. Remove it from our list of chunks that still need processing.
+        chunkIds.remove(chunk['ChunkId'])
+
+# Go through all items that were landed and check that they were created.
+# This is not needed in production code.
+for objName, destinationPath in objectNameToDestinationPathMap.items():
+    if os.path.isfile(destinationPath):
+        fileSize = os.path.getsize(destinationPath)
+        print(f'Retrieved object={objName}, landed at destination={destinationPath}, has size={fileSize}')
+
+        # This removes the retrieved file from the destination.
+        # This is done so the script cleans up after itself when it is used only to test connectivity.
+        os.remove(destinationPath)  # Remove in production code.
+    else:
+        print(f'Failed to retrieve object={objName}')
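
Note on the subset case mentioned in the comments above: if you already know the names of the objects you want, you can skip the get_bucket listing and build the Ds3GetObject list directly. A minimal sketch, not part of the patch; the bucket name 'books' matches the sample, while 'beowulf.txt' and 'ulysses.txt' are placeholder object names assumed to exist in that bucket:

    from ds3 import ds3

    # createClientFromEnv() builds the client from the DS3_ENDPOINT, DS3_ACCESS_KEY,
    # and DS3_SECRET_KEY environment variables.
    client = ds3.createClientFromEnv()

    # Name the objects to retrieve directly instead of listing the whole bucket.
    objectList = [ds3.Ds3GetObject('beowulf.txt'), ds3.Ds3GetObject('ulysses.txt')]

    # Create the bulk get job for just those objects; the BP begins staging them in cache.
    bulkGetResult = client.get_bulk_job_spectra_s3(
        ds3.GetBulkJobSpectraS3Request('books', objectList))

    # From here, drain the job's chunks exactly as in the while loop of the sample above.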