From f1ed0f7077134d4e5b5bb986e30424bff8332821 Mon Sep 17 00:00:00 2001 From: Pavol Mederly Date: Fri, 1 Feb 2019 16:53:26 +0100 Subject: [PATCH] Add REST support for task manager communication This is a preliminary implementation of MID-5058. Major changes: 1) task manager communication can be switched between JMX and REST 2) intra-cluster authentication is now protected by a node-level secret 3) RemoteExecutor is a helper class to enable making REST intra-cluster calls Major TODOs: - separate internally used functionality from the main REST service - reduce amount of information sent between nodes (cherry picked from commit 4c453ad952040768466a1c75a659143e541d336e) --- .../impl/util/ReportPeerQueryInterceptor.java | 3 +- .../xml/ns/public/common/common-core-3.xsd | 60 ++- .../midpoint/model/api/TaskService.java | 10 + .../NodeAuthenticationEvaluator.java | 2 +- model/model-common/pom.xml | 6 +- .../model/impl/ClusterCacheListener.java | 83 +--- .../midpoint/model/impl/ModelRestService.java | 85 ++++ .../impl/controller/ModelController.java | 56 ++- .../MidpointRestAuthenticationHandler.java | 70 ++-- .../NodeAuthenticationEvaluatorImpl.java | 48 ++- .../security/RestAuthenticationMethod.java | 32 -- .../model/impl/util/RestServiceUtil.java | 34 +- model/model-test/pom.xml | 4 + provisioning/provisioning-impl/pom.xml | 4 + provisioning/ucf-impl-connid/pom.xml | 4 + repo/repo-common/pom.xml | 4 + repo/repo-sql-impl/pom.xml | 4 + .../api/RestAuthenticationMethod.java | 48 +++ .../midpoint/security/api/SecurityUtil.java | 11 +- repo/task-api/pom.xml | 10 +- .../task/api/RemoteExecutionHelper.java | 39 ++ .../midpoint/task/api/TaskManager.java | 13 +- repo/task-quartz-impl/pom.xml | 18 +- .../quartzimpl/TaskManagerConfiguration.java | 25 ++ .../quartzimpl/TaskManagerQuartzImpl.java | 33 +- .../cluster/ClusterStatusInformation.java | 14 + .../quartzimpl/cluster/NodeRegistrar.java | 66 ++- .../cluster/RemoteExecutionHelperImpl.java | 152 +++++++ .../execution/ExecutionManager.java | 98 ++++- .../execution/LocalNodeManager.java | 2 +- .../execution/RemoteNodesManager.java | 380 +++--------------- .../execution/remote/JmxConnector.java | 360 +++++++++++++++++ .../execution/remote/RestConnector.java | 149 +++++++ samples/tasks/task-scheduling.xml | 2 +- .../midpoint/testing/sanity/TestSanity.java | 8 +- 35 files changed, 1388 insertions(+), 549 deletions(-) delete mode 100644 model/model-impl/src/main/java/com/evolveum/midpoint/model/impl/security/RestAuthenticationMethod.java create mode 100644 repo/security-api/src/main/java/com/evolveum/midpoint/security/api/RestAuthenticationMethod.java create mode 100644 repo/task-api/src/main/java/com/evolveum/midpoint/task/api/RemoteExecutionHelper.java create mode 100644 repo/task-quartz-impl/src/main/java/com/evolveum/midpoint/task/quartzimpl/cluster/RemoteExecutionHelperImpl.java create mode 100644 repo/task-quartz-impl/src/main/java/com/evolveum/midpoint/task/quartzimpl/execution/remote/JmxConnector.java create mode 100644 repo/task-quartz-impl/src/main/java/com/evolveum/midpoint/task/quartzimpl/execution/remote/RestConnector.java diff --git a/gui/admin-gui/src/main/java/com/evolveum/midpoint/gui/impl/util/ReportPeerQueryInterceptor.java b/gui/admin-gui/src/main/java/com/evolveum/midpoint/gui/impl/util/ReportPeerQueryInterceptor.java index f5e45585227..fc1ca9737db 100644 --- a/gui/admin-gui/src/main/java/com/evolveum/midpoint/gui/impl/util/ReportPeerQueryInterceptor.java +++ b/gui/admin-gui/src/main/java/com/evolveum/midpoint/gui/impl/util/ReportPeerQueryInterceptor.java @@ -141,7 +141,8 @@ private boolean checkRequest(HttpServletRequest request, HttpServletResponse res return false; } - if (!nodeAuthenticator.authenticate(request.getRemoteHost(), request.getRemoteAddr(), operation)) { + // we temporarily allow authentication without credentials + if (!nodeAuthenticator.authenticate(request.getRemoteHost(), request.getRemoteAddr(), null, operation)) { LOGGER.debug("Unknown node, host: {} ", request.getRemoteHost()); response.setStatus(HttpServletResponse.SC_FORBIDDEN); return false; diff --git a/infra/schema/src/main/resources/xml/ns/public/common/common-core-3.xsd b/infra/schema/src/main/resources/xml/ns/public/common/common-core-3.xsd index ce6e6468386..9a1fbbe63db 100755 --- a/infra/schema/src/main/resources/xml/ns/public/common/common-core-3.xsd +++ b/infra/schema/src/main/resources/xml/ns/public/common/common-core-3.xsd @@ -2002,7 +2002,7 @@ infrastructure/intraClusterHttpUrlPattern system configuration property. Use only if really necessary. Support for this item is currently limited - to clusterwide cache invalidation. + to clusterwide cache invalidation and managing node task scheduler. @@ -2013,6 +2013,16 @@ + + + + Port at which this node can be contacted via REST. + + + 4.0 + + + @@ -2084,6 +2094,26 @@ + + + + The secret used for intra-cluster authentication. + + + 4.0 + + + + + + + When was the secret created or last changed. + + + 4.0 + + + @@ -3566,6 +3596,34 @@ + + + + Describes the state of the (local) scheduler. + + + 4.0 + + + + + + + Information on the current node. + + + + + + + Locally executing tasks. + + + + + + + diff --git a/model/model-api/src/main/java/com/evolveum/midpoint/model/api/TaskService.java b/model/model-api/src/main/java/com/evolveum/midpoint/model/api/TaskService.java index 7846408f277..622c5e955ce 100644 --- a/model/model-api/src/main/java/com/evolveum/midpoint/model/api/TaskService.java +++ b/model/model-api/src/main/java/com/evolveum/midpoint/model/api/TaskService.java @@ -21,6 +21,7 @@ import com.evolveum.midpoint.schema.result.OperationResult; import com.evolveum.midpoint.task.api.Task; import com.evolveum.midpoint.util.exception.*; +import com.evolveum.midpoint.xml.ns._public.common.common_3.SchedulerInformationType; import com.evolveum.midpoint.xml.ns._public.common.common_3.TaskType; import java.util.Collection; @@ -101,6 +102,9 @@ public interface TaskService { * @return */ PrismObject getTaskByIdentifier(String identifier, Collection> options, Task operationTask, OperationResult parentResult) throws SchemaException, ObjectNotFoundException, ConfigurationException, SecurityViolationException, ExpressionEvaluationException, CommunicationException; + + SchedulerInformationType getLocalSchedulerInformation(Task operationTask, OperationResult parentResult) throws CommunicationException, + ObjectNotFoundException, SchemaException, SecurityViolationException, ConfigurationException, ExpressionEvaluationException; //endregion //region Node-level operations @@ -142,6 +146,8 @@ public interface TaskService { */ void stopSchedulers(Collection nodeIdentifiers, Task operationTask, OperationResult parentResult) throws SecurityViolationException, ObjectNotFoundException, SchemaException, ExpressionEvaluationException, CommunicationException, ConfigurationException; + void stopLocalScheduler(Task operationTask, OperationResult parentResult) throws SecurityViolationException, ObjectNotFoundException, SchemaException, ExpressionEvaluationException, CommunicationException, ConfigurationException; + /** * Stops a set of schedulers (on their nodes) and tasks that are executing on these nodes. * @@ -163,6 +169,10 @@ public interface TaskService { * @return true if the operation succeeded; false otherwise. */ void startSchedulers(Collection nodeIdentifiers, Task operationTask, OperationResult result) throws SecurityViolationException, ObjectNotFoundException, SchemaException, ExpressionEvaluationException, CommunicationException, ConfigurationException; + + void startLocalScheduler(Task operationTask, OperationResult result) throws SecurityViolationException, ObjectNotFoundException, SchemaException, ExpressionEvaluationException, CommunicationException, ConfigurationException; + + void stopLocalTask(String oid, Task operationTask, OperationResult result) throws SecurityViolationException, ObjectNotFoundException, SchemaException, ExpressionEvaluationException, CommunicationException, ConfigurationException; //endregion //region Miscellaneous diff --git a/model/model-api/src/main/java/com/evolveum/midpoint/model/api/authentication/NodeAuthenticationEvaluator.java b/model/model-api/src/main/java/com/evolveum/midpoint/model/api/authentication/NodeAuthenticationEvaluator.java index 0c2740d8c61..60a6448cc9f 100644 --- a/model/model-api/src/main/java/com/evolveum/midpoint/model/api/authentication/NodeAuthenticationEvaluator.java +++ b/model/model-api/src/main/java/com/evolveum/midpoint/model/api/authentication/NodeAuthenticationEvaluator.java @@ -2,6 +2,6 @@ public interface NodeAuthenticationEvaluator { - boolean authenticate(String remoteName, String remoteAddress, String operation); + boolean authenticate(String remoteName, String remoteAddress, String credentials, String operation); } diff --git a/model/model-common/pom.xml b/model/model-common/pom.xml index bce54bd81fd..ddf389882fa 100644 --- a/model/model-common/pom.xml +++ b/model/model-common/pom.xml @@ -160,7 +160,11 @@ org.jetbrains annotations-java5 - + + javax.annotation + javax.annotation-api + + org.testng diff --git a/model/model-impl/src/main/java/com/evolveum/midpoint/model/impl/ClusterCacheListener.java b/model/model-impl/src/main/java/com/evolveum/midpoint/model/impl/ClusterCacheListener.java index 4f7fd6d692e..b1aa524cdf4 100644 --- a/model/model-impl/src/main/java/com/evolveum/midpoint/model/impl/ClusterCacheListener.java +++ b/model/model-impl/src/main/java/com/evolveum/midpoint/model/impl/ClusterCacheListener.java @@ -15,52 +15,33 @@ */ package com.evolveum.midpoint.model.impl; -import javax.annotation.PostConstruct; -import javax.ws.rs.core.Response; - -import org.apache.commons.lang.StringUtils; -import org.apache.cxf.jaxrs.client.WebClient; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.security.core.Authentication; -import org.springframework.security.core.context.SecurityContextHolder; -import org.springframework.stereotype.Component; - -import com.evolveum.midpoint.model.api.ModelInteractionService; -import com.evolveum.midpoint.model.api.ModelService; import com.evolveum.midpoint.model.impl.security.NodeAuthenticationToken; -import com.evolveum.midpoint.model.impl.security.RestAuthenticationMethod; -import com.evolveum.midpoint.prism.PrismContext; -import com.evolveum.midpoint.prism.PrismObject; -import com.evolveum.midpoint.prism.query.ObjectQuery; import com.evolveum.midpoint.repo.api.CacheDispatcher; import com.evolveum.midpoint.repo.api.CacheListener; -import com.evolveum.midpoint.schema.SearchResultList; import com.evolveum.midpoint.schema.constants.ObjectTypes; import com.evolveum.midpoint.schema.result.OperationResult; +import com.evolveum.midpoint.task.api.RemoteExecutionHelper; import com.evolveum.midpoint.task.api.Task; import com.evolveum.midpoint.task.api.TaskManager; -import com.evolveum.midpoint.util.exception.CommunicationException; -import com.evolveum.midpoint.util.exception.ConfigurationException; -import com.evolveum.midpoint.util.exception.ExpressionEvaluationException; -import com.evolveum.midpoint.util.exception.ObjectNotFoundException; -import com.evolveum.midpoint.util.exception.SchemaException; -import com.evolveum.midpoint.util.exception.SecurityViolationException; import com.evolveum.midpoint.util.logging.Trace; import com.evolveum.midpoint.util.logging.TraceManager; -import com.evolveum.midpoint.xml.ns._public.common.common_3.NodeType; import com.evolveum.midpoint.xml.ns._public.common.common_3.ObjectType; -import com.evolveum.midpoint.xml.ns._public.common.common_3.SystemConfigurationType; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.security.core.Authentication; +import org.springframework.security.core.context.SecurityContextHolder; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; +import javax.ws.rs.core.Response; @Component public class ClusterCacheListener implements CacheListener { private static final Trace LOGGER = TraceManager.getTrace(ClusterCacheListener.class); - @Autowired private ModelService modelService; - @Autowired private ModelInteractionService modelInteractionService; @Autowired private TaskManager taskManager; @Autowired private CacheDispatcher cacheDispatcher; - @Autowired private PrismContext prismContext; + @Autowired private RemoteExecutionHelper remoteExecutionHelper; @PostConstruct public void addListener() { @@ -75,8 +56,6 @@ public void invalidateCache(Class type, String oid) { return; } - String nodeId = taskManager.getNodeId(); - Authentication authentication = SecurityContextHolder.getContext().getAuthentication(); if (authentication instanceof NodeAuthenticationToken) { LOGGER.trace("Skipping cluster-wide cache invalidation as this is already a remotely-invoked invalidateCache() call"); @@ -85,51 +64,13 @@ public void invalidateCache(Class type, String oid) { Task task = taskManager.createTaskInstance("invalidateCache"); OperationResult result = task.getResult(); - - SearchResultList> otherClusterNodes; - try { - ObjectQuery query = prismContext.queryFor(NodeType.class).not().item(NodeType.F_NODE_IDENTIFIER).eq(nodeId).build(); - otherClusterNodes = modelService.searchObjects(NodeType.class, query, null, task, result); - } catch (SchemaException | ObjectNotFoundException | SecurityViolationException | CommunicationException - | ConfigurationException | ExpressionEvaluationException e) { - LOGGER.warn("Cannot find nodes for clearing cache on them. Skipping.", e); - return; - } - - SystemConfigurationType systemConfig; - try { - systemConfig = modelInteractionService.getSystemConfiguration(result); - } catch (ObjectNotFoundException | SchemaException e) { - LOGGER.warn("Cannot load system configuration. Cannot determine the URL for REST calls without it" - + " (unless specified explicitly for individual nodes)"); - systemConfig = null; - } - - for (PrismObject node : otherClusterNodes.getList()) { - NodeType nodeType = node.asObjectable(); - String baseUrl; - if (nodeType.getUrl() != null) { - baseUrl = nodeType.getUrl(); - } else { - String httpUrlPattern = systemConfig != null && systemConfig.getInfrastructure() != null - ? systemConfig.getInfrastructure().getIntraClusterHttpUrlPattern() - : null; - if (StringUtils.isBlank(httpUrlPattern)) { - LOGGER.warn("Node URL nor intra-cluster URL pattern specified, skipping cache clearing for node {}", - nodeType.getNodeIdentifier()); - continue; - } - baseUrl = httpUrlPattern.replace("$host", nodeType.getHostname()); - } - - WebClient client = WebClient.create(baseUrl + "/ws/rest"); - client.header("Authorization", RestAuthenticationMethod.CLUSTER.getMethod());// + " " + Base64Utility.encode((nodeIdentifier).getBytes())); + remoteExecutionHelper.execute((client, result1) -> { client.path("/event/" + ObjectTypes.getRestTypeFromClass(type)); Response response = client.post(null); - LOGGER.info("Cluster-wide cache clearance finished with status {}, {}", response.getStatusInfo().getStatusCode(), response.getStatusInfo().getReasonPhrase()); - } + response.close(); + }, "cache invalidation", result); } } diff --git a/model/model-impl/src/main/java/com/evolveum/midpoint/model/impl/ModelRestService.java b/model/model-impl/src/main/java/com/evolveum/midpoint/model/impl/ModelRestService.java index 8c7e9db0f06..8aeaf4ad8c6 100644 --- a/model/model-impl/src/main/java/com/evolveum/midpoint/model/impl/ModelRestService.java +++ b/model/model-impl/src/main/java/com/evolveum/midpoint/model/impl/ModelRestService.java @@ -105,6 +105,10 @@ public class ModelRestService { public static final String OPERATION_GENERATE_VALUE_RPC = CLASS_DOT + "generateValueRpc"; public static final String OPERATION_EXECUTE_CREDENTIAL_RESET = CLASS_DOT + "executeCredentialReset"; public static final String OPERATION_EXECUTE_CLUSTER_EVENT = CLASS_DOT + "executeClusterEvent"; + public static final String OPERATION_GET_LOCAL_SCHEDULER_INFORMATION = CLASS_DOT + "getLocalSchedulerInformation"; + public static final String OPERATION_STOP_LOCAL_SCHEDULER = CLASS_DOT + "stopScheduler"; + public static final String OPERATION_START_LOCAL_SCHEDULER = CLASS_DOT + "startScheduler"; + public static final String OPERATION_STOP_LOCAL_TASK = CLASS_DOT + "stopLocalTask"; private static final String CURRENT = "current"; private static final String VALIDATE = "validate"; @@ -118,6 +122,7 @@ public class ModelRestService { @Autowired private SecurityHelper securityHelper; @Autowired private ValuePolicyProcessor policyProcessor; @Autowired private TaskManager taskManager; + @Autowired private TaskService taskService; @Autowired private Protector protector; @Autowired private ResourceValidator resourceValidator; @@ -1070,6 +1075,86 @@ public Response executeClusterEvent(@PathParam("type") String type, @Context Mes } + @GET + @Path("/scheduler/information") + @Consumes({MediaType.APPLICATION_XML, MediaType.APPLICATION_JSON, RestServiceUtil.APPLICATION_YAML}) + @Produces({MediaType.APPLICATION_XML, MediaType.APPLICATION_JSON, RestServiceUtil.APPLICATION_YAML}) + public Response getLocalSchedulerInformation(@Context MessageContext mc) { + Task task = RestServiceUtil.initRequest(mc); + OperationResult result = new OperationResult(OPERATION_GET_LOCAL_SCHEDULER_INFORMATION); + + Response response; + try { + SchedulerInformationType schedulerInformation = taskService.getLocalSchedulerInformation(task, result); + response = RestServiceUtil.createResponse(Response.Status.OK, schedulerInformation, result); + } catch (Throwable t) { + response = RestServiceUtil.handleException(result, t); + } + result.computeStatus(); + finishRequest(task); + return response; + } + + @POST + @Path("/scheduler/stop") + @Consumes({MediaType.APPLICATION_XML, MediaType.APPLICATION_JSON, RestServiceUtil.APPLICATION_YAML}) + @Produces({MediaType.APPLICATION_XML, MediaType.APPLICATION_JSON, RestServiceUtil.APPLICATION_YAML}) + public Response stopLocalScheduler(@Context MessageContext mc) { + Task task = RestServiceUtil.initRequest(mc); + OperationResult result = new OperationResult(OPERATION_STOP_LOCAL_SCHEDULER); + + Response response; + try { + taskService.stopLocalScheduler(task, result); + response = RestServiceUtil.createResponse(Response.Status.OK, result); + } catch (Throwable t) { + response = RestServiceUtil.handleException(result, t); + } + result.computeStatus(); + finishRequest(task); + return response; + } + + @POST + @Path("/scheduler/start") + @Consumes({MediaType.APPLICATION_XML, MediaType.APPLICATION_JSON, RestServiceUtil.APPLICATION_YAML}) + @Produces({MediaType.APPLICATION_XML, MediaType.APPLICATION_JSON, RestServiceUtil.APPLICATION_YAML}) + public Response startLocalScheduler(@Context MessageContext mc) { + Task task = RestServiceUtil.initRequest(mc); + OperationResult result = new OperationResult(OPERATION_START_LOCAL_SCHEDULER); + + Response response; + try { + taskService.startLocalScheduler(task, result); + response = RestServiceUtil.createResponse(Response.Status.OK, result); + } catch (Throwable t) { + response = RestServiceUtil.handleException(result, t); + } + result.computeStatus(); + finishRequest(task); + return response; + } + + @POST + @Path("/tasks/{oid}/stop") + @Consumes({MediaType.APPLICATION_XML, MediaType.APPLICATION_JSON, RestServiceUtil.APPLICATION_YAML}) + @Produces({MediaType.APPLICATION_XML, MediaType.APPLICATION_JSON, RestServiceUtil.APPLICATION_YAML}) + public Response stopLocalTask(@PathParam("oid") String oid, @Context MessageContext mc) { + Task task = RestServiceUtil.initRequest(mc); + OperationResult result = new OperationResult(OPERATION_STOP_LOCAL_TASK); + + Response response; + try { + taskService.stopLocalTask(oid, task, result); + response = RestServiceUtil.createResponse(Response.Status.OK, result); + } catch (Throwable t) { + response = RestServiceUtil.handleException(result, t); + } + result.computeStatus(); + finishRequest(task); + return response; + } + // @GET // @Path("tasks/{oid}") // public Response getTaskByIdentifier(@PathParam("oid") String identifier) throws SchemaException, ObjectNotFoundException { diff --git a/model/model-impl/src/main/java/com/evolveum/midpoint/model/impl/controller/ModelController.java b/model/model-impl/src/main/java/com/evolveum/midpoint/model/impl/controller/ModelController.java index 381e2802f99..ede83f1ee17 100644 --- a/model/model-impl/src/main/java/com/evolveum/midpoint/model/impl/controller/ModelController.java +++ b/model/model-impl/src/main/java/com/evolveum/midpoint/model/impl/controller/ModelController.java @@ -31,6 +31,7 @@ import com.evolveum.midpoint.model.impl.lens.*; import com.evolveum.midpoint.model.impl.scripting.ExecutionContext; import com.evolveum.midpoint.model.impl.scripting.ScriptingExpressionEvaluator; +import com.evolveum.midpoint.model.impl.security.NodeAuthenticationToken; import com.evolveum.midpoint.model.impl.util.ModelImplUtils; import com.evolveum.midpoint.prism.*; import com.evolveum.midpoint.prism.crypto.Protector; @@ -83,6 +84,8 @@ import org.jetbrains.annotations.NotNull; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.security.core.Authentication; +import org.springframework.security.core.context.SecurityContextHolder; import org.springframework.stereotype.Component; import javax.xml.datatype.XMLGregorianCalendar; @@ -1927,7 +1930,58 @@ public PrismObject getTaskByIdentifier(String identifier, Collection> nodes = repository.searchObjects(NodeType.class, query, null, result); -// if (nodes.size() != 1) { -// RestServiceUtil.createAbortMessage(requestCtx); -// return; -// } -// //TODO: http header -// -// PreAuthenticatedAuthenticationToken authentication = new PreAuthenticatedAuthenticationToken(nodes.iterator().next(), null); -// SecurityContext securityContext = SecurityContextHolder.getContext(); -// securityContext.setAuthentication(authentication); -// } catch (Base64Exception | SchemaException e) { -// RestServiceUtil.createAbortMessage(requestCtx); -// return; -// } - } + if (parts.length == 1 && RestAuthenticationMethod.SECURITY_QUESTIONS.getMethod().equals(authenticationType)) { + RestServiceUtil.createSecurityQuestionAbortMessage(requestCtx, "{\"user\" : \"username\"}"); return; } @@ -128,24 +97,37 @@ public void filter(ContainerRequestContext requestCtx) throws IOException { RestServiceUtil.createAbortMessage(requestCtx); return; } - String base64Credentials = (parts.length == 2) ? parts[1] : null; + + String base64Credentials = parts[1]; - if (RestAuthenticationMethod.SECURITY_QUESTIONS.equals(authenticationType)) { + if (RestAuthenticationMethod.SECURITY_QUESTIONS.getMethod().equals(authenticationType)) { try { String decodedCredentials = new String(Base64Utility.decode(base64Credentials)); policy = new AuthorizationPolicy(); policy.setAuthorizationType(RestAuthenticationMethod.SECURITY_QUESTIONS.getMethod()); policy.setAuthorization(decodedCredentials); securityQuestionAuthenticator.handleRequest(policy, m, requestCtx); - } catch (Base64Exception e) { RestServiceUtil.createSecurityQuestionAbortMessage(requestCtx, "{\"user\" : \"username\"}"); + } + } else if (RestAuthenticationMethod.CLUSTER.getMethod().equals(authenticationType)) { + HttpConnectionInformation connectionInfo = SecurityUtil.getCurrentConnectionInformation(); + String remoteAddress = connectionInfo != null ? connectionInfo.getRemoteHostAddress() : null; + String decodedCredentials; + try { + decodedCredentials = new String(Base64Utility.decode(base64Credentials)); + } catch (Base64Exception e) { + LoggingUtils.logUnexpectedException(LOGGER, "Couldn't decode base64-encoded credentials", e); + RestServiceUtil.createAbortMessage(requestCtx); + return; + } + if (!nodeAuthenticator.authenticate(null, remoteAddress, decodedCredentials, "?")) { + RestServiceUtil.createAbortMessage(requestCtx); return; } + Task task = taskManager.createTaskInstance(); + m.put(RestServiceUtil.MESSAGE_PROPERTY_TASK_NAME, task); } - - - } diff --git a/model/model-impl/src/main/java/com/evolveum/midpoint/model/impl/security/NodeAuthenticationEvaluatorImpl.java b/model/model-impl/src/main/java/com/evolveum/midpoint/model/impl/security/NodeAuthenticationEvaluatorImpl.java index 13c6dae8ced..680c24e495d 100644 --- a/model/model-impl/src/main/java/com/evolveum/midpoint/model/impl/security/NodeAuthenticationEvaluatorImpl.java +++ b/model/model-impl/src/main/java/com/evolveum/midpoint/model/impl/security/NodeAuthenticationEvaluatorImpl.java @@ -19,7 +19,11 @@ import java.util.Collections; import java.util.List; +import com.evolveum.midpoint.prism.crypto.EncryptionException; +import com.evolveum.midpoint.prism.crypto.Protector; import com.evolveum.midpoint.task.api.TaskManager; +import com.evolveum.prism.xml.ns._public.types_3.ProtectedStringType; +import org.jetbrains.annotations.Nullable; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.security.core.context.SecurityContextHolder; @@ -45,12 +49,13 @@ public class NodeAuthenticationEvaluatorImpl implements NodeAuthenticationEvalua private RepositoryService repositoryService; @Autowired private TaskManager taskManager; @Autowired private SecurityHelper securityHelper; + @Autowired private Protector protector; private static final Trace LOGGER = TraceManager.getTrace(NodeAuthenticationEvaluatorImpl.class); private static final String OPERATION_SEARCH_NODE = NodeAuthenticationEvaluatorImpl.class.getName() + ".searchNode"; - public boolean authenticate(String remoteName, String remoteAddress, String operation) { + public boolean authenticate(@Nullable String remoteName, String remoteAddress, @Nullable String credentials, String operation) { LOGGER.debug("Checking if {} ({}) is a known node", remoteName, remoteAddress); OperationResult result = new OperationResult(OPERATION_SEARCH_NODE); @@ -61,15 +66,40 @@ public boolean authenticate(String remoteName, String remoteAddress, String oper List> matchingNodes = getMatchingNodes(allNodes, remoteName, remoteAddress, operation); if (matchingNodes.size() == 1 || matchingNodes.size() >= 1 && taskManager.isLocalNodeClusteringEnabled()) { - PrismObject actualNode = allNodes.iterator().next(); LOGGER.trace( - "Matching result: The node {} was recognized as a known node (remote host name {} or IP address {} matched). Attempting to execute the requested operation: {}", - actualNode.asObjectable().getName(), actualNode.asObjectable().getHostname(), remoteAddress, operation); - NodeAuthenticationToken authNtoken = new NodeAuthenticationToken(actualNode, remoteAddress, - Collections.emptyList()); - SecurityContextHolder.getContext().setAuthentication(authNtoken); - securityHelper.auditLoginSuccess(actualNode.asObjectable(), connEnv); - return true; + "Matching result: Node(s) {} recognized as known (remote host name {} or IP address {} matched). Attempting to execute the requested operation: {}", + allNodes, remoteName, remoteAddress, operation); + PrismObject actualNode = null; + if (credentials != null) { + for (PrismObject matchingNode : matchingNodes) { + ProtectedStringType encryptedSecret = matchingNode.asObjectable().getSecret(); + if (encryptedSecret != null) { + String plainSecret; + try { + plainSecret = protector.decryptString(encryptedSecret); + } catch (EncryptionException e) { + LoggingUtils.logUnexpectedException(LOGGER, "Couldn't decrypt node secret for {}", e, matchingNode); + continue; + } + if (credentials.equals(plainSecret)) { + LOGGER.debug("Node secret matches for {}", matchingNode); + actualNode = matchingNode; + break; + } else { + LOGGER.debug("Node secret does not match for {}", matchingNode); + } + } + } + } else { + actualNode = matchingNodes.get(0); + } + if (actualNode != null) { + NodeAuthenticationToken authNtoken = new NodeAuthenticationToken(actualNode, remoteAddress, + Collections.emptyList()); + SecurityContextHolder.getContext().setAuthentication(authNtoken); + securityHelper.auditLoginSuccess(actualNode.asObjectable(), connEnv); + return true; + } } } catch (RuntimeException | SchemaException e) { diff --git a/model/model-impl/src/main/java/com/evolveum/midpoint/model/impl/security/RestAuthenticationMethod.java b/model/model-impl/src/main/java/com/evolveum/midpoint/model/impl/security/RestAuthenticationMethod.java deleted file mode 100644 index a1e1713f584..00000000000 --- a/model/model-impl/src/main/java/com/evolveum/midpoint/model/impl/security/RestAuthenticationMethod.java +++ /dev/null @@ -1,32 +0,0 @@ -package com.evolveum.midpoint.model.impl.security; - -import org.apache.commons.lang.StringUtils; - -public enum RestAuthenticationMethod { - - BASIC("Basic"), - SECURITY_QUESTIONS("SecQ"), - CLUSTER("Cluster"); - - - private String method; - - RestAuthenticationMethod(String method) { - this.method = method; - } - - public String getMethod() { - return method; - } - - protected boolean equals(String authenticationType) { - if (StringUtils.isBlank(authenticationType)) { - return false; - } - - if (getMethod().equals(authenticationType)) { - return true; - } - return false; - } -} diff --git a/model/model-impl/src/main/java/com/evolveum/midpoint/model/impl/util/RestServiceUtil.java b/model/model-impl/src/main/java/com/evolveum/midpoint/model/impl/util/RestServiceUtil.java index 17ccd109548..f82cccce758 100644 --- a/model/model-impl/src/main/java/com/evolveum/midpoint/model/impl/util/RestServiceUtil.java +++ b/model/model-impl/src/main/java/com/evolveum/midpoint/model/impl/util/RestServiceUtil.java @@ -33,7 +33,7 @@ import org.apache.cxf.jaxrs.ext.MessageContext; import com.evolveum.midpoint.model.api.ModelExecuteOptions; -import com.evolveum.midpoint.model.impl.security.RestAuthenticationMethod; +import com.evolveum.midpoint.security.api.RestAuthenticationMethod; import com.evolveum.midpoint.model.impl.security.SecurityHelper; import com.evolveum.midpoint.schema.constants.SchemaConstants; import com.evolveum.midpoint.schema.result.OperationResult; @@ -67,14 +67,14 @@ public class RestServiceUtil { public static final String APPLICATION_YAML = "application/yaml"; - public static Response handleException(OperationResult result, Exception ex) { - LoggingUtils.logUnexpectedException(LOGGER, "Got exception while servicing REST request: {}", ex, + public static Response handleException(OperationResult result, Throwable t) { + LoggingUtils.logUnexpectedException(LOGGER, "Got exception while servicing REST request: {}", t, result != null ? result.getOperation() : "(null)"); - return handleExceptionNoLog(result, ex); + return handleExceptionNoLog(result, t); } - public static Response handleExceptionNoLog(OperationResult result, Exception ex) { - return createErrorResponseBuilder(result, ex).build(); + public static Response handleExceptionNoLog(OperationResult result, Throwable t) { + return createErrorResponseBuilder(result, t).build(); } public static Response createResponse(Response.Status statusCode, OperationResult result) { @@ -124,32 +124,32 @@ public static Response createResponse(Response.Status statusCode, URI locati - public static Response.ResponseBuilder createErrorResponseBuilder(OperationResult result, Exception ex) { - if (ex instanceof ObjectNotFoundException) { + public static Response.ResponseBuilder createErrorResponseBuilder(OperationResult result, Throwable t) { + if (t instanceof ObjectNotFoundException) { return createErrorResponseBuilder(Response.Status.NOT_FOUND, result); } - if (ex instanceof CommunicationException || ex instanceof TunnelException) { + if (t instanceof CommunicationException || t instanceof TunnelException) { return createErrorResponseBuilder(Response.Status.GATEWAY_TIMEOUT, result); } - if (ex instanceof SecurityViolationException || ex instanceof AuthorizationException) { + if (t instanceof SecurityViolationException || t instanceof AuthorizationException) { return createErrorResponseBuilder(Response.Status.FORBIDDEN, result); } - if (ex instanceof ConfigurationException) { + if (t instanceof ConfigurationException) { return createErrorResponseBuilder(Response.Status.BAD_GATEWAY, result); } - if (ex instanceof SchemaException - || ex instanceof NoFocusNameSchemaException - || ex instanceof ExpressionEvaluationException) { + if (t instanceof SchemaException + || t instanceof NoFocusNameSchemaException + || t instanceof ExpressionEvaluationException) { return createErrorResponseBuilder(Response.Status.BAD_REQUEST, result); } - if (ex instanceof PolicyViolationException - || ex instanceof ObjectAlreadyExistsException - || ex instanceof ConcurrencyException) { + if (t instanceof PolicyViolationException + || t instanceof ObjectAlreadyExistsException + || t instanceof ConcurrencyException) { return createErrorResponseBuilder(Response.Status.CONFLICT, result); } diff --git a/model/model-test/pom.xml b/model/model-test/pom.xml index 8d936e5dd0b..998c7242ba1 100644 --- a/model/model-test/pom.xml +++ b/model/model-test/pom.xml @@ -205,6 +205,10 @@ com.h2database h2 + + javax.annotation + javax.annotation-api + com.evolveum.midpoint.provisioning diff --git a/provisioning/provisioning-impl/pom.xml b/provisioning/provisioning-impl/pom.xml index 10224cdc356..7b4acba11ee 100644 --- a/provisioning/provisioning-impl/pom.xml +++ b/provisioning/provisioning-impl/pom.xml @@ -122,6 +122,10 @@ org.jetbrains annotations-java5 + + javax.annotation + javax.annotation-api + diff --git a/provisioning/ucf-impl-connid/pom.xml b/provisioning/ucf-impl-connid/pom.xml index e83525fb4dd..d72047c5f83 100644 --- a/provisioning/ucf-impl-connid/pom.xml +++ b/provisioning/ucf-impl-connid/pom.xml @@ -89,6 +89,10 @@ org.slf4j slf4j-api + + javax.annotation + javax.annotation-api + net.tirasa.connid diff --git a/repo/repo-common/pom.xml b/repo/repo-common/pom.xml index 703a05a9ca7..561f77649de 100644 --- a/repo/repo-common/pom.xml +++ b/repo/repo-common/pom.xml @@ -113,6 +113,10 @@ javax.xml.bind jaxb-api + + javax.annotation + javax.annotation-api + diff --git a/repo/repo-sql-impl/pom.xml b/repo/repo-sql-impl/pom.xml index c2231b5e8f5..9afad35d48c 100644 --- a/repo/repo-sql-impl/pom.xml +++ b/repo/repo-sql-impl/pom.xml @@ -154,6 +154,10 @@ com.google.guava guava + + javax.annotation + javax.annotation-api + diff --git a/repo/security-api/src/main/java/com/evolveum/midpoint/security/api/RestAuthenticationMethod.java b/repo/security-api/src/main/java/com/evolveum/midpoint/security/api/RestAuthenticationMethod.java new file mode 100644 index 00000000000..ece19051b7e --- /dev/null +++ b/repo/security-api/src/main/java/com/evolveum/midpoint/security/api/RestAuthenticationMethod.java @@ -0,0 +1,48 @@ +/* + * Copyright (c) 2010-2019 Evolveum + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.evolveum.midpoint.security.api; + +import org.apache.commons.lang.StringUtils; + +public enum RestAuthenticationMethod { + + BASIC("Basic"), + SECURITY_QUESTIONS("SecQ"), + CLUSTER("Cluster"); + + + private String method; + + RestAuthenticationMethod(String method) { + this.method = method; + } + + public String getMethod() { + return method; + } + + protected boolean equals(String authenticationType) { + if (StringUtils.isBlank(authenticationType)) { + return false; + } + + if (getMethod().equals(authenticationType)) { + return true; + } + return false; + } +} diff --git a/repo/security-api/src/main/java/com/evolveum/midpoint/security/api/SecurityUtil.java b/repo/security-api/src/main/java/com/evolveum/midpoint/security/api/SecurityUtil.java index 542ef3e833a..376d39cff42 100644 --- a/repo/security-api/src/main/java/com/evolveum/midpoint/security/api/SecurityUtil.java +++ b/repo/security-api/src/main/java/com/evolveum/midpoint/security/api/SecurityUtil.java @@ -46,6 +46,7 @@ public class SecurityUtil { private static final Trace LOGGER = TraceManager.getTrace(SecurityUtil.class); + private static final long GET_LOCAL_NAME_THRESHOLD = 2000; @NotNull private static List remoteHostAddressHeaders = Collections.emptyList(); @@ -300,15 +301,19 @@ public static HttpConnectionInformation getCurrentConnectionInformation() { } ServletRequestAttributes servletRequestAttributes = (ServletRequestAttributes) attr; HttpServletRequest request = servletRequestAttributes.getRequest(); - if (request == null) { - return null; - } HttpConnectionInformation rv = new HttpConnectionInformation(); HttpSession session = request.getSession(false); if (session != null) { rv.setSessionId(session.getId()); } + long start = System.currentTimeMillis(); rv.setLocalHostName(request.getLocalName()); + long delta = System.currentTimeMillis() - start; + if (delta > GET_LOCAL_NAME_THRESHOLD) { + LOGGER.warn("getLocalName() on HTTP request took {} milliseconds that is too long; " + + "please check your DNS configuration. Local name = {}, local address = {}", delta, + request.getLocalName(), request.getLocalAddr()); + } rv.setRemoteHostAddress(getRemoteHostAddress(request)); return rv; } diff --git a/repo/task-api/pom.xml b/repo/task-api/pom.xml index 20f5bedf42e..2308b3f27ae 100644 --- a/repo/task-api/pom.xml +++ b/repo/task-api/pom.xml @@ -60,7 +60,15 @@ org.jetbrains annotations-java5 - + + org.apache.cxf + cxf-rt-rs-client + + + javax.ws.rs + javax.ws.rs-api + + com.evolveum.midpoint.tools test-ng diff --git a/repo/task-api/src/main/java/com/evolveum/midpoint/task/api/RemoteExecutionHelper.java b/repo/task-api/src/main/java/com/evolveum/midpoint/task/api/RemoteExecutionHelper.java new file mode 100644 index 00000000000..715f2c09f1c --- /dev/null +++ b/repo/task-api/src/main/java/com/evolveum/midpoint/task/api/RemoteExecutionHelper.java @@ -0,0 +1,39 @@ +/* + * Copyright (c) 2010-2019 Evolveum + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.evolveum.midpoint.task.api; + +import com.evolveum.midpoint.schema.result.OperationResult; +import com.evolveum.midpoint.util.exception.SchemaException; +import com.evolveum.midpoint.xml.ns._public.common.common_3.NodeType; +import org.apache.cxf.jaxrs.client.WebClient; + +import javax.ws.rs.core.Response; +import java.util.function.BiConsumer; + +/** + * Helps with the intra-cluster remote code execution. + * + * TODO documentation + */ +public interface RemoteExecutionHelper { + + void execute(NodeType node, BiConsumer code, String context, OperationResult parentResult); + + void execute(BiConsumer code, String context, OperationResult parentResult); + + T extractResult(Response response, Class expectedClass) throws SchemaException; +} diff --git a/repo/task-api/src/main/java/com/evolveum/midpoint/task/api/TaskManager.java b/repo/task-api/src/main/java/com/evolveum/midpoint/task/api/TaskManager.java index 9f4baf2d946..68ce0ddfb2a 100644 --- a/repo/task-api/src/main/java/com/evolveum/midpoint/task/api/TaskManager.java +++ b/repo/task-api/src/main/java/com/evolveum/midpoint/task/api/TaskManager.java @@ -357,7 +357,18 @@ void modifyTask(String oid, Collection modifications, Opera * * @return tasks that currently run on this node. */ - Set getLocallyRunningTasks(OperationResult parentResult) throws TaskManagerException; + Set getLocallyRunningTasks(OperationResult parentResult); + + /** + * Returns the local scheduler information. + */ + SchedulerInformationType getLocalSchedulerInformation(OperationResult parentResult); + + void stopLocalScheduler(OperationResult parentResult); + + void startLocalScheduler(OperationResult parentResult); + + void stopLocalTask(String oid, OperationResult parentResult); /** * Returns locally-run task by identifier. Returned instance is the same as is being used to carrying out diff --git a/repo/task-quartz-impl/pom.xml b/repo/task-quartz-impl/pom.xml index d6698528416..b37ef17c3df 100644 --- a/repo/task-quartz-impl/pom.xml +++ b/repo/task-quartz-impl/pom.xml @@ -123,6 +123,22 @@ org.jetbrains annotations-java5 + + org.apache.cxf + cxf-rt-rs-client + + + org.apache.cxf + cxf-core + + + javax.ws.rs + javax.ws.rs-api + + + javax.annotation + javax.annotation-api + @@ -225,6 +241,6 @@ log4j-over-slf4j test - + diff --git a/repo/task-quartz-impl/src/main/java/com/evolveum/midpoint/task/quartzimpl/TaskManagerConfiguration.java b/repo/task-quartz-impl/src/main/java/com/evolveum/midpoint/task/quartzimpl/TaskManagerConfiguration.java index 764002831a1..55f812edb72 100644 --- a/repo/task-quartz-impl/src/main/java/com/evolveum/midpoint/task/quartzimpl/TaskManagerConfiguration.java +++ b/repo/task-quartz-impl/src/main/java/com/evolveum/midpoint/task/quartzimpl/TaskManagerConfiguration.java @@ -73,6 +73,9 @@ public class TaskManagerConfiguration { private static final String QUARTZ_NODE_REGISTRATION_INTERVAL_CONFIG_ENTRY = "quartzNodeRegistrationInterval"; private static final String NODE_REGISTRATION_INTERVAL_CONFIG_ENTRY = "nodeRegistrationInterval"; private static final String NODE_TIMEOUT_CONFIG_ENTRY = "nodeTimeout"; + private static final String DEFAULT_URL_CONFIG_ENTRY = "defaultUrl"; + private static final String DEFAULT_REST_PORT_CONFIG_ENTRY = "defaultRestPort"; + private static final String USE_JMX_CONFIG_ENTRY = "useJmx"; private static final String JMX_USERNAME_CONFIG_ENTRY = "jmxUsername"; private static final String JMX_PASSWORD_CONFIG_ENTRY = "jmxPassword"; private static final String TEST_MODE_CONFIG_ENTRY = "testMode"; @@ -106,6 +109,7 @@ public class TaskManagerConfiguration { private static final int QUARTZ_NODE_REGISTRATION_CYCLE_TIME_DEFAULT = 10; private static final int NODE_REGISTRATION_CYCLE_TIME_DEFAULT = 10; private static final int NODE_TIMEOUT_DEFAULT = 30; + private static final boolean USE_JMX_DEFAULT = false; private static final String JMX_USERNAME_DEFAULT = "midpoint"; private static final String JMX_PASSWORD_DEFAULT = "secret"; private static final int WAITING_TASKS_CHECK_INTERVAL_DEFAULT = 600; @@ -128,6 +132,8 @@ public class TaskManagerConfiguration { private boolean jdbcJobStore; private boolean clustered; private String nodeId; + private String defaultUrl; + private Integer defaultRestPort; private String jmxHostName; private int jmxPort; private int jmxConnectTimeout; @@ -149,6 +155,7 @@ public class TaskManagerConfiguration { private long workAllocationInitialDelay; private long workAllocationDefaultFreeBucketWaitInterval; + private boolean useJmx; // JMX credentials for connecting to remote nodes private String jmxUsername; private String jmxPassword; @@ -202,6 +209,9 @@ public class TaskManagerConfiguration { QUARTZ_NODE_REGISTRATION_INTERVAL_CONFIG_ENTRY, NODE_REGISTRATION_INTERVAL_CONFIG_ENTRY, NODE_TIMEOUT_CONFIG_ENTRY, + DEFAULT_URL_CONFIG_ENTRY, + DEFAULT_REST_PORT_CONFIG_ENTRY, + USE_JMX_CONFIG_ENTRY, JMX_USERNAME_CONFIG_ENTRY, JMX_PASSWORD_CONFIG_ENTRY, TEST_MODE_CONFIG_ENTRY, @@ -307,6 +317,9 @@ void setBasicInformation(MidpointConfiguration masterConfig) throws TaskManagerC nodeRegistrationCycleTime = c.getInt(NODE_REGISTRATION_INTERVAL_CONFIG_ENTRY, NODE_REGISTRATION_CYCLE_TIME_DEFAULT); nodeTimeout = c.getInt(NODE_TIMEOUT_CONFIG_ENTRY, NODE_TIMEOUT_DEFAULT); + defaultUrl = c.getString(DEFAULT_URL_CONFIG_ENTRY, null); + defaultRestPort = c.getInteger(DEFAULT_REST_PORT_CONFIG_ENTRY, null); + useJmx = c.getBoolean(USE_JMX_CONFIG_ENTRY, USE_JMX_DEFAULT); jmxUsername = c.getString(JMX_USERNAME_CONFIG_ENTRY, JMX_USERNAME_DEFAULT); jmxPassword = c.getString(JMX_PASSWORD_CONFIG_ENTRY, JMX_PASSWORD_DEFAULT); @@ -555,6 +568,18 @@ public int getQuartzNodeRegistrationCycleTime() { return quartzNodeRegistrationCycleTime; } + public String getDefaultUrl() { + return defaultUrl; + } + + public Integer getDefaultRestPort() { + return defaultRestPort; + } + + public boolean isUseJmx() { + return useJmx; + } + public String getJmxUsername() { return jmxUsername; } diff --git a/repo/task-quartz-impl/src/main/java/com/evolveum/midpoint/task/quartzimpl/TaskManagerQuartzImpl.java b/repo/task-quartz-impl/src/main/java/com/evolveum/midpoint/task/quartzimpl/TaskManagerQuartzImpl.java index 81473d54fcf..0b97273e074 100644 --- a/repo/task-quartz-impl/src/main/java/com/evolveum/midpoint/task/quartzimpl/TaskManagerQuartzImpl.java +++ b/repo/task-quartz-impl/src/main/java/com/evolveum/midpoint/task/quartzimpl/TaskManagerQuartzImpl.java @@ -35,6 +35,7 @@ import com.evolveum.midpoint.common.LocalizationService; import com.evolveum.midpoint.prism.ItemDefinition; +import com.evolveum.midpoint.prism.crypto.Protector; import com.evolveum.midpoint.prism.path.ItemPath; import com.evolveum.midpoint.repo.api.PreconditionViolationException; import com.evolveum.midpoint.repo.api.RepoAddOptions; @@ -124,6 +125,8 @@ public class TaskManagerQuartzImpl implements TaskManager, BeanFactoryAware { @Autowired private TaskManagerConfiguration configuration; @Autowired private LocalizationService localizationService; @Autowired private SystemConfigurationChangeDispatcher systemConfigurationChangeDispatcher; + @Autowired private RemoteExecutionHelper remoteExecutionHelper; + @Autowired private Protector protector; // instances of all the helper classes (see their definitions for their description) private ExecutionManager executionManager = new ExecutionManager(this); @@ -1876,7 +1879,27 @@ public Set getLocallyRunningTasks(OperationResult parentResult) { return executionManager.getLocallyRunningTasks(parentResult); } - @Override + @Override + public SchedulerInformationType getLocalSchedulerInformation(OperationResult parentResult) { + return executionManager.getLocalSchedulerInformation(parentResult); + } + + @Override + public void stopLocalScheduler(OperationResult parentResult) { + executionManager.stopLocalScheduler(parentResult); + } + + @Override + public void startLocalScheduler(OperationResult parentResult) { + executionManager.startLocalScheduler(parentResult); + } + + @Override + public void stopLocalTask(String oid, OperationResult parentResult) { + executionManager.stopLocalTask(oid, parentResult); + } + + @Override public Task getLocallyRunningTaskByIdentifier(String lightweightIdentifier) { synchronized (locallyRunningTaskInstancesMap) { return locallyRunningTaskInstancesMap.get(lightweightIdentifier); @@ -2347,4 +2370,12 @@ public boolean isLocalNodeClusteringEnabled() { public SystemConfigurationChangeDispatcher getSystemConfigurationChangeDispatcher() { return systemConfigurationChangeDispatcher; } + + public RemoteExecutionHelper getRemoteExecutionHelper() { + return remoteExecutionHelper; + } + + public Protector getProtector() { + return protector; + } } diff --git a/repo/task-quartz-impl/src/main/java/com/evolveum/midpoint/task/quartzimpl/cluster/ClusterStatusInformation.java b/repo/task-quartz-impl/src/main/java/com/evolveum/midpoint/task/quartzimpl/cluster/ClusterStatusInformation.java index 56e45aa200d..7782544f538 100644 --- a/repo/task-quartz-impl/src/main/java/com/evolveum/midpoint/task/quartzimpl/cluster/ClusterStatusInformation.java +++ b/repo/task-quartz-impl/src/main/java/com/evolveum/midpoint/task/quartzimpl/cluster/ClusterStatusInformation.java @@ -17,6 +17,8 @@ package com.evolveum.midpoint.task.quartzimpl.cluster; import com.evolveum.midpoint.xml.ns._public.common.common_3.NodeType; +import com.evolveum.midpoint.xml.ns._public.common.common_3.SchedulerInformationType; +import com.evolveum.midpoint.xml.ns._public.common.common_3.TaskType; import java.io.Serializable; import java.util.*; @@ -115,6 +117,18 @@ public void addNodeAndTaskInfo(NodeType node, List taskInfoList) { tasks.put(node, taskInfoList); } + public void addNodeAndTaskInfo(SchedulerInformationType info) { + tasks.put(info.getNode(), getTaskInfoList(info)); + } + + private List getTaskInfoList(SchedulerInformationType info) { + List rv = new ArrayList<>(); + for (TaskType taskBean : info.getExecutingTask()) { + rv.add(new TaskInfo(taskBean.getOid())); + } + return rv; + } + public NodeType findNodeById(String nodeIdentifier) { for (NodeType node : tasks.keySet()) { if (node.getNodeIdentifier().equals(nodeIdentifier)) { diff --git a/repo/task-quartz-impl/src/main/java/com/evolveum/midpoint/task/quartzimpl/cluster/NodeRegistrar.java b/repo/task-quartz-impl/src/main/java/com/evolveum/midpoint/task/quartzimpl/cluster/NodeRegistrar.java index 8d30043c1ec..e2aff632720 100644 --- a/repo/task-quartz-impl/src/main/java/com/evolveum/midpoint/task/quartzimpl/cluster/NodeRegistrar.java +++ b/repo/task-quartz-impl/src/main/java/com/evolveum/midpoint/task/quartzimpl/cluster/NodeRegistrar.java @@ -19,11 +19,13 @@ import com.evolveum.midpoint.common.LocalizationService; import com.evolveum.midpoint.prism.PrismContext; import com.evolveum.midpoint.prism.PrismObject; +import com.evolveum.midpoint.prism.crypto.EncryptionException; import com.evolveum.midpoint.prism.delta.ItemDelta; import com.evolveum.midpoint.prism.delta.ObjectDelta; import com.evolveum.midpoint.prism.equivalence.EquivalenceStrategy; import com.evolveum.midpoint.prism.polystring.PolyString; import com.evolveum.midpoint.prism.query.ObjectQuery; +import com.evolveum.midpoint.prism.xml.XmlTypeConverter; import com.evolveum.midpoint.repo.api.RepositoryService; import com.evolveum.midpoint.schema.result.OperationResult; import com.evolveum.midpoint.schema.util.ObjectQueryUtil; @@ -41,13 +43,13 @@ import com.evolveum.midpoint.util.logging.TraceManager; import com.evolveum.midpoint.xml.ns._public.common.common_3.*; import com.evolveum.prism.xml.ns._public.types_3.PolyStringType; +import com.evolveum.prism.xml.ns._public.types_3.ProtectedStringType; import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.Validate; +import org.apache.commons.lang3.RandomStringUtils; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; -import javax.xml.datatype.DatatypeConfigurationException; -import javax.xml.datatype.DatatypeFactory; import javax.xml.datatype.XMLGregorianCalendar; import java.net.InetAddress; import java.net.NetworkInterface; @@ -63,6 +65,8 @@ public class NodeRegistrar { private static final transient Trace LOGGER = TraceManager.getTrace(NodeRegistrar.class); + private static final int SECRET_LENGTH = 20; + private static final long SECRET_RENEWAL_PERIOD = 86400L * 1000L * 10L; private TaskManagerQuartzImpl taskManager; private ClusterManager clusterManager; @@ -90,7 +94,9 @@ public NodeRegistrar(TaskManagerQuartzImpl taskManager, ClusterManager clusterMa */ NodeType createOrUpdateNodeInRepo(OperationResult result) throws TaskManagerInitializationException { - NodeType nodeToBe = createLocalNodeObject(taskManager.getConfiguration()); + TaskManagerConfiguration configuration = taskManager.getConfiguration(); + + NodeType nodeToBe = createLocalNodeObject(configuration); LOGGER.info("Registering this node in the repository as " + nodeToBe.getNodeIdentifier() + " at " + nodeToBe.getHostname() + ":" + nodeToBe.getJmxPort()); List> nodesInRepo; @@ -104,7 +110,14 @@ NodeType createOrUpdateNodeInRepo(OperationResult result) throws TaskManagerInit PrismObject nodeInRepo = nodesInRepo.get(0); // copy all information that need to be preserved from the repository nodeToBe.setTaskExecutionLimitations(nodeInRepo.asObjectable().getTaskExecutionLimitations()); - nodeToBe.setUrl(nodeInRepo.asObjectable().getUrl()); + nodeToBe.setUrl(applyDefault(nodeInRepo.asObjectable().getUrl(), configuration.getDefaultUrl())); + nodeToBe.setRestPort(applyDefault(nodeInRepo.asObjectable().getRestPort(), configuration.getDefaultRestPort())); + if (shouldRenewSecret(nodeInRepo.asObjectable())) { + LOGGER.info("Renewing node secret for the current node"); + } else { + nodeToBe.setSecret(nodeInRepo.asObjectable().getSecret()); + nodeToBe.setSecretUpdateTimestamp(nodeInRepo.asObjectable().getSecretUpdateTimestamp()); + } ObjectDelta nodeDelta = nodeInRepo.diff(nodeToBe.asPrismObject(), EquivalenceStrategy.LITERAL); LOGGER.debug("Applying delta to existing node object:\n{}", nodeDelta.debugDumpLazily()); try { @@ -150,18 +163,30 @@ NodeType createOrUpdateNodeInRepo(OperationResult result) throws TaskManagerInit return nodeToBe; } + private boolean shouldRenewSecret(NodeType nodeInRepo) { + return nodeInRepo.getSecret() == null || nodeInRepo.getSecretUpdateTimestamp() == null || + XmlTypeConverter.toMillis(nodeInRepo.getSecretUpdateTimestamp()) <= System.currentTimeMillis() + SECRET_RENEWAL_PERIOD; + } + + private T applyDefault(T oldValue, T defaultValue) { + return oldValue != null ? oldValue : defaultValue; + } + @NotNull private NodeType createLocalNodeObject(TaskManagerConfiguration configuration) { + XMLGregorianCalendar currentTime = getCurrentTime(); NodeType node = getPrismContext().createKnownObjectable(NodeType.class); String nodeId = configuration.getNodeId(); node.setNodeIdentifier(nodeId); node.setName(new PolyStringType(nodeId)); node.setHostname(getMyHostname()); node.getIpAddress().addAll(getMyIpAddresses()); + node.setUrl(configuration.getDefaultUrl()); // overridden later (if already exists in repo) + node.setRestPort(configuration.getDefaultRestPort()); // overridden later (if already exists in repo) node.setJmxPort(configuration.getJmxPort()); node.setClustered(configuration.isClustered()); node.setRunning(true); - node.setLastCheckInTime(getCurrentTime()); + node.setLastCheckInTime(currentTime); node.setBuild(getBuildInformation()); node.setTaskExecutionLimitations( new TaskExecutionLimitationsType() @@ -169,9 +194,22 @@ private NodeType createLocalNodeObject(TaskManagerConfiguration configuration) { .groupLimitation(new TaskGroupExecutionLimitationType().groupName(nodeId).limit(null)) .groupLimitation(new TaskGroupExecutionLimitationType().groupName(TaskConstants.LIMIT_FOR_OTHER_GROUPS).limit(0))); generateInternalNodeIdentifier(node); + node.setSecretUpdateTimestamp(currentTime); // overridden later (if already exists in repo) + node.setSecret(generateNodeSecret()); // overridden later (if already exists in repo) return node; } + private ProtectedStringType generateNodeSecret() { + ProtectedStringType secret; + try { + String plain = RandomStringUtils.randomAlphanumeric(SECRET_LENGTH); + secret = taskManager.getProtector().encryptString(plain); + } catch (EncryptionException e) { + throw new SystemException("Couldn't encrypt node secret: " + e.getMessage(), e); + } + return secret; + } + private BuildInformationType getBuildInformation() { BuildInformationType info = new BuildInformationType(); LocalizationService localizationService = taskManager.getLocalizationService(); @@ -192,13 +230,7 @@ private void generateInternalNodeIdentifier(NodeType node) { } private XMLGregorianCalendar getCurrentTime() { - try { - // AFAIK the DatatypeFactory is not thread safe, so we have to create an instance every time - return DatatypeFactory.newInstance().newXMLGregorianCalendar(new GregorianCalendar()); - } catch (DatatypeConfigurationException e) { - // this should not happen - throw new SystemException("Cannot create DatatypeFactory (to create XMLGregorianCalendar instance).", e); - } + return XmlTypeConverter.createXMLGregorianCalendar(System.currentTimeMillis()); } /** @@ -233,13 +265,21 @@ void updateNodeObject(OperationResult result) { String nodeName = taskManager.getNodeId(); LOGGER.trace("Updating this node registration:\n{}", cachedLocalNodeObject.debugDumpLazily()); try { + XMLGregorianCalendar currentTime = getCurrentTime(); List> modifications = getPrismContext().deltaFor(NodeType.class) .item(NodeType.F_HOSTNAME).replace(getMyHostname()) .item(NodeType.F_IP_ADDRESS).replaceRealValues(getMyIpAddresses()) - .item(NodeType.F_LAST_CHECK_IN_TIME).replace(getCurrentTime()) + .item(NodeType.F_LAST_CHECK_IN_TIME).replace(currentTime) .asItemDeltas(); + if (shouldRenewSecret(cachedLocalNodeObject.asObjectable())) { + modifications.addAll(getPrismContext().deltaFor(NodeType.class) + .item(NodeType.F_SECRET).replace(generateNodeSecret()) + .item(NodeType.F_SECRET_UPDATE_TIMESTAMP).replace(currentTime) + .asItemDeltas()); + } getRepositoryService().modifyObject(NodeType.class, nodeOid, modifications, result); LOGGER.trace("Node registration successfully updated."); + cachedLocalNodeObject = getRepositoryService().getObject(NodeType.class, nodeOid, null, result); } catch (ObjectNotFoundException e) { LoggingUtils.logUnexpectedException(LOGGER, "Cannot update registration of this node (name {}, oid {}), because it " + "does not exist in repository. It is probably caused by cluster misconfiguration (other " diff --git a/repo/task-quartz-impl/src/main/java/com/evolveum/midpoint/task/quartzimpl/cluster/RemoteExecutionHelperImpl.java b/repo/task-quartz-impl/src/main/java/com/evolveum/midpoint/task/quartzimpl/cluster/RemoteExecutionHelperImpl.java new file mode 100644 index 00000000000..6df067c5a0f --- /dev/null +++ b/repo/task-quartz-impl/src/main/java/com/evolveum/midpoint/task/quartzimpl/cluster/RemoteExecutionHelperImpl.java @@ -0,0 +1,152 @@ +/* + * Copyright (c) 2010-2019 Evolveum + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.evolveum.midpoint.task.quartzimpl.cluster; + +import com.evolveum.midpoint.prism.PrismContext; +import com.evolveum.midpoint.prism.PrismObject; +import com.evolveum.midpoint.prism.crypto.Protector; +import com.evolveum.midpoint.prism.query.ObjectQuery; +import com.evolveum.midpoint.repo.api.SystemConfigurationChangeDispatcher; +import com.evolveum.midpoint.repo.api.SystemConfigurationChangeListener; +import com.evolveum.midpoint.schema.SearchResultList; +import com.evolveum.midpoint.schema.result.OperationResult; +import com.evolveum.midpoint.security.api.RestAuthenticationMethod; +import com.evolveum.midpoint.task.api.RemoteExecutionHelper; +import com.evolveum.midpoint.task.api.TaskManager; +import com.evolveum.midpoint.util.exception.*; +import com.evolveum.midpoint.util.logging.LoggingUtils; +import com.evolveum.midpoint.util.logging.Trace; +import com.evolveum.midpoint.util.logging.TraceManager; +import com.evolveum.midpoint.xml.ns._public.common.common_3.InfrastructureConfigurationType; +import com.evolveum.midpoint.xml.ns._public.common.common_3.NodeType; +import com.evolveum.midpoint.xml.ns._public.common.common_3.SystemConfigurationType; +import org.apache.commons.lang.StringUtils; +import org.apache.cxf.common.util.Base64Utility; +import org.apache.cxf.jaxrs.client.WebClient; +import org.jetbrains.annotations.Nullable; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; +import javax.ws.rs.core.Response; +import java.util.function.BiConsumer; + +/** + * Helps with the intra-cluster remote code execution. + */ +@Component +public class RemoteExecutionHelperImpl implements RemoteExecutionHelper, SystemConfigurationChangeListener { + + private static final Trace LOGGER = TraceManager.getTrace(RemoteExecutionHelperImpl.class); + + @Autowired private PrismContext prismContext; + @Autowired private TaskManager taskManager; + @Autowired private Protector protector; + @Autowired private SystemConfigurationChangeDispatcher configurationChangeDispatcher; + + private static final String DOT_CLASS = RemoteExecutionHelperImpl.class.getName() + "."; + + private InfrastructureConfigurationType infrastructureConfiguration; + + @PostConstruct + public void init() { + configurationChangeDispatcher.registerListener(this); + } + + @Override + public boolean update(@Nullable SystemConfigurationType value) { + infrastructureConfiguration = value != null ? value.getInfrastructure() : null; + return true; + } + + @Override + public void execute(BiConsumer code, String context, OperationResult parentResult) { + + OperationResult result = parentResult.createSubresult(DOT_CLASS + "execute"); + String nodeId = taskManager.getNodeId(); + + SearchResultList> otherClusterNodes; + try { + ObjectQuery query = prismContext.queryFor(NodeType.class).not().item(NodeType.F_NODE_IDENTIFIER).eq(nodeId).build(); + otherClusterNodes = taskManager.searchObjects(NodeType.class, query, null, result); + } catch (SchemaException e) { + LOGGER.warn("Couldn't find nodes to execute remote operation on them ({}). Skipping it.", context, e); + result.recordFatalError("Couldn't find nodes to execute remote operation on them (" + context + "). Skipping it.", e); + return; + } + + for (PrismObject node : otherClusterNodes.getList()) { + execute(node.asObjectable(), code, context, result); + } + result.computeStatus(); + } + + @Override + public void execute(NodeType node, BiConsumer code, String context, OperationResult parentResult) { + OperationResult result = parentResult.createSubresult(DOT_CLASS + "execute.node"); + String nodeIdentifier = node.getNodeIdentifier(); + result.addParam("node", nodeIdentifier); + + String httpUrlPattern = infrastructureConfiguration != null + ? infrastructureConfiguration.getIntraClusterHttpUrlPattern() + : null; + + try { + String baseUrl; + if (node.getUrl() != null) { + baseUrl = node.getUrl(); + } else { + if (StringUtils.isBlank(httpUrlPattern)) { + LOGGER.warn("Node URL nor intra-cluster URL pattern specified, skipping remote execution ({}) for node {}", + context, node.getNodeIdentifier()); + return; + } + baseUrl = httpUrlPattern + .replace("$host", node.getHostname()) + .replace("$port", String.valueOf(node.getRestPort())); + } + + String url = baseUrl + "/ws/rest"; + LOGGER.debug("Going to execute '{}' on '{}'", context, url); + WebClient client = WebClient.create(url); + if (node.getSecret() == null) { + throw new SchemaException("No secret known for target node " + node.getNodeIdentifier()); + } + String secret = protector.decryptString(node.getSecret()); + client.header("Authorization", RestAuthenticationMethod.CLUSTER.getMethod() + " " + Base64Utility.encode(secret.getBytes())); + code.accept(client, result); + result.computeStatusIfUnknown(); + } catch (Throwable t) { + result.recordFatalError("Couldn't invoke operation (" + context + ") on node " + nodeIdentifier + ": " + t.getMessage(), t); + LoggingUtils.logUnexpectedException(LOGGER, "Couldn't invoke operation ({}) on node {}", t, context, nodeIdentifier); + } + } + + @Override + public T extractResult(Response response, Class expectedClass) throws SchemaException { + if (response.hasEntity()) { + String body = response.readEntity(String.class); + if (expectedClass == null || Object.class.equals(expectedClass)) { + return prismContext.parserFor(body).parseRealValue(); + } else { + return prismContext.parserFor(body).parseRealValue(expectedClass); + } + } else { + return null; + } + } +} diff --git a/repo/task-quartz-impl/src/main/java/com/evolveum/midpoint/task/quartzimpl/execution/ExecutionManager.java b/repo/task-quartz-impl/src/main/java/com/evolveum/midpoint/task/quartzimpl/execution/ExecutionManager.java index 7a65e23df58..853ae89d644 100644 --- a/repo/task-quartz-impl/src/main/java/com/evolveum/midpoint/task/quartzimpl/execution/ExecutionManager.java +++ b/repo/task-quartz-impl/src/main/java/com/evolveum/midpoint/task/quartzimpl/execution/ExecutionManager.java @@ -30,12 +30,10 @@ import com.evolveum.midpoint.util.logging.LoggingUtils; import com.evolveum.midpoint.util.logging.Trace; import com.evolveum.midpoint.util.logging.TraceManager; -import com.evolveum.midpoint.xml.ns._public.common.common_3.NodeType; +import com.evolveum.midpoint.xml.ns._public.common.common_3.*; -import com.evolveum.midpoint.xml.ns._public.common.common_3.TaskExecutionLimitationsType; -import com.evolveum.midpoint.xml.ns._public.common.common_3.TaskExecutionStatusType; -import com.evolveum.midpoint.xml.ns._public.common.common_3.TaskGroupExecutionLimitationType; import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; import org.quartz.*; import java.util.*; @@ -181,15 +179,19 @@ private void addNodeAndTaskInformation(ClusterStatusInformation info, PrismObjec LOGGER.trace("Getting node and task info from the current node ({})", node.asObjectable().getNodeIdentifier()); - List taskInfoList = new ArrayList<>(); - Set tasks = localNodeManager.getLocallyRunningTasks(result); - for (Task task : tasks) { - taskInfoList.add(new ClusterStatusInformation.TaskInfo(task.getOid())); +// List taskInfoList = new ArrayList<>(); +// Set tasks = localNodeManager.getLocallyRunningTasks(result); +// for (Task task : tasks) { +// taskInfoList.add(new ClusterStatusInformation.TaskInfo(task.getOid())); +// } +// node.asObjectable().setExecutionStatus(localNodeManager.getLocalNodeExecutionStatus()); +// node.asObjectable().setErrorStatus(taskManager.getLocalNodeErrorStatus()); + + SchedulerInformationType schedulerInformation = getLocalSchedulerInformation(result); + if (schedulerInformation.getNode() == null) { // shouldn't occur + schedulerInformation.setNode(node.asObjectable()); } - node.asObjectable().setExecutionStatus(localNodeManager.getLocalNodeExecutionStatus()); - node.asObjectable().setErrorStatus(taskManager.getLocalNodeErrorStatus()); - - info.addNodeAndTaskInfo(node.asObjectable(), taskInfoList); + info.addNodeAndTaskInfo(schedulerInformation); } else { // if remote @@ -537,7 +539,74 @@ public boolean synchronizeJobStores(OperationResult result) { public Set getLocallyRunningTasks(OperationResult parentResult) { return localNodeManager.getLocallyRunningTasks(parentResult); + } + + public SchedulerInformationType getLocalSchedulerInformation(OperationResult parentResult) { + OperationResult result = parentResult.createSubresult(DOT_CLASS + "getLocalSchedulerInformation"); + try { + SchedulerInformationType info = new SchedulerInformationType(); + info.setNode(getLocalNode()); + for (Task task : getLocallyRunningTasks(result)) { + info.getExecutingTask().add(task.getTaskType().clone()); + } + result.computeStatus(); + return info; + } catch (Throwable t) { + result.recordFatalError("Couldn't get scheduler information: " + t.getMessage(), t); + throw t; + } + } + public void stopLocalScheduler(OperationResult parentResult) { + OperationResult result = parentResult.createSubresult(DOT_CLASS + "stopLocalScheduler"); + try { + localNodeManager.stopScheduler(result); + result.computeStatus(); + } catch (Throwable t) { + result.recordFatalError("Couldn't stop local scheduler: " + t.getMessage(), t); + throw t; + } + } + + public void startLocalScheduler(OperationResult parentResult) { + OperationResult result = parentResult.createSubresult(DOT_CLASS + "stopLocalScheduler"); + try { + localNodeManager.startScheduler(result); + result.computeStatus(); + } catch (Throwable t) { + result.recordFatalError("Couldn't start local scheduler: " + t.getMessage(), t); + throw t; + } + } + + public void stopLocalTask(String oid, OperationResult parentResult) { + OperationResult result = parentResult.createSubresult(DOT_CLASS + "stopLocalTask"); + try { + localNodeManager.stopLocalTaskRun(oid, result); + // TODO if interrupting is set to WHEN_NECESSARY we should check if the task stops within 5 seconds + // and if not, interrupt the thread. However, what if the task was stopped and then restarted? We + // should check for that situation. So let's ignore conditional interruption for now. + result.computeStatus(); + } catch (Throwable t) { + result.recordFatalError("Couldn't interrupt local task: " + t.getMessage(), t); + throw t; + } + } + + /** + * @return current local node information, updated with local node execution and error status. + * Returned value is fresh, so it can be modified as needed. + */ + @Nullable + public NodeType getLocalNode() { + PrismObject localNode = taskManager.getClusterManager().getLocalNodeObject(); + if (localNode == null) { + return null; + } + NodeType node = localNode.clone().asObjectable(); + node.setExecutionStatus(localNodeManager.getLocalNodeExecutionStatus()); + node.setErrorStatus(taskManager.getLocalNodeErrorStatus()); + return node; } public void initializeLocalScheduler() throws TaskManagerInitializationException { @@ -630,11 +699,6 @@ private void addTriggerNowForTask(Task task, OperationResult result) { } } - // nodeId should not be the current node - void redirectTaskToNode(@NotNull Task task, @NotNull NodeType node, @NotNull OperationResult result) { - remoteNodesManager.redirectTaskToNode(task, node, result); - } - public void pauseTaskJob(Task task, OperationResult parentResult) { OperationResult result = parentResult.createSubresult(DOT_CLASS + "pauseTaskJob"); JobKey jobKey = TaskQuartzImplUtil.createJobKeyForTask(task); diff --git a/repo/task-quartz-impl/src/main/java/com/evolveum/midpoint/task/quartzimpl/execution/LocalNodeManager.java b/repo/task-quartz-impl/src/main/java/com/evolveum/midpoint/task/quartzimpl/execution/LocalNodeManager.java index e349d08b13f..5c87653b2c1 100644 --- a/repo/task-quartz-impl/src/main/java/com/evolveum/midpoint/task/quartzimpl/execution/LocalNodeManager.java +++ b/repo/task-quartz-impl/src/main/java/com/evolveum/midpoint/task/quartzimpl/execution/LocalNodeManager.java @@ -387,7 +387,7 @@ void stopLocalTaskRun(String oid, OperationResult parentResult) { OperationResult result = parentResult.createSubresult(LocalNodeManager.class.getName() + ".stopLocalTaskRun"); result.addParam("task", oid); - LOGGER.info("Stopping local task " + oid + " run"); + LOGGER.info("Stopping local task {} run", oid); try { getQuartzScheduler().interrupt(TaskQuartzImplUtil.createJobKeyForTaskOid(oid)); diff --git a/repo/task-quartz-impl/src/main/java/com/evolveum/midpoint/task/quartzimpl/execution/RemoteNodesManager.java b/repo/task-quartz-impl/src/main/java/com/evolveum/midpoint/task/quartzimpl/execution/RemoteNodesManager.java index bc34ab9b63d..0b2d3cbb974 100644 --- a/repo/task-quartz-impl/src/main/java/com/evolveum/midpoint/task/quartzimpl/execution/RemoteNodesManager.java +++ b/repo/task-quartz-impl/src/main/java/com/evolveum/midpoint/task/quartzimpl/execution/RemoteNodesManager.java @@ -19,36 +19,19 @@ import com.evolveum.midpoint.prism.PrismObject; import com.evolveum.midpoint.schema.result.OperationResult; import com.evolveum.midpoint.schema.result.OperationResultStatus; -import com.evolveum.midpoint.task.api.Task; import com.evolveum.midpoint.task.quartzimpl.cluster.ClusterStatusInformation; import com.evolveum.midpoint.task.quartzimpl.TaskManagerConfiguration; import com.evolveum.midpoint.task.quartzimpl.TaskManagerQuartzImpl; import com.evolveum.midpoint.task.quartzimpl.cluster.ClusterManager; -import com.evolveum.midpoint.util.Holder; +import com.evolveum.midpoint.task.quartzimpl.execution.remote.JmxConnector; +import com.evolveum.midpoint.task.quartzimpl.execution.remote.RestConnector; import com.evolveum.midpoint.util.exception.ObjectNotFoundException; -import com.evolveum.midpoint.util.exception.SystemException; -import com.evolveum.midpoint.util.logging.LoggingUtils; import com.evolveum.midpoint.util.logging.Trace; import com.evolveum.midpoint.util.logging.TraceManager; import com.evolveum.midpoint.xml.ns._public.common.common_3.NodeExecutionStatusType; import com.evolveum.midpoint.xml.ns._public.common.common_3.NodeType; -import org.jetbrains.annotations.NotNull; import org.quartz.*; -import org.quartz.core.jmx.QuartzSchedulerMBean; - -import javax.management.JMX; -import javax.management.MBeanServerConnection; -import javax.management.MalformedObjectNameException; -import javax.management.ObjectName; -import javax.management.openmbean.CompositeData; -import javax.management.openmbean.TabularData; -import javax.management.remote.JMXConnector; -import javax.management.remote.JMXServiceURL; - -import java.io.IOException; -import java.util.*; -import java.util.concurrent.TimeUnit; /** * Manages remote nodes. Concerned mainly with @@ -65,9 +48,13 @@ public class RemoteNodesManager { public static final JobKey STARTER_JOB_KEY = JobKey.jobKey("STARTER JOB"); private TaskManagerQuartzImpl taskManager; + private JmxConnector jmxConnector; + private RestConnector restConnector; public RemoteNodesManager(TaskManagerQuartzImpl taskManager) { this.taskManager = taskManager; + this.jmxConnector = new JmxConnector(taskManager); + this.restConnector = new RestConnector(taskManager); } /** @@ -77,100 +64,25 @@ public RemoteNodesManager(TaskManagerQuartzImpl taskManager) { * @param node Node which to query */ void addNodeStatusFromRemoteNode(ClusterStatusInformation info, PrismObject node, OperationResult parentResult) { - OperationResult result = parentResult.createSubresult(RemoteNodesManager.class.getName() + ".addNodeStatusFromRemoteNode"); - result.addParam("node", node); - NodeType nodeInfo = node.asObjectable(); - String nodeIdentifier = nodeInfo.getNodeIdentifier(); - String address = nodeInfo.getHostname() + ":" + nodeInfo.getJmxPort(); - - if (!taskManager.getClusterManager().isUp(nodeInfo)) { - nodeInfo.setExecutionStatus(NodeExecutionStatusType.DOWN); - info.addNodeInfo(nodeInfo); - result.recordStatus(OperationResultStatus.SUCCESS, "Node is down"); - return; - } - - JMXConnector connector = null; - + result.addParam("node", nodeInfo.getNodeIdentifier()); try { - MBeanServerConnection mbsc; - - try { - connector = connectViaJmx(address); - mbsc = connector.getMBeanServerConnection(); - } catch (IOException e) { - LoggingUtils.logUnexpectedException(LOGGER, "Cannot connect to the remote node {} at {}", e, nodeIdentifier, address); - result.recordWarning("Cannot connect to the remote node " + nodeIdentifier + " at " + address + ": " + e.getMessage(), e); - nodeInfo.setExecutionStatus(NodeExecutionStatusType.COMMUNICATION_ERROR); - nodeInfo.setConnectionResult(result.createOperationResultType()); + if (!taskManager.getClusterManager().isUp(nodeInfo)) { + nodeInfo.setExecutionStatus(NodeExecutionStatusType.DOWN); info.addNodeInfo(nodeInfo); + result.recordStatus(OperationResultStatus.SUCCESS, "Node is down"); return; } - try { - - QuartzSchedulerMBean mbeanProxy = getMBeanProxy(nodeIdentifier, mbsc); - - boolean running = false, down = true; - if (mbeanProxy != null) { - try { - running = mbeanProxy.isStarted() && !mbeanProxy.isShutdown() && !mbeanProxy.isStandbyMode(); - down = mbeanProxy.isShutdown(); - } catch (Exception e) { // was: InstanceNotFoundException but it does not seem to work - String message = "Cannot get information from scheduler " + nodeIdentifier + " because it does not exist or is shut down."; - LoggingUtils.logUnexpectedException(LOGGER, message, e); - result.recordWarning(message, e); - nodeInfo.setConnectionResult(result.createOperationResultType()); - } - } else { - result.recordWarning("Cannot get information from node " + nodeIdentifier + " at " + address + " because the JMX object for scheduler cannot be found on that node."); - nodeInfo.setConnectionResult(result.createOperationResultType()); - } - - LOGGER.trace(" - scheduler found = " + (mbeanProxy != null) + ", running = " + running + ", shutdown = " + down); - - if (down) { - nodeInfo.setExecutionStatus(NodeExecutionStatusType.ERROR); // this is a mark of error situation (we expect that during ordinary shutdown the node quickly goes down so there is little probability of getting this status on that occasion) - } else if (running) { - nodeInfo.setExecutionStatus(NodeExecutionStatusType.RUNNING); - } else { - nodeInfo.setExecutionStatus(NodeExecutionStatusType.PAUSED); - } - - List taskInfoList = new ArrayList<>(); - if (mbeanProxy != null) { - TabularData jobs = mbeanProxy.getCurrentlyExecutingJobs(); - for (CompositeData job : (Collection) jobs.values()) { - String oid = (String) job.get("jobName"); - LOGGER.trace(" - task oid = " + oid); - taskInfoList.add(new ClusterStatusInformation.TaskInfo(oid)); - } - } - - if (result.isUnknown()) { - result.recordStatus(OperationResultStatus.SUCCESS, "Node " + nodeIdentifier + ": status = " + nodeInfo.getExecutionStatus() + ", # of running tasks: " + taskInfoList.size()); - } - info.addNodeAndTaskInfo(nodeInfo, taskInfoList); - } catch (Exception e) { // unfortunately, mbeanProxy.getCurrentlyExecutingJobs is declared to throw an Exception - LoggingUtils.logUnexpectedException(LOGGER, "Cannot get information from the remote node {} at {}", e, nodeIdentifier, address); - result.recordWarning("Cannot get information from the remote node " + nodeIdentifier + " at " + address + ": " + e.getMessage(), e); - nodeInfo.setExecutionStatus(NodeExecutionStatusType.COMMUNICATION_ERROR); - nodeInfo.setConnectionResult(result.createOperationResultType()); - info.addNodeInfo(nodeInfo); - return; - } - } - finally { - try { - if (connector != null) { - connector.close(); - } - } catch (IOException e) { - LoggingUtils.logUnexpectedException(LOGGER, "Cannot close JMX connection to {}", e, address); + if (taskManager.getConfiguration().isUseJmx()) { + jmxConnector.addNodeStatusUsingJmx(info, nodeInfo, result); + } else { + restConnector.addNodeStatus(info, nodeInfo, result); } - result.recordSuccessIfUnknown(); + result.computeStatus(); + } catch (Throwable t) { + result.recordFatalError("Couldn't get status from remote node", t); } } @@ -184,147 +96,47 @@ private NodeType getNode(String nodeIdentifier, OperationResult result) { } public void stopRemoteScheduler(String nodeIdentifier, OperationResult parentResult) { - OperationResult result = parentResult.createSubresult(RemoteNodesManager.class.getName() + ".stopRemoteScheduler"); - result.addParam("nodeIdentifier", nodeIdentifier); - - NodeType node = getNode(nodeIdentifier, result); - if (node == null) { - return; - } - - String nodeName = node.getNodeIdentifier(); - String address = node.getHostname() + ":" + node.getJmxPort(); - - JMXConnector connector = null; - + result.addParam("node", nodeIdentifier); try { - MBeanServerConnection mbsc; - - try { - connector = connectViaJmx(address); - mbsc = connector.getMBeanServerConnection(); - } catch (IOException e) { - LoggingUtils.logUnexpectedException(LOGGER, "Cannot connect to the remote node {} at {}", e, nodeName, address); - result.recordFatalError("Cannot connect to the remote node " + nodeName + " at " + address + ": " + e.getMessage(), e); - return; + NodeType node = getNode(nodeIdentifier, result); + if (node == null) { + return; // result is already updated } - try { - QuartzSchedulerMBean mbeanProxy = getMBeanProxy(nodeName, mbsc); - if (mbeanProxy != null) { - mbeanProxy.standby(); - result.recordSuccess(); - } else { - result.recordWarning("Cannot stop the scheduler on node " + nodeName + " at " + address + " because the JMX object for scheduler cannot be found on that node."); - } - return; - } - catch (Exception e) { - LoggingUtils.logUnexpectedException(LOGGER, "Cannot put remote scheduler into standby mode; remote node {} at {}", e, nodeName, address); - result.recordFatalError("Cannot put remote scheduler " + nodeName + " at " + address + " into standby mode: " + e.getMessage()); - return; - } - - } - finally { - try { - if (connector != null) { - connector.close(); - } - } catch (IOException e) { - LoggingUtils.logUnexpectedException(LOGGER, "Cannot close JMX connection to {}", e, address); + if (taskManager.getConfiguration().isUseJmx()) { + jmxConnector.stopRemoteScheduler(node, result); + } else { + restConnector.stopRemoteScheduler(node, result); } + result.computeStatus(); + } catch (Throwable t) { + result.recordFatalError("Couldn't stop scheduler on remote node", t); + // todo log the exception? } - } - void startRemoteScheduler(String nodeIdentifier, OperationResult result) { - - NodeType node = getNode(nodeIdentifier, result); - if (node == null) { - return; - } - - String nodeName = node.getNodeIdentifier(); - String address = node.getHostname() + ":" + node.getJmxPort(); - - JMXConnector connector = null; - + public void startRemoteScheduler(String nodeIdentifier, OperationResult parentResult) { + OperationResult result = parentResult.createSubresult(RemoteNodesManager.class.getName() + ".startRemoteScheduler"); + result.addParam("node", nodeIdentifier); try { - MBeanServerConnection mbsc; - - try { - connector = connectViaJmx(address); - mbsc = connector.getMBeanServerConnection(); - } catch (IOException e) { - LoggingUtils.logUnexpectedException(LOGGER, "Cannot connect to the remote node {} at {}", e, nodeName, address); - result.recordFatalError("Cannot connect to the remote node " + nodeName + " at " + address + ": " + e.getMessage(), e); - return; - } - - try { - QuartzSchedulerMBean mbeanProxy = getMBeanProxy(nodeName, mbsc); - if (mbeanProxy != null) { - mbeanProxy.start(); - result.recordSuccessIfUnknown(); - } else { - result.recordFatalError("Cannot start remote scheduler " + nodeName + " at " + address + " because it cannot be found on that node."); - } - return; - } - catch (Exception e) { - LoggingUtils.logUnexpectedException(LOGGER, "Cannot start remote scheduler; remote node {} at {}", e, nodeName, address); - result.recordFatalError("Cannot start remote scheduler " + nodeName + " at " + address + ": " + e.getMessage()); - return; + NodeType node = getNode(nodeIdentifier, result); + if (node == null) { + return; // result is already updated } - } - finally { - try { - if (connector != null) { - connector.close(); - } - } catch (IOException e) { - LoggingUtils.logUnexpectedException(LOGGER, "Cannot close JMX connection to {}", e, address); - } - } - - } - - private QuartzSchedulerMBean getMBeanProxy(String nodeName, MBeanServerConnection mbsc) throws MalformedObjectNameException { - String mbeanNameAsString = "quartz:type=QuartzScheduler,name=midPointScheduler,instance=" + nodeName; - ObjectName mbeanName = new ObjectName(mbeanNameAsString); - - try { - if (mbsc.isRegistered(mbeanName)) { - return JMX.newMBeanProxy(mbsc, mbeanName, QuartzSchedulerMBean.class, true); + if (taskManager.getConfiguration().isUseJmx()) { + jmxConnector.startRemoteScheduler(node, result); } else { - LOGGER.trace("MBean " + mbeanNameAsString + " is not registered at " + nodeName); - return null; + restConnector.startRemoteScheduler(node, result); } - } catch (IOException e) { - LoggingUtils.logUnexpectedException(LOGGER, "Cannot communicate with remote node via JMX", e); - return null; + result.computeStatus(); + } catch (Throwable t) { + result.recordFatalError("Couldn't start scheduler on remote node", t); + // todo log the exception? } } - private JMXConnector connectViaJmx(String address) throws IOException { - - JMXServiceURL url = - new JMXServiceURL("service:jmx:rmi:///jndi/rmi://" + address + "/jmxrmi"); - - Map env = new HashMap<>(); - String jmxUsername = taskManager.getConfiguration().getJmxUsername(); - String jmxPassword = taskManager.getConfiguration().getJmxPassword(); - if (jmxUsername != null || jmxPassword != null) { - String[] creds = { jmxUsername, jmxPassword }; - env.put(JMXConnector.CREDENTIALS, creds); - } - return JmxClient.connectWithTimeout(url, env, - taskManager.getConfiguration().getJmxConnectTimeout(), TimeUnit.SECONDS); - } - private TaskManagerConfiguration getConfiguration() { return taskManager.getConfiguration(); } @@ -340,106 +152,22 @@ void stopRemoteTaskRun(String oid, NodeType node, OperationResult parentResult) result.addParam("oid", oid); result.addParam("node", node.toString()); - LOGGER.debug("Interrupting task " + oid + " running at " + getClusterManager().dumpNodeInfo(node)); - - String nodeName = node.getNodeIdentifier(); - String address = node.getHostname() + ":" + node.getJmxPort(); - - Holder connectorHolder = new Holder<>(); - - try { - QuartzSchedulerMBean mbeanProxy = getSchedulerBean(node, connectorHolder, result); - if (mbeanProxy != null) { - try { - mbeanProxy.interruptJob(oid, Scheduler.DEFAULT_GROUP); - LOGGER.debug("Successfully signalled shutdown to task " + oid + " running at " + getClusterManager().dumpNodeInfo(node)); - result.recordSuccessIfUnknown(); - } catch (Exception e) { // necessary because of mbeanProxy - String message = "Cannot signal task "+oid+" interruption to remote node "+nodeName+" at "+address; - LoggingUtils.logUnexpectedException(LOGGER, message, e); - result.recordFatalError(message + ":" + e.getMessage(), e); - } - } - } finally { - closeJmxConnection(connectorHolder, address); - } - } - - private void closeJmxConnection(Holder connectorHolder, String nodeInfo) { - try { - if (!connectorHolder.isEmpty()) { - connectorHolder.getValue().close(); - } - } catch (IOException e) { - LoggingUtils.logUnexpectedException(LOGGER, "Cannot close JMX connection to {}", e, nodeInfo); - } - } - - private QuartzSchedulerMBean getSchedulerBean(NodeType node, Holder connectorHolder, - OperationResult result) { - String nodeName = node.getNodeIdentifier(); - String address = node.getHostname() + ":" + node.getJmxPort(); - try { - JMXConnector connector = connectViaJmx(address); - connectorHolder.setValue(connector); - MBeanServerConnection serverConnection = connector.getMBeanServerConnection(); - QuartzSchedulerMBean bean = getMBeanProxy(nodeName, serverConnection); - if (bean == null) { - String message = "Cannot connect to the Quartz Scheduler bean at remote node " + nodeName + " at " - + address + " because the JMX object for scheduler cannot be found on that node."; - LOGGER.warn("{}", message); - result.recordFatalError(message); - } - return bean; - } catch (IOException|MalformedObjectNameException e) { - LoggingUtils.logUnexpectedException(LOGGER, "Cannot connect to the quartz scheduler bean at remote node {} at {}", e, nodeName, address); - result.recordFatalError("Cannot connect to the quartz scheduler bean at remote node " + nodeName + " at " + address + ": " + e.getMessage(), e); - return null; - } + LOGGER.debug("Interrupting task {} running at {}", oid, getClusterManager().dumpNodeInfo(node)); + try { + if (taskManager.getConfiguration().isUseJmx()) { + jmxConnector.stopRemoteTaskRun(oid, node, result); + } else { + restConnector.stopRemoteTask(oid, node, result); + } + result.computeStatus(); + } catch (Throwable t) { + result.recordFatalError("Couldn't stop task running on remote node", t); + // todo log the exception? + } } private ClusterManager getClusterManager() { return taskManager.getClusterManager(); } - public void redirectTaskToNode(@NotNull Task task, @NotNull NodeType node, @NotNull OperationResult result) { - LOGGER.trace("Trying to schedule task {} on {}", task, node.getNodeIdentifier()); - Holder connectorHolder = new Holder<>(); - try { - QuartzSchedulerMBean mbeanProxy = getSchedulerBean(node, connectorHolder, result); - if (mbeanProxy != null) { - try { - createStarterJobIfNeeded(); - mbeanProxy.triggerJob(STARTER_JOB_KEY.getName(), STARTER_JOB_KEY.getGroup(), - Collections.singletonMap(JobStarter.TASK_OID, task.getOid())); - LOGGER.debug("Successfully requested start of " + task + " at " + getClusterManager().dumpNodeInfo(node)); - result.recordSuccessIfUnknown(); - } catch (Exception e) { // necessary because of mbeanProxy - String message = "Cannot schedule " + task + " at " + getClusterManager().dumpNodeInfo(node); - LoggingUtils.logUnexpectedException(LOGGER, message, e); - result.recordFatalError(message + ":" + e.getMessage(), e); - } - } else { - LOGGER.warn("Couldn't obtain Quartz MBean so couldn't reschedule task {} on {}", task, node.getNodeIdentifier()); - } - } finally { - closeJmxConnection(connectorHolder, getClusterManager().dumpNodeInfo(node)); - } - } - - private void createStarterJobIfNeeded() { - Scheduler scheduler = taskManager.getExecutionManager().getQuartzScheduler(); - try { - if (!scheduler.checkExists(STARTER_JOB_KEY)) { - JobDetail starterJob = JobBuilder.newJob(JobStarter.class) - .withIdentity(STARTER_JOB_KEY) - .storeDurably() - .build(); - scheduler.addJob(starterJob, true); - } - } catch (SchedulerException e) { - throw new SystemException("Starter job couldn't be created", e); - } - } - } diff --git a/repo/task-quartz-impl/src/main/java/com/evolveum/midpoint/task/quartzimpl/execution/remote/JmxConnector.java b/repo/task-quartz-impl/src/main/java/com/evolveum/midpoint/task/quartzimpl/execution/remote/JmxConnector.java new file mode 100644 index 00000000000..1b320a44c03 --- /dev/null +++ b/repo/task-quartz-impl/src/main/java/com/evolveum/midpoint/task/quartzimpl/execution/remote/JmxConnector.java @@ -0,0 +1,360 @@ +/* + * Copyright (c) 2010-2019 Evolveum + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.evolveum.midpoint.task.quartzimpl.execution.remote; + +import com.evolveum.midpoint.schema.result.OperationResult; +import com.evolveum.midpoint.schema.result.OperationResultStatus; +import com.evolveum.midpoint.task.quartzimpl.TaskManagerConfiguration; +import com.evolveum.midpoint.task.quartzimpl.TaskManagerQuartzImpl; +import com.evolveum.midpoint.task.quartzimpl.cluster.ClusterManager; +import com.evolveum.midpoint.task.quartzimpl.cluster.ClusterStatusInformation; +import com.evolveum.midpoint.task.quartzimpl.execution.ExecutionManager; +import com.evolveum.midpoint.task.quartzimpl.execution.JmxClient; +import com.evolveum.midpoint.util.Holder; +import com.evolveum.midpoint.util.exception.ObjectNotFoundException; +import com.evolveum.midpoint.util.logging.LoggingUtils; +import com.evolveum.midpoint.util.logging.Trace; +import com.evolveum.midpoint.util.logging.TraceManager; +import com.evolveum.midpoint.xml.ns._public.common.common_3.NodeExecutionStatusType; +import com.evolveum.midpoint.xml.ns._public.common.common_3.NodeType; +import org.quartz.Scheduler; +import org.quartz.core.jmx.QuartzSchedulerMBean; + +import javax.management.JMX; +import javax.management.MBeanServerConnection; +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.TabularData; +import javax.management.remote.JMXConnector; +import javax.management.remote.JMXServiceURL; +import java.io.IOException; +import java.util.*; +import java.util.concurrent.TimeUnit; + +/** + * Manages remote nodes using JMX. + * + * @author Pavol Mederly + */ + +public class JmxConnector { + + private static final transient Trace LOGGER = TraceManager.getTrace(JmxConnector.class); + + private TaskManagerQuartzImpl taskManager; + + public JmxConnector(TaskManagerQuartzImpl taskManager) { + this.taskManager = taskManager; + } + + public void addNodeStatusUsingJmx(ClusterStatusInformation info, NodeType nodeInfo, OperationResult result) { + String nodeIdentifier = nodeInfo.getNodeIdentifier(); + String address = nodeInfo.getHostname() + ":" + nodeInfo.getJmxPort(); + JMXConnector connector = null; + + try { + MBeanServerConnection mbsc; + + try { + connector = connectViaJmx(address); + mbsc = connector.getMBeanServerConnection(); + } catch (IOException e) { + LoggingUtils + .logUnexpectedException(LOGGER, "Cannot connect to the remote node {} at {}", e, nodeIdentifier, address); + result.recordWarning("Cannot connect to the remote node " + nodeIdentifier + " at " + address + ": " + e.getMessage(), e); + nodeInfo.setExecutionStatus(NodeExecutionStatusType.COMMUNICATION_ERROR); + nodeInfo.setConnectionResult(result.createOperationResultType()); + info.addNodeInfo(nodeInfo); + return; + } + + try { + + QuartzSchedulerMBean mbeanProxy = getMBeanProxy(nodeIdentifier, mbsc); + + boolean running = false, down = true; + if (mbeanProxy != null) { + try { + running = mbeanProxy.isStarted() && !mbeanProxy.isShutdown() && !mbeanProxy.isStandbyMode(); + down = mbeanProxy.isShutdown(); + } catch (Exception e) { // was: InstanceNotFoundException but it does not seem to work + String message = "Cannot get information from scheduler " + nodeIdentifier + " because it does not exist or is shut down."; + LoggingUtils.logUnexpectedException(LOGGER, message, e); + result.recordWarning(message, e); + nodeInfo.setConnectionResult(result.createOperationResultType()); + } + } else { + result.recordWarning("Cannot get information from node " + nodeIdentifier + " at " + address + " because the JMX object for scheduler cannot be found on that node."); + nodeInfo.setConnectionResult(result.createOperationResultType()); + } + + LOGGER.trace(" - scheduler found = " + (mbeanProxy != null) + ", running = " + running + ", shutdown = " + down); + + if (down) { + nodeInfo.setExecutionStatus(NodeExecutionStatusType.ERROR); // this is a mark of error situation (we expect that during ordinary shutdown the node quickly goes down so there is little probability of getting this status on that occasion) + } else if (running) { + nodeInfo.setExecutionStatus(NodeExecutionStatusType.RUNNING); + } else { + nodeInfo.setExecutionStatus(NodeExecutionStatusType.PAUSED); + } + + List taskInfoList = new ArrayList<>(); + if (mbeanProxy != null) { + TabularData jobs = mbeanProxy.getCurrentlyExecutingJobs(); + for (CompositeData job : (Collection) jobs.values()) { + String oid = (String) job.get("jobName"); + LOGGER.trace(" - task oid = " + oid); + taskInfoList.add(new ClusterStatusInformation.TaskInfo(oid)); + } + } + + if (result.isUnknown()) { + result.recordStatus(OperationResultStatus.SUCCESS, "Node " + nodeIdentifier + ": status = " + nodeInfo.getExecutionStatus() + ", # of running tasks: " + taskInfoList.size()); + } + info.addNodeAndTaskInfo(nodeInfo, taskInfoList); + } catch (Exception e) { // unfortunately, mbeanProxy.getCurrentlyExecutingJobs is declared to throw an Exception + LoggingUtils.logUnexpectedException(LOGGER, "Cannot get information from the remote node {} at {}", e, nodeIdentifier, address); + result.recordWarning("Cannot get information from the remote node " + nodeIdentifier + " at " + address + ": " + e.getMessage(), e); + nodeInfo.setExecutionStatus(NodeExecutionStatusType.COMMUNICATION_ERROR); + nodeInfo.setConnectionResult(result.createOperationResultType()); + info.addNodeInfo(nodeInfo); + } + } + finally { + try { + if (connector != null) { + connector.close(); + } + } catch (IOException e) { + LoggingUtils.logUnexpectedException(LOGGER, "Cannot close JMX connection to {}", e, address); + } + result.recordSuccessIfUnknown(); // TODO - ok? + } + } + + private NodeType getNode(String nodeIdentifier, OperationResult result) { + try { + return taskManager.getClusterManager().getNodeById(nodeIdentifier, result).asObjectable(); + } catch (ObjectNotFoundException e) { + result.recordFatalError("A node with identifier " + nodeIdentifier + " does not exist."); + return null; + } + } + + public void stopRemoteScheduler(NodeType node, OperationResult result) { + + String nodeName = node.getNodeIdentifier(); + String address = node.getHostname() + ":" + node.getJmxPort(); + + JMXConnector connector = null; + + try { + MBeanServerConnection mbsc; + + try { + connector = connectViaJmx(address); + mbsc = connector.getMBeanServerConnection(); + } catch (IOException e) { + LoggingUtils.logUnexpectedException(LOGGER, "Cannot connect to the remote node {} at {}", e, nodeName, address); + result.recordFatalError("Cannot connect to the remote node " + nodeName + " at " + address + ": " + e.getMessage(), e); + return; + } + + try { + QuartzSchedulerMBean mbeanProxy = getMBeanProxy(nodeName, mbsc); + if (mbeanProxy != null) { + mbeanProxy.standby(); + result.recordSuccess(); + } else { + result.recordWarning("Cannot stop the scheduler on node " + nodeName + " at " + address + " because the JMX object for scheduler cannot be found on that node."); + } + } + catch (Exception e) { + LoggingUtils.logUnexpectedException(LOGGER, "Cannot put remote scheduler into standby mode; remote node {} at {}", e, nodeName, address); + result.recordFatalError("Cannot put remote scheduler " + nodeName + " at " + address + " into standby mode: " + e.getMessage()); + } + } + finally { + try { + if (connector != null) { + connector.close(); + } + } catch (IOException e) { + LoggingUtils.logUnexpectedException(LOGGER, "Cannot close JMX connection to {}", e, address); + } + } + + } + + public void startRemoteScheduler(NodeType node, OperationResult result) { + + String nodeName = node.getNodeIdentifier(); + String address = node.getHostname() + ":" + node.getJmxPort(); + + JMXConnector connector = null; + + try { + MBeanServerConnection mbsc; + + try { + connector = connectViaJmx(address); + mbsc = connector.getMBeanServerConnection(); + } catch (IOException e) { + LoggingUtils.logUnexpectedException(LOGGER, "Cannot connect to the remote node {} at {}", e, nodeName, address); + result.recordFatalError("Cannot connect to the remote node " + nodeName + " at " + address + ": " + e.getMessage(), e); + return; + } + + try { + QuartzSchedulerMBean mbeanProxy = getMBeanProxy(nodeName, mbsc); + if (mbeanProxy != null) { + mbeanProxy.start(); + result.recordSuccessIfUnknown(); + } else { + result.recordFatalError("Cannot start remote scheduler " + nodeName + " at " + address + " because it cannot be found on that node."); + } + } + catch (Exception e) { + LoggingUtils.logUnexpectedException(LOGGER, "Cannot start remote scheduler; remote node {} at {}", e, nodeName, address); + result.recordFatalError("Cannot start remote scheduler " + nodeName + " at " + address + ": " + e.getMessage()); + } + + } + finally { + try { + if (connector != null) { + connector.close(); + } + } catch (IOException e) { + LoggingUtils.logUnexpectedException(LOGGER, "Cannot close JMX connection to {}", e, address); + } + } + + } + + private QuartzSchedulerMBean getMBeanProxy(String nodeName, MBeanServerConnection mbsc) throws MalformedObjectNameException { + String mbeanNameAsString = "quartz:type=QuartzScheduler,name=midPointScheduler,instance=" + nodeName; + ObjectName mbeanName = new ObjectName(mbeanNameAsString); + + try { + if (mbsc.isRegistered(mbeanName)) { + return JMX.newMBeanProxy(mbsc, mbeanName, QuartzSchedulerMBean.class, true); + } else { + LOGGER.trace("MBean " + mbeanNameAsString + " is not registered at " + nodeName); + return null; + } + } catch (IOException e) { + LoggingUtils.logUnexpectedException(LOGGER, "Cannot communicate with remote node via JMX", e); + return null; + } + } + + private JMXConnector connectViaJmx(String address) throws IOException { + + JMXServiceURL url = + new JMXServiceURL("service:jmx:rmi:///jndi/rmi://" + address + "/jmxrmi"); + + Map env = new HashMap<>(); + String jmxUsername = taskManager.getConfiguration().getJmxUsername(); + String jmxPassword = taskManager.getConfiguration().getJmxPassword(); + if (jmxUsername != null || jmxPassword != null) { + String[] creds = { jmxUsername, jmxPassword }; + env.put(JMXConnector.CREDENTIALS, creds); + } + return JmxClient.connectWithTimeout(url, env, + taskManager.getConfiguration().getJmxConnectTimeout(), TimeUnit.SECONDS); + } + + private TaskManagerConfiguration getConfiguration() { + return taskManager.getConfiguration(); + } + + private ExecutionManager getGlobalExecutionManager() { + return taskManager.getExecutionManager(); + } + + // the task should be really running + public void stopRemoteTaskRun(String oid, NodeType node, OperationResult parentResult) { + + OperationResult result = parentResult.createSubresult(JmxConnector.class.getName() + ".stopRemoteTaskRun"); + result.addParam("oid", oid); + result.addParam("node", node.toString()); + + LOGGER.debug("Interrupting task " + oid + " running at " + getClusterManager().dumpNodeInfo(node)); + + String nodeName = node.getNodeIdentifier(); + String address = node.getHostname() + ":" + node.getJmxPort(); + + Holder connectorHolder = new Holder<>(); + + try { + QuartzSchedulerMBean mbeanProxy = getSchedulerBean(node, connectorHolder, result); + if (mbeanProxy != null) { + try { + mbeanProxy.interruptJob(oid, Scheduler.DEFAULT_GROUP); + LOGGER.debug("Successfully signalled shutdown to task " + oid + " running at " + getClusterManager().dumpNodeInfo(node)); + result.recordSuccessIfUnknown(); + } catch (Exception e) { // necessary because of mbeanProxy + String message = "Cannot signal task "+oid+" interruption to remote node "+nodeName+" at "+address; + LoggingUtils.logUnexpectedException(LOGGER, message, e); + result.recordFatalError(message + ":" + e.getMessage(), e); + } + } + } finally { + closeJmxConnection(connectorHolder, address); + } + } + + private void closeJmxConnection(Holder connectorHolder, String nodeInfo) { + try { + if (!connectorHolder.isEmpty()) { + connectorHolder.getValue().close(); + } + } catch (IOException e) { + LoggingUtils.logUnexpectedException(LOGGER, "Cannot close JMX connection to {}", e, nodeInfo); + } + } + + private QuartzSchedulerMBean getSchedulerBean(NodeType node, Holder connectorHolder, + OperationResult result) { + String nodeName = node.getNodeIdentifier(); + String address = node.getHostname() + ":" + node.getJmxPort(); + try { + JMXConnector connector = connectViaJmx(address); + connectorHolder.setValue(connector); + MBeanServerConnection serverConnection = connector.getMBeanServerConnection(); + QuartzSchedulerMBean bean = getMBeanProxy(nodeName, serverConnection); + if (bean == null) { + String message = "Cannot connect to the Quartz Scheduler bean at remote node " + nodeName + " at " + + address + " because the JMX object for scheduler cannot be found on that node."; + LOGGER.warn("{}", message); + result.recordFatalError(message); + } + return bean; + } catch (IOException|MalformedObjectNameException e) { + LoggingUtils.logUnexpectedException(LOGGER, "Cannot connect to the quartz scheduler bean at remote node {} at {}", e, nodeName, address); + result.recordFatalError("Cannot connect to the quartz scheduler bean at remote node " + nodeName + " at " + address + ": " + e.getMessage(), e); + return null; + } + } + + private ClusterManager getClusterManager() { + return taskManager.getClusterManager(); + } + +} diff --git a/repo/task-quartz-impl/src/main/java/com/evolveum/midpoint/task/quartzimpl/execution/remote/RestConnector.java b/repo/task-quartz-impl/src/main/java/com/evolveum/midpoint/task/quartzimpl/execution/remote/RestConnector.java new file mode 100644 index 00000000000..ab2d3cbca08 --- /dev/null +++ b/repo/task-quartz-impl/src/main/java/com/evolveum/midpoint/task/quartzimpl/execution/remote/RestConnector.java @@ -0,0 +1,149 @@ +/* + * Copyright (c) 2010-2019 Evolveum + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.evolveum.midpoint.task.quartzimpl.execution.remote; + +import com.evolveum.midpoint.schema.result.OperationResult; +import com.evolveum.midpoint.task.api.RemoteExecutionHelper; +import com.evolveum.midpoint.task.quartzimpl.TaskManagerConfiguration; +import com.evolveum.midpoint.task.quartzimpl.TaskManagerQuartzImpl; +import com.evolveum.midpoint.task.quartzimpl.cluster.ClusterManager; +import com.evolveum.midpoint.task.quartzimpl.cluster.ClusterStatusInformation; +import com.evolveum.midpoint.task.quartzimpl.execution.ExecutionManager; +import com.evolveum.midpoint.util.exception.ObjectNotFoundException; +import com.evolveum.midpoint.util.exception.SchemaException; +import com.evolveum.midpoint.util.logging.LoggingUtils; +import com.evolveum.midpoint.util.logging.Trace; +import com.evolveum.midpoint.util.logging.TraceManager; +import com.evolveum.midpoint.xml.ns._public.common.common_3.NodeType; +import com.evolveum.midpoint.xml.ns._public.common.common_3.SchedulerInformationType; + +import javax.ws.rs.core.Response; + +/** + * Manages remote nodes using REST. + */ +public class RestConnector { + + private static final transient Trace LOGGER = TraceManager.getTrace(RestConnector.class); + + private TaskManagerQuartzImpl taskManager; + + public RestConnector(TaskManagerQuartzImpl taskManager) { + this.taskManager = taskManager; + } + + public void addNodeStatus(ClusterStatusInformation info, NodeType nodeInfo, OperationResult result) { + RemoteExecutionHelper remoteExecutionHelper = taskManager.getRemoteExecutionHelper(); + remoteExecutionHelper.execute(nodeInfo, (client, result1) -> { + client.path("/scheduler/information"); + Response response = client.get(); + Response.StatusType statusInfo = response.getStatusInfo(); + LOGGER.debug("Querying remote scheduler information on {} finished with status {}: {}", nodeInfo.getNodeIdentifier(), + statusInfo.getStatusCode(), statusInfo.getReasonPhrase()); + if (statusInfo.getFamily() == Response.Status.Family.SUCCESSFUL) { + try { + SchedulerInformationType schedulerInfo = remoteExecutionHelper + .extractResult(response, SchedulerInformationType.class); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Received from {}:\n{}", nodeInfo.getNodeIdentifier(), + taskManager.getPrismContext().xmlSerializer().serializeRealValue(schedulerInfo)); + } + info.addNodeAndTaskInfo(schedulerInfo); + } catch (SchemaException e) { + LoggingUtils.logUnexpectedException(LOGGER, "Couldn't parse scheduler information from remote node {}", e, nodeInfo.getNodeIdentifier()); + } + } else { + LOGGER.warn("Querying remote scheduler information on {} finished with status {}: {}", nodeInfo.getNodeIdentifier(), + statusInfo.getStatusCode(), statusInfo.getReasonPhrase()); + } + response.close(); + }, "get scheduler information", result); + } + + private NodeType getNode(String nodeIdentifier, OperationResult result) { + try { + return taskManager.getClusterManager().getNodeById(nodeIdentifier, result).asObjectable(); + } catch (ObjectNotFoundException e) { + result.recordFatalError("A node with identifier " + nodeIdentifier + " does not exist."); + return null; + } + } + + public void stopRemoteScheduler(NodeType node, OperationResult result) { + RemoteExecutionHelper remoteExecutionHelper = taskManager.getRemoteExecutionHelper(); + remoteExecutionHelper.execute(node, (client, result1) -> { + client.path("/scheduler/stop"); + Response response = client.post(null); + Response.StatusType statusInfo = response.getStatusInfo(); + LOGGER.debug("Stopping remote scheduler on {} finished with status {}: {}", node.getNodeIdentifier(), + statusInfo.getStatusCode(), statusInfo.getReasonPhrase()); + if (statusInfo.getFamily() != Response.Status.Family.SUCCESSFUL) { + LOGGER.warn("Stopping scheduler on {} finished with status {}: {}", node.getNodeIdentifier(), + statusInfo.getStatusCode(), statusInfo.getReasonPhrase()); + result1.recordFatalError("Stopping remote scheduler finished with status " + statusInfo.getStatusCode() + ": " + statusInfo.getReasonPhrase()); + } + response.close(); + }, "stop scheduler", result); + } + + public void startRemoteScheduler(NodeType node, OperationResult result) { + RemoteExecutionHelper remoteExecutionHelper = taskManager.getRemoteExecutionHelper(); + remoteExecutionHelper.execute(node, (client, result1) -> { + client.path("/scheduler/start"); + Response response = client.post(null); + Response.StatusType statusInfo = response.getStatusInfo(); + LOGGER.debug("Starting remote scheduler on {} finished with status {}: {}", node.getNodeIdentifier(), + statusInfo.getStatusCode(), statusInfo.getReasonPhrase()); + if (statusInfo.getFamily() != Response.Status.Family.SUCCESSFUL) { + LOGGER.warn("Starting scheduler on {} finished with status {}: {}", node.getNodeIdentifier(), + statusInfo.getStatusCode(), statusInfo.getReasonPhrase()); + result1.recordFatalError("Starting remote scheduler finished with status " + statusInfo.getStatusCode() + ": " + statusInfo.getReasonPhrase()); + } + response.close(); + }, "start scheduler", result); + } + + public void stopRemoteTask(String oid, NodeType node, OperationResult result) { + RemoteExecutionHelper remoteExecutionHelper = taskManager.getRemoteExecutionHelper(); + remoteExecutionHelper.execute(node, (client, result1) -> { + client.path("/tasks/" + oid + "/stop"); + Response response = client.post(null); + Response.StatusType statusInfo = response.getStatusInfo(); + LOGGER.debug("Stopping task {} on {} finished with status {}: {}", oid, node.getNodeIdentifier(), + statusInfo.getStatusCode(), statusInfo.getReasonPhrase()); + if (statusInfo.getFamily() != Response.Status.Family.SUCCESSFUL) { + LOGGER.warn("Stopping task {} on {} finished with status {}: {}", oid, node.getNodeIdentifier(), + statusInfo.getStatusCode(), statusInfo.getReasonPhrase()); + result1.recordFatalError("Stopping remote task finished with status " + statusInfo.getStatusCode() + ": " + statusInfo.getReasonPhrase()); + } + response.close(); + }, "stop task", result); + } + + private TaskManagerConfiguration getConfiguration() { + return taskManager.getConfiguration(); + } + + private ExecutionManager getGlobalExecutionManager() { + return taskManager.getExecutionManager(); + } + + private ClusterManager getClusterManager() { + return taskManager.getClusterManager(); + } + +} diff --git a/samples/tasks/task-scheduling.xml b/samples/tasks/task-scheduling.xml index ea3c0cf3ea6..adbf164527b 100644 --- a/samples/tasks/task-scheduling.xml +++ b/samples/tasks/task-scheduling.xml @@ -132,7 +132,7 @@ these tasks via admin GUI. 20000 3 - 10000000-0000-0000-0000-123450000005 + 10000000-0000-0000-0000-123450000006 runnable diff --git a/testing/sanity/src/test/java/com/evolveum/midpoint/testing/sanity/TestSanity.java b/testing/sanity/src/test/java/com/evolveum/midpoint/testing/sanity/TestSanity.java index 7e96b39da37..3cf1ca7cff1 100644 --- a/testing/sanity/src/test/java/com/evolveum/midpoint/testing/sanity/TestSanity.java +++ b/testing/sanity/src/test/java/com/evolveum/midpoint/testing/sanity/TestSanity.java @@ -3975,12 +3975,8 @@ public void test999Shutdown() throws Exception { taskManager.shutdown(); waitFor("waiting for task manager shutdown", new Checker() { @Override - public boolean check() throws CommonException { - try { - return taskManager.getLocallyRunningTasks(new OperationResult("dummy")).isEmpty(); - } catch (TaskManagerException e) { - throw new SystemException(e); - } + public boolean check() { + return taskManager.getLocallyRunningTasks(new OperationResult("dummy")).isEmpty(); } @Override