Skip to content

Commit

Permalink
Implement complete multipart upload
Browse files Browse the repository at this point in the history
  • Loading branch information
cc committed Aug 31, 2017
1 parent c8facbf commit 4ecd6bf
Show file tree
Hide file tree
Showing 3 changed files with 324 additions and 61 deletions.
@@ -0,0 +1,123 @@
/*
* The Alluxio Open Foundation licenses this work under the Apache License, version 2.0
* (the "License"). You may not use this work except in compliance with the License, which is
* available at www.apache.org/licenses/LICENSE-2.0
*
* This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied, as more fully set forth in the License.
*
* See the NOTICE file distributed with this work for information regarding copyright ownership.
*/

package alluxio.proxy.s3;

import com.fasterxml.jackson.annotation.JsonPropertyOrder;
import com.fasterxml.jackson.dataformat.xml.annotation.JacksonXmlProperty;
import com.fasterxml.jackson.dataformat.xml.annotation.JacksonXmlRootElement;

/**
* Result returned after requests for completing a multipart upload.
* It is defined in http://docs.aws.amazon.com/AmazonS3/latest/API/mpUploadComplete.html.
* It will be encoded into an XML string to be returned as a response for the REST call.
*/
@JacksonXmlRootElement(localName = "CompleteMultipartUploadResult")
@JsonPropertyOrder({ "Location", "Bucket", "Key", "ETag" })
public class CompleteMultipartUploadResult {
// The URI that identifies the newly created object.
private String mLocation;
// Name of the bucket.
private String mBucket;
// Object key.
private String mKey;
// Entity tag of the object.
private String mETag;

/**
* Constructs an {@link CompleteMultipartUploadResult} with fields initialized to empty strings.
*/
public CompleteMultipartUploadResult() {
mLocation = "";
mBucket = "";
mKey = "";
mETag = "";
}

/**
* Constructs an {@link CompleteMultipartUploadResult} with the specified values.
*
* @param location the URI that identifies the newly created object
* @param bucket name of the bucket
* @param key object key
* @param etag entity tag of the newly created object, the etag should not be surrounded by quotes
*/
public CompleteMultipartUploadResult(String location, String bucket, String key, String etag) {
mLocation = location;
mBucket = bucket;
mKey = key;
mETag = "\"" + etag + "\"";
}

/**
* @return the location
*/
@JacksonXmlProperty(localName = "Location")
public String getLocation() {
return mLocation;
}

/**
* @param location the location to set
*/
@JacksonXmlProperty(localName = "Location")
public void setLocation(String location) {
mLocation = location;
}

/**
* @return the bucket name
*/
@JacksonXmlProperty(localName = "Bucket")
public String getBucket() {
return mBucket;
}

/**
* @param bucket the bucket name to set
*/
@JacksonXmlProperty(localName = "Bucket")
public void setBucket(String bucket) {
mBucket = bucket;
}

/**
* @return the object key
*/
@JacksonXmlProperty(localName = "Key")
public String getKey() {
return mKey;
}

/**
* @param key the object key to set
*/
@JacksonXmlProperty(localName = "Key")
public void setKey(String key) {
mKey = key;
}

/**
* @return the entity tag surrounded by quotes
*/
@JacksonXmlProperty(localName = "ETag")
public String getETag() {
return mETag;
}

/**
* @param etag the entity tag to be set, should not be surrounded by quotes
*/
@JacksonXmlProperty(localName = "ETag")
public void setUploadId(String etag) {
mETag = "\"" + etag + "\"";
}
}
Expand Up @@ -42,6 +42,8 @@
import java.security.MessageDigest; import java.security.MessageDigest;
import java.util.ArrayDeque; import java.util.ArrayDeque;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date; import java.util.Date;
import java.util.List; import java.util.List;
import java.util.Queue; import java.util.Queue;
Expand Down Expand Up @@ -235,15 +237,7 @@ public Response call() throws S3Exception {
// This object is part of a multipart upload, should be uploaded into the temporary // This object is part of a multipart upload, should be uploaded into the temporary
// directory first. // directory first.
String tmpDir = S3RestUtils.getMultipartTemporaryDirForObject(bucket, object); String tmpDir = S3RestUtils.getMultipartTemporaryDirForObject(bucket, object);
long tmpDirFileId = -1; checkUploadId(new AlluxioURI(tmpDir), uploadId);
try {
tmpDirFileId = mFileSystem.getStatus(new AlluxioURI(tmpDir)).getFileId();
} catch (Exception e) {
throw toObjectS3Exception(e, tmpDir);
}
if (uploadId != tmpDirFileId) {
throw new S3Exception(objectPath, S3ErrorCode.NO_SUCH_UPLOAD);
}
objectPath = tmpDir + AlluxioURI.SEPARATOR + Integer.toString(partNumber); objectPath = tmpDir + AlluxioURI.SEPARATOR + Integer.toString(partNumber);
} }
AlluxioURI objectURI = new AlluxioURI(objectPath); AlluxioURI objectURI = new AlluxioURI(objectPath);
Expand Down Expand Up @@ -283,25 +277,34 @@ public Response call() throws S3Exception {
} }


/** /**
* @summary initiates a multipart upload * @summary initiates or completes a multipart upload based on query parameters
* @param bucket the bucket name * @param bucket the bucket name
* @param object the object name * @param object the object name
* @param uploads the query parameter specifying that this request is to initiate a multipart * @param uploads the query parameter specifying that this request is to initiate a multipart
* upload instead of uploading an object through HTTP multipart forms * upload instead of uploading an object through HTTP multipart forms
* @param uploadId the ID of the multipart upload to be completed
* @return the response object * @return the response object
*/ */
@POST @POST
@Path(OBJECT_PARAM) @Path(OBJECT_PARAM)
@ReturnType("alluxio.proxy.s3.InitiateMultipartUploadResult") @ReturnType("alluxio.proxy.s3.InitiateMultipartUploadResult,"
public Response initiateMultipartUpload(@PathParam("bucket") final String bucket, + "alluxio.proxy.s3.CompleteMultipartUploadResult")
@PathParam("object") final String object, @QueryParam("uploads") final String uploads) { // TODO(cc): investigate on how to specify multiple return types, and how to decouple the REST
// endpoints where the only difference is the query parameter.
public Response initiateOrCompleteMultipartUpload(@PathParam("bucket") final String bucket,
@PathParam("object") final String object, @QueryParam("uploads") final String uploads,
@QueryParam("uploadId") final Long uploadId) {
if (uploads != null) {
return initiateMultipartUpload(bucket, object);
} else {
return completeMultipartUpload(bucket, object, uploadId);
}
}

private Response initiateMultipartUpload(final String bucket, final String object) {
return S3RestUtils.call(bucket, new S3RestUtils.RestCallable<InitiateMultipartUploadResult>() { return S3RestUtils.call(bucket, new S3RestUtils.RestCallable<InitiateMultipartUploadResult>() {
@Override @Override
public InitiateMultipartUploadResult call() throws S3Exception { public InitiateMultipartUploadResult call() throws S3Exception {
Preconditions.checkNotNull(bucket, "required 'bucket' parameter is missing");
Preconditions.checkNotNull(object, "required 'object' parameter is missing");
Preconditions.checkNotNull(uploads, "required 'uploads' parameter is missing");

String bucketPath = parseBucketPath(AlluxioURI.SEPARATOR + bucket); String bucketPath = parseBucketPath(AlluxioURI.SEPARATOR + bucket);
checkBucketIsAlluxioDirectory(bucketPath); checkBucketIsAlluxioDirectory(bucketPath);
String objectPath = bucketPath + AlluxioURI.SEPARATOR + object; String objectPath = bucketPath + AlluxioURI.SEPARATOR + object;
Expand All @@ -319,6 +322,65 @@ public InitiateMultipartUploadResult call() throws S3Exception {
}); });
} }


// TODO(cc): support the options in the XML request body defined in
// http://docs.aws.amazon.com/AmazonS3/latest/API/mpUploadComplete.html, currently, the parts
// under the temporary multipart upload directory are combined into the final object.
private Response completeMultipartUpload(final String bucket, final String object,
final long uploadId) {
return S3RestUtils.call(bucket, new S3RestUtils.RestCallable<CompleteMultipartUploadResult>() {
@Override
public CompleteMultipartUploadResult call() throws S3Exception {
String bucketPath = parseBucketPath(AlluxioURI.SEPARATOR + bucket);
checkBucketIsAlluxioDirectory(bucketPath);
String objectPath = bucketPath + AlluxioURI.SEPARATOR + object;
AlluxioURI multipartTemporaryDir =
new AlluxioURI(S3RestUtils.getMultipartTemporaryDirForObject(bucket, object));
checkUploadId(multipartTemporaryDir, uploadId);

try {
List<URIStatus> parts = mFileSystem.listStatus(multipartTemporaryDir);
Collections.sort(parts, new Comparator<URIStatus>() {
@Override
public int compare(URIStatus o1, URIStatus o2) {
long part1 = Long.parseLong(o1.getName());
long part2 = Long.parseLong(o2.getName());
if (part1 == part2) {
return 0;
}
if (part1 < part2) {
return -1;
}
return 1;
}
});

CreateFileOptions options = CreateFileOptions.defaults().setRecursive(true)
.setWriteType(getS3WriteType());
FileOutStream os = mFileSystem.createFile(new AlluxioURI(objectPath), options);
MessageDigest md5 = MessageDigest.getInstance("MD5");
DigestOutputStream digestOutputStream = new DigestOutputStream(os, md5);

try {
for (URIStatus part : parts) {
try (FileInStream is = mFileSystem.openFile(new AlluxioURI(part.getPath()))) {
ByteStreams.copy(is, digestOutputStream);
}
}
} finally {
digestOutputStream.close();
}

mFileSystem.delete(multipartTemporaryDir, DeleteOptions.defaults().setRecursive(true));

String entityTag = Hex.encodeHexString(md5.digest());
return new CompleteMultipartUploadResult(objectPath, bucket, object, entityTag);
} catch (Exception e) {
throw toObjectS3Exception(e, objectPath);
}
}
});
}

/** /**
* @summary retrieves an object's metadata * @summary retrieves an object's metadata
* @param bucket the bucket name * @param bucket the bucket name
Expand Down Expand Up @@ -403,7 +465,7 @@ public Response call() throws S3Exception {
@DELETE @DELETE
@Path(OBJECT_PARAM) @Path(OBJECT_PARAM)
@ReturnType("java.lang.Void") @ReturnType("java.lang.Void")
public Response deleteObject(@PathParam("bucket") final String bucket, public Response deleteObjectOrAbortMultipartUpload(@PathParam("bucket") final String bucket,
@PathParam("object") final String object, @QueryParam("uploadId") final Long uploadId) { @PathParam("object") final String object, @QueryParam("uploadId") final Long uploadId) {
return S3RestUtils.call(bucket, new S3RestUtils.RestCallable<Response.Status>() { return S3RestUtils.call(bucket, new S3RestUtils.RestCallable<Response.Status>() {
@Override @Override
Expand All @@ -412,29 +474,47 @@ public Response.Status call() throws S3Exception {
Preconditions.checkNotNull(object, "required 'object' parameter is missing"); Preconditions.checkNotNull(object, "required 'object' parameter is missing");


if (uploadId != null) { if (uploadId != null) {
// Abort an incomplete multipart upload.
abortMultipartUpload(bucket, object, uploadId); abortMultipartUpload(bucket, object, uploadId);
// Note: the normal response for S3 abort multipart upload is 204 NO_CONTENT, not 200 OK } else {
return Response.Status.NO_CONTENT; deleteObject(bucket, object);
} }


String bucketPath = parseBucketPath(AlluxioURI.SEPARATOR + bucket);
// Delete the object.
String objectPath = bucketPath + AlluxioURI.SEPARATOR + object;
DeleteOptions options = DeleteOptions.defaults();
options.setAlluxioOnly(Configuration.get(PropertyKey.PROXY_S3_DELETE_TYPE)
.equals(Constants.S3_DELETE_IN_ALLUXIO_ONLY));
try {
mFileSystem.delete(new AlluxioURI(objectPath), options);
} catch (Exception e) {
throw toObjectS3Exception(e, objectPath);
}
// Note: the normal response for S3 delete key is 204 NO_CONTENT, not 200 OK // Note: the normal response for S3 delete key is 204 NO_CONTENT, not 200 OK
return Response.Status.NO_CONTENT; return Response.Status.NO_CONTENT;
} }
}); });
} }


private void abortMultipartUpload(String bucket, String object, long uploadId)
throws S3Exception {
String bucketPath = parseBucketPath(AlluxioURI.SEPARATOR + bucket);
checkBucketIsAlluxioDirectory(bucketPath);
String objectPath = bucketPath + AlluxioURI.SEPARATOR + object;
AlluxioURI multipartTemporaryDir =
new AlluxioURI(S3RestUtils.getMultipartTemporaryDirForObject(bucket, object));
checkUploadId(multipartTemporaryDir, uploadId);

try {
mFileSystem.delete(multipartTemporaryDir, DeleteOptions.defaults().setRecursive(true));
} catch (Exception e) {
throw toObjectS3Exception(e, objectPath);
}
}

private void deleteObject(String bucket, String object) throws S3Exception {
String bucketPath = parseBucketPath(AlluxioURI.SEPARATOR + bucket);
// Delete the object.
String objectPath = bucketPath + AlluxioURI.SEPARATOR + object;
DeleteOptions options = DeleteOptions.defaults();
options.setAlluxioOnly(Configuration.get(PropertyKey.PROXY_S3_DELETE_TYPE)
.equals(Constants.S3_DELETE_IN_ALLUXIO_ONLY));
try {
mFileSystem.delete(new AlluxioURI(objectPath), options);
} catch (Exception e) {
throw toObjectS3Exception(e, objectPath);
}
}

private S3Exception toBucketS3Exception(Exception exception, String resource) { private S3Exception toBucketS3Exception(Exception exception, String resource) {
try { try {
throw exception; throw exception;
Expand Down Expand Up @@ -499,6 +579,20 @@ private void checkBucketIsAlluxioDirectory(String bucketPath) throws S3Exception
} }
} }


private void checkUploadId(AlluxioURI multipartTemporaryDir, long uploadId) throws S3Exception {
try {
if (!mFileSystem.exists(multipartTemporaryDir)) {
throw new S3Exception(multipartTemporaryDir.getPath(), S3ErrorCode.NO_SUCH_UPLOAD);
}
long tmpDirId = mFileSystem.getStatus(multipartTemporaryDir).getFileId();
if (uploadId != tmpDirId) {
throw new S3Exception(multipartTemporaryDir.getPath(), S3ErrorCode.NO_SUCH_UPLOAD);
}
} catch (Exception e) {
throw toObjectS3Exception(e, multipartTemporaryDir.getPath());
}
}

private List<URIStatus> listObjects(AlluxioURI uri, ListBucketOptions listBucketOptions) private List<URIStatus> listObjects(AlluxioURI uri, ListBucketOptions listBucketOptions)
throws FileDoesNotExistException, IOException, AlluxioException { throws FileDoesNotExistException, IOException, AlluxioException {
List<URIStatus> objects = new ArrayList<>(); List<URIStatus> objects = new ArrayList<>();
Expand Down Expand Up @@ -535,26 +629,4 @@ private List<URIStatus> listObjects(AlluxioURI uri, ListBucketOptions listBucket
private WriteType getS3WriteType() { private WriteType getS3WriteType() {
return Configuration.getEnum(PropertyKey.PROXY_S3_WRITE_TYPE, WriteType.class); return Configuration.getEnum(PropertyKey.PROXY_S3_WRITE_TYPE, WriteType.class);
} }

private void abortMultipartUpload(String bucket, String object, long uploadId)
throws S3Exception {
String bucketPath = parseBucketPath(AlluxioURI.SEPARATOR + bucket);
checkBucketIsAlluxioDirectory(bucketPath);
String objectPath = bucketPath + AlluxioURI.SEPARATOR + object;
AlluxioURI multipartTemporaryDir =
new AlluxioURI(S3RestUtils.getMultipartTemporaryDirForObject(bucket, object));

try {
if (!mFileSystem.exists(multipartTemporaryDir)) {
throw new S3Exception(multipartTemporaryDir.getPath(), S3ErrorCode.NO_SUCH_UPLOAD);
}
long tmpDirId = mFileSystem.getStatus(multipartTemporaryDir).getFileId();
if (uploadId != tmpDirId) {
throw new S3Exception(multipartTemporaryDir.getPath(), S3ErrorCode.NO_SUCH_UPLOAD);
}
mFileSystem.delete(multipartTemporaryDir, DeleteOptions.defaults().setRecursive(true));
} catch (Exception e) {
throw toObjectS3Exception(e, objectPath);
}
}
} }

0 comments on commit 4ecd6bf

Please sign in to comment.