Skip to content

Commit

Permalink
Fix CopyObject writetype and unclosed outstream in InitiateMPUpload
Browse files Browse the repository at this point in the history
### What changes are proposed in this pull request?

1. when s3 write type is CACHE_THRU, initiateMultipartUpload creates MultipartMetaFile without closing the outstream, causing leak in BlockWorkerClient resource.
2. createFilePOption in CopyObject didn't set any write type (which should respect alluxio.proxy.s3.writetype), causing all objects copied are in the MUST_CACHE write type.

### Why are the changes needed?

To fix the above 2 problems.

### Does this PR introduce any user facing changes?

No.
			pr-link: #17164
			change-id: cid-968c2381d8cbcb6152fe8d4af2272cb201776b98
  • Loading branch information
lucyge2022 committed Apr 10, 2023
1 parent 9a4e154 commit 01bfecc
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 24 deletions.
36 changes: 14 additions & 22 deletions core/server/proxy/src/main/java/alluxio/proxy/s3/S3ObjectTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,6 @@ public Response continueTask() {
if (objectPath.endsWith(AlluxioURI.SEPARATOR)) {
createDirectory(objectPath, userFs, auditContext);
}
AlluxioURI objectUri = new AlluxioURI(objectPath);

// Populate the xattr Map with the metadata tags if provided
Map<String, ByteString> xattrMap = new HashMap<>();
Expand All @@ -459,19 +458,6 @@ public Response continueTask() {
final String contentTypeHeader = mHandler.getHeader(S3Constants.S3_CONTENT_TYPE_HEADER);
S3RestUtils.populateContentTypeInXAttr(xattrMap, contentTypeHeader);

CreateFilePOptions filePOptions =
CreateFilePOptions.newBuilder()
.setRecursive(true)
.setMode(PMode.newBuilder()
.setOwnerBits(Bits.ALL)
.setGroupBits(Bits.ALL)
.setOtherBits(Bits.NONE).build())
.setWriteType(S3RestUtils.getS3WriteType())
.putAllXattr(xattrMap)
.setXattrPropStrat(XAttrPropagationStrategy.LEAF_NODE)
.setOverwrite(true)
.build();

try {
copySource = URLDecoder.decode(copySource, "UTF-8");
} catch (UnsupportedEncodingException ex) {
Expand All @@ -483,15 +469,19 @@ public Response continueTask() {
.setMode(PMode.newBuilder()
.setOwnerBits(Bits.ALL)
.setGroupBits(Bits.ALL)
.setOtherBits(Bits.NONE).build());
.setOtherBits(Bits.NONE)
.build())
.setWriteType(S3RestUtils.getS3WriteType())
.setXattrPropStrat(XAttrPropagationStrategy.LEAF_NODE)
.setOverwrite(true);

// Handle metadata directive
final String metadataDirective = mHandler.getHeader(
S3Constants.S3_METADATA_DIRECTIVE_HEADER);
if (StringUtils.equals(metadataDirective, S3Constants.Directive.REPLACE.name())
&& filePOptions.getXattrMap().containsKey(S3Constants.CONTENT_TYPE_XATTR_KEY)) {
&& xattrMap.containsKey(S3Constants.CONTENT_TYPE_XATTR_KEY)) {
copyFilePOptionsBuilder.putXattr(S3Constants.CONTENT_TYPE_XATTR_KEY,
filePOptions.getXattrMap().get(S3Constants.CONTENT_TYPE_XATTR_KEY));
xattrMap.get(S3Constants.CONTENT_TYPE_XATTR_KEY));
} else { // defaults to COPY
try {
status = userFs.getStatus(new AlluxioURI(copySource));
Expand All @@ -510,9 +500,9 @@ public Response continueTask() {
final String taggingDirective = mHandler.getHeader(
S3Constants.S3_TAGGING_DIRECTIVE_HEADER);
if (StringUtils.equals(taggingDirective, S3Constants.Directive.REPLACE.name())
&& filePOptions.getXattrMap().containsKey(S3Constants.TAGGING_XATTR_KEY)) {
&& xattrMap.containsKey(S3Constants.TAGGING_XATTR_KEY)) {
copyFilePOptionsBuilder.putXattr(S3Constants.TAGGING_XATTR_KEY,
filePOptions.getXattrMap().get(S3Constants.TAGGING_XATTR_KEY));
xattrMap.get(S3Constants.TAGGING_XATTR_KEY));
} else { // defaults to COPY
try {
if (status == null) {
Expand Down Expand Up @@ -712,7 +702,6 @@ public Response continueTask() {
if (objectPath.endsWith(AlluxioURI.SEPARATOR)) {
return createDirectory(objectPath, userFs, auditContext);
}
AlluxioURI objectUri = new AlluxioURI(objectPath);

// Populate the xattr Map with the metadata tags if provided
Map<String, ByteString> xattrMap = new HashMap<>();
Expand Down Expand Up @@ -802,6 +791,7 @@ public Response continueTask() {
.setOwnerBits(Bits.ALL)
.setGroupBits(Bits.ALL)
.setOtherBits(Bits.NONE).build())
.setWriteType(S3RestUtils.getS3WriteType())
.setOverwrite(true);
String entityTag = copyObject(userFs, auditContext, objectPath,
copySource, copyFilePOptionsBuilder.build());
Expand Down Expand Up @@ -900,7 +890,7 @@ public Response continueTask() {
ByteString.copyFrom(mHandler.getObject(), S3Constants.XATTR_STR_CHARSET));
xattrMap.put(S3Constants.UPLOADS_FILE_ID_XATTR_KEY, ByteString.copyFrom(
Longs.toByteArray(userFs.getStatus(multipartTemporaryDir).getFileId())));
mHandler.getMetaFS().createFile(
try (FileOutStream fos = mHandler.getMetaFS().createFile(
new AlluxioURI(S3RestUtils.getMultipartMetaFilepathForUploadId(uploadId)),
CreateFilePOptions.newBuilder()
.setRecursive(true)
Expand All @@ -912,7 +902,9 @@ public Response continueTask() {
.putAllXattr(xattrMap)
.setXattrPropStrat(XAttrPropagationStrategy.LEAF_NODE)
.build()
);
)) {
// Empty file creation, nothing to do.
}
SetAttributePOptions attrPOptions = SetAttributePOptions.newBuilder()
.setOwner(user)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -914,6 +914,7 @@ public Response createObjectOrUploadPart(@HeaderParam("Content-MD5") final Strin
.setOwnerBits(Bits.ALL)
.setGroupBits(Bits.ALL)
.setOtherBits(Bits.NONE).build())
.setWriteType(S3RestUtils.getS3WriteType())
.setCheckS3BucketPath(true)
.setOverwrite(true);
// Handle metadata directive
Expand Down Expand Up @@ -1089,7 +1090,7 @@ public Response initiateMultipartUpload(
.putAllXattr(xattrMap)
.setXattrPropStrat(XAttrPropagationStrategy.LEAF_NODE)
.build()
);
).close();
SetAttributePOptions attrPOptions = SetAttributePOptions.newBuilder()
.setOwner(user)
.build();
Expand Down
53 changes: 52 additions & 1 deletion tests/src/test/java/alluxio/client/rest/S3ClientRestApiTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -1315,6 +1315,17 @@ public void initiateMultipartUpload() throws Exception {
String expectedResult = XML_MAPPER.writeValueAsString(expected);

Assert.assertEquals(expectedResult, result);

URIStatus mpMetaFileStatus = mFileSystem.getStatus(
new AlluxioURI(S3RestUtils.getMultipartMetaFilepathForUploadId(uploadId)));
Assert.assertTrue(mpMetaFileStatus.isCompleted());

AlluxioURI mpTempDirURI = new AlluxioURI(S3RestUtils.getMultipartTemporaryDirForObject(
S3RestUtils.parsePath(AlluxioURI.SEPARATOR + bucketName),
objectName, uploadId));
Assert.assertTrue(mFileSystem.exists(mpTempDirURI));
URIStatus mpTempDirStatus = mFileSystem.getStatus(mpTempDirURI);
Assert.assertTrue(mpTempDirStatus.getFileInfo().isFolder());
}

@Test
Expand Down Expand Up @@ -1374,7 +1385,47 @@ public void uploadPartWithoutInitiation() throws Exception {
Assert.fail("Upload part of an object without multipart upload initialization should fail");
}

// TODO(czhu): Add test for UploadPartCopy
@Test
public void testUploadPartCopy() throws Exception {
final String bucketName = "bucket";
createBucketRestCall(bucketName);

final String objectName = "src-object";
String srcObjectKey = bucketName + AlluxioURI.SEPARATOR + objectName;
final byte[] srcObjectContent = CommonUtils.randomAlphaNumString(DATA_SIZE).getBytes();
putObjectTest(bucketName, objectName, srcObjectContent, null, null);

// UploadPartCopy object
String targetObjectName = "target-MP-object";
String targetMPObjectKey = bucketName + AlluxioURI.SEPARATOR + targetObjectName;
String result = initiateMultipartUploadRestCall(targetMPObjectKey);
final String uploadId = XML_MAPPER.readValue(result, InitiateMultipartUploadResult.class)
.getUploadId();
Map<String, String> params = new HashMap<>();
params.put("uploadId", uploadId);
params.put("partNumber", "1");

new TestCase(mHostname, mPort, mBaseUri,
targetMPObjectKey,
params, HttpMethod.PUT,
getDefaultOptionsWithAuth()
.addHeader(S3Constants.S3_COPY_SOURCE_HEADER, srcObjectKey)).runAndGetResponse();

List<CompleteMultipartUploadRequest.Part> partList = new ArrayList<>();
partList.add(new CompleteMultipartUploadRequest.Part("", 1));
result = completeMultipartUploadRestCall(targetMPObjectKey, uploadId,
new CompleteMultipartUploadRequest(partList));

// Verify the object's content.
byte[] downloadTargetMpObj = new byte[DATA_SIZE];
MessageDigest md5 = MessageDigest.getInstance("MD5");
try (FileInStream is = mFileSystem
.openFile(new AlluxioURI("/" + targetMPObjectKey))) {
is.read(downloadTargetMpObj, 0, DATA_SIZE);
Assert.assertTrue(is.available() <= 0);
}
Assert.assertArrayEquals(srcObjectContent, downloadTargetMpObj);
}

@Test
public void listParts() throws Exception {
Expand Down

0 comments on commit 01bfecc

Please sign in to comment.