Skip to content

Commit

Permalink
[7.17] [ML] Move datafeed stats action off of master node (#82307)
Browse files Browse the repository at this point in the history
Previously the get datafeed stats action was handled on the
master node. This dated back to the time when datafeed configs
were stored in the cluster state. Now that the action consists
of running a series of searches it can be a handled transport
action.

Backport of #82271
  • Loading branch information
droberts195 committed Jan 6, 2022
1 parent eb453f2 commit 3eabfe1
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@ private GetDatafeedsStatsAction() {
super(NAME, Response::new);
}

// This needs to be a MasterNodeReadRequest even though the corresponding transport
// action is a HandledTransportAction so that in mixed version clusters it can be
// serialized to older nodes where the transport action was a MasterNodeReadAction.
// TODO: Make this a simple request in a future version where there is no possibility
// of this request being serialized to another node.
public static class Request extends MasterNodeReadRequest<Request> {

@Deprecated
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,14 @@
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.TransportMasterNodeReadAction;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.OriginSettingClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ml.action.GetDatafeedRunningStateAction;
import org.elasticsearch.xpack.core.ml.action.GetDatafeedsStatsAction;
Expand All @@ -40,10 +37,11 @@

import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;

public class TransportGetDatafeedsStatsAction extends TransportMasterNodeReadAction<Request, Response> {
public class TransportGetDatafeedsStatsAction extends HandledTransportAction<Request, Response> {

private static final Logger logger = LogManager.getLogger(TransportGetDatafeedsStatsAction.class);

private final ClusterService clusterService;
private final DatafeedConfigProvider datafeedConfigProvider;
private final JobResultsProvider jobResultsProvider;
private final OriginSettingClient client;
Expand All @@ -52,36 +50,22 @@ public class TransportGetDatafeedsStatsAction extends TransportMasterNodeReadAct
public TransportGetDatafeedsStatsAction(
TransportService transportService,
ClusterService clusterService,
ThreadPool threadPool,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
DatafeedConfigProvider datafeedConfigProvider,
JobResultsProvider jobResultsProvider,
Client client
) {
super(
GetDatafeedsStatsAction.NAME,
transportService,
clusterService,
threadPool,
actionFilters,
Request::new,
indexNameExpressionResolver,
Response::new,
ThreadPool.Names.SAME
);
super(GetDatafeedsStatsAction.NAME, transportService, actionFilters, Request::new);
this.clusterService = clusterService;
this.datafeedConfigProvider = datafeedConfigProvider;
this.jobResultsProvider = jobResultsProvider;
this.client = new OriginSettingClient(client, ML_ORIGIN);
}

@Override
protected void masterOperation(
GetDatafeedsStatsAction.Request request,
ClusterState state,
ActionListener<GetDatafeedsStatsAction.Response> listener
) throws Exception {
protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
logger.debug(() -> new ParameterizedMessage("[{}] get stats for datafeed", request.getDatafeedId()));
ClusterState state = clusterService.state();
final PersistentTasksCustomMetadata tasksInProgress = state.getMetadata().custom(PersistentTasksCustomMetadata.TYPE);
final Response.Builder responseBuilder = new Response.Builder();

Expand Down Expand Up @@ -124,9 +108,4 @@ protected void masterOperation(
// 1. This might also include datafeed tasks that exist but no longer have a config
datafeedConfigProvider.expandDatafeedIds(request.getDatafeedId(), request.allowNoMatch(), tasksInProgress, true, expandIdsListener);
}

@Override
protected ClusterBlockException checkBlock(Request request, ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ);
}
}

0 comments on commit 3eabfe1

Please sign in to comment.