Expose forecasted and actual disk usage per tier and node (#93497)
idegtiarenko committed Feb 6, 2023
1 parent fc974cd commit ab5ae88
Showing 8 changed files with 206 additions and 83 deletions.
23 changes: 20 additions & 3 deletions docs/reference/cluster/get-desired-balance.asciidoc
@@ -55,6 +55,13 @@ The API returns the following result:
"max" : 16.0,
"average" : 12.0,
"std_dev" : 2.8284271247461903
},
"actual_disk_usage" : {
"total" : 36.0,
"min" : 10.0,
"max" : 16.0,
"average" : 12.0,
"std_dev" : 2.8284271247461903
}
},
"data_warm" : {
@@ -78,24 +85,34 @@ The API returns the following result:
"max" : 18.0,
"average" : 14.0,
"std_dev" : 2.8284271247461903
},
"actual_disk_usage" : {
"total" : 42.0,
"min" : 12.0,
"max" : 18.0,
"average" : 14.0,
"std_dev" : 2.8284271247461903
}
}
},
"nodes": {
"node-1": {
"shard_count": 10,
"forecast_write_load": 8.5,
"forecast_disk_usage_bytes": 498435
"forecast_disk_usage_bytes": 498435,
"actual_disk_usage_bytes": 498435
},
"node-2": {
"shard_count": 15,
"forecast_write_load": 3.25,
"forecast_disk_usage_bytes": 384935
"forecast_disk_usage_bytes": 384935,
"actual_disk_usage_bytes": 384935
},
"node-3": {
"shard_count": 12,
"forecast_write_load": 6.0,
"forecast_disk_usage_bytes": 648766
"forecast_disk_usage_bytes": 648766,
"actual_disk_usage_bytes": 648766
}
}
},
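
In the documented example above, each tier block now reports both forecast_disk_usage and the new actual_disk_usage as total/min/max/average/std_dev computed over the per-node byte totals in that tier. The aggregation itself is done by MetricStats.createFrom, which is referenced later in this commit but not shown; as a rough, hypothetical illustration of what those five numbers mean, a population-style summary over three nodes reporting 10, 10 and 16 bytes reproduces the figures in the first tier block above:

// Illustrative only: how total/min/max/average/std_dev could be derived from
// per-node values. The real aggregation lives in MetricStats.createFrom.
import java.util.List;

final class TierDiskUsageSummary {
    record Summary(double total, double min, double max, double average, double stdDev) {}

    static Summary summarize(List<Double> perNodeBytes) {
        double total = 0, min = Double.POSITIVE_INFINITY, max = Double.NEGATIVE_INFINITY;
        for (double v : perNodeBytes) {
            total += v;
            min = Math.min(min, v);
            max = Math.max(max, v);
        }
        double average = total / perNodeBytes.size();
        double sumOfSquaredDeviations = 0;
        for (double v : perNodeBytes) {
            sumOfSquaredDeviations += (v - average) * (v - average);
        }
        double stdDev = Math.sqrt(sumOfSquaredDeviations / perNodeBytes.size());
        return new Summary(total, min, max, average, stdDev);
    }

    public static void main(String[] args) {
        // Hypothetical per-node usage of 10, 10 and 16 bytes matches the example:
        // total=36.0, min=10.0, max=16.0, average=12.0, std_dev≈2.8284271247461903
        System.out.println(summarize(List.of(10.0, 10.0, 16.0)));
    }
}
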
@@ -94,8 +94,15 @@ setup:
- is_true: 'cluster_balance_stats.tiers.data_content.forecast_disk_usage.max'
- is_true: 'cluster_balance_stats.tiers.data_content.forecast_disk_usage.average'
- is_true: 'cluster_balance_stats.tiers.data_content.forecast_disk_usage.std_dev'
- is_true: 'cluster_balance_stats.tiers.data_content.actual_disk_usage'
- is_true: 'cluster_balance_stats.tiers.data_content.actual_disk_usage.total'
- is_true: 'cluster_balance_stats.tiers.data_content.actual_disk_usage.min'
- is_true: 'cluster_balance_stats.tiers.data_content.actual_disk_usage.max'
- is_true: 'cluster_balance_stats.tiers.data_content.actual_disk_usage.average'
- is_true: 'cluster_balance_stats.tiers.data_content.actual_disk_usage.std_dev'
- is_true: 'cluster_balance_stats.nodes'
- is_true: 'cluster_balance_stats.nodes.test-cluster-0'
- gte: { 'cluster_balance_stats.nodes.test-cluster-0.shard_count' : 0 }
- gte: { 'cluster_balance_stats.nodes.test-cluster-0.forecast_write_load': 0.0 }
- gte: { 'cluster_balance_stats.nodes.test-cluster-0.forecast_disk_usage_bytes' : 0 }
- gte: { 'cluster_balance_stats.nodes.test-cluster-0.actual_disk_usage_bytes' : 0 }
@@ -49,11 +49,18 @@ setup:
- is_true: 'cluster_balance_stats.tiers.data_content.forecast_disk_usage.max'
- is_true: 'cluster_balance_stats.tiers.data_content.forecast_disk_usage.average'
- is_true: 'cluster_balance_stats.tiers.data_content.forecast_disk_usage.std_dev'
- is_true: 'cluster_balance_stats.tiers.data_content.actual_disk_usage'
- is_true: 'cluster_balance_stats.tiers.data_content.actual_disk_usage.total'
- is_true: 'cluster_balance_stats.tiers.data_content.actual_disk_usage.min'
- is_true: 'cluster_balance_stats.tiers.data_content.actual_disk_usage.max'
- is_true: 'cluster_balance_stats.tiers.data_content.actual_disk_usage.average'
- is_true: 'cluster_balance_stats.tiers.data_content.actual_disk_usage.std_dev'
- is_true: 'cluster_balance_stats.nodes'
- is_true: 'cluster_balance_stats.nodes.test-cluster-0'
- gte: { 'cluster_balance_stats.nodes.test-cluster-0.shard_count' : 0 }
- gte: { 'cluster_balance_stats.nodes.test-cluster-0.forecast_write_load': 0.0 }
- gte: { 'cluster_balance_stats.nodes.test-cluster-0.forecast_disk_usage_bytes' : 0 }
- gte: { 'cluster_balance_stats.nodes.test-cluster-0.actual_disk_usage_bytes' : 0 }

---
"Test get desired balance for single shard":
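
The REST tests above assert that the desired-balance response now exposes actual_disk_usage per tier and actual_disk_usage_bytes per node. Outside the YAML test framework, the same response can be fetched with the low-level Java REST client; the endpoint path matches the get-desired-balance docs page touched by this commit, while the host, port and client setup below are placeholder assumptions:

import org.apache.http.HttpHost;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;

public class DesiredBalanceRequestExample {
    public static void main(String[] args) throws Exception {
        // Assumes a local node on the default port; adjust host/port for your cluster.
        try (RestClient client = RestClient.builder(new HttpHost("localhost", 9200, "http")).build()) {
            Response response = client.performRequest(new Request("GET", "/_internal/desired_balance"));
            // The body contains cluster_balance_stats.tiers.*.actual_disk_usage and
            // cluster_balance_stats.nodes.*.actual_disk_usage_bytes introduced here.
            System.out.println(EntityUtils.toString(response.getEntity()));
        }
    }
}
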
@@ -11,6 +11,7 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.TransportMasterNodeReadAction;
import org.elasticsearch.cluster.ClusterInfoService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
@@ -43,6 +44,7 @@ public class TransportGetDesiredBalanceAction extends TransportMasterNodeReadAction

@Nullable
private final DesiredBalanceShardsAllocator desiredBalanceShardsAllocator;
private final ClusterInfoService clusterInfoService;
private final WriteLoadForecaster writeLoadForecaster;

@Inject
@@ -53,6 +55,7 @@ public TransportGetDesiredBalanceAction(
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
ShardsAllocator shardsAllocator,
ClusterInfoService clusterInfoService,
WriteLoadForecaster writeLoadForecaster
) {
super(
Expand All @@ -67,6 +70,7 @@ public TransportGetDesiredBalanceAction(
ThreadPool.Names.MANAGEMENT
);
this.desiredBalanceShardsAllocator = shardsAllocator instanceof DesiredBalanceShardsAllocator allocator ? allocator : null;
this.clusterInfoService = clusterInfoService;
this.writeLoadForecaster = writeLoadForecaster;
}

@@ -90,7 +94,7 @@ protected void masterOperation(
listener.onResponse(
new DesiredBalanceResponse(
desiredBalanceShardsAllocator.getStats(),
ClusterBalanceStats.createFrom(state, writeLoadForecaster),
ClusterBalanceStats.createFrom(state, clusterInfoService.getClusterInfo(), writeLoadForecaster),
createRoutingTable(state, latestDesiredBalance)
)
);
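
With this change the transport action resolves the latest ClusterInfo (current shard sizes and disk usage) from the injected ClusterInfoService and threads it into ClusterBalanceStats.createFrom. A minimal sketch of calling the new three-argument factory directly, for example from a test; ClusterInfo.EMPTY and WriteLoadForecaster.DEFAULT are used as assumed stand-in inputs, and the WriteLoadForecaster import path is recalled from the upstream repository rather than shown in this diff:

import org.elasticsearch.cluster.ClusterInfo;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.routing.allocation.WriteLoadForecaster;
import org.elasticsearch.cluster.routing.allocation.allocator.ClusterBalanceStats;

public class ClusterBalanceStatsExample {
    static ClusterBalanceStats statsFor(ClusterState state) {
        // Without a real ClusterInfo, actual_disk_usage falls back to 0 for every shard,
        // and forecast_disk_usage uses only the per-index forecasts from index metadata.
        return ClusterBalanceStats.createFrom(state, ClusterInfo.EMPTY, WriteLoadForecaster.DEFAULT);
    }
}
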
@@ -8,6 +8,7 @@

package org.elasticsearch.cluster.routing.allocation.allocator;

import org.elasticsearch.cluster.ClusterInfo;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
@@ -36,15 +37,19 @@ public record ClusterBalanceStats(Map<String, TierBalanceStats> tiers, Map<String, NodeBalanceStats> nodes)

public static ClusterBalanceStats EMPTY = new ClusterBalanceStats(Map.of(), Map.of());

public static ClusterBalanceStats createFrom(ClusterState clusterState, WriteLoadForecaster writeLoadForecaster) {
public static ClusterBalanceStats createFrom(
ClusterState clusterState,
ClusterInfo clusterInfo,
WriteLoadForecaster writeLoadForecaster
) {
var tierToNodeStats = new HashMap<String, List<NodeBalanceStats>>();
var nodes = new HashMap<String, NodeBalanceStats>();
for (RoutingNode routingNode : clusterState.getRoutingNodes()) {
var dataRoles = routingNode.node().getRoles().stream().filter(DiscoveryNodeRole::canContainData).toList();
if (dataRoles.isEmpty()) {
continue;
}
var nodeStats = NodeBalanceStats.createFrom(routingNode, clusterState.metadata(), writeLoadForecaster);
var nodeStats = NodeBalanceStats.createFrom(routingNode, clusterState.metadata(), clusterInfo, writeLoadForecaster);
nodes.put(routingNode.node().getName(), nodeStats);
for (DiscoveryNodeRole role : dataRoles) {
tierToNodeStats.computeIfAbsent(role.roleName(), ignored -> new ArrayList<>()).add(nodeStats);
@@ -71,28 +76,37 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
return builder.startObject().field("tiers").map(tiers).field("nodes").map(nodes).endObject();
}

public record TierBalanceStats(MetricStats shardCount, MetricStats forecastWriteLoad, MetricStats forecastShardSize)
implements
Writeable,
ToXContentObject {
public record TierBalanceStats(
MetricStats shardCount,
MetricStats forecastWriteLoad,
MetricStats forecastShardSize,
MetricStats actualShardSize
) implements Writeable, ToXContentObject {

private static TierBalanceStats createFrom(List<NodeBalanceStats> nodes) {
return new TierBalanceStats(
MetricStats.createFrom(nodes, it -> it.shards),
MetricStats.createFrom(nodes, it -> it.forecastWriteLoad),
MetricStats.createFrom(nodes, it -> it.forecastShardSize)
MetricStats.createFrom(nodes, it -> it.forecastShardSize),
MetricStats.createFrom(nodes, it -> it.actualShardSize)
);
}

public static TierBalanceStats readFrom(StreamInput in) throws IOException {
return new TierBalanceStats(MetricStats.readFrom(in), MetricStats.readFrom(in), MetricStats.readFrom(in));
return new TierBalanceStats(
MetricStats.readFrom(in),
MetricStats.readFrom(in),
MetricStats.readFrom(in),
MetricStats.readFrom(in)
);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
shardCount.writeTo(out);
forecastWriteLoad.writeTo(out);
forecastShardSize.writeTo(out);
actualShardSize.writeTo(out);
}

@Override
@@ -101,6 +115,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
.field("shard_count", shardCount)
.field("forecast_write_load", forecastWriteLoad)
.field("forecast_disk_usage", forecastShardSize)
.field("actual_disk_usage", actualShardSize)
.endObject();
}
}
@@ -155,31 +170,43 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
}
}

public record NodeBalanceStats(int shards, double forecastWriteLoad, long forecastShardSize) implements Writeable, ToXContentObject {
public record NodeBalanceStats(int shards, double forecastWriteLoad, long forecastShardSize, long actualShardSize)
implements
Writeable,
ToXContentObject {

private static NodeBalanceStats createFrom(RoutingNode routingNode, Metadata metadata, WriteLoadForecaster writeLoadForecaster) {
double totalWriteLoad = 0.0;
long totalShardSize = 0L;
private static NodeBalanceStats createFrom(
RoutingNode routingNode,
Metadata metadata,
ClusterInfo clusterInfo,
WriteLoadForecaster writeLoadForecaster
) {
double forecastWriteLoad = 0.0;
long forecastShardSize = 0L;
long actualShardSize = 0L;

for (ShardRouting shardRouting : routingNode) {
var indexMetadata = metadata.index(shardRouting.index());
var shardSize = clusterInfo.getShardSize(shardRouting, 0L);
assert indexMetadata != null;
totalWriteLoad += writeLoadForecaster.getForecastedWriteLoad(indexMetadata).orElse(0.0);
totalShardSize += indexMetadata.getForecastedShardSizeInBytes().orElse(0);
forecastWriteLoad += writeLoadForecaster.getForecastedWriteLoad(indexMetadata).orElse(0.0);
forecastShardSize += indexMetadata.getForecastedShardSizeInBytes().orElse(shardSize);
actualShardSize += shardSize;
}

return new NodeBalanceStats(routingNode.size(), totalWriteLoad, totalShardSize);
return new NodeBalanceStats(routingNode.size(), forecastWriteLoad, forecastShardSize, actualShardSize);
}

public static NodeBalanceStats readFrom(StreamInput in) throws IOException {
return new NodeBalanceStats(in.readInt(), in.readDouble(), in.readLong());
return new NodeBalanceStats(in.readInt(), in.readDouble(), in.readLong(), in.readLong());
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeInt(shards);
out.writeDouble(forecastWriteLoad);
out.writeLong(forecastShardSize);
out.writeLong(actualShardSize);
}

@Override
@@ -188,6 +215,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
.field("shard_count", shards)
.field("forecast_write_load", forecastWriteLoad)
.humanReadableField("forecast_disk_usage_bytes", "forecast_disk_usage", ByteSizeValue.ofBytes(forecastShardSize))
.humanReadableField("actual_disk_usage_bytes", "actual_disk_usage", ByteSizeValue.ofBytes(actualShardSize))
.endObject();
}
}
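
In NodeBalanceStats.createFrom above, actual_disk_usage_bytes sums the shard sizes reported by ClusterInfo (defaulting to 0 when a size is unknown), while forecast_disk_usage_bytes prefers the forecasted shard size from index metadata and now falls back to that observed size instead of 0 when no forecast exists. A self-contained sketch of that per-shard decision, using hypothetical sizes:

import java.util.OptionalLong;

final class NodeDiskUsageAccumulationExample {
    // Mirrors the per-shard logic in NodeBalanceStats.createFrom: the forecast prefers the
    // index-level forecast and falls back to the observed size; the actual total always
    // uses the observed size from ClusterInfo.
    record Shard(OptionalLong forecastedSizeInBytes, long observedSizeInBytes) {}

    public static void main(String[] args) {
        Shard withForecast = new Shard(OptionalLong.of(2_000L), 1_500L);
        Shard withoutForecast = new Shard(OptionalLong.empty(), 3_000L);

        long forecastTotal = 0, actualTotal = 0;
        for (Shard shard : new Shard[] { withForecast, withoutForecast }) {
            forecastTotal += shard.forecastedSizeInBytes().orElse(shard.observedSizeInBytes());
            actualTotal += shard.observedSizeInBytes();
        }
        System.out.println("forecast_disk_usage_bytes = " + forecastTotal); // 5000
        System.out.println("actual_disk_usage_bytes   = " + actualTotal);  // 4500
    }
}
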
