-
Notifications
You must be signed in to change notification settings - Fork 188
/
ClusterManager.java
335 lines (274 loc) · 14.8 KB
/
ClusterManager.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
/*
* Copyright (c) 2010-2013 Evolveum and contributors
*
* This work is dual-licensed under the Apache License 2.0
* and European Union Public License. See LICENSE file for details.
*/
package com.evolveum.midpoint.task.quartzimpl.cluster;
import java.util.List;
import org.jetbrains.annotations.Nullable;
import com.evolveum.midpoint.prism.PrismObject;
import com.evolveum.midpoint.prism.delta.ItemDelta;
import com.evolveum.midpoint.prism.query.ObjectQuery;
import com.evolveum.midpoint.repo.api.RepositoryService;
import com.evolveum.midpoint.schema.SearchResultList;
import com.evolveum.midpoint.schema.result.OperationResult;
import com.evolveum.midpoint.schema.util.ObjectQueryUtil;
import com.evolveum.midpoint.task.api.TaskManagerInitializationException;
import com.evolveum.midpoint.task.quartzimpl.TaskManagerQuartzImpl;
import com.evolveum.midpoint.util.exception.ObjectAlreadyExistsException;
import com.evolveum.midpoint.util.exception.ObjectNotFoundException;
import com.evolveum.midpoint.util.exception.SchemaException;
import com.evolveum.midpoint.util.exception.SystemException;
import com.evolveum.midpoint.util.logging.LoggingUtils;
import com.evolveum.midpoint.util.logging.Trace;
import com.evolveum.midpoint.util.logging.TraceManager;
import com.evolveum.midpoint.xml.ns._public.common.common_3.NodeOperationalStatusType;
import com.evolveum.midpoint.xml.ns._public.common.common_3.NodeType;
/**
 * Responsible for keeping the cluster consistent.
 * (Clusterwide task management operations are in ExecutionManager.)
 *
 * <p>Besides delegating node-related operations to {@link NodeRegistrar}, this class runs a background
 * {@link ClusterManagerThread} that periodically checks for system configuration changes, verifies the cluster
 * configuration and updates the local node object, checks waiting and stalled tasks, and marks remote nodes
 * that stopped checking in as down.</p>
 *
 * @author Pavol Mederly
 */
public class ClusterManager {

    private static final Trace LOGGER = TraceManager.getTrace(ClusterManager.class);

    private static final String CLASS_DOT = ClusterManager.class.getName() + ".";
    private static final String CHECK_SYSTEM_CONFIGURATION_CHANGED = CLASS_DOT + "checkSystemConfigurationChanged";

    private final TaskManagerQuartzImpl taskManager;
    private final NodeRegistrar nodeRegistrar;

    private ClusterManagerThread clusterManagerThread;

    private static boolean updateNodeExecutionLimitations = true; // turned off when testing

    public ClusterManager(TaskManagerQuartzImpl taskManager) {
        this.taskManager = taskManager;
        this.nodeRegistrar = new NodeRegistrar(taskManager, this);
    }

    /**
     * Switches whether the cluster manager thread applies node execution limitations to the local node
     * (turned off when testing).
     */
    public static void setUpdateNodeExecutionLimitations(boolean value) {
        updateNodeExecutionLimitations = value;
    }

    /**
     * Verifies cluster consistency (currently checks whether there is no other node with the same ID,
     * and whether clustered/non-clustered nodes are OK).
     *
     * @return Current node record from repository, if everything is OK. Otherwise returns null.
     */
    @Nullable
    public NodeType checkClusterConfiguration(OperationResult result) {
        NodeType currentNode = nodeRegistrar.verifyNodeObject(result); // if error, sets the error state and stops the scheduler
        nodeRegistrar.checkNonClusteredNodes(result); // the same
        return currentNode;
    }

    /** Returns true if the cluster manager thread has been started and is still alive. */
    public boolean isClusterManagerThreadActive() {
        return clusterManagerThread != null && clusterManagerThread.isAlive();
    }

    public void recordNodeShutdown(OperationResult result) {
        nodeRegistrar.recordNodeShutdown(result);
    }

    public boolean isCurrentNode(PrismObject<NodeType> node) {
        return nodeRegistrar.isCurrentNode(node);
    }

    public boolean isCurrentNode(String node) {
        return nodeRegistrar.isCurrentNode(node);
    }

    public void deleteNode(String nodeOid, OperationResult result) throws SchemaException, ObjectNotFoundException {
        nodeRegistrar.deleteNode(nodeOid, result);
    }

    public NodeType createOrUpdateNodeInRepo(OperationResult result) throws TaskManagerInitializationException {
        return nodeRegistrar.createOrUpdateNodeInRepo(result);
    }

    /** Returns the local node object as cached by {@link NodeRegistrar} (no repository access here). */
    public PrismObject<NodeType> getLocalNodeObject() {
        return nodeRegistrar.getCachedLocalNodeObject();
    }

    /** Re-reads and verifies the local node object via {@link NodeRegistrar#verifyNodeObject}. */
    public NodeType getFreshVerifiedLocalNodeObject(OperationResult result) {
        return nodeRegistrar.verifyNodeObject(result);
    }

    public boolean isUpAndAlive(NodeType nodeType) {
        return nodeRegistrar.isUpAndAlive(nodeType);
    }

    public boolean isCheckingIn(NodeType nodeType) {
        return nodeRegistrar.isCheckingIn(nodeType);
    }

    public void registerNodeUp(OperationResult result) {
        LOGGER.info("Registering the node as started");
        nodeRegistrar.registerNodeUp(result);
    }

    /**
     * Background thread that repeatedly runs the cluster maintenance checks, sleeping for the configured
     * node registration cycle time between iterations, until {@link #signalShutdown()} is called.
     */
    class ClusterManagerThread extends Thread {

        // Written by signalShutdown() (called from another thread) and read by the run() loop;
        // volatile guarantees the loop observes the change.
        private volatile boolean canRun = true;

        @Override
        public void run() {
            LOGGER.info("ClusterManager thread starting.");

            long nodeAlivenessCheckInterval = taskManager.getConfiguration().getNodeAlivenessCheckInterval() * 1000L;
            long lastNodeAlivenessCheck = 0;

            long delay = taskManager.getConfiguration().getNodeRegistrationCycleTime() * 1000L;
            while (canRun) {

                // getName() rather than implicit Class.toString(), which would prefix the name with "class ".
                OperationResult result = new OperationResult(ClusterManagerThread.class.getName() + ".run");

                try {
                    checkSystemConfigurationChanged(result);

                    // these checks are separate in order to prevent a failure in one method blocking execution of others
                    try {
                        NodeType node = checkClusterConfiguration(result); // if error, the scheduler will be stopped
                        if (updateNodeExecutionLimitations && node != null) {
                            // we want to set limitations ONLY if the cluster configuration passes
                            // (i.e. node object is not inadvertently overwritten)
                            taskManager.getExecutionManager().setLocalExecutionLimitations(node);
                        }
                        nodeRegistrar.updateNodeObject(result); // however, we want to update repo even in that case
                    } catch (Throwable t) {
                        LoggingUtils.logUnexpectedException(LOGGER, "Unexpected exception while checking cluster configuration; continuing execution.", t);
                    }

                    try {
                        checkWaitingTasks(result);
                    } catch (Throwable t) {
                        LoggingUtils.logUnexpectedException(LOGGER, "Unexpected exception while checking waiting tasks; continuing execution.", t);
                    }

                    try {
                        checkStalledTasks(result);
                    } catch (Throwable t) {
                        LoggingUtils.logUnexpectedException(LOGGER, "Unexpected exception while checking stalled tasks; continuing execution.", t);
                    }

                    if (System.currentTimeMillis() - lastNodeAlivenessCheck >= nodeAlivenessCheckInterval) {
                        try {
                            checkNodeAliveness(result);
                            // Only a successful check postpones the next one; after a failure it is retried next cycle.
                            lastNodeAlivenessCheck = System.currentTimeMillis();
                        } catch (Throwable t) {
                            LoggingUtils.logUnexpectedException(LOGGER, "Unexpected exception while checking node aliveness; continuing execution.", t);
                        }
                    }

                } catch (Throwable t) {
                    LoggingUtils.logUnexpectedException(LOGGER, "Unexpected exception in ClusterManager thread; continuing execution.", t);
                }

                LOGGER.trace("ClusterManager thread sleeping for {} msec", delay);
                try {
                    Thread.sleep(delay);
                } catch (InterruptedException e) {
                    // Interruption is used by signalShutdown() to cut the sleep short; the loop condition decides
                    // whether to continue.
                    LOGGER.trace("ClusterManager thread interrupted.");
                }
            }

            LOGGER.info("ClusterManager thread stopping.");
        }

        /** Asks the thread to finish: clears the run flag and interrupts any ongoing sleep. */
        private void signalShutdown() {
            canRun = false;
            this.interrupt();
        }
    }

    /**
     * Marks remote nodes as DOWN when they are UP but have not checked in within the node aliveness timeout.
     * Remote nodes that have been STARTING for longer than the node startup timeout are only logged.
     */
    private void checkNodeAliveness(OperationResult result) throws SchemaException {
        SearchResultList<PrismObject<NodeType>> nodes = getRepositoryService()
                .searchObjects(NodeType.class, null, null, result);
        for (PrismObject<NodeType> nodeObject : nodes) {
            NodeType node = nodeObject.asObjectable();
            if (isRemoteNode(node)) {
                if (shouldBeMarkedAsDown(node)) {
                    LOGGER.warn("Node {} is down, marking it as such", node);
                    List<ItemDelta<?, ?>> modifications = taskManager.getPrismContext().deltaFor(NodeType.class)
                            .item(NodeType.F_RUNNING).replace(false)
                            .item(NodeType.F_OPERATIONAL_STATUS).replace(NodeOperationalStatusType.DOWN)
                            .asItemDeltas();
                    try {
                        getRepositoryService().modifyObject(NodeType.class, node.getOid(), modifications, result);
                    } catch (ObjectNotFoundException | ObjectAlreadyExistsException e) {
                        LoggingUtils.logUnexpectedException(LOGGER, "Couldn't mark node {} as down", e, node);
                    }
                } else if (startingForTooLong(node)) {
                    LOGGER.warn("Node {} is starting for too long. Last check-in time = {}", node, node.getLastCheckInTime());
                    // TODO should we mark this node as down?
                }
            }
        }
    }

    /** Returns true if the node is not the local one (or the local node ID is not known). */
    private boolean isRemoteNode(NodeType node) {
        return taskManager.getNodeId() == null || !taskManager.getNodeId().equals(node.getNodeIdentifier());
    }

    /** An UP node that has not checked in within the node aliveness timeout should be marked as DOWN. */
    private boolean shouldBeMarkedAsDown(NodeType node) {
        return node.getOperationalStatus() == NodeOperationalStatusType.UP
                && lastCheckInOlderThan(node, taskManager.getConfiguration().getNodeAlivenessTimeout() * 1000L);
    }

    /** A STARTING node that has not checked in within the node startup timeout is starting for too long. */
    private boolean startingForTooLong(NodeType node) {
        return node.getOperationalStatus() == NodeOperationalStatusType.STARTING
                && lastCheckInOlderThan(node, taskManager.getConfiguration().getNodeStartupTimeout() * 1000L);
    }

    /**
     * Returns true if the node has no recorded check-in time, or its last check-in happened more than
     * {@code timeoutMillis} milliseconds ago.
     */
    private boolean lastCheckInOlderThan(NodeType node, long timeoutMillis) {
        if (node.getLastCheckInTime() == null) {
            return true;
        }
        long lastCheckInMillis = node.getLastCheckInTime().toGregorianCalendar().getTimeInMillis();
        return System.currentTimeMillis() - lastCheckInMillis > timeoutMillis;
    }

    /**
     * Signals the cluster manager thread to stop and waits up to {@code waitTime} ms for it to finish.
     * Records a warning in the subresult if the thread is still alive afterwards.
     */
    public void stopClusterManagerThread(long waitTime, OperationResult parentResult) {
        OperationResult result = parentResult.createSubresult(ClusterManager.class.getName() + ".stopClusterManagerThread");
        result.addParam("waitTime", waitTime);

        if (clusterManagerThread != null) {
            clusterManagerThread.signalShutdown();
            try {
                clusterManagerThread.join(waitTime);
            } catch (InterruptedException e) {
                LoggingUtils.logUnexpectedException(LOGGER, "Waiting for ClusterManagerThread shutdown was interrupted", e);
                // Restore the interrupt status so that the caller can still react to the interruption.
                Thread.currentThread().interrupt();
            }

            if (clusterManagerThread.isAlive()) {
                result.recordWarning("ClusterManagerThread shutdown requested but after " + waitTime + " ms it is still running.");
            } else {
                result.recordSuccess();
            }
        } else {
            result.recordSuccess();
        }
    }

    /** Creates and starts a new cluster manager thread. */
    public void startClusterManagerThread() {
        clusterManagerThread = new ClusterManagerThread();
        clusterManagerThread.setName("ClusterManagerThread");
        clusterManagerThread.start();
    }

    private RepositoryService getRepositoryService() {
        return taskManager.getRepositoryService();
    }

    /** Returns a short human-readable description of the node: its identifier and host name. */
    public String dumpNodeInfo(NodeType node) {
        return node.getNodeIdentifier() + " (" + node.getHostname() + ")";
    }

    /**
     * Returns all node objects from the repository.
     *
     * @throws SystemException if the repository reports a schema problem (not expected to happen)
     */
    public List<PrismObject<NodeType>> getAllNodes(OperationResult result) {
        try {
            return getRepositoryService().searchObjects(NodeType.class, null, null, result);
        } catch (SchemaException e) { // should not occur
            throw new SystemException("Cannot get the list of nodes from the repository", e);
        }
    }

    public PrismObject<NodeType> getNode(String nodeOid, OperationResult result) throws SchemaException, ObjectNotFoundException {
        return getRepositoryService().getObject(NodeType.class, nodeOid, null, result);
    }

    /**
     * Returns the node with the given identifier.
     *
     * @throws ObjectNotFoundException if there is no such node
     * @throws SystemException if there are multiple such nodes, or the repository reports a schema problem
     */
    public PrismObject<NodeType> getNodeById(String nodeIdentifier, OperationResult result) throws ObjectNotFoundException {
        try {
            // TODO change to query-by-node-id (the lookup is currently done by object name)
            ObjectQuery q = ObjectQueryUtil.createNameQuery(NodeType.class, taskManager.getPrismContext(), nodeIdentifier);
            List<PrismObject<NodeType>> nodes = getRepositoryService().searchObjects(NodeType.class, q, null, result);
            if (nodes.isEmpty()) {
                throw new ObjectNotFoundException("A node with identifier " + nodeIdentifier + " does not exist.");
            } else if (nodes.size() > 1) {
                throw new SystemException("Multiple nodes with the same identifier '" + nodeIdentifier + "' in the repository.");
            } else {
                return nodes.get(0);
            }
        } catch (SchemaException e) { // should not occur
            throw new SystemException("Cannot get the list of nodes from the repository", e);
        }
    }

    /**
     * Check whether system configuration has not changed in repository (e.g. by another node in cluster).
     * Applies new configuration if so. Failures are logged and recorded in the subresult, never propagated.
     */
    private void checkSystemConfigurationChanged(OperationResult parentResult) {
        OperationResult result = parentResult.createSubresult(CHECK_SYSTEM_CONFIGURATION_CHANGED);
        try {
            taskManager.getSystemConfigurationChangeDispatcher().dispatch(false, false, result);
            result.computeStatus();
        } catch (Throwable t) {
            LoggingUtils.logUnexpectedException(LOGGER, "Couldn't apply system configuration", t);
            result.recordFatalError("Couldn't apply system configuration: " + t.getMessage(), t);
        }
    }

    // Time (millis) of the last waiting tasks check; accessed only from the cluster manager thread's run loop.
    private long lastCheckedWaitingTasks = 0L;

    /** Runs the waiting tasks check if the configured waiting tasks check interval has elapsed. */
    private void checkWaitingTasks(OperationResult result) throws SchemaException {
        if (System.currentTimeMillis() > lastCheckedWaitingTasks + taskManager.getConfiguration().getWaitingTasksCheckInterval() * 1000L) {
            lastCheckedWaitingTasks = System.currentTimeMillis();
            taskManager.checkWaitingTasks(result);
        }
    }

    // Time (millis) of the last stalled tasks check; accessed only from the cluster manager thread's run loop.
    private long lastCheckedStalledTasks = 0L;

    /** Runs the stalled tasks check if the configured stalled tasks check interval has elapsed. */
    private void checkStalledTasks(OperationResult result) {
        if (System.currentTimeMillis() > lastCheckedStalledTasks + taskManager.getConfiguration().getStalledTasksCheckInterval() * 1000L) {
            lastCheckedStalledTasks = System.currentTimeMillis();
            taskManager.checkStalledTasks(result);
        }
    }
}