/
NacosDataSource.java
163 lines (148 loc) · 6.38 KB
/
NacosDataSource.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.datasource.nacos;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import com.alibaba.csp.sentinel.concurrent.NamedThreadFactory;
import com.alibaba.csp.sentinel.datasource.AbstractDataSource;
import com.alibaba.csp.sentinel.datasource.Converter;
import com.alibaba.csp.sentinel.log.RecordLog;
import com.alibaba.csp.sentinel.util.AssertUtil;
import com.alibaba.csp.sentinel.util.StringUtil;
import com.alibaba.nacos.api.NacosFactory;
import com.alibaba.nacos.api.PropertyKeyConst;
import com.alibaba.nacos.api.config.ConfigService;
import com.alibaba.nacos.api.config.listener.Listener;
/**
 * A read-only {@code DataSource} backed by Nacos. On construction it registers a config listener
 * for the given {@code dataId}/{@code groupId} and then loads the current value once; every value
 * later delivered to the listener is converted with the supplied parser and published to the
 * data source's property.
 *
 * @author Eric Zhao
 */
public class NacosDataSource<T> extends AbstractDataSource<String, T> {

    /**
     * Timeout passed to {@link ConfigService#getConfig(String, String, long)} in {@link #readSource()}
     * (milliseconds per the Nacos API).
     */
    private static final int DEFAULT_TIMEOUT = 3000;

    /**
     * Executor handed to Nacos for running listener callbacks: exactly one worker thread and a
     * queue of capacity one. When the queue is full, {@link ThreadPoolExecutor.DiscardOldestPolicy}
     * drops the oldest queued update in favour of the newest one.
     */
    private final ExecutorService pool = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
        new ArrayBlockingQueue<Runnable>(1), new NamedThreadFactory("sentinel-nacos-ds-update", true),
        new ThreadPoolExecutor.DiscardOldestPolicy());

    private final Listener configListener;
    private final String groupId;
    private final String dataId;
    private final Properties properties;

    /**
     * Note: The Nacos config service might be null if its initialization failed.
     */
    private ConfigService configService = null;

    /**
     * Constructs a read-only DataSource with Nacos backend, connecting to the given server address.
     *
     * @param serverAddr server address of Nacos, must not be null
     * @param groupId    group ID, cannot be empty
     * @param dataId     data ID, cannot be empty
     * @param parser     customized data parser, cannot be empty
     * @throws IllegalArgumentException if {@code groupId} or {@code dataId} is blank
     */
    public NacosDataSource(final String serverAddr, final String groupId, final String dataId,
                           Converter<String, T> parser) {
        this(NacosDataSource.buildProperties(serverAddr), groupId, dataId, parser);
    }

    /**
     * Constructs a read-only DataSource with Nacos backend, using caller-supplied client properties.
     *
     * @param properties properties for construct {@link ConfigService} using
     *                   {@link NacosFactory#createConfigService(Properties)}, must not be null
     * @param groupId    group ID, cannot be empty
     * @param dataId     data ID, cannot be empty
     * @param parser     customized data parser, cannot be empty
     * @throws IllegalArgumentException if {@code groupId} or {@code dataId} is blank
     */
    public NacosDataSource(final Properties properties, final String groupId, final String dataId,
                           Converter<String, T> parser) {
        super(parser);
        if (StringUtil.isBlank(groupId) || StringUtil.isBlank(dataId)) {
            throw new IllegalArgumentException(String.format("Bad argument: groupId=[%s], dataId=[%s]",
                groupId, dataId));
        }
        AssertUtil.notNull(properties, "Nacos properties must not be null, you could put some keys from PropertyKeyConst");
        this.groupId = groupId;
        this.dataId = dataId;
        this.properties = properties;
        this.configListener = new Listener() {
            @Override
            public Executor getExecutor() {
                // Callbacks run on the dedicated single-thread pool of this data source.
                return pool;
            }

            @Override
            public void receiveConfigInfo(final String configInfo) {
                // NOTE(review): this logs the whole Properties object; if callers put credentials
                // in it they end up in the record log - confirm whether that is acceptable.
                RecordLog.info("[NacosDataSource] New property value received for (properties: {}) (dataId: {}, groupId: {}): {}",
                    properties, dataId, groupId, configInfo);
                T newValue = NacosDataSource.this.parser.convert(configInfo);
                // Update the new value to the property.
                getProperty().updateValue(newValue);
            }
        };
        // Register the listener first, then pull the current value once.
        initNacosListener();
        loadInitialConfig();
    }

    /**
     * Loads the current config once and publishes it to the property. A {@code null} value is
     * still published (after a warning); any failure is logged and otherwise ignored so that
     * construction does not fail.
     */
    private void loadInitialConfig() {
        try {
            T newValue = loadConfig();
            if (newValue == null) {
                RecordLog.warn("[NacosDataSource] WARN: initial config is null, you may have to check your data source");
            }
            getProperty().updateValue(newValue);
        } catch (Exception ex) {
            RecordLog.warn("[NacosDataSource] Error when loading initial config", ex);
        }
    }

    /**
     * Creates the Nacos {@link ConfigService} from the configured properties and registers the
     * config listener. Failures are logged only; {@link #configService} may then remain
     * {@code null}.
     */
    private void initNacosListener() {
        try {
            this.configService = NacosFactory.createConfigService(this.properties);
            // Add config listener.
            configService.addListener(dataId, groupId, configListener);
        } catch (Exception e) {
            // The throwable is handed to the logger; no extra stderr dump is needed.
            RecordLog.warn("[NacosDataSource] Error occurred when initializing Nacos data source", e);
        }
    }

    /**
     * Reads the raw config text for this data source's {@code dataId}/{@code groupId} from Nacos.
     *
     * @return the value returned by {@link ConfigService#getConfig(String, String, long)}
     * @throws IllegalStateException if the Nacos config service is not available
     * @throws Exception             if the underlying Nacos call fails
     */
    @Override
    public String readSource() throws Exception {
        if (configService == null) {
            throw new IllegalStateException("Nacos config service has not been initialized or error occurred");
        }
        return configService.getConfig(dataId, groupId, DEFAULT_TIMEOUT);
    }

    /**
     * Removes the config listener, shuts down the Nacos config service (if one was created) and
     * stops the update pool. The pool is always stopped, even when the Nacos cleanup throws.
     */
    @Override
    public void close() {
        try {
            if (configService != null) {
                closeConfigService();
            }
        } finally {
            pool.shutdownNow();
        }
    }

    /**
     * Unregisters the listener and shuts the config service down. The shutdown is attempted even
     * if unregistering throws; a shutdown failure is logged rather than propagated.
     */
    private void closeConfigService() {
        try {
            configService.removeListener(dataId, groupId, configListener);
        } finally {
            try {
                configService.shutDown();
            } catch (Exception e) {
                RecordLog.warn("[NacosDataSource] Error occurred when closing Nacos data source", e);
            }
        }
    }

    /**
     * Builds the minimal Nacos client properties containing only the server address.
     *
     * @param serverAddr server address of Nacos, must not be null
     * @return a new {@link Properties} with {@link PropertyKeyConst#SERVER_ADDR} set
     */
    private static Properties buildProperties(String serverAddr) {
        Properties properties = new Properties();
        properties.setProperty(PropertyKeyConst.SERVER_ADDR, serverAddr);
        return properties;
    }
}