Adding indexing pressure stats to node stats API (#59467)
We have recently added internal metrics to monitor the amount of
indexing occurring on a node. These metrics introduce back pressure to
indexing when memory utilization is too high. This commit exposes these
stats through the node stats API.
Tim-Brooks committed Jul 13, 2020
1 parent dc7d4c6 commit 623df95
Showing 36 changed files with 535 additions and 230 deletions.
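
For reference, the stats exposed by this commit can be retrieved with a plain GET request to /_nodes/stats/indexing_pressure, which is what the new test below does through getRestClient(). Here is a minimal standalone sketch using the low-level Java REST client; the host, port, and class name are illustrative assumptions, and the field names mirror the assertions in the test and YAML spec below.

import org.apache.http.HttpHost;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;

public class IndexingPressureStatsExample {
    public static void main(String[] args) throws Exception {
        // Assumed local node address; adjust for a real cluster.
        try (RestClient client = RestClient.builder(new HttpHost("localhost", 9200, "http")).build()) {
            // Restrict the node stats response to the new indexing_pressure metric.
            Request request = new Request("GET", "/_nodes/stats/indexing_pressure");
            Response response = client.performRequest(request);
            // The body contains nodes.<node_id>.indexing_pressure.total and .current sections with
            // fields such as coordinating_and_primary_bytes, replica_bytes, all_bytes, and the
            // *_memory_limit_rejections counters asserted on below.
            System.out.println(EntityUtils.toString(response.getEntity()));
        }
    }
}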
@@ -0,0 +1,129 @@
/*
 * Licensed to Elasticsearch under one or more contributor
 * license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright
 * ownership. Elasticsearch licenses this file to you under
 * the Apache License, Version 2.0 (the "License"); you may
 * not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.elasticsearch.http;

import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.index.IndexingPressure;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.test.ESIntegTestCase.Scope;
import org.elasticsearch.test.XContentTestUtils;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Map;

import static org.elasticsearch.rest.RestStatus.CREATED;
import static org.elasticsearch.rest.RestStatus.OK;
import static org.elasticsearch.rest.RestStatus.TOO_MANY_REQUESTS;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.lessThan;

/**
 * Test Indexing Pressure Metrics and Statistics
 */
@ClusterScope(scope = Scope.SUITE, supportsDedicatedMasters = false, numDataNodes = 2, numClientNodes = 0)
public class IndexingPressureRestIT extends HttpSmokeTestCase {

    private static final Settings unboundedWriteQueue = Settings.builder().put("thread_pool.write.queue_size", -1).build();

    @Override
    protected Settings nodeSettings(int nodeOrdinal) {
        return Settings.builder()
            .put(super.nodeSettings(nodeOrdinal))
            .put(IndexingPressure.MAX_INDEXING_BYTES.getKey(), "1KB")
            .put(unboundedWriteQueue)
            .build();
    }

    @SuppressWarnings("unchecked")
    public void testIndexingPressureStats() throws IOException {
        Request createRequest = new Request("PUT", "/index_name");
        createRequest.setJsonEntity("{\"settings\": {\"index\": {\"number_of_shards\": 1, \"number_of_replicas\": 1, " +
            "\"write.wait_for_active_shards\": 2}}}");
        final Response indexCreatedResponse = getRestClient().performRequest(createRequest);
        assertThat(indexCreatedResponse.getStatusLine().getStatusCode(), equalTo(OK.getStatus()));

        Request successfulIndexingRequest = new Request("POST", "/index_name/_doc/");
        successfulIndexingRequest.setJsonEntity("{\"x\": \"small text\"}");
        final Response indexSuccessFul = getRestClient().performRequest(successfulIndexingRequest);
        assertThat(indexSuccessFul.getStatusLine().getStatusCode(), equalTo(CREATED.getStatus()));

        Request getNodeStats = new Request("GET", "/_nodes/stats/indexing_pressure");
        final Response nodeStats = getRestClient().performRequest(getNodeStats);
        Map<String, Object> nodeStatsMap = XContentHelper.convertToMap(JsonXContent.jsonXContent, nodeStats.getEntity().getContent(),
            true);
        ArrayList<Object> values = new ArrayList<>(((Map<Object, Object>) nodeStatsMap.get("nodes")).values());
        assertThat(values.size(), equalTo(2));
        XContentTestUtils.JsonMapView node1 = new XContentTestUtils.JsonMapView((Map<String, Object>) values.get(0));
        Integer node1IndexingBytes = node1.get("indexing_pressure.total.coordinating_and_primary_bytes");
        Integer node1ReplicaBytes = node1.get("indexing_pressure.total.replica_bytes");
        Integer node1Rejections = node1.get("indexing_pressure.total.coordinating_and_primary_memory_limit_rejections");
        XContentTestUtils.JsonMapView node2 = new XContentTestUtils.JsonMapView((Map<String, Object>) values.get(1));
        Integer node2IndexingBytes = node2.get("indexing_pressure.total.coordinating_and_primary_bytes");
        Integer node2ReplicaBytes = node2.get("indexing_pressure.total.replica_bytes");
        Integer node2Rejections = node2.get("indexing_pressure.total.coordinating_and_primary_memory_limit_rejections");

        if (node1IndexingBytes == 0) {
            assertThat(node2IndexingBytes, greaterThan(0));
            assertThat(node2IndexingBytes, lessThan(1024));
        } else {
            assertThat(node1IndexingBytes, greaterThan(0));
            assertThat(node1IndexingBytes, lessThan(1024));
        }

        if (node1ReplicaBytes == 0) {
            assertThat(node2ReplicaBytes, greaterThan(0));
            assertThat(node2ReplicaBytes, lessThan(1024));
        } else {
            assertThat(node2ReplicaBytes, equalTo(0));
            assertThat(node1ReplicaBytes, lessThan(1024));
        }

        assertThat(node1Rejections, equalTo(0));
        assertThat(node2Rejections, equalTo(0));

        // A ~10KB document exceeds the 1KB indexing pressure limit configured above, so this request is rejected.
        Request failedIndexingRequest = new Request("POST", "/index_name/_doc/");
        String largeString = randomAlphaOfLength(10000);
        failedIndexingRequest.setJsonEntity("{\"x\": \"" + largeString + "\"}");
        ResponseException exception = expectThrows(ResponseException.class, () -> getRestClient().performRequest(failedIndexingRequest));
        assertThat(exception.getResponse().getStatusLine().getStatusCode(), equalTo(TOO_MANY_REQUESTS.getStatus()));

        Request getNodeStats2 = new Request("GET", "/_nodes/stats/indexing_pressure");
        final Response nodeStats2 = getRestClient().performRequest(getNodeStats2);
        Map<String, Object> nodeStatsMap2 = XContentHelper.convertToMap(JsonXContent.jsonXContent, nodeStats2.getEntity().getContent(),
            true);
        ArrayList<Object> values2 = new ArrayList<>(((Map<Object, Object>) nodeStatsMap2.get("nodes")).values());
        assertThat(values2.size(), equalTo(2));
        XContentTestUtils.JsonMapView node1AfterRejection = new XContentTestUtils.JsonMapView((Map<String, Object>) values2.get(0));
        node1Rejections = node1AfterRejection.get("indexing_pressure.total.coordinating_and_primary_memory_limit_rejections");
        XContentTestUtils.JsonMapView node2AfterRejection = new XContentTestUtils.JsonMapView((Map<String, Object>) values2.get(1));
        node2Rejections = node2AfterRejection.get("indexing_pressure.total.coordinating_and_primary_memory_limit_rejections");

        if (node1Rejections == 0) {
            assertThat(node2Rejections, equalTo(1));
        } else {
            assertThat(node1Rejections, equalTo(1));
        }
    }
}
@@ -44,7 +44,8 @@
"process",
"thread_pool",
"transport",
"discovery"
"discovery",
"indexing_pressure"
],
"description":"Limit the information returned to the specified metrics"
}
@@ -69,7 +70,8 @@
"process",
"thread_pool",
"transport",
"discovery"
"discovery",
"indexing_pressure"
],
"description":"Limit the information returned to the specified metrics"
},
@@ -98,7 +100,8 @@
"process",
"thread_pool",
"transport",
"discovery"
"discovery",
"indexing_pressure"
],
"description":"Limit the information returned to the specified metrics"
},
@@ -145,7 +148,8 @@
"process",
"thread_pool",
"transport",
"discovery"
"discovery",
"indexing_pressure"
],
"description":"Limit the information returned to the specified metrics"
},
@@ -0,0 +1,28 @@
---
"Indexing pressure stats":
  - skip:
      version: " - 7.8.99"
      reason: "indexing_pressure was added in 7.9"
      features: [arbitrary_key]

  - do:
      nodes.info: {}
  - set:
      nodes._arbitrary_key_: node_id

  - do:
      nodes.stats:
        metric: [ indexing_pressure ]

  - gte: { nodes.$node_id.indexing_pressure.total.coordinating_and_primary_bytes: 0 }
  - gte: { nodes.$node_id.indexing_pressure.total.replica_bytes: 0 }
  - gte: { nodes.$node_id.indexing_pressure.total.all_bytes: 0 }
  - gte: { nodes.$node_id.indexing_pressure.total.coordinating_and_primary_memory_limit_rejections: 0 }
  - gte: { nodes.$node_id.indexing_pressure.total.replica_memory_limit_rejections: 0 }
  - gte: { nodes.$node_id.indexing_pressure.current.coordinating_and_primary_bytes: 0 }
  - gte: { nodes.$node_id.indexing_pressure.current.replica_bytes: 0 }
  - gte: { nodes.$node_id.indexing_pressure.current.all_bytes: 0 }

# TODO:
#
# Change skipped version after backport
