Skip to content

Commit

Permalink
[8.6] Backport unit tests (#91770)
Browse files Browse the repository at this point in the history
* New balancer unit tests (#91612)

(cherry picked from commit 7c39880)

* Add tests to BalancedShardsAllocator.Balancer#getIndexDiskUsageInBytes (#91642)

This commit includes minor cleanups

(cherry picked from commit 6cff05b)

* Simulate balancer in realistic cluster (#91404)

This commit adds a test suite which creates a somewhat realistic cluster
and runs the shards allocator until convergence, then reports on the
balance quality of the resulting allocation. This is useful for
exploring changes to the balancer (although such experiments may want to
increase the size of the cluster somewhat) and by running it in CI we
can at least be sure that the balancer doesn't throw anything unexpected
and does eventually converge in these situations.

* Followup on comments from #91612 (#91700)

Co-authored-by: Francisco Fernández Castaño <francisco.fernandez.castano@gmail.com>
Co-authored-by: David Turner <david.turner@elastic.co>
  • Loading branch information
3 people committed Nov 21, 2022
1 parent 166247f commit 952ffde
Show file tree
Hide file tree
Showing 7 changed files with 809 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -353,14 +353,11 @@ private static ShardsAllocator createShardsAllocator(
ClusterInfoService clusterInfoService
) {
Map<String, Supplier<ShardsAllocator>> allocators = new HashMap<>();
allocators.put(
BALANCED_ALLOCATOR,
() -> new BalancedShardsAllocator(settings, clusterSettings, writeLoadForecaster, clusterInfoService)
);
allocators.put(BALANCED_ALLOCATOR, () -> new BalancedShardsAllocator(settings, clusterSettings, writeLoadForecaster));
allocators.put(
DESIRED_BALANCE_ALLOCATOR,
() -> new DesiredBalanceShardsAllocator(
new BalancedShardsAllocator(settings, clusterSettings, writeLoadForecaster, clusterInfoService),
new BalancedShardsAllocator(settings, clusterSettings, writeLoadForecaster),
threadPool,
clusterService,
reconciler
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@
import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.IntroSorter;
import org.elasticsearch.cluster.ClusterInfo;
import org.elasticsearch.cluster.ClusterInfoService;
import org.elasticsearch.cluster.EmptyClusterInfoService;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata;
Expand Down Expand Up @@ -122,31 +120,19 @@ public class BalancedShardsAllocator implements ShardsAllocator {
private volatile float threshold;

private final WriteLoadForecaster writeLoadForecaster;
private final ClusterInfoService clusterInfoService;

public BalancedShardsAllocator(Settings settings) {
this(
settings,
new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
WriteLoadForecaster.DEFAULT,
EmptyClusterInfoService.INSTANCE
);
this(settings, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), WriteLoadForecaster.DEFAULT);
}

@Inject
public BalancedShardsAllocator(
Settings settings,
ClusterSettings clusterSettings,
WriteLoadForecaster writeLoadForecaster,
ClusterInfoService clusterInfoService
) {
public BalancedShardsAllocator(Settings settings, ClusterSettings clusterSettings, WriteLoadForecaster writeLoadForecaster) {
watchSetting(settings, clusterSettings, INDEX_BALANCE_FACTOR_SETTING, value -> this.indexBalanceFactor = value);
watchSetting(settings, clusterSettings, SHARD_BALANCE_FACTOR_SETTING, value -> this.shardBalanceFactor = value);
watchSetting(settings, clusterSettings, WRITE_LOAD_BALANCE_FACTOR_SETTING, value -> this.writeLoadBalanceFactor = value);
watchSetting(settings, clusterSettings, DISK_USAGE_BALANCE_FACTOR_SETTING, value -> this.diskUsageBalanceFactor = value);
watchSetting(settings, clusterSettings, THRESHOLD_SETTING, value -> this.threshold = value);
this.writeLoadForecaster = writeLoadForecaster;
this.clusterInfoService = clusterInfoService;
}

private <T> void watchSetting(Settings settings, ClusterSettings clusterSettings, Setting<T> setting, Consumer<T> consumer) {
Expand All @@ -168,7 +154,7 @@ public void allocate(RoutingAllocation allocation) {
writeLoadBalanceFactor,
diskUsageBalanceFactor
);
final Balancer balancer = new Balancer(writeLoadForecaster, clusterInfoService, allocation, weightFunction, threshold);
final Balancer balancer = new Balancer(writeLoadForecaster, allocation, weightFunction, threshold);
balancer.allocateUnassigned();
balancer.moveShards();
balancer.balance();
Expand All @@ -182,7 +168,7 @@ public ShardAllocationDecision decideShardAllocation(final ShardRouting shard, f
writeLoadBalanceFactor,
diskUsageBalanceFactor
);
Balancer balancer = new Balancer(writeLoadForecaster, clusterInfoService, allocation, weightFunction, threshold);
Balancer balancer = new Balancer(writeLoadForecaster, allocation, weightFunction, threshold);
AllocateUnassignedDecision allocateUnassignedDecision = AllocateUnassignedDecision.NOT_TAKEN;
MoveDecision moveDecision = MoveDecision.NOT_TAKEN;
if (shard.unassigned()) {
Expand Down Expand Up @@ -246,13 +232,6 @@ public float getShardBalance() {
return shardBalanceFactor;
}

/**
* Returns the write load related weight factor.
*/
public float getWriteLoadBalance() {
return writeLoadBalanceFactor;
}

/**
* This class is the primary weight function used to create balanced over nodes and shards in the cluster.
* Currently this function has 3 properties:
Expand Down Expand Up @@ -299,7 +278,6 @@ float weight(Balancer balancer, ModelNode node, String index) {
final float weightShard = node.numShards() - balancer.avgShardsPerNode();
final float weightIndex = node.numShards(index) - balancer.avgShardsPerNode(index);
final float ingestLoad = (float) (node.writeLoad() - balancer.avgWriteLoadPerNode());
// TODO: can this overflow?
final float diskUsage = (float) (node.diskUsageInBytes() - balancer.avgDiskUsageInBytesPerNode());
return theta0 * weightShard + theta1 * weightIndex + theta2 * ingestLoad + theta3 * diskUsage;
}
Expand All @@ -326,25 +304,17 @@ public static class Balancer {
private final double avgWriteLoadPerNode;
private final double avgDiskUsageInBytesPerNode;
private final NodeSorter sorter;
private final ClusterInfoService clusterInfoService;

public Balancer(
WriteLoadForecaster writeLoadForecaster,
ClusterInfoService clusterInfoService,
RoutingAllocation allocation,
WeightFunction weight,
float threshold
) {

public Balancer(WriteLoadForecaster writeLoadForecaster, RoutingAllocation allocation, WeightFunction weight, float threshold) {
this.writeLoadForecaster = writeLoadForecaster;
this.allocation = allocation;
this.weight = weight;
this.threshold = threshold;
this.routingNodes = allocation.routingNodes();
this.metadata = allocation.metadata();
this.clusterInfoService = clusterInfoService;
avgShardsPerNode = ((float) metadata.getTotalNumberOfShards()) / routingNodes.size();
avgWriteLoadPerNode = getTotalWriteLoad(writeLoadForecaster, metadata) / routingNodes.size();
avgDiskUsageInBytesPerNode = ((double) getTotalDiskUsageInBytes(clusterInfoService, metadata) / routingNodes.size());
avgDiskUsageInBytesPerNode = ((double) getTotalDiskUsageInBytes(allocation.clusterInfo(), metadata) / routingNodes.size());
nodes = Collections.unmodifiableMap(buildModelFromAssigned());
sorter = newNodeSorter();
}
Expand All @@ -362,29 +332,30 @@ private static double getIndexWriteLoad(WriteLoadForecaster writeLoadForecaster,
return shardWriteLoad * numberOfCopies(indexMetadata);
}

private static long getTotalDiskUsageInBytes(ClusterInfoService clusterInfoService, Metadata metadata) {
private static long getTotalDiskUsageInBytes(ClusterInfo clusterInfo, Metadata metadata) {
long totalDiskUsageInBytes = 0;
for (IndexMetadata indexMetadata : metadata.indices().values()) {
totalDiskUsageInBytes += getIndexDiskUsageInBytes(clusterInfoService, indexMetadata);
totalDiskUsageInBytes += getIndexDiskUsageInBytes(clusterInfo, indexMetadata);
}
return totalDiskUsageInBytes;
}

private static long getIndexDiskUsageInBytes(ClusterInfoService clusterInfoService, IndexMetadata indexMetadata) {
// Visible for testing
static long getIndexDiskUsageInBytes(ClusterInfo clusterInfo, IndexMetadata indexMetadata) {
OptionalLong forecastedShardSizeInBytes = indexMetadata.getForecastedShardSizeInBytes();
final long indexDiskUsageInBytes;
if (forecastedShardSizeInBytes.isPresent()) {
indexDiskUsageInBytes = forecastedShardSizeInBytes.getAsLong() * numberOfCopies(indexMetadata);
int i = numberOfCopies(indexMetadata);
indexDiskUsageInBytes = forecastedShardSizeInBytes.getAsLong() * i;
} else {
indexDiskUsageInBytes = getIndexDiskUsageInBytesFromClusterInfo(clusterInfoService, indexMetadata);
indexDiskUsageInBytes = getIndexDiskUsageInBytesFromClusterInfo(clusterInfo, indexMetadata);
}
return indexDiskUsageInBytes;
}

private static long getIndexDiskUsageInBytesFromClusterInfo(ClusterInfoService clusterInfoService, IndexMetadata indexMetadata) {
private static long getIndexDiskUsageInBytesFromClusterInfo(ClusterInfo clusterInfo, IndexMetadata indexMetadata) {
long totalSizeInBytes = 0;
int shardCount = 0;
final ClusterInfo clusterInfo = clusterInfoService.getClusterInfo();
for (int shard = 0; shard < indexMetadata.getNumberOfShards(); shard++) {
final ShardId shardId = new ShardId(indexMetadata.getIndex(), shard);

Expand All @@ -405,7 +376,6 @@ private static long getIndexDiskUsageInBytesFromClusterInfo(ClusterInfoService c
return totalSizeInBytes;
}

// TODO: Should we go through the cluster info service and compute the average in this case?
return shardCount == 0 ? 0 : (totalSizeInBytes / shardCount) * numberOfCopies(indexMetadata);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,12 +289,7 @@ public void testPersistedSettings() {
settings.put(BalancedShardsAllocator.SHARD_BALANCE_FACTOR_SETTING.getKey(), 0.3);
settings.put(BalancedShardsAllocator.THRESHOLD_SETTING.getKey(), 2.0);
ClusterSettings service = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
BalancedShardsAllocator allocator = new BalancedShardsAllocator(
settings.build(),
service,
WriteLoadForecaster.DEFAULT,
EmptyClusterInfoService.INSTANCE
);
BalancedShardsAllocator allocator = new BalancedShardsAllocator(settings.build(), service, WriteLoadForecaster.DEFAULT);
assertThat(allocator.getIndexBalance(), Matchers.equalTo(0.2f));
assertThat(allocator.getShardBalance(), Matchers.equalTo(0.3f));
assertThat(allocator.getThreshold(), Matchers.equalTo(2.0f));
Expand Down

0 comments on commit 952ffde

Please sign in to comment.