Skip to content

Commit

Permalink
HDDS-7697. Restrict change of bucket properties to owner and admins i…
Browse files Browse the repository at this point in the history
…n NativeACL (#4439)
  • Loading branch information
sumitagrawl committed Mar 27, 2023
1 parent 7357f75 commit 0d7a042
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import java.net.InetAddress;

import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.INVALID_REQUEST;
import static org.apache.hadoop.ozone.security.acl.OzoneObj.ResourceType.BUCKET;
import static org.apache.hadoop.ozone.security.acl.OzoneObj.ResourceType.VOLUME;

/**
* Ozone Acl Wrapper class.
Expand Down Expand Up @@ -83,24 +85,6 @@ public static void checkAllAcls(OmMetadataReader omMetadataReader,

boolean isVolOwner = isOwner(user, volOwner);

IAccessAuthorizer.ACLType parentAclRight = aclType;

//OzoneNativeAuthorizer differs from Ranger Authorizer as Ranger requires
// only READ access on parent level access. OzoneNativeAuthorizer has
// different parent level access based on the child level access type
if (omMetadataReader.isNativeAuthorizerEnabled()) {
if (aclType == IAccessAuthorizer.ACLType.CREATE ||
aclType == IAccessAuthorizer.ACLType.DELETE ||
aclType == IAccessAuthorizer.ACLType.WRITE_ACL) {
parentAclRight = IAccessAuthorizer.ACLType.WRITE;
} else if (aclType == IAccessAuthorizer.ACLType.READ_ACL ||
aclType == IAccessAuthorizer.ACLType.LIST) {
parentAclRight = IAccessAuthorizer.ACLType.READ;
}
} else {
parentAclRight = IAccessAuthorizer.ACLType.READ;
}

switch (resType) {
//For Volume level access we only need to check {OWNER} equal
// to Volume Owner.
Expand All @@ -116,6 +100,14 @@ public static void checkAllAcls(OmMetadataReader omMetadataReader,
// volume owner if current ugi user is volume owner else we need check
//{OWNER} equals bucket owner for bucket/key/prefix.
case PREFIX:
//OzoneNativeAuthorizer differs from Ranger Authorizer as Ranger requires
// only READ access on parent level access. OzoneNativeAuthorizer has
// different parent level access based on the child level access type
IAccessAuthorizer.ACLType parentAclRight = IAccessAuthorizer.ACLType.READ;
if (omMetadataReader.isNativeAuthorizerEnabled() && resType == BUCKET) {
parentAclRight = getParentNativeAcl(aclType, resType);
}

omMetadataReader.checkAcls(OzoneObj.ResourceType.VOLUME, storeType,
parentAclRight, vol, bucket, key, user,
remoteAddress, hostName, true,
Expand All @@ -138,6 +130,56 @@ public static void checkAllAcls(OmMetadataReader omMetadataReader,
}
}

/**
* get the Parent ACL based on child ACL and resource type.
*
* @param aclRight child acl as required
* @param resType resource type
* @return parent acl
*/
public static IAccessAuthorizer.ACLType getParentNativeAcl(
IAccessAuthorizer.ACLType aclRight, OzoneObj.ResourceType resType) {
// For volume, parent access has no meaning and not used
if (resType == VOLUME) {
return IAccessAuthorizer.ACLType.NONE;
}

// Refined the parent for bucket, keys & prefix
// OP |CHILD |PARENT

// CREATE NONE WRITE
// DELETE DELETE READ
// WRITE WRITE WRITE (For key/prefix, volume is READ)
// WRITE_ACL WRITE_ACL READ (V1 WRITE_ACL=>WRITE)

// READ READ READ
// LIST LIST READ (V1 LIST=>READ)
// READ_ACL READ_ACL READ (V1 READ_ACL=>READ)

// for bucket, except CREATE, all cases need READ for volume
if (resType == BUCKET) {
if (aclRight == IAccessAuthorizer.ACLType.CREATE) {
return IAccessAuthorizer.ACLType.WRITE;
}
return IAccessAuthorizer.ACLType.READ;
}

// else for key and prefix, bucket permission will be read
// except where key/prefix have CREATE and WRITE,
// bucket will have WRITE
IAccessAuthorizer.ACLType parentAclRight = aclRight;
if (aclRight == IAccessAuthorizer.ACLType.CREATE) {
parentAclRight = IAccessAuthorizer.ACLType.WRITE;
} else if (aclRight == IAccessAuthorizer.ACLType.READ_ACL
|| aclRight == IAccessAuthorizer.ACLType.LIST
|| aclRight == IAccessAuthorizer.ACLType.WRITE_ACL
|| aclRight == IAccessAuthorizer.ACLType.DELETE) {
parentAclRight = IAccessAuthorizer.ACLType.READ;
}

return parentAclRight;
}

private static boolean isOwner(UserGroupInformation callerUgi,
String ownerName) {
if (ownerName == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.hadoop.ozone.om.request.validation.ValidationContext;
import org.apache.hadoop.ozone.om.upgrade.OMLayoutFeature;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -124,9 +125,7 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,
try {
// check Acl
if (ozoneManager.getAclsEnabled()) {
checkAcls(ozoneManager, OzoneObj.ResourceType.BUCKET,
OzoneObj.StoreType.OZONE, IAccessAuthorizer.ACLType.WRITE,
volumeName, bucketName, null);
checkAclPermission(ozoneManager, volumeName, bucketName);
}

// acquire lock.
Expand Down Expand Up @@ -269,6 +268,26 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,
}
}

private void checkAclPermission(
OzoneManager ozoneManager, String volumeName, String bucketName)
throws IOException {
if (ozoneManager.getOmMetadataReader().isNativeAuthorizerEnabled()) {
UserGroupInformation ugi = createUGI();
String bucketOwner = ozoneManager.getBucketOwner(volumeName, bucketName,
IAccessAuthorizer.ACLType.READ, OzoneObj.ResourceType.BUCKET);
if (!ozoneManager.isAdmin(ugi) &&
!ozoneManager.isOwner(ugi, bucketOwner)) {
throw new OMException(
"Bucket properties are allowed to changed by Admin and Owner",
OMException.ResultCodes.PERMISSION_DENIED);
}
} else { // ranger acl
checkAcls(ozoneManager, OzoneObj.ResourceType.BUCKET,
OzoneObj.StoreType.OZONE, IAccessAuthorizer.ACLType.WRITE,
volumeName, bucketName, null);
}
}

public boolean checkQuotaBytesValid(OMMetadataManager metadataManager,
OmVolumeArgs omVolumeArgs, OmBucketArgs omBucketArgs,
OmBucketInfo dbBucketInfo)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.om.BucketManager;
import org.apache.hadoop.ozone.om.KeyManager;
import org.apache.hadoop.ozone.om.OzoneAclUtils;
import org.apache.hadoop.ozone.om.PrefixManager;
import org.apache.hadoop.ozone.om.VolumeManager;
import org.apache.hadoop.ozone.om.exceptions.OMException;
Expand All @@ -34,6 +35,7 @@

import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.INVALID_REQUEST;
import static org.apache.hadoop.ozone.security.acl.OzoneObj.ResourceType.BUCKET;
import static org.apache.hadoop.ozone.security.acl.OzoneObj.ResourceType.VOLUME;

/**
* Public API for Ozone ACLs. Security providers providing support for Ozone
Expand Down Expand Up @@ -79,6 +81,7 @@ public boolean checkAccess(IOzoneObj ozObject, RequestContext context)
Objects.requireNonNull(context);
OzoneObjInfo objInfo;
RequestContext parentContext;
RequestContext parentVolContext;
boolean isACLTypeCreate = (context.getAclRights() == ACLType.CREATE);

if (ozObject instanceof OzoneObjInfo) {
Expand All @@ -101,37 +104,21 @@ public boolean checkAccess(IOzoneObj ozObject, RequestContext context)
return getAllowListAllVolumes();
}

// Refined the parent context
// OP |CHILD |PARENT

// CREATE NONE WRITE (parent:'CREATE' when 'create bucket')
// DELETE DELETE WRITE
// WRITE WRITE WRITE
// WRITE_ACL WRITE_ACL WRITE (V1 WRITE_ACL=>WRITE)

// READ READ READ
// LIST LIST READ (V1 LIST=>READ)
// READ_ACL READ_ACL READ (V1 READ_ACL=>READ)

ACLType aclRight = context.getAclRights();
ACLType parentAclRight = aclRight;

if (aclRight == ACLType.CREATE || aclRight == ACLType.DELETE ||
aclRight == ACLType.WRITE_ACL) {
parentAclRight = ACLType.WRITE;
} else if (aclRight == ACLType.READ_ACL || aclRight == ACLType.LIST) {
parentAclRight = ACLType.READ;
}
// To prevent ACL enlargement, parent should be 'CREATE'
// when op is 'create bucket'. see HDDS-7461.
if (objInfo.getResourceType() == BUCKET && aclRight == ACLType.CREATE) {
parentAclRight = ACLType.CREATE;
}
ACLType parentAclRight = OzoneAclUtils.getParentNativeAcl(
context.getAclRights(), objInfo.getResourceType());

parentContext = RequestContext.newBuilder()
.setClientUgi(context.getClientUgi())
.setIp(context.getIp())
.setAclType(context.getAclType())
.setAclRights(parentAclRight).build();

// Volume will be always read in case of key and prefix
parentVolContext = RequestContext.newBuilder()
.setClientUgi(context.getClientUgi())
.setIp(context.getIp())
.setAclType(context.getAclType())
.setAclRights(ACLType.READ).build();

switch (objInfo.getResourceType()) {
case VOLUME:
Expand Down Expand Up @@ -168,7 +155,7 @@ public boolean checkAccess(IOzoneObj ozObject, RequestContext context)
return (keyAccess
&& prefixManager.checkAccess(objInfo, parentContext)
&& bucketManager.checkAccess(objInfo, parentContext)
&& volumeManager.checkAccess(objInfo, parentContext));
&& volumeManager.checkAccess(objInfo, parentVolContext));
case PREFIX:
LOG.trace("Checking access for Prefix: {}", objInfo);
// Skip check for volume owner
Expand All @@ -181,7 +168,7 @@ public boolean checkAccess(IOzoneObj ozObject, RequestContext context)
|| prefixManager.checkAccess(objInfo, context);
return (prefixAccess
&& bucketManager.checkAccess(objInfo, parentContext)
&& volumeManager.checkAccess(objInfo, parentContext));
&& volumeManager.checkAccess(objInfo, parentVolContext));
default:
throw new OMException("Unexpected object type:" +
objInfo.getResourceType(), INVALID_REQUEST);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,17 +119,6 @@ public static void setup() throws IOException, AuthenticationException {
new String[]{"test1"});
}

// Refined the parent context
// OP |CHILD |PARENT

// CREATE NONE WRITE (parent:'CREATE' when 'create bucket')
// DELETE DELETE WRITE
// WRITE WRITE WRITE
// WRITE_ACL WRITE_ACL WRITE (V1 WRITE_ACL=>WRITE)

// READ READ READ
// LIST LIST READ (V1 LIST=>READ)
// READ_ACL READ_ACL READ (V1 READ_ACL=>READ)
@Test
@Flaky("HDDS-6335")
public void testKeyAcl()
Expand All @@ -148,11 +137,11 @@ public void testKeyAcl()
List<OzoneAcl> originalBuckAcls = getBucketAcls(vol, buck);
List<OzoneAcl> originalKeyAcls = getBucketAcls(vol, buck);

testParentChild(keyObj, WRITE, WRITE_ACL);
testParentChild(keyObj, READ, WRITE_ACL);
resetAcl(vol, originalVolAcls, buck, originalBuckAcls,
key, originalKeyAcls);

testParentChild(keyObj, WRITE, DELETE);
testParentChild(keyObj, READ, DELETE);
resetAcl(vol, originalVolAcls, buck, originalBuckAcls,
key, originalKeyAcls);

Expand Down Expand Up @@ -190,10 +179,10 @@ public void testBucketAcl()

List<OzoneAcl> originalVolAcls = getVolumeAcls(vol);
List<OzoneAcl> originalBuckAcls = getBucketAcls(vol, buck);
testParentChild(bucketObj, WRITE, WRITE_ACL);
testParentChild(bucketObj, READ, WRITE_ACL);
resetAcl(vol, originalVolAcls, buck, originalBuckAcls, null, null);

testParentChild(bucketObj, WRITE, DELETE);
testParentChild(bucketObj, READ, DELETE);
resetAcl(vol, originalVolAcls, buck, originalBuckAcls, null, null);

testParentChild(bucketObj, READ, READ_ACL);
Expand All @@ -202,13 +191,13 @@ public void testBucketAcl()
testParentChild(bucketObj, READ, LIST);
resetAcl(vol, originalVolAcls, buck, originalBuckAcls, null, null);

testParentChild(bucketObj, CREATE, CREATE);
testParentChild(bucketObj, WRITE, CREATE);
resetAcl(vol, originalVolAcls, buck, originalBuckAcls, null, null);

testParentChild(bucketObj, READ, READ);
resetAcl(vol, originalVolAcls, buck, originalBuckAcls, null, null);

testParentChild(bucketObj, WRITE, WRITE);
testParentChild(bucketObj, READ, WRITE);
resetAcl(vol, originalVolAcls, buck, originalBuckAcls, null, null);
}

Expand Down Expand Up @@ -263,7 +252,9 @@ private void testParentChild(OzoneObj child,
Assert.assertFalse(nativeAuthorizer.checkAccess(child, requestContext));

// add the volume acl (grand-parent), now key access is allowed.
addVolumeAcl(child.getVolumeName(), parentAcl);
OzoneAcl parentVolumeAcl = new OzoneAcl(USER,
testUgi1.getUserName(), READ, ACCESS);
addVolumeAcl(child.getVolumeName(), parentVolumeAcl);
Assert.assertTrue(nativeAuthorizer.checkAccess(child, requestContext));
}
}
Expand Down

0 comments on commit 0d7a042

Please sign in to comment.