Skip to content

Commit

Permalink
Add REST support for task manager communication
Browse files Browse the repository at this point in the history
This is a preliminary implementation of MID-5058.

Major changes:
1) task manager communication can be switched between JMX and REST
2) intra-cluster authentication is now protected by a node-level secret
3) RemoteExecutor is a helper class to enable making REST intra-cluster
calls

Major TODOs:
- separate internally used functionality from the main REST service
- reduce amount of information sent between nodes
  • Loading branch information
mederly committed Feb 1, 2019
1 parent 8a1dc80 commit 4c453ad
Show file tree
Hide file tree
Showing 35 changed files with 1,388 additions and 549 deletions.
Expand Up @@ -141,7 +141,8 @@ private boolean checkRequest(HttpServletRequest request, HttpServletResponse res
return false;
}

if (!nodeAuthenticator.authenticate(request.getRemoteHost(), request.getRemoteAddr(), operation)) {
// we temporarily allow authentication without credentials
if (!nodeAuthenticator.authenticate(request.getRemoteHost(), request.getRemoteAddr(), null, operation)) {
LOGGER.debug("Unknown node, host: {} ", request.getRemoteHost());
response.setStatus(HttpServletResponse.SC_FORBIDDEN);
return false;
Expand Down
Expand Up @@ -2002,7 +2002,7 @@
infrastructure/intraClusterHttpUrlPattern system configuration property.

Use only if really necessary. Support for this item is currently limited
to clusterwide cache invalidation.
to clusterwide cache invalidation and managing node task scheduler.
</xsd:documentation>
</xsd:annotation>
</xsd:element>
Expand All @@ -2013,6 +2013,16 @@
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="restPort" type="xsd:int" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
Port at which this node can be contacted via REST.
</xsd:documentation>
<xsd:appinfo>
<a:since>4.0</a:since>
</xsd:appinfo>
</xsd:annotation>
</xsd:element>
<xsd:element name="jmxPort" type="xsd:int" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
Expand Down Expand Up @@ -2084,6 +2094,26 @@
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="secret" type="t:ProtectedStringType" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
The secret used for intra-cluster authentication.
</xsd:documentation>
<xsd:appinfo>
<a:since>4.0</a:since>
</xsd:appinfo>
</xsd:annotation>
</xsd:element>
<xsd:element name="secretUpdateTimestamp" type="xsd:dateTime" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
When was the secret created or last changed.
</xsd:documentation>
<xsd:appinfo>
<a:since>4.0</a:since>
</xsd:appinfo>
</xsd:annotation>
</xsd:element>
<xsd:element name="taskExecutionLimitations" type="tns:TaskExecutionLimitationsType" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
Expand Down Expand Up @@ -3617,6 +3647,34 @@
</xsd:sequence>
</xsd:complexType>

<xsd:complexType name="SchedulerInformationType">
<xsd:annotation>
<xsd:documentation>
Describes the state of the (local) scheduler.
</xsd:documentation>
<xsd:appinfo>
<a:since>4.0</a:since>
</xsd:appinfo>
</xsd:annotation>
<xsd:sequence>
<xsd:element name="node" type="tns:NodeType" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
Information on the current node.
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="executingTask" type="tns:TaskType" minOccurs="0" maxOccurs="unbounded">
<xsd:annotation>
<xsd:documentation>
Locally executing tasks.
</xsd:documentation>
</xsd:annotation>
</xsd:element>
</xsd:sequence>
</xsd:complexType>
<xsd:element name="schedulerInformation" type="tns:SchedulerInformationType"/>

<xsd:complexType name="AssignmentHolderType" abstract="true">
<xsd:annotation>
<xsd:documentation>
Expand Down
Expand Up @@ -21,6 +21,7 @@
import com.evolveum.midpoint.schema.result.OperationResult;
import com.evolveum.midpoint.task.api.Task;
import com.evolveum.midpoint.util.exception.*;
import com.evolveum.midpoint.xml.ns._public.common.common_3.SchedulerInformationType;
import com.evolveum.midpoint.xml.ns._public.common.common_3.TaskType;

import java.util.Collection;
Expand Down Expand Up @@ -101,6 +102,9 @@ public interface TaskService {
* @return
*/
PrismObject<TaskType> getTaskByIdentifier(String identifier, Collection<SelectorOptions<GetOperationOptions>> options, Task operationTask, OperationResult parentResult) throws SchemaException, ObjectNotFoundException, ConfigurationException, SecurityViolationException, ExpressionEvaluationException, CommunicationException;

SchedulerInformationType getLocalSchedulerInformation(Task operationTask, OperationResult parentResult) throws CommunicationException,
ObjectNotFoundException, SchemaException, SecurityViolationException, ConfigurationException, ExpressionEvaluationException;
//endregion

//region Node-level operations
Expand Down Expand Up @@ -142,6 +146,8 @@ public interface TaskService {
*/
void stopSchedulers(Collection<String> nodeIdentifiers, Task operationTask, OperationResult parentResult) throws SecurityViolationException, ObjectNotFoundException, SchemaException, ExpressionEvaluationException, CommunicationException, ConfigurationException;

void stopLocalScheduler(Task operationTask, OperationResult parentResult) throws SecurityViolationException, ObjectNotFoundException, SchemaException, ExpressionEvaluationException, CommunicationException, ConfigurationException;

/**
* Stops a set of schedulers (on their nodes) and tasks that are executing on these nodes.
*
Expand All @@ -163,6 +169,10 @@ public interface TaskService {
* @return true if the operation succeeded; false otherwise.
*/
void startSchedulers(Collection<String> nodeIdentifiers, Task operationTask, OperationResult result) throws SecurityViolationException, ObjectNotFoundException, SchemaException, ExpressionEvaluationException, CommunicationException, ConfigurationException;

void startLocalScheduler(Task operationTask, OperationResult result) throws SecurityViolationException, ObjectNotFoundException, SchemaException, ExpressionEvaluationException, CommunicationException, ConfigurationException;

void stopLocalTask(String oid, Task operationTask, OperationResult result) throws SecurityViolationException, ObjectNotFoundException, SchemaException, ExpressionEvaluationException, CommunicationException, ConfigurationException;
//endregion

//region Miscellaneous
Expand Down
Expand Up @@ -2,6 +2,6 @@

public interface NodeAuthenticationEvaluator {

boolean authenticate(String remoteName, String remoteAddress, String operation);
boolean authenticate(String remoteName, String remoteAddress, String credentials, String operation);

}
6 changes: 5 additions & 1 deletion model/model-common/pom.xml
Expand Up @@ -160,7 +160,11 @@
<groupId>org.jetbrains</groupId>
<artifactId>annotations-java5</artifactId>
</dependency>

<dependency>
<groupId>javax.annotation</groupId>
<artifactId>javax.annotation-api</artifactId>
</dependency>

<!-- Testing dependecies -->
<dependency>
<groupId>org.testng</groupId>
Expand Down
Expand Up @@ -15,52 +15,33 @@
*/
package com.evolveum.midpoint.model.impl;

import javax.annotation.PostConstruct;
import javax.ws.rs.core.Response;

import org.apache.commons.lang.StringUtils;
import org.apache.cxf.jaxrs.client.WebClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.security.core.Authentication;
import org.springframework.security.core.context.SecurityContextHolder;
import org.springframework.stereotype.Component;

import com.evolveum.midpoint.model.api.ModelInteractionService;
import com.evolveum.midpoint.model.api.ModelService;
import com.evolveum.midpoint.model.impl.security.NodeAuthenticationToken;
import com.evolveum.midpoint.model.impl.security.RestAuthenticationMethod;
import com.evolveum.midpoint.prism.PrismContext;
import com.evolveum.midpoint.prism.PrismObject;
import com.evolveum.midpoint.prism.query.ObjectQuery;
import com.evolveum.midpoint.repo.api.CacheDispatcher;
import com.evolveum.midpoint.repo.api.CacheListener;
import com.evolveum.midpoint.schema.SearchResultList;
import com.evolveum.midpoint.schema.constants.ObjectTypes;
import com.evolveum.midpoint.schema.result.OperationResult;
import com.evolveum.midpoint.task.api.RemoteExecutionHelper;
import com.evolveum.midpoint.task.api.Task;
import com.evolveum.midpoint.task.api.TaskManager;
import com.evolveum.midpoint.util.exception.CommunicationException;
import com.evolveum.midpoint.util.exception.ConfigurationException;
import com.evolveum.midpoint.util.exception.ExpressionEvaluationException;
import com.evolveum.midpoint.util.exception.ObjectNotFoundException;
import com.evolveum.midpoint.util.exception.SchemaException;
import com.evolveum.midpoint.util.exception.SecurityViolationException;
import com.evolveum.midpoint.util.logging.Trace;
import com.evolveum.midpoint.util.logging.TraceManager;
import com.evolveum.midpoint.xml.ns._public.common.common_3.NodeType;
import com.evolveum.midpoint.xml.ns._public.common.common_3.ObjectType;
import com.evolveum.midpoint.xml.ns._public.common.common_3.SystemConfigurationType;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.security.core.Authentication;
import org.springframework.security.core.context.SecurityContextHolder;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.ws.rs.core.Response;

@Component
public class ClusterCacheListener implements CacheListener {

private static final Trace LOGGER = TraceManager.getTrace(ClusterCacheListener.class);

@Autowired private ModelService modelService;
@Autowired private ModelInteractionService modelInteractionService;
@Autowired private TaskManager taskManager;
@Autowired private CacheDispatcher cacheDispatcher;
@Autowired private PrismContext prismContext;
@Autowired private RemoteExecutionHelper remoteExecutionHelper;

@PostConstruct
public void addListener() {
Expand All @@ -75,8 +56,6 @@ public <O extends ObjectType> void invalidateCache(Class<O> type, String oid) {
return;
}

String nodeId = taskManager.getNodeId();

Authentication authentication = SecurityContextHolder.getContext().getAuthentication();
if (authentication instanceof NodeAuthenticationToken) {
LOGGER.trace("Skipping cluster-wide cache invalidation as this is already a remotely-invoked invalidateCache() call");
Expand All @@ -85,51 +64,13 @@ public <O extends ObjectType> void invalidateCache(Class<O> type, String oid) {

Task task = taskManager.createTaskInstance("invalidateCache");
OperationResult result = task.getResult();

SearchResultList<PrismObject<NodeType>> otherClusterNodes;
try {
ObjectQuery query = prismContext.queryFor(NodeType.class).not().item(NodeType.F_NODE_IDENTIFIER).eq(nodeId).build();
otherClusterNodes = modelService.searchObjects(NodeType.class, query, null, task, result);
} catch (SchemaException | ObjectNotFoundException | SecurityViolationException | CommunicationException
| ConfigurationException | ExpressionEvaluationException e) {
LOGGER.warn("Cannot find nodes for clearing cache on them. Skipping.", e);
return;
}

SystemConfigurationType systemConfig;
try {
systemConfig = modelInteractionService.getSystemConfiguration(result);
} catch (ObjectNotFoundException | SchemaException e) {
LOGGER.warn("Cannot load system configuration. Cannot determine the URL for REST calls without it"
+ " (unless specified explicitly for individual nodes)");
systemConfig = null;
}

for (PrismObject<NodeType> node : otherClusterNodes.getList()) {
NodeType nodeType = node.asObjectable();

String baseUrl;
if (nodeType.getUrl() != null) {
baseUrl = nodeType.getUrl();
} else {
String httpUrlPattern = systemConfig != null && systemConfig.getInfrastructure() != null
? systemConfig.getInfrastructure().getIntraClusterHttpUrlPattern()
: null;
if (StringUtils.isBlank(httpUrlPattern)) {
LOGGER.warn("Node URL nor intra-cluster URL pattern specified, skipping cache clearing for node {}",
nodeType.getNodeIdentifier());
continue;
}
baseUrl = httpUrlPattern.replace("$host", nodeType.getHostname());
}

WebClient client = WebClient.create(baseUrl + "/ws/rest");
client.header("Authorization", RestAuthenticationMethod.CLUSTER.getMethod());// + " " + Base64Utility.encode((nodeIdentifier).getBytes()));
remoteExecutionHelper.execute((client, result1) -> {
client.path("/event/" + ObjectTypes.getRestTypeFromClass(type));
Response response = client.post(null);

LOGGER.info("Cluster-wide cache clearance finished with status {}, {}", response.getStatusInfo().getStatusCode(),
response.getStatusInfo().getReasonPhrase());
}
response.close();
}, "cache invalidation", result);
}
}
Expand Up @@ -105,6 +105,10 @@ public class ModelRestService {
public static final String OPERATION_GENERATE_VALUE_RPC = CLASS_DOT + "generateValueRpc";
public static final String OPERATION_EXECUTE_CREDENTIAL_RESET = CLASS_DOT + "executeCredentialReset";
public static final String OPERATION_EXECUTE_CLUSTER_EVENT = CLASS_DOT + "executeClusterEvent";
public static final String OPERATION_GET_LOCAL_SCHEDULER_INFORMATION = CLASS_DOT + "getLocalSchedulerInformation";
public static final String OPERATION_STOP_LOCAL_SCHEDULER = CLASS_DOT + "stopScheduler";
public static final String OPERATION_START_LOCAL_SCHEDULER = CLASS_DOT + "startScheduler";
public static final String OPERATION_STOP_LOCAL_TASK = CLASS_DOT + "stopLocalTask";

private static final String CURRENT = "current";
private static final String VALIDATE = "validate";
Expand All @@ -118,6 +122,7 @@ public class ModelRestService {
@Autowired private SecurityHelper securityHelper;
@Autowired private ValuePolicyProcessor policyProcessor;
@Autowired private TaskManager taskManager;
@Autowired private TaskService taskService;
@Autowired private Protector protector;
@Autowired private ResourceValidator resourceValidator;

Expand Down Expand Up @@ -1070,6 +1075,86 @@ public Response executeClusterEvent(@PathParam("type") String type, @Context Mes

}

@GET
@Path("/scheduler/information")
@Consumes({MediaType.APPLICATION_XML, MediaType.APPLICATION_JSON, RestServiceUtil.APPLICATION_YAML})
@Produces({MediaType.APPLICATION_XML, MediaType.APPLICATION_JSON, RestServiceUtil.APPLICATION_YAML})
public Response getLocalSchedulerInformation(@Context MessageContext mc) {
Task task = RestServiceUtil.initRequest(mc);
OperationResult result = new OperationResult(OPERATION_GET_LOCAL_SCHEDULER_INFORMATION);

Response response;
try {
SchedulerInformationType schedulerInformation = taskService.getLocalSchedulerInformation(task, result);
response = RestServiceUtil.createResponse(Response.Status.OK, schedulerInformation, result);
} catch (Throwable t) {
response = RestServiceUtil.handleException(result, t);
}
result.computeStatus();
finishRequest(task);
return response;
}

@POST
@Path("/scheduler/stop")
@Consumes({MediaType.APPLICATION_XML, MediaType.APPLICATION_JSON, RestServiceUtil.APPLICATION_YAML})
@Produces({MediaType.APPLICATION_XML, MediaType.APPLICATION_JSON, RestServiceUtil.APPLICATION_YAML})
public Response stopLocalScheduler(@Context MessageContext mc) {
Task task = RestServiceUtil.initRequest(mc);
OperationResult result = new OperationResult(OPERATION_STOP_LOCAL_SCHEDULER);

Response response;
try {
taskService.stopLocalScheduler(task, result);
response = RestServiceUtil.createResponse(Response.Status.OK, result);
} catch (Throwable t) {
response = RestServiceUtil.handleException(result, t);
}
result.computeStatus();
finishRequest(task);
return response;
}

@POST
@Path("/scheduler/start")
@Consumes({MediaType.APPLICATION_XML, MediaType.APPLICATION_JSON, RestServiceUtil.APPLICATION_YAML})
@Produces({MediaType.APPLICATION_XML, MediaType.APPLICATION_JSON, RestServiceUtil.APPLICATION_YAML})
public Response startLocalScheduler(@Context MessageContext mc) {
Task task = RestServiceUtil.initRequest(mc);
OperationResult result = new OperationResult(OPERATION_START_LOCAL_SCHEDULER);

Response response;
try {
taskService.startLocalScheduler(task, result);
response = RestServiceUtil.createResponse(Response.Status.OK, result);
} catch (Throwable t) {
response = RestServiceUtil.handleException(result, t);
}
result.computeStatus();
finishRequest(task);
return response;
}

@POST
@Path("/tasks/{oid}/stop")
@Consumes({MediaType.APPLICATION_XML, MediaType.APPLICATION_JSON, RestServiceUtil.APPLICATION_YAML})
@Produces({MediaType.APPLICATION_XML, MediaType.APPLICATION_JSON, RestServiceUtil.APPLICATION_YAML})
public Response stopLocalTask(@PathParam("oid") String oid, @Context MessageContext mc) {
Task task = RestServiceUtil.initRequest(mc);
OperationResult result = new OperationResult(OPERATION_STOP_LOCAL_TASK);

Response response;
try {
taskService.stopLocalTask(oid, task, result);
response = RestServiceUtil.createResponse(Response.Status.OK, result);
} catch (Throwable t) {
response = RestServiceUtil.handleException(result, t);
}
result.computeStatus();
finishRequest(task);
return response;
}

// @GET
// @Path("tasks/{oid}")
// public Response getTaskByIdentifier(@PathParam("oid") String identifier) throws SchemaException, ObjectNotFoundException {
Expand Down

0 comments on commit 4c453ad

Please sign in to comment.