Skip to content

Commit

Permalink
HDDS-4932. [FSO] Provide list subpaths function to perform recursive …
Browse files Browse the repository at this point in the history
…ACL check during delete and rename op (#2008)
  • Loading branch information
rakeshadr committed Apr 8, 2021
1 parent 81361d6 commit 5ed34df
Show file tree
Hide file tree
Showing 14 changed files with 587 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ public StoreType getStoreType() {

public abstract String getKeyName();

public abstract OzonePrefixPath getOzonePrefixPathViewer();

/**
* Get PrefixName.
* A prefix name is like a key name under the bucket but
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ public final class OzoneObjInfo extends OzoneObj {
private final String bucketName;
private final String name;

private OzonePrefixPath ozonePrefixPath;

/**
*
* @param resType
Expand All @@ -46,11 +48,13 @@ public final class OzoneObjInfo extends OzoneObj {
* @param name - keyName/PrefixName
*/
private OzoneObjInfo(ResourceType resType, StoreType storeType,
String volumeName, String bucketName, String name) {
String volumeName, String bucketName, String name,
OzonePrefixPath ozonePrefixPath) {
super(resType, storeType);
this.volumeName = volumeName;
this.bucketName = bucketName;
this.name = name;
this.ozonePrefixPath = ozonePrefixPath;
}

@Override
Expand Down Expand Up @@ -95,6 +99,10 @@ public String getPrefixName() {
return name;
}

@Override
public OzonePrefixPath getOzonePrefixPathViewer() {
return ozonePrefixPath;
}

public static OzoneObjInfo fromProtobuf(OzoneManagerProtocolProtos.OzoneObj
proto) {
Expand Down Expand Up @@ -154,6 +162,7 @@ public static class Builder {
private String volumeName;
private String bucketName;
private String name;
private OzonePrefixPath ozonePrefixPath;

public static Builder newBuilder() {
return new Builder();
Expand Down Expand Up @@ -207,8 +216,15 @@ public Builder setPrefixName(String prefix) {
return this;
}

public Builder setOzonePrefixPath(OzonePrefixPath ozonePrefixPathViewer) {
this.ozonePrefixPath = ozonePrefixPathViewer;
return this;
}


public OzoneObjInfo build() {
return new OzoneObjInfo(resType, storeType, volumeName, bucketName, name);
return new OzoneObjInfo(resType, storeType, volumeName, bucketName,
name, ozonePrefixPath);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.security.acl;

import org.apache.hadoop.ozone.om.helpers.OzoneFileStatus;

import java.io.IOException;
import java.util.Iterator;

/**
* Interface used to lists immediate children(sub-paths) for a given keyPrefix.
*/
public interface OzonePrefixPath {

/**
* Returns file status for the given key path.
*
* @return OzoneFileStatus for the given path.
*/
OzoneFileStatus getOzoneFileStatus();

/**
* Lists immediate children(files or a directories) of the given keyPrefix.
* It won't do recursive traversal. The given keyPrefix parameter should be a
* directory type.
*
* Assume following is the Ozone FS tree structure.
*
* buck-1
* |
* a
* |
* -----------------------------------
* | | |
* b1 b2 b3
* ----- -------- ----------
* | | | | | | | |
* c1 c2 d1 d2 d3 e1 e2 e3
* | |
* -------- |
* | | |
* d21.txt d22.txt e11.txt
*
* Say, KeyPrefix = "a" will return immediate children [a/b1, a/b2, a/b3].
* Say, KeyPrefix = "a/b2" will return children [a/b2/d1, a/b2/d2, a/b2/d3].
*
* @param keyPrefix keyPrefix name
* @return list of immediate files or directories under the given keyPrefix.
* @throws IOException
*/
Iterator<? extends OzoneFileStatus> getChildren(String keyPrefix)
throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.Set;
import java.util.TreeSet;

Expand All @@ -48,9 +49,12 @@
import org.apache.hadoop.ozone.client.OzoneBucket;
import org.apache.hadoop.ozone.client.OzoneKeyDetails;
import org.apache.hadoop.ozone.om.OMConfigKeys;
import org.apache.hadoop.ozone.om.OzonePrefixPathImpl;
import org.apache.hadoop.ozone.om.TrashPolicyOzone;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
import org.apache.hadoop.ozone.om.helpers.OzoneFileStatus;
import org.apache.hadoop.ozone.om.request.TestOMRequestUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.test.GenericTestUtils;
Expand All @@ -63,6 +67,7 @@
import static org.apache.hadoop.fs.ozone.Constants.LISTING_PAGE_SIZE;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_ENABLED;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_FS_ITERATE_BATCH_SIZE;
import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_DELIMITER;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
Expand Down Expand Up @@ -1265,4 +1270,61 @@ public void testTrash() throws Exception {
}
}, 1000, 120000);
}

@Test
public void testListStatusOnLargeDirectoryForACLCheck() throws Exception {
String keyName = "dir1/dir2/testListStatusOnLargeDirectoryForACLCheck";
Path root = new Path(OZONE_URI_DELIMITER, keyName);
Set<String> paths = new TreeSet<>();
int numDirs = LISTING_PAGE_SIZE + LISTING_PAGE_SIZE / 2;
for (int i = 0; i < numDirs; i++) {
Path p = new Path(root, String.valueOf(i));
getFs().mkdirs(p);
paths.add(keyName + OM_KEY_PREFIX + p.getName());
}

// unknown keyname
try {
new OzonePrefixPathImpl(getVolumeName(), getBucketName(), "invalidKey",
cluster.getOzoneManager().getKeyManager());
Assert.fail("Non-existent key name!");
} catch (OMException ome) {
Assert.assertEquals(OMException.ResultCodes.KEY_NOT_FOUND,
ome.getResult());
}

OzonePrefixPathImpl ozonePrefixPath =
new OzonePrefixPathImpl(getVolumeName(), getBucketName(), keyName,
cluster.getOzoneManager().getKeyManager());

OzoneFileStatus status = ozonePrefixPath.getOzoneFileStatus();
Assert.assertNotNull(status);
Assert.assertEquals(keyName, status.getTrimmedName());
Assert.assertTrue(status.isDirectory());

Iterator<? extends OzoneFileStatus> pathItr =
ozonePrefixPath.getChildren(keyName);
Assert.assertTrue("Failed to list keyPath:" + keyName, pathItr.hasNext());

Set<String> actualPaths = new TreeSet<>();
while (pathItr.hasNext()) {
String pathname = pathItr.next().getTrimmedName();
actualPaths.add(pathname);

// no subpaths, expected an empty list
Iterator<? extends OzoneFileStatus> subPathItr =
ozonePrefixPath.getChildren(pathname);
Assert.assertNotNull(subPathItr);
Assert.assertFalse("Failed to list keyPath: " + pathname,
subPathItr.hasNext());
}

Assert.assertEquals("ListStatus failed", paths.size(),
actualPaths.size());

for (String pathname : actualPaths) {
paths.remove(pathname);
}
Assert.assertTrue("ListStatus failed:" + paths, paths.isEmpty());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -270,28 +270,6 @@ private OmBucketInfo getBucketInfo(String volumeName, String bucketName)
return metadataManager.getBucketTable().get(bucketKey);
}

private void validateBucket(String volumeName, String bucketName)
throws IOException {
String bucketKey = metadataManager.getBucketKey(volumeName, bucketName);
// Check if bucket exists
if (metadataManager.getBucketTable().get(bucketKey) == null) {
String volumeKey = metadataManager.getVolumeKey(volumeName);
// If the volume also does not exist, we should throw volume not found
// exception
if (metadataManager.getVolumeTable().get(volumeKey) == null) {
LOG.error("volume not found: {}", volumeName);
throw new OMException("Volume not found",
VOLUME_NOT_FOUND);
}

// if the volume exists but bucket does not exist, throw bucket not found
// exception
LOG.error("bucket not found: {}/{} ", volumeName, bucketName);
throw new OMException("Bucket not found",
BUCKET_NOT_FOUND);
}
}

/**
* Check S3 bucket exists or not.
* @param volumeName
Expand Down Expand Up @@ -322,7 +300,7 @@ public OmKeyLocationInfo allocateBlock(OmKeyArgs args, long clientID,
String volumeName = args.getVolumeName();
String bucketName = args.getBucketName();
String keyName = args.getKeyName();
validateBucket(volumeName, bucketName);
OMFileRequest.validateBucket(metadataManager, volumeName, bucketName);
String openKey = metadataManager.getOpenKey(
volumeName, bucketName, keyName, clientID);

Expand Down Expand Up @@ -431,7 +409,7 @@ public OpenKeySession openKey(OmKeyArgs args) throws IOException {
String volumeName = args.getVolumeName();
String bucketName = args.getBucketName();
String keyName = args.getKeyName();
validateBucket(volumeName, bucketName);
OMFileRequest.validateBucket(metadataManager, volumeName, bucketName);

long currentTime = UniqueId.next();
OmKeyInfo keyInfo;
Expand Down Expand Up @@ -615,7 +593,7 @@ public void commitKey(OmKeyArgs args, long clientID) throws IOException {
try {
metadataManager.getLock().acquireWriteLock(BUCKET_LOCK, volumeName,
bucketName);
validateBucket(volumeName, bucketName);
OMFileRequest.validateBucket(metadataManager, volumeName, bucketName);
OmKeyInfo keyInfo = metadataManager.getOpenKeyTable().get(openKey);
if (keyInfo == null) {
throw new OMException("Failed to commit key, as " + openKey + "entry " +
Expand Down Expand Up @@ -1577,7 +1555,7 @@ public boolean addAcl(OzoneObj obj, OzoneAcl acl) throws IOException {

metadataManager.getLock().acquireWriteLock(BUCKET_LOCK, volume, bucket);
try {
validateBucket(volume, bucket);
OMFileRequest.validateBucket(metadataManager, volume, bucket);
String objectKey = metadataManager.getOzoneKey(volume, bucket, keyName);
OmKeyInfo keyInfo = metadataManager.getKeyTable().get(objectKey);
if (keyInfo == null) {
Expand Down Expand Up @@ -1621,7 +1599,7 @@ public boolean removeAcl(OzoneObj obj, OzoneAcl acl) throws IOException {

metadataManager.getLock().acquireWriteLock(BUCKET_LOCK, volume, bucket);
try {
validateBucket(volume, bucket);
OMFileRequest.validateBucket(metadataManager, volume, bucket);
String objectKey = metadataManager.getOzoneKey(volume, bucket, keyName);
OmKeyInfo keyInfo = metadataManager.getKeyTable().get(objectKey);
if (keyInfo == null) {
Expand Down Expand Up @@ -1662,7 +1640,7 @@ public boolean setAcl(OzoneObj obj, List<OzoneAcl> acls) throws IOException {

metadataManager.getLock().acquireWriteLock(BUCKET_LOCK, volume, bucket);
try {
validateBucket(volume, bucket);
OMFileRequest.validateBucket(metadataManager, volume, bucket);
String objectKey = metadataManager.getOzoneKey(volume, bucket, keyName);
OmKeyInfo keyInfo = metadataManager.getKeyTable().get(objectKey);
if (keyInfo == null) {
Expand Down Expand Up @@ -1701,7 +1679,7 @@ public List<OzoneAcl> getAcl(OzoneObj obj) throws IOException {
OmKeyInfo keyInfo;
metadataManager.getLock().acquireReadLock(BUCKET_LOCK, volume, bucket);
try {
validateBucket(volume, bucket);
OMFileRequest.validateBucket(metadataManager, volume, bucket);
String objectKey = metadataManager.getOzoneKey(volume, bucket, keyName);
if (OzoneManagerRatisUtils.isBucketFSOptimized()) {
keyInfo = getOmKeyInfoV1(volume, bucket, keyName);
Expand Down Expand Up @@ -1750,7 +1728,7 @@ public boolean checkAccess(OzoneObj ozObject, RequestContext context)

metadataManager.getLock().acquireReadLock(BUCKET_LOCK, volume, bucket);
try {
validateBucket(volume, bucket);
OMFileRequest.validateBucket(metadataManager, volume, bucket);
OmKeyInfo keyInfo;

// For Acl Type "WRITE", the key can only be found in
Expand Down Expand Up @@ -1885,7 +1863,7 @@ private OzoneFileStatus getOzoneFileStatus(String volumeName,
try {
// Check if this is the root of the filesystem.
if (keyName.length() == 0) {
validateBucket(volumeName, bucketName);
OMFileRequest.validateBucket(metadataManager, volumeName, bucketName);
return new OzoneFileStatus();
}

Expand Down Expand Up @@ -1942,7 +1920,7 @@ private OzoneFileStatus getOzoneFileStatusV1(String volumeName,
try {
// Check if this is the root of the filesystem.
if (keyName.length() == 0) {
validateBucket(volumeName, bucketName);
OMFileRequest.validateBucket(metadataManager, volumeName, bucketName);
return new OzoneFileStatus();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1771,6 +1771,21 @@ public boolean checkAcls(ResourceType resType, StoreType storeType,
.setAclRights(aclType)
.setOwnerName(volumeOwner)
.build();

return checkAcls(obj, context, throwIfPermissionDenied);
}

/**
* CheckAcls for the ozone object.
*
* @return true if permission granted, false if permission denied.
* @throws OMException ResultCodes.PERMISSION_DENIED if permission denied
* and throwOnPermissionDenied set to true.
*/
public boolean checkAcls(OzoneObj obj, RequestContext context,
boolean throwIfPermissionDenied)
throws OMException {

if (!accessAuthorizer.checkAccess(obj, context)) {
if (throwIfPermissionDenied) {
LOG.warn("User {} doesn't have {} permission to access {} /{}/{}/{}",
Expand All @@ -1790,6 +1805,8 @@ public boolean checkAcls(ResourceType resType, StoreType storeType,
}
}



/**
* Return true if Ozone acl's are enabled, else false.
*
Expand Down
Loading

0 comments on commit 5ed34df

Please sign in to comment.