/
NodeManagementService.java
371 lines (330 loc) · 18.1 KB
/
NodeManagementService.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
/*
* Licensed to JumpMind Inc under one or more contributor
* license agreements. See the NOTICE file distributed
* with this work for additional information regarding
* copyright ownership. JumpMind Inc licenses this file
* to you under the GNU Lesser General Public License (the
* "License"); you may not use this file except in compliance
* with the License.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, see
* <http://www.gnu.org/licenses/>.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.jumpmind.symmetric.service.jmx;
import java.io.File;
import java.io.FileWriter;
import java.io.Writer;
import java.text.DateFormat;
import java.text.NumberFormat;
import java.util.Date;
import java.util.Map;
import javax.sql.DataSource;
import org.apache.commons.dbcp.BasicDataSource;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.jumpmind.extension.IBuiltInExtensionPoint;
import org.jumpmind.security.SecurityConstants;
import org.jumpmind.symmetric.ISymmetricEngine;
import org.jumpmind.symmetric.common.ParameterConstants;
import org.jumpmind.symmetric.ext.ISymmetricEngineAware;
import org.jumpmind.symmetric.model.Node;
import org.jumpmind.symmetric.transport.ConcurrentConnectionManager.NodeConnectionStatistics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jmx.export.annotation.ManagedAttribute;
import org.springframework.jmx.export.annotation.ManagedOperation;
import org.springframework.jmx.export.annotation.ManagedOperationParameter;
import org.springframework.jmx.export.annotation.ManagedOperationParameters;
import org.springframework.jmx.export.annotation.ManagedResource;
@ManagedResource(description = "The management interface for a node")
public class NodeManagementService implements IBuiltInExtensionPoint, ISymmetricEngineAware {
final Logger log = LoggerFactory.getLogger(getClass());
protected ISymmetricEngine engine;
public NodeManagementService() {
}
public void setSymmetricEngine(ISymmetricEngine engine) {
this.engine = engine;
}
@ManagedAttribute(description = "Checks if SymmetricDS has been started.")
public boolean isStarted() {
if (engine != null) {
return engine.isStarted();
} else {
return false;
}
}
@ManagedOperation(description = "Start the SymmetricDS engine")
public boolean start() {
try {
if (engine != null) {
return engine.start();
} else {
return false;
}
} catch (Exception ex) {
log.error(ex.getMessage(), ex);
return false;
}
}
@ManagedOperation(description = "Stop the SymmetricDS engine")
public void stop() {
try {
if (engine != null) {
engine.stop();
}
} catch (Exception ex) {
log.error(ex.getMessage(), ex);
}
}
@ManagedOperation(description = "Run the outgoing purge process")
public void purge() {
engine.getPurgeService().purgeOutgoing(true);
}
@ManagedOperation(description = "Run the purge process")
public String snapshot() {
File file = engine.snapshot();
if (file != null) {
return file.getAbsolutePath();
} else {
return null;
}
}
@ManagedOperation(description = "Force the cached objects to be reread from the database the next time they are accessed")
public void clearCaches() {
engine.clearCaches();
}
@ManagedOperation(description = "Synchronize the triggers")
public void syncTriggers() {
engine.getTriggerRouterService().syncTriggers();
}
@ManagedAttribute(description = "Get the number of current connections allowed to this "
+ "instance of the node via HTTP. If this value is 20, then 20 concurrent push"
+ " clients and 20 concurrent pull clients will be allowed")
public int getNumfNodeConnectionsPerInstance() {
return engine.getParameterService().getInt(ParameterConstants.CONCURRENT_WORKERS);
}
@ManagedAttribute(description = "Get connection statistics about indivdual nodes")
public String getNodeConcurrencyStatisticsAsText() {
String lineFeed = "\n";
if (engine.getParameterService().getString(ParameterConstants.JMX_LINE_FEED).equals("html")) {
lineFeed = "</br>";
}
Map<String, Map<String, NodeConnectionStatistics>> stats = engine
.getConcurrentConnectionManager().getNodeConnectionStatisticsByPoolByNodeId();
StringBuilder out = new StringBuilder();
for (String pool : stats.keySet()) {
out.append("-------------------------------------------------------------------------------------------------------------------------------");
out.append(lineFeed);
out.append(" CONNECTION TYPE: ");
out.append(pool);
out.append(lineFeed);
out.append("-------------------------------------------------------------------------------------------------------------------------------");
out.append(lineFeed);
out.append(" NODE ID LAST CONNECT TIME NUMBER OF CONNECTIONS NUMBER OF REJECTIONS AVG CONNECTED TIME");
out.append(lineFeed);
out.append("-------------------------------------------------------------------------------------------------------------------------------");
out.append(lineFeed);
Map<String, NodeConnectionStatistics> nodeStats = stats.get(pool);
for (String nodeId : nodeStats.keySet()) {
NodeConnectionStatistics nodeStat = nodeStats.get(nodeId);
out.append(StringUtils.leftPad(nodeId, 20));
out.append(StringUtils.leftPad(
DateFormat.getDateTimeInstance(DateFormat.MEDIUM, DateFormat.MEDIUM)
.format(new Date(nodeStat.getLastConnectionTimeMs())), 30));
out.append(StringUtils.leftPad(Long.toString(nodeStat.getTotalConnectionCount()),
27));
out.append(StringUtils.leftPad(Integer.toString(nodeStat.getNumOfRejections()), 25));
out.append(StringUtils.leftPad(
NumberFormat.getIntegerInstance().format(
nodeStat.getTotalConnectionTimeMs()
/ nodeStat.getTotalConnectionCount()), 25));
}
out.append(lineFeed);
}
return out.toString();
}
@ManagedOperation(description = "Clean up both incoming and outgoing resources that are older than the passed in number of milliseconds")
@ManagedOperationParameters({ @ManagedOperationParameter(name = "timeToLiveInMS", description = "The number of milliseconds old a resource should be before it is cleaned up") })
public long cleanStaging(long timeToLiveInMS) {
return engine.getStagingManager().clean(timeToLiveInMS);
}
@ManagedAttribute(description = "Get a list of nodes that have been added to the white list, a list of node ids that always get through the concurrency manager.")
public String getNodesInWhiteList() {
StringBuilder ret = new StringBuilder();
String[] list = engine.getConcurrentConnectionManager().getWhiteList();
for (String string : list) {
ret.append(string);
ret.append(",");
}
return ret.length() > 0 ? ret.substring(0, ret.length() - 1) : "";
}
@ManagedOperation(description = "Add a node id to the list of nodes that will always get through the concurrency manager")
@ManagedOperationParameters({ @ManagedOperationParameter(name = "nodeId", description = "The node id to add to the white list") })
public void addNodeToWhiteList(String nodeId) {
engine.getConcurrentConnectionManager().addToWhitelist(nodeId);
}
@ManagedOperation(description = "Remove a node id to the list of nodes that will always get through the concurrency manager")
@ManagedOperationParameters({ @ManagedOperationParameter(name = "nodeId", description = "The node id to remove from the white list") })
public void removeNodeFromWhiteList(String nodeId) {
engine.getConcurrentConnectionManager().removeFromWhiteList(nodeId);
}
@ManagedAttribute(description = "Configure the number of connections allowed to this node."
+ " If the value is set to zero you are effectively disabling your transport"
+ " (wihch can be useful for maintainance")
public void setNumOfNodeConnectionsPerInstance(int value) {
engine.getParameterService().saveParameter(ParameterConstants.CONCURRENT_WORKERS, value);
}
@ManagedAttribute(description = "The group this node belongs to")
public String getNodeGroupId() {
return engine.getParameterService().getNodeGroupId();
}
@ManagedAttribute(description = "An external name given to this SymmetricDS node")
public String getExternalId() {
return engine.getParameterService().getExternalId();
}
@ManagedAttribute(description = "The node id given to this SymmetricDS node")
public String getNodeId() {
Node node = engine.getNodeService().findIdentity();
if (node != null) {
return node.getNodeId();
} else {
return "?";
}
}
@ManagedAttribute(description = "Whether the basic DataSource is being used as the default datasource.")
public boolean isBasicDataSource() {
DataSource dataSource = engine.getDataSource();
return dataSource instanceof BasicDataSource;
}
@ManagedAttribute(description = "If a BasicDataSource, then show the number of active connections")
public int getNumberOfActiveConnections() {
if (isBasicDataSource()) {
DataSource dataSource = engine.getDataSource();
return ((BasicDataSource) dataSource).getNumActive();
} else {
return -1;
}
}
@ManagedOperation(description = "Check to see if the external id is registered")
@ManagedOperationParameters({
@ManagedOperationParameter(name = "nodeGroupId", description = "The node group id for a node"),
@ManagedOperationParameter(name = "externalId", description = "The external id for a node") })
public boolean isExternalIdRegistered(String nodeGroupdId, String externalId) {
return engine.getNodeService().isExternalIdRegistered(nodeGroupdId, externalId);
}
@ManagedOperation(description = "Emergency remove all locks (if left abandoned on a cluster)")
public void clearAllLocks() {
engine.getClusterService().clearAllLocks();
}
@ManagedOperation(description = "Check to see if the initial load for a node id is complete. This method will throw an exception if the load error'd out or was never started.")
@ManagedOperationParameters({ @ManagedOperationParameter(name = "nodeId", description = "The node id") })
public boolean areAllLoadBatchesComplete(String nodeId) {
return engine.getOutgoingBatchService().areAllLoadBatchesComplete(nodeId);
}
@ManagedOperation(description = "Enable or disable synchronization completely for a node")
@ManagedOperationParameters({
@ManagedOperationParameter(name = "nodeId", description = "The node to enable or disable"),
@ManagedOperationParameter(name = "syncEnabled", description = "true is enabled, false is disabled") })
public boolean setSyncEnabledForNode(String nodeId, boolean syncEnabled) {
Node node = engine.getNodeService().findNode(nodeId);
if (node != null) {
node.setSyncEnabled(syncEnabled);
engine.getNodeService().save(node);
return true;
} else {
return false;
}
}
@ManagedOperation(description = "Enable or disable a channel for a specific external id")
@ManagedOperationParameters({
@ManagedOperationParameter(name = "ignore", description = "Set to true to enable and false to disable"),
@ManagedOperationParameter(name = "channelId", description = "The channel id to enable or disable"),
@ManagedOperationParameter(name = "nodeGroupId", description = "The node group id for a node"),
@ManagedOperationParameter(name = "externalId", description = "The external id for a node") })
public void ignoreNodeChannelForExternalId(boolean ignore, String channelId,
String nodeGroupId, String externalId) {
engine.getNodeService().ignoreNodeChannelForExternalId(ignore, channelId, nodeGroupId,
externalId);
}
@ManagedOperation(description = "Open the registration for a node with the specified external id")
@ManagedOperationParameters({
@ManagedOperationParameter(name = "nodeGroup", description = "The node group id this node will belong to"),
@ManagedOperationParameter(name = "externalId", description = "The external id for the node") })
public void openRegistration(String nodeGroupId, String externalId) {
Node node = engine.getNodeService().findNodeByExternalId(nodeGroupId, externalId);
if (node != null) {
engine.getRegistrationService().reOpenRegistration(node.getExternalId());
} else {
engine.getRegistrationService().openRegistration(nodeGroupId, externalId);
}
}
@ManagedOperation(description = "Send an initial load of data to a node.")
@ManagedOperationParameters({ @ManagedOperationParameter(name = "nodeId", description = "The node id to reload.") })
public String reloadNode(String nodeId) {
return engine.getDataService().reloadNode(nodeId, false);
}
@ManagedOperation(description = "Send a SQL event to a node.")
@ManagedOperationParameters({
@ManagedOperationParameter(name = "nodeId", description = "The node id to sent the event to."),
@ManagedOperationParameter(name = "catalogName", description = "The catalog name to reload. Can be null."),
@ManagedOperationParameter(name = "schemaName", description = "The schema name to reload. Can be null."),
@ManagedOperationParameter(name = "tableName", description = "The table name the SQL is for."),
@ManagedOperationParameter(name = "sql", description = "The SQL statement to send.") })
public String sendSQL(String nodeId, String catalogName, String schemaName, String tableName,
String sql) {
return engine.getDataService().sendSQL(nodeId, catalogName, schemaName, tableName, sql,
false);
}
@ManagedOperation(description = "Send a delete and reload of a table to a node.")
@ManagedOperationParameters({
@ManagedOperationParameter(name = "nodeId", description = "The node id to reload."),
@ManagedOperationParameter(name = "catalogName", description = "The catalog name to reload. Can be null."),
@ManagedOperationParameter(name = "schemaName", description = "The schema name to reload. Can be null."),
@ManagedOperationParameter(name = "tableName", description = "The table name to reload.") })
public String reloadTable(String nodeId, String catalogName, String schemaName, String tableName) {
return engine.getDataService().reloadTable(nodeId, catalogName, schemaName, tableName);
}
@ManagedOperation(description = "Send a delete and reload of a table to a node.")
@ManagedOperationParameters({
@ManagedOperationParameter(name = "nodeId", description = "The node id to reload."),
@ManagedOperationParameter(name = "catalogName", description = "The catalog name to reload. Can be null."),
@ManagedOperationParameter(name = "schemaName", description = "The schema name to reload. Can be null."),
@ManagedOperationParameter(name = "tableName", description = "The table name to reload."),
@ManagedOperationParameter(name = "overrideInitialLoadSelect", description = "Override initial load select where-clause.") })
public String reloadTable(String nodeId, String catalogName, String schemaName,
String tableName, String overrideInitialLoadSelect) {
return engine.getDataService().reloadTable(nodeId, catalogName, schemaName, tableName,
overrideInitialLoadSelect);
}
@ManagedOperation(description = "Write a range of batches to a file in SymmetricDS Data Format.")
@ManagedOperationParameters({
@ManagedOperationParameter(name = "nodeId", description = "The node id for the batches which will be written"),
@ManagedOperationParameter(name = "startBatchId", description = "Starting batch ID of range"),
@ManagedOperationParameter(name = "endBatchId", description = "Ending batch ID of range"),
@ManagedOperationParameter(name = "fileName", description = "File name to write batches") })
public void writeBatchRangeToFile(String nodeId, String startBatchId, String endBatchId,
String fileName) throws Exception {
Writer writer = new FileWriter(new File(fileName));
engine.getDataExtractorService().extractBatchRange(writer, nodeId,
Long.valueOf(startBatchId), Long.valueOf(endBatchId));
IOUtils.closeQuietly(writer);
}
@ManagedOperation(description = "Encrypts plain text for use with db.user and db.password properties")
@ManagedOperationParameters({ @ManagedOperationParameter(name = "plainText", description = "Plain text to encrypt") })
public String encryptText(String plainText) throws Exception {
try {
return SecurityConstants.PREFIX_ENC + engine.getSecurityService().encrypt(plainText);
} catch (Exception e) {
e.printStackTrace();
}
return "";
}
}