Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add shard size allocation balance function #7171

Closed
wants to merge 22 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
31 changes: 23 additions & 8 deletions docs/reference/cluster/update-settings.asciidoc
Expand Up @@ -71,6 +71,11 @@ when no allowed action can bring the weights of each node closer together by
more then the fourth setting. Actions might not be allowed, for instance,
due to forced awareness or allocation filtering.

`cluster.routing.allocation.balance.threshold`::
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I moved this to the top because it isn't like the other settings. Also, the added[1.4.0] flag would be confusing with this at the bottom.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Btw @nik9000 - use coming[1.4.0] for unreleased versions - they get changed to added[] when we release.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

Minimal optimization value of operations that should be performed (non
negative float). Defaults to `1.0f`. Raising this will cause the cluster
to be less aggressive about optimizing the shard balance.

`cluster.routing.allocation.balance.shard`::
Defines the weight factor for shards allocated on a node
(float). Defaults to `0.45f`. Raising this raises the tendency to
Expand All @@ -79,18 +84,28 @@ due to forced awareness or allocation filtering.
`cluster.routing.allocation.balance.index`::
Defines a factor to the number of shards per index allocated
on a specific node (float). Defaults to `0.5f`. Raising this raises the
tendency to equalize the number of shards per index across all nodes in
the cluster.
tendency to equalize the number of shards in each index across all nodes
in the cluster.

`cluster.routing.allocation.balance.primary`::
Defines a weight factor for the number of primaries of a specific index
allocated on a node (float). `0.05f`. Raising this raises the tendency
to equalize the number of primary shards across all nodes in the cluster.
allocated on a node (float). Defaults to `0.05f`. Raising this raises
the tendency to equalize the number of primary shards across all nodes in
the cluster.

`cluster.routing.allocation.balance.threshold`::
Minimal optimization value of operations that should be performed (non
negative float). Defaults to `1.0f`. Raising this will cause the cluster
to be less aggressive about optimizing the shard balance.
`cluster.routing.allocation.balance.shard_size` coming[1.5.0]::
Defines a weight factor for the number of similarly sized shards allocated
on a node (float). Defaults to `0.00f` because it is new and we're not
yet sure what a good default is. Raising this raises the tendency to
equalize the number of similarly sized shards across all nodes in the
cluster. "Similarly sized" means that this formula spits out the same
number:
`size <= 10MB ? 0 : floor(log10(size))`
So 1GB and 5GB shards are "similar" and so are 1KB and 8MB.
The 10MB is configurable dynamically as `cluster.info.small_shard_size`.
See <<indices-update-settings, `index.minimum_shard_size`>> for
information on what to do to prevent this balance function from causing
excess shuffling during fast index building.

[float]
===== Concurrent Rebalance
Expand Down
10 changes: 10 additions & 0 deletions docs/reference/indices/update-settings.asciidoc
Expand Up @@ -159,6 +159,16 @@ added[1.0.0.RC1]
`index.warmer.enabled`::
See <<indices-warmers>>. Defaults to `true`.


coming[1.5.0]
`index.minimum_shard_size`::
If a shard of the index is smaller then this size it is considered to be
this size for the purposes of the disk aware allocation decider and the
shard size allocation function. This should prevent the shard size
allocation function from causing many index moves when an index is under
construction. Its best to set this setting to `0` once the index has grown
to the size that is it going to stay.

[float]
[[bulk]]
=== Bulk Indexing Usage
Expand Down
6 changes: 6 additions & 0 deletions pom.xml
Expand Up @@ -63,6 +63,12 @@
<version>2.1.6</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<version>1.9.5</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.lucene</groupId>
<artifactId>lucene-test-framework</artifactId>
Expand Down
96 changes: 88 additions & 8 deletions src/main/java/org/elasticsearch/cluster/ClusterInfo.java
Expand Up @@ -19,31 +19,111 @@

package org.elasticsearch.cluster;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableListMultimap;
import com.google.common.collect.ImmutableMap;

import java.util.Map;

import static java.lang.Math.floor;
import static java.lang.Math.log10;

/**
* ClusterInfo is an object representing a map of nodes to {@link DiskUsage}
* and a map of shard ids to shard sizes, see
* <code>InternalClusterInfoService.shardIdentifierFromRouting(String)</code>
* for the key used in the shardSizes map
* ClusterInfo contains information like {@link DiskUsage}, shard sizes, and
* average shard sizes.
*/
public class ClusterInfo {

private final ImmutableMap<String, DiskUsage> usages;
private final ImmutableMap<String, Long> shardSizes;
private final ImmutableMap<String, Long> indexToAverageShardSize;
private final ShardsBucketedBySize shardsBucketedBySize;

public ClusterInfo(ImmutableMap<String, DiskUsage> usages, ImmutableMap<String, Long> shardSizes) {
public ClusterInfo(ImmutableMap<String, DiskUsage> usages, ImmutableMap<String, Long> shardSizes,
ImmutableMap<String, Long> indexToAverageShardSize, ShardsBucketedBySize shardsBucketedBySize) {
this.usages = usages;
this.shardSizes = shardSizes;
this.indexToAverageShardSize = indexToAverageShardSize;
this.shardsBucketedBySize = shardsBucketedBySize;

}

public Map<String, DiskUsage> getNodeDiskUsages() {
return this.usages;
return usages;
}

/**
* @return map from {@link InternalClusterInfoService#shardIdentifierFromRouting(String)} to shard size in bytes
*/
public Map<String, Long> getShardSizes() {
return this.shardSizes;
return shardSizes;
}

/**
* @return map from index name to average shard size
*/
public ImmutableMap<String, Long> getIndexToAverageShardSize() {
return indexToAverageShardSize;
}

/**
* @return looks up all shards (as
* {@link InternalClusterInfoService#shardIdentifierFromRouting(String)}
* ) that fall into size range buckets
*/
public ShardsBucketedBySize getShardsBucketedBySize() {
return shardsBucketedBySize;
}

/**
* Looks up all shards that fall into size range buckets.
*/
public static class ShardsBucketedBySize {
public static Builder builder(long smallShardLimit) {
return new Builder(smallShardLimit);
}

private ImmutableListMultimap<Integer, String> bucketToShard;
private final long smallShardLimit;

private ShardsBucketedBySize(ImmutableListMultimap<Integer, String> bucketToShard, long smallShardLimit) {
this.bucketToShard = bucketToShard;
this.smallShardLimit = smallShardLimit;
}

/**
* @param size size in bytes
* @return get all shards in the same bucket as a shard with this size
*/
public ImmutableList<String> shardsInSameBucket(long size) {
return bucketToShard.get(bucket(smallShardLimit, size));
}

/**
* @return does this object store no shard information at all?
*/
public boolean isEmpty() {
return bucketToShard.isEmpty();
}

static int bucket(long smallShardLimit, long size) {
return size <= smallShardLimit ? 0 : (int)floor(log10(size));
}

public static class Builder {
private final ImmutableListMultimap.Builder<Integer, String> bucketToShard = ImmutableListMultimap.builder();
private final long smallShardLimit;

private Builder(long smallShardLimit) {
this.smallShardLimit = smallShardLimit;
}

public void add(long size, String id) {
bucketToShard.put(bucket(smallShardLimit, size), id);
}

public ShardsBucketedBySize build() {
return new ShardsBucketedBySize(bucketToShard.build(), smallShardLimit);
}
}
}
}
Expand Up @@ -35,7 +35,8 @@ private final static class Holder {

private EmptyClusterInfoService() {
super(ImmutableSettings.EMPTY);
emptyClusterInfo = new ClusterInfo(ImmutableMap.<String, DiskUsage>of(), ImmutableMap.<String, Long>of());
emptyClusterInfo = new ClusterInfo(ImmutableMap.<String, DiskUsage>of(), ImmutableMap.<String, Long>of(),
ImmutableMap.<String, Long>of(), ClusterInfo.ShardsBucketedBySize.builder(0).build());
}

public static EmptyClusterInfoService getInstance() {
Expand Down