Skip to content

Commit

Permalink
Add TransportHealthNodeAction (#89127)
Browse files Browse the repository at this point in the history
  • Loading branch information
gmarouli committed Aug 10, 2022
1 parent 341f3b7 commit 399a8ac
Show file tree
Hide file tree
Showing 9 changed files with 673 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.elasticsearch.core.CheckedFunction;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.health.node.action.HealthNodeNotDiscoveredException;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.rest.RestStatus;
Expand Down Expand Up @@ -721,7 +722,7 @@ public static <T extends Throwable> T writeStackTraces(T throwable, StreamOutput
/**
* This is the list of Exceptions Elasticsearch can throw over the wire or save into a corruption marker. Each value in the enum is a
* single exception tying the Class to an id for use of the encode side and the id back to a constructor for use on the decode side. As
* such its ok if the exceptions to change names so long as their constructor can still read the exception. Each exception is listed
* such it's ok if the exceptions to change names so long as their constructor can still read the exception. Each exception is listed
* in id order below. If you want to remove an exception leave a tombstone comment and mark the id as null in
* ExceptionSerializationTests.testIds.ids.
*/
Expand Down Expand Up @@ -1571,6 +1572,12 @@ private enum ElasticsearchExceptionHandle {
org.elasticsearch.snapshots.SnapshotNameAlreadyInUseException::new,
165,
Version.V_8_2_0
),
HEALTH_NODE_NOT_DISCOVERED_EXCEPTION(
HealthNodeNotDiscoveredException.class,
HealthNodeNotDiscoveredException::new,
166,
Version.V_8_5_0
);

final Class<? extends ElasticsearchException> exceptionClass;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.health.node.action;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.rest.RestStatus;

import java.io.IOException;

/**
* Exception which indicates that no health node is selected in this cluster, aka the
* health node persistent task is not assigned.
*/
public class HealthNodeNotDiscoveredException extends ElasticsearchException {

public HealthNodeNotDiscoveredException(String message) {
super(message);
}

public HealthNodeNotDiscoveredException(StreamInput in) throws IOException {
super(in);
}

@Override
public RestStatus status() {
return RestStatus.SERVICE_UNAVAILABLE;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.health.node.action;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.health.node.selection.HealthNode;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService;

import static org.elasticsearch.core.Strings.format;

/**
* A base class for operations that need to be performed on the health node.
*/
public abstract class TransportHealthNodeAction<Request extends ActionRequest, Response extends ActionResponse> extends
HandledTransportAction<Request, Response> {

private static final Logger logger = LogManager.getLogger(TransportHealthNodeAction.class);

protected final TransportService transportService;
protected final ClusterService clusterService;
protected final ThreadPool threadPool;
protected final String executor;

private final Writeable.Reader<Response> responseReader;

protected TransportHealthNodeAction(
String actionName,
TransportService transportService,
ClusterService clusterService,
ThreadPool threadPool,
ActionFilters actionFilters,
Writeable.Reader<Request> request,
Writeable.Reader<Response> response,
String executor
) {
super(actionName, true, transportService, actionFilters, request);
this.transportService = transportService;
this.clusterService = clusterService;
this.threadPool = threadPool;
this.executor = executor;
this.responseReader = response;
}

protected abstract void healthOperation(Task task, Request request, ClusterState state, ActionListener<Response> listener)
throws Exception;

@Override
protected void doExecute(Task task, final Request request, ActionListener<Response> listener) {
ClusterState state = clusterService.state();
logger.trace("starting to process request [{}] with cluster state version [{}]", request, state.version());
if (isTaskCancelled(task)) {
listener.onFailure(new TaskCancelledException("Task was cancelled"));
return;
}
try {
ClusterState clusterState = clusterService.state();
DiscoveryNode healthNode = HealthNode.findHealthNode(clusterState);
DiscoveryNode localNode = clusterState.nodes().getLocalNode();
if (healthNode == null) {
listener.onFailure(new HealthNodeNotDiscoveredException("Health node was null"));
} else if (localNode.getId().equals(healthNode.getId())) {
threadPool.executor(executor).execute(() -> {
try {
if (isTaskCancelled(task)) {
listener.onFailure(new TaskCancelledException("Task was cancelled"));
} else {
healthOperation(task, request, clusterState, listener);
}
} catch (Exception e) {
listener.onFailure(e);
}
});
} else {
logger.trace("forwarding request [{}] to health node [{}]", actionName, healthNode);
ActionListenerResponseHandler<Response> handler = new ActionListenerResponseHandler<>(listener, responseReader) {
@Override
public void handleException(final TransportException exception) {
logger.trace(
() -> format("failure when forwarding request [%s] to health node [%s]", actionName, healthNode),
exception
);
listener.onFailure(exception);
}
};
if (task != null) {
transportService.sendChildRequest(healthNode, actionName, request, task, TransportRequestOptions.EMPTY, handler);
} else {
transportService.sendRequest(healthNode, actionName, request, handler);
}
}
} catch (Exception e) {
logger.trace(() -> format("Failed to route/execute health node action %s", actionName), e);
listener.onFailure(e);
}
}

private boolean isTaskCancelled(Task task) {
return (task instanceof CancellableTask t) && t.isCancelled();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.elasticsearch.health.node.selection;

import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.persistent.AllocatedPersistentTask;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
Expand Down Expand Up @@ -43,4 +44,13 @@ public static PersistentTasksCustomMetadata.PersistentTask<?> findTask(ClusterSt
PersistentTasksCustomMetadata taskMetadata = clusterState.getMetadata().custom(PersistentTasksCustomMetadata.TYPE);
return taskMetadata == null ? null : taskMetadata.getTask(TASK_NAME);
}

@Nullable
public static DiscoveryNode findHealthNode(ClusterState clusterState) {
PersistentTasksCustomMetadata.PersistentTask<?> task = findTask(clusterState);
if (task == null || task.isAssigned() == false) {
return null;
}
return clusterState.nodes().get(task.getAssignment().getExecutorNode());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,15 @@
/**
* Encapsulates the parameters needed to start the health node task, currently no parameters are required.
*/
class HealthNodeTaskParams implements PersistentTaskParams {
public class HealthNodeTaskParams implements PersistentTaskParams {

private static final HealthNodeTaskParams INSTANCE = new HealthNodeTaskParams();
public static final HealthNodeTaskParams INSTANCE = new HealthNodeTaskParams();

public static final ObjectParser<HealthNodeTaskParams, Void> PARSER = new ObjectParser<>(TASK_NAME, true, () -> INSTANCE);

HealthNodeTaskParams() {}

HealthNodeTaskParams(StreamInput in) {}
HealthNodeTaskParams(StreamInput ignored) {}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.elasticsearch.core.PathUtils;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.env.ShardLockObtainFailedException;
import org.elasticsearch.health.node.action.HealthNodeNotDiscoveredException;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.engine.RecoveryEngineException;
import org.elasticsearch.index.query.QueryShardException;
Expand Down Expand Up @@ -829,6 +830,7 @@ public void testIds() {
ids.put(163, RepositoryConflictException.class);
ids.put(164, VersionConflictException.class);
ids.put(165, SnapshotNameAlreadyInUseException.class);
ids.put(166, HealthNodeNotDiscoveredException.class);

Map<Class<? extends ElasticsearchException>, Integer> reverse = new HashMap<>();
for (Map.Entry<Integer, Class<? extends ElasticsearchException>> entry : ids.entrySet()) {
Expand Down

0 comments on commit 399a8ac

Please sign in to comment.