Skip to content

Commit

Permalink
Merge pull request #44 from earlephilhower/master
Browse files Browse the repository at this point in the history
Add multipart copy API and support inside s3 executable
  • Loading branch information
bji committed Jun 2, 2016
2 parents 0759f1d + 2ca2df6 commit 6a0336f
Show file tree
Hide file tree
Showing 4 changed files with 185 additions and 12 deletions.
51 changes: 51 additions & 0 deletions inc/libs3.h
Original file line number Diff line number Diff line change
Expand Up @@ -1962,6 +1962,57 @@ void S3_copy_object(const S3BucketContext *bucketContext,
const S3ResponseHandler *handler, void *callbackData);


/**
* Copies a portion of an object from one location to another. The object may
* be copied back to itself, which is useful for replacing metadata without
* changing the object. Required when doing >5GB object copies.
*
* NOTE(review): when count is non-zero the request carries the header
* "x-amz-copy-source-range: bytes=<startOffset>-<startOffset + count>", and
* the s3 executable passes (part length - 1) as count. The range therefore
* appears to be inclusive of its last byte, i.e. count + 1 bytes are copied;
* confirm before relying on count as an exact byte count.
*
* @param bucketContext gives the source bucket and associated parameters for
* this request
* @param key is the source key
* @param destinationBucket gives the destination bucket into which to copy
* the object. If NULL, the source bucket will be used.
* @param destinationKey gives the destination key into which to copy the
* object. If NULL, the source key will be used.
* @param partNo is the sequence number of the part within a multipart
* upload; 0 = non-multipart. When greater than 0, a
* "partNumber=<partNo>&uploadId=<uploadId>" sub-resource is added to
* the request.
* @param uploadId is the ID returned for a multipart initialize request,
* ignored if partNo = 0. It is formatted as a string whenever
* partNo > 0, so it should be non-NULL in that case.
* @param startOffset is the starting point in original object to copy.
* @param count describes the length of the range starting at startOffset in
* the original object to copy (see the note above regarding the
* inclusive end of the range). 0 indicates no-range (i.e. all)
* @param putProperties optionally provides properties to apply to the object
* that is being put to. If not supplied (i.e. NULL is passed in),
* then the copied object will retain the metadata of the copied
* object.
* @param lastModifiedReturn returns the last modified date of the copied
* object
* @param eTagReturnSize specifies the number of bytes provided in the
* eTagReturn buffer
* @param eTagReturn is a buffer into which the resulting eTag of the copied
* object will be written
* @param requestContext if non-NULL, gives the S3RequestContext to add this
* request to, and does not perform the request immediately. If NULL,
* performs the request immediately and synchronously.
* @param handler gives the callbacks to call as the request is processed and
* completed
* @param callbackData will be passed in as the callbackData parameter to
* all callbacks for this request
**/
void S3_copy_object_range(const S3BucketContext *bucketContext,
const char *key, const char *destinationBucket,
const char *destinationKey,
const int partNo, const char *uploadId,
const unsigned long startOffset, const unsigned long count,
const S3PutProperties *putProperties,
int64_t *lastModifiedReturn, int eTagReturnSize,
char *eTagReturn, S3RequestContext *requestContext,
const S3ResponseHandler *handler, void *callbackData);


/**
* Gets an object from S3. The contents of the object are returned in the
* handler's getObjectDataCallback.
Expand Down
35 changes: 32 additions & 3 deletions src/object.c
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,27 @@ void S3_copy_object(const S3BucketContext *bucketContext, const char *key,
int64_t *lastModifiedReturn, int eTagReturnSize,
char *eTagReturn, S3RequestContext *requestContext,
const S3ResponseHandler *handler, void *callbackData)
{
/* Use the range copier with 0 length */
S3_copy_object_range(bucketContext, key,
destinationBucket, destinationKey,
0, NULL, // No multipart
0, 0, // No length => std. copy of < 5GB
putProperties,
lastModifiedReturn, eTagReturnSize,
eTagReturn, requestContext,
handler, callbackData);
}


void S3_copy_object_range(const S3BucketContext *bucketContext, const char *key,
const char *destinationBucket, const char *destinationKey,
const int partNo, const char *uploadId,
const unsigned long startOffset, const unsigned long count,
const S3PutProperties *putProperties,
int64_t *lastModifiedReturn, int eTagReturnSize,
char *eTagReturn, S3RequestContext *requestContext,
const S3ResponseHandler *handler, void *callbackData)
{
// Create the callback data
CopyObjectData *data =
Expand All @@ -197,6 +218,14 @@ void S3_copy_object(const S3BucketContext *bucketContext, const char *key,
data->eTagReturnLen = 0;
string_buffer_initialize(data->lastModified);

// If there's a sequence ID > 0 then add a subResource, OTW pass in NULL
char subResource[512];
char *subRsrc = NULL;
if (partNo > 0) {
snprintf(subResource, 512, "partNumber=%d&uploadId=%s", partNo, uploadId);
subRsrc = subResource;
}

// Set up the RequestParams
RequestParams params =
{
Expand All @@ -211,12 +240,12 @@ void S3_copy_object(const S3BucketContext *bucketContext, const char *key,
bucketContext->securityToken }, // securityToken
destinationKey ? destinationKey : key, // key
0, // queryParams
0, // subResource
subRsrc, // subResource
bucketContext->bucketName, // copySourceBucketName
key, // copySourceKey
0, // getConditions
0, // startByte
0, // byteCount
startOffset, // startByte
count, // byteCount
putProperties, // putProperties
&copyObjectPropertiesCallback, // propertiesCallback
0, // toS3Callback
Expand Down
6 changes: 6 additions & 0 deletions src/request.c
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,12 @@ static S3Status compose_amz_headers(const RequestParams *params,
params->copySourceBucketName,
params->copySourceKey);
}
// If byteCount != 0 then we're just copying a range, add header
if (params->byteCount > 0) {
headers_append(1, "x-amz-copy-source-range: bytes=%ld-%ld",
params->startByte,
params->startByte + params->byteCount);
}
// And the x-amz-metadata-directive header
if (properties) {
headers_append(1, "%s", "x-amz-metadata-directive: REPLACE");
Expand Down
105 changes: 96 additions & 9 deletions src/s3.c
Original file line number Diff line number Diff line change
Expand Up @@ -2063,7 +2063,6 @@ S3Status MultipartResponseProperiesCallback
return S3StatusOK;
}


static int multipartPutXmlCallback(int bufferSize, char *buffer,
void *callbackData)
{
Expand Down Expand Up @@ -2132,7 +2131,8 @@ static int try_get_parts_info(const char *bucketName, const char *key,
return 0;
}

static void put_object(int argc, char **argv, int optindex)
static void put_object(int argc, char **argv, int optindex,
const char *srcBucketName, const char *srcKey, unsigned long long srcSize)
{
if (optindex == argc) {
fprintf(stderr, "\nERROR: Missing parameter: bucket/key\n");
Expand Down Expand Up @@ -2283,7 +2283,12 @@ static void put_object(int argc, char **argv, int optindex)
data.gb = 0;
data.noStatus = noStatus;

if (filename) {
if (srcSize) {
// This is really a COPY multipart, not a put, so take from source object
contentLength = srcSize;
data.infile = NULL;
}
else if (filename) {
if (!contentLength) {
struct stat statbuf;
// Stat the file to get its length
Expand Down Expand Up @@ -2459,16 +2464,43 @@ static void put_object(int argc, char **argv, int optindex)
partData.put_object_data = data;
partContentLength = ((contentLength > MULTIPART_CHUNK_SIZE) ?
MULTIPART_CHUNK_SIZE : contentLength);
printf("Sending Part Seq %d, length=%d\n", seq, partContentLength);
printf("%s Part Seq %d, length=%d\n", srcSize ? "Copying" : "Sending", seq, partContentLength);
partData.put_object_data.contentLength = partContentLength;
partData.put_object_data.originalContentLength = partContentLength;
partData.put_object_data.totalContentLength = todoContentLength;
partData.put_object_data.totalOriginalContentLength = totalContentLength;
putProperties.md5 = 0;
do {
S3_upload_part(&bucketContext, key, &putProperties,
&putObjectHandler, seq, manager.upload_id,
partContentLength,0, &partData);
if (srcSize) {
S3BucketContext srcBucketContext =
{
0,
srcBucketName,
protocolG,
uriStyleG,
accessKeyIdG,
secretAccessKeyG,
0
};

S3ResponseHandler copyResponseHandler = { &responsePropertiesCallback, &responseCompleteCallback };
int64_t lastModified;

unsigned long long startOffset = (unsigned long long)MULTIPART_CHUNK_SIZE * (unsigned long long)(seq-1);
unsigned long long count = partContentLength - 1; // Inclusive for copies
// The default copy callback tries to set this for us, need to allocate here
manager.etags[seq-1] = malloc(512); // TBD - magic #! Isa there a max etag defined?
S3_copy_object_range(&srcBucketContext, srcKey, bucketName, key,
seq, manager.upload_id,
startOffset, count,
&putProperties,
&lastModified, 512 /*TBD - magic # */, manager.etags[seq-1], 0,
&copyResponseHandler, 0);
} else {
S3_upload_part(&bucketContext, key, &putProperties,
&putObjectHandler, seq, manager.upload_id,
partContentLength,0, &partData);
}
} while (S3_status_is_retryable(statusG) && should_retry());
if (statusG != S3StatusOK) {
printError();
Expand Down Expand Up @@ -2519,6 +2551,30 @@ static void put_object(int argc, char **argv, int optindex)


// copy object ---------------------------------------------------------------
/**
 * List-bucket callback that reports the size of a single listed key.
 *
 * Succeeds only when the listing batch holds exactly one entry; that entry's
 * size is stored through callbackData, which must point to an
 * unsigned long long.
 *
 * @param contentsCount number of entries in contents
 * @param contents the listed entries; only the first one is read
 * @param callbackData pointer to the unsigned long long receiving the size
 * @return S3StatusOK when exactly one entry was listed, otherwise
 *         S3StatusErrorUnexpectedContent (the size is left untouched)
 */
static S3Status copyListKeyCallback(int isTruncated, const char *nextMarker,
                                    int contentsCount,
                                    const S3ListBucketContent *contents,
                                    int commonPrefixesCount,
                                    const char **commonPrefixes,
                                    void *callbackData)
{
    // None of these influence the result; the casts merely keep
    // unused-parameter warnings quiet.
    (void) isTruncated;
    (void) nextMarker;
    (void) commonPrefixesCount;
    (void) commonPrefixes;

    if (contentsCount == 1) {
        unsigned long long *sizeReturn = (unsigned long long *) callbackData;

        *sizeReturn = (unsigned long long) contents->size;
        return S3StatusOK;
    }

    // Zero matches or more than one match: the size cannot be determined
    return S3StatusErrorUnexpectedContent;
}


static void copy_object(int argc, char **argv, int optindex)
{
Expand All @@ -2541,13 +2597,45 @@ static void copy_object(int argc, char **argv, int optindex)

const char *sourceBucketName = argv[optindex++];
const char *sourceKey = slash;
unsigned long long sourceSize = 0;

if (optindex == argc) {
fprintf(stderr, "\nERROR: Missing parameter: "
"destination bucket/key\n");
usageExit(stderr);
}

S3_init();
S3BucketContext listBucketContext =
{
0,
sourceBucketName,
protocolG,
uriStyleG,
accessKeyIdG,
secretAccessKeyG,
0
};
S3ListBucketHandler listBucketHandler =
{
{ &responsePropertiesCallback, &responseCompleteCallback },
&copyListKeyCallback
};
// Find size of existing key to determine if MP required
do {
S3_list_bucket(&listBucketContext, sourceKey, NULL,
".", 1, 0, &listBucketHandler, &sourceSize);
} while (S3_status_is_retryable(statusG) && should_retry());
if (statusG != S3StatusOK) {
fprintf(stderr, "\nERROR: Unable to get source object size\n");
exit(1);
}
if (sourceSize > MULTIPART_CHUNK_SIZE) {
printf("\nUsing multipart copy because object size %llu is above %d.\n", sourceSize, MULTIPART_CHUNK_SIZE);
put_object(argc, argv, optindex, sourceBucketName, sourceKey, sourceSize);
return;
}

// Split bucket/key
slash = argv[optindex];
while (*slash && (*slash != '/')) {
Expand Down Expand Up @@ -2663,7 +2751,6 @@ static void copy_object(int argc, char **argv, int optindex)
}
}

S3_init();

S3BucketContext bucketContext =
{
Expand Down Expand Up @@ -3664,7 +3751,7 @@ int main(int argc, char **argv)
}
}
else if (!strcmp(command, "put")) {
put_object(argc, argv, optind);
put_object(argc, argv, optind, NULL, NULL, 0);
}
else if (!strcmp(command, "copy")) {
copy_object(argc, argv, optind);
Expand Down

0 comments on commit 6a0336f

Please sign in to comment.