Skip to content

Commit

Permalink
Expose basic x-pack telemetry for failure store (#108899)
Browse files Browse the repository at this point in the history
  • Loading branch information
gmarouli committed May 23, 2024
1 parent 331b78f commit af45653
Show file tree
Hide file tree
Showing 7 changed files with 81 additions and 33 deletions.
2 changes: 2 additions & 0 deletions docs/reference/rest-api/usage.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -504,6 +504,7 @@ GET /_xpack/usage
// TESTRESPONSE[s/"policy_stats" : \[[^\]]*\]/"policy_stats" : $body.$_path/]
// TESTRESPONSE[s/"slm" : \{[^\}]*\},/"slm" : $body.$_path,/]
// TESTRESPONSE[s/"health_api" : \{[^\}]*\}\s*\}/"health_api" : $body.$_path/]
// TESTRESPONSE[s/"data_streams" : \{[^\}]*\},/"data_streams" : $body.$_path,/]
// TESTRESPONSE[s/ : true/ : $body.$_path/]
// TESTRESPONSE[s/ : false/ : $body.$_path/]
// TESTRESPONSE[s/ : (\-)?[0-9]+/ : $body.$_path/]
Expand All @@ -519,3 +520,4 @@ GET /_xpack/usage
// 5. All of the numbers and strings on the right hand side of *every* field in
// the response are ignored. So we're really only asserting things about the
// the shape of this response, not the values in it.
// 6. Ignore the contents of data streams until the failure store is tech preview.
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ static TransportVersion def(int id) {
public static final TransportVersion SPARSE_VECTOR_QUERY_ADDED = def(8_667_00_0);
public static final TransportVersion ESQL_ADD_INDEX_MODE_TO_SOURCE = def(8_668_00_0);
public static final TransportVersion GET_SHUTDOWN_STATUS_TIMEOUT = def(8_669_00_0);
public static final TransportVersion FAILURE_STORE_TELEMETRY = def(8_670_00_0);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
2 changes: 2 additions & 0 deletions x-pack/plugin/core/build.gradle
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import org.apache.tools.ant.filters.ReplaceTokens
import org.elasticsearch.gradle.internal.info.BuildParams
import org.elasticsearch.gradle.Version

import java.nio.file.Paths

Expand Down Expand Up @@ -172,6 +173,7 @@ testClusters.configureEach {
setting 'indices.lifecycle.history_index_enabled', 'false'
keystore 'bootstrap.password', 'x-pack-test-password'
user username: "x_pack_rest_user", password: "x-pack-test-password"
requiresFeature 'es.failure_store_feature_flag_enabled', Version.fromString("8.15.0")
}

if (BuildParams.inFipsJvm) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
import java.util.List;
import java.util.Map;

import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.notNullValue;

public class DataStreamRestIT extends ESRestTestCase {
Expand All @@ -42,19 +42,24 @@ public void testDSXpackInfo() {
assertTrue((boolean) dataStreams.get("enabled"));
}

@SuppressWarnings("unchecked")
public void testDSXpackUsage() throws Exception {
Map<?, ?> dataStreams = (Map<?, ?>) getLocation("/_xpack/usage").get("data_streams");
assertNotNull(dataStreams);
assertTrue((boolean) dataStreams.get("available"));
assertTrue((boolean) dataStreams.get("enabled"));
assertThat(dataStreams.get("data_streams"), anyOf(equalTo(null), equalTo(0)));

assertThat(dataStreams.get("data_streams"), equalTo(0));
assertThat(dataStreams, hasKey("failure_store"));
Map<String, Integer> failureStoreStats = (Map<String, Integer>) dataStreams.get("failure_store");
assertThat(failureStoreStats.get("enabled_count"), equalTo(0));
assertThat(failureStoreStats.get("failure_indices_count"), equalTo(0));
assertBusy(() -> {
Map<?, ?> logsTemplate = (Map<?, ?>) ((List<?>) getLocation("/_index_template/logs").get("index_templates")).get(0);
assertThat(logsTemplate, notNullValue());
assertThat(logsTemplate.get("name"), equalTo("logs"));
assertThat(((Map<?, ?>) logsTemplate.get("index_template")).get("data_stream"), notNullValue());
});
putFailureStoreTemplate();

// Create a data stream
Request indexRequest = new Request("POST", "/logs-mysql-default/_doc");
Expand All @@ -65,21 +70,29 @@ public void testDSXpackUsage() throws Exception {
Request rollover = new Request("POST", "/logs-mysql-default/_rollover");
client().performRequest(rollover);

// Create failure store data stream
indexRequest = new Request("POST", "/fs/_doc");
indexRequest.setJsonEntity("{\"@timestamp\": \"2020-01-01\"}");
client().performRequest(indexRequest);

dataStreams = (Map<?, ?>) getLocation("/_xpack/usage").get("data_streams");
assertNotNull(dataStreams);
assertTrue((boolean) dataStreams.get("available"));
assertTrue((boolean) dataStreams.get("enabled"));
assertThat("got: " + dataStreams, dataStreams.get("data_streams"), equalTo(1));
assertThat("got: " + dataStreams, dataStreams.get("indices_count"), equalTo(2));
assertThat("got: " + dataStreams, dataStreams.get("data_streams"), equalTo(2));
assertThat("got: " + dataStreams, dataStreams.get("indices_count"), equalTo(3));
failureStoreStats = (Map<String, Integer>) dataStreams.get("failure_store");
assertThat(failureStoreStats.get("enabled_count"), equalTo(1));
assertThat(failureStoreStats.get("failure_indices_count"), equalTo(1));
}

Map<String, Object> getLocation(String path) {
try {
Response executeRepsonse = client().performRequest(new Request("GET", path));
Response executeResponse = client().performRequest(new Request("GET", path));
try (
XContentParser parser = JsonXContent.jsonXContent.createParser(
XContentParserConfiguration.EMPTY,
EntityUtils.toByteArray(executeRepsonse.getEntity())
EntityUtils.toByteArray(executeResponse.getEntity())
)
) {
return parser.map();
Expand All @@ -89,4 +102,15 @@ Map<String, Object> getLocation(String path) {
throw new RuntimeException(e);
}
}

private void putFailureStoreTemplate() {
try {
Request request = new Request("PUT", "/_index_template/fs-template");
request.setJsonEntity("{\"index_patterns\": [\"fs*\"], \"data_stream\": {\"failure_store\": true}}");
assertAcknowledged(client().performRequest(request));
} catch (Exception e) {
fail("failed to insert index template with failure store enabled - got: " + e);
throw new RuntimeException(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,25 @@ protected void masterOperation(
ActionListener<XPackUsageFeatureResponse> listener
) {
final Map<String, DataStream> dataStreams = state.metadata().dataStreams();
long backingIndicesCounter = 0;
long failureStoreEnabledCounter = 0;
long failureIndicesCounter = 0;
for (DataStream ds : dataStreams.values()) {
backingIndicesCounter += ds.getIndices().size();
if (DataStream.isFailureStoreFeatureFlagEnabled()) {
if (ds.isFailureStoreEnabled()) {
failureStoreEnabledCounter++;
}
if (ds.getFailureIndices().getIndices().isEmpty() == false) {
failureIndicesCounter += ds.getFailureIndices().getIndices().size();
}
}
}
final DataStreamFeatureSetUsage.DataStreamStats stats = new DataStreamFeatureSetUsage.DataStreamStats(
dataStreams.size(),
dataStreams.values().stream().map(ds -> ds.getIndices().size()).reduce(Integer::sum).orElse(0)
backingIndicesCounter,
failureStoreEnabledCounter,
failureIndicesCounter
);
final DataStreamFeatureSetUsage usage = new DataStreamFeatureSetUsage(stats);
listener.onResponse(new XPackUsageFeatureResponse(usage));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import org.elasticsearch.TransportVersion;
import org.elasticsearch.TransportVersions;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
Expand Down Expand Up @@ -49,6 +50,12 @@ protected void innerXContent(XContentBuilder builder, Params params) throws IOEx
super.innerXContent(builder, params);
builder.field("data_streams", streamStats.totalDataStreamCount);
builder.field("indices_count", streamStats.indicesBehindDataStream);
if (DataStream.isFailureStoreFeatureFlagEnabled()) {
builder.startObject("failure_store");
builder.field("enabled_count", streamStats.failureStoreEnabledDataStreamCount);
builder.field("failure_indices_count", streamStats.failureStoreIndicesCount);
builder.endObject();
}
}

@Override
Expand All @@ -73,39 +80,30 @@ public boolean equals(Object obj) {
return Objects.equals(streamStats, other.streamStats);
}

public static class DataStreamStats implements Writeable {

private final long totalDataStreamCount;
private final long indicesBehindDataStream;

public DataStreamStats(long totalDataStreamCount, long indicesBehindDataStream) {
this.totalDataStreamCount = totalDataStreamCount;
this.indicesBehindDataStream = indicesBehindDataStream;
}
public record DataStreamStats(
long totalDataStreamCount,
long indicesBehindDataStream,
long failureStoreEnabledDataStreamCount,
long failureStoreIndicesCount
) implements Writeable {

public DataStreamStats(StreamInput in) throws IOException {
this.totalDataStreamCount = in.readVLong();
this.indicesBehindDataStream = in.readVLong();
this(
in.readVLong(),
in.readVLong(),
in.getTransportVersion().onOrAfter(TransportVersions.FAILURE_STORE_TELEMETRY) ? in.readVLong() : 0,
in.getTransportVersion().onOrAfter(TransportVersions.FAILURE_STORE_TELEMETRY) ? in.readVLong() : 0
);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(this.totalDataStreamCount);
out.writeVLong(this.indicesBehindDataStream);
}

@Override
public int hashCode() {
return Objects.hash(totalDataStreamCount, indicesBehindDataStream);
}

@Override
public boolean equals(Object obj) {
if (obj.getClass() != getClass()) {
return false;
if (out.getTransportVersion().onOrAfter(TransportVersions.FAILURE_STORE_TELEMETRY)) {
out.writeVLong(this.failureStoreEnabledDataStreamCount);
out.writeVLong(this.failureStoreIndicesCount);
}
DataStreamStats other = (DataStreamStats) obj;
return totalDataStreamCount == other.totalDataStreamCount && indicesBehindDataStream == other.indicesBehindDataStream;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,12 @@ public class DataStreamFeatureSetUsageTests extends AbstractWireSerializingTestC
@Override
protected DataStreamFeatureSetUsage createTestInstance() {
return new DataStreamFeatureSetUsage(
new DataStreamFeatureSetUsage.DataStreamStats(randomNonNegativeLong(), randomNonNegativeLong())
new DataStreamFeatureSetUsage.DataStreamStats(
randomNonNegativeLong(),
randomNonNegativeLong(),
randomNonNegativeLong(),
randomNonNegativeLong()
)
);
}

Expand Down

0 comments on commit af45653

Please sign in to comment.