Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #2986] Support for multiple ACL files in a fixed directory #3761

Merged
merged 13 commits into from
Feb 19, 2022
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@
package org.apache.rocketmq.acl;

import java.util.List;
import java.util.Map;

import org.apache.rocketmq.common.AclConfig;
import org.apache.rocketmq.common.DataVersion;
import org.apache.rocketmq.common.PlainAccessConfig;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;

Expand Down Expand Up @@ -60,16 +63,18 @@ public interface AccessValidator {
*
* @return
*/
String getAclConfigVersion();
Map<String, DataVersion> getAclConfigVersion();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It makes the interface incompatible to older versions, you'd better add a new method and deprecate the original ones.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done


/**
* Update globalWhiteRemoteAddresses in acl yaml config file
*
* @return
*/
boolean updateGlobalWhiteAddrsConfig(List<String> globalWhiteAddrsList);

/**
* get broker cluster acl config information
*
* @return
*/
AclConfig getAllAclConfig();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.rocketmq.acl.common.Permission;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.common.AclConfig;
import org.apache.rocketmq.common.DataVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.PlainAccessConfig;
import org.apache.rocketmq.common.protocol.RequestCode;
Expand Down Expand Up @@ -127,7 +128,7 @@ public AccessResource parse(RemotingCommand request, String remoteAddr) {
SortedMap<String, String> map = new TreeMap<String, String>();
for (Map.Entry<String, String> entry : request.getExtFields().entrySet()) {
if (!SessionCredentials.SIGNATURE.equals(entry.getKey())
&& !MixAll.UNIQUE_MSG_QUERY_FLAG.equals(entry.getKey())) {
&& !MixAll.UNIQUE_MSG_QUERY_FLAG.equals(entry.getKey())) {
map.put(entry.getKey(), entry.getValue());
}
}
Expand All @@ -150,7 +151,7 @@ public boolean deleteAccessConfig(String accesskey) {
return aclPlugEngine.deleteAccessConfig(accesskey);
}

@Override public String getAclConfigVersion() {
@Override public Map<String, DataVersion> getAclConfigVersion() {
return aclPlugEngine.getAclConfigDataVersion();
}

Expand All @@ -161,4 +162,13 @@ public boolean deleteAccessConfig(String accesskey) {
@Override public AclConfig getAllAclConfig() {
return aclPlugEngine.getAllAclConfig();
}

public Map<String, Object> createAclAccessConfigMap(Map<String, Object> existedAccountMap,
PlainAccessConfig plainAccessConfig) {
return aclPlugEngine.createAclAccessConfigMap(existedAccountMap, plainAccessConfig);
}

public Map<String, Object> updateAclConfigFileVersion(Map<String, Object> updateAclConfigMap) {
return aclPlugEngine.updateAclConfigFileVersion(updateAclConfigMap);
}
}

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,9 @@ public void init() throws NoSuchFieldException, SecurityException, IOException {
ANYPlainAccessResource = clonePlainAccessResource(Permission.ANY);
DENYPlainAccessResource = clonePlainAccessResource(Permission.DENY);

System.setProperty("rocketmq.home.dir", "src/test/resources");
System.setProperty("rocketmq.acl.plain.file", "/conf/plain_acl.yml");
File file = new File("src/test/resources");
System.setProperty("rocketmq.home.dir", file.getAbsolutePath());

plainPermissionManager = new PlainPermissionManager();

}
Expand Down Expand Up @@ -117,7 +117,7 @@ public void buildPlainAccessResourceTest() {
Assert.assertEquals(resourcePermMap.size(), 3);

Assert.assertEquals(resourcePermMap.get(PlainAccessResource.getRetryTopic("groupA")).byteValue(), Permission.DENY);
Assert.assertEquals(resourcePermMap.get(PlainAccessResource.getRetryTopic("groupB")).byteValue(), Permission.PUB|Permission.SUB);
Assert.assertEquals(resourcePermMap.get(PlainAccessResource.getRetryTopic("groupB")).byteValue(), Permission.PUB | Permission.SUB);
Assert.assertEquals(resourcePermMap.get(PlainAccessResource.getRetryTopic("groupC")).byteValue(), Permission.PUB);

List<String> topics = new ArrayList<String>();
Expand All @@ -130,7 +130,7 @@ public void buildPlainAccessResourceTest() {
Assert.assertEquals(resourcePermMap.size(), 6);

Assert.assertEquals(resourcePermMap.get("topicA").byteValue(), Permission.DENY);
Assert.assertEquals(resourcePermMap.get("topicB").byteValue(), Permission.PUB|Permission.SUB);
Assert.assertEquals(resourcePermMap.get("topicB").byteValue(), Permission.PUB | Permission.SUB);
Assert.assertEquals(resourcePermMap.get("topicC").byteValue(), Permission.PUB);
}

Expand All @@ -157,13 +157,15 @@ public void checkPerm() {
plainPermissionManager.checkPerm(plainAccessResource, ANYPlainAccessResource);

}

@Test(expected = AclException.class)
public void checkErrorPermDefaultValueNotMatch() {

plainAccessResource = new PlainAccessResource();
plainAccessResource.addResourceAndPerm("topicF", Permission.PUB);
plainPermissionManager.checkPerm(plainAccessResource, SUBPlainAccessResource);
}

@Test(expected = AclException.class)
public void accountNullTest() {
plainAccessConfig.setAccessKey(null);
Expand All @@ -184,7 +186,7 @@ public void passWordtNullTest() {

@Test(expected = AclException.class)
public void passWordThanTest() {
plainAccessConfig.setAccessKey("123");
plainAccessConfig.setSecretKey("123");
plainPermissionManager.buildPlainAccessResource(plainAccessConfig);
}

Expand All @@ -198,11 +200,11 @@ public void testPlainAclPlugEngineInit() {
@Test
public void cleanAuthenticationInfoTest() throws IllegalAccessException {
// PlainPermissionManager.addPlainAccessResource(plainAccessResource);
Map<String, List<PlainAccessResource>> plainAccessResourceMap = (Map<String, List<PlainAccessResource>>) FieldUtils.readDeclaredField(plainPermissionManager, "plainAccessResourceMap", true);
Map<String, Map<String, PlainAccessResource>> plainAccessResourceMap = (Map<String, Map<String, PlainAccessResource>>) FieldUtils.readDeclaredField(plainPermissionManager, "aclPlainAccessResourceMap", true);
Assert.assertFalse(plainAccessResourceMap.isEmpty());

plainPermissionManager.clearPermissionInfo();
plainAccessResourceMap = (Map<String, List<PlainAccessResource>>) FieldUtils.readDeclaredField(plainPermissionManager, "plainAccessResourceMap", true);
plainAccessResourceMap = (Map<String, Map<String, PlainAccessResource>>) FieldUtils.readDeclaredField(plainPermissionManager, "aclPlainAccessResourceMap", true);
Assert.assertTrue(plainAccessResourceMap.isEmpty());
// RemoveDataVersionFromYamlFile("src/test/resources/conf/plain_acl.yml");
}
Expand All @@ -213,15 +215,14 @@ public void isWatchStartTest() {
PlainPermissionManager plainPermissionManager = new PlainPermissionManager();
Assert.assertTrue(plainPermissionManager.isWatchStart());
// RemoveDataVersionFromYamlFile("src/test/resources/conf/plain_acl.yml");

}


@Test
public void testWatch() throws IOException, IllegalAccessException ,InterruptedException{
System.setProperty("rocketmq.home.dir", "src/test/resources");
System.setProperty("rocketmq.acl.plain.file", "/conf/plain_acl-test.yml");
String fileName =System.getProperty("rocketmq.home.dir", "src/test/resources")+System.getProperty("rocketmq.acl.plain.file", "/conf/plain_acl.yml");
public void testWatch() throws IOException, IllegalAccessException, InterruptedException {
File file = new File("src/test/resources");
System.setProperty("rocketmq.home.dir", file.getAbsolutePath());

String fileName = System.getProperty("rocketmq.home.dir") + File.separator + "/conf/acl/plain_acl_test.yml";
File transport = new File(fileName);
transport.delete();
transport.createNewFile();
Expand All @@ -234,13 +235,14 @@ public void testWatch() throws IOException, IllegalAccessException ,InterruptedE
writer.flush();
writer.close();


PlainPermissionManager plainPermissionManager = new PlainPermissionManager();
Assert.assertTrue(plainPermissionManager.isWatchStart());

Map<String, String> accessKeyTable = (Map<String, String>) FieldUtils.readDeclaredField(plainPermissionManager, "accessKeyTable", true);
String aclFileName = accessKeyTable.get("watchrocketmq");
{
Map<String, PlainAccessResource> plainAccessResourceMap = (Map<String, PlainAccessResource>) FieldUtils.readDeclaredField(plainPermissionManager, "plainAccessResourceMap", true);
PlainAccessResource accessResource = plainAccessResourceMap.get("watchrocketmq");
Map<String, Map<String, PlainAccessResource>> plainAccessResourceMap = (Map<String, Map<String, PlainAccessResource>>) FieldUtils.readDeclaredField(plainPermissionManager, "aclPlainAccessResourceMap", true);
PlainAccessResource accessResource = plainAccessResourceMap.get(aclFileName).get("watchrocketmq");
Assert.assertNotNull(accessResource);
Assert.assertEquals(accessResource.getSecretKey(), "12345678");
Assert.assertTrue(accessResource.isAdmin());
Expand All @@ -259,22 +261,21 @@ public void testWatch() throws IOException, IllegalAccessException ,InterruptedE

Thread.sleep(1000);
{
Map<String, PlainAccessResource> plainAccessResourceMap = (Map<String, PlainAccessResource>) FieldUtils.readDeclaredField(plainPermissionManager, "plainAccessResourceMap", true);
PlainAccessResource accessResource = plainAccessResourceMap.get("watchrocketmq1");
Map<String, Map<String, PlainAccessResource>> plainAccessResourceMap = (Map<String, Map<String, PlainAccessResource>>) FieldUtils.readDeclaredField(plainPermissionManager, "aclPlainAccessResourceMap", true);
PlainAccessResource accessResource = plainAccessResourceMap.get(aclFileName).get("watchrocketmq1");
Assert.assertNotNull(accessResource);
Assert.assertEquals(accessResource.getSecretKey(), "88888888");
Assert.assertFalse(accessResource.isAdmin());

}
transport.delete();
System.setProperty("rocketmq.home.dir", "src/test/resources");
System.setProperty("rocketmq.acl.plain.file", "/conf/plain_acl.yml");
}

@Test(expected = AclException.class)
public void initializeTest() {
System.setProperty("rocketmq.acl.plain.file", "/conf/plain_acl_null.yml");
File file = new File("src/test/resources/conf/test");
System.setProperty("rocketmq.home.dir", file.getAbsolutePath());
new PlainPermissionManager();

}
}
44 changes: 44 additions & 0 deletions acl/src/test/resources/conf/acl/plain_acl.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

## suggested format

globalWhiteRemoteAddresses:
- 10.10.103.*
- 192.168.0.*

accounts:
- accessKey: RocketMQ
secretKey: 12345678
whiteRemoteAddress: 192.168.0.*
admin: false
defaultTopicPerm: DENY
defaultGroupPerm: SUB
topicPerms:
- topicA=DENY
- topicB=PUB|SUB
- topicC=SUB
groupPerms:
# the group should convert to retry topic
- groupA=DENY
- groupB=SUB
- groupC=SUB

- accessKey: rocketmq2
secretKey: 12345678
whiteRemoteAddress: 192.168.1.*
# if it is admin, it could access all resources
admin: true

18 changes: 18 additions & 0 deletions acl/src/test/resources/conf/test/conf/acl/plain_acl_null.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

## suggested format


Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.AclConfig;
import org.apache.rocketmq.common.DataVersion;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.PlainAccessConfig;
Expand Down Expand Up @@ -386,7 +385,7 @@ public ClusterAclVersionInfo getBrokerClusterAclInfo(final String addr,
clusterAclVersionInfo.setClusterName(responseHeader.getClusterName());
clusterAclVersionInfo.setBrokerName(responseHeader.getBrokerName());
clusterAclVersionInfo.setBrokerAddr(responseHeader.getBrokerAddr());
clusterAclVersionInfo.setAclConfigDataVersion(DataVersion.fromJson(responseHeader.getVersion(), DataVersion.class));
clusterAclVersionInfo.setAclConfigDataVersion(responseHeader.getVersion());
return clusterAclVersionInfo;
}
default:
Expand All @@ -397,7 +396,8 @@ public ClusterAclVersionInfo getBrokerClusterAclInfo(final String addr,

}

public AclConfig getBrokerClusterConfig(final String addr, final long timeoutMillis) throws RemotingCommandException, InterruptedException, RemotingTimeoutException,
public AclConfig getBrokerClusterConfig(final String addr,
final long timeoutMillis) throws RemotingCommandException, InterruptedException, RemotingTimeoutException,
RemotingSendRequestException, RemotingConnectException, MQBrokerException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_BROKER_CLUSTER_ACL_CONFIG, null);

Expand All @@ -407,7 +407,7 @@ public AclConfig getBrokerClusterConfig(final String addr, final long timeoutMil
case ResponseCode.SUCCESS: {
if (response.getBody() != null) {
GetBrokerClusterAclConfigResponseBody body =
GetBrokerClusterAclConfigResponseBody.decode(response.getBody(), GetBrokerClusterAclConfigResponseBody.class);
GetBrokerClusterAclConfigResponseBody.decode(response.getBody(), GetBrokerClusterAclConfigResponseBody.class);
AclConfig aclConfig = new AclConfig();
aclConfig.setGlobalWhiteAddrs(body.getGlobalWhiteAddrs());
aclConfig.setPlainAccessConfigs(body.getPlainAccessConfigs());
Expand Down Expand Up @@ -505,7 +505,7 @@ private SendResult sendMessageSync(
) throws RemotingException, MQBrokerException, InterruptedException {
RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
assert response != null;
return this.processSendResponse(brokerName, msg, response,addr);
return this.processSendResponse(brokerName, msg, response, addr);
}

private void sendMessageAsync(
Expand Down Expand Up @@ -671,7 +671,7 @@ private SendResult processSendResponse(
}

SendMessageResponseHeader responseHeader =
(SendMessageResponseHeader) response.decodeCommandCustomHeader(SendMessageResponseHeader.class);
(SendMessageResponseHeader) response.decodeCommandCustomHeader(SendMessageResponseHeader.class);

//If namespace not null , reset Topic without namespace.
String topic = msg.getTopic();
Expand All @@ -690,8 +690,8 @@ private SendResult processSendResponse(
uniqMsgId = sb.toString();
}
SendResult sendResult = new SendResult(sendStatus,
uniqMsgId,
responseHeader.getMsgId(), messageQueue, responseHeader.getQueueOffset());
uniqMsgId,
responseHeader.getMsgId(), messageQueue, responseHeader.getQueueOffset());
sendResult.setTransactionId(responseHeader.getTransactionId());
String regionId = response.getExtFields().get(MessageConst.PROPERTY_MSG_REGION);
String traceOn = response.getExtFields().get(MessageConst.PROPERTY_TRACE_SWITCH);
Expand Down Expand Up @@ -1432,8 +1432,8 @@ public int wipeWritePermOfBroker(final String namesrvAddr, String brokerName,
}

public int addWritePermOfBroker(final String nameSrvAddr, String brokerName, final long timeoutMillis)
throws RemotingCommandException,
RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQClientException {
throws RemotingCommandException,
RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQClientException {
AddWritePermOfBrokerRequestHeader requestHeader = new AddWritePermOfBrokerRequestHeader();
requestHeader.setBrokerName(brokerName);

Expand All @@ -1444,7 +1444,7 @@ public int addWritePermOfBroker(final String nameSrvAddr, String brokerName, fin
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
AddWritePermOfBrokerResponseHeader responseHeader =
(AddWritePermOfBrokerResponseHeader) response.decodeCommandCustomHeader(AddWritePermOfBrokerResponseHeader.class);
(AddWritePermOfBrokerResponseHeader) response.decodeCommandCustomHeader(AddWritePermOfBrokerResponseHeader.class);
return responseHeader.getAddTopicCount();
}
default:
Expand Down Expand Up @@ -1492,8 +1492,9 @@ public void deleteTopicInNameServer(final String addr, final String topic, final
throw new MQClientException(response.getCode(), response.getRemark());
}

public void deleteSubscriptionGroup(final String addr, final String groupName, final boolean removeOffset, final long timeoutMillis)
throws RemotingException, InterruptedException, MQClientException {
public void deleteSubscriptionGroup(final String addr, final String groupName, final boolean removeOffset,
final long timeoutMillis)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
DeleteSubscriptionGroupRequestHeader requestHeader = new DeleteSubscriptionGroupRequestHeader();
requestHeader.setGroupName(groupName);
requestHeader.setRemoveOffset(removeOffset);
Expand Down