ozone.om.user.max.volume
1024
diff --git a/hadoop-hdds/docs/content/shell/VolumeCommands.md b/hadoop-hdds/docs/content/shell/VolumeCommands.md
index 47fb9852b86..fe459f31335 100644
--- a/hadoop-hdds/docs/content/shell/VolumeCommands.md
+++ b/hadoop-hdds/docs/content/shell/VolumeCommands.md
@@ -85,13 +85,15 @@ The above command will print out the information about hive volume.
### List
-The `volume list` command will list the volumes owned by a user.
+The `volume list` command will list the volumes accessible by a user.
{{< highlight bash >}}
ozone sh volume list --user hadoop
{{< /highlight >}}
-The above command will print out all the volumes owned by the user hadoop.
+When ACL is enabled, the above command will print out volumes that the user
+hadoop has LIST permission to. When ACL is disabled, the above command will
+print out all the volumes owned by the user hadoop.
### Update
diff --git a/hadoop-hdds/docs/content/shell/VolumeCommands.zh.md b/hadoop-hdds/docs/content/shell/VolumeCommands.zh.md
index b4a4c28add9..190e0994e74 100644
--- a/hadoop-hdds/docs/content/shell/VolumeCommands.zh.md
+++ b/hadoop-hdds/docs/content/shell/VolumeCommands.zh.md
@@ -80,13 +80,14 @@ ozone sh volume info /hive
### 列举
-`volume list` 命令用来列举一个用户拥有的所有卷。
+`volume list` 命令用来列举一个用户可以访问的所有卷。
{{< highlight bash >}}
ozone sh volume list --user hadoop
{{< /highlight >}}
-上述命令会打印出 hadoop 用户拥有的所有卷。
+若 ACL 已启用,上述命令会打印出 hadoop 用户有 LIST 权限的所有卷。
+若 ACL 被禁用,上述命令会打印出 hadoop 用户拥有的所有卷。
### 更新
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
index f46b30854e2..7af6627d05d 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
@@ -76,6 +76,9 @@ private OMConfigKeys() {
"ozone.om.db.cache.size.mb";
public static final int OZONE_OM_DB_CACHE_SIZE_DEFAULT = 128;
+ public static final String OZONE_OM_VOLUME_LISTALL_ALLOWED =
+ "ozone.om.volume.listall.allowed";
+ public static final boolean OZONE_OM_VOLUME_LISTALL_ALLOWED_DEFAULT = true;
public static final String OZONE_OM_USER_MAX_VOLUME =
"ozone.om.user.max.volume";
public static final int OZONE_OM_USER_MAX_VOLUME_DEFAULT = 1024;
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
index ff317731830..34943cb3f3c 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
@@ -118,7 +118,7 @@ boolean checkVolumeAccess(String volume, OzoneAclInfo userAcl)
void deleteVolume(String volume) throws IOException;
/**
- * Lists volume owned by a specific user.
+ * Lists volumes accessible by a specific user.
* @param userName - user name
* @param prefix - Filter prefix -- Return only entries that match this.
* @param prevKey - Previous key -- List starts from the next from the prevkey
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
index a95d45ef74a..674f67258f0 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
@@ -573,7 +573,7 @@ public void deleteVolume(String volume) throws IOException {
}
/**
- * Lists volume owned by a specific user.
+ * Lists volumes accessible by a specific user.
*
* @param userName - user name
* @param prefix - Filter prefix -- Return only entries that match this.
diff --git a/hadoop-ozone/dist/src/main/compose/ozonesecure/docker-config b/hadoop-ozone/dist/src/main/compose/ozonesecure/docker-config
index df34dd7505b..6184b422429 100644
--- a/hadoop-ozone/dist/src/main/compose/ozonesecure/docker-config
+++ b/hadoop-ozone/dist/src/main/compose/ozonesecure/docker-config
@@ -15,6 +15,7 @@
# limitations under the License.
CORE-SITE.XML_fs.o3fs.impl=org.apache.hadoop.fs.ozone.OzoneFileSystem
+OZONE-SITE.XML_ozone.om.volume.listall.allowed=false
OZONE-SITE.XML_ozone.om.address=om
OZONE-SITE.XML_ozone.om.http-address=om:9874
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerListVolumes.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerListVolumes.java
new file mode 100644
index 00000000000..d5b0efc7146
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerListVolumes.java
@@ -0,0 +1,238 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneAcl;
+import org.apache.hadoop.ozone.client.ObjectStore;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneVolume;
+import org.apache.hadoop.ozone.client.protocol.ClientProtocol;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.security.acl.OzoneObj;
+import org.apache.hadoop.ozone.security.acl.OzoneObjInfo;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_RATIS_PIPELINE_LIMIT;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_AUTHORIZER_CLASS;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_AUTHORIZER_CLASS_NATIVE;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_ENABLED;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ADMINISTRATORS;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_VOLUME_LISTALL_ALLOWED;
+import static org.apache.hadoop.ozone.security.acl.OzoneObj.StoreType.OZONE;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+
+/**
+ * Test OzoneManager list volume operation under combinations of configs.
+ */
+public class TestOzoneManagerListVolumes {
+
+ @Rule
+ public Timeout timeout = new Timeout(120_000);
+
+ private UserGroupInformation loginUser;
+ private UserGroupInformation user1 =
+ UserGroupInformation.createRemoteUser("user1"); // Admin user
+ private UserGroupInformation user2 =
+ UserGroupInformation.createRemoteUser("user2"); // Non-admin user
+
+ @Before
+ public void init() throws Exception {
+ loginUser = UserGroupInformation.getLoginUser();
+ }
+
+ /**
+ * Create a MiniDFSCluster for testing.
+ */
+ private MiniOzoneCluster startCluster(boolean aclEnabled,
+ boolean volListAllAllowed) throws Exception {
+
+ OzoneConfiguration conf = new OzoneConfiguration();
+ String clusterId = UUID.randomUUID().toString();
+ String scmId = UUID.randomUUID().toString();
+ String omId = UUID.randomUUID().toString();
+ conf.setInt(OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS, 2);
+ conf.set(OZONE_ADMINISTRATORS, "user1");
+ conf.setInt(OZONE_SCM_RATIS_PIPELINE_LIMIT, 10);
+
+ // Use native impl here, default impl doesn't do actual checks
+ conf.set(OZONE_ACL_AUTHORIZER_CLASS, OZONE_ACL_AUTHORIZER_CLASS_NATIVE);
+ // Note: OM doesn't support live config reloading
+ conf.setBoolean(OZONE_ACL_ENABLED, aclEnabled);
+ conf.setBoolean(OZONE_OM_VOLUME_LISTALL_ALLOWED, volListAllAllowed);
+
+ MiniOzoneCluster cluster = MiniOzoneCluster.newBuilder(conf)
+ .setClusterId(clusterId).setScmId(scmId).setOmId(omId).build();
+ cluster.waitForClusterToBeReady();
+
+ // loginUser is the user running this test.
+ // Implication: loginUser is automatically added to the OM admin list.
+ UserGroupInformation.setLoginUser(loginUser);
+ // Create volumes with non-default owners and ACLs
+ OzoneClient client = cluster.getClient();
+ ObjectStore objectStore = client.getObjectStore();
+
+ /* r = READ, w = WRITE, c = CREATE, d = DELETE
+ l = LIST, a = ALL, n = NONE, x = READ_ACL, y = WRITE_ACL */
+ String aclUser1All = "user:user1:a";
+ String aclUser2All = "user:user2:a";
+ String aclWorldAll = "world::a";
+ createVolumeWithOwnerAndAcl(objectStore, "volume1", "user1", aclUser1All);
+ createVolumeWithOwnerAndAcl(objectStore, "volume2", "user2", aclUser2All);
+ createVolumeWithOwnerAndAcl(objectStore, "volume3", "user1", aclUser2All);
+ createVolumeWithOwnerAndAcl(objectStore, "volume4", "user2", aclUser1All);
+ createVolumeWithOwnerAndAcl(objectStore, "volume5", "user1", aclWorldAll);
+
+ return cluster;
+ }
+
+ private void stopCluster(MiniOzoneCluster cluster) {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+
+ private void createVolumeWithOwnerAndAcl(ObjectStore objectStore,
+ String volumeName, String ownerName, String aclString)
+ throws IOException {
+ ClientProtocol proxy = objectStore.getClientProxy();
+ objectStore.createVolume(volumeName);
+ proxy.setVolumeOwner(volumeName, ownerName);
+ setVolumeAcl(objectStore, volumeName, aclString);
+ }
+
+ /**
+ * Helper function to set volume ACL.
+ */
+ private void setVolumeAcl(ObjectStore objectStore, String volumeName,
+ String aclString) throws IOException {
+ OzoneObj obj = OzoneObjInfo.Builder.newBuilder().setVolumeName(volumeName)
+ .setResType(OzoneObj.ResourceType.VOLUME).setStoreType(OZONE).build();
+ Assert.assertTrue(objectStore.setAcl(obj, OzoneAcl.parseAcls(aclString)));
+ }
+
+ /**
+ * Helper function to reduce code redundancy for test checks with each user
+ * under different config combination.
+ */
+ private void checkUser(MiniOzoneCluster cluster, UserGroupInformation user,
+ List expectVol, boolean expectListAllSuccess) throws IOException {
+
+ UserGroupInformation.setLoginUser(user);
+ OzoneClient client = cluster.getClient();
+ ObjectStore objectStore = client.getObjectStore();
+
+ // `ozone sh volume list` shall return volumes with LIST permission of user.
+ Iterator extends OzoneVolume> it = objectStore.listVolumesByUser(
+ null, "", "");
+ Set accessibleVolumes = new HashSet<>();
+ while (it.hasNext()) {
+ OzoneVolume vol = it.next();
+ String volumeName = vol.getName();
+ accessibleVolumes.add(volumeName);
+ }
+ Assert.assertEquals(new HashSet<>(expectVol), accessibleVolumes);
+
+ // `ozone sh volume list --all` returns all volumes,
+ // or throws exception (for non-admin if acl enabled & listall disallowed).
+ if (expectListAllSuccess) {
+ it = objectStore.listVolumes("volume");
+ int count = 0;
+ while (it.hasNext()) {
+ it.next();
+ count++;
+ }
+ Assert.assertEquals(5, count);
+ } else {
+ try {
+ objectStore.listVolumes("volume");
+ Assert.fail("listAllVolumes should fail for " + user.getUserName());
+ } catch (RuntimeException ex) {
+ // Current listAllVolumes throws RuntimeException
+ if (ex.getCause() instanceof OMException) {
+ // Expect PERMISSION_DENIED
+ if (((OMException) ex.getCause()).getResult() !=
+ OMException.ResultCodes.PERMISSION_DENIED) {
+ throw ex;
+ }
+ } else {
+ throw ex;
+ }
+ }
+ }
+ }
+
+ @Test
+ public void testAclEnabledListAllAllowed() throws Exception {
+ // ozone.acl.enabled = true, ozone.om.volume.listall.allowed = true
+ MiniOzoneCluster cluster = startCluster(true, true);
+ checkUser(cluster, user1, Arrays.asList("volume1", "volume4", "volume5"),
+ true);
+ checkUser(cluster, user2, Arrays.asList("volume2", "volume3", "volume5"),
+ true);
+ stopCluster(cluster);
+ }
+
+ @Test
+ public void testAclEnabledListAllDisallowed() throws Exception {
+ // ozone.acl.enabled = true, ozone.om.volume.listall.allowed = false
+ MiniOzoneCluster cluster = startCluster(true, false);
+ checkUser(cluster, user1, Arrays.asList("volume1", "volume4", "volume5"),
+ true);
+ checkUser(cluster, user2, Arrays.asList("volume2", "volume3", "volume5"),
+ false);
+ stopCluster(cluster);
+ }
+
+ @Test
+ public void testAclDisabledListAllAllowed() throws Exception {
+ // ozone.acl.enabled = false, ozone.om.volume.listall.allowed = true
+ MiniOzoneCluster cluster = startCluster(false, true);
+ checkUser(cluster, user1, Arrays.asList("volume1", "volume3", "volume5"),
+ true);
+ checkUser(cluster, user2, Arrays.asList("volume2", "volume4"),
+ true);
+ stopCluster(cluster);
+ }
+
+ @Test
+ public void testAclDisabledListAllDisallowed() throws Exception {
+ // ozone.acl.enabled = false, ozone.om.volume.listall.allowed = false
+ MiniOzoneCluster cluster = startCluster(false, false);
+ checkUser(cluster, user1, Arrays.asList("volume1", "volume3", "volume5"),
+ true);
+ checkUser(cluster, user2, Arrays.asList("volume2", "volume4"),
+ true); // listall will succeed since acl is disabled
+ stopCluster(cluster);
+ }
+}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index 67606bf6333..217d22c0fe0 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -202,6 +202,8 @@
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_KERBEROS_PRINCIPAL_KEY;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_METRICS_SAVE_INTERVAL;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_METRICS_SAVE_INTERVAL_DEFAULT;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_VOLUME_LISTALL_ALLOWED;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_VOLUME_LISTALL_ALLOWED_DEFAULT;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_USER_MAX_VOLUME;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_USER_MAX_VOLUME_DEFAULT;
import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.INVALID_AUTH_METHOD;
@@ -295,6 +297,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
private static String keyProviderUriKeyName =
CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH;
+ private boolean allowListAllVolumes;
// Adding parameters needed for VolumeRequests here, so that during request
// execution, we can get from ozoneManager.
private long maxUserVolumeCount;
@@ -341,6 +344,8 @@ private OzoneManager(OzoneConfiguration conf) throws IOException,
loginOMUserIfSecurityEnabled(conf);
+ this.allowListAllVolumes = conf.getBoolean(OZONE_OM_VOLUME_LISTALL_ALLOWED,
+ OZONE_OM_VOLUME_LISTALL_ALLOWED_DEFAULT);
this.maxUserVolumeCount = conf.getInt(OZONE_OM_USER_MAX_VOLUME,
OZONE_OM_USER_MAX_VOLUME_DEFAULT);
Preconditions.checkArgument(this.maxUserVolumeCount > 0,
@@ -421,11 +426,9 @@ private OzoneManager(OzoneConfiguration conf) throws IOException,
}
instantiateServices();
-
this.omRatisSnapshotInfo = new OMRatisSnapshotInfo(
omStorage.getCurrentDir());
initializeRatisServer();
-
if (isRatisEnabled) {
// Create Ratis storage dir
String omRatisDirectory =
@@ -446,7 +449,6 @@ private OzoneManager(OzoneConfiguration conf) throws IOException,
}
metrics = OMMetrics.create();
-
omClientProtocolMetrics = ProtocolMessageMetrics
.create("OmClientProtocol", "Ozone Manager RPC endpoint",
OzoneManagerProtocolProtos.Type.values());
@@ -1595,7 +1597,7 @@ public void createVolume(OmVolumeArgs args) throws IOException {
* @param vol - name of volume
* @param bucket - bucket name
* @param key - key
- * @throws OMException
+ * @throws OMException ResultCodes.PERMISSION_DENIED if permission denied.
*/
private void checkAcls(ResourceType resType, StoreType store,
ACLType acl, String vol, String bucket, String key)
@@ -1603,27 +1605,54 @@ private void checkAcls(ResourceType resType, StoreType store,
checkAcls(resType, store, acl, vol, bucket, key,
ProtobufRpcEngine.Server.getRemoteUser(),
ProtobufRpcEngine.Server.getRemoteIp(),
- ProtobufRpcEngine.Server.getRemoteIp().getHostName());
+ ProtobufRpcEngine.Server.getRemoteIp().getHostName(),
+ true);
+ }
+
+ /**
+ * A variant of checkAcls that doesn't throw exception if permission denied.
+ * @return true if permission granted, false if permission denied.
+ */
+ private boolean hasAcls(ResourceType resType, StoreType store,
+ ACLType acl, String vol, String bucket, String key) {
+ try {
+ return checkAcls(resType, store, acl, vol, bucket, key,
+ ProtobufRpcEngine.Server.getRemoteUser(),
+ ProtobufRpcEngine.Server.getRemoteIp(),
+ ProtobufRpcEngine.Server.getRemoteIp().getHostName(),
+ false);
+ } catch (OMException ex) {
+ // Should not trigger exception here at all
+ return false;
+ }
}
+
/**
* CheckAcls for the ozone object.
- * @param resType
- * @param storeType
- * @param aclType
- * @param vol
- * @param bucket
- * @param key
- * @param ugi
- * @param remoteAddress
- * @param hostName
- * @throws OMException
+ * @throws OMException ResultCodes.PERMISSION_DENIED if permission denied.
*/
@SuppressWarnings("parameternumber")
public void checkAcls(ResourceType resType, StoreType storeType,
ACLType aclType, String vol, String bucket, String key,
UserGroupInformation ugi, InetAddress remoteAddress, String hostName)
throws OMException {
+ checkAcls(resType, storeType, aclType, vol, bucket, key,
+ ugi, remoteAddress, hostName, true);
+ }
+
+ /**
+ * CheckAcls for the ozone object.
+ * @return true if permission granted, false if permission denied.
+ * @throws OMException ResultCodes.PERMISSION_DENIED if permission denied
+ * and throwOnPermissionDenied set to true.
+ */
+ @SuppressWarnings("parameternumber")
+ private boolean checkAcls(ResourceType resType, StoreType storeType,
+ ACLType aclType, String vol, String bucket, String key,
+ UserGroupInformation ugi, InetAddress remoteAddress, String hostName,
+ boolean throwIfPermissionDenied)
+ throws OMException {
OzoneObj obj = OzoneObjInfo.Builder.newBuilder()
.setResType(resType)
.setStoreType(storeType)
@@ -1638,11 +1667,16 @@ public void checkAcls(ResourceType resType, StoreType storeType,
.setAclRights(aclType)
.build();
if (!accessAuthorizer.checkAccess(obj, context)) {
- LOG.warn("User {} doesn't have {} permission to access {} /{}/{}/{}",
- ugi.getUserName(), aclType, resType, vol, bucket, key);
- throw new OMException("User " + ugi.getUserName() + " doesn't " +
- "have " + aclType + " permission to access " + resType,
- ResultCodes.PERMISSION_DENIED);
+ if (throwIfPermissionDenied) {
+ LOG.warn("User {} doesn't have {} permission to access {} /{}/{}/{}",
+ ugi.getUserName(), aclType, resType, vol, bucket, key);
+ throw new OMException("User " + ugi.getUserName() + " doesn't have " +
+ aclType + " permission to access " + resType,
+ ResultCodes.PERMISSION_DENIED);
+ }
+ return false;
+ } else {
+ return true;
}
}
@@ -1725,7 +1759,7 @@ public void setQuota(String volume, long quota) throws IOException {
@Override
public boolean checkVolumeAccess(String volume, OzoneAclInfo userAcl)
throws IOException {
- if(isAclEnabled) {
+ if (isAclEnabled) {
checkAcls(ResourceType.VOLUME, StoreType.OZONE,
ACLType.READ, volume, null, null);
}
@@ -1810,7 +1844,7 @@ public void deleteVolume(String volume) throws IOException {
}
/**
- * Lists volume owned by a specific user.
+ * Lists volumes accessible by a specific user.
*
* @param userName - user name
* @param prefix - Filter prefix -- Return only entries that match this.
@@ -1823,13 +1857,13 @@ public void deleteVolume(String volume) throws IOException {
@Override
public List listVolumeByUser(String userName, String prefix,
String prevKey, int maxKeys) throws IOException {
- if(isAclEnabled) {
- UserGroupInformation remoteUserUgi = ProtobufRpcEngine.Server.
- getRemoteUser();
+ UserGroupInformation remoteUserUgi =
+ ProtobufRpcEngine.Server.getRemoteUser();
+ if (isAclEnabled) {
if (remoteUserUgi == null) {
LOG.error("Rpc user UGI is null. Authorization failed.");
- throw new OMException("Rpc user UGI is null. Authorization " +
- "failed.", ResultCodes.PERMISSION_DENIED);
+ throw new OMException("Rpc user UGI is null. Authorization failed.",
+ ResultCodes.PERMISSION_DENIED);
}
}
boolean auditSuccess = true;
@@ -1840,7 +1874,23 @@ public List listVolumeByUser(String userName, String prefix,
auditMap.put(OzoneConsts.USERNAME, userName);
try {
metrics.incNumVolumeLists();
- return volumeManager.listVolumes(userName, prefix, prevKey, maxKeys);
+ if (isAclEnabled) {
+ // List all volumes first
+ List listAllVolumes = volumeManager.listVolumes(
+ null, prefix, prevKey, maxKeys);
+ List result = new ArrayList<>();
+ // Filter all volumes by LIST ACL
+ for (OmVolumeArgs volumeArgs : listAllVolumes) {
+ if (hasAcls(ResourceType.VOLUME, StoreType.OZONE, ACLType.LIST,
+ volumeArgs.getVolume(), null, null)) {
+ result.add(volumeArgs);
+ }
+ }
+ return result;
+ } else {
+ // When ACL is not enabled, fallback to filter by owner
+ return volumeManager.listVolumes(userName, prefix, prevKey, maxKeys);
+ }
} catch (Exception ex) {
metrics.incNumVolumeListFails();
auditSuccess = false;
@@ -1876,7 +1926,10 @@ public List listAllVolumes(String prefix, String prevKey, int
auditMap.put(OzoneConsts.USERNAME, null);
try {
metrics.incNumVolumeLists();
- checkAdmin();
+ if (!allowListAllVolumes) {
+ // Only admin can list all volumes when disallowed in config
+ checkAdmin();
+ }
return volumeManager.listVolumes(null, prefix, prevKey, maxKeys);
} catch (Exception ex) {
metrics.incNumVolumeListFails();
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/VolumeManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/VolumeManagerImpl.java
index 9d9cdab9e81..f7d730aac2e 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/VolumeManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/VolumeManagerImpl.java
@@ -20,10 +20,8 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
-import java.util.stream.Collectors;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ozone.OzoneAcl;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.om.exceptions.OMException;
@@ -32,10 +30,8 @@
.OzoneManagerProtocolProtos.OzoneAclInfo;
import org.apache.hadoop.ozone.protocol.proto
.OzoneManagerProtocolProtos.UserVolumeInfo;
-import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
import org.apache.hadoop.ozone.security.acl.OzoneObj;
import org.apache.hadoop.ozone.security.acl.RequestContext;
-import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.hdds.utils.db.BatchOperation;
import com.google.common.base.Preconditions;
@@ -453,23 +449,11 @@ public boolean checkVolumeAccess(String volume, OzoneAclInfo userAcl)
@Override
public List listVolumes(String userName,
String prefix, String startKey, int maxKeys) throws IOException {
- metadataManager.getLock().acquireLock(USER_LOCK, userName);
+ metadataManager.getLock().acquireReadLock(USER_LOCK, userName);
try {
- List volumes = metadataManager.listVolumes(
- userName, prefix, startKey, maxKeys);
- UserGroupInformation userUgi = ProtobufRpcEngine.Server.
- getRemoteUser();
- if (userUgi == null || !aclEnabled) {
- return volumes;
- }
-
- List filteredVolumes = volumes.stream().
- filter(v -> v.getAclMap().
- hasAccess(IAccessAuthorizer.ACLType.LIST, userUgi))
- .collect(Collectors.toList());
- return filteredVolumes;
+ return metadataManager.listVolumes(userName, prefix, startKey, maxKeys);
} finally {
- metadataManager.getLock().releaseLock(USER_LOCK, userName);
+ metadataManager.getLock().releaseReadLock(USER_LOCK, userName);
}
}
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/volume/ListVolumeHandler.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/volume/ListVolumeHandler.java
index 78f0e4f3056..fb0760ebf43 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/volume/ListVolumeHandler.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/volume/ListVolumeHandler.java
@@ -52,9 +52,14 @@ public class ListVolumeHandler extends Handler {
private ListOptions listOptions;
@Option(names = {"--user", "-u"},
- description = "Owner of the volumes to list.")
+ description = "List accessible volumes of the user. This will be ignored"
+ + " if list all volumes option is specified.")
private String userName;
+ @Option(names = {"--all", "-a"},
+ description = "List all volumes.")
+ private boolean listAllVolumes;
+
@Override
protected OzoneAddress getAddress() throws OzoneClientException {
OzoneAddress address = new OzoneAddress(uri);
@@ -71,12 +76,12 @@ protected void execute(OzoneClient client, OzoneAddress address)
}
Iterator extends OzoneVolume> volumeIterator;
- if (userName != null) {
+ if (userName != null && !listAllVolumes) {
volumeIterator = client.getObjectStore().listVolumesByUser(userName,
listOptions.getPrefix(), listOptions.getStartItem());
} else {
volumeIterator = client.getObjectStore().listVolumes(
- listOptions.getPrefix());
+ listOptions.getPrefix(), listOptions.getStartItem());
}
int counter = 0;