Skip to content

Commit

Permalink
[Transform] Make transform _stats work again, even if there are no tr…
Browse files Browse the repository at this point in the history
…ansform nodes (#72221)
  • Loading branch information
przemekwitek committed Apr 27, 2021
1 parent d39f797 commit f992e47
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.elasticsearch.xpack.core.transform.transforms.pivot.PivotConfigTests;
import org.elasticsearch.xpack.transform.TransformSingleNodeTestCase;

import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;

Expand All @@ -34,21 +35,16 @@ protected Settings nodeSettings() {

public void testGetTransformStats() {
GetTransformStatsAction.Request request = new GetTransformStatsAction.Request("_all");
ElasticsearchStatusException e =
expectThrows(
ElasticsearchStatusException.class,
() -> client().execute(GetTransformStatsAction.INSTANCE, request).actionGet());
assertThat(
e.getMessage(),
is(equalTo("Transform requires the transform node role for at least 1 node, found no transform nodes")));
GetTransformStatsAction.Response response = client().execute(GetTransformStatsAction.INSTANCE, request).actionGet();
assertThat(response.getTransformsStats(), is(empty()));

assertWarnings("Transform requires the transform node role for at least 1 node, found no transform nodes");
}

public void testGetTransform() {
GetTransformAction.Request request = new GetTransformAction.Request("_all");
GetTransformAction.Response response = client().execute(GetTransformAction.INSTANCE, request).actionGet();
assertEquals(0, response.getTransformConfigurations().size());
assertThat(response.getTransformConfigurations(), is(empty()));

assertWarnings("Transform requires the transform node role for at least 1 node, found no transform nodes");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,9 +147,9 @@ protected void doExecute(Task task, Request request, ActionListener<Response> fi
request.getPageParams(),
request.isAllowNoMatch(),
ActionListener.wrap(hitsAndIds -> {
TransformNodes.throwIfNoTransformNodes(clusterState);
boolean hasAnyTransformNode = TransformNodes.hasAnyTransformNode(clusterState.getNodes());
boolean requiresRemote = hitsAndIds.v2().v2().stream().anyMatch(config -> config.getSource().requiresRemoteCluster());
if (TransformNodes.redirectToAnotherNodeIfNeeded(
if (hasAnyTransformNode && TransformNodes.redirectToAnotherNodeIfNeeded(
clusterState, nodeSettings, requiresRemote, transportService, actionName, request, Response::new, finalListener)) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,13 +141,12 @@ public static Assignment getAssignment(String transformId, ClusterState clusterS
/**
* Get the number of transform nodes in the cluster
*
* @param clusterState state
* @param nodes nodes to examine
* @return number of transform nodes
*/
public static long getNumberOfTransformNodes(ClusterState clusterState) {
return StreamSupport.stream(clusterState.getNodes().spliterator(), false)
.filter(node -> node.getRoles().contains(DiscoveryNodeRole.TRANSFORM_ROLE))
.count();
public static boolean hasAnyTransformNode(DiscoveryNodes nodes) {
return StreamSupport.stream(nodes.spliterator(), false)
.anyMatch(node -> node.getRoles().contains(DiscoveryNodeRole.TRANSFORM_ROLE));
}

/**
Expand All @@ -160,8 +159,7 @@ public static long getNumberOfTransformNodes(ClusterState clusterState) {
*/
public static void warnIfNoTransformNodes(ClusterState clusterState) {
if (TransformMetadata.getTransformMetadata(clusterState).isResetMode() == false) {
long transformNodes = getNumberOfTransformNodes(clusterState);
if (transformNodes == 0) {
if (hasAnyTransformNode(clusterState.getNodes()) == false) {
HeaderWarning.addWarning(TransformMessages.REST_WARN_NO_TRANSFORM_NODES);
}
}
Expand All @@ -174,8 +172,7 @@ public static void warnIfNoTransformNodes(ClusterState clusterState) {
* @param clusterState state
*/
public static void throwIfNoTransformNodes(ClusterState clusterState) {
long transformNodes = getNumberOfTransformNodes(clusterState);
if (transformNodes == 0) {
if (hasAnyTransformNode(clusterState.getNodes()) == false) {
throw ExceptionsHelper.badRequestException(TransformMessages.REST_WARN_NO_TRANSFORM_NODES);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

package org.elasticsearch.xpack.transform.transforms;

import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
Expand All @@ -15,6 +16,7 @@
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.persistent.PersistentTaskParams;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
Expand Down Expand Up @@ -239,6 +241,41 @@ public void testSelectAnyNodeThatCanRunThisTransform() {
assertThat(TransformNodes.selectAnyNodeThatCanRunThisTransform(nodes, false).get().getId(), is(oneOf("node-2", "node-4")));
}

public void testHasAnyTransformNode() {
{
DiscoveryNodes nodes = DiscoveryNodes.EMPTY_NODES;
assertThat(TransformNodes.hasAnyTransformNode(nodes), is(false));
expectThrows(ElasticsearchStatusException.class, () -> TransformNodes.throwIfNoTransformNodes(newClusterState(nodes)));
}
{
DiscoveryNodes nodes =
DiscoveryNodes.builder()
.add(newDiscoveryNode("node-1", Version.V_7_12_0))
.add(newDiscoveryNode("node-2", Version.V_7_13_0))
.add(newDiscoveryNode("node-3", Version.V_7_13_0))
.build();
assertThat(TransformNodes.hasAnyTransformNode(nodes), is(false));
expectThrows(ElasticsearchStatusException.class, () -> TransformNodes.throwIfNoTransformNodes(newClusterState(nodes)));
}
{
DiscoveryNodes nodes =
DiscoveryNodes.builder()
.add(newDiscoveryNode("node-1", Version.V_7_12_0))
.add(newDiscoveryNode("node-2", Version.V_7_13_0, TRANSFORM_ROLE))
.add(newDiscoveryNode("node-3", Version.V_7_13_0, REMOTE_CLUSTER_CLIENT_ROLE))
.add(newDiscoveryNode("node-4", Version.V_7_13_0))
.build();
assertThat(TransformNodes.hasAnyTransformNode(nodes), is(true));
TransformNodes.throwIfNoTransformNodes(newClusterState(nodes));
}
}

private static ClusterState newClusterState(DiscoveryNodes nodes) {
return ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
.nodes(nodes)
.build();
}

private static DiscoveryNode newDiscoveryNode(String id, Version version, DiscoveryNodeRole... roles) {
return new DiscoveryNode(
id,
Expand Down

0 comments on commit f992e47

Please sign in to comment.