90 changes: 64 additions & 26 deletions samples/gettingData.py
@@ -15,59 +15,97 @@

from ds3 import ds3

# This example retrieves all objects in the specified bucket and lands them in the specified destination.
# By default it looks for objects in the bucket 'books' and lands them in the temporary directory.
# At the end of the run, those files are removed from the local system for testing reasons.
#
# This example assumes that a bucket named "books" containing some objects exists on the server.

bucketName = "books"
# this example assumes that a bucket named "books" containing some objects exist on the server
bucketName = "books" # modify this value to match the BP bucket you wish to retrieve objects from

destination = tempfile.gettempdir() # modify this value to match where the objects should be landed on your system

client = ds3.createClientFromEnv()
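# createClientFromEnv builds the client from environment variables; in the ds3 Python SDK these are
# DS3_ACCESS_KEY, DS3_SECRET_KEY, and DS3_ENDPOINT (verify the exact names against your SDK version).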

# Retrieve a list of all objects in the bucket.
bucketContents = client.get_bucket(ds3.GetBucketRequest(bucketName))
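# Caveat: like S3's GetBucket, a single request returns at most one page of keys (commonly 1,000).
# A paging sketch, assuming GetBucketRequest accepts a 'marker' keyword and the response carries
# S3-style 'IsTruncated'/'NextMarker' fields (verify both against your SDK version):
#
#     contents = []
#     marker = None
#     while True:
#         page = client.get_bucket(ds3.GetBucketRequest(bucketName, marker=marker))
#         contents.extend(page.result['ContentsList'])
#         if page.result.get('IsTruncated') != 'true':
#             break
#         marker = contents[-1]['Key']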

# Convert that list of objects into a list of objects to retrieve.
# If you want to retrieve a subset of objects, or already know their names, then just make a list of ds3.Ds3GetObject
# where each item describes one object you wish to retrieve from the BP.
objectList = [ds3.Ds3GetObject(obj['Key']) for obj in bucketContents.result['ContentsList']]
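# For example, to retrieve a hand-picked subset instead (the object names here are hypothetical):
#
#     objectList = [ds3.Ds3GetObject("folder1/book1.txt"), ds3.Ds3GetObject("book2.txt")]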

# Create a dictionary to map the BP object name to the destination where you're landing the object.
# In this example, we are landing all objects at the path described in the destination variable.
# If an object name contains paths, this will normalize them for your OS and land that object
# in a sub-folder of the destination.
objectNameToDestinationPathMap = {}
for obj in objectList:
    objectNameToDestinationPathMap[obj.name] = os.path.join(destination, os.path.normpath(obj.name))
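# Caveat: the open() call further below will fail if a mapped path points into a sub-folder that
# does not exist yet. A sketch of pre-creating those sub-folders:
#
#     for path in objectNameToDestinationPathMap.values():
#         os.makedirs(os.path.dirname(path), exist_ok=True)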

# Create a bulk get job on the BP. This tells the BP which objects you're going to retrieve.
# This triggers the BP to start staging the objects in cache.
# Large objects may have been broken up into several pieces, i.e. blobs.
# The BP breaks up your retrieval job into "chunks".
# These chunks represent bundles of data that are ready to be retrieved.
# Each chunk will contain one or more pieces of your files (blobs).
# How the job will be broken up (chunked) is determined when you create the bulk get job.
bulkGetResult = client.get_bulk_job_spectra_s3(ds3.GetBulkJobSpectraS3Request(bucketName, objectList))
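# The parts of the response this sample relies on (other fields omitted):
#
#     bulkGetResult.result['JobId']        # job id, used when polling for chunks and in each get_object call
#     bulkGetResult.result['ObjectsList']  # the job's chunks, each carrying a 'ChunkId'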

# Create a set of the chunk ids that describe all units of work that make up the get job.
# This will be used to track which chunks we still need to process.
chunkIds = {x['ChunkId'] for x in bulkGetResult.result['ObjectsList']}

# Attempt to retrieve data from the BP while there are still chunks that need to be processed.
while len(chunkIds) > 0:
    # Get a list of chunks for this job that are ready to be retrieved.
    availableChunks = client.get_job_chunks_ready_for_client_processing_spectra_s3(
        ds3.GetJobChunksReadyForClientProcessingSpectraS3Request(bulkGetResult.result['JobId']))

    chunks = availableChunks.result['ObjectsList']

    # Check to make sure we got some chunks; if we did not, sleep and retry.
    # Having no chunks ready may indicate that the BP cache is currently full.
    # retryAfter is the wait time, in seconds, suggested by the server.
    if len(chunks) == 0:
        time.sleep(availableChunks.retryAfter)
        continue

    # For each chunk that is available, check to make sure we haven't processed it already.
    # If we have not processed this chunk yet, then retrieve all of its objects.
    for chunk in chunks:
        if chunk['ChunkId'] not in chunkIds:
            # This chunk has already been processed.
            continue

        # For each blob within this chunk, retrieve the data and land it on the destination.
        for obj in chunk['ObjectList']:
            # Open the destination file and seek to the offset corresponding with this blob.
            # Use "r+b" when the file already exists so that earlier blobs of a multi-blob
            # object are not truncated by reopening with "wb".
            openMode = "r+b" if os.path.exists(objectNameToDestinationPathMap[obj['Name']]) else "wb"
            objectStream = open(objectNameToDestinationPathMap[obj['Name']], openMode)
            objectStream.seek(int(obj['Offset']))

            # Get the blob for the current object and write it to the destination.
            client.get_object(ds3.GetObjectRequest(bucketName,
                                                   obj['Name'],
                                                   objectStream,
                                                   offset=int(obj['Offset']),
                                                   job=bulkGetResult.result['JobId']))

            # Close the file handle.
            objectStream.close()

        # We've finished processing this chunk. Remove it from our list of chunks that still need processing.
        chunkIds.remove(chunk['ChunkId'])

# Go through all items that were landed and check that they were created.
# This is not needed in production code.
for objName, destinationPath in objectNameToDestinationPathMap.items():
    if os.path.isfile(destinationPath):
        fileSize = os.path.getsize(destinationPath)
        print(f'Retrieved object={objName}, landed at destination={destinationPath}, has size={fileSize}')

        # This removes the retrieved file from the destination. It keeps the script clean for
        # people who use it only to test connectivity. Remove this in production code.
        os.remove(destinationPath)
    else:
        print(f'Failed to retrieve object={objName}')