Skip to content

Commit

Permalink
HDDS-4781. [FSO]S3MultiPart: Implement create and commit upload part …
Browse files Browse the repository at this point in the history
…file (#1897)
  • Loading branch information
rakeshadr committed Feb 15, 2021
1 parent e63520a commit e01a1ef
Show file tree
Hide file tree
Showing 14 changed files with 917 additions and 124 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.hdds.utils.db.TableIterator;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.TestDataUtil;
Expand All @@ -37,7 +35,6 @@
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.request.TestOMRequestUtils;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.StringUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Assert;
Expand Down Expand Up @@ -125,34 +122,6 @@ public void testCreateFile() throws Exception {
omMgr);
openFileKey = d2ObjectID + OzoneConsts.OM_KEY_PREFIX + file.getName();

// verify entries in directory table
TableIterator<String, ? extends
Table.KeyValue<String, OmDirectoryInfo>> iterator =
omMgr.getDirectoryTable().iterator();
iterator.seekToFirst();
int count = dirKeys.size();
Assert.assertEquals("Unexpected directory table entries!", 2, count);
while (iterator.hasNext()) {
count--;
Table.KeyValue<String, OmDirectoryInfo> value = iterator.next();
verifyKeyFormat(value.getKey(), dirKeys);
}
Assert.assertEquals("Unexpected directory table entries!", 0, count);

// verify entries in open key table
TableIterator<String, ? extends
Table.KeyValue<String, OmKeyInfo>> keysItr =
omMgr.getOpenKeyTable().iterator();
keysItr.seekToFirst();

while (keysItr.hasNext()) {
count++;
Table.KeyValue<String, OmKeyInfo> value = keysItr.next();
verifyOpenKeyFormat(value.getKey(), openFileKey);
verifyOMFileInfoFormat(value.getValue(), file.getName(), d2ObjectID);
}
Assert.assertEquals("Unexpected file table entries!", 1, count);

// trigger CommitKeyRequest
outputStream.close();

Expand Down Expand Up @@ -183,42 +152,6 @@ private void verifyOMFileInfoFormat(OmKeyInfo omKeyInfo, String fileName,
omKeyInfo.getPath());
}

/**
 * Asserts that a directory table key follows the expected two-part
 * {parentId}/{dirName} format and is present in the expected-key list;
 * the matched entry is removed from the list so remaining entries can
 * be counted by the caller.
 *
 * @param key table keyName
 * @param dirKeys expected keyName
 */
private void verifyKeyFormat(String key, ArrayList<String> dirKeys) {
  char delimiter = OzoneConsts.OM_KEY_PREFIX.charAt(0);
  String[] parts = StringUtils.split(key, delimiter);
  Assert.assertEquals("Invalid KeyName", 2, parts.length);
  Assert.assertTrue("Key:" + key + " doesn't exists in directory table!",
      dirKeys.remove(key));
}

/**
 * Asserts that an open file table key has the expected three-part
 * format and that its {parentId}/{fileName} portion matches the
 * expected open file key.
 *
 * @param key table keyName
 * @param openFileKey expected keyName
 */
private void verifyOpenKeyFormat(String key, String openFileKey) {
  char delimiter = OzoneConsts.OM_KEY_PREFIX.charAt(0);
  String[] actualParts = StringUtils.split(key, delimiter);
  Assert.assertEquals("Invalid KeyName:" + key, 3, actualParts.length);
  String[] expectedOpenFileParts = StringUtils.split(openFileKey, delimiter);
  String expected = expectedOpenFileParts[0] + OzoneConsts.OM_KEY_PREFIX
      + expectedOpenFileParts[1];
  String actual = actualParts[0] + OzoneConsts.OM_KEY_PREFIX
      + actualParts[1];
  Assert.assertEquals("ParentId/Key:" + expectedOpenFileParts[0]
      + " doesn't exists in openFileTable!", expected, actual);
}

long verifyDirKey(long parentId, String dirKey, String absolutePath,
ArrayList<String> dirKeys, OMMetadataManager omMgr)
throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,21 @@

package org.apache.hadoop.ozone.client.rpc;

import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.client.ObjectStore;
import org.apache.hadoop.ozone.client.OzoneBucket;
import org.apache.hadoop.ozone.client.OzoneClient;
import org.apache.hadoop.ozone.client.OzoneClientFactory;
import org.apache.hadoop.ozone.client.OzoneVolume;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.om.OMConfigKeys;
import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;

import static org.apache.hadoop.hdds.StringUtils.string2Bytes;
import static org.apache.hadoop.hdds.client.ReplicationFactor.THREE;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;

Expand Down Expand Up @@ -179,4 +185,91 @@ public void testInitiateMultipartUploadWithDefaultReplication() throws
assertNotNull(multipartInfo.getUploadID());
}

/**
 * Uploads a single part for a freshly initiated multipart upload and
 * verifies that committing it (via stream close) yields a part name.
 */
@Test
public void testUploadPartWithNoOverride() throws IOException {
  String volumeName = UUID.randomUUID().toString();
  String bucketName = UUID.randomUUID().toString();
  String keyName = UUID.randomUUID().toString();
  String sampleData = "sample Value";

  store.createVolume(volumeName);
  OzoneVolume volume = store.getVolume(volumeName);
  volume.createBucket(bucketName);
  OzoneBucket bucket = volume.getBucket(bucketName);

  // Initiate the MPU and sanity-check the returned metadata.
  OmMultipartInfo multipartInfo = bucket.initiateMultipartUpload(keyName,
      STAND_ALONE, ONE);
  assertNotNull(multipartInfo);
  String uploadID = multipartInfo.getUploadID();
  Assert.assertEquals(volumeName, multipartInfo.getVolumeName());
  Assert.assertEquals(bucketName, multipartInfo.getBucketName());
  Assert.assertEquals(keyName, multipartInfo.getKeyName());
  assertNotNull(uploadID);

  // Write part 1 and commit it by closing the stream.
  OzoneOutputStream partStream = bucket.createMultipartKey(keyName,
      sampleData.length(), 1, uploadID);
  partStream.write(string2Bytes(sampleData), 0, sampleData.length());
  partStream.close();

  OmMultipartCommitUploadPartInfo commitInfo =
      partStream.getCommitUploadPartInfo();
  assertNotNull(commitInfo);
  assertNotNull(commitInfo.getPartName());
}

/**
 * Uploads a part, then re-uploads the same part number with different
 * data, and verifies the second commit returns a new, distinct part
 * name (i.e. the part was overridden rather than reused).
 */
@Test
public void testUploadPartOverrideWithRatis() throws IOException {

  String volumeName = UUID.randomUUID().toString();
  String bucketName = UUID.randomUUID().toString();
  String keyName = UUID.randomUUID().toString();
  String sampleData = "sample Value";

  store.createVolume(volumeName);
  OzoneVolume volume = store.getVolume(volumeName);
  volume.createBucket(bucketName);
  OzoneBucket bucket = volume.getBucket(bucketName);
  OmMultipartInfo multipartInfo = bucket.initiateMultipartUpload(keyName,
      ReplicationType.RATIS, THREE);

  assertNotNull(multipartInfo);
  String uploadID = multipartInfo.getUploadID();
  Assert.assertEquals(volumeName, multipartInfo.getVolumeName());
  Assert.assertEquals(bucketName, multipartInfo.getBucketName());
  Assert.assertEquals(keyName, multipartInfo.getKeyName());
  assertNotNull(multipartInfo.getUploadID());

  int partNumber = 1;

  OzoneOutputStream ozoneOutputStream = bucket.createMultipartKey(keyName,
      sampleData.length(), partNumber, uploadID);
  ozoneOutputStream.write(string2Bytes(sampleData), 0, sampleData.length());
  ozoneOutputStream.close();

  OmMultipartCommitUploadPartInfo commitUploadPartInfo = ozoneOutputStream
      .getCommitUploadPartInfo();

  assertNotNull(commitUploadPartInfo);
  String partName = commitUploadPartInfo.getPartName();
  assertNotNull(commitUploadPartInfo.getPartName());

  //Overwrite the part by creating part key with same part number.
  sampleData = "sample Data Changed";
  ozoneOutputStream = bucket.createMultipartKey(keyName,
      sampleData.length(), partNumber, uploadID);
  // Fix: the original wrote only "name".length() (4) bytes of the
  // 19-byte replacement payload — a copy-paste leftover. Write the
  // whole changed payload so the declared stream size and the bytes
  // actually written agree.
  ozoneOutputStream.write(string2Bytes(sampleData), 0, sampleData.length());
  ozoneOutputStream.close();

  commitUploadPartInfo = ozoneOutputStream
      .getCommitUploadPartInfo();

  assertNotNull(commitUploadPartInfo);
  assertNotNull(commitUploadPartInfo.getPartName());

  // PartName should be different from old part Name.
  assertNotEquals("Part names should be different", partName,
      commitUploadPartInfo.getPartName());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import org.apache.hadoop.ozone.om.request.s3.multipart.S3InitiateMultipartUploadRequestV1;
import org.apache.hadoop.ozone.om.request.s3.multipart.S3MultipartUploadAbortRequest;
import org.apache.hadoop.ozone.om.request.s3.multipart.S3MultipartUploadCommitPartRequest;
import org.apache.hadoop.ozone.om.request.s3.multipart.S3MultipartUploadCommitPartRequestV1;
import org.apache.hadoop.ozone.om.request.s3.multipart.S3MultipartUploadCompleteRequest;
import org.apache.hadoop.ozone.om.request.s3.security.S3GetSecretRequest;
import org.apache.hadoop.ozone.om.request.security.OMCancelDelegationTokenRequest;
Expand Down Expand Up @@ -188,6 +189,9 @@ public static OMClientRequest createClientRequest(OMRequest omRequest) {
}
return new S3InitiateMultipartUploadRequest(omRequest);
case CommitMultiPartUpload:
if (isBucketFSOptimized()) {
return new S3MultipartUploadCommitPartRequestV1(omRequest);
}
return new S3MultipartUploadCommitPartRequest(omRequest);
case AbortMultiPartUpload:
return new S3MultipartUploadAbortRequest(omRequest);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -661,9 +661,17 @@ private OmKeyInfo prepareMultipartFileInfo(
// error no such multipart upload.
String uploadID = args.getMultipartUploadID();
Preconditions.checkNotNull(uploadID);
String multipartKey = omMetadataManager
.getMultipartKey(args.getVolumeName(), args.getBucketName(),
args.getKeyName(), uploadID);
String multipartKey = "";
if (omPathInfo != null) {
// FileTable metadata format
multipartKey = omMetadataManager.getMultipartKey(
omPathInfo.getLastKnownParentId(),
omPathInfo.getLeafNodeName(), uploadID);
} else {
multipartKey = omMetadataManager
.getMultipartKey(args.getVolumeName(), args.getBucketName(),
args.getKeyName(), uploadID);
}
OmKeyInfo partKeyInfo = omMetadataManager.getOpenKeyTable().get(
multipartKey);
if (partKeyInfo == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,17 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,
volumeName, bucketName);
}
}
logResult(ozoneManager, multipartInfoInitiateRequest, auditMap, volumeName,
bucketName, keyName, exception, result);

return omClientResponse;
}

@SuppressWarnings("parameternumber")
protected void logResult(OzoneManager ozoneManager,
MultipartInfoInitiateRequest multipartInfoInitiateRequest,
Map<String, String> auditMap, String volumeName, String bucketName,
String keyName, IOException exception, Result result) {
// audit log
auditLog(ozoneManager.getAuditLogger(), buildAuditMessage(
OMAction.INITIATE_MULTIPART_UPLOAD, auditMap,
Expand All @@ -246,7 +256,5 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,
LOG.error("Unrecognized Result for S3InitiateMultipartUploadRequest: {}",
multipartInfoInitiateRequest);
}

return omClientResponse;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
import org.apache.hadoop.ozone.audit.OMAction;
import org.apache.hadoop.ozone.om.OMMetadataManager;
import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.om.exceptions.OMException;
Expand Down Expand Up @@ -228,28 +227,8 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,
volumeName, bucketName);
}
}

// audit log
auditLog(ozoneManager.getAuditLogger(), buildAuditMessage(
OMAction.INITIATE_MULTIPART_UPLOAD, auditMap,
exception, getOmRequest().getUserInfo()));

switch (result) {
case SUCCESS:
LOG.debug("S3 InitiateMultipart Upload request for Key {} in " +
"Volume/Bucket {}/{} is successfully completed", keyName,
volumeName, bucketName);
break;
case FAILURE:
ozoneManager.getMetrics().incNumInitiateMultipartUploadFails();
LOG.error("S3 InitiateMultipart Upload request for Key {} in " +
"Volume/Bucket {}/{} is failed", keyName, volumeName, bucketName,
exception);
break;
default:
LOG.error("Unrecognized Result for S3InitiateMultipartUploadRequest: {}",
multipartInfoInitiateRequest);
}
logResult(ozoneManager, multipartInfoInitiateRequest, auditMap, volumeName,
bucketName, keyName, exception, result);

return omClientResponse;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,16 +129,16 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,
validateBucketAndVolume(omMetadataManager, volumeName, bucketName);

String uploadID = keyArgs.getMultipartUploadID();
multipartKey = omMetadataManager.getMultipartKey(volumeName, bucketName,
keyName, uploadID);
multipartKey = getMultipartKey(volumeName, bucketName, keyName,
omMetadataManager, uploadID);

multipartKeyInfo = omMetadataManager.getMultipartInfoTable()
.get(multipartKey);

long clientID = multipartCommitUploadPartRequest.getClientID();

openKey = omMetadataManager.getOpenKey(
volumeName, bucketName, keyName, clientID);
openKey = getOpenKey(volumeName, bucketName, keyName, omMetadataManager,
clientID);

String ozoneKey = omMetadataManager.getOzoneKey(
volumeName, bucketName, keyName);
Expand Down Expand Up @@ -248,6 +248,31 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,
}
}

logResult(ozoneManager, multipartCommitUploadPartRequest, keyArgs,
auditMap, volumeName, bucketName, keyName, exception, partName,
result);

return omClientResponse;
}

/**
 * Builds the open key table DB key for the given (volume, bucket, key,
 * clientID) tuple by delegating to the metadata manager.
 *
 * @param volumeName volume name
 * @param bucketName bucket name
 * @param keyName key name
 * @param omMetadataManager OM metadata manager that formats the DB key
 * @param clientID client ID of the in-flight open key
 * @return DB key into the open key table
 */
private String getOpenKey(String volumeName, String bucketName,
String keyName, OMMetadataManager omMetadataManager, long clientID) {
return omMetadataManager.getOpenKey(volumeName, bucketName,
keyName, clientID);
}

/**
 * Builds the multipart info table DB key for the given (volume, bucket,
 * key, uploadID) tuple by delegating to the metadata manager.
 *
 * @param volumeName volume name
 * @param bucketName bucket name
 * @param keyName key name
 * @param omMetadataManager OM metadata manager that formats the DB key
 * @param uploadID multipart upload ID of this MPU
 * @return DB key into the multipart info table
 */
private String getMultipartKey(String volumeName, String bucketName,
String keyName, OMMetadataManager omMetadataManager, String uploadID) {
return omMetadataManager.getMultipartKey(volumeName, bucketName,
keyName, uploadID);
}

@SuppressWarnings("parameternumber")
protected void logResult(OzoneManager ozoneManager,
MultipartCommitUploadPartRequest multipartCommitUploadPartRequest,
KeyArgs keyArgs, Map<String, String> auditMap, String volumeName,
String bucketName, String keyName, IOException exception,
String partName, Result result) {
// audit log
// Add MPU related information.
auditMap.put(OzoneConsts.MULTIPART_UPLOAD_PART_NUMBER,
Expand All @@ -273,8 +298,6 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,
LOG.error("Unrecognized Result for S3MultipartUploadCommitPartRequest: " +
"{}", multipartCommitUploadPartRequest);
}

return omClientResponse;
}

}
Expand Down

0 comments on commit e01a1ef

Please sign in to comment.