Skip to content

Commit

Permalink
feature:Support to obtain multiple configurations through a single da…
Browse files Browse the repository at this point in the history
…taid in Nacos (#3303)
  • Loading branch information
Rubbernecker committed Dec 25, 2020
1 parent 08fbcef commit 96016f0
Show file tree
Hide file tree
Showing 9 changed files with 137 additions and 20 deletions.
2 changes: 2 additions & 0 deletions changes/1.5.0.md
Expand Up @@ -42,6 +42,7 @@ Seata 是一款开源的分布式事务解决方案,提供高性能和简单
- [[#3365](https://github.com/seata/seata/pull/3365)] 修复ParameterParserTest测试用例
- [[#3359](https://github.com/seata/seata/pull/3359)] 删除未使用的测试用例
- [[#3397](https://github.com/seata/seata/pull/3397)] 添加更改记录文件夹
- [[#3303](https://github.com/seata/seata/pull/3303)] 支持从nacos单一dataId中读取所有配置



Expand All @@ -61,6 +62,7 @@ Seata 是一款开源的分布式事务解决方案,提供高性能和简单
- [a364176773](https://github.com/a364176773)
- [anselleeyy](https://github.com/anselleeyy)
- [Ifdevil](https://github.com/Ifdevil)
- [Rubbernecker](https://github.com/Rubbernecker)

同时,我们收到了社区反馈的很多有价值的issue和建议,非常感谢大家。

Expand Down
2 changes: 2 additions & 0 deletions changes/en-us/1.5.0.md
Expand Up @@ -41,6 +41,7 @@
- [[#3365](https://github.com/seata/seata/pull/3365)] optimize ParameterParserTest test case failed
- [[#3359](https://github.com/seata/seata/pull/3359)] remove unused test case
- [[#3397](https://github.com/seata/seata/pull/3397)] add the change records folder
- [[#3303](https://github.com/seata/seata/pull/3303)] supports reading all configurations from a single Nacos dataId

### test

Expand All @@ -57,6 +58,7 @@
- [a364176773](https://github.com/a364176773)
- [anselleeyy](https://github.com/anselleeyy)
- [Ifdevil](https://github.com/Ifdevil)
- [Rubbernecker](https://github.com/Rubbernecker)

Also, we receive many valuable issues, questions and advices from our community. Thanks for you all.

Expand Down
Expand Up @@ -15,6 +15,12 @@
*/
package io.seata.config.nacos;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;
import java.nio.charset.StandardCharsets;
import java.util.Enumeration;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
Expand Down Expand Up @@ -50,8 +56,10 @@ public class NacosConfiguration extends AbstractConfiguration {

private static final Logger LOGGER = LoggerFactory.getLogger(NacosConfiguration.class);
private static final String DEFAULT_GROUP = "SEATA_GROUP";
private static final String DEFAULT_DATA_ID = "seata.properties";
private static final String GROUP_KEY = "group";
private static final String PRO_SERVER_ADDR_KEY = "serverAddr";
private static final String NACOS_DATA_ID_KEY = "dataId";
private static final String ENDPOINT_KEY = "endpoint";
private static final String CONFIG_TYPE = "nacos";
private static final String DEFAULT_NAMESPACE = "";
Expand All @@ -63,8 +71,9 @@ public class NacosConfiguration extends AbstractConfiguration {
private static final Configuration FILE_CONFIG = ConfigurationFactory.CURRENT_FILE_INSTANCE;
private static volatile ConfigService configService;
private static final int MAP_INITIAL_CAPACITY = 8;
private ConcurrentMap<String, ConcurrentMap<ConfigurationChangeListener, NacosListener>> configListenersMap
= new ConcurrentHashMap<>(MAP_INITIAL_CAPACITY);
private static final ConcurrentMap<String, ConcurrentMap<ConfigurationChangeListener, NacosListener>> CONFIG_LISTENERS_MAP
= new ConcurrentHashMap<>(MAP_INITIAL_CAPACITY);
private static volatile Properties seataConfig = new Properties();

/**
* Get instance of NacosConfiguration
Expand All @@ -89,6 +98,7 @@ private NacosConfiguration() {
if (configService == null) {
try {
configService = NacosFactory.createConfigService(getConfigProperties());
initSeataConfig();
} catch (NacosException e) {
throw new RuntimeException(e);
}
Expand All @@ -101,19 +111,30 @@ public String getLatestConfig(String dataId, String defaultValue, long timeoutMi
if (value != null) {
return value;
}
try {
value = configService.getConfig(dataId, getNacosGroup(), timeoutMills);
} catch (NacosException exx) {
LOGGER.error(exx.getErrMsg());

value = seataConfig.getProperty(dataId);

if (null == value) {
try {
value = configService.getConfig(dataId, getNacosGroup(), timeoutMills);
} catch (NacosException exx) {
LOGGER.error(exx.getErrMsg());
}
}

return value == null ? defaultValue : value;
}

@Override
public boolean putConfig(String dataId, String content, long timeoutMills) {
boolean result = false;
try {
result = configService.publishConfig(dataId, getNacosGroup(), content);
if (!seataConfig.isEmpty()) {
seataConfig.setProperty(dataId, content);
result = configService.publishConfig(getNacosDataId(), getNacosGroup(), getSeataConfigStr());
} else {
result = configService.publishConfig(dataId, getNacosGroup(), content);
}
} catch (NacosException exx) {
LOGGER.error(exx.getErrMsg());
}
Expand All @@ -129,7 +150,12 @@ public boolean putConfigIfAbsent(String dataId, String content, long timeoutMill
public boolean removeConfig(String dataId, long timeoutMills) {
boolean result = false;
try {
result = configService.removeConfig(dataId, getNacosGroup());
if (!seataConfig.isEmpty()) {
seataConfig.remove(dataId);
result = configService.publishConfig(getNacosDataId(), getNacosGroup(), getSeataConfigStr());
} else {
result = configService.removeConfig(dataId, getNacosGroup());
}
} catch (NacosException exx) {
LOGGER.error(exx.getErrMsg());
}
Expand All @@ -143,7 +169,7 @@ public void addConfigListener(String dataId, ConfigurationChangeListener listene
}
try {
NacosListener nacosListener = new NacosListener(dataId, listener);
configListenersMap.computeIfAbsent(dataId, key -> new ConcurrentHashMap<>())
CONFIG_LISTENERS_MAP.computeIfAbsent(dataId, key -> new ConcurrentHashMap<>())
.put(listener, nacosListener);
configService.addListener(dataId, getNacosGroup(), nacosListener);
} catch (Exception exx) {
Expand All @@ -161,7 +187,7 @@ public void removeConfigListener(String dataId, ConfigurationChangeListener list
for (ConfigurationChangeListener entry : configChangeListeners) {
if (listener.equals(entry)) {
NacosListener nacosListener = null;
Map<ConfigurationChangeListener, NacosListener> configListeners = configListenersMap.get(dataId);
Map<ConfigurationChangeListener, NacosListener> configListeners = CONFIG_LISTENERS_MAP.get(dataId);
if (configListeners != null) {
nacosListener = configListeners.get(listener);
configListeners.remove(entry);
Expand All @@ -177,7 +203,7 @@ public void removeConfigListener(String dataId, ConfigurationChangeListener list

@Override
public Set<ConfigurationChangeListener> getConfigListeners(String dataId) {
Map<ConfigurationChangeListener, NacosListener> configListeners = configListenersMap.get(dataId);
Map<ConfigurationChangeListener, NacosListener> configListeners = CONFIG_LISTENERS_MAP.get(dataId);
if (CollectionUtils.isNotEmpty(configListeners)) {
return configListeners.keySet();
} else {
Expand Down Expand Up @@ -210,10 +236,10 @@ private static Properties getConfigProperties() {
properties.setProperty(PRO_NAMESPACE_KEY, namespace);
}
String userName = StringUtils.isNotBlank(System.getProperty(USER_NAME)) ? System.getProperty(USER_NAME)
: FILE_CONFIG.getConfig(getNacosUserName());
: FILE_CONFIG.getConfig(getNacosUserName());
if (StringUtils.isNotBlank(userName)) {
String password = StringUtils.isNotBlank(System.getProperty(PASSWORD)) ? System.getProperty(PASSWORD)
: FILE_CONFIG.getConfig(getNacosPassword());
: FILE_CONFIG.getConfig(getNacosPassword());
if (StringUtils.isNotBlank(password)) {
properties.setProperty(USER_NAME, userName);
properties.setProperty(PASSWORD, password);
Expand All @@ -234,20 +260,57 @@ private static String getNacosGroupKey() {
return String.join(ConfigurationKeys.FILE_CONFIG_SPLIT_CHAR, ConfigurationKeys.FILE_ROOT_CONFIG, CONFIG_TYPE, GROUP_KEY);
}

private static String getNacosDataIdKey() {
return String.join(ConfigurationKeys.FILE_CONFIG_SPLIT_CHAR, ConfigurationKeys.FILE_ROOT_CONFIG, CONFIG_TYPE, NACOS_DATA_ID_KEY);
}

private static String getNacosUserName() {
return String.join(ConfigurationKeys.FILE_CONFIG_SPLIT_CHAR, ConfigurationKeys.FILE_ROOT_CONFIG, CONFIG_TYPE,
USER_NAME);
USER_NAME);
}

private static String getNacosPassword() {
return String.join(ConfigurationKeys.FILE_CONFIG_SPLIT_CHAR, ConfigurationKeys.FILE_ROOT_CONFIG, CONFIG_TYPE,
PASSWORD);
PASSWORD);
}

private static String getNacosGroup() {
return FILE_CONFIG.getConfig(getNacosGroupKey(), DEFAULT_GROUP);
}

private static String getNacosDataId() {
return FILE_CONFIG.getConfig(getNacosDataIdKey(), DEFAULT_DATA_ID);
}

private static String getSeataConfigStr() {
StringBuilder sb = new StringBuilder();

Enumeration<?> enumeration = seataConfig.propertyNames();
while (enumeration.hasMoreElements()) {
String key = (String) enumeration.nextElement();
String property = seataConfig.getProperty(key);
sb.append(key).append("=").append(property).append("\n");
}

return sb.toString();
}

private static void initSeataConfig() {
try {
String nacosDataId = getNacosDataId();
String config = configService.getConfig(nacosDataId, getNacosGroup(), DEFAULT_CONFIG_TIMEOUT);
if (StringUtils.isNotBlank(config)) {
try (Reader reader = new InputStreamReader(new ByteArrayInputStream(config.getBytes()), StandardCharsets.UTF_8)) {
seataConfig.load(reader);
}
NacosListener nacosListener = new NacosListener(nacosDataId, null);
configService.addListener(nacosDataId, getNacosGroup(), nacosListener);
}
} catch (NacosException | IOException e) {
LOGGER.error("init config properties error", e);
}
}

@Override
public String getTypeName() {
return CONFIG_TYPE;
Expand Down Expand Up @@ -282,8 +345,43 @@ public ConfigurationChangeListener getTargetListener() {

@Override
public void innerReceive(String dataId, String group, String configInfo) {
//The new configuration method to puts all configurations into a dateId
if (getNacosDataId().equals(dataId)) {
Properties seataConfigNew = new Properties();
if (StringUtils.isNotBlank(configInfo)) {
try (Reader reader = new InputStreamReader(new ByteArrayInputStream(configInfo.getBytes()), StandardCharsets.UTF_8)) {
seataConfigNew.load(reader);
} catch (IOException e) {
LOGGER.error("load config properties error", e);
return;
}
}

//Get all the monitored dataids and judge whether it has been modified
for (Map.Entry<String, ConcurrentMap<ConfigurationChangeListener, NacosListener>> entry : CONFIG_LISTENERS_MAP.entrySet()) {
String listenedDataId = entry.getKey();
String propertyOld = seataConfig.getProperty(listenedDataId, "");
String propertyNew = seataConfigNew.getProperty(listenedDataId, "");
if (!propertyOld.equals(propertyNew)) {
ConfigurationChangeEvent event = new ConfigurationChangeEvent()
.setDataId(listenedDataId)
.setNewValue(propertyNew)
.setNamespace(group);

ConcurrentMap<ConfigurationChangeListener, NacosListener> configListeners = entry.getValue();
for (ConfigurationChangeListener configListener : configListeners.keySet()) {
configListener.onProcessEvent(event);
}
}
}

seataConfig = seataConfigNew;
return;
}

//Compatible with old writing
ConfigurationChangeEvent event = new ConfigurationChangeEvent().setDataId(dataId).setNewValue(configInfo)
.setNamespace(group);
.setNamespace(group);
listener.onProcessEvent(event);
}
}
Expand Down
1 change: 1 addition & 0 deletions script/client/conf/registry.conf
Expand Up @@ -60,6 +60,7 @@ config {
group = "SEATA_GROUP"
username = ""
password = ""
dataId = "seata.properties"
}
consul {
serverAddr = "127.0.0.1:8500"
Expand Down
1 change: 1 addition & 0 deletions script/server/config/registry.conf
Expand Up @@ -65,6 +65,7 @@ config {
group = "SEATA_GROUP"
username = ""
password = ""
dataId = "seataServer.properties"
}
consul {
serverAddr = "127.0.0.1:8500"
Expand Down
1 change: 1 addition & 0 deletions script/server/config/registry.properties
Expand Up @@ -40,6 +40,7 @@ config.nacos.namespace=
config.nacos.group=SEATA_GROUP
config.nacos.username=
config.nacos.password=
config.nacos.dataId=seataServer.properties
config.consul.serverAddr=127.0.0.1:8500
config.apollo.appId=seata-server
config.apollo.apolloMeta=http://192.168.1.204:8801
Expand Down
9 changes: 5 additions & 4 deletions script/server/config/registry.yml
Expand Up @@ -11,8 +11,8 @@ registry:
namespace:
cluster: default
username:
password:
password:

eureka:
serviceUrl: http://localhost:8761/eureka
application: default
Expand Down Expand Up @@ -64,11 +64,12 @@ config:
namespace:
group: SEATA_GROUP
username:
password:
password:
dataId: seataServer.properties

consul:
serverAddr: 127.0.0.1:8500

apollo:
appId: seata-server
apolloMeta: http://192.168.1.204:8801
Expand Down
Expand Up @@ -31,6 +31,7 @@ public class ConfigNacosProperties {
private String group = "SEATA_GROUP";
private String username = "";
private String password = "";
private String dataId = "seata.properties";

public String getServerAddr() {
return serverAddr;
Expand Down Expand Up @@ -76,4 +77,13 @@ public ConfigNacosProperties setPassword(String password) {
this.password = password;
return this;
}

public String getDataId() {
return dataId;
}

public ConfigNacosProperties setDataId(String dataId) {
this.dataId = dataId;
return this;
}
}
1 change: 1 addition & 0 deletions server/src/main/resources/registry.conf
Expand Up @@ -65,6 +65,7 @@ config {
group = "SEATA_GROUP"
username = ""
password = ""
dataId = "seataServer.properties"
}
consul {
serverAddr = "127.0.0.1:8500"
Expand Down

0 comments on commit 96016f0

Please sign in to comment.