Skip to content

Commit

Permalink
resolve Conflicts:
Browse files Browse the repository at this point in the history
#	broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
  • Loading branch information
qqeasonchen committed Oct 24, 2019
2 parents a314e17 + 5a8c4ed commit c6cbab9
Show file tree
Hide file tree
Showing 57 changed files with 1,950 additions and 274 deletions.
12 changes: 6 additions & 6 deletions README.md
Expand Up @@ -8,10 +8,12 @@
It offers a variety of features:

* Pub/Sub messaging model
* Scheduled message delivery
* Financial grade transactional message
* A variety of cross language clients, such as Java, C/C++, Python, Go
* Pluggable transport protocols, such as TCP, SSL, AIO
* Inbuilt message tracing capability, also support opentracing
* Versatile big-data and streaming ecosytem integration
* Message retroactivity by time or offset
* Log hub for streaming
* Big data integration
* Reliable FIFO and strict ordered messaging in the same queue
* Efficient pull&push consumption model
* Million-level message accumulation capacity in a single queue
Expand All @@ -21,9 +23,7 @@ It offers a variety of features:
* Various message filter mechanics such as SQL and Tag
* Docker images for isolated testing and cloud isolated clusters
* Feature-rich administrative dashboard for configuration, metrics and monitoring
* Access control list
* Message trace

* Authentication and authorisation

----------

Expand Down
4 changes: 4 additions & 0 deletions acl/pom.xml
Expand Up @@ -67,6 +67,10 @@
<artifactId>logback-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>commons-validator</groupId>
<artifactId>commons-validator</artifactId>
</dependency>

</dependencies>
</project>
Expand Up @@ -18,6 +18,7 @@
package org.apache.rocketmq.acl;

import java.util.List;
import org.apache.rocketmq.common.AclConfig;
import org.apache.rocketmq.common.PlainAccessConfig;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;

Expand Down Expand Up @@ -66,4 +67,10 @@ public interface AccessValidator {
* @return
*/
boolean updateGlobalWhiteAddrsConfig(List<String> globalWhiteAddrsList);

/**
* get broker cluster acl config information
* @return
*/
AclConfig getAllAclConfig();
}
164 changes: 151 additions & 13 deletions acl/src/main/java/org/apache/rocketmq/acl/common/AclUtils.java
Expand Up @@ -23,6 +23,7 @@
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Map;
import java.util.SortedMap;
import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -69,24 +70,75 @@ public static String calSignature(byte[] data, String secretKey) {
return signature;
}

public static void IPv6AddressCheck(String netaddress) {
if (isAsterisk(netaddress) || isMinus(netaddress)) {
int asterisk = netaddress.indexOf("*");
int minus = netaddress.indexOf("-");
// '*' must be the end of netaddress if it exists
if (asterisk > -1 && asterisk != netaddress.length() - 1) {
throw new AclException(String.format("Netaddress examine scope Exception netaddress is %s", netaddress));
}

// format like "2::ac5:78:1-200:*" or "2::ac5:78:1-200" is legal
if (minus > -1) {
if (asterisk == -1) {
if (minus <= netaddress.lastIndexOf(":")) {
throw new AclException(String.format("Netaddress examine scope Exception netaddress is %s", netaddress));
}
} else {
if (minus <= netaddress.lastIndexOf(":", netaddress.lastIndexOf(":") - 1)) {
throw new AclException(String.format("Netaddress examine scope Exception netaddress is %s", netaddress));
}
}
}
}
}

public static String v6ipProcess(String netaddress, String[] strArray, int index) {
int part;
String subAddress;
boolean isAsterisk = isAsterisk(netaddress);
boolean isMinus = isMinus(netaddress);
if (isAsterisk && isMinus) {
part = 6;
int lastColon = netaddress.lastIndexOf(':');
int secondLastColon = netaddress.substring(0, lastColon).lastIndexOf(':');
subAddress = netaddress.substring(0, secondLastColon);
} else if (!isAsterisk && !isMinus) {
part = 8;
subAddress = netaddress;
} else {
part = 7;
subAddress = netaddress.substring(0, netaddress.lastIndexOf(':'));
}
return expandIP(subAddress, part);
}

public static void verify(String netaddress, int index) {
if (!AclUtils.isScope(netaddress, index)) {
throw new AclException(String.format("Netaddress examine scope Exception netaddress is %s", netaddress));
}
}

public static String[] getAddreeStrArray(String netaddress, String four) {
String[] fourStrArray = StringUtils.split(four.substring(1, four.length() - 1), ",");
public static String[] getAddreeStrArray(String netaddress, String partialAddress) {
String[] parAddStrArray = StringUtils.split(partialAddress.substring(1, partialAddress.length() - 1), ",");
String address = netaddress.substring(0, netaddress.indexOf("{"));
String[] addreeStrArray = new String[fourStrArray.length];
for (int i = 0; i < fourStrArray.length; i++) {
addreeStrArray[i] = address + fourStrArray[i];
String[] addreeStrArray = new String[parAddStrArray.length];
for (int i = 0; i < parAddStrArray.length; i++) {
addreeStrArray[i] = address + parAddStrArray[i];
}
return addreeStrArray;
}

public static boolean isScope(String num, int index) {
String[] strArray = StringUtils.split(num, ".");
public static boolean isScope(String netaddress, int index) {
// IPv6 Address
if (isColon(netaddress)) {
netaddress = expandIP(netaddress, 8);
String[] strArray = StringUtils.split(netaddress, ":");
return isIPv6Scope(strArray, index);
}

String[] strArray = StringUtils.split(netaddress, ".");
if (strArray.length != 4) {
return false;
}
Expand All @@ -107,6 +159,10 @@ public static boolean isScope(String[] num, int index) {

}

public static boolean isColon(String netaddress) {
return netaddress.indexOf(':') > -1;
}

public static boolean isScope(String num) {
return isScope(Integer.valueOf(num.trim()));
}
Expand All @@ -119,7 +175,7 @@ public static boolean isAsterisk(String asterisk) {
return asterisk.indexOf('*') > -1;
}

public static boolean isColon(String colon) {
public static boolean isComma(String colon) {
return colon.indexOf(',') > -1;
}

Expand All @@ -128,6 +184,88 @@ public static boolean isMinus(String minus) {

}

public static boolean isIPv6Scope(String[] num, int index) {
for (int i = 0; i < index; i++) {
int value;
try {
value = Integer.parseInt(num[i], 16);
} catch (NumberFormatException e) {
return false;
}
if (!isIPv6Scope(value)) {
return false;
}
}
return true;
}

public static boolean isIPv6Scope(int num) {
int min = Integer.parseInt("0", 16);
int max = Integer.parseInt("ffff", 16);
return num >= min && num <= max;
}

public static String expandIP(String netaddress, int part) {
boolean compress = false;
int compressIndex = -1;
String[] strArray = StringUtils.split(netaddress, ":");
ArrayList<Integer> indexes = new ArrayList<>();
for (int i = 0; i < netaddress.length(); i++) {
if (netaddress.charAt(i) == ':') {
if (indexes.size() > 0 && i - indexes.get(indexes.size() - 1) == 1) {
compressIndex = i;
compress = true;
}
indexes.add(i);
}
}

for (int i = 0; i < strArray.length; i++) {
if (strArray[i].length() < 4) {
strArray[i] = "0000".substring(0, 4 - strArray[i].length()) + strArray[i];
}
}

StringBuilder sb = new StringBuilder();
if (compress) {
int pos = indexes.indexOf(compressIndex);
int index = 0;
if (!netaddress.startsWith(":")) {
for (int i = 0; i < pos; i++) {
sb.append(strArray[index]).append(":");
index += 1;
}
}
int zeroNum = part - strArray.length;
if (netaddress.endsWith(":")) {
for (int i = 0; i < zeroNum; i++) {
sb.append("0000");
if (i != zeroNum - 1) {
sb.append(":");
}
}
} else {
for (int i = 0; i < zeroNum; i++) {
sb.append("0000").append(":");
}
for (int i = index; i < strArray.length; i++) {
sb.append(strArray[i]);
if (i != strArray.length - 1) {
sb.append(":");
}
}
}
} else {
for (int i = 0; i < strArray.length; i++) {
sb.append(strArray[i]);
if (i != strArray.length - 1) {
sb.append(":");
}
}
}
return sb.toString().toUpperCase();
}

public static <T> T getYamlDataObject(String path, Class<T> clazz) {
Yaml yaml = new Yaml();
FileInputStream fis = null;
Expand All @@ -148,7 +286,7 @@ public static <T> T getYamlDataObject(String path, Class<T> clazz) {
}
}

public static boolean writeDataObject(String path, Map<String,Object> dataMap) {
public static boolean writeDataObject(String path, Map<String, Object> dataMap) {
Yaml yaml = new Yaml();
PrintWriter pw = null;
try {
Expand All @@ -172,15 +310,15 @@ public static RPCHook getAclRPCHook(String fileName) {
yamlDataObject = AclUtils.getYamlDataObject(fileName,
JSONObject.class);
} catch (Exception e) {
log.error("Convert yaml file to data object error, ",e);
log.error("Convert yaml file to data object error, ", e);
return null;
}

if (yamlDataObject == null || yamlDataObject.isEmpty()) {
log.warn("Cannot find conf file :{}, acl isn't be enabled." ,fileName);
log.warn("Cannot find conf file :{}, acl isn't be enabled.", fileName);
return null;
}

String accessKey = yamlDataObject.getString(AclConstants.CONFIG_ACCESS_KEY);
String secretKey = yamlDataObject.getString(AclConstants.CONFIG_SECRET_KEY);

Expand All @@ -189,7 +327,7 @@ public static RPCHook getAclRPCHook(String fileName) {

return null;
}
return new AclClientRPCHook(new SessionCredentials(accessKey,secretKey));
return new AclClientRPCHook(new SessionCredentials(accessKey, secretKey));
}

}
Expand Up @@ -26,6 +26,7 @@
import org.apache.rocketmq.acl.common.AclUtils;
import org.apache.rocketmq.acl.common.Permission;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.common.AclConfig;
import org.apache.rocketmq.common.PlainAccessConfig;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupRequestHeader;
Expand All @@ -50,7 +51,7 @@ public PlainAccessValidator() {
public AccessResource parse(RemotingCommand request, String remoteAddr) {
PlainAccessResource accessResource = new PlainAccessResource();
if (remoteAddr != null && remoteAddr.contains(":")) {
accessResource.setWhiteRemoteAddress(remoteAddr.split(":")[0]);
accessResource.setWhiteRemoteAddress(remoteAddr.substring(0, remoteAddr.lastIndexOf(':')));
} else {
accessResource.setWhiteRemoteAddress(remoteAddr);
}
Expand Down Expand Up @@ -155,4 +156,7 @@ public boolean deleteAccessConfig(String accesskey) {
return aclPlugEngine.updateGlobalWhiteAddrsConfig(globalWhiteAddrsList);
}

@Override public AclConfig getAllAclConfig() {
return aclPlugEngine.getAllAclConfig();
}
}
Expand Up @@ -30,6 +30,7 @@
import org.apache.rocketmq.acl.common.AclException;
import org.apache.rocketmq.acl.common.AclUtils;
import org.apache.rocketmq.acl.common.Permission;
import org.apache.rocketmq.common.AclConfig;
import org.apache.rocketmq.common.DataVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.PlainAccessConfig;
Expand Down Expand Up @@ -270,6 +271,28 @@ public boolean updateGlobalWhiteAddrsConfig(List<String> globalWhiteAddrsList) {
return false;
}

public AclConfig getAllAclConfig() {
AclConfig aclConfig = new AclConfig();
List<PlainAccessConfig> configs = new ArrayList<>();
List<String> whiteAddrs = new ArrayList<>();
JSONObject plainAclConfData = AclUtils.getYamlDataObject(fileHome + File.separator + fileName,
JSONObject.class);
if (plainAclConfData == null || plainAclConfData.isEmpty()) {
throw new AclException(String.format("%s file is not data", fileHome + File.separator + fileName));
}
JSONArray globalWhiteAddrs = plainAclConfData.getJSONArray(AclConstants.CONFIG_GLOBAL_WHITE_ADDRS);
if (globalWhiteAddrs != null && !globalWhiteAddrs.isEmpty()) {
whiteAddrs = globalWhiteAddrs.toJavaList(String.class);
}
JSONArray accounts = plainAclConfData.getJSONArray(AclConstants.CONFIG_ACCOUNTS);
if (accounts != null && !accounts.isEmpty()) {
configs = accounts.toJavaList(PlainAccessConfig.class);
}
aclConfig.setGlobalWhiteAddrs(whiteAddrs);
aclConfig.setPlainAccessConfigs(configs);
return aclConfig;
}

private void watch() {
try {
String watchFilePath = fileHome + fileName;
Expand Down

0 comments on commit c6cbab9

Please sign in to comment.