diff --git a/gui/admin-gui/src/main/java/com/evolveum/midpoint/gui/impl/util/ReportPeerQueryInterceptor.java b/gui/admin-gui/src/main/java/com/evolveum/midpoint/gui/impl/util/ReportPeerQueryInterceptor.java
index f5e45585227..fc1ca9737db 100644
--- a/gui/admin-gui/src/main/java/com/evolveum/midpoint/gui/impl/util/ReportPeerQueryInterceptor.java
+++ b/gui/admin-gui/src/main/java/com/evolveum/midpoint/gui/impl/util/ReportPeerQueryInterceptor.java
@@ -141,7 +141,8 @@ private boolean checkRequest(HttpServletRequest request, HttpServletResponse res
return false;
}
- if (!nodeAuthenticator.authenticate(request.getRemoteHost(), request.getRemoteAddr(), operation)) {
+ // we temporarily allow authentication without credentials
+ if (!nodeAuthenticator.authenticate(request.getRemoteHost(), request.getRemoteAddr(), null, operation)) {
LOGGER.debug("Unknown node, host: {} ", request.getRemoteHost());
response.setStatus(HttpServletResponse.SC_FORBIDDEN);
return false;
diff --git a/infra/schema/src/main/resources/xml/ns/public/common/common-core-3.xsd b/infra/schema/src/main/resources/xml/ns/public/common/common-core-3.xsd
index ce6e6468386..9a1fbbe63db 100755
--- a/infra/schema/src/main/resources/xml/ns/public/common/common-core-3.xsd
+++ b/infra/schema/src/main/resources/xml/ns/public/common/common-core-3.xsd
@@ -2002,7 +2002,7 @@
infrastructure/intraClusterHttpUrlPattern system configuration property.
Use only if really necessary. Support for this item is currently limited
- to clusterwide cache invalidation.
+ to clusterwide cache invalidation and managing node task scheduler.
@@ -2013,6 +2013,16 @@
+
+
+
+ Port at which this node can be contacted via REST.
+
+
+ 4.0
+
+
+
@@ -2084,6 +2094,26 @@
+
+
+
+ The secret used for intra-cluster authentication.
+
+
+ 4.0
+
+
+
+
+
+
+ When was the secret created or last changed.
+
+
+ 4.0
+
+
+
@@ -3566,6 +3596,34 @@
+
+
+
+ Describes the state of the (local) scheduler.
+
+
+ 4.0
+
+
+
+
+
+
+ Information on the current node.
+
+
+
+
+
+
+ Locally executing tasks.
+
+
+
+
+
+
+
diff --git a/model/model-api/src/main/java/com/evolveum/midpoint/model/api/TaskService.java b/model/model-api/src/main/java/com/evolveum/midpoint/model/api/TaskService.java
index 7846408f277..622c5e955ce 100644
--- a/model/model-api/src/main/java/com/evolveum/midpoint/model/api/TaskService.java
+++ b/model/model-api/src/main/java/com/evolveum/midpoint/model/api/TaskService.java
@@ -21,6 +21,7 @@
import com.evolveum.midpoint.schema.result.OperationResult;
import com.evolveum.midpoint.task.api.Task;
import com.evolveum.midpoint.util.exception.*;
+import com.evolveum.midpoint.xml.ns._public.common.common_3.SchedulerInformationType;
import com.evolveum.midpoint.xml.ns._public.common.common_3.TaskType;
import java.util.Collection;
@@ -101,6 +102,9 @@ public interface TaskService {
* @return
*/
PrismObject getTaskByIdentifier(String identifier, Collection> options, Task operationTask, OperationResult parentResult) throws SchemaException, ObjectNotFoundException, ConfigurationException, SecurityViolationException, ExpressionEvaluationException, CommunicationException;
+
+ SchedulerInformationType getLocalSchedulerInformation(Task operationTask, OperationResult parentResult) throws CommunicationException,
+ ObjectNotFoundException, SchemaException, SecurityViolationException, ConfigurationException, ExpressionEvaluationException;
//endregion
//region Node-level operations
@@ -142,6 +146,8 @@ public interface TaskService {
*/
void stopSchedulers(Collection nodeIdentifiers, Task operationTask, OperationResult parentResult) throws SecurityViolationException, ObjectNotFoundException, SchemaException, ExpressionEvaluationException, CommunicationException, ConfigurationException;
+ void stopLocalScheduler(Task operationTask, OperationResult parentResult) throws SecurityViolationException, ObjectNotFoundException, SchemaException, ExpressionEvaluationException, CommunicationException, ConfigurationException;
+
/**
* Stops a set of schedulers (on their nodes) and tasks that are executing on these nodes.
*
@@ -163,6 +169,10 @@ public interface TaskService {
* @return true if the operation succeeded; false otherwise.
*/
void startSchedulers(Collection nodeIdentifiers, Task operationTask, OperationResult result) throws SecurityViolationException, ObjectNotFoundException, SchemaException, ExpressionEvaluationException, CommunicationException, ConfigurationException;
+
+ void startLocalScheduler(Task operationTask, OperationResult result) throws SecurityViolationException, ObjectNotFoundException, SchemaException, ExpressionEvaluationException, CommunicationException, ConfigurationException;
+
+ void stopLocalTask(String oid, Task operationTask, OperationResult result) throws SecurityViolationException, ObjectNotFoundException, SchemaException, ExpressionEvaluationException, CommunicationException, ConfigurationException;
//endregion
//region Miscellaneous
diff --git a/model/model-api/src/main/java/com/evolveum/midpoint/model/api/authentication/NodeAuthenticationEvaluator.java b/model/model-api/src/main/java/com/evolveum/midpoint/model/api/authentication/NodeAuthenticationEvaluator.java
index 0c2740d8c61..60a6448cc9f 100644
--- a/model/model-api/src/main/java/com/evolveum/midpoint/model/api/authentication/NodeAuthenticationEvaluator.java
+++ b/model/model-api/src/main/java/com/evolveum/midpoint/model/api/authentication/NodeAuthenticationEvaluator.java
@@ -2,6 +2,6 @@
public interface NodeAuthenticationEvaluator {
- boolean authenticate(String remoteName, String remoteAddress, String operation);
+ boolean authenticate(String remoteName, String remoteAddress, String credentials, String operation);
}
diff --git a/model/model-common/pom.xml b/model/model-common/pom.xml
index bce54bd81fd..ddf389882fa 100644
--- a/model/model-common/pom.xml
+++ b/model/model-common/pom.xml
@@ -160,7 +160,11 @@
org.jetbrains
annotations-java5
-
+
+ javax.annotation
+ javax.annotation-api
+
+
org.testng
diff --git a/model/model-impl/src/main/java/com/evolveum/midpoint/model/impl/ClusterCacheListener.java b/model/model-impl/src/main/java/com/evolveum/midpoint/model/impl/ClusterCacheListener.java
index 4f7fd6d692e..b1aa524cdf4 100644
--- a/model/model-impl/src/main/java/com/evolveum/midpoint/model/impl/ClusterCacheListener.java
+++ b/model/model-impl/src/main/java/com/evolveum/midpoint/model/impl/ClusterCacheListener.java
@@ -15,52 +15,33 @@
*/
package com.evolveum.midpoint.model.impl;
-import javax.annotation.PostConstruct;
-import javax.ws.rs.core.Response;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.cxf.jaxrs.client.WebClient;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.security.core.Authentication;
-import org.springframework.security.core.context.SecurityContextHolder;
-import org.springframework.stereotype.Component;
-
-import com.evolveum.midpoint.model.api.ModelInteractionService;
-import com.evolveum.midpoint.model.api.ModelService;
import com.evolveum.midpoint.model.impl.security.NodeAuthenticationToken;
-import com.evolveum.midpoint.model.impl.security.RestAuthenticationMethod;
-import com.evolveum.midpoint.prism.PrismContext;
-import com.evolveum.midpoint.prism.PrismObject;
-import com.evolveum.midpoint.prism.query.ObjectQuery;
import com.evolveum.midpoint.repo.api.CacheDispatcher;
import com.evolveum.midpoint.repo.api.CacheListener;
-import com.evolveum.midpoint.schema.SearchResultList;
import com.evolveum.midpoint.schema.constants.ObjectTypes;
import com.evolveum.midpoint.schema.result.OperationResult;
+import com.evolveum.midpoint.task.api.RemoteExecutionHelper;
import com.evolveum.midpoint.task.api.Task;
import com.evolveum.midpoint.task.api.TaskManager;
-import com.evolveum.midpoint.util.exception.CommunicationException;
-import com.evolveum.midpoint.util.exception.ConfigurationException;
-import com.evolveum.midpoint.util.exception.ExpressionEvaluationException;
-import com.evolveum.midpoint.util.exception.ObjectNotFoundException;
-import com.evolveum.midpoint.util.exception.SchemaException;
-import com.evolveum.midpoint.util.exception.SecurityViolationException;
import com.evolveum.midpoint.util.logging.Trace;
import com.evolveum.midpoint.util.logging.TraceManager;
-import com.evolveum.midpoint.xml.ns._public.common.common_3.NodeType;
import com.evolveum.midpoint.xml.ns._public.common.common_3.ObjectType;
-import com.evolveum.midpoint.xml.ns._public.common.common_3.SystemConfigurationType;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.security.core.Authentication;
+import org.springframework.security.core.context.SecurityContextHolder;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+import javax.ws.rs.core.Response;
@Component
public class ClusterCacheListener implements CacheListener {
private static final Trace LOGGER = TraceManager.getTrace(ClusterCacheListener.class);
- @Autowired private ModelService modelService;
- @Autowired private ModelInteractionService modelInteractionService;
@Autowired private TaskManager taskManager;
@Autowired private CacheDispatcher cacheDispatcher;
- @Autowired private PrismContext prismContext;
+ @Autowired private RemoteExecutionHelper remoteExecutionHelper;
@PostConstruct
public void addListener() {
@@ -75,8 +56,6 @@ public void invalidateCache(Class type, String oid) {
return;
}
- String nodeId = taskManager.getNodeId();
-
Authentication authentication = SecurityContextHolder.getContext().getAuthentication();
if (authentication instanceof NodeAuthenticationToken) {
LOGGER.trace("Skipping cluster-wide cache invalidation as this is already a remotely-invoked invalidateCache() call");
@@ -85,51 +64,13 @@ public void invalidateCache(Class type, String oid) {
Task task = taskManager.createTaskInstance("invalidateCache");
OperationResult result = task.getResult();
-
- SearchResultList> otherClusterNodes;
- try {
- ObjectQuery query = prismContext.queryFor(NodeType.class).not().item(NodeType.F_NODE_IDENTIFIER).eq(nodeId).build();
- otherClusterNodes = modelService.searchObjects(NodeType.class, query, null, task, result);
- } catch (SchemaException | ObjectNotFoundException | SecurityViolationException | CommunicationException
- | ConfigurationException | ExpressionEvaluationException e) {
- LOGGER.warn("Cannot find nodes for clearing cache on them. Skipping.", e);
- return;
- }
-
- SystemConfigurationType systemConfig;
- try {
- systemConfig = modelInteractionService.getSystemConfiguration(result);
- } catch (ObjectNotFoundException | SchemaException e) {
- LOGGER.warn("Cannot load system configuration. Cannot determine the URL for REST calls without it"
- + " (unless specified explicitly for individual nodes)");
- systemConfig = null;
- }
-
- for (PrismObject node : otherClusterNodes.getList()) {
- NodeType nodeType = node.asObjectable();
- String baseUrl;
- if (nodeType.getUrl() != null) {
- baseUrl = nodeType.getUrl();
- } else {
- String httpUrlPattern = systemConfig != null && systemConfig.getInfrastructure() != null
- ? systemConfig.getInfrastructure().getIntraClusterHttpUrlPattern()
- : null;
- if (StringUtils.isBlank(httpUrlPattern)) {
- LOGGER.warn("Node URL nor intra-cluster URL pattern specified, skipping cache clearing for node {}",
- nodeType.getNodeIdentifier());
- continue;
- }
- baseUrl = httpUrlPattern.replace("$host", nodeType.getHostname());
- }
-
- WebClient client = WebClient.create(baseUrl + "/ws/rest");
- client.header("Authorization", RestAuthenticationMethod.CLUSTER.getMethod());// + " " + Base64Utility.encode((nodeIdentifier).getBytes()));
+ remoteExecutionHelper.execute((client, result1) -> {
client.path("/event/" + ObjectTypes.getRestTypeFromClass(type));
Response response = client.post(null);
-
LOGGER.info("Cluster-wide cache clearance finished with status {}, {}", response.getStatusInfo().getStatusCode(),
response.getStatusInfo().getReasonPhrase());
- }
+ response.close();
+ }, "cache invalidation", result);
}
}
diff --git a/model/model-impl/src/main/java/com/evolveum/midpoint/model/impl/ModelRestService.java b/model/model-impl/src/main/java/com/evolveum/midpoint/model/impl/ModelRestService.java
index 8c7e9db0f06..8aeaf4ad8c6 100644
--- a/model/model-impl/src/main/java/com/evolveum/midpoint/model/impl/ModelRestService.java
+++ b/model/model-impl/src/main/java/com/evolveum/midpoint/model/impl/ModelRestService.java
@@ -105,6 +105,10 @@ public class ModelRestService {
public static final String OPERATION_GENERATE_VALUE_RPC = CLASS_DOT + "generateValueRpc";
public static final String OPERATION_EXECUTE_CREDENTIAL_RESET = CLASS_DOT + "executeCredentialReset";
public static final String OPERATION_EXECUTE_CLUSTER_EVENT = CLASS_DOT + "executeClusterEvent";
+ public static final String OPERATION_GET_LOCAL_SCHEDULER_INFORMATION = CLASS_DOT + "getLocalSchedulerInformation";
+ public static final String OPERATION_STOP_LOCAL_SCHEDULER = CLASS_DOT + "stopScheduler";
+ public static final String OPERATION_START_LOCAL_SCHEDULER = CLASS_DOT + "startScheduler";
+ public static final String OPERATION_STOP_LOCAL_TASK = CLASS_DOT + "stopLocalTask";
private static final String CURRENT = "current";
private static final String VALIDATE = "validate";
@@ -118,6 +122,7 @@ public class ModelRestService {
@Autowired private SecurityHelper securityHelper;
@Autowired private ValuePolicyProcessor policyProcessor;
@Autowired private TaskManager taskManager;
+ @Autowired private TaskService taskService;
@Autowired private Protector protector;
@Autowired private ResourceValidator resourceValidator;
@@ -1070,6 +1075,86 @@ public Response executeClusterEvent(@PathParam("type") String type, @Context Mes
}
+ @GET
+ @Path("/scheduler/information")
+ @Consumes({MediaType.APPLICATION_XML, MediaType.APPLICATION_JSON, RestServiceUtil.APPLICATION_YAML})
+ @Produces({MediaType.APPLICATION_XML, MediaType.APPLICATION_JSON, RestServiceUtil.APPLICATION_YAML})
+ public Response getLocalSchedulerInformation(@Context MessageContext mc) {
+ Task task = RestServiceUtil.initRequest(mc);
+ OperationResult result = new OperationResult(OPERATION_GET_LOCAL_SCHEDULER_INFORMATION);
+
+ Response response;
+ try {
+ SchedulerInformationType schedulerInformation = taskService.getLocalSchedulerInformation(task, result);
+ response = RestServiceUtil.createResponse(Response.Status.OK, schedulerInformation, result);
+ } catch (Throwable t) {
+ response = RestServiceUtil.handleException(result, t);
+ }
+ result.computeStatus();
+ finishRequest(task);
+ return response;
+ }
+
+ @POST
+ @Path("/scheduler/stop")
+ @Consumes({MediaType.APPLICATION_XML, MediaType.APPLICATION_JSON, RestServiceUtil.APPLICATION_YAML})
+ @Produces({MediaType.APPLICATION_XML, MediaType.APPLICATION_JSON, RestServiceUtil.APPLICATION_YAML})
+ public Response stopLocalScheduler(@Context MessageContext mc) {
+ Task task = RestServiceUtil.initRequest(mc);
+ OperationResult result = new OperationResult(OPERATION_STOP_LOCAL_SCHEDULER);
+
+ Response response;
+ try {
+ taskService.stopLocalScheduler(task, result);
+ response = RestServiceUtil.createResponse(Response.Status.OK, result);
+ } catch (Throwable t) {
+ response = RestServiceUtil.handleException(result, t);
+ }
+ result.computeStatus();
+ finishRequest(task);
+ return response;
+ }
+
+ @POST
+ @Path("/scheduler/start")
+ @Consumes({MediaType.APPLICATION_XML, MediaType.APPLICATION_JSON, RestServiceUtil.APPLICATION_YAML})
+ @Produces({MediaType.APPLICATION_XML, MediaType.APPLICATION_JSON, RestServiceUtil.APPLICATION_YAML})
+ public Response startLocalScheduler(@Context MessageContext mc) {
+ Task task = RestServiceUtil.initRequest(mc);
+ OperationResult result = new OperationResult(OPERATION_START_LOCAL_SCHEDULER);
+
+ Response response;
+ try {
+ taskService.startLocalScheduler(task, result);
+ response = RestServiceUtil.createResponse(Response.Status.OK, result);
+ } catch (Throwable t) {
+ response = RestServiceUtil.handleException(result, t);
+ }
+ result.computeStatus();
+ finishRequest(task);
+ return response;
+ }
+
+ @POST
+ @Path("/tasks/{oid}/stop")
+ @Consumes({MediaType.APPLICATION_XML, MediaType.APPLICATION_JSON, RestServiceUtil.APPLICATION_YAML})
+ @Produces({MediaType.APPLICATION_XML, MediaType.APPLICATION_JSON, RestServiceUtil.APPLICATION_YAML})
+ public Response stopLocalTask(@PathParam("oid") String oid, @Context MessageContext mc) {
+ Task task = RestServiceUtil.initRequest(mc);
+ OperationResult result = new OperationResult(OPERATION_STOP_LOCAL_TASK);
+
+ Response response;
+ try {
+ taskService.stopLocalTask(oid, task, result);
+ response = RestServiceUtil.createResponse(Response.Status.OK, result);
+ } catch (Throwable t) {
+ response = RestServiceUtil.handleException(result, t);
+ }
+ result.computeStatus();
+ finishRequest(task);
+ return response;
+ }
+
// @GET
// @Path("tasks/{oid}")
// public Response getTaskByIdentifier(@PathParam("oid") String identifier) throws SchemaException, ObjectNotFoundException {
diff --git a/model/model-impl/src/main/java/com/evolveum/midpoint/model/impl/controller/ModelController.java b/model/model-impl/src/main/java/com/evolveum/midpoint/model/impl/controller/ModelController.java
index 381e2802f99..ede83f1ee17 100644
--- a/model/model-impl/src/main/java/com/evolveum/midpoint/model/impl/controller/ModelController.java
+++ b/model/model-impl/src/main/java/com/evolveum/midpoint/model/impl/controller/ModelController.java
@@ -31,6 +31,7 @@
import com.evolveum.midpoint.model.impl.lens.*;
import com.evolveum.midpoint.model.impl.scripting.ExecutionContext;
import com.evolveum.midpoint.model.impl.scripting.ScriptingExpressionEvaluator;
+import com.evolveum.midpoint.model.impl.security.NodeAuthenticationToken;
import com.evolveum.midpoint.model.impl.util.ModelImplUtils;
import com.evolveum.midpoint.prism.*;
import com.evolveum.midpoint.prism.crypto.Protector;
@@ -83,6 +84,8 @@
import org.jetbrains.annotations.NotNull;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.security.core.Authentication;
+import org.springframework.security.core.context.SecurityContextHolder;
import org.springframework.stereotype.Component;
import javax.xml.datatype.XMLGregorianCalendar;
@@ -1927,7 +1930,58 @@ public PrismObject getTaskByIdentifier(String identifier, Collection> nodes = repository.searchObjects(NodeType.class, query, null, result);
-// if (nodes.size() != 1) {
-// RestServiceUtil.createAbortMessage(requestCtx);
-// return;
-// }
-// //TODO: http header
-//
-// PreAuthenticatedAuthenticationToken authentication = new PreAuthenticatedAuthenticationToken(nodes.iterator().next(), null);
-// SecurityContext securityContext = SecurityContextHolder.getContext();
-// securityContext.setAuthentication(authentication);
-// } catch (Base64Exception | SchemaException e) {
-// RestServiceUtil.createAbortMessage(requestCtx);
-// return;
-// }
- }
+ if (parts.length == 1 && RestAuthenticationMethod.SECURITY_QUESTIONS.getMethod().equals(authenticationType)) {
+ RestServiceUtil.createSecurityQuestionAbortMessage(requestCtx, "{\"user\" : \"username\"}");
return;
}
@@ -128,24 +97,37 @@ public void filter(ContainerRequestContext requestCtx) throws IOException {
RestServiceUtil.createAbortMessage(requestCtx);
return;
}
- String base64Credentials = (parts.length == 2) ? parts[1] : null;
+
+ String base64Credentials = parts[1];
- if (RestAuthenticationMethod.SECURITY_QUESTIONS.equals(authenticationType)) {
+ if (RestAuthenticationMethod.SECURITY_QUESTIONS.getMethod().equals(authenticationType)) {
try {
String decodedCredentials = new String(Base64Utility.decode(base64Credentials));
policy = new AuthorizationPolicy();
policy.setAuthorizationType(RestAuthenticationMethod.SECURITY_QUESTIONS.getMethod());
policy.setAuthorization(decodedCredentials);
securityQuestionAuthenticator.handleRequest(policy, m, requestCtx);
-
} catch (Base64Exception e) {
RestServiceUtil.createSecurityQuestionAbortMessage(requestCtx, "{\"user\" : \"username\"}");
+ }
+ } else if (RestAuthenticationMethod.CLUSTER.getMethod().equals(authenticationType)) {
+ HttpConnectionInformation connectionInfo = SecurityUtil.getCurrentConnectionInformation();
+ String remoteAddress = connectionInfo != null ? connectionInfo.getRemoteHostAddress() : null;
+ String decodedCredentials;
+ try {
+ decodedCredentials = new String(Base64Utility.decode(base64Credentials));
+ } catch (Base64Exception e) {
+ LoggingUtils.logUnexpectedException(LOGGER, "Couldn't decode base64-encoded credentials", e);
+ RestServiceUtil.createAbortMessage(requestCtx);
+ return;
+ }
+ if (!nodeAuthenticator.authenticate(null, remoteAddress, decodedCredentials, "?")) {
+ RestServiceUtil.createAbortMessage(requestCtx);
return;
}
+ Task task = taskManager.createTaskInstance();
+ m.put(RestServiceUtil.MESSAGE_PROPERTY_TASK_NAME, task);
}
-
-
-
}
diff --git a/model/model-impl/src/main/java/com/evolveum/midpoint/model/impl/security/NodeAuthenticationEvaluatorImpl.java b/model/model-impl/src/main/java/com/evolveum/midpoint/model/impl/security/NodeAuthenticationEvaluatorImpl.java
index 13c6dae8ced..680c24e495d 100644
--- a/model/model-impl/src/main/java/com/evolveum/midpoint/model/impl/security/NodeAuthenticationEvaluatorImpl.java
+++ b/model/model-impl/src/main/java/com/evolveum/midpoint/model/impl/security/NodeAuthenticationEvaluatorImpl.java
@@ -19,7 +19,11 @@
import java.util.Collections;
import java.util.List;
+import com.evolveum.midpoint.prism.crypto.EncryptionException;
+import com.evolveum.midpoint.prism.crypto.Protector;
import com.evolveum.midpoint.task.api.TaskManager;
+import com.evolveum.prism.xml.ns._public.types_3.ProtectedStringType;
+import org.jetbrains.annotations.Nullable;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.security.core.context.SecurityContextHolder;
@@ -45,12 +49,13 @@ public class NodeAuthenticationEvaluatorImpl implements NodeAuthenticationEvalua
private RepositoryService repositoryService;
@Autowired private TaskManager taskManager;
@Autowired private SecurityHelper securityHelper;
+ @Autowired private Protector protector;
private static final Trace LOGGER = TraceManager.getTrace(NodeAuthenticationEvaluatorImpl.class);
private static final String OPERATION_SEARCH_NODE = NodeAuthenticationEvaluatorImpl.class.getName() + ".searchNode";
- public boolean authenticate(String remoteName, String remoteAddress, String operation) {
+ public boolean authenticate(@Nullable String remoteName, String remoteAddress, @Nullable String credentials, String operation) {
LOGGER.debug("Checking if {} ({}) is a known node", remoteName, remoteAddress);
OperationResult result = new OperationResult(OPERATION_SEARCH_NODE);
@@ -61,15 +66,40 @@ public boolean authenticate(String remoteName, String remoteAddress, String oper
List> matchingNodes = getMatchingNodes(allNodes, remoteName, remoteAddress, operation);
if (matchingNodes.size() == 1 || matchingNodes.size() >= 1 && taskManager.isLocalNodeClusteringEnabled()) {
- PrismObject actualNode = allNodes.iterator().next();
LOGGER.trace(
- "Matching result: The node {} was recognized as a known node (remote host name {} or IP address {} matched). Attempting to execute the requested operation: {}",
- actualNode.asObjectable().getName(), actualNode.asObjectable().getHostname(), remoteAddress, operation);
- NodeAuthenticationToken authNtoken = new NodeAuthenticationToken(actualNode, remoteAddress,
- Collections.emptyList());
- SecurityContextHolder.getContext().setAuthentication(authNtoken);
- securityHelper.auditLoginSuccess(actualNode.asObjectable(), connEnv);
- return true;
+ "Matching result: Node(s) {} recognized as known (remote host name {} or IP address {} matched). Attempting to execute the requested operation: {}",
+ allNodes, remoteName, remoteAddress, operation);
+ PrismObject actualNode = null;
+ if (credentials != null) {
+ for (PrismObject matchingNode : matchingNodes) {
+ ProtectedStringType encryptedSecret = matchingNode.asObjectable().getSecret();
+ if (encryptedSecret != null) {
+ String plainSecret;
+ try {
+ plainSecret = protector.decryptString(encryptedSecret);
+ } catch (EncryptionException e) {
+ LoggingUtils.logUnexpectedException(LOGGER, "Couldn't decrypt node secret for {}", e, matchingNode);
+ continue;
+ }
+ if (credentials.equals(plainSecret)) {
+ LOGGER.debug("Node secret matches for {}", matchingNode);
+ actualNode = matchingNode;
+ break;
+ } else {
+ LOGGER.debug("Node secret does not match for {}", matchingNode);
+ }
+ }
+ }
+ } else {
+ actualNode = matchingNodes.get(0);
+ }
+ if (actualNode != null) {
+ NodeAuthenticationToken authNtoken = new NodeAuthenticationToken(actualNode, remoteAddress,
+ Collections.emptyList());
+ SecurityContextHolder.getContext().setAuthentication(authNtoken);
+ securityHelper.auditLoginSuccess(actualNode.asObjectable(), connEnv);
+ return true;
+ }
}
} catch (RuntimeException | SchemaException e) {
diff --git a/model/model-impl/src/main/java/com/evolveum/midpoint/model/impl/security/RestAuthenticationMethod.java b/model/model-impl/src/main/java/com/evolveum/midpoint/model/impl/security/RestAuthenticationMethod.java
deleted file mode 100644
index a1e1713f584..00000000000
--- a/model/model-impl/src/main/java/com/evolveum/midpoint/model/impl/security/RestAuthenticationMethod.java
+++ /dev/null
@@ -1,32 +0,0 @@
-package com.evolveum.midpoint.model.impl.security;
-
-import org.apache.commons.lang.StringUtils;
-
-public enum RestAuthenticationMethod {
-
- BASIC("Basic"),
- SECURITY_QUESTIONS("SecQ"),
- CLUSTER("Cluster");
-
-
- private String method;
-
- RestAuthenticationMethod(String method) {
- this.method = method;
- }
-
- public String getMethod() {
- return method;
- }
-
- protected boolean equals(String authenticationType) {
- if (StringUtils.isBlank(authenticationType)) {
- return false;
- }
-
- if (getMethod().equals(authenticationType)) {
- return true;
- }
- return false;
- }
-}
diff --git a/model/model-impl/src/main/java/com/evolveum/midpoint/model/impl/util/RestServiceUtil.java b/model/model-impl/src/main/java/com/evolveum/midpoint/model/impl/util/RestServiceUtil.java
index 17ccd109548..f82cccce758 100644
--- a/model/model-impl/src/main/java/com/evolveum/midpoint/model/impl/util/RestServiceUtil.java
+++ b/model/model-impl/src/main/java/com/evolveum/midpoint/model/impl/util/RestServiceUtil.java
@@ -33,7 +33,7 @@
import org.apache.cxf.jaxrs.ext.MessageContext;
import com.evolveum.midpoint.model.api.ModelExecuteOptions;
-import com.evolveum.midpoint.model.impl.security.RestAuthenticationMethod;
+import com.evolveum.midpoint.security.api.RestAuthenticationMethod;
import com.evolveum.midpoint.model.impl.security.SecurityHelper;
import com.evolveum.midpoint.schema.constants.SchemaConstants;
import com.evolveum.midpoint.schema.result.OperationResult;
@@ -67,14 +67,14 @@ public class RestServiceUtil {
public static final String APPLICATION_YAML = "application/yaml";
- public static Response handleException(OperationResult result, Exception ex) {
- LoggingUtils.logUnexpectedException(LOGGER, "Got exception while servicing REST request: {}", ex,
+ public static Response handleException(OperationResult result, Throwable t) {
+ LoggingUtils.logUnexpectedException(LOGGER, "Got exception while servicing REST request: {}", t,
result != null ? result.getOperation() : "(null)");
- return handleExceptionNoLog(result, ex);
+ return handleExceptionNoLog(result, t);
}
- public static Response handleExceptionNoLog(OperationResult result, Exception ex) {
- return createErrorResponseBuilder(result, ex).build();
+ public static Response handleExceptionNoLog(OperationResult result, Throwable t) {
+ return createErrorResponseBuilder(result, t).build();
}
public static Response createResponse(Response.Status statusCode, OperationResult result) {
@@ -124,32 +124,32 @@ public static Response createResponse(Response.Status statusCode, URI locati
- public static Response.ResponseBuilder createErrorResponseBuilder(OperationResult result, Exception ex) {
- if (ex instanceof ObjectNotFoundException) {
+ public static Response.ResponseBuilder createErrorResponseBuilder(OperationResult result, Throwable t) {
+ if (t instanceof ObjectNotFoundException) {
return createErrorResponseBuilder(Response.Status.NOT_FOUND, result);
}
- if (ex instanceof CommunicationException || ex instanceof TunnelException) {
+ if (t instanceof CommunicationException || t instanceof TunnelException) {
return createErrorResponseBuilder(Response.Status.GATEWAY_TIMEOUT, result);
}
- if (ex instanceof SecurityViolationException || ex instanceof AuthorizationException) {
+ if (t instanceof SecurityViolationException || t instanceof AuthorizationException) {
return createErrorResponseBuilder(Response.Status.FORBIDDEN, result);
}
- if (ex instanceof ConfigurationException) {
+ if (t instanceof ConfigurationException) {
return createErrorResponseBuilder(Response.Status.BAD_GATEWAY, result);
}
- if (ex instanceof SchemaException
- || ex instanceof NoFocusNameSchemaException
- || ex instanceof ExpressionEvaluationException) {
+ if (t instanceof SchemaException
+ || t instanceof NoFocusNameSchemaException
+ || t instanceof ExpressionEvaluationException) {
return createErrorResponseBuilder(Response.Status.BAD_REQUEST, result);
}
- if (ex instanceof PolicyViolationException
- || ex instanceof ObjectAlreadyExistsException
- || ex instanceof ConcurrencyException) {
+ if (t instanceof PolicyViolationException
+ || t instanceof ObjectAlreadyExistsException
+ || t instanceof ConcurrencyException) {
return createErrorResponseBuilder(Response.Status.CONFLICT, result);
}
diff --git a/model/model-test/pom.xml b/model/model-test/pom.xml
index 8d936e5dd0b..998c7242ba1 100644
--- a/model/model-test/pom.xml
+++ b/model/model-test/pom.xml
@@ -205,6 +205,10 @@
com.h2database
h2
+
+ javax.annotation
+ javax.annotation-api
+
com.evolveum.midpoint.provisioning
diff --git a/provisioning/provisioning-impl/pom.xml b/provisioning/provisioning-impl/pom.xml
index 10224cdc356..7b4acba11ee 100644
--- a/provisioning/provisioning-impl/pom.xml
+++ b/provisioning/provisioning-impl/pom.xml
@@ -122,6 +122,10 @@
org.jetbrains
annotations-java5
+
+ javax.annotation
+ javax.annotation-api
+
diff --git a/provisioning/ucf-impl-connid/pom.xml b/provisioning/ucf-impl-connid/pom.xml
index e83525fb4dd..d72047c5f83 100644
--- a/provisioning/ucf-impl-connid/pom.xml
+++ b/provisioning/ucf-impl-connid/pom.xml
@@ -89,6 +89,10 @@
org.slf4j
slf4j-api
+
+ javax.annotation
+ javax.annotation-api
+
net.tirasa.connid
diff --git a/repo/repo-common/pom.xml b/repo/repo-common/pom.xml
index 703a05a9ca7..561f77649de 100644
--- a/repo/repo-common/pom.xml
+++ b/repo/repo-common/pom.xml
@@ -113,6 +113,10 @@
javax.xml.bind
jaxb-api
+
+ javax.annotation
+ javax.annotation-api
+
diff --git a/repo/repo-sql-impl/pom.xml b/repo/repo-sql-impl/pom.xml
index c2231b5e8f5..9afad35d48c 100644
--- a/repo/repo-sql-impl/pom.xml
+++ b/repo/repo-sql-impl/pom.xml
@@ -154,6 +154,10 @@
com.google.guava
guava
+
+ javax.annotation
+ javax.annotation-api
+
diff --git a/repo/security-api/src/main/java/com/evolveum/midpoint/security/api/RestAuthenticationMethod.java b/repo/security-api/src/main/java/com/evolveum/midpoint/security/api/RestAuthenticationMethod.java
new file mode 100644
index 00000000000..ece19051b7e
--- /dev/null
+++ b/repo/security-api/src/main/java/com/evolveum/midpoint/security/api/RestAuthenticationMethod.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright (c) 2010-2019 Evolveum
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.evolveum.midpoint.security.api;
+
+import org.apache.commons.lang.StringUtils;
+
+public enum RestAuthenticationMethod {
+
+ BASIC("Basic"),
+ SECURITY_QUESTIONS("SecQ"),
+ CLUSTER("Cluster");
+
+
+ private String method;
+
+ RestAuthenticationMethod(String method) {
+ this.method = method;
+ }
+
+ public String getMethod() {
+ return method;
+ }
+
+ protected boolean equals(String authenticationType) {
+ if (StringUtils.isBlank(authenticationType)) {
+ return false;
+ }
+
+ if (getMethod().equals(authenticationType)) {
+ return true;
+ }
+ return false;
+ }
+}
diff --git a/repo/security-api/src/main/java/com/evolveum/midpoint/security/api/SecurityUtil.java b/repo/security-api/src/main/java/com/evolveum/midpoint/security/api/SecurityUtil.java
index 542ef3e833a..376d39cff42 100644
--- a/repo/security-api/src/main/java/com/evolveum/midpoint/security/api/SecurityUtil.java
+++ b/repo/security-api/src/main/java/com/evolveum/midpoint/security/api/SecurityUtil.java
@@ -46,6 +46,7 @@
public class SecurityUtil {
private static final Trace LOGGER = TraceManager.getTrace(SecurityUtil.class);
+ private static final long GET_LOCAL_NAME_THRESHOLD = 2000;
@NotNull private static List remoteHostAddressHeaders = Collections.emptyList();
@@ -300,15 +301,19 @@ public static HttpConnectionInformation getCurrentConnectionInformation() {
}
ServletRequestAttributes servletRequestAttributes = (ServletRequestAttributes) attr;
HttpServletRequest request = servletRequestAttributes.getRequest();
- if (request == null) {
- return null;
- }
HttpConnectionInformation rv = new HttpConnectionInformation();
HttpSession session = request.getSession(false);
if (session != null) {
rv.setSessionId(session.getId());
}
+ long start = System.currentTimeMillis();
rv.setLocalHostName(request.getLocalName());
+ long delta = System.currentTimeMillis() - start;
+ if (delta > GET_LOCAL_NAME_THRESHOLD) {
+ LOGGER.warn("getLocalName() on HTTP request took {} milliseconds that is too long; "
+ + "please check your DNS configuration. Local name = {}, local address = {}", delta,
+ request.getLocalName(), request.getLocalAddr());
+ }
rv.setRemoteHostAddress(getRemoteHostAddress(request));
return rv;
}
diff --git a/repo/task-api/pom.xml b/repo/task-api/pom.xml
index 20f5bedf42e..2308b3f27ae 100644
--- a/repo/task-api/pom.xml
+++ b/repo/task-api/pom.xml
@@ -60,7 +60,15 @@
org.jetbrains
annotations-java5
-
+
+ org.apache.cxf
+ cxf-rt-rs-client
+
+
+ javax.ws.rs
+ javax.ws.rs-api
+
+
com.evolveum.midpoint.tools
test-ng
diff --git a/repo/task-api/src/main/java/com/evolveum/midpoint/task/api/RemoteExecutionHelper.java b/repo/task-api/src/main/java/com/evolveum/midpoint/task/api/RemoteExecutionHelper.java
new file mode 100644
index 00000000000..715f2c09f1c
--- /dev/null
+++ b/repo/task-api/src/main/java/com/evolveum/midpoint/task/api/RemoteExecutionHelper.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright (c) 2010-2019 Evolveum
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.evolveum.midpoint.task.api;
+
+import com.evolveum.midpoint.schema.result.OperationResult;
+import com.evolveum.midpoint.util.exception.SchemaException;
+import com.evolveum.midpoint.xml.ns._public.common.common_3.NodeType;
+import org.apache.cxf.jaxrs.client.WebClient;
+
+import javax.ws.rs.core.Response;
+import java.util.function.BiConsumer;
+
+/**
+ * Helps with the intra-cluster remote code execution.
+ *
+ * TODO documentation
+ */
+public interface RemoteExecutionHelper {
+
+ void execute(NodeType node, BiConsumer code, String context, OperationResult parentResult);
+
+ void execute(BiConsumer code, String context, OperationResult parentResult);
+
+ T extractResult(Response response, Class expectedClass) throws SchemaException;
+}
diff --git a/repo/task-api/src/main/java/com/evolveum/midpoint/task/api/TaskManager.java b/repo/task-api/src/main/java/com/evolveum/midpoint/task/api/TaskManager.java
index 9f4baf2d946..68ce0ddfb2a 100644
--- a/repo/task-api/src/main/java/com/evolveum/midpoint/task/api/TaskManager.java
+++ b/repo/task-api/src/main/java/com/evolveum/midpoint/task/api/TaskManager.java
@@ -357,7 +357,18 @@ void modifyTask(String oid, Collection extends ItemDelta> modifications, Opera
*
* @return tasks that currently run on this node.
*/
- Set getLocallyRunningTasks(OperationResult parentResult) throws TaskManagerException;
+ Set getLocallyRunningTasks(OperationResult parentResult);
+
+ /**
+ * Returns the local scheduler information.
+ */
+ SchedulerInformationType getLocalSchedulerInformation(OperationResult parentResult);
+
+ void stopLocalScheduler(OperationResult parentResult);
+
+ void startLocalScheduler(OperationResult parentResult);
+
+ void stopLocalTask(String oid, OperationResult parentResult);
/**
* Returns locally-run task by identifier. Returned instance is the same as is being used to carrying out
diff --git a/repo/task-quartz-impl/pom.xml b/repo/task-quartz-impl/pom.xml
index d6698528416..b37ef17c3df 100644
--- a/repo/task-quartz-impl/pom.xml
+++ b/repo/task-quartz-impl/pom.xml
@@ -123,6 +123,22 @@
org.jetbrains
annotations-java5
+
+ org.apache.cxf
+ cxf-rt-rs-client
+
+
+ org.apache.cxf
+ cxf-core
+
+
+ javax.ws.rs
+ javax.ws.rs-api
+
+
+ javax.annotation
+ javax.annotation-api
+
@@ -225,6 +241,6 @@
log4j-over-slf4j
test
-
+
diff --git a/repo/task-quartz-impl/src/main/java/com/evolveum/midpoint/task/quartzimpl/TaskManagerConfiguration.java b/repo/task-quartz-impl/src/main/java/com/evolveum/midpoint/task/quartzimpl/TaskManagerConfiguration.java
index 764002831a1..55f812edb72 100644
--- a/repo/task-quartz-impl/src/main/java/com/evolveum/midpoint/task/quartzimpl/TaskManagerConfiguration.java
+++ b/repo/task-quartz-impl/src/main/java/com/evolveum/midpoint/task/quartzimpl/TaskManagerConfiguration.java
@@ -73,6 +73,9 @@ public class TaskManagerConfiguration {
private static final String QUARTZ_NODE_REGISTRATION_INTERVAL_CONFIG_ENTRY = "quartzNodeRegistrationInterval";
private static final String NODE_REGISTRATION_INTERVAL_CONFIG_ENTRY = "nodeRegistrationInterval";
private static final String NODE_TIMEOUT_CONFIG_ENTRY = "nodeTimeout";
+ private static final String DEFAULT_URL_CONFIG_ENTRY = "defaultUrl";
+ private static final String DEFAULT_REST_PORT_CONFIG_ENTRY = "defaultRestPort";
+ private static final String USE_JMX_CONFIG_ENTRY = "useJmx";
private static final String JMX_USERNAME_CONFIG_ENTRY = "jmxUsername";
private static final String JMX_PASSWORD_CONFIG_ENTRY = "jmxPassword";
private static final String TEST_MODE_CONFIG_ENTRY = "testMode";
@@ -106,6 +109,7 @@ public class TaskManagerConfiguration {
private static final int QUARTZ_NODE_REGISTRATION_CYCLE_TIME_DEFAULT = 10;
private static final int NODE_REGISTRATION_CYCLE_TIME_DEFAULT = 10;
private static final int NODE_TIMEOUT_DEFAULT = 30;
+ private static final boolean USE_JMX_DEFAULT = false;
private static final String JMX_USERNAME_DEFAULT = "midpoint";
private static final String JMX_PASSWORD_DEFAULT = "secret";
private static final int WAITING_TASKS_CHECK_INTERVAL_DEFAULT = 600;
@@ -128,6 +132,8 @@ public class TaskManagerConfiguration {
private boolean jdbcJobStore;
private boolean clustered;
private String nodeId;
+ private String defaultUrl;
+ private Integer defaultRestPort;
private String jmxHostName;
private int jmxPort;
private int jmxConnectTimeout;
@@ -149,6 +155,7 @@ public class TaskManagerConfiguration {
private long workAllocationInitialDelay;
private long workAllocationDefaultFreeBucketWaitInterval;
+ private boolean useJmx;
// JMX credentials for connecting to remote nodes
private String jmxUsername;
private String jmxPassword;
@@ -202,6 +209,9 @@ public class TaskManagerConfiguration {
QUARTZ_NODE_REGISTRATION_INTERVAL_CONFIG_ENTRY,
NODE_REGISTRATION_INTERVAL_CONFIG_ENTRY,
NODE_TIMEOUT_CONFIG_ENTRY,
+ DEFAULT_URL_CONFIG_ENTRY,
+ DEFAULT_REST_PORT_CONFIG_ENTRY,
+ USE_JMX_CONFIG_ENTRY,
JMX_USERNAME_CONFIG_ENTRY,
JMX_PASSWORD_CONFIG_ENTRY,
TEST_MODE_CONFIG_ENTRY,
@@ -307,6 +317,9 @@ void setBasicInformation(MidpointConfiguration masterConfig) throws TaskManagerC
nodeRegistrationCycleTime = c.getInt(NODE_REGISTRATION_INTERVAL_CONFIG_ENTRY, NODE_REGISTRATION_CYCLE_TIME_DEFAULT);
nodeTimeout = c.getInt(NODE_TIMEOUT_CONFIG_ENTRY, NODE_TIMEOUT_DEFAULT);
+ defaultUrl = c.getString(DEFAULT_URL_CONFIG_ENTRY, null);
+ defaultRestPort = c.getInteger(DEFAULT_REST_PORT_CONFIG_ENTRY, null);
+ useJmx = c.getBoolean(USE_JMX_CONFIG_ENTRY, USE_JMX_DEFAULT);
jmxUsername = c.getString(JMX_USERNAME_CONFIG_ENTRY, JMX_USERNAME_DEFAULT);
jmxPassword = c.getString(JMX_PASSWORD_CONFIG_ENTRY, JMX_PASSWORD_DEFAULT);
@@ -555,6 +568,18 @@ public int getQuartzNodeRegistrationCycleTime() {
return quartzNodeRegistrationCycleTime;
}
+ public String getDefaultUrl() {
+ return defaultUrl;
+ }
+
+ public Integer getDefaultRestPort() {
+ return defaultRestPort;
+ }
+
+ public boolean isUseJmx() {
+ return useJmx;
+ }
+
public String getJmxUsername() {
return jmxUsername;
}
diff --git a/repo/task-quartz-impl/src/main/java/com/evolveum/midpoint/task/quartzimpl/TaskManagerQuartzImpl.java b/repo/task-quartz-impl/src/main/java/com/evolveum/midpoint/task/quartzimpl/TaskManagerQuartzImpl.java
index 81473d54fcf..0b97273e074 100644
--- a/repo/task-quartz-impl/src/main/java/com/evolveum/midpoint/task/quartzimpl/TaskManagerQuartzImpl.java
+++ b/repo/task-quartz-impl/src/main/java/com/evolveum/midpoint/task/quartzimpl/TaskManagerQuartzImpl.java
@@ -35,6 +35,7 @@
import com.evolveum.midpoint.common.LocalizationService;
import com.evolveum.midpoint.prism.ItemDefinition;
+import com.evolveum.midpoint.prism.crypto.Protector;
import com.evolveum.midpoint.prism.path.ItemPath;
import com.evolveum.midpoint.repo.api.PreconditionViolationException;
import com.evolveum.midpoint.repo.api.RepoAddOptions;
@@ -124,6 +125,8 @@ public class TaskManagerQuartzImpl implements TaskManager, BeanFactoryAware {
@Autowired private TaskManagerConfiguration configuration;
@Autowired private LocalizationService localizationService;
@Autowired private SystemConfigurationChangeDispatcher systemConfigurationChangeDispatcher;
+ @Autowired private RemoteExecutionHelper remoteExecutionHelper;
+ @Autowired private Protector protector;
// instances of all the helper classes (see their definitions for their description)
private ExecutionManager executionManager = new ExecutionManager(this);
@@ -1876,7 +1879,27 @@ public Set getLocallyRunningTasks(OperationResult parentResult) {
return executionManager.getLocallyRunningTasks(parentResult);
}
- @Override
+ @Override
+ public SchedulerInformationType getLocalSchedulerInformation(OperationResult parentResult) {
+ return executionManager.getLocalSchedulerInformation(parentResult);
+ }
+
+ @Override
+ public void stopLocalScheduler(OperationResult parentResult) {
+ executionManager.stopLocalScheduler(parentResult);
+ }
+
+ @Override
+ public void startLocalScheduler(OperationResult parentResult) {
+ executionManager.startLocalScheduler(parentResult);
+ }
+
+ @Override
+ public void stopLocalTask(String oid, OperationResult parentResult) {
+ executionManager.stopLocalTask(oid, parentResult);
+ }
+
+ @Override
public Task getLocallyRunningTaskByIdentifier(String lightweightIdentifier) {
synchronized (locallyRunningTaskInstancesMap) {
return locallyRunningTaskInstancesMap.get(lightweightIdentifier);
@@ -2347,4 +2370,12 @@ public boolean isLocalNodeClusteringEnabled() {
public SystemConfigurationChangeDispatcher getSystemConfigurationChangeDispatcher() {
return systemConfigurationChangeDispatcher;
}
+
+ public RemoteExecutionHelper getRemoteExecutionHelper() {
+ return remoteExecutionHelper;
+ }
+
+ public Protector getProtector() {
+ return protector;
+ }
}
diff --git a/repo/task-quartz-impl/src/main/java/com/evolveum/midpoint/task/quartzimpl/cluster/ClusterStatusInformation.java b/repo/task-quartz-impl/src/main/java/com/evolveum/midpoint/task/quartzimpl/cluster/ClusterStatusInformation.java
index 56e45aa200d..7782544f538 100644
--- a/repo/task-quartz-impl/src/main/java/com/evolveum/midpoint/task/quartzimpl/cluster/ClusterStatusInformation.java
+++ b/repo/task-quartz-impl/src/main/java/com/evolveum/midpoint/task/quartzimpl/cluster/ClusterStatusInformation.java
@@ -17,6 +17,8 @@
package com.evolveum.midpoint.task.quartzimpl.cluster;
import com.evolveum.midpoint.xml.ns._public.common.common_3.NodeType;
+import com.evolveum.midpoint.xml.ns._public.common.common_3.SchedulerInformationType;
+import com.evolveum.midpoint.xml.ns._public.common.common_3.TaskType;
import java.io.Serializable;
import java.util.*;
@@ -115,6 +117,18 @@ public void addNodeAndTaskInfo(NodeType node, List taskInfoList) {
tasks.put(node, taskInfoList);
}
+ public void addNodeAndTaskInfo(SchedulerInformationType info) {
+ tasks.put(info.getNode(), getTaskInfoList(info));
+ }
+
+ private List getTaskInfoList(SchedulerInformationType info) {
+ List rv = new ArrayList<>();
+ for (TaskType taskBean : info.getExecutingTask()) {
+ rv.add(new TaskInfo(taskBean.getOid()));
+ }
+ return rv;
+ }
+
public NodeType findNodeById(String nodeIdentifier) {
for (NodeType node : tasks.keySet()) {
if (node.getNodeIdentifier().equals(nodeIdentifier)) {
diff --git a/repo/task-quartz-impl/src/main/java/com/evolveum/midpoint/task/quartzimpl/cluster/NodeRegistrar.java b/repo/task-quartz-impl/src/main/java/com/evolveum/midpoint/task/quartzimpl/cluster/NodeRegistrar.java
index 8d30043c1ec..e2aff632720 100644
--- a/repo/task-quartz-impl/src/main/java/com/evolveum/midpoint/task/quartzimpl/cluster/NodeRegistrar.java
+++ b/repo/task-quartz-impl/src/main/java/com/evolveum/midpoint/task/quartzimpl/cluster/NodeRegistrar.java
@@ -19,11 +19,13 @@
import com.evolveum.midpoint.common.LocalizationService;
import com.evolveum.midpoint.prism.PrismContext;
import com.evolveum.midpoint.prism.PrismObject;
+import com.evolveum.midpoint.prism.crypto.EncryptionException;
import com.evolveum.midpoint.prism.delta.ItemDelta;
import com.evolveum.midpoint.prism.delta.ObjectDelta;
import com.evolveum.midpoint.prism.equivalence.EquivalenceStrategy;
import com.evolveum.midpoint.prism.polystring.PolyString;
import com.evolveum.midpoint.prism.query.ObjectQuery;
+import com.evolveum.midpoint.prism.xml.XmlTypeConverter;
import com.evolveum.midpoint.repo.api.RepositoryService;
import com.evolveum.midpoint.schema.result.OperationResult;
import com.evolveum.midpoint.schema.util.ObjectQueryUtil;
@@ -41,13 +43,13 @@
import com.evolveum.midpoint.util.logging.TraceManager;
import com.evolveum.midpoint.xml.ns._public.common.common_3.*;
import com.evolveum.prism.xml.ns._public.types_3.PolyStringType;
+import com.evolveum.prism.xml.ns._public.types_3.ProtectedStringType;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.Validate;
+import org.apache.commons.lang3.RandomStringUtils;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
-import javax.xml.datatype.DatatypeConfigurationException;
-import javax.xml.datatype.DatatypeFactory;
import javax.xml.datatype.XMLGregorianCalendar;
import java.net.InetAddress;
import java.net.NetworkInterface;
@@ -63,6 +65,8 @@
public class NodeRegistrar {
private static final transient Trace LOGGER = TraceManager.getTrace(NodeRegistrar.class);
+ private static final int SECRET_LENGTH = 20;
+ private static final long SECRET_RENEWAL_PERIOD = 86400L * 1000L * 10L;
private TaskManagerQuartzImpl taskManager;
private ClusterManager clusterManager;
@@ -90,7 +94,9 @@ public NodeRegistrar(TaskManagerQuartzImpl taskManager, ClusterManager clusterMa
*/
NodeType createOrUpdateNodeInRepo(OperationResult result) throws TaskManagerInitializationException {
- NodeType nodeToBe = createLocalNodeObject(taskManager.getConfiguration());
+ TaskManagerConfiguration configuration = taskManager.getConfiguration();
+
+ NodeType nodeToBe = createLocalNodeObject(configuration);
LOGGER.info("Registering this node in the repository as " + nodeToBe.getNodeIdentifier() + " at " + nodeToBe.getHostname() + ":" + nodeToBe.getJmxPort());
List> nodesInRepo;
@@ -104,7 +110,14 @@ NodeType createOrUpdateNodeInRepo(OperationResult result) throws TaskManagerInit
PrismObject nodeInRepo = nodesInRepo.get(0);
// copy all information that need to be preserved from the repository
nodeToBe.setTaskExecutionLimitations(nodeInRepo.asObjectable().getTaskExecutionLimitations());
- nodeToBe.setUrl(nodeInRepo.asObjectable().getUrl());
+ nodeToBe.setUrl(applyDefault(nodeInRepo.asObjectable().getUrl(), configuration.getDefaultUrl()));
+ nodeToBe.setRestPort(applyDefault(nodeInRepo.asObjectable().getRestPort(), configuration.getDefaultRestPort()));
+ if (shouldRenewSecret(nodeInRepo.asObjectable())) {
+ LOGGER.info("Renewing node secret for the current node");
+ } else {
+ nodeToBe.setSecret(nodeInRepo.asObjectable().getSecret());
+ nodeToBe.setSecretUpdateTimestamp(nodeInRepo.asObjectable().getSecretUpdateTimestamp());
+ }
ObjectDelta nodeDelta = nodeInRepo.diff(nodeToBe.asPrismObject(), EquivalenceStrategy.LITERAL);
LOGGER.debug("Applying delta to existing node object:\n{}", nodeDelta.debugDumpLazily());
try {
@@ -150,18 +163,30 @@ NodeType createOrUpdateNodeInRepo(OperationResult result) throws TaskManagerInit
return nodeToBe;
}
+ private boolean shouldRenewSecret(NodeType nodeInRepo) {
+ return nodeInRepo.getSecret() == null || nodeInRepo.getSecretUpdateTimestamp() == null ||
+ XmlTypeConverter.toMillis(nodeInRepo.getSecretUpdateTimestamp()) <= System.currentTimeMillis() + SECRET_RENEWAL_PERIOD;
+ }
+
+ private T applyDefault(T oldValue, T defaultValue) {
+ return oldValue != null ? oldValue : defaultValue;
+ }
+
@NotNull
private NodeType createLocalNodeObject(TaskManagerConfiguration configuration) {
+ XMLGregorianCalendar currentTime = getCurrentTime();
NodeType node = getPrismContext().createKnownObjectable(NodeType.class);
String nodeId = configuration.getNodeId();
node.setNodeIdentifier(nodeId);
node.setName(new PolyStringType(nodeId));
node.setHostname(getMyHostname());
node.getIpAddress().addAll(getMyIpAddresses());
+ node.setUrl(configuration.getDefaultUrl()); // overridden later (if already exists in repo)
+ node.setRestPort(configuration.getDefaultRestPort()); // overridden later (if already exists in repo)
node.setJmxPort(configuration.getJmxPort());
node.setClustered(configuration.isClustered());
node.setRunning(true);
- node.setLastCheckInTime(getCurrentTime());
+ node.setLastCheckInTime(currentTime);
node.setBuild(getBuildInformation());
node.setTaskExecutionLimitations(
new TaskExecutionLimitationsType()
@@ -169,9 +194,22 @@ private NodeType createLocalNodeObject(TaskManagerConfiguration configuration) {
.groupLimitation(new TaskGroupExecutionLimitationType().groupName(nodeId).limit(null))
.groupLimitation(new TaskGroupExecutionLimitationType().groupName(TaskConstants.LIMIT_FOR_OTHER_GROUPS).limit(0)));
generateInternalNodeIdentifier(node);
+ node.setSecretUpdateTimestamp(currentTime); // overridden later (if already exists in repo)
+ node.setSecret(generateNodeSecret()); // overridden later (if already exists in repo)
return node;
}
+ private ProtectedStringType generateNodeSecret() {
+ ProtectedStringType secret;
+ try {
+ String plain = RandomStringUtils.randomAlphanumeric(SECRET_LENGTH);
+ secret = taskManager.getProtector().encryptString(plain);
+ } catch (EncryptionException e) {
+ throw new SystemException("Couldn't encrypt node secret: " + e.getMessage(), e);
+ }
+ return secret;
+ }
+
private BuildInformationType getBuildInformation() {
BuildInformationType info = new BuildInformationType();
LocalizationService localizationService = taskManager.getLocalizationService();
@@ -192,13 +230,7 @@ private void generateInternalNodeIdentifier(NodeType node) {
}
private XMLGregorianCalendar getCurrentTime() {
- try {
- // AFAIK the DatatypeFactory is not thread safe, so we have to create an instance every time
- return DatatypeFactory.newInstance().newXMLGregorianCalendar(new GregorianCalendar());
- } catch (DatatypeConfigurationException e) {
- // this should not happen
- throw new SystemException("Cannot create DatatypeFactory (to create XMLGregorianCalendar instance).", e);
- }
+ return XmlTypeConverter.createXMLGregorianCalendar(System.currentTimeMillis());
}
/**
@@ -233,13 +265,21 @@ void updateNodeObject(OperationResult result) {
String nodeName = taskManager.getNodeId();
LOGGER.trace("Updating this node registration:\n{}", cachedLocalNodeObject.debugDumpLazily());
try {
+ XMLGregorianCalendar currentTime = getCurrentTime();
List> modifications = getPrismContext().deltaFor(NodeType.class)
.item(NodeType.F_HOSTNAME).replace(getMyHostname())
.item(NodeType.F_IP_ADDRESS).replaceRealValues(getMyIpAddresses())
- .item(NodeType.F_LAST_CHECK_IN_TIME).replace(getCurrentTime())
+ .item(NodeType.F_LAST_CHECK_IN_TIME).replace(currentTime)
.asItemDeltas();
+ if (shouldRenewSecret(cachedLocalNodeObject.asObjectable())) {
+ modifications.addAll(getPrismContext().deltaFor(NodeType.class)
+ .item(NodeType.F_SECRET).replace(generateNodeSecret())
+ .item(NodeType.F_SECRET_UPDATE_TIMESTAMP).replace(currentTime)
+ .asItemDeltas());
+ }
getRepositoryService().modifyObject(NodeType.class, nodeOid, modifications, result);
LOGGER.trace("Node registration successfully updated.");
+ cachedLocalNodeObject = getRepositoryService().getObject(NodeType.class, nodeOid, null, result);
} catch (ObjectNotFoundException e) {
LoggingUtils.logUnexpectedException(LOGGER, "Cannot update registration of this node (name {}, oid {}), because it "
+ "does not exist in repository. It is probably caused by cluster misconfiguration (other "
diff --git a/repo/task-quartz-impl/src/main/java/com/evolveum/midpoint/task/quartzimpl/cluster/RemoteExecutionHelperImpl.java b/repo/task-quartz-impl/src/main/java/com/evolveum/midpoint/task/quartzimpl/cluster/RemoteExecutionHelperImpl.java
new file mode 100644
index 00000000000..6df067c5a0f
--- /dev/null
+++ b/repo/task-quartz-impl/src/main/java/com/evolveum/midpoint/task/quartzimpl/cluster/RemoteExecutionHelperImpl.java
@@ -0,0 +1,152 @@
+/*
+ * Copyright (c) 2010-2019 Evolveum
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.evolveum.midpoint.task.quartzimpl.cluster;
+
+import com.evolveum.midpoint.prism.PrismContext;
+import com.evolveum.midpoint.prism.PrismObject;
+import com.evolveum.midpoint.prism.crypto.Protector;
+import com.evolveum.midpoint.prism.query.ObjectQuery;
+import com.evolveum.midpoint.repo.api.SystemConfigurationChangeDispatcher;
+import com.evolveum.midpoint.repo.api.SystemConfigurationChangeListener;
+import com.evolveum.midpoint.schema.SearchResultList;
+import com.evolveum.midpoint.schema.result.OperationResult;
+import com.evolveum.midpoint.security.api.RestAuthenticationMethod;
+import com.evolveum.midpoint.task.api.RemoteExecutionHelper;
+import com.evolveum.midpoint.task.api.TaskManager;
+import com.evolveum.midpoint.util.exception.*;
+import com.evolveum.midpoint.util.logging.LoggingUtils;
+import com.evolveum.midpoint.util.logging.Trace;
+import com.evolveum.midpoint.util.logging.TraceManager;
+import com.evolveum.midpoint.xml.ns._public.common.common_3.InfrastructureConfigurationType;
+import com.evolveum.midpoint.xml.ns._public.common.common_3.NodeType;
+import com.evolveum.midpoint.xml.ns._public.common.common_3.SystemConfigurationType;
+import org.apache.commons.lang.StringUtils;
+import org.apache.cxf.common.util.Base64Utility;
+import org.apache.cxf.jaxrs.client.WebClient;
+import org.jetbrains.annotations.Nullable;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+import javax.ws.rs.core.Response;
+import java.util.function.BiConsumer;
+
+/**
+ * Helps with the intra-cluster remote code execution.
+ */
+@Component
+public class RemoteExecutionHelperImpl implements RemoteExecutionHelper, SystemConfigurationChangeListener {
+
+ private static final Trace LOGGER = TraceManager.getTrace(RemoteExecutionHelperImpl.class);
+
+ @Autowired private PrismContext prismContext;
+ @Autowired private TaskManager taskManager;
+ @Autowired private Protector protector;
+ @Autowired private SystemConfigurationChangeDispatcher configurationChangeDispatcher;
+
+ private static final String DOT_CLASS = RemoteExecutionHelperImpl.class.getName() + ".";
+
+ private InfrastructureConfigurationType infrastructureConfiguration;
+
+ @PostConstruct
+ public void init() {
+ configurationChangeDispatcher.registerListener(this);
+ }
+
+ @Override
+ public boolean update(@Nullable SystemConfigurationType value) {
+ infrastructureConfiguration = value != null ? value.getInfrastructure() : null;
+ return true;
+ }
+
+ @Override
+ public void execute(BiConsumer code, String context, OperationResult parentResult) {
+
+ OperationResult result = parentResult.createSubresult(DOT_CLASS + "execute");
+ String nodeId = taskManager.getNodeId();
+
+ SearchResultList> otherClusterNodes;
+ try {
+ ObjectQuery query = prismContext.queryFor(NodeType.class).not().item(NodeType.F_NODE_IDENTIFIER).eq(nodeId).build();
+ otherClusterNodes = taskManager.searchObjects(NodeType.class, query, null, result);
+ } catch (SchemaException e) {
+ LOGGER.warn("Couldn't find nodes to execute remote operation on them ({}). Skipping it.", context, e);
+ result.recordFatalError("Couldn't find nodes to execute remote operation on them (" + context + "). Skipping it.", e);
+ return;
+ }
+
+ for (PrismObject node : otherClusterNodes.getList()) {
+ execute(node.asObjectable(), code, context, result);
+ }
+ result.computeStatus();
+ }
+
+ @Override
+ public void execute(NodeType node, BiConsumer code, String context, OperationResult parentResult) {
+ OperationResult result = parentResult.createSubresult(DOT_CLASS + "execute.node");
+ String nodeIdentifier = node.getNodeIdentifier();
+ result.addParam("node", nodeIdentifier);
+
+ String httpUrlPattern = infrastructureConfiguration != null
+ ? infrastructureConfiguration.getIntraClusterHttpUrlPattern()
+ : null;
+
+ try {
+ String baseUrl;
+ if (node.getUrl() != null) {
+ baseUrl = node.getUrl();
+ } else {
+ if (StringUtils.isBlank(httpUrlPattern)) {
+ LOGGER.warn("Node URL nor intra-cluster URL pattern specified, skipping remote execution ({}) for node {}",
+ context, node.getNodeIdentifier());
+ return;
+ }
+ baseUrl = httpUrlPattern
+ .replace("$host", node.getHostname())
+ .replace("$port", String.valueOf(node.getRestPort()));
+ }
+
+ String url = baseUrl + "/ws/rest";
+ LOGGER.debug("Going to execute '{}' on '{}'", context, url);
+ WebClient client = WebClient.create(url);
+ if (node.getSecret() == null) {
+ throw new SchemaException("No secret known for target node " + node.getNodeIdentifier());
+ }
+ String secret = protector.decryptString(node.getSecret());
+ client.header("Authorization", RestAuthenticationMethod.CLUSTER.getMethod() + " " + Base64Utility.encode(secret.getBytes()));
+ code.accept(client, result);
+ result.computeStatusIfUnknown();
+ } catch (Throwable t) {
+ result.recordFatalError("Couldn't invoke operation (" + context + ") on node " + nodeIdentifier + ": " + t.getMessage(), t);
+ LoggingUtils.logUnexpectedException(LOGGER, "Couldn't invoke operation ({}) on node {}", t, context, nodeIdentifier);
+ }
+ }
+
+ @Override
+ public T extractResult(Response response, Class expectedClass) throws SchemaException {
+ if (response.hasEntity()) {
+ String body = response.readEntity(String.class);
+ if (expectedClass == null || Object.class.equals(expectedClass)) {
+ return prismContext.parserFor(body).parseRealValue();
+ } else {
+ return prismContext.parserFor(body).parseRealValue(expectedClass);
+ }
+ } else {
+ return null;
+ }
+ }
+}
diff --git a/repo/task-quartz-impl/src/main/java/com/evolveum/midpoint/task/quartzimpl/execution/ExecutionManager.java b/repo/task-quartz-impl/src/main/java/com/evolveum/midpoint/task/quartzimpl/execution/ExecutionManager.java
index 7a65e23df58..853ae89d644 100644
--- a/repo/task-quartz-impl/src/main/java/com/evolveum/midpoint/task/quartzimpl/execution/ExecutionManager.java
+++ b/repo/task-quartz-impl/src/main/java/com/evolveum/midpoint/task/quartzimpl/execution/ExecutionManager.java
@@ -30,12 +30,10 @@
import com.evolveum.midpoint.util.logging.LoggingUtils;
import com.evolveum.midpoint.util.logging.Trace;
import com.evolveum.midpoint.util.logging.TraceManager;
-import com.evolveum.midpoint.xml.ns._public.common.common_3.NodeType;
+import com.evolveum.midpoint.xml.ns._public.common.common_3.*;
-import com.evolveum.midpoint.xml.ns._public.common.common_3.TaskExecutionLimitationsType;
-import com.evolveum.midpoint.xml.ns._public.common.common_3.TaskExecutionStatusType;
-import com.evolveum.midpoint.xml.ns._public.common.common_3.TaskGroupExecutionLimitationType;
import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
import org.quartz.*;
import java.util.*;
@@ -181,15 +179,19 @@ private void addNodeAndTaskInformation(ClusterStatusInformation info, PrismObjec
LOGGER.trace("Getting node and task info from the current node ({})", node.asObjectable().getNodeIdentifier());
- List taskInfoList = new ArrayList<>();
- Set tasks = localNodeManager.getLocallyRunningTasks(result);
- for (Task task : tasks) {
- taskInfoList.add(new ClusterStatusInformation.TaskInfo(task.getOid()));
+// List taskInfoList = new ArrayList<>();
+// Set tasks = localNodeManager.getLocallyRunningTasks(result);
+// for (Task task : tasks) {
+// taskInfoList.add(new ClusterStatusInformation.TaskInfo(task.getOid()));
+// }
+// node.asObjectable().setExecutionStatus(localNodeManager.getLocalNodeExecutionStatus());
+// node.asObjectable().setErrorStatus(taskManager.getLocalNodeErrorStatus());
+
+ SchedulerInformationType schedulerInformation = getLocalSchedulerInformation(result);
+ if (schedulerInformation.getNode() == null) { // shouldn't occur
+ schedulerInformation.setNode(node.asObjectable());
}
- node.asObjectable().setExecutionStatus(localNodeManager.getLocalNodeExecutionStatus());
- node.asObjectable().setErrorStatus(taskManager.getLocalNodeErrorStatus());
-
- info.addNodeAndTaskInfo(node.asObjectable(), taskInfoList);
+ info.addNodeAndTaskInfo(schedulerInformation);
} else { // if remote
@@ -537,7 +539,74 @@ public boolean synchronizeJobStores(OperationResult result) {
public Set getLocallyRunningTasks(OperationResult parentResult) {
return localNodeManager.getLocallyRunningTasks(parentResult);
+ }
+
+ public SchedulerInformationType getLocalSchedulerInformation(OperationResult parentResult) {
+ OperationResult result = parentResult.createSubresult(DOT_CLASS + "getLocalSchedulerInformation");
+ try {
+ SchedulerInformationType info = new SchedulerInformationType();
+ info.setNode(getLocalNode());
+ for (Task task : getLocallyRunningTasks(result)) {
+ info.getExecutingTask().add(task.getTaskType().clone());
+ }
+ result.computeStatus();
+ return info;
+ } catch (Throwable t) {
+ result.recordFatalError("Couldn't get scheduler information: " + t.getMessage(), t);
+ throw t;
+ }
+ }
+ public void stopLocalScheduler(OperationResult parentResult) {
+ OperationResult result = parentResult.createSubresult(DOT_CLASS + "stopLocalScheduler");
+ try {
+ localNodeManager.stopScheduler(result);
+ result.computeStatus();
+ } catch (Throwable t) {
+ result.recordFatalError("Couldn't stop local scheduler: " + t.getMessage(), t);
+ throw t;
+ }
+ }
+
+ public void startLocalScheduler(OperationResult parentResult) {
+ OperationResult result = parentResult.createSubresult(DOT_CLASS + "stopLocalScheduler");
+ try {
+ localNodeManager.startScheduler(result);
+ result.computeStatus();
+ } catch (Throwable t) {
+ result.recordFatalError("Couldn't start local scheduler: " + t.getMessage(), t);
+ throw t;
+ }
+ }
+
+ public void stopLocalTask(String oid, OperationResult parentResult) {
+ OperationResult result = parentResult.createSubresult(DOT_CLASS + "stopLocalTask");
+ try {
+ localNodeManager.stopLocalTaskRun(oid, result);
+ // TODO if interrupting is set to WHEN_NECESSARY we should check if the task stops within 5 seconds
+ // and if not, interrupt the thread. However, what if the task was stopped and then restarted? We
+ // should check for that situation. So let's ignore conditional interruption for now.
+ result.computeStatus();
+ } catch (Throwable t) {
+ result.recordFatalError("Couldn't interrupt local task: " + t.getMessage(), t);
+ throw t;
+ }
+ }
+
+ /**
+ * @return current local node information, updated with local node execution and error status.
+ * Returned value is fresh, so it can be modified as needed.
+ */
+ @Nullable
+ public NodeType getLocalNode() {
+ PrismObject localNode = taskManager.getClusterManager().getLocalNodeObject();
+ if (localNode == null) {
+ return null;
+ }
+ NodeType node = localNode.clone().asObjectable();
+ node.setExecutionStatus(localNodeManager.getLocalNodeExecutionStatus());
+ node.setErrorStatus(taskManager.getLocalNodeErrorStatus());
+ return node;
}
public void initializeLocalScheduler() throws TaskManagerInitializationException {
@@ -630,11 +699,6 @@ private void addTriggerNowForTask(Task task, OperationResult result) {
}
}
- // nodeId should not be the current node
- void redirectTaskToNode(@NotNull Task task, @NotNull NodeType node, @NotNull OperationResult result) {
- remoteNodesManager.redirectTaskToNode(task, node, result);
- }
-
public void pauseTaskJob(Task task, OperationResult parentResult) {
OperationResult result = parentResult.createSubresult(DOT_CLASS + "pauseTaskJob");
JobKey jobKey = TaskQuartzImplUtil.createJobKeyForTask(task);
diff --git a/repo/task-quartz-impl/src/main/java/com/evolveum/midpoint/task/quartzimpl/execution/LocalNodeManager.java b/repo/task-quartz-impl/src/main/java/com/evolveum/midpoint/task/quartzimpl/execution/LocalNodeManager.java
index e349d08b13f..5c87653b2c1 100644
--- a/repo/task-quartz-impl/src/main/java/com/evolveum/midpoint/task/quartzimpl/execution/LocalNodeManager.java
+++ b/repo/task-quartz-impl/src/main/java/com/evolveum/midpoint/task/quartzimpl/execution/LocalNodeManager.java
@@ -387,7 +387,7 @@ void stopLocalTaskRun(String oid, OperationResult parentResult) {
OperationResult result = parentResult.createSubresult(LocalNodeManager.class.getName() + ".stopLocalTaskRun");
result.addParam("task", oid);
- LOGGER.info("Stopping local task " + oid + " run");
+ LOGGER.info("Stopping local task {} run", oid);
try {
getQuartzScheduler().interrupt(TaskQuartzImplUtil.createJobKeyForTaskOid(oid));
diff --git a/repo/task-quartz-impl/src/main/java/com/evolveum/midpoint/task/quartzimpl/execution/RemoteNodesManager.java b/repo/task-quartz-impl/src/main/java/com/evolveum/midpoint/task/quartzimpl/execution/RemoteNodesManager.java
index bc34ab9b63d..0b2d3cbb974 100644
--- a/repo/task-quartz-impl/src/main/java/com/evolveum/midpoint/task/quartzimpl/execution/RemoteNodesManager.java
+++ b/repo/task-quartz-impl/src/main/java/com/evolveum/midpoint/task/quartzimpl/execution/RemoteNodesManager.java
@@ -19,36 +19,19 @@
import com.evolveum.midpoint.prism.PrismObject;
import com.evolveum.midpoint.schema.result.OperationResult;
import com.evolveum.midpoint.schema.result.OperationResultStatus;
-import com.evolveum.midpoint.task.api.Task;
import com.evolveum.midpoint.task.quartzimpl.cluster.ClusterStatusInformation;
import com.evolveum.midpoint.task.quartzimpl.TaskManagerConfiguration;
import com.evolveum.midpoint.task.quartzimpl.TaskManagerQuartzImpl;
import com.evolveum.midpoint.task.quartzimpl.cluster.ClusterManager;
-import com.evolveum.midpoint.util.Holder;
+import com.evolveum.midpoint.task.quartzimpl.execution.remote.JmxConnector;
+import com.evolveum.midpoint.task.quartzimpl.execution.remote.RestConnector;
import com.evolveum.midpoint.util.exception.ObjectNotFoundException;
-import com.evolveum.midpoint.util.exception.SystemException;
-import com.evolveum.midpoint.util.logging.LoggingUtils;
import com.evolveum.midpoint.util.logging.Trace;
import com.evolveum.midpoint.util.logging.TraceManager;
import com.evolveum.midpoint.xml.ns._public.common.common_3.NodeExecutionStatusType;
import com.evolveum.midpoint.xml.ns._public.common.common_3.NodeType;
-import org.jetbrains.annotations.NotNull;
import org.quartz.*;
-import org.quartz.core.jmx.QuartzSchedulerMBean;
-
-import javax.management.JMX;
-import javax.management.MBeanServerConnection;
-import javax.management.MalformedObjectNameException;
-import javax.management.ObjectName;
-import javax.management.openmbean.CompositeData;
-import javax.management.openmbean.TabularData;
-import javax.management.remote.JMXConnector;
-import javax.management.remote.JMXServiceURL;
-
-import java.io.IOException;
-import java.util.*;
-import java.util.concurrent.TimeUnit;
/**
* Manages remote nodes. Concerned mainly with
@@ -65,9 +48,13 @@ public class RemoteNodesManager {
public static final JobKey STARTER_JOB_KEY = JobKey.jobKey("STARTER JOB");
private TaskManagerQuartzImpl taskManager;
+ private JmxConnector jmxConnector;
+ private RestConnector restConnector;
public RemoteNodesManager(TaskManagerQuartzImpl taskManager) {
this.taskManager = taskManager;
+ this.jmxConnector = new JmxConnector(taskManager);
+ this.restConnector = new RestConnector(taskManager);
}
/**
@@ -77,100 +64,25 @@ public RemoteNodesManager(TaskManagerQuartzImpl taskManager) {
* @param node Node which to query
*/
void addNodeStatusFromRemoteNode(ClusterStatusInformation info, PrismObject node, OperationResult parentResult) {
-
OperationResult result = parentResult.createSubresult(RemoteNodesManager.class.getName() + ".addNodeStatusFromRemoteNode");
- result.addParam("node", node);
-
NodeType nodeInfo = node.asObjectable();
- String nodeIdentifier = nodeInfo.getNodeIdentifier();
- String address = nodeInfo.getHostname() + ":" + nodeInfo.getJmxPort();
-
- if (!taskManager.getClusterManager().isUp(nodeInfo)) {
- nodeInfo.setExecutionStatus(NodeExecutionStatusType.DOWN);
- info.addNodeInfo(nodeInfo);
- result.recordStatus(OperationResultStatus.SUCCESS, "Node is down");
- return;
- }
-
- JMXConnector connector = null;
-
+ result.addParam("node", nodeInfo.getNodeIdentifier());
try {
- MBeanServerConnection mbsc;
-
- try {
- connector = connectViaJmx(address);
- mbsc = connector.getMBeanServerConnection();
- } catch (IOException e) {
- LoggingUtils.logUnexpectedException(LOGGER, "Cannot connect to the remote node {} at {}", e, nodeIdentifier, address);
- result.recordWarning("Cannot connect to the remote node " + nodeIdentifier + " at " + address + ": " + e.getMessage(), e);
- nodeInfo.setExecutionStatus(NodeExecutionStatusType.COMMUNICATION_ERROR);
- nodeInfo.setConnectionResult(result.createOperationResultType());
+ if (!taskManager.getClusterManager().isUp(nodeInfo)) {
+ nodeInfo.setExecutionStatus(NodeExecutionStatusType.DOWN);
info.addNodeInfo(nodeInfo);
+ result.recordStatus(OperationResultStatus.SUCCESS, "Node is down");
return;
}
- try {
-
- QuartzSchedulerMBean mbeanProxy = getMBeanProxy(nodeIdentifier, mbsc);
-
- boolean running = false, down = true;
- if (mbeanProxy != null) {
- try {
- running = mbeanProxy.isStarted() && !mbeanProxy.isShutdown() && !mbeanProxy.isStandbyMode();
- down = mbeanProxy.isShutdown();
- } catch (Exception e) { // was: InstanceNotFoundException but it does not seem to work
- String message = "Cannot get information from scheduler " + nodeIdentifier + " because it does not exist or is shut down.";
- LoggingUtils.logUnexpectedException(LOGGER, message, e);
- result.recordWarning(message, e);
- nodeInfo.setConnectionResult(result.createOperationResultType());
- }
- } else {
- result.recordWarning("Cannot get information from node " + nodeIdentifier + " at " + address + " because the JMX object for scheduler cannot be found on that node.");
- nodeInfo.setConnectionResult(result.createOperationResultType());
- }
-
- LOGGER.trace(" - scheduler found = " + (mbeanProxy != null) + ", running = " + running + ", shutdown = " + down);
-
- if (down) {
- nodeInfo.setExecutionStatus(NodeExecutionStatusType.ERROR); // this is a mark of error situation (we expect that during ordinary shutdown the node quickly goes down so there is little probability of getting this status on that occasion)
- } else if (running) {
- nodeInfo.setExecutionStatus(NodeExecutionStatusType.RUNNING);
- } else {
- nodeInfo.setExecutionStatus(NodeExecutionStatusType.PAUSED);
- }
-
- List taskInfoList = new ArrayList<>();
- if (mbeanProxy != null) {
- TabularData jobs = mbeanProxy.getCurrentlyExecutingJobs();
- for (CompositeData job : (Collection) jobs.values()) {
- String oid = (String) job.get("jobName");
- LOGGER.trace(" - task oid = " + oid);
- taskInfoList.add(new ClusterStatusInformation.TaskInfo(oid));
- }
- }
-
- if (result.isUnknown()) {
- result.recordStatus(OperationResultStatus.SUCCESS, "Node " + nodeIdentifier + ": status = " + nodeInfo.getExecutionStatus() + ", # of running tasks: " + taskInfoList.size());
- }
- info.addNodeAndTaskInfo(nodeInfo, taskInfoList);
- } catch (Exception e) { // unfortunately, mbeanProxy.getCurrentlyExecutingJobs is declared to throw an Exception
- LoggingUtils.logUnexpectedException(LOGGER, "Cannot get information from the remote node {} at {}", e, nodeIdentifier, address);
- result.recordWarning("Cannot get information from the remote node " + nodeIdentifier + " at " + address + ": " + e.getMessage(), e);
- nodeInfo.setExecutionStatus(NodeExecutionStatusType.COMMUNICATION_ERROR);
- nodeInfo.setConnectionResult(result.createOperationResultType());
- info.addNodeInfo(nodeInfo);
- return;
- }
- }
- finally {
- try {
- if (connector != null) {
- connector.close();
- }
- } catch (IOException e) {
- LoggingUtils.logUnexpectedException(LOGGER, "Cannot close JMX connection to {}", e, address);
+ if (taskManager.getConfiguration().isUseJmx()) {
+ jmxConnector.addNodeStatusUsingJmx(info, nodeInfo, result);
+ } else {
+ restConnector.addNodeStatus(info, nodeInfo, result);
}
- result.recordSuccessIfUnknown();
+ result.computeStatus();
+ } catch (Throwable t) {
+ result.recordFatalError("Couldn't get status from remote node", t);
}
}
@@ -184,147 +96,47 @@ private NodeType getNode(String nodeIdentifier, OperationResult result) {
}
public void stopRemoteScheduler(String nodeIdentifier, OperationResult parentResult) {
-
OperationResult result = parentResult.createSubresult(RemoteNodesManager.class.getName() + ".stopRemoteScheduler");
- result.addParam("nodeIdentifier", nodeIdentifier);
-
- NodeType node = getNode(nodeIdentifier, result);
- if (node == null) {
- return;
- }
-
- String nodeName = node.getNodeIdentifier();
- String address = node.getHostname() + ":" + node.getJmxPort();
-
- JMXConnector connector = null;
-
+ result.addParam("node", nodeIdentifier);
try {
- MBeanServerConnection mbsc;
-
- try {
- connector = connectViaJmx(address);
- mbsc = connector.getMBeanServerConnection();
- } catch (IOException e) {
- LoggingUtils.logUnexpectedException(LOGGER, "Cannot connect to the remote node {} at {}", e, nodeName, address);
- result.recordFatalError("Cannot connect to the remote node " + nodeName + " at " + address + ": " + e.getMessage(), e);
- return;
+ NodeType node = getNode(nodeIdentifier, result);
+ if (node == null) {
+ return; // result is already updated
}
- try {
- QuartzSchedulerMBean mbeanProxy = getMBeanProxy(nodeName, mbsc);
- if (mbeanProxy != null) {
- mbeanProxy.standby();
- result.recordSuccess();
- } else {
- result.recordWarning("Cannot stop the scheduler on node " + nodeName + " at " + address + " because the JMX object for scheduler cannot be found on that node.");
- }
- return;
- }
- catch (Exception e) {
- LoggingUtils.logUnexpectedException(LOGGER, "Cannot put remote scheduler into standby mode; remote node {} at {}", e, nodeName, address);
- result.recordFatalError("Cannot put remote scheduler " + nodeName + " at " + address + " into standby mode: " + e.getMessage());
- return;
- }
-
- }
- finally {
- try {
- if (connector != null) {
- connector.close();
- }
- } catch (IOException e) {
- LoggingUtils.logUnexpectedException(LOGGER, "Cannot close JMX connection to {}", e, address);
+ if (taskManager.getConfiguration().isUseJmx()) {
+ jmxConnector.stopRemoteScheduler(node, result);
+ } else {
+ restConnector.stopRemoteScheduler(node, result);
}
+ result.computeStatus();
+ } catch (Throwable t) {
+ result.recordFatalError("Couldn't stop scheduler on remote node", t);
+ // todo log the exception?
}
-
}
- void startRemoteScheduler(String nodeIdentifier, OperationResult result) {
-
- NodeType node = getNode(nodeIdentifier, result);
- if (node == null) {
- return;
- }
-
- String nodeName = node.getNodeIdentifier();
- String address = node.getHostname() + ":" + node.getJmxPort();
-
- JMXConnector connector = null;
-
+ public void startRemoteScheduler(String nodeIdentifier, OperationResult parentResult) {
+ OperationResult result = parentResult.createSubresult(RemoteNodesManager.class.getName() + ".startRemoteScheduler");
+ result.addParam("node", nodeIdentifier);
try {
- MBeanServerConnection mbsc;
-
- try {
- connector = connectViaJmx(address);
- mbsc = connector.getMBeanServerConnection();
- } catch (IOException e) {
- LoggingUtils.logUnexpectedException(LOGGER, "Cannot connect to the remote node {} at {}", e, nodeName, address);
- result.recordFatalError("Cannot connect to the remote node " + nodeName + " at " + address + ": " + e.getMessage(), e);
- return;
- }
-
- try {
- QuartzSchedulerMBean mbeanProxy = getMBeanProxy(nodeName, mbsc);
- if (mbeanProxy != null) {
- mbeanProxy.start();
- result.recordSuccessIfUnknown();
- } else {
- result.recordFatalError("Cannot start remote scheduler " + nodeName + " at " + address + " because it cannot be found on that node.");
- }
- return;
- }
- catch (Exception e) {
- LoggingUtils.logUnexpectedException(LOGGER, "Cannot start remote scheduler; remote node {} at {}", e, nodeName, address);
- result.recordFatalError("Cannot start remote scheduler " + nodeName + " at " + address + ": " + e.getMessage());
- return;
+ NodeType node = getNode(nodeIdentifier, result);
+ if (node == null) {
+ return; // result is already updated
}
- }
- finally {
- try {
- if (connector != null) {
- connector.close();
- }
- } catch (IOException e) {
- LoggingUtils.logUnexpectedException(LOGGER, "Cannot close JMX connection to {}", e, address);
- }
- }
-
- }
-
- private QuartzSchedulerMBean getMBeanProxy(String nodeName, MBeanServerConnection mbsc) throws MalformedObjectNameException {
- String mbeanNameAsString = "quartz:type=QuartzScheduler,name=midPointScheduler,instance=" + nodeName;
- ObjectName mbeanName = new ObjectName(mbeanNameAsString);
-
- try {
- if (mbsc.isRegistered(mbeanName)) {
- return JMX.newMBeanProxy(mbsc, mbeanName, QuartzSchedulerMBean.class, true);
+ if (taskManager.getConfiguration().isUseJmx()) {
+ jmxConnector.startRemoteScheduler(node, result);
} else {
- LOGGER.trace("MBean " + mbeanNameAsString + " is not registered at " + nodeName);
- return null;
+ restConnector.startRemoteScheduler(node, result);
}
- } catch (IOException e) {
- LoggingUtils.logUnexpectedException(LOGGER, "Cannot communicate with remote node via JMX", e);
- return null;
+ result.computeStatus();
+ } catch (Throwable t) {
+ result.recordFatalError("Couldn't start scheduler on remote node", t);
+ // todo log the exception?
}
}
- private JMXConnector connectViaJmx(String address) throws IOException {
-
- JMXServiceURL url =
- new JMXServiceURL("service:jmx:rmi:///jndi/rmi://" + address + "/jmxrmi");
-
- Map env = new HashMap<>();
- String jmxUsername = taskManager.getConfiguration().getJmxUsername();
- String jmxPassword = taskManager.getConfiguration().getJmxPassword();
- if (jmxUsername != null || jmxPassword != null) {
- String[] creds = { jmxUsername, jmxPassword };
- env.put(JMXConnector.CREDENTIALS, creds);
- }
- return JmxClient.connectWithTimeout(url, env,
- taskManager.getConfiguration().getJmxConnectTimeout(), TimeUnit.SECONDS);
- }
-
private TaskManagerConfiguration getConfiguration() {
return taskManager.getConfiguration();
}
@@ -340,106 +152,22 @@ void stopRemoteTaskRun(String oid, NodeType node, OperationResult parentResult)
result.addParam("oid", oid);
result.addParam("node", node.toString());
- LOGGER.debug("Interrupting task " + oid + " running at " + getClusterManager().dumpNodeInfo(node));
-
- String nodeName = node.getNodeIdentifier();
- String address = node.getHostname() + ":" + node.getJmxPort();
-
- Holder connectorHolder = new Holder<>();
-
- try {
- QuartzSchedulerMBean mbeanProxy = getSchedulerBean(node, connectorHolder, result);
- if (mbeanProxy != null) {
- try {
- mbeanProxy.interruptJob(oid, Scheduler.DEFAULT_GROUP);
- LOGGER.debug("Successfully signalled shutdown to task " + oid + " running at " + getClusterManager().dumpNodeInfo(node));
- result.recordSuccessIfUnknown();
- } catch (Exception e) { // necessary because of mbeanProxy
- String message = "Cannot signal task "+oid+" interruption to remote node "+nodeName+" at "+address;
- LoggingUtils.logUnexpectedException(LOGGER, message, e);
- result.recordFatalError(message + ":" + e.getMessage(), e);
- }
- }
- } finally {
- closeJmxConnection(connectorHolder, address);
- }
- }
-
- private void closeJmxConnection(Holder connectorHolder, String nodeInfo) {
- try {
- if (!connectorHolder.isEmpty()) {
- connectorHolder.getValue().close();
- }
- } catch (IOException e) {
- LoggingUtils.logUnexpectedException(LOGGER, "Cannot close JMX connection to {}", e, nodeInfo);
- }
- }
-
- private QuartzSchedulerMBean getSchedulerBean(NodeType node, Holder connectorHolder,
- OperationResult result) {
- String nodeName = node.getNodeIdentifier();
- String address = node.getHostname() + ":" + node.getJmxPort();
- try {
- JMXConnector connector = connectViaJmx(address);
- connectorHolder.setValue(connector);
- MBeanServerConnection serverConnection = connector.getMBeanServerConnection();
- QuartzSchedulerMBean bean = getMBeanProxy(nodeName, serverConnection);
- if (bean == null) {
- String message = "Cannot connect to the Quartz Scheduler bean at remote node " + nodeName + " at "
- + address + " because the JMX object for scheduler cannot be found on that node.";
- LOGGER.warn("{}", message);
- result.recordFatalError(message);
- }
- return bean;
- } catch (IOException|MalformedObjectNameException e) {
- LoggingUtils.logUnexpectedException(LOGGER, "Cannot connect to the quartz scheduler bean at remote node {} at {}", e, nodeName, address);
- result.recordFatalError("Cannot connect to the quartz scheduler bean at remote node " + nodeName + " at " + address + ": " + e.getMessage(), e);
- return null;
- }
+ LOGGER.debug("Interrupting task {} running at {}", oid, getClusterManager().dumpNodeInfo(node));
+ try {
+ if (taskManager.getConfiguration().isUseJmx()) {
+ jmxConnector.stopRemoteTaskRun(oid, node, result);
+ } else {
+ restConnector.stopRemoteTask(oid, node, result);
+ }
+ result.computeStatus();
+ } catch (Throwable t) {
+ result.recordFatalError("Couldn't stop task running on remote node", t);
+ // todo log the exception?
+ }
}
private ClusterManager getClusterManager() {
return taskManager.getClusterManager();
}
- public void redirectTaskToNode(@NotNull Task task, @NotNull NodeType node, @NotNull OperationResult result) {
- LOGGER.trace("Trying to schedule task {} on {}", task, node.getNodeIdentifier());
- Holder connectorHolder = new Holder<>();
- try {
- QuartzSchedulerMBean mbeanProxy = getSchedulerBean(node, connectorHolder, result);
- if (mbeanProxy != null) {
- try {
- createStarterJobIfNeeded();
- mbeanProxy.triggerJob(STARTER_JOB_KEY.getName(), STARTER_JOB_KEY.getGroup(),
- Collections.singletonMap(JobStarter.TASK_OID, task.getOid()));
- LOGGER.debug("Successfully requested start of " + task + " at " + getClusterManager().dumpNodeInfo(node));
- result.recordSuccessIfUnknown();
- } catch (Exception e) { // necessary because of mbeanProxy
- String message = "Cannot schedule " + task + " at " + getClusterManager().dumpNodeInfo(node);
- LoggingUtils.logUnexpectedException(LOGGER, message, e);
- result.recordFatalError(message + ":" + e.getMessage(), e);
- }
- } else {
- LOGGER.warn("Couldn't obtain Quartz MBean so couldn't reschedule task {} on {}", task, node.getNodeIdentifier());
- }
- } finally {
- closeJmxConnection(connectorHolder, getClusterManager().dumpNodeInfo(node));
- }
- }
-
- private void createStarterJobIfNeeded() {
- Scheduler scheduler = taskManager.getExecutionManager().getQuartzScheduler();
- try {
- if (!scheduler.checkExists(STARTER_JOB_KEY)) {
- JobDetail starterJob = JobBuilder.newJob(JobStarter.class)
- .withIdentity(STARTER_JOB_KEY)
- .storeDurably()
- .build();
- scheduler.addJob(starterJob, true);
- }
- } catch (SchedulerException e) {
- throw new SystemException("Starter job couldn't be created", e);
- }
- }
-
}
diff --git a/repo/task-quartz-impl/src/main/java/com/evolveum/midpoint/task/quartzimpl/execution/remote/JmxConnector.java b/repo/task-quartz-impl/src/main/java/com/evolveum/midpoint/task/quartzimpl/execution/remote/JmxConnector.java
new file mode 100644
index 00000000000..1b320a44c03
--- /dev/null
+++ b/repo/task-quartz-impl/src/main/java/com/evolveum/midpoint/task/quartzimpl/execution/remote/JmxConnector.java
@@ -0,0 +1,360 @@
+/*
+ * Copyright (c) 2010-2019 Evolveum
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.evolveum.midpoint.task.quartzimpl.execution.remote;
+
+import com.evolveum.midpoint.schema.result.OperationResult;
+import com.evolveum.midpoint.schema.result.OperationResultStatus;
+import com.evolveum.midpoint.task.quartzimpl.TaskManagerConfiguration;
+import com.evolveum.midpoint.task.quartzimpl.TaskManagerQuartzImpl;
+import com.evolveum.midpoint.task.quartzimpl.cluster.ClusterManager;
+import com.evolveum.midpoint.task.quartzimpl.cluster.ClusterStatusInformation;
+import com.evolveum.midpoint.task.quartzimpl.execution.ExecutionManager;
+import com.evolveum.midpoint.task.quartzimpl.execution.JmxClient;
+import com.evolveum.midpoint.util.Holder;
+import com.evolveum.midpoint.util.exception.ObjectNotFoundException;
+import com.evolveum.midpoint.util.logging.LoggingUtils;
+import com.evolveum.midpoint.util.logging.Trace;
+import com.evolveum.midpoint.util.logging.TraceManager;
+import com.evolveum.midpoint.xml.ns._public.common.common_3.NodeExecutionStatusType;
+import com.evolveum.midpoint.xml.ns._public.common.common_3.NodeType;
+import org.quartz.Scheduler;
+import org.quartz.core.jmx.QuartzSchedulerMBean;
+
+import javax.management.JMX;
+import javax.management.MBeanServerConnection;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.TabularData;
+import javax.management.remote.JMXConnector;
+import javax.management.remote.JMXServiceURL;
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Manages remote nodes using JMX.
+ *
+ * @author Pavol Mederly
+ */
+
+public class JmxConnector {
+
+ private static final transient Trace LOGGER = TraceManager.getTrace(JmxConnector.class);
+
+ private TaskManagerQuartzImpl taskManager;
+
+ public JmxConnector(TaskManagerQuartzImpl taskManager) {
+ this.taskManager = taskManager;
+ }
+
+ public void addNodeStatusUsingJmx(ClusterStatusInformation info, NodeType nodeInfo, OperationResult result) {
+ String nodeIdentifier = nodeInfo.getNodeIdentifier();
+ String address = nodeInfo.getHostname() + ":" + nodeInfo.getJmxPort();
+ JMXConnector connector = null;
+
+ try {
+ MBeanServerConnection mbsc;
+
+ try {
+ connector = connectViaJmx(address);
+ mbsc = connector.getMBeanServerConnection();
+ } catch (IOException e) {
+ LoggingUtils
+ .logUnexpectedException(LOGGER, "Cannot connect to the remote node {} at {}", e, nodeIdentifier, address);
+ result.recordWarning("Cannot connect to the remote node " + nodeIdentifier + " at " + address + ": " + e.getMessage(), e);
+ nodeInfo.setExecutionStatus(NodeExecutionStatusType.COMMUNICATION_ERROR);
+ nodeInfo.setConnectionResult(result.createOperationResultType());
+ info.addNodeInfo(nodeInfo);
+ return;
+ }
+
+ try {
+
+ QuartzSchedulerMBean mbeanProxy = getMBeanProxy(nodeIdentifier, mbsc);
+
+ boolean running = false, down = true;
+ if (mbeanProxy != null) {
+ try {
+ running = mbeanProxy.isStarted() && !mbeanProxy.isShutdown() && !mbeanProxy.isStandbyMode();
+ down = mbeanProxy.isShutdown();
+ } catch (Exception e) { // was: InstanceNotFoundException but it does not seem to work
+ String message = "Cannot get information from scheduler " + nodeIdentifier + " because it does not exist or is shut down.";
+ LoggingUtils.logUnexpectedException(LOGGER, message, e);
+ result.recordWarning(message, e);
+ nodeInfo.setConnectionResult(result.createOperationResultType());
+ }
+ } else {
+ result.recordWarning("Cannot get information from node " + nodeIdentifier + " at " + address + " because the JMX object for scheduler cannot be found on that node.");
+ nodeInfo.setConnectionResult(result.createOperationResultType());
+ }
+
+ LOGGER.trace(" - scheduler found = " + (mbeanProxy != null) + ", running = " + running + ", shutdown = " + down);
+
+ if (down) {
+ nodeInfo.setExecutionStatus(NodeExecutionStatusType.ERROR); // this is a mark of error situation (we expect that during ordinary shutdown the node quickly goes down so there is little probability of getting this status on that occasion)
+ } else if (running) {
+ nodeInfo.setExecutionStatus(NodeExecutionStatusType.RUNNING);
+ } else {
+ nodeInfo.setExecutionStatus(NodeExecutionStatusType.PAUSED);
+ }
+
+ List taskInfoList = new ArrayList<>();
+ if (mbeanProxy != null) {
+ TabularData jobs = mbeanProxy.getCurrentlyExecutingJobs();
+ for (CompositeData job : (Collection) jobs.values()) {
+ String oid = (String) job.get("jobName");
+ LOGGER.trace(" - task oid = " + oid);
+ taskInfoList.add(new ClusterStatusInformation.TaskInfo(oid));
+ }
+ }
+
+ if (result.isUnknown()) {
+ result.recordStatus(OperationResultStatus.SUCCESS, "Node " + nodeIdentifier + ": status = " + nodeInfo.getExecutionStatus() + ", # of running tasks: " + taskInfoList.size());
+ }
+ info.addNodeAndTaskInfo(nodeInfo, taskInfoList);
+ } catch (Exception e) { // unfortunately, mbeanProxy.getCurrentlyExecutingJobs is declared to throw an Exception
+ LoggingUtils.logUnexpectedException(LOGGER, "Cannot get information from the remote node {} at {}", e, nodeIdentifier, address);
+ result.recordWarning("Cannot get information from the remote node " + nodeIdentifier + " at " + address + ": " + e.getMessage(), e);
+ nodeInfo.setExecutionStatus(NodeExecutionStatusType.COMMUNICATION_ERROR);
+ nodeInfo.setConnectionResult(result.createOperationResultType());
+ info.addNodeInfo(nodeInfo);
+ }
+ }
+ finally {
+ try {
+ if (connector != null) {
+ connector.close();
+ }
+ } catch (IOException e) {
+ LoggingUtils.logUnexpectedException(LOGGER, "Cannot close JMX connection to {}", e, address);
+ }
+ result.recordSuccessIfUnknown(); // TODO - ok?
+ }
+ }
+
+ private NodeType getNode(String nodeIdentifier, OperationResult result) {
+ try {
+ return taskManager.getClusterManager().getNodeById(nodeIdentifier, result).asObjectable();
+ } catch (ObjectNotFoundException e) {
+ result.recordFatalError("A node with identifier " + nodeIdentifier + " does not exist.");
+ return null;
+ }
+ }
+
+ public void stopRemoteScheduler(NodeType node, OperationResult result) {
+
+ String nodeName = node.getNodeIdentifier();
+ String address = node.getHostname() + ":" + node.getJmxPort();
+
+ JMXConnector connector = null;
+
+ try {
+ MBeanServerConnection mbsc;
+
+ try {
+ connector = connectViaJmx(address);
+ mbsc = connector.getMBeanServerConnection();
+ } catch (IOException e) {
+ LoggingUtils.logUnexpectedException(LOGGER, "Cannot connect to the remote node {} at {}", e, nodeName, address);
+ result.recordFatalError("Cannot connect to the remote node " + nodeName + " at " + address + ": " + e.getMessage(), e);
+ return;
+ }
+
+ try {
+ QuartzSchedulerMBean mbeanProxy = getMBeanProxy(nodeName, mbsc);
+ if (mbeanProxy != null) {
+ mbeanProxy.standby();
+ result.recordSuccess();
+ } else {
+ result.recordWarning("Cannot stop the scheduler on node " + nodeName + " at " + address + " because the JMX object for scheduler cannot be found on that node.");
+ }
+ }
+ catch (Exception e) {
+ LoggingUtils.logUnexpectedException(LOGGER, "Cannot put remote scheduler into standby mode; remote node {} at {}", e, nodeName, address);
+ result.recordFatalError("Cannot put remote scheduler " + nodeName + " at " + address + " into standby mode: " + e.getMessage());
+ }
+ }
+ finally {
+ try {
+ if (connector != null) {
+ connector.close();
+ }
+ } catch (IOException e) {
+ LoggingUtils.logUnexpectedException(LOGGER, "Cannot close JMX connection to {}", e, address);
+ }
+ }
+
+ }
+
+ public void startRemoteScheduler(NodeType node, OperationResult result) {
+
+ String nodeName = node.getNodeIdentifier();
+ String address = node.getHostname() + ":" + node.getJmxPort();
+
+ JMXConnector connector = null;
+
+ try {
+ MBeanServerConnection mbsc;
+
+ try {
+ connector = connectViaJmx(address);
+ mbsc = connector.getMBeanServerConnection();
+ } catch (IOException e) {
+ LoggingUtils.logUnexpectedException(LOGGER, "Cannot connect to the remote node {} at {}", e, nodeName, address);
+ result.recordFatalError("Cannot connect to the remote node " + nodeName + " at " + address + ": " + e.getMessage(), e);
+ return;
+ }
+
+ try {
+ QuartzSchedulerMBean mbeanProxy = getMBeanProxy(nodeName, mbsc);
+ if (mbeanProxy != null) {
+ mbeanProxy.start();
+ result.recordSuccessIfUnknown();
+ } else {
+ result.recordFatalError("Cannot start remote scheduler " + nodeName + " at " + address + " because it cannot be found on that node.");
+ }
+ }
+ catch (Exception e) {
+ LoggingUtils.logUnexpectedException(LOGGER, "Cannot start remote scheduler; remote node {} at {}", e, nodeName, address);
+ result.recordFatalError("Cannot start remote scheduler " + nodeName + " at " + address + ": " + e.getMessage());
+ }
+
+ }
+ finally {
+ try {
+ if (connector != null) {
+ connector.close();
+ }
+ } catch (IOException e) {
+ LoggingUtils.logUnexpectedException(LOGGER, "Cannot close JMX connection to {}", e, address);
+ }
+ }
+
+ }
+
+ private QuartzSchedulerMBean getMBeanProxy(String nodeName, MBeanServerConnection mbsc) throws MalformedObjectNameException {
+ String mbeanNameAsString = "quartz:type=QuartzScheduler,name=midPointScheduler,instance=" + nodeName;
+ ObjectName mbeanName = new ObjectName(mbeanNameAsString);
+
+ try {
+ if (mbsc.isRegistered(mbeanName)) {
+ return JMX.newMBeanProxy(mbsc, mbeanName, QuartzSchedulerMBean.class, true);
+ } else {
+ LOGGER.trace("MBean " + mbeanNameAsString + " is not registered at " + nodeName);
+ return null;
+ }
+ } catch (IOException e) {
+ LoggingUtils.logUnexpectedException(LOGGER, "Cannot communicate with remote node via JMX", e);
+ return null;
+ }
+ }
+
+ private JMXConnector connectViaJmx(String address) throws IOException {
+
+ JMXServiceURL url =
+ new JMXServiceURL("service:jmx:rmi:///jndi/rmi://" + address + "/jmxrmi");
+
+ Map env = new HashMap<>();
+ String jmxUsername = taskManager.getConfiguration().getJmxUsername();
+ String jmxPassword = taskManager.getConfiguration().getJmxPassword();
+ if (jmxUsername != null || jmxPassword != null) {
+ String[] creds = { jmxUsername, jmxPassword };
+ env.put(JMXConnector.CREDENTIALS, creds);
+ }
+ return JmxClient.connectWithTimeout(url, env,
+ taskManager.getConfiguration().getJmxConnectTimeout(), TimeUnit.SECONDS);
+ }
+
+ private TaskManagerConfiguration getConfiguration() {
+ return taskManager.getConfiguration();
+ }
+
+ private ExecutionManager getGlobalExecutionManager() {
+ return taskManager.getExecutionManager();
+ }
+
+ // the task should be really running
+ public void stopRemoteTaskRun(String oid, NodeType node, OperationResult parentResult) {
+
+ OperationResult result = parentResult.createSubresult(JmxConnector.class.getName() + ".stopRemoteTaskRun");
+ result.addParam("oid", oid);
+ result.addParam("node", node.toString());
+
+ LOGGER.debug("Interrupting task " + oid + " running at " + getClusterManager().dumpNodeInfo(node));
+
+ String nodeName = node.getNodeIdentifier();
+ String address = node.getHostname() + ":" + node.getJmxPort();
+
+ Holder connectorHolder = new Holder<>();
+
+ try {
+ QuartzSchedulerMBean mbeanProxy = getSchedulerBean(node, connectorHolder, result);
+ if (mbeanProxy != null) {
+ try {
+ mbeanProxy.interruptJob(oid, Scheduler.DEFAULT_GROUP);
+ LOGGER.debug("Successfully signalled shutdown to task " + oid + " running at " + getClusterManager().dumpNodeInfo(node));
+ result.recordSuccessIfUnknown();
+ } catch (Exception e) { // necessary because of mbeanProxy
+ String message = "Cannot signal task "+oid+" interruption to remote node "+nodeName+" at "+address;
+ LoggingUtils.logUnexpectedException(LOGGER, message, e);
+ result.recordFatalError(message + ":" + e.getMessage(), e);
+ }
+ }
+ } finally {
+ closeJmxConnection(connectorHolder, address);
+ }
+ }
+
+ private void closeJmxConnection(Holder connectorHolder, String nodeInfo) {
+ try {
+ if (!connectorHolder.isEmpty()) {
+ connectorHolder.getValue().close();
+ }
+ } catch (IOException e) {
+ LoggingUtils.logUnexpectedException(LOGGER, "Cannot close JMX connection to {}", e, nodeInfo);
+ }
+ }
+
+ private QuartzSchedulerMBean getSchedulerBean(NodeType node, Holder connectorHolder,
+ OperationResult result) {
+ String nodeName = node.getNodeIdentifier();
+ String address = node.getHostname() + ":" + node.getJmxPort();
+ try {
+ JMXConnector connector = connectViaJmx(address);
+ connectorHolder.setValue(connector);
+ MBeanServerConnection serverConnection = connector.getMBeanServerConnection();
+ QuartzSchedulerMBean bean = getMBeanProxy(nodeName, serverConnection);
+ if (bean == null) {
+ String message = "Cannot connect to the Quartz Scheduler bean at remote node " + nodeName + " at "
+ + address + " because the JMX object for scheduler cannot be found on that node.";
+ LOGGER.warn("{}", message);
+ result.recordFatalError(message);
+ }
+ return bean;
+ } catch (IOException|MalformedObjectNameException e) {
+ LoggingUtils.logUnexpectedException(LOGGER, "Cannot connect to the quartz scheduler bean at remote node {} at {}", e, nodeName, address);
+ result.recordFatalError("Cannot connect to the quartz scheduler bean at remote node " + nodeName + " at " + address + ": " + e.getMessage(), e);
+ return null;
+ }
+ }
+
+ private ClusterManager getClusterManager() {
+ return taskManager.getClusterManager();
+ }
+
+}
diff --git a/repo/task-quartz-impl/src/main/java/com/evolveum/midpoint/task/quartzimpl/execution/remote/RestConnector.java b/repo/task-quartz-impl/src/main/java/com/evolveum/midpoint/task/quartzimpl/execution/remote/RestConnector.java
new file mode 100644
index 00000000000..ab2d3cbca08
--- /dev/null
+++ b/repo/task-quartz-impl/src/main/java/com/evolveum/midpoint/task/quartzimpl/execution/remote/RestConnector.java
@@ -0,0 +1,149 @@
+/*
+ * Copyright (c) 2010-2019 Evolveum
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.evolveum.midpoint.task.quartzimpl.execution.remote;
+
+import com.evolveum.midpoint.schema.result.OperationResult;
+import com.evolveum.midpoint.task.api.RemoteExecutionHelper;
+import com.evolveum.midpoint.task.quartzimpl.TaskManagerConfiguration;
+import com.evolveum.midpoint.task.quartzimpl.TaskManagerQuartzImpl;
+import com.evolveum.midpoint.task.quartzimpl.cluster.ClusterManager;
+import com.evolveum.midpoint.task.quartzimpl.cluster.ClusterStatusInformation;
+import com.evolveum.midpoint.task.quartzimpl.execution.ExecutionManager;
+import com.evolveum.midpoint.util.exception.ObjectNotFoundException;
+import com.evolveum.midpoint.util.exception.SchemaException;
+import com.evolveum.midpoint.util.logging.LoggingUtils;
+import com.evolveum.midpoint.util.logging.Trace;
+import com.evolveum.midpoint.util.logging.TraceManager;
+import com.evolveum.midpoint.xml.ns._public.common.common_3.NodeType;
+import com.evolveum.midpoint.xml.ns._public.common.common_3.SchedulerInformationType;
+
+import javax.ws.rs.core.Response;
+
+/**
+ * Manages remote nodes using REST.
+ */
+public class RestConnector {
+
+ private static final transient Trace LOGGER = TraceManager.getTrace(RestConnector.class);
+
+ private TaskManagerQuartzImpl taskManager;
+
+ public RestConnector(TaskManagerQuartzImpl taskManager) {
+ this.taskManager = taskManager;
+ }
+
+ public void addNodeStatus(ClusterStatusInformation info, NodeType nodeInfo, OperationResult result) {
+ RemoteExecutionHelper remoteExecutionHelper = taskManager.getRemoteExecutionHelper();
+ remoteExecutionHelper.execute(nodeInfo, (client, result1) -> {
+ client.path("/scheduler/information");
+ Response response = client.get();
+ Response.StatusType statusInfo = response.getStatusInfo();
+ LOGGER.debug("Querying remote scheduler information on {} finished with status {}: {}", nodeInfo.getNodeIdentifier(),
+ statusInfo.getStatusCode(), statusInfo.getReasonPhrase());
+ if (statusInfo.getFamily() == Response.Status.Family.SUCCESSFUL) {
+ try {
+ SchedulerInformationType schedulerInfo = remoteExecutionHelper
+ .extractResult(response, SchedulerInformationType.class);
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("Received from {}:\n{}", nodeInfo.getNodeIdentifier(),
+ taskManager.getPrismContext().xmlSerializer().serializeRealValue(schedulerInfo));
+ }
+ info.addNodeAndTaskInfo(schedulerInfo);
+ } catch (SchemaException e) {
+ LoggingUtils.logUnexpectedException(LOGGER, "Couldn't parse scheduler information from remote node {}", e, nodeInfo.getNodeIdentifier());
+ }
+ } else {
+ LOGGER.warn("Querying remote scheduler information on {} finished with status {}: {}", nodeInfo.getNodeIdentifier(),
+ statusInfo.getStatusCode(), statusInfo.getReasonPhrase());
+ }
+ response.close();
+ }, "get scheduler information", result);
+ }
+
+ private NodeType getNode(String nodeIdentifier, OperationResult result) {
+ try {
+ return taskManager.getClusterManager().getNodeById(nodeIdentifier, result).asObjectable();
+ } catch (ObjectNotFoundException e) {
+ result.recordFatalError("A node with identifier " + nodeIdentifier + " does not exist.");
+ return null;
+ }
+ }
+
+ public void stopRemoteScheduler(NodeType node, OperationResult result) {
+ RemoteExecutionHelper remoteExecutionHelper = taskManager.getRemoteExecutionHelper();
+ remoteExecutionHelper.execute(node, (client, result1) -> {
+ client.path("/scheduler/stop");
+ Response response = client.post(null);
+ Response.StatusType statusInfo = response.getStatusInfo();
+ LOGGER.debug("Stopping remote scheduler on {} finished with status {}: {}", node.getNodeIdentifier(),
+ statusInfo.getStatusCode(), statusInfo.getReasonPhrase());
+ if (statusInfo.getFamily() != Response.Status.Family.SUCCESSFUL) {
+ LOGGER.warn("Stopping scheduler on {} finished with status {}: {}", node.getNodeIdentifier(),
+ statusInfo.getStatusCode(), statusInfo.getReasonPhrase());
+ result1.recordFatalError("Stopping remote scheduler finished with status " + statusInfo.getStatusCode() + ": " + statusInfo.getReasonPhrase());
+ }
+ response.close();
+ }, "stop scheduler", result);
+ }
+
+ public void startRemoteScheduler(NodeType node, OperationResult result) {
+ RemoteExecutionHelper remoteExecutionHelper = taskManager.getRemoteExecutionHelper();
+ remoteExecutionHelper.execute(node, (client, result1) -> {
+ client.path("/scheduler/start");
+ Response response = client.post(null);
+ Response.StatusType statusInfo = response.getStatusInfo();
+ LOGGER.debug("Starting remote scheduler on {} finished with status {}: {}", node.getNodeIdentifier(),
+ statusInfo.getStatusCode(), statusInfo.getReasonPhrase());
+ if (statusInfo.getFamily() != Response.Status.Family.SUCCESSFUL) {
+ LOGGER.warn("Starting scheduler on {} finished with status {}: {}", node.getNodeIdentifier(),
+ statusInfo.getStatusCode(), statusInfo.getReasonPhrase());
+ result1.recordFatalError("Starting remote scheduler finished with status " + statusInfo.getStatusCode() + ": " + statusInfo.getReasonPhrase());
+ }
+ response.close();
+ }, "start scheduler", result);
+ }
+
+ public void stopRemoteTask(String oid, NodeType node, OperationResult result) {
+ RemoteExecutionHelper remoteExecutionHelper = taskManager.getRemoteExecutionHelper();
+ remoteExecutionHelper.execute(node, (client, result1) -> {
+ client.path("/tasks/" + oid + "/stop");
+ Response response = client.post(null);
+ Response.StatusType statusInfo = response.getStatusInfo();
+ LOGGER.debug("Stopping task {} on {} finished with status {}: {}", oid, node.getNodeIdentifier(),
+ statusInfo.getStatusCode(), statusInfo.getReasonPhrase());
+ if (statusInfo.getFamily() != Response.Status.Family.SUCCESSFUL) {
+ LOGGER.warn("Stopping task {} on {} finished with status {}: {}", oid, node.getNodeIdentifier(),
+ statusInfo.getStatusCode(), statusInfo.getReasonPhrase());
+ result1.recordFatalError("Stopping remote task finished with status " + statusInfo.getStatusCode() + ": " + statusInfo.getReasonPhrase());
+ }
+ response.close();
+ }, "stop task", result);
+ }
+
+ private TaskManagerConfiguration getConfiguration() {
+ return taskManager.getConfiguration();
+ }
+
+ private ExecutionManager getGlobalExecutionManager() {
+ return taskManager.getExecutionManager();
+ }
+
+ private ClusterManager getClusterManager() {
+ return taskManager.getClusterManager();
+ }
+
+}
diff --git a/samples/tasks/task-scheduling.xml b/samples/tasks/task-scheduling.xml
index ea3c0cf3ea6..adbf164527b 100644
--- a/samples/tasks/task-scheduling.xml
+++ b/samples/tasks/task-scheduling.xml
@@ -132,7 +132,7 @@ these tasks via admin GUI.
20000
3
- 10000000-0000-0000-0000-123450000005
+ 10000000-0000-0000-0000-123450000006
runnable
diff --git a/testing/sanity/src/test/java/com/evolveum/midpoint/testing/sanity/TestSanity.java b/testing/sanity/src/test/java/com/evolveum/midpoint/testing/sanity/TestSanity.java
index 7e96b39da37..3cf1ca7cff1 100644
--- a/testing/sanity/src/test/java/com/evolveum/midpoint/testing/sanity/TestSanity.java
+++ b/testing/sanity/src/test/java/com/evolveum/midpoint/testing/sanity/TestSanity.java
@@ -3975,12 +3975,8 @@ public void test999Shutdown() throws Exception {
taskManager.shutdown();
waitFor("waiting for task manager shutdown", new Checker() {
@Override
- public boolean check() throws CommonException {
- try {
- return taskManager.getLocallyRunningTasks(new OperationResult("dummy")).isEmpty();
- } catch (TaskManagerException e) {
- throw new SystemException(e);
- }
+ public boolean check() {
+ return taskManager.getLocallyRunningTasks(new OperationResult("dummy")).isEmpty();
}
@Override