Skip to content

Commit

Permalink
Remove "Push back excessive requests for stats (#83832)" from 8.2 bra…
Browse files Browse the repository at this point in the history
…nch (#85504)
  • Loading branch information
joegallo committed Mar 31, 2022
1 parent b6d35cf commit 184ace7
Show file tree
Hide file tree
Showing 26 changed files with 53 additions and 670 deletions.
6 changes: 0 additions & 6 deletions docs/changelog/83832.yaml

This file was deleted.

35 changes: 0 additions & 35 deletions docs/reference/cluster/nodes-stats.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,6 @@ using metrics.
`transport`::
Transport statistics about sent and received bytes in cluster
communication.

`stats_requests`::
Statistics about stats requests such as indices stats, nodes stats,
recovery stats etc.
--

`<index_metric>`::
Expand Down Expand Up @@ -2637,37 +2633,6 @@ search requests on the keyed node.
The rank of this node; used for shard selection when routing search
requests.
======
======
[[cluster-nodes-stats-api-response-body-stats-requests]]
`stats_requests`::
(object)
Contains statistics about the stats requests the node has received.
+
.Properties of `stats_requests`
[%collapsible%open]
======
`<stats_requests_name>`::
(object)
Contains statistics about a specific type of a stats request the node has received.
+
.Properties of `<stats_requests_name>`
[%collapsible%open]
=======
`current`::
(integer)
Number of stats requests currently in progress.
`completed`::
(integer)
Number of stats requests that have been completed by the node (successfully or
not).
`rejected`::
(integer)
Number of stats requests that were rejected by the node because it had reached
the limit of concurrent stats requests (`node.stats.max_concurrent_requests`).
=======
=====
====

Expand Down
18 changes: 0 additions & 18 deletions docs/reference/modules/cluster/misc.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -101,24 +101,6 @@ number of shards for each node, use the
setting.
--

[[stats-requests-limit]]
===== Stats request limit

A stats request might require information from all nodes to be aggregated before it returns to the user.
These requests can be heavy and they put extra pressure on the coordinating node (the node collecting the
responses from all the nodes), for this reason there is a limit on the concurrent requests that a node can coordinate.

--

[[node-stats-max-concurrent-requests]]
`node.stats.max_concurrent_requests`::
+
--
(<<dynamic-cluster-setting,Dynamic>>)
Limits the stats requests a coordinating node can concurrently handle. Defaults to `100`.



[[user-defined-data]]
===== User-defined cluster metadata

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import static org.elasticsearch.action.support.StatsRequestLimiter.MAX_CONCURRENT_STATS_REQUESTS_PER_NODE;
import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAllSuccessful;
Expand Down Expand Up @@ -1387,57 +1386,52 @@ public void testConcurrentIndexingAndStatsRequests() throws BrokenBarrierExcepti
}

// start threads that will get stats concurrently with indexing
try {
updateClusterSettings(Settings.builder().put(MAX_CONCURRENT_STATS_REQUESTS_PER_NODE.getKey(), numberOfStatsThreads + 1));
for (int i = 0; i < numberOfStatsThreads; i++) {
final Thread thread = new Thread(() -> {
for (int i = 0; i < numberOfStatsThreads; i++) {
final Thread thread = new Thread(() -> {
try {
barrier.await();
} catch (final BrokenBarrierException | InterruptedException e) {
failed.set(true);
executionFailures.get().add(e);
latch.countDown();
}
final IndicesStatsRequest request = new IndicesStatsRequest();
request.all();
request.indices(new String[0]);
while (stop.get() == false) {
try {
barrier.await();
} catch (final BrokenBarrierException | InterruptedException e) {
failed.set(true);
executionFailures.get().add(e);
latch.countDown();
}
final IndicesStatsRequest request = new IndicesStatsRequest();
request.all();
request.indices(new String[0]);
while (stop.get() == false) {
try {
final IndicesStatsResponse response = client().admin().indices().stats(request).get();
if (response.getFailedShards() > 0) {
failed.set(true);
shardFailures.get().addAll(Arrays.asList(response.getShardFailures()));
latch.countDown();
}
} catch (final ExecutionException | InterruptedException e) {
final IndicesStatsResponse response = client().admin().indices().stats(request).get();
if (response.getFailedShards() > 0) {
failed.set(true);
executionFailures.get().add(e);
shardFailures.get().addAll(Arrays.asList(response.getShardFailures()));
latch.countDown();
}
} catch (final ExecutionException | InterruptedException e) {
failed.set(true);
executionFailures.get().add(e);
latch.countDown();
}
});
thread.setName("stats-" + i);
threads.add(thread);
thread.start();
}

// release the hounds
barrier.await();
}
});
thread.setName("stats-" + i);
threads.add(thread);
thread.start();
}

// wait for a failure, or for fifteen seconds to elapse
latch.await(15, TimeUnit.SECONDS);
// release the hounds
barrier.await();

// stop all threads and wait for them to complete
stop.set(true);
for (final Thread thread : threads) {
thread.join();
}
// wait for a failure, or for fifteen seconds to elapse
latch.await(15, TimeUnit.SECONDS);

assertThat(shardFailures.get(), emptyCollectionOf(DefaultShardOperationFailedException.class));
assertThat(executionFailures.get(), emptyCollectionOf(Exception.class));
} finally {
updateClusterSettings(Settings.builder().putNull(MAX_CONCURRENT_STATS_REQUESTS_PER_NODE.getKey()));
// stop all threads and wait for them to complete
stop.set(true);
for (final Thread thread : threads) {
thread.join();
}

assertThat(shardFailures.get(), emptyCollectionOf(DefaultShardOperationFailedException.class));
assertThat(executionFailures.get(), emptyCollectionOf(Exception.class));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,8 @@

package org.elasticsearch.action.admin.cluster.node.info;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.StatsRequestLimiter;
import org.elasticsearch.action.support.nodes.TransportNodesAction;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
Expand All @@ -35,16 +33,14 @@ public class TransportNodesInfoAction extends TransportNodesAction<
NodeInfo> {

private final NodeService nodeService;
private final StatsRequestLimiter statsRequestLimiter;

@Inject
public TransportNodesInfoAction(
ThreadPool threadPool,
ClusterService clusterService,
TransportService transportService,
NodeService nodeService,
ActionFilters actionFilters,
StatsRequestLimiter statsRequestLimiter
ActionFilters actionFilters
) {
super(
NodesInfoAction.NAME,
Expand All @@ -58,7 +54,6 @@ public TransportNodesInfoAction(
NodeInfo.class
);
this.nodeService = nodeService;
this.statsRequestLimiter = statsRequestLimiter;
}

@Override
Expand Down Expand Up @@ -99,11 +94,6 @@ protected NodeInfo nodeOperation(NodeInfoRequest nodeRequest, Task task) {
);
}

@Override
protected void doExecute(Task task, NodesInfoRequest request, ActionListener<NodesInfoResponse> listener) {
statsRequestLimiter.tryToExecute(task, request, listener, super::doExecute);
}

public static class NodeInfoRequest extends TransportRequest {

NodesInfoRequest request;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@

package org.elasticsearch.action.admin.cluster.node.stats;

import org.elasticsearch.Version;
import org.elasticsearch.action.support.StatsRequestStats;
import org.elasticsearch.action.support.nodes.BaseNodeResponse;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
Expand Down Expand Up @@ -89,9 +87,6 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment {
@Nullable
private IndexingPressureStats indexingPressureStats;

@Nullable
private StatsRequestStats statsRequestStats;

public NodeStats(StreamInput in) throws IOException {
super(in);
timestamp = in.readVLong();
Expand All @@ -112,9 +107,6 @@ public NodeStats(StreamInput in) throws IOException {
ingestStats = in.readOptionalWriteable(IngestStats::new);
adaptiveSelectionStats = in.readOptionalWriteable(AdaptiveSelectionStats::new);
indexingPressureStats = in.readOptionalWriteable(IndexingPressureStats::new);
if (in.getVersion().onOrAfter(Version.V_8_2_0)) {
statsRequestStats = in.readOptionalWriteable(StatsRequestStats::new);
}
}

public NodeStats(
Expand All @@ -134,8 +126,7 @@ public NodeStats(
@Nullable IngestStats ingestStats,
@Nullable AdaptiveSelectionStats adaptiveSelectionStats,
@Nullable ScriptCacheStats scriptCacheStats,
@Nullable IndexingPressureStats indexingPressureStats,
@Nullable StatsRequestStats statsRequestStats
@Nullable IndexingPressureStats indexingPressureStats
) {
super(node);
this.timestamp = timestamp;
Expand All @@ -154,7 +145,6 @@ public NodeStats(
this.adaptiveSelectionStats = adaptiveSelectionStats;
this.scriptCacheStats = scriptCacheStats;
this.indexingPressureStats = indexingPressureStats;
this.statsRequestStats = statsRequestStats;
}

public long getTimestamp() {
Expand Down Expand Up @@ -259,11 +249,6 @@ public IndexingPressureStats getIndexingPressureStats() {
return indexingPressureStats;
}

@Nullable
public StatsRequestStats getStatsRequestStats() {
return statsRequestStats;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
Expand All @@ -287,9 +272,6 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalWriteable(ingestStats);
out.writeOptionalWriteable(adaptiveSelectionStats);
out.writeOptionalWriteable(indexingPressureStats);
if (out.getVersion().onOrAfter(Version.V_8_2_0)) {
out.writeOptionalWriteable(statsRequestStats);
}
}

@Override
Expand Down Expand Up @@ -359,9 +341,6 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (getIndexingPressureStats() != null) {
getIndexingPressureStats().toXContent(builder, params);
}
if (getStatsRequestStats() != null) {
getStatsRequestStats().toXContent(builder, params);
}
return builder;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,7 @@ public enum Metric {
INGEST("ingest"),
ADAPTIVE_SELECTION("adaptive_selection"),
SCRIPT_CACHE("script_cache"),
INDEXING_PRESSURE("indexing_pressure"),
STATS_REQUESTS("stats_requests"),;
INDEXING_PRESSURE("indexing_pressure"),;

private String metricName;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,8 @@

package org.elasticsearch.action.admin.cluster.node.stats;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.StatsRequestLimiter;
import org.elasticsearch.action.support.nodes.TransportNodesAction;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
Expand All @@ -38,16 +36,14 @@ public class TransportNodesStatsAction extends TransportNodesAction<
NodeStats> {

private final NodeService nodeService;
private final StatsRequestLimiter statsRequestLimiter;

@Inject
public TransportNodesStatsAction(
ThreadPool threadPool,
ClusterService clusterService,
TransportService transportService,
NodeService nodeService,
ActionFilters actionFilters,
StatsRequestLimiter statsRequestLimiter
ActionFilters actionFilters
) {
super(
NodesStatsAction.NAME,
Expand All @@ -61,7 +57,6 @@ public TransportNodesStatsAction(
NodeStats.class
);
this.nodeService = nodeService;
this.statsRequestLimiter = statsRequestLimiter;
}

@Override
Expand Down Expand Up @@ -100,16 +95,10 @@ protected NodeStats nodeOperation(NodeStatsRequest nodeStatsRequest, Task task)
NodesStatsRequest.Metric.INGEST.containedIn(metrics),
NodesStatsRequest.Metric.ADAPTIVE_SELECTION.containedIn(metrics),
NodesStatsRequest.Metric.SCRIPT_CACHE.containedIn(metrics),
NodesStatsRequest.Metric.INDEXING_PRESSURE.containedIn(metrics),
NodesStatsRequest.Metric.STATS_REQUESTS.containedIn(metrics)
NodesStatsRequest.Metric.INDEXING_PRESSURE.containedIn(metrics)
);
}

@Override
protected void doExecute(Task task, NodesStatsRequest request, ActionListener<NodesStatsResponse> listener) {
statsRequestLimiter.tryToExecute(task, request, listener, super::doExecute);
}

public static class NodeStatsRequest extends TransportRequest {

NodesStatsRequest request;
Expand Down

0 comments on commit 184ace7

Please sign in to comment.