Skip to content

Commit

Permalink
Added BalancedShardsAllocator that balances shards based on a weight …
Browse files Browse the repository at this point in the history
…function.

 * Weights are calculated per index and incorporate index level, global and primary related parameters
 * Balance operations are executed based on a win maximation strategy that tries to relocate shards
   first that offer the biggest gain towards the weight functions optimum
 * The WeightFunction allows settings to prefer index based balance over global balance and vice versa
 * Balance operations can be throttled by raising a threshold resulting in less agressive balance operations
 * WeightFunction shipps with defaults to achive evenly distributed indexes while maintaining a global balance

Closes #2555
  • Loading branch information
s1monw committed Jan 17, 2013
1 parent d97839b commit 2eb09e6
Show file tree
Hide file tree
Showing 12 changed files with 2,294 additions and 29 deletions.
16 changes: 15 additions & 1 deletion src/main/java/org/elasticsearch/cluster/metadata/MetaData.java
Expand Up @@ -133,7 +133,9 @@ public static synchronized void addDynamicSettings(String... settings) {
private final ImmutableMap<String, IndexTemplateMetaData> templates;
private final ImmutableMap<String, Custom> customs;

private final transient int totalNumberOfShards;
private final transient int totalNumberOfShards; // Transient ? not serializable anyway?
private final int numberOfShards;


private final String[] allIndices;
private final ImmutableSet<String> allIndicesSet;
Expand All @@ -148,6 +150,7 @@ public static synchronized void addDynamicSettings(String... settings) {

private final ImmutableMap<String, String[]> aliasAndIndexToIndexMap;


MetaData(long version, Settings transientSettings, Settings persistentSettings, ImmutableMap<String, IndexMetaData> indices, ImmutableMap<String, IndexTemplateMetaData> templates, ImmutableMap<String, Custom> customs) {
this.version = version;
this.transientSettings = transientSettings;
Expand All @@ -157,10 +160,13 @@ public static synchronized void addDynamicSettings(String... settings) {
this.customs = customs;
this.templates = templates;
int totalNumberOfShards = 0;
int numberOfShards = 0;
for (IndexMetaData indexMetaData : indices.values()) {
totalNumberOfShards += indexMetaData.totalNumberOfShards();
numberOfShards += indexMetaData.numberOfShards();
}
this.totalNumberOfShards = totalNumberOfShards;
this.numberOfShards = numberOfShards;

// build all indices map
List<String> allIndicesLst = Lists.newArrayList();
Expand Down Expand Up @@ -690,6 +696,14 @@ public int totalNumberOfShards() {
public int getTotalNumberOfShards() {
return totalNumberOfShards();
}

public int numberOfShards() {
return this.numberOfShards;
}

public int getnumberOfShards() {
return numberOfShards();
}


/**
Expand Down
Expand Up @@ -351,7 +351,7 @@ public void run() {
});

return updatedState;
} catch (Exception e) {
} catch (Throwable e) {
logger.warn("[{}] failed to create", e, request.index);
listener.onFailure(e);
return currentState;
Expand Down

Large diffs are not rendered by default.

Expand Up @@ -48,6 +48,6 @@ public void setShardsAllocator(Class<? extends ShardsAllocator> shardsAllocator)
@Override
protected void configure() {
bind(GatewayAllocator.class).to(gatewayAllocator).asEagerSingleton();
bind(ShardsAllocator.class).to(shardsAllocator == null ? EvenShardsCountAllocator.class : shardsAllocator).asEagerSingleton();
bind(ShardsAllocator.class).to(shardsAllocator == null ? BalancedShardsAllocator.class : shardsAllocator).asEagerSingleton();
}
}
Expand Up @@ -45,7 +45,7 @@ public ShardsAllocators() {
}

public ShardsAllocators(Settings settings) {
this(settings, new NoneGatewayAllocator(), new EvenShardsCountAllocator(settings));
this(settings, new NoneGatewayAllocator(), new BalancedShardsAllocator(settings));
}

@Inject
Expand Down

Large diffs are not rendered by default.

Expand Up @@ -23,6 +23,7 @@
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.common.logging.ESLogger;
Expand Down Expand Up @@ -55,7 +56,7 @@ public void moveShardOnceNewNodeWithAttributeAdded1() {
.put("cluster.routing.allocation.awareness.attributes", "rack_id")
.build());

logger.info("Building initial routing table");
logger.info("Building initial routing table for 'moveShardOnceNewNodeWithAttributeAdded1'");

MetaData metaData = newMetaDataBuilder()
.put(newIndexMetaDataBuilder("test").numberOfShards(1).numberOfReplicas(1))
Expand Down Expand Up @@ -124,7 +125,7 @@ public void moveShardOnceNewNodeWithAttributeAdded2() {
.put("cluster.routing.allocation.awareness.attributes", "rack_id")
.build());

logger.info("Building initial routing table");
logger.info("Building initial routing table for 'moveShardOnceNewNodeWithAttributeAdded2'");

MetaData metaData = newMetaDataBuilder()
.put(newIndexMetaDataBuilder("test").numberOfShards(1).numberOfReplicas(1))
Expand Down Expand Up @@ -194,9 +195,12 @@ public void moveShardOnceNewNodeWithAttributeAdded3() {
.put("cluster.routing.allocation.allow_rebalance", "always")
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1)
.put("cluster.routing.allocation.awareness.attributes", "rack_id")
.put("cluster.routing.allocation.balance.index", 0.0f)
.put("cluster.routing.allocation.balance.replica", 1.0f)
.put("cluster.routing.allocation.balance.primary", 0.0f)
.build());

logger.info("Building initial routing table");
logger.info("Building initial routing table for 'moveShardOnceNewNodeWithAttributeAdded3'");

MetaData metaData = newMetaDataBuilder()
.put(newIndexMetaDataBuilder("test").numberOfShards(5).numberOfReplicas(1))
Expand All @@ -215,6 +219,20 @@ public void moveShardOnceNewNodeWithAttributeAdded3() {
).build();
routingTable = strategy.reroute(clusterState).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();

for (ShardRouting shard : clusterState.routingNodes().shardsWithState(INITIALIZING)) {
logger.info(shard.toString());
}
for (ShardRouting shard : clusterState.routingNodes().shardsWithState(STARTED)) {
logger.info(shard.toString());
}
for (ShardRouting shard : clusterState.routingNodes().shardsWithState(RELOCATING)) {
logger.info(shard.toString());
}
for (ShardRouting shard : clusterState.routingNodes().shardsWithState(UNASSIGNED)) {
logger.info(shard.toString());
}

assertThat(clusterState.routingNodes().shardsWithState(INITIALIZING).size(), equalTo(5));

logger.info("--> start the shards (primaries)");
Expand Down Expand Up @@ -280,7 +298,7 @@ public void moveShardOnceNewNodeWithAttributeAdded4() {
.put("cluster.routing.allocation.awareness.attributes", "rack_id")
.build());

logger.info("Building initial routing table");
logger.info("Building initial routing table for 'moveShardOnceNewNodeWithAttributeAdded4'");

MetaData metaData = newMetaDataBuilder()
.put(newIndexMetaDataBuilder("test1").numberOfShards(5).numberOfReplicas(1))
Expand Down Expand Up @@ -364,7 +382,7 @@ public void moveShardOnceNewNodeWithAttributeAdded5() {
.put("cluster.routing.allocation.awareness.attributes", "rack_id")
.build());

logger.info("Building initial routing table");
logger.info("Building initial routing table for 'moveShardOnceNewNodeWithAttributeAdded5'");

MetaData metaData = newMetaDataBuilder()
.put(newIndexMetaDataBuilder("test").numberOfShards(1).numberOfReplicas(2))
Expand Down Expand Up @@ -443,7 +461,7 @@ public void moveShardOnceNewNodeWithAttributeAdded6() {
.put("cluster.routing.allocation.awareness.attributes", "rack_id")
.build());

logger.info("Building initial routing table");
logger.info("Building initial routing table for 'moveShardOnceNewNodeWithAttributeAdded6'");

MetaData metaData = newMetaDataBuilder()
.put(newIndexMetaDataBuilder("test").numberOfShards(1).numberOfReplicas(3))
Expand Down Expand Up @@ -525,7 +543,7 @@ public void fullAwareness1() {
.put("cluster.routing.allocation.awareness.attributes", "rack_id")
.build());

logger.info("Building initial routing table");
logger.info("Building initial routing table for 'fullAwareness1'");

MetaData metaData = newMetaDataBuilder()
.put(newIndexMetaDataBuilder("test").numberOfShards(1).numberOfReplicas(1))
Expand Down Expand Up @@ -593,7 +611,7 @@ public void fullAwareness2() {
.put("cluster.routing.allocation.awareness.attributes", "rack_id")
.build());

logger.info("Building initial routing table");
logger.info("Building initial routing table for 'fullAwareness2'");

MetaData metaData = newMetaDataBuilder()
.put(newIndexMetaDataBuilder("test").numberOfShards(1).numberOfReplicas(1))
Expand Down Expand Up @@ -662,9 +680,12 @@ public void fullAwareness3() {
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1)
.put("cluster.routing.allocation.awareness.force.rack_id.values", "1,2")
.put("cluster.routing.allocation.awareness.attributes", "rack_id")
.put("cluster.routing.allocation.balance.index", 0.0f)
.put("cluster.routing.allocation.balance.replica", 1.0f)
.put("cluster.routing.allocation.balance.primary", 0.0f)
.build());

logger.info("Building initial routing table");
logger.info("Building initial routing table for 'fullAwareness3'");

MetaData metaData = newMetaDataBuilder()
.put(newIndexMetaDataBuilder("test1").numberOfShards(5).numberOfReplicas(1))
Expand Down

0 comments on commit 2eb09e6

Please sign in to comment.