Skip to content

Commit

Permalink
HDDS-6337. [FSO] Disable recursive access check flag for directories …
Browse files Browse the repository at this point in the history
…with no children. (#3134)
  • Loading branch information
JyotinderSingh committed Feb 27, 2022
1 parent fbce851 commit ebbe9b5
Show file tree
Hide file tree
Showing 5 changed files with 114 additions and 15 deletions.
Expand Up @@ -20,6 +20,7 @@
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
import org.apache.hadoop.ozone.om.helpers.OzoneFileStatus;
import org.apache.hadoop.ozone.om.request.file.OMFileRequest;
import org.apache.hadoop.ozone.security.acl.OzonePrefixPath;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -43,6 +44,7 @@ public class OzonePrefixPathImpl implements OzonePrefixPath {
// TODO: based on need can make batchSize configurable.
private int batchSize = 1000;
private OzoneFileStatus pathStatus;
private boolean checkRecursiveAccess = false;

public OzonePrefixPathImpl(String volumeName, String bucketName,
String keyPrefix, KeyManager keyManagerImpl) throws IOException {
Expand All @@ -66,6 +68,15 @@ public OzonePrefixPathImpl(String volumeName, String bucketName,
}
throw ome;
}

// Check if this key is a directory.
// NOTE: checkRecursiveAccess is always false for a file keyPrefix.
if (pathStatus != null && pathStatus.isDirectory()) {
// set recursive access check to true if this directory contains
// sub-directories or sub-files.
checkRecursiveAccess = OMFileRequest.hasChildren(
pathStatus.getKeyInfo(), keyManager.getMetadataManager());
}
}

@Override
Expand Down Expand Up @@ -161,4 +172,11 @@ List<OzoneFileStatus> getNextListOfKeys(String prevKey) throws
return statuses;
}
}

/**
* @return true if no sub-directories or sub-files exist, false otherwise
*/
public boolean isCheckRecursiveAccess() {
return checkRecursiveAccess;
}
}
Expand Up @@ -208,8 +208,9 @@ public void checkAcls(OzoneManager ozoneManager,
* @param keyName
* @throws IOException
*/
protected void checkACLs(OzoneManager ozoneManager, String volumeName,
String bucketName, String keyName, IAccessAuthorizer.ACLType aclType)
protected void checkACLsWithFSO(OzoneManager ozoneManager, String volumeName,
String bucketName, String keyName,
IAccessAuthorizer.ACLType aclType)
throws IOException {

// TODO: Presently not populating sub-paths under a single bucket
Expand All @@ -226,11 +227,10 @@ protected void checkACLs(OzoneManager ozoneManager, String volumeName,
.setKeyName(keyName)
.setOzonePrefixPath(pathViewer).build();

boolean isDirectory = pathViewer.getOzoneFileStatus().isDirectory();

RequestContext.Builder contextBuilder = RequestContext.newBuilder()
.setAclRights(aclType)
.setRecursiveAccessCheck(isDirectory); // recursive checks for a dir
// recursive checks for a dir with sub-directories or sub-files
.setRecursiveAccessCheck(pathViewer.isCheckRecursiveAccess());

// check Acl
if (ozoneManager.getAclsEnabled()) {
Expand Down
Expand Up @@ -100,7 +100,7 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,
volumeName = keyArgs.getVolumeName();
bucketName = keyArgs.getBucketName();

checkACLs(ozoneManager, volumeName, bucketName, keyName,
checkACLsWithFSO(ozoneManager, volumeName, bucketName, keyName,
IAccessAuthorizer.ACLType.DELETE);

acquiredLock = omMetadataManager.getLock().acquireWriteLock(BUCKET_LOCK,
Expand Down
Expand Up @@ -113,7 +113,7 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,
// old key and create operation on new key

// check Acl fromKeyName
checkACLs(ozoneManager, volumeName, bucketName, fromKeyName,
checkACLsWithFSO(ozoneManager, volumeName, bucketName, fromKeyName,
IAccessAuthorizer.ACLType.DELETE);

// check Acl toKeyName
Expand Down
Expand Up @@ -39,6 +39,10 @@
* Tests OmKeyDelete request with prefix layout.
*/
public class TestOMKeyDeleteRequestWithFSO extends TestOMKeyDeleteRequest {
private static final String INTERMEDIATE_DIR = "c/d/";
private static final String PARENT_DIR = "c/d/e";
private static final String FILE_NAME = "file1";
private static final String FILE_KEY = PARENT_DIR + "/" + FILE_NAME;

@Override
protected OMKeyDeleteRequest getOmKeyDeleteRequest(
Expand All @@ -54,24 +58,37 @@ public BucketLayout getBucketLayout() {

@Override
protected String addKeyToTable() throws Exception {
String parentDir = "c/d/e";
String fileName = "file1";
String key = parentDir + "/" + fileName;
keyName = key; // updated key name
keyName = FILE_KEY; // updated key name

// Create parent dirs for the path
long parentId = OMRequestTestUtils.addParentsToDirTable(volumeName,
bucketName, parentDir, omMetadataManager);
bucketName, PARENT_DIR, omMetadataManager);

OmKeyInfo omKeyInfo =
OMRequestTestUtils.createOmKeyInfo(volumeName, bucketName, key,
OMRequestTestUtils.createOmKeyInfo(volumeName, bucketName, FILE_KEY,
HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.ONE,
parentId + 1,
parentId, 100, Time.now());
omKeyInfo.setKeyName(fileName);
omKeyInfo.setKeyName(FILE_NAME);
OMRequestTestUtils.addFileToKeyTable(false, false,
fileName, omKeyInfo, -1, 50, omMetadataManager);
FILE_NAME, omKeyInfo, -1, 50, omMetadataManager);
return omKeyInfo.getPath();
}

protected String addKeyToDirTable(String volumeName, String bucketName,
String key) throws Exception {
// Create parent dirs for the path
long parentId = OMRequestTestUtils.addParentsToDirTable(volumeName,
bucketName, key, omMetadataManager);

OmKeyInfo omKeyInfo =
OMRequestTestUtils.createOmKeyInfo(volumeName, bucketName, key,
HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.ONE,
parentId + 1,
parentId, 100, Time.now());
omKeyInfo.setKeyName(key);
return omKeyInfo.getPath();
}

Expand Down Expand Up @@ -132,4 +149,68 @@ private void verifyPath(OzonePrefixPath ozonePrefixPath, String pathName,
// expected
}
}

@Test
public void testRecursiveAccessCheck() throws Exception {
// Add volume, bucket and key entries to OM DB.
OMRequestTestUtils.addVolumeAndBucketToDB(volumeName, bucketName,
omMetadataManager, getBucketLayout());

// Case 1:
// We create an empty directory structure.
String parentKey = "x/y/";
String key = "x/y/z/";
addKeyToDirTable(volumeName, bucketName, key);

// Instantiate PrefixPath for complete key.
OzonePrefixPathImpl pathViewer = new OzonePrefixPathImpl(volumeName,
bucketName, key, ozoneManager.getKeyManager());

// 'x/y/z' has no sub-directories or sub files - recursive access check
// should not be enabled for this case.
Assert.assertFalse(pathViewer.isCheckRecursiveAccess());

// Instantiate PrefixPath for parent key.
pathViewer = new OzonePrefixPathImpl(volumeName,
bucketName, parentKey, ozoneManager.getKeyManager());

// 'x/y/' has a sub-directory 'z', hence, we should be performing recursive
// access check.
Assert.assertTrue(pathViewer.isCheckRecursiveAccess());

// Case 2:
// We create a directory structure with a file as the leaf node.
// 'c/d/e/file1'.
String ozoneKey = addKeyToTable();

OmKeyInfo omKeyInfo =
omMetadataManager.getKeyTable(getBucketLayout()).get(ozoneKey);

// As we added manually to key table.
Assert.assertNotNull(omKeyInfo);

// Instantiate PrefixPath for parent key 'c/d/'.
pathViewer = new OzonePrefixPathImpl(volumeName,
bucketName, INTERMEDIATE_DIR, ozoneManager.getKeyManager());

// 'c/d' has a sub-directory 'e', hence, we should be performing recursive
// access check.
Assert.assertTrue(pathViewer.isCheckRecursiveAccess());

// Instantiate PrefixPath for complete directory structure (without file).
pathViewer = new OzonePrefixPathImpl(volumeName,
bucketName, PARENT_DIR, ozoneManager.getKeyManager());

// 'c/d/e/' has a 'file1' under it, hence, we should be performing recursive
// access check.
Assert.assertTrue(pathViewer.isCheckRecursiveAccess());

// Instantiate PrefixPath for complete file1.
pathViewer = new OzonePrefixPathImpl(volumeName,
bucketName, FILE_KEY, ozoneManager.getKeyManager());

// Recursive access check is only enabled for directories, hence should be
// false for file1.
Assert.assertFalse(pathViewer.isCheckRecursiveAccess());
}
}

0 comments on commit ebbe9b5

Please sign in to comment.