[7.x] Add telemetry for data tiers (#63031) (#63140)
Backports the following commits to 7.x:

    Add telemetry for data tiers (#63031)
dakrone committed Oct 1, 2020
1 parent 6a9cde2 commit f0f0da2
Showing 11 changed files with 597 additions and 1 deletion.
4 changes: 4 additions & 0 deletions docs/reference/rest-api/info.asciidoc
@@ -154,6 +154,10 @@ Example response:
"data_streams" : {
"available" : true,
"enabled" : true,
},
"data_tiers" : {
"available" : true,
"enabled" : true,
}
},
"tagline" : "You know, for X"
52 changes: 52 additions & 0 deletions docs/reference/rest-api/usage.asciidoc
@@ -295,6 +295,58 @@ GET /_xpack/usage
"enabled" : true,
"data_streams" : 0,
"indices_count" : 0
},
"data_tiers" : {
"available" : true,
"enabled" : true,
"data_warm" : {
"node_count" : 0,
"index_count" : 0,
"total_shard_count" : 0,
"primary_shard_count" : 0,
"doc_count" : 0,
"total_size_bytes" : 0,
"primary_size_bytes" : 0,
"primary_shard_size_avg_bytes" : 0,
"primary_shard_size_median_bytes" : 0,
"primary_shard_size_mad_bytes" : 0
},
"data_cold" : {
"node_count" : 0,
"index_count" : 0,
"total_shard_count" : 0,
"primary_shard_count" : 0,
"doc_count" : 0,
"total_size_bytes" : 0,
"primary_size_bytes" : 0,
"primary_shard_size_avg_bytes" : 0,
"primary_shard_size_median_bytes" : 0,
"primary_shard_size_mad_bytes" : 0
},
"data_content" : {
"node_count" : 0,
"index_count" : 0,
"total_shard_count" : 0,
"primary_shard_count" : 0,
"doc_count" : 0,
"total_size_bytes" : 0,
"primary_size_bytes" : 0,
"primary_shard_size_avg_bytes" : 0,
"primary_shard_size_median_bytes" : 0,
"primary_shard_size_mad_bytes" : 0
},
"data_hot" : {
"node_count" : 0,
"index_count" : 0,
"total_shard_count" : 0,
"primary_shard_count" : 0,
"doc_count" : 0,
"total_size_bytes" : 0,
"primary_size_bytes" : 0,
"primary_shard_size_avg_bytes" : 0,
"primary_shard_size_median_bytes" : 0,
"primary_shard_size_mad_bytes" : 0
}
}
}
------------------------------------------------------------
@@ -13,16 +13,23 @@
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Template;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.xpack.core.DataTier;
import org.elasticsearch.xpack.core.DataTiersFeatureSetUsage;
import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin;
import org.elasticsearch.xpack.core.XPackFeatureSet;
import org.elasticsearch.xpack.core.action.XPackUsageRequestBuilder;
import org.elasticsearch.xpack.core.action.XPackUsageResponse;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.stream.Collectors;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;

@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, numClientNodes = 0, transportClientRatio = 0)
public class DataTierIT extends ESIntegTestCase {
@@ -194,6 +201,64 @@ public void testTemplateOverridesDefaults() {
        ensureYellow(index);
    }

    public void testDataTierTelemetry() {
        startContentOnlyNode();
        startContentOnlyNode();
        startHotOnlyNode();

        client().admin().indices().prepareCreate(index)
            .setSettings(Settings.builder()
                .put(DataTierAllocationDecider.INDEX_ROUTING_PREFER, "data_hot")
                .put("index.number_of_shards", 2)
                .put("index.number_of_replicas", 0))
            .setWaitForActiveShards(0)
            .get();

        client().admin().indices().prepareCreate(index + "2")
            .setSettings(Settings.builder()
                .put("index.number_of_shards", 1)
                .put("index.number_of_replicas", 1))
            .setWaitForActiveShards(0)
            .get();

        ensureGreen();
        client().prepareIndex(index, MapperService.SINGLE_MAPPING_NAME).setSource("foo", "bar").get();
        client().prepareIndex(index + "2", MapperService.SINGLE_MAPPING_NAME).setSource("foo", "bar").get();
        client().prepareIndex(index + "2", MapperService.SINGLE_MAPPING_NAME).setSource("foo", "bar").get();
        refresh(index, index + "2");

        DataTiersFeatureSetUsage usage = getUsage();
        // We can't guarantee that internal indices aren't created, so some of these are >= checks
        assertThat(usage.getTierStats().get(DataTier.DATA_CONTENT).nodeCount, equalTo(2));
        assertThat(usage.getTierStats().get(DataTier.DATA_CONTENT).indexCount, greaterThanOrEqualTo(1));
        assertThat(usage.getTierStats().get(DataTier.DATA_CONTENT).totalShardCount, greaterThanOrEqualTo(2));
        assertThat(usage.getTierStats().get(DataTier.DATA_CONTENT).primaryShardCount, greaterThanOrEqualTo(1));
        assertThat(usage.getTierStats().get(DataTier.DATA_CONTENT).docCount, greaterThanOrEqualTo(2L));
        assertThat(usage.getTierStats().get(DataTier.DATA_CONTENT).primaryByteCount, greaterThanOrEqualTo(1L));
        assertThat(usage.getTierStats().get(DataTier.DATA_CONTENT).primaryByteCountMedian, greaterThanOrEqualTo(1L));
        assertThat(usage.getTierStats().get(DataTier.DATA_CONTENT).primaryShardBytesMAD, greaterThanOrEqualTo(0L));
        assertThat(usage.getTierStats().get(DataTier.DATA_HOT).nodeCount, equalTo(1));
        assertThat(usage.getTierStats().get(DataTier.DATA_HOT).indexCount, greaterThanOrEqualTo(1));
        assertThat(usage.getTierStats().get(DataTier.DATA_HOT).totalShardCount, greaterThanOrEqualTo(2));
        assertThat(usage.getTierStats().get(DataTier.DATA_HOT).primaryShardCount, greaterThanOrEqualTo(2));
        assertThat(usage.getTierStats().get(DataTier.DATA_HOT).docCount, greaterThanOrEqualTo(1L));
        assertThat(usage.getTierStats().get(DataTier.DATA_HOT).primaryByteCount, greaterThanOrEqualTo(1L));
        assertThat(usage.getTierStats().get(DataTier.DATA_HOT).primaryByteCountMedian, greaterThanOrEqualTo(1L));
        assertThat(usage.getTierStats().get(DataTier.DATA_HOT).primaryShardBytesMAD, greaterThanOrEqualTo(0L));
    }

    private DataTiersFeatureSetUsage getUsage() {
        XPackUsageResponse usages = new XPackUsageRequestBuilder(client()).execute().actionGet();
        // .get(0) on an empty list would throw before any null check could run, so fail explicitly instead
        return usages.getUsages().stream()
            .filter(u -> u instanceof DataTiersFeatureSetUsage)
            .map(u -> (DataTiersFeatureSetUsage) u)
            .findFirst()
            .orElseThrow(() -> new IllegalArgumentException("no data tier usage found"));
    }

    public void startDataNode() {
        Settings nodeSettings = Settings.builder()
            .putList("node.roles", Arrays.asList("master", "data", "ingest"))
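The startContentOnlyNode() and startHotOnlyNode() helpers called in testDataTierTelemetry are outside this hunk. A plausible sketch of them, modeled on the startDataNode() pattern above and on the data_content / data_hot role names defined in DataTier, follows; this is an illustration under those assumptions, not the committed implementation.

    // Hypothetical helpers mirroring startDataNode(): each node gets a single
    // data tier role, so shards for that tier can only be allocated to it.
    public void startContentOnlyNode() {
        Settings nodeSettings = Settings.builder()
            .putList("node.roles", Arrays.asList("master", "data_content", "ingest"))
            .build();
        internalCluster().startNode(nodeSettings);
    }

    public void startHotOnlyNode() {
        Settings nodeSettings = Settings.builder()
            .putList("node.roles", Arrays.asList("master", "data_hot", "ingest"))
            .build();
        internalCluster().startNode(nodeSettings);
    }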
@@ -16,6 +16,8 @@
import org.elasticsearch.index.shard.IndexSettingProvider;
import org.elasticsearch.xpack.cluster.routing.allocation.DataTierAllocationDecider;

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

/**
@@ -34,6 +36,8 @@ public class DataTier {
    public static final String DATA_WARM = "data_warm";
    public static final String DATA_COLD = "data_cold";

    public static final Set<String> ALL_DATA_TIERS = new HashSet<>(Arrays.asList(DATA_CONTENT, DATA_HOT, DATA_WARM, DATA_COLD));

    /**
     * Returns true if the given tier name is a valid tier
     */
@@ -0,0 +1,153 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.core;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.store.StoreStats;
import org.elasticsearch.search.aggregations.metrics.TDigestState;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

public class DataTiersFeatureSet implements XPackFeatureSet {

    private final Client client;
    private final ClusterService clusterService;

    @Inject
    public DataTiersFeatureSet(Client client, ClusterService clusterService) {
        this.client = client;
        this.clusterService = clusterService;
    }

    @Override
    public String name() {
        return XPackField.DATA_TIERS;
    }

    @Override
    public boolean available() {
        return true;
    }

    @Override
    public boolean enabled() {
        return true;
    }

    @Override
    public Map<String, Object> nativeCodeInfo() {
        return null;
    }

    @Override
    public void usage(ActionListener<Usage> listener) {
        final ClusterState state = clusterService.state();
        client.admin().cluster().prepareNodesStats()
            .all()
            .setIndices(CommonStatsFlags.ALL)
            .execute(ActionListener.wrap(nodesStatsResponse -> {
                final RoutingNodes routingNodes = state.getRoutingNodes();

                // First separate the nodes into separate tiers, note that nodes *may* be duplicated
                Map<String, List<NodeStats>> tierSpecificNodeStats = separateTiers(nodesStatsResponse);

                // Generate tier specific stats for the nodes
                Map<String, DataTiersFeatureSetUsage.TierSpecificStats> tierSpecificStats = tierSpecificNodeStats.entrySet()
                    .stream().collect(Collectors.toMap(Map.Entry::getKey, ns -> calculateStats(ns.getValue(), routingNodes)));

                listener.onResponse(new DataTiersFeatureSetUsage(tierSpecificStats));
            }, listener::onFailure));
    }

    // Visible for testing
    static Map<String, List<NodeStats>> separateTiers(NodesStatsResponse nodesStatsResponse) {
        Map<String, List<NodeStats>> responses = new HashMap<>();
        DataTier.ALL_DATA_TIERS.forEach(tier ->
            responses.put(tier, nodesStatsResponse.getNodes().stream()
                .filter(stats -> stats.getNode().getRoles().stream()
                    .map(DiscoveryNodeRole::roleName)
                    .anyMatch(rn -> rn.equals(tier)))
                .collect(Collectors.toList())));
        return responses;
    }

    private DataTiersFeatureSetUsage.TierSpecificStats calculateStats(List<NodeStats> nodesStats, RoutingNodes routingNodes) {
        int nodeCount = 0;
        int indexCount = 0;
        int totalShardCount = 0;
        long totalByteCount = 0;
        long docCount = 0;
        final AtomicInteger primaryShardCount = new AtomicInteger(0);
        final AtomicLong primaryByteCount = new AtomicLong(0);
        final TDigestState valueSketch = new TDigestState(1000);
        for (NodeStats nodeStats : nodesStats) {
            nodeCount++;
            totalByteCount += nodeStats.getIndices().getStore().getSizeInBytes();
            docCount += nodeStats.getIndices().getDocs().getCount();
            String nodeId = nodeStats.getNode().getId();
            final RoutingNode node = routingNodes.node(nodeId);
            if (node != null) {
                totalShardCount += node.shardsWithState(ShardRoutingState.STARTED).size();
                Set<Index> indicesOnNode = node.shardsWithState(ShardRoutingState.STARTED).stream()
                    .map(ShardRouting::index)
                    .collect(Collectors.toSet());
                indexCount += indicesOnNode.size();
                indicesOnNode.forEach(index -> {
                    nodeStats.getIndices().getShardStats(index).stream()
                        .filter(shardStats -> shardStats.getPrimary().getStore() != null)
                        .forEach(shardStats -> {
                            // The filter above keeps only shards whose primary store stats are
                            // present, so these counts and sizes cover primary shards only
                            StoreStats primaryStoreStats = shardStats.getPrimary().getStore();
                            primaryShardCount.incrementAndGet();
                            long primarySize = primaryStoreStats.getSizeInBytes();
                            primaryByteCount.addAndGet(primarySize);
                            valueSketch.add(primarySize);
                        });
                });
            }
        }
        long primaryShardSizeMedian = (long) valueSketch.quantile(0.5);
        long primaryShardSizeMAD = computeMedianAbsoluteDeviation(valueSketch);
        return new DataTiersFeatureSetUsage.TierSpecificStats(nodeCount, indexCount, totalShardCount, primaryShardCount.get(), docCount,
            totalByteCount, primaryByteCount.get(), primaryShardSizeMedian, primaryShardSizeMAD);
    }

    // Visible for testing
    static long computeMedianAbsoluteDeviation(TDigestState valuesSketch) {
        if (valuesSketch.size() == 0) {
            return 0;
        } else {
            final double approximateMedian = valuesSketch.quantile(0.5);
            final TDigestState approximatedDeviationsSketch = new TDigestState(valuesSketch.compression());
            valuesSketch.centroids().forEach(centroid -> {
                final double deviation = Math.abs(approximateMedian - centroid.mean());
                approximatedDeviationsSketch.add(deviation, centroid.count());
            });

            return (long) approximatedDeviationsSketch.quantile(0.5);
        }
    }
}
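For intuition about what computeMedianAbsoluteDeviation approximates from the t-digest centroids, the same statistic can be computed exactly on a small, made-up set of primary shard sizes. The following standalone sketch uses only java.util and exact medians rather than the TDigestState approximation above; the values are hypothetical.

import java.util.Arrays;

public class MadSketch {
    // Exact median of an already-sorted array.
    static double median(long[] sorted) {
        int n = sorted.length;
        return n % 2 == 1 ? sorted[n / 2] : (sorted[n / 2 - 1] + sorted[n / 2]) / 2.0;
    }

    public static void main(String[] args) {
        long[] sizes = {100, 200, 250, 300, 5000};   // hypothetical primary shard sizes in bytes
        long[] sorted = sizes.clone();
        Arrays.sort(sorted);
        double med = median(sorted);                 // 250

        long[] deviations = new long[sizes.length];  // absolute deviations from the median
        for (int i = 0; i < sizes.length; i++) {
            deviations[i] = (long) Math.abs(sizes[i] - med);
        }
        Arrays.sort(deviations);                     // {0, 50, 50, 150, 4750}
        double mad = median(deviations);             // 50

        System.out.println("median=" + med + " mad=" + mad);
    }
}

Note how the 5000-byte outlier barely moves the MAD (50 bytes): the median absolute deviation is a robust measure of shard-size spread, which is what the per-tier primary_shard_size_mad_bytes field reports alongside the average and median.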
