Skip to content

Commit

Permalink
[Segment Replication] Prioritize replica shard movement during shard …
Browse files Browse the repository at this point in the history
…relocation (opensearch-project#8875)

* add shard movement strategy setting

Signed-off-by: Poojita Raj <poojiraj@amazon.com>

* add tests

Signed-off-by: Poojita Raj <poojiraj@amazon.com>

* add changelog

Signed-off-by: Poojita Raj <poojiraj@amazon.com>

* Add NodeVersionAllocationDecider check

Signed-off-by: Poojita Raj <poojiraj@amazon.com>

* refactoring

Signed-off-by: Poojita Raj <poojiraj@amazon.com>

* add annotation + refactor

Signed-off-by: Poojita Raj <poojiraj@amazon.com>

---------

Signed-off-by: Poojita Raj <poojiraj@amazon.com>
Signed-off-by: Ivan Brusic <ivan.brusic@flocksafety.com>
  • Loading branch information
Poojita-Raj authored and brusic committed Sep 25, 2023
1 parent c71da39 commit cdf4bad
Show file tree
Hide file tree
Showing 11 changed files with 491 additions and 91 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [distribution/archives] [Linux] [x64] Provide the variant of the distributions bundled with JRE ([#8195]()https://github.com/opensearch-project/OpenSearch/pull/8195)
- Add configuration for file cache size to max remote data ratio to prevent oversubscription of file cache ([#8606](https://github.com/opensearch-project/OpenSearch/pull/8606))
- Disallow compression level to be set for default and best_compression index codecs ([#8737]()https://github.com/opensearch-project/OpenSearch/pull/8737)
- Prioritize replica shard movement during shard relocation ([#8875](https://github.com/opensearch-project/OpenSearch/pull/8875))

### Dependencies
- Bump `org.apache.logging.log4j:log4j-core` from 2.17.1 to 2.20.0 ([#8307](https://github.com/opensearch-project/OpenSearch/pull/8307))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ public static Collection<AllocationDecider> createAllocationDeciders(
addAllocationDecider(deciders, new ConcurrentRebalanceAllocationDecider(settings, clusterSettings));
addAllocationDecider(deciders, new ConcurrentRecoveriesAllocationDecider(settings, clusterSettings));
addAllocationDecider(deciders, new EnableAllocationDecider(settings, clusterSettings));
addAllocationDecider(deciders, new NodeVersionAllocationDecider());
addAllocationDecider(deciders, new NodeVersionAllocationDecider(settings));
addAllocationDecider(deciders, new SnapshotInProgressAllocationDecider());
addAllocationDecider(deciders, new RestoreInProgressAllocationDecider());
addAllocationDecider(deciders, new FilterAllocationDecider(settings, clusterSettings));
Expand Down
167 changes: 99 additions & 68 deletions server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java
Original file line number Diff line number Diff line change
Expand Up @@ -1310,100 +1310,131 @@ private void ensureMutable() {
}

/**
* Creates an iterator over shards interleaving between nodes: The iterator returns the first shard from
* the first node, then the first shard of the second node, etc. until one shard from each node has been returned.
* The iterator then resumes on the first node by returning the second shard and continues until all shards from
* all the nodes have been returned.
* @param movePrimaryFirst if true, all primary shards are iterated over before iterating replica for any node
* @return iterator of shard routings
* Returns iterator of shard routings used by {@link #nodeInterleavedShardIterator(ShardMovementStrategy)}
* @param primaryFirst true when ShardMovementStrategy = ShardMovementStrategy.PRIMARY_FIRST, false when it is ShardMovementStrategy.REPLICA_FIRST
*/
public Iterator<ShardRouting> nodeInterleavedShardIterator(boolean movePrimaryFirst) {
private Iterator<ShardRouting> buildIteratorForMovementStrategy(boolean primaryFirst) {
final Queue<Iterator<ShardRouting>> queue = new ArrayDeque<>();
for (Map.Entry<String, RoutingNode> entry : nodesToShards.entrySet()) {
queue.add(entry.getValue().copyShards().iterator());
}
if (movePrimaryFirst) {
return new Iterator<ShardRouting>() {
private Queue<ShardRouting> replicaShards = new ArrayDeque<>();
private Queue<Iterator<ShardRouting>> replicaIterators = new ArrayDeque<>();

public boolean hasNext() {
while (!queue.isEmpty()) {
if (queue.peek().hasNext()) {
return true;
}
queue.poll();
}
if (!replicaShards.isEmpty()) {
return new Iterator<ShardRouting>() {
private Queue<ShardRouting> shardRoutings = new ArrayDeque<>();
private Queue<Iterator<ShardRouting>> shardIterators = new ArrayDeque<>();

public boolean hasNext() {
while (queue.isEmpty() == false) {
if (queue.peek().hasNext()) {
return true;
}
while (!replicaIterators.isEmpty()) {
if (replicaIterators.peek().hasNext()) {
return true;
}
replicaIterators.poll();
queue.poll();
}
if (!shardRoutings.isEmpty()) {
return true;
}
while (!shardIterators.isEmpty()) {
if (shardIterators.peek().hasNext()) {
return true;
}
return false;
shardIterators.poll();
}
return false;
}

public ShardRouting next() {
if (hasNext() == false) {
throw new NoSuchElementException();
}
while (!queue.isEmpty()) {
Iterator<ShardRouting> iter = queue.poll();
public ShardRouting next() {
if (hasNext() == false) {
throw new NoSuchElementException();
}
while (!queue.isEmpty()) {
Iterator<ShardRouting> iter = queue.poll();
if (primaryFirst) {
if (iter.hasNext()) {
ShardRouting result = iter.next();
if (result.primary()) {
queue.offer(iter);
return result;
}
replicaShards.offer(result);
replicaIterators.offer(iter);
shardRoutings.offer(result);
shardIterators.offer(iter);
}
} else {
while (iter.hasNext()) {
ShardRouting result = iter.next();
if (result.primary() == false) {
queue.offer(iter);
return result;
}
shardRoutings.offer(result);
shardIterators.offer(iter);
}
}
if (!replicaShards.isEmpty()) {
return replicaShards.poll();
}
Iterator<ShardRouting> replicaIterator = replicaIterators.poll();
ShardRouting replicaShard = replicaIterator.next();
replicaIterators.offer(replicaIterator);

assert !replicaShard.primary();
return replicaShard;
}

public void remove() {
throw new UnsupportedOperationException();
if (!shardRoutings.isEmpty()) {
return shardRoutings.poll();
}
};
Iterator<ShardRouting> replicaIterator = shardIterators.poll();
ShardRouting replicaShard = replicaIterator.next();
shardIterators.offer(replicaIterator);

assert replicaShard.primary() != primaryFirst;
return replicaShard;
}

public void remove() {
throw new UnsupportedOperationException();
}

};
}

/**
* Creates an iterator over shards interleaving between nodes: The iterator returns the first shard from
* the first node, then the first shard of the second node, etc. until one shard from each node has been returned.
* The iterator then resumes on the first node by returning the second shard and continues until all shards from
* all the nodes have been returned.
* @param shardMovementStrategy if ShardMovementStrategy.PRIMARY_FIRST, all primary shards are iterated over before iterating replica for any node
* if ShardMovementStrategy.REPLICA_FIRST, all replica shards are iterated over before iterating primary for any node
* if ShardMovementStrategy.NO_PREFERENCE, order of replica and primary shards doesn't matter in iteration
* @return iterator of shard routings
*/
public Iterator<ShardRouting> nodeInterleavedShardIterator(ShardMovementStrategy shardMovementStrategy) {
final Queue<Iterator<ShardRouting>> queue = new ArrayDeque<>();
for (Map.Entry<String, RoutingNode> entry : nodesToShards.entrySet()) {
queue.add(entry.getValue().copyShards().iterator());
}
if (shardMovementStrategy == ShardMovementStrategy.PRIMARY_FIRST) {
return buildIteratorForMovementStrategy(true);
} else {
return new Iterator<ShardRouting>() {
@Override
public boolean hasNext() {
while (!queue.isEmpty()) {
if (queue.peek().hasNext()) {
return true;
if (shardMovementStrategy == ShardMovementStrategy.REPLICA_FIRST) {
return buildIteratorForMovementStrategy(false);
} else {
return new Iterator<ShardRouting>() {
@Override
public boolean hasNext() {
while (!queue.isEmpty()) {
if (queue.peek().hasNext()) {
return true;
}
queue.poll();
}
queue.poll();
return false;
}
return false;
}

@Override
public ShardRouting next() {
if (hasNext() == false) {
throw new NoSuchElementException();
@Override
public ShardRouting next() {
if (hasNext() == false) {
throw new NoSuchElementException();
}
Iterator<ShardRouting> iter = queue.poll();
queue.offer(iter);
return iter.next();
}
Iterator<ShardRouting> iter = queue.poll();
queue.offer(iter);
return iter.next();
}

public void remove() {
throw new UnsupportedOperationException();
}
};
public void remove() {
throw new UnsupportedOperationException();
}
};
}
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster.routing;

import org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;

import java.util.Locale;

/**
* ShardMovementStrategy defines the order in which shard movement occurs.
*
* ShardMovementStrategy values or rather their string representation to be used with
* {@link BalancedShardsAllocator#SHARD_MOVEMENT_STRATEGY_SETTING} via cluster settings.
*
* @opensearch.internal
*/
public enum ShardMovementStrategy {
/**
* default behavior in which order of shard movement doesn't matter.
*/
NO_PREFERENCE,

/**
* primary shards are moved first
*/
PRIMARY_FIRST,

/**
* replica shards are moved first
*/
REPLICA_FIRST;

public static ShardMovementStrategy parse(String strValue) {
if (strValue == null) {
return null;
} else {
strValue = strValue.toUpperCase(Locale.ROOT);
try {
return ShardMovementStrategy.valueOf(strValue);
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException("Illegal allocation.shard_movement_strategy value [" + strValue + "]");
}
}
}

@Override
public String toString() {
return name().toLowerCase(Locale.ROOT);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.lucene.util.IntroSorter;
import org.opensearch.cluster.routing.RoutingNode;
import org.opensearch.cluster.routing.RoutingNodes;
import org.opensearch.cluster.routing.ShardMovementStrategy;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.UnassignedInfo;
import org.opensearch.cluster.routing.UnassignedInfo.AllocationStatus;
Expand Down Expand Up @@ -107,8 +108,22 @@ public class BalancedShardsAllocator implements ShardsAllocator {
"cluster.routing.allocation.move.primary_first",
false,
Property.Dynamic,
Property.NodeScope,
Property.Deprecated
);

/**
* Decides order in which to move shards from node when shards can not stay on node anymore. {@link LocalShardsBalancer#moveShards()}
* Encapsulates behavior of above SHARD_MOVE_PRIMARY_FIRST_SETTING.
*/
public static final Setting<ShardMovementStrategy> SHARD_MOVEMENT_STRATEGY_SETTING = new Setting<ShardMovementStrategy>(
"cluster.routing.allocation.shard_movement_strategy",
ShardMovementStrategy.NO_PREFERENCE.toString(),
ShardMovementStrategy::parse,
Property.Dynamic,
Property.NodeScope
);

public static final Setting<Float> THRESHOLD_SETTING = Setting.floatSetting(
"cluster.routing.allocation.balance.threshold",
1.0f,
Expand All @@ -131,6 +146,7 @@ public class BalancedShardsAllocator implements ShardsAllocator {
);

private volatile boolean movePrimaryFirst;
private volatile ShardMovementStrategy shardMovementStrategy;

private volatile boolean preferPrimaryShardBalance;
private volatile WeightFunction weightFunction;
Expand All @@ -145,8 +161,10 @@ public BalancedShardsAllocator(Settings settings, ClusterSettings clusterSetting
setWeightFunction(INDEX_BALANCE_FACTOR_SETTING.get(settings), SHARD_BALANCE_FACTOR_SETTING.get(settings));
setThreshold(THRESHOLD_SETTING.get(settings));
setPreferPrimaryShardBalance(PREFER_PRIMARY_SHARD_BALANCE.get(settings));
setShardMovementStrategy(SHARD_MOVEMENT_STRATEGY_SETTING.get(settings));
clusterSettings.addSettingsUpdateConsumer(PREFER_PRIMARY_SHARD_BALANCE, this::setPreferPrimaryShardBalance);
clusterSettings.addSettingsUpdateConsumer(SHARD_MOVE_PRIMARY_FIRST_SETTING, this::setMovePrimaryFirst);
clusterSettings.addSettingsUpdateConsumer(SHARD_MOVEMENT_STRATEGY_SETTING, this::setShardMovementStrategy);
clusterSettings.addSettingsUpdateConsumer(INDEX_BALANCE_FACTOR_SETTING, SHARD_BALANCE_FACTOR_SETTING, this::setWeightFunction);
clusterSettings.addSettingsUpdateConsumer(THRESHOLD_SETTING, this::setThreshold);
}
Expand All @@ -155,6 +173,10 @@ private void setMovePrimaryFirst(boolean movePrimaryFirst) {
this.movePrimaryFirst = movePrimaryFirst;
}

private void setShardMovementStrategy(ShardMovementStrategy shardMovementStrategy) {
this.shardMovementStrategy = shardMovementStrategy;
}

private void setWeightFunction(float indexBalance, float shardBalanceFactor) {
weightFunction = new WeightFunction(indexBalance, shardBalanceFactor);
}
Expand Down Expand Up @@ -184,6 +206,7 @@ public void allocate(RoutingAllocation allocation) {
logger,
allocation,
movePrimaryFirst,
shardMovementStrategy,
weightFunction,
threshold,
preferPrimaryShardBalance
Expand All @@ -205,6 +228,7 @@ public ShardAllocationDecision decideShardAllocation(final ShardRouting shard, f
logger,
allocation,
movePrimaryFirst,
shardMovementStrategy,
weightFunction,
threshold,
preferPrimaryShardBalance
Expand Down Expand Up @@ -456,11 +480,12 @@ public Balancer(
Logger logger,
RoutingAllocation allocation,
boolean movePrimaryFirst,
ShardMovementStrategy shardMovementStrategy,
BalancedShardsAllocator.WeightFunction weight,
float threshold,
boolean preferPrimaryBalance
) {
super(logger, allocation, movePrimaryFirst, weight, threshold, preferPrimaryBalance);
super(logger, allocation, movePrimaryFirst, shardMovementStrategy, weight, threshold, preferPrimaryBalance);
}
}

Expand Down

0 comments on commit cdf4bad

Please sign in to comment.