Skip to content

Commit

Permalink
HDDS-4565. [FSO]Delete : quotaReleased on directory deletion should c…
Browse files Browse the repository at this point in the history
…onsider all sub paths (#3973)
  • Loading branch information
sumitagrawl authored and kaijchen committed Dec 7, 2022
1 parent 5c24d3f commit cd92f2c
Show file tree
Hide file tree
Showing 3 changed files with 315 additions and 3 deletions.
Expand Up @@ -18,8 +18,17 @@

package org.apache.hadoop.ozone.om.request.key;

import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.ozone.om.OMMetadataManager;
import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.om.helpers.BucketLayout;
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper;
import org.apache.hadoop.ozone.om.request.util.OmResponseUtil;
import org.apache.hadoop.ozone.om.response.OMClientResponse;
Expand All @@ -30,6 +39,8 @@

import java.util.List;

import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.BUCKET_LOCK;

/**
* Handles purging of keys from OM DB.
*/
Expand All @@ -48,12 +59,72 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,
List<OzoneManagerProtocolProtos.PurgePathRequest> purgeRequests =
purgeDirsRequest.getDeletedPathList();

Set<Pair<String, String>> lockSet = new HashSet<>();
Map<Pair<String, String>, OmBucketInfo> volBucketInfoMap = new HashMap<>();
OMMetadataManager omMetadataManager = ozoneManager.getMetadataManager();
try {
for (OzoneManagerProtocolProtos.PurgePathRequest path : purgeRequests) {
for (OzoneManagerProtocolProtos.KeyInfo key :
path.getMarkDeletedSubDirsList()) {
OmKeyInfo keyInfo = OmKeyInfo.getFromProtobuf(key);
String volumeName = keyInfo.getVolumeName();
String bucketName = keyInfo.getBucketName();
Pair<String, String> volBucketPair = Pair.of(volumeName, bucketName);
if (!lockSet.contains(volBucketPair)) {
omMetadataManager.getLock().acquireWriteLock(BUCKET_LOCK,
volumeName, bucketName);
lockSet.add(volBucketPair);
}
OmBucketInfo omBucketInfo = getBucketInfo(omMetadataManager,
volumeName, bucketName);
// bucketInfo can be null in case of delete volume or bucket
if (null != omBucketInfo) {
omBucketInfo.incrUsedNamespace(-1L);
volBucketInfoMap.putIfAbsent(volBucketPair, omBucketInfo);
}
}

for (OzoneManagerProtocolProtos.KeyInfo key :
path.getDeletedSubFilesList()) {
OmKeyInfo keyInfo = OmKeyInfo.getFromProtobuf(key);
String volumeName = keyInfo.getVolumeName();
String bucketName = keyInfo.getBucketName();
Pair<String, String> volBucketPair = Pair.of(volumeName, bucketName);
if (!lockSet.contains(volBucketPair)) {
omMetadataManager.getLock().acquireWriteLock(BUCKET_LOCK,
volumeName, bucketName);
lockSet.add(volBucketPair);
}
OmBucketInfo omBucketInfo = getBucketInfo(omMetadataManager,
volumeName, bucketName);
// bucketInfo can be null in case of delete volume or bucket
if (null != omBucketInfo) {
omBucketInfo.incrUsedBytes(-sumBlockLengths(keyInfo));
omBucketInfo.incrUsedNamespace(-1L);
volBucketInfoMap.putIfAbsent(volBucketPair, omBucketInfo);
}
}
}
} catch (IOException ex) {
// Case of IOException for fromProtobuf will not hanppen
// as this is created and send within OM
// only case of upgrade where compatibility is broken can have
throw new IllegalStateException(ex);
} finally {
lockSet.stream().forEach(e -> omMetadataManager.getLock()
.releaseWriteLock(BUCKET_LOCK, e.getKey(),
e.getValue()));
for (Map.Entry<Pair<String, String>, OmBucketInfo> entry :
volBucketInfoMap.entrySet()) {
entry.setValue(entry.getValue().copyObject());
}
}

OMResponse.Builder omResponse = OmResponseUtil.getOMResponseBuilder(
getOmRequest());

OMClientResponse omClientResponse = new OMDirectoriesPurgeResponseWithFSO(
omResponse.build(), purgeRequests, ozoneManager.isRatisEnabled(),
getBucketLayout());
getBucketLayout(), volBucketInfoMap);
addResponseToDoubleBuffer(trxnLogIndex, omClientResponse,
omDoubleBufferHelper);

Expand Down
Expand Up @@ -18,10 +18,13 @@

package org.apache.hadoop.ozone.om.response.key;

import java.util.Map;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hdds.utils.db.BatchOperation;
import org.apache.hadoop.ozone.OmUtils;
import org.apache.hadoop.ozone.om.OMMetadataManager;
import org.apache.hadoop.ozone.om.helpers.BucketLayout;
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
import org.apache.hadoop.ozone.om.request.key.OMDirectoriesPurgeRequestWithFSO;
Expand Down Expand Up @@ -51,14 +54,17 @@ public class OMDirectoriesPurgeResponseWithFSO extends OmKeyResponse {

private List<OzoneManagerProtocolProtos.PurgePathRequest> paths;
private boolean isRatisEnabled;
private Map<Pair<String, String>, OmBucketInfo> volBucketInfoMap;


public OMDirectoriesPurgeResponseWithFSO(@Nonnull OMResponse omResponse,
@Nonnull List<OzoneManagerProtocolProtos.PurgePathRequest> paths,
boolean isRatisEnabled, @Nonnull BucketLayout bucketLayout) {
boolean isRatisEnabled, @Nonnull BucketLayout bucketLayout,
Map<Pair<String, String>, OmBucketInfo> volBucketInfoMap) {
super(omResponse, bucketLayout);
this.paths = paths;
this.isRatisEnabled = isRatisEnabled;
this.volBucketInfoMap = volBucketInfoMap;
}

@Override
Expand Down Expand Up @@ -124,6 +130,13 @@ public void addToDBBatch(OMMetadataManager omMetadataManager,
LOG.info("Purge Deleted Directory DBKey: {}", path.getDeletedDir());
}
}

// update bucket usedBytes.
for (OmBucketInfo omBucketInfo : volBucketInfoMap.values()) {
omMetadataManager.getBucketTable().putWithBatch(batchOperation,
omMetadataManager.getBucketKey(omBucketInfo.getVolumeName(),
omBucketInfo.getBucketName()), omBucketInfo);
}
}
}
}
@@ -0,0 +1,228 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.ozone.om.request.key;

import com.google.common.base.Optional;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.utils.db.BatchOperation;
import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
import org.apache.hadoop.ozone.ClientVersion;
import org.apache.hadoop.ozone.om.helpers.BucketLayout;
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
import org.apache.hadoop.ozone.om.request.OMRequestTestUtils;
import org.apache.hadoop.ozone.om.response.key.OMDirectoriesPurgeResponseWithFSO;
import org.apache.hadoop.ozone.om.response.key.OMKeyPurgeResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
import org.junit.Assert;
import org.junit.Test;

/**
* Tests {@link OMKeyPurgeRequest} and {@link OMKeyPurgeResponse}.
*/
public class TestOMDirectoriesPurgeRequestAndResponse extends TestOMKeyRequest {

private int numKeys = 10;

/**
* Creates volume, bucket and key entries and adds to OM DB and then
* deletes these keys to move them to deletedKeys table.
*/
private List<OmKeyInfo> createAndDeleteKeys(Integer trxnIndex, String bucket)
throws Exception {
if (bucket == null) {
bucket = bucketName;
}
// Add volume, bucket and key entries to OM DB.
OMRequestTestUtils.addVolumeAndBucketToDB(volumeName, bucket,
omMetadataManager);

List<OmKeyInfo> deletedKeyNames = new ArrayList<>(numKeys);
List<String> ozoneKeyNames = new ArrayList<>(numKeys);
for (int i = 1; i <= numKeys; i++) {
String key = keyName + "-" + i;
OMRequestTestUtils.addKeyToTable(false, false, volumeName, bucket,
key, clientID, replicationType, replicationFactor, trxnIndex++,
omMetadataManager);
String ozoneKey = omMetadataManager.getOzoneKey(
volumeName, bucket, key);
ozoneKeyNames.add(ozoneKey);
OmKeyInfo omKeyInfo =
omMetadataManager.getKeyTable(BucketLayout.DEFAULT).get(ozoneKey);
deletedKeyNames.add(omKeyInfo);
updateBlockInfo(omKeyInfo);
}

for (String ozoneKey : ozoneKeyNames) {
OMRequestTestUtils.deleteKey(
ozoneKey, omMetadataManager, trxnIndex++);
}

return deletedKeyNames;
}

private void updateBlockInfo(OmKeyInfo omKeyInfo) throws IOException {
String bucketKey = omMetadataManager.getBucketKey(volumeName, bucketName);
OmBucketInfo omBucketInfo = omMetadataManager.getBucketTable().get(
bucketKey);
List<OmKeyLocationInfoGroup> locationList = new ArrayList<>();
List<OmKeyLocationInfo> locList = new ArrayList<>();
OmKeyLocationInfo.Builder builder = new OmKeyLocationInfo.Builder();
builder.setLength(omKeyInfo.getDataSize()).setBlockID(new BlockID(1, 1));
locList.add(builder.build());
locationList.add(new OmKeyLocationInfoGroup(1, locList, false));
omKeyInfo.setKeyLocationVersions(locationList);
omBucketInfo.incrUsedBytes(omKeyInfo.getDataSize());
omBucketInfo.incrUsedNamespace(1L);
omMetadataManager.getBucketTable().addCacheEntry(new CacheKey<>(bucketKey),
new CacheValue<>(Optional.of(omBucketInfo), 1L));
omMetadataManager.getBucketTable().put(bucketKey, omBucketInfo);
}

/**
* Create OMRequest which encapsulates DeleteKeyRequest.
* @return OMRequest
*/
private OMRequest createPurgeKeysRequest(String purgeDeletedDir,
List<OmKeyInfo> keyList) throws IOException {
List<OzoneManagerProtocolProtos.PurgePathRequest> purgePathRequestList
= new ArrayList<>();
List<OmKeyInfo> subFiles = new ArrayList<>();
for (OmKeyInfo key : keyList) {
subFiles.add(key);
}
List<OmKeyInfo> subDirs = new ArrayList<>();
Long volumeId = 1L;
Long bucketId = 1L;
OzoneManagerProtocolProtos.PurgePathRequest request = wrapPurgeRequest(
volumeId, bucketId, purgeDeletedDir, subFiles, subDirs);
purgePathRequestList.add(request);

OzoneManagerProtocolProtos.PurgeDirectoriesRequest.Builder purgeDirRequest =
OzoneManagerProtocolProtos.PurgeDirectoriesRequest.newBuilder();
purgeDirRequest.addAllDeletedPath(purgePathRequestList);

OzoneManagerProtocolProtos.OMRequest omRequest =
OzoneManagerProtocolProtos.OMRequest.newBuilder()
.setCmdType(OzoneManagerProtocolProtos.Type.PurgeDirectories)
.setPurgeDirectoriesRequest(purgeDirRequest)
.setClientId(UUID.randomUUID().toString())
.build();
return omRequest;
}
private OzoneManagerProtocolProtos.PurgePathRequest wrapPurgeRequest(
final long volumeId, final long bucketId, final String purgeDeletedDir,
final List<OmKeyInfo> purgeDeletedFiles,
final List<OmKeyInfo> markDirsAsDeleted) {
// Put all keys to be purged in a list
OzoneManagerProtocolProtos.PurgePathRequest.Builder purgePathsRequest
= OzoneManagerProtocolProtos.PurgePathRequest.newBuilder();
purgePathsRequest.setVolumeId(volumeId);
purgePathsRequest.setBucketId(bucketId);

if (purgeDeletedDir != null) {
purgePathsRequest.setDeletedDir(purgeDeletedDir);
}

for (OmKeyInfo purgeFile : purgeDeletedFiles) {
purgePathsRequest.addDeletedSubFiles(
purgeFile.getProtobuf(true, ClientVersion.CURRENT_VERSION));
}

// Add these directories to deletedDirTable, so that its sub-paths will be
// traversed in next iteration to ensure cleanup all sub-children.
for (OmKeyInfo dir : markDirsAsDeleted) {
purgePathsRequest.addMarkDeletedSubDirs(
dir.getProtobuf(ClientVersion.CURRENT_VERSION));
}

return purgePathsRequest.build();
}

private OMRequest preExecute(OMRequest originalOmRequest) throws IOException {
OMKeyPurgeRequest omKeyPurgeRequest =
new OMKeyPurgeRequest(originalOmRequest);

OMRequest modifiedOmRequest = omKeyPurgeRequest.preExecute(ozoneManager);

// Will not be equal, as UserInfo will be set.
Assert.assertNotEquals(originalOmRequest, modifiedOmRequest);

return modifiedOmRequest;
}

@Test
public void testValidateAndUpdateCacheCheckQuota() throws Exception {
// Create and Delete keys. The keys should be moved to DeletedKeys table
List<OmKeyInfo> deletedKeyInfos = createAndDeleteKeys(1, null);
// The keys should be present in the DeletedKeys table before purging
List<String> deletedKeyNames = new ArrayList<>();
for (OmKeyInfo deletedKey : deletedKeyInfos) {
String keyName = omMetadataManager.getOzoneKey(deletedKey.getVolumeName(),
deletedKey.getBucketName(), deletedKey.getKeyName());
Assert.assertTrue(omMetadataManager.getDeletedTable().isExist(
keyName));
deletedKeyNames.add(keyName);
}

// Create PurgeKeysRequest to purge the deleted keys
OMRequest omRequest = createPurgeKeysRequest(null, deletedKeyInfos);

OMRequest preExecutedRequest = preExecute(omRequest);
OMDirectoriesPurgeRequestWithFSO omKeyPurgeRequest =
new OMDirectoriesPurgeRequestWithFSO(preExecutedRequest);

String bucketKey = omMetadataManager.getBucketKey(volumeName, bucketName);
OmBucketInfo omBucketInfo = omMetadataManager.getBucketTable().get(
bucketKey);
Assert.assertEquals(1000L * deletedKeyNames.size(),
omBucketInfo.getUsedBytes());
OMDirectoriesPurgeResponseWithFSO omClientResponse
= (OMDirectoriesPurgeResponseWithFSO) omKeyPurgeRequest
.validateAndUpdateCache(ozoneManager, 100L,
ozoneManagerDoubleBufferHelper);
omBucketInfo = omMetadataManager.getBucketTable().get(
bucketKey);
Assert.assertEquals(0L * deletedKeyNames.size(),
omBucketInfo.getUsedBytes());

try (BatchOperation batchOperation =
omMetadataManager.getStore().initBatchOperation()) {

omClientResponse.addToDBBatch(omMetadataManager, batchOperation);

// Do manual commit and see whether addToBatch is successful or not.
omMetadataManager.getStore().commitBatchOperation(batchOperation);
}

// The keys should exist in the DeletedKeys table after dir delete
for (String deletedKey : deletedKeyNames) {
Assert.assertTrue(omMetadataManager.getDeletedTable().isExist(
deletedKey));
}
}
}

0 comments on commit cd92f2c

Please sign in to comment.